1 |
ef65f488
|
Stanislav Král
|
from typing import List
|
2 |
|
|
|
3 |
151e7604
|
Jan Pašek
|
from injector import inject
|
4 |
|
|
|
5 |
20b33bd4
|
Jan Pašek
|
from src.config.configuration import Configuration
|
6 |
|
|
from src.constants import ROOT_CA_ID, INTERMEDIATE_CA_ID, CA_ID, CERTIFICATE_ID, CERTIFICATE_STATES, \
|
7 |
c15357a9
|
Jan Pašek
|
CERTIFICATE_REVOCATION_REASONS, SSL_ID, SIGNATURE_ID, AUTHENTICATION_ID, CERTIFICATE_VALID, CERTIFICATE_EXPIRED, \
|
8 |
7855d234
|
Captain_Trojan
|
CERTIFICATE_REVOKED,\
|
9 |
|
|
CERTIFICATE_REVOCATION_REASON_HOLD
|
10 |
4a40b0d2
|
Stanislav Král
|
from src.dao.certificate_repository import CertificateRepository
|
11 |
8ff50e4e
|
Jan Pašek
|
from src.exceptions.certificate_not_found_exception import CertificateNotFoundException
|
12 |
485913d0
|
Captain_Trojan
|
from src.exceptions.database_exception import DatabaseException
|
13 |
9e6f791a
|
Jan Pašek
|
from src.exceptions.unknown_exception import UnknownException
|
14 |
4a40b0d2
|
Stanislav Král
|
from src.model.certificate import Certificate
|
15 |
313b647b
|
Stanislav Král
|
from src.model.private_key import PrivateKey
|
16 |
4a40b0d2
|
Stanislav Král
|
from src.model.subject import Subject
|
17 |
|
|
from src.services.cryptography import CryptographyService
|
18 |
|
|
|
19 |
313b647b
|
Stanislav Král
|
import time
|
20 |
|
|
|
21 |
329216fe
|
Stanislav Král
|
from src.utils.usages_to_extensions import usages_to_extension_lines, ExtensionFieldFlags, CRITICAL, KEY_CERT_SIGN, \
|
22 |
|
|
CRL_SIGN, CA, DIGITAL_SIGNATURE, KEY_ENCIPHERMENT, KEY_AGREEMENT, SERVER_AUTH, NON_REPUDIATION, TIME_STAMPING, \
|
23 |
|
|
CLIENT_AUTH
|
24 |
|
|
|
25 |
b3c80ccb
|
David Friesecký
|
from src.utils.logger import Logger
|
26 |
|
|
|
27 |
7313994f
|
Stanislav Král
|
VALID_FROM_TO_DATE_FORMAT = "%d.%m.%Y %H:%M:%S"
|
28 |
bbcb7c89
|
Stanislav Král
|
CA_EXTENSIONS = "basicConstraints=critical,CA:TRUE"
|
29 |
20b33bd4
|
Jan Pašek
|
CRL_EXTENSION = "crlDistributionPoints=URI:"
|
30 |
|
|
OCSP_EXTENSION = "authorityInfoAccess=OCSP;URI:"
|
31 |
|
|
STATUS_REVOKED = "revoked"
|
32 |
|
|
STATUS_VALID = "valid"
|
33 |
313b647b
|
Stanislav Král
|
|
34 |
329216fe
|
Stanislav Král
|
# define which flags are required for various usages
|
35 |
|
|
REQUIRED_USAGE_EXTENSION_FLAGS = {
|
36 |
|
|
CA_ID: ExtensionFieldFlags({CRITICAL, KEY_CERT_SIGN, CRL_SIGN}, {}, {CRITICAL, CA}),
|
37 |
|
|
SSL_ID: ExtensionFieldFlags({DIGITAL_SIGNATURE, KEY_ENCIPHERMENT, KEY_AGREEMENT}, {SERVER_AUTH}, {}),
|
38 |
52f2eca4
|
Jan Pašek
|
SIGNATURE_ID: ExtensionFieldFlags({DIGITAL_SIGNATURE, NON_REPUDIATION}, {}, {}),
|
39 |
329216fe
|
Stanislav Král
|
AUTHENTICATION_ID: ExtensionFieldFlags({DIGITAL_SIGNATURE}, {CLIENT_AUTH}, {})}
|
40 |
|
|
|
41 |
4a40b0d2
|
Stanislav Král
|
|
42 |
|
|
class CertificateService:
|
43 |
|
|
|
44 |
151e7604
|
Jan Pašek
|
@inject
|
45 |
20b33bd4
|
Jan Pašek
|
def __init__(self, cryptography_service: CryptographyService,
|
46 |
|
|
certificate_repository: CertificateRepository,
|
47 |
|
|
configuration: Configuration):
|
48 |
4a40b0d2
|
Stanislav Král
|
self.cryptography_service = cryptography_service
|
49 |
|
|
self.certificate_repository = certificate_repository
|
50 |
20b33bd4
|
Jan Pašek
|
self.configuration = configuration
|
51 |
4a40b0d2
|
Stanislav Král
|
|
52 |
7a30da1b
|
Stanislav Král
|
@staticmethod
|
53 |
|
|
def __check_subject_is_valid(subject: Subject):
|
54 |
35a4e794
|
Stanislav Král
|
"""
|
55 |
|
|
Checks whether the given subject is valid
|
56 |
|
|
:param subject: A subject to be verified
|
57 |
|
|
:raises InvalidCertificateAttribute: When a subject with an invalid attribute is passed
|
58 |
|
|
"""
|
59 |
7a30da1b
|
Stanislav Král
|
if subject.country is not None and len(subject.country) != 2:
|
60 |
|
|
raise InvalidSubjectAttribute("Country code", "Must consist of exactly 2 letters.")
|
61 |
|
|
|
62 |
bbcb7c89
|
Stanislav Král
|
# TODO usages present in method parameters but not in class diagram
|
63 |
ca3ac7c0
|
Stanislav Král
|
def create_root_ca(self, key: PrivateKey, subject: Subject, extensions: str = "", config: str = "",
|
64 |
2f5101f1
|
Stanislav Král
|
usages=None, days=30):
|
65 |
a6727aa9
|
Stanislav Král
|
"""
|
66 |
|
|
Creates a root CA certificate based on the given parameters.
|
67 |
|
|
:param key: Private key to be used when generating the certificate
|
68 |
|
|
:param subject: Subject to be used put into the certificate
|
69 |
|
|
:param config: String containing the configuration to be used
|
70 |
|
|
:param extensions: Name of the section in the configuration representing extensions
|
71 |
|
|
:param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
|
72 |
2f5101f1
|
Stanislav Král
|
:param days: Number of days for which the generated cert. will be considered valid
|
73 |
35a4e794
|
Stanislav Král
|
:raises InvalidCertificateAttribute: when a subject with an invalid attribute is passed
|
74 |
a6727aa9
|
Stanislav Král
|
:return: An instance of Certificate class representing the generated root CA cert
|
75 |
|
|
"""
|
76 |
b3c80ccb
|
David Friesecký
|
|
77 |
|
|
Logger.debug("Function launched.")
|
78 |
|
|
|
79 |
7a30da1b
|
Stanislav Král
|
# check whether the given subject is valid
|
80 |
|
|
self.__check_subject_is_valid(subject)
|
81 |
|
|
|
82 |
ca3ac7c0
|
Stanislav Král
|
if usages is None:
|
83 |
|
|
usages = {}
|
84 |
|
|
|
85 |
20b33bd4
|
Jan Pašek
|
cert_id = self.certificate_repository.get_next_id()
|
86 |
|
|
|
87 |
329216fe
|
Stanislav Král
|
# specify CA usage
|
88 |
|
|
usages[CA_ID] = True
|
89 |
|
|
|
90 |
|
|
# generate extension configuration lines based on the specified usages
|
91 |
|
|
extensions = extensions + "\n" + "\n".join(usages_to_extension_lines(usages, REQUIRED_USAGE_EXTENSION_FLAGS))
|
92 |
|
|
|
93 |
313b647b
|
Stanislav Král
|
# create a new self signed certificate
|
94 |
|
|
cert_pem = self.cryptography_service.create_sscrt(subject, key.private_key, key_pass=key.password,
|
95 |
87c56935
|
Stanislav Král
|
extensions=extensions, config=config, days=days, sn=cert_id)
|
96 |
ca3ac7c0
|
Stanislav Král
|
|
97 |
4c19a9b1
|
Stanislav Král
|
# wrap into Certificate class
|
98 |
a6727aa9
|
Stanislav Král
|
certificate = self.__create_wrapper(cert_pem, key.private_key_id, usages, 0,
|
99 |
4c19a9b1
|
Stanislav Král
|
ROOT_CA_ID)
|
100 |
313b647b
|
Stanislav Král
|
|
101 |
|
|
# store the wrapper into the repository
|
102 |
|
|
created_id = self.certificate_repository.create(certificate)
|
103 |
|
|
|
104 |
|
|
# assign the generated ID to the inserted certificate
|
105 |
|
|
certificate.certificate_id = created_id
|
106 |
4a40b0d2
|
Stanislav Král
|
|
107 |
313b647b
|
Stanislav Král
|
return certificate
|
108 |
10fab051
|
Stanislav Král
|
|
109 |
a6727aa9
|
Stanislav Král
|
def __create_wrapper(self, cert_pem, private_key_id, usages, parent_id, cert_type):
|
110 |
|
|
"""
|
111 |
a4e818dc
|
Jan Pašek
|
Wraps the given parameters using the Certificate class. Uses CryptographyService to find out the notBefore and
|
112 |
a6727aa9
|
Stanislav Král
|
notAfter fields.
|
113 |
|
|
:param cert_pem: PEM of the cert. to be wrapped
|
114 |
|
|
:param private_key_id: ID of the private key used to create the given certificate
|
115 |
|
|
:param usages: A dictionary containing usages of the generated certificate generated (see constants.py)
|
116 |
|
|
:param parent_id: ID of the CA that issued this certificate
|
117 |
|
|
:param cert_type: Type of this certificate (see constants.py)
|
118 |
|
|
:return: An instance of the Certificate class wrapping the values passed via method parameters
|
119 |
|
|
"""
|
120 |
b3c80ccb
|
David Friesecký
|
|
121 |
|
|
Logger.debug("Function launched.")
|
122 |
|
|
|
123 |
4c19a9b1
|
Stanislav Král
|
# parse the generated pem for subject and notBefore/notAfter fields
|
124 |
a6727aa9
|
Stanislav Král
|
# TODO this could be improved in the future in such way that calling openssl is not required to parse the dates
|
125 |
4c19a9b1
|
Stanislav Král
|
subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem)
|
126 |
|
|
# format the parsed date
|
127 |
7313994f
|
Stanislav Král
|
not_before_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_before)
|
128 |
|
|
not_after_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_after)
|
129 |
4c19a9b1
|
Stanislav Král
|
|
130 |
|
|
# create a certificate wrapper
|
131 |
894cc2cd
|
David Friesecký
|
certificate = Certificate(-1, not_before_formatted, not_after_formatted, cert_pem, cert_type, parent_id,
|
132 |
|
|
private_key_id, usages, subj.common_name, subj.country, subj.locality, subj.state,
|
133 |
|
|
subj.organization, subj.organization_unit, subj.email_address)
|
134 |
4c19a9b1
|
Stanislav Král
|
|
135 |
|
|
return certificate
|
136 |
|
|
|
137 |
bbcb7c89
|
Stanislav Král
|
# TODO config parameter present in class diagram but not here (unused)
|
138 |
|
|
def create_ca(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate, issuer_key: PrivateKey,
|
139 |
ca3ac7c0
|
Stanislav Král
|
extensions: str = "", days: int = 30, usages=None):
|
140 |
a6727aa9
|
Stanislav Král
|
"""
|
141 |
|
|
Creates an intermediate CA certificate issued by the given parent CA.
|
142 |
|
|
:param subject_key: Private key to be used when generating the certificate
|
143 |
|
|
:param subject: Subject to be used put into the certificate
|
144 |
|
|
:param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
|
145 |
|
|
:param issuer_key: PK used to generate the issuer certificate
|
146 |
|
|
:param extensions: Extensions to be used when generating the certificate
|
147 |
|
|
:param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
|
148 |
|
|
:param days: Number of days for which the generated cert. will be considered valid
|
149 |
35a4e794
|
Stanislav Král
|
:raises InvalidCertificateAttribute: when a subject with an invalid attribute is passed
|
150 |
a6727aa9
|
Stanislav Král
|
:return: An instance of Certificate class representing the generated intermediate CA cert
|
151 |
|
|
"""
|
152 |
b3c80ccb
|
David Friesecký
|
|
153 |
|
|
Logger.debug("Function launched.")
|
154 |
|
|
|
155 |
7a30da1b
|
Stanislav Král
|
# check whether the given subject is valid
|
156 |
|
|
self.__check_subject_is_valid(subject)
|
157 |
|
|
|
158 |
ca3ac7c0
|
Stanislav Král
|
if usages is None:
|
159 |
|
|
usages = {}
|
160 |
|
|
|
161 |
329216fe
|
Stanislav Král
|
# specify CA usage
|
162 |
|
|
usages[CA_ID] = True
|
163 |
|
|
|
164 |
|
|
# generate extension configuration lines based on the specified usages
|
165 |
|
|
extensions = extensions + "\n" + "\n".join(usages_to_extension_lines(usages, REQUIRED_USAGE_EXTENSION_FLAGS))
|
166 |
|
|
|
167 |
20b33bd4
|
Jan Pašek
|
# Add CRL and OCSP distribution point to certificate extensions
|
168 |
|
|
cert_id = self.certificate_repository.get_next_id()
|
169 |
ea1229ee
|
Jan Pašek
|
extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(issuer_cert.certificate_id)
|
170 |
|
|
extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(issuer_cert.certificate_id)
|
171 |
20b33bd4
|
Jan Pašek
|
|
172 |
bbcb7c89
|
Stanislav Král
|
# TODO implement AIA URI via extensions
|
173 |
|
|
cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
|
174 |
|
|
issuer_key.private_key,
|
175 |
|
|
subject_key_pass=subject_key.password,
|
176 |
|
|
issuer_key_pass=issuer_key.password, extensions=extensions,
|
177 |
87c56935
|
Stanislav Král
|
days=days,
|
178 |
|
|
sn=cert_id)
|
179 |
bbcb7c89
|
Stanislav Král
|
|
180 |
4c19a9b1
|
Stanislav Král
|
# wrap into Certificate class
|
181 |
894cc2cd
|
David Friesecký
|
certificate = self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
|
182 |
|
|
issuer_cert.certificate_id, INTERMEDIATE_CA_ID)
|
183 |
bbcb7c89
|
Stanislav Král
|
|
184 |
|
|
# store the wrapper into the repository
|
185 |
|
|
created_id = self.certificate_repository.create(certificate)
|
186 |
|
|
|
187 |
|
|
# assign the generated ID to the inserted certificate
|
188 |
|
|
certificate.certificate_id = created_id
|
189 |
|
|
|
190 |
|
|
return certificate
|
191 |
|
|
|
192 |
4c19a9b1
|
Stanislav Král
|
def create_end_cert(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate,
|
193 |
|
|
issuer_key: PrivateKey,
|
194 |
|
|
extensions: str = "", days: int = 30, usages=None):
|
195 |
a6727aa9
|
Stanislav Král
|
"""
|
196 |
|
|
Creates an end certificate issued by the given parent CA.
|
197 |
|
|
:param subject_key: Private key to be used when generating the certificate
|
198 |
|
|
:param subject: Subject to be used put into the certificate
|
199 |
|
|
:param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
|
200 |
|
|
:param issuer_key: PK used to generate the issuer certificate
|
201 |
|
|
:param extensions: Extensions to be used when generating the certificate
|
202 |
|
|
:param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
|
203 |
|
|
:param days: Number of days for which the generated cert. will be considered valid
|
204 |
35a4e794
|
Stanislav Král
|
:raises InvalidCertificateAttribute: when a subject with an invalid attribute is passed
|
205 |
a6727aa9
|
Stanislav Král
|
:return: An instance of Certificate class representing the generated cert
|
206 |
|
|
"""
|
207 |
b3c80ccb
|
David Friesecký
|
|
208 |
|
|
Logger.debug("Function launched.")
|
209 |
|
|
|
210 |
7a30da1b
|
Stanislav Král
|
# check whether the given subject is valid
|
211 |
|
|
self.__check_subject_is_valid(subject)
|
212 |
|
|
|
213 |
4c19a9b1
|
Stanislav Král
|
if usages is None:
|
214 |
|
|
usages = {}
|
215 |
|
|
|
216 |
87c56935
|
Stanislav Král
|
# get the next certificate ID in order to be able to specify the serial number
|
217 |
|
|
cert_id = self.certificate_repository.get_next_id()
|
218 |
|
|
|
219 |
329216fe
|
Stanislav Král
|
# generate extension configuration lines based on the specified usages
|
220 |
|
|
extensions = extensions + "\n" + "\n".join(usages_to_extension_lines(usages, REQUIRED_USAGE_EXTENSION_FLAGS))
|
221 |
|
|
|
222 |
ea1229ee
|
Jan Pašek
|
# Add CRL and OCSP distribution point to certificate extensions
|
223 |
|
|
extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(issuer_cert.certificate_id)
|
224 |
|
|
extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(issuer_cert.certificate_id)
|
225 |
|
|
|
226 |
4c19a9b1
|
Stanislav Král
|
# generate a new certificate
|
227 |
|
|
cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
|
228 |
|
|
issuer_key.private_key,
|
229 |
|
|
subject_key_pass=subject_key.password,
|
230 |
|
|
issuer_key_pass=issuer_key.password, extensions=extensions,
|
231 |
87c56935
|
Stanislav Král
|
days=days,
|
232 |
|
|
sn=cert_id
|
233 |
|
|
)
|
234 |
4c19a9b1
|
Stanislav Král
|
|
235 |
|
|
# wrap the generated certificate using Certificate class
|
236 |
a6727aa9
|
Stanislav Král
|
certificate = self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
|
237 |
4c19a9b1
|
Stanislav Král
|
issuer_cert.certificate_id, CERTIFICATE_ID)
|
238 |
|
|
|
239 |
|
|
created_id = self.certificate_repository.create(certificate)
|
240 |
|
|
|
241 |
|
|
certificate.certificate_id = created_id
|
242 |
|
|
|
243 |
|
|
return certificate
|
244 |
|
|
|
245 |
10fab051
|
Stanislav Král
|
def get_certificate(self, unique_id: int) -> Certificate:
|
246 |
a6727aa9
|
Stanislav Král
|
"""
|
247 |
|
|
Tries to fetch a certificate from the certificate repository using a given id.
|
248 |
|
|
:param unique_id: ID of the certificate to be fetched
|
249 |
|
|
:return: Instance of the Certificate class containing a certificate with the given id or `None` if such
|
250 |
|
|
certificate is not found
|
251 |
|
|
"""
|
252 |
b3c80ccb
|
David Friesecký
|
|
253 |
|
|
Logger.debug("Function launched.")
|
254 |
|
|
|
255 |
10fab051
|
Stanislav Král
|
return self.certificate_repository.read(unique_id)
|
256 |
2a90f4fd
|
Stanislav Král
|
|
257 |
ef65f488
|
Stanislav Král
|
def get_certificates(self, cert_type=None) -> List[Certificate]:
|
258 |
a6727aa9
|
Stanislav Král
|
"""
|
259 |
|
|
Tries to fetch a list of all certificates from the certificate repository. Using the `cert_type` parameter only
|
260 |
|
|
certificates of the given type can be returned.
|
261 |
|
|
:param cert_type: Type of certificates to be returned
|
262 |
|
|
:return: List of instances of the Certificate class representing all certificates present in the certificate
|
263 |
|
|
repository. An empty list is returned when no certificates are found.
|
264 |
|
|
"""
|
265 |
b3c80ccb
|
David Friesecký
|
|
266 |
|
|
Logger.debug("Function launched.")
|
267 |
|
|
|
268 |
2a90f4fd
|
Stanislav Král
|
return self.certificate_repository.read_all(cert_type)
|
269 |
ef65f488
|
Stanislav Král
|
|
270 |
67723931
|
Captain_Trojan
|
def get_certificates_filter(self, target_types, target_usages, target_cn_substring, page, per_page):
|
271 |
|
|
"""
|
272 |
|
|
Tries to fetch a filtered list of all certificates from the certificate repository.
|
273 |
|
|
:param target_types: certificate types (filter)
|
274 |
|
|
:param target_usages: certificate usages (filter)
|
275 |
|
|
:param target_cn_substring: certificate CN substring (filter)
|
276 |
|
|
:param page: target page
|
277 |
|
|
:param per_page: target page size
|
278 |
|
|
:return: List of instances of the Certificate class representing filtered certificates
|
279 |
|
|
present in the certificate repository. An empty list is returned when no certificates are found.
|
280 |
|
|
"""
|
281 |
|
|
return self.certificate_repository.read_all_filter(target_types, target_usages, target_cn_substring,
|
282 |
|
|
page, per_page)
|
283 |
|
|
|
284 |
ef65f488
|
Stanislav Král
|
def get_chain_of_trust(self, from_id: int, to_id: int = -1, exclude_root=True) -> List[Certificate]:
|
285 |
4e70d22a
|
Stanislav Král
|
"""
|
286 |
|
|
Traverses the certificate hierarchy tree upwards till a certificate with the `to_id` ID is found or till a
|
287 |
|
|
root CA certificate is found. Root certificates are excluded from the chain by default.
|
288 |
|
|
:param from_id: ID of the first certificate to be included in the chain of trust
|
289 |
|
|
:param to_id: ID of the last certificate to be included in the chain of trust
|
290 |
|
|
:param exclude_root: a flag indicating whether root CA certificate should be excluded
|
291 |
|
|
:return: a list of certificates representing the chain of trust starting with the certificate given by `from_id`
|
292 |
|
|
ID
|
293 |
|
|
"""
|
294 |
b3c80ccb
|
David Friesecký
|
|
295 |
|
|
Logger.debug("Function launched.")
|
296 |
|
|
|
297 |
4e70d22a
|
Stanislav Král
|
# read the first certificate of the chain
|
298 |
ef65f488
|
Stanislav Král
|
start_cert = self.certificate_repository.read(from_id)
|
299 |
|
|
|
300 |
4e70d22a
|
Stanislav Král
|
# if no cert is found or the current cert is root CA and root CAs should be excluded, then return an empty list
|
301 |
ef65f488
|
Stanislav Král
|
if start_cert is None or (start_cert.type_id == ROOT_CA_ID and exclude_root):
|
302 |
|
|
return []
|
303 |
|
|
|
304 |
|
|
current_cert = start_cert
|
305 |
|
|
chain_of_trust = [current_cert]
|
306 |
|
|
|
307 |
|
|
# TODO could possibly be simplified
|
308 |
|
|
if start_cert.type_id == ROOT_CA_ID:
|
309 |
4e70d22a
|
Stanislav Král
|
# the first cert found is a root ca
|
310 |
ef65f488
|
Stanislav Král
|
return chain_of_trust
|
311 |
|
|
|
312 |
|
|
while True:
|
313 |
|
|
parent_cert = self.certificate_repository.read(current_cert.parent_id)
|
314 |
|
|
|
315 |
4e70d22a
|
Stanislav Král
|
# check whether parent certificate exists
|
316 |
|
|
if parent_cert is None:
|
317 |
|
|
break
|
318 |
|
|
|
319 |
|
|
# check whether the found certificate is a root certificate
|
320 |
|
|
if parent_cert.type_id == ROOT_CA_ID:
|
321 |
ef65f488
|
Stanislav Král
|
if not exclude_root:
|
322 |
4e70d22a
|
Stanislav Král
|
# append the found root cert only if root certificates should not be excluded from the CoT
|
323 |
ef65f488
|
Stanislav Král
|
chain_of_trust.append(parent_cert)
|
324 |
|
|
break
|
325 |
|
|
|
326 |
4e70d22a
|
Stanislav Král
|
# append the certificate
|
327 |
ef65f488
|
Stanislav Král
|
chain_of_trust.append(parent_cert)
|
328 |
|
|
|
329 |
4e70d22a
|
Stanislav Král
|
# stop iterating over certificates if the id of the found certificate matches `to_id` method parameter
|
330 |
ef65f488
|
Stanislav Král
|
if parent_cert.certificate_id == to_id:
|
331 |
|
|
break
|
332 |
|
|
|
333 |
|
|
current_cert = parent_cert
|
334 |
|
|
|
335 |
|
|
return chain_of_trust
|
336 |
3d639744
|
Stanislav Král
|
|
337 |
5f8a2c07
|
Captain_Trojan
|
def delete_certificate(self, unique_id):
|
338 |
3d639744
|
Stanislav Král
|
"""
|
339 |
5f8a2c07
|
Captain_Trojan
|
Deletes a certificate. Raises an Exception should any unexpected behavior occur.
|
340 |
3d639744
|
Stanislav Král
|
|
341 |
|
|
:param unique_id: ID of specific certificate
|
342 |
|
|
"""
|
343 |
5f8a2c07
|
Captain_Trojan
|
|
344 |
b3c80ccb
|
David Friesecký
|
Logger.debug("Function launched.")
|
345 |
|
|
|
346 |
5f8a2c07
|
Captain_Trojan
|
to_delete = self.certificate_repository.get_all_descendants_of(unique_id)
|
347 |
|
|
if to_delete is None:
|
348 |
b3c80ccb
|
David Friesecký
|
Logger.error(f"No such certificate found 'ID = {unique_id}'.")
|
349 |
5f8a2c07
|
Captain_Trojan
|
raise CertificateNotFoundException(unique_id)
|
350 |
|
|
|
351 |
|
|
for cert in to_delete:
|
352 |
|
|
try:
|
353 |
|
|
self.set_certificate_revocation_status(cert.certificate_id, STATUS_REVOKED)
|
354 |
|
|
except CertificateAlreadyRevokedException:
|
355 |
b3c80ccb
|
David Friesecký
|
Logger.info(f"Certificate already revoked 'ID = {unique_id}'.")
|
356 |
5f8a2c07
|
Captain_Trojan
|
# TODO log as an info/debug, not warning or above <-- perfectly legal
|
357 |
02954c9d
|
Jan Pašek
|
pass
|
358 |
5f8a2c07
|
Captain_Trojan
|
|
359 |
b3c80ccb
|
David Friesecký
|
if not self.certificate_repository.delete(cert.certificate_id):
|
360 |
|
|
Logger.error(f"The certificate has not been deleted 'ID = {cert.certificate_id}'.")
|
361 |
|
|
|
362 |
c839facb
|
Captain_Trojan
|
def get_certificates_issued_by(self, issuer_id):
|
363 |
485913d0
|
Captain_Trojan
|
"""
|
364 |
|
|
Returns a list of all children of a certificate identified by an unique_id.
|
365 |
|
|
Raises a DatabaseException should any unexpected behavior occur.
|
366 |
c839facb
|
Captain_Trojan
|
:param issuer_id: target certificate ID
|
367 |
485913d0
|
Captain_Trojan
|
:return: children of unique_id
|
368 |
|
|
"""
|
369 |
b3c80ccb
|
David Friesecký
|
|
370 |
|
|
Logger.debug("Function launched.")
|
371 |
|
|
|
372 |
485913d0
|
Captain_Trojan
|
try:
|
373 |
c839facb
|
Captain_Trojan
|
if self.certificate_repository.read(issuer_id) is None:
|
374 |
|
|
Logger.error(f"No such certificate found 'ID = {issuer_id}'.")
|
375 |
|
|
raise CertificateNotFoundException(issuer_id)
|
376 |
485913d0
|
Captain_Trojan
|
except DatabaseException:
|
377 |
c839facb
|
Captain_Trojan
|
Logger.error(f"No such certificate found 'ID = {issuer_id}'.")
|
378 |
|
|
raise CertificateNotFoundException(issuer_id)
|
379 |
|
|
|
380 |
|
|
return self.certificate_repository.get_all_issued_by(issuer_id)
|
381 |
485913d0
|
Captain_Trojan
|
|
382 |
c839facb
|
Captain_Trojan
|
def get_certificates_issued_by_filter(self, issuer_id, target_types, target_usages, target_cn_substring, page,
|
383 |
|
|
per_page):
|
384 |
|
|
"""
|
385 |
|
|
Returns a list of all children of a certificate identified by an unique_id.
|
386 |
|
|
Filters the results according to target types, usages, cn substring and pagination.
|
387 |
|
|
Raises a DatabaseException should any unexpected behavior occur.
|
388 |
|
|
:param issuer_id: target certificate ID
|
389 |
|
|
:param target_types: filter of types
|
390 |
|
|
:param target_usages: filter of usages
|
391 |
|
|
:param target_cn_substring: CN substring
|
392 |
|
|
:param page: target page or None
|
393 |
|
|
:param per_page: certs per page or None
|
394 |
|
|
:return: list of certificates (a page if specified)
|
395 |
|
|
"""
|
396 |
485913d0
|
Captain_Trojan
|
|
397 |
c839facb
|
Captain_Trojan
|
Logger.debug("Function launched.")
|
398 |
|
|
|
399 |
|
|
try:
|
400 |
|
|
if self.certificate_repository.read(issuer_id) is None:
|
401 |
|
|
Logger.error(f"No such certificate found 'ID = {issuer_id}'.")
|
402 |
|
|
raise CertificateNotFoundException(issuer_id)
|
403 |
|
|
except DatabaseException:
|
404 |
|
|
Logger.error(f"No such certificate found 'ID = {issuer_id}'.")
|
405 |
|
|
raise CertificateNotFoundException(issuer_id)
|
406 |
485913d0
|
Captain_Trojan
|
|
407 |
67723931
|
Captain_Trojan
|
return self.certificate_repository.get_all_issued_by_filter(issuer_id, target_types, target_usages,
|
408 |
|
|
target_cn_substring, page, per_page)
|
409 |
485913d0
|
Captain_Trojan
|
|
410 |
20b33bd4
|
Jan Pašek
|
def set_certificate_revocation_status(self, id, status, reason="unspecified"):
|
411 |
|
|
"""
|
412 |
|
|
Set certificate status to 'valid' or 'revoked'.
|
413 |
|
|
If the new status is revoked a reason can be provided -> default is unspecified
|
414 |
|
|
:param reason: reason for revocation
|
415 |
|
|
:param id: identifier of the certificate whose status is to be changed
|
416 |
|
|
:param status: new status of the certificate
|
417 |
94f8d5cf
|
Jan Pašek
|
:raises CertificateStatusInvalidException: if status is not valid
|
418 |
|
|
:raises RevocationReasonInvalidException: if reason is not valid
|
419 |
|
|
:raises CertificateNotFoundException: if certificate with given id cannot be found
|
420 |
|
|
:raises CertificateCannotBeSetToValid: if certificate was already revoked and not on hold,
|
421 |
|
|
it cannot be set revalidated
|
422 |
|
|
:raises CertificateAlreadyRevokedException: if caller tries to revoke a certificate that is already revoked
|
423 |
|
|
:raises UnknownException: if the database is corrupted
|
424 |
20b33bd4
|
Jan Pašek
|
"""
|
425 |
b3c80ccb
|
David Friesecký
|
|
426 |
|
|
Logger.debug("Function launched.")
|
427 |
|
|
|
428 |
20b33bd4
|
Jan Pašek
|
if status not in CERTIFICATE_STATES:
|
429 |
b3c80ccb
|
David Friesecký
|
Logger.error(f"Wrong parameter, invalid status '{status}'.")
|
430 |
20b33bd4
|
Jan Pašek
|
raise CertificateStatusInvalidException(status)
|
431 |
|
|
if reason not in CERTIFICATE_REVOCATION_REASONS:
|
432 |
b3c80ccb
|
David Friesecký
|
Logger.error(f"Wrong parameter, invalid reason '{reason}'.")
|
433 |
20b33bd4
|
Jan Pašek
|
raise RevocationReasonInvalidException(reason)
|
434 |
|
|
|
435 |
9e6f791a
|
Jan Pašek
|
# check whether the certificate exists
|
436 |
|
|
certificate = self.certificate_repository.read(id)
|
437 |
|
|
if certificate is None:
|
438 |
b3c80ccb
|
David Friesecký
|
Logger.error(f"No such certificate found 'ID = {id}'.")
|
439 |
9e6f791a
|
Jan Pašek
|
raise CertificateNotFoundException(id)
|
440 |
|
|
|
441 |
9c704fb1
|
Jan Pašek
|
updated = False
|
442 |
20b33bd4
|
Jan Pašek
|
if status == STATUS_VALID:
|
443 |
94f8d5cf
|
Jan Pašek
|
# if the certificate is revoked but the reason is not certificateHold, it cannot be re-validated
|
444 |
|
|
# -> throw an exception
|
445 |
|
|
if certificate.revocation_reason != "" and \
|
446 |
|
|
certificate.revocation_reason != CERTIFICATE_REVOCATION_REASON_HOLD:
|
447 |
|
|
raise CertificateCannotBeSetToValid(certificate.revocation_reason)
|
448 |
9c704fb1
|
Jan Pašek
|
updated = self.certificate_repository.clear_certificate_revocation(id)
|
449 |
20b33bd4
|
Jan Pašek
|
elif status == STATUS_REVOKED:
|
450 |
9e6f791a
|
Jan Pašek
|
# check if the certificate is not revoked already
|
451 |
|
|
revoked = self.certificate_repository.get_all_revoked_by(certificate.parent_id)
|
452 |
|
|
if certificate.certificate_id in [x.certificate_id for x in revoked]:
|
453 |
b3c80ccb
|
David Friesecký
|
Logger.error(f"Certificate already revoked 'ID = {id}'.")
|
454 |
9e6f791a
|
Jan Pašek
|
raise CertificateAlreadyRevokedException(id)
|
455 |
|
|
|
456 |
20b33bd4
|
Jan Pašek
|
revocation_timestamp = int(time.time())
|
457 |
9c704fb1
|
Jan Pašek
|
updated = self.certificate_repository.set_certificate_revoked(id, str(revocation_timestamp), reason)
|
458 |
|
|
|
459 |
|
|
if not updated:
|
460 |
b3c80ccb
|
David Friesecký
|
Logger.error(f"Repository returned 'false' from clear_certificate_revocation() "
|
461 |
|
|
f"or set_certificate_revoked().")
|
462 |
9e6f791a
|
Jan Pašek
|
raise UnknownException("Repository returned 'false' from clear_certificate_revocation() "
|
463 |
|
|
"or set_certificate_revoked().")
|
464 |
20b33bd4
|
Jan Pašek
|
|
465 |
c4ba6bb7
|
Jan Pašek
|
def get_subject_from_certificate(self, certificate: Certificate) -> Subject:
|
466 |
|
|
"""
|
467 |
|
|
Get Subject distinguished name from a Certificate
|
468 |
|
|
:param certificate: certificate instance whose Subject shall be parsed
|
469 |
|
|
:return: instance of Subject class containing resulting distinguished name
|
470 |
|
|
"""
|
471 |
b3c80ccb
|
David Friesecký
|
|
472 |
|
|
Logger.debug("Function launched.")
|
473 |
|
|
|
474 |
894cc2cd
|
David Friesecký
|
subject = Subject(certificate.common_name,
|
475 |
|
|
certificate.country_code,
|
476 |
|
|
certificate.locality,
|
477 |
|
|
certificate.province,
|
478 |
|
|
certificate.organization,
|
479 |
|
|
certificate.organizational_unit,
|
480 |
|
|
certificate.email_address)
|
481 |
c4ba6bb7
|
Jan Pašek
|
return subject
|
482 |
d3bfacfc
|
Stanislav Král
|
|
483 |
|
|
def get_public_key_from_certificate(self, certificate: Certificate):
|
484 |
|
|
"""
|
485 |
|
|
Extracts a public key from the given certificate
|
486 |
|
|
:param certificate: an instance of the Certificate class containing the certificate from which a public key
|
487 |
|
|
should be extracted.
|
488 |
|
|
:return: a string containing the extracted public key in PEM format
|
489 |
|
|
"""
|
490 |
b3c80ccb
|
David Friesecký
|
|
491 |
|
|
Logger.debug("Function launched.")
|
492 |
|
|
|
493 |
d3bfacfc
|
Stanislav Král
|
return self.cryptography_service.extract_public_key_from_certificate(certificate.pem_data)
|
494 |
20b33bd4
|
Jan Pašek
|
|
495 |
a53e5aef
|
Jan Pašek
|
def get_certificate_state(self, id: int) -> str:
|
496 |
|
|
"""
|
497 |
|
|
Check whether the certificate is expired, valid or revoked.
|
498 |
|
|
- in case it's revoked and expired, revoked is returned
|
499 |
|
|
:param id: identifier of the certificate
|
500 |
|
|
:return: certificates state from {valid, revoked, expired}
|
501 |
|
|
:raises CertificateNotFoundException: in case id of non-existing certificate is entered
|
502 |
|
|
"""
|
503 |
4ff15a44
|
Jan Pašek
|
Logger.debug("Function launched.")
|
504 |
c15357a9
|
Jan Pašek
|
status = CERTIFICATE_VALID
|
505 |
|
|
|
506 |
|
|
# Read the selected certificate from the repository
|
507 |
|
|
certificate = self.certificate_repository.read(id)
|
508 |
|
|
if certificate is None:
|
509 |
1fa20e93
|
Stanislav Král
|
Logger.error("Certificate whose details were requested does not exist.")
|
510 |
c15357a9
|
Jan Pašek
|
raise CertificateNotFoundException(id)
|
511 |
|
|
|
512 |
|
|
# check the expiration date using OpenSSL
|
513 |
|
|
if not self.cryptography_service.verify_cert(certificate.pem_data):
|
514 |
|
|
status = CERTIFICATE_EXPIRED
|
515 |
|
|
|
516 |
|
|
# check certificate revocation
|
517 |
|
|
all_revoked_by_parent = self.certificate_repository.get_all_revoked_by(certificate.parent_id)
|
518 |
|
|
all_revoked_by_parent_ids = [cert.certificate_id for cert in all_revoked_by_parent]
|
519 |
|
|
|
520 |
|
|
if id in all_revoked_by_parent_ids:
|
521 |
|
|
status = CERTIFICATE_REVOKED
|
522 |
|
|
|
523 |
|
|
return status
|
524 |
|
|
|
525 |
20b33bd4
|
Jan Pašek
|
def __get_crl_endpoint(self, ca_identifier: int) -> str:
|
526 |
|
|
"""
|
527 |
|
|
Get URL address of CRL distribution endpoint based on
|
528 |
|
|
issuer's ID
|
529 |
|
|
|
530 |
|
|
:param ca_identifier: ID of issuing authority
|
531 |
|
|
:return: CRL endpoint for the given CA
|
532 |
|
|
"""
|
533 |
b3c80ccb
|
David Friesecký
|
|
534 |
|
|
Logger.debug("Function launched.")
|
535 |
|
|
|
536 |
20b33bd4
|
Jan Pašek
|
return self.configuration.base_server_url + "/api/crl/" + str(ca_identifier)
|
537 |
|
|
|
538 |
|
|
def __get_ocsp_endpoint(self, ca_identifier: int) -> str:
|
539 |
|
|
"""
|
540 |
|
|
Get URL address of OCSP distribution endpoint based on
|
541 |
|
|
issuer's ID
|
542 |
|
|
|
543 |
|
|
:param ca_identifier: ID of issuing authority
|
544 |
|
|
:return: OCSP endpoint for the given CA
|
545 |
|
|
"""
|
546 |
b3c80ccb
|
David Friesecký
|
|
547 |
|
|
Logger.debug("Function launched.")
|
548 |
|
|
|
549 |
20b33bd4
|
Jan Pašek
|
return self.configuration.base_server_url + "/api/ocsp/" + str(ca_identifier)
|
550 |
|
|
|
551 |
04805a41
|
Stanislav Král
|
def generate_pkcs_identity(self, certificate: Certificate, cert_key: PrivateKey, identity_name: str, identity_passphrase: str):
|
552 |
1fa20e93
|
Stanislav Král
|
"""
|
553 |
|
|
Generates a PKCS identity of the certificate given by the specified ID while using the private key passed.
|
554 |
|
|
A name of the identity to be used and certificate's passphrase have to be specified as well as the passphrase
|
555 |
|
|
of certificate's private key (if encrypted).
|
556 |
04805a41
|
Stanislav Král
|
:param certificate: certificate to be put into the PKCS identity store
|
557 |
1fa20e93
|
Stanislav Král
|
:param cert_key: key used to sign the given certificate
|
558 |
|
|
:param identity_name: name to be given to the identity to be created
|
559 |
|
|
:param identity_passphrase: passphrase to be used to encrypt the identity
|
560 |
|
|
:return: byte array containing the generated identity (PKCS12 store)
|
561 |
|
|
"""
|
562 |
|
|
Logger.debug("Function launched.")
|
563 |
|
|
|
564 |
|
|
# get the chain of trust of the certificate whose identity should be generated and exclude the certificate
|
565 |
|
|
# whose chain of trust we are querying
|
566 |
04805a41
|
Stanislav Král
|
cot_pem_list = [cert.pem_data for cert in self.get_chain_of_trust(certificate.certificate_id, exclude_root=False)[1:]]
|
567 |
1fa20e93
|
Stanislav Král
|
|
568 |
|
|
return self.cryptography_service.generate_pkcs_identity(certificate.pem_data, cert_key.private_key,
|
569 |
|
|
identity_name,
|
570 |
|
|
identity_passphrase, cot_pem_list, cert_key.password)
|
571 |
|
|
|
572 |
20b33bd4
|
Jan Pašek
|
|
573 |
|
|
class RevocationReasonInvalidException(Exception):
|
574 |
|
|
"""
|
575 |
|
|
Exception that denotes that the caller was trying to revoke
|
576 |
|
|
a certificate using an invalid revocation reason
|
577 |
|
|
"""
|
578 |
|
|
|
579 |
|
|
def __init__(self, reason):
|
580 |
|
|
self.reason = reason
|
581 |
|
|
|
582 |
|
|
def __str__(self):
|
583 |
|
|
return f"Revocation reason '{self.reason}' is not valid."
|
584 |
|
|
|
585 |
|
|
|
586 |
|
|
class CertificateStatusInvalidException(Exception):
|
587 |
|
|
"""
|
588 |
|
|
Exception that denotes that the caller was trying to set
|
589 |
|
|
a certificate to an invalid state
|
590 |
|
|
"""
|
591 |
|
|
|
592 |
|
|
def __init__(self, status):
|
593 |
|
|
self.status = status
|
594 |
|
|
|
595 |
|
|
def __str__(self):
|
596 |
|
|
return f"Certificate status '{self.status}' is not valid."
|
597 |
9c704fb1
|
Jan Pašek
|
|
598 |
|
|
|
599 |
9e6f791a
|
Jan Pašek
|
class CertificateAlreadyRevokedException(Exception):
|
600 |
|
|
"""
|
601 |
|
|
Exception that denotes that the caller was trying to revoke
|
602 |
|
|
a certificate that is already revoked
|
603 |
|
|
"""
|
604 |
|
|
|
605 |
|
|
def __init__(self, id):
|
606 |
|
|
self.id = id
|
607 |
|
|
|
608 |
|
|
def __str__(self):
|
609 |
|
|
return f"Certificate id '{self.id}' is already revoked."
|
610 |
94f8d5cf
|
Jan Pašek
|
|
611 |
|
|
|
612 |
|
|
class CertificateCannotBeSetToValid(Exception):
|
613 |
|
|
"""
|
614 |
|
|
Exception that denotes that the caller was trying to
|
615 |
|
|
set certificate to valid if the certificate was already
|
616 |
|
|
revoked but not certificateHold.
|
617 |
|
|
"""
|
618 |
|
|
|
619 |
|
|
def __init__(self, old_reason):
|
620 |
|
|
self.old_state = old_reason
|
621 |
|
|
|
622 |
|
|
def __str__(self):
|
623 |
|
|
return "Cannot set revoked certificate back to valid when the certificate revocation reason is not " \
|
624 |
|
|
"certificateHold. " \
|
625 |
|
|
f"The revocation reason of the certificate is {self.old_state}"
|
626 |
7a30da1b
|
Stanislav Král
|
|
627 |
|
|
|
628 |
|
|
class InvalidSubjectAttribute(Exception):
|
629 |
|
|
"""
|
630 |
|
|
Exception that denotes that the caller was trying to
|
631 |
|
|
create a certificate while passing a subject with an invalid
|
632 |
|
|
attribute.
|
633 |
|
|
"""
|
634 |
|
|
|
635 |
|
|
def __init__(self, attribute_name, reason):
|
636 |
|
|
self.attribute_name = attribute_name
|
637 |
|
|
self.reason = reason
|
638 |
|
|
|
639 |
|
|
def __str__(self):
|
640 |
|
|
return f"""Subject attribute "{self.attribute_name} is invalid (reason: {self.reason})."""
|