Projekt

Obecné

Profil

Stáhnout (31 KB) Statistiky
| Větev: | Tag: | Revize:
1
from typing import List
2

    
3
from injector import inject
4

    
5
from src.config.configuration import Configuration
6
from src.constants import ROOT_CA_ID, INTERMEDIATE_CA_ID, CA_ID, CERTIFICATE_ID, CERTIFICATE_STATES, \
7
    CERTIFICATE_REVOCATION_REASONS, SSL_ID, SIGNATURE_ID, AUTHENTICATION_ID, CERTIFICATE_VALID, CERTIFICATE_EXPIRED, \
8
    CERTIFICATE_REVOKED,\
9
    CERTIFICATE_REVOCATION_REASON_HOLD
10
from src.dao.certificate_repository import CertificateRepository
11
from src.exceptions.certificate_not_found_exception import CertificateNotFoundException
12
from src.exceptions.database_exception import DatabaseException
13
from src.exceptions.unknown_exception import UnknownException
14
from src.model.certificate import Certificate
15
from src.model.private_key import PrivateKey
16
from src.model.subject import Subject
17
from src.services.cryptography import CryptographyService
18

    
19
import time
20

    
21
from src.utils.usages_to_extensions import usages_to_extension_lines, ExtensionFieldFlags, CRITICAL, KEY_CERT_SIGN, \
22
    CRL_SIGN, CA, DIGITAL_SIGNATURE, KEY_ENCIPHERMENT, KEY_AGREEMENT, SERVER_AUTH, NON_REPUDIATION, TIME_STAMPING, \
23
    CLIENT_AUTH
24

    
25
from src.utils.logger import Logger
26

    
27
VALID_FROM_TO_DATE_FORMAT = "%d.%m.%Y %H:%M:%S"
28
CA_EXTENSIONS = "basicConstraints=critical,CA:TRUE"
29
CRL_EXTENSION = "crlDistributionPoints=URI:"
30
OCSP_EXTENSION = "authorityInfoAccess=OCSP;URI:"
31
STATUS_REVOKED = "revoked"
32
STATUS_VALID = "valid"
33

    
34
# define which flags are required for various usages
35
REQUIRED_USAGE_EXTENSION_FLAGS = {
36
    CA_ID: ExtensionFieldFlags({CRITICAL, KEY_CERT_SIGN, CRL_SIGN}, {}, {CRITICAL, CA}),
37
    SSL_ID: ExtensionFieldFlags({DIGITAL_SIGNATURE, KEY_ENCIPHERMENT, KEY_AGREEMENT}, {SERVER_AUTH}, {}),
38
    SIGNATURE_ID: ExtensionFieldFlags({DIGITAL_SIGNATURE, NON_REPUDIATION}, {}, {}),
39
    AUTHENTICATION_ID: ExtensionFieldFlags({DIGITAL_SIGNATURE}, {CLIENT_AUTH}, {})}
40

    
41

    
42
class CertificateService:
43

    
44
    @inject
45
    def __init__(self, cryptography_service: CryptographyService,
46
                 certificate_repository: CertificateRepository,
47
                 configuration: Configuration):
48
        self.cryptography_service = cryptography_service
49
        self.certificate_repository = certificate_repository
50
        self.configuration = configuration
51

    
52
    @staticmethod
53
    def __check_subject_is_valid(subject: Subject):
54
        """
55
        Checks whether the given subject is valid
56
        :param subject: A subject to be verified
57
        :raises InvalidCertificateAttribute: When a subject with an invalid attribute is passed
58
        """
59
        if subject.country is not None and subject.country != "" and len(subject.country) != 2:
60
            raise InvalidSubjectAttribute("Country code", "Must consist of exactly 2 letters")
61

    
62
    # TODO usages present in method parameters but not in class diagram
63
    def create_root_ca(self, key: PrivateKey, subject: Subject, extensions: str = "", config: str = "",
64
                       usages=None, days=30):
65
        """
66
        Creates a root CA certificate based on the given parameters.
67
        :param key: Private key to be used when generating the certificate
68
        :param subject: Subject to be used put into the certificate
69
        :param config: String containing the configuration to be used
70
        :param extensions: Name of the section in the configuration representing extensions
71
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
72
        :param days: Number of days for which the generated cert. will be considered valid
73
        :raises InvalidCertificateAttribute: when a subject with an invalid attribute is passed
74
        :return: An instance of Certificate class representing the generated root CA cert
75
        """
76

    
77
        Logger.debug("Function launched.")
78

    
79
        # check whether the given subject is valid
80
        self.__check_subject_is_valid(subject)
81

    
82
        if usages is None:
83
            usages = {}
84

    
85
        cert_id = self.certificate_repository.get_next_id()
86

    
87
        # specify CA usage
88
        usages[CA_ID] = True
89

    
90
        # generate extension configuration lines based on the specified usages
91
        extensions = extensions + "\n" + "\n".join(usages_to_extension_lines(usages, REQUIRED_USAGE_EXTENSION_FLAGS))
92

    
93
        # create a new self signed  certificate
94
        cert_pem = self.cryptography_service.create_sscrt(subject, key.private_key, key_pass=key.password,
95
                                                          extensions=extensions, config=config, days=days, sn=cert_id)
96

    
97
        # wrap into Certificate class
98
        certificate = self.__create_wrapper(cert_pem, key.private_key_id, usages, 0,
99
                                            ROOT_CA_ID)
100

    
101
        # store the wrapper into the repository
102
        created_id = self.certificate_repository.create(certificate)
103

    
104
        # assign the generated ID to the inserted certificate
105
        certificate.certificate_id = created_id
106

    
107
        return certificate
108

    
109
    def __create_wrapper(self, cert_pem, private_key_id, usages, parent_id, cert_type):
110
        """
111
        Wraps the given parameters using the Certificate class. Uses CryptographyService to find out the notBefore and
112
        notAfter fields.
113
        :param cert_pem: PEM of the cert. to be wrapped
114
        :param private_key_id: ID of the private key used to create the given certificate
115
        :param usages: A dictionary containing usages of the generated certificate generated (see constants.py)
116
        :param parent_id: ID of the CA that issued this certificate
117
        :param cert_type: Type of this certificate (see constants.py)
118
        :return: An instance of the Certificate class wrapping the values passed  via method parameters
119
        """
120

    
121
        Logger.debug("Function launched.")
122

    
123
        # parse the generated pem for subject and notBefore/notAfter fields
124
        # TODO this could be improved in the future in such way that calling openssl is not required to parse the dates
125
        subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem)
126
        # format the parsed date
127
        not_before_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_before)
128
        not_after_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_after)
129

    
130
        # create a certificate wrapper
131
        certificate = Certificate(-1, not_before_formatted, not_after_formatted, cert_pem, cert_type, parent_id,
132
                                  private_key_id, usages, subj.common_name, subj.country, subj.locality, subj.state,
133
                                  subj.organization, subj.organization_unit, subj.email_address)
134

    
135
        return certificate
136

    
137
    # TODO config parameter present in class diagram but not here (unused)
138
    def create_ca(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate, issuer_key: PrivateKey,
139
                  extensions: str = "", days: int = 30, usages=None):
140
        """
141
        Creates an intermediate CA certificate issued by the given parent CA.
142
        :param subject_key: Private key to be used when generating the certificate
143
        :param subject: Subject to be used put into the certificate
144
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
145
        :param issuer_key: PK used to generate the issuer certificate
146
        :param extensions: Extensions to be used when generating the certificate
147
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
148
        :param days: Number of days for which the generated cert. will be considered valid
149
        :raises InvalidCertificateAttribute: when a subject with an invalid attribute is passed
150
        :return: An instance of Certificate class representing the generated intermediate CA cert
151
        """
152

    
153
        Logger.debug("Function launched.")
154

    
155
        # check whether the given subject is valid
156
        self.__check_subject_is_valid(subject)
157

    
158
        if usages is None:
159
            usages = {}
160

    
161
        # specify CA usage
162
        usages[CA_ID] = True
163

    
164
        # generate extension configuration lines based on the specified usages
165
        extensions = extensions + "\n" + "\n".join(usages_to_extension_lines(usages, REQUIRED_USAGE_EXTENSION_FLAGS))
166

    
167
        # Add CRL and OCSP distribution point to certificate extensions
168
        cert_id = self.certificate_repository.get_next_id()
169
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(issuer_cert.certificate_id)
170
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(issuer_cert.certificate_id)
171

    
172
        # TODO implement AIA URI via extensions
173
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
174
                                                        issuer_key.private_key,
175
                                                        subject_key_pass=subject_key.password,
176
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
177
                                                        days=days,
178
                                                        sn=cert_id)
179

    
180
        # wrap into Certificate class
181
        certificate = self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
182
                                            issuer_cert.certificate_id, INTERMEDIATE_CA_ID)
183

    
184
        # store the wrapper into the repository
185
        created_id = self.certificate_repository.create(certificate)
186

    
187
        # assign the generated ID to the inserted certificate
188
        certificate.certificate_id = created_id
189

    
190
        return certificate
191

    
192
    def create_end_cert(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate,
193
                        issuer_key: PrivateKey,
194
                        extensions: str = "", days: int = 30, usages=None):
195
        """
196
        Creates an end certificate issued by the given parent CA.
197
        :param subject_key: Private key to be used when generating the certificate
198
        :param subject: Subject to be used put into the certificate
199
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
200
        :param issuer_key: PK used to generate the issuer certificate
201
        :param extensions: Extensions to be used when generating the certificate
202
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
203
        :param days: Number of days for which the generated cert. will be considered valid
204
        :raises InvalidCertificateAttribute: when a subject with an invalid attribute is passed
205
        :return: An instance of Certificate class representing the generated cert
206
        """
207

    
208
        Logger.debug("Function launched.")
209

    
210
        # check whether the given subject is valid
211
        self.__check_subject_is_valid(subject)
212

    
213
        if usages is None:
214
            usages = {}
215

    
216
        # get the next certificate ID in order to be able to specify the serial number
217
        cert_id = self.certificate_repository.get_next_id()
218

    
219
        # generate extension configuration lines based on the specified usages
220
        extensions = extensions + "\n" + "\n".join(usages_to_extension_lines(usages, REQUIRED_USAGE_EXTENSION_FLAGS))
221

    
222
        # Add CRL and OCSP distribution point to certificate extensions
223
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(issuer_cert.certificate_id)
224
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(issuer_cert.certificate_id)
225

    
226
        # generate a new certificate
227
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
228
                                                        issuer_key.private_key,
229
                                                        subject_key_pass=subject_key.password,
230
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
231
                                                        days=days,
232
                                                        sn=cert_id
233
                                                        )
234

    
235
        # wrap the generated certificate using Certificate class
236
        certificate = self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
237
                                            issuer_cert.certificate_id, CERTIFICATE_ID)
238

    
239
        created_id = self.certificate_repository.create(certificate)
240

    
241
        certificate.certificate_id = created_id
242

    
243
        return certificate
244

    
245
    def get_certificate(self, unique_id: int) -> Certificate:
246
        """
247
        Tries to fetch a certificate from the certificate repository using a given id.
248
        :param unique_id: ID of the certificate to be fetched
249
        :return: Instance of the Certificate class containing a certificate with the given id or `None` if such
250
        certificate is not found
251
        """
252

    
253
        Logger.debug("Function launched.")
254

    
255
        return self.certificate_repository.read(unique_id)
256

    
257
    def get_certificates(self, cert_type=None) -> List[Certificate]:
258
        """
259
        Tries to fetch a list of all certificates from the certificate repository. Using the `cert_type` parameter only
260
        certificates of the given type can be returned.
261
        :param cert_type: Type of certificates to be returned
262
        :return: List of instances of the Certificate class representing all certificates present in the certificate
263
        repository. An empty list is returned when no certificates are found.
264
        """
265

    
266
        Logger.debug("Function launched.")
267

    
268
        return self.certificate_repository.read_all(cert_type)
269

    
270
    def get_certificates_filter(self, target_types, target_usages, target_cn_substring, page, per_page):
271
        """
272
        Tries to fetch a filtered list of all certificates from the certificate repository.
273
        :param target_types: certificate types (filter)
274
        :param target_usages: certificate usages (filter)
275
        :param target_cn_substring: certificate CN substring (filter)
276
        :param page: target page
277
        :param per_page: target page size
278
        :return: List of instances of the Certificate class representing filtered certificates
279
                    present in the certificate repository. An empty list is returned when no certificates are found.
280
        """
281
        return self.certificate_repository.read_all_filter(target_types, target_usages, target_cn_substring,
282
                                                           page, per_page)
283

    
284
    def get_chain_of_trust(self, from_id: int, to_id: int = -1, exclude_root=True) -> List[Certificate]:
285
        """
286
        Traverses the certificate hierarchy tree upwards till a certificate with the `to_id` ID is found or till a
287
        root CA certificate is found. Root certificates are excluded from the chain by default.
288
        :param from_id: ID of the first certificate to be included in the chain of trust
289
        :param to_id: ID of the last certificate to be included in the chain of trust
290
        :param exclude_root: a flag indicating whether root CA certificate should be excluded
291
        :return: a list of certificates representing the chain of trust starting with the certificate given by `from_id`
292
        ID
293
        """
294

    
295
        Logger.debug("Function launched.")
296

    
297
        # read the first certificate of the chain
298
        start_cert = self.certificate_repository.read(from_id)
299

    
300
        # if no cert is found or the current cert is root CA and root CAs should be excluded, then return an empty list
301
        if start_cert is None or (start_cert.type_id == ROOT_CA_ID and exclude_root):
302
            return []
303

    
304
        current_cert = start_cert
305
        chain_of_trust = [current_cert]
306

    
307
        # TODO could possibly be simplified
308
        if start_cert.type_id == ROOT_CA_ID:
309
            # the first cert found is a root ca
310
            return chain_of_trust
311

    
312
        while True:
313
            parent_cert = self.certificate_repository.read(current_cert.parent_id)
314

    
315
            # check whether parent certificate exists
316
            if parent_cert is None:
317
                break
318

    
319
            # check whether the found certificate is a root certificate
320
            if parent_cert.type_id == ROOT_CA_ID:
321
                if not exclude_root:
322
                    # append the found root cert only if root certificates should not be excluded from the CoT
323
                    chain_of_trust.append(parent_cert)
324
                break
325

    
326
            # append the certificate
327
            chain_of_trust.append(parent_cert)
328

    
329
            # stop iterating over certificates if the id of the found certificate matches `to_id` method parameter
330
            if parent_cert.certificate_id == to_id:
331
                break
332

    
333
            current_cert = parent_cert
334

    
335
        return chain_of_trust
336

    
337
    def delete_certificate(self, unique_id):
338
        """
339
        Deletes a certificate. Raises an Exception should any unexpected behavior occur.
340

    
341
        :param unique_id: ID of specific certificate
342
        """
343

    
344
        Logger.debug("Function launched.")
345

    
346
        to_delete = self.certificate_repository.get_all_descendants_of(unique_id)
347
        if to_delete is None:
348
            Logger.error(f"No such certificate found 'ID = {unique_id}'.")
349
            raise CertificateNotFoundException(unique_id)
350

    
351
        for cert in to_delete:
352
            try:
353
                self.set_certificate_revocation_status(cert.certificate_id, STATUS_REVOKED)
354
            except CertificateAlreadyRevokedException:
355
                Logger.info(f"Certificate already revoked 'ID = {unique_id}'.")
356
                # TODO log as an info/debug, not warning or above <-- perfectly legal
357
                pass
358

    
359
            if not self.certificate_repository.delete(cert.certificate_id):
360
                Logger.error(f"The certificate has not been deleted 'ID = {cert.certificate_id}'.")
361

    
362
    def get_certificates_issued_by(self, issuer_id):
363
        """
364
        Returns a list of all children of a certificate identified by an unique_id.
365
        Raises a DatabaseException should any unexpected behavior occur.
366
        :param issuer_id: target certificate ID
367
        :return: children of unique_id
368
        """
369

    
370
        Logger.debug("Function launched.")
371

    
372
        try:
373
            if self.certificate_repository.read(issuer_id) is None:
374
                Logger.error(f"No such certificate found 'ID = {issuer_id}'.")
375
                raise CertificateNotFoundException(issuer_id)
376
        except DatabaseException:
377
            Logger.error(f"No such certificate found 'ID = {issuer_id}'.")
378
            raise CertificateNotFoundException(issuer_id)
379

    
380
        return self.certificate_repository.get_all_issued_by(issuer_id)
381

    
382
    def get_certificates_issued_by_filter(self, issuer_id, target_types, target_usages, target_cn_substring, page,
383
                                          per_page):
384
        """
385
        Returns a list of all children of a certificate identified by an unique_id.
386
        Filters the results according to target types, usages, cn substring and pagination.
387
        Raises a DatabaseException should any unexpected behavior occur.
388
        :param issuer_id: target certificate ID
389
        :param target_types: filter of types
390
        :param target_usages: filter of usages
391
        :param target_cn_substring: CN substring
392
        :param page: target page or None
393
        :param per_page: certs per page or None
394
        :return: list of certificates (a page if specified)
395
        """
396

    
397
        Logger.debug("Function launched.")
398

    
399
        try:
400
            if self.certificate_repository.read(issuer_id) is None:
401
                Logger.error(f"No such certificate found 'ID = {issuer_id}'.")
402
                raise CertificateNotFoundException(issuer_id)
403
        except DatabaseException:
404
            Logger.error(f"No such certificate found 'ID = {issuer_id}'.")
405
            raise CertificateNotFoundException(issuer_id)
406

    
407
        return self.certificate_repository.get_all_issued_by_filter(issuer_id, target_types, target_usages,
408
                                                                    target_cn_substring, page, per_page)
409

    
410
    def set_certificate_revocation_status(self, id, status, reason="unspecified"):
411
        """
412
        Set certificate status to 'valid' or 'revoked'.
413
        If the new status is revoked a reason can be provided -> default is unspecified
414
        :param reason: reason for revocation
415
        :param id: identifier of the certificate whose status is to be changed
416
        :param status: new status of the certificate
417
        :raises CertificateStatusInvalidException: if status is not valid
418
        :raises RevocationReasonInvalidException: if reason is not valid
419
        :raises CertificateNotFoundException: if certificate with given id cannot be found
420
        :raises CertificateCannotBeSetToValid: if certificate was already revoked and not on hold,
421
                it cannot be set revalidated
422
        :raises CertificateAlreadyRevokedException: if caller tries to revoke a certificate that is already revoked
423
        :raises UnknownException: if the database is corrupted
424
        """
425

    
426
        Logger.debug("Function launched.")
427

    
428
        if status not in CERTIFICATE_STATES:
429
            Logger.error(f"Wrong parameter, invalid status '{status}'.")
430
            raise CertificateStatusInvalidException(status)
431
        if reason not in CERTIFICATE_REVOCATION_REASONS:
432
            Logger.error(f"Wrong parameter, invalid reason '{reason}'.")
433
            raise RevocationReasonInvalidException(reason)
434

    
435
        # check whether the certificate exists
436
        certificate = self.certificate_repository.read(id)
437
        if certificate is None:
438
            Logger.error(f"No such certificate found 'ID = {id}'.")
439
            raise CertificateNotFoundException(id)
440

    
441
        updated = False
442
        if status == STATUS_VALID:
443
            # if the certificate is revoked but the reason is not certificateHold, it cannot be re-validated
444
            #    -> throw an exception
445
            if certificate.revocation_reason != "" and \
446
               certificate.revocation_reason != CERTIFICATE_REVOCATION_REASON_HOLD:
447
                raise CertificateCannotBeSetToValid(certificate.revocation_reason)
448
            updated = self.certificate_repository.clear_certificate_revocation(id)
449
        elif status == STATUS_REVOKED:
450
            # check if the certificate is not revoked already
451
            revoked = self.certificate_repository.get_all_revoked_by(certificate.parent_id)
452
            if certificate.certificate_id in [x.certificate_id for x in revoked]:
453
                Logger.error(f"Certificate already revoked 'ID = {id}'.")
454
                raise CertificateAlreadyRevokedException(id)
455

    
456
            revocation_timestamp = int(time.time())
457
            updated = self.certificate_repository.set_certificate_revoked(id, str(revocation_timestamp), reason)
458

    
459
        if not updated:
460
            Logger.error(f"Repository returned 'false' from clear_certificate_revocation() "
461
                         f"or set_certificate_revoked().")
462
            raise UnknownException("Repository returned 'false' from clear_certificate_revocation() "
463
                                   "or set_certificate_revoked().")
464

    
465
    def get_subject_from_certificate(self, certificate: Certificate) -> Subject:
466
        """
467
        Get Subject distinguished name from a Certificate
468
        :param certificate: certificate instance whose Subject shall be parsed
469
        :return: instance of Subject class containing resulting distinguished name
470
        """
471

    
472
        Logger.debug("Function launched.")
473

    
474
        subject = Subject(certificate.common_name,
475
                          certificate.country_code,
476
                          certificate.locality,
477
                          certificate.province,
478
                          certificate.organization,
479
                          certificate.organizational_unit,
480
                          certificate.email_address)
481
        return subject
482

    
483
    def get_public_key_from_certificate(self, certificate: Certificate):
484
        """
485
        Extracts a public key from the given certificate
486
        :param certificate: an instance of the Certificate class containing the certificate from which a public key
487
        should be extracted.
488
        :return: a string containing the extracted public key in PEM format
489
        """
490

    
491
        Logger.debug("Function launched.")
492

    
493
        return self.cryptography_service.extract_public_key_from_certificate(certificate.pem_data)
494

    
495
    def get_certificate_state(self, id: int) -> str:
496
        """
497
        Check whether the certificate is expired, valid or revoked.
498
            - in case it's revoked and expired, revoked is returned
499
        :param id: identifier of the certificate
500
        :return: certificates state from {valid, revoked, expired}
501
        :raises CertificateNotFoundException: in case id of non-existing certificate is entered
502
        """
503
        Logger.debug("Function launched.")
504
        status = CERTIFICATE_VALID
505

    
506
        # Read the selected certificate from the repository
507
        certificate = self.certificate_repository.read(id)
508
        if certificate is None:
509
            Logger.error("Certificate whose details were requested does not exist.")
510
            raise CertificateNotFoundException(id)
511

    
512
        # check the expiration date using OpenSSL
513
        if not self.cryptography_service.verify_cert(certificate.pem_data):
514
            status = CERTIFICATE_EXPIRED
515

    
516
        # check certificate revocation
517
        all_revoked_by_parent = self.certificate_repository.get_all_revoked_by(certificate.parent_id)
518
        all_revoked_by_parent_ids = [cert.certificate_id for cert in all_revoked_by_parent]
519

    
520
        if id in all_revoked_by_parent_ids:
521
            status = CERTIFICATE_REVOKED
522

    
523
        return status
524

    
525
    def __get_crl_endpoint(self, ca_identifier: int) -> str:
526
        """
527
        Get URL address of CRL distribution endpoint based on
528
        issuer's ID
529

    
530
        :param ca_identifier: ID of issuing authority
531
        :return: CRL endpoint for the given CA
532
        """
533

    
534
        Logger.debug("Function launched.")
535

    
536
        return self.configuration.base_server_url + "/api/crl/" + str(ca_identifier)
537

    
538
    def __get_ocsp_endpoint(self, ca_identifier: int) -> str:
539
        """
540
        Get URL address of OCSP distribution endpoint based on
541
        issuer's ID
542

    
543
        :param ca_identifier: ID of issuing authority
544
        :return: OCSP endpoint for the given CA
545
        """
546

    
547
        Logger.debug("Function launched.")
548

    
549
        return self.configuration.base_server_url + "/api/ocsp/" + str(ca_identifier)
550

    
551
    def generate_pkcs_identity(self, certificate: Certificate, cert_key: PrivateKey, identity_name: str, identity_passphrase: str):
552
        """
553
        Generates a PKCS identity of the certificate given by the specified ID while using the private key passed.
554
        A name of the identity to be used and certificate's passphrase have to be specified as well as the passphrase
555
        of certificate's private key (if encrypted).
556
        :param certificate: certificate to be put into the PKCS identity store
557
        :param cert_key: key used to sign the given certificate
558
        :param identity_name: name to be given to the identity to be created
559
        :param identity_passphrase: passphrase to be used to encrypt the identity
560
        :return: byte array containing the generated identity (PKCS12 store)
561
        """
562
        Logger.debug("Function launched.")
563

    
564
        # get the chain of trust of the certificate whose identity should be generated and exclude the certificate
565
        # whose chain of trust we are querying
566
        cot_pem_list = [cert.pem_data for cert in self.get_chain_of_trust(certificate.certificate_id, exclude_root=False)[1:]]
567

    
568
        return self.cryptography_service.generate_pkcs_identity(certificate.pem_data, cert_key.private_key,
569
                                                                identity_name,
570
                                                                identity_passphrase, cot_pem_list, cert_key.password)
571

    
572
    def get_root(self, unique_id: int):
573
        """
574
        Function that calls CertificateService.get_chain_of_trust() and extract root CA from the returned chain.
575
        :param unique_id: ID of the certificate to which to find the root CA
576
        :return: Instance of the Certificate class containing a root certificate which was found in the chain
577
        """
578
        Logger.debug("Function launched.")
579

    
580
        chain_of_trust = self.get_chain_of_trust(from_id=unique_id, exclude_root=False)
581

    
582
        if len(chain_of_trust) == 0:
583
            Logger.error(f"No such certificate found 'ID = {unique_id}'.")
584
            raise CertificateNotFoundException(unique_id)
585

    
586
        root_ca = chain_of_trust[len(chain_of_trust) - 1]
587

    
588
        if root_ca.type_id != ROOT_CA_ID or root_ca.certificate_id != root_ca.parent_id:
589
            Logger.error(f"Certificate id '{root_ca.certificate_id}' has not same parent_id '{root_ca.parent_id} "
590
                         f"or type_id '{root_ca.type_id}' is not a ROOT_CA_ID '{ROOT_CA_ID}'")
591
            raise InvalidRootCA(root_ca.type_id, root_ca.certificate_id, root_ca.parent_id)
592

    
593
        return root_ca
594

    
595

    
596
class RevocationReasonInvalidException(Exception):
597
    """
598
    Exception that denotes that the caller was trying to revoke
599
    a certificate using an invalid revocation reason
600
    """
601

    
602
    def __init__(self, reason):
603
        self.reason = reason
604

    
605
    def __str__(self):
606
        return f"Revocation reason '{self.reason}' is not valid."
607

    
608

    
609
class CertificateStatusInvalidException(Exception):
610
    """
611
    Exception that denotes that the caller was trying to set
612
    a certificate to an invalid state
613
    """
614

    
615
    def __init__(self, status):
616
        self.status = status
617

    
618
    def __str__(self):
619
        return f"Certificate status '{self.status}' is not valid."
620

    
621

    
622
class CertificateAlreadyRevokedException(Exception):
623
    """
624
    Exception that denotes that the caller was trying to revoke
625
    a certificate that is already revoked
626
    """
627

    
628
    def __init__(self, id):
629
        self.id = id
630

    
631
    def __str__(self):
632
        return f"Certificate id '{self.id}' is already revoked."
633

    
634

    
635
class CertificateCannotBeSetToValid(Exception):
636
    """
637
    Exception that denotes that the caller was trying to
638
    set certificate to valid if the certificate was already
639
    revoked but not certificateHold.
640
    """
641

    
642
    def __init__(self, old_reason):
643
        self.old_state = old_reason
644

    
645
    def __str__(self):
646
        return "Cannot set revoked certificate back to valid when the certificate revocation reason is not " \
647
               "certificateHold. " \
648
               f"The revocation reason of the certificate is {self.old_state}"
649

    
650

    
651
class InvalidSubjectAttribute(Exception):
652
    """
653
    Exception that denotes that the caller was trying to
654
    create a certificate while passing a subject with an invalid
655
    attribute.
656
    """
657

    
658
    def __init__(self, attribute_name, reason):
659
        self.attribute_name = attribute_name
660
        self.reason = reason
661

    
662
    def __str__(self):
663
        return f"""Subject attribute "{self.attribute_name}" is invalid (reason: {self.reason})."""
664

    
665

    
666
class InvalidRootCA(Exception):
667
    """
668
    Exception that denotes that certificate has invalid root CA parameters.
669
    """
670

    
671
    def __init__(self, type_id, id, parent_id):
672
        self.type_id = type_id
673
        self.id = id
674
        self.parent_id = parent_id
675

    
676
    def __str__(self):
677
        return f"Certificate id '{self.id}' has not same parent_id '{self.parent_id} " \
678
               f"or type_id '{self.type_id}' is not a ROOT_CA_ID '{ROOT_CA_ID}'"
(2-2/4)