Projekt

Obecné

Profil

Stáhnout (19.1 KB) Statistiky
| Větev: | Tag: | Revize:
1 ef65f488 Stanislav Král
from typing import List
2
3 151e7604 Jan Pašek
from injector import inject
4
5 20b33bd4 Jan Pašek
from src.config.configuration import Configuration
6
from src.constants import ROOT_CA_ID, INTERMEDIATE_CA_ID, CA_ID, CERTIFICATE_ID, CERTIFICATE_STATES, \
7
    CERTIFICATE_REVOCATION_REASONS
8 4a40b0d2 Stanislav Král
from src.dao.certificate_repository import CertificateRepository
9 8ff50e4e Jan Pašek
from src.exceptions.certificate_not_found_exception import CertificateNotFoundException
10 485913d0 Captain_Trojan
from src.exceptions.database_exception import DatabaseException
11 9e6f791a Jan Pašek
from src.exceptions.unknown_exception import UnknownException
12 4a40b0d2 Stanislav Král
from src.model.certificate import Certificate
13 313b647b Stanislav Král
from src.model.private_key import PrivateKey
14 4a40b0d2 Stanislav Král
from src.model.subject import Subject
15
from src.services.cryptography import CryptographyService
16
17 313b647b Stanislav Král
import time
18
19 7313994f Stanislav Král
VALID_FROM_TO_DATE_FORMAT = "%d.%m.%Y %H:%M:%S"
20 bbcb7c89 Stanislav Král
CA_EXTENSIONS = "basicConstraints=critical,CA:TRUE"
21 20b33bd4 Jan Pašek
CRL_EXTENSION = "crlDistributionPoints=URI:"
22
OCSP_EXTENSION = "authorityInfoAccess=OCSP;URI:"
23
STATUS_REVOKED = "revoked"
24
STATUS_VALID = "valid"
25 313b647b Stanislav Král
26 4a40b0d2 Stanislav Král
27
class CertificateService:
28
29 151e7604 Jan Pašek
    @inject
30 20b33bd4 Jan Pašek
    def __init__(self, cryptography_service: CryptographyService,
31
                 certificate_repository: CertificateRepository,
32
                 configuration: Configuration):
33 4a40b0d2 Stanislav Král
        self.cryptography_service = cryptography_service
34
        self.certificate_repository = certificate_repository
35 20b33bd4 Jan Pašek
        self.configuration = configuration
36 4a40b0d2 Stanislav Král
37 bbcb7c89 Stanislav Král
    # TODO usages present in method parameters but not in class diagram
38 ca3ac7c0 Stanislav Král
    def create_root_ca(self, key: PrivateKey, subject: Subject, extensions: str = "", config: str = "",
39 2f5101f1 Stanislav Král
                       usages=None, days=30):
40 a6727aa9 Stanislav Král
        """
41
        Creates a root CA certificate based on the given parameters.
42
        :param key: Private key to be used when generating the certificate
43
        :param subject: Subject to be used put into the certificate
44
        :param config: String containing the configuration to be used
45
        :param extensions: Name of the section in the configuration representing extensions
46
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
47 2f5101f1 Stanislav Král
        :param days: Number of days for which the generated cert. will be considered valid
48 a6727aa9 Stanislav Král
        :return: An instance of Certificate class representing the generated root CA cert
49
        """
50 ca3ac7c0 Stanislav Král
        if usages is None:
51
            usages = {}
52
53 20b33bd4 Jan Pašek
        cert_id = self.certificate_repository.get_next_id()
54
55 313b647b Stanislav Král
        # create a new self signed  certificate
56
        cert_pem = self.cryptography_service.create_sscrt(subject, key.private_key, key_pass=key.password,
57 87c56935 Stanislav Král
                                                          extensions=extensions, config=config, days=days, sn=cert_id)
58 ca3ac7c0 Stanislav Král
        # specify CA usage
59
        usages[CA_ID] = True
60
61 4c19a9b1 Stanislav Král
        # wrap into Certificate class
62 a6727aa9 Stanislav Král
        certificate = self.__create_wrapper(cert_pem, key.private_key_id, usages, 0,
63 4c19a9b1 Stanislav Král
                                            ROOT_CA_ID)
64 313b647b Stanislav Král
65
        # store the wrapper into the repository
66
        created_id = self.certificate_repository.create(certificate)
67
68
        # assign the generated ID to the inserted certificate
69
        certificate.certificate_id = created_id
70 4a40b0d2 Stanislav Král
71 313b647b Stanislav Král
        return certificate
72 10fab051 Stanislav Král
73 a6727aa9 Stanislav Král
    def __create_wrapper(self, cert_pem, private_key_id, usages, parent_id, cert_type):
74
        """
75 a4e818dc Jan Pašek
        Wraps the given parameters using the Certificate class. Uses CryptographyService to find out the notBefore and
76 a6727aa9 Stanislav Král
        notAfter fields.
77
        :param cert_pem: PEM of the cert. to be wrapped
78
        :param private_key_id: ID of the private key used to create the given certificate
79
        :param usages: A dictionary containing usages of the generated certificate generated (see constants.py)
80
        :param parent_id: ID of the CA that issued this certificate
81
        :param cert_type: Type of this certificate (see constants.py)
82
        :return: An instance of the Certificate class wrapping the values passed  via method parameters
83
        """
84 4c19a9b1 Stanislav Král
        # parse the generated pem for subject and notBefore/notAfter fields
85 a6727aa9 Stanislav Král
        # TODO this could be improved in the future in such way that calling openssl is not required to parse the dates
86 4c19a9b1 Stanislav Král
        subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem)
87
        # format the parsed date
88 7313994f Stanislav Král
        not_before_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_before)
89
        not_after_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_after)
90 4c19a9b1 Stanislav Král
91
        # create a certificate wrapper
92 a6727aa9 Stanislav Král
        certificate = Certificate(-1, subj.common_name, not_before_formatted, not_after_formatted, cert_pem,
93 4c19a9b1 Stanislav Král
                                  private_key_id, cert_type, parent_id, usages)
94
95
        return certificate
96
97 bbcb7c89 Stanislav Král
    # TODO config parameter present in class diagram but not here (unused)
98
    def create_ca(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate, issuer_key: PrivateKey,
99 ca3ac7c0 Stanislav Král
                  extensions: str = "", days: int = 30, usages=None):
100 a6727aa9 Stanislav Král
        """
101
        Creates an intermediate CA certificate issued by the given parent CA.
102
        :param subject_key: Private key to be used when generating the certificate
103
        :param subject: Subject to be used put into the certificate
104
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
105
        :param issuer_key: PK used to generate the issuer certificate
106
        :param extensions: Extensions to be used when generating the certificate
107
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
108
        :param days: Number of days for which the generated cert. will be considered valid
109
        :return: An instance of Certificate class representing the generated intermediate CA cert
110
        """
111 ca3ac7c0 Stanislav Král
        if usages is None:
112
            usages = {}
113
114 bbcb7c89 Stanislav Král
        extensions = extensions + "\n" + CA_EXTENSIONS
115 20b33bd4 Jan Pašek
        # Add CRL and OCSP distribution point to certificate extensions
116
        cert_id = self.certificate_repository.get_next_id()
117 ea1229ee Jan Pašek
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(issuer_cert.certificate_id)
118
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(issuer_cert.certificate_id)
119 20b33bd4 Jan Pašek
120 bbcb7c89 Stanislav Král
        # TODO implement AIA URI via extensions
121
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
122
                                                        issuer_key.private_key,
123
                                                        subject_key_pass=subject_key.password,
124
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
125 87c56935 Stanislav Král
                                                        days=days,
126
                                                        sn=cert_id)
127 bbcb7c89 Stanislav Král
128 4c19a9b1 Stanislav Král
        # specify CA usage
129
        usages[CA_ID] = True
130
131
        # wrap into Certificate class
132 a6727aa9 Stanislav Král
        self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
133 4c19a9b1 Stanislav Král
                              issuer_cert.certificate_id, INTERMEDIATE_CA_ID)
134
135 bbcb7c89 Stanislav Král
        # parse the generated pem for subject and notBefore/notAfter fields
136
        subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem)
137
138
        # format the parsed date
139 7313994f Stanislav Král
        not_before_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_before)
140
        not_after_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_after)
141 bbcb7c89 Stanislav Král
142 ca3ac7c0 Stanislav Král
        # specify CA usage
143
        usages[CA_ID] = True
144
145 bbcb7c89 Stanislav Král
        # create a certificate wrapper
146
        certificate = Certificate(-1, subject.common_name, not_before_formatted, not_after_formatted, cert_pem,
147 ca3ac7c0 Stanislav Král
                                  subject_key.private_key_id, INTERMEDIATE_CA_ID, issuer_cert.certificate_id, usages)
148 bbcb7c89 Stanislav Král
149
        # store the wrapper into the repository
150
        created_id = self.certificate_repository.create(certificate)
151
152
        # assign the generated ID to the inserted certificate
153
        certificate.certificate_id = created_id
154
155
        return certificate
156
157 4c19a9b1 Stanislav Král
    def create_end_cert(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate,
158
                        issuer_key: PrivateKey,
159
                        extensions: str = "", days: int = 30, usages=None):
160 a6727aa9 Stanislav Král
        """
161
        Creates an end certificate issued by the given parent CA.
162
        :param subject_key: Private key to be used when generating the certificate
163
        :param subject: Subject to be used put into the certificate
164
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
165
        :param issuer_key: PK used to generate the issuer certificate
166
        :param extensions: Extensions to be used when generating the certificate
167
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
168
        :param days: Number of days for which the generated cert. will be considered valid
169
        :return: An instance of Certificate class representing the generated cert
170
        """
171 4c19a9b1 Stanislav Král
        if usages is None:
172
            usages = {}
173
174 87c56935 Stanislav Král
        # get the next certificate ID in order to be able to specify the serial number
175
        cert_id = self.certificate_repository.get_next_id()
176
177 ea1229ee Jan Pašek
        # Add CRL and OCSP distribution point to certificate extensions
178
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(issuer_cert.certificate_id)
179
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(issuer_cert.certificate_id)
180
181 4c19a9b1 Stanislav Král
        # generate a new certificate
182
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
183
                                                        issuer_key.private_key,
184
                                                        subject_key_pass=subject_key.password,
185
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
186 87c56935 Stanislav Král
                                                        days=days,
187
                                                        sn=cert_id
188
                                                        )
189 4c19a9b1 Stanislav Král
190
        # wrap the generated certificate using Certificate class
191 a6727aa9 Stanislav Král
        certificate = self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
192 4c19a9b1 Stanislav Král
                                            issuer_cert.certificate_id, CERTIFICATE_ID)
193
194
        created_id = self.certificate_repository.create(certificate)
195
196
        certificate.certificate_id = created_id
197
198
        return certificate
199
200 10fab051 Stanislav Král
    def get_certificate(self, unique_id: int) -> Certificate:
201 a6727aa9 Stanislav Král
        """
202
        Tries to fetch a certificate from the certificate repository using a given id.
203
        :param unique_id: ID of the certificate to be fetched
204
        :return: Instance of the Certificate class containing a certificate with the given id or `None` if such
205
        certificate is not found
206
        """
207 10fab051 Stanislav Král
        return self.certificate_repository.read(unique_id)
208 2a90f4fd Stanislav Král
209 ef65f488 Stanislav Král
    def get_certificates(self, cert_type=None) -> List[Certificate]:
210 a6727aa9 Stanislav Král
        """
211
        Tries to fetch a list of all certificates from the certificate repository. Using the `cert_type` parameter only
212
        certificates of the given type can be returned.
213
        :param cert_type: Type of certificates to be returned
214
        :return: List of instances of the Certificate class representing all certificates present in the certificate
215
        repository. An empty list is returned when no certificates are found.
216
        """
217 2a90f4fd Stanislav Král
        return self.certificate_repository.read_all(cert_type)
218 ef65f488 Stanislav Král
219
    def get_chain_of_trust(self, from_id: int, to_id: int = -1, exclude_root=True) -> List[Certificate]:
220 4e70d22a Stanislav Král
        """
221
        Traverses the certificate hierarchy tree upwards till a certificate with the `to_id` ID is found or till a
222
        root CA certificate is found. Root certificates are excluded from the chain by default.
223
        :param from_id: ID of the first certificate to be included in the chain of trust
224
        :param to_id: ID of the last certificate to be included in the chain of trust
225
        :param exclude_root: a flag indicating whether root CA certificate should be excluded
226
        :return: a list of certificates representing the chain of trust starting with the certificate given by `from_id`
227
        ID
228
        """
229
        # read the first certificate of the chain
230 ef65f488 Stanislav Král
        start_cert = self.certificate_repository.read(from_id)
231
232 4e70d22a Stanislav Král
        # if no cert is found or the current cert is root CA and root CAs should be excluded, then return an empty list
233 ef65f488 Stanislav Král
        if start_cert is None or (start_cert.type_id == ROOT_CA_ID and exclude_root):
234
            return []
235
236
        current_cert = start_cert
237
        chain_of_trust = [current_cert]
238
239
        # TODO could possibly be simplified
240
        if start_cert.type_id == ROOT_CA_ID:
241 4e70d22a Stanislav Král
            # the first cert found is a root ca
242 ef65f488 Stanislav Král
            return chain_of_trust
243
244
        while True:
245
            parent_cert = self.certificate_repository.read(current_cert.parent_id)
246
247 4e70d22a Stanislav Král
            # check whether parent certificate exists
248
            if parent_cert is None:
249
                break
250
251
            # check whether the found certificate is a root certificate
252
            if parent_cert.type_id == ROOT_CA_ID:
253 ef65f488 Stanislav Král
                if not exclude_root:
254 4e70d22a Stanislav Král
                    # append the found root cert only if root certificates should not be excluded from the CoT
255 ef65f488 Stanislav Král
                    chain_of_trust.append(parent_cert)
256
                break
257
258 4e70d22a Stanislav Král
            # append the certificate
259 ef65f488 Stanislav Král
            chain_of_trust.append(parent_cert)
260
261 4e70d22a Stanislav Král
            # stop iterating over certificates if the id of the found certificate matches `to_id` method parameter
262 ef65f488 Stanislav Král
            if parent_cert.certificate_id == to_id:
263
                break
264
265
            current_cert = parent_cert
266
267
        return chain_of_trust
268 3d639744 Stanislav Král
269 5f8a2c07 Captain_Trojan
    def delete_certificate(self, unique_id):
270 3d639744 Stanislav Král
        """
271 5f8a2c07 Captain_Trojan
        Deletes a certificate. Raises an Exception should any unexpected behavior occur.
272 3d639744 Stanislav Král
273
        :param unique_id: ID of specific certificate
274
        """
275 5f8a2c07 Captain_Trojan
276
        to_delete = self.certificate_repository.get_all_descendants_of(unique_id)
277
        if to_delete is None:
278
            raise CertificateNotFoundException(unique_id)
279
280
        for cert in to_delete:
281
            try:
282
                self.set_certificate_revocation_status(cert.certificate_id, STATUS_REVOKED)
283
            except CertificateAlreadyRevokedException:
284
                # TODO log as an info/debug, not warning or above <-- perfectly legal
285
                continue
286
287
            self.certificate_repository.delete(cert.certificate_id)
288
            # TODO log if not successfully deleted
289 c4ba6bb7 Jan Pašek
290 485913d0 Captain_Trojan
    def get_certificates_issued_by(self, unique_id):
291
        """
292
        Returns a list of all children of a certificate identified by an unique_id.
293
        Raises a DatabaseException should any unexpected behavior occur.
294
        :param unique_id: target certificate ID
295
        :return: children of unique_id
296
        """
297
        try:
298
            if self.certificate_repository.read(unique_id) is None:
299
                raise CertificateNotFoundException(unique_id)
300
        except DatabaseException:
301
            raise CertificateNotFoundException(unique_id)
302
303
        return self.certificate_repository.get_all_issued_by(unique_id)
304
305 20b33bd4 Jan Pašek
    def set_certificate_revocation_status(self, id, status, reason="unspecified"):
306
        """
307
        Set certificate status to 'valid' or 'revoked'.
308
        If the new status is revoked a reason can be provided -> default is unspecified
309
        :param reason: reason for revocation
310
        :param id: identifier of the certificate whose status is to be changed
311
        :param status: new status of the certificate
312
        """
313
        if status not in CERTIFICATE_STATES:
314
            raise CertificateStatusInvalidException(status)
315
        if reason not in CERTIFICATE_REVOCATION_REASONS:
316
            raise RevocationReasonInvalidException(reason)
317
318 9e6f791a Jan Pašek
        # check whether the certificate exists
319
        certificate = self.certificate_repository.read(id)
320
        if certificate is None:
321
            raise CertificateNotFoundException(id)
322
323 9c704fb1 Jan Pašek
        updated = False
324 20b33bd4 Jan Pašek
        if status == STATUS_VALID:
325 9c704fb1 Jan Pašek
            updated = self.certificate_repository.clear_certificate_revocation(id)
326 20b33bd4 Jan Pašek
        elif status == STATUS_REVOKED:
327 9e6f791a Jan Pašek
            # check if the certificate is not revoked already
328
            revoked = self.certificate_repository.get_all_revoked_by(certificate.parent_id)
329
            if certificate.certificate_id in [x.certificate_id for x in revoked]:
330
                raise CertificateAlreadyRevokedException(id)
331
332 20b33bd4 Jan Pašek
            revocation_timestamp = int(time.time())
333 9c704fb1 Jan Pašek
            updated = self.certificate_repository.set_certificate_revoked(id, str(revocation_timestamp), reason)
334
335
        if not updated:
336 9e6f791a Jan Pašek
            # TODO log this
337
            raise UnknownException("Repository returned 'false' from clear_certificate_revocation() "
338
                                   "or set_certificate_revoked().")
339 20b33bd4 Jan Pašek
340 c4ba6bb7 Jan Pašek
    def get_subject_from_certificate(self, certificate: Certificate) -> Subject:
341
        """
342
        Get Subject distinguished name from a Certificate
343
        :param certificate: certificate instance whose Subject shall be parsed
344
        :return: instance of Subject class containing resulting distinguished name
345
        """
346
        (subject, _, _) = self.cryptography_service.parse_cert_pem(certificate.pem_data)
347
        return subject
348 d3bfacfc Stanislav Král
349
    def get_public_key_from_certificate(self, certificate: Certificate):
350
        """
351
        Extracts a public key from the given certificate
352
        :param certificate: an instance of the Certificate class containing the certificate from which a public key
353
        should be extracted.
354
        :return: a string containing the extracted public key in PEM format
355
        """
356
        return self.cryptography_service.extract_public_key_from_certificate(certificate.pem_data)
357 20b33bd4 Jan Pašek
358
    def __get_crl_endpoint(self, ca_identifier: int) -> str:
359
        """
360
        Get URL address of CRL distribution endpoint based on
361
        issuer's ID
362
363
        :param ca_identifier: ID of issuing authority
364
        :return: CRL endpoint for the given CA
365
        """
366
        return self.configuration.base_server_url + "/api/crl/" + str(ca_identifier)
367
368
    def __get_ocsp_endpoint(self, ca_identifier: int) -> str:
369
        """
370
        Get URL address of OCSP distribution endpoint based on
371
        issuer's ID
372
373
        :param ca_identifier: ID of issuing authority
374
        :return: OCSP endpoint for the given CA
375
        """
376
        return self.configuration.base_server_url + "/api/ocsp/" + str(ca_identifier)
377
378
379
class RevocationReasonInvalidException(Exception):
380
    """
381
    Exception that denotes that the caller was trying to revoke
382
    a certificate using an invalid revocation reason
383
    """
384
385
    def __init__(self, reason):
386
        self.reason = reason
387
388
    def __str__(self):
389
        return f"Revocation reason '{self.reason}' is not valid."
390
391
392
class CertificateStatusInvalidException(Exception):
393
    """
394
    Exception that denotes that the caller was trying to set
395
    a certificate to an invalid state
396
    """
397
398
    def __init__(self, status):
399
        self.status = status
400
401
    def __str__(self):
402
        return f"Certificate status '{self.status}' is not valid."
403 9c704fb1 Jan Pašek
404
405 9e6f791a Jan Pašek
class CertificateAlreadyRevokedException(Exception):
406
    """
407
    Exception that denotes that the caller was trying to revoke
408
    a certificate that is already revoked
409
    """
410
411
    def __init__(self, id):
412
        self.id = id
413
414
    def __str__(self):
415
        return f"Certificate id '{self.id}' is already revoked."