Revize a767aa70
Přidáno uživatelem Michal Seják před téměř 4 roky(ů)
src/controllers/certificates_controller.py | ||
---|---|---|
32 | 32 |
NOT_BEFORE = "notBefore" |
33 | 33 |
COMMON_NAME = "CN" |
34 | 34 |
ID = "id" |
35 |
CA = "CA" |
|
35 | 36 |
USAGE = "usage" |
36 | 37 |
SUBJECT = "subject" |
37 | 38 |
VALIDITY_DAYS = "validityDays" |
... | ... | |
115 | 116 |
|
116 | 117 |
key = self.key_service.create_new_key() # TODO pass key |
117 | 118 |
|
118 |
if TYPE not in body or body[TYPE] is None: # if issuer omitted (legal) or none
|
|
119 |
if CA not in body or body[CA] is None: # if issuer omitted (legal) or none
|
|
119 | 120 |
cert = self.certificate_service.create_root_ca( # create a root CA |
120 | 121 |
key, |
121 | 122 |
subject, |
... | ... | |
123 | 124 |
days=body[VALIDITY_DAYS] |
124 | 125 |
) |
125 | 126 |
else: |
126 |
issuer = self.certificate_service.get_certificate(body[TYPE]) # get base issuer info
|
|
127 |
issuer = self.certificate_service.get_certificate(body[CA]) # get base issuer info
|
|
127 | 128 |
|
128 | 129 |
if issuer is None: # if such issuer does not exist |
129 | 130 |
Logger.error(f"No certificate authority with such unique ID exists 'ID = {key.private_key_id}'.") |
... | ... | |
280 | 281 |
target_usages = {v for v in CertController.INVERSE_USAGE_KEY_MAP.keys()} |
281 | 282 |
target_cn_substring = None |
282 | 283 |
issuer_id = -1 |
284 |
|
|
285 |
unfiltered = True |
|
286 |
|
|
283 | 287 |
if PER_PAGE in data: |
288 |
unfiltered = False |
|
284 | 289 |
page = data.get(PAGE, 0) |
285 | 290 |
per_page = data[PER_PAGE] |
286 | 291 |
else: |
... | ... | |
288 | 293 |
per_page = None |
289 | 294 |
|
290 | 295 |
if FILTERING in data: # if the 'filtering' field exists |
296 |
unfiltered = False |
|
291 | 297 |
if isinstance(data[FILTERING], dict): # and it is also a 'dict' |
292 | 298 |
|
293 | 299 |
# noinspection DuplicatedCode |
... | ... | |
310 | 316 |
try: |
311 | 317 |
target_usages = {CertController.USAGE_KEY_MAP[v] for v in data[FILTERING][USAGE]} |
312 | 318 |
except KeyError as e: |
313 |
Logger.error(f"Invalid request, wrong parameters '{FILTERING}.{TYPE}' - '{e}'.")
|
|
319 |
Logger.error(f"Invalid request, wrong parameters '{FILTERING}.{USAGE}' - '{e}'.")
|
|
314 | 320 |
return E_WRONG_PARAMETERS, C_BAD_REQUEST |
315 | 321 |
else: |
316 | 322 |
Logger.error(f"Invalid request, wrong parameters '{FILTERING}.{USAGE}'.") |
... | ... | |
331 | 337 |
Logger.error(f"Invalid request, wrong parameters '{FILTERING}'.") |
332 | 338 |
return E_WRONG_PARAMETERS, C_BAD_REQUEST |
333 | 339 |
|
334 |
if issuer_id >= 0: # if filtering by an issuer |
|
335 |
try: |
|
336 |
# get his children, filtered |
|
337 |
certs = self.certificate_service.get_certificates_issued_by_filter( |
|
338 |
issuer_id=issuer_id, |
|
339 |
target_types=target_types, |
|
340 |
target_usages=target_usages, |
|
341 |
target_cn_substring=target_cn_substring, |
|
342 |
page=page, |
|
343 |
per_page=per_page |
|
344 |
) |
|
345 |
except CertificateNotFoundException: # if id does not exist |
|
346 |
Logger.error(f"No such certificate found 'ID = {issuer_id}'.") |
|
347 |
return E_NO_CERTIFICATES_FOUND, C_NOT_FOUND # throw |
|
348 |
else: |
|
349 |
certs = self.certificate_service.get_certificates_filter( |
|
340 |
if unfiltered: # if not filtering |
|
341 |
certs = self.certificate_service.get_certificates() |
|
342 |
elif issuer_id >= 0: # if filtering by an issuer |
|
343 |
try: |
|
344 |
# get his children, filtered |
|
345 |
certs = self.certificate_service.get_certificates_issued_by_filter( |
|
346 |
issuer_id=issuer_id, |
|
350 | 347 |
target_types=target_types, |
351 | 348 |
target_usages=target_usages, |
352 | 349 |
target_cn_substring=target_cn_substring, |
353 | 350 |
page=page, |
354 | 351 |
per_page=per_page |
355 | 352 |
) |
353 |
except CertificateNotFoundException: # if id does not exist |
|
354 |
Logger.error(f"No such certificate found 'ID = {issuer_id}'.") |
|
355 |
return E_NO_CERTIFICATES_FOUND, C_NOT_FOUND # throw |
|
356 | 356 |
else: |
357 |
certs = self.certificate_service.get_certificates() |
|
357 |
certs = self.certificate_service.get_certificates_filter( |
|
358 |
target_types=target_types, |
|
359 |
target_usages=target_usages, |
|
360 |
target_cn_substring=target_cn_substring, |
|
361 |
page=page, |
|
362 |
per_page=per_page |
|
363 |
) |
|
358 | 364 |
|
359 | 365 |
if certs is None: |
360 | 366 |
Logger.error(f"Internal server error (unknown origin).") |
... | ... | |
362 | 368 |
elif len(certs) == 0: |
363 | 369 |
# TODO check log level |
364 | 370 |
Logger.warning(f"No such certificate found (empty list).") |
365 |
return E_NO_CERTIFICATES_FOUND, C_NO_DATA
|
|
371 |
return {"success": True, "data": []}, C_SUCCESS
|
|
366 | 372 |
else: |
367 | 373 |
ret = [] |
368 | 374 |
for c in certs: |
... | ... | |
556 | 562 |
NOT_BEFORE: datetime.strptime(c.valid_from, DATETIME_FORMAT).date(), |
557 | 563 |
NOT_AFTER: datetime.strptime(c.valid_to, DATETIME_FORMAT).date(), |
558 | 564 |
USAGE: {CertController.INVERSE_USAGE_KEY_MAP[k]: v for k, v in c.usages.items()}, |
559 |
TYPE: c_issuer.certificate_id
|
|
565 |
CA: c_issuer.certificate_id
|
|
560 | 566 |
} |
561 | 567 |
|
562 | 568 |
def get_private_key_of_a_certificate(self, id): |
Také k dispozici: Unified diff
Re #8702 - Minor bug and flow fixes.