Projekt

Obecné

Profil

Stáhnout (28.7 KB) Statistiky
| Větev: | Tag: | Revize:
1
import json
2
from datetime import datetime
3
from itertools import chain
4
from json import JSONDecodeError
5

    
6
from flask import request
7
from injector import inject
8

    
9
from src.constants import CA_ID, \
10
    SSL_ID, SIGNATURE_ID, AUTHENTICATION_ID, \
11
    DATETIME_FORMAT, ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID  # TODO DATABASE_FILE - not the Controller's
12
from src.controllers.return_codes import *
13
from src.exceptions.database_exception import DatabaseException
14
from src.exceptions.unknown_exception import UnknownException
15
from src.model.subject import Subject
16
from src.services.certificate_service import CertificateService, RevocationReasonInvalidException, \
17
    CertificateStatusInvalidException, CertificateNotFoundException, CertificateAlreadyRevokedException, \
18
    CertificateCannotBeSetToValid
19
#  responsibility.
20
from src.services.key_service import KeyService
21
from src.utils.logger import Logger
22
from src.utils.util import dict_to_string
23

    
24
TREE_NODE_TYPE_COUNT = 3
25

    
26
FILTERING = "filtering"
27
PAGE = "page"
28
PER_PAGE = "per_page"
29
ISSUER = "issuer"
30
US = "usage"
31
NOT_AFTER = "notAfter"
32
NOT_BEFORE = "notBefore"
33
COMMON_NAME = "CN"
34
ID = "id"
35
CA = "CA"
36
USAGE = "usage"
37
SUBJECT = "subject"
38
VALIDITY_DAYS = "validityDays"
39
TYPE = "type"
40
ISSUED_BY = "issuedby"
41
STATUS = "status"
42
REASON = "reason"
43
REASON_UNDEFINED = "unspecified"
44

    
45
E_NO_ISSUER_FOUND = {"success": False, "data": "No certificate authority with such unique ID exists."}
46
E_NO_CERTIFICATES_FOUND = {"success": False, "data": "No such certificate found."}
47
E_NO_CERTIFICATE_ALREADY_REVOKED = {"success": False, "data": "Certificate is already revoked."}
48
E_NO_CERT_PRIVATE_KEY_FOUND = {"success": False,
49
                               "data": "Internal server error (certificate's private key cannot be found)."}
50
E_NOT_JSON_FORMAT = {"success": False, "data": "The request must be JSON-formatted."}
51
E_CORRUPTED_DATABASE = {"success": False, "data": "Internal server error (corrupted database)."}
52
E_GENERAL_ERROR = {"success": False, "data": "Internal server error (unknown origin)."}
53
E_MISSING_PARAMETERS = {"success": False, "data": "Invalid request, missing parameters."}
54
E_WRONG_PARAMETERS = {"success": False, "data": "Invalid request, wrong parameters."}
55

    
56

    
57
class CertController:
58
    USAGE_KEY_MAP = {'CA': CA_ID, 'SSL': SSL_ID, 'digitalSignature': SIGNATURE_ID, 'authentication': AUTHENTICATION_ID}
59
    INVERSE_USAGE_KEY_MAP = {k: v for v, k in USAGE_KEY_MAP.items()}
60
    FILTERING_TYPE_KEY_MAP = {'root': ROOT_CA_ID, 'inter': INTERMEDIATE_CA_ID, 'end': CERTIFICATE_ID}
61
    # INVERSE_FILTERING_TYPE_KEY_MAP = {k: v for v, k in FILTERING_TYPE_KEY_MAP.items()}
62

    
63

    
64
    @inject
65
    def __init__(self, certificate_service: CertificateService, key_service: KeyService):
66
        self.certificate_service = certificate_service
67
        self.key_service = key_service
68

    
69
    def create_certificate(self):
70
        """create new certificate
71

    
72
        Create a new certificate based on given information
73

    
74
        :param body: Certificate data to be created
75
        :type body: dict | bytes
76

    
77
        :rtype: CreatedResponse
78
        """
79

    
80
        Logger.info(f"\n\t{request.referrer}"
81
                    f"\n\t{request.method}   {request.path}   {request.scheme}")
82

    
83
        required_keys = {SUBJECT, USAGE, VALIDITY_DAYS}                             # required fields of the POST req
84

    
85
        if request.is_json:                                                         # accept JSON only
86
            body = request.get_json()
87

    
88
            Logger.info(f"\n\tRequest body:"
89
                        f"\n{dict_to_string(body)}")
90

    
91
            if not all(k in body for k in required_keys):                           # verify that all keys are present
92
                Logger.error(f"Invalid request, missing parameters")
93
                return E_MISSING_PARAMETERS, C_BAD_REQUEST
94

    
95
            if not isinstance(body[VALIDITY_DAYS], int):                            # type checking
96
                Logger.error(f"Invalid request, wrong parameter '{VALIDITY_DAYS}'.")
97
                return E_WRONG_PARAMETERS, C_BAD_REQUEST
98

    
99
            subject = Subject.from_dict(body[SUBJECT])                              # generate Subject from passed dict
100

    
101
            if subject is None:                                                     # if the format is incorrect
102
                Logger.error(f"Invalid request, wrong parameter '{SUBJECT}'.")
103
                return E_WRONG_PARAMETERS, C_BAD_REQUEST
104

    
105
            usages_dict = {}
106

    
107
            if not isinstance(body[USAGE], dict):                                   # type checking
108
                Logger.error(f"Invalid request, wrong parameter '{USAGE}'.")
109
                return E_WRONG_PARAMETERS, C_BAD_REQUEST
110

    
111
            for k, v in body[USAGE].items():                                        # for each usage
112
                if k not in CertController.USAGE_KEY_MAP:                                 # check that it is a valid usage
113
                    Logger.error(f"Invalid request, wrong parameter '{USAGE}'[{k}].")
114
                    return E_WRONG_PARAMETERS, C_BAD_REQUEST                        # and throw if it is not
115
                usages_dict[CertController.USAGE_KEY_MAP[k]] = v                          # otherwise translate key and set
116

    
117
            key = self.key_service.create_new_key()                                      # TODO pass key
118

    
119
            if CA not in body or body[CA] is None:                                  # if issuer omitted (legal) or none
120
                cert = self.certificate_service.create_root_ca(                          # create a root CA
121
                    key,
122
                    subject,
123
                    usages=usages_dict,                                             # TODO ignoring usages -> discussion
124
                    days=body[VALIDITY_DAYS]
125
                )
126
            else:
127
                issuer = self.certificate_service.get_certificate(body[CA])              # get base issuer info
128

    
129
                if issuer is None:                                                  # if such issuer does not exist
130
                    Logger.error(f"No certificate authority with such unique ID exists 'ID = {key.private_key_id}'.")
131
                    self.key_service.delete_key(key.private_key_id)                      # free
132
                    return E_NO_ISSUER_FOUND, C_BAD_REQUEST                         # and throw
133

    
134
                issuer_key = self.key_service.get_key(issuer.private_key_id)             # get issuer's key, which must exist
135

    
136
                if issuer_key is None:                                              # if it does not
137
                    Logger.error(f"Internal server error (corrupted database).")
138
                    self.key_service.delete_key(key.private_key_id)                      # free
139
                    return E_CORRUPTED_DATABASE, C_INTERNAL_SERVER_ERROR            # and throw
140

    
141
                f = self.certificate_service.create_ca if CA_ID in usages_dict and usages_dict[CA_ID] else \
142
                    self.certificate_service.create_end_cert
143

    
144
                # noinspection PyTypeChecker
145
                cert = f(                                                           # create inter CA or end cert
146
                    key,                                                            # according to whether 'CA' is among
147
                    subject,                                                        # the usages' fields
148
                    issuer,
149
                    issuer_key,
150
                    usages=usages_dict,
151
                    days=body[VALIDITY_DAYS]
152
                )
153

    
154
            if cert is not None:
155
                return {"success": True,
156
                        "data": cert.certificate_id}, C_CREATED_SUCCESSFULLY
157
            else:                                                                   # if this fails, then
158
                Logger.error(f"Internal error: The certificate could not have been created.")
159
                self.key_service.delete_key(key.private_key_id)                          # free
160
                return {"success": False,                                           # and wonder what the cause is,
161
                        "data": "Internal error: The certificate could not have been created."}, C_BAD_REQUEST
162
                                                                                    # as obj/None carries only one bit
163
                                                                                    # of error information
164
        else:
165
            Logger.error(f"The request must be JSON-formatted.")
166
            return E_NOT_JSON_FORMAT, C_BAD_REQUEST                                 # throw in case of non-JSON format
167

    
168
    def get_certificate_by_id(self, id):
169
        """get certificate by ID
170

    
171
        Get certificate in PEM format by ID
172

    
173
        :param id: ID of a certificate to be queried
174
        :type id: dict | bytes
175

    
176
        :rtype: PemResponse
177
        """
178

    
179
        Logger.info(f"\n\t{request.referrer}"
180
                    f"\n\t{request.method}   {request.path}   {request.scheme}"
181
                    f"\n\tCertificate ID = {id}")
182
        try:
183
            v = int(id)
184
        except ValueError:
185
            Logger.error(f"Invalid request, wrong parameters 'id'[{id}].")
186
            return E_WRONG_PARAMETERS, C_BAD_REQUEST
187

    
188
        cert = self.certificate_service.get_certificate(v)
189

    
190
        if cert is None:
191
            Logger.error(f"No such certificate found 'ID = {v}'.")
192
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA
193
        else:
194
            return {"success": True, "data": cert.pem_data}, C_SUCCESS
195

    
196
    def get_certificate_details_by_id(self, id):
197
        """get certificate's details by ID
198

    
199
        Get certificate details by ID
200

    
201
        :param id: ID of a certificate whose details are to be queried
202
        :type id: dict | bytes
203

    
204
        :rtype: CertificateResponse
205
        """
206

    
207
        Logger.info(f"\n\t{request.referrer}"
208
                    f"\n\t{request.method}   {request.path}   {request.scheme}"
209
                    f"\n\tCertificate ID = {id}")
210

    
211
        try:
212
            v = int(id)
213
        except ValueError:
214
            Logger.error(f"Invalid request, wrong parameters 'id'[{id}].")
215
            return E_WRONG_PARAMETERS, C_BAD_REQUEST
216

    
217
        cert = self.certificate_service.get_certificate(v)
218

    
219
        if cert is None:
220
            Logger.error(f"No such certificate found 'ID = {v}'.")
221
            return E_NO_CERTIFICATES_FOUND, C_NOT_FOUND
222

    
223
        data = self.cert_to_dict_full(cert)
224
        if data is None:
225
            return E_CORRUPTED_DATABASE, C_INTERNAL_SERVER_ERROR
226

    
227
        try:
228
            state = self.certificate_service.get_certificate_state(v)
229
            data["status"] = state
230
        except CertificateNotFoundException:
231
            Logger.error(f"No such certificate found 'ID = {id}'.")
232

    
233
        return {"success": True, "data": data}, C_SUCCESS
234

    
235
    def get_certificate_list(self):
236
        """get list of certificates
237

    
238
        Lists certificates based on provided filtering options
239

    
240
        :param filtering: Filter certificate type to be queried
241
        :type filtering: dict | bytes
242

    
243
        :rtype: CertificateListResponse
244
        """
245

    
246
        Logger.info(f"\n\t{request.referrer}"
247
                    f"\n\t{request.method}   {request.path}   {request.scheme}")
248

    
249

    
250
        # the filtering parameter can be read as URL argument or as a request body
251
        if request.is_json:
252
            data = request.get_json()
253
        else:
254
            data = {}
255

    
256
        if FILTERING in request.args.keys():
257
            try:
258
                data[FILTERING] = json.loads(request.args[FILTERING])
259
            except JSONDecodeError:
260
                Logger.error(f"The request must be JSON-formatted.")
261
                return E_NOT_JSON_FORMAT, C_BAD_REQUEST
262

    
263
        if PAGE in request.args.keys():
264
            try:
265
                data[PAGE] = json.loads(request.args[PAGE])
266
            except JSONDecodeError:
267
                Logger.error(f"The request must be JSON-formatted.")
268
                return E_NOT_JSON_FORMAT, C_BAD_REQUEST
269

    
270
        if PER_PAGE in request.args.keys():
271
            try:
272
                data[PER_PAGE] = json.loads(request.args[PER_PAGE])
273
            except JSONDecodeError:
274
                Logger.error(f"The request must be JSON-formatted.")
275
                return E_NOT_JSON_FORMAT, C_BAD_REQUEST
276

    
277
        Logger.info(f"\n\tRequest body:"
278
                    f"\n{dict_to_string(data)}")
279

    
280
        target_types = {ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID}
281
        target_usages = {v for v in CertController.INVERSE_USAGE_KEY_MAP.keys()}
282
        target_cn_substring = None
283
        issuer_id = -1
284

    
285
        unfiltered = True
286

    
287
        if PER_PAGE in data:
288
            unfiltered = False
289
            page = data.get(PAGE, 0)
290
            per_page = data[PER_PAGE]
291
        else:
292
            page = None
293
            per_page = None
294

    
295
        if FILTERING in data:                                                   # if the 'filtering' field exists
296
            unfiltered = False
297
            if isinstance(data[FILTERING], dict):                               # and it is also a 'dict'
298

    
299
                # noinspection DuplicatedCode
300
                if TYPE in data[FILTERING]:                                     # containing 'type'
301
                    if isinstance(data[FILTERING][TYPE], list):                 # which is a 'list',
302
                                                                                # map every field to id
303
                        try:
304
                            target_types = {CertController.FILTERING_TYPE_KEY_MAP[v] for v in data[FILTERING][TYPE]}
305
                        except KeyError as e:
306
                            Logger.error(f"Invalid request, wrong parameters '{FILTERING}.{TYPE}' - '{e}'.")
307
                            return E_WRONG_PARAMETERS, C_BAD_REQUEST
308
                    else:
309
                        Logger.error(f"Invalid request, wrong parameters '{FILTERING}.{TYPE}'.")
310
                        return E_WRONG_PARAMETERS, C_BAD_REQUEST
311

    
312
                # noinspection DuplicatedCode
313
                if USAGE in data[FILTERING]:                                    # containing 'usage'
314
                    if isinstance(data[FILTERING][USAGE], list):                # which is a 'list',
315
                                                                                # map every field to id
316
                        try:
317
                            target_usages = {CertController.USAGE_KEY_MAP[v] for v in data[FILTERING][USAGE]}
318
                        except KeyError as e:
319
                            Logger.error(f"Invalid request, wrong parameters '{FILTERING}.{USAGE}' - '{e}'.")
320
                            return E_WRONG_PARAMETERS, C_BAD_REQUEST
321
                    else:
322
                        Logger.error(f"Invalid request, wrong parameters '{FILTERING}.{USAGE}'.")
323
                        return E_WRONG_PARAMETERS, C_BAD_REQUEST
324

    
325
                if COMMON_NAME in data[FILTERING]:                              # containing 'CN'
326
                    if isinstance(data[FILTERING][COMMON_NAME], str):           # which is a 'str'
327
                        target_cn_substring = data[FILTERING][COMMON_NAME]
328
                    else:
329
                        Logger.error(f"Invalid request, wrong parameters '{FILTERING}.{COMMON_NAME}'.")
330
                        return E_WRONG_PARAMETERS, C_BAD_REQUEST
331

    
332
                if ISSUED_BY in data[FILTERING]:                                # containing 'issuedby'
333
                    if isinstance(data[FILTERING][ISSUED_BY], int):             # which is an 'int'
334
                        issuer_id = data[FILTERING][ISSUED_BY]                  # then get its children only
335

    
336
            else:
337
                Logger.error(f"Invalid request, wrong parameters '{FILTERING}'.")
338
                return E_WRONG_PARAMETERS, C_BAD_REQUEST
339

    
340
        if unfiltered:                                                      # if not filtering
341
            certs = self.certificate_service.get_certificates()
342
        elif issuer_id >= 0:                                                # if filtering by an issuer
343
            try:
344
                                                                            # get his children, filtered
345
                certs = self.certificate_service.get_certificates_issued_by_filter(
346
                    issuer_id=issuer_id,
347
                    target_types=target_types,
348
                    target_usages=target_usages,
349
                    target_cn_substring=target_cn_substring,
350
                    page=page,
351
                    per_page=per_page
352
                )
353
            except CertificateNotFoundException:                            # if id does not exist
354
                Logger.error(f"No such certificate found 'ID = {issuer_id}'.")
355
                return E_NO_CERTIFICATES_FOUND, C_NOT_FOUND                 # throw
356
        else:
357
            certs = self.certificate_service.get_certificates_filter(
358
                target_types=target_types,
359
                target_usages=target_usages,
360
                target_cn_substring=target_cn_substring,
361
                page=page,
362
                per_page=per_page
363
            )
364

    
365
        if certs is None:
366
            Logger.error(f"Internal server error (unknown origin).")
367
            return E_GENERAL_ERROR, C_INTERNAL_SERVER_ERROR
368
        elif len(certs) == 0:
369
            # TODO check log level
370
            Logger.warning(f"No such certificate found (empty list).")
371
            return {"success": True, "data": []}, C_SUCCESS
372
        else:
373
            ret = []
374
            for c in certs:
375
                data = self.cert_to_dict_partial(c)
376
                if data is None:
377
                    Logger.error(f"Internal server error (corrupted database).")
378
                    return E_CORRUPTED_DATABASE, C_INTERNAL_SERVER_ERROR
379
                ret.append(
380
                    data
381
                )
382
            return {"success": True, "data": ret}, C_SUCCESS
383

    
384
    def get_certificate_root_by_id(self, id):
385
        """get certificate's root of trust chain by ID
386

    
387
        Get certificate's root of trust chain in PEM format by ID
388

    
389
        :param id: ID of a child certificate whose root is to be queried
390
        :type id: dict | bytes
391

    
392
        :rtype: PemResponse
393
        """
394

    
395
        Logger.info(f"\n\t{request.referrer}"
396
                    f"\n\t{request.method}   {request.path}   {request.scheme}"
397
                    f"\n\tCertificate ID = {id}")
398

    
399
        try:
400
            v = int(id)
401
        except ValueError:
402
            Logger.error(f"Invalid request, wrong parameters 'id'[{id}].")
403
            return E_WRONG_PARAMETERS, C_BAD_REQUEST
404

    
405
        cert = self.certificate_service.get_certificate(v)
406

    
407
        if cert is None:
408
            Logger.error(f"No such certificate found 'ID = {v}'.")
409
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA
410

    
411
        trust_chain = self.certificate_service.get_chain_of_trust(cert.parent_id, exclude_root=False)
412
        if trust_chain is None or len(trust_chain) == 0:
413
            Logger.error(f"No such certificate found (empty list).")
414
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA
415

    
416
        return {"success": True, "data": trust_chain[-1].pem_data}, C_SUCCESS
417

    
418
    def get_certificate_trust_chain_by_id(self, id):
419
        """get certificate's trust chain by ID (including root certificate)
420

    
421
        Get certificate trust chain in PEM format by ID
422

    
423
        :param id: ID of a child certificate whose chain is to be queried
424
        :type id: dict | bytes
425

    
426
        :rtype: PemResponse
427
        """
428

    
429
        Logger.info(f"\n\t{request.referrer}"
430
                    f"\n\t{request.method}   {request.path}   {request.scheme}"
431
                    f"\n\tCertificate ID = {id}")
432

    
433
        try:
434
            v = int(id)
435
        except ValueError:
436
            Logger.error(f"Invalid request, wrong parameters 'id'[{id}].")
437
            return E_WRONG_PARAMETERS, C_BAD_REQUEST
438

    
439
        cert = self.certificate_service.get_certificate(v)
440

    
441
        if cert is None:
442
            Logger.error(f"No such certificate found 'ID = {v}'.")
443
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA
444

    
445
        if cert.parent_id is None:
446
            Logger.error(f"Parent ID is empty in certificate 'ID = {v}'.")
447
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA
448

    
449
        trust_chain = self.certificate_service.get_chain_of_trust(cert.parent_id, exclude_root=False)
450

    
451
        ret = []
452
        for intermediate in trust_chain:
453
            ret.append(intermediate.pem_data)
454

    
455
        return {"success": True, "data": "".join(ret)}, C_SUCCESS
456

    
457
    def set_certificate_status(self, id):
458
        """
459
        Revoke a certificate given by ID
460
            - revocation request may contain revocation reason
461
            - revocation reason is verified based on the possible predefined values
462
            - if revocation reason is not specified 'undefined' value is used
463
        :param id: Identifier of the certificate to be revoked
464
        :type id: int
465

    
466
        :rtype: SuccessResponse | ErrorResponse (see OpenAPI definition)
467
        """
468

    
469
        Logger.info(f"\n\t{request.referrer}"
470
                    f"\n\t{request.method}   {request.path}   {request.scheme}"
471
                    f"\n\tCertificate ID = {id}")
472

    
473
        required_keys = {STATUS}  # required keys
474

    
475
        # check if the request contains a JSON body
476
        if request.is_json:
477
            request_body = request.get_json()
478

    
479
            Logger.info(f"\n\tRequest body:"
480
                        f"\n{dict_to_string(request_body)}")
481

    
482
            # try to parse certificate identifier -> if it is not int return error 400
483
            try:
484
                identifier = int(id)
485
            except ValueError:
486
                Logger.error(f"Invalid request, wrong parameters 'id'[{id}].")
487
                return E_WRONG_PARAMETERS, C_BAD_REQUEST
488

    
489
            # verify that all required keys are present
490
            if not all(k in request_body for k in required_keys):
491
                Logger.error(f"Invalid request, missing parameters.")
492
                return E_MISSING_PARAMETERS, C_BAD_REQUEST
493

    
494
            # get status and reason from the request
495
            status = request_body[STATUS]
496
            reason = request_body.get(REASON, REASON_UNDEFINED)
497
            try:
498
                # set certificate status using certificate_service
499
                self.certificate_service.set_certificate_revocation_status(identifier, status, reason)
500
            except (RevocationReasonInvalidException, CertificateStatusInvalidException):
501
                # these exceptions are thrown in case invalid status or revocation reason is passed to the controller
502
                Logger.error(f"Invalid request, wrong parameters.")
503
                return E_WRONG_PARAMETERS, C_BAD_REQUEST
504
            except CertificateAlreadyRevokedException:
505
                Logger.error(f"Certificate is already revoked 'ID = {identifier}'.")
506
                return E_NO_CERTIFICATE_ALREADY_REVOKED, C_BAD_REQUEST
507
            except CertificateNotFoundException:
508
                Logger.error(f"No such certificate found 'ID = {identifier}'.")
509
                return E_NO_CERTIFICATES_FOUND, C_NOT_FOUND
510
            except CertificateCannotBeSetToValid as e:
511
                return {"success": False, "data": str(e)}, C_BAD_REQUEST
512
            return {"success": True,
513
                    "data": "Certificate status updated successfully."}, C_SUCCESS
514
        # throw an error in case the request does not contain a json body
515
        else:
516
            Logger.error(f"The request must be JSON-formatted.")
517
            return E_NOT_JSON_FORMAT, C_BAD_REQUEST
518

    
519
    def cert_to_dict_partial(self, c):
520
        """
521
        Dictionarizes a certificate directly fetched from the database. Contains partial information.
522
        :param c: target cert
523
        :return: certificate dict (compliant with some parts of the REST API)
524
        """
525

    
526
        # TODO check log
527
        Logger.debug(f"Function launched.")
528

    
529
        c_issuer = self.certificate_service.get_certificate(c.parent_id)
530
        if c_issuer is None:
531
            return None
532

    
533
        return {
534
            ID: c.certificate_id,
535
            COMMON_NAME: c.common_name,
536
            NOT_BEFORE: datetime.strptime(c.valid_from, DATETIME_FORMAT).date(),
537
            NOT_AFTER: datetime.strptime(c.valid_to, DATETIME_FORMAT).date(),
538
            USAGE: {CertController.INVERSE_USAGE_KEY_MAP[k]: v for k, v in c.usages.items()},
539
            ISSUER: {
540
                ID: c_issuer.certificate_id,
541
                COMMON_NAME: c_issuer.common_name
542
            }
543
        }
544

    
545
    def cert_to_dict_full(self, c):
546
        """
547
        Dictionarizes a certificate directly fetched from the database, but adds subject info.
548
        Contains full information.
549
        :param c: target cert
550
        :return: certificate dict (compliant with some parts of the REST API)
551
        """
552

    
553
        Logger.info(f"Function launched.")
554

    
555
        subj = self.certificate_service.get_subject_from_certificate(c)
556
        c_issuer = self.certificate_service.get_certificate(c.parent_id)
557
        if c_issuer is None:
558
            return None
559

    
560
        return {
561
            SUBJECT: subj.to_dict(),
562
            NOT_BEFORE: datetime.strptime(c.valid_from, DATETIME_FORMAT).date(),
563
            NOT_AFTER: datetime.strptime(c.valid_to, DATETIME_FORMAT).date(),
564
            USAGE: {CertController.INVERSE_USAGE_KEY_MAP[k]: v for k, v in c.usages.items()},
565
            CA: c_issuer.certificate_id
566
        }
567

    
568
    def get_private_key_of_a_certificate(self, id):
569
        """
570
        Get a private key used to sign a certificate in PEM format specified by certificate's ID
571

    
572
        :param id: ID of a certificate whose private key is to be queried
573
        :type id: dict | bytes
574

    
575
        :rtype: PemResponse
576
        """
577

    
578
        Logger.info(f"\n\t{request.referrer}"
579
                    f"\n\t{request.method}   {request.path}   {request.scheme}"
580
                    f"\n\tCertificate ID = {id}")
581

    
582
        # try to parse the supplied ID
583
        try:
584
            v = int(id)
585
        except ValueError:
586
            Logger.error(f"Invalid request, wrong parameters 'id'[{id}].")
587
            return E_WRONG_PARAMETERS, C_BAD_REQUEST
588

    
589
        # find a certificate using the given ID
590
        cert = self.certificate_service.get_certificate(v)
591

    
592
        if cert is None:
593
            Logger.error(f"No such certificate found 'ID = {v}'.")
594
            return E_NO_CERTIFICATES_FOUND, C_NOT_FOUND
595
        else:
596
            # certificate exists, fetch it's private key
597
            private_key = self.key_service.get_key(cert.private_key_id)
598
            if cert is None:
599
                Logger.error(f"Internal server error (certificate's private key cannot be found).")
600
                return E_NO_CERT_PRIVATE_KEY_FOUND, C_INTERNAL_SERVER_ERROR
601
            else:
602
                return {"success": True, "data": private_key.private_key}, C_SUCCESS
603

    
604
    def get_public_key_of_a_certificate(self, id):
605
        """
606
        Get a public key of a certificate in PEM format specified by certificate's ID
607

    
608
        :param id: ID of a certificate whose public key is to be queried
609
        :type id: dict | bytes
610

    
611
        :rtype: PemResponse
612
        """
613

    
614
        Logger.info(f"\n\t{request.referrer}"
615
                    f"\n\t{request.method}   {request.path}   {request.scheme}"
616
                    f"\n\tCertificate ID = {id}")
617

    
618
        # try to parse the supplied ID
619
        try:
620
            v = int(id)
621
        except ValueError:
622
            Logger.error(f"Invalid request, wrong parameters 'id'[{id}].")
623
            return E_WRONG_PARAMETERS, C_BAD_REQUEST
624

    
625
        # find a certificate using the given ID
626
        cert = self.certificate_service.get_certificate(v)
627

    
628
        if cert is None:
629
            Logger.error(f"No such certificate found 'ID = {v}'.")
630
            return E_NO_CERTIFICATES_FOUND, C_NOT_FOUND
631
        else:
632
            return {"success": True, "data": self.certificate_service.get_public_key_from_certificate(cert)}, C_SUCCESS
633

    
634
    def delete_certificate(self, id):
635
        """
636
        Deletes a certificate identified by ID, including its corresponding subtree (all descendants).
637
        :param id: target certificate ID
638
        :rtype: DeleteResponse
639
        """
640

    
641
        Logger.info(f"\n\t{request.referrer}"
642
                    f"\n\t{request.method}   {request.path}   {request.scheme}"
643
                    f"\n\tCertificate ID = {id}")
644

    
645
        try:
646
            v = int(id)
647
        except ValueError:
648
            Logger.error(f"Invalid request, wrong parameters 'id'[{id}].")
649
            return E_WRONG_PARAMETERS, C_BAD_REQUEST
650

    
651
        try:
652
            self.certificate_service.delete_certificate(v)
653
        except CertificateNotFoundException:
654
            Logger.error(f"No such certificate found 'ID = {v}'.")
655
            return E_NO_CERTIFICATES_FOUND, C_NOT_FOUND
656
        except DatabaseException:
657
            Logger.error(f"Internal server error (corrupted database).")
658
            return E_CORRUPTED_DATABASE, C_INTERNAL_SERVER_ERROR
659
        except CertificateStatusInvalidException or RevocationReasonInvalidException or UnknownException:
660
            Logger.error(f"Internal server error (unknown origin).")
661
            return E_GENERAL_ERROR, C_INTERNAL_SERVER_ERROR
662

    
663
        return {"success": True, "data": "The certificate and its descendants have been successfully deleted."}
(2-2/4)