Revize 5b6d9513
Přidáno uživatelem Michal Seják před asi 4 roky(ů)
src/controllers/certificates_controller.py | ||
---|---|---|
50 | 50 | |
51 | 51 | |
52 | 52 |
class CertController: |
53 |
KEY_MAP = {'CA': CA_ID, 'SSL': SSL_ID, 'digitalSignature': SIGNATURE_ID, 'authentication': AUTHENTICATION_ID} |
|
54 |
INVERSE_KEY_MAP = {k: v for v, k in KEY_MAP.items()} |
|
53 | 55 | |
54 | 56 |
@staticmethod |
55 | 57 |
def setup(): |
... | ... | |
76 | 78 |
""" |
77 | 79 |
CertController.setup() # TODO remove after issue fixed |
78 | 80 | |
79 |
key_map = {'CA': CA_ID, 'SSL': SSL_ID, 'digitalSignature': SIGNATURE_ID, 'authentication': AUTHENTICATION_ID} |
|
80 | 81 |
required_keys = {SUBJECT, USAGE, VALIDITY_DAYS} |
81 | 82 | |
82 | 83 |
if request.is_json: |
... | ... | |
92 | 93 |
if subject is None: |
93 | 94 |
return E_WRONG_PARAMETERS, 400 |
94 | 95 | |
95 |
usages_dict = DICT_USAGES.copy()
|
|
96 |
usages_dict = {}
|
|
96 | 97 | |
97 | 98 |
if not isinstance(body[USAGE], dict): |
98 | 99 |
return E_WRONG_PARAMETERS, 400 |
99 | 100 | |
100 | 101 |
for k, v in body[USAGE].items(): |
101 |
if k not in key_map:
|
|
102 |
if k not in CertController.KEY_MAP:
|
|
102 | 103 |
return E_WRONG_PARAMETERS, 400 |
103 |
usages_dict[key_map[k]] = v
|
|
104 |
usages_dict[CertController.KEY_MAP[k]] = v
|
|
104 | 105 | |
105 | 106 |
key = KEY_SERVICE.create_new_key() # TODO pass key |
106 | 107 | |
... | ... | |
115 | 116 |
issuer = CERTIFICATE_SERVICE.get_certificate(body[CA]) |
116 | 117 | |
117 | 118 |
if issuer is None: |
119 |
KEY_SERVICE.delete_key(key.private_key_id) |
|
118 | 120 |
return E_NO_ISSUER_FOUND, 400 |
119 | 121 | |
120 | 122 |
issuer_key = KEY_SERVICE.get_key(issuer.private_key_id) |
121 | 123 | |
122 | 124 |
if issuer_key is None: |
125 |
KEY_SERVICE.delete_key(key.private_key_id) |
|
123 | 126 |
return E_CORRUPTED_DATABASE, 500 |
124 | 127 | |
125 | 128 |
f = CERTIFICATE_SERVICE.create_ca if CA_ID in usages_dict and usages_dict[CA_ID] else \ |
... | ... | |
138 | 141 |
return {"success": True, |
139 | 142 |
"data": cert.certificate_id}, 201 |
140 | 143 |
else: |
144 |
KEY_SERVICE.delete_key(key.private_key_id) |
|
141 | 145 |
return {"success": False, |
142 | 146 |
"data": "Internal error: The certificate could not have been created."}, 400 |
143 | 147 |
else: |
... | ... | |
181 | 185 |
""" |
182 | 186 |
CertController.setup() # TODO remove after issue fixed |
183 | 187 | |
184 |
if connexion.request.is_json: |
|
185 |
id = IdParameter.from_dict(connexion.request.get_json()) # noqa: E501 |
|
186 |
return 'do some magic!' |
|
188 |
try: |
|
189 |
v = int(id) |
|
190 |
except ValueError: |
|
191 |
return E_WRONG_PARAMETERS, 400 |
|
192 | ||
193 |
cert = CERTIFICATE_SERVICE.get_certificate(v) |
|
194 | ||
195 |
if cert is None: |
|
196 |
return E_NO_CERTIFICATES_FOUND, 205 # TODO related to 204 issue |
|
197 |
else: |
|
198 |
data = CertController.cert_to_dict_full(cert) |
|
199 |
if data is None: |
|
200 |
return E_CORRUPTED_DATABASE, 500 |
|
201 |
return {"success": True, "data": data} |
|
187 | 202 | |
188 | 203 |
@staticmethod |
189 | 204 |
def get_certificate_list(): # noqa: E501 |
... | ... | |
198 | 213 |
""" |
199 | 214 |
CertController.setup() # TODO remove after issue fixed |
200 | 215 | |
201 |
key_map = {CA_ID: 'CA', SSL_ID: 'SSL', SIGNATURE_ID: 'digitalSignature', AUTHENTICATION_ID: 'authentication'} |
|
202 | ||
203 | 216 |
targets = {ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID} |
204 | 217 |
if request.is_json: |
205 | 218 |
data = request.get_json() |
... | ... | |
217 | 230 |
else: |
218 | 231 |
return E_WRONG_PARAMETERS, 400 |
219 | 232 | |
220 |
if len(targets) == 3: |
|
233 |
if len(targets) == 3: # potentially risky
|
|
221 | 234 |
certs = CERTIFICATE_SERVICE.get_certificates() |
222 | 235 |
else: |
223 | 236 |
certs = list(chain(*(CERTIFICATE_SERVICE.get_certificates(target) for target in targets))) |
... | ... | |
229 | 242 |
else: |
230 | 243 |
ret = [] |
231 | 244 |
for c in certs: |
232 |
c_issuer = CERTIFICATE_SERVICE.get_certificate(c.parent_id)
|
|
233 |
if c_issuer is None:
|
|
245 |
data = CertController.cert_to_dict_partial(c)
|
|
246 |
if data is None:
|
|
234 | 247 |
return E_CORRUPTED_DATABASE, 500 |
235 | ||
236 | 248 |
ret.append( |
237 |
{ |
|
238 |
ID: c.certificate_id, |
|
239 |
COMMON_NAME: c.common_name, |
|
240 |
NOT_BEFORE: datetime.strptime(c.valid_from, DATETIME_FORMAT).date(), |
|
241 |
NOT_AFTER: datetime.strptime(c.valid_to, DATETIME_FORMAT).date(), |
|
242 |
USAGE: {key_map[k]: v for k, v in c.usages.items()}, |
|
243 |
ISSUER: { |
|
244 |
ID: c_issuer.certificate_id, |
|
245 |
COMMON_NAME: c_issuer.common_name |
|
246 |
} |
|
247 |
} |
|
249 |
data |
|
248 | 250 |
) |
249 | 251 |
return {"success": True, "data": ret} |
250 | 252 | |
251 | 253 | |
252 | ||
253 | 254 |
@staticmethod |
254 | 255 |
def get_certificate_root_by_id(id): # noqa: E501 |
255 | 256 |
"""get certificate's root of trust chain by ID |
... | ... | |
283 | 284 |
if connexion.request.is_json: |
284 | 285 |
id = IdParameter.from_dict(connexion.request.get_json()) # noqa: E501 |
285 | 286 |
return 'do some magic!' |
287 | ||
288 | ||
289 |
@staticmethod |
|
290 |
def cert_to_dict_partial(c): |
|
291 |
c_issuer = CERTIFICATE_SERVICE.get_certificate(c.parent_id) |
|
292 |
if c_issuer is None: |
|
293 |
return None |
|
294 | ||
295 |
return { |
|
296 |
ID: c.certificate_id, |
|
297 |
COMMON_NAME: c.common_name, |
|
298 |
NOT_BEFORE: datetime.strptime(c.valid_from, DATETIME_FORMAT).date(), |
|
299 |
NOT_AFTER: datetime.strptime(c.valid_to, DATETIME_FORMAT).date(), |
|
300 |
USAGE: {CertController.INVERSE_KEY_MAP[k]: v for k, v in c.usages.items()}, |
|
301 |
ISSUER: { |
|
302 |
ID: c_issuer.certificate_id, |
|
303 |
COMMON_NAME: c_issuer.common_name |
|
304 |
} |
|
305 |
} |
|
306 | ||
307 |
@staticmethod |
|
308 |
def cert_to_dict_full(c): |
|
309 |
subj = CERTIFICATE_SERVICE.get_subject_from_certificate(c) |
|
310 |
c_issuer = CERTIFICATE_SERVICE.get_certificate(c.parent_id) |
|
311 |
if c_issuer is None: |
|
312 |
return None |
|
313 | ||
314 |
return { |
|
315 |
SUBJECT: subj.to_dict(), |
|
316 |
NOT_BEFORE: datetime.strptime(c.valid_from, DATETIME_FORMAT).date(), |
|
317 |
NOT_AFTER: datetime.strptime(c.valid_to, DATETIME_FORMAT).date(), |
|
318 |
USAGE: {CertController.INVERSE_KEY_MAP[k]: v for k, v in c.usages.items()}, |
|
319 |
CA: c_issuer.certificate_id |
|
320 |
} |
Také k dispozici: Unified diff
Re #8476 - Implemented and tested `get_cert_details(id)`.