Projekt

Obecné

Profil

Stáhnout (21.7 KB) Statistiky
| Větev: | Tag: | Revize:
1
from typing import List
2

    
3
from injector import inject
4

    
5
from src.config.configuration import Configuration
6
from src.constants import ROOT_CA_ID, INTERMEDIATE_CA_ID, CA_ID, CERTIFICATE_ID, CERTIFICATE_STATES, \
7
    CERTIFICATE_REVOCATION_REASONS, SSL_ID, SIGNATURE_ID, AUTHENTICATION_ID, CERTIFICATE_REVOCATION_REASON_HOLD
8
from src.dao.certificate_repository import CertificateRepository
9
from src.exceptions.certificate_not_found_exception import CertificateNotFoundException
10
from src.exceptions.database_exception import DatabaseException
11
from src.exceptions.unknown_exception import UnknownException
12
from src.model.certificate import Certificate
13
from src.model.private_key import PrivateKey
14
from src.model.subject import Subject
15
from src.services.cryptography import CryptographyService
16

    
17
import time
18

    
19
from src.utils.usages_to_extensions import usages_to_extension_lines, ExtensionFieldFlags, CRITICAL, KEY_CERT_SIGN, \
20
    CRL_SIGN, CA, DIGITAL_SIGNATURE, KEY_ENCIPHERMENT, KEY_AGREEMENT, SERVER_AUTH, NON_REPUDIATION, TIME_STAMPING, \
21
    CLIENT_AUTH
22

    
23
VALID_FROM_TO_DATE_FORMAT = "%d.%m.%Y %H:%M:%S"
24
CA_EXTENSIONS = "basicConstraints=critical,CA:TRUE"
25
CRL_EXTENSION = "crlDistributionPoints=URI:"
26
OCSP_EXTENSION = "authorityInfoAccess=OCSP;URI:"
27
STATUS_REVOKED = "revoked"
28
STATUS_VALID = "valid"
29

    
30
# define which flags are required for various usages
31
REQUIRED_USAGE_EXTENSION_FLAGS = {
32
    CA_ID: ExtensionFieldFlags({CRITICAL, KEY_CERT_SIGN, CRL_SIGN}, {}, {CRITICAL, CA}),
33
    SSL_ID: ExtensionFieldFlags({DIGITAL_SIGNATURE, KEY_ENCIPHERMENT, KEY_AGREEMENT}, {SERVER_AUTH}, {}),
34
    SIGNATURE_ID: ExtensionFieldFlags({DIGITAL_SIGNATURE, NON_REPUDIATION}, {}, {}),
35
    AUTHENTICATION_ID: ExtensionFieldFlags({DIGITAL_SIGNATURE}, {CLIENT_AUTH}, {})}
36

    
37

    
38
class CertificateService:
39

    
40
    @inject
41
    def __init__(self, cryptography_service: CryptographyService,
42
                 certificate_repository: CertificateRepository,
43
                 configuration: Configuration):
44
        self.cryptography_service = cryptography_service
45
        self.certificate_repository = certificate_repository
46
        self.configuration = configuration
47

    
48
    # TODO usages present in method parameters but not in class diagram
49
    def create_root_ca(self, key: PrivateKey, subject: Subject, extensions: str = "", config: str = "",
50
                       usages=None, days=30):
51
        """
52
        Creates a root CA certificate based on the given parameters.
53
        :param key: Private key to be used when generating the certificate
54
        :param subject: Subject to be used put into the certificate
55
        :param config: String containing the configuration to be used
56
        :param extensions: Name of the section in the configuration representing extensions
57
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
58
        :param days: Number of days for which the generated cert. will be considered valid
59
        :return: An instance of Certificate class representing the generated root CA cert
60
        """
61
        if usages is None:
62
            usages = {}
63

    
64
        cert_id = self.certificate_repository.get_next_id()
65

    
66
        # specify CA usage
67
        usages[CA_ID] = True
68

    
69
        # generate extension configuration lines based on the specified usages
70
        extensions = extensions + "\n" + "\n".join(usages_to_extension_lines(usages, REQUIRED_USAGE_EXTENSION_FLAGS))
71

    
72
        # create a new self signed  certificate
73
        cert_pem = self.cryptography_service.create_sscrt(subject, key.private_key, key_pass=key.password,
74
                                                          extensions=extensions, config=config, days=days, sn=cert_id)
75

    
76
        # wrap into Certificate class
77
        certificate = self.__create_wrapper(cert_pem, key.private_key_id, usages, 0,
78
                                            ROOT_CA_ID)
79

    
80
        # store the wrapper into the repository
81
        created_id = self.certificate_repository.create(certificate)
82

    
83
        # assign the generated ID to the inserted certificate
84
        certificate.certificate_id = created_id
85

    
86
        return certificate
87

    
88
    def __create_wrapper(self, cert_pem, private_key_id, usages, parent_id, cert_type):
89
        """
90
        Wraps the given parameters using the Certificate class. Uses CryptographyService to find out the notBefore and
91
        notAfter fields.
92
        :param cert_pem: PEM of the cert. to be wrapped
93
        :param private_key_id: ID of the private key used to create the given certificate
94
        :param usages: A dictionary containing usages of the generated certificate generated (see constants.py)
95
        :param parent_id: ID of the CA that issued this certificate
96
        :param cert_type: Type of this certificate (see constants.py)
97
        :return: An instance of the Certificate class wrapping the values passed  via method parameters
98
        """
99
        # parse the generated pem for subject and notBefore/notAfter fields
100
        # TODO this could be improved in the future in such way that calling openssl is not required to parse the dates
101
        subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem)
102
        # format the parsed date
103
        not_before_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_before)
104
        not_after_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_after)
105

    
106
        # create a certificate wrapper
107
        certificate = Certificate(-1, subj.common_name, not_before_formatted, not_after_formatted, cert_pem,
108
                                  private_key_id, cert_type, parent_id, usages)
109

    
110
        return certificate
111

    
112
    # TODO config parameter present in class diagram but not here (unused)
113
    def create_ca(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate, issuer_key: PrivateKey,
114
                  extensions: str = "", days: int = 30, usages=None):
115
        """
116
        Creates an intermediate CA certificate issued by the given parent CA.
117
        :param subject_key: Private key to be used when generating the certificate
118
        :param subject: Subject to be used put into the certificate
119
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
120
        :param issuer_key: PK used to generate the issuer certificate
121
        :param extensions: Extensions to be used when generating the certificate
122
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
123
        :param days: Number of days for which the generated cert. will be considered valid
124
        :return: An instance of Certificate class representing the generated intermediate CA cert
125
        """
126
        if usages is None:
127
            usages = {}
128

    
129
        # specify CA usage
130
        usages[CA_ID] = True
131

    
132
        # generate extension configuration lines based on the specified usages
133
        extensions = extensions + "\n" + "\n".join(usages_to_extension_lines(usages, REQUIRED_USAGE_EXTENSION_FLAGS))
134

    
135
        # Add CRL and OCSP distribution point to certificate extensions
136
        cert_id = self.certificate_repository.get_next_id()
137
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(issuer_cert.certificate_id)
138
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(issuer_cert.certificate_id)
139

    
140
        # TODO implement AIA URI via extensions
141
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
142
                                                        issuer_key.private_key,
143
                                                        subject_key_pass=subject_key.password,
144
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
145
                                                        days=days,
146
                                                        sn=cert_id)
147

    
148
        # wrap into Certificate class
149
        self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
150
                              issuer_cert.certificate_id, INTERMEDIATE_CA_ID)
151

    
152
        # parse the generated pem for subject and notBefore/notAfter fields
153
        subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem)
154

    
155
        # format the parsed date
156
        not_before_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_before)
157
        not_after_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_after)
158

    
159
        # create a certificate wrapper
160
        certificate = Certificate(-1, subject.common_name, not_before_formatted, not_after_formatted, cert_pem,
161
                                  subject_key.private_key_id, INTERMEDIATE_CA_ID, issuer_cert.certificate_id, usages)
162

    
163
        # store the wrapper into the repository
164
        created_id = self.certificate_repository.create(certificate)
165

    
166
        # assign the generated ID to the inserted certificate
167
        certificate.certificate_id = created_id
168

    
169
        return certificate
170

    
171
    def create_end_cert(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate,
172
                        issuer_key: PrivateKey,
173
                        extensions: str = "", days: int = 30, usages=None):
174
        """
175
        Creates an end certificate issued by the given parent CA.
176
        :param subject_key: Private key to be used when generating the certificate
177
        :param subject: Subject to be used put into the certificate
178
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
179
        :param issuer_key: PK used to generate the issuer certificate
180
        :param extensions: Extensions to be used when generating the certificate
181
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
182
        :param days: Number of days for which the generated cert. will be considered valid
183
        :return: An instance of Certificate class representing the generated cert
184
        """
185
        if usages is None:
186
            usages = {}
187

    
188
        # get the next certificate ID in order to be able to specify the serial number
189
        cert_id = self.certificate_repository.get_next_id()
190

    
191
        # generate extension configuration lines based on the specified usages
192
        extensions = extensions + "\n" + "\n".join(usages_to_extension_lines(usages, REQUIRED_USAGE_EXTENSION_FLAGS))
193

    
194
        # Add CRL and OCSP distribution point to certificate extensions
195
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(issuer_cert.certificate_id)
196
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(issuer_cert.certificate_id)
197

    
198
        # generate a new certificate
199
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
200
                                                        issuer_key.private_key,
201
                                                        subject_key_pass=subject_key.password,
202
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
203
                                                        days=days,
204
                                                        sn=cert_id
205
                                                        )
206

    
207
        # wrap the generated certificate using Certificate class
208
        certificate = self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
209
                                            issuer_cert.certificate_id, CERTIFICATE_ID)
210

    
211
        created_id = self.certificate_repository.create(certificate)
212

    
213
        certificate.certificate_id = created_id
214

    
215
        return certificate
216

    
217
    def get_certificate(self, unique_id: int) -> Certificate:
218
        """
219
        Tries to fetch a certificate from the certificate repository using a given id.
220
        :param unique_id: ID of the certificate to be fetched
221
        :return: Instance of the Certificate class containing a certificate with the given id or `None` if such
222
        certificate is not found
223
        """
224
        return self.certificate_repository.read(unique_id)
225

    
226
    def get_certificates(self, cert_type=None) -> List[Certificate]:
227
        """
228
        Tries to fetch a list of all certificates from the certificate repository. Using the `cert_type` parameter only
229
        certificates of the given type can be returned.
230
        :param cert_type: Type of certificates to be returned
231
        :return: List of instances of the Certificate class representing all certificates present in the certificate
232
        repository. An empty list is returned when no certificates are found.
233
        """
234
        return self.certificate_repository.read_all(cert_type)
235

    
236
    def get_chain_of_trust(self, from_id: int, to_id: int = -1, exclude_root=True) -> List[Certificate]:
237
        """
238
        Traverses the certificate hierarchy tree upwards till a certificate with the `to_id` ID is found or till a
239
        root CA certificate is found. Root certificates are excluded from the chain by default.
240
        :param from_id: ID of the first certificate to be included in the chain of trust
241
        :param to_id: ID of the last certificate to be included in the chain of trust
242
        :param exclude_root: a flag indicating whether root CA certificate should be excluded
243
        :return: a list of certificates representing the chain of trust starting with the certificate given by `from_id`
244
        ID
245
        """
246
        # read the first certificate of the chain
247
        start_cert = self.certificate_repository.read(from_id)
248

    
249
        # if no cert is found or the current cert is root CA and root CAs should be excluded, then return an empty list
250
        if start_cert is None or (start_cert.type_id == ROOT_CA_ID and exclude_root):
251
            return []
252

    
253
        current_cert = start_cert
254
        chain_of_trust = [current_cert]
255

    
256
        # TODO could possibly be simplified
257
        if start_cert.type_id == ROOT_CA_ID:
258
            # the first cert found is a root ca
259
            return chain_of_trust
260

    
261
        while True:
262
            parent_cert = self.certificate_repository.read(current_cert.parent_id)
263

    
264
            # check whether parent certificate exists
265
            if parent_cert is None:
266
                break
267

    
268
            # check whether the found certificate is a root certificate
269
            if parent_cert.type_id == ROOT_CA_ID:
270
                if not exclude_root:
271
                    # append the found root cert only if root certificates should not be excluded from the CoT
272
                    chain_of_trust.append(parent_cert)
273
                break
274

    
275
            # append the certificate
276
            chain_of_trust.append(parent_cert)
277

    
278
            # stop iterating over certificates if the id of the found certificate matches `to_id` method parameter
279
            if parent_cert.certificate_id == to_id:
280
                break
281

    
282
            current_cert = parent_cert
283

    
284
        return chain_of_trust
285

    
286
    def delete_certificate(self, unique_id):
287
        """
288
        Deletes a certificate. Raises an Exception should any unexpected behavior occur.
289

    
290
        :param unique_id: ID of specific certificate
291
        """
292

    
293
        to_delete = self.certificate_repository.get_all_descendants_of(unique_id)
294
        if to_delete is None:
295
            raise CertificateNotFoundException(unique_id)
296

    
297
        for cert in to_delete:
298
            try:
299
                self.set_certificate_revocation_status(cert.certificate_id, STATUS_REVOKED)
300
            except CertificateAlreadyRevokedException:
301
                # TODO log as an info/debug, not warning or above <-- perfectly legal
302
                pass
303

    
304
            self.certificate_repository.delete(cert.certificate_id)
305
            # TODO log if not successfully deleted
306

    
307
    def get_certificates_issued_by(self, unique_id):
308
        """
309
        Returns a list of all children of a certificate identified by an unique_id.
310
        Raises a DatabaseException should any unexpected behavior occur.
311
        :param unique_id: target certificate ID
312
        :return: children of unique_id
313
        """
314
        try:
315
            if self.certificate_repository.read(unique_id) is None:
316
                raise CertificateNotFoundException(unique_id)
317
        except DatabaseException:
318
            raise CertificateNotFoundException(unique_id)
319

    
320
        return self.certificate_repository.get_all_issued_by(unique_id)
321

    
322
    def set_certificate_revocation_status(self, id, status, reason="unspecified"):
323
        """
324
        Set certificate status to 'valid' or 'revoked'.
325
        If the new status is revoked a reason can be provided -> default is unspecified
326
        :param reason: reason for revocation
327
        :param id: identifier of the certificate whose status is to be changed
328
        :param status: new status of the certificate
329
        :raises CertificateStatusInvalidException: if status is not valid
330
        :raises RevocationReasonInvalidException: if reason is not valid
331
        :raises CertificateNotFoundException: if certificate with given id cannot be found
332
        :raises CertificateCannotBeSetToValid: if certificate was already revoked and not on hold,
333
                it cannot be set revalidated
334
        :raises CertificateAlreadyRevokedException: if caller tries to revoke a certificate that is already revoked
335
        :raises UnknownException: if the database is corrupted
336
        """
337
        if status not in CERTIFICATE_STATES:
338
            raise CertificateStatusInvalidException(status)
339
        if reason not in CERTIFICATE_REVOCATION_REASONS:
340
            raise RevocationReasonInvalidException(reason)
341

    
342
        # check whether the certificate exists
343
        certificate = self.certificate_repository.read(id)
344
        if certificate is None:
345
            raise CertificateNotFoundException(id)
346

    
347
        updated = False
348
        if status == STATUS_VALID:
349
            # if the certificate is revoked but the reason is not certificateHold, it cannot be re-validated
350
            #    -> throw an exception
351
            if certificate.revocation_reason != "" and \
352
               certificate.revocation_reason != CERTIFICATE_REVOCATION_REASON_HOLD:
353
                raise CertificateCannotBeSetToValid(certificate.revocation_reason)
354
            updated = self.certificate_repository.clear_certificate_revocation(id)
355
        elif status == STATUS_REVOKED:
356
            # check if the certificate is not revoked already
357
            revoked = self.certificate_repository.get_all_revoked_by(certificate.parent_id)
358
            if certificate.certificate_id in [x.certificate_id for x in revoked]:
359
                raise CertificateAlreadyRevokedException(id)
360

    
361
            revocation_timestamp = int(time.time())
362
            updated = self.certificate_repository.set_certificate_revoked(id, str(revocation_timestamp), reason)
363

    
364
        if not updated:
365
            # TODO log this
366
            raise UnknownException("Repository returned 'false' from clear_certificate_revocation() "
367
                                   "or set_certificate_revoked().")
368

    
369
    def get_subject_from_certificate(self, certificate: Certificate) -> Subject:
370
        """
371
        Get Subject distinguished name from a Certificate
372
        :param certificate: certificate instance whose Subject shall be parsed
373
        :return: instance of Subject class containing resulting distinguished name
374
        """
375
        (subject, _, _) = self.cryptography_service.parse_cert_pem(certificate.pem_data)
376
        return subject
377

    
378
    def get_public_key_from_certificate(self, certificate: Certificate):
379
        """
380
        Extracts a public key from the given certificate
381
        :param certificate: an instance of the Certificate class containing the certificate from which a public key
382
        should be extracted.
383
        :return: a string containing the extracted public key in PEM format
384
        """
385
        return self.cryptography_service.extract_public_key_from_certificate(certificate.pem_data)
386

    
387
    def __get_crl_endpoint(self, ca_identifier: int) -> str:
388
        """
389
        Get URL address of CRL distribution endpoint based on
390
        issuer's ID
391

    
392
        :param ca_identifier: ID of issuing authority
393
        :return: CRL endpoint for the given CA
394
        """
395
        return self.configuration.base_server_url + "/api/crl/" + str(ca_identifier)
396

    
397
    def __get_ocsp_endpoint(self, ca_identifier: int) -> str:
398
        """
399
        Get URL address of OCSP distribution endpoint based on
400
        issuer's ID
401

    
402
        :param ca_identifier: ID of issuing authority
403
        :return: OCSP endpoint for the given CA
404
        """
405
        return self.configuration.base_server_url + "/api/ocsp/" + str(ca_identifier)
406

    
407

    
408
class RevocationReasonInvalidException(Exception):
409
    """
410
    Exception that denotes that the caller was trying to revoke
411
    a certificate using an invalid revocation reason
412
    """
413

    
414
    def __init__(self, reason):
415
        self.reason = reason
416

    
417
    def __str__(self):
418
        return f"Revocation reason '{self.reason}' is not valid."
419

    
420

    
421
class CertificateStatusInvalidException(Exception):
422
    """
423
    Exception that denotes that the caller was trying to set
424
    a certificate to an invalid state
425
    """
426

    
427
    def __init__(self, status):
428
        self.status = status
429

    
430
    def __str__(self):
431
        return f"Certificate status '{self.status}' is not valid."
432

    
433

    
434
class CertificateAlreadyRevokedException(Exception):
435
    """
436
    Exception that denotes that the caller was trying to revoke
437
    a certificate that is already revoked
438
    """
439

    
440
    def __init__(self, id):
441
        self.id = id
442

    
443
    def __str__(self):
444
        return f"Certificate id '{self.id}' is already revoked."
445

    
446

    
447
class CertificateCannotBeSetToValid(Exception):
448
    """
449
    Exception that denotes that the caller was trying to
450
    set certificate to valid if the certificate was already
451
    revoked but not certificateHold.
452
    """
453

    
454
    def __init__(self, old_reason):
455
        self.old_state = old_reason
456

    
457
    def __str__(self):
458
        return "Cannot set revoked certificate back to valid when the certificate revocation reason is not " \
459
               "certificateHold. " \
460
               f"The revocation reason of the certificate is {self.old_state}"
(2-2/4)