Projekt

Obecné

Profil

« Předchozí | Další » 

Revize da92e92a

Přidáno uživatelem Jan Pašek před téměř 4 roky(ů)

[Merge conflict] - reverted controller version from #8705

Zobrazit rozdíly:

src/controllers/certificates_controller.py
3 3
from itertools import chain
4 4
from json import JSONDecodeError
5 5

  
6
from flask import request, Response
6
from flask import request
7 7
from injector import inject
8 8

  
9 9
from src.constants import CA_ID, \
......
22 22
from src.utils.logger import Logger
23 23
from src.utils.util import dict_to_string
24 24

  
25
EXTENSIONS = "extensions"
26

  
27 25
TREE_NODE_TYPE_COUNT = 3
28 26
KEY_PEM = "key_pem"
29 27
PASSWORD = "password"
30 28
KEY = "key"
31 29
FILTERING = "filtering"
32
PAGE = "page"
33
PER_PAGE = "per_page"
34 30
ISSUER = "issuer"
35 31
US = "usage"
36 32
NOT_AFTER = "notAfter"
37 33
NOT_BEFORE = "notBefore"
38 34
COMMON_NAME = "CN"
39 35
ID = "id"
40
CA = "CA"
41 36
USAGE = "usage"
42 37
SUBJECT = "subject"
43 38
VALIDITY_DAYS = "validityDays"
44
TYPE = "type"
39
CA = "CA"
45 40
ISSUED_BY = "issuedby"
46 41
STATUS = "status"
47 42
REASON = "reason"
48 43
REASON_UNDEFINED = "unspecified"
49
NAME = "name"
50
PASSWORD = "password"
51 44

  
52 45
E_NO_ISSUER_FOUND = {"success": False, "data": "No certificate authority with such unique ID exists."}
53 46
E_NO_CERTIFICATES_FOUND = {"success": False, "data": "No such certificate found."}
......
60 53
E_MISSING_PARAMETERS = {"success": False, "data": "Invalid request, missing parameters."}
61 54
E_WRONG_PARAMETERS = {"success": False, "data": "Invalid request, wrong parameters."}
62 55
E_WRONG_PASSWORD = {"success": False, "data": "The provided passphrase does not match the provided key."}
63
E_IDENTITY_NAME_NOT_SPECIFIED = {"success": False, "data": "Invalid request, missing identity name."}
64
E_IDENTITY_PASSWORD_NOT_SPECIFIED = {"success": False, "data": "Invalid request, missing identity password."}
65 56

  
66 57

  
67 58
class CertController:
68
    USAGE_KEY_MAP = {'CA': CA_ID, 'SSL': SSL_ID, 'digitalSignature': SIGNATURE_ID, 'authentication': AUTHENTICATION_ID}
69
    INVERSE_USAGE_KEY_MAP = {k: v for v, k in USAGE_KEY_MAP.items()}
70
    FILTERING_TYPE_KEY_MAP = {'root': ROOT_CA_ID, 'inter': INTERMEDIATE_CA_ID, 'end': CERTIFICATE_ID}
71
    # INVERSE_FILTERING_TYPE_KEY_MAP = {k: v for v, k in FILTERING_TYPE_KEY_MAP.items()}
72

  
59
    KEY_MAP = {'CA': CA_ID, 'SSL': SSL_ID, 'digitalSignature': SIGNATURE_ID, 'authentication': AUTHENTICATION_ID}
60
    INVERSE_KEY_MAP = {k: v for v, k in KEY_MAP.items()}
73 61

  
74 62
    @inject
75 63
    def __init__(self, certificate_service: CertificateService, key_service: KeyService):
......
280 268
        Logger.info(f"\n\t{request.referrer}"
281 269
                    f"\n\t{request.method}   {request.path}   {request.scheme}")
282 270

  
283

  
284
        # the filtering parameter can be read as URL argument or as a request body
285
        if request.is_json:
286
            data = request.get_json()
287
        else:
288
            data = {}
289

  
290
        if FILTERING in request.args.keys():
291
            try:
292
                data[FILTERING] = json.loads(request.args[FILTERING])
293
            except JSONDecodeError:
294
                Logger.error(f"The request must be JSON-formatted.")
295
                return E_NOT_JSON_FORMAT, C_BAD_REQUEST
296

  
297
        if PAGE in request.args.keys():
298
            try:
299
                data[PAGE] = json.loads(request.args[PAGE])
300
            except JSONDecodeError:
301
                Logger.error(f"The request must be JSON-formatted.")
302
                return E_NOT_JSON_FORMAT, C_BAD_REQUEST
303

  
304
        if PER_PAGE in request.args.keys():
305
            try:
306
                data[PER_PAGE] = json.loads(request.args[PER_PAGE])
307
            except JSONDecodeError:
308
                Logger.error(f"The request must be JSON-formatted.")
309
                return E_NOT_JSON_FORMAT, C_BAD_REQUEST
310

  
311
        Logger.info(f"\n\tRequest body:"
312
                    f"\n{dict_to_string(data)}")
313

  
314
        target_types = {ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID}
315
        target_usages = {v for v in CertController.INVERSE_USAGE_KEY_MAP.keys()}
316
        target_cn_substring = None
271
        targets = {ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID}                  # all targets
317 272
        issuer_id = -1
318 273

  
319
        unfiltered = True
320

  
321
        if PER_PAGE in data:
322
            unfiltered = False
323
            page = data.get(PAGE, 0)
324
            per_page = data[PER_PAGE]
325
        else:
326
            page = None
327
            per_page = None
328

  
329
        if FILTERING in data:                                                   # if the 'filtering' field exists
330
            unfiltered = False
331
            if isinstance(data[FILTERING], dict):                               # and it is also a 'dict'
332

  
333
                # noinspection DuplicatedCode
334
                if TYPE in data[FILTERING]:                                     # containing 'type'
335
                    if isinstance(data[FILTERING][TYPE], list):                 # which is a 'list',
336
                                                                                # map every field to id
337
                        try:
338
                            target_types = {CertController.FILTERING_TYPE_KEY_MAP[v] for v in data[FILTERING][TYPE]}
339
                        except KeyError as e:
340
                            Logger.error(f"Invalid request, wrong parameters '{FILTERING}.{TYPE}' - '{e}'.")
341
                            return E_WRONG_PARAMETERS, C_BAD_REQUEST
342
                    else:
343
                        Logger.error(f"Invalid request, wrong parameters '{FILTERING}.{TYPE}'.")
344
                        return E_WRONG_PARAMETERS, C_BAD_REQUEST
345

  
346
                # noinspection DuplicatedCode
347
                if USAGE in data[FILTERING]:                                    # containing 'usage'
348
                    if isinstance(data[FILTERING][USAGE], list):                # which is a 'list',
349
                                                                                # map every field to id
350
                        try:
351
                            target_usages = {CertController.USAGE_KEY_MAP[v] for v in data[FILTERING][USAGE]}
352
                        except KeyError as e:
353
                            Logger.error(f"Invalid request, wrong parameters '{FILTERING}.{USAGE}' - '{e}'.")
354
                            return E_WRONG_PARAMETERS, C_BAD_REQUEST
355
                    else:
356
                        Logger.error(f"Invalid request, wrong parameters '{FILTERING}.{USAGE}'.")
357
                        return E_WRONG_PARAMETERS, C_BAD_REQUEST
358

  
359
                if COMMON_NAME in data[FILTERING]:                              # containing 'CN'
360
                    if isinstance(data[FILTERING][COMMON_NAME], str):           # which is a 'str'
361
                        target_cn_substring = data[FILTERING][COMMON_NAME]
362
                    else:
363
                        Logger.error(f"Invalid request, wrong parameters '{FILTERING}.{COMMON_NAME}'.")
364
                        return E_WRONG_PARAMETERS, C_BAD_REQUEST
365

  
366
                if ISSUED_BY in data[FILTERING]:                                # containing 'issuedby'
367
                    if isinstance(data[FILTERING][ISSUED_BY], int):             # which is an 'int'
368
                        issuer_id = data[FILTERING][ISSUED_BY]                  # then get its children only
369

  
274
        # the filtering parameter can be read as URL argument or as a request body
275
        if request.is_json or FILTERING in request.args.keys():                     # if the request carries JSON data
276
            if request.is_json:
277
                data = request.get_json()                                           # get it
370 278
            else:
371
                Logger.error(f"Invalid request, wrong parameters '{FILTERING}'.")
372
                return E_WRONG_PARAMETERS, C_BAD_REQUEST
279
                try:
280
                    data = {FILTERING: json.loads(request.args[FILTERING])}
281
                except JSONDecodeError:
282
                    Logger.error(f"The request must be JSON-formatted.")
283
                    return E_NOT_JSON_FORMAT, C_BAD_REQUEST
373 284

  
374
        if unfiltered:                                                      # if not filtering
375
            certs = self.certificate_service.get_certificates()
376
        elif issuer_id >= 0:                                                # if filtering by an issuer
377
            try:
378
                                                                            # get his children, filtered
379
                certs = self.certificate_service.get_certificates_issued_by_filter(
380
                    issuer_id=issuer_id,
381
                    target_types=target_types,
382
                    target_usages=target_usages,
383
                    target_cn_substring=target_cn_substring,
384
                    page=page,
385
                    per_page=per_page
386
                )
387
            except CertificateNotFoundException:                            # if id does not exist
388
                Logger.error(f"No such certificate found 'ID = {issuer_id}'.")
389
                return E_NO_CERTIFICATES_FOUND, C_NOT_FOUND                 # throw
285
            Logger.info(f"\n\tRequest body:"
286
                        f"\n{dict_to_string(data)}")
287

  
288
            if FILTERING in data:                                                   # if the 'filtering' field exists
289
                if isinstance(data[FILTERING], dict):                               # and it is also a 'dict'
290
                    if CA in data[FILTERING]:                                       # containing 'CA'
291
                        if isinstance(data[FILTERING][CA], bool):                   # which is a 'bool',
292
                            if data[FILTERING][CA]:                                 # then filter according to 'CA'.
293
                                targets.remove(CERTIFICATE_ID)
294
                            else:
295
                                targets.remove(ROOT_CA_ID)
296
                                targets.remove(INTERMEDIATE_CA_ID)
297
                        else:
298
                            Logger.error(f"Invalid request, wrong parameters '{FILTERING}.{CA}'.")
299
                            return E_WRONG_PARAMETERS, C_BAD_REQUEST
300
                    if ISSUED_BY in data[FILTERING]:                                # containing 'issuedby'
301
                        if isinstance(data[FILTERING][ISSUED_BY], int):             # which is an 'int'
302
                            issuer_id = data[FILTERING][ISSUED_BY]                  # then get its children only
303
                else:
304
                    Logger.error(f"Invalid request, wrong parameters '{FILTERING}'.")
305
                    return E_WRONG_PARAMETERS, C_BAD_REQUEST
306
            if issuer_id >= 0:                                                      # if filtering by an issuer
307
                try:
308
                    children = self.certificate_service.get_certificates_issued_by(issuer_id)  # get his children
309
                except CertificateNotFoundException:                                # if id does not exist
310
                    Logger.error(f"No such certificate found 'ID = {issuer_id}'.")
311
                    return E_NO_CERTIFICATES_FOUND, C_NOT_FOUND                     # throw
312

  
313
                certs = [child for child in children if child.type_id in targets]
314

  
315
            elif len(targets) == TREE_NODE_TYPE_COUNT:                              # = 3 -> root node,
316
                                                                                    # intermediate node,
317
                                                                                    # or leaf node
318

  
319
                                                                                    # if filtering did not change the
320
                                                                                    # targets,
321
                certs = self.certificate_service.get_certificates()                 # fetch everything
322
            else:                                                                   # otherwise fetch targets only
323
                certs = list(chain(*(self.certificate_service.get_certificates(target) for target in targets)))
390 324
        else:
391
            certs = self.certificate_service.get_certificates_filter(
392
                target_types=target_types,
393
                target_usages=target_usages,
394
                target_cn_substring=target_cn_substring,
395
                page=page,
396
                per_page=per_page
397
            )
325
            certs = self.certificate_service.get_certificates()                     # if no params, fetch everything
398 326

  
399 327
        if certs is None:
400 328
            Logger.error(f"Internal server error (unknown origin).")
......
402 330
        elif len(certs) == 0:
403 331
            # TODO check log level
404 332
            Logger.warning(f"No such certificate found (empty list).")
405
            return {"success": True, "data": []}, C_SUCCESS
333
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA
406 334
        else:
407 335
            ret = []
408 336
            for c in certs:
......
569 497
            COMMON_NAME: c.common_name,
570 498
            NOT_BEFORE: datetime.strptime(c.valid_from, DATETIME_FORMAT).date(),
571 499
            NOT_AFTER: datetime.strptime(c.valid_to, DATETIME_FORMAT).date(),
572
            USAGE: {CertController.INVERSE_USAGE_KEY_MAP[k]: v for k, v in c.usages.items()},
500
            USAGE: {CertController.INVERSE_KEY_MAP[k]: v for k, v in c.usages.items()},
573 501
            ISSUER: {
574 502
                ID: c_issuer.certificate_id,
575 503
                COMMON_NAME: c_issuer.common_name
......
595 523
            SUBJECT: subj.to_dict(),
596 524
            NOT_BEFORE: datetime.strptime(c.valid_from, DATETIME_FORMAT).date(),
597 525
            NOT_AFTER: datetime.strptime(c.valid_to, DATETIME_FORMAT).date(),
598
            USAGE: {CertController.INVERSE_USAGE_KEY_MAP[k]: v for k, v in c.usages.items()},
526
            USAGE: {CertController.INVERSE_KEY_MAP[k]: v for k, v in c.usages.items()},
599 527
            CA: c_issuer.certificate_id
600 528
        }
601 529

  
......
694 622
            Logger.error(f"Internal server error (unknown origin).")
695 623
            return E_GENERAL_ERROR, C_INTERNAL_SERVER_ERROR
696 624

  
697
        return {"success": True, "data": "The certificate and its descendants have been successfully deleted."}
698

  
699
    def generate_certificate_pkcs_identity(self, id):
700
        """
701
        Generates a PKCS12 identity (including the chain of trust) of the certificate given by the specified ID.
702
        Response is of application/x-pkcs12 type.
703

  
704
        :param id: ID of a certificate whose PKCS12 identity should be generated
705
        :type id: int
706

  
707
        :rtype: Response
708
        """
709

  
710
        Logger.info(f"\n\t{request.referrer}"
711
                    f"\n\t{request.method}   {request.path}   {request.scheme}"
712
                    f"\n\tCertificate ID = {id}")
713

  
714
        # try to parse the supplied ID
715
        try:
716
            v = int(id)
717
        except ValueError:
718
            Logger.error(f"Invalid request, wrong parameters 'id'[{id}] (expected integer).")
719
            return E_WRONG_PARAMETERS, C_BAD_REQUEST
720

  
721
        # find a certificate using the given ID
722
        cert = self.certificate_service.get_certificate(v)
723

  
724
        if request.is_json:                                                         # accept JSON only
725
            body = request.get_json()
726

  
727
            # check whether the request is well formed meaning that it contains all required fields
728
            if NAME not in body.keys():
729
                return E_IDENTITY_NAME_NOT_SPECIFIED, C_BAD_REQUEST
730

  
731
            if PASSWORD not in body.keys():
732
                return E_IDENTITY_PASSWORD_NOT_SPECIFIED, C_BAD_REQUEST
733

  
734
            # parse required fields from the request
735
            identity_name = body[NAME]
736
            identity_password = body[PASSWORD]
737

  
738
            # check whether a certificated specified by the given ID exists
739
            if cert is None:
740
                Logger.error(f"No such certificate found 'ID = {v}'.")
741
                return E_NO_CERTIFICATES_FOUND, C_NOT_FOUND
742
            else:
743
                # try to load it's private key
744
                key = self.key_service.get_key(cert.private_key_id)
745
                if key is None:
746
                    Logger.error(
747
                        f"The private key 'ID = {cert.private_key_id}'of the certificate 'ID = {cert.certificate_id}' does not exist.")
748
                    return E_NO_CERTIFICATES_FOUND, C_INTERNAL_SERVER_ERROR
749
                else:
750
                    # generate PKCS12 identity
751
                    identity_byte_array = self.certificate_service.generate_pkcs_identity(cert, key,
752
                                                                                          identity_name,
753
                                                                                          identity_password)
754
                    return Response(identity_byte_array, mimetype='application/x-pkcs12')
625
        return {"success": True, "data": "The certificate and its descendants have been successfully deleted."}

Také k dispozici: Unified diff