Projekt

Obecné

Profil

Stáhnout (19.1 KB) Statistiky
| Větev: | Tag: | Revize:
1
from typing import List
2

    
3
from injector import inject
4

    
5
from src.config.configuration import Configuration
6
from src.constants import ROOT_CA_ID, INTERMEDIATE_CA_ID, CA_ID, CERTIFICATE_ID, CERTIFICATE_STATES, \
7
    CERTIFICATE_REVOCATION_REASONS
8
from src.dao.certificate_repository import CertificateRepository
9
from src.exceptions.certificate_not_found_exception import CertificateNotFoundException
10
from src.exceptions.database_exception import DatabaseException
11
from src.exceptions.unknown_exception import UnknownException
12
from src.model.certificate import Certificate
13
from src.model.private_key import PrivateKey
14
from src.model.subject import Subject
15
from src.services.cryptography import CryptographyService
16

    
17
import time
18

    
19
VALID_FROM_TO_DATE_FORMAT = "%d.%m.%Y %H:%M:%S"
20
CA_EXTENSIONS = "basicConstraints=critical,CA:TRUE"
21
CRL_EXTENSION = "crlDistributionPoints=URI:"
22
OCSP_EXTENSION = "authorityInfoAccess=OCSP;URI:"
23
STATUS_REVOKED = "revoked"
24
STATUS_VALID = "valid"
25

    
26

    
27
class CertificateService:
28

    
29
    @inject
30
    def __init__(self, cryptography_service: CryptographyService,
31
                 certificate_repository: CertificateRepository,
32
                 configuration: Configuration):
33
        self.cryptography_service = cryptography_service
34
        self.certificate_repository = certificate_repository
35
        self.configuration = configuration
36

    
37
    # TODO usages present in method parameters but not in class diagram
38
    def create_root_ca(self, key: PrivateKey, subject: Subject, extensions: str = "", config: str = "",
39
                       usages=None, days=30):
40
        """
41
        Creates a root CA certificate based on the given parameters.
42
        :param key: Private key to be used when generating the certificate
43
        :param subject: Subject to be used put into the certificate
44
        :param config: String containing the configuration to be used
45
        :param extensions: Name of the section in the configuration representing extensions
46
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
47
        :param days: Number of days for which the generated cert. will be considered valid
48
        :return: An instance of Certificate class representing the generated root CA cert
49
        """
50
        if usages is None:
51
            usages = {}
52

    
53
        cert_id = self.certificate_repository.get_next_id()
54

    
55
        # create a new self signed  certificate
56
        cert_pem = self.cryptography_service.create_sscrt(subject, key.private_key, key_pass=key.password,
57
                                                          extensions=extensions, config=config, days=days, sn=cert_id)
58
        # specify CA usage
59
        usages[CA_ID] = True
60

    
61
        # wrap into Certificate class
62
        certificate = self.__create_wrapper(cert_pem, key.private_key_id, usages, 0,
63
                                            ROOT_CA_ID)
64

    
65
        # store the wrapper into the repository
66
        created_id = self.certificate_repository.create(certificate)
67

    
68
        # assign the generated ID to the inserted certificate
69
        certificate.certificate_id = created_id
70

    
71
        return certificate
72

    
73
    def __create_wrapper(self, cert_pem, private_key_id, usages, parent_id, cert_type):
74
        """
75
        Wraps the given parameters using the Certificate class. Uses CryptographyService to find out the notBefore and
76
        notAfter fields.
77
        :param cert_pem: PEM of the cert. to be wrapped
78
        :param private_key_id: ID of the private key used to create the given certificate
79
        :param usages: A dictionary containing usages of the generated certificate generated (see constants.py)
80
        :param parent_id: ID of the CA that issued this certificate
81
        :param cert_type: Type of this certificate (see constants.py)
82
        :return: An instance of the Certificate class wrapping the values passed  via method parameters
83
        """
84
        # parse the generated pem for subject and notBefore/notAfter fields
85
        # TODO this could be improved in the future in such way that calling openssl is not required to parse the dates
86
        subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem)
87
        # format the parsed date
88
        not_before_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_before)
89
        not_after_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_after)
90

    
91
        # create a certificate wrapper
92
        certificate = Certificate(-1, subj.common_name, not_before_formatted, not_after_formatted, cert_pem,
93
                                  private_key_id, cert_type, parent_id, usages)
94

    
95
        return certificate
96

    
97
    # TODO config parameter present in class diagram but not here (unused)
98
    def create_ca(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate, issuer_key: PrivateKey,
99
                  extensions: str = "", days: int = 30, usages=None):
100
        """
101
        Creates an intermediate CA certificate issued by the given parent CA.
102
        :param subject_key: Private key to be used when generating the certificate
103
        :param subject: Subject to be used put into the certificate
104
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
105
        :param issuer_key: PK used to generate the issuer certificate
106
        :param extensions: Extensions to be used when generating the certificate
107
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
108
        :param days: Number of days for which the generated cert. will be considered valid
109
        :return: An instance of Certificate class representing the generated intermediate CA cert
110
        """
111
        if usages is None:
112
            usages = {}
113

    
114
        extensions = extensions + "\n" + CA_EXTENSIONS
115
        # Add CRL and OCSP distribution point to certificate extensions
116
        cert_id = self.certificate_repository.get_next_id()
117
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(issuer_cert.certificate_id)
118
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(issuer_cert.certificate_id)
119

    
120
        # TODO implement AIA URI via extensions
121
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
122
                                                        issuer_key.private_key,
123
                                                        subject_key_pass=subject_key.password,
124
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
125
                                                        days=days,
126
                                                        sn=cert_id)
127

    
128
        # specify CA usage
129
        usages[CA_ID] = True
130

    
131
        # wrap into Certificate class
132
        self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
133
                              issuer_cert.certificate_id, INTERMEDIATE_CA_ID)
134

    
135
        # parse the generated pem for subject and notBefore/notAfter fields
136
        subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem)
137

    
138
        # format the parsed date
139
        not_before_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_before)
140
        not_after_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_after)
141

    
142
        # specify CA usage
143
        usages[CA_ID] = True
144

    
145
        # create a certificate wrapper
146
        certificate = Certificate(-1, subject.common_name, not_before_formatted, not_after_formatted, cert_pem,
147
                                  subject_key.private_key_id, INTERMEDIATE_CA_ID, issuer_cert.certificate_id, usages)
148

    
149
        # store the wrapper into the repository
150
        created_id = self.certificate_repository.create(certificate)
151

    
152
        # assign the generated ID to the inserted certificate
153
        certificate.certificate_id = created_id
154

    
155
        return certificate
156

    
157
    def create_end_cert(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate,
158
                        issuer_key: PrivateKey,
159
                        extensions: str = "", days: int = 30, usages=None):
160
        """
161
        Creates an end certificate issued by the given parent CA.
162
        :param subject_key: Private key to be used when generating the certificate
163
        :param subject: Subject to be used put into the certificate
164
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
165
        :param issuer_key: PK used to generate the issuer certificate
166
        :param extensions: Extensions to be used when generating the certificate
167
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
168
        :param days: Number of days for which the generated cert. will be considered valid
169
        :return: An instance of Certificate class representing the generated cert
170
        """
171
        if usages is None:
172
            usages = {}
173

    
174
        # get the next certificate ID in order to be able to specify the serial number
175
        cert_id = self.certificate_repository.get_next_id()
176

    
177
        # Add CRL and OCSP distribution point to certificate extensions
178
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(issuer_cert.certificate_id)
179
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(issuer_cert.certificate_id)
180

    
181
        # generate a new certificate
182
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
183
                                                        issuer_key.private_key,
184
                                                        subject_key_pass=subject_key.password,
185
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
186
                                                        days=days,
187
                                                        sn=cert_id
188
                                                        )
189

    
190
        # wrap the generated certificate using Certificate class
191
        certificate = self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
192
                                            issuer_cert.certificate_id, CERTIFICATE_ID)
193

    
194
        created_id = self.certificate_repository.create(certificate)
195

    
196
        certificate.certificate_id = created_id
197

    
198
        return certificate
199

    
200
    def get_certificate(self, unique_id: int) -> Certificate:
201
        """
202
        Tries to fetch a certificate from the certificate repository using a given id.
203
        :param unique_id: ID of the certificate to be fetched
204
        :return: Instance of the Certificate class containing a certificate with the given id or `None` if such
205
        certificate is not found
206
        """
207
        return self.certificate_repository.read(unique_id)
208

    
209
    def get_certificates(self, cert_type=None) -> List[Certificate]:
210
        """
211
        Tries to fetch a list of all certificates from the certificate repository. Using the `cert_type` parameter only
212
        certificates of the given type can be returned.
213
        :param cert_type: Type of certificates to be returned
214
        :return: List of instances of the Certificate class representing all certificates present in the certificate
215
        repository. An empty list is returned when no certificates are found.
216
        """
217
        return self.certificate_repository.read_all(cert_type)
218

    
219
    def get_chain_of_trust(self, from_id: int, to_id: int = -1, exclude_root=True) -> List[Certificate]:
220
        """
221
        Traverses the certificate hierarchy tree upwards till a certificate with the `to_id` ID is found or till a
222
        root CA certificate is found. Root certificates are excluded from the chain by default.
223
        :param from_id: ID of the first certificate to be included in the chain of trust
224
        :param to_id: ID of the last certificate to be included in the chain of trust
225
        :param exclude_root: a flag indicating whether root CA certificate should be excluded
226
        :return: a list of certificates representing the chain of trust starting with the certificate given by `from_id`
227
        ID
228
        """
229
        # read the first certificate of the chain
230
        start_cert = self.certificate_repository.read(from_id)
231

    
232
        # if no cert is found or the current cert is root CA and root CAs should be excluded, then return an empty list
233
        if start_cert is None or (start_cert.type_id == ROOT_CA_ID and exclude_root):
234
            return []
235

    
236
        current_cert = start_cert
237
        chain_of_trust = [current_cert]
238

    
239
        # TODO could possibly be simplified
240
        if start_cert.type_id == ROOT_CA_ID:
241
            # the first cert found is a root ca
242
            return chain_of_trust
243

    
244
        while True:
245
            parent_cert = self.certificate_repository.read(current_cert.parent_id)
246

    
247
            # check whether parent certificate exists
248
            if parent_cert is None:
249
                break
250

    
251
            # check whether the found certificate is a root certificate
252
            if parent_cert.type_id == ROOT_CA_ID:
253
                if not exclude_root:
254
                    # append the found root cert only if root certificates should not be excluded from the CoT
255
                    chain_of_trust.append(parent_cert)
256
                break
257

    
258
            # append the certificate
259
            chain_of_trust.append(parent_cert)
260

    
261
            # stop iterating over certificates if the id of the found certificate matches `to_id` method parameter
262
            if parent_cert.certificate_id == to_id:
263
                break
264

    
265
            current_cert = parent_cert
266

    
267
        return chain_of_trust
268

    
269
    def delete_certificate(self, unique_id):
270
        """
271
        Deletes a certificate. Raises an Exception should any unexpected behavior occur.
272

    
273
        :param unique_id: ID of specific certificate
274
        """
275

    
276
        to_delete = self.certificate_repository.get_all_descendants_of(unique_id)
277
        if to_delete is None:
278
            raise CertificateNotFoundException(unique_id)
279

    
280
        for cert in to_delete:
281
            try:
282
                self.set_certificate_revocation_status(cert.certificate_id, STATUS_REVOKED)
283
            except CertificateAlreadyRevokedException:
284
                # TODO log as an info/debug, not warning or above <-- perfectly legal
285
                continue
286

    
287
            self.certificate_repository.delete(cert.certificate_id)
288
            # TODO log if not successfully deleted
289

    
290
    def get_certificates_issued_by(self, unique_id):
291
        """
292
        Returns a list of all children of a certificate identified by an unique_id.
293
        Raises a DatabaseException should any unexpected behavior occur.
294
        :param unique_id: target certificate ID
295
        :return: children of unique_id
296
        """
297
        try:
298
            if self.certificate_repository.read(unique_id) is None:
299
                raise CertificateNotFoundException(unique_id)
300
        except DatabaseException:
301
            raise CertificateNotFoundException(unique_id)
302

    
303
        return self.certificate_repository.get_all_issued_by(unique_id)
304

    
305
    def set_certificate_revocation_status(self, id, status, reason="unspecified"):
306
        """
307
        Set certificate status to 'valid' or 'revoked'.
308
        If the new status is revoked a reason can be provided -> default is unspecified
309
        :param reason: reason for revocation
310
        :param id: identifier of the certificate whose status is to be changed
311
        :param status: new status of the certificate
312
        """
313
        if status not in CERTIFICATE_STATES:
314
            raise CertificateStatusInvalidException(status)
315
        if reason not in CERTIFICATE_REVOCATION_REASONS:
316
            raise RevocationReasonInvalidException(reason)
317

    
318
        # check whether the certificate exists
319
        certificate = self.certificate_repository.read(id)
320
        if certificate is None:
321
            raise CertificateNotFoundException(id)
322

    
323
        updated = False
324
        if status == STATUS_VALID:
325
            updated = self.certificate_repository.clear_certificate_revocation(id)
326
        elif status == STATUS_REVOKED:
327
            # check if the certificate is not revoked already
328
            revoked = self.certificate_repository.get_all_revoked_by(certificate.parent_id)
329
            if certificate.certificate_id in [x.certificate_id for x in revoked]:
330
                raise CertificateAlreadyRevokedException(id)
331

    
332
            revocation_timestamp = int(time.time())
333
            updated = self.certificate_repository.set_certificate_revoked(id, str(revocation_timestamp), reason)
334

    
335
        if not updated:
336
            # TODO log this
337
            raise UnknownException("Repository returned 'false' from clear_certificate_revocation() "
338
                                   "or set_certificate_revoked().")
339

    
340
    def get_subject_from_certificate(self, certificate: Certificate) -> Subject:
341
        """
342
        Get Subject distinguished name from a Certificate
343
        :param certificate: certificate instance whose Subject shall be parsed
344
        :return: instance of Subject class containing resulting distinguished name
345
        """
346
        (subject, _, _) = self.cryptography_service.parse_cert_pem(certificate.pem_data)
347
        return subject
348

    
349
    def get_public_key_from_certificate(self, certificate: Certificate):
350
        """
351
        Extracts a public key from the given certificate
352
        :param certificate: an instance of the Certificate class containing the certificate from which a public key
353
        should be extracted.
354
        :return: a string containing the extracted public key in PEM format
355
        """
356
        return self.cryptography_service.extract_public_key_from_certificate(certificate.pem_data)
357

    
358
    def __get_crl_endpoint(self, ca_identifier: int) -> str:
359
        """
360
        Get URL address of CRL distribution endpoint based on
361
        issuer's ID
362

    
363
        :param ca_identifier: ID of issuing authority
364
        :return: CRL endpoint for the given CA
365
        """
366
        return self.configuration.base_server_url + "/api/crl/" + str(ca_identifier)
367

    
368
    def __get_ocsp_endpoint(self, ca_identifier: int) -> str:
369
        """
370
        Get URL address of OCSP distribution endpoint based on
371
        issuer's ID
372

    
373
        :param ca_identifier: ID of issuing authority
374
        :return: OCSP endpoint for the given CA
375
        """
376
        return self.configuration.base_server_url + "/api/ocsp/" + str(ca_identifier)
377

    
378

    
379
class RevocationReasonInvalidException(Exception):
380
    """
381
    Exception that denotes that the caller was trying to revoke
382
    a certificate using an invalid revocation reason
383
    """
384

    
385
    def __init__(self, reason):
386
        self.reason = reason
387

    
388
    def __str__(self):
389
        return f"Revocation reason '{self.reason}' is not valid."
390

    
391

    
392
class CertificateStatusInvalidException(Exception):
393
    """
394
    Exception that denotes that the caller was trying to set
395
    a certificate to an invalid state
396
    """
397

    
398
    def __init__(self, status):
399
        self.status = status
400

    
401
    def __str__(self):
402
        return f"Certificate status '{self.status}' is not valid."
403

    
404

    
405
class CertificateAlreadyRevokedException(Exception):
406
    """
407
    Exception that denotes that the caller was trying to revoke
408
    a certificate that is already revoked
409
    """
410

    
411
    def __init__(self, id):
412
        self.id = id
413

    
414
    def __str__(self):
415
        return f"Certificate id '{self.id}' is already revoked."
(2-2/4)