Projekt

Obecné

Profil

Stáhnout (28.9 KB) Statistiky
| Větev: | Tag: | Revize:
1
from typing import List
2

    
3
from injector import inject
4

    
5
from src.config.configuration import Configuration
6
from src.constants import ROOT_CA_ID, INTERMEDIATE_CA_ID, CA_ID, CERTIFICATE_ID, CERTIFICATE_STATES, \
7
    CERTIFICATE_REVOCATION_REASONS, SSL_ID, SIGNATURE_ID, AUTHENTICATION_ID, CERTIFICATE_VALID, CERTIFICATE_EXPIRED, \
8
    CERTIFICATE_REVOKED,\
9
    CERTIFICATE_REVOCATION_REASON_HOLD
10
from src.dao.certificate_repository import CertificateRepository
11
from src.exceptions.certificate_not_found_exception import CertificateNotFoundException
12
from src.exceptions.database_exception import DatabaseException
13
from src.exceptions.unknown_exception import UnknownException
14
from src.model.certificate import Certificate
15
from src.model.private_key import PrivateKey
16
from src.model.subject import Subject
17
from src.services.cryptography import CryptographyService
18

    
19
import time
20

    
21
from src.utils.usages_to_extensions import usages_to_extension_lines, ExtensionFieldFlags, CRITICAL, KEY_CERT_SIGN, \
22
    CRL_SIGN, CA, DIGITAL_SIGNATURE, KEY_ENCIPHERMENT, KEY_AGREEMENT, SERVER_AUTH, NON_REPUDIATION, TIME_STAMPING, \
23
    CLIENT_AUTH
24

    
25
from src.utils.logger import Logger
26

    
27
VALID_FROM_TO_DATE_FORMAT = "%d.%m.%Y %H:%M:%S"
28
CA_EXTENSIONS = "basicConstraints=critical,CA:TRUE"
29
CRL_EXTENSION = "crlDistributionPoints=URI:"
30
OCSP_EXTENSION = "authorityInfoAccess=OCSP;URI:"
31
STATUS_REVOKED = "revoked"
32
STATUS_VALID = "valid"
33

    
34
# define which flags are required for various usages
35
REQUIRED_USAGE_EXTENSION_FLAGS = {
36
    CA_ID: ExtensionFieldFlags({CRITICAL, KEY_CERT_SIGN, CRL_SIGN}, {}, {CRITICAL, CA}),
37
    SSL_ID: ExtensionFieldFlags({DIGITAL_SIGNATURE, KEY_ENCIPHERMENT, KEY_AGREEMENT}, {SERVER_AUTH}, {}),
38
    SIGNATURE_ID: ExtensionFieldFlags({DIGITAL_SIGNATURE, NON_REPUDIATION}, {}, {}),
39
    AUTHENTICATION_ID: ExtensionFieldFlags({DIGITAL_SIGNATURE}, {CLIENT_AUTH}, {})}
40

    
41

    
42
class CertificateService:
43

    
44
    @inject
45
    def __init__(self, cryptography_service: CryptographyService,
46
                 certificate_repository: CertificateRepository,
47
                 configuration: Configuration):
48
        self.cryptography_service = cryptography_service
49
        self.certificate_repository = certificate_repository
50
        self.configuration = configuration
51

    
52
    @staticmethod
53
    def __check_subject_is_valid(subject: Subject):
54
        if subject.country is not None and len(subject.country) != 2:
55
            raise InvalidSubjectAttribute("Country code", "Must consist of exactly 2 letters.")
56

    
57
    # TODO usages present in method parameters but not in class diagram
58
    def create_root_ca(self, key: PrivateKey, subject: Subject, extensions: str = "", config: str = "",
59
                       usages=None, days=30):
60
        """
61
        Creates a root CA certificate based on the given parameters.
62
        :param key: Private key to be used when generating the certificate
63
        :param subject: Subject to be used put into the certificate
64
        :param config: String containing the configuration to be used
65
        :param extensions: Name of the section in the configuration representing extensions
66
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
67
        :param days: Number of days for which the generated cert. will be considered valid
68
        :return: An instance of Certificate class representing the generated root CA cert
69
        """
70

    
71
        Logger.debug("Function launched.")
72

    
73
        # check whether the given subject is valid
74
        self.__check_subject_is_valid(subject)
75

    
76
        if usages is None:
77
            usages = {}
78

    
79
        cert_id = self.certificate_repository.get_next_id()
80

    
81
        # specify CA usage
82
        usages[CA_ID] = True
83

    
84
        # generate extension configuration lines based on the specified usages
85
        extensions = extensions + "\n" + "\n".join(usages_to_extension_lines(usages, REQUIRED_USAGE_EXTENSION_FLAGS))
86

    
87
        # create a new self signed  certificate
88
        cert_pem = self.cryptography_service.create_sscrt(subject, key.private_key, key_pass=key.password,
89
                                                          extensions=extensions, config=config, days=days, sn=cert_id)
90

    
91
        # wrap into Certificate class
92
        certificate = self.__create_wrapper(cert_pem, key.private_key_id, usages, 0,
93
                                            ROOT_CA_ID)
94

    
95
        # store the wrapper into the repository
96
        created_id = self.certificate_repository.create(certificate)
97

    
98
        # assign the generated ID to the inserted certificate
99
        certificate.certificate_id = created_id
100

    
101
        return certificate
102

    
103
    def __create_wrapper(self, cert_pem, private_key_id, usages, parent_id, cert_type):
104
        """
105
        Wraps the given parameters using the Certificate class. Uses CryptographyService to find out the notBefore and
106
        notAfter fields.
107
        :param cert_pem: PEM of the cert. to be wrapped
108
        :param private_key_id: ID of the private key used to create the given certificate
109
        :param usages: A dictionary containing usages of the generated certificate generated (see constants.py)
110
        :param parent_id: ID of the CA that issued this certificate
111
        :param cert_type: Type of this certificate (see constants.py)
112
        :return: An instance of the Certificate class wrapping the values passed  via method parameters
113
        """
114

    
115
        Logger.debug("Function launched.")
116

    
117
        # parse the generated pem for subject and notBefore/notAfter fields
118
        # TODO this could be improved in the future in such way that calling openssl is not required to parse the dates
119
        subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem)
120
        # format the parsed date
121
        not_before_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_before)
122
        not_after_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_after)
123

    
124
        # create a certificate wrapper
125
        certificate = Certificate(-1, not_before_formatted, not_after_formatted, cert_pem, cert_type, parent_id,
126
                                  private_key_id, usages, subj.common_name, subj.country, subj.locality, subj.state,
127
                                  subj.organization, subj.organization_unit, subj.email_address)
128

    
129
        return certificate
130

    
131
    # TODO config parameter present in class diagram but not here (unused)
132
    def create_ca(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate, issuer_key: PrivateKey,
133
                  extensions: str = "", days: int = 30, usages=None):
134
        """
135
        Creates an intermediate CA certificate issued by the given parent CA.
136
        :param subject_key: Private key to be used when generating the certificate
137
        :param subject: Subject to be used put into the certificate
138
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
139
        :param issuer_key: PK used to generate the issuer certificate
140
        :param extensions: Extensions to be used when generating the certificate
141
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
142
        :param days: Number of days for which the generated cert. will be considered valid
143
        :return: An instance of Certificate class representing the generated intermediate CA cert
144
        """
145

    
146
        Logger.debug("Function launched.")
147

    
148
        # check whether the given subject is valid
149
        self.__check_subject_is_valid(subject)
150

    
151
        if usages is None:
152
            usages = {}
153

    
154
        # specify CA usage
155
        usages[CA_ID] = True
156

    
157
        # generate extension configuration lines based on the specified usages
158
        extensions = extensions + "\n" + "\n".join(usages_to_extension_lines(usages, REQUIRED_USAGE_EXTENSION_FLAGS))
159

    
160
        # Add CRL and OCSP distribution point to certificate extensions
161
        cert_id = self.certificate_repository.get_next_id()
162
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(issuer_cert.certificate_id)
163
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(issuer_cert.certificate_id)
164

    
165
        # TODO implement AIA URI via extensions
166
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
167
                                                        issuer_key.private_key,
168
                                                        subject_key_pass=subject_key.password,
169
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
170
                                                        days=days,
171
                                                        sn=cert_id)
172

    
173
        # wrap into Certificate class
174
        certificate = self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
175
                                            issuer_cert.certificate_id, INTERMEDIATE_CA_ID)
176

    
177
        # store the wrapper into the repository
178
        created_id = self.certificate_repository.create(certificate)
179

    
180
        # assign the generated ID to the inserted certificate
181
        certificate.certificate_id = created_id
182

    
183
        return certificate
184

    
185
    def create_end_cert(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate,
186
                        issuer_key: PrivateKey,
187
                        extensions: str = "", days: int = 30, usages=None):
188
        """
189
        Creates an end certificate issued by the given parent CA.
190
        :param subject_key: Private key to be used when generating the certificate
191
        :param subject: Subject to be used put into the certificate
192
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
193
        :param issuer_key: PK used to generate the issuer certificate
194
        :param extensions: Extensions to be used when generating the certificate
195
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
196
        :param days: Number of days for which the generated cert. will be considered valid
197
        :return: An instance of Certificate class representing the generated cert
198
        """
199

    
200
        Logger.debug("Function launched.")
201

    
202
        # check whether the given subject is valid
203
        self.__check_subject_is_valid(subject)
204

    
205
        if usages is None:
206
            usages = {}
207

    
208
        # get the next certificate ID in order to be able to specify the serial number
209
        cert_id = self.certificate_repository.get_next_id()
210

    
211
        # generate extension configuration lines based on the specified usages
212
        extensions = extensions + "\n" + "\n".join(usages_to_extension_lines(usages, REQUIRED_USAGE_EXTENSION_FLAGS))
213

    
214
        # Add CRL and OCSP distribution point to certificate extensions
215
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(issuer_cert.certificate_id)
216
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(issuer_cert.certificate_id)
217

    
218
        # generate a new certificate
219
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
220
                                                        issuer_key.private_key,
221
                                                        subject_key_pass=subject_key.password,
222
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
223
                                                        days=days,
224
                                                        sn=cert_id
225
                                                        )
226

    
227
        # wrap the generated certificate using Certificate class
228
        certificate = self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
229
                                            issuer_cert.certificate_id, CERTIFICATE_ID)
230

    
231
        created_id = self.certificate_repository.create(certificate)
232

    
233
        certificate.certificate_id = created_id
234

    
235
        return certificate
236

    
237
    def get_certificate(self, unique_id: int) -> Certificate:
238
        """
239
        Tries to fetch a certificate from the certificate repository using a given id.
240
        :param unique_id: ID of the certificate to be fetched
241
        :return: Instance of the Certificate class containing a certificate with the given id or `None` if such
242
        certificate is not found
243
        """
244

    
245
        Logger.debug("Function launched.")
246

    
247
        return self.certificate_repository.read(unique_id)
248

    
249
    def get_certificates(self, cert_type=None) -> List[Certificate]:
250
        """
251
        Tries to fetch a list of all certificates from the certificate repository. Using the `cert_type` parameter only
252
        certificates of the given type can be returned.
253
        :param cert_type: Type of certificates to be returned
254
        :return: List of instances of the Certificate class representing all certificates present in the certificate
255
        repository. An empty list is returned when no certificates are found.
256
        """
257

    
258
        Logger.debug("Function launched.")
259

    
260
        return self.certificate_repository.read_all(cert_type)
261

    
262
    def get_certificates_filter(self, target_types, target_usages, target_cn_substring, page, per_page):
263
        """
264
        Tries to fetch a filtered list of all certificates from the certificate repository.
265
        :param target_types: certificate types (filter)
266
        :param target_usages: certificate usages (filter)
267
        :param target_cn_substring: certificate CN substring (filter)
268
        :param page: target page
269
        :param per_page: target page size
270
        :return: List of instances of the Certificate class representing filtered certificates
271
                    present in the certificate repository. An empty list is returned when no certificates are found.
272
        """
273
        return self.certificate_repository.read_all_filter(target_types, target_usages, target_cn_substring,
274
                                                           page, per_page)
275

    
276
    def get_chain_of_trust(self, from_id: int, to_id: int = -1, exclude_root=True) -> List[Certificate]:
277
        """
278
        Traverses the certificate hierarchy tree upwards till a certificate with the `to_id` ID is found or till a
279
        root CA certificate is found. Root certificates are excluded from the chain by default.
280
        :param from_id: ID of the first certificate to be included in the chain of trust
281
        :param to_id: ID of the last certificate to be included in the chain of trust
282
        :param exclude_root: a flag indicating whether root CA certificate should be excluded
283
        :return: a list of certificates representing the chain of trust starting with the certificate given by `from_id`
284
        ID
285
        """
286

    
287
        Logger.debug("Function launched.")
288

    
289
        # read the first certificate of the chain
290
        start_cert = self.certificate_repository.read(from_id)
291

    
292
        # if no cert is found or the current cert is root CA and root CAs should be excluded, then return an empty list
293
        if start_cert is None or (start_cert.type_id == ROOT_CA_ID and exclude_root):
294
            return []
295

    
296
        current_cert = start_cert
297
        chain_of_trust = [current_cert]
298

    
299
        # TODO could possibly be simplified
300
        if start_cert.type_id == ROOT_CA_ID:
301
            # the first cert found is a root ca
302
            return chain_of_trust
303

    
304
        while True:
305
            parent_cert = self.certificate_repository.read(current_cert.parent_id)
306

    
307
            # check whether parent certificate exists
308
            if parent_cert is None:
309
                break
310

    
311
            # check whether the found certificate is a root certificate
312
            if parent_cert.type_id == ROOT_CA_ID:
313
                if not exclude_root:
314
                    # append the found root cert only if root certificates should not be excluded from the CoT
315
                    chain_of_trust.append(parent_cert)
316
                break
317

    
318
            # append the certificate
319
            chain_of_trust.append(parent_cert)
320

    
321
            # stop iterating over certificates if the id of the found certificate matches `to_id` method parameter
322
            if parent_cert.certificate_id == to_id:
323
                break
324

    
325
            current_cert = parent_cert
326

    
327
        return chain_of_trust
328

    
329
    def delete_certificate(self, unique_id):
330
        """
331
        Deletes a certificate. Raises an Exception should any unexpected behavior occur.
332

    
333
        :param unique_id: ID of specific certificate
334
        """
335

    
336
        Logger.debug("Function launched.")
337

    
338
        to_delete = self.certificate_repository.get_all_descendants_of(unique_id)
339
        if to_delete is None:
340
            Logger.error(f"No such certificate found 'ID = {unique_id}'.")
341
            raise CertificateNotFoundException(unique_id)
342

    
343
        for cert in to_delete:
344
            try:
345
                self.set_certificate_revocation_status(cert.certificate_id, STATUS_REVOKED)
346
            except CertificateAlreadyRevokedException:
347
                Logger.info(f"Certificate already revoked 'ID = {unique_id}'.")
348
                # TODO log as an info/debug, not warning or above <-- perfectly legal
349
                pass
350

    
351
            if not self.certificate_repository.delete(cert.certificate_id):
352
                Logger.error(f"The certificate has not been deleted 'ID = {cert.certificate_id}'.")
353

    
354
    def get_certificates_issued_by(self, issuer_id):
355
        """
356
        Returns a list of all children of a certificate identified by an unique_id.
357
        Raises a DatabaseException should any unexpected behavior occur.
358
        :param issuer_id: target certificate ID
359
        :return: children of unique_id
360
        """
361

    
362
        Logger.debug("Function launched.")
363

    
364
        try:
365
            if self.certificate_repository.read(issuer_id) is None:
366
                Logger.error(f"No such certificate found 'ID = {issuer_id}'.")
367
                raise CertificateNotFoundException(issuer_id)
368
        except DatabaseException:
369
            Logger.error(f"No such certificate found 'ID = {issuer_id}'.")
370
            raise CertificateNotFoundException(issuer_id)
371

    
372
        return self.certificate_repository.get_all_issued_by(issuer_id)
373

    
374
    def get_certificates_issued_by_filter(self, issuer_id, target_types, target_usages, target_cn_substring, page,
375
                                          per_page):
376
        """
377
        Returns a list of all children of a certificate identified by an unique_id.
378
        Filters the results according to target types, usages, cn substring and pagination.
379
        Raises a DatabaseException should any unexpected behavior occur.
380
        :param issuer_id: target certificate ID
381
        :param target_types: filter of types
382
        :param target_usages: filter of usages
383
        :param target_cn_substring: CN substring
384
        :param page: target page or None
385
        :param per_page: certs per page or None
386
        :return: list of certificates (a page if specified)
387
        """
388

    
389
        Logger.debug("Function launched.")
390

    
391
        try:
392
            if self.certificate_repository.read(issuer_id) is None:
393
                Logger.error(f"No such certificate found 'ID = {issuer_id}'.")
394
                raise CertificateNotFoundException(issuer_id)
395
        except DatabaseException:
396
            Logger.error(f"No such certificate found 'ID = {issuer_id}'.")
397
            raise CertificateNotFoundException(issuer_id)
398

    
399
        return self.certificate_repository.get_all_issued_by_filter(issuer_id, target_types, target_usages,
400
                                                                    target_cn_substring, page, per_page)
401

    
402
    def set_certificate_revocation_status(self, id, status, reason="unspecified"):
403
        """
404
        Set certificate status to 'valid' or 'revoked'.
405
        If the new status is revoked a reason can be provided -> default is unspecified
406
        :param reason: reason for revocation
407
        :param id: identifier of the certificate whose status is to be changed
408
        :param status: new status of the certificate
409
        :raises CertificateStatusInvalidException: if status is not valid
410
        :raises RevocationReasonInvalidException: if reason is not valid
411
        :raises CertificateNotFoundException: if certificate with given id cannot be found
412
        :raises CertificateCannotBeSetToValid: if certificate was already revoked and not on hold,
413
                it cannot be set revalidated
414
        :raises CertificateAlreadyRevokedException: if caller tries to revoke a certificate that is already revoked
415
        :raises UnknownException: if the database is corrupted
416
        """
417

    
418
        Logger.debug("Function launched.")
419

    
420
        if status not in CERTIFICATE_STATES:
421
            Logger.error(f"Wrong parameter, invalid status '{status}'.")
422
            raise CertificateStatusInvalidException(status)
423
        if reason not in CERTIFICATE_REVOCATION_REASONS:
424
            Logger.error(f"Wrong parameter, invalid reason '{reason}'.")
425
            raise RevocationReasonInvalidException(reason)
426

    
427
        # check whether the certificate exists
428
        certificate = self.certificate_repository.read(id)
429
        if certificate is None:
430
            Logger.error(f"No such certificate found 'ID = {id}'.")
431
            raise CertificateNotFoundException(id)
432

    
433
        updated = False
434
        if status == STATUS_VALID:
435
            # if the certificate is revoked but the reason is not certificateHold, it cannot be re-validated
436
            #    -> throw an exception
437
            if certificate.revocation_reason != "" and \
438
               certificate.revocation_reason != CERTIFICATE_REVOCATION_REASON_HOLD:
439
                raise CertificateCannotBeSetToValid(certificate.revocation_reason)
440
            updated = self.certificate_repository.clear_certificate_revocation(id)
441
        elif status == STATUS_REVOKED:
442
            # check if the certificate is not revoked already
443
            revoked = self.certificate_repository.get_all_revoked_by(certificate.parent_id)
444
            if certificate.certificate_id in [x.certificate_id for x in revoked]:
445
                Logger.error(f"Certificate already revoked 'ID = {id}'.")
446
                raise CertificateAlreadyRevokedException(id)
447

    
448
            revocation_timestamp = int(time.time())
449
            updated = self.certificate_repository.set_certificate_revoked(id, str(revocation_timestamp), reason)
450

    
451
        if not updated:
452
            Logger.error(f"Repository returned 'false' from clear_certificate_revocation() "
453
                         f"or set_certificate_revoked().")
454
            raise UnknownException("Repository returned 'false' from clear_certificate_revocation() "
455
                                   "or set_certificate_revoked().")
456

    
457
    def get_subject_from_certificate(self, certificate: Certificate) -> Subject:
458
        """
459
        Get Subject distinguished name from a Certificate
460
        :param certificate: certificate instance whose Subject shall be parsed
461
        :return: instance of Subject class containing resulting distinguished name
462
        """
463

    
464
        Logger.debug("Function launched.")
465

    
466
        subject = Subject(certificate.common_name,
467
                          certificate.country_code,
468
                          certificate.locality,
469
                          certificate.province,
470
                          certificate.organization,
471
                          certificate.organizational_unit,
472
                          certificate.email_address)
473
        return subject
474

    
475
    def get_public_key_from_certificate(self, certificate: Certificate):
476
        """
477
        Extracts a public key from the given certificate
478
        :param certificate: an instance of the Certificate class containing the certificate from which a public key
479
        should be extracted.
480
        :return: a string containing the extracted public key in PEM format
481
        """
482

    
483
        Logger.debug("Function launched.")
484

    
485
        return self.cryptography_service.extract_public_key_from_certificate(certificate.pem_data)
486

    
487
    def get_certificate_state(self, id: int) -> str:
488
        """
489
        Check whether the certificate is expired, valid or revoked.
490
            - in case it's revoked and expired, revoked is returned
491
        :param id: identifier of the certificate
492
        :return: certificates state from {valid, revoked, expired}
493
        :raises CertificateNotFoundException: in case id of non-existing certificate is entered
494
        """
495
        Logger.debug("Function launched.")
496
        status = CERTIFICATE_VALID
497

    
498
        # Read the selected certificate from the repository
499
        certificate = self.certificate_repository.read(id)
500
        if certificate is None:
501
            Logger.error("Certificate whose details were requested does not exist.")
502
            raise CertificateNotFoundException(id)
503

    
504
        # check the expiration date using OpenSSL
505
        if not self.cryptography_service.verify_cert(certificate.pem_data):
506
            status = CERTIFICATE_EXPIRED
507

    
508
        # check certificate revocation
509
        all_revoked_by_parent = self.certificate_repository.get_all_revoked_by(certificate.parent_id)
510
        all_revoked_by_parent_ids = [cert.certificate_id for cert in all_revoked_by_parent]
511

    
512
        if id in all_revoked_by_parent_ids:
513
            status = CERTIFICATE_REVOKED
514

    
515
        return status
516

    
517
    def __get_crl_endpoint(self, ca_identifier: int) -> str:
518
        """
519
        Get URL address of CRL distribution endpoint based on
520
        issuer's ID
521

    
522
        :param ca_identifier: ID of issuing authority
523
        :return: CRL endpoint for the given CA
524
        """
525

    
526
        Logger.debug("Function launched.")
527

    
528
        return self.configuration.base_server_url + "/api/crl/" + str(ca_identifier)
529

    
530
    def __get_ocsp_endpoint(self, ca_identifier: int) -> str:
531
        """
532
        Get URL address of OCSP distribution endpoint based on
533
        issuer's ID
534

    
535
        :param ca_identifier: ID of issuing authority
536
        :return: OCSP endpoint for the given CA
537
        """
538

    
539
        Logger.debug("Function launched.")
540

    
541
        return self.configuration.base_server_url + "/api/ocsp/" + str(ca_identifier)
542

    
543
    def generate_pkcs_identity(self, certificate: Certificate, cert_key: PrivateKey, identity_name: str, identity_passphrase: str):
544
        """
545
        Generates a PKCS identity of the certificate given by the specified ID while using the private key passed.
546
        A name of the identity to be used and certificate's passphrase have to be specified as well as the passphrase
547
        of certificate's private key (if encrypted).
548
        :param certificate: certificate to be put into the PKCS identity store
549
        :param cert_key: key used to sign the given certificate
550
        :param identity_name: name to be given to the identity to be created
551
        :param identity_passphrase: passphrase to be used to encrypt the identity
552
        :return: byte array containing the generated identity (PKCS12 store)
553
        """
554
        Logger.debug("Function launched.")
555

    
556
        # get the chain of trust of the certificate whose identity should be generated and exclude the certificate
557
        # whose chain of trust we are querying
558
        cot_pem_list = [cert.pem_data for cert in self.get_chain_of_trust(certificate.certificate_id, exclude_root=False)[1:]]
559

    
560
        return self.cryptography_service.generate_pkcs_identity(certificate.pem_data, cert_key.private_key,
561
                                                                identity_name,
562
                                                                identity_passphrase, cot_pem_list, cert_key.password)
563

    
564

    
565
class RevocationReasonInvalidException(Exception):
566
    """
567
    Exception that denotes that the caller was trying to revoke
568
    a certificate using an invalid revocation reason
569
    """
570

    
571
    def __init__(self, reason):
572
        self.reason = reason
573

    
574
    def __str__(self):
575
        return f"Revocation reason '{self.reason}' is not valid."
576

    
577

    
578
class CertificateStatusInvalidException(Exception):
579
    """
580
    Exception that denotes that the caller was trying to set
581
    a certificate to an invalid state
582
    """
583

    
584
    def __init__(self, status):
585
        self.status = status
586

    
587
    def __str__(self):
588
        return f"Certificate status '{self.status}' is not valid."
589

    
590

    
591
class CertificateAlreadyRevokedException(Exception):
592
    """
593
    Exception that denotes that the caller was trying to revoke
594
    a certificate that is already revoked
595
    """
596

    
597
    def __init__(self, id):
598
        self.id = id
599

    
600
    def __str__(self):
601
        return f"Certificate id '{self.id}' is already revoked."
602

    
603

    
604
class CertificateCannotBeSetToValid(Exception):
605
    """
606
    Exception that denotes that the caller was trying to
607
    set certificate to valid if the certificate was already
608
    revoked but not certificateHold.
609
    """
610

    
611
    def __init__(self, old_reason):
612
        self.old_state = old_reason
613

    
614
    def __str__(self):
615
        return "Cannot set revoked certificate back to valid when the certificate revocation reason is not " \
616
               "certificateHold. " \
617
               f"The revocation reason of the certificate is {self.old_state}"
618

    
619

    
620
class InvalidSubjectAttribute(Exception):
621
    """
622
    Exception that denotes that the caller was trying to
623
    create a certificate while passing a subject with an invalid
624
    attribute.
625
    """
626

    
627
    def __init__(self, attribute_name, reason):
628
        self.attribute_name = attribute_name
629
        self.reason = reason
630

    
631
    def __str__(self):
632
        return f"""Subject attribute "{self.attribute_name} is invalid (reason: {self.reason})."""
(2-2/4)