Projekt

Obecné

Profil

« Předchozí | Další » 

Revize 20b33bd4

Přidáno uživatelem Jan Pašek před asi 4 roky(ů)

Re #8571 - certificate_service.py revocation support end tests

Zobrazit rozdíly:

src/services/certificate_service.py
2 2

  
3 3
from injector import inject
4 4

  
5
from src.constants import ROOT_CA_ID, INTERMEDIATE_CA_ID, CA_ID, CERTIFICATE_ID
5
from src.config.configuration import Configuration
6
from src.constants import ROOT_CA_ID, INTERMEDIATE_CA_ID, CA_ID, CERTIFICATE_ID, CERTIFICATE_STATES, \
7
    CERTIFICATE_REVOCATION_REASONS
6 8
from src.dao.certificate_repository import CertificateRepository
7 9
from src.model.certificate import Certificate
8 10
from src.model.private_key import PrivateKey
......
13 15

  
14 16
DATE_FORMAT = "%d.%m.%Y %H:%M:%S"
15 17
CA_EXTENSIONS = "basicConstraints=critical,CA:TRUE"
18
CRL_EXTENSION = "crlDistributionPoints=URI:"
19
OCSP_EXTENSION = "authorityInfoAccess=OCSP;URI:"
20
STATUS_REVOKED = "revoked"
21
STATUS_VALID = "valid"
16 22

  
17 23

  
18 24
class CertificateService:
19 25

  
20 26
    @inject
21
    def __init__(self, cryptography_service: CryptographyService, certificate_repository: CertificateRepository):
27
    def __init__(self, cryptography_service: CryptographyService,
28
                 certificate_repository: CertificateRepository,
29
                 configuration: Configuration):
22 30
        self.cryptography_service = cryptography_service
23 31
        self.certificate_repository = certificate_repository
32
        self.configuration = configuration
24 33

  
25 34
    # TODO usages present in method parameters but not in class diagram
26 35
    def create_root_ca(self, key: PrivateKey, subject: Subject, extensions: str = "", config: str = "",
......
38 47
        if usages is None:
39 48
            usages = {}
40 49

  
50
        cert_id = self.certificate_repository.get_next_id()
51
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(cert_id)
52
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(cert_id)
53

  
41 54
        # create a new self signed  certificate
42 55
        cert_pem = self.cryptography_service.create_sscrt(subject, key.private_key, key_pass=key.password,
43 56
                                                          extensions=extensions, config=config, days=days)
......
98 111
            usages = {}
99 112

  
100 113
        extensions = extensions + "\n" + CA_EXTENSIONS
114
        # Add CRL and OCSP distribution point to certificate extensions
115
        cert_id = self.certificate_repository.get_next_id()
116
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(cert_id)
117
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(cert_id)
118

  
101 119
        # TODO implement AIA URI via extensions
102 120
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
103 121
                                                        issuer_key.private_key,
......
248 266
        # TODO delete children?
249 267
        return self.certificate_repository.delete(unique_id)
250 268

  
269
    def set_certificate_revocation_status(self, id, status, reason="unspecified"):
270
        """
271
        Set certificate status to 'valid' or 'revoked'.
272
        If the new status is revoked a reason can be provided -> default is unspecified
273
        :param reason: reason for revocation
274
        :param id: identifier of the certificate whose status is to be changed
275
        :param status: new status of the certificate
276
        """
277
        if status not in CERTIFICATE_STATES:
278
            raise CertificateStatusInvalidException(status)
279
        if reason not in CERTIFICATE_REVOCATION_REASONS:
280
            raise RevocationReasonInvalidException(reason)
281

  
282
        if status == STATUS_VALID:
283
            self.certificate_repository.clear_certificate_revocation(id)
284
        elif status == STATUS_REVOKED:
285
            revocation_timestamp = int(time.time())
286
            self.certificate_repository.set_certificate_revoked(id, str(revocation_timestamp), reason)
287

  
251 288
    def get_subject_from_certificate(self, certificate: Certificate) -> Subject:
252 289
        """
253 290
        Get Subject distinguished name from a Certificate
......
265 302
        :return: a string containing the extracted public key in PEM format
266 303
        """
267 304
        return self.cryptography_service.extract_public_key_from_certificate(certificate.pem_data)
305

  
306
    def __get_crl_endpoint(self, ca_identifier: int) -> str:
307
        """
308
        Get URL address of CRL distribution endpoint based on
309
        issuer's ID
310

  
311
        :param ca_identifier: ID of issuing authority
312
        :return: CRL endpoint for the given CA
313
        """
314
        return self.configuration.base_server_url + "/api/crl/" + str(ca_identifier)
315

  
316
    def __get_ocsp_endpoint(self, ca_identifier: int) -> str:
317
        """
318
        Get URL address of OCSP distribution endpoint based on
319
        issuer's ID
320

  
321
        :param ca_identifier: ID of issuing authority
322
        :return: OCSP endpoint for the given CA
323
        """
324
        return self.configuration.base_server_url + "/api/ocsp/" + str(ca_identifier)
325

  
326

  
327
class RevocationReasonInvalidException(Exception):
328
    """
329
    Exception that denotes that the caller was trying to revoke
330
    a certificate using an invalid revocation reason
331
    """
332

  
333
    def __init__(self, reason):
334
        self.reason = reason
335

  
336
    def __str__(self):
337
        return f"Revocation reason '{self.reason}' is not valid."
338

  
339

  
340
class CertificateStatusInvalidException(Exception):
341
    """
342
    Exception that denotes that the caller was trying to set
343
    a certificate to an invalid state
344
    """
345

  
346
    def __init__(self, status):
347
        self.status = status
348

  
349
    def __str__(self):
350
        return f"Certificate status '{self.status}' is not valid."
tests/integration_tests/services/certificate_service_test.py
27 27

  
28 28
    # verify that the loaded certificate is a CA
29 29
    cert_loaded_printed = export_crt(cert_loaded.pem_data)
30
    assert """            X509v3 Basic Constraints: critical
31
                CA:TRUE""" in cert_loaded_printed
30
    expected = """            X509v3 Basic Constraints: critical
31
                CA:TRUE""".replace("\r", "").replace("\n", "")
32
    assert expected in cert_loaded_printed.replace("\n", "").replace("\r", "")
32 33

  
33 34
    assert cert.certificate_id == cert_loaded.certificate_id
34 35
    assert cert.common_name == cert_loaded.common_name
......
62 63

  
63 64
    # verify that the loaded certificate is a CA
64 65
    cert_loaded_printed = export_crt(inter_cert_loaded.pem_data)
65
    assert """            X509v3 Basic Constraints: critical
66
                CA:TRUE""" in cert_loaded_printed
66
    expected = """            X509v3 Basic Constraints: critical
67
                CA:TRUE""".replace("\n", "").replace("\r", "")
68
    assert expected in cert_loaded_printed.replace("\n", "").replace("\r", "")
67 69

  
68 70
    assert inter_cert.certificate_id == inter_cert_loaded.certificate_id
69 71
    assert inter_cert.common_name == inter_cert_loaded.common_name
tests/integration_tests/services/conftest.py
4 4

  
5 5
import pytest
6 6

  
7
from src.config.configuration import test_configuration
7
from src.config.configuration import test_configuration, Configuration
8 8
from src.config.connection_provider import ConnectionProvider
9 9
from src.dao.certificate_repository import CertificateRepository
10 10
from src.dao.private_key_repository import PrivateKeyRepository
......
36 36
    return CryptographyService()
37 37

  
38 38

  
39
@pytest.fixture
40
def configuration():
41
    return Configuration()
42

  
43

  
39 44
@pytest.fixture
40 45
def private_key_service(private_key_repository, cryptography_service):
41 46
    return KeyService(cryptography_service, private_key_repository)
42 47

  
43 48

  
44 49
@pytest.fixture
45
def certificate_service(certificate_repository, cryptography_service):
46
    return CertificateService(cryptography_service, certificate_repository)
50
def certificate_service(certificate_repository, cryptography_service, configuration):
51
    return CertificateService(cryptography_service, certificate_repository, configuration)
47 52

  
48 53

  
49 54
@pytest.fixture
......
73 78

  
74 79

  
75 80
@pytest.fixture
76
def certificate_service_unique(certificate_repository_unique, cryptography_service_unique):
77
    return CertificateService(cryptography_service_unique, certificate_repository_unique)
81
def certificate_service_unique(certificate_repository_unique, cryptography_service_unique, configuration):
82
    return CertificateService(cryptography_service_unique, certificate_repository_unique, configuration)
tests/unit_tests/services/cryptography/create_crt_test.py
80 80
    # create a CA using the root CA
81 81
    inter_ca = service.create_crt(Subject(common_name="bar", country="CZ"), inter_key, root_ca, root_key,
82 82
                                  subject_key_pass=inter_key_passphrase, issuer_key_pass=root_key_passphrase,
83
                                  extensions="authorityInfoAccess = caIssuers;URI:bar.cz/baz/cert\nbasicConstraints=critical,CA:TRUE")
83
                                  extensions="authorityInfoAccess=caIssuers;URI:bar.cz/baz/cert\n"
84
                                             "basicConstraints=critical,CA:TRUE\n"
85
                                             "crlDistributionPoints=URI:http://localhost/api/crl/0\n"
86
                                             "authorityInfoAccess=OCSP;URI:http://localhost/api/ocsp/0\n")
84 87

  
85 88
    inter_ca_printed = export_crt(inter_ca)
86 89

  
87 90
    # assert fields
88 91
    assert "Issuer: CN = foo" in inter_ca_printed
89 92
    assert "Subject: CN = bar, C = CZ" in inter_ca_printed
93
    assert "X509v3 CRL Distribution Points:" in inter_ca_printed
94
    assert "URI:http://localhost/api/crl/0" in inter_ca_printed
95
    assert "Authority Information Access:" in inter_ca_printed
96
    assert "OCSP - URI:http://localhost/api/ocsp/0" in inter_ca_printed
90 97

  
91 98
    # assert extensions
92 99
    expected_extensions = """        X509v3 extensions:
93
            Authority Information Access: 
94
                CA Issuers - URI:bar.cz/baz/cert
95

  
96 100
            X509v3 Basic Constraints: critical
97
                CA:TRUE"""
101
                CA:TRUE
102
            X509v3 CRL Distribution Points: 
103

  
104
                Full Name:
105
                  URI:http://localhost/api/crl/0
106

  
107
            Authority Information Access: 
108
                OCSP - URI:http://localhost/api/ocsp/0"""
109
    expected_extensions = expected_extensions.replace("\n", "").replace("\r", "")
110
    inter_ca_printed = inter_ca_printed.replace("\n", "").replace("\r", "")
98 111
    assert expected_extensions in inter_ca_printed
tests/unit_tests/services/cryptography/self_signed_cert_test.py
16 16

  
17 17
    [ req ]
18 18
    distinguished_name      = ca_dn                 # DN section
19
    extensions = root_ca_ext
19 20

  
20 21
    [ ca_dn ]
21 22

  
22
    [ root_ca_ext ]
23
    keyUsage                = critical,keyCertSign,cRLSign
24
    basicConstraints        = critical,CA:true
25
    subjectKeyIdentifier    = hash
26
    authorityKeyIdentifier  = keyid:always
27 23
    """
28 24

  
29 25
    cert = service.create_sscrt(Subject(common_name="Topnax",
......
33 29
                                        organization="Mysterious Org.",
34 30
                                        organization_unit="Department of Mysteries",
35 31
                                        email_address="mysterious@box.cz"), private_key, config=config,
36
                                extensions="root_ca_ext", key_pass="foobar")
32
                                extensions="basicConstraints=critical,CA:TRUE\n"
33
                                           "keyUsage=critical,keyCertSign,cRLSign\n"
34
                                           "crlDistributionPoints=URI:http://localhost/api/crl/0\n"
35
                                           "authorityInfoAccess=OCSP;URI:http://localhost/api/ocsp/0\n",
36
                                key_pass="foobar")
37 37

  
38 38
    cert_printed = subprocess.check_output(["openssl", "x509", "-noout", "-text", "-in", "-"],
39 39
                                           input=bytes(cert, encoding="utf-8"), stderr=subprocess.STDOUT).decode()
......
41 41
    assert "Certificate Sign, CRL Sign" in cert_printed
42 42
    assert "X509v3 Key Usage: critical" in cert_printed
43 43
    assert "CA:TRUE" in cert_printed
44
    assert "X509v3 CRL Distribution Points:" in cert_printed
45
    assert "URI:http://localhost/api/crl/0" in cert_printed
46
    assert "Authority Information Access:" in cert_printed
47
    assert "OCSP - URI:http://localhost/api/ocsp/0" in cert_printed
44 48

  
45 49
    assert "Issuer: CN = Topnax, C = CZ, L = My Locality, ST = My state, O = Mysterious Org., OU = Department of Mysteries, emailAddress = mysterious@box.cz" in cert_printed
46 50
    assert "Subject: CN = Topnax, C = CZ, L = My Locality, ST = My state, O = Mysterious Org., OU = Department of Mysteries, emailAddress = mysterious@box.cz" in cert_printed
tests/unit_tests/services/cryptography/sign_csr_test.py
91 91

  
92 92
            X509v3 Basic Constraints: critical
93 93
                CA:TRUE"""
94
    expected_extensions = expected_extensions.replace("\n", "").replace("\r", "")
95
    inter_ca_printed = inter_ca_printed.replace("\n", "").replace("\r", "")
94 96
    assert expected_extensions in inter_ca_printed

Také k dispozici: Unified diff