1 |
ca31a7f7
|
Stanislav Král
|
from datetime import datetime
|
2 |
7313994f
|
Stanislav Král
|
|
3 |
|
|
from injector import inject
|
4 |
|
|
|
5 |
|
|
from src.dao.certificate_repository import CertificateRepository
|
6 |
94e89bb1
|
Jan Pašek
|
from src.dao.private_key_repository import PrivateKeyRepository
|
7 |
0fd6d825
|
Jan Pašek
|
from src.exceptions.certificate_not_found_exception import CertificateNotFoundException
|
8 |
94e89bb1
|
Jan Pašek
|
from src.exceptions.private_key_not_found_exception import PrivateKeyNotFoundException
|
9 |
75ebc6fc
|
Jan Pašek
|
from src.services.crl.ca_index_file_line_generator import create_index_file_revoked_line, create_index_file_valid_line
|
10 |
7313994f
|
Stanislav Král
|
from src.services.cryptography import CryptographyService
|
11 |
0fd6d825
|
Jan Pašek
|
from src.utils.temporary_file import TemporaryFile
|
12 |
7313994f
|
Stanislav Král
|
|
13 |
|
|
|
14 |
|
|
class CrlService:
|
15 |
|
|
@inject
|
16 |
|
|
def __init__(self,
|
17 |
|
|
certificate_repository: CertificateRepository,
|
18 |
94e89bb1
|
Jan Pašek
|
key_repository: PrivateKeyRepository,
|
19 |
7313994f
|
Stanislav Král
|
cryptography_service: CryptographyService
|
20 |
|
|
):
|
21 |
94e89bb1
|
Jan Pašek
|
self.key_repository = key_repository
|
22 |
7313994f
|
Stanislav Král
|
self.certificate_repository = certificate_repository
|
23 |
|
|
self.cryptography_service = cryptography_service
|
24 |
|
|
|
25 |
|
|
def create_revoked_index(self, ca_id) -> str:
|
26 |
|
|
"""
|
27 |
|
|
Queries the certificate repository and looks for all certificates revoked by the certificate authority given
|
28 |
|
|
by the passed ID. Found certificates are then put into a string representing the CA's database index file.
|
29 |
|
|
|
30 |
|
|
:param ca_id: ID of the CA whose revoked certificates should be put into the index file
|
31 |
|
|
:return: a str representing the content of a CA index file
|
32 |
|
|
"""
|
33 |
75ebc6fc
|
Jan Pašek
|
# get issuing certificate
|
34 |
|
|
certificate = self.certificate_repository.read(ca_id)
|
35 |
|
|
if certificate is None:
|
36 |
|
|
raise CertificateNotFoundException(ca_id)
|
37 |
7313994f
|
Stanislav Král
|
|
38 |
75ebc6fc
|
Jan Pašek
|
# get subject and notAfter of the issuer
|
39 |
|
|
subject, _, not_after = self.cryptography_service.parse_cert_pem(certificate.pem_data)
|
40 |
|
|
|
41 |
|
|
index_lines = [create_index_file_valid_line(certificate, subject, not_after)]
|
42 |
7313994f
|
Stanislav Král
|
# iterate over revoked certificates of the CA given by an ID
|
43 |
|
|
for certificate in self.certificate_repository.get_all_revoked_by(ca_id):
|
44 |
|
|
# extract the complete subject information and not_after date field
|
45 |
|
|
subject, _, not_after = self.cryptography_service.parse_cert_pem(certificate.pem_data)
|
46 |
ca31a7f7
|
Stanislav Král
|
|
47 |
7313994f
|
Stanislav Král
|
line = create_index_file_revoked_line(certificate,
|
48 |
|
|
subject,
|
49 |
ca31a7f7
|
Stanislav Král
|
# parse revocation date from unix timestamp to struct_time
|
50 |
|
|
datetime.utcfromtimestamp(int(certificate.revocation_date)).timetuple(),
|
51 |
7313994f
|
Stanislav Král
|
not_after)
|
52 |
|
|
|
53 |
|
|
# append it to the list of lines
|
54 |
|
|
index_lines.append(line)
|
55 |
|
|
|
56 |
|
|
# join all lines with a new line
|
57 |
|
|
return "\n".join(index_lines)
|
58 |
0fd6d825
|
Jan Pašek
|
|
59 |
cc271e04
|
Captain_Trojan
|
def create_index(self, ca_id) -> str:
|
60 |
|
|
"""
|
61 |
|
|
Queries the certificate repository and looks for all certificates issued by the certificate authority given
|
62 |
|
|
by the passed ID. Found certificates are then put into a string representing the CA's database index file.
|
63 |
|
|
|
64 |
|
|
:param ca_id: ID of the CA whose issued (=child) certificates should be put into the index file
|
65 |
|
|
:return: a str representing the content of a CA index file
|
66 |
|
|
"""
|
67 |
|
|
# get issuing certificate
|
68 |
|
|
certificate = self.certificate_repository.read(ca_id)
|
69 |
|
|
if certificate is None:
|
70 |
|
|
raise CertificateNotFoundException(ca_id)
|
71 |
|
|
|
72 |
|
|
# get subject and notAfter of the issuer
|
73 |
|
|
subject, _, not_after = self.cryptography_service.parse_cert_pem(certificate.pem_data)
|
74 |
|
|
|
75 |
|
|
index_lines = [create_index_file_valid_line(certificate, subject, not_after)]
|
76 |
|
|
# iterate over revoked certificates of the CA given by an ID
|
77 |
|
|
for certificate in self.certificate_repository.get_all_issued_by(ca_id):
|
78 |
|
|
# extract the complete subject information and not_after date field
|
79 |
|
|
subject, _, not_after = self.cryptography_service.parse_cert_pem(certificate.pem_data)
|
80 |
|
|
if len(certificate.revocation_reason) > 0:
|
81 |
|
|
line = create_index_file_revoked_line(certificate,
|
82 |
|
|
subject,
|
83 |
|
|
# parse revocation date from unix timestamp to struct_time
|
84 |
|
|
datetime.utcfromtimestamp(
|
85 |
|
|
int(certificate.revocation_date)).timetuple(),
|
86 |
|
|
not_after)
|
87 |
|
|
else:
|
88 |
|
|
line = create_index_file_valid_line(certificate, subject, not_after)
|
89 |
|
|
|
90 |
|
|
# append it to the list of lines
|
91 |
|
|
index_lines.append(line)
|
92 |
|
|
|
93 |
|
|
# join all lines with a new line
|
94 |
|
|
return "\n".join(index_lines)
|
95 |
|
|
|
96 |
0fd6d825
|
Jan Pašek
|
def generate_crl_response(self, ca_id: int) -> str:
|
97 |
|
|
"""
|
98 |
|
|
Generate a CRL for the given certificate authority
|
99 |
|
|
that contains all revoked certificates
|
100 |
|
|
|
101 |
|
|
:param ca_id: ID of a CA whose CRL shall be generated
|
102 |
|
|
:return: CRL in PEM format
|
103 |
|
|
"""
|
104 |
94e89bb1
|
Jan Pašek
|
# get cert and check if the requested CA exists and if not throw an exception
|
105 |
|
|
cert = self.certificate_repository.read(ca_id)
|
106 |
|
|
if cert is None:
|
107 |
0fd6d825
|
Jan Pašek
|
raise CertificateNotFoundException(ca_id)
|
108 |
|
|
|
109 |
94e89bb1
|
Jan Pašek
|
# get key and check if it exists
|
110 |
|
|
key = self.key_repository.read(cert.private_key_id)
|
111 |
|
|
if key is None:
|
112 |
|
|
raise PrivateKeyNotFoundException(ca_id)
|
113 |
|
|
|
114 |
0fd6d825
|
Jan Pašek
|
# Create an index file and call cryptography service to generate CRL
|
115 |
94e89bb1
|
Jan Pašek
|
with TemporaryFile("crl.index", f"{self.create_revoked_index(ca_id)}\n") as index_path:
|
116 |
|
|
crl_content = self.cryptography_service.generate_crl(cert, key, index_path)
|
117 |
0fd6d825
|
Jan Pašek
|
|
118 |
|
|
return crl_content
|
119 |
cc271e04
|
Captain_Trojan
|
|
120 |
|
|
def generate_ocsp_response(self, ca_id: int, der_ocsp_request: bytes):
|
121 |
|
|
# get cert and check if the requested CA exists and if not throw an exception
|
122 |
|
|
cert = self.certificate_repository.read(ca_id)
|
123 |
|
|
if cert is None:
|
124 |
|
|
raise CertificateNotFoundException(ca_id)
|
125 |
|
|
|
126 |
|
|
# get key and check if it exists
|
127 |
|
|
key = self.key_repository.read(cert.private_key_id)
|
128 |
|
|
if key is None:
|
129 |
|
|
raise PrivateKeyNotFoundException(ca_id)
|
130 |
|
|
|
131 |
|
|
# Create an index file and call cryptography service to generate the OCSP response
|
132 |
|
|
with TemporaryFile("crl.index", f"{self.create_index(ca_id)}\n") as index_path:
|
133 |
|
|
ocsp_content = self.cryptography_service.generate_ocsp(cert, key, index_path, der_ocsp_request)
|
134 |
|
|
|
135 |
|
|
return ocsp_content
|