Projekt

Obecné

Profil

Stáhnout (18.8 KB) Statistiky
| Větev: | Tag: | Revize:
1
from typing import List
2

    
3
from injector import inject
4

    
5
from src.config.configuration import Configuration
6
from src.constants import ROOT_CA_ID, INTERMEDIATE_CA_ID, CA_ID, CERTIFICATE_ID, CERTIFICATE_STATES, \
7
    CERTIFICATE_REVOCATION_REASONS
8
from src.dao.certificate_repository import CertificateRepository
9
from src.exceptions.database_exception import DatabaseException
10
from src.exceptions.unknown_exception import UnknownException
11
from src.model.certificate import Certificate
12
from src.model.private_key import PrivateKey
13
from src.model.subject import Subject
14
from src.services.cryptography import CryptographyService
15

    
16
import time
17

    
18
VALID_FROM_TO_DATE_FORMAT = "%d.%m.%Y %H:%M:%S"
19
CA_EXTENSIONS = "basicConstraints=critical,CA:TRUE"
20
CRL_EXTENSION = "crlDistributionPoints=URI:"
21
OCSP_EXTENSION = "authorityInfoAccess=OCSP;URI:"
22
STATUS_REVOKED = "revoked"
23
STATUS_VALID = "valid"
24

    
25

    
26
class CertificateService:
27

    
28
    @inject
29
    def __init__(self, cryptography_service: CryptographyService,
30
                 certificate_repository: CertificateRepository,
31
                 configuration: Configuration):
32
        self.cryptography_service = cryptography_service
33
        self.certificate_repository = certificate_repository
34
        self.configuration = configuration
35

    
36
    # TODO usages present in method parameters but not in class diagram
37
    def create_root_ca(self, key: PrivateKey, subject: Subject, extensions: str = "", config: str = "",
38
                       usages=None, days=30):
39
        """
40
        Creates a root CA certificate based on the given parameters.
41
        :param key: Private key to be used when generating the certificate
42
        :param subject: Subject to be used put into the certificate
43
        :param config: String containing the configuration to be used
44
        :param extensions: Name of the section in the configuration representing extensions
45
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
46
        :param days: Number of days for which the generated cert. will be considered valid
47
        :return: An instance of Certificate class representing the generated root CA cert
48
        """
49
        if usages is None:
50
            usages = {}
51

    
52
        cert_id = self.certificate_repository.get_next_id()
53
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(cert_id)
54
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(cert_id)
55

    
56
        # create a new self signed  certificate
57
        cert_pem = self.cryptography_service.create_sscrt(subject, key.private_key, key_pass=key.password,
58
                                                          extensions=extensions, config=config, days=days)
59
        # specify CA usage
60
        usages[CA_ID] = True
61

    
62
        # wrap into Certificate class
63
        certificate = self.__create_wrapper(cert_pem, key.private_key_id, usages, 0,
64
                                            ROOT_CA_ID)
65

    
66
        # store the wrapper into the repository
67
        created_id = self.certificate_repository.create(certificate)
68

    
69
        # assign the generated ID to the inserted certificate
70
        certificate.certificate_id = created_id
71

    
72
        return certificate
73

    
74
    def __create_wrapper(self, cert_pem, private_key_id, usages, parent_id, cert_type):
75
        """
76
        Wraps the given parameters using the Certificate class. Uses CryptographyService to find out the notBefore and
77
        notAfter fields.
78
        :param cert_pem: PEM of the cert. to be wrapped
79
        :param private_key_id: ID of the private key used to create the given certificate
80
        :param usages: A dictionary containing usages of the generated certificate generated (see constants.py)
81
        :param parent_id: ID of the CA that issued this certificate
82
        :param cert_type: Type of this certificate (see constants.py)
83
        :return: An instance of the Certificate class wrapping the values passed  via method parameters
84
        """
85
        # parse the generated pem for subject and notBefore/notAfter fields
86
        # TODO this could be improved in the future in such way that calling openssl is not required to parse the dates
87
        subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem)
88
        # format the parsed date
89
        not_before_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_before)
90
        not_after_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_after)
91

    
92
        # create a certificate wrapper
93
        certificate = Certificate(-1, subj.common_name, not_before_formatted, not_after_formatted, cert_pem,
94
                                  private_key_id, cert_type, parent_id, usages)
95

    
96
        return certificate
97

    
98
    # TODO config parameter present in class diagram but not here (unused)
99
    def create_ca(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate, issuer_key: PrivateKey,
100
                  extensions: str = "", days: int = 30, usages=None):
101
        """
102
        Creates an intermediate CA certificate issued by the given parent CA.
103
        :param subject_key: Private key to be used when generating the certificate
104
        :param subject: Subject to be used put into the certificate
105
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
106
        :param issuer_key: PK used to generate the issuer certificate
107
        :param extensions: Extensions to be used when generating the certificate
108
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
109
        :param days: Number of days for which the generated cert. will be considered valid
110
        :return: An instance of Certificate class representing the generated intermediate CA cert
111
        """
112
        if usages is None:
113
            usages = {}
114

    
115
        extensions = extensions + "\n" + CA_EXTENSIONS
116
        # Add CRL and OCSP distribution point to certificate extensions
117
        cert_id = self.certificate_repository.get_next_id()
118
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(cert_id)
119
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(cert_id)
120

    
121
        # TODO implement AIA URI via extensions
122
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
123
                                                        issuer_key.private_key,
124
                                                        subject_key_pass=subject_key.password,
125
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
126
                                                        days=days)
127

    
128
        # specify CA usage
129
        usages[CA_ID] = True
130

    
131
        # wrap into Certificate class
132
        self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
133
                              issuer_cert.certificate_id, INTERMEDIATE_CA_ID)
134

    
135
        # parse the generated pem for subject and notBefore/notAfter fields
136
        subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem)
137

    
138
        # format the parsed date
139
        not_before_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_before)
140
        not_after_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_after)
141

    
142
        # specify CA usage
143
        usages[CA_ID] = True
144

    
145
        # create a certificate wrapper
146
        certificate = Certificate(-1, subject.common_name, not_before_formatted, not_after_formatted, cert_pem,
147
                                  subject_key.private_key_id, INTERMEDIATE_CA_ID, issuer_cert.certificate_id, usages)
148

    
149
        # store the wrapper into the repository
150
        created_id = self.certificate_repository.create(certificate)
151

    
152
        # assign the generated ID to the inserted certificate
153
        certificate.certificate_id = created_id
154

    
155
        return certificate
156

    
157
    def create_end_cert(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate,
158
                        issuer_key: PrivateKey,
159
                        extensions: str = "", days: int = 30, usages=None):
160
        """
161
        Creates an end certificate issued by the given parent CA.
162
        :param subject_key: Private key to be used when generating the certificate
163
        :param subject: Subject to be used put into the certificate
164
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
165
        :param issuer_key: PK used to generate the issuer certificate
166
        :param extensions: Extensions to be used when generating the certificate
167
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
168
        :param days: Number of days for which the generated cert. will be considered valid
169
        :return: An instance of Certificate class representing the generated cert
170
        """
171
        if usages is None:
172
            usages = {}
173

    
174
        # generate a new certificate
175
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
176
                                                        issuer_key.private_key,
177
                                                        subject_key_pass=subject_key.password,
178
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
179
                                                        days=days)
180

    
181
        # wrap the generated certificate using Certificate class
182
        certificate = self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
183
                                            issuer_cert.certificate_id, CERTIFICATE_ID)
184

    
185
        created_id = self.certificate_repository.create(certificate)
186

    
187
        certificate.certificate_id = created_id
188

    
189
        return certificate
190

    
191
    def get_certificate(self, unique_id: int) -> Certificate:
192
        """
193
        Tries to fetch a certificate from the certificate repository using a given id.
194
        :param unique_id: ID of the certificate to be fetched
195
        :return: Instance of the Certificate class containing a certificate with the given id or `None` if such
196
        certificate is not found
197
        """
198
        return self.certificate_repository.read(unique_id)
199

    
200
    def get_certificates(self, cert_type=None) -> List[Certificate]:
201
        """
202
        Tries to fetch a list of all certificates from the certificate repository. Using the `cert_type` parameter only
203
        certificates of the given type can be returned.
204
        :param cert_type: Type of certificates to be returned
205
        :return: List of instances of the Certificate class representing all certificates present in the certificate
206
        repository. An empty list is returned when no certificates are found.
207
        """
208
        return self.certificate_repository.read_all(cert_type)
209

    
210
    def get_chain_of_trust(self, from_id: int, to_id: int = -1, exclude_root=True) -> List[Certificate]:
211
        """
212
        Traverses the certificate hierarchy tree upwards till a certificate with the `to_id` ID is found or till a
213
        root CA certificate is found. Root certificates are excluded from the chain by default.
214
        :param from_id: ID of the first certificate to be included in the chain of trust
215
        :param to_id: ID of the last certificate to be included in the chain of trust
216
        :param exclude_root: a flag indicating whether root CA certificate should be excluded
217
        :return: a list of certificates representing the chain of trust starting with the certificate given by `from_id`
218
        ID
219
        """
220
        # read the first certificate of the chain
221
        start_cert = self.certificate_repository.read(from_id)
222

    
223
        # if no cert is found or the current cert is root CA and root CAs should be excluded, then return an empty list
224
        if start_cert is None or (start_cert.type_id == ROOT_CA_ID and exclude_root):
225
            return []
226

    
227
        current_cert = start_cert
228
        chain_of_trust = [current_cert]
229

    
230
        # TODO could possibly be simplified
231
        if start_cert.type_id == ROOT_CA_ID:
232
            # the first cert found is a root ca
233
            return chain_of_trust
234

    
235
        while True:
236
            parent_cert = self.certificate_repository.read(current_cert.parent_id)
237

    
238
            # check whether parent certificate exists
239
            if parent_cert is None:
240
                break
241

    
242
            # check whether the found certificate is a root certificate
243
            if parent_cert.type_id == ROOT_CA_ID:
244
                if not exclude_root:
245
                    # append the found root cert only if root certificates should not be excluded from the CoT
246
                    chain_of_trust.append(parent_cert)
247
                break
248

    
249
            # append the certificate
250
            chain_of_trust.append(parent_cert)
251

    
252
            # stop iterating over certificates if the id of the found certificate matches `to_id` method parameter
253
            if parent_cert.certificate_id == to_id:
254
                break
255

    
256
            current_cert = parent_cert
257

    
258
        return chain_of_trust
259

    
260
    def delete_certificate(self, unique_id):
261
        """
262
        Deletes a certificate. Raises an Exception should any unexpected behavior occur.
263

    
264
        :param unique_id: ID of specific certificate
265
        """
266

    
267
        to_delete = self.certificate_repository.get_all_descendants_of(unique_id)
268
        if to_delete is None:
269
            raise CertificateNotFoundException(unique_id)
270

    
271
        for cert in to_delete:
272
            try:
273
                self.set_certificate_revocation_status(cert.certificate_id, STATUS_REVOKED)
274
            except CertificateAlreadyRevokedException:
275
                # TODO log as an info/debug, not warning or above <-- perfectly legal
276
                continue
277

    
278
            self.certificate_repository.delete(cert.certificate_id)
279
            # TODO log if not successfully deleted
280

    
281
    def get_certificates_issued_by(self, unique_id):
282
        """
283
        Returns a list of all children of a certificate identified by an unique_id.
284
        Raises a DatabaseException should any unexpected behavior occur.
285
        :param unique_id: target certificate ID
286
        :return: children of unique_id
287
        """
288
        try:
289
            if self.certificate_repository.read(unique_id) is None:
290
                raise CertificateNotFoundException(unique_id)
291
        except DatabaseException:
292
            raise CertificateNotFoundException(unique_id)
293

    
294
        return self.certificate_repository.get_all_issued_by(unique_id)
295

    
296
    def set_certificate_revocation_status(self, id, status, reason="unspecified"):
297
        """
298
        Set certificate status to 'valid' or 'revoked'.
299
        If the new status is revoked a reason can be provided -> default is unspecified
300
        :param reason: reason for revocation
301
        :param id: identifier of the certificate whose status is to be changed
302
        :param status: new status of the certificate
303
        """
304
        if status not in CERTIFICATE_STATES:
305
            raise CertificateStatusInvalidException(status)
306
        if reason not in CERTIFICATE_REVOCATION_REASONS:
307
            raise RevocationReasonInvalidException(reason)
308

    
309
        # check whether the certificate exists
310
        certificate = self.certificate_repository.read(id)
311
        if certificate is None:
312
            raise CertificateNotFoundException(id)
313

    
314
        updated = False
315
        if status == STATUS_VALID:
316
            updated = self.certificate_repository.clear_certificate_revocation(id)
317
        elif status == STATUS_REVOKED:
318
            # check if the certificate is not revoked already
319
            revoked = self.certificate_repository.get_all_revoked_by(certificate.parent_id)
320
            if certificate.certificate_id in [x.certificate_id for x in revoked]:
321
                raise CertificateAlreadyRevokedException(id)
322

    
323
            revocation_timestamp = int(time.time())
324
            updated = self.certificate_repository.set_certificate_revoked(id, str(revocation_timestamp), reason)
325

    
326
        if not updated:
327
            # TODO log this
328
            raise UnknownException("Repository returned 'false' from clear_certificate_revocation() "
329
                                   "or set_certificate_revoked().")
330

    
331
    def get_subject_from_certificate(self, certificate: Certificate) -> Subject:
332
        """
333
        Get Subject distinguished name from a Certificate
334
        :param certificate: certificate instance whose Subject shall be parsed
335
        :return: instance of Subject class containing resulting distinguished name
336
        """
337
        (subject, _, _) = self.cryptography_service.parse_cert_pem(certificate.pem_data)
338
        return subject
339

    
340
    def get_public_key_from_certificate(self, certificate: Certificate):
341
        """
342
        Extracts a public key from the given certificate
343
        :param certificate: an instance of the Certificate class containing the certificate from which a public key
344
        should be extracted.
345
        :return: a string containing the extracted public key in PEM format
346
        """
347
        return self.cryptography_service.extract_public_key_from_certificate(certificate.pem_data)
348

    
349
    def __get_crl_endpoint(self, ca_identifier: int) -> str:
350
        """
351
        Get URL address of CRL distribution endpoint based on
352
        issuer's ID
353

    
354
        :param ca_identifier: ID of issuing authority
355
        :return: CRL endpoint for the given CA
356
        """
357
        return self.configuration.base_server_url + "/api/crl/" + str(ca_identifier)
358

    
359
    def __get_ocsp_endpoint(self, ca_identifier: int) -> str:
360
        """
361
        Get URL address of OCSP distribution endpoint based on
362
        issuer's ID
363

    
364
        :param ca_identifier: ID of issuing authority
365
        :return: OCSP endpoint for the given CA
366
        """
367
        return self.configuration.base_server_url + "/api/ocsp/" + str(ca_identifier)
368

    
369

    
370
class RevocationReasonInvalidException(Exception):
371
    """
372
    Exception that denotes that the caller was trying to revoke
373
    a certificate using an invalid revocation reason
374
    """
375

    
376
    def __init__(self, reason):
377
        self.reason = reason
378

    
379
    def __str__(self):
380
        return f"Revocation reason '{self.reason}' is not valid."
381

    
382

    
383
class CertificateStatusInvalidException(Exception):
384
    """
385
    Exception that denotes that the caller was trying to set
386
    a certificate to an invalid state
387
    """
388

    
389
    def __init__(self, status):
390
        self.status = status
391

    
392
    def __str__(self):
393
        return f"Certificate status '{self.status}' is not valid."
394

    
395

    
396
class CertificateNotFoundException(Exception):
397
    """
398
    Exception that denotes that the caller was trying to set
399
    work with non-existing certificate
400
    """
401

    
402
    def __init__(self, id):
403
        self.id = id
404

    
405
    def __str__(self):
406
        return f"Certificate id '{self.id}' does not exist."
407

    
408

    
409
class CertificateAlreadyRevokedException(Exception):
410
    """
411
    Exception that denotes that the caller was trying to revoke
412
    a certificate that is already revoked
413
    """
414

    
415
    def __init__(self, id):
416
        self.id = id
417

    
418
    def __str__(self):
419
        return f"Certificate id '{self.id}' is already revoked."
(2-2/4)