Projekt

Obecné

Profil

Stáhnout (29.4 KB) Statistiky
| Větev: | Tag: | Revize:
1
from typing import List
2

    
3
from injector import inject
4

    
5
from src.config.configuration import Configuration
6
from src.constants import ROOT_CA_ID, INTERMEDIATE_CA_ID, CA_ID, CERTIFICATE_ID, CERTIFICATE_STATES, \
7
    CERTIFICATE_REVOCATION_REASONS, SSL_ID, SIGNATURE_ID, AUTHENTICATION_ID, CERTIFICATE_VALID, CERTIFICATE_EXPIRED, \
8
    CERTIFICATE_REVOKED,\
9
    CERTIFICATE_REVOCATION_REASON_HOLD
10
from src.dao.certificate_repository import CertificateRepository
11
from src.exceptions.certificate_not_found_exception import CertificateNotFoundException
12
from src.exceptions.database_exception import DatabaseException
13
from src.exceptions.unknown_exception import UnknownException
14
from src.model.certificate import Certificate
15
from src.model.private_key import PrivateKey
16
from src.model.subject import Subject
17
from src.services.cryptography import CryptographyService
18

    
19
import time
20
from calendar import timegm
21

    
22
from src.utils.usages_to_extensions import usages_to_extension_lines, ExtensionFieldFlags, CRITICAL, KEY_CERT_SIGN, \
23
    CRL_SIGN, CA, DIGITAL_SIGNATURE, KEY_ENCIPHERMENT, KEY_AGREEMENT, SERVER_AUTH, NON_REPUDIATION, TIME_STAMPING, \
24
    CLIENT_AUTH
25

    
26
from src.utils.logger import Logger
27

    
28
VALID_FROM_TO_DATE_FORMAT = "%d.%m.%Y %H:%M:%S"
29
CA_EXTENSIONS = "basicConstraints=critical,CA:TRUE"
30
CRL_EXTENSION = "crlDistributionPoints=URI:"
31
OCSP_EXTENSION = "authorityInfoAccess=OCSP;URI:"
32
STATUS_REVOKED = "revoked"
33
STATUS_VALID = "valid"
34

    
35
# define which flags are required for various usages
36
REQUIRED_USAGE_EXTENSION_FLAGS = {
37
    CA_ID: ExtensionFieldFlags({CRITICAL, KEY_CERT_SIGN, CRL_SIGN}, {}, {CRITICAL, CA}),
38
    SSL_ID: ExtensionFieldFlags({DIGITAL_SIGNATURE, KEY_ENCIPHERMENT, KEY_AGREEMENT}, {SERVER_AUTH}, {}),
39
    SIGNATURE_ID: ExtensionFieldFlags({DIGITAL_SIGNATURE, NON_REPUDIATION}, {}, {}),
40
    AUTHENTICATION_ID: ExtensionFieldFlags({DIGITAL_SIGNATURE}, {CLIENT_AUTH}, {})}
41

    
42

    
43
class CertificateService:
44

    
45
    @inject
46
    def __init__(self, cryptography_service: CryptographyService,
47
                 certificate_repository: CertificateRepository,
48
                 configuration: Configuration):
49
        self.cryptography_service = cryptography_service
50
        self.certificate_repository = certificate_repository
51
        self.configuration = configuration
52

    
53
    @staticmethod
54
    def __check_subject_is_valid(subject: Subject):
55
        """
56
        Checks whether the given subject is valid
57
        :param subject: A subject to be verified
58
        :raises InvalidCertificateAttribute: When a subject with an invalid attribute is passed
59
        """
60
        if subject.country is not None and subject.country != "" and len(subject.country) != 2:
61
            raise InvalidSubjectAttribute("Country code", "Must consist of exactly 2 letters")
62

    
63
    # TODO usages present in method parameters but not in class diagram
64
    def create_root_ca(self, key: PrivateKey, subject: Subject, extensions: str = "", config: str = "",
65
                       usages=None, days=30):
66
        """
67
        Creates a root CA certificate based on the given parameters.
68
        :param key: Private key to be used when generating the certificate
69
        :param subject: Subject to be used put into the certificate
70
        :param config: String containing the configuration to be used
71
        :param extensions: Name of the section in the configuration representing extensions
72
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
73
        :param days: Number of days for which the generated cert. will be considered valid
74
        :raises InvalidCertificateAttribute: when a subject with an invalid attribute is passed
75
        :return: An instance of Certificate class representing the generated root CA cert
76
        """
77

    
78
        Logger.debug("Function launched.")
79

    
80
        # check whether the given subject is valid
81
        self.__check_subject_is_valid(subject)
82

    
83
        if usages is None:
84
            usages = {}
85

    
86
        cert_id = self.certificate_repository.get_next_id()
87

    
88
        # specify CA usage
89
        usages[CA_ID] = True
90

    
91
        # generate extension configuration lines based on the specified usages
92
        extensions = extensions + "\n" + "\n".join(usages_to_extension_lines(usages, REQUIRED_USAGE_EXTENSION_FLAGS))
93

    
94
        # create a new self signed  certificate
95
        cert_pem = self.cryptography_service.create_sscrt(subject, key.private_key, key_pass=key.password,
96
                                                          extensions=extensions, config=config, days=days, sn=cert_id)
97

    
98
        # wrap into Certificate class
99
        certificate = self.__create_wrapper(cert_pem, key.private_key_id, usages, 0,
100
                                            ROOT_CA_ID)
101

    
102
        # store the wrapper into the repository
103
        created_id = self.certificate_repository.create(certificate)
104

    
105
        # assign the generated ID to the inserted certificate
106
        certificate.certificate_id = created_id
107

    
108
        return certificate
109

    
110
    def __create_wrapper(self, cert_pem, private_key_id, usages, parent_id, cert_type):
111
        """
112
        Wraps the given parameters using the Certificate class. Uses CryptographyService to find out the notBefore and
113
        notAfter fields.
114
        :param cert_pem: PEM of the cert. to be wrapped
115
        :param private_key_id: ID of the private key used to create the given certificate
116
        :param usages: A dictionary containing usages of the generated certificate generated (see constants.py)
117
        :param parent_id: ID of the CA that issued this certificate
118
        :param cert_type: Type of this certificate (see constants.py)
119
        :return: An instance of the Certificate class wrapping the values passed  via method parameters
120
        """
121

    
122
        Logger.debug("Function launched.")
123

    
124
        # parse the generated pem for subject and notBefore/notAfter fields
125
        # TODO this could be improved in the future in such way that calling openssl is not required to parse the dates
126
        subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem)
127
        # format the parsed date
128
        not_before_formatted = int(timegm(not_before))
129
        not_after_formatted = int(timegm(not_after))
130

    
131
        # create a certificate wrapper
132
        certificate = Certificate(-1, not_before_formatted, not_after_formatted, cert_pem, cert_type, parent_id,
133
                                  private_key_id, usages, subj.common_name, subj.country, subj.locality, subj.state,
134
                                  subj.organization, subj.organization_unit, subj.email_address)
135

    
136
        return certificate
137

    
138
    # TODO config parameter present in class diagram but not here (unused)
139
    def create_ca(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate, issuer_key: PrivateKey,
140
                  extensions: str = "", days: int = 30, usages=None):
141
        """
142
        Creates an intermediate CA certificate issued by the given parent CA.
143
        :param subject_key: Private key to be used when generating the certificate
144
        :param subject: Subject to be used put into the certificate
145
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
146
        :param issuer_key: PK used to generate the issuer certificate
147
        :param extensions: Extensions to be used when generating the certificate
148
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
149
        :param days: Number of days for which the generated cert. will be considered valid
150
        :raises InvalidCertificateAttribute: when a subject with an invalid attribute is passed
151
        :return: An instance of Certificate class representing the generated intermediate CA cert
152
        """
153

    
154
        Logger.debug("Function launched.")
155

    
156
        # check whether the given subject is valid
157
        self.__check_subject_is_valid(subject)
158

    
159
        if usages is None:
160
            usages = {}
161

    
162
        # specify CA usage
163
        usages[CA_ID] = True
164

    
165
        # generate extension configuration lines based on the specified usages
166
        extensions = extensions + "\n" + "\n".join(usages_to_extension_lines(usages, REQUIRED_USAGE_EXTENSION_FLAGS))
167

    
168
        # Add CRL and OCSP distribution point to certificate extensions
169
        cert_id = self.certificate_repository.get_next_id()
170
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(issuer_cert.certificate_id)
171
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(issuer_cert.certificate_id)
172

    
173
        # TODO implement AIA URI via extensions
174
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
175
                                                        issuer_key.private_key,
176
                                                        subject_key_pass=subject_key.password,
177
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
178
                                                        days=days,
179
                                                        sn=cert_id)
180

    
181
        # wrap into Certificate class
182
        certificate = self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
183
                                            issuer_cert.certificate_id, INTERMEDIATE_CA_ID)
184

    
185
        # store the wrapper into the repository
186
        created_id = self.certificate_repository.create(certificate)
187

    
188
        # assign the generated ID to the inserted certificate
189
        certificate.certificate_id = created_id
190

    
191
        return certificate
192

    
193
    def create_end_cert(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate,
194
                        issuer_key: PrivateKey,
195
                        extensions: str = "", days: int = 30, usages=None):
196
        """
197
        Creates an end certificate issued by the given parent CA.
198
        :param subject_key: Private key to be used when generating the certificate
199
        :param subject: Subject to be used put into the certificate
200
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
201
        :param issuer_key: PK used to generate the issuer certificate
202
        :param extensions: Extensions to be used when generating the certificate
203
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
204
        :param days: Number of days for which the generated cert. will be considered valid
205
        :raises InvalidCertificateAttribute: when a subject with an invalid attribute is passed
206
        :return: An instance of Certificate class representing the generated cert
207
        """
208

    
209
        Logger.debug("Function launched.")
210

    
211
        # check whether the given subject is valid
212
        self.__check_subject_is_valid(subject)
213

    
214
        if usages is None:
215
            usages = {}
216

    
217
        # get the next certificate ID in order to be able to specify the serial number
218
        cert_id = self.certificate_repository.get_next_id()
219

    
220
        # generate extension configuration lines based on the specified usages
221
        extensions = extensions + "\n" + "\n".join(usages_to_extension_lines(usages, REQUIRED_USAGE_EXTENSION_FLAGS))
222

    
223
        # Add CRL and OCSP distribution point to certificate extensions
224
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(issuer_cert.certificate_id)
225
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(issuer_cert.certificate_id)
226

    
227
        # generate a new certificate
228
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
229
                                                        issuer_key.private_key,
230
                                                        subject_key_pass=subject_key.password,
231
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
232
                                                        days=days,
233
                                                        sn=cert_id
234
                                                        )
235

    
236
        # wrap the generated certificate using Certificate class
237
        certificate = self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
238
                                            issuer_cert.certificate_id, CERTIFICATE_ID)
239

    
240
        created_id = self.certificate_repository.create(certificate)
241

    
242
        certificate.certificate_id = created_id
243

    
244
        return certificate
245

    
246
    def get_certificate(self, unique_id: int) -> Certificate:
247
        """
248
        Tries to fetch a certificate from the certificate repository using a given id.
249
        :param unique_id: ID of the certificate to be fetched
250
        :return: Instance of the Certificate class containing a certificate with the given id or `None` if such
251
        certificate is not found
252
        """
253

    
254
        Logger.debug("Function launched.")
255

    
256
        return self.certificate_repository.read(unique_id)
257

    
258
    def get_certificates(self, cert_type=None) -> List[Certificate]:
259
        """
260
        Tries to fetch a list of all certificates from the certificate repository. Using the `cert_type` parameter only
261
        certificates of the given type can be returned.
262
        :param cert_type: Type of certificates to be returned
263
        :return: List of instances of the Certificate class representing all certificates present in the certificate
264
        repository. An empty list is returned when no certificates are found.
265
        """
266

    
267
        Logger.debug("Function launched.")
268

    
269
        return self.certificate_repository.read_all(cert_type)
270

    
271
    def get_certificates_filter(self, target_types, target_usages, target_cn_substring, page, per_page):
272
        """
273
        Tries to fetch a filtered list of all certificates from the certificate repository.
274
        :param target_types: certificate types (filter)
275
        :param target_usages: certificate usages (filter)
276
        :param target_cn_substring: certificate CN substring (filter)
277
        :param page: target page
278
        :param per_page: target page size
279
        :return: List of instances of the Certificate class representing filtered certificates
280
                    present in the certificate repository. An empty list is returned when no certificates are found.
281
        """
282
        return self.certificate_repository.read_all_filter(target_types, target_usages, target_cn_substring,
283
                                                           page, per_page)
284

    
285
    def get_chain_of_trust(self, from_id: int, to_id: int = -1, exclude_root=True) -> List[Certificate]:
286
        """
287
        Traverses the certificate hierarchy tree upwards till a certificate with the `to_id` ID is found or till a
288
        root CA certificate is found. Root certificates are excluded from the chain by default.
289
        :param from_id: ID of the first certificate to be included in the chain of trust
290
        :param to_id: ID of the last certificate to be included in the chain of trust
291
        :param exclude_root: a flag indicating whether root CA certificate should be excluded
292
        :return: a list of certificates representing the chain of trust starting with the certificate given by `from_id`
293
        ID
294
        """
295

    
296
        Logger.debug("Function launched.")
297

    
298
        # read the first certificate of the chain
299
        start_cert = self.certificate_repository.read(from_id)
300

    
301
        # if no cert is found or the current cert is root CA and root CAs should be excluded, then return an empty list
302
        if start_cert is None or (start_cert.type_id == ROOT_CA_ID and exclude_root):
303
            return []
304

    
305
        current_cert = start_cert
306
        chain_of_trust = [current_cert]
307

    
308
        # TODO could possibly be simplified
309
        if start_cert.type_id == ROOT_CA_ID:
310
            # the first cert found is a root ca
311
            return chain_of_trust
312

    
313
        while True:
314
            parent_cert = self.certificate_repository.read(current_cert.parent_id)
315

    
316
            # check whether parent certificate exists
317
            if parent_cert is None:
318
                break
319

    
320
            # check whether the found certificate is a root certificate
321
            if parent_cert.type_id == ROOT_CA_ID:
322
                if not exclude_root:
323
                    # append the found root cert only if root certificates should not be excluded from the CoT
324
                    chain_of_trust.append(parent_cert)
325
                break
326

    
327
            # append the certificate
328
            chain_of_trust.append(parent_cert)
329

    
330
            # stop iterating over certificates if the id of the found certificate matches `to_id` method parameter
331
            if parent_cert.certificate_id == to_id:
332
                break
333

    
334
            current_cert = parent_cert
335

    
336
        return chain_of_trust
337

    
338
    def delete_certificate(self, unique_id):
339
        """
340
        Deletes a certificate. Raises an Exception should any unexpected behavior occur.
341

    
342
        :param unique_id: ID of specific certificate
343
        """
344

    
345
        Logger.debug("Function launched.")
346

    
347
        to_delete = self.certificate_repository.get_all_descendants_of(unique_id)
348
        if to_delete is None:
349
            Logger.error(f"No such certificate found 'ID = {unique_id}'.")
350
            raise CertificateNotFoundException(unique_id)
351

    
352
        for cert in to_delete:
353
            try:
354
                self.set_certificate_revocation_status(cert.certificate_id, STATUS_REVOKED)
355
            except CertificateAlreadyRevokedException:
356
                Logger.info(f"Certificate already revoked 'ID = {unique_id}'.")
357
                # TODO log as an info/debug, not warning or above <-- perfectly legal
358
                pass
359

    
360
            if not self.certificate_repository.delete(cert.certificate_id):
361
                Logger.error(f"The certificate has not been deleted 'ID = {cert.certificate_id}'.")
362

    
363
    def get_certificates_issued_by(self, issuer_id):
364
        """
365
        Returns a list of all children of a certificate identified by an unique_id.
366
        Raises a DatabaseException should any unexpected behavior occur.
367
        :param issuer_id: target certificate ID
368
        :return: children of unique_id
369
        """
370

    
371
        Logger.debug("Function launched.")
372

    
373
        try:
374
            if self.certificate_repository.read(issuer_id) is None:
375
                Logger.error(f"No such certificate found 'ID = {issuer_id}'.")
376
                raise CertificateNotFoundException(issuer_id)
377
        except DatabaseException:
378
            Logger.error(f"No such certificate found 'ID = {issuer_id}'.")
379
            raise CertificateNotFoundException(issuer_id)
380

    
381
        return self.certificate_repository.get_all_issued_by(issuer_id)
382

    
383
    def get_certificates_issued_by_filter(self, issuer_id, target_types, target_usages, target_cn_substring, page,
384
                                          per_page):
385
        """
386
        Returns a list of all children of a certificate identified by an unique_id.
387
        Filters the results according to target types, usages, cn substring and pagination.
388
        Raises a DatabaseException should any unexpected behavior occur.
389
        :param issuer_id: target certificate ID
390
        :param target_types: filter of types
391
        :param target_usages: filter of usages
392
        :param target_cn_substring: CN substring
393
        :param page: target page or None
394
        :param per_page: certs per page or None
395
        :return: list of certificates (a page if specified)
396
        """
397

    
398
        Logger.debug("Function launched.")
399

    
400
        try:
401
            if self.certificate_repository.read(issuer_id) is None:
402
                Logger.error(f"No such certificate found 'ID = {issuer_id}'.")
403
                raise CertificateNotFoundException(issuer_id)
404
        except DatabaseException:
405
            Logger.error(f"No such certificate found 'ID = {issuer_id}'.")
406
            raise CertificateNotFoundException(issuer_id)
407

    
408
        return self.certificate_repository.get_all_issued_by_filter(issuer_id, target_types, target_usages,
409
                                                                    target_cn_substring, page, per_page)
410

    
411
    def set_certificate_revocation_status(self, id, status, reason="unspecified"):
412
        """
413
        Set certificate status to 'valid' or 'revoked'.
414
        If the new status is revoked a reason can be provided -> default is unspecified
415
        :param reason: reason for revocation
416
        :param id: identifier of the certificate whose status is to be changed
417
        :param status: new status of the certificate
418
        :raises CertificateStatusInvalidException: if status is not valid
419
        :raises RevocationReasonInvalidException: if reason is not valid
420
        :raises CertificateNotFoundException: if certificate with given id cannot be found
421
        :raises CertificateCannotBeSetToValid: if certificate was already revoked and not on hold,
422
                it cannot be set revalidated
423
        :raises CertificateAlreadyRevokedException: if caller tries to revoke a certificate that is already revoked
424
        :raises UnknownException: if the database is corrupted
425
        """
426

    
427
        Logger.debug("Function launched.")
428

    
429
        if status not in CERTIFICATE_STATES:
430
            Logger.error(f"Wrong parameter, invalid status '{status}'.")
431
            raise CertificateStatusInvalidException(status)
432
        if reason not in CERTIFICATE_REVOCATION_REASONS:
433
            Logger.error(f"Wrong parameter, invalid reason '{reason}'.")
434
            raise RevocationReasonInvalidException(reason)
435

    
436
        # check whether the certificate exists
437
        certificate = self.certificate_repository.read(id)
438
        if certificate is None:
439
            Logger.error(f"No such certificate found 'ID = {id}'.")
440
            raise CertificateNotFoundException(id)
441

    
442
        updated = False
443
        if status == STATUS_VALID:
444
            # if the certificate is revoked but the reason is not certificateHold, it cannot be re-validated
445
            #    -> throw an exception
446
            if certificate.revocation_reason != "" and \
447
               certificate.revocation_reason != CERTIFICATE_REVOCATION_REASON_HOLD:
448
                raise CertificateCannotBeSetToValid(certificate.revocation_reason)
449
            updated = self.certificate_repository.clear_certificate_revocation(id)
450
        elif status == STATUS_REVOKED:
451
            # check if the certificate is not revoked already
452
            revoked = self.certificate_repository.get_all_revoked_by(certificate.parent_id)
453
            if certificate.certificate_id in [x.certificate_id for x in revoked]:
454
                Logger.error(f"Certificate already revoked 'ID = {id}'.")
455
                raise CertificateAlreadyRevokedException(id)
456

    
457
            revocation_timestamp = int(time.time())
458
            updated = self.certificate_repository.set_certificate_revoked(id, revocation_timestamp, reason)
459

    
460
        if not updated:
461
            Logger.error(f"Repository returned 'false' from clear_certificate_revocation() "
462
                         f"or set_certificate_revoked().")
463
            raise UnknownException("Repository returned 'false' from clear_certificate_revocation() "
464
                                   "or set_certificate_revoked().")
465

    
466
    def get_subject_from_certificate(self, certificate: Certificate) -> Subject:
467
        """
468
        Get Subject distinguished name from a Certificate
469
        :param certificate: certificate instance whose Subject shall be parsed
470
        :return: instance of Subject class containing resulting distinguished name
471
        """
472

    
473
        Logger.debug("Function launched.")
474

    
475
        subject = Subject(certificate.common_name,
476
                          certificate.country_code,
477
                          certificate.locality,
478
                          certificate.province,
479
                          certificate.organization,
480
                          certificate.organizational_unit,
481
                          certificate.email_address)
482
        return subject
483

    
484
    def get_public_key_from_certificate(self, certificate: Certificate):
485
        """
486
        Extracts a public key from the given certificate
487
        :param certificate: an instance of the Certificate class containing the certificate from which a public key
488
        should be extracted.
489
        :return: a string containing the extracted public key in PEM format
490
        """
491

    
492
        Logger.debug("Function launched.")
493

    
494
        return self.cryptography_service.extract_public_key_from_certificate(certificate.pem_data)
495

    
496
    def get_certificate_state(self, id: int) -> str:
497
        """
498
        Check whether the certificate is expired, valid or revoked.
499
            - in case it's revoked and expired, revoked is returned
500
        :param id: identifier of the certificate
501
        :return: certificates state from {valid, revoked, expired}
502
        :raises CertificateNotFoundException: in case id of non-existing certificate is entered
503
        """
504
        Logger.debug("Function launched.")
505
        status = CERTIFICATE_VALID
506

    
507
        # Read the selected certificate from the repository
508
        certificate = self.certificate_repository.read(id)
509
        if certificate is None:
510
            Logger.error("Certificate whose details were requested does not exist.")
511
            raise CertificateNotFoundException(id)
512

    
513
        # check the expiration date using OpenSSL
514
        if not self.cryptography_service.verify_cert(certificate.pem_data):
515
            status = CERTIFICATE_EXPIRED
516

    
517
        # check certificate revocation
518
        all_revoked_by_parent = self.certificate_repository.get_all_revoked_by(certificate.parent_id)
519
        all_revoked_by_parent_ids = [cert.certificate_id for cert in all_revoked_by_parent]
520

    
521
        if id in all_revoked_by_parent_ids:
522
            status = CERTIFICATE_REVOKED
523

    
524
        return status
525

    
526
    def __get_crl_endpoint(self, ca_identifier: int) -> str:
527
        """
528
        Get URL address of CRL distribution endpoint based on
529
        issuer's ID
530

    
531
        :param ca_identifier: ID of issuing authority
532
        :return: CRL endpoint for the given CA
533
        """
534

    
535
        Logger.debug("Function launched.")
536

    
537
        return self.configuration.base_server_url + "/api/crl/" + str(ca_identifier)
538

    
539
    def __get_ocsp_endpoint(self, ca_identifier: int) -> str:
540
        """
541
        Get URL address of OCSP distribution endpoint based on
542
        issuer's ID
543

    
544
        :param ca_identifier: ID of issuing authority
545
        :return: OCSP endpoint for the given CA
546
        """
547

    
548
        Logger.debug("Function launched.")
549

    
550
        return self.configuration.base_server_url + "/api/ocsp/" + str(ca_identifier)
551

    
552
    def generate_pkcs_identity(self, certificate: Certificate, cert_key: PrivateKey, identity_name: str, identity_passphrase: str):
553
        """
554
        Generates a PKCS identity of the certificate given by the specified ID while using the private key passed.
555
        A name of the identity to be used and certificate's passphrase have to be specified as well as the passphrase
556
        of certificate's private key (if encrypted).
557
        :param certificate: certificate to be put into the PKCS identity store
558
        :param cert_key: key used to sign the given certificate
559
        :param identity_name: name to be given to the identity to be created
560
        :param identity_passphrase: passphrase to be used to encrypt the identity
561
        :return: byte array containing the generated identity (PKCS12 store)
562
        """
563
        Logger.debug("Function launched.")
564

    
565
        # get the chain of trust of the certificate whose identity should be generated and exclude the certificate
566
        # whose chain of trust we are querying
567
        cot_pem_list = [cert.pem_data for cert in self.get_chain_of_trust(certificate.certificate_id, exclude_root=False)[1:]]
568

    
569
        return self.cryptography_service.generate_pkcs_identity(certificate.pem_data, cert_key.private_key,
570
                                                                identity_name,
571
                                                                identity_passphrase, cot_pem_list, cert_key.password)
572

    
573

    
574
class RevocationReasonInvalidException(Exception):
575
    """
576
    Exception that denotes that the caller was trying to revoke
577
    a certificate using an invalid revocation reason
578
    """
579

    
580
    def __init__(self, reason):
581
        self.reason = reason
582

    
583
    def __str__(self):
584
        return f"Revocation reason '{self.reason}' is not valid."
585

    
586

    
587
class CertificateStatusInvalidException(Exception):
588
    """
589
    Exception that denotes that the caller was trying to set
590
    a certificate to an invalid state
591
    """
592

    
593
    def __init__(self, status):
594
        self.status = status
595

    
596
    def __str__(self):
597
        return f"Certificate status '{self.status}' is not valid."
598

    
599

    
600
class CertificateAlreadyRevokedException(Exception):
601
    """
602
    Exception that denotes that the caller was trying to revoke
603
    a certificate that is already revoked
604
    """
605

    
606
    def __init__(self, id):
607
        self.id = id
608

    
609
    def __str__(self):
610
        return f"Certificate id '{self.id}' is already revoked."
611

    
612

    
613
class CertificateCannotBeSetToValid(Exception):
614
    """
615
    Exception that denotes that the caller was trying to
616
    set certificate to valid if the certificate was already
617
    revoked but not certificateHold.
618
    """
619

    
620
    def __init__(self, old_reason):
621
        self.old_state = old_reason
622

    
623
    def __str__(self):
624
        return "Cannot set revoked certificate back to valid when the certificate revocation reason is not " \
625
               "certificateHold. " \
626
               f"The revocation reason of the certificate is {self.old_state}"
627

    
628

    
629
class InvalidSubjectAttribute(Exception):
630
    """
631
    Exception that denotes that the caller was trying to
632
    create a certificate while passing a subject with an invalid
633
    attribute.
634
    """
635

    
636
    def __init__(self, attribute_name, reason):
637
        self.attribute_name = attribute_name
638
        self.reason = reason
639

    
640
    def __str__(self):
641
        return f"""Subject attribute "{self.attribute_name}" is invalid (reason: {self.reason})."""
(2-2/4)