Revize b3c80ccb
Přidáno uživatelem David Friesecký před téměř 4 roky(ů)
src/services/certificate_service.py | ||
---|---|---|
16 | 16 |
|
17 | 17 |
import time |
18 | 18 |
|
19 |
from src.utils.logger import Logger |
|
20 |
|
|
19 | 21 |
VALID_FROM_TO_DATE_FORMAT = "%d.%m.%Y %H:%M:%S" |
20 | 22 |
CA_EXTENSIONS = "basicConstraints=critical,CA:TRUE" |
21 | 23 |
CRL_EXTENSION = "crlDistributionPoints=URI:" |
... | ... | |
47 | 49 |
:param days: Number of days for which the generated cert. will be considered valid |
48 | 50 |
:return: An instance of Certificate class representing the generated root CA cert |
49 | 51 |
""" |
52 |
|
|
53 |
Logger.debug("Function launched.") |
|
54 |
|
|
50 | 55 |
if usages is None: |
51 | 56 |
usages = {} |
52 | 57 |
|
... | ... | |
81 | 86 |
:param cert_type: Type of this certificate (see constants.py) |
82 | 87 |
:return: An instance of the Certificate class wrapping the values passed via method parameters |
83 | 88 |
""" |
89 |
|
|
90 |
Logger.debug("Function launched.") |
|
91 |
|
|
84 | 92 |
# parse the generated pem for subject and notBefore/notAfter fields |
85 | 93 |
# TODO this could be improved in the future in such way that calling openssl is not required to parse the dates |
86 | 94 |
subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem) |
... | ... | |
108 | 116 |
:param days: Number of days for which the generated cert. will be considered valid |
109 | 117 |
:return: An instance of Certificate class representing the generated intermediate CA cert |
110 | 118 |
""" |
119 |
|
|
120 |
Logger.debug("Function launched.") |
|
121 |
|
|
111 | 122 |
if usages is None: |
112 | 123 |
usages = {} |
113 | 124 |
|
... | ... | |
168 | 179 |
:param days: Number of days for which the generated cert. will be considered valid |
169 | 180 |
:return: An instance of Certificate class representing the generated cert |
170 | 181 |
""" |
182 |
|
|
183 |
Logger.debug("Function launched.") |
|
184 |
|
|
171 | 185 |
if usages is None: |
172 | 186 |
usages = {} |
173 | 187 |
|
... | ... | |
204 | 218 |
:return: Instance of the Certificate class containing a certificate with the given id or `None` if such |
205 | 219 |
certificate is not found |
206 | 220 |
""" |
221 |
|
|
222 |
Logger.debug("Function launched.") |
|
223 |
|
|
207 | 224 |
return self.certificate_repository.read(unique_id) |
208 | 225 |
|
209 | 226 |
def get_certificates(self, cert_type=None) -> List[Certificate]: |
... | ... | |
214 | 231 |
:return: List of instances of the Certificate class representing all certificates present in the certificate |
215 | 232 |
repository. An empty list is returned when no certificates are found. |
216 | 233 |
""" |
234 |
|
|
235 |
Logger.debug("Function launched.") |
|
236 |
|
|
217 | 237 |
return self.certificate_repository.read_all(cert_type) |
218 | 238 |
|
219 | 239 |
def get_chain_of_trust(self, from_id: int, to_id: int = -1, exclude_root=True) -> List[Certificate]: |
... | ... | |
226 | 246 |
:return: a list of certificates representing the chain of trust starting with the certificate given by `from_id` |
227 | 247 |
ID |
228 | 248 |
""" |
249 |
|
|
250 |
Logger.debug("Function launched.") |
|
251 |
|
|
229 | 252 |
# read the first certificate of the chain |
230 | 253 |
start_cert = self.certificate_repository.read(from_id) |
231 | 254 |
|
... | ... | |
273 | 296 |
:param unique_id: ID of specific certificate |
274 | 297 |
""" |
275 | 298 |
|
299 |
Logger.debug("Function launched.") |
|
300 |
|
|
276 | 301 |
to_delete = self.certificate_repository.get_all_descendants_of(unique_id) |
277 | 302 |
if to_delete is None: |
303 |
Logger.error(f"No such certificate found 'ID = {unique_id}'.") |
|
278 | 304 |
raise CertificateNotFoundException(unique_id) |
279 | 305 |
|
280 | 306 |
for cert in to_delete: |
281 | 307 |
try: |
282 | 308 |
self.set_certificate_revocation_status(cert.certificate_id, STATUS_REVOKED) |
283 | 309 |
except CertificateAlreadyRevokedException: |
310 |
Logger.info(f"Certificate already revoked 'ID = {unique_id}'.") |
|
284 | 311 |
# TODO log as an info/debug, not warning or above <-- perfectly legal |
285 | 312 |
pass |
286 | 313 |
|
287 |
self.certificate_repository.delete(cert.certificate_id) |
|
288 |
# TODO log if not successfully deleted |
|
314 |
if not self.certificate_repository.delete(cert.certificate_id): |
|
315 |
Logger.error(f"The certificate has not been deleted 'ID = {cert.certificate_id}'.") |
|
316 |
|
|
289 | 317 |
|
290 | 318 |
def get_certificates_issued_by(self, unique_id): |
291 | 319 |
""" |
... | ... | |
294 | 322 |
:param unique_id: target certificate ID |
295 | 323 |
:return: children of unique_id |
296 | 324 |
""" |
325 |
|
|
326 |
Logger.debug("Function launched.") |
|
327 |
|
|
297 | 328 |
try: |
298 | 329 |
if self.certificate_repository.read(unique_id) is None: |
330 |
Logger.error(f"No such certificate found 'ID = {unique_id}'.") |
|
299 | 331 |
raise CertificateNotFoundException(unique_id) |
300 | 332 |
except DatabaseException: |
333 |
Logger.error(f"No such certificate found 'ID = {unique_id}'.") |
|
301 | 334 |
raise CertificateNotFoundException(unique_id) |
302 | 335 |
|
303 | 336 |
return self.certificate_repository.get_all_issued_by(unique_id) |
... | ... | |
310 | 343 |
:param id: identifier of the certificate whose status is to be changed |
311 | 344 |
:param status: new status of the certificate |
312 | 345 |
""" |
346 |
|
|
347 |
Logger.debug("Function launched.") |
|
348 |
|
|
313 | 349 |
if status not in CERTIFICATE_STATES: |
350 |
Logger.error(f"Wrong parameter, invalid status '{status}'.") |
|
314 | 351 |
raise CertificateStatusInvalidException(status) |
315 | 352 |
if reason not in CERTIFICATE_REVOCATION_REASONS: |
353 |
Logger.error(f"Wrong parameter, invalid reason '{reason}'.") |
|
316 | 354 |
raise RevocationReasonInvalidException(reason) |
317 | 355 |
|
318 | 356 |
# check whether the certificate exists |
319 | 357 |
certificate = self.certificate_repository.read(id) |
320 | 358 |
if certificate is None: |
359 |
Logger.error(f"No such certificate found 'ID = {id}'.") |
|
321 | 360 |
raise CertificateNotFoundException(id) |
322 | 361 |
|
323 | 362 |
updated = False |
... | ... | |
327 | 366 |
# check if the certificate is not revoked already |
328 | 367 |
revoked = self.certificate_repository.get_all_revoked_by(certificate.parent_id) |
329 | 368 |
if certificate.certificate_id in [x.certificate_id for x in revoked]: |
369 |
Logger.error(f"Certificate already revoked 'ID = {id}'.") |
|
330 | 370 |
raise CertificateAlreadyRevokedException(id) |
331 | 371 |
|
332 | 372 |
revocation_timestamp = int(time.time()) |
333 | 373 |
updated = self.certificate_repository.set_certificate_revoked(id, str(revocation_timestamp), reason) |
334 | 374 |
|
335 | 375 |
if not updated: |
336 |
# TODO log this |
|
376 |
Logger.error(f"Repository returned 'false' from clear_certificate_revocation() " |
|
377 |
f"or set_certificate_revoked().") |
|
337 | 378 |
raise UnknownException("Repository returned 'false' from clear_certificate_revocation() " |
338 | 379 |
"or set_certificate_revoked().") |
339 | 380 |
|
... | ... | |
343 | 384 |
:param certificate: certificate instance whose Subject shall be parsed |
344 | 385 |
:return: instance of Subject class containing resulting distinguished name |
345 | 386 |
""" |
387 |
|
|
388 |
Logger.debug("Function launched.") |
|
389 |
|
|
346 | 390 |
(subject, _, _) = self.cryptography_service.parse_cert_pem(certificate.pem_data) |
347 | 391 |
return subject |
348 | 392 |
|
... | ... | |
353 | 397 |
should be extracted. |
354 | 398 |
:return: a string containing the extracted public key in PEM format |
355 | 399 |
""" |
400 |
|
|
401 |
Logger.debug("Function launched.") |
|
402 |
|
|
356 | 403 |
return self.cryptography_service.extract_public_key_from_certificate(certificate.pem_data) |
357 | 404 |
|
358 | 405 |
def __get_crl_endpoint(self, ca_identifier: int) -> str: |
... | ... | |
363 | 410 |
:param ca_identifier: ID of issuing authority |
364 | 411 |
:return: CRL endpoint for the given CA |
365 | 412 |
""" |
413 |
|
|
414 |
Logger.debug("Function launched.") |
|
415 |
|
|
366 | 416 |
return self.configuration.base_server_url + "/api/crl/" + str(ca_identifier) |
367 | 417 |
|
368 | 418 |
def __get_ocsp_endpoint(self, ca_identifier: int) -> str: |
... | ... | |
373 | 423 |
:param ca_identifier: ID of issuing authority |
374 | 424 |
:return: OCSP endpoint for the given CA |
375 | 425 |
""" |
426 |
|
|
427 |
Logger.debug("Function launched.") |
|
428 |
|
|
376 | 429 |
return self.configuration.base_server_url + "/api/ocsp/" + str(ca_identifier) |
377 | 430 |
|
378 | 431 |
|
Také k dispozici: Unified diff
Re #8570 - Added logs