Revize d0471de8
Přidáno uživatelem Michal Seják před téměř 4 roky(ů)
src/controllers/certificates_controller.py | ||
---|---|---|
24 | 24 |
TREE_NODE_TYPE_COUNT = 3 |
25 | 25 |
|
26 | 26 |
FILTERING = "filtering" |
27 |
PAGE = "page" |
|
28 |
PER_PAGE = "per_page" |
|
27 | 29 |
ISSUER = "issuer" |
28 | 30 |
US = "usage" |
29 | 31 |
NOT_AFTER = "notAfter" |
... | ... | |
33 | 35 |
USAGE = "usage" |
34 | 36 |
SUBJECT = "subject" |
35 | 37 |
VALIDITY_DAYS = "validityDays" |
36 |
CA = "CA"
|
|
38 |
TYPE = "type"
|
|
37 | 39 |
ISSUED_BY = "issuedby" |
38 | 40 |
STATUS = "status" |
39 | 41 |
REASON = "reason" |
... | ... | |
52 | 54 |
|
53 | 55 |
|
54 | 56 |
class CertController: |
55 |
KEY_MAP = {'CA': CA_ID, 'SSL': SSL_ID, 'digitalSignature': SIGNATURE_ID, 'authentication': AUTHENTICATION_ID} |
|
56 |
INVERSE_KEY_MAP = {k: v for v, k in KEY_MAP.items()} |
|
57 |
USAGE_KEY_MAP = {'CA': CA_ID, 'SSL': SSL_ID, 'digitalSignature': SIGNATURE_ID, 'authentication': AUTHENTICATION_ID} |
|
58 |
INVERSE_USAGE_KEY_MAP = {k: v for v, k in USAGE_KEY_MAP.items()} |
|
59 |
FILTERING_TYPE_KEY_MAP = {'root': ROOT_CA_ID, 'inter': INTERMEDIATE_CA_ID, 'end': CERTIFICATE_ID} |
|
60 |
# INVERSE_FILTERING_TYPE_KEY_MAP = {k: v for v, k in FILTERING_TYPE_KEY_MAP.items()} |
|
61 |
|
|
57 | 62 |
|
58 | 63 |
@inject |
59 | 64 |
def __init__(self, certificate_service: CertificateService, key_service: KeyService): |
... | ... | |
103 | 108 |
return E_WRONG_PARAMETERS, C_BAD_REQUEST |
104 | 109 |
|
105 | 110 |
for k, v in body[USAGE].items(): # for each usage |
106 |
if k not in CertController.KEY_MAP: # check that it is a valid usage |
|
111 |
if k not in CertController.USAGE_KEY_MAP: # check that it is a valid usage
|
|
107 | 112 |
Logger.error(f"Invalid request, wrong parameter '{USAGE}'[{k}].") |
108 | 113 |
return E_WRONG_PARAMETERS, C_BAD_REQUEST # and throw if it is not |
109 |
usages_dict[CertController.KEY_MAP[k]] = v # otherwise translate key and set |
|
114 |
usages_dict[CertController.USAGE_KEY_MAP[k]] = v # otherwise translate key and set
|
|
110 | 115 |
|
111 | 116 |
key = self.key_service.create_new_key() # TODO pass key |
112 | 117 |
|
113 |
if CA not in body or body[CA] is None: # if issuer omitted (legal) or none
|
|
118 |
if TYPE not in body or body[TYPE] is None: # if issuer omitted (legal) or none
|
|
114 | 119 |
cert = self.certificate_service.create_root_ca( # create a root CA |
115 | 120 |
key, |
116 | 121 |
subject, |
... | ... | |
118 | 123 |
days=body[VALIDITY_DAYS] |
119 | 124 |
) |
120 | 125 |
else: |
121 |
issuer = self.certificate_service.get_certificate(body[CA]) # get base issuer info
|
|
126 |
issuer = self.certificate_service.get_certificate(body[TYPE]) # get base issuer info
|
|
122 | 127 |
|
123 | 128 |
if issuer is None: # if such issuer does not exist |
124 | 129 |
Logger.error(f"No certificate authority with such unique ID exists 'ID = {key.private_key_id}'.") |
... | ... | |
240 | 245 |
Logger.info(f"\n\t{request.referrer}" |
241 | 246 |
f"\n\t{request.method} {request.path} {request.scheme}") |
242 | 247 |
|
243 |
targets = {ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID} # all targets |
|
244 |
issuer_id = -1 |
|
245 | 248 |
|
246 | 249 |
# the filtering parameter can be read as URL argument or as a request body |
247 |
if request.is_json or FILTERING in request.args.keys(): # if the request carries JSON data |
|
248 |
if request.is_json: |
|
249 |
data = request.get_json() # get it |
|
250 |
else: |
|
251 |
try: |
|
252 |
data = {FILTERING: json.loads(request.args[FILTERING])} |
|
253 |
except JSONDecodeError: |
|
254 |
Logger.error(f"The request must be JSON-formatted.") |
|
255 |
return E_NOT_JSON_FORMAT, C_BAD_REQUEST |
|
250 |
if request.is_json: |
|
251 |
data = request.get_json() |
|
252 |
else: |
|
253 |
data = {} |
|
256 | 254 |
|
257 |
Logger.info(f"\n\tRequest body:" |
|
258 |
f"\n{dict_to_string(data)}") |
|
259 |
|
|
260 |
if FILTERING in data: # if the 'filtering' field exists |
|
261 |
if isinstance(data[FILTERING], dict): # and it is also a 'dict' |
|
262 |
if CA in data[FILTERING]: # containing 'CA' |
|
263 |
if isinstance(data[FILTERING][CA], bool): # which is a 'bool', |
|
264 |
if data[FILTERING][CA]: # then filter according to 'CA'. |
|
265 |
targets.remove(CERTIFICATE_ID) |
|
266 |
else: |
|
267 |
targets.remove(ROOT_CA_ID) |
|
268 |
targets.remove(INTERMEDIATE_CA_ID) |
|
269 |
else: |
|
270 |
Logger.error(f"Invalid request, wrong parameters '{FILTERING}.{CA}'.") |
|
255 |
if FILTERING in request.args.keys(): |
|
256 |
try: |
|
257 |
data[FILTERING] = json.loads(request.args[FILTERING]) |
|
258 |
except JSONDecodeError: |
|
259 |
Logger.error(f"The request must be JSON-formatted.") |
|
260 |
return E_NOT_JSON_FORMAT, C_BAD_REQUEST |
|
261 |
|
|
262 |
if PAGE in request.args.keys(): |
|
263 |
try: |
|
264 |
data[PAGE] = json.loads(request.args[PAGE]) |
|
265 |
except JSONDecodeError: |
|
266 |
Logger.error(f"The request must be JSON-formatted.") |
|
267 |
return E_NOT_JSON_FORMAT, C_BAD_REQUEST |
|
268 |
|
|
269 |
if PER_PAGE in request.args.keys(): |
|
270 |
try: |
|
271 |
data[PER_PAGE] = json.loads(request.args[PER_PAGE]) |
|
272 |
except JSONDecodeError: |
|
273 |
Logger.error(f"The request must be JSON-formatted.") |
|
274 |
return E_NOT_JSON_FORMAT, C_BAD_REQUEST |
|
275 |
|
|
276 |
Logger.info(f"\n\tRequest body:" |
|
277 |
f"\n{dict_to_string(data)}") |
|
278 |
|
|
279 |
target_types = {ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID} |
|
280 |
target_usages = {v for v in CertController.INVERSE_USAGE_KEY_MAP.keys()} |
|
281 |
target_cn_substring = None |
|
282 |
issuer_id = -1 |
|
283 |
if PER_PAGE in data: |
|
284 |
page = data.get(PAGE, 0) |
|
285 |
per_page = data[PER_PAGE] |
|
286 |
else: |
|
287 |
page = None |
|
288 |
per_page = None |
|
289 |
|
|
290 |
if FILTERING in data: # if the 'filtering' field exists |
|
291 |
if isinstance(data[FILTERING], dict): # and it is also a 'dict' |
|
292 |
|
|
293 |
# noinspection DuplicatedCode |
|
294 |
if TYPE in data[FILTERING]: # containing 'type' |
|
295 |
if isinstance(data[FILTERING][TYPE], list): # which is a 'list', |
|
296 |
# map every field to id |
|
297 |
try: |
|
298 |
target_types = {CertController.FILTERING_TYPE_KEY_MAP[v] for v in data[FILTERING][TYPE]} |
|
299 |
except KeyError as e: |
|
300 |
Logger.error(f"Invalid request, wrong parameters '{FILTERING}.{TYPE}' - '{e}'.") |
|
271 | 301 |
return E_WRONG_PARAMETERS, C_BAD_REQUEST |
272 |
if ISSUED_BY in data[FILTERING]: # containing 'issuedby' |
|
273 |
if isinstance(data[FILTERING][ISSUED_BY], int): # which is an 'int' |
|
274 |
issuer_id = data[FILTERING][ISSUED_BY] # then get its children only |
|
275 |
else: |
|
276 |
Logger.error(f"Invalid request, wrong parameters '{FILTERING}'.") |
|
277 |
return E_WRONG_PARAMETERS, C_BAD_REQUEST |
|
278 |
if issuer_id >= 0: # if filtering by an issuer |
|
279 |
try: |
|
280 |
children = self.certificate_service.get_certificates_issued_by(issuer_id) # get his children |
|
281 |
except CertificateNotFoundException: # if id does not exist |
|
282 |
Logger.error(f"No such certificate found 'ID = {issuer_id}'.") |
|
283 |
return E_NO_CERTIFICATES_FOUND, C_NOT_FOUND # throw |
|
302 |
else: |
|
303 |
Logger.error(f"Invalid request, wrong parameters '{FILTERING}.{TYPE}'.") |
|
304 |
return E_WRONG_PARAMETERS, C_BAD_REQUEST |
|
305 |
|
|
306 |
# noinspection DuplicatedCode |
|
307 |
if USAGE in data[FILTERING]: # containing 'usage' |
|
308 |
if isinstance(data[FILTERING][USAGE], list): # which is a 'list', |
|
309 |
# map every field to id |
|
310 |
try: |
|
311 |
target_usages = {CertController.USAGE_KEY_MAP[v] for v in data[FILTERING][USAGE]} |
|
312 |
except KeyError as e: |
|
313 |
Logger.error(f"Invalid request, wrong parameters '{FILTERING}.{TYPE}' - '{e}'.") |
|
314 |
return E_WRONG_PARAMETERS, C_BAD_REQUEST |
|
315 |
else: |
|
316 |
Logger.error(f"Invalid request, wrong parameters '{FILTERING}.{USAGE}'.") |
|
317 |
return E_WRONG_PARAMETERS, C_BAD_REQUEST |
|
284 | 318 |
|
285 |
certs = [child for child in children if child.type_id in targets] |
|
319 |
if COMMON_NAME in data[FILTERING]: # containing 'CN' |
|
320 |
if isinstance(data[FILTERING][COMMON_NAME], str): # which is a 'str' |
|
321 |
target_cn_substring = data[FILTERING][COMMON_NAME] |
|
322 |
else: |
|
323 |
Logger.error(f"Invalid request, wrong parameters '{FILTERING}.{COMMON_NAME}'.") |
|
324 |
return E_WRONG_PARAMETERS, C_BAD_REQUEST |
|
286 | 325 |
|
287 |
elif len(targets) == TREE_NODE_TYPE_COUNT: # = 3 -> root node,
|
|
288 |
# intermediate node,
|
|
289 |
# or leaf node
|
|
326 |
if ISSUED_BY in data[FILTERING]: # containing 'issuedby'
|
|
327 |
if isinstance(data[FILTERING][ISSUED_BY], int): # which is an 'int'
|
|
328 |
issuer_id = data[FILTERING][ISSUED_BY] # then get its children only
|
|
290 | 329 |
|
291 |
# if filtering did not change the |
|
292 |
# targets, |
|
293 |
certs = self.certificate_service.get_certificates() # fetch everything |
|
294 |
else: # otherwise fetch targets only |
|
295 |
certs = list(chain(*(self.certificate_service.get_certificates(target) for target in targets))) |
|
330 |
else: |
|
331 |
Logger.error(f"Invalid request, wrong parameters '{FILTERING}'.") |
|
332 |
return E_WRONG_PARAMETERS, C_BAD_REQUEST |
|
333 |
|
|
334 |
if issuer_id >= 0: # if filtering by an issuer |
|
335 |
try: |
|
336 |
# get his children, filtered |
|
337 |
certs = self.certificate_service.get_certificates_issued_by_filter( |
|
338 |
issuer_id=issuer_id, |
|
339 |
target_types=target_types, |
|
340 |
target_usages=target_usages, |
|
341 |
target_cn_substring=target_cn_substring, |
|
342 |
page=page, |
|
343 |
per_page=per_page |
|
344 |
) |
|
345 |
except CertificateNotFoundException: # if id does not exist |
|
346 |
Logger.error(f"No such certificate found 'ID = {issuer_id}'.") |
|
347 |
return E_NO_CERTIFICATES_FOUND, C_NOT_FOUND # throw |
|
348 |
else: |
|
349 |
certs = self.certificate_service.get_certificates_filter( |
|
350 |
target_types=target_types, |
|
351 |
target_usages=target_usages, |
|
352 |
target_cn_substring=target_cn_substring, |
|
353 |
page=page, |
|
354 |
per_page=per_page |
|
355 |
) |
|
296 | 356 |
else: |
297 |
certs = self.certificate_service.get_certificates() # if no params, fetch everything
|
|
357 |
certs = self.certificate_service.get_certificates() |
|
298 | 358 |
|
299 | 359 |
if certs is None: |
300 | 360 |
Logger.error(f"Internal server error (unknown origin).") |
... | ... | |
469 | 529 |
COMMON_NAME: c.common_name, |
470 | 530 |
NOT_BEFORE: datetime.strptime(c.valid_from, DATETIME_FORMAT).date(), |
471 | 531 |
NOT_AFTER: datetime.strptime(c.valid_to, DATETIME_FORMAT).date(), |
472 |
USAGE: {CertController.INVERSE_KEY_MAP[k]: v for k, v in c.usages.items()}, |
|
532 |
USAGE: {CertController.INVERSE_USAGE_KEY_MAP[k]: v for k, v in c.usages.items()},
|
|
473 | 533 |
ISSUER: { |
474 | 534 |
ID: c_issuer.certificate_id, |
475 | 535 |
COMMON_NAME: c_issuer.common_name |
... | ... | |
495 | 555 |
SUBJECT: subj.to_dict(), |
496 | 556 |
NOT_BEFORE: datetime.strptime(c.valid_from, DATETIME_FORMAT).date(), |
497 | 557 |
NOT_AFTER: datetime.strptime(c.valid_to, DATETIME_FORMAT).date(), |
498 |
USAGE: {CertController.INVERSE_KEY_MAP[k]: v for k, v in c.usages.items()}, |
|
499 |
CA: c_issuer.certificate_id
|
|
558 |
USAGE: {CertController.INVERSE_USAGE_KEY_MAP[k]: v for k, v in c.usages.items()},
|
|
559 |
TYPE: c_issuer.certificate_id
|
|
500 | 560 |
} |
501 | 561 |
|
502 | 562 |
def get_private_key_of_a_certificate(self, id): |
Také k dispozici: Unified diff
Re #8702 - Reformed CertController's `get_list_of_certs` method, improved filtering logic.