Revize da92e92a
Přidáno uživatelem Jan Pašek před téměř 4 roky(ů)
src/controllers/certificates_controller.py | ||
---|---|---|
3 | 3 |
from itertools import chain |
4 | 4 |
from json import JSONDecodeError |
5 | 5 |
|
6 |
from flask import request, Response
|
|
6 |
from flask import request |
|
7 | 7 |
from injector import inject |
8 | 8 |
|
9 | 9 |
from src.constants import CA_ID, \ |
... | ... | |
22 | 22 |
from src.utils.logger import Logger |
23 | 23 |
from src.utils.util import dict_to_string |
24 | 24 |
|
25 |
EXTENSIONS = "extensions" |
|
26 |
|
|
27 | 25 |
TREE_NODE_TYPE_COUNT = 3 |
28 | 26 |
KEY_PEM = "key_pem" |
29 | 27 |
PASSWORD = "password" |
30 | 28 |
KEY = "key" |
31 | 29 |
FILTERING = "filtering" |
32 |
PAGE = "page" |
|
33 |
PER_PAGE = "per_page" |
|
34 | 30 |
ISSUER = "issuer" |
35 | 31 |
US = "usage" |
36 | 32 |
NOT_AFTER = "notAfter" |
37 | 33 |
NOT_BEFORE = "notBefore" |
38 | 34 |
COMMON_NAME = "CN" |
39 | 35 |
ID = "id" |
40 |
CA = "CA" |
|
41 | 36 |
USAGE = "usage" |
42 | 37 |
SUBJECT = "subject" |
43 | 38 |
VALIDITY_DAYS = "validityDays" |
44 |
TYPE = "type"
|
|
39 |
CA = "CA"
|
|
45 | 40 |
ISSUED_BY = "issuedby" |
46 | 41 |
STATUS = "status" |
47 | 42 |
REASON = "reason" |
48 | 43 |
REASON_UNDEFINED = "unspecified" |
49 |
NAME = "name" |
|
50 |
PASSWORD = "password" |
|
51 | 44 |
|
52 | 45 |
E_NO_ISSUER_FOUND = {"success": False, "data": "No certificate authority with such unique ID exists."} |
53 | 46 |
E_NO_CERTIFICATES_FOUND = {"success": False, "data": "No such certificate found."} |
... | ... | |
60 | 53 |
E_MISSING_PARAMETERS = {"success": False, "data": "Invalid request, missing parameters."} |
61 | 54 |
E_WRONG_PARAMETERS = {"success": False, "data": "Invalid request, wrong parameters."} |
62 | 55 |
E_WRONG_PASSWORD = {"success": False, "data": "The provided passphrase does not match the provided key."} |
63 |
E_IDENTITY_NAME_NOT_SPECIFIED = {"success": False, "data": "Invalid request, missing identity name."} |
|
64 |
E_IDENTITY_PASSWORD_NOT_SPECIFIED = {"success": False, "data": "Invalid request, missing identity password."} |
|
65 | 56 |
|
66 | 57 |
|
67 | 58 |
class CertController: |
68 |
USAGE_KEY_MAP = {'CA': CA_ID, 'SSL': SSL_ID, 'digitalSignature': SIGNATURE_ID, 'authentication': AUTHENTICATION_ID} |
|
69 |
INVERSE_USAGE_KEY_MAP = {k: v for v, k in USAGE_KEY_MAP.items()} |
|
70 |
FILTERING_TYPE_KEY_MAP = {'root': ROOT_CA_ID, 'inter': INTERMEDIATE_CA_ID, 'end': CERTIFICATE_ID} |
|
71 |
# INVERSE_FILTERING_TYPE_KEY_MAP = {k: v for v, k in FILTERING_TYPE_KEY_MAP.items()} |
|
72 |
|
|
59 |
KEY_MAP = {'CA': CA_ID, 'SSL': SSL_ID, 'digitalSignature': SIGNATURE_ID, 'authentication': AUTHENTICATION_ID} |
|
60 |
INVERSE_KEY_MAP = {k: v for v, k in KEY_MAP.items()} |
|
73 | 61 |
|
74 | 62 |
@inject |
75 | 63 |
def __init__(self, certificate_service: CertificateService, key_service: KeyService): |
... | ... | |
280 | 268 |
Logger.info(f"\n\t{request.referrer}" |
281 | 269 |
f"\n\t{request.method} {request.path} {request.scheme}") |
282 | 270 |
|
283 |
|
|
284 |
# the filtering parameter can be read as URL argument or as a request body |
|
285 |
if request.is_json: |
|
286 |
data = request.get_json() |
|
287 |
else: |
|
288 |
data = {} |
|
289 |
|
|
290 |
if FILTERING in request.args.keys(): |
|
291 |
try: |
|
292 |
data[FILTERING] = json.loads(request.args[FILTERING]) |
|
293 |
except JSONDecodeError: |
|
294 |
Logger.error(f"The request must be JSON-formatted.") |
|
295 |
return E_NOT_JSON_FORMAT, C_BAD_REQUEST |
|
296 |
|
|
297 |
if PAGE in request.args.keys(): |
|
298 |
try: |
|
299 |
data[PAGE] = json.loads(request.args[PAGE]) |
|
300 |
except JSONDecodeError: |
|
301 |
Logger.error(f"The request must be JSON-formatted.") |
|
302 |
return E_NOT_JSON_FORMAT, C_BAD_REQUEST |
|
303 |
|
|
304 |
if PER_PAGE in request.args.keys(): |
|
305 |
try: |
|
306 |
data[PER_PAGE] = json.loads(request.args[PER_PAGE]) |
|
307 |
except JSONDecodeError: |
|
308 |
Logger.error(f"The request must be JSON-formatted.") |
|
309 |
return E_NOT_JSON_FORMAT, C_BAD_REQUEST |
|
310 |
|
|
311 |
Logger.info(f"\n\tRequest body:" |
|
312 |
f"\n{dict_to_string(data)}") |
|
313 |
|
|
314 |
target_types = {ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID} |
|
315 |
target_usages = {v for v in CertController.INVERSE_USAGE_KEY_MAP.keys()} |
|
316 |
target_cn_substring = None |
|
271 |
targets = {ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID} # all targets |
|
317 | 272 |
issuer_id = -1 |
318 | 273 |
|
319 |
unfiltered = True |
|
320 |
|
|
321 |
if PER_PAGE in data: |
|
322 |
unfiltered = False |
|
323 |
page = data.get(PAGE, 0) |
|
324 |
per_page = data[PER_PAGE] |
|
325 |
else: |
|
326 |
page = None |
|
327 |
per_page = None |
|
328 |
|
|
329 |
if FILTERING in data: # if the 'filtering' field exists |
|
330 |
unfiltered = False |
|
331 |
if isinstance(data[FILTERING], dict): # and it is also a 'dict' |
|
332 |
|
|
333 |
# noinspection DuplicatedCode |
|
334 |
if TYPE in data[FILTERING]: # containing 'type' |
|
335 |
if isinstance(data[FILTERING][TYPE], list): # which is a 'list', |
|
336 |
# map every field to id |
|
337 |
try: |
|
338 |
target_types = {CertController.FILTERING_TYPE_KEY_MAP[v] for v in data[FILTERING][TYPE]} |
|
339 |
except KeyError as e: |
|
340 |
Logger.error(f"Invalid request, wrong parameters '{FILTERING}.{TYPE}' - '{e}'.") |
|
341 |
return E_WRONG_PARAMETERS, C_BAD_REQUEST |
|
342 |
else: |
|
343 |
Logger.error(f"Invalid request, wrong parameters '{FILTERING}.{TYPE}'.") |
|
344 |
return E_WRONG_PARAMETERS, C_BAD_REQUEST |
|
345 |
|
|
346 |
# noinspection DuplicatedCode |
|
347 |
if USAGE in data[FILTERING]: # containing 'usage' |
|
348 |
if isinstance(data[FILTERING][USAGE], list): # which is a 'list', |
|
349 |
# map every field to id |
|
350 |
try: |
|
351 |
target_usages = {CertController.USAGE_KEY_MAP[v] for v in data[FILTERING][USAGE]} |
|
352 |
except KeyError as e: |
|
353 |
Logger.error(f"Invalid request, wrong parameters '{FILTERING}.{USAGE}' - '{e}'.") |
|
354 |
return E_WRONG_PARAMETERS, C_BAD_REQUEST |
|
355 |
else: |
|
356 |
Logger.error(f"Invalid request, wrong parameters '{FILTERING}.{USAGE}'.") |
|
357 |
return E_WRONG_PARAMETERS, C_BAD_REQUEST |
|
358 |
|
|
359 |
if COMMON_NAME in data[FILTERING]: # containing 'CN' |
|
360 |
if isinstance(data[FILTERING][COMMON_NAME], str): # which is a 'str' |
|
361 |
target_cn_substring = data[FILTERING][COMMON_NAME] |
|
362 |
else: |
|
363 |
Logger.error(f"Invalid request, wrong parameters '{FILTERING}.{COMMON_NAME}'.") |
|
364 |
return E_WRONG_PARAMETERS, C_BAD_REQUEST |
|
365 |
|
|
366 |
if ISSUED_BY in data[FILTERING]: # containing 'issuedby' |
|
367 |
if isinstance(data[FILTERING][ISSUED_BY], int): # which is an 'int' |
|
368 |
issuer_id = data[FILTERING][ISSUED_BY] # then get its children only |
|
369 |
|
|
274 |
# the filtering parameter can be read as URL argument or as a request body |
|
275 |
if request.is_json or FILTERING in request.args.keys(): # if the request carries JSON data |
|
276 |
if request.is_json: |
|
277 |
data = request.get_json() # get it |
|
370 | 278 |
else: |
371 |
Logger.error(f"Invalid request, wrong parameters '{FILTERING}'.") |
|
372 |
return E_WRONG_PARAMETERS, C_BAD_REQUEST |
|
279 |
try: |
|
280 |
data = {FILTERING: json.loads(request.args[FILTERING])} |
|
281 |
except JSONDecodeError: |
|
282 |
Logger.error(f"The request must be JSON-formatted.") |
|
283 |
return E_NOT_JSON_FORMAT, C_BAD_REQUEST |
|
373 | 284 |
|
374 |
if unfiltered: # if not filtering |
|
375 |
certs = self.certificate_service.get_certificates() |
|
376 |
elif issuer_id >= 0: # if filtering by an issuer |
|
377 |
try: |
|
378 |
# get his children, filtered |
|
379 |
certs = self.certificate_service.get_certificates_issued_by_filter( |
|
380 |
issuer_id=issuer_id, |
|
381 |
target_types=target_types, |
|
382 |
target_usages=target_usages, |
|
383 |
target_cn_substring=target_cn_substring, |
|
384 |
page=page, |
|
385 |
per_page=per_page |
|
386 |
) |
|
387 |
except CertificateNotFoundException: # if id does not exist |
|
388 |
Logger.error(f"No such certificate found 'ID = {issuer_id}'.") |
|
389 |
return E_NO_CERTIFICATES_FOUND, C_NOT_FOUND # throw |
|
285 |
Logger.info(f"\n\tRequest body:" |
|
286 |
f"\n{dict_to_string(data)}") |
|
287 |
|
|
288 |
if FILTERING in data: # if the 'filtering' field exists |
|
289 |
if isinstance(data[FILTERING], dict): # and it is also a 'dict' |
|
290 |
if CA in data[FILTERING]: # containing 'CA' |
|
291 |
if isinstance(data[FILTERING][CA], bool): # which is a 'bool', |
|
292 |
if data[FILTERING][CA]: # then filter according to 'CA'. |
|
293 |
targets.remove(CERTIFICATE_ID) |
|
294 |
else: |
|
295 |
targets.remove(ROOT_CA_ID) |
|
296 |
targets.remove(INTERMEDIATE_CA_ID) |
|
297 |
else: |
|
298 |
Logger.error(f"Invalid request, wrong parameters '{FILTERING}.{CA}'.") |
|
299 |
return E_WRONG_PARAMETERS, C_BAD_REQUEST |
|
300 |
if ISSUED_BY in data[FILTERING]: # containing 'issuedby' |
|
301 |
if isinstance(data[FILTERING][ISSUED_BY], int): # which is an 'int' |
|
302 |
issuer_id = data[FILTERING][ISSUED_BY] # then get its children only |
|
303 |
else: |
|
304 |
Logger.error(f"Invalid request, wrong parameters '{FILTERING}'.") |
|
305 |
return E_WRONG_PARAMETERS, C_BAD_REQUEST |
|
306 |
if issuer_id >= 0: # if filtering by an issuer |
|
307 |
try: |
|
308 |
children = self.certificate_service.get_certificates_issued_by(issuer_id) # get his children |
|
309 |
except CertificateNotFoundException: # if id does not exist |
|
310 |
Logger.error(f"No such certificate found 'ID = {issuer_id}'.") |
|
311 |
return E_NO_CERTIFICATES_FOUND, C_NOT_FOUND # throw |
|
312 |
|
|
313 |
certs = [child for child in children if child.type_id in targets] |
|
314 |
|
|
315 |
elif len(targets) == TREE_NODE_TYPE_COUNT: # = 3 -> root node, |
|
316 |
# intermediate node, |
|
317 |
# or leaf node |
|
318 |
|
|
319 |
# if filtering did not change the |
|
320 |
# targets, |
|
321 |
certs = self.certificate_service.get_certificates() # fetch everything |
|
322 |
else: # otherwise fetch targets only |
|
323 |
certs = list(chain(*(self.certificate_service.get_certificates(target) for target in targets))) |
|
390 | 324 |
else: |
391 |
certs = self.certificate_service.get_certificates_filter( |
|
392 |
target_types=target_types, |
|
393 |
target_usages=target_usages, |
|
394 |
target_cn_substring=target_cn_substring, |
|
395 |
page=page, |
|
396 |
per_page=per_page |
|
397 |
) |
|
325 |
certs = self.certificate_service.get_certificates() # if no params, fetch everything |
|
398 | 326 |
|
399 | 327 |
if certs is None: |
400 | 328 |
Logger.error(f"Internal server error (unknown origin).") |
... | ... | |
402 | 330 |
elif len(certs) == 0: |
403 | 331 |
# TODO check log level |
404 | 332 |
Logger.warning(f"No such certificate found (empty list).") |
405 |
return {"success": True, "data": []}, C_SUCCESS
|
|
333 |
return E_NO_CERTIFICATES_FOUND, C_NO_DATA
|
|
406 | 334 |
else: |
407 | 335 |
ret = [] |
408 | 336 |
for c in certs: |
... | ... | |
569 | 497 |
COMMON_NAME: c.common_name, |
570 | 498 |
NOT_BEFORE: datetime.strptime(c.valid_from, DATETIME_FORMAT).date(), |
571 | 499 |
NOT_AFTER: datetime.strptime(c.valid_to, DATETIME_FORMAT).date(), |
572 |
USAGE: {CertController.INVERSE_USAGE_KEY_MAP[k]: v for k, v in c.usages.items()},
|
|
500 |
USAGE: {CertController.INVERSE_KEY_MAP[k]: v for k, v in c.usages.items()}, |
|
573 | 501 |
ISSUER: { |
574 | 502 |
ID: c_issuer.certificate_id, |
575 | 503 |
COMMON_NAME: c_issuer.common_name |
... | ... | |
595 | 523 |
SUBJECT: subj.to_dict(), |
596 | 524 |
NOT_BEFORE: datetime.strptime(c.valid_from, DATETIME_FORMAT).date(), |
597 | 525 |
NOT_AFTER: datetime.strptime(c.valid_to, DATETIME_FORMAT).date(), |
598 |
USAGE: {CertController.INVERSE_USAGE_KEY_MAP[k]: v for k, v in c.usages.items()},
|
|
526 |
USAGE: {CertController.INVERSE_KEY_MAP[k]: v for k, v in c.usages.items()}, |
|
599 | 527 |
CA: c_issuer.certificate_id |
600 | 528 |
} |
601 | 529 |
|
... | ... | |
694 | 622 |
Logger.error(f"Internal server error (unknown origin).") |
695 | 623 |
return E_GENERAL_ERROR, C_INTERNAL_SERVER_ERROR |
696 | 624 |
|
697 |
return {"success": True, "data": "The certificate and its descendants have been successfully deleted."} |
|
698 |
|
|
699 |
def generate_certificate_pkcs_identity(self, id): |
|
700 |
""" |
|
701 |
Generates a PKCS12 identity (including the chain of trust) of the certificate given by the specified ID. |
|
702 |
Response is of application/x-pkcs12 type. |
|
703 |
|
|
704 |
:param id: ID of a certificate whose PKCS12 identity should be generated |
|
705 |
:type id: int |
|
706 |
|
|
707 |
:rtype: Response |
|
708 |
""" |
|
709 |
|
|
710 |
Logger.info(f"\n\t{request.referrer}" |
|
711 |
f"\n\t{request.method} {request.path} {request.scheme}" |
|
712 |
f"\n\tCertificate ID = {id}") |
|
713 |
|
|
714 |
# try to parse the supplied ID |
|
715 |
try: |
|
716 |
v = int(id) |
|
717 |
except ValueError: |
|
718 |
Logger.error(f"Invalid request, wrong parameters 'id'[{id}] (expected integer).") |
|
719 |
return E_WRONG_PARAMETERS, C_BAD_REQUEST |
|
720 |
|
|
721 |
# find a certificate using the given ID |
|
722 |
cert = self.certificate_service.get_certificate(v) |
|
723 |
|
|
724 |
if request.is_json: # accept JSON only |
|
725 |
body = request.get_json() |
|
726 |
|
|
727 |
# check whether the request is well formed meaning that it contains all required fields |
|
728 |
if NAME not in body.keys(): |
|
729 |
return E_IDENTITY_NAME_NOT_SPECIFIED, C_BAD_REQUEST |
|
730 |
|
|
731 |
if PASSWORD not in body.keys(): |
|
732 |
return E_IDENTITY_PASSWORD_NOT_SPECIFIED, C_BAD_REQUEST |
|
733 |
|
|
734 |
# parse required fields from the request |
|
735 |
identity_name = body[NAME] |
|
736 |
identity_password = body[PASSWORD] |
|
737 |
|
|
738 |
# check whether a certificated specified by the given ID exists |
|
739 |
if cert is None: |
|
740 |
Logger.error(f"No such certificate found 'ID = {v}'.") |
|
741 |
return E_NO_CERTIFICATES_FOUND, C_NOT_FOUND |
|
742 |
else: |
|
743 |
# try to load it's private key |
|
744 |
key = self.key_service.get_key(cert.private_key_id) |
|
745 |
if key is None: |
|
746 |
Logger.error( |
|
747 |
f"The private key 'ID = {cert.private_key_id}'of the certificate 'ID = {cert.certificate_id}' does not exist.") |
|
748 |
return E_NO_CERTIFICATES_FOUND, C_INTERNAL_SERVER_ERROR |
|
749 |
else: |
|
750 |
# generate PKCS12 identity |
|
751 |
identity_byte_array = self.certificate_service.generate_pkcs_identity(cert, key, |
|
752 |
identity_name, |
|
753 |
identity_password) |
|
754 |
return Response(identity_byte_array, mimetype='application/x-pkcs12') |
|
625 |
return {"success": True, "data": "The certificate and its descendants have been successfully deleted."} |
Také k dispozici: Unified diff
[Merge conflict] - reverted controller version from #8705