Projekt

Obecné

Profil

Stáhnout (20.3 KB) Statistiky
| Větev: | Tag: | Revize:
1 ef65f488 Stanislav Král
from typing import List
2
3 151e7604 Jan Pašek
from injector import inject
4
5 20b33bd4 Jan Pašek
from src.config.configuration import Configuration
6
from src.constants import ROOT_CA_ID, INTERMEDIATE_CA_ID, CA_ID, CERTIFICATE_ID, CERTIFICATE_STATES, \
7 329216fe Stanislav Král
    CERTIFICATE_REVOCATION_REASONS, SSL_ID, SIGNATURE_ID, AUTHENTICATION_ID
8 4a40b0d2 Stanislav Král
from src.dao.certificate_repository import CertificateRepository
9 8ff50e4e Jan Pašek
from src.exceptions.certificate_not_found_exception import CertificateNotFoundException
10 485913d0 Captain_Trojan
from src.exceptions.database_exception import DatabaseException
11 9e6f791a Jan Pašek
from src.exceptions.unknown_exception import UnknownException
12 4a40b0d2 Stanislav Král
from src.model.certificate import Certificate
13 313b647b Stanislav Král
from src.model.private_key import PrivateKey
14 4a40b0d2 Stanislav Král
from src.model.subject import Subject
15
from src.services.cryptography import CryptographyService
16
17 313b647b Stanislav Král
import time
18
19 329216fe Stanislav Král
from src.utils.usages_to_extensions import usages_to_extension_lines, ExtensionFieldFlags, CRITICAL, KEY_CERT_SIGN, \
20
    CRL_SIGN, CA, DIGITAL_SIGNATURE, KEY_ENCIPHERMENT, KEY_AGREEMENT, SERVER_AUTH, NON_REPUDIATION, TIME_STAMPING, \
21
    CLIENT_AUTH
22
23 7313994f Stanislav Král
VALID_FROM_TO_DATE_FORMAT = "%d.%m.%Y %H:%M:%S"
24 bbcb7c89 Stanislav Král
CA_EXTENSIONS = "basicConstraints=critical,CA:TRUE"
25 20b33bd4 Jan Pašek
CRL_EXTENSION = "crlDistributionPoints=URI:"
26
OCSP_EXTENSION = "authorityInfoAccess=OCSP;URI:"
27
STATUS_REVOKED = "revoked"
28
STATUS_VALID = "valid"
29 313b647b Stanislav Král
30 329216fe Stanislav Král
# define which flags are required for various usages
31
REQUIRED_USAGE_EXTENSION_FLAGS = {
32
    CA_ID: ExtensionFieldFlags({CRITICAL, KEY_CERT_SIGN, CRL_SIGN}, {}, {CRITICAL, CA}),
33
    SSL_ID: ExtensionFieldFlags({DIGITAL_SIGNATURE, KEY_ENCIPHERMENT, KEY_AGREEMENT}, {SERVER_AUTH}, {}),
34 52f2eca4 Jan Pašek
    SIGNATURE_ID: ExtensionFieldFlags({DIGITAL_SIGNATURE, NON_REPUDIATION}, {}, {}),
35 329216fe Stanislav Král
    AUTHENTICATION_ID: ExtensionFieldFlags({DIGITAL_SIGNATURE}, {CLIENT_AUTH}, {})}
36
37 4a40b0d2 Stanislav Král
38
class CertificateService:
39
40 151e7604 Jan Pašek
    @inject
41 20b33bd4 Jan Pašek
    def __init__(self, cryptography_service: CryptographyService,
42
                 certificate_repository: CertificateRepository,
43
                 configuration: Configuration):
44 4a40b0d2 Stanislav Král
        self.cryptography_service = cryptography_service
45
        self.certificate_repository = certificate_repository
46 20b33bd4 Jan Pašek
        self.configuration = configuration
47 4a40b0d2 Stanislav Král
48 bbcb7c89 Stanislav Král
    # TODO usages present in method parameters but not in class diagram
49 ca3ac7c0 Stanislav Král
    def create_root_ca(self, key: PrivateKey, subject: Subject, extensions: str = "", config: str = "",
50 2f5101f1 Stanislav Král
                       usages=None, days=30):
51 a6727aa9 Stanislav Král
        """
52
        Creates a root CA certificate based on the given parameters.
53
        :param key: Private key to be used when generating the certificate
54
        :param subject: Subject to be used put into the certificate
55
        :param config: String containing the configuration to be used
56
        :param extensions: Name of the section in the configuration representing extensions
57
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
58 2f5101f1 Stanislav Král
        :param days: Number of days for which the generated cert. will be considered valid
59 a6727aa9 Stanislav Král
        :return: An instance of Certificate class representing the generated root CA cert
60
        """
61 ca3ac7c0 Stanislav Král
        if usages is None:
62
            usages = {}
63
64 20b33bd4 Jan Pašek
        cert_id = self.certificate_repository.get_next_id()
65
66 329216fe Stanislav Král
        # specify CA usage
67
        usages[CA_ID] = True
68
69
        # generate extension configuration lines based on the specified usages
70
        extensions = extensions + "\n" + "\n".join(usages_to_extension_lines(usages, REQUIRED_USAGE_EXTENSION_FLAGS))
71
72 313b647b Stanislav Král
        # create a new self signed  certificate
73
        cert_pem = self.cryptography_service.create_sscrt(subject, key.private_key, key_pass=key.password,
74 87c56935 Stanislav Král
                                                          extensions=extensions, config=config, days=days, sn=cert_id)
75 ca3ac7c0 Stanislav Král
76 4c19a9b1 Stanislav Král
        # wrap into Certificate class
77 a6727aa9 Stanislav Král
        certificate = self.__create_wrapper(cert_pem, key.private_key_id, usages, 0,
78 4c19a9b1 Stanislav Král
                                            ROOT_CA_ID)
79 313b647b Stanislav Král
80
        # store the wrapper into the repository
81
        created_id = self.certificate_repository.create(certificate)
82
83
        # assign the generated ID to the inserted certificate
84
        certificate.certificate_id = created_id
85 4a40b0d2 Stanislav Král
86 313b647b Stanislav Král
        return certificate
87 10fab051 Stanislav Král
88 a6727aa9 Stanislav Král
    def __create_wrapper(self, cert_pem, private_key_id, usages, parent_id, cert_type):
89
        """
90 a4e818dc Jan Pašek
        Wraps the given parameters using the Certificate class. Uses CryptographyService to find out the notBefore and
91 a6727aa9 Stanislav Král
        notAfter fields.
92
        :param cert_pem: PEM of the cert. to be wrapped
93
        :param private_key_id: ID of the private key used to create the given certificate
94
        :param usages: A dictionary containing usages of the generated certificate generated (see constants.py)
95
        :param parent_id: ID of the CA that issued this certificate
96
        :param cert_type: Type of this certificate (see constants.py)
97
        :return: An instance of the Certificate class wrapping the values passed  via method parameters
98
        """
99 4c19a9b1 Stanislav Král
        # parse the generated pem for subject and notBefore/notAfter fields
100 a6727aa9 Stanislav Král
        # TODO this could be improved in the future in such way that calling openssl is not required to parse the dates
101 4c19a9b1 Stanislav Král
        subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem)
102
        # format the parsed date
103 7313994f Stanislav Král
        not_before_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_before)
104
        not_after_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_after)
105 4c19a9b1 Stanislav Král
106
        # create a certificate wrapper
107 a6727aa9 Stanislav Král
        certificate = Certificate(-1, subj.common_name, not_before_formatted, not_after_formatted, cert_pem,
108 4c19a9b1 Stanislav Král
                                  private_key_id, cert_type, parent_id, usages)
109
110
        return certificate
111
112 bbcb7c89 Stanislav Král
    # TODO config parameter present in class diagram but not here (unused)
113
    def create_ca(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate, issuer_key: PrivateKey,
114 ca3ac7c0 Stanislav Král
                  extensions: str = "", days: int = 30, usages=None):
115 a6727aa9 Stanislav Král
        """
116
        Creates an intermediate CA certificate issued by the given parent CA.
117
        :param subject_key: Private key to be used when generating the certificate
118
        :param subject: Subject to be used put into the certificate
119
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
120
        :param issuer_key: PK used to generate the issuer certificate
121
        :param extensions: Extensions to be used when generating the certificate
122
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
123
        :param days: Number of days for which the generated cert. will be considered valid
124
        :return: An instance of Certificate class representing the generated intermediate CA cert
125
        """
126 ca3ac7c0 Stanislav Král
        if usages is None:
127
            usages = {}
128
129 329216fe Stanislav Král
        # specify CA usage
130
        usages[CA_ID] = True
131
132
        # generate extension configuration lines based on the specified usages
133
        extensions = extensions + "\n" + "\n".join(usages_to_extension_lines(usages, REQUIRED_USAGE_EXTENSION_FLAGS))
134
135 20b33bd4 Jan Pašek
        # Add CRL and OCSP distribution point to certificate extensions
136
        cert_id = self.certificate_repository.get_next_id()
137 ea1229ee Jan Pašek
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(issuer_cert.certificate_id)
138
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(issuer_cert.certificate_id)
139 20b33bd4 Jan Pašek
140 bbcb7c89 Stanislav Král
        # TODO implement AIA URI via extensions
141
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
142
                                                        issuer_key.private_key,
143
                                                        subject_key_pass=subject_key.password,
144
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
145 87c56935 Stanislav Král
                                                        days=days,
146
                                                        sn=cert_id)
147 bbcb7c89 Stanislav Král
148 4c19a9b1 Stanislav Král
        # wrap into Certificate class
149 a6727aa9 Stanislav Král
        self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
150 4c19a9b1 Stanislav Král
                              issuer_cert.certificate_id, INTERMEDIATE_CA_ID)
151
152 bbcb7c89 Stanislav Král
        # parse the generated pem for subject and notBefore/notAfter fields
153
        subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem)
154
155
        # format the parsed date
156 7313994f Stanislav Král
        not_before_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_before)
157
        not_after_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_after)
158 bbcb7c89 Stanislav Král
159
        # create a certificate wrapper
160
        certificate = Certificate(-1, subject.common_name, not_before_formatted, not_after_formatted, cert_pem,
161 ca3ac7c0 Stanislav Král
                                  subject_key.private_key_id, INTERMEDIATE_CA_ID, issuer_cert.certificate_id, usages)
162 bbcb7c89 Stanislav Král
163
        # store the wrapper into the repository
164
        created_id = self.certificate_repository.create(certificate)
165
166
        # assign the generated ID to the inserted certificate
167
        certificate.certificate_id = created_id
168
169
        return certificate
170
171 4c19a9b1 Stanislav Král
    def create_end_cert(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate,
172
                        issuer_key: PrivateKey,
173
                        extensions: str = "", days: int = 30, usages=None):
174 a6727aa9 Stanislav Král
        """
175
        Creates an end certificate issued by the given parent CA.
176
        :param subject_key: Private key to be used when generating the certificate
177
        :param subject: Subject to be used put into the certificate
178
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
179
        :param issuer_key: PK used to generate the issuer certificate
180
        :param extensions: Extensions to be used when generating the certificate
181
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
182
        :param days: Number of days for which the generated cert. will be considered valid
183
        :return: An instance of Certificate class representing the generated cert
184
        """
185 4c19a9b1 Stanislav Král
        if usages is None:
186
            usages = {}
187
188 87c56935 Stanislav Král
        # get the next certificate ID in order to be able to specify the serial number
189
        cert_id = self.certificate_repository.get_next_id()
190
191 329216fe Stanislav Král
        # generate extension configuration lines based on the specified usages
192
        extensions = extensions + "\n" + "\n".join(usages_to_extension_lines(usages, REQUIRED_USAGE_EXTENSION_FLAGS))
193
194 ea1229ee Jan Pašek
        # Add CRL and OCSP distribution point to certificate extensions
195
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(issuer_cert.certificate_id)
196
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(issuer_cert.certificate_id)
197
198 4c19a9b1 Stanislav Král
        # generate a new certificate
199
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
200
                                                        issuer_key.private_key,
201
                                                        subject_key_pass=subject_key.password,
202
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
203 87c56935 Stanislav Král
                                                        days=days,
204
                                                        sn=cert_id
205
                                                        )
206 4c19a9b1 Stanislav Král
207
        # wrap the generated certificate using Certificate class
208 a6727aa9 Stanislav Král
        certificate = self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
209 4c19a9b1 Stanislav Král
                                            issuer_cert.certificate_id, CERTIFICATE_ID)
210
211
        created_id = self.certificate_repository.create(certificate)
212
213
        certificate.certificate_id = created_id
214
215
        return certificate
216
217 10fab051 Stanislav Král
    def get_certificate(self, unique_id: int) -> Certificate:
218 a6727aa9 Stanislav Král
        """
219
        Tries to fetch a certificate from the certificate repository using a given id.
220
        :param unique_id: ID of the certificate to be fetched
221
        :return: Instance of the Certificate class containing a certificate with the given id or `None` if such
222
        certificate is not found
223
        """
224 10fab051 Stanislav Král
        return self.certificate_repository.read(unique_id)
225 2a90f4fd Stanislav Král
226 ef65f488 Stanislav Král
    def get_certificates(self, cert_type=None) -> List[Certificate]:
227 a6727aa9 Stanislav Král
        """
228
        Tries to fetch a list of all certificates from the certificate repository. Using the `cert_type` parameter only
229
        certificates of the given type can be returned.
230
        :param cert_type: Type of certificates to be returned
231
        :return: List of instances of the Certificate class representing all certificates present in the certificate
232
        repository. An empty list is returned when no certificates are found.
233
        """
234 2a90f4fd Stanislav Král
        return self.certificate_repository.read_all(cert_type)
235 ef65f488 Stanislav Král
236
    def get_chain_of_trust(self, from_id: int, to_id: int = -1, exclude_root=True) -> List[Certificate]:
237 4e70d22a Stanislav Král
        """
238
        Traverses the certificate hierarchy tree upwards till a certificate with the `to_id` ID is found or till a
239
        root CA certificate is found. Root certificates are excluded from the chain by default.
240
        :param from_id: ID of the first certificate to be included in the chain of trust
241
        :param to_id: ID of the last certificate to be included in the chain of trust
242
        :param exclude_root: a flag indicating whether root CA certificate should be excluded
243
        :return: a list of certificates representing the chain of trust starting with the certificate given by `from_id`
244
        ID
245
        """
246
        # read the first certificate of the chain
247 ef65f488 Stanislav Král
        start_cert = self.certificate_repository.read(from_id)
248
249 4e70d22a Stanislav Král
        # if no cert is found or the current cert is root CA and root CAs should be excluded, then return an empty list
250 ef65f488 Stanislav Král
        if start_cert is None or (start_cert.type_id == ROOT_CA_ID and exclude_root):
251
            return []
252
253
        current_cert = start_cert
254
        chain_of_trust = [current_cert]
255
256
        # TODO could possibly be simplified
257
        if start_cert.type_id == ROOT_CA_ID:
258 4e70d22a Stanislav Král
            # the first cert found is a root ca
259 ef65f488 Stanislav Král
            return chain_of_trust
260
261
        while True:
262
            parent_cert = self.certificate_repository.read(current_cert.parent_id)
263
264 4e70d22a Stanislav Král
            # check whether parent certificate exists
265
            if parent_cert is None:
266
                break
267
268
            # check whether the found certificate is a root certificate
269
            if parent_cert.type_id == ROOT_CA_ID:
270 ef65f488 Stanislav Král
                if not exclude_root:
271 4e70d22a Stanislav Král
                    # append the found root cert only if root certificates should not be excluded from the CoT
272 ef65f488 Stanislav Král
                    chain_of_trust.append(parent_cert)
273
                break
274
275 4e70d22a Stanislav Král
            # append the certificate
276 ef65f488 Stanislav Král
            chain_of_trust.append(parent_cert)
277
278 4e70d22a Stanislav Král
            # stop iterating over certificates if the id of the found certificate matches `to_id` method parameter
279 ef65f488 Stanislav Král
            if parent_cert.certificate_id == to_id:
280
                break
281
282
            current_cert = parent_cert
283
284
        return chain_of_trust
285 3d639744 Stanislav Král
286 5f8a2c07 Captain_Trojan
    def delete_certificate(self, unique_id):
287 3d639744 Stanislav Král
        """
288 5f8a2c07 Captain_Trojan
        Deletes a certificate. Raises an Exception should any unexpected behavior occur.
289 3d639744 Stanislav Král
290
        :param unique_id: ID of specific certificate
291
        """
292 5f8a2c07 Captain_Trojan
293
        to_delete = self.certificate_repository.get_all_descendants_of(unique_id)
294
        if to_delete is None:
295
            raise CertificateNotFoundException(unique_id)
296
297
        for cert in to_delete:
298
            try:
299
                self.set_certificate_revocation_status(cert.certificate_id, STATUS_REVOKED)
300
            except CertificateAlreadyRevokedException:
301
                # TODO log as an info/debug, not warning or above <-- perfectly legal
302 02954c9d Jan Pašek
                pass
303 5f8a2c07 Captain_Trojan
304
            self.certificate_repository.delete(cert.certificate_id)
305
            # TODO log if not successfully deleted
306 c4ba6bb7 Jan Pašek
307 485913d0 Captain_Trojan
    def get_certificates_issued_by(self, unique_id):
308
        """
309
        Returns a list of all children of a certificate identified by an unique_id.
310
        Raises a DatabaseException should any unexpected behavior occur.
311
        :param unique_id: target certificate ID
312
        :return: children of unique_id
313
        """
314
        try:
315
            if self.certificate_repository.read(unique_id) is None:
316
                raise CertificateNotFoundException(unique_id)
317
        except DatabaseException:
318
            raise CertificateNotFoundException(unique_id)
319
320
        return self.certificate_repository.get_all_issued_by(unique_id)
321
322 20b33bd4 Jan Pašek
    def set_certificate_revocation_status(self, id, status, reason="unspecified"):
323
        """
324
        Set certificate status to 'valid' or 'revoked'.
325
        If the new status is revoked a reason can be provided -> default is unspecified
326
        :param reason: reason for revocation
327
        :param id: identifier of the certificate whose status is to be changed
328
        :param status: new status of the certificate
329
        """
330
        if status not in CERTIFICATE_STATES:
331
            raise CertificateStatusInvalidException(status)
332
        if reason not in CERTIFICATE_REVOCATION_REASONS:
333
            raise RevocationReasonInvalidException(reason)
334
335 9e6f791a Jan Pašek
        # check whether the certificate exists
336
        certificate = self.certificate_repository.read(id)
337
        if certificate is None:
338
            raise CertificateNotFoundException(id)
339
340 9c704fb1 Jan Pašek
        updated = False
341 20b33bd4 Jan Pašek
        if status == STATUS_VALID:
342 9c704fb1 Jan Pašek
            updated = self.certificate_repository.clear_certificate_revocation(id)
343 20b33bd4 Jan Pašek
        elif status == STATUS_REVOKED:
344 9e6f791a Jan Pašek
            # check if the certificate is not revoked already
345
            revoked = self.certificate_repository.get_all_revoked_by(certificate.parent_id)
346
            if certificate.certificate_id in [x.certificate_id for x in revoked]:
347
                raise CertificateAlreadyRevokedException(id)
348
349 20b33bd4 Jan Pašek
            revocation_timestamp = int(time.time())
350 9c704fb1 Jan Pašek
            updated = self.certificate_repository.set_certificate_revoked(id, str(revocation_timestamp), reason)
351
352
        if not updated:
353 9e6f791a Jan Pašek
            # TODO log this
354
            raise UnknownException("Repository returned 'false' from clear_certificate_revocation() "
355
                                   "or set_certificate_revoked().")
356 20b33bd4 Jan Pašek
357 c4ba6bb7 Jan Pašek
    def get_subject_from_certificate(self, certificate: Certificate) -> Subject:
358
        """
359
        Get Subject distinguished name from a Certificate
360
        :param certificate: certificate instance whose Subject shall be parsed
361
        :return: instance of Subject class containing resulting distinguished name
362
        """
363
        (subject, _, _) = self.cryptography_service.parse_cert_pem(certificate.pem_data)
364
        return subject
365 d3bfacfc Stanislav Král
366
    def get_public_key_from_certificate(self, certificate: Certificate):
367
        """
368
        Extracts a public key from the given certificate
369
        :param certificate: an instance of the Certificate class containing the certificate from which a public key
370
        should be extracted.
371
        :return: a string containing the extracted public key in PEM format
372
        """
373
        return self.cryptography_service.extract_public_key_from_certificate(certificate.pem_data)
374 20b33bd4 Jan Pašek
375
    def __get_crl_endpoint(self, ca_identifier: int) -> str:
376
        """
377
        Get URL address of CRL distribution endpoint based on
378
        issuer's ID
379
380
        :param ca_identifier: ID of issuing authority
381
        :return: CRL endpoint for the given CA
382
        """
383
        return self.configuration.base_server_url + "/api/crl/" + str(ca_identifier)
384
385
    def __get_ocsp_endpoint(self, ca_identifier: int) -> str:
386
        """
387
        Get URL address of OCSP distribution endpoint based on
388
        issuer's ID
389
390
        :param ca_identifier: ID of issuing authority
391
        :return: OCSP endpoint for the given CA
392
        """
393
        return self.configuration.base_server_url + "/api/ocsp/" + str(ca_identifier)
394
395
396
class RevocationReasonInvalidException(Exception):
397
    """
398
    Exception that denotes that the caller was trying to revoke
399
    a certificate using an invalid revocation reason
400
    """
401
402
    def __init__(self, reason):
403
        self.reason = reason
404
405
    def __str__(self):
406
        return f"Revocation reason '{self.reason}' is not valid."
407
408
409
class CertificateStatusInvalidException(Exception):
410
    """
411
    Exception that denotes that the caller was trying to set
412
    a certificate to an invalid state
413
    """
414
415
    def __init__(self, status):
416
        self.status = status
417
418
    def __str__(self):
419
        return f"Certificate status '{self.status}' is not valid."
420 9c704fb1 Jan Pašek
421
422 9e6f791a Jan Pašek
class CertificateAlreadyRevokedException(Exception):
423
    """
424
    Exception that denotes that the caller was trying to revoke
425
    a certificate that is already revoked
426
    """
427
428
    def __init__(self, id):
429
        self.id = id
430
431
    def __str__(self):
432
        return f"Certificate id '{self.id}' is already revoked."