Revize b8f5752f
Přidáno uživatelem Michal Seják před téměř 4 roky(ů)
src/controllers/certificates_controller.py | ||
---|---|---|
17 | 17 |
CertificateStatusInvalidException, CertificateNotFoundException, CertificateAlreadyRevokedException, \ |
18 | 18 |
CertificateCannotBeSetToValid |
19 | 19 |
# responsibility. |
20 |
from src.services.cryptography import CryptographyException |
|
20 | 21 |
from src.services.key_service import KeyService |
21 | 22 |
from src.utils.logger import Logger |
22 | 23 |
from src.utils.util import dict_to_string |
23 | 24 |
|
24 | 25 |
TREE_NODE_TYPE_COUNT = 3 |
25 |
|
|
26 |
KEY_PEM = "key_pem" |
|
27 |
PASSWORD = "password" |
|
28 |
KEY = "key" |
|
26 | 29 |
FILTERING = "filtering" |
27 | 30 |
ISSUER = "issuer" |
28 | 31 |
US = "usage" |
... | ... | |
49 | 52 |
E_GENERAL_ERROR = {"success": False, "data": "Internal server error (unknown origin)."} |
50 | 53 |
E_MISSING_PARAMETERS = {"success": False, "data": "Invalid request, missing parameters."} |
51 | 54 |
E_WRONG_PARAMETERS = {"success": False, "data": "Invalid request, wrong parameters."} |
55 |
E_WRONG_PASSWORD = {"success": False, "data": "The provided passphrase does not match the provided key."} |
|
52 | 56 |
|
53 | 57 |
|
54 | 58 |
class CertController: |
... | ... | |
98 | 102 |
|
99 | 103 |
usages_dict = {} |
100 | 104 |
|
101 |
if not isinstance(body[USAGE], dict): # type checking
|
|
105 |
if USAGE not in body or not isinstance(body[USAGE], list): # type checking
|
|
102 | 106 |
Logger.error(f"Invalid request, wrong parameter '{USAGE}'.") |
103 | 107 |
return E_WRONG_PARAMETERS, C_BAD_REQUEST |
104 | 108 |
|
105 |
for k, v in body[USAGE].items(): # for each usage
|
|
106 |
if k not in CertController.KEY_MAP: # check that it is a valid usage
|
|
107 |
Logger.error(f"Invalid request, wrong parameter '{USAGE}'[{k}].")
|
|
109 |
for v in body[USAGE]: # for each usage
|
|
110 |
if v not in CertController.KEY_MAP: # check that it is a valid usage
|
|
111 |
Logger.error(f"Invalid request, wrong parameter '{USAGE}'[{v}].")
|
|
108 | 112 |
return E_WRONG_PARAMETERS, C_BAD_REQUEST # and throw if it is not |
109 |
usages_dict[CertController.KEY_MAP[k]] = v # otherwise translate key and set |
|
110 |
|
|
111 |
key = self.key_service.create_new_key() # TODO pass key |
|
113 |
usages_dict[CertController.KEY_MAP[v]] = True # otherwise translate key and set |
|
114 |
|
|
115 |
if KEY in body: |
|
116 |
if isinstance(body[KEY], dict): |
|
117 |
if PASSWORD in body[KEY]: |
|
118 |
passphrase = body[KEY][PASSWORD] |
|
119 |
if KEY_PEM in body[KEY]: |
|
120 |
key_pem = body[KEY][KEY_PEM] |
|
121 |
if not self.key_service.verify_key(key_pem, passphrase=passphrase): |
|
122 |
Logger.error(f"Passphrase specified but invalid.") |
|
123 |
return E_WRONG_PASSWORD, C_BAD_REQUEST |
|
124 |
key = self.key_service.wrap_custom_key(key_pem, passphrase=passphrase) |
|
125 |
else: |
|
126 |
key = self.key_service.create_new_key(passphrase) |
|
127 |
else: |
|
128 |
if KEY_PEM in body[KEY]: |
|
129 |
key_pem = body[KEY][KEY_PEM] |
|
130 |
if not self.key_service.verify_key(key_pem, passphrase=None): |
|
131 |
Logger.error("Passphrase ommited but required.") |
|
132 |
return E_WRONG_PASSWORD, C_BAD_REQUEST |
|
133 |
key = self.key_service.wrap_custom_key(key_pem, passphrase=None) |
|
134 |
else: |
|
135 |
key = self.key_service.create_new_key() # if "key" exists but is empty |
|
136 |
else: |
|
137 |
return E_WRONG_PARAMETERS, C_BAD_REQUEST |
|
138 |
else: |
|
139 |
key = self.key_service.create_new_key() # if "key" does not exist |
|
140 |
# TODO Honza: line 134 / 138, should |
|
141 |
# they both be allowed? I say one |
|
142 |
# of those branches should return |
|
143 |
# wrong params bad request (mby). |
|
112 | 144 |
|
113 | 145 |
if CA not in body or body[CA] is None: # if issuer omitted (legal) or none |
114 |
cert = self.certificate_service.create_root_ca( # create a root CA
|
|
146 |
cert = self.certificate_service.create_root_ca( # create a root CA |
|
115 | 147 |
key, |
116 | 148 |
subject, |
117 | 149 |
usages=usages_dict, # TODO ignoring usages -> discussion |
118 | 150 |
days=body[VALIDITY_DAYS] |
119 | 151 |
) |
120 | 152 |
else: |
121 |
issuer = self.certificate_service.get_certificate(body[CA]) # get base issuer info
|
|
153 |
issuer = self.certificate_service.get_certificate(body[CA]) # get base issuer info |
|
122 | 154 |
|
123 | 155 |
if issuer is None: # if such issuer does not exist |
124 | 156 |
Logger.error(f"No certificate authority with such unique ID exists 'ID = {key.private_key_id}'.") |
125 |
self.key_service.delete_key(key.private_key_id) # free
|
|
157 |
self.key_service.delete_key(key.private_key_id) # free |
|
126 | 158 |
return E_NO_ISSUER_FOUND, C_BAD_REQUEST # and throw |
127 | 159 |
|
128 |
issuer_key = self.key_service.get_key(issuer.private_key_id) # get issuer's key, which must exist
|
|
160 |
issuer_key = self.key_service.get_key(issuer.private_key_id) # get issuer's key, which must exist |
|
129 | 161 |
|
130 | 162 |
if issuer_key is None: # if it does not |
131 | 163 |
Logger.error(f"Internal server error (corrupted database).") |
132 |
self.key_service.delete_key(key.private_key_id) # free
|
|
164 |
self.key_service.delete_key(key.private_key_id) # free |
|
133 | 165 |
return E_CORRUPTED_DATABASE, C_INTERNAL_SERVER_ERROR # and throw |
134 | 166 |
|
135 | 167 |
f = self.certificate_service.create_ca if CA_ID in usages_dict and usages_dict[CA_ID] else \ |
136 | 168 |
self.certificate_service.create_end_cert |
137 | 169 |
|
138 |
# noinspection PyTypeChecker |
|
139 |
cert = f( # create inter CA or end cert |
|
140 |
key, # according to whether 'CA' is among |
|
141 |
subject, # the usages' fields |
|
142 |
issuer, |
|
143 |
issuer_key, |
|
144 |
usages=usages_dict, |
|
145 |
days=body[VALIDITY_DAYS] |
|
146 |
) |
|
170 |
try: |
|
171 |
# noinspection PyTypeChecker |
|
172 |
cert = f( # create inter CA or end cert |
|
173 |
key, # according to whether 'CA' is among |
|
174 |
subject, # the usages' fields |
|
175 |
issuer, |
|
176 |
issuer_key, |
|
177 |
usages=usages_dict, |
|
178 |
days=body[VALIDITY_DAYS] |
|
179 |
) |
|
180 |
except CryptographyException as e: |
|
181 |
return {"success": False, "data": e.message}, C_BAD_REQUEST |
|
147 | 182 |
|
148 | 183 |
if cert is not None: |
149 | 184 |
return {"success": True, |
150 | 185 |
"data": cert.certificate_id}, C_CREATED_SUCCESSFULLY |
151 | 186 |
else: # if this fails, then |
152 | 187 |
Logger.error(f"Internal error: The certificate could not have been created.") |
153 |
self.key_service.delete_key(key.private_key_id) # free
|
|
188 |
self.key_service.delete_key(key.private_key_id) # free |
|
154 | 189 |
return {"success": False, # and wonder what the cause is, |
155 | 190 |
"data": "Internal error: The certificate could not have been created."}, C_BAD_REQUEST |
156 | 191 |
# as obj/None carries only one bit |
Také k dispozici: Unified diff
Re #8705 - Adjusted CertController to handle all combinations of PEM/pass supply. Added calls to a not-yet-existing `verify_key(...)` method of PK service.