Revize a2965e9a
Přidáno uživatelem Michal Seják před téměř 4 roky(ů)
src/controllers/certificates_controller.py | ||
---|---|---|
17 | 17 |
CertificateStatusInvalidException, CertificateNotFoundException, CertificateAlreadyRevokedException, \ |
18 | 18 |
CertificateCannotBeSetToValid |
19 | 19 |
# responsibility. |
20 |
from src.services.cryptography import CryptographyException |
|
20 | 21 |
from src.services.key_service import KeyService |
21 | 22 |
from src.utils.logger import Logger |
22 | 23 |
from src.utils.util import dict_to_string |
23 | 24 |
|
25 |
EXTENSIONS = "extensions" |
|
26 |
|
|
24 | 27 |
TREE_NODE_TYPE_COUNT = 3 |
25 | 28 |
|
26 | 29 |
FILTERING = "filtering" |
... | ... | |
108 | 111 |
return E_WRONG_PARAMETERS, C_BAD_REQUEST # and throw if it is not |
109 | 112 |
usages_dict[CertController.KEY_MAP[k]] = v # otherwise translate key and set |
110 | 113 |
|
111 |
key = self.key_service.create_new_key() # TODO pass key |
|
112 |
|
|
113 |
if CA not in body or body[CA] is None: # if issuer omitted (legal) or none |
|
114 |
cert = self.certificate_service.create_root_ca( # create a root CA |
|
115 |
key, |
|
116 |
subject, |
|
117 |
usages=usages_dict, # TODO ignoring usages -> discussion |
|
118 |
days=body[VALIDITY_DAYS] |
|
119 |
) |
|
120 |
else: |
|
121 |
issuer = self.certificate_service.get_certificate(body[CA]) # get base issuer info |
|
122 |
|
|
123 |
if issuer is None: # if such issuer does not exist |
|
124 |
Logger.error(f"No certificate authority with such unique ID exists 'ID = {key.private_key_id}'.") |
|
125 |
self.key_service.delete_key(key.private_key_id) # free |
|
126 |
return E_NO_ISSUER_FOUND, C_BAD_REQUEST # and throw |
|
114 |
key = self.key_service.create_new_key() # TODO pass key |
|
127 | 115 |
|
128 |
issuer_key = self.key_service.get_key(issuer.private_key_id) # get issuer's key, which must exist |
|
116 |
extensions = None |
|
117 |
if EXTENSIONS in body: |
|
118 |
extensions = body[EXTENSIONS] |
|
129 | 119 |
|
130 |
if issuer_key is None: # if it does not |
|
131 |
Logger.error(f"Internal server error (corrupted database).") |
|
132 |
self.key_service.delete_key(key.private_key_id) # free |
|
133 |
return E_CORRUPTED_DATABASE, C_INTERNAL_SERVER_ERROR # and throw |
|
134 |
|
|
135 |
f = self.certificate_service.create_ca if CA_ID in usages_dict and usages_dict[CA_ID] else \ |
|
136 |
self.certificate_service.create_end_cert |
|
137 |
|
|
138 |
# noinspection PyTypeChecker |
|
139 |
cert = f( # create inter CA or end cert |
|
140 |
key, # according to whether 'CA' is among |
|
141 |
subject, # the usages' fields |
|
142 |
issuer, |
|
143 |
issuer_key, |
|
144 |
usages=usages_dict, |
|
145 |
days=body[VALIDITY_DAYS] |
|
146 |
) |
|
120 |
try: |
|
121 |
if CA not in body or body[CA] is None: # if issuer omitted (legal) or none |
|
122 |
cert = self.certificate_service.create_root_ca( # create a root CA |
|
123 |
key, |
|
124 |
subject, |
|
125 |
usages=usages_dict, # TODO ignoring usages -> discussion |
|
126 |
days=body[VALIDITY_DAYS], |
|
127 |
extensions=extensions |
|
128 |
) |
|
129 |
else: |
|
130 |
issuer = self.certificate_service.get_certificate(body[CA]) # get base issuer info |
|
131 |
|
|
132 |
if issuer is None: # if such issuer does not exist |
|
133 |
Logger.error(f"No certificate authority with such unique ID exists 'ID = {key.private_key_id}'.") |
|
134 |
self.key_service.delete_key(key.private_key_id) # free |
|
135 |
return E_NO_ISSUER_FOUND, C_BAD_REQUEST # and throw |
|
136 |
|
|
137 |
issuer_key = self.key_service.get_key(issuer.private_key_id) # get issuer's key, which must exist |
|
138 |
|
|
139 |
if issuer_key is None: # if it does not |
|
140 |
Logger.error(f"Internal server error (corrupted database).") |
|
141 |
self.key_service.delete_key(key.private_key_id) # free |
|
142 |
return E_CORRUPTED_DATABASE, C_INTERNAL_SERVER_ERROR # and throw |
|
143 |
|
|
144 |
f = self.certificate_service.create_ca if CA_ID in usages_dict and usages_dict[CA_ID] else \ |
|
145 |
self.certificate_service.create_end_cert |
|
146 |
|
|
147 |
# noinspection PyArgumentList |
|
148 |
cert = f( # create inter CA or end cert |
|
149 |
key, # according to whether 'CA' is among |
|
150 |
subject, # the usages' fields |
|
151 |
issuer, |
|
152 |
issuer_key, |
|
153 |
usages=usages_dict, |
|
154 |
days=body[VALIDITY_DAYS], |
|
155 |
extensions=extensions |
|
156 |
) |
|
157 |
except CryptographyException as e: |
|
158 |
return {"success": False, "data": e.message}, C_INTERNAL_SERVER_ERROR |
|
147 | 159 |
|
148 | 160 |
if cert is not None: |
149 | 161 |
return {"success": True, |
Také k dispozici: Unified diff
Re #8704 - Added the `extension` parameter to the create endpoint.