Projekt

Obecné

Profil

Stáhnout (28.7 KB) Statistiky
| Větev: | Tag: | Revize:
1
import json
2
from datetime import datetime
3
from itertools import chain
4
from json import JSONDecodeError
5

    
6
from flask import request
7
from injector import inject
8

    
9
from src.constants import CA_ID, \
10
    SSL_ID, SIGNATURE_ID, AUTHENTICATION_ID, \
11
    DATETIME_FORMAT, ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID  # TODO DATABASE_FILE - not the Controller's
12
from src.controllers.return_codes import *
13
from src.exceptions.database_exception import DatabaseException
14
from src.exceptions.unknown_exception import UnknownException
15
from src.model.subject import Subject
16
from src.services.certificate_service import CertificateService, RevocationReasonInvalidException, \
17
    CertificateStatusInvalidException, CertificateNotFoundException, CertificateAlreadyRevokedException, \
18
    CertificateCannotBeSetToValid
19
#  responsibility.
20
from src.services.cryptography import CryptographyException
21
from src.services.key_service import KeyService
22
from src.utils.logger import Logger
23
from src.utils.util import dict_to_string
24

    
25
TREE_NODE_TYPE_COUNT = 3
26
KEY_PEM = "key_pem"
27
PASSWORD = "password"
28
KEY = "key"
29
FILTERING = "filtering"
30
ISSUER = "issuer"
31
US = "usage"
32
NOT_AFTER = "notAfter"
33
NOT_BEFORE = "notBefore"
34
COMMON_NAME = "CN"
35
ID = "id"
36
USAGE = "usage"
37
SUBJECT = "subject"
38
VALIDITY_DAYS = "validityDays"
39
CA = "CA"
40
ISSUED_BY = "issuedby"
41
STATUS = "status"
42
REASON = "reason"
43
REASON_UNDEFINED = "unspecified"
44

    
45
E_NO_ISSUER_FOUND = {"success": False, "data": "No certificate authority with such unique ID exists."}
46
E_NO_CERTIFICATES_FOUND = {"success": False, "data": "No such certificate found."}
47
E_NO_CERTIFICATE_ALREADY_REVOKED = {"success": False, "data": "Certificate is already revoked."}
48
E_NO_CERT_PRIVATE_KEY_FOUND = {"success": False,
49
                               "data": "Internal server error (certificate's private key cannot be found)."}
50
E_NOT_JSON_FORMAT = {"success": False, "data": "The request must be JSON-formatted."}
51
E_CORRUPTED_DATABASE = {"success": False, "data": "Internal server error (corrupted database)."}
52
E_GENERAL_ERROR = {"success": False, "data": "Internal server error (unknown origin)."}
53
E_MISSING_PARAMETERS = {"success": False, "data": "Invalid request, missing parameters."}
54
E_WRONG_PARAMETERS = {"success": False, "data": "Invalid request, wrong parameters."}
55
E_WRONG_PASSWORD = {"success": False, "data": "The provided passphrase does not match the provided key."}
56

    
57

    
58
class CertController:
59
    KEY_MAP = {'CA': CA_ID, 'SSL': SSL_ID, 'digitalSignature': SIGNATURE_ID, 'authentication': AUTHENTICATION_ID}
60
    INVERSE_KEY_MAP = {k: v for v, k in KEY_MAP.items()}
61

    
62
    @inject
63
    def __init__(self, certificate_service: CertificateService, key_service: KeyService):
64
        self.certificate_service = certificate_service
65
        self.key_service = key_service
66

    
67
    def create_certificate(self):
68
        """create new certificate
69

    
70
        Create a new certificate based on given information
71

    
72
        :param body: Certificate data to be created
73
        :type body: dict | bytes
74

    
75
        :rtype: CreatedResponse
76
        """
77

    
78
        Logger.info(f"\n\t{request.referrer}"
79
                    f"\n\t{request.method}   {request.path}   {request.scheme}")
80

    
81
        required_keys = {SUBJECT, USAGE, VALIDITY_DAYS}                             # required fields of the POST req
82

    
83
        if request.is_json:                                                         # accept JSON only
84
            body = request.get_json()
85

    
86
            Logger.info(f"\n\tRequest body:"
87
                        f"\n{dict_to_string(body)}")
88

    
89
            if not all(k in body for k in required_keys):                           # verify that all keys are present
90
                Logger.error(f"Invalid request, missing parameters")
91
                return E_MISSING_PARAMETERS, C_BAD_REQUEST
92

    
93
            if not isinstance(body[VALIDITY_DAYS], int):                            # type checking
94
                Logger.error(f"Invalid request, wrong parameter '{VALIDITY_DAYS}'.")
95
                return E_WRONG_PARAMETERS, C_BAD_REQUEST
96

    
97
            subject = Subject.from_dict(body[SUBJECT])                              # generate Subject from passed dict
98

    
99
            if subject is None:                                                     # if the format is incorrect
100
                Logger.error(f"Invalid request, wrong parameter '{SUBJECT}'.")
101
                return E_WRONG_PARAMETERS, C_BAD_REQUEST
102

    
103
            usages_dict = {}
104

    
105
            if USAGE not in body or not isinstance(body[USAGE], list):              # type checking
106
                Logger.error(f"Invalid request, wrong parameter '{USAGE}'.")
107
                return E_WRONG_PARAMETERS, C_BAD_REQUEST
108

    
109
            for v in body[USAGE]:                                                   # for each usage
110
                if v not in CertController.KEY_MAP:                                 # check that it is a valid usage
111
                    Logger.error(f"Invalid request, wrong parameter '{USAGE}'[{v}].")
112
                    return E_WRONG_PARAMETERS, C_BAD_REQUEST                        # and throw if it is not
113
                usages_dict[CertController.KEY_MAP[v]] = True                       # otherwise translate key and set
114

    
115
            if KEY in body:
116
                if isinstance(body[KEY], dict):
117
                    if PASSWORD in body[KEY]:
118
                        passphrase = body[KEY][PASSWORD]
119
                        if KEY_PEM in body[KEY]:
120
                            key_pem = body[KEY][KEY_PEM]
121
                            if not self.key_service.verify_key(key_pem, passphrase=passphrase):
122
                                Logger.error(f"Passphrase specified but invalid.")
123
                                return E_WRONG_PASSWORD, C_BAD_REQUEST
124
                            key = self.key_service.wrap_custom_key(key_pem, passphrase=passphrase)
125
                        else:
126
                            key = self.key_service.create_new_key(passphrase)
127
                    else:
128
                        if KEY_PEM in body[KEY]:
129
                            key_pem = body[KEY][KEY_PEM]
130
                            if not self.key_service.verify_key(key_pem, passphrase=None):
131
                                Logger.error("Passphrase ommited but required.")
132
                                return E_WRONG_PASSWORD, C_BAD_REQUEST
133
                            key = self.key_service.wrap_custom_key(key_pem, passphrase=None)
134
                        else:
135
                            key = self.key_service.create_new_key()                 # if "key" exists but is empty
136
                else:
137
                    return E_WRONG_PARAMETERS, C_BAD_REQUEST
138
            else:
139
                key = self.key_service.create_new_key()                             # if "key" does not exist
140
                                                                                    # TODO Honza: line 134 / 138, should
141
                                                                                    #   they both be allowed? I say one
142
                                                                                    #   of those branches should return
143
                                                                                    #   wrong params bad request (mby).
144

    
145
            if CA not in body or body[CA] is None:                                  # if issuer omitted (legal) or none
146
                cert = self.certificate_service.create_root_ca(                     # create a root CA
147
                    key,
148
                    subject,
149
                    usages=usages_dict,                                             # TODO ignoring usages -> discussion
150
                    days=body[VALIDITY_DAYS]
151
                )
152
            else:
153
                issuer = self.certificate_service.get_certificate(body[CA])         # get base issuer info
154

    
155
                if issuer is None:                                                  # if such issuer does not exist
156
                    Logger.error(f"No certificate authority with such unique ID exists 'ID = {key.private_key_id}'.")
157
                    self.key_service.delete_key(key.private_key_id)                 # free
158
                    return E_NO_ISSUER_FOUND, C_BAD_REQUEST                         # and throw
159

    
160
                issuer_key = self.key_service.get_key(issuer.private_key_id)        # get issuer's key, which must exist
161

    
162
                if issuer_key is None:                                              # if it does not
163
                    Logger.error(f"Internal server error (corrupted database).")
164
                    self.key_service.delete_key(key.private_key_id)                 # free
165
                    return E_CORRUPTED_DATABASE, C_INTERNAL_SERVER_ERROR            # and throw
166

    
167
                f = self.certificate_service.create_ca if CA_ID in usages_dict and usages_dict[CA_ID] else \
168
                    self.certificate_service.create_end_cert
169

    
170
                try:
171
                    # noinspection PyTypeChecker
172
                    cert = f(                                                           # create inter CA or end cert
173
                        key,                                                            # according to whether 'CA' is among
174
                        subject,                                                        # the usages' fields
175
                        issuer,
176
                        issuer_key,
177
                        usages=usages_dict,
178
                        days=body[VALIDITY_DAYS]
179
                    )
180
                except CryptographyException as e:
181
                    return {"success": False, "data": e.message}, C_BAD_REQUEST
182

    
183
            if cert is not None:
184
                return {"success": True,
185
                        "data": cert.certificate_id}, C_CREATED_SUCCESSFULLY
186
            else:                                                                   # if this fails, then
187
                Logger.error(f"Internal error: The certificate could not have been created.")
188
                self.key_service.delete_key(key.private_key_id)                     # free
189
                return {"success": False,                                           # and wonder what the cause is,
190
                        "data": "Internal error: The certificate could not have been created."}, C_BAD_REQUEST
191
                                                                                    # as obj/None carries only one bit
192
                                                                                    # of error information
193
        else:
194
            Logger.error(f"The request must be JSON-formatted.")
195
            return E_NOT_JSON_FORMAT, C_BAD_REQUEST                                 # throw in case of non-JSON format
196

    
197
    def get_certificate_by_id(self, id):
198
        """get certificate by ID
199

    
200
        Get certificate in PEM format by ID
201

    
202
        :param id: ID of a certificate to be queried
203
        :type id: dict | bytes
204

    
205
        :rtype: PemResponse
206
        """
207

    
208
        Logger.info(f"\n\t{request.referrer}"
209
                    f"\n\t{request.method}   {request.path}   {request.scheme}"
210
                    f"\n\tCertificate ID = {id}")
211
        try:
212
            v = int(id)
213
        except ValueError:
214
            Logger.error(f"Invalid request, wrong parameters 'id'[{id}].")
215
            return E_WRONG_PARAMETERS, C_BAD_REQUEST
216

    
217
        cert = self.certificate_service.get_certificate(v)
218

    
219
        if cert is None:
220
            Logger.error(f"No such certificate found 'ID = {v}'.")
221
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA
222
        else:
223
            return {"success": True, "data": cert.pem_data}, C_SUCCESS
224

    
225
    def get_certificate_details_by_id(self, id):
226
        """get certificate's details by ID
227

    
228
        Get certificate details by ID
229

    
230
        :param id: ID of a certificate whose details are to be queried
231
        :type id: dict | bytes
232

    
233
        :rtype: CertificateResponse
234
        """
235

    
236
        Logger.info(f"\n\t{request.referrer}"
237
                    f"\n\t{request.method}   {request.path}   {request.scheme}"
238
                    f"\n\tCertificate ID = {id}")
239

    
240
        try:
241
            v = int(id)
242
        except ValueError:
243
            Logger.error(f"Invalid request, wrong parameters 'id'[{id}].")
244
            return E_WRONG_PARAMETERS, C_BAD_REQUEST
245

    
246
        cert = self.certificate_service.get_certificate(v)
247

    
248
        if cert is None:
249
            Logger.error(f"No such certificate found 'ID = {v}'.")
250
            return E_NO_CERTIFICATES_FOUND, C_NOT_FOUND
251

    
252
        data = self.cert_to_dict_full(cert)
253
        if data is None:
254
            return E_CORRUPTED_DATABASE, C_INTERNAL_SERVER_ERROR
255

    
256
        try:
257
            state = self.certificate_service.get_certificate_state(v)
258
            data["status"] = state
259
        except CertificateNotFoundException:
260
            Logger.error(f"No such certificate found 'ID = {id}'.")
261

    
262
        return {"success": True, "data": data}, C_SUCCESS
263

    
264
    def get_certificate_list(self):
265
        """get list of certificates
266

    
267
        Lists certificates based on provided filtering options
268

    
269
        :param filtering: Filter certificate type to be queried
270
        :type filtering: dict | bytes
271

    
272
        :rtype: CertificateListResponse
273
        """
274

    
275
        Logger.info(f"\n\t{request.referrer}"
276
                    f"\n\t{request.method}   {request.path}   {request.scheme}")
277

    
278
        targets = {ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID}                  # all targets
279
        issuer_id = -1
280

    
281
        # the filtering parameter can be read as URL argument or as a request body
282
        if request.is_json or FILTERING in request.args.keys():                     # if the request carries JSON data
283
            if request.is_json:
284
                data = request.get_json()                                           # get it
285
            else:
286
                try:
287
                    data = {FILTERING: json.loads(request.args[FILTERING])}
288
                except JSONDecodeError:
289
                    Logger.error(f"The request must be JSON-formatted.")
290
                    return E_NOT_JSON_FORMAT, C_BAD_REQUEST
291

    
292
            Logger.info(f"\n\tRequest body:"
293
                        f"\n{dict_to_string(data)}")
294

    
295
            if FILTERING in data:                                                   # if the 'filtering' field exists
296
                if isinstance(data[FILTERING], dict):                               # and it is also a 'dict'
297
                    if CA in data[FILTERING]:                                       # containing 'CA'
298
                        if isinstance(data[FILTERING][CA], bool):                   # which is a 'bool',
299
                            if data[FILTERING][CA]:                                 # then filter according to 'CA'.
300
                                targets.remove(CERTIFICATE_ID)
301
                            else:
302
                                targets.remove(ROOT_CA_ID)
303
                                targets.remove(INTERMEDIATE_CA_ID)
304
                        else:
305
                            Logger.error(f"Invalid request, wrong parameters '{FILTERING}.{CA}'.")
306
                            return E_WRONG_PARAMETERS, C_BAD_REQUEST
307
                    if ISSUED_BY in data[FILTERING]:                                # containing 'issuedby'
308
                        if isinstance(data[FILTERING][ISSUED_BY], int):             # which is an 'int'
309
                            issuer_id = data[FILTERING][ISSUED_BY]                  # then get its children only
310
                else:
311
                    Logger.error(f"Invalid request, wrong parameters '{FILTERING}'.")
312
                    return E_WRONG_PARAMETERS, C_BAD_REQUEST
313
            if issuer_id >= 0:                                                      # if filtering by an issuer
314
                try:
315
                    children = self.certificate_service.get_certificates_issued_by(issuer_id)  # get his children
316
                except CertificateNotFoundException:                                # if id does not exist
317
                    Logger.error(f"No such certificate found 'ID = {issuer_id}'.")
318
                    return E_NO_CERTIFICATES_FOUND, C_NOT_FOUND                     # throw
319

    
320
                certs = [child for child in children if child.type_id in targets]
321

    
322
            elif len(targets) == TREE_NODE_TYPE_COUNT:                              # = 3 -> root node,
323
                                                                                    # intermediate node,
324
                                                                                    # or leaf node
325

    
326
                                                                                    # if filtering did not change the
327
                                                                                    # targets,
328
                certs = self.certificate_service.get_certificates()                 # fetch everything
329
            else:                                                                   # otherwise fetch targets only
330
                certs = list(chain(*(self.certificate_service.get_certificates(target) for target in targets)))
331
        else:
332
            certs = self.certificate_service.get_certificates()                     # if no params, fetch everything
333

    
334
        if certs is None:
335
            Logger.error(f"Internal server error (unknown origin).")
336
            return E_GENERAL_ERROR, C_INTERNAL_SERVER_ERROR
337
        elif len(certs) == 0:
338
            # TODO check log level
339
            Logger.warning(f"No such certificate found (empty list).")
340
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA
341
        else:
342
            ret = []
343
            for c in certs:
344
                data = self.cert_to_dict_partial(c)
345
                if data is None:
346
                    Logger.error(f"Internal server error (corrupted database).")
347
                    return E_CORRUPTED_DATABASE, C_INTERNAL_SERVER_ERROR
348
                ret.append(
349
                    data
350
                )
351
            return {"success": True, "data": ret}, C_SUCCESS
352

    
353
    def get_certificate_root_by_id(self, id):
354
        """get certificate's root of trust chain by ID
355

    
356
        Get certificate's root of trust chain in PEM format by ID
357

    
358
        :param id: ID of a child certificate whose root is to be queried
359
        :type id: dict | bytes
360

    
361
        :rtype: PemResponse
362
        """
363

    
364
        Logger.info(f"\n\t{request.referrer}"
365
                    f"\n\t{request.method}   {request.path}   {request.scheme}"
366
                    f"\n\tCertificate ID = {id}")
367

    
368
        try:
369
            v = int(id)
370
        except ValueError:
371
            Logger.error(f"Invalid request, wrong parameters 'id'[{id}].")
372
            return E_WRONG_PARAMETERS, C_BAD_REQUEST
373

    
374
        cert = self.certificate_service.get_certificate(v)
375

    
376
        if cert is None:
377
            Logger.error(f"No such certificate found 'ID = {v}'.")
378
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA
379

    
380
        trust_chain = self.certificate_service.get_chain_of_trust(cert.parent_id, exclude_root=False)
381
        if trust_chain is None or len(trust_chain) == 0:
382
            Logger.error(f"No such certificate found (empty list).")
383
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA
384

    
385
        return {"success": True, "data": trust_chain[-1].pem_data}, C_SUCCESS
386

    
387
    def get_certificate_trust_chain_by_id(self, id):
388
        """get certificate's trust chain by ID (including root certificate)
389

    
390
        Get certificate trust chain in PEM format by ID
391

    
392
        :param id: ID of a child certificate whose chain is to be queried
393
        :type id: dict | bytes
394

    
395
        :rtype: PemResponse
396
        """
397

    
398
        Logger.info(f"\n\t{request.referrer}"
399
                    f"\n\t{request.method}   {request.path}   {request.scheme}"
400
                    f"\n\tCertificate ID = {id}")
401

    
402
        try:
403
            v = int(id)
404
        except ValueError:
405
            Logger.error(f"Invalid request, wrong parameters 'id'[{id}].")
406
            return E_WRONG_PARAMETERS, C_BAD_REQUEST
407

    
408
        cert = self.certificate_service.get_certificate(v)
409

    
410
        if cert is None:
411
            Logger.error(f"No such certificate found 'ID = {v}'.")
412
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA
413

    
414
        if cert.parent_id is None:
415
            Logger.error(f"Parent ID is empty in certificate 'ID = {v}'.")
416
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA
417

    
418
        trust_chain = self.certificate_service.get_chain_of_trust(cert.parent_id, exclude_root=False)
419

    
420
        ret = []
421
        for intermediate in trust_chain:
422
            ret.append(intermediate.pem_data)
423

    
424
        return {"success": True, "data": "".join(ret)}, C_SUCCESS
425

    
426
    def set_certificate_status(self, id):
427
        """
428
        Revoke a certificate given by ID
429
            - revocation request may contain revocation reason
430
            - revocation reason is verified based on the possible predefined values
431
            - if revocation reason is not specified 'undefined' value is used
432
        :param id: Identifier of the certificate to be revoked
433
        :type id: int
434

    
435
        :rtype: SuccessResponse | ErrorResponse (see OpenAPI definition)
436
        """
437

    
438
        Logger.info(f"\n\t{request.referrer}"
439
                    f"\n\t{request.method}   {request.path}   {request.scheme}"
440
                    f"\n\tCertificate ID = {id}")
441

    
442
        required_keys = {STATUS}  # required keys
443

    
444
        # check if the request contains a JSON body
445
        if request.is_json:
446
            request_body = request.get_json()
447

    
448
            Logger.info(f"\n\tRequest body:"
449
                        f"\n{dict_to_string(request_body)}")
450

    
451
            # try to parse certificate identifier -> if it is not int return error 400
452
            try:
453
                identifier = int(id)
454
            except ValueError:
455
                Logger.error(f"Invalid request, wrong parameters 'id'[{id}].")
456
                return E_WRONG_PARAMETERS, C_BAD_REQUEST
457

    
458
            # verify that all required keys are present
459
            if not all(k in request_body for k in required_keys):
460
                Logger.error(f"Invalid request, missing parameters.")
461
                return E_MISSING_PARAMETERS, C_BAD_REQUEST
462

    
463
            # get status and reason from the request
464
            status = request_body[STATUS]
465
            reason = request_body.get(REASON, REASON_UNDEFINED)
466
            try:
467
                # set certificate status using certificate_service
468
                self.certificate_service.set_certificate_revocation_status(identifier, status, reason)
469
            except (RevocationReasonInvalidException, CertificateStatusInvalidException):
470
                # these exceptions are thrown in case invalid status or revocation reason is passed to the controller
471
                Logger.error(f"Invalid request, wrong parameters.")
472
                return E_WRONG_PARAMETERS, C_BAD_REQUEST
473
            except CertificateAlreadyRevokedException:
474
                Logger.error(f"Certificate is already revoked 'ID = {identifier}'.")
475
                return E_NO_CERTIFICATE_ALREADY_REVOKED, C_BAD_REQUEST
476
            except CertificateNotFoundException:
477
                Logger.error(f"No such certificate found 'ID = {identifier}'.")
478
                return E_NO_CERTIFICATES_FOUND, C_NOT_FOUND
479
            except CertificateCannotBeSetToValid as e:
480
                return {"success": False, "data": str(e)}, C_BAD_REQUEST
481
            return {"success": True,
482
                    "data": "Certificate status updated successfully."}, C_SUCCESS
483
        # throw an error in case the request does not contain a json body
484
        else:
485
            Logger.error(f"The request must be JSON-formatted.")
486
            return E_NOT_JSON_FORMAT, C_BAD_REQUEST
487

    
488
    def cert_to_dict_partial(self, c):
489
        """
490
        Dictionarizes a certificate directly fetched from the database. Contains partial information.
491
        :param c: target cert
492
        :return: certificate dict (compliant with some parts of the REST API)
493
        """
494

    
495
        # TODO check log
496
        Logger.debug(f"Function launched.")
497

    
498
        c_issuer = self.certificate_service.get_certificate(c.parent_id)
499
        if c_issuer is None:
500
            return None
501

    
502
        return {
503
            ID: c.certificate_id,
504
            COMMON_NAME: c.common_name,
505
            NOT_BEFORE: datetime.strptime(c.valid_from, DATETIME_FORMAT).date(),
506
            NOT_AFTER: datetime.strptime(c.valid_to, DATETIME_FORMAT).date(),
507
            USAGE: {CertController.INVERSE_KEY_MAP[k]: v for k, v in c.usages.items()},
508
            ISSUER: {
509
                ID: c_issuer.certificate_id,
510
                COMMON_NAME: c_issuer.common_name
511
            }
512
        }
513

    
514
    def cert_to_dict_full(self, c):
515
        """
516
        Dictionarizes a certificate directly fetched from the database, but adds subject info.
517
        Contains full information.
518
        :param c: target cert
519
        :return: certificate dict (compliant with some parts of the REST API)
520
        """
521

    
522
        Logger.info(f"Function launched.")
523

    
524
        subj = self.certificate_service.get_subject_from_certificate(c)
525
        c_issuer = self.certificate_service.get_certificate(c.parent_id)
526
        if c_issuer is None:
527
            return None
528

    
529
        return {
530
            SUBJECT: subj.to_dict(),
531
            NOT_BEFORE: datetime.strptime(c.valid_from, DATETIME_FORMAT).date(),
532
            NOT_AFTER: datetime.strptime(c.valid_to, DATETIME_FORMAT).date(),
533
            USAGE: {CertController.INVERSE_KEY_MAP[k]: v for k, v in c.usages.items()},
534
            CA: c_issuer.certificate_id
535
        }
536

    
537
    def get_private_key_of_a_certificate(self, id):
538
        """
539
        Get a private key used to sign a certificate in PEM format specified by certificate's ID
540

    
541
        :param id: ID of a certificate whose private key is to be queried
542
        :type id: dict | bytes
543

    
544
        :rtype: PemResponse
545
        """
546

    
547
        Logger.info(f"\n\t{request.referrer}"
548
                    f"\n\t{request.method}   {request.path}   {request.scheme}"
549
                    f"\n\tCertificate ID = {id}")
550

    
551
        # try to parse the supplied ID
552
        try:
553
            v = int(id)
554
        except ValueError:
555
            Logger.error(f"Invalid request, wrong parameters 'id'[{id}].")
556
            return E_WRONG_PARAMETERS, C_BAD_REQUEST
557

    
558
        # find a certificate using the given ID
559
        cert = self.certificate_service.get_certificate(v)
560

    
561
        if cert is None:
562
            Logger.error(f"No such certificate found 'ID = {v}'.")
563
            return E_NO_CERTIFICATES_FOUND, C_NOT_FOUND
564
        else:
565
            # certificate exists, fetch it's private key
566
            private_key = self.key_service.get_key(cert.private_key_id)
567
            if cert is None:
568
                Logger.error(f"Internal server error (certificate's private key cannot be found).")
569
                return E_NO_CERT_PRIVATE_KEY_FOUND, C_INTERNAL_SERVER_ERROR
570
            else:
571
                return {"success": True, "data": private_key.private_key}, C_SUCCESS
572

    
573
    def get_public_key_of_a_certificate(self, id):
574
        """
575
        Get a public key of a certificate in PEM format specified by certificate's ID
576

    
577
        :param id: ID of a certificate whose public key is to be queried
578
        :type id: dict | bytes
579

    
580
        :rtype: PemResponse
581
        """
582

    
583
        Logger.info(f"\n\t{request.referrer}"
584
                    f"\n\t{request.method}   {request.path}   {request.scheme}"
585
                    f"\n\tCertificate ID = {id}")
586

    
587
        # try to parse the supplied ID
588
        try:
589
            v = int(id)
590
        except ValueError:
591
            Logger.error(f"Invalid request, wrong parameters 'id'[{id}].")
592
            return E_WRONG_PARAMETERS, C_BAD_REQUEST
593

    
594
        # find a certificate using the given ID
595
        cert = self.certificate_service.get_certificate(v)
596

    
597
        if cert is None:
598
            Logger.error(f"No such certificate found 'ID = {v}'.")
599
            return E_NO_CERTIFICATES_FOUND, C_NOT_FOUND
600
        else:
601
            return {"success": True, "data": self.certificate_service.get_public_key_from_certificate(cert)}, C_SUCCESS
602

    
603
    def delete_certificate(self, id):
604
        """
605
        Deletes a certificate identified by ID, including its corresponding subtree (all descendants).
606
        :param id: target certificate ID
607
        :rtype: DeleteResponse
608
        """
609

    
610
        Logger.info(f"\n\t{request.referrer}"
611
                    f"\n\t{request.method}   {request.path}   {request.scheme}"
612
                    f"\n\tCertificate ID = {id}")
613

    
614
        try:
615
            v = int(id)
616
        except ValueError:
617
            Logger.error(f"Invalid request, wrong parameters 'id'[{id}].")
618
            return E_WRONG_PARAMETERS, C_BAD_REQUEST
619

    
620
        try:
621
            self.certificate_service.delete_certificate(v)
622
        except CertificateNotFoundException:
623
            Logger.error(f"No such certificate found 'ID = {v}'.")
624
            return E_NO_CERTIFICATES_FOUND, C_NOT_FOUND
625
        except DatabaseException:
626
            Logger.error(f"Internal server error (corrupted database).")
627
            return E_CORRUPTED_DATABASE, C_INTERNAL_SERVER_ERROR
628
        except CertificateStatusInvalidException or RevocationReasonInvalidException or UnknownException:
629
            Logger.error(f"Internal server error (unknown origin).")
630
            return E_GENERAL_ERROR, C_INTERNAL_SERVER_ERROR
631

    
632
        return {"success": True, "data": "The certificate and its descendants have been successfully deleted."}
(2-2/4)