Projekt

Obecné

Profil

Stáhnout (19.1 KB) Statistiky
| Větev: | Tag: | Revize:
1
from typing import List
2

    
3
from injector import inject
4

    
5
from src.config.configuration import Configuration
6
from src.constants import ROOT_CA_ID, INTERMEDIATE_CA_ID, CA_ID, CERTIFICATE_ID, CERTIFICATE_STATES, \
7
    CERTIFICATE_REVOCATION_REASONS
8
from src.dao.certificate_repository import CertificateRepository
9
from src.exceptions.database_exception import DatabaseException
10
from src.exceptions.unknown_exception import UnknownException
11
from src.model.certificate import Certificate
12
from src.model.private_key import PrivateKey
13
from src.model.subject import Subject
14
from src.services.cryptography import CryptographyService
15

    
16
import time
17

    
18
VALID_FROM_TO_DATE_FORMAT = "%d.%m.%Y %H:%M:%S"
19
CA_EXTENSIONS = "basicConstraints=critical,CA:TRUE"
20
CRL_EXTENSION = "crlDistributionPoints=URI:"
21
OCSP_EXTENSION = "authorityInfoAccess=OCSP;URI:"
22
STATUS_REVOKED = "revoked"
23
STATUS_VALID = "valid"
24

    
25

    
26
class CertificateService:
27

    
28
    @inject
29
    def __init__(self, cryptography_service: CryptographyService,
30
                 certificate_repository: CertificateRepository,
31
                 configuration: Configuration):
32
        self.cryptography_service = cryptography_service
33
        self.certificate_repository = certificate_repository
34
        self.configuration = configuration
35

    
36
    # TODO usages present in method parameters but not in class diagram
37
    def create_root_ca(self, key: PrivateKey, subject: Subject, extensions: str = "", config: str = "",
38
                       usages=None, days=30):
39
        """
40
        Creates a root CA certificate based on the given parameters.
41
        :param key: Private key to be used when generating the certificate
42
        :param subject: Subject to be used put into the certificate
43
        :param config: String containing the configuration to be used
44
        :param extensions: Name of the section in the configuration representing extensions
45
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
46
        :param days: Number of days for which the generated cert. will be considered valid
47
        :return: An instance of Certificate class representing the generated root CA cert
48
        """
49
        if usages is None:
50
            usages = {}
51

    
52
        cert_id = self.certificate_repository.get_next_id()
53
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(cert_id)
54
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(cert_id)
55

    
56
        # create a new self signed  certificate
57
        cert_pem = self.cryptography_service.create_sscrt(subject, key.private_key, key_pass=key.password,
58
                                                          extensions=extensions, config=config, days=days, sn=cert_id)
59
        # specify CA usage
60
        usages[CA_ID] = True
61

    
62
        # wrap into Certificate class
63
        certificate = self.__create_wrapper(cert_pem, key.private_key_id, usages, 0,
64
                                            ROOT_CA_ID)
65

    
66
        # store the wrapper into the repository
67
        created_id = self.certificate_repository.create(certificate)
68

    
69
        # assign the generated ID to the inserted certificate
70
        certificate.certificate_id = created_id
71

    
72
        return certificate
73

    
74
    def __create_wrapper(self, cert_pem, private_key_id, usages, parent_id, cert_type):
75
        """
76
        Wraps the given parameters using the Certificate class. Uses CryptographyService to find out the notBefore and
77
        notAfter fields.
78
        :param cert_pem: PEM of the cert. to be wrapped
79
        :param private_key_id: ID of the private key used to create the given certificate
80
        :param usages: A dictionary containing usages of the generated certificate generated (see constants.py)
81
        :param parent_id: ID of the CA that issued this certificate
82
        :param cert_type: Type of this certificate (see constants.py)
83
        :return: An instance of the Certificate class wrapping the values passed  via method parameters
84
        """
85
        # parse the generated pem for subject and notBefore/notAfter fields
86
        # TODO this could be improved in the future in such way that calling openssl is not required to parse the dates
87
        subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem)
88
        # format the parsed date
89
        not_before_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_before)
90
        not_after_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_after)
91

    
92
        # create a certificate wrapper
93
        certificate = Certificate(-1, subj.common_name, not_before_formatted, not_after_formatted, cert_pem,
94
                                  private_key_id, cert_type, parent_id, usages)
95

    
96
        return certificate
97

    
98
    # TODO config parameter present in class diagram but not here (unused)
99
    def create_ca(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate, issuer_key: PrivateKey,
100
                  extensions: str = "", days: int = 30, usages=None):
101
        """
102
        Creates an intermediate CA certificate issued by the given parent CA.
103
        :param subject_key: Private key to be used when generating the certificate
104
        :param subject: Subject to be used put into the certificate
105
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
106
        :param issuer_key: PK used to generate the issuer certificate
107
        :param extensions: Extensions to be used when generating the certificate
108
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
109
        :param days: Number of days for which the generated cert. will be considered valid
110
        :return: An instance of Certificate class representing the generated intermediate CA cert
111
        """
112
        if usages is None:
113
            usages = {}
114

    
115
        extensions = extensions + "\n" + CA_EXTENSIONS
116
        # Add CRL and OCSP distribution point to certificate extensions
117
        cert_id = self.certificate_repository.get_next_id()
118
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(cert_id)
119
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(cert_id)
120

    
121
        # TODO implement AIA URI via extensions
122
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
123
                                                        issuer_key.private_key,
124
                                                        subject_key_pass=subject_key.password,
125
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
126
                                                        days=days,
127
                                                        sn=cert_id)
128

    
129
        # specify CA usage
130
        usages[CA_ID] = True
131

    
132
        # wrap into Certificate class
133
        self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
134
                              issuer_cert.certificate_id, INTERMEDIATE_CA_ID)
135

    
136
        # parse the generated pem for subject and notBefore/notAfter fields
137
        subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem)
138

    
139
        # format the parsed date
140
        not_before_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_before)
141
        not_after_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_after)
142

    
143
        # specify CA usage
144
        usages[CA_ID] = True
145

    
146
        # create a certificate wrapper
147
        certificate = Certificate(-1, subject.common_name, not_before_formatted, not_after_formatted, cert_pem,
148
                                  subject_key.private_key_id, INTERMEDIATE_CA_ID, issuer_cert.certificate_id, usages)
149

    
150
        # store the wrapper into the repository
151
        created_id = self.certificate_repository.create(certificate)
152

    
153
        # assign the generated ID to the inserted certificate
154
        certificate.certificate_id = created_id
155

    
156
        return certificate
157

    
158
    def create_end_cert(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate,
159
                        issuer_key: PrivateKey,
160
                        extensions: str = "", days: int = 30, usages=None):
161
        """
162
        Creates an end certificate issued by the given parent CA.
163
        :param subject_key: Private key to be used when generating the certificate
164
        :param subject: Subject to be used put into the certificate
165
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
166
        :param issuer_key: PK used to generate the issuer certificate
167
        :param extensions: Extensions to be used when generating the certificate
168
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
169
        :param days: Number of days for which the generated cert. will be considered valid
170
        :return: An instance of Certificate class representing the generated cert
171
        """
172
        if usages is None:
173
            usages = {}
174

    
175
        # get the next certificate ID in order to be able to specify the serial number
176
        cert_id = self.certificate_repository.get_next_id()
177

    
178
        # generate a new certificate
179
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
180
                                                        issuer_key.private_key,
181
                                                        subject_key_pass=subject_key.password,
182
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
183
                                                        days=days,
184
                                                        sn=cert_id
185
                                                        )
186

    
187
        # wrap the generated certificate using Certificate class
188
        certificate = self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
189
                                            issuer_cert.certificate_id, CERTIFICATE_ID)
190

    
191
        created_id = self.certificate_repository.create(certificate)
192

    
193
        certificate.certificate_id = created_id
194

    
195
        return certificate
196

    
197
    def get_certificate(self, unique_id: int) -> Certificate:
198
        """
199
        Tries to fetch a certificate from the certificate repository using a given id.
200
        :param unique_id: ID of the certificate to be fetched
201
        :return: Instance of the Certificate class containing a certificate with the given id or `None` if such
202
        certificate is not found
203
        """
204
        return self.certificate_repository.read(unique_id)
205

    
206
    def get_certificates(self, cert_type=None) -> List[Certificate]:
207
        """
208
        Tries to fetch a list of all certificates from the certificate repository. Using the `cert_type` parameter only
209
        certificates of the given type can be returned.
210
        :param cert_type: Type of certificates to be returned
211
        :return: List of instances of the Certificate class representing all certificates present in the certificate
212
        repository. An empty list is returned when no certificates are found.
213
        """
214
        return self.certificate_repository.read_all(cert_type)
215

    
216
    def get_chain_of_trust(self, from_id: int, to_id: int = -1, exclude_root=True) -> List[Certificate]:
217
        """
218
        Traverses the certificate hierarchy tree upwards till a certificate with the `to_id` ID is found or till a
219
        root CA certificate is found. Root certificates are excluded from the chain by default.
220
        :param from_id: ID of the first certificate to be included in the chain of trust
221
        :param to_id: ID of the last certificate to be included in the chain of trust
222
        :param exclude_root: a flag indicating whether root CA certificate should be excluded
223
        :return: a list of certificates representing the chain of trust starting with the certificate given by `from_id`
224
        ID
225
        """
226
        # read the first certificate of the chain
227
        start_cert = self.certificate_repository.read(from_id)
228

    
229
        # if no cert is found or the current cert is root CA and root CAs should be excluded, then return an empty list
230
        if start_cert is None or (start_cert.type_id == ROOT_CA_ID and exclude_root):
231
            return []
232

    
233
        current_cert = start_cert
234
        chain_of_trust = [current_cert]
235

    
236
        # TODO could possibly be simplified
237
        if start_cert.type_id == ROOT_CA_ID:
238
            # the first cert found is a root ca
239
            return chain_of_trust
240

    
241
        while True:
242
            parent_cert = self.certificate_repository.read(current_cert.parent_id)
243

    
244
            # check whether parent certificate exists
245
            if parent_cert is None:
246
                break
247

    
248
            # check whether the found certificate is a root certificate
249
            if parent_cert.type_id == ROOT_CA_ID:
250
                if not exclude_root:
251
                    # append the found root cert only if root certificates should not be excluded from the CoT
252
                    chain_of_trust.append(parent_cert)
253
                break
254

    
255
            # append the certificate
256
            chain_of_trust.append(parent_cert)
257

    
258
            # stop iterating over certificates if the id of the found certificate matches `to_id` method parameter
259
            if parent_cert.certificate_id == to_id:
260
                break
261

    
262
            current_cert = parent_cert
263

    
264
        return chain_of_trust
265

    
266
    def delete_certificate(self, unique_id):
267
        """
268
        Deletes a certificate. Raises an Exception should any unexpected behavior occur.
269

    
270
        :param unique_id: ID of specific certificate
271
        """
272

    
273
        to_delete = self.certificate_repository.get_all_descendants_of(unique_id)
274
        if to_delete is None:
275
            raise CertificateNotFoundException(unique_id)
276

    
277
        for cert in to_delete:
278
            try:
279
                self.set_certificate_revocation_status(cert.certificate_id, STATUS_REVOKED)
280
            except CertificateAlreadyRevokedException:
281
                # TODO log as an info/debug, not warning or above <-- perfectly legal
282
                continue
283

    
284
            self.certificate_repository.delete(cert.certificate_id)
285
            # TODO log if not successfully deleted
286

    
287
    def get_certificates_issued_by(self, unique_id):
288
        """
289
        Returns a list of all children of a certificate identified by an unique_id.
290
        Raises a DatabaseException should any unexpected behavior occur.
291
        :param unique_id: target certificate ID
292
        :return: children of unique_id
293
        """
294
        try:
295
            if self.certificate_repository.read(unique_id) is None:
296
                raise CertificateNotFoundException(unique_id)
297
        except DatabaseException:
298
            raise CertificateNotFoundException(unique_id)
299

    
300
        return self.certificate_repository.get_all_issued_by(unique_id)
301

    
302
    def set_certificate_revocation_status(self, id, status, reason="unspecified"):
303
        """
304
        Set certificate status to 'valid' or 'revoked'.
305
        If the new status is revoked a reason can be provided -> default is unspecified
306
        :param reason: reason for revocation
307
        :param id: identifier of the certificate whose status is to be changed
308
        :param status: new status of the certificate
309
        """
310
        if status not in CERTIFICATE_STATES:
311
            raise CertificateStatusInvalidException(status)
312
        if reason not in CERTIFICATE_REVOCATION_REASONS:
313
            raise RevocationReasonInvalidException(reason)
314

    
315
        # check whether the certificate exists
316
        certificate = self.certificate_repository.read(id)
317
        if certificate is None:
318
            raise CertificateNotFoundException(id)
319

    
320
        updated = False
321
        if status == STATUS_VALID:
322
            updated = self.certificate_repository.clear_certificate_revocation(id)
323
        elif status == STATUS_REVOKED:
324
            # check if the certificate is not revoked already
325
            revoked = self.certificate_repository.get_all_revoked_by(certificate.parent_id)
326
            if certificate.certificate_id in [x.certificate_id for x in revoked]:
327
                raise CertificateAlreadyRevokedException(id)
328

    
329
            revocation_timestamp = int(time.time())
330
            updated = self.certificate_repository.set_certificate_revoked(id, str(revocation_timestamp), reason)
331

    
332
        if not updated:
333
            # TODO log this
334
            raise UnknownException("Repository returned 'false' from clear_certificate_revocation() "
335
                                   "or set_certificate_revoked().")
336

    
337
    def get_subject_from_certificate(self, certificate: Certificate) -> Subject:
338
        """
339
        Get Subject distinguished name from a Certificate
340
        :param certificate: certificate instance whose Subject shall be parsed
341
        :return: instance of Subject class containing resulting distinguished name
342
        """
343
        (subject, _, _) = self.cryptography_service.parse_cert_pem(certificate.pem_data)
344
        return subject
345

    
346
    def get_public_key_from_certificate(self, certificate: Certificate):
347
        """
348
        Extracts a public key from the given certificate
349
        :param certificate: an instance of the Certificate class containing the certificate from which a public key
350
        should be extracted.
351
        :return: a string containing the extracted public key in PEM format
352
        """
353
        return self.cryptography_service.extract_public_key_from_certificate(certificate.pem_data)
354

    
355
    def __get_crl_endpoint(self, ca_identifier: int) -> str:
356
        """
357
        Get URL address of CRL distribution endpoint based on
358
        issuer's ID
359

    
360
        :param ca_identifier: ID of issuing authority
361
        :return: CRL endpoint for the given CA
362
        """
363
        return self.configuration.base_server_url + "/api/crl/" + str(ca_identifier)
364

    
365
    def __get_ocsp_endpoint(self, ca_identifier: int) -> str:
366
        """
367
        Get URL address of OCSP distribution endpoint based on
368
        issuer's ID
369

    
370
        :param ca_identifier: ID of issuing authority
371
        :return: OCSP endpoint for the given CA
372
        """
373
        return self.configuration.base_server_url + "/api/ocsp/" + str(ca_identifier)
374

    
375

    
376
class RevocationReasonInvalidException(Exception):
377
    """
378
    Exception that denotes that the caller was trying to revoke
379
    a certificate using an invalid revocation reason
380
    """
381

    
382
    def __init__(self, reason):
383
        self.reason = reason
384

    
385
    def __str__(self):
386
        return f"Revocation reason '{self.reason}' is not valid."
387

    
388

    
389
class CertificateStatusInvalidException(Exception):
390
    """
391
    Exception that denotes that the caller was trying to set
392
    a certificate to an invalid state
393
    """
394

    
395
    def __init__(self, status):
396
        self.status = status
397

    
398
    def __str__(self):
399
        return f"Certificate status '{self.status}' is not valid."
400

    
401

    
402
class CertificateNotFoundException(Exception):
403
    """
404
    Exception that denotes that the caller was trying to set
405
    work with non-existing certificate
406
    """
407

    
408
    def __init__(self, id):
409
        self.id = id
410

    
411
    def __str__(self):
412
        return f"Certificate id '{self.id}' does not exist."
413

    
414

    
415
class CertificateAlreadyRevokedException(Exception):
416
    """
417
    Exception that denotes that the caller was trying to revoke
418
    a certificate that is already revoked
419
    """
420

    
421
    def __init__(self, id):
422
        self.id = id
423

    
424
    def __str__(self):
425
        return f"Certificate id '{self.id}' is already revoked."
(2-2/4)