Projekt

Obecné

Profil

Stáhnout (18.9 KB) Statistiky
| Větev: | Tag: | Revize:
1
from typing import List
2

    
3
from injector import inject
4

    
5
from src.config.configuration import Configuration
6
from src.constants import ROOT_CA_ID, INTERMEDIATE_CA_ID, CA_ID, CERTIFICATE_ID, CERTIFICATE_STATES, \
7
    CERTIFICATE_REVOCATION_REASONS
8
from src.dao.certificate_repository import CertificateRepository
9
from src.exceptions.certificate_not_found_exception import CertificateNotFoundException
10
from src.exceptions.database_exception import DatabaseException
11
from src.exceptions.unknown_exception import UnknownException
12
from src.model.certificate import Certificate
13
from src.model.private_key import PrivateKey
14
from src.model.subject import Subject
15
from src.services.cryptography import CryptographyService
16

    
17
import time
18

    
19
VALID_FROM_TO_DATE_FORMAT = "%d.%m.%Y %H:%M:%S"
20
CA_EXTENSIONS = "basicConstraints=critical,CA:TRUE"
21
CRL_EXTENSION = "crlDistributionPoints=URI:"
22
OCSP_EXTENSION = "authorityInfoAccess=OCSP;URI:"
23
STATUS_REVOKED = "revoked"
24
STATUS_VALID = "valid"
25

    
26

    
27
class CertificateService:
28

    
29
    @inject
30
    def __init__(self, cryptography_service: CryptographyService,
31
                 certificate_repository: CertificateRepository,
32
                 configuration: Configuration):
33
        self.cryptography_service = cryptography_service
34
        self.certificate_repository = certificate_repository
35
        self.configuration = configuration
36

    
37
    # TODO usages present in method parameters but not in class diagram
38
    def create_root_ca(self, key: PrivateKey, subject: Subject, extensions: str = "", config: str = "",
39
                       usages=None, days=30):
40
        """
41
        Creates a root CA certificate based on the given parameters.
42
        :param key: Private key to be used when generating the certificate
43
        :param subject: Subject to be used put into the certificate
44
        :param config: String containing the configuration to be used
45
        :param extensions: Name of the section in the configuration representing extensions
46
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
47
        :param days: Number of days for which the generated cert. will be considered valid
48
        :return: An instance of Certificate class representing the generated root CA cert
49
        """
50
        if usages is None:
51
            usages = {}
52

    
53
        cert_id = self.certificate_repository.get_next_id()
54
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(cert_id)
55
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(cert_id)
56

    
57
        # create a new self signed  certificate
58
        cert_pem = self.cryptography_service.create_sscrt(subject, key.private_key, key_pass=key.password,
59
                                                          extensions=extensions, config=config, days=days, sn=cert_id)
60
        # specify CA usage
61
        usages[CA_ID] = True
62

    
63
        # wrap into Certificate class
64
        certificate = self.__create_wrapper(cert_pem, key.private_key_id, usages, 0,
65
                                            ROOT_CA_ID)
66

    
67
        # store the wrapper into the repository
68
        created_id = self.certificate_repository.create(certificate)
69

    
70
        # assign the generated ID to the inserted certificate
71
        certificate.certificate_id = created_id
72

    
73
        return certificate
74

    
75
    def __create_wrapper(self, cert_pem, private_key_id, usages, parent_id, cert_type):
76
        """
77
        Wraps the given parameters using the Certificate class. Uses CryptographyService to find out the notBefore and
78
        notAfter fields.
79
        :param cert_pem: PEM of the cert. to be wrapped
80
        :param private_key_id: ID of the private key used to create the given certificate
81
        :param usages: A dictionary containing usages of the generated certificate generated (see constants.py)
82
        :param parent_id: ID of the CA that issued this certificate
83
        :param cert_type: Type of this certificate (see constants.py)
84
        :return: An instance of the Certificate class wrapping the values passed  via method parameters
85
        """
86
        # parse the generated pem for subject and notBefore/notAfter fields
87
        # TODO this could be improved in the future in such way that calling openssl is not required to parse the dates
88
        subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem)
89
        # format the parsed date
90
        not_before_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_before)
91
        not_after_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_after)
92

    
93
        # create a certificate wrapper
94
        certificate = Certificate(-1, subj.common_name, not_before_formatted, not_after_formatted, cert_pem,
95
                                  private_key_id, cert_type, parent_id, usages)
96

    
97
        return certificate
98

    
99
    # TODO config parameter present in class diagram but not here (unused)
100
    def create_ca(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate, issuer_key: PrivateKey,
101
                  extensions: str = "", days: int = 30, usages=None):
102
        """
103
        Creates an intermediate CA certificate issued by the given parent CA.
104
        :param subject_key: Private key to be used when generating the certificate
105
        :param subject: Subject to be used put into the certificate
106
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
107
        :param issuer_key: PK used to generate the issuer certificate
108
        :param extensions: Extensions to be used when generating the certificate
109
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
110
        :param days: Number of days for which the generated cert. will be considered valid
111
        :return: An instance of Certificate class representing the generated intermediate CA cert
112
        """
113
        if usages is None:
114
            usages = {}
115

    
116
        extensions = extensions + "\n" + CA_EXTENSIONS
117
        # Add CRL and OCSP distribution point to certificate extensions
118
        cert_id = self.certificate_repository.get_next_id()
119
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(cert_id)
120
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(cert_id)
121

    
122
        # TODO implement AIA URI via extensions
123
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
124
                                                        issuer_key.private_key,
125
                                                        subject_key_pass=subject_key.password,
126
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
127
                                                        days=days,
128
                                                        sn=cert_id)
129

    
130
        # specify CA usage
131
        usages[CA_ID] = True
132

    
133
        # wrap into Certificate class
134
        self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
135
                              issuer_cert.certificate_id, INTERMEDIATE_CA_ID)
136

    
137
        # parse the generated pem for subject and notBefore/notAfter fields
138
        subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem)
139

    
140
        # format the parsed date
141
        not_before_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_before)
142
        not_after_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_after)
143

    
144
        # specify CA usage
145
        usages[CA_ID] = True
146

    
147
        # create a certificate wrapper
148
        certificate = Certificate(-1, subject.common_name, not_before_formatted, not_after_formatted, cert_pem,
149
                                  subject_key.private_key_id, INTERMEDIATE_CA_ID, issuer_cert.certificate_id, usages)
150

    
151
        # store the wrapper into the repository
152
        created_id = self.certificate_repository.create(certificate)
153

    
154
        # assign the generated ID to the inserted certificate
155
        certificate.certificate_id = created_id
156

    
157
        return certificate
158

    
159
    def create_end_cert(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate,
160
                        issuer_key: PrivateKey,
161
                        extensions: str = "", days: int = 30, usages=None):
162
        """
163
        Creates an end certificate issued by the given parent CA.
164
        :param subject_key: Private key to be used when generating the certificate
165
        :param subject: Subject to be used put into the certificate
166
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
167
        :param issuer_key: PK used to generate the issuer certificate
168
        :param extensions: Extensions to be used when generating the certificate
169
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
170
        :param days: Number of days for which the generated cert. will be considered valid
171
        :return: An instance of Certificate class representing the generated cert
172
        """
173
        if usages is None:
174
            usages = {}
175

    
176
        # get the next certificate ID in order to be able to specify the serial number
177
        cert_id = self.certificate_repository.get_next_id()
178

    
179
        # generate a new certificate
180
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
181
                                                        issuer_key.private_key,
182
                                                        subject_key_pass=subject_key.password,
183
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
184
                                                        days=days,
185
                                                        sn=cert_id
186
                                                        )
187

    
188
        # wrap the generated certificate using Certificate class
189
        certificate = self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
190
                                            issuer_cert.certificate_id, CERTIFICATE_ID)
191

    
192
        created_id = self.certificate_repository.create(certificate)
193

    
194
        certificate.certificate_id = created_id
195

    
196
        return certificate
197

    
198
    def get_certificate(self, unique_id: int) -> Certificate:
199
        """
200
        Tries to fetch a certificate from the certificate repository using a given id.
201
        :param unique_id: ID of the certificate to be fetched
202
        :return: Instance of the Certificate class containing a certificate with the given id or `None` if such
203
        certificate is not found
204
        """
205
        return self.certificate_repository.read(unique_id)
206

    
207
    def get_certificates(self, cert_type=None) -> List[Certificate]:
208
        """
209
        Tries to fetch a list of all certificates from the certificate repository. Using the `cert_type` parameter only
210
        certificates of the given type can be returned.
211
        :param cert_type: Type of certificates to be returned
212
        :return: List of instances of the Certificate class representing all certificates present in the certificate
213
        repository. An empty list is returned when no certificates are found.
214
        """
215
        return self.certificate_repository.read_all(cert_type)
216

    
217
    def get_chain_of_trust(self, from_id: int, to_id: int = -1, exclude_root=True) -> List[Certificate]:
218
        """
219
        Traverses the certificate hierarchy tree upwards till a certificate with the `to_id` ID is found or till a
220
        root CA certificate is found. Root certificates are excluded from the chain by default.
221
        :param from_id: ID of the first certificate to be included in the chain of trust
222
        :param to_id: ID of the last certificate to be included in the chain of trust
223
        :param exclude_root: a flag indicating whether root CA certificate should be excluded
224
        :return: a list of certificates representing the chain of trust starting with the certificate given by `from_id`
225
        ID
226
        """
227
        # read the first certificate of the chain
228
        start_cert = self.certificate_repository.read(from_id)
229

    
230
        # if no cert is found or the current cert is root CA and root CAs should be excluded, then return an empty list
231
        if start_cert is None or (start_cert.type_id == ROOT_CA_ID and exclude_root):
232
            return []
233

    
234
        current_cert = start_cert
235
        chain_of_trust = [current_cert]
236

    
237
        # TODO could possibly be simplified
238
        if start_cert.type_id == ROOT_CA_ID:
239
            # the first cert found is a root ca
240
            return chain_of_trust
241

    
242
        while True:
243
            parent_cert = self.certificate_repository.read(current_cert.parent_id)
244

    
245
            # check whether parent certificate exists
246
            if parent_cert is None:
247
                break
248

    
249
            # check whether the found certificate is a root certificate
250
            if parent_cert.type_id == ROOT_CA_ID:
251
                if not exclude_root:
252
                    # append the found root cert only if root certificates should not be excluded from the CoT
253
                    chain_of_trust.append(parent_cert)
254
                break
255

    
256
            # append the certificate
257
            chain_of_trust.append(parent_cert)
258

    
259
            # stop iterating over certificates if the id of the found certificate matches `to_id` method parameter
260
            if parent_cert.certificate_id == to_id:
261
                break
262

    
263
            current_cert = parent_cert
264

    
265
        return chain_of_trust
266

    
267
    def delete_certificate(self, unique_id):
268
        """
269
        Deletes a certificate. Raises an Exception should any unexpected behavior occur.
270

    
271
        :param unique_id: ID of specific certificate
272
        """
273

    
274
        to_delete = self.certificate_repository.get_all_descendants_of(unique_id)
275
        if to_delete is None:
276
            raise CertificateNotFoundException(unique_id)
277

    
278
        for cert in to_delete:
279
            try:
280
                self.set_certificate_revocation_status(cert.certificate_id, STATUS_REVOKED)
281
            except CertificateAlreadyRevokedException:
282
                # TODO log as an info/debug, not warning or above <-- perfectly legal
283
                continue
284

    
285
            self.certificate_repository.delete(cert.certificate_id)
286
            # TODO log if not successfully deleted
287

    
288
    def get_certificates_issued_by(self, unique_id):
289
        """
290
        Returns a list of all children of a certificate identified by an unique_id.
291
        Raises a DatabaseException should any unexpected behavior occur.
292
        :param unique_id: target certificate ID
293
        :return: children of unique_id
294
        """
295
        try:
296
            if self.certificate_repository.read(unique_id) is None:
297
                raise CertificateNotFoundException(unique_id)
298
        except DatabaseException:
299
            raise CertificateNotFoundException(unique_id)
300

    
301
        return self.certificate_repository.get_all_issued_by(unique_id)
302

    
303
    def set_certificate_revocation_status(self, id, status, reason="unspecified"):
304
        """
305
        Set certificate status to 'valid' or 'revoked'.
306
        If the new status is revoked a reason can be provided -> default is unspecified
307
        :param reason: reason for revocation
308
        :param id: identifier of the certificate whose status is to be changed
309
        :param status: new status of the certificate
310
        """
311
        if status not in CERTIFICATE_STATES:
312
            raise CertificateStatusInvalidException(status)
313
        if reason not in CERTIFICATE_REVOCATION_REASONS:
314
            raise RevocationReasonInvalidException(reason)
315

    
316
        # check whether the certificate exists
317
        certificate = self.certificate_repository.read(id)
318
        if certificate is None:
319
            raise CertificateNotFoundException(id)
320

    
321
        updated = False
322
        if status == STATUS_VALID:
323
            updated = self.certificate_repository.clear_certificate_revocation(id)
324
        elif status == STATUS_REVOKED:
325
            # check if the certificate is not revoked already
326
            revoked = self.certificate_repository.get_all_revoked_by(certificate.parent_id)
327
            if certificate.certificate_id in [x.certificate_id for x in revoked]:
328
                raise CertificateAlreadyRevokedException(id)
329

    
330
            revocation_timestamp = int(time.time())
331
            updated = self.certificate_repository.set_certificate_revoked(id, str(revocation_timestamp), reason)
332

    
333
        if not updated:
334
            # TODO log this
335
            raise UnknownException("Repository returned 'false' from clear_certificate_revocation() "
336
                                   "or set_certificate_revoked().")
337

    
338
    def get_subject_from_certificate(self, certificate: Certificate) -> Subject:
339
        """
340
        Get Subject distinguished name from a Certificate
341
        :param certificate: certificate instance whose Subject shall be parsed
342
        :return: instance of Subject class containing resulting distinguished name
343
        """
344
        (subject, _, _) = self.cryptography_service.parse_cert_pem(certificate.pem_data)
345
        return subject
346

    
347
    def get_public_key_from_certificate(self, certificate: Certificate):
348
        """
349
        Extracts a public key from the given certificate
350
        :param certificate: an instance of the Certificate class containing the certificate from which a public key
351
        should be extracted.
352
        :return: a string containing the extracted public key in PEM format
353
        """
354
        return self.cryptography_service.extract_public_key_from_certificate(certificate.pem_data)
355

    
356
    def __get_crl_endpoint(self, ca_identifier: int) -> str:
357
        """
358
        Get URL address of CRL distribution endpoint based on
359
        issuer's ID
360

    
361
        :param ca_identifier: ID of issuing authority
362
        :return: CRL endpoint for the given CA
363
        """
364
        return self.configuration.base_server_url + "/api/crl/" + str(ca_identifier)
365

    
366
    def __get_ocsp_endpoint(self, ca_identifier: int) -> str:
367
        """
368
        Get URL address of OCSP distribution endpoint based on
369
        issuer's ID
370

    
371
        :param ca_identifier: ID of issuing authority
372
        :return: OCSP endpoint for the given CA
373
        """
374
        return self.configuration.base_server_url + "/api/ocsp/" + str(ca_identifier)
375

    
376

    
377
class RevocationReasonInvalidException(Exception):
378
    """
379
    Exception that denotes that the caller was trying to revoke
380
    a certificate using an invalid revocation reason
381
    """
382

    
383
    def __init__(self, reason):
384
        self.reason = reason
385

    
386
    def __str__(self):
387
        return f"Revocation reason '{self.reason}' is not valid."
388

    
389

    
390
class CertificateStatusInvalidException(Exception):
391
    """
392
    Exception that denotes that the caller was trying to set
393
    a certificate to an invalid state
394
    """
395

    
396
    def __init__(self, status):
397
        self.status = status
398

    
399
    def __str__(self):
400
        return f"Certificate status '{self.status}' is not valid."
401

    
402

    
403
class CertificateAlreadyRevokedException(Exception):
404
    """
405
    Exception that denotes that the caller was trying to revoke
406
    a certificate that is already revoked
407
    """
408

    
409
    def __init__(self, id):
410
        self.id = id
411

    
412
    def __str__(self):
413
        return f"Certificate id '{self.id}' is already revoked."
(2-2/4)