Projekt

Obecné

Profil

Stáhnout (28.9 KB) Statistiky
| Větev: | Tag: | Revize:
1 ef65f488 Stanislav Král
from typing import List
2
3 151e7604 Jan Pašek
from injector import inject
4
5 20b33bd4 Jan Pašek
from src.config.configuration import Configuration
6
from src.constants import ROOT_CA_ID, INTERMEDIATE_CA_ID, CA_ID, CERTIFICATE_ID, CERTIFICATE_STATES, \
7 c15357a9 Jan Pašek
    CERTIFICATE_REVOCATION_REASONS, SSL_ID, SIGNATURE_ID, AUTHENTICATION_ID, CERTIFICATE_VALID, CERTIFICATE_EXPIRED, \
8 7855d234 Captain_Trojan
    CERTIFICATE_REVOKED,\
9
    CERTIFICATE_REVOCATION_REASON_HOLD
10 4a40b0d2 Stanislav Král
from src.dao.certificate_repository import CertificateRepository
11 8ff50e4e Jan Pašek
from src.exceptions.certificate_not_found_exception import CertificateNotFoundException
12 485913d0 Captain_Trojan
from src.exceptions.database_exception import DatabaseException
13 9e6f791a Jan Pašek
from src.exceptions.unknown_exception import UnknownException
14 4a40b0d2 Stanislav Král
from src.model.certificate import Certificate
15 313b647b Stanislav Král
from src.model.private_key import PrivateKey
16 4a40b0d2 Stanislav Král
from src.model.subject import Subject
17
from src.services.cryptography import CryptographyService
18
19 313b647b Stanislav Král
import time
20
21 329216fe Stanislav Král
from src.utils.usages_to_extensions import usages_to_extension_lines, ExtensionFieldFlags, CRITICAL, KEY_CERT_SIGN, \
22
    CRL_SIGN, CA, DIGITAL_SIGNATURE, KEY_ENCIPHERMENT, KEY_AGREEMENT, SERVER_AUTH, NON_REPUDIATION, TIME_STAMPING, \
23
    CLIENT_AUTH
24
25 b3c80ccb David Friesecký
from src.utils.logger import Logger
26
27 7313994f Stanislav Král
VALID_FROM_TO_DATE_FORMAT = "%d.%m.%Y %H:%M:%S"
28 bbcb7c89 Stanislav Král
CA_EXTENSIONS = "basicConstraints=critical,CA:TRUE"
29 20b33bd4 Jan Pašek
CRL_EXTENSION = "crlDistributionPoints=URI:"
30
OCSP_EXTENSION = "authorityInfoAccess=OCSP;URI:"
31
STATUS_REVOKED = "revoked"
32
STATUS_VALID = "valid"
33 313b647b Stanislav Král
34 329216fe Stanislav Král
# define which flags are required for various usages
35
REQUIRED_USAGE_EXTENSION_FLAGS = {
36
    CA_ID: ExtensionFieldFlags({CRITICAL, KEY_CERT_SIGN, CRL_SIGN}, {}, {CRITICAL, CA}),
37
    SSL_ID: ExtensionFieldFlags({DIGITAL_SIGNATURE, KEY_ENCIPHERMENT, KEY_AGREEMENT}, {SERVER_AUTH}, {}),
38 52f2eca4 Jan Pašek
    SIGNATURE_ID: ExtensionFieldFlags({DIGITAL_SIGNATURE, NON_REPUDIATION}, {}, {}),
39 329216fe Stanislav Král
    AUTHENTICATION_ID: ExtensionFieldFlags({DIGITAL_SIGNATURE}, {CLIENT_AUTH}, {})}
40
41 4a40b0d2 Stanislav Král
42
class CertificateService:
43
44 151e7604 Jan Pašek
    @inject
45 20b33bd4 Jan Pašek
    def __init__(self, cryptography_service: CryptographyService,
46
                 certificate_repository: CertificateRepository,
47
                 configuration: Configuration):
48 4a40b0d2 Stanislav Král
        self.cryptography_service = cryptography_service
49
        self.certificate_repository = certificate_repository
50 20b33bd4 Jan Pašek
        self.configuration = configuration
51 4a40b0d2 Stanislav Král
52 7a30da1b Stanislav Král
    @staticmethod
53
    def __check_subject_is_valid(subject: Subject):
54
        if subject.country is not None and len(subject.country) != 2:
55
            raise InvalidSubjectAttribute("Country code", "Must consist of exactly 2 letters.")
56
57 bbcb7c89 Stanislav Král
    # TODO usages present in method parameters but not in class diagram
58 ca3ac7c0 Stanislav Král
    def create_root_ca(self, key: PrivateKey, subject: Subject, extensions: str = "", config: str = "",
59 2f5101f1 Stanislav Král
                       usages=None, days=30):
60 a6727aa9 Stanislav Král
        """
61
        Creates a root CA certificate based on the given parameters.
62
        :param key: Private key to be used when generating the certificate
63
        :param subject: Subject to be used put into the certificate
64
        :param config: String containing the configuration to be used
65
        :param extensions: Name of the section in the configuration representing extensions
66
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
67 2f5101f1 Stanislav Král
        :param days: Number of days for which the generated cert. will be considered valid
68 a6727aa9 Stanislav Král
        :return: An instance of Certificate class representing the generated root CA cert
69
        """
70 b3c80ccb David Friesecký
71
        Logger.debug("Function launched.")
72
73 7a30da1b Stanislav Král
        # check whether the given subject is valid
74
        self.__check_subject_is_valid(subject)
75
76 ca3ac7c0 Stanislav Král
        if usages is None:
77
            usages = {}
78
79 20b33bd4 Jan Pašek
        cert_id = self.certificate_repository.get_next_id()
80
81 329216fe Stanislav Král
        # specify CA usage
82
        usages[CA_ID] = True
83
84
        # generate extension configuration lines based on the specified usages
85
        extensions = extensions + "\n" + "\n".join(usages_to_extension_lines(usages, REQUIRED_USAGE_EXTENSION_FLAGS))
86
87 313b647b Stanislav Král
        # create a new self signed  certificate
88
        cert_pem = self.cryptography_service.create_sscrt(subject, key.private_key, key_pass=key.password,
89 87c56935 Stanislav Král
                                                          extensions=extensions, config=config, days=days, sn=cert_id)
90 ca3ac7c0 Stanislav Král
91 4c19a9b1 Stanislav Král
        # wrap into Certificate class
92 a6727aa9 Stanislav Král
        certificate = self.__create_wrapper(cert_pem, key.private_key_id, usages, 0,
93 4c19a9b1 Stanislav Král
                                            ROOT_CA_ID)
94 313b647b Stanislav Král
95
        # store the wrapper into the repository
96
        created_id = self.certificate_repository.create(certificate)
97
98
        # assign the generated ID to the inserted certificate
99
        certificate.certificate_id = created_id
100 4a40b0d2 Stanislav Král
101 313b647b Stanislav Král
        return certificate
102 10fab051 Stanislav Král
103 a6727aa9 Stanislav Král
    def __create_wrapper(self, cert_pem, private_key_id, usages, parent_id, cert_type):
104
        """
105 a4e818dc Jan Pašek
        Wraps the given parameters using the Certificate class. Uses CryptographyService to find out the notBefore and
106 a6727aa9 Stanislav Král
        notAfter fields.
107
        :param cert_pem: PEM of the cert. to be wrapped
108
        :param private_key_id: ID of the private key used to create the given certificate
109
        :param usages: A dictionary containing usages of the generated certificate generated (see constants.py)
110
        :param parent_id: ID of the CA that issued this certificate
111
        :param cert_type: Type of this certificate (see constants.py)
112
        :return: An instance of the Certificate class wrapping the values passed  via method parameters
113
        """
114 b3c80ccb David Friesecký
115
        Logger.debug("Function launched.")
116
117 4c19a9b1 Stanislav Král
        # parse the generated pem for subject and notBefore/notAfter fields
118 a6727aa9 Stanislav Král
        # TODO this could be improved in the future in such way that calling openssl is not required to parse the dates
119 4c19a9b1 Stanislav Král
        subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem)
120
        # format the parsed date
121 7313994f Stanislav Král
        not_before_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_before)
122
        not_after_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_after)
123 4c19a9b1 Stanislav Král
124
        # create a certificate wrapper
125 894cc2cd David Friesecký
        certificate = Certificate(-1, not_before_formatted, not_after_formatted, cert_pem, cert_type, parent_id,
126
                                  private_key_id, usages, subj.common_name, subj.country, subj.locality, subj.state,
127
                                  subj.organization, subj.organization_unit, subj.email_address)
128 4c19a9b1 Stanislav Král
129
        return certificate
130
131 bbcb7c89 Stanislav Král
    # TODO config parameter present in class diagram but not here (unused)
132
    def create_ca(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate, issuer_key: PrivateKey,
133 ca3ac7c0 Stanislav Král
                  extensions: str = "", days: int = 30, usages=None):
134 a6727aa9 Stanislav Král
        """
135
        Creates an intermediate CA certificate issued by the given parent CA.
136
        :param subject_key: Private key to be used when generating the certificate
137
        :param subject: Subject to be used put into the certificate
138
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
139
        :param issuer_key: PK used to generate the issuer certificate
140
        :param extensions: Extensions to be used when generating the certificate
141
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
142
        :param days: Number of days for which the generated cert. will be considered valid
143
        :return: An instance of Certificate class representing the generated intermediate CA cert
144
        """
145 b3c80ccb David Friesecký
146
        Logger.debug("Function launched.")
147
148 7a30da1b Stanislav Král
        # check whether the given subject is valid
149
        self.__check_subject_is_valid(subject)
150
151 ca3ac7c0 Stanislav Král
        if usages is None:
152
            usages = {}
153
154 329216fe Stanislav Král
        # specify CA usage
155
        usages[CA_ID] = True
156
157
        # generate extension configuration lines based on the specified usages
158
        extensions = extensions + "\n" + "\n".join(usages_to_extension_lines(usages, REQUIRED_USAGE_EXTENSION_FLAGS))
159
160 20b33bd4 Jan Pašek
        # Add CRL and OCSP distribution point to certificate extensions
161
        cert_id = self.certificate_repository.get_next_id()
162 ea1229ee Jan Pašek
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(issuer_cert.certificate_id)
163
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(issuer_cert.certificate_id)
164 20b33bd4 Jan Pašek
165 bbcb7c89 Stanislav Král
        # TODO implement AIA URI via extensions
166
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
167
                                                        issuer_key.private_key,
168
                                                        subject_key_pass=subject_key.password,
169
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
170 87c56935 Stanislav Král
                                                        days=days,
171
                                                        sn=cert_id)
172 bbcb7c89 Stanislav Král
173 4c19a9b1 Stanislav Král
        # wrap into Certificate class
174 894cc2cd David Friesecký
        certificate = self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
175
                                            issuer_cert.certificate_id, INTERMEDIATE_CA_ID)
176 bbcb7c89 Stanislav Král
177
        # store the wrapper into the repository
178
        created_id = self.certificate_repository.create(certificate)
179
180
        # assign the generated ID to the inserted certificate
181
        certificate.certificate_id = created_id
182
183
        return certificate
184
185 4c19a9b1 Stanislav Král
    def create_end_cert(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate,
186
                        issuer_key: PrivateKey,
187
                        extensions: str = "", days: int = 30, usages=None):
188 a6727aa9 Stanislav Král
        """
189
        Creates an end certificate issued by the given parent CA.
190
        :param subject_key: Private key to be used when generating the certificate
191
        :param subject: Subject to be used put into the certificate
192
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
193
        :param issuer_key: PK used to generate the issuer certificate
194
        :param extensions: Extensions to be used when generating the certificate
195
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
196
        :param days: Number of days for which the generated cert. will be considered valid
197
        :return: An instance of Certificate class representing the generated cert
198
        """
199 b3c80ccb David Friesecký
200
        Logger.debug("Function launched.")
201
202 7a30da1b Stanislav Král
        # check whether the given subject is valid
203
        self.__check_subject_is_valid(subject)
204
205 4c19a9b1 Stanislav Král
        if usages is None:
206
            usages = {}
207
208 87c56935 Stanislav Král
        # get the next certificate ID in order to be able to specify the serial number
209
        cert_id = self.certificate_repository.get_next_id()
210
211 329216fe Stanislav Král
        # generate extension configuration lines based on the specified usages
212
        extensions = extensions + "\n" + "\n".join(usages_to_extension_lines(usages, REQUIRED_USAGE_EXTENSION_FLAGS))
213
214 ea1229ee Jan Pašek
        # Add CRL and OCSP distribution point to certificate extensions
215
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(issuer_cert.certificate_id)
216
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(issuer_cert.certificate_id)
217
218 4c19a9b1 Stanislav Král
        # generate a new certificate
219
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
220
                                                        issuer_key.private_key,
221
                                                        subject_key_pass=subject_key.password,
222
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
223 87c56935 Stanislav Král
                                                        days=days,
224
                                                        sn=cert_id
225
                                                        )
226 4c19a9b1 Stanislav Král
227
        # wrap the generated certificate using Certificate class
228 a6727aa9 Stanislav Král
        certificate = self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
229 4c19a9b1 Stanislav Král
                                            issuer_cert.certificate_id, CERTIFICATE_ID)
230
231
        created_id = self.certificate_repository.create(certificate)
232
233
        certificate.certificate_id = created_id
234
235
        return certificate
236
237 10fab051 Stanislav Král
    def get_certificate(self, unique_id: int) -> Certificate:
238 a6727aa9 Stanislav Král
        """
239
        Tries to fetch a certificate from the certificate repository using a given id.
240
        :param unique_id: ID of the certificate to be fetched
241
        :return: Instance of the Certificate class containing a certificate with the given id or `None` if such
242
        certificate is not found
243
        """
244 b3c80ccb David Friesecký
245
        Logger.debug("Function launched.")
246
247 10fab051 Stanislav Král
        return self.certificate_repository.read(unique_id)
248 2a90f4fd Stanislav Král
249 ef65f488 Stanislav Král
    def get_certificates(self, cert_type=None) -> List[Certificate]:
250 a6727aa9 Stanislav Král
        """
251
        Tries to fetch a list of all certificates from the certificate repository. Using the `cert_type` parameter only
252
        certificates of the given type can be returned.
253
        :param cert_type: Type of certificates to be returned
254
        :return: List of instances of the Certificate class representing all certificates present in the certificate
255
        repository. An empty list is returned when no certificates are found.
256
        """
257 b3c80ccb David Friesecký
258
        Logger.debug("Function launched.")
259
260 2a90f4fd Stanislav Král
        return self.certificate_repository.read_all(cert_type)
261 ef65f488 Stanislav Král
262 67723931 Captain_Trojan
    def get_certificates_filter(self, target_types, target_usages, target_cn_substring, page, per_page):
263
        """
264
        Tries to fetch a filtered list of all certificates from the certificate repository.
265
        :param target_types: certificate types (filter)
266
        :param target_usages: certificate usages (filter)
267
        :param target_cn_substring: certificate CN substring (filter)
268
        :param page: target page
269
        :param per_page: target page size
270
        :return: List of instances of the Certificate class representing filtered certificates
271
                    present in the certificate repository. An empty list is returned when no certificates are found.
272
        """
273
        return self.certificate_repository.read_all_filter(target_types, target_usages, target_cn_substring,
274
                                                           page, per_page)
275
276 ef65f488 Stanislav Král
    def get_chain_of_trust(self, from_id: int, to_id: int = -1, exclude_root=True) -> List[Certificate]:
277 4e70d22a Stanislav Král
        """
278
        Traverses the certificate hierarchy tree upwards till a certificate with the `to_id` ID is found or till a
279
        root CA certificate is found. Root certificates are excluded from the chain by default.
280
        :param from_id: ID of the first certificate to be included in the chain of trust
281
        :param to_id: ID of the last certificate to be included in the chain of trust
282
        :param exclude_root: a flag indicating whether root CA certificate should be excluded
283
        :return: a list of certificates representing the chain of trust starting with the certificate given by `from_id`
284
        ID
285
        """
286 b3c80ccb David Friesecký
287
        Logger.debug("Function launched.")
288
289 4e70d22a Stanislav Král
        # read the first certificate of the chain
290 ef65f488 Stanislav Král
        start_cert = self.certificate_repository.read(from_id)
291
292 4e70d22a Stanislav Král
        # if no cert is found or the current cert is root CA and root CAs should be excluded, then return an empty list
293 ef65f488 Stanislav Král
        if start_cert is None or (start_cert.type_id == ROOT_CA_ID and exclude_root):
294
            return []
295
296
        current_cert = start_cert
297
        chain_of_trust = [current_cert]
298
299
        # TODO could possibly be simplified
300
        if start_cert.type_id == ROOT_CA_ID:
301 4e70d22a Stanislav Král
            # the first cert found is a root ca
302 ef65f488 Stanislav Král
            return chain_of_trust
303
304
        while True:
305
            parent_cert = self.certificate_repository.read(current_cert.parent_id)
306
307 4e70d22a Stanislav Král
            # check whether parent certificate exists
308
            if parent_cert is None:
309
                break
310
311
            # check whether the found certificate is a root certificate
312
            if parent_cert.type_id == ROOT_CA_ID:
313 ef65f488 Stanislav Král
                if not exclude_root:
314 4e70d22a Stanislav Král
                    # append the found root cert only if root certificates should not be excluded from the CoT
315 ef65f488 Stanislav Král
                    chain_of_trust.append(parent_cert)
316
                break
317
318 4e70d22a Stanislav Král
            # append the certificate
319 ef65f488 Stanislav Král
            chain_of_trust.append(parent_cert)
320
321 4e70d22a Stanislav Král
            # stop iterating over certificates if the id of the found certificate matches `to_id` method parameter
322 ef65f488 Stanislav Král
            if parent_cert.certificate_id == to_id:
323
                break
324
325
            current_cert = parent_cert
326
327
        return chain_of_trust
328 3d639744 Stanislav Král
329 5f8a2c07 Captain_Trojan
    def delete_certificate(self, unique_id):
330 3d639744 Stanislav Král
        """
331 5f8a2c07 Captain_Trojan
        Deletes a certificate. Raises an Exception should any unexpected behavior occur.
332 3d639744 Stanislav Král
333
        :param unique_id: ID of specific certificate
334
        """
335 5f8a2c07 Captain_Trojan
336 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
337
338 5f8a2c07 Captain_Trojan
        to_delete = self.certificate_repository.get_all_descendants_of(unique_id)
339
        if to_delete is None:
340 b3c80ccb David Friesecký
            Logger.error(f"No such certificate found 'ID = {unique_id}'.")
341 5f8a2c07 Captain_Trojan
            raise CertificateNotFoundException(unique_id)
342
343
        for cert in to_delete:
344
            try:
345
                self.set_certificate_revocation_status(cert.certificate_id, STATUS_REVOKED)
346
            except CertificateAlreadyRevokedException:
347 b3c80ccb David Friesecký
                Logger.info(f"Certificate already revoked 'ID = {unique_id}'.")
348 5f8a2c07 Captain_Trojan
                # TODO log as an info/debug, not warning or above <-- perfectly legal
349 02954c9d Jan Pašek
                pass
350 5f8a2c07 Captain_Trojan
351 b3c80ccb David Friesecký
            if not self.certificate_repository.delete(cert.certificate_id):
352
                Logger.error(f"The certificate has not been deleted 'ID = {cert.certificate_id}'.")
353
354 c839facb Captain_Trojan
    def get_certificates_issued_by(self, issuer_id):
355 485913d0 Captain_Trojan
        """
356
        Returns a list of all children of a certificate identified by an unique_id.
357
        Raises a DatabaseException should any unexpected behavior occur.
358 c839facb Captain_Trojan
        :param issuer_id: target certificate ID
359 485913d0 Captain_Trojan
        :return: children of unique_id
360
        """
361 b3c80ccb David Friesecký
362
        Logger.debug("Function launched.")
363
364 485913d0 Captain_Trojan
        try:
365 c839facb Captain_Trojan
            if self.certificate_repository.read(issuer_id) is None:
366
                Logger.error(f"No such certificate found 'ID = {issuer_id}'.")
367
                raise CertificateNotFoundException(issuer_id)
368 485913d0 Captain_Trojan
        except DatabaseException:
369 c839facb Captain_Trojan
            Logger.error(f"No such certificate found 'ID = {issuer_id}'.")
370
            raise CertificateNotFoundException(issuer_id)
371
372
        return self.certificate_repository.get_all_issued_by(issuer_id)
373 485913d0 Captain_Trojan
374 c839facb Captain_Trojan
    def get_certificates_issued_by_filter(self, issuer_id, target_types, target_usages, target_cn_substring, page,
375
                                          per_page):
376
        """
377
        Returns a list of all children of a certificate identified by an unique_id.
378
        Filters the results according to target types, usages, cn substring and pagination.
379
        Raises a DatabaseException should any unexpected behavior occur.
380
        :param issuer_id: target certificate ID
381
        :param target_types: filter of types
382
        :param target_usages: filter of usages
383
        :param target_cn_substring: CN substring
384
        :param page: target page or None
385
        :param per_page: certs per page or None
386
        :return: list of certificates (a page if specified)
387
        """
388 485913d0 Captain_Trojan
389 c839facb Captain_Trojan
        Logger.debug("Function launched.")
390
391
        try:
392
            if self.certificate_repository.read(issuer_id) is None:
393
                Logger.error(f"No such certificate found 'ID = {issuer_id}'.")
394
                raise CertificateNotFoundException(issuer_id)
395
        except DatabaseException:
396
            Logger.error(f"No such certificate found 'ID = {issuer_id}'.")
397
            raise CertificateNotFoundException(issuer_id)
398 485913d0 Captain_Trojan
399 67723931 Captain_Trojan
        return self.certificate_repository.get_all_issued_by_filter(issuer_id, target_types, target_usages,
400
                                                                    target_cn_substring, page, per_page)
401 485913d0 Captain_Trojan
402 20b33bd4 Jan Pašek
    def set_certificate_revocation_status(self, id, status, reason="unspecified"):
403
        """
404
        Set certificate status to 'valid' or 'revoked'.
405
        If the new status is revoked a reason can be provided -> default is unspecified
406
        :param reason: reason for revocation
407
        :param id: identifier of the certificate whose status is to be changed
408
        :param status: new status of the certificate
409 94f8d5cf Jan Pašek
        :raises CertificateStatusInvalidException: if status is not valid
410
        :raises RevocationReasonInvalidException: if reason is not valid
411
        :raises CertificateNotFoundException: if certificate with given id cannot be found
412
        :raises CertificateCannotBeSetToValid: if certificate was already revoked and not on hold,
413
                it cannot be set revalidated
414
        :raises CertificateAlreadyRevokedException: if caller tries to revoke a certificate that is already revoked
415
        :raises UnknownException: if the database is corrupted
416 20b33bd4 Jan Pašek
        """
417 b3c80ccb David Friesecký
418
        Logger.debug("Function launched.")
419
420 20b33bd4 Jan Pašek
        if status not in CERTIFICATE_STATES:
421 b3c80ccb David Friesecký
            Logger.error(f"Wrong parameter, invalid status '{status}'.")
422 20b33bd4 Jan Pašek
            raise CertificateStatusInvalidException(status)
423
        if reason not in CERTIFICATE_REVOCATION_REASONS:
424 b3c80ccb David Friesecký
            Logger.error(f"Wrong parameter, invalid reason '{reason}'.")
425 20b33bd4 Jan Pašek
            raise RevocationReasonInvalidException(reason)
426
427 9e6f791a Jan Pašek
        # check whether the certificate exists
428
        certificate = self.certificate_repository.read(id)
429
        if certificate is None:
430 b3c80ccb David Friesecký
            Logger.error(f"No such certificate found 'ID = {id}'.")
431 9e6f791a Jan Pašek
            raise CertificateNotFoundException(id)
432
433 9c704fb1 Jan Pašek
        updated = False
434 20b33bd4 Jan Pašek
        if status == STATUS_VALID:
435 94f8d5cf Jan Pašek
            # if the certificate is revoked but the reason is not certificateHold, it cannot be re-validated
436
            #    -> throw an exception
437
            if certificate.revocation_reason != "" and \
438
               certificate.revocation_reason != CERTIFICATE_REVOCATION_REASON_HOLD:
439
                raise CertificateCannotBeSetToValid(certificate.revocation_reason)
440 9c704fb1 Jan Pašek
            updated = self.certificate_repository.clear_certificate_revocation(id)
441 20b33bd4 Jan Pašek
        elif status == STATUS_REVOKED:
442 9e6f791a Jan Pašek
            # check if the certificate is not revoked already
443
            revoked = self.certificate_repository.get_all_revoked_by(certificate.parent_id)
444
            if certificate.certificate_id in [x.certificate_id for x in revoked]:
445 b3c80ccb David Friesecký
                Logger.error(f"Certificate already revoked 'ID = {id}'.")
446 9e6f791a Jan Pašek
                raise CertificateAlreadyRevokedException(id)
447
448 20b33bd4 Jan Pašek
            revocation_timestamp = int(time.time())
449 9c704fb1 Jan Pašek
            updated = self.certificate_repository.set_certificate_revoked(id, str(revocation_timestamp), reason)
450
451
        if not updated:
452 b3c80ccb David Friesecký
            Logger.error(f"Repository returned 'false' from clear_certificate_revocation() "
453
                         f"or set_certificate_revoked().")
454 9e6f791a Jan Pašek
            raise UnknownException("Repository returned 'false' from clear_certificate_revocation() "
455
                                   "or set_certificate_revoked().")
456 20b33bd4 Jan Pašek
457 c4ba6bb7 Jan Pašek
    def get_subject_from_certificate(self, certificate: Certificate) -> Subject:
458
        """
459
        Get Subject distinguished name from a Certificate
460
        :param certificate: certificate instance whose Subject shall be parsed
461
        :return: instance of Subject class containing resulting distinguished name
462
        """
463 b3c80ccb David Friesecký
464
        Logger.debug("Function launched.")
465
466 894cc2cd David Friesecký
        subject = Subject(certificate.common_name,
467
                          certificate.country_code,
468
                          certificate.locality,
469
                          certificate.province,
470
                          certificate.organization,
471
                          certificate.organizational_unit,
472
                          certificate.email_address)
473 c4ba6bb7 Jan Pašek
        return subject
474 d3bfacfc Stanislav Král
475
    def get_public_key_from_certificate(self, certificate: Certificate):
476
        """
477
        Extracts a public key from the given certificate
478
        :param certificate: an instance of the Certificate class containing the certificate from which a public key
479
        should be extracted.
480
        :return: a string containing the extracted public key in PEM format
481
        """
482 b3c80ccb David Friesecký
483
        Logger.debug("Function launched.")
484
485 d3bfacfc Stanislav Král
        return self.cryptography_service.extract_public_key_from_certificate(certificate.pem_data)
486 20b33bd4 Jan Pašek
487 a53e5aef Jan Pašek
    def get_certificate_state(self, id: int) -> str:
488
        """
489
        Check whether the certificate is expired, valid or revoked.
490
            - in case it's revoked and expired, revoked is returned
491
        :param id: identifier of the certificate
492
        :return: certificates state from {valid, revoked, expired}
493
        :raises CertificateNotFoundException: in case id of non-existing certificate is entered
494
        """
495 4ff15a44 Jan Pašek
        Logger.debug("Function launched.")
496 c15357a9 Jan Pašek
        status = CERTIFICATE_VALID
497
498
        # Read the selected certificate from the repository
499
        certificate = self.certificate_repository.read(id)
500
        if certificate is None:
501 1fa20e93 Stanislav Král
            Logger.error("Certificate whose details were requested does not exist.")
502 c15357a9 Jan Pašek
            raise CertificateNotFoundException(id)
503
504
        # check the expiration date using OpenSSL
505
        if not self.cryptography_service.verify_cert(certificate.pem_data):
506
            status = CERTIFICATE_EXPIRED
507
508
        # check certificate revocation
509
        all_revoked_by_parent = self.certificate_repository.get_all_revoked_by(certificate.parent_id)
510
        all_revoked_by_parent_ids = [cert.certificate_id for cert in all_revoked_by_parent]
511
512
        if id in all_revoked_by_parent_ids:
513
            status = CERTIFICATE_REVOKED
514
515
        return status
516
517 20b33bd4 Jan Pašek
    def __get_crl_endpoint(self, ca_identifier: int) -> str:
518
        """
519
        Get URL address of CRL distribution endpoint based on
520
        issuer's ID
521
522
        :param ca_identifier: ID of issuing authority
523
        :return: CRL endpoint for the given CA
524
        """
525 b3c80ccb David Friesecký
526
        Logger.debug("Function launched.")
527
528 20b33bd4 Jan Pašek
        return self.configuration.base_server_url + "/api/crl/" + str(ca_identifier)
529
530
    def __get_ocsp_endpoint(self, ca_identifier: int) -> str:
531
        """
532
        Get URL address of OCSP distribution endpoint based on
533
        issuer's ID
534
535
        :param ca_identifier: ID of issuing authority
536
        :return: OCSP endpoint for the given CA
537
        """
538 b3c80ccb David Friesecký
539
        Logger.debug("Function launched.")
540
541 20b33bd4 Jan Pašek
        return self.configuration.base_server_url + "/api/ocsp/" + str(ca_identifier)
542
543 04805a41 Stanislav Král
    def generate_pkcs_identity(self, certificate: Certificate, cert_key: PrivateKey, identity_name: str, identity_passphrase: str):
544 1fa20e93 Stanislav Král
        """
545
        Generates a PKCS identity of the certificate given by the specified ID while using the private key passed.
546
        A name of the identity to be used and certificate's passphrase have to be specified as well as the passphrase
547
        of certificate's private key (if encrypted).
548 04805a41 Stanislav Král
        :param certificate: certificate to be put into the PKCS identity store
549 1fa20e93 Stanislav Král
        :param cert_key: key used to sign the given certificate
550
        :param identity_name: name to be given to the identity to be created
551
        :param identity_passphrase: passphrase to be used to encrypt the identity
552
        :return: byte array containing the generated identity (PKCS12 store)
553
        """
554
        Logger.debug("Function launched.")
555
556
        # get the chain of trust of the certificate whose identity should be generated and exclude the certificate
557
        # whose chain of trust we are querying
558 04805a41 Stanislav Král
        cot_pem_list = [cert.pem_data for cert in self.get_chain_of_trust(certificate.certificate_id, exclude_root=False)[1:]]
559 1fa20e93 Stanislav Král
560
        return self.cryptography_service.generate_pkcs_identity(certificate.pem_data, cert_key.private_key,
561
                                                                identity_name,
562
                                                                identity_passphrase, cot_pem_list, cert_key.password)
563
564 20b33bd4 Jan Pašek
565
class RevocationReasonInvalidException(Exception):
566
    """
567
    Exception that denotes that the caller was trying to revoke
568
    a certificate using an invalid revocation reason
569
    """
570
571
    def __init__(self, reason):
572
        self.reason = reason
573
574
    def __str__(self):
575
        return f"Revocation reason '{self.reason}' is not valid."
576
577
578
class CertificateStatusInvalidException(Exception):
579
    """
580
    Exception that denotes that the caller was trying to set
581
    a certificate to an invalid state
582
    """
583
584
    def __init__(self, status):
585
        self.status = status
586
587
    def __str__(self):
588
        return f"Certificate status '{self.status}' is not valid."
589 9c704fb1 Jan Pašek
590
591 9e6f791a Jan Pašek
class CertificateAlreadyRevokedException(Exception):
592
    """
593
    Exception that denotes that the caller was trying to revoke
594
    a certificate that is already revoked
595
    """
596
597
    def __init__(self, id):
598
        self.id = id
599
600
    def __str__(self):
601
        return f"Certificate id '{self.id}' is already revoked."
602 94f8d5cf Jan Pašek
603
604
class CertificateCannotBeSetToValid(Exception):
605
    """
606
    Exception that denotes that the caller was trying to
607
    set certificate to valid if the certificate was already
608
    revoked but not certificateHold.
609
    """
610
611
    def __init__(self, old_reason):
612
        self.old_state = old_reason
613
614
    def __str__(self):
615
        return "Cannot set revoked certificate back to valid when the certificate revocation reason is not " \
616
               "certificateHold. " \
617
               f"The revocation reason of the certificate is {self.old_state}"
618 7a30da1b Stanislav Král
619
620
class InvalidSubjectAttribute(Exception):
621
    """
622
    Exception that denotes that the caller was trying to
623
    create a certificate while passing a subject with an invalid
624
    attribute.
625
    """
626
627
    def __init__(self, attribute_name, reason):
628
        self.attribute_name = attribute_name
629
        self.reason = reason
630
631
    def __str__(self):
632
        return f"""Subject attribute "{self.attribute_name} is invalid (reason: {self.reason})."""