Revize 20b33bd4
Přidáno uživatelem Jan Pašek před asi 4 roky(ů)
src/services/certificate_service.py | ||
---|---|---|
2 | 2 |
|
3 | 3 |
from injector import inject |
4 | 4 |
|
5 |
from src.constants import ROOT_CA_ID, INTERMEDIATE_CA_ID, CA_ID, CERTIFICATE_ID |
|
5 |
from src.config.configuration import Configuration |
|
6 |
from src.constants import ROOT_CA_ID, INTERMEDIATE_CA_ID, CA_ID, CERTIFICATE_ID, CERTIFICATE_STATES, \ |
|
7 |
CERTIFICATE_REVOCATION_REASONS |
|
6 | 8 |
from src.dao.certificate_repository import CertificateRepository |
7 | 9 |
from src.model.certificate import Certificate |
8 | 10 |
from src.model.private_key import PrivateKey |
... | ... | |
13 | 15 |
|
14 | 16 |
DATE_FORMAT = "%d.%m.%Y %H:%M:%S" |
15 | 17 |
CA_EXTENSIONS = "basicConstraints=critical,CA:TRUE" |
18 |
CRL_EXTENSION = "crlDistributionPoints=URI:" |
|
19 |
OCSP_EXTENSION = "authorityInfoAccess=OCSP;URI:" |
|
20 |
STATUS_REVOKED = "revoked" |
|
21 |
STATUS_VALID = "valid" |
|
16 | 22 |
|
17 | 23 |
|
18 | 24 |
class CertificateService: |
19 | 25 |
|
20 | 26 |
@inject |
21 |
def __init__(self, cryptography_service: CryptographyService, certificate_repository: CertificateRepository): |
|
27 |
def __init__(self, cryptography_service: CryptographyService, |
|
28 |
certificate_repository: CertificateRepository, |
|
29 |
configuration: Configuration): |
|
22 | 30 |
self.cryptography_service = cryptography_service |
23 | 31 |
self.certificate_repository = certificate_repository |
32 |
self.configuration = configuration |
|
24 | 33 |
|
25 | 34 |
# TODO usages present in method parameters but not in class diagram |
26 | 35 |
def create_root_ca(self, key: PrivateKey, subject: Subject, extensions: str = "", config: str = "", |
... | ... | |
38 | 47 |
if usages is None: |
39 | 48 |
usages = {} |
40 | 49 |
|
50 |
cert_id = self.certificate_repository.get_next_id() |
|
51 |
extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(cert_id) |
|
52 |
extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(cert_id) |
|
53 |
|
|
41 | 54 |
# create a new self signed certificate |
42 | 55 |
cert_pem = self.cryptography_service.create_sscrt(subject, key.private_key, key_pass=key.password, |
43 | 56 |
extensions=extensions, config=config, days=days) |
... | ... | |
98 | 111 |
usages = {} |
99 | 112 |
|
100 | 113 |
extensions = extensions + "\n" + CA_EXTENSIONS |
114 |
# Add CRL and OCSP distribution point to certificate extensions |
|
115 |
cert_id = self.certificate_repository.get_next_id() |
|
116 |
extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(cert_id) |
|
117 |
extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(cert_id) |
|
118 |
|
|
101 | 119 |
# TODO implement AIA URI via extensions |
102 | 120 |
cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data, |
103 | 121 |
issuer_key.private_key, |
... | ... | |
248 | 266 |
# TODO delete children? |
249 | 267 |
return self.certificate_repository.delete(unique_id) |
250 | 268 |
|
269 |
def set_certificate_revocation_status(self, id, status, reason="unspecified"): |
|
270 |
""" |
|
271 |
Set certificate status to 'valid' or 'revoked'. |
|
272 |
If the new status is revoked a reason can be provided -> default is unspecified |
|
273 |
:param reason: reason for revocation |
|
274 |
:param id: identifier of the certificate whose status is to be changed |
|
275 |
:param status: new status of the certificate |
|
276 |
""" |
|
277 |
if status not in CERTIFICATE_STATES: |
|
278 |
raise CertificateStatusInvalidException(status) |
|
279 |
if reason not in CERTIFICATE_REVOCATION_REASONS: |
|
280 |
raise RevocationReasonInvalidException(reason) |
|
281 |
|
|
282 |
if status == STATUS_VALID: |
|
283 |
self.certificate_repository.clear_certificate_revocation(id) |
|
284 |
elif status == STATUS_REVOKED: |
|
285 |
revocation_timestamp = int(time.time()) |
|
286 |
self.certificate_repository.set_certificate_revoked(id, str(revocation_timestamp), reason) |
|
287 |
|
|
251 | 288 |
def get_subject_from_certificate(self, certificate: Certificate) -> Subject: |
252 | 289 |
""" |
253 | 290 |
Get Subject distinguished name from a Certificate |
... | ... | |
265 | 302 |
:return: a string containing the extracted public key in PEM format |
266 | 303 |
""" |
267 | 304 |
return self.cryptography_service.extract_public_key_from_certificate(certificate.pem_data) |
305 |
|
|
306 |
def __get_crl_endpoint(self, ca_identifier: int) -> str: |
|
307 |
""" |
|
308 |
Get URL address of CRL distribution endpoint based on |
|
309 |
issuer's ID |
|
310 |
|
|
311 |
:param ca_identifier: ID of issuing authority |
|
312 |
:return: CRL endpoint for the given CA |
|
313 |
""" |
|
314 |
return self.configuration.base_server_url + "/api/crl/" + str(ca_identifier) |
|
315 |
|
|
316 |
def __get_ocsp_endpoint(self, ca_identifier: int) -> str: |
|
317 |
""" |
|
318 |
Get URL address of OCSP distribution endpoint based on |
|
319 |
issuer's ID |
|
320 |
|
|
321 |
:param ca_identifier: ID of issuing authority |
|
322 |
:return: OCSP endpoint for the given CA |
|
323 |
""" |
|
324 |
return self.configuration.base_server_url + "/api/ocsp/" + str(ca_identifier) |
|
325 |
|
|
326 |
|
|
327 |
class RevocationReasonInvalidException(Exception): |
|
328 |
""" |
|
329 |
Exception that denotes that the caller was trying to revoke |
|
330 |
a certificate using an invalid revocation reason |
|
331 |
""" |
|
332 |
|
|
333 |
def __init__(self, reason): |
|
334 |
self.reason = reason |
|
335 |
|
|
336 |
def __str__(self): |
|
337 |
return f"Revocation reason '{self.reason}' is not valid." |
|
338 |
|
|
339 |
|
|
340 |
class CertificateStatusInvalidException(Exception): |
|
341 |
""" |
|
342 |
Exception that denotes that the caller was trying to set |
|
343 |
a certificate to an invalid state |
|
344 |
""" |
|
345 |
|
|
346 |
def __init__(self, status): |
|
347 |
self.status = status |
|
348 |
|
|
349 |
def __str__(self): |
|
350 |
return f"Certificate status '{self.status}' is not valid." |
tests/integration_tests/services/certificate_service_test.py | ||
---|---|---|
27 | 27 |
|
28 | 28 |
# verify that the loaded certificate is a CA |
29 | 29 |
cert_loaded_printed = export_crt(cert_loaded.pem_data) |
30 |
assert """ X509v3 Basic Constraints: critical |
|
31 |
CA:TRUE""" in cert_loaded_printed |
|
30 |
expected = """ X509v3 Basic Constraints: critical |
|
31 |
CA:TRUE""".replace("\r", "").replace("\n", "") |
|
32 |
assert expected in cert_loaded_printed.replace("\n", "").replace("\r", "") |
|
32 | 33 |
|
33 | 34 |
assert cert.certificate_id == cert_loaded.certificate_id |
34 | 35 |
assert cert.common_name == cert_loaded.common_name |
... | ... | |
62 | 63 |
|
63 | 64 |
# verify that the loaded certificate is a CA |
64 | 65 |
cert_loaded_printed = export_crt(inter_cert_loaded.pem_data) |
65 |
assert """ X509v3 Basic Constraints: critical |
|
66 |
CA:TRUE""" in cert_loaded_printed |
|
66 |
expected = """ X509v3 Basic Constraints: critical |
|
67 |
CA:TRUE""".replace("\n", "").replace("\r", "") |
|
68 |
assert expected in cert_loaded_printed.replace("\n", "").replace("\r", "") |
|
67 | 69 |
|
68 | 70 |
assert inter_cert.certificate_id == inter_cert_loaded.certificate_id |
69 | 71 |
assert inter_cert.common_name == inter_cert_loaded.common_name |
tests/integration_tests/services/conftest.py | ||
---|---|---|
4 | 4 |
|
5 | 5 |
import pytest |
6 | 6 |
|
7 |
from src.config.configuration import test_configuration |
|
7 |
from src.config.configuration import test_configuration, Configuration
|
|
8 | 8 |
from src.config.connection_provider import ConnectionProvider |
9 | 9 |
from src.dao.certificate_repository import CertificateRepository |
10 | 10 |
from src.dao.private_key_repository import PrivateKeyRepository |
... | ... | |
36 | 36 |
return CryptographyService() |
37 | 37 |
|
38 | 38 |
|
39 |
@pytest.fixture |
|
40 |
def configuration(): |
|
41 |
return Configuration() |
|
42 |
|
|
43 |
|
|
39 | 44 |
@pytest.fixture |
40 | 45 |
def private_key_service(private_key_repository, cryptography_service): |
41 | 46 |
return KeyService(cryptography_service, private_key_repository) |
42 | 47 |
|
43 | 48 |
|
44 | 49 |
@pytest.fixture |
45 |
def certificate_service(certificate_repository, cryptography_service): |
|
46 |
return CertificateService(cryptography_service, certificate_repository) |
|
50 |
def certificate_service(certificate_repository, cryptography_service, configuration):
|
|
51 |
return CertificateService(cryptography_service, certificate_repository, configuration)
|
|
47 | 52 |
|
48 | 53 |
|
49 | 54 |
@pytest.fixture |
... | ... | |
73 | 78 |
|
74 | 79 |
|
75 | 80 |
@pytest.fixture |
76 |
def certificate_service_unique(certificate_repository_unique, cryptography_service_unique): |
|
77 |
return CertificateService(cryptography_service_unique, certificate_repository_unique) |
|
81 |
def certificate_service_unique(certificate_repository_unique, cryptography_service_unique, configuration): |
|
82 |
return CertificateService(cryptography_service_unique, certificate_repository_unique, configuration) |
tests/unit_tests/services/cryptography/create_crt_test.py | ||
---|---|---|
80 | 80 |
# create a CA using the root CA |
81 | 81 |
inter_ca = service.create_crt(Subject(common_name="bar", country="CZ"), inter_key, root_ca, root_key, |
82 | 82 |
subject_key_pass=inter_key_passphrase, issuer_key_pass=root_key_passphrase, |
83 |
extensions="authorityInfoAccess = caIssuers;URI:bar.cz/baz/cert\nbasicConstraints=critical,CA:TRUE") |
|
83 |
extensions="authorityInfoAccess=caIssuers;URI:bar.cz/baz/cert\n" |
|
84 |
"basicConstraints=critical,CA:TRUE\n" |
|
85 |
"crlDistributionPoints=URI:http://localhost/api/crl/0\n" |
|
86 |
"authorityInfoAccess=OCSP;URI:http://localhost/api/ocsp/0\n") |
|
84 | 87 |
|
85 | 88 |
inter_ca_printed = export_crt(inter_ca) |
86 | 89 |
|
87 | 90 |
# assert fields |
88 | 91 |
assert "Issuer: CN = foo" in inter_ca_printed |
89 | 92 |
assert "Subject: CN = bar, C = CZ" in inter_ca_printed |
93 |
assert "X509v3 CRL Distribution Points:" in inter_ca_printed |
|
94 |
assert "URI:http://localhost/api/crl/0" in inter_ca_printed |
|
95 |
assert "Authority Information Access:" in inter_ca_printed |
|
96 |
assert "OCSP - URI:http://localhost/api/ocsp/0" in inter_ca_printed |
|
90 | 97 |
|
91 | 98 |
# assert extensions |
92 | 99 |
expected_extensions = """ X509v3 extensions: |
93 |
Authority Information Access: |
|
94 |
CA Issuers - URI:bar.cz/baz/cert |
|
95 |
|
|
96 | 100 |
X509v3 Basic Constraints: critical |
97 |
CA:TRUE""" |
|
101 |
CA:TRUE |
|
102 |
X509v3 CRL Distribution Points: |
|
103 |
|
|
104 |
Full Name: |
|
105 |
URI:http://localhost/api/crl/0 |
|
106 |
|
|
107 |
Authority Information Access: |
|
108 |
OCSP - URI:http://localhost/api/ocsp/0""" |
|
109 |
expected_extensions = expected_extensions.replace("\n", "").replace("\r", "") |
|
110 |
inter_ca_printed = inter_ca_printed.replace("\n", "").replace("\r", "") |
|
98 | 111 |
assert expected_extensions in inter_ca_printed |
tests/unit_tests/services/cryptography/self_signed_cert_test.py | ||
---|---|---|
16 | 16 |
|
17 | 17 |
[ req ] |
18 | 18 |
distinguished_name = ca_dn # DN section |
19 |
extensions = root_ca_ext |
|
19 | 20 |
|
20 | 21 |
[ ca_dn ] |
21 | 22 |
|
22 |
[ root_ca_ext ] |
|
23 |
keyUsage = critical,keyCertSign,cRLSign |
|
24 |
basicConstraints = critical,CA:true |
|
25 |
subjectKeyIdentifier = hash |
|
26 |
authorityKeyIdentifier = keyid:always |
|
27 | 23 |
""" |
28 | 24 |
|
29 | 25 |
cert = service.create_sscrt(Subject(common_name="Topnax", |
... | ... | |
33 | 29 |
organization="Mysterious Org.", |
34 | 30 |
organization_unit="Department of Mysteries", |
35 | 31 |
email_address="mysterious@box.cz"), private_key, config=config, |
36 |
extensions="root_ca_ext", key_pass="foobar") |
|
32 |
extensions="basicConstraints=critical,CA:TRUE\n" |
|
33 |
"keyUsage=critical,keyCertSign,cRLSign\n" |
|
34 |
"crlDistributionPoints=URI:http://localhost/api/crl/0\n" |
|
35 |
"authorityInfoAccess=OCSP;URI:http://localhost/api/ocsp/0\n", |
|
36 |
key_pass="foobar") |
|
37 | 37 |
|
38 | 38 |
cert_printed = subprocess.check_output(["openssl", "x509", "-noout", "-text", "-in", "-"], |
39 | 39 |
input=bytes(cert, encoding="utf-8"), stderr=subprocess.STDOUT).decode() |
... | ... | |
41 | 41 |
assert "Certificate Sign, CRL Sign" in cert_printed |
42 | 42 |
assert "X509v3 Key Usage: critical" in cert_printed |
43 | 43 |
assert "CA:TRUE" in cert_printed |
44 |
assert "X509v3 CRL Distribution Points:" in cert_printed |
|
45 |
assert "URI:http://localhost/api/crl/0" in cert_printed |
|
46 |
assert "Authority Information Access:" in cert_printed |
|
47 |
assert "OCSP - URI:http://localhost/api/ocsp/0" in cert_printed |
|
44 | 48 |
|
45 | 49 |
assert "Issuer: CN = Topnax, C = CZ, L = My Locality, ST = My state, O = Mysterious Org., OU = Department of Mysteries, emailAddress = mysterious@box.cz" in cert_printed |
46 | 50 |
assert "Subject: CN = Topnax, C = CZ, L = My Locality, ST = My state, O = Mysterious Org., OU = Department of Mysteries, emailAddress = mysterious@box.cz" in cert_printed |
tests/unit_tests/services/cryptography/sign_csr_test.py | ||
---|---|---|
91 | 91 |
|
92 | 92 |
X509v3 Basic Constraints: critical |
93 | 93 |
CA:TRUE""" |
94 |
expected_extensions = expected_extensions.replace("\n", "").replace("\r", "") |
|
95 |
inter_ca_printed = inter_ca_printed.replace("\n", "").replace("\r", "") |
|
94 | 96 |
assert expected_extensions in inter_ca_printed |
Také k dispozici: Unified diff
Re #8571 - certificate_service.py revocation support end tests