Projekt

Obecné

Profil

« Předchozí | Další » 

Revize d0471de8

Přidáno uživatelem Michal Seják před téměř 4 roky(ů)

Re #8702 - Reformed CertController's `get_list_of_certs` method, improved filtering logic.

Zobrazit rozdíly:

src/controllers/certificates_controller.py
24 24
TREE_NODE_TYPE_COUNT = 3
25 25

  
26 26
FILTERING = "filtering"
27
PAGE = "page"
28
PER_PAGE = "per_page"
27 29
ISSUER = "issuer"
28 30
US = "usage"
29 31
NOT_AFTER = "notAfter"
......
33 35
USAGE = "usage"
34 36
SUBJECT = "subject"
35 37
VALIDITY_DAYS = "validityDays"
36
CA = "CA"
38
TYPE = "type"
37 39
ISSUED_BY = "issuedby"
38 40
STATUS = "status"
39 41
REASON = "reason"
......
52 54

  
53 55

  
54 56
class CertController:
55
    KEY_MAP = {'CA': CA_ID, 'SSL': SSL_ID, 'digitalSignature': SIGNATURE_ID, 'authentication': AUTHENTICATION_ID}
56
    INVERSE_KEY_MAP = {k: v for v, k in KEY_MAP.items()}
57
    USAGE_KEY_MAP = {'CA': CA_ID, 'SSL': SSL_ID, 'digitalSignature': SIGNATURE_ID, 'authentication': AUTHENTICATION_ID}
58
    INVERSE_USAGE_KEY_MAP = {k: v for v, k in USAGE_KEY_MAP.items()}
59
    FILTERING_TYPE_KEY_MAP = {'root': ROOT_CA_ID, 'inter': INTERMEDIATE_CA_ID, 'end': CERTIFICATE_ID}
60
    # INVERSE_FILTERING_TYPE_KEY_MAP = {k: v for v, k in FILTERING_TYPE_KEY_MAP.items()}
61

  
57 62

  
58 63
    @inject
59 64
    def __init__(self, certificate_service: CertificateService, key_service: KeyService):
......
103 108
                return E_WRONG_PARAMETERS, C_BAD_REQUEST
104 109

  
105 110
            for k, v in body[USAGE].items():                                        # for each usage
106
                if k not in CertController.KEY_MAP:                                 # check that it is a valid usage
111
                if k not in CertController.USAGE_KEY_MAP:                                 # check that it is a valid usage
107 112
                    Logger.error(f"Invalid request, wrong parameter '{USAGE}'[{k}].")
108 113
                    return E_WRONG_PARAMETERS, C_BAD_REQUEST                        # and throw if it is not
109
                usages_dict[CertController.KEY_MAP[k]] = v                          # otherwise translate key and set
114
                usages_dict[CertController.USAGE_KEY_MAP[k]] = v                          # otherwise translate key and set
110 115

  
111 116
            key = self.key_service.create_new_key()                                      # TODO pass key
112 117

  
113
            if CA not in body or body[CA] is None:                                  # if issuer omitted (legal) or none
118
            if TYPE not in body or body[TYPE] is None:                                  # if issuer omitted (legal) or none
114 119
                cert = self.certificate_service.create_root_ca(                          # create a root CA
115 120
                    key,
116 121
                    subject,
......
118 123
                    days=body[VALIDITY_DAYS]
119 124
                )
120 125
            else:
121
                issuer = self.certificate_service.get_certificate(body[CA])              # get base issuer info
126
                issuer = self.certificate_service.get_certificate(body[TYPE])              # get base issuer info
122 127

  
123 128
                if issuer is None:                                                  # if such issuer does not exist
124 129
                    Logger.error(f"No certificate authority with such unique ID exists 'ID = {key.private_key_id}'.")
......
240 245
        Logger.info(f"\n\t{request.referrer}"
241 246
                    f"\n\t{request.method}   {request.path}   {request.scheme}")
242 247

  
243
        targets = {ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID}                  # all targets
244
        issuer_id = -1
245 248

  
246 249
        # the filtering parameter can be read as URL argument or as a request body
247
        if request.is_json or FILTERING in request.args.keys():                     # if the request carries JSON data
248
            if request.is_json:
249
                data = request.get_json()                                           # get it
250
            else:
251
                try:
252
                    data = {FILTERING: json.loads(request.args[FILTERING])}
253
                except JSONDecodeError:
254
                    Logger.error(f"The request must be JSON-formatted.")
255
                    return E_NOT_JSON_FORMAT, C_BAD_REQUEST
250
        if request.is_json:
251
            data = request.get_json()
252
        else:
253
            data = {}
256 254

  
257
            Logger.info(f"\n\tRequest body:"
258
                        f"\n{dict_to_string(data)}")
259

  
260
            if FILTERING in data:                                                   # if the 'filtering' field exists
261
                if isinstance(data[FILTERING], dict):                               # and it is also a 'dict'
262
                    if CA in data[FILTERING]:                                       # containing 'CA'
263
                        if isinstance(data[FILTERING][CA], bool):                   # which is a 'bool',
264
                            if data[FILTERING][CA]:                                 # then filter according to 'CA'.
265
                                targets.remove(CERTIFICATE_ID)
266
                            else:
267
                                targets.remove(ROOT_CA_ID)
268
                                targets.remove(INTERMEDIATE_CA_ID)
269
                        else:
270
                            Logger.error(f"Invalid request, wrong parameters '{FILTERING}.{CA}'.")
255
        if FILTERING in request.args.keys():
256
            try:
257
                data[FILTERING] = json.loads(request.args[FILTERING])
258
            except JSONDecodeError:
259
                Logger.error(f"The request must be JSON-formatted.")
260
                return E_NOT_JSON_FORMAT, C_BAD_REQUEST
261

  
262
        if PAGE in request.args.keys():
263
            try:
264
                data[PAGE] = json.loads(request.args[PAGE])
265
            except JSONDecodeError:
266
                Logger.error(f"The request must be JSON-formatted.")
267
                return E_NOT_JSON_FORMAT, C_BAD_REQUEST
268

  
269
        if PER_PAGE in request.args.keys():
270
            try:
271
                data[PER_PAGE] = json.loads(request.args[PER_PAGE])
272
            except JSONDecodeError:
273
                Logger.error(f"The request must be JSON-formatted.")
274
                return E_NOT_JSON_FORMAT, C_BAD_REQUEST
275

  
276
        Logger.info(f"\n\tRequest body:"
277
                    f"\n{dict_to_string(data)}")
278

  
279
        target_types = {ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID}
280
        target_usages = {v for v in CertController.INVERSE_USAGE_KEY_MAP.keys()}
281
        target_cn_substring = None
282
        issuer_id = -1
283
        if PER_PAGE in data:
284
            page = data.get(PAGE, 0)
285
            per_page = data[PER_PAGE]
286
        else:
287
            page = None
288
            per_page = None
289

  
290
        if FILTERING in data:                                                   # if the 'filtering' field exists
291
            if isinstance(data[FILTERING], dict):                               # and it is also a 'dict'
292

  
293
                # noinspection DuplicatedCode
294
                if TYPE in data[FILTERING]:                                     # containing 'type'
295
                    if isinstance(data[FILTERING][TYPE], list):                 # which is a 'list',
296
                                                                                # map every field to id
297
                        try:
298
                            target_types = {CertController.FILTERING_TYPE_KEY_MAP[v] for v in data[FILTERING][TYPE]}
299
                        except KeyError as e:
300
                            Logger.error(f"Invalid request, wrong parameters '{FILTERING}.{TYPE}' - '{e}'.")
271 301
                            return E_WRONG_PARAMETERS, C_BAD_REQUEST
272
                    if ISSUED_BY in data[FILTERING]:                                # containing 'issuedby'
273
                        if isinstance(data[FILTERING][ISSUED_BY], int):             # which is an 'int'
274
                            issuer_id = data[FILTERING][ISSUED_BY]                  # then get its children only
275
                else:
276
                    Logger.error(f"Invalid request, wrong parameters '{FILTERING}'.")
277
                    return E_WRONG_PARAMETERS, C_BAD_REQUEST
278
            if issuer_id >= 0:                                                      # if filtering by an issuer
279
                try:
280
                    children = self.certificate_service.get_certificates_issued_by(issuer_id)  # get his children
281
                except CertificateNotFoundException:                                # if id does not exist
282
                    Logger.error(f"No such certificate found 'ID = {issuer_id}'.")
283
                    return E_NO_CERTIFICATES_FOUND, C_NOT_FOUND                     # throw
302
                    else:
303
                        Logger.error(f"Invalid request, wrong parameters '{FILTERING}.{TYPE}'.")
304
                        return E_WRONG_PARAMETERS, C_BAD_REQUEST
305

  
306
                # noinspection DuplicatedCode
307
                if USAGE in data[FILTERING]:                                    # containing 'usage'
308
                    if isinstance(data[FILTERING][USAGE], list):                # which is a 'list',
309
                                                                                # map every field to id
310
                        try:
311
                            target_usages = {CertController.USAGE_KEY_MAP[v] for v in data[FILTERING][USAGE]}
312
                        except KeyError as e:
313
                            Logger.error(f"Invalid request, wrong parameters '{FILTERING}.{TYPE}' - '{e}'.")
314
                            return E_WRONG_PARAMETERS, C_BAD_REQUEST
315
                    else:
316
                        Logger.error(f"Invalid request, wrong parameters '{FILTERING}.{USAGE}'.")
317
                        return E_WRONG_PARAMETERS, C_BAD_REQUEST
284 318

  
285
                certs = [child for child in children if child.type_id in targets]
319
                if COMMON_NAME in data[FILTERING]:                              # containing 'CN'
320
                    if isinstance(data[FILTERING][COMMON_NAME], str):           # which is a 'str'
321
                        target_cn_substring = data[FILTERING][COMMON_NAME]
322
                    else:
323
                        Logger.error(f"Invalid request, wrong parameters '{FILTERING}.{COMMON_NAME}'.")
324
                        return E_WRONG_PARAMETERS, C_BAD_REQUEST
286 325

  
287
            elif len(targets) == TREE_NODE_TYPE_COUNT:                              # = 3 -> root node,
288
                                                                                    # intermediate node,
289
                                                                                    # or leaf node
326
                if ISSUED_BY in data[FILTERING]:                                # containing 'issuedby'
327
                    if isinstance(data[FILTERING][ISSUED_BY], int):             # which is an 'int'
328
                        issuer_id = data[FILTERING][ISSUED_BY]                  # then get its children only
290 329

  
291
                                                                                    # if filtering did not change the
292
                                                                                    # targets,
293
                certs = self.certificate_service.get_certificates()                 # fetch everything
294
            else:                                                                   # otherwise fetch targets only
295
                certs = list(chain(*(self.certificate_service.get_certificates(target) for target in targets)))
330
            else:
331
                Logger.error(f"Invalid request, wrong parameters '{FILTERING}'.")
332
                return E_WRONG_PARAMETERS, C_BAD_REQUEST
333

  
334
            if issuer_id >= 0:                                                  # if filtering by an issuer
335
                try:
336
                                                                                # get his children, filtered
337
                    certs = self.certificate_service.get_certificates_issued_by_filter(
338
                        issuer_id=issuer_id,
339
                        target_types=target_types,
340
                        target_usages=target_usages,
341
                        target_cn_substring=target_cn_substring,
342
                        page=page,
343
                        per_page=per_page
344
                    )
345
                except CertificateNotFoundException:                            # if id does not exist
346
                    Logger.error(f"No such certificate found 'ID = {issuer_id}'.")
347
                    return E_NO_CERTIFICATES_FOUND, C_NOT_FOUND                 # throw
348
            else:
349
                certs = self.certificate_service.get_certificates_filter(
350
                    target_types=target_types,
351
                    target_usages=target_usages,
352
                    target_cn_substring=target_cn_substring,
353
                    page=page,
354
                    per_page=per_page
355
                )
296 356
        else:
297
            certs = self.certificate_service.get_certificates()                     # if no params, fetch everything
357
            certs = self.certificate_service.get_certificates()
298 358

  
299 359
        if certs is None:
300 360
            Logger.error(f"Internal server error (unknown origin).")
......
469 529
            COMMON_NAME: c.common_name,
470 530
            NOT_BEFORE: datetime.strptime(c.valid_from, DATETIME_FORMAT).date(),
471 531
            NOT_AFTER: datetime.strptime(c.valid_to, DATETIME_FORMAT).date(),
472
            USAGE: {CertController.INVERSE_KEY_MAP[k]: v for k, v in c.usages.items()},
532
            USAGE: {CertController.INVERSE_USAGE_KEY_MAP[k]: v for k, v in c.usages.items()},
473 533
            ISSUER: {
474 534
                ID: c_issuer.certificate_id,
475 535
                COMMON_NAME: c_issuer.common_name
......
495 555
            SUBJECT: subj.to_dict(),
496 556
            NOT_BEFORE: datetime.strptime(c.valid_from, DATETIME_FORMAT).date(),
497 557
            NOT_AFTER: datetime.strptime(c.valid_to, DATETIME_FORMAT).date(),
498
            USAGE: {CertController.INVERSE_KEY_MAP[k]: v for k, v in c.usages.items()},
499
            CA: c_issuer.certificate_id
558
            USAGE: {CertController.INVERSE_USAGE_KEY_MAP[k]: v for k, v in c.usages.items()},
559
            TYPE: c_issuer.certificate_id
500 560
        }
501 561

  
502 562
    def get_private_key_of_a_certificate(self, id):

Také k dispozici: Unified diff