Projekt

Obecné

Profil

« Předchozí | Další » 

Revize 20b33bd4

Přidáno uživatelem Jan Pašek před asi 4 roky(ů)

Re #8571 - certificate_service.py revocation support end tests

Zobrazit rozdíly:

src/services/certificate_service.py
2 2

  
3 3
from injector import inject
4 4

  
5
from src.constants import ROOT_CA_ID, INTERMEDIATE_CA_ID, CA_ID, CERTIFICATE_ID
5
from src.config.configuration import Configuration
6
from src.constants import ROOT_CA_ID, INTERMEDIATE_CA_ID, CA_ID, CERTIFICATE_ID, CERTIFICATE_STATES, \
7
    CERTIFICATE_REVOCATION_REASONS
6 8
from src.dao.certificate_repository import CertificateRepository
7 9
from src.model.certificate import Certificate
8 10
from src.model.private_key import PrivateKey
......
13 15

  
14 16
DATE_FORMAT = "%d.%m.%Y %H:%M:%S"
15 17
CA_EXTENSIONS = "basicConstraints=critical,CA:TRUE"
18
CRL_EXTENSION = "crlDistributionPoints=URI:"
19
OCSP_EXTENSION = "authorityInfoAccess=OCSP;URI:"
20
STATUS_REVOKED = "revoked"
21
STATUS_VALID = "valid"
16 22

  
17 23

  
18 24
class CertificateService:
19 25

  
20 26
    @inject
21
    def __init__(self, cryptography_service: CryptographyService, certificate_repository: CertificateRepository):
27
    def __init__(self, cryptography_service: CryptographyService,
28
                 certificate_repository: CertificateRepository,
29
                 configuration: Configuration):
22 30
        self.cryptography_service = cryptography_service
23 31
        self.certificate_repository = certificate_repository
32
        self.configuration = configuration
24 33

  
25 34
    # TODO usages present in method parameters but not in class diagram
26 35
    def create_root_ca(self, key: PrivateKey, subject: Subject, extensions: str = "", config: str = "",
......
38 47
        if usages is None:
39 48
            usages = {}
40 49

  
50
        cert_id = self.certificate_repository.get_next_id()
51
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(cert_id)
52
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(cert_id)
53

  
41 54
        # create a new self signed  certificate
42 55
        cert_pem = self.cryptography_service.create_sscrt(subject, key.private_key, key_pass=key.password,
43 56
                                                          extensions=extensions, config=config, days=days)
......
98 111
            usages = {}
99 112

  
100 113
        extensions = extensions + "\n" + CA_EXTENSIONS
114
        # Add CRL and OCSP distribution point to certificate extensions
115
        cert_id = self.certificate_repository.get_next_id()
116
        extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(cert_id)
117
        extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(cert_id)
118

  
101 119
        # TODO implement AIA URI via extensions
102 120
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
103 121
                                                        issuer_key.private_key,
......
248 266
        # TODO delete children?
249 267
        return self.certificate_repository.delete(unique_id)
250 268

  
269
    def set_certificate_revocation_status(self, id, status, reason="unspecified"):
270
        """
271
        Set certificate status to 'valid' or 'revoked'.
272
        If the new status is revoked a reason can be provided -> default is unspecified
273
        :param reason: reason for revocation
274
        :param id: identifier of the certificate whose status is to be changed
275
        :param status: new status of the certificate
276
        """
277
        if status not in CERTIFICATE_STATES:
278
            raise CertificateStatusInvalidException(status)
279
        if reason not in CERTIFICATE_REVOCATION_REASONS:
280
            raise RevocationReasonInvalidException(reason)
281

  
282
        if status == STATUS_VALID:
283
            self.certificate_repository.clear_certificate_revocation(id)
284
        elif status == STATUS_REVOKED:
285
            revocation_timestamp = int(time.time())
286
            self.certificate_repository.set_certificate_revoked(id, str(revocation_timestamp), reason)
287

  
251 288
    def get_subject_from_certificate(self, certificate: Certificate) -> Subject:
252 289
        """
253 290
        Get Subject distinguished name from a Certificate
......
265 302
        :return: a string containing the extracted public key in PEM format
266 303
        """
267 304
        return self.cryptography_service.extract_public_key_from_certificate(certificate.pem_data)
305

  
306
    def __get_crl_endpoint(self, ca_identifier: int) -> str:
307
        """
308
        Get URL address of CRL distribution endpoint based on
309
        issuer's ID
310

  
311
        :param ca_identifier: ID of issuing authority
312
        :return: CRL endpoint for the given CA
313
        """
314
        return self.configuration.base_server_url + "/api/crl/" + str(ca_identifier)
315

  
316
    def __get_ocsp_endpoint(self, ca_identifier: int) -> str:
317
        """
318
        Get URL address of OCSP distribution endpoint based on
319
        issuer's ID
320

  
321
        :param ca_identifier: ID of issuing authority
322
        :return: OCSP endpoint for the given CA
323
        """
324
        return self.configuration.base_server_url + "/api/ocsp/" + str(ca_identifier)
325

  
326

  
327
class RevocationReasonInvalidException(Exception):
328
    """
329
    Exception that denotes that the caller was trying to revoke
330
    a certificate using an invalid revocation reason
331
    """
332

  
333
    def __init__(self, reason):
334
        self.reason = reason
335

  
336
    def __str__(self):
337
        return f"Revocation reason '{self.reason}' is not valid."
338

  
339

  
340
class CertificateStatusInvalidException(Exception):
341
    """
342
    Exception that denotes that the caller was trying to set
343
    a certificate to an invalid state
344
    """
345

  
346
    def __init__(self, status):
347
        self.status = status
348

  
349
    def __str__(self):
350
        return f"Certificate status '{self.status}' is not valid."

Také k dispozici: Unified diff