1
|
from datetime import datetime
|
2
|
|
3
|
from injector import inject
|
4
|
|
5
|
from src.dao.certificate_repository import CertificateRepository
|
6
|
from src.dao.private_key_repository import PrivateKeyRepository
|
7
|
from src.exceptions.certificate_not_found_exception import CertificateNotFoundException
|
8
|
from src.exceptions.private_key_not_found_exception import PrivateKeyNotFoundException
|
9
|
from src.services.crl.ca_index_file_line_generator import create_index_file_revoked_line, create_index_file_valid_line
|
10
|
from src.services.cryptography import CryptographyService
|
11
|
from src.utils.temporary_file import TemporaryFile
|
12
|
|
13
|
|
14
|
class CrlService:
|
15
|
@inject
|
16
|
def __init__(self,
|
17
|
certificate_repository: CertificateRepository,
|
18
|
key_repository: PrivateKeyRepository,
|
19
|
cryptography_service: CryptographyService
|
20
|
):
|
21
|
self.key_repository = key_repository
|
22
|
self.certificate_repository = certificate_repository
|
23
|
self.cryptography_service = cryptography_service
|
24
|
|
25
|
def create_revoked_index(self, ca_id) -> str:
|
26
|
"""
|
27
|
Queries the certificate repository and looks for all certificates revoked by the certificate authority given
|
28
|
by the passed ID. Found certificates are then put into a string representing the CA's database index file.
|
29
|
|
30
|
:param ca_id: ID of the CA whose revoked certificates should be put into the index file
|
31
|
:return: a str representing the content of a CA index file
|
32
|
"""
|
33
|
# get issuing certificate
|
34
|
certificate = self.certificate_repository.read(ca_id)
|
35
|
if certificate is None:
|
36
|
raise CertificateNotFoundException(ca_id)
|
37
|
|
38
|
# get subject and notAfter of the issuer
|
39
|
subject, _, not_after = self.cryptography_service.parse_cert_pem(certificate.pem_data)
|
40
|
|
41
|
index_lines = [create_index_file_valid_line(certificate, subject, not_after)]
|
42
|
# iterate over revoked certificates of the CA given by an ID
|
43
|
for certificate in self.certificate_repository.get_all_revoked_by(ca_id):
|
44
|
# extract the complete subject information and not_after date field
|
45
|
subject, _, not_after = self.cryptography_service.parse_cert_pem(certificate.pem_data)
|
46
|
|
47
|
line = create_index_file_revoked_line(certificate,
|
48
|
subject,
|
49
|
# parse revocation date from unix timestamp to struct_time
|
50
|
datetime.utcfromtimestamp(int(certificate.revocation_date)).timetuple(),
|
51
|
not_after)
|
52
|
|
53
|
# append it to the list of lines
|
54
|
index_lines.append(line)
|
55
|
|
56
|
# join all lines with a new line
|
57
|
return "\n".join(index_lines)
|
58
|
|
59
|
def generate_crl_response(self, ca_id: int) -> str:
|
60
|
"""
|
61
|
Generate a CRL for the given certificate authority
|
62
|
that contains all revoked certificates
|
63
|
|
64
|
:param ca_id: ID of a CA whose CRL shall be generated
|
65
|
:return: CRL in PEM format
|
66
|
"""
|
67
|
# get cert and check if the requested CA exists and if not throw an exception
|
68
|
cert = self.certificate_repository.read(ca_id)
|
69
|
if cert is None:
|
70
|
raise CertificateNotFoundException(ca_id)
|
71
|
|
72
|
# get key and check if it exists
|
73
|
key = self.key_repository.read(cert.private_key_id)
|
74
|
if key is None:
|
75
|
raise PrivateKeyNotFoundException(ca_id)
|
76
|
|
77
|
# Create an index file and call cryptography service to generate CRL
|
78
|
with TemporaryFile("crl.index", f"{self.create_revoked_index(ca_id)}\n") as index_path:
|
79
|
crl_content = self.cryptography_service.generate_crl(cert, key, index_path)
|
80
|
|
81
|
return crl_content
|