Projekt

Obecné

Profil

Stáhnout (22.1 KB) Statistiky
| Větev: | Tag: | Revize:
1
from typing import List
2

    
3
from injector import inject
4

    
5
from src.config.configuration import Configuration
6
from src.constants import ROOT_CA_ID, INTERMEDIATE_CA_ID, CA_ID, CERTIFICATE_ID, CERTIFICATE_STATES, \
7
    CERTIFICATE_REVOCATION_REASONS, SSL_ID, SIGNATURE_ID, AUTHENTICATION_ID
8
from src.dao.certificate_repository import CertificateRepository
9
from src.exceptions.certificate_not_found_exception import CertificateNotFoundException
10
from src.exceptions.database_exception import DatabaseException
11
from src.exceptions.unknown_exception import UnknownException
12
from src.model.certificate import Certificate
13
from src.model.private_key import PrivateKey
14
from src.model.subject import Subject
15
from src.services.cryptography import CryptographyService
16

    
17
import time
18

    
19
from src.utils.usages_to_extensions import usages_to_extension_lines, ExtensionFieldFlags, CRITICAL, KEY_CERT_SIGN, \
20
    CRL_SIGN, CA, DIGITAL_SIGNATURE, KEY_ENCIPHERMENT, KEY_AGREEMENT, SERVER_AUTH, NON_REPUDIATION, TIME_STAMPING, \
21
    CLIENT_AUTH
22

    
23
from src.utils.logger import Logger
24

    
25
VALID_FROM_TO_DATE_FORMAT = "%d.%m.%Y %H:%M:%S"
26
CA_EXTENSIONS = "basicConstraints=critical,CA:TRUE"
27
CRL_EXTENSION = "crlDistributionPoints=URI:"
28
OCSP_EXTENSION = "authorityInfoAccess=OCSP;URI:"
29
STATUS_REVOKED = "revoked"
30
STATUS_VALID = "valid"
31

    
32
# define which flags are required for various usages
33
REQUIRED_USAGE_EXTENSION_FLAGS = {
34
    CA_ID: ExtensionFieldFlags({CRITICAL, KEY_CERT_SIGN, CRL_SIGN}, {}, {CRITICAL, CA}),
35
    SSL_ID: ExtensionFieldFlags({DIGITAL_SIGNATURE, KEY_ENCIPHERMENT, KEY_AGREEMENT}, {SERVER_AUTH}, {}),
36
    SIGNATURE_ID: ExtensionFieldFlags({DIGITAL_SIGNATURE, NON_REPUDIATION}, {}, {}),
37
    AUTHENTICATION_ID: ExtensionFieldFlags({DIGITAL_SIGNATURE}, {CLIENT_AUTH}, {})}
38

    
39

    
40
class CertificateService:
41

    
42
    @inject
43
    def __init__(self, cryptography_service: CryptographyService,
44
                 certificate_repository: CertificateRepository,
45
                 configuration: Configuration):
46
        self.cryptography_service = cryptography_service
47
        self.certificate_repository = certificate_repository
48
        self.configuration = configuration
49

    
50
    # TODO usages present in method parameters but not in class diagram
51
    def create_root_ca(self, key: PrivateKey, subject: Subject, extensions: str = "", config: str = "",
52
                       usages=None, days=30):
53
        """
54
        Creates a root CA certificate based on the given parameters.
55
        :param key: Private key to be used when generating the certificate
56
        :param subject: Subject to be used put into the certificate
57
        :param config: String containing the configuration to be used
58
        :param extensions: Name of the section in the configuration representing extensions
59
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
60
        :param days: Number of days for which the generated cert. will be considered valid
61
        :return: An instance of Certificate class representing the generated root CA cert
62
        """
63

    
64
        Logger.debug("Function launched.")
65

    
66
        if usages is None:
67
            usages = {}
68

    
69
        cert_id = self.certificate_repository.get_next_id()
70

    
71
        # specify CA usage
72
        usages[CA_ID] = True
73

    
74
        # generate extension configuration lines based on the specified usages
75
        extensions = extensions + "\n" + "\n".join(usages_to_extension_lines(usages, REQUIRED_USAGE_EXTENSION_FLAGS))
76

    
77
        # create a new self signed  certificate
78
        cert_pem = self.cryptography_service.create_sscrt(subject, key.private_key, key_pass=key.password,
79
                                                          extensions=extensions, config=config, days=days, sn=cert_id)
80

    
81
        # wrap into Certificate class
82
        certificate = self.__create_wrapper(cert_pem, key.private_key_id, usages, 0,
83
                                            ROOT_CA_ID)
84

    
85
        # store the wrapper into the repository
86
        created_id = self.certificate_repository.create(certificate)
87

    
88
        # assign the generated ID to the inserted certificate
89
        certificate.certificate_id = created_id
90

    
91
        return certificate
92

    
93
    def __create_wrapper(self, cert_pem, private_key_id, usages, parent_id, cert_type):
94
        """
95
        Wraps the given parameters using the Certificate class. Uses CryptographyService to find out the notBefore and
96
        notAfter fields.
97
        :param cert_pem: PEM of the cert. to be wrapped
98
        :param private_key_id: ID of the private key used to create the given certificate
99
        :param usages: A dictionary containing usages of the generated certificate generated (see constants.py)
100
        :param parent_id: ID of the CA that issued this certificate
101
        :param cert_type: Type of this certificate (see constants.py)
102
        :return: An instance of the Certificate class wrapping the values passed  via method parameters
103
        """
104

    
105
        Logger.debug("Function launched.")
106

    
107
        # parse the generated pem for subject and notBefore/notAfter fields
108
        # TODO this could be improved in the future in such way that calling openssl is not required to parse the dates
109
        subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem)
110
        # format the parsed date
111
        not_before_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_before)
112
        not_after_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_after)
113

    
114
        # create a certificate wrapper
115
        certificate = Certificate(-1, subj.common_name, not_before_formatted, not_after_formatted, cert_pem,
116
                                  private_key_id, cert_type, parent_id, usages)
117

    
118
        return certificate
119

    
120
    # TODO config parameter present in class diagram but not here (unused)
121
    def create_ca(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate, issuer_key: PrivateKey,
122
                  extensions: str = "", days: int = 30, usages=None):
123
        """
124
        Creates an intermediate CA certificate issued by the given parent CA.
125
        :param subject_key: Private key to be used when generating the certificate
126
        :param subject: Subject to be used put into the certificate
127
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
128
        :param issuer_key: PK used to generate the issuer certificate
129
        :param extensions: Extensions to be used when generating the certificate
130
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
131
        :param days: Number of days for which the generated cert. will be considered valid
132
        :return: An instance of Certificate class representing the generated intermediate CA cert
133
        """
134

    
135
        Logger.debug("Function launched.")
136

    
137
        if usages is None:
138
            usages = {}
139

    
140
        # specify CA usage
141
        usages[CA_ID] = True
142

    
143
        # generate extension configuration lines based on the specified usages
144
        extensions = extensions + "\n" + "\n".join(usages_to_extension_lines(usages, REQUIRED_USAGE_EXTENSION_FLAGS))
145

    
146
        # Add CRL and OCSP distribution point to certificate extensions
147
        cert_id = self.certificate_repository.get_next_id()
148
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(issuer_cert.certificate_id)
149
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(issuer_cert.certificate_id)
150

    
151
        # TODO implement AIA URI via extensions
152
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
153
                                                        issuer_key.private_key,
154
                                                        subject_key_pass=subject_key.password,
155
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
156
                                                        days=days,
157
                                                        sn=cert_id)
158

    
159
        # wrap into Certificate class
160
        self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
161
                              issuer_cert.certificate_id, INTERMEDIATE_CA_ID)
162

    
163
        # parse the generated pem for subject and notBefore/notAfter fields
164
        subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem)
165

    
166
        # format the parsed date
167
        not_before_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_before)
168
        not_after_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_after)
169

    
170
        # create a certificate wrapper
171
        certificate = Certificate(-1, subject.common_name, not_before_formatted, not_after_formatted, cert_pem,
172
                                  subject_key.private_key_id, INTERMEDIATE_CA_ID, issuer_cert.certificate_id, usages)
173

    
174
        # store the wrapper into the repository
175
        created_id = self.certificate_repository.create(certificate)
176

    
177
        # assign the generated ID to the inserted certificate
178
        certificate.certificate_id = created_id
179

    
180
        return certificate
181

    
182
    def create_end_cert(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate,
183
                        issuer_key: PrivateKey,
184
                        extensions: str = "", days: int = 30, usages=None):
185
        """
186
        Creates an end certificate issued by the given parent CA.
187
        :param subject_key: Private key to be used when generating the certificate
188
        :param subject: Subject to be used put into the certificate
189
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
190
        :param issuer_key: PK used to generate the issuer certificate
191
        :param extensions: Extensions to be used when generating the certificate
192
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
193
        :param days: Number of days for which the generated cert. will be considered valid
194
        :return: An instance of Certificate class representing the generated cert
195
        """
196

    
197
        Logger.debug("Function launched.")
198

    
199
        if usages is None:
200
            usages = {}
201

    
202
        # get the next certificate ID in order to be able to specify the serial number
203
        cert_id = self.certificate_repository.get_next_id()
204

    
205
        # generate extension configuration lines based on the specified usages
206
        extensions = extensions + "\n" + "\n".join(usages_to_extension_lines(usages, REQUIRED_USAGE_EXTENSION_FLAGS))
207

    
208
        # Add CRL and OCSP distribution point to certificate extensions
209
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(issuer_cert.certificate_id)
210
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(issuer_cert.certificate_id)
211

    
212
        # generate a new certificate
213
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
214
                                                        issuer_key.private_key,
215
                                                        subject_key_pass=subject_key.password,
216
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
217
                                                        days=days,
218
                                                        sn=cert_id
219
                                                        )
220

    
221
        # wrap the generated certificate using Certificate class
222
        certificate = self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
223
                                            issuer_cert.certificate_id, CERTIFICATE_ID)
224

    
225
        created_id = self.certificate_repository.create(certificate)
226

    
227
        certificate.certificate_id = created_id
228

    
229
        return certificate
230

    
231
    def get_certificate(self, unique_id: int) -> Certificate:
232
        """
233
        Tries to fetch a certificate from the certificate repository using a given id.
234
        :param unique_id: ID of the certificate to be fetched
235
        :return: Instance of the Certificate class containing a certificate with the given id or `None` if such
236
        certificate is not found
237
        """
238

    
239
        Logger.debug("Function launched.")
240

    
241
        return self.certificate_repository.read(unique_id)
242

    
243
    def get_certificates(self, cert_type=None) -> List[Certificate]:
244
        """
245
        Tries to fetch a list of all certificates from the certificate repository. Using the `cert_type` parameter only
246
        certificates of the given type can be returned.
247
        :param cert_type: Type of certificates to be returned
248
        :return: List of instances of the Certificate class representing all certificates present in the certificate
249
        repository. An empty list is returned when no certificates are found.
250
        """
251

    
252
        Logger.debug("Function launched.")
253

    
254
        return self.certificate_repository.read_all(cert_type)
255

    
256
    def get_chain_of_trust(self, from_id: int, to_id: int = -1, exclude_root=True) -> List[Certificate]:
257
        """
258
        Traverses the certificate hierarchy tree upwards till a certificate with the `to_id` ID is found or till a
259
        root CA certificate is found. Root certificates are excluded from the chain by default.
260
        :param from_id: ID of the first certificate to be included in the chain of trust
261
        :param to_id: ID of the last certificate to be included in the chain of trust
262
        :param exclude_root: a flag indicating whether root CA certificate should be excluded
263
        :return: a list of certificates representing the chain of trust starting with the certificate given by `from_id`
264
        ID
265
        """
266

    
267
        Logger.debug("Function launched.")
268

    
269
        # read the first certificate of the chain
270
        start_cert = self.certificate_repository.read(from_id)
271

    
272
        # if no cert is found or the current cert is root CA and root CAs should be excluded, then return an empty list
273
        if start_cert is None or (start_cert.type_id == ROOT_CA_ID and exclude_root):
274
            return []
275

    
276
        current_cert = start_cert
277
        chain_of_trust = [current_cert]
278

    
279
        # TODO could possibly be simplified
280
        if start_cert.type_id == ROOT_CA_ID:
281
            # the first cert found is a root ca
282
            return chain_of_trust
283

    
284
        while True:
285
            parent_cert = self.certificate_repository.read(current_cert.parent_id)
286

    
287
            # check whether parent certificate exists
288
            if parent_cert is None:
289
                break
290

    
291
            # check whether the found certificate is a root certificate
292
            if parent_cert.type_id == ROOT_CA_ID:
293
                if not exclude_root:
294
                    # append the found root cert only if root certificates should not be excluded from the CoT
295
                    chain_of_trust.append(parent_cert)
296
                break
297

    
298
            # append the certificate
299
            chain_of_trust.append(parent_cert)
300

    
301
            # stop iterating over certificates if the id of the found certificate matches `to_id` method parameter
302
            if parent_cert.certificate_id == to_id:
303
                break
304

    
305
            current_cert = parent_cert
306

    
307
        return chain_of_trust
308

    
309
    def delete_certificate(self, unique_id):
310
        """
311
        Deletes a certificate. Raises an Exception should any unexpected behavior occur.
312

    
313
        :param unique_id: ID of specific certificate
314
        """
315

    
316
        Logger.debug("Function launched.")
317

    
318
        to_delete = self.certificate_repository.get_all_descendants_of(unique_id)
319
        if to_delete is None:
320
            Logger.error(f"No such certificate found 'ID = {unique_id}'.")
321
            raise CertificateNotFoundException(unique_id)
322

    
323
        for cert in to_delete:
324
            try:
325
                self.set_certificate_revocation_status(cert.certificate_id, STATUS_REVOKED)
326
            except CertificateAlreadyRevokedException:
327
                Logger.info(f"Certificate already revoked 'ID = {unique_id}'.")
328
                # TODO log as an info/debug, not warning or above <-- perfectly legal
329
                pass
330

    
331
            if not self.certificate_repository.delete(cert.certificate_id):
332
                Logger.error(f"The certificate has not been deleted 'ID = {cert.certificate_id}'.")
333

    
334

    
335
    def get_certificates_issued_by(self, unique_id):
336
        """
337
        Returns a list of all children of a certificate identified by an unique_id.
338
        Raises a DatabaseException should any unexpected behavior occur.
339
        :param unique_id: target certificate ID
340
        :return: children of unique_id
341
        """
342

    
343
        Logger.debug("Function launched.")
344

    
345
        try:
346
            if self.certificate_repository.read(unique_id) is None:
347
                Logger.error(f"No such certificate found 'ID = {unique_id}'.")
348
                raise CertificateNotFoundException(unique_id)
349
        except DatabaseException:
350
            Logger.error(f"No such certificate found 'ID = {unique_id}'.")
351
            raise CertificateNotFoundException(unique_id)
352

    
353
        return self.certificate_repository.get_all_issued_by(unique_id)
354

    
355
    def set_certificate_revocation_status(self, id, status, reason="unspecified"):
356
        """
357
        Set certificate status to 'valid' or 'revoked'.
358
        If the new status is revoked a reason can be provided -> default is unspecified
359
        :param reason: reason for revocation
360
        :param id: identifier of the certificate whose status is to be changed
361
        :param status: new status of the certificate
362
        """
363

    
364
        Logger.debug("Function launched.")
365

    
366
        if status not in CERTIFICATE_STATES:
367
            Logger.error(f"Wrong parameter, invalid status '{status}'.")
368
            raise CertificateStatusInvalidException(status)
369
        if reason not in CERTIFICATE_REVOCATION_REASONS:
370
            Logger.error(f"Wrong parameter, invalid reason '{reason}'.")
371
            raise RevocationReasonInvalidException(reason)
372

    
373
        # check whether the certificate exists
374
        certificate = self.certificate_repository.read(id)
375
        if certificate is None:
376
            Logger.error(f"No such certificate found 'ID = {id}'.")
377
            raise CertificateNotFoundException(id)
378

    
379
        updated = False
380
        if status == STATUS_VALID:
381
            updated = self.certificate_repository.clear_certificate_revocation(id)
382
        elif status == STATUS_REVOKED:
383
            # check if the certificate is not revoked already
384
            revoked = self.certificate_repository.get_all_revoked_by(certificate.parent_id)
385
            if certificate.certificate_id in [x.certificate_id for x in revoked]:
386
                Logger.error(f"Certificate already revoked 'ID = {id}'.")
387
                raise CertificateAlreadyRevokedException(id)
388

    
389
            revocation_timestamp = int(time.time())
390
            updated = self.certificate_repository.set_certificate_revoked(id, str(revocation_timestamp), reason)
391

    
392
        if not updated:
393
            Logger.error(f"Repository returned 'false' from clear_certificate_revocation() "
394
                         f"or set_certificate_revoked().")
395
            raise UnknownException("Repository returned 'false' from clear_certificate_revocation() "
396
                                   "or set_certificate_revoked().")
397

    
398
    def get_subject_from_certificate(self, certificate: Certificate) -> Subject:
399
        """
400
        Get Subject distinguished name from a Certificate
401
        :param certificate: certificate instance whose Subject shall be parsed
402
        :return: instance of Subject class containing resulting distinguished name
403
        """
404

    
405
        Logger.debug("Function launched.")
406

    
407
        (subject, _, _) = self.cryptography_service.parse_cert_pem(certificate.pem_data)
408
        return subject
409

    
410
    def get_public_key_from_certificate(self, certificate: Certificate):
411
        """
412
        Extracts a public key from the given certificate
413
        :param certificate: an instance of the Certificate class containing the certificate from which a public key
414
        should be extracted.
415
        :return: a string containing the extracted public key in PEM format
416
        """
417

    
418
        Logger.debug("Function launched.")
419

    
420
        return self.cryptography_service.extract_public_key_from_certificate(certificate.pem_data)
421

    
422
    def get_certificate_state(self, id: int) -> str:
423
        """
424
        Check whether the certificate is expired, valid or revoked.
425
            - in case it's revoked and expired, revoked is returned
426
        :param id: identifier of the certificate
427
        :return: certificates state from {valid, revoked, expired}
428
        :raises CertificateNotFoundException: in case id of non-existing certificate is entered
429
        """
430
        pass
431

    
432
    def __get_crl_endpoint(self, ca_identifier: int) -> str:
433
        """
434
        Get URL address of CRL distribution endpoint based on
435
        issuer's ID
436

    
437
        :param ca_identifier: ID of issuing authority
438
        :return: CRL endpoint for the given CA
439
        """
440

    
441
        Logger.debug("Function launched.")
442

    
443
        return self.configuration.base_server_url + "/api/crl/" + str(ca_identifier)
444

    
445
    def __get_ocsp_endpoint(self, ca_identifier: int) -> str:
446
        """
447
        Get URL address of OCSP distribution endpoint based on
448
        issuer's ID
449

    
450
        :param ca_identifier: ID of issuing authority
451
        :return: OCSP endpoint for the given CA
452
        """
453

    
454
        Logger.debug("Function launched.")
455

    
456
        return self.configuration.base_server_url + "/api/ocsp/" + str(ca_identifier)
457

    
458

    
459
class RevocationReasonInvalidException(Exception):
460
    """
461
    Exception that denotes that the caller was trying to revoke
462
    a certificate using an invalid revocation reason
463
    """
464

    
465
    def __init__(self, reason):
466
        self.reason = reason
467

    
468
    def __str__(self):
469
        return f"Revocation reason '{self.reason}' is not valid."
470

    
471

    
472
class CertificateStatusInvalidException(Exception):
473
    """
474
    Exception that denotes that the caller was trying to set
475
    a certificate to an invalid state
476
    """
477

    
478
    def __init__(self, status):
479
        self.status = status
480

    
481
    def __str__(self):
482
        return f"Certificate status '{self.status}' is not valid."
483

    
484

    
485
class CertificateAlreadyRevokedException(Exception):
486
    """
487
    Exception that denotes that the caller was trying to revoke
488
    a certificate that is already revoked
489
    """
490

    
491
    def __init__(self, id):
492
        self.id = id
493

    
494
    def __str__(self):
495
        return f"Certificate id '{self.id}' is already revoked."
(2-2/4)