Projekt

Obecné

Profil

Stáhnout (18 KB) Statistiky
| Větev: | Tag: | Revize:
1 ef65f488 Stanislav Král
from typing import List
2
3 151e7604 Jan Pašek
from injector import inject
4
5 20b33bd4 Jan Pašek
from src.config.configuration import Configuration
6
from src.constants import ROOT_CA_ID, INTERMEDIATE_CA_ID, CA_ID, CERTIFICATE_ID, CERTIFICATE_STATES, \
7
    CERTIFICATE_REVOCATION_REASONS
8 4a40b0d2 Stanislav Král
from src.dao.certificate_repository import CertificateRepository
9 9e6f791a Jan Pašek
from src.exceptions.unknown_exception import UnknownException
10 4a40b0d2 Stanislav Král
from src.model.certificate import Certificate
11 313b647b Stanislav Král
from src.model.private_key import PrivateKey
12 4a40b0d2 Stanislav Král
from src.model.subject import Subject
13
from src.services.cryptography import CryptographyService
14
15 313b647b Stanislav Král
import time
16
17
DATE_FORMAT = "%d.%m.%Y %H:%M:%S"
18 bbcb7c89 Stanislav Král
CA_EXTENSIONS = "basicConstraints=critical,CA:TRUE"
19 20b33bd4 Jan Pašek
CRL_EXTENSION = "crlDistributionPoints=URI:"
20
OCSP_EXTENSION = "authorityInfoAccess=OCSP;URI:"
21
STATUS_REVOKED = "revoked"
22
STATUS_VALID = "valid"
23 313b647b Stanislav Král
24 4a40b0d2 Stanislav Král
25
class CertificateService:
26
27 151e7604 Jan Pašek
    @inject
28 20b33bd4 Jan Pašek
    def __init__(self, cryptography_service: CryptographyService,
29
                 certificate_repository: CertificateRepository,
30
                 configuration: Configuration):
31 4a40b0d2 Stanislav Král
        self.cryptography_service = cryptography_service
32
        self.certificate_repository = certificate_repository
33 20b33bd4 Jan Pašek
        self.configuration = configuration
34 4a40b0d2 Stanislav Král
35 bbcb7c89 Stanislav Král
    # TODO usages present in method parameters but not in class diagram
36 ca3ac7c0 Stanislav Král
    def create_root_ca(self, key: PrivateKey, subject: Subject, extensions: str = "", config: str = "",
37 2f5101f1 Stanislav Král
                       usages=None, days=30):
38 a6727aa9 Stanislav Král
        """
39
        Creates a root CA certificate based on the given parameters.
40
        :param key: Private key to be used when generating the certificate
41
        :param subject: Subject to be used put into the certificate
42
        :param config: String containing the configuration to be used
43
        :param extensions: Name of the section in the configuration representing extensions
44
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
45 2f5101f1 Stanislav Král
        :param days: Number of days for which the generated cert. will be considered valid
46 a6727aa9 Stanislav Král
        :return: An instance of Certificate class representing the generated root CA cert
47
        """
48 ca3ac7c0 Stanislav Král
        if usages is None:
49
            usages = {}
50
51 20b33bd4 Jan Pašek
        cert_id = self.certificate_repository.get_next_id()
52
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(cert_id)
53
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(cert_id)
54
55 313b647b Stanislav Král
        # create a new self signed  certificate
56
        cert_pem = self.cryptography_service.create_sscrt(subject, key.private_key, key_pass=key.password,
57 2f5101f1 Stanislav Král
                                                          extensions=extensions, config=config, days=days)
58 ca3ac7c0 Stanislav Král
        # specify CA usage
59
        usages[CA_ID] = True
60
61 4c19a9b1 Stanislav Král
        # wrap into Certificate class
62 a6727aa9 Stanislav Král
        certificate = self.__create_wrapper(cert_pem, key.private_key_id, usages, 0,
63 4c19a9b1 Stanislav Král
                                            ROOT_CA_ID)
64 313b647b Stanislav Král
65
        # store the wrapper into the repository
66
        created_id = self.certificate_repository.create(certificate)
67
68
        # assign the generated ID to the inserted certificate
69
        certificate.certificate_id = created_id
70 4a40b0d2 Stanislav Král
71 313b647b Stanislav Král
        return certificate
72 10fab051 Stanislav Král
73 a6727aa9 Stanislav Král
    def __create_wrapper(self, cert_pem, private_key_id, usages, parent_id, cert_type):
74
        """
75 a4e818dc Jan Pašek
        Wraps the given parameters using the Certificate class. Uses CryptographyService to find out the notBefore and
76 a6727aa9 Stanislav Král
        notAfter fields.
77
        :param cert_pem: PEM of the cert. to be wrapped
78
        :param private_key_id: ID of the private key used to create the given certificate
79
        :param usages: A dictionary containing usages of the generated certificate generated (see constants.py)
80
        :param parent_id: ID of the CA that issued this certificate
81
        :param cert_type: Type of this certificate (see constants.py)
82
        :return: An instance of the Certificate class wrapping the values passed  via method parameters
83
        """
84 4c19a9b1 Stanislav Král
        # parse the generated pem for subject and notBefore/notAfter fields
85 a6727aa9 Stanislav Král
        # TODO this could be improved in the future in such way that calling openssl is not required to parse the dates
86 4c19a9b1 Stanislav Král
        subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem)
87
        # format the parsed date
88
        not_before_formatted = time.strftime(DATE_FORMAT, not_before)
89
        not_after_formatted = time.strftime(DATE_FORMAT, not_after)
90
91
        # create a certificate wrapper
92 a6727aa9 Stanislav Král
        certificate = Certificate(-1, subj.common_name, not_before_formatted, not_after_formatted, cert_pem,
93 4c19a9b1 Stanislav Král
                                  private_key_id, cert_type, parent_id, usages)
94
95
        return certificate
96
97 bbcb7c89 Stanislav Král
    # TODO config parameter present in class diagram but not here (unused)
98
    def create_ca(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate, issuer_key: PrivateKey,
99 ca3ac7c0 Stanislav Král
                  extensions: str = "", days: int = 30, usages=None):
100 a6727aa9 Stanislav Král
        """
101
        Creates an intermediate CA certificate issued by the given parent CA.
102
        :param subject_key: Private key to be used when generating the certificate
103
        :param subject: Subject to be used put into the certificate
104
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
105
        :param issuer_key: PK used to generate the issuer certificate
106
        :param extensions: Extensions to be used when generating the certificate
107
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
108
        :param days: Number of days for which the generated cert. will be considered valid
109
        :return: An instance of Certificate class representing the generated intermediate CA cert
110
        """
111 ca3ac7c0 Stanislav Král
        if usages is None:
112
            usages = {}
113
114 bbcb7c89 Stanislav Král
        extensions = extensions + "\n" + CA_EXTENSIONS
115 20b33bd4 Jan Pašek
        # Add CRL and OCSP distribution point to certificate extensions
116
        cert_id = self.certificate_repository.get_next_id()
117
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(cert_id)
118
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(cert_id)
119
120 bbcb7c89 Stanislav Král
        # TODO implement AIA URI via extensions
121
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
122
                                                        issuer_key.private_key,
123
                                                        subject_key_pass=subject_key.password,
124
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
125
                                                        days=days)
126
127 4c19a9b1 Stanislav Král
        # specify CA usage
128
        usages[CA_ID] = True
129
130
        # wrap into Certificate class
131 a6727aa9 Stanislav Král
        self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
132 4c19a9b1 Stanislav Král
                              issuer_cert.certificate_id, INTERMEDIATE_CA_ID)
133
134 bbcb7c89 Stanislav Král
        # parse the generated pem for subject and notBefore/notAfter fields
135
        subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem)
136
137
        # format the parsed date
138
        not_before_formatted = time.strftime(DATE_FORMAT, not_before)
139
        not_after_formatted = time.strftime(DATE_FORMAT, not_after)
140
141 ca3ac7c0 Stanislav Král
        # specify CA usage
142
        usages[CA_ID] = True
143
144 bbcb7c89 Stanislav Král
        # create a certificate wrapper
145
        certificate = Certificate(-1, subject.common_name, not_before_formatted, not_after_formatted, cert_pem,
146 ca3ac7c0 Stanislav Král
                                  subject_key.private_key_id, INTERMEDIATE_CA_ID, issuer_cert.certificate_id, usages)
147 bbcb7c89 Stanislav Král
148
        # store the wrapper into the repository
149
        created_id = self.certificate_repository.create(certificate)
150
151
        # assign the generated ID to the inserted certificate
152
        certificate.certificate_id = created_id
153
154
        return certificate
155
156 4c19a9b1 Stanislav Král
    def create_end_cert(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate,
157
                        issuer_key: PrivateKey,
158
                        extensions: str = "", days: int = 30, usages=None):
159 a6727aa9 Stanislav Král
        """
160
        Creates an end certificate issued by the given parent CA.
161
        :param subject_key: Private key to be used when generating the certificate
162
        :param subject: Subject to be used put into the certificate
163
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
164
        :param issuer_key: PK used to generate the issuer certificate
165
        :param extensions: Extensions to be used when generating the certificate
166
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
167
        :param days: Number of days for which the generated cert. will be considered valid
168
        :return: An instance of Certificate class representing the generated cert
169
        """
170 4c19a9b1 Stanislav Král
        if usages is None:
171
            usages = {}
172
173
        # generate a new certificate
174
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
175
                                                        issuer_key.private_key,
176
                                                        subject_key_pass=subject_key.password,
177
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
178
                                                        days=days)
179
180
        # wrap the generated certificate using Certificate class
181 a6727aa9 Stanislav Král
        certificate = self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
182 4c19a9b1 Stanislav Král
                                            issuer_cert.certificate_id, CERTIFICATE_ID)
183
184
        created_id = self.certificate_repository.create(certificate)
185
186
        certificate.certificate_id = created_id
187
188
        return certificate
189
190 10fab051 Stanislav Král
    def get_certificate(self, unique_id: int) -> Certificate:
191 a6727aa9 Stanislav Král
        """
192
        Tries to fetch a certificate from the certificate repository using a given id.
193
        :param unique_id: ID of the certificate to be fetched
194
        :return: Instance of the Certificate class containing a certificate with the given id or `None` if such
195
        certificate is not found
196
        """
197 10fab051 Stanislav Král
        return self.certificate_repository.read(unique_id)
198 2a90f4fd Stanislav Král
199 ef65f488 Stanislav Král
    def get_certificates(self, cert_type=None) -> List[Certificate]:
200 a6727aa9 Stanislav Král
        """
201
        Tries to fetch a list of all certificates from the certificate repository. Using the `cert_type` parameter only
202
        certificates of the given type can be returned.
203
        :param cert_type: Type of certificates to be returned
204
        :return: List of instances of the Certificate class representing all certificates present in the certificate
205
        repository. An empty list is returned when no certificates are found.
206
        """
207 2a90f4fd Stanislav Král
        return self.certificate_repository.read_all(cert_type)
208 ef65f488 Stanislav Král
209
    def get_chain_of_trust(self, from_id: int, to_id: int = -1, exclude_root=True) -> List[Certificate]:
210 4e70d22a Stanislav Král
        """
211
        Traverses the certificate hierarchy tree upwards till a certificate with the `to_id` ID is found or till a
212
        root CA certificate is found. Root certificates are excluded from the chain by default.
213
        :param from_id: ID of the first certificate to be included in the chain of trust
214
        :param to_id: ID of the last certificate to be included in the chain of trust
215
        :param exclude_root: a flag indicating whether root CA certificate should be excluded
216
        :return: a list of certificates representing the chain of trust starting with the certificate given by `from_id`
217
        ID
218
        """
219
        # read the first certificate of the chain
220 ef65f488 Stanislav Král
        start_cert = self.certificate_repository.read(from_id)
221
222 4e70d22a Stanislav Král
        # if no cert is found or the current cert is root CA and root CAs should be excluded, then return an empty list
223 ef65f488 Stanislav Král
        if start_cert is None or (start_cert.type_id == ROOT_CA_ID and exclude_root):
224
            return []
225
226
        current_cert = start_cert
227
        chain_of_trust = [current_cert]
228
229
        # TODO could possibly be simplified
230
        if start_cert.type_id == ROOT_CA_ID:
231 4e70d22a Stanislav Král
            # the first cert found is a root ca
232 ef65f488 Stanislav Král
            return chain_of_trust
233
234
        while True:
235
            parent_cert = self.certificate_repository.read(current_cert.parent_id)
236
237 4e70d22a Stanislav Král
            # check whether parent certificate exists
238
            if parent_cert is None:
239
                break
240
241
            # check whether the found certificate is a root certificate
242
            if parent_cert.type_id == ROOT_CA_ID:
243 ef65f488 Stanislav Král
                if not exclude_root:
244 4e70d22a Stanislav Král
                    # append the found root cert only if root certificates should not be excluded from the CoT
245 ef65f488 Stanislav Král
                    chain_of_trust.append(parent_cert)
246
                break
247
248 4e70d22a Stanislav Král
            # append the certificate
249 ef65f488 Stanislav Král
            chain_of_trust.append(parent_cert)
250
251 4e70d22a Stanislav Král
            # stop iterating over certificates if the id of the found certificate matches `to_id` method parameter
252 ef65f488 Stanislav Král
            if parent_cert.certificate_id == to_id:
253
                break
254
255
            current_cert = parent_cert
256
257
        return chain_of_trust
258 3d639744 Stanislav Král
259 5f8a2c07 Captain_Trojan
    def delete_certificate(self, unique_id):
260 3d639744 Stanislav Král
        """
261 5f8a2c07 Captain_Trojan
        Deletes a certificate. Raises an Exception should any unexpected behavior occur.
262 3d639744 Stanislav Král
263
        :param unique_id: ID of specific certificate
264
        """
265 5f8a2c07 Captain_Trojan
266
        to_delete = self.certificate_repository.get_all_descendants_of(unique_id)
267
        if to_delete is None:
268
            raise CertificateNotFoundException(unique_id)
269
270
        for cert in to_delete:
271
            try:
272
                self.set_certificate_revocation_status(cert.certificate_id, STATUS_REVOKED)
273
            except CertificateAlreadyRevokedException:
274
                # TODO log as an info/debug, not warning or above <-- perfectly legal
275
                continue
276
277
            self.certificate_repository.delete(cert.certificate_id)
278
            # TODO log if not successfully deleted
279 c4ba6bb7 Jan Pašek
280 20b33bd4 Jan Pašek
    def set_certificate_revocation_status(self, id, status, reason="unspecified"):
281
        """
282
        Set certificate status to 'valid' or 'revoked'.
283
        If the new status is revoked a reason can be provided -> default is unspecified
284
        :param reason: reason for revocation
285
        :param id: identifier of the certificate whose status is to be changed
286
        :param status: new status of the certificate
287
        """
288
        if status not in CERTIFICATE_STATES:
289
            raise CertificateStatusInvalidException(status)
290
        if reason not in CERTIFICATE_REVOCATION_REASONS:
291
            raise RevocationReasonInvalidException(reason)
292
293 9e6f791a Jan Pašek
        # check whether the certificate exists
294
        certificate = self.certificate_repository.read(id)
295
        if certificate is None:
296
            raise CertificateNotFoundException(id)
297
298 9c704fb1 Jan Pašek
        updated = False
299 20b33bd4 Jan Pašek
        if status == STATUS_VALID:
300 9c704fb1 Jan Pašek
            updated = self.certificate_repository.clear_certificate_revocation(id)
301 20b33bd4 Jan Pašek
        elif status == STATUS_REVOKED:
302 9e6f791a Jan Pašek
            # check if the certificate is not revoked already
303
            revoked = self.certificate_repository.get_all_revoked_by(certificate.parent_id)
304
            if certificate.certificate_id in [x.certificate_id for x in revoked]:
305
                raise CertificateAlreadyRevokedException(id)
306
307 20b33bd4 Jan Pašek
            revocation_timestamp = int(time.time())
308 9c704fb1 Jan Pašek
            updated = self.certificate_repository.set_certificate_revoked(id, str(revocation_timestamp), reason)
309
310
        if not updated:
311 9e6f791a Jan Pašek
            # TODO log this
312
            raise UnknownException("Repository returned 'false' from clear_certificate_revocation() "
313
                                   "or set_certificate_revoked().")
314 20b33bd4 Jan Pašek
315 c4ba6bb7 Jan Pašek
    def get_subject_from_certificate(self, certificate: Certificate) -> Subject:
316
        """
317
        Get Subject distinguished name from a Certificate
318
        :param certificate: certificate instance whose Subject shall be parsed
319
        :return: instance of Subject class containing resulting distinguished name
320
        """
321
        (subject, _, _) = self.cryptography_service.parse_cert_pem(certificate.pem_data)
322
        return subject
323 d3bfacfc Stanislav Král
324
    def get_public_key_from_certificate(self, certificate: Certificate):
325
        """
326
        Extracts a public key from the given certificate
327
        :param certificate: an instance of the Certificate class containing the certificate from which a public key
328
        should be extracted.
329
        :return: a string containing the extracted public key in PEM format
330
        """
331
        return self.cryptography_service.extract_public_key_from_certificate(certificate.pem_data)
332 20b33bd4 Jan Pašek
333
    def __get_crl_endpoint(self, ca_identifier: int) -> str:
334
        """
335
        Get URL address of CRL distribution endpoint based on
336
        issuer's ID
337
338
        :param ca_identifier: ID of issuing authority
339
        :return: CRL endpoint for the given CA
340
        """
341
        return self.configuration.base_server_url + "/api/crl/" + str(ca_identifier)
342
343
    def __get_ocsp_endpoint(self, ca_identifier: int) -> str:
344
        """
345
        Get URL address of OCSP distribution endpoint based on
346
        issuer's ID
347
348
        :param ca_identifier: ID of issuing authority
349
        :return: OCSP endpoint for the given CA
350
        """
351
        return self.configuration.base_server_url + "/api/ocsp/" + str(ca_identifier)
352
353
354
class RevocationReasonInvalidException(Exception):
355
    """
356
    Exception that denotes that the caller was trying to revoke
357
    a certificate using an invalid revocation reason
358
    """
359
360
    def __init__(self, reason):
361
        self.reason = reason
362
363
    def __str__(self):
364
        return f"Revocation reason '{self.reason}' is not valid."
365
366
367
class CertificateStatusInvalidException(Exception):
368
    """
369
    Exception that denotes that the caller was trying to set
370
    a certificate to an invalid state
371
    """
372
373
    def __init__(self, status):
374
        self.status = status
375
376
    def __str__(self):
377
        return f"Certificate status '{self.status}' is not valid."
378 9c704fb1 Jan Pašek
379
380
class CertificateNotFoundException(Exception):
381
    """
382
    Exception that denotes that the caller was trying to set
383 9e6f791a Jan Pašek
    work with non-existing certificate
384 9c704fb1 Jan Pašek
    """
385
386
    def __init__(self, id):
387
        self.id = id
388
389
    def __str__(self):
390
        return f"Certificate id '{self.id}' does not exist."
391 9e6f791a Jan Pašek
392
393
class CertificateAlreadyRevokedException(Exception):
394
    """
395
    Exception that denotes that the caller was trying to revoke
396
    a certificate that is already revoked
397
    """
398
399
    def __init__(self, id):
400
        self.id = id
401
402
    def __str__(self):
403
        return f"Certificate id '{self.id}' is already revoked."