Projekt

Obecné

Profil

Stáhnout (18.9 KB) Statistiky
| Větev: | Tag: | Revize:
1 ef65f488 Stanislav Král
from typing import List
2
3 151e7604 Jan Pašek
from injector import inject
4
5 20b33bd4 Jan Pašek
from src.config.configuration import Configuration
6
from src.constants import ROOT_CA_ID, INTERMEDIATE_CA_ID, CA_ID, CERTIFICATE_ID, CERTIFICATE_STATES, \
7
    CERTIFICATE_REVOCATION_REASONS
8 4a40b0d2 Stanislav Král
from src.dao.certificate_repository import CertificateRepository
9 8ff50e4e Jan Pašek
from src.exceptions.certificate_not_found_exception import CertificateNotFoundException
10 485913d0 Captain_Trojan
from src.exceptions.database_exception import DatabaseException
11 9e6f791a Jan Pašek
from src.exceptions.unknown_exception import UnknownException
12 4a40b0d2 Stanislav Král
from src.model.certificate import Certificate
13 313b647b Stanislav Král
from src.model.private_key import PrivateKey
14 4a40b0d2 Stanislav Král
from src.model.subject import Subject
15
from src.services.cryptography import CryptographyService
16
17 313b647b Stanislav Král
import time
18
19 7313994f Stanislav Král
VALID_FROM_TO_DATE_FORMAT = "%d.%m.%Y %H:%M:%S"
20 bbcb7c89 Stanislav Král
CA_EXTENSIONS = "basicConstraints=critical,CA:TRUE"
21 20b33bd4 Jan Pašek
CRL_EXTENSION = "crlDistributionPoints=URI:"
22
OCSP_EXTENSION = "authorityInfoAccess=OCSP;URI:"
23
STATUS_REVOKED = "revoked"
24
STATUS_VALID = "valid"
25 313b647b Stanislav Král
26 4a40b0d2 Stanislav Král
27
class CertificateService:
28
29 151e7604 Jan Pašek
    @inject
30 20b33bd4 Jan Pašek
    def __init__(self, cryptography_service: CryptographyService,
31
                 certificate_repository: CertificateRepository,
32
                 configuration: Configuration):
33 4a40b0d2 Stanislav Král
        self.cryptography_service = cryptography_service
34
        self.certificate_repository = certificate_repository
35 20b33bd4 Jan Pašek
        self.configuration = configuration
36 4a40b0d2 Stanislav Král
37 bbcb7c89 Stanislav Král
    # TODO usages present in method parameters but not in class diagram
38 ca3ac7c0 Stanislav Král
    def create_root_ca(self, key: PrivateKey, subject: Subject, extensions: str = "", config: str = "",
39 2f5101f1 Stanislav Král
                       usages=None, days=30):
40 a6727aa9 Stanislav Král
        """
41
        Creates a root CA certificate based on the given parameters.
42
        :param key: Private key to be used when generating the certificate
43
        :param subject: Subject to be used put into the certificate
44
        :param config: String containing the configuration to be used
45
        :param extensions: Name of the section in the configuration representing extensions
46
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
47 2f5101f1 Stanislav Král
        :param days: Number of days for which the generated cert. will be considered valid
48 a6727aa9 Stanislav Král
        :return: An instance of Certificate class representing the generated root CA cert
49
        """
50 ca3ac7c0 Stanislav Král
        if usages is None:
51
            usages = {}
52
53 20b33bd4 Jan Pašek
        cert_id = self.certificate_repository.get_next_id()
54
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(cert_id)
55
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(cert_id)
56
57 313b647b Stanislav Král
        # create a new self signed  certificate
58
        cert_pem = self.cryptography_service.create_sscrt(subject, key.private_key, key_pass=key.password,
59 87c56935 Stanislav Král
                                                          extensions=extensions, config=config, days=days, sn=cert_id)
60 ca3ac7c0 Stanislav Král
        # specify CA usage
61
        usages[CA_ID] = True
62
63 4c19a9b1 Stanislav Král
        # wrap into Certificate class
64 a6727aa9 Stanislav Král
        certificate = self.__create_wrapper(cert_pem, key.private_key_id, usages, 0,
65 4c19a9b1 Stanislav Král
                                            ROOT_CA_ID)
66 313b647b Stanislav Král
67
        # store the wrapper into the repository
68
        created_id = self.certificate_repository.create(certificate)
69
70
        # assign the generated ID to the inserted certificate
71
        certificate.certificate_id = created_id
72 4a40b0d2 Stanislav Král
73 313b647b Stanislav Král
        return certificate
74 10fab051 Stanislav Král
75 a6727aa9 Stanislav Král
    def __create_wrapper(self, cert_pem, private_key_id, usages, parent_id, cert_type):
76
        """
77 a4e818dc Jan Pašek
        Wraps the given parameters using the Certificate class. Uses CryptographyService to find out the notBefore and
78 a6727aa9 Stanislav Král
        notAfter fields.
79
        :param cert_pem: PEM of the cert. to be wrapped
80
        :param private_key_id: ID of the private key used to create the given certificate
81
        :param usages: A dictionary containing usages of the generated certificate generated (see constants.py)
82
        :param parent_id: ID of the CA that issued this certificate
83
        :param cert_type: Type of this certificate (see constants.py)
84
        :return: An instance of the Certificate class wrapping the values passed  via method parameters
85
        """
86 4c19a9b1 Stanislav Král
        # parse the generated pem for subject and notBefore/notAfter fields
87 a6727aa9 Stanislav Král
        # TODO this could be improved in the future in such way that calling openssl is not required to parse the dates
88 4c19a9b1 Stanislav Král
        subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem)
89
        # format the parsed date
90 7313994f Stanislav Král
        not_before_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_before)
91
        not_after_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_after)
92 4c19a9b1 Stanislav Král
93
        # create a certificate wrapper
94 a6727aa9 Stanislav Král
        certificate = Certificate(-1, subj.common_name, not_before_formatted, not_after_formatted, cert_pem,
95 4c19a9b1 Stanislav Král
                                  private_key_id, cert_type, parent_id, usages)
96
97
        return certificate
98
99 bbcb7c89 Stanislav Král
    # TODO config parameter present in class diagram but not here (unused)
100
    def create_ca(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate, issuer_key: PrivateKey,
101 ca3ac7c0 Stanislav Král
                  extensions: str = "", days: int = 30, usages=None):
102 a6727aa9 Stanislav Král
        """
103
        Creates an intermediate CA certificate issued by the given parent CA.
104
        :param subject_key: Private key to be used when generating the certificate
105
        :param subject: Subject to be used put into the certificate
106
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
107
        :param issuer_key: PK used to generate the issuer certificate
108
        :param extensions: Extensions to be used when generating the certificate
109
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
110
        :param days: Number of days for which the generated cert. will be considered valid
111
        :return: An instance of Certificate class representing the generated intermediate CA cert
112
        """
113 ca3ac7c0 Stanislav Král
        if usages is None:
114
            usages = {}
115
116 bbcb7c89 Stanislav Král
        extensions = extensions + "\n" + CA_EXTENSIONS
117 20b33bd4 Jan Pašek
        # Add CRL and OCSP distribution point to certificate extensions
118
        cert_id = self.certificate_repository.get_next_id()
119
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(cert_id)
120
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(cert_id)
121
122 bbcb7c89 Stanislav Král
        # TODO implement AIA URI via extensions
123
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
124
                                                        issuer_key.private_key,
125
                                                        subject_key_pass=subject_key.password,
126
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
127 87c56935 Stanislav Král
                                                        days=days,
128
                                                        sn=cert_id)
129 bbcb7c89 Stanislav Král
130 4c19a9b1 Stanislav Král
        # specify CA usage
131
        usages[CA_ID] = True
132
133
        # wrap into Certificate class
134 a6727aa9 Stanislav Král
        self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
135 4c19a9b1 Stanislav Král
                              issuer_cert.certificate_id, INTERMEDIATE_CA_ID)
136
137 bbcb7c89 Stanislav Král
        # parse the generated pem for subject and notBefore/notAfter fields
138
        subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem)
139
140
        # format the parsed date
141 7313994f Stanislav Král
        not_before_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_before)
142
        not_after_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_after)
143 bbcb7c89 Stanislav Král
144 ca3ac7c0 Stanislav Král
        # specify CA usage
145
        usages[CA_ID] = True
146
147 bbcb7c89 Stanislav Král
        # create a certificate wrapper
148
        certificate = Certificate(-1, subject.common_name, not_before_formatted, not_after_formatted, cert_pem,
149 ca3ac7c0 Stanislav Král
                                  subject_key.private_key_id, INTERMEDIATE_CA_ID, issuer_cert.certificate_id, usages)
150 bbcb7c89 Stanislav Král
151
        # store the wrapper into the repository
152
        created_id = self.certificate_repository.create(certificate)
153
154
        # assign the generated ID to the inserted certificate
155
        certificate.certificate_id = created_id
156
157
        return certificate
158
159 4c19a9b1 Stanislav Král
    def create_end_cert(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate,
160
                        issuer_key: PrivateKey,
161
                        extensions: str = "", days: int = 30, usages=None):
162 a6727aa9 Stanislav Král
        """
163
        Creates an end certificate issued by the given parent CA.
164
        :param subject_key: Private key to be used when generating the certificate
165
        :param subject: Subject to be used put into the certificate
166
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
167
        :param issuer_key: PK used to generate the issuer certificate
168
        :param extensions: Extensions to be used when generating the certificate
169
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
170
        :param days: Number of days for which the generated cert. will be considered valid
171
        :return: An instance of Certificate class representing the generated cert
172
        """
173 4c19a9b1 Stanislav Král
        if usages is None:
174
            usages = {}
175
176 87c56935 Stanislav Král
        # get the next certificate ID in order to be able to specify the serial number
177
        cert_id = self.certificate_repository.get_next_id()
178
179 4c19a9b1 Stanislav Král
        # generate a new certificate
180
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
181
                                                        issuer_key.private_key,
182
                                                        subject_key_pass=subject_key.password,
183
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
184 87c56935 Stanislav Král
                                                        days=days,
185
                                                        sn=cert_id
186
                                                        )
187 4c19a9b1 Stanislav Král
188
        # wrap the generated certificate using Certificate class
189 a6727aa9 Stanislav Král
        certificate = self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
190 4c19a9b1 Stanislav Král
                                            issuer_cert.certificate_id, CERTIFICATE_ID)
191
192
        created_id = self.certificate_repository.create(certificate)
193
194
        certificate.certificate_id = created_id
195
196
        return certificate
197
198 10fab051 Stanislav Král
    def get_certificate(self, unique_id: int) -> Certificate:
199 a6727aa9 Stanislav Král
        """
200
        Tries to fetch a certificate from the certificate repository using a given id.
201
        :param unique_id: ID of the certificate to be fetched
202
        :return: Instance of the Certificate class containing a certificate with the given id or `None` if such
203
        certificate is not found
204
        """
205 10fab051 Stanislav Král
        return self.certificate_repository.read(unique_id)
206 2a90f4fd Stanislav Král
207 ef65f488 Stanislav Král
    def get_certificates(self, cert_type=None) -> List[Certificate]:
208 a6727aa9 Stanislav Král
        """
209
        Tries to fetch a list of all certificates from the certificate repository. Using the `cert_type` parameter only
210
        certificates of the given type can be returned.
211
        :param cert_type: Type of certificates to be returned
212
        :return: List of instances of the Certificate class representing all certificates present in the certificate
213
        repository. An empty list is returned when no certificates are found.
214
        """
215 2a90f4fd Stanislav Král
        return self.certificate_repository.read_all(cert_type)
216 ef65f488 Stanislav Král
217
    def get_chain_of_trust(self, from_id: int, to_id: int = -1, exclude_root=True) -> List[Certificate]:
218 4e70d22a Stanislav Král
        """
219
        Traverses the certificate hierarchy tree upwards till a certificate with the `to_id` ID is found or till a
220
        root CA certificate is found. Root certificates are excluded from the chain by default.
221
        :param from_id: ID of the first certificate to be included in the chain of trust
222
        :param to_id: ID of the last certificate to be included in the chain of trust
223
        :param exclude_root: a flag indicating whether root CA certificate should be excluded
224
        :return: a list of certificates representing the chain of trust starting with the certificate given by `from_id`
225
        ID
226
        """
227
        # read the first certificate of the chain
228 ef65f488 Stanislav Král
        start_cert = self.certificate_repository.read(from_id)
229
230 4e70d22a Stanislav Král
        # if no cert is found or the current cert is root CA and root CAs should be excluded, then return an empty list
231 ef65f488 Stanislav Král
        if start_cert is None or (start_cert.type_id == ROOT_CA_ID and exclude_root):
232
            return []
233
234
        current_cert = start_cert
235
        chain_of_trust = [current_cert]
236
237
        # TODO could possibly be simplified
238
        if start_cert.type_id == ROOT_CA_ID:
239 4e70d22a Stanislav Král
            # the first cert found is a root ca
240 ef65f488 Stanislav Král
            return chain_of_trust
241
242
        while True:
243
            parent_cert = self.certificate_repository.read(current_cert.parent_id)
244
245 4e70d22a Stanislav Král
            # check whether parent certificate exists
246
            if parent_cert is None:
247
                break
248
249
            # check whether the found certificate is a root certificate
250
            if parent_cert.type_id == ROOT_CA_ID:
251 ef65f488 Stanislav Král
                if not exclude_root:
252 4e70d22a Stanislav Král
                    # append the found root cert only if root certificates should not be excluded from the CoT
253 ef65f488 Stanislav Král
                    chain_of_trust.append(parent_cert)
254
                break
255
256 4e70d22a Stanislav Král
            # append the certificate
257 ef65f488 Stanislav Král
            chain_of_trust.append(parent_cert)
258
259 4e70d22a Stanislav Král
            # stop iterating over certificates if the id of the found certificate matches `to_id` method parameter
260 ef65f488 Stanislav Král
            if parent_cert.certificate_id == to_id:
261
                break
262
263
            current_cert = parent_cert
264
265
        return chain_of_trust
266 3d639744 Stanislav Král
267 5f8a2c07 Captain_Trojan
    def delete_certificate(self, unique_id):
268 3d639744 Stanislav Král
        """
269 5f8a2c07 Captain_Trojan
        Deletes a certificate. Raises an Exception should any unexpected behavior occur.
270 3d639744 Stanislav Král
271
        :param unique_id: ID of specific certificate
272
        """
273 5f8a2c07 Captain_Trojan
274
        to_delete = self.certificate_repository.get_all_descendants_of(unique_id)
275
        if to_delete is None:
276
            raise CertificateNotFoundException(unique_id)
277
278
        for cert in to_delete:
279
            try:
280
                self.set_certificate_revocation_status(cert.certificate_id, STATUS_REVOKED)
281
            except CertificateAlreadyRevokedException:
282
                # TODO log as an info/debug, not warning or above <-- perfectly legal
283
                continue
284
285
            self.certificate_repository.delete(cert.certificate_id)
286
            # TODO log if not successfully deleted
287 c4ba6bb7 Jan Pašek
288 485913d0 Captain_Trojan
    def get_certificates_issued_by(self, unique_id):
289
        """
290
        Returns a list of all children of a certificate identified by an unique_id.
291
        Raises a DatabaseException should any unexpected behavior occur.
292
        :param unique_id: target certificate ID
293
        :return: children of unique_id
294
        """
295
        try:
296
            if self.certificate_repository.read(unique_id) is None:
297
                raise CertificateNotFoundException(unique_id)
298
        except DatabaseException:
299
            raise CertificateNotFoundException(unique_id)
300
301
        return self.certificate_repository.get_all_issued_by(unique_id)
302
303 20b33bd4 Jan Pašek
    def set_certificate_revocation_status(self, id, status, reason="unspecified"):
304
        """
305
        Set certificate status to 'valid' or 'revoked'.
306
        If the new status is revoked a reason can be provided -> default is unspecified
307
        :param reason: reason for revocation
308
        :param id: identifier of the certificate whose status is to be changed
309
        :param status: new status of the certificate
310
        """
311
        if status not in CERTIFICATE_STATES:
312
            raise CertificateStatusInvalidException(status)
313
        if reason not in CERTIFICATE_REVOCATION_REASONS:
314
            raise RevocationReasonInvalidException(reason)
315
316 9e6f791a Jan Pašek
        # check whether the certificate exists
317
        certificate = self.certificate_repository.read(id)
318
        if certificate is None:
319
            raise CertificateNotFoundException(id)
320
321 9c704fb1 Jan Pašek
        updated = False
322 20b33bd4 Jan Pašek
        if status == STATUS_VALID:
323 9c704fb1 Jan Pašek
            updated = self.certificate_repository.clear_certificate_revocation(id)
324 20b33bd4 Jan Pašek
        elif status == STATUS_REVOKED:
325 9e6f791a Jan Pašek
            # check if the certificate is not revoked already
326
            revoked = self.certificate_repository.get_all_revoked_by(certificate.parent_id)
327
            if certificate.certificate_id in [x.certificate_id for x in revoked]:
328
                raise CertificateAlreadyRevokedException(id)
329
330 20b33bd4 Jan Pašek
            revocation_timestamp = int(time.time())
331 9c704fb1 Jan Pašek
            updated = self.certificate_repository.set_certificate_revoked(id, str(revocation_timestamp), reason)
332
333
        if not updated:
334 9e6f791a Jan Pašek
            # TODO log this
335
            raise UnknownException("Repository returned 'false' from clear_certificate_revocation() "
336
                                   "or set_certificate_revoked().")
337 20b33bd4 Jan Pašek
338 c4ba6bb7 Jan Pašek
    def get_subject_from_certificate(self, certificate: Certificate) -> Subject:
339
        """
340
        Get Subject distinguished name from a Certificate
341
        :param certificate: certificate instance whose Subject shall be parsed
342
        :return: instance of Subject class containing resulting distinguished name
343
        """
344
        (subject, _, _) = self.cryptography_service.parse_cert_pem(certificate.pem_data)
345
        return subject
346 d3bfacfc Stanislav Král
347
    def get_public_key_from_certificate(self, certificate: Certificate):
348
        """
349
        Extracts a public key from the given certificate
350
        :param certificate: an instance of the Certificate class containing the certificate from which a public key
351
        should be extracted.
352
        :return: a string containing the extracted public key in PEM format
353
        """
354
        return self.cryptography_service.extract_public_key_from_certificate(certificate.pem_data)
355 20b33bd4 Jan Pašek
356
    def __get_crl_endpoint(self, ca_identifier: int) -> str:
357
        """
358
        Get URL address of CRL distribution endpoint based on
359
        issuer's ID
360
361
        :param ca_identifier: ID of issuing authority
362
        :return: CRL endpoint for the given CA
363
        """
364
        return self.configuration.base_server_url + "/api/crl/" + str(ca_identifier)
365
366
    def __get_ocsp_endpoint(self, ca_identifier: int) -> str:
367
        """
368
        Get URL address of OCSP distribution endpoint based on
369
        issuer's ID
370
371
        :param ca_identifier: ID of issuing authority
372
        :return: OCSP endpoint for the given CA
373
        """
374
        return self.configuration.base_server_url + "/api/ocsp/" + str(ca_identifier)
375
376
377
class RevocationReasonInvalidException(Exception):
378
    """
379
    Exception that denotes that the caller was trying to revoke
380
    a certificate using an invalid revocation reason
381
    """
382
383
    def __init__(self, reason):
384
        self.reason = reason
385
386
    def __str__(self):
387
        return f"Revocation reason '{self.reason}' is not valid."
388
389
390
class CertificateStatusInvalidException(Exception):
391
    """
392
    Exception that denotes that the caller was trying to set
393
    a certificate to an invalid state
394
    """
395
396
    def __init__(self, status):
397
        self.status = status
398
399
    def __str__(self):
400
        return f"Certificate status '{self.status}' is not valid."
401 9c704fb1 Jan Pašek
402
403 9e6f791a Jan Pašek
class CertificateAlreadyRevokedException(Exception):
404
    """
405
    Exception that denotes that the caller was trying to revoke
406
    a certificate that is already revoked
407
    """
408
409
    def __init__(self, id):
410
        self.id = id
411
412
    def __str__(self):
413
        return f"Certificate id '{self.id}' is already revoked."