Projekt

Obecné

Profil

Stáhnout (29.4 KB) Statistiky
| Větev: | Tag: | Revize:
1 ef65f488 Stanislav Král
from typing import List
2
3 151e7604 Jan Pašek
from injector import inject
4
5 20b33bd4 Jan Pašek
from src.config.configuration import Configuration
6
from src.constants import ROOT_CA_ID, INTERMEDIATE_CA_ID, CA_ID, CERTIFICATE_ID, CERTIFICATE_STATES, \
7 c15357a9 Jan Pašek
    CERTIFICATE_REVOCATION_REASONS, SSL_ID, SIGNATURE_ID, AUTHENTICATION_ID, CERTIFICATE_VALID, CERTIFICATE_EXPIRED, \
8 7855d234 Captain_Trojan
    CERTIFICATE_REVOKED,\
9
    CERTIFICATE_REVOCATION_REASON_HOLD
10 4a40b0d2 Stanislav Král
from src.dao.certificate_repository import CertificateRepository
11 8ff50e4e Jan Pašek
from src.exceptions.certificate_not_found_exception import CertificateNotFoundException
12 485913d0 Captain_Trojan
from src.exceptions.database_exception import DatabaseException
13 9e6f791a Jan Pašek
from src.exceptions.unknown_exception import UnknownException
14 4a40b0d2 Stanislav Král
from src.model.certificate import Certificate
15 313b647b Stanislav Král
from src.model.private_key import PrivateKey
16 4a40b0d2 Stanislav Král
from src.model.subject import Subject
17
from src.services.cryptography import CryptographyService
18
19 313b647b Stanislav Král
import time
20 03484d49 Captain_Trojan
from calendar import timegm
21 313b647b Stanislav Král
22 329216fe Stanislav Král
from src.utils.usages_to_extensions import usages_to_extension_lines, ExtensionFieldFlags, CRITICAL, KEY_CERT_SIGN, \
23
    CRL_SIGN, CA, DIGITAL_SIGNATURE, KEY_ENCIPHERMENT, KEY_AGREEMENT, SERVER_AUTH, NON_REPUDIATION, TIME_STAMPING, \
24
    CLIENT_AUTH
25
26 b3c80ccb David Friesecký
from src.utils.logger import Logger
27
28 7313994f Stanislav Král
VALID_FROM_TO_DATE_FORMAT = "%d.%m.%Y %H:%M:%S"
29 bbcb7c89 Stanislav Král
CA_EXTENSIONS = "basicConstraints=critical,CA:TRUE"
30 20b33bd4 Jan Pašek
CRL_EXTENSION = "crlDistributionPoints=URI:"
31
OCSP_EXTENSION = "authorityInfoAccess=OCSP;URI:"
32
STATUS_REVOKED = "revoked"
33
STATUS_VALID = "valid"
34 313b647b Stanislav Král
35 329216fe Stanislav Král
# define which flags are required for various usages
36
REQUIRED_USAGE_EXTENSION_FLAGS = {
37
    CA_ID: ExtensionFieldFlags({CRITICAL, KEY_CERT_SIGN, CRL_SIGN}, {}, {CRITICAL, CA}),
38
    SSL_ID: ExtensionFieldFlags({DIGITAL_SIGNATURE, KEY_ENCIPHERMENT, KEY_AGREEMENT}, {SERVER_AUTH}, {}),
39 52f2eca4 Jan Pašek
    SIGNATURE_ID: ExtensionFieldFlags({DIGITAL_SIGNATURE, NON_REPUDIATION}, {}, {}),
40 329216fe Stanislav Král
    AUTHENTICATION_ID: ExtensionFieldFlags({DIGITAL_SIGNATURE}, {CLIENT_AUTH}, {})}
41
42 4a40b0d2 Stanislav Král
43
class CertificateService:
44
45 151e7604 Jan Pašek
    @inject
46 20b33bd4 Jan Pašek
    def __init__(self, cryptography_service: CryptographyService,
47
                 certificate_repository: CertificateRepository,
48
                 configuration: Configuration):
49 4a40b0d2 Stanislav Král
        self.cryptography_service = cryptography_service
50
        self.certificate_repository = certificate_repository
51 20b33bd4 Jan Pašek
        self.configuration = configuration
52 4a40b0d2 Stanislav Král
53 7a30da1b Stanislav Král
    @staticmethod
54
    def __check_subject_is_valid(subject: Subject):
55 35a4e794 Stanislav Král
        """
56
        Checks whether the given subject is valid
57
        :param subject: A subject to be verified
58
        :raises InvalidCertificateAttribute: When a subject with an invalid attribute is passed
59
        """
60 e9b4e2a1 Stanislav Král
        if subject.country is not None and subject.country != "" and len(subject.country) != 2:
61 c62151d4 Stanislav Král
            raise InvalidSubjectAttribute("Country code", "Must consist of exactly 2 letters")
62 7a30da1b Stanislav Král
63 bbcb7c89 Stanislav Král
    # TODO usages present in method parameters but not in class diagram
64 ca3ac7c0 Stanislav Král
    def create_root_ca(self, key: PrivateKey, subject: Subject, extensions: str = "", config: str = "",
65 2f5101f1 Stanislav Král
                       usages=None, days=30):
66 a6727aa9 Stanislav Král
        """
67
        Creates a root CA certificate based on the given parameters.
68
        :param key: Private key to be used when generating the certificate
69
        :param subject: Subject to be used put into the certificate
70
        :param config: String containing the configuration to be used
71
        :param extensions: Name of the section in the configuration representing extensions
72
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
73 2f5101f1 Stanislav Král
        :param days: Number of days for which the generated cert. will be considered valid
74 35a4e794 Stanislav Král
        :raises InvalidCertificateAttribute: when a subject with an invalid attribute is passed
75 a6727aa9 Stanislav Král
        :return: An instance of Certificate class representing the generated root CA cert
76
        """
77 b3c80ccb David Friesecký
78
        Logger.debug("Function launched.")
79
80 7a30da1b Stanislav Král
        # check whether the given subject is valid
81
        self.__check_subject_is_valid(subject)
82
83 ca3ac7c0 Stanislav Král
        if usages is None:
84
            usages = {}
85
86 20b33bd4 Jan Pašek
        cert_id = self.certificate_repository.get_next_id()
87
88 329216fe Stanislav Král
        # specify CA usage
89
        usages[CA_ID] = True
90
91
        # generate extension configuration lines based on the specified usages
92
        extensions = extensions + "\n" + "\n".join(usages_to_extension_lines(usages, REQUIRED_USAGE_EXTENSION_FLAGS))
93
94 313b647b Stanislav Král
        # create a new self signed  certificate
95
        cert_pem = self.cryptography_service.create_sscrt(subject, key.private_key, key_pass=key.password,
96 87c56935 Stanislav Král
                                                          extensions=extensions, config=config, days=days, sn=cert_id)
97 ca3ac7c0 Stanislav Král
98 4c19a9b1 Stanislav Král
        # wrap into Certificate class
99 a6727aa9 Stanislav Král
        certificate = self.__create_wrapper(cert_pem, key.private_key_id, usages, 0,
100 4c19a9b1 Stanislav Král
                                            ROOT_CA_ID)
101 313b647b Stanislav Král
102
        # store the wrapper into the repository
103
        created_id = self.certificate_repository.create(certificate)
104
105
        # assign the generated ID to the inserted certificate
106
        certificate.certificate_id = created_id
107 4a40b0d2 Stanislav Král
108 313b647b Stanislav Král
        return certificate
109 10fab051 Stanislav Král
110 a6727aa9 Stanislav Král
    def __create_wrapper(self, cert_pem, private_key_id, usages, parent_id, cert_type):
111
        """
112 a4e818dc Jan Pašek
        Wraps the given parameters using the Certificate class. Uses CryptographyService to find out the notBefore and
113 a6727aa9 Stanislav Král
        notAfter fields.
114
        :param cert_pem: PEM of the cert. to be wrapped
115
        :param private_key_id: ID of the private key used to create the given certificate
116
        :param usages: A dictionary containing usages of the generated certificate generated (see constants.py)
117
        :param parent_id: ID of the CA that issued this certificate
118
        :param cert_type: Type of this certificate (see constants.py)
119
        :return: An instance of the Certificate class wrapping the values passed  via method parameters
120
        """
121 b3c80ccb David Friesecký
122
        Logger.debug("Function launched.")
123
124 4c19a9b1 Stanislav Král
        # parse the generated pem for subject and notBefore/notAfter fields
125 a6727aa9 Stanislav Král
        # TODO this could be improved in the future in such way that calling openssl is not required to parse the dates
126 4c19a9b1 Stanislav Král
        subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem)
127
        # format the parsed date
128 03484d49 Captain_Trojan
        not_before_formatted = int(timegm(not_before))
129
        not_after_formatted = int(timegm(not_after))
130 4c19a9b1 Stanislav Král
131
        # create a certificate wrapper
132 894cc2cd David Friesecký
        certificate = Certificate(-1, not_before_formatted, not_after_formatted, cert_pem, cert_type, parent_id,
133
                                  private_key_id, usages, subj.common_name, subj.country, subj.locality, subj.state,
134
                                  subj.organization, subj.organization_unit, subj.email_address)
135 4c19a9b1 Stanislav Král
136
        return certificate
137
138 bbcb7c89 Stanislav Král
    # TODO config parameter present in class diagram but not here (unused)
139
    def create_ca(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate, issuer_key: PrivateKey,
140 ca3ac7c0 Stanislav Král
                  extensions: str = "", days: int = 30, usages=None):
141 a6727aa9 Stanislav Král
        """
142
        Creates an intermediate CA certificate issued by the given parent CA.
143
        :param subject_key: Private key to be used when generating the certificate
144
        :param subject: Subject to be used put into the certificate
145
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
146
        :param issuer_key: PK used to generate the issuer certificate
147
        :param extensions: Extensions to be used when generating the certificate
148
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
149
        :param days: Number of days for which the generated cert. will be considered valid
150 35a4e794 Stanislav Král
        :raises InvalidCertificateAttribute: when a subject with an invalid attribute is passed
151 a6727aa9 Stanislav Král
        :return: An instance of Certificate class representing the generated intermediate CA cert
152
        """
153 b3c80ccb David Friesecký
154
        Logger.debug("Function launched.")
155
156 7a30da1b Stanislav Král
        # check whether the given subject is valid
157
        self.__check_subject_is_valid(subject)
158
159 ca3ac7c0 Stanislav Král
        if usages is None:
160
            usages = {}
161
162 329216fe Stanislav Král
        # specify CA usage
163
        usages[CA_ID] = True
164
165
        # generate extension configuration lines based on the specified usages
166
        extensions = extensions + "\n" + "\n".join(usages_to_extension_lines(usages, REQUIRED_USAGE_EXTENSION_FLAGS))
167
168 20b33bd4 Jan Pašek
        # Add CRL and OCSP distribution point to certificate extensions
169
        cert_id = self.certificate_repository.get_next_id()
170 ea1229ee Jan Pašek
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(issuer_cert.certificate_id)
171
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(issuer_cert.certificate_id)
172 20b33bd4 Jan Pašek
173 bbcb7c89 Stanislav Král
        # TODO implement AIA URI via extensions
174
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
175
                                                        issuer_key.private_key,
176
                                                        subject_key_pass=subject_key.password,
177
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
178 87c56935 Stanislav Král
                                                        days=days,
179
                                                        sn=cert_id)
180 bbcb7c89 Stanislav Král
181 4c19a9b1 Stanislav Král
        # wrap into Certificate class
182 894cc2cd David Friesecký
        certificate = self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
183
                                            issuer_cert.certificate_id, INTERMEDIATE_CA_ID)
184 bbcb7c89 Stanislav Král
185
        # store the wrapper into the repository
186
        created_id = self.certificate_repository.create(certificate)
187
188
        # assign the generated ID to the inserted certificate
189
        certificate.certificate_id = created_id
190
191
        return certificate
192
193 4c19a9b1 Stanislav Král
    def create_end_cert(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate,
194
                        issuer_key: PrivateKey,
195
                        extensions: str = "", days: int = 30, usages=None):
196 a6727aa9 Stanislav Král
        """
197
        Creates an end certificate issued by the given parent CA.
198
        :param subject_key: Private key to be used when generating the certificate
199
        :param subject: Subject to be used put into the certificate
200
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
201
        :param issuer_key: PK used to generate the issuer certificate
202
        :param extensions: Extensions to be used when generating the certificate
203
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
204
        :param days: Number of days for which the generated cert. will be considered valid
205 35a4e794 Stanislav Král
        :raises InvalidCertificateAttribute: when a subject with an invalid attribute is passed
206 a6727aa9 Stanislav Král
        :return: An instance of Certificate class representing the generated cert
207
        """
208 b3c80ccb David Friesecký
209
        Logger.debug("Function launched.")
210
211 7a30da1b Stanislav Král
        # check whether the given subject is valid
212
        self.__check_subject_is_valid(subject)
213
214 4c19a9b1 Stanislav Král
        if usages is None:
215
            usages = {}
216
217 87c56935 Stanislav Král
        # get the next certificate ID in order to be able to specify the serial number
218
        cert_id = self.certificate_repository.get_next_id()
219
220 329216fe Stanislav Král
        # generate extension configuration lines based on the specified usages
221
        extensions = extensions + "\n" + "\n".join(usages_to_extension_lines(usages, REQUIRED_USAGE_EXTENSION_FLAGS))
222
223 ea1229ee Jan Pašek
        # Add CRL and OCSP distribution point to certificate extensions
224
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(issuer_cert.certificate_id)
225
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(issuer_cert.certificate_id)
226
227 4c19a9b1 Stanislav Král
        # generate a new certificate
228
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
229
                                                        issuer_key.private_key,
230
                                                        subject_key_pass=subject_key.password,
231
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
232 87c56935 Stanislav Král
                                                        days=days,
233
                                                        sn=cert_id
234
                                                        )
235 4c19a9b1 Stanislav Král
236
        # wrap the generated certificate using Certificate class
237 a6727aa9 Stanislav Král
        certificate = self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
238 4c19a9b1 Stanislav Král
                                            issuer_cert.certificate_id, CERTIFICATE_ID)
239
240
        created_id = self.certificate_repository.create(certificate)
241
242
        certificate.certificate_id = created_id
243
244
        return certificate
245
246 10fab051 Stanislav Král
    def get_certificate(self, unique_id: int) -> Certificate:
247 a6727aa9 Stanislav Král
        """
248
        Tries to fetch a certificate from the certificate repository using a given id.
249
        :param unique_id: ID of the certificate to be fetched
250
        :return: Instance of the Certificate class containing a certificate with the given id or `None` if such
251
        certificate is not found
252
        """
253 b3c80ccb David Friesecký
254
        Logger.debug("Function launched.")
255
256 10fab051 Stanislav Král
        return self.certificate_repository.read(unique_id)
257 2a90f4fd Stanislav Král
258 ef65f488 Stanislav Král
    def get_certificates(self, cert_type=None) -> List[Certificate]:
259 a6727aa9 Stanislav Král
        """
260
        Tries to fetch a list of all certificates from the certificate repository. Using the `cert_type` parameter only
261
        certificates of the given type can be returned.
262
        :param cert_type: Type of certificates to be returned
263
        :return: List of instances of the Certificate class representing all certificates present in the certificate
264
        repository. An empty list is returned when no certificates are found.
265
        """
266 b3c80ccb David Friesecký
267
        Logger.debug("Function launched.")
268
269 2a90f4fd Stanislav Král
        return self.certificate_repository.read_all(cert_type)
270 ef65f488 Stanislav Král
271 67723931 Captain_Trojan
    def get_certificates_filter(self, target_types, target_usages, target_cn_substring, page, per_page):
272
        """
273
        Tries to fetch a filtered list of all certificates from the certificate repository.
274
        :param target_types: certificate types (filter)
275
        :param target_usages: certificate usages (filter)
276
        :param target_cn_substring: certificate CN substring (filter)
277
        :param page: target page
278
        :param per_page: target page size
279
        :return: List of instances of the Certificate class representing filtered certificates
280
                    present in the certificate repository. An empty list is returned when no certificates are found.
281
        """
282
        return self.certificate_repository.read_all_filter(target_types, target_usages, target_cn_substring,
283
                                                           page, per_page)
284
285 ef65f488 Stanislav Král
    def get_chain_of_trust(self, from_id: int, to_id: int = -1, exclude_root=True) -> List[Certificate]:
286 4e70d22a Stanislav Král
        """
287
        Traverses the certificate hierarchy tree upwards till a certificate with the `to_id` ID is found or till a
288
        root CA certificate is found. Root certificates are excluded from the chain by default.
289
        :param from_id: ID of the first certificate to be included in the chain of trust
290
        :param to_id: ID of the last certificate to be included in the chain of trust
291
        :param exclude_root: a flag indicating whether root CA certificate should be excluded
292
        :return: a list of certificates representing the chain of trust starting with the certificate given by `from_id`
293
        ID
294
        """
295 b3c80ccb David Friesecký
296
        Logger.debug("Function launched.")
297
298 4e70d22a Stanislav Král
        # read the first certificate of the chain
299 ef65f488 Stanislav Král
        start_cert = self.certificate_repository.read(from_id)
300
301 4e70d22a Stanislav Král
        # if no cert is found or the current cert is root CA and root CAs should be excluded, then return an empty list
302 ef65f488 Stanislav Král
        if start_cert is None or (start_cert.type_id == ROOT_CA_ID and exclude_root):
303
            return []
304
305
        current_cert = start_cert
306
        chain_of_trust = [current_cert]
307
308
        # TODO could possibly be simplified
309
        if start_cert.type_id == ROOT_CA_ID:
310 4e70d22a Stanislav Král
            # the first cert found is a root ca
311 ef65f488 Stanislav Král
            return chain_of_trust
312
313
        while True:
314
            parent_cert = self.certificate_repository.read(current_cert.parent_id)
315
316 4e70d22a Stanislav Král
            # check whether parent certificate exists
317
            if parent_cert is None:
318
                break
319
320
            # check whether the found certificate is a root certificate
321
            if parent_cert.type_id == ROOT_CA_ID:
322 ef65f488 Stanislav Král
                if not exclude_root:
323 4e70d22a Stanislav Král
                    # append the found root cert only if root certificates should not be excluded from the CoT
324 ef65f488 Stanislav Král
                    chain_of_trust.append(parent_cert)
325
                break
326
327 4e70d22a Stanislav Král
            # append the certificate
328 ef65f488 Stanislav Král
            chain_of_trust.append(parent_cert)
329
330 4e70d22a Stanislav Král
            # stop iterating over certificates if the id of the found certificate matches `to_id` method parameter
331 ef65f488 Stanislav Král
            if parent_cert.certificate_id == to_id:
332
                break
333
334
            current_cert = parent_cert
335
336
        return chain_of_trust
337 3d639744 Stanislav Král
338 5f8a2c07 Captain_Trojan
    def delete_certificate(self, unique_id):
339 3d639744 Stanislav Král
        """
340 5f8a2c07 Captain_Trojan
        Deletes a certificate. Raises an Exception should any unexpected behavior occur.
341 3d639744 Stanislav Král
342
        :param unique_id: ID of specific certificate
343
        """
344 5f8a2c07 Captain_Trojan
345 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
346
347 5f8a2c07 Captain_Trojan
        to_delete = self.certificate_repository.get_all_descendants_of(unique_id)
348
        if to_delete is None:
349 b3c80ccb David Friesecký
            Logger.error(f"No such certificate found 'ID = {unique_id}'.")
350 5f8a2c07 Captain_Trojan
            raise CertificateNotFoundException(unique_id)
351
352
        for cert in to_delete:
353
            try:
354
                self.set_certificate_revocation_status(cert.certificate_id, STATUS_REVOKED)
355
            except CertificateAlreadyRevokedException:
356 b3c80ccb David Friesecký
                Logger.info(f"Certificate already revoked 'ID = {unique_id}'.")
357 5f8a2c07 Captain_Trojan
                # TODO log as an info/debug, not warning or above <-- perfectly legal
358 02954c9d Jan Pašek
                pass
359 5f8a2c07 Captain_Trojan
360 b3c80ccb David Friesecký
            if not self.certificate_repository.delete(cert.certificate_id):
361
                Logger.error(f"The certificate has not been deleted 'ID = {cert.certificate_id}'.")
362
363 c839facb Captain_Trojan
    def get_certificates_issued_by(self, issuer_id):
364 485913d0 Captain_Trojan
        """
365
        Returns a list of all children of a certificate identified by an unique_id.
366
        Raises a DatabaseException should any unexpected behavior occur.
367 c839facb Captain_Trojan
        :param issuer_id: target certificate ID
368 485913d0 Captain_Trojan
        :return: children of unique_id
369
        """
370 b3c80ccb David Friesecký
371
        Logger.debug("Function launched.")
372
373 485913d0 Captain_Trojan
        try:
374 c839facb Captain_Trojan
            if self.certificate_repository.read(issuer_id) is None:
375
                Logger.error(f"No such certificate found 'ID = {issuer_id}'.")
376
                raise CertificateNotFoundException(issuer_id)
377 485913d0 Captain_Trojan
        except DatabaseException:
378 c839facb Captain_Trojan
            Logger.error(f"No such certificate found 'ID = {issuer_id}'.")
379
            raise CertificateNotFoundException(issuer_id)
380
381
        return self.certificate_repository.get_all_issued_by(issuer_id)
382 485913d0 Captain_Trojan
383 c839facb Captain_Trojan
    def get_certificates_issued_by_filter(self, issuer_id, target_types, target_usages, target_cn_substring, page,
384
                                          per_page):
385
        """
386
        Returns a list of all children of a certificate identified by an unique_id.
387
        Filters the results according to target types, usages, cn substring and pagination.
388
        Raises a DatabaseException should any unexpected behavior occur.
389
        :param issuer_id: target certificate ID
390
        :param target_types: filter of types
391
        :param target_usages: filter of usages
392
        :param target_cn_substring: CN substring
393
        :param page: target page or None
394
        :param per_page: certs per page or None
395
        :return: list of certificates (a page if specified)
396
        """
397 485913d0 Captain_Trojan
398 c839facb Captain_Trojan
        Logger.debug("Function launched.")
399
400
        try:
401
            if self.certificate_repository.read(issuer_id) is None:
402
                Logger.error(f"No such certificate found 'ID = {issuer_id}'.")
403
                raise CertificateNotFoundException(issuer_id)
404
        except DatabaseException:
405
            Logger.error(f"No such certificate found 'ID = {issuer_id}'.")
406
            raise CertificateNotFoundException(issuer_id)
407 485913d0 Captain_Trojan
408 67723931 Captain_Trojan
        return self.certificate_repository.get_all_issued_by_filter(issuer_id, target_types, target_usages,
409
                                                                    target_cn_substring, page, per_page)
410 485913d0 Captain_Trojan
411 20b33bd4 Jan Pašek
    def set_certificate_revocation_status(self, id, status, reason="unspecified"):
412
        """
413
        Set certificate status to 'valid' or 'revoked'.
414
        If the new status is revoked a reason can be provided -> default is unspecified
415
        :param reason: reason for revocation
416
        :param id: identifier of the certificate whose status is to be changed
417
        :param status: new status of the certificate
418 94f8d5cf Jan Pašek
        :raises CertificateStatusInvalidException: if status is not valid
419
        :raises RevocationReasonInvalidException: if reason is not valid
420
        :raises CertificateNotFoundException: if certificate with given id cannot be found
421
        :raises CertificateCannotBeSetToValid: if certificate was already revoked and not on hold,
422
                it cannot be set revalidated
423
        :raises CertificateAlreadyRevokedException: if caller tries to revoke a certificate that is already revoked
424
        :raises UnknownException: if the database is corrupted
425 20b33bd4 Jan Pašek
        """
426 b3c80ccb David Friesecký
427
        Logger.debug("Function launched.")
428
429 20b33bd4 Jan Pašek
        if status not in CERTIFICATE_STATES:
430 b3c80ccb David Friesecký
            Logger.error(f"Wrong parameter, invalid status '{status}'.")
431 20b33bd4 Jan Pašek
            raise CertificateStatusInvalidException(status)
432
        if reason not in CERTIFICATE_REVOCATION_REASONS:
433 b3c80ccb David Friesecký
            Logger.error(f"Wrong parameter, invalid reason '{reason}'.")
434 20b33bd4 Jan Pašek
            raise RevocationReasonInvalidException(reason)
435
436 9e6f791a Jan Pašek
        # check whether the certificate exists
437
        certificate = self.certificate_repository.read(id)
438
        if certificate is None:
439 b3c80ccb David Friesecký
            Logger.error(f"No such certificate found 'ID = {id}'.")
440 9e6f791a Jan Pašek
            raise CertificateNotFoundException(id)
441
442 9c704fb1 Jan Pašek
        updated = False
443 20b33bd4 Jan Pašek
        if status == STATUS_VALID:
444 94f8d5cf Jan Pašek
            # if the certificate is revoked but the reason is not certificateHold, it cannot be re-validated
445
            #    -> throw an exception
446
            if certificate.revocation_reason != "" and \
447
               certificate.revocation_reason != CERTIFICATE_REVOCATION_REASON_HOLD:
448
                raise CertificateCannotBeSetToValid(certificate.revocation_reason)
449 9c704fb1 Jan Pašek
            updated = self.certificate_repository.clear_certificate_revocation(id)
450 20b33bd4 Jan Pašek
        elif status == STATUS_REVOKED:
451 9e6f791a Jan Pašek
            # check if the certificate is not revoked already
452
            revoked = self.certificate_repository.get_all_revoked_by(certificate.parent_id)
453
            if certificate.certificate_id in [x.certificate_id for x in revoked]:
454 b3c80ccb David Friesecký
                Logger.error(f"Certificate already revoked 'ID = {id}'.")
455 9e6f791a Jan Pašek
                raise CertificateAlreadyRevokedException(id)
456
457 20b33bd4 Jan Pašek
            revocation_timestamp = int(time.time())
458 03484d49 Captain_Trojan
            updated = self.certificate_repository.set_certificate_revoked(id, revocation_timestamp, reason)
459 9c704fb1 Jan Pašek
460
        if not updated:
461 b3c80ccb David Friesecký
            Logger.error(f"Repository returned 'false' from clear_certificate_revocation() "
462
                         f"or set_certificate_revoked().")
463 9e6f791a Jan Pašek
            raise UnknownException("Repository returned 'false' from clear_certificate_revocation() "
464
                                   "or set_certificate_revoked().")
465 20b33bd4 Jan Pašek
466 c4ba6bb7 Jan Pašek
    def get_subject_from_certificate(self, certificate: Certificate) -> Subject:
467
        """
468
        Get Subject distinguished name from a Certificate
469
        :param certificate: certificate instance whose Subject shall be parsed
470
        :return: instance of Subject class containing resulting distinguished name
471
        """
472 b3c80ccb David Friesecký
473
        Logger.debug("Function launched.")
474
475 894cc2cd David Friesecký
        subject = Subject(certificate.common_name,
476
                          certificate.country_code,
477
                          certificate.locality,
478
                          certificate.province,
479
                          certificate.organization,
480
                          certificate.organizational_unit,
481
                          certificate.email_address)
482 c4ba6bb7 Jan Pašek
        return subject
483 d3bfacfc Stanislav Král
484
    def get_public_key_from_certificate(self, certificate: Certificate):
485
        """
486
        Extracts a public key from the given certificate
487
        :param certificate: an instance of the Certificate class containing the certificate from which a public key
488
        should be extracted.
489
        :return: a string containing the extracted public key in PEM format
490
        """
491 b3c80ccb David Friesecký
492
        Logger.debug("Function launched.")
493
494 d3bfacfc Stanislav Král
        return self.cryptography_service.extract_public_key_from_certificate(certificate.pem_data)
495 20b33bd4 Jan Pašek
496 a53e5aef Jan Pašek
    def get_certificate_state(self, id: int) -> str:
497
        """
498
        Check whether the certificate is expired, valid or revoked.
499
            - in case it's revoked and expired, revoked is returned
500
        :param id: identifier of the certificate
501
        :return: certificates state from {valid, revoked, expired}
502
        :raises CertificateNotFoundException: in case id of non-existing certificate is entered
503
        """
504 4ff15a44 Jan Pašek
        Logger.debug("Function launched.")
505 c15357a9 Jan Pašek
        status = CERTIFICATE_VALID
506
507
        # Read the selected certificate from the repository
508
        certificate = self.certificate_repository.read(id)
509
        if certificate is None:
510 1fa20e93 Stanislav Král
            Logger.error("Certificate whose details were requested does not exist.")
511 c15357a9 Jan Pašek
            raise CertificateNotFoundException(id)
512
513
        # check the expiration date using OpenSSL
514
        if not self.cryptography_service.verify_cert(certificate.pem_data):
515
            status = CERTIFICATE_EXPIRED
516
517
        # check certificate revocation
518
        all_revoked_by_parent = self.certificate_repository.get_all_revoked_by(certificate.parent_id)
519
        all_revoked_by_parent_ids = [cert.certificate_id for cert in all_revoked_by_parent]
520
521
        if id in all_revoked_by_parent_ids:
522
            status = CERTIFICATE_REVOKED
523
524
        return status
525
526 20b33bd4 Jan Pašek
    def __get_crl_endpoint(self, ca_identifier: int) -> str:
527
        """
528
        Get URL address of CRL distribution endpoint based on
529
        issuer's ID
530
531
        :param ca_identifier: ID of issuing authority
532
        :return: CRL endpoint for the given CA
533
        """
534 b3c80ccb David Friesecký
535
        Logger.debug("Function launched.")
536
537 20b33bd4 Jan Pašek
        return self.configuration.base_server_url + "/api/crl/" + str(ca_identifier)
538
539
    def __get_ocsp_endpoint(self, ca_identifier: int) -> str:
540
        """
541
        Get URL address of OCSP distribution endpoint based on
542
        issuer's ID
543
544
        :param ca_identifier: ID of issuing authority
545
        :return: OCSP endpoint for the given CA
546
        """
547 b3c80ccb David Friesecký
548
        Logger.debug("Function launched.")
549
550 20b33bd4 Jan Pašek
        return self.configuration.base_server_url + "/api/ocsp/" + str(ca_identifier)
551
552 04805a41 Stanislav Král
    def generate_pkcs_identity(self, certificate: Certificate, cert_key: PrivateKey, identity_name: str, identity_passphrase: str):
553 1fa20e93 Stanislav Král
        """
554
        Generates a PKCS identity of the certificate given by the specified ID while using the private key passed.
555
        A name of the identity to be used and certificate's passphrase have to be specified as well as the passphrase
556
        of certificate's private key (if encrypted).
557 04805a41 Stanislav Král
        :param certificate: certificate to be put into the PKCS identity store
558 1fa20e93 Stanislav Král
        :param cert_key: key used to sign the given certificate
559
        :param identity_name: name to be given to the identity to be created
560
        :param identity_passphrase: passphrase to be used to encrypt the identity
561
        :return: byte array containing the generated identity (PKCS12 store)
562
        """
563
        Logger.debug("Function launched.")
564
565
        # get the chain of trust of the certificate whose identity should be generated and exclude the certificate
566
        # whose chain of trust we are querying
567 04805a41 Stanislav Král
        cot_pem_list = [cert.pem_data for cert in self.get_chain_of_trust(certificate.certificate_id, exclude_root=False)[1:]]
568 1fa20e93 Stanislav Král
569
        return self.cryptography_service.generate_pkcs_identity(certificate.pem_data, cert_key.private_key,
570
                                                                identity_name,
571
                                                                identity_passphrase, cot_pem_list, cert_key.password)
572
573 20b33bd4 Jan Pašek
574
class RevocationReasonInvalidException(Exception):
575
    """
576
    Exception that denotes that the caller was trying to revoke
577
    a certificate using an invalid revocation reason
578
    """
579
580
    def __init__(self, reason):
581
        self.reason = reason
582
583
    def __str__(self):
584
        return f"Revocation reason '{self.reason}' is not valid."
585
586
587
class CertificateStatusInvalidException(Exception):
588
    """
589
    Exception that denotes that the caller was trying to set
590
    a certificate to an invalid state
591
    """
592
593
    def __init__(self, status):
594
        self.status = status
595
596
    def __str__(self):
597
        return f"Certificate status '{self.status}' is not valid."
598 9c704fb1 Jan Pašek
599
600 9e6f791a Jan Pašek
class CertificateAlreadyRevokedException(Exception):
601
    """
602
    Exception that denotes that the caller was trying to revoke
603
    a certificate that is already revoked
604
    """
605
606
    def __init__(self, id):
607
        self.id = id
608
609
    def __str__(self):
610
        return f"Certificate id '{self.id}' is already revoked."
611 94f8d5cf Jan Pašek
612
613
class CertificateCannotBeSetToValid(Exception):
614
    """
615
    Exception that denotes that the caller was trying to
616
    set certificate to valid if the certificate was already
617
    revoked but not certificateHold.
618
    """
619
620
    def __init__(self, old_reason):
621
        self.old_state = old_reason
622
623
    def __str__(self):
624
        return "Cannot set revoked certificate back to valid when the certificate revocation reason is not " \
625
               "certificateHold. " \
626
               f"The revocation reason of the certificate is {self.old_state}"
627 7a30da1b Stanislav Král
628
629
class InvalidSubjectAttribute(Exception):
630
    """
631
    Exception that denotes that the caller was trying to
632
    create a certificate while passing a subject with an invalid
633
    attribute.
634
    """
635
636
    def __init__(self, attribute_name, reason):
637
        self.attribute_name = attribute_name
638
        self.reason = reason
639
640
    def __str__(self):
641 c62151d4 Stanislav Král
        return f"""Subject attribute "{self.attribute_name}" is invalid (reason: {self.reason})."""