Projekt

Obecné

Profil

Stáhnout (17.6 KB) Statistiky
| Větev: | Tag: | Revize:
1
from typing import List
2

    
3
from injector import inject
4

    
5
from src.config.configuration import Configuration
6
from src.constants import ROOT_CA_ID, INTERMEDIATE_CA_ID, CA_ID, CERTIFICATE_ID, CERTIFICATE_STATES, \
7
    CERTIFICATE_REVOCATION_REASONS
8
from src.dao.certificate_repository import CertificateRepository
9
from src.exceptions.unknown_exception import UnknownException
10
from src.model.certificate import Certificate
11
from src.model.private_key import PrivateKey
12
from src.model.subject import Subject
13
from src.services.cryptography import CryptographyService
14

    
15
import time
16

    
17
DATE_FORMAT = "%d.%m.%Y %H:%M:%S"
18
CA_EXTENSIONS = "basicConstraints=critical,CA:TRUE"
19
CRL_EXTENSION = "crlDistributionPoints=URI:"
20
OCSP_EXTENSION = "authorityInfoAccess=OCSP;URI:"
21
STATUS_REVOKED = "revoked"
22
STATUS_VALID = "valid"
23

    
24

    
25
class CertificateService:
26

    
27
    @inject
28
    def __init__(self, cryptography_service: CryptographyService,
29
                 certificate_repository: CertificateRepository,
30
                 configuration: Configuration):
31
        self.cryptography_service = cryptography_service
32
        self.certificate_repository = certificate_repository
33
        self.configuration = configuration
34

    
35
    # TODO usages present in method parameters but not in class diagram
36
    def create_root_ca(self, key: PrivateKey, subject: Subject, extensions: str = "", config: str = "",
37
                       usages=None, days=30):
38
        """
39
        Creates a root CA certificate based on the given parameters.
40
        :param key: Private key to be used when generating the certificate
41
        :param subject: Subject to be used put into the certificate
42
        :param config: String containing the configuration to be used
43
        :param extensions: Name of the section in the configuration representing extensions
44
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
45
        :param days: Number of days for which the generated cert. will be considered valid
46
        :return: An instance of Certificate class representing the generated root CA cert
47
        """
48
        if usages is None:
49
            usages = {}
50

    
51
        cert_id = self.certificate_repository.get_next_id()
52
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(cert_id)
53
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(cert_id)
54

    
55
        # create a new self signed  certificate
56
        cert_pem = self.cryptography_service.create_sscrt(subject, key.private_key, key_pass=key.password,
57
                                                          extensions=extensions, config=config, days=days)
58
        # specify CA usage
59
        usages[CA_ID] = True
60

    
61
        # wrap into Certificate class
62
        certificate = self.__create_wrapper(cert_pem, key.private_key_id, usages, 0,
63
                                            ROOT_CA_ID)
64

    
65
        # store the wrapper into the repository
66
        created_id = self.certificate_repository.create(certificate)
67

    
68
        # assign the generated ID to the inserted certificate
69
        certificate.certificate_id = created_id
70

    
71
        return certificate
72

    
73
    def __create_wrapper(self, cert_pem, private_key_id, usages, parent_id, cert_type):
74
        """
75
        Wraps the given parameters using the Certificate class. Uses CryptographyService to find out the notBefore and
76
        notAfter fields.
77
        :param cert_pem: PEM of the cert. to be wrapped
78
        :param private_key_id: ID of the private key used to create the given certificate
79
        :param usages: A dictionary containing usages of the generated certificate generated (see constants.py)
80
        :param parent_id: ID of the CA that issued this certificate
81
        :param cert_type: Type of this certificate (see constants.py)
82
        :return: An instance of the Certificate class wrapping the values passed  via method parameters
83
        """
84
        # parse the generated pem for subject and notBefore/notAfter fields
85
        # TODO this could be improved in the future in such way that calling openssl is not required to parse the dates
86
        subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem)
87
        # format the parsed date
88
        not_before_formatted = time.strftime(DATE_FORMAT, not_before)
89
        not_after_formatted = time.strftime(DATE_FORMAT, not_after)
90

    
91
        # create a certificate wrapper
92
        certificate = Certificate(-1, subj.common_name, not_before_formatted, not_after_formatted, cert_pem,
93
                                  private_key_id, cert_type, parent_id, usages)
94

    
95
        return certificate
96

    
97
    # TODO config parameter present in class diagram but not here (unused)
98
    def create_ca(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate, issuer_key: PrivateKey,
99
                  extensions: str = "", days: int = 30, usages=None):
100
        """
101
        Creates an intermediate CA certificate issued by the given parent CA.
102
        :param subject_key: Private key to be used when generating the certificate
103
        :param subject: Subject to be used put into the certificate
104
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
105
        :param issuer_key: PK used to generate the issuer certificate
106
        :param extensions: Extensions to be used when generating the certificate
107
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
108
        :param days: Number of days for which the generated cert. will be considered valid
109
        :return: An instance of Certificate class representing the generated intermediate CA cert
110
        """
111
        if usages is None:
112
            usages = {}
113

    
114
        extensions = extensions + "\n" + CA_EXTENSIONS
115
        # Add CRL and OCSP distribution point to certificate extensions
116
        cert_id = self.certificate_repository.get_next_id()
117
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(cert_id)
118
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(cert_id)
119

    
120
        # TODO implement AIA URI via extensions
121
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
122
                                                        issuer_key.private_key,
123
                                                        subject_key_pass=subject_key.password,
124
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
125
                                                        days=days)
126

    
127
        # specify CA usage
128
        usages[CA_ID] = True
129

    
130
        # wrap into Certificate class
131
        self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
132
                              issuer_cert.certificate_id, INTERMEDIATE_CA_ID)
133

    
134
        # parse the generated pem for subject and notBefore/notAfter fields
135
        subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem)
136

    
137
        # format the parsed date
138
        not_before_formatted = time.strftime(DATE_FORMAT, not_before)
139
        not_after_formatted = time.strftime(DATE_FORMAT, not_after)
140

    
141
        # specify CA usage
142
        usages[CA_ID] = True
143

    
144
        # create a certificate wrapper
145
        certificate = Certificate(-1, subject.common_name, not_before_formatted, not_after_formatted, cert_pem,
146
                                  subject_key.private_key_id, INTERMEDIATE_CA_ID, issuer_cert.certificate_id, usages)
147

    
148
        # store the wrapper into the repository
149
        created_id = self.certificate_repository.create(certificate)
150

    
151
        # assign the generated ID to the inserted certificate
152
        certificate.certificate_id = created_id
153

    
154
        return certificate
155

    
156
    def create_end_cert(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate,
157
                        issuer_key: PrivateKey,
158
                        extensions: str = "", days: int = 30, usages=None):
159
        """
160
        Creates an end certificate issued by the given parent CA.
161
        :param subject_key: Private key to be used when generating the certificate
162
        :param subject: Subject to be used put into the certificate
163
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
164
        :param issuer_key: PK used to generate the issuer certificate
165
        :param extensions: Extensions to be used when generating the certificate
166
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
167
        :param days: Number of days for which the generated cert. will be considered valid
168
        :return: An instance of Certificate class representing the generated cert
169
        """
170
        if usages is None:
171
            usages = {}
172

    
173
        # generate a new certificate
174
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
175
                                                        issuer_key.private_key,
176
                                                        subject_key_pass=subject_key.password,
177
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
178
                                                        days=days)
179

    
180
        # wrap the generated certificate using Certificate class
181
        certificate = self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
182
                                            issuer_cert.certificate_id, CERTIFICATE_ID)
183

    
184
        created_id = self.certificate_repository.create(certificate)
185

    
186
        certificate.certificate_id = created_id
187

    
188
        return certificate
189

    
190
    def get_certificate(self, unique_id: int) -> Certificate:
191
        """
192
        Tries to fetch a certificate from the certificate repository using a given id.
193
        :param unique_id: ID of the certificate to be fetched
194
        :return: Instance of the Certificate class containing a certificate with the given id or `None` if such
195
        certificate is not found
196
        """
197
        return self.certificate_repository.read(unique_id)
198

    
199
    def get_certificates(self, cert_type=None) -> List[Certificate]:
200
        """
201
        Tries to fetch a list of all certificates from the certificate repository. Using the `cert_type` parameter only
202
        certificates of the given type can be returned.
203
        :param cert_type: Type of certificates to be returned
204
        :return: List of instances of the Certificate class representing all certificates present in the certificate
205
        repository. An empty list is returned when no certificates are found.
206
        """
207
        return self.certificate_repository.read_all(cert_type)
208

    
209
    def get_chain_of_trust(self, from_id: int, to_id: int = -1, exclude_root=True) -> List[Certificate]:
210
        """
211
        Traverses the certificate hierarchy tree upwards till a certificate with the `to_id` ID is found or till a
212
        root CA certificate is found. Root certificates are excluded from the chain by default.
213
        :param from_id: ID of the first certificate to be included in the chain of trust
214
        :param to_id: ID of the last certificate to be included in the chain of trust
215
        :param exclude_root: a flag indicating whether root CA certificate should be excluded
216
        :return: a list of certificates representing the chain of trust starting with the certificate given by `from_id`
217
        ID
218
        """
219
        # read the first certificate of the chain
220
        start_cert = self.certificate_repository.read(from_id)
221

    
222
        # if no cert is found or the current cert is root CA and root CAs should be excluded, then return an empty list
223
        if start_cert is None or (start_cert.type_id == ROOT_CA_ID and exclude_root):
224
            return []
225

    
226
        current_cert = start_cert
227
        chain_of_trust = [current_cert]
228

    
229
        # TODO could possibly be simplified
230
        if start_cert.type_id == ROOT_CA_ID:
231
            # the first cert found is a root ca
232
            return chain_of_trust
233

    
234
        while True:
235
            parent_cert = self.certificate_repository.read(current_cert.parent_id)
236

    
237
            # check whether parent certificate exists
238
            if parent_cert is None:
239
                break
240

    
241
            # check whether the found certificate is a root certificate
242
            if parent_cert.type_id == ROOT_CA_ID:
243
                if not exclude_root:
244
                    # append the found root cert only if root certificates should not be excluded from the CoT
245
                    chain_of_trust.append(parent_cert)
246
                break
247

    
248
            # append the certificate
249
            chain_of_trust.append(parent_cert)
250

    
251
            # stop iterating over certificates if the id of the found certificate matches `to_id` method parameter
252
            if parent_cert.certificate_id == to_id:
253
                break
254

    
255
            current_cert = parent_cert
256

    
257
        return chain_of_trust
258

    
259
    def delete_certificate(self, unique_id) -> bool:
260
        """
261
        Deletes a certificate
262

    
263
        :param unique_id: ID of specific certificate
264

    
265
        :return: `True` when the deletion was successful. `False` in other case
266
        """
267
        # TODO delete children?
268
        return self.certificate_repository.delete(unique_id)
269

    
270
    def set_certificate_revocation_status(self, id, status, reason="unspecified"):
271
        """
272
        Set certificate status to 'valid' or 'revoked'.
273
        If the new status is revoked a reason can be provided -> default is unspecified
274
        :param reason: reason for revocation
275
        :param id: identifier of the certificate whose status is to be changed
276
        :param status: new status of the certificate
277
        """
278
        if status not in CERTIFICATE_STATES:
279
            raise CertificateStatusInvalidException(status)
280
        if reason not in CERTIFICATE_REVOCATION_REASONS:
281
            raise RevocationReasonInvalidException(reason)
282

    
283
        # check whether the certificate exists
284
        certificate = self.certificate_repository.read(id)
285
        if certificate is None:
286
            raise CertificateNotFoundException(id)
287

    
288
        updated = False
289
        if status == STATUS_VALID:
290
            updated = self.certificate_repository.clear_certificate_revocation(id)
291
        elif status == STATUS_REVOKED:
292
            # check if the certificate is not revoked already
293
            revoked = self.certificate_repository.get_all_revoked_by(certificate.parent_id)
294
            if certificate.certificate_id in [x.certificate_id for x in revoked]:
295
                raise CertificateAlreadyRevokedException(id)
296

    
297
            revocation_timestamp = int(time.time())
298
            updated = self.certificate_repository.set_certificate_revoked(id, str(revocation_timestamp), reason)
299

    
300
        if not updated:
301
            # TODO log this
302
            raise UnknownException("Repository returned 'false' from clear_certificate_revocation() "
303
                                   "or set_certificate_revoked().")
304

    
305
    def get_subject_from_certificate(self, certificate: Certificate) -> Subject:
306
        """
307
        Get Subject distinguished name from a Certificate
308
        :param certificate: certificate instance whose Subject shall be parsed
309
        :return: instance of Subject class containing resulting distinguished name
310
        """
311
        (subject, _, _) = self.cryptography_service.parse_cert_pem(certificate.pem_data)
312
        return subject
313

    
314
    def get_public_key_from_certificate(self, certificate: Certificate):
315
        """
316
        Extracts a public key from the given certificate
317
        :param certificate: an instance of the Certificate class containing the certificate from which a public key
318
        should be extracted.
319
        :return: a string containing the extracted public key in PEM format
320
        """
321
        return self.cryptography_service.extract_public_key_from_certificate(certificate.pem_data)
322

    
323
    def __get_crl_endpoint(self, ca_identifier: int) -> str:
324
        """
325
        Get URL address of CRL distribution endpoint based on
326
        issuer's ID
327

    
328
        :param ca_identifier: ID of issuing authority
329
        :return: CRL endpoint for the given CA
330
        """
331
        return self.configuration.base_server_url + "/api/crl/" + str(ca_identifier)
332

    
333
    def __get_ocsp_endpoint(self, ca_identifier: int) -> str:
334
        """
335
        Get URL address of OCSP distribution endpoint based on
336
        issuer's ID
337

    
338
        :param ca_identifier: ID of issuing authority
339
        :return: OCSP endpoint for the given CA
340
        """
341
        return self.configuration.base_server_url + "/api/ocsp/" + str(ca_identifier)
342

    
343

    
344
class RevocationReasonInvalidException(Exception):
345
    """
346
    Exception that denotes that the caller was trying to revoke
347
    a certificate using an invalid revocation reason
348
    """
349

    
350
    def __init__(self, reason):
351
        self.reason = reason
352

    
353
    def __str__(self):
354
        return f"Revocation reason '{self.reason}' is not valid."
355

    
356

    
357
class CertificateStatusInvalidException(Exception):
358
    """
359
    Exception that denotes that the caller was trying to set
360
    a certificate to an invalid state
361
    """
362

    
363
    def __init__(self, status):
364
        self.status = status
365

    
366
    def __str__(self):
367
        return f"Certificate status '{self.status}' is not valid."
368

    
369

    
370
class CertificateNotFoundException(Exception):
371
    """
372
    Exception that denotes that the caller was trying to set
373
    work with non-existing certificate
374
    """
375

    
376
    def __init__(self, id):
377
        self.id = id
378

    
379
    def __str__(self):
380
        return f"Certificate id '{self.id}' does not exist."
381

    
382

    
383
class CertificateAlreadyRevokedException(Exception):
384
    """
385
    Exception that denotes that the caller was trying to revoke
386
    a certificate that is already revoked
387
    """
388

    
389
    def __init__(self, id):
390
        self.id = id
391

    
392
    def __str__(self):
393
        return f"Certificate id '{self.id}' is already revoked."
(2-2/4)