Projekt

Obecné

Profil

Stáhnout (24.7 KB) Statistiky
| Větev: | Tag: | Revize:
1
from typing import List
2

    
3
from injector import inject
4

    
5
from src.config.configuration import Configuration
6
from src.constants import ROOT_CA_ID, INTERMEDIATE_CA_ID, CA_ID, CERTIFICATE_ID, CERTIFICATE_STATES, \
7
    CERTIFICATE_REVOCATION_REASONS, SSL_ID, SIGNATURE_ID, AUTHENTICATION_ID, CERTIFICATE_VALID, CERTIFICATE_EXPIRED, \
8
    CERTIFICATE_REVOKED
9
from src.dao.certificate_repository import CertificateRepository
10
from src.exceptions.certificate_not_found_exception import CertificateNotFoundException
11
from src.exceptions.database_exception import DatabaseException
12
from src.exceptions.unknown_exception import UnknownException
13
from src.model.certificate import Certificate
14
from src.model.private_key import PrivateKey
15
from src.model.subject import Subject
16
from src.services.cryptography import CryptographyService
17

    
18
import time
19

    
20
from src.utils.usages_to_extensions import usages_to_extension_lines, ExtensionFieldFlags, CRITICAL, KEY_CERT_SIGN, \
21
    CRL_SIGN, CA, DIGITAL_SIGNATURE, KEY_ENCIPHERMENT, KEY_AGREEMENT, SERVER_AUTH, NON_REPUDIATION, TIME_STAMPING, \
22
    CLIENT_AUTH
23

    
24
from src.utils.logger import Logger
25

    
26
VALID_FROM_TO_DATE_FORMAT = "%d.%m.%Y %H:%M:%S"
27
CA_EXTENSIONS = "basicConstraints=critical,CA:TRUE"
28
CRL_EXTENSION = "crlDistributionPoints=URI:"
29
OCSP_EXTENSION = "authorityInfoAccess=OCSP;URI:"
30
STATUS_REVOKED = "revoked"
31
STATUS_VALID = "valid"
32

    
33
# define which flags are required for various usages
34
REQUIRED_USAGE_EXTENSION_FLAGS = {
35
    CA_ID: ExtensionFieldFlags({CRITICAL, KEY_CERT_SIGN, CRL_SIGN}, {}, {CRITICAL, CA}),
36
    SSL_ID: ExtensionFieldFlags({DIGITAL_SIGNATURE, KEY_ENCIPHERMENT, KEY_AGREEMENT}, {SERVER_AUTH}, {}),
37
    SIGNATURE_ID: ExtensionFieldFlags({DIGITAL_SIGNATURE, NON_REPUDIATION}, {}, {}),
38
    AUTHENTICATION_ID: ExtensionFieldFlags({DIGITAL_SIGNATURE}, {CLIENT_AUTH}, {})}
39

    
40

    
41
class CertificateService:
42

    
43
    @inject
44
    def __init__(self, cryptography_service: CryptographyService,
45
                 certificate_repository: CertificateRepository,
46
                 configuration: Configuration):
47
        self.cryptography_service = cryptography_service
48
        self.certificate_repository = certificate_repository
49
        self.configuration = configuration
50

    
51
    # TODO usages present in method parameters but not in class diagram
52
    def create_root_ca(self, key: PrivateKey, subject: Subject, extensions: str = "", config: str = "",
53
                       usages=None, days=30):
54
        """
55
        Creates a root CA certificate based on the given parameters.
56
        :param key: Private key to be used when generating the certificate
57
        :param subject: Subject to be used put into the certificate
58
        :param config: String containing the configuration to be used
59
        :param extensions: Name of the section in the configuration representing extensions
60
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
61
        :param days: Number of days for which the generated cert. will be considered valid
62
        :return: An instance of Certificate class representing the generated root CA cert
63
        """
64

    
65
        Logger.debug("Function launched.")
66

    
67
        if usages is None:
68
            usages = {}
69

    
70
        cert_id = self.certificate_repository.get_next_id()
71

    
72
        # specify CA usage
73
        usages[CA_ID] = True
74

    
75
        # generate extension configuration lines based on the specified usages
76
        extensions = extensions + "\n" + "\n".join(usages_to_extension_lines(usages, REQUIRED_USAGE_EXTENSION_FLAGS))
77

    
78
        # create a new self signed  certificate
79
        cert_pem = self.cryptography_service.create_sscrt(subject, key.private_key, key_pass=key.password,
80
                                                          extensions=extensions, config=config, days=days, sn=cert_id)
81

    
82
        # wrap into Certificate class
83
        certificate = self.__create_wrapper(cert_pem, key.private_key_id, usages, 0,
84
                                            ROOT_CA_ID)
85

    
86
        # store the wrapper into the repository
87
        created_id = self.certificate_repository.create(certificate)
88

    
89
        # assign the generated ID to the inserted certificate
90
        certificate.certificate_id = created_id
91

    
92
        return certificate
93

    
94
    def __create_wrapper(self, cert_pem, private_key_id, usages, parent_id, cert_type):
95
        """
96
        Wraps the given parameters using the Certificate class. Uses CryptographyService to find out the notBefore and
97
        notAfter fields.
98
        :param cert_pem: PEM of the cert. to be wrapped
99
        :param private_key_id: ID of the private key used to create the given certificate
100
        :param usages: A dictionary containing usages of the generated certificate generated (see constants.py)
101
        :param parent_id: ID of the CA that issued this certificate
102
        :param cert_type: Type of this certificate (see constants.py)
103
        :return: An instance of the Certificate class wrapping the values passed  via method parameters
104
        """
105

    
106
        Logger.debug("Function launched.")
107

    
108
        # parse the generated pem for subject and notBefore/notAfter fields
109
        # TODO this could be improved in the future in such way that calling openssl is not required to parse the dates
110
        subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem)
111
        # format the parsed date
112
        not_before_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_before)
113
        not_after_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_after)
114

    
115
        # create a certificate wrapper
116
        certificate = Certificate(-1, subj.common_name, not_before_formatted, not_after_formatted, cert_pem,
117
                                  private_key_id, cert_type, parent_id, usages)
118

    
119
        return certificate
120

    
121
    # TODO config parameter present in class diagram but not here (unused)
122
    def create_ca(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate, issuer_key: PrivateKey,
123
                  extensions: str = "", days: int = 30, usages=None):
124
        """
125
        Creates an intermediate CA certificate issued by the given parent CA.
126
        :param subject_key: Private key to be used when generating the certificate
127
        :param subject: Subject to be used put into the certificate
128
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
129
        :param issuer_key: PK used to generate the issuer certificate
130
        :param extensions: Extensions to be used when generating the certificate
131
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
132
        :param days: Number of days for which the generated cert. will be considered valid
133
        :return: An instance of Certificate class representing the generated intermediate CA cert
134
        """
135

    
136
        Logger.debug("Function launched.")
137

    
138
        if usages is None:
139
            usages = {}
140

    
141
        # specify CA usage
142
        usages[CA_ID] = True
143

    
144
        # generate extension configuration lines based on the specified usages
145
        extensions = extensions + "\n" + "\n".join(usages_to_extension_lines(usages, REQUIRED_USAGE_EXTENSION_FLAGS))
146

    
147
        # Add CRL and OCSP distribution point to certificate extensions
148
        cert_id = self.certificate_repository.get_next_id()
149
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(issuer_cert.certificate_id)
150
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(issuer_cert.certificate_id)
151

    
152
        # TODO implement AIA URI via extensions
153
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
154
                                                        issuer_key.private_key,
155
                                                        subject_key_pass=subject_key.password,
156
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
157
                                                        days=days,
158
                                                        sn=cert_id)
159

    
160
        # wrap into Certificate class
161
        self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
162
                              issuer_cert.certificate_id, INTERMEDIATE_CA_ID)
163

    
164
        # parse the generated pem for subject and notBefore/notAfter fields
165
        subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem)
166

    
167
        # format the parsed date
168
        not_before_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_before)
169
        not_after_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_after)
170

    
171
        # create a certificate wrapper
172
        certificate = Certificate(-1, subject.common_name, not_before_formatted, not_after_formatted, cert_pem,
173
                                  subject_key.private_key_id, INTERMEDIATE_CA_ID, issuer_cert.certificate_id, usages)
174

    
175
        # store the wrapper into the repository
176
        created_id = self.certificate_repository.create(certificate)
177

    
178
        # assign the generated ID to the inserted certificate
179
        certificate.certificate_id = created_id
180

    
181
        return certificate
182

    
183
    def create_end_cert(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate,
184
                        issuer_key: PrivateKey,
185
                        extensions: str = "", days: int = 30, usages=None):
186
        """
187
        Creates an end certificate issued by the given parent CA.
188
        :param subject_key: Private key to be used when generating the certificate
189
        :param subject: Subject to be used put into the certificate
190
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
191
        :param issuer_key: PK used to generate the issuer certificate
192
        :param extensions: Extensions to be used when generating the certificate
193
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
194
        :param days: Number of days for which the generated cert. will be considered valid
195
        :return: An instance of Certificate class representing the generated cert
196
        """
197

    
198
        Logger.debug("Function launched.")
199

    
200
        if usages is None:
201
            usages = {}
202

    
203
        # get the next certificate ID in order to be able to specify the serial number
204
        cert_id = self.certificate_repository.get_next_id()
205

    
206
        # generate extension configuration lines based on the specified usages
207
        extensions = extensions + "\n" + "\n".join(usages_to_extension_lines(usages, REQUIRED_USAGE_EXTENSION_FLAGS))
208

    
209
        # Add CRL and OCSP distribution point to certificate extensions
210
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(issuer_cert.certificate_id)
211
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(issuer_cert.certificate_id)
212

    
213
        # generate a new certificate
214
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
215
                                                        issuer_key.private_key,
216
                                                        subject_key_pass=subject_key.password,
217
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
218
                                                        days=days,
219
                                                        sn=cert_id
220
                                                        )
221

    
222
        # wrap the generated certificate using Certificate class
223
        certificate = self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
224
                                            issuer_cert.certificate_id, CERTIFICATE_ID)
225

    
226
        created_id = self.certificate_repository.create(certificate)
227

    
228
        certificate.certificate_id = created_id
229

    
230
        return certificate
231

    
232
    def get_certificate(self, unique_id: int) -> Certificate:
233
        """
234
        Tries to fetch a certificate from the certificate repository using a given id.
235
        :param unique_id: ID of the certificate to be fetched
236
        :return: Instance of the Certificate class containing a certificate with the given id or `None` if such
237
        certificate is not found
238
        """
239

    
240
        Logger.debug("Function launched.")
241

    
242
        return self.certificate_repository.read(unique_id)
243

    
244
    def get_certificates(self, cert_type=None) -> List[Certificate]:
245
        """
246
        Tries to fetch a list of all certificates from the certificate repository. Using the `cert_type` parameter only
247
        certificates of the given type can be returned.
248
        :param cert_type: Type of certificates to be returned
249
        :return: List of instances of the Certificate class representing all certificates present in the certificate
250
        repository. An empty list is returned when no certificates are found.
251
        """
252

    
253
        Logger.debug("Function launched.")
254

    
255
        return self.certificate_repository.read_all(cert_type)
256

    
257
    def get_chain_of_trust(self, from_id: int, to_id: int = -1, exclude_root=True) -> List[Certificate]:
258
        """
259
        Traverses the certificate hierarchy tree upwards till a certificate with the `to_id` ID is found or till a
260
        root CA certificate is found. Root certificates are excluded from the chain by default.
261
        :param from_id: ID of the first certificate to be included in the chain of trust
262
        :param to_id: ID of the last certificate to be included in the chain of trust
263
        :param exclude_root: a flag indicating whether root CA certificate should be excluded
264
        :return: a list of certificates representing the chain of trust starting with the certificate given by `from_id`
265
        ID
266
        """
267

    
268
        Logger.debug("Function launched.")
269

    
270
        # read the first certificate of the chain
271
        start_cert = self.certificate_repository.read(from_id)
272

    
273
        # if no cert is found or the current cert is root CA and root CAs should be excluded, then return an empty list
274
        if start_cert is None or (start_cert.type_id == ROOT_CA_ID and exclude_root):
275
            return []
276

    
277
        current_cert = start_cert
278
        chain_of_trust = [current_cert]
279

    
280
        # TODO could possibly be simplified
281
        if start_cert.type_id == ROOT_CA_ID:
282
            # the first cert found is a root ca
283
            return chain_of_trust
284

    
285
        while True:
286
            parent_cert = self.certificate_repository.read(current_cert.parent_id)
287

    
288
            # check whether parent certificate exists
289
            if parent_cert is None:
290
                break
291

    
292
            # check whether the found certificate is a root certificate
293
            if parent_cert.type_id == ROOT_CA_ID:
294
                if not exclude_root:
295
                    # append the found root cert only if root certificates should not be excluded from the CoT
296
                    chain_of_trust.append(parent_cert)
297
                break
298

    
299
            # append the certificate
300
            chain_of_trust.append(parent_cert)
301

    
302
            # stop iterating over certificates if the id of the found certificate matches `to_id` method parameter
303
            if parent_cert.certificate_id == to_id:
304
                break
305

    
306
            current_cert = parent_cert
307

    
308
        return chain_of_trust
309

    
310
    def delete_certificate(self, unique_id):
311
        """
312
        Deletes a certificate. Raises an Exception should any unexpected behavior occur.
313

    
314
        :param unique_id: ID of specific certificate
315
        """
316

    
317
        Logger.debug("Function launched.")
318

    
319
        to_delete = self.certificate_repository.get_all_descendants_of(unique_id)
320
        if to_delete is None:
321
            Logger.error(f"No such certificate found 'ID = {unique_id}'.")
322
            raise CertificateNotFoundException(unique_id)
323

    
324
        for cert in to_delete:
325
            try:
326
                self.set_certificate_revocation_status(cert.certificate_id, STATUS_REVOKED)
327
            except CertificateAlreadyRevokedException:
328
                Logger.info(f"Certificate already revoked 'ID = {unique_id}'.")
329
                # TODO log as an info/debug, not warning or above <-- perfectly legal
330
                pass
331

    
332
            if not self.certificate_repository.delete(cert.certificate_id):
333
                Logger.error(f"The certificate has not been deleted 'ID = {cert.certificate_id}'.")
334

    
335

    
336
    def get_certificates_issued_by(self, unique_id):
337
        """
338
        Returns a list of all children of a certificate identified by an unique_id.
339
        Raises a DatabaseException should any unexpected behavior occur.
340
        :param unique_id: target certificate ID
341
        :return: children of unique_id
342
        """
343

    
344
        Logger.debug("Function launched.")
345

    
346
        try:
347
            if self.certificate_repository.read(unique_id) is None:
348
                Logger.error(f"No such certificate found 'ID = {unique_id}'.")
349
                raise CertificateNotFoundException(unique_id)
350
        except DatabaseException:
351
            Logger.error(f"No such certificate found 'ID = {unique_id}'.")
352
            raise CertificateNotFoundException(unique_id)
353

    
354
        return self.certificate_repository.get_all_issued_by(unique_id)
355

    
356
    def set_certificate_revocation_status(self, id, status, reason="unspecified"):
357
        """
358
        Set certificate status to 'valid' or 'revoked'.
359
        If the new status is revoked a reason can be provided -> default is unspecified
360
        :param reason: reason for revocation
361
        :param id: identifier of the certificate whose status is to be changed
362
        :param status: new status of the certificate
363
        """
364

    
365
        Logger.debug("Function launched.")
366

    
367
        if status not in CERTIFICATE_STATES:
368
            Logger.error(f"Wrong parameter, invalid status '{status}'.")
369
            raise CertificateStatusInvalidException(status)
370
        if reason not in CERTIFICATE_REVOCATION_REASONS:
371
            Logger.error(f"Wrong parameter, invalid reason '{reason}'.")
372
            raise RevocationReasonInvalidException(reason)
373

    
374
        # check whether the certificate exists
375
        certificate = self.certificate_repository.read(id)
376
        if certificate is None:
377
            Logger.error(f"No such certificate found 'ID = {id}'.")
378
            raise CertificateNotFoundException(id)
379

    
380
        updated = False
381
        if status == STATUS_VALID:
382
            updated = self.certificate_repository.clear_certificate_revocation(id)
383
        elif status == STATUS_REVOKED:
384
            # check if the certificate is not revoked already
385
            revoked = self.certificate_repository.get_all_revoked_by(certificate.parent_id)
386
            if certificate.certificate_id in [x.certificate_id for x in revoked]:
387
                Logger.error(f"Certificate already revoked 'ID = {id}'.")
388
                raise CertificateAlreadyRevokedException(id)
389

    
390
            revocation_timestamp = int(time.time())
391
            updated = self.certificate_repository.set_certificate_revoked(id, str(revocation_timestamp), reason)
392

    
393
        if not updated:
394
            Logger.error(f"Repository returned 'false' from clear_certificate_revocation() "
395
                         f"or set_certificate_revoked().")
396
            raise UnknownException("Repository returned 'false' from clear_certificate_revocation() "
397
                                   "or set_certificate_revoked().")
398

    
399
    def get_subject_from_certificate(self, certificate: Certificate) -> Subject:
400
        """
401
        Get Subject distinguished name from a Certificate
402
        :param certificate: certificate instance whose Subject shall be parsed
403
        :return: instance of Subject class containing resulting distinguished name
404
        """
405

    
406
        Logger.debug("Function launched.")
407

    
408
        (subject, _, _) = self.cryptography_service.parse_cert_pem(certificate.pem_data)
409
        return subject
410

    
411
    def get_public_key_from_certificate(self, certificate: Certificate):
412
        """
413
        Extracts a public key from the given certificate
414
        :param certificate: an instance of the Certificate class containing the certificate from which a public key
415
        should be extracted.
416
        :return: a string containing the extracted public key in PEM format
417
        """
418

    
419
        Logger.debug("Function launched.")
420

    
421
        return self.cryptography_service.extract_public_key_from_certificate(certificate.pem_data)
422

    
423
    def get_certificate_state(self, id: int) -> str:
424
        """
425
        Check whether the certificate is expired, valid or revoked.
426
            - in case it's revoked and expired, revoked is returned
427
        :param id: identifier of the certificate
428
        :return: certificates state from {valid, revoked, expired}
429
        :raises CertificateNotFoundException: in case id of non-existing certificate is entered
430
        """
431
        Logger.debug("Function launched.")
432
        status = CERTIFICATE_VALID
433

    
434
        # Read the selected certificate from the repository
435
        certificate = self.certificate_repository.read(id)
436
        if certificate is None:
437
            Logger.error("Certificate whose details were requested does not exist.")
438
            raise CertificateNotFoundException(id)
439

    
440
        # check the expiration date using OpenSSL
441
        if not self.cryptography_service.verify_cert(certificate.pem_data):
442
            status = CERTIFICATE_EXPIRED
443

    
444
        # check certificate revocation
445
        all_revoked_by_parent = self.certificate_repository.get_all_revoked_by(certificate.parent_id)
446
        all_revoked_by_parent_ids = [cert.certificate_id for cert in all_revoked_by_parent]
447

    
448
        if id in all_revoked_by_parent_ids:
449
            status = CERTIFICATE_REVOKED
450

    
451
        return status
452

    
453

    
454
    def __get_crl_endpoint(self, ca_identifier: int) -> str:
455
        """
456
        Get URL address of CRL distribution endpoint based on
457
        issuer's ID
458

    
459
        :param ca_identifier: ID of issuing authority
460
        :return: CRL endpoint for the given CA
461
        """
462

    
463
        Logger.debug("Function launched.")
464

    
465
        return self.configuration.base_server_url + "/api/crl/" + str(ca_identifier)
466

    
467
    def __get_ocsp_endpoint(self, ca_identifier: int) -> str:
468
        """
469
        Get URL address of OCSP distribution endpoint based on
470
        issuer's ID
471

    
472
        :param ca_identifier: ID of issuing authority
473
        :return: OCSP endpoint for the given CA
474
        """
475

    
476
        Logger.debug("Function launched.")
477

    
478
        return self.configuration.base_server_url + "/api/ocsp/" + str(ca_identifier)
479

    
480
    def generate_pkcs_identity(self, cert_id: int, cert_key: PrivateKey, identity_name: str, identity_passphrase: str):
481
        """
482
        Generates a PKCS identity of the certificate given by the specified ID while using the private key passed.
483
        A name of the identity to be used and certificate's passphrase have to be specified as well as the passphrase
484
        of certificate's private key (if encrypted).
485
        :param cert_id: ID of the certificate to be put into the PKCS identity store
486
        :param cert_key: key used to sign the given certificate
487
        :param identity_name: name to be given to the identity to be created
488
        :param identity_passphrase: passphrase to be used to encrypt the identity
489
        :return: byte array containing the generated identity (PKCS12 store)
490
        """
491
        Logger.debug("Function launched.")
492

    
493
        # Read the selected certificate from the repository
494
        certificate = self.certificate_repository.read(cert_id)
495
        if certificate is None:
496
            Logger.error("Certificate whose identity should be generated does not exist.")
497
            raise CertificateNotFoundException(cert_id)
498

    
499
        # get the chain of trust of the certificate whose identity should be generated and exclude the certificate
500
        # whose chain of trust we are querying
501
        cot_pem_list = [cert.pem_data for cert in self.get_chain_of_trust(cert_id, exclude_root=False)[1:]]
502

    
503
        return self.cryptography_service.generate_pkcs_identity(certificate.pem_data, cert_key.private_key,
504
                                                                identity_name,
505
                                                                identity_passphrase, cot_pem_list, cert_key.password)
506

    
507

    
508
class RevocationReasonInvalidException(Exception):
509
    """
510
    Exception that denotes that the caller was trying to revoke
511
    a certificate using an invalid revocation reason
512
    """
513

    
514
    def __init__(self, reason):
515
        self.reason = reason
516

    
517
    def __str__(self):
518
        return f"Revocation reason '{self.reason}' is not valid."
519

    
520

    
521
class CertificateStatusInvalidException(Exception):
522
    """
523
    Exception that denotes that the caller was trying to set
524
    a certificate to an invalid state
525
    """
526

    
527
    def __init__(self, status):
528
        self.status = status
529

    
530
    def __str__(self):
531
        return f"Certificate status '{self.status}' is not valid."
532

    
533

    
534
class CertificateAlreadyRevokedException(Exception):
535
    """
536
    Exception that denotes that the caller was trying to revoke
537
    a certificate that is already revoked
538
    """
539

    
540
    def __init__(self, id):
541
        self.id = id
542

    
543
    def __str__(self):
544
        return f"Certificate id '{self.id}' is already revoked."
(2-2/4)