Projekt

Obecné

Profil

Stáhnout (19.1 KB) Statistiky
| Větev: | Tag: | Revize:
1 ef65f488 Stanislav Král
from typing import List
2
3 151e7604 Jan Pašek
from injector import inject
4
5 20b33bd4 Jan Pašek
from src.config.configuration import Configuration
6
from src.constants import ROOT_CA_ID, INTERMEDIATE_CA_ID, CA_ID, CERTIFICATE_ID, CERTIFICATE_STATES, \
7
    CERTIFICATE_REVOCATION_REASONS
8 4a40b0d2 Stanislav Král
from src.dao.certificate_repository import CertificateRepository
9 485913d0 Captain_Trojan
from src.exceptions.database_exception import DatabaseException
10 9e6f791a Jan Pašek
from src.exceptions.unknown_exception import UnknownException
11 4a40b0d2 Stanislav Král
from src.model.certificate import Certificate
12 313b647b Stanislav Král
from src.model.private_key import PrivateKey
13 4a40b0d2 Stanislav Král
from src.model.subject import Subject
14
from src.services.cryptography import CryptographyService
15
16 313b647b Stanislav Král
import time
17
18 7313994f Stanislav Král
VALID_FROM_TO_DATE_FORMAT = "%d.%m.%Y %H:%M:%S"
19 bbcb7c89 Stanislav Král
CA_EXTENSIONS = "basicConstraints=critical,CA:TRUE"
20 20b33bd4 Jan Pašek
CRL_EXTENSION = "crlDistributionPoints=URI:"
21
OCSP_EXTENSION = "authorityInfoAccess=OCSP;URI:"
22
STATUS_REVOKED = "revoked"
23
STATUS_VALID = "valid"
24 313b647b Stanislav Král
25 4a40b0d2 Stanislav Král
26
class CertificateService:
27
28 151e7604 Jan Pašek
    @inject
29 20b33bd4 Jan Pašek
    def __init__(self, cryptography_service: CryptographyService,
30
                 certificate_repository: CertificateRepository,
31
                 configuration: Configuration):
32 4a40b0d2 Stanislav Král
        self.cryptography_service = cryptography_service
33
        self.certificate_repository = certificate_repository
34 20b33bd4 Jan Pašek
        self.configuration = configuration
35 4a40b0d2 Stanislav Král
36 bbcb7c89 Stanislav Král
    # TODO usages present in method parameters but not in class diagram
37 ca3ac7c0 Stanislav Král
    def create_root_ca(self, key: PrivateKey, subject: Subject, extensions: str = "", config: str = "",
38 2f5101f1 Stanislav Král
                       usages=None, days=30):
39 a6727aa9 Stanislav Král
        """
40
        Creates a root CA certificate based on the given parameters.
41
        :param key: Private key to be used when generating the certificate
42
        :param subject: Subject to be used put into the certificate
43
        :param config: String containing the configuration to be used
44
        :param extensions: Name of the section in the configuration representing extensions
45
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
46 2f5101f1 Stanislav Král
        :param days: Number of days for which the generated cert. will be considered valid
47 a6727aa9 Stanislav Král
        :return: An instance of Certificate class representing the generated root CA cert
48
        """
49 ca3ac7c0 Stanislav Král
        if usages is None:
50
            usages = {}
51
52 20b33bd4 Jan Pašek
        cert_id = self.certificate_repository.get_next_id()
53
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(cert_id)
54
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(cert_id)
55
56 313b647b Stanislav Král
        # create a new self signed  certificate
57
        cert_pem = self.cryptography_service.create_sscrt(subject, key.private_key, key_pass=key.password,
58 87c56935 Stanislav Král
                                                          extensions=extensions, config=config, days=days, sn=cert_id)
59 ca3ac7c0 Stanislav Král
        # specify CA usage
60
        usages[CA_ID] = True
61
62 4c19a9b1 Stanislav Král
        # wrap into Certificate class
63 a6727aa9 Stanislav Král
        certificate = self.__create_wrapper(cert_pem, key.private_key_id, usages, 0,
64 4c19a9b1 Stanislav Král
                                            ROOT_CA_ID)
65 313b647b Stanislav Král
66
        # store the wrapper into the repository
67
        created_id = self.certificate_repository.create(certificate)
68
69
        # assign the generated ID to the inserted certificate
70
        certificate.certificate_id = created_id
71 4a40b0d2 Stanislav Král
72 313b647b Stanislav Král
        return certificate
73 10fab051 Stanislav Král
74 a6727aa9 Stanislav Král
    def __create_wrapper(self, cert_pem, private_key_id, usages, parent_id, cert_type):
75
        """
76 a4e818dc Jan Pašek
        Wraps the given parameters using the Certificate class. Uses CryptographyService to find out the notBefore and
77 a6727aa9 Stanislav Král
        notAfter fields.
78
        :param cert_pem: PEM of the cert. to be wrapped
79
        :param private_key_id: ID of the private key used to create the given certificate
80
        :param usages: A dictionary containing usages of the generated certificate generated (see constants.py)
81
        :param parent_id: ID of the CA that issued this certificate
82
        :param cert_type: Type of this certificate (see constants.py)
83
        :return: An instance of the Certificate class wrapping the values passed  via method parameters
84
        """
85 4c19a9b1 Stanislav Král
        # parse the generated pem for subject and notBefore/notAfter fields
86 a6727aa9 Stanislav Král
        # TODO this could be improved in the future in such way that calling openssl is not required to parse the dates
87 4c19a9b1 Stanislav Král
        subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem)
88
        # format the parsed date
89 7313994f Stanislav Král
        not_before_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_before)
90
        not_after_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_after)
91 4c19a9b1 Stanislav Král
92
        # create a certificate wrapper
93 a6727aa9 Stanislav Král
        certificate = Certificate(-1, subj.common_name, not_before_formatted, not_after_formatted, cert_pem,
94 4c19a9b1 Stanislav Král
                                  private_key_id, cert_type, parent_id, usages)
95
96
        return certificate
97
98 bbcb7c89 Stanislav Král
    # TODO config parameter present in class diagram but not here (unused)
99
    def create_ca(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate, issuer_key: PrivateKey,
100 ca3ac7c0 Stanislav Král
                  extensions: str = "", days: int = 30, usages=None):
101 a6727aa9 Stanislav Král
        """
102
        Creates an intermediate CA certificate issued by the given parent CA.
103
        :param subject_key: Private key to be used when generating the certificate
104
        :param subject: Subject to be used put into the certificate
105
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
106
        :param issuer_key: PK used to generate the issuer certificate
107
        :param extensions: Extensions to be used when generating the certificate
108
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
109
        :param days: Number of days for which the generated cert. will be considered valid
110
        :return: An instance of Certificate class representing the generated intermediate CA cert
111
        """
112 ca3ac7c0 Stanislav Král
        if usages is None:
113
            usages = {}
114
115 bbcb7c89 Stanislav Král
        extensions = extensions + "\n" + CA_EXTENSIONS
116 20b33bd4 Jan Pašek
        # Add CRL and OCSP distribution point to certificate extensions
117
        cert_id = self.certificate_repository.get_next_id()
118
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(cert_id)
119
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(cert_id)
120
121 bbcb7c89 Stanislav Král
        # TODO implement AIA URI via extensions
122
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
123
                                                        issuer_key.private_key,
124
                                                        subject_key_pass=subject_key.password,
125
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
126 87c56935 Stanislav Král
                                                        days=days,
127
                                                        sn=cert_id)
128 bbcb7c89 Stanislav Král
129 4c19a9b1 Stanislav Král
        # specify CA usage
130
        usages[CA_ID] = True
131
132
        # wrap into Certificate class
133 a6727aa9 Stanislav Král
        self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
134 4c19a9b1 Stanislav Král
                              issuer_cert.certificate_id, INTERMEDIATE_CA_ID)
135
136 bbcb7c89 Stanislav Král
        # parse the generated pem for subject and notBefore/notAfter fields
137
        subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem)
138
139
        # format the parsed date
140 7313994f Stanislav Král
        not_before_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_before)
141
        not_after_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_after)
142 bbcb7c89 Stanislav Král
143 ca3ac7c0 Stanislav Král
        # specify CA usage
144
        usages[CA_ID] = True
145
146 bbcb7c89 Stanislav Král
        # create a certificate wrapper
147
        certificate = Certificate(-1, subject.common_name, not_before_formatted, not_after_formatted, cert_pem,
148 ca3ac7c0 Stanislav Král
                                  subject_key.private_key_id, INTERMEDIATE_CA_ID, issuer_cert.certificate_id, usages)
149 bbcb7c89 Stanislav Král
150
        # store the wrapper into the repository
151
        created_id = self.certificate_repository.create(certificate)
152
153
        # assign the generated ID to the inserted certificate
154
        certificate.certificate_id = created_id
155
156
        return certificate
157
158 4c19a9b1 Stanislav Král
    def create_end_cert(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate,
159
                        issuer_key: PrivateKey,
160
                        extensions: str = "", days: int = 30, usages=None):
161 a6727aa9 Stanislav Král
        """
162
        Creates an end certificate issued by the given parent CA.
163
        :param subject_key: Private key to be used when generating the certificate
164
        :param subject: Subject to be used put into the certificate
165
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
166
        :param issuer_key: PK used to generate the issuer certificate
167
        :param extensions: Extensions to be used when generating the certificate
168
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
169
        :param days: Number of days for which the generated cert. will be considered valid
170
        :return: An instance of Certificate class representing the generated cert
171
        """
172 4c19a9b1 Stanislav Král
        if usages is None:
173
            usages = {}
174
175 87c56935 Stanislav Král
        # get the next certificate ID in order to be able to specify the serial number
176
        cert_id = self.certificate_repository.get_next_id()
177
178 4c19a9b1 Stanislav Král
        # generate a new certificate
179
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
180
                                                        issuer_key.private_key,
181
                                                        subject_key_pass=subject_key.password,
182
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
183 87c56935 Stanislav Král
                                                        days=days,
184
                                                        sn=cert_id
185
                                                        )
186 4c19a9b1 Stanislav Král
187
        # wrap the generated certificate using Certificate class
188 a6727aa9 Stanislav Král
        certificate = self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
189 4c19a9b1 Stanislav Král
                                            issuer_cert.certificate_id, CERTIFICATE_ID)
190
191
        created_id = self.certificate_repository.create(certificate)
192
193
        certificate.certificate_id = created_id
194
195
        return certificate
196
197 10fab051 Stanislav Král
    def get_certificate(self, unique_id: int) -> Certificate:
198 a6727aa9 Stanislav Král
        """
199
        Tries to fetch a certificate from the certificate repository using a given id.
200
        :param unique_id: ID of the certificate to be fetched
201
        :return: Instance of the Certificate class containing a certificate with the given id or `None` if such
202
        certificate is not found
203
        """
204 10fab051 Stanislav Král
        return self.certificate_repository.read(unique_id)
205 2a90f4fd Stanislav Král
206 ef65f488 Stanislav Král
    def get_certificates(self, cert_type=None) -> List[Certificate]:
207 a6727aa9 Stanislav Král
        """
208
        Tries to fetch a list of all certificates from the certificate repository. Using the `cert_type` parameter only
209
        certificates of the given type can be returned.
210
        :param cert_type: Type of certificates to be returned
211
        :return: List of instances of the Certificate class representing all certificates present in the certificate
212
        repository. An empty list is returned when no certificates are found.
213
        """
214 2a90f4fd Stanislav Král
        return self.certificate_repository.read_all(cert_type)
215 ef65f488 Stanislav Král
216
    def get_chain_of_trust(self, from_id: int, to_id: int = -1, exclude_root=True) -> List[Certificate]:
217 4e70d22a Stanislav Král
        """
218
        Traverses the certificate hierarchy tree upwards till a certificate with the `to_id` ID is found or till a
219
        root CA certificate is found. Root certificates are excluded from the chain by default.
220
        :param from_id: ID of the first certificate to be included in the chain of trust
221
        :param to_id: ID of the last certificate to be included in the chain of trust
222
        :param exclude_root: a flag indicating whether root CA certificate should be excluded
223
        :return: a list of certificates representing the chain of trust starting with the certificate given by `from_id`
224
        ID
225
        """
226
        # read the first certificate of the chain
227 ef65f488 Stanislav Král
        start_cert = self.certificate_repository.read(from_id)
228
229 4e70d22a Stanislav Král
        # if no cert is found or the current cert is root CA and root CAs should be excluded, then return an empty list
230 ef65f488 Stanislav Král
        if start_cert is None or (start_cert.type_id == ROOT_CA_ID and exclude_root):
231
            return []
232
233
        current_cert = start_cert
234
        chain_of_trust = [current_cert]
235
236
        # TODO could possibly be simplified
237
        if start_cert.type_id == ROOT_CA_ID:
238 4e70d22a Stanislav Král
            # the first cert found is a root ca
239 ef65f488 Stanislav Král
            return chain_of_trust
240
241
        while True:
242
            parent_cert = self.certificate_repository.read(current_cert.parent_id)
243
244 4e70d22a Stanislav Král
            # check whether parent certificate exists
245
            if parent_cert is None:
246
                break
247
248
            # check whether the found certificate is a root certificate
249
            if parent_cert.type_id == ROOT_CA_ID:
250 ef65f488 Stanislav Král
                if not exclude_root:
251 4e70d22a Stanislav Král
                    # append the found root cert only if root certificates should not be excluded from the CoT
252 ef65f488 Stanislav Král
                    chain_of_trust.append(parent_cert)
253
                break
254
255 4e70d22a Stanislav Král
            # append the certificate
256 ef65f488 Stanislav Král
            chain_of_trust.append(parent_cert)
257
258 4e70d22a Stanislav Král
            # stop iterating over certificates if the id of the found certificate matches `to_id` method parameter
259 ef65f488 Stanislav Král
            if parent_cert.certificate_id == to_id:
260
                break
261
262
            current_cert = parent_cert
263
264
        return chain_of_trust
265 3d639744 Stanislav Král
266 5f8a2c07 Captain_Trojan
    def delete_certificate(self, unique_id):
267 3d639744 Stanislav Král
        """
268 5f8a2c07 Captain_Trojan
        Deletes a certificate. Raises an Exception should any unexpected behavior occur.
269 3d639744 Stanislav Král
270
        :param unique_id: ID of specific certificate
271
        """
272 5f8a2c07 Captain_Trojan
273
        to_delete = self.certificate_repository.get_all_descendants_of(unique_id)
274
        if to_delete is None:
275
            raise CertificateNotFoundException(unique_id)
276
277
        for cert in to_delete:
278
            try:
279
                self.set_certificate_revocation_status(cert.certificate_id, STATUS_REVOKED)
280
            except CertificateAlreadyRevokedException:
281
                # TODO log as an info/debug, not warning or above <-- perfectly legal
282
                continue
283
284
            self.certificate_repository.delete(cert.certificate_id)
285
            # TODO log if not successfully deleted
286 c4ba6bb7 Jan Pašek
287 485913d0 Captain_Trojan
    def get_certificates_issued_by(self, unique_id):
288
        """
289
        Returns a list of all children of a certificate identified by an unique_id.
290
        Raises a DatabaseException should any unexpected behavior occur.
291
        :param unique_id: target certificate ID
292
        :return: children of unique_id
293
        """
294
        try:
295
            if self.certificate_repository.read(unique_id) is None:
296
                raise CertificateNotFoundException(unique_id)
297
        except DatabaseException:
298
            raise CertificateNotFoundException(unique_id)
299
300
        return self.certificate_repository.get_all_issued_by(unique_id)
301
302 20b33bd4 Jan Pašek
    def set_certificate_revocation_status(self, id, status, reason="unspecified"):
303
        """
304
        Set certificate status to 'valid' or 'revoked'.
305
        If the new status is revoked a reason can be provided -> default is unspecified
306
        :param reason: reason for revocation
307
        :param id: identifier of the certificate whose status is to be changed
308
        :param status: new status of the certificate
309
        """
310
        if status not in CERTIFICATE_STATES:
311
            raise CertificateStatusInvalidException(status)
312
        if reason not in CERTIFICATE_REVOCATION_REASONS:
313
            raise RevocationReasonInvalidException(reason)
314
315 9e6f791a Jan Pašek
        # check whether the certificate exists
316
        certificate = self.certificate_repository.read(id)
317
        if certificate is None:
318
            raise CertificateNotFoundException(id)
319
320 9c704fb1 Jan Pašek
        updated = False
321 20b33bd4 Jan Pašek
        if status == STATUS_VALID:
322 9c704fb1 Jan Pašek
            updated = self.certificate_repository.clear_certificate_revocation(id)
323 20b33bd4 Jan Pašek
        elif status == STATUS_REVOKED:
324 9e6f791a Jan Pašek
            # check if the certificate is not revoked already
325
            revoked = self.certificate_repository.get_all_revoked_by(certificate.parent_id)
326
            if certificate.certificate_id in [x.certificate_id for x in revoked]:
327
                raise CertificateAlreadyRevokedException(id)
328
329 20b33bd4 Jan Pašek
            revocation_timestamp = int(time.time())
330 9c704fb1 Jan Pašek
            updated = self.certificate_repository.set_certificate_revoked(id, str(revocation_timestamp), reason)
331
332
        if not updated:
333 9e6f791a Jan Pašek
            # TODO log this
334
            raise UnknownException("Repository returned 'false' from clear_certificate_revocation() "
335
                                   "or set_certificate_revoked().")
336 20b33bd4 Jan Pašek
337 c4ba6bb7 Jan Pašek
    def get_subject_from_certificate(self, certificate: Certificate) -> Subject:
338
        """
339
        Get Subject distinguished name from a Certificate
340
        :param certificate: certificate instance whose Subject shall be parsed
341
        :return: instance of Subject class containing resulting distinguished name
342
        """
343
        (subject, _, _) = self.cryptography_service.parse_cert_pem(certificate.pem_data)
344
        return subject
345 d3bfacfc Stanislav Král
346
    def get_public_key_from_certificate(self, certificate: Certificate):
347
        """
348
        Extracts a public key from the given certificate
349
        :param certificate: an instance of the Certificate class containing the certificate from which a public key
350
        should be extracted.
351
        :return: a string containing the extracted public key in PEM format
352
        """
353
        return self.cryptography_service.extract_public_key_from_certificate(certificate.pem_data)
354 20b33bd4 Jan Pašek
355
    def __get_crl_endpoint(self, ca_identifier: int) -> str:
356
        """
357
        Get URL address of CRL distribution endpoint based on
358
        issuer's ID
359
360
        :param ca_identifier: ID of issuing authority
361
        :return: CRL endpoint for the given CA
362
        """
363
        return self.configuration.base_server_url + "/api/crl/" + str(ca_identifier)
364
365
    def __get_ocsp_endpoint(self, ca_identifier: int) -> str:
366
        """
367
        Get URL address of OCSP distribution endpoint based on
368
        issuer's ID
369
370
        :param ca_identifier: ID of issuing authority
371
        :return: OCSP endpoint for the given CA
372
        """
373
        return self.configuration.base_server_url + "/api/ocsp/" + str(ca_identifier)
374
375
376
class RevocationReasonInvalidException(Exception):
377
    """
378
    Exception that denotes that the caller was trying to revoke
379
    a certificate using an invalid revocation reason
380
    """
381
382
    def __init__(self, reason):
383
        self.reason = reason
384
385
    def __str__(self):
386
        return f"Revocation reason '{self.reason}' is not valid."
387
388
389
class CertificateStatusInvalidException(Exception):
390
    """
391
    Exception that denotes that the caller was trying to set
392
    a certificate to an invalid state
393
    """
394
395
    def __init__(self, status):
396
        self.status = status
397
398
    def __str__(self):
399
        return f"Certificate status '{self.status}' is not valid."
400 9c704fb1 Jan Pašek
401
402
class CertificateNotFoundException(Exception):
403
    """
404
    Exception that denotes that the caller was trying to set
405 9e6f791a Jan Pašek
    work with non-existing certificate
406 9c704fb1 Jan Pašek
    """
407
408
    def __init__(self, id):
409
        self.id = id
410
411
    def __str__(self):
412
        return f"Certificate id '{self.id}' does not exist."
413 9e6f791a Jan Pašek
414
415
class CertificateAlreadyRevokedException(Exception):
416
    """
417
    Exception that denotes that the caller was trying to revoke
418
    a certificate that is already revoked
419
    """
420
421
    def __init__(self, id):
422
        self.id = id
423
424
    def __str__(self):
425
        return f"Certificate id '{self.id}' is already revoked."