Projekt

Obecné

Profil

Stáhnout (24.3 KB) Statistiky
| Větev: | Tag: | Revize:
1
from typing import List
2

    
3
from injector import inject
4

    
5
from src.config.configuration import Configuration
6
from src.constants import ROOT_CA_ID, INTERMEDIATE_CA_ID, CA_ID, CERTIFICATE_ID, CERTIFICATE_STATES, \
7
    CERTIFICATE_REVOCATION_REASONS, SSL_ID, SIGNATURE_ID, AUTHENTICATION_ID, CERTIFICATE_VALID, CERTIFICATE_EXPIRED, \
8
    CERTIFICATE_REVOKED,\
9
    CERTIFICATE_REVOCATION_REASON_HOLD
10
from src.dao.certificate_repository import CertificateRepository
11
from src.exceptions.certificate_not_found_exception import CertificateNotFoundException
12
from src.exceptions.database_exception import DatabaseException
13
from src.exceptions.unknown_exception import UnknownException
14
from src.model.certificate import Certificate
15
from src.model.private_key import PrivateKey
16
from src.model.subject import Subject
17
from src.services.cryptography import CryptographyService
18

    
19
import time
20

    
21
from src.utils.usages_to_extensions import usages_to_extension_lines, ExtensionFieldFlags, CRITICAL, KEY_CERT_SIGN, \
22
    CRL_SIGN, CA, DIGITAL_SIGNATURE, KEY_ENCIPHERMENT, KEY_AGREEMENT, SERVER_AUTH, NON_REPUDIATION, TIME_STAMPING, \
23
    CLIENT_AUTH
24

    
25
from src.utils.logger import Logger
26

    
27
VALID_FROM_TO_DATE_FORMAT = "%d.%m.%Y %H:%M:%S"
28
CA_EXTENSIONS = "basicConstraints=critical,CA:TRUE"
29
CRL_EXTENSION = "crlDistributionPoints=URI:"
30
OCSP_EXTENSION = "authorityInfoAccess=OCSP;URI:"
31
STATUS_REVOKED = "revoked"
32
STATUS_VALID = "valid"
33

    
34
# define which flags are required for various usages
35
REQUIRED_USAGE_EXTENSION_FLAGS = {
36
    CA_ID: ExtensionFieldFlags({CRITICAL, KEY_CERT_SIGN, CRL_SIGN}, {}, {CRITICAL, CA}),
37
    SSL_ID: ExtensionFieldFlags({DIGITAL_SIGNATURE, KEY_ENCIPHERMENT, KEY_AGREEMENT}, {SERVER_AUTH}, {}),
38
    SIGNATURE_ID: ExtensionFieldFlags({DIGITAL_SIGNATURE, NON_REPUDIATION}, {}, {}),
39
    AUTHENTICATION_ID: ExtensionFieldFlags({DIGITAL_SIGNATURE}, {CLIENT_AUTH}, {})}
40

    
41

    
42
class CertificateService:
43

    
44
    @inject
45
    def __init__(self, cryptography_service: CryptographyService,
46
                 certificate_repository: CertificateRepository,
47
                 configuration: Configuration):
48
        self.cryptography_service = cryptography_service
49
        self.certificate_repository = certificate_repository
50
        self.configuration = configuration
51

    
52
    # TODO usages present in method parameters but not in class diagram
53
    def create_root_ca(self, key: PrivateKey, subject: Subject, extensions: str = "", config: str = "",
54
                       usages=None, days=30):
55
        """
56
        Creates a root CA certificate based on the given parameters.
57
        :param key: Private key to be used when generating the certificate
58
        :param subject: Subject to be used put into the certificate
59
        :param config: String containing the configuration to be used
60
        :param extensions: Name of the section in the configuration representing extensions
61
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
62
        :param days: Number of days for which the generated cert. will be considered valid
63
        :return: An instance of Certificate class representing the generated root CA cert
64
        """
65

    
66
        Logger.debug("Function launched.")
67

    
68
        if usages is None:
69
            usages = {}
70

    
71
        cert_id = self.certificate_repository.get_next_id()
72

    
73
        # specify CA usage
74
        usages[CA_ID] = True
75

    
76
        # generate extension configuration lines based on the specified usages
77
        extensions = extensions + "\n" + "\n".join(usages_to_extension_lines(usages, REQUIRED_USAGE_EXTENSION_FLAGS))
78

    
79
        # create a new self signed  certificate
80
        cert_pem = self.cryptography_service.create_sscrt(subject, key.private_key, key_pass=key.password,
81
                                                          extensions=extensions, config=config, days=days, sn=cert_id)
82

    
83
        # wrap into Certificate class
84
        certificate = self.__create_wrapper(cert_pem, key.private_key_id, usages, 0,
85
                                            ROOT_CA_ID)
86

    
87
        # store the wrapper into the repository
88
        created_id = self.certificate_repository.create(certificate)
89

    
90
        # assign the generated ID to the inserted certificate
91
        certificate.certificate_id = created_id
92

    
93
        return certificate
94

    
95
    def __create_wrapper(self, cert_pem, private_key_id, usages, parent_id, cert_type):
96
        """
97
        Wraps the given parameters using the Certificate class. Uses CryptographyService to find out the notBefore and
98
        notAfter fields.
99
        :param cert_pem: PEM of the cert. to be wrapped
100
        :param private_key_id: ID of the private key used to create the given certificate
101
        :param usages: A dictionary containing usages of the generated certificate generated (see constants.py)
102
        :param parent_id: ID of the CA that issued this certificate
103
        :param cert_type: Type of this certificate (see constants.py)
104
        :return: An instance of the Certificate class wrapping the values passed  via method parameters
105
        """
106

    
107
        Logger.debug("Function launched.")
108

    
109
        # parse the generated pem for subject and notBefore/notAfter fields
110
        # TODO this could be improved in the future in such way that calling openssl is not required to parse the dates
111
        subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem)
112
        # format the parsed date
113
        not_before_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_before)
114
        not_after_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_after)
115

    
116
        # create a certificate wrapper
117
        certificate = Certificate(-1, not_before_formatted, not_after_formatted, cert_pem, cert_type, parent_id,
118
                                  private_key_id, usages, subj.common_name, subj.country, subj.locality, subj.state,
119
                                  subj.organization, subj.organization_unit, subj.email_address)
120

    
121
        return certificate
122

    
123
    # TODO config parameter present in class diagram but not here (unused)
124
    def create_ca(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate, issuer_key: PrivateKey,
125
                  extensions: str = "", days: int = 30, usages=None):
126
        """
127
        Creates an intermediate CA certificate issued by the given parent CA.
128
        :param subject_key: Private key to be used when generating the certificate
129
        :param subject: Subject to be used put into the certificate
130
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
131
        :param issuer_key: PK used to generate the issuer certificate
132
        :param extensions: Extensions to be used when generating the certificate
133
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
134
        :param days: Number of days for which the generated cert. will be considered valid
135
        :return: An instance of Certificate class representing the generated intermediate CA cert
136
        """
137

    
138
        Logger.debug("Function launched.")
139

    
140
        if usages is None:
141
            usages = {}
142

    
143
        # specify CA usage
144
        usages[CA_ID] = True
145

    
146
        # generate extension configuration lines based on the specified usages
147
        extensions = extensions + "\n" + "\n".join(usages_to_extension_lines(usages, REQUIRED_USAGE_EXTENSION_FLAGS))
148

    
149
        # Add CRL and OCSP distribution point to certificate extensions
150
        cert_id = self.certificate_repository.get_next_id()
151
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(issuer_cert.certificate_id)
152
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(issuer_cert.certificate_id)
153

    
154
        # TODO implement AIA URI via extensions
155
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
156
                                                        issuer_key.private_key,
157
                                                        subject_key_pass=subject_key.password,
158
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
159
                                                        days=days,
160
                                                        sn=cert_id)
161

    
162
        # wrap into Certificate class
163
        certificate = self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
164
                                            issuer_cert.certificate_id, INTERMEDIATE_CA_ID)
165

    
166
        # store the wrapper into the repository
167
        created_id = self.certificate_repository.create(certificate)
168

    
169
        # assign the generated ID to the inserted certificate
170
        certificate.certificate_id = created_id
171

    
172
        return certificate
173

    
174
    def create_end_cert(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate,
175
                        issuer_key: PrivateKey,
176
                        extensions: str = "", days: int = 30, usages=None):
177
        """
178
        Creates an end certificate issued by the given parent CA.
179
        :param subject_key: Private key to be used when generating the certificate
180
        :param subject: Subject to be used put into the certificate
181
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
182
        :param issuer_key: PK used to generate the issuer certificate
183
        :param extensions: Extensions to be used when generating the certificate
184
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
185
        :param days: Number of days for which the generated cert. will be considered valid
186
        :return: An instance of Certificate class representing the generated cert
187
        """
188

    
189
        Logger.debug("Function launched.")
190

    
191
        if usages is None:
192
            usages = {}
193

    
194
        # get the next certificate ID in order to be able to specify the serial number
195
        cert_id = self.certificate_repository.get_next_id()
196

    
197
        # generate extension configuration lines based on the specified usages
198
        extensions = extensions + "\n" + "\n".join(usages_to_extension_lines(usages, REQUIRED_USAGE_EXTENSION_FLAGS))
199

    
200
        # Add CRL and OCSP distribution point to certificate extensions
201
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(issuer_cert.certificate_id)
202
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(issuer_cert.certificate_id)
203

    
204
        # generate a new certificate
205
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
206
                                                        issuer_key.private_key,
207
                                                        subject_key_pass=subject_key.password,
208
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
209
                                                        days=days,
210
                                                        sn=cert_id
211
                                                        )
212

    
213
        # wrap the generated certificate using Certificate class
214
        certificate = self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
215
                                            issuer_cert.certificate_id, CERTIFICATE_ID)
216

    
217
        created_id = self.certificate_repository.create(certificate)
218

    
219
        certificate.certificate_id = created_id
220

    
221
        return certificate
222

    
223
    def get_certificate(self, unique_id: int) -> Certificate:
224
        """
225
        Tries to fetch a certificate from the certificate repository using a given id.
226
        :param unique_id: ID of the certificate to be fetched
227
        :return: Instance of the Certificate class containing a certificate with the given id or `None` if such
228
        certificate is not found
229
        """
230

    
231
        Logger.debug("Function launched.")
232

    
233
        return self.certificate_repository.read(unique_id)
234

    
235
    def get_certificates(self, cert_type=None) -> List[Certificate]:
236
        """
237
        Tries to fetch a list of all certificates from the certificate repository. Using the `cert_type` parameter only
238
        certificates of the given type can be returned.
239
        :param cert_type: Type of certificates to be returned
240
        :return: List of instances of the Certificate class representing all certificates present in the certificate
241
        repository. An empty list is returned when no certificates are found.
242
        """
243

    
244
        Logger.debug("Function launched.")
245

    
246
        return self.certificate_repository.read_all(cert_type)
247

    
248
    def get_chain_of_trust(self, from_id: int, to_id: int = -1, exclude_root=True) -> List[Certificate]:
249
        """
250
        Traverses the certificate hierarchy tree upwards till a certificate with the `to_id` ID is found or till a
251
        root CA certificate is found. Root certificates are excluded from the chain by default.
252
        :param from_id: ID of the first certificate to be included in the chain of trust
253
        :param to_id: ID of the last certificate to be included in the chain of trust
254
        :param exclude_root: a flag indicating whether root CA certificate should be excluded
255
        :return: a list of certificates representing the chain of trust starting with the certificate given by `from_id`
256
        ID
257
        """
258

    
259
        Logger.debug("Function launched.")
260

    
261
        # read the first certificate of the chain
262
        start_cert = self.certificate_repository.read(from_id)
263

    
264
        # if no cert is found or the current cert is root CA and root CAs should be excluded, then return an empty list
265
        if start_cert is None or (start_cert.type_id == ROOT_CA_ID and exclude_root):
266
            return []
267

    
268
        current_cert = start_cert
269
        chain_of_trust = [current_cert]
270

    
271
        # TODO could possibly be simplified
272
        if start_cert.type_id == ROOT_CA_ID:
273
            # the first cert found is a root ca
274
            return chain_of_trust
275

    
276
        while True:
277
            parent_cert = self.certificate_repository.read(current_cert.parent_id)
278

    
279
            # check whether parent certificate exists
280
            if parent_cert is None:
281
                break
282

    
283
            # check whether the found certificate is a root certificate
284
            if parent_cert.type_id == ROOT_CA_ID:
285
                if not exclude_root:
286
                    # append the found root cert only if root certificates should not be excluded from the CoT
287
                    chain_of_trust.append(parent_cert)
288
                break
289

    
290
            # append the certificate
291
            chain_of_trust.append(parent_cert)
292

    
293
            # stop iterating over certificates if the id of the found certificate matches `to_id` method parameter
294
            if parent_cert.certificate_id == to_id:
295
                break
296

    
297
            current_cert = parent_cert
298

    
299
        return chain_of_trust
300

    
301
    def delete_certificate(self, unique_id):
302
        """
303
        Deletes a certificate. Raises an Exception should any unexpected behavior occur.
304

    
305
        :param unique_id: ID of specific certificate
306
        """
307

    
308
        Logger.debug("Function launched.")
309

    
310
        to_delete = self.certificate_repository.get_all_descendants_of(unique_id)
311
        if to_delete is None:
312
            Logger.error(f"No such certificate found 'ID = {unique_id}'.")
313
            raise CertificateNotFoundException(unique_id)
314

    
315
        for cert in to_delete:
316
            try:
317
                self.set_certificate_revocation_status(cert.certificate_id, STATUS_REVOKED)
318
            except CertificateAlreadyRevokedException:
319
                Logger.info(f"Certificate already revoked 'ID = {unique_id}'.")
320
                # TODO log as an info/debug, not warning or above <-- perfectly legal
321
                pass
322

    
323
            if not self.certificate_repository.delete(cert.certificate_id):
324
                Logger.error(f"The certificate has not been deleted 'ID = {cert.certificate_id}'.")
325

    
326

    
327
    def get_certificates_issued_by(self, unique_id):
328
        """
329
        Returns a list of all children of a certificate identified by an unique_id.
330
        Raises a DatabaseException should any unexpected behavior occur.
331
        :param unique_id: target certificate ID
332
        :return: children of unique_id
333
        """
334

    
335
        Logger.debug("Function launched.")
336

    
337
        try:
338
            if self.certificate_repository.read(unique_id) is None:
339
                Logger.error(f"No such certificate found 'ID = {unique_id}'.")
340
                raise CertificateNotFoundException(unique_id)
341
        except DatabaseException:
342
            Logger.error(f"No such certificate found 'ID = {unique_id}'.")
343
            raise CertificateNotFoundException(unique_id)
344

    
345
        return self.certificate_repository.get_all_issued_by(unique_id)
346

    
347
    def set_certificate_revocation_status(self, id, status, reason="unspecified"):
348
        """
349
        Set certificate status to 'valid' or 'revoked'.
350
        If the new status is revoked a reason can be provided -> default is unspecified
351
        :param reason: reason for revocation
352
        :param id: identifier of the certificate whose status is to be changed
353
        :param status: new status of the certificate
354
        :raises CertificateStatusInvalidException: if status is not valid
355
        :raises RevocationReasonInvalidException: if reason is not valid
356
        :raises CertificateNotFoundException: if certificate with given id cannot be found
357
        :raises CertificateCannotBeSetToValid: if certificate was already revoked and not on hold,
358
                it cannot be set revalidated
359
        :raises CertificateAlreadyRevokedException: if caller tries to revoke a certificate that is already revoked
360
        :raises UnknownException: if the database is corrupted
361
        """
362

    
363
        Logger.debug("Function launched.")
364

    
365
        if status not in CERTIFICATE_STATES:
366
            Logger.error(f"Wrong parameter, invalid status '{status}'.")
367
            raise CertificateStatusInvalidException(status)
368
        if reason not in CERTIFICATE_REVOCATION_REASONS:
369
            Logger.error(f"Wrong parameter, invalid reason '{reason}'.")
370
            raise RevocationReasonInvalidException(reason)
371

    
372
        # check whether the certificate exists
373
        certificate = self.certificate_repository.read(id)
374
        if certificate is None:
375
            Logger.error(f"No such certificate found 'ID = {id}'.")
376
            raise CertificateNotFoundException(id)
377

    
378
        updated = False
379
        if status == STATUS_VALID:
380
            # if the certificate is revoked but the reason is not certificateHold, it cannot be re-validated
381
            #    -> throw an exception
382
            if certificate.revocation_reason != "" and \
383
               certificate.revocation_reason != CERTIFICATE_REVOCATION_REASON_HOLD:
384
                raise CertificateCannotBeSetToValid(certificate.revocation_reason)
385
            updated = self.certificate_repository.clear_certificate_revocation(id)
386
        elif status == STATUS_REVOKED:
387
            # check if the certificate is not revoked already
388
            revoked = self.certificate_repository.get_all_revoked_by(certificate.parent_id)
389
            if certificate.certificate_id in [x.certificate_id for x in revoked]:
390
                Logger.error(f"Certificate already revoked 'ID = {id}'.")
391
                raise CertificateAlreadyRevokedException(id)
392

    
393
            revocation_timestamp = int(time.time())
394
            updated = self.certificate_repository.set_certificate_revoked(id, str(revocation_timestamp), reason)
395

    
396
        if not updated:
397
            Logger.error(f"Repository returned 'false' from clear_certificate_revocation() "
398
                         f"or set_certificate_revoked().")
399
            raise UnknownException("Repository returned 'false' from clear_certificate_revocation() "
400
                                   "or set_certificate_revoked().")
401

    
402
    def get_subject_from_certificate(self, certificate: Certificate) -> Subject:
403
        """
404
        Get Subject distinguished name from a Certificate
405
        :param certificate: certificate instance whose Subject shall be parsed
406
        :return: instance of Subject class containing resulting distinguished name
407
        """
408

    
409
        Logger.debug("Function launched.")
410

    
411
        subject = Subject(certificate.common_name,
412
                          certificate.country_code,
413
                          certificate.locality,
414
                          certificate.province,
415
                          certificate.organization,
416
                          certificate.organizational_unit,
417
                          certificate.email_address)
418
        return subject
419

    
420
    def get_public_key_from_certificate(self, certificate: Certificate):
421
        """
422
        Extracts a public key from the given certificate
423
        :param certificate: an instance of the Certificate class containing the certificate from which a public key
424
        should be extracted.
425
        :return: a string containing the extracted public key in PEM format
426
        """
427

    
428
        Logger.debug("Function launched.")
429

    
430
        return self.cryptography_service.extract_public_key_from_certificate(certificate.pem_data)
431

    
432
    def get_certificate_state(self, id: int) -> str:
433
        """
434
        Check whether the certificate is expired, valid or revoked.
435
            - in case it's revoked and expired, revoked is returned
436
        :param id: identifier of the certificate
437
        :return: certificates state from {valid, revoked, expired}
438
        :raises CertificateNotFoundException: in case id of non-existing certificate is entered
439
        """
440
        Logger.debug("Function launched.")
441
        status = CERTIFICATE_VALID
442

    
443
        # Read the selected certificate from the repository
444
        certificate = self.certificate_repository.read(id)
445
        if certificate is None:
446
            Logger.error("Certificate whose details were requested does not exists.")
447
            raise CertificateNotFoundException(id)
448

    
449
        # check the expiration date using OpenSSL
450
        if not self.cryptography_service.verify_cert(certificate.pem_data):
451
            status = CERTIFICATE_EXPIRED
452

    
453
        # check certificate revocation
454
        all_revoked_by_parent = self.certificate_repository.get_all_revoked_by(certificate.parent_id)
455
        all_revoked_by_parent_ids = [cert.certificate_id for cert in all_revoked_by_parent]
456

    
457
        if id in all_revoked_by_parent_ids:
458
            status = CERTIFICATE_REVOKED
459

    
460
        return status
461

    
462

    
463
    def __get_crl_endpoint(self, ca_identifier: int) -> str:
464
        """
465
        Get URL address of CRL distribution endpoint based on
466
        issuer's ID
467

    
468
        :param ca_identifier: ID of issuing authority
469
        :return: CRL endpoint for the given CA
470
        """
471

    
472
        Logger.debug("Function launched.")
473

    
474
        return self.configuration.base_server_url + "/api/crl/" + str(ca_identifier)
475

    
476
    def __get_ocsp_endpoint(self, ca_identifier: int) -> str:
477
        """
478
        Get URL address of OCSP distribution endpoint based on
479
        issuer's ID
480

    
481
        :param ca_identifier: ID of issuing authority
482
        :return: OCSP endpoint for the given CA
483
        """
484

    
485
        Logger.debug("Function launched.")
486

    
487
        return self.configuration.base_server_url + "/api/ocsp/" + str(ca_identifier)
488

    
489

    
490
class RevocationReasonInvalidException(Exception):
491
    """
492
    Exception that denotes that the caller was trying to revoke
493
    a certificate using an invalid revocation reason
494
    """
495

    
496
    def __init__(self, reason):
497
        self.reason = reason
498

    
499
    def __str__(self):
500
        return f"Revocation reason '{self.reason}' is not valid."
501

    
502

    
503
class CertificateStatusInvalidException(Exception):
504
    """
505
    Exception that denotes that the caller was trying to set
506
    a certificate to an invalid state
507
    """
508

    
509
    def __init__(self, status):
510
        self.status = status
511

    
512
    def __str__(self):
513
        return f"Certificate status '{self.status}' is not valid."
514

    
515

    
516
class CertificateAlreadyRevokedException(Exception):
517
    """
518
    Exception that denotes that the caller was trying to revoke
519
    a certificate that is already revoked
520
    """
521

    
522
    def __init__(self, id):
523
        self.id = id
524

    
525
    def __str__(self):
526
        return f"Certificate id '{self.id}' is already revoked."
527

    
528

    
529
class CertificateCannotBeSetToValid(Exception):
530
    """
531
    Exception that denotes that the caller was trying to
532
    set certificate to valid if the certificate was already
533
    revoked but not certificateHold.
534
    """
535

    
536
    def __init__(self, old_reason):
537
        self.old_state = old_reason
538

    
539
    def __str__(self):
540
        return "Cannot set revoked certificate back to valid when the certificate revocation reason is not " \
541
               "certificateHold. " \
542
               f"The revocation reason of the certificate is {self.old_state}"
(2-2/4)