Projekt

Obecné

Profil

« Předchozí | Další » 

Revize c4b733ef

Přidáno uživatelem Michal Seják před téměř 4 roky(ů)

Re #8577 - Minor refactoring (crl -> crl_ocsp).

Zobrazit rozdíly:

src/services/crl/ca_index_file_line_generator.py
1
import time
2
from time import struct_time
3

  
4
from src.model.certificate import Certificate
5
from src.model.subject import Subject
6

  
7
SRL_LEN = 8  # number of hex digits in serial number
8
TAB_CHAR = "\t"
9
INDEX_FILE_DATE_ENTRY_FORMAT = "%y%m%d%H%M%SZ"
10

  
11

  
12
def get_index_file_time_entry(date: struct_time):
13
    # convert the time to the format of openssl CA index file
14
    return time.strftime(INDEX_FILE_DATE_ENTRY_FORMAT, date)
15

  
16

  
17
def get_distinguished_name(subject: Subject):
18
    # convert subject class instance to the distinguished name in the openssl CA index file format
19
    return "".join([f"/{key}={value}" if value is not None else "" for key, value in subject.to_dict().items()])
20

  
21

  
22
def create_index_file_revoked_line(revoked_certificate: Certificate, subject: Subject, revocation_date: struct_time,
23
                                   valid_to: struct_time) -> str:
24
    # converts the given certificate as well as the subject and revocation / valid_to dates to a line of openssl CA
25
    # index file format
26
    items = [
27
        # certificate status flag (R stands for revoked)
28
        "R",
29
        # followed by the expiration date field
30
        f"{get_index_file_time_entry(valid_to)}",
31
        # followed by the revocation date field
32
        f"{get_index_file_time_entry(revocation_date)},{revoked_certificate.revocation_reason}",
33
        # followed by the serial number of the certificate in hex format
34
        __get_serial(revoked_certificate.certificate_id),
35
        # certificate filename ("unknown" literal used for unknown file names)
36
        "unknown",
37
        # certificate distinguished name
38
        get_distinguished_name(subject)
39
    ]
40

  
41
    return TAB_CHAR.join(items)
42

  
43
def create_index_file_valid_line(certificate: Certificate, subject: Subject, valid_to: struct_time) -> str:
44
    # converts the given certificate as well as the subject and revocation / valid_to dates to a line of openssl CA
45
    # index file format
46
    items = [
47
        # certificate status flag (R stands for revoked)
48
        "V",
49
        # followed by the expiration date field
50
        f"{get_index_file_time_entry(valid_to)}",
51
        # followed by the revocation date field
52
        f"",
53
        # followed by the serial number of the certificate in hex format
54
        __get_serial(certificate.certificate_id),
55
        # certificate filename ("unknown" literal used for unknown file names)
56
        "unknown",
57
        # certificate distinguished name
58
        get_distinguished_name(subject)
59
    ]
60

  
61
    return TAB_CHAR.join(items)
62

  
63
def __get_serial(cert_id) -> str:
64
    srl = hex(cert_id).replace("0x", "")
65
    srl = "0"*(SRL_LEN - len(srl)) + srl  # generate exactly SRL_LEN digits
66
    return srl.upper()
src/services/crl/crl_service.py
1
from datetime import datetime
2

  
3
from injector import inject
4

  
5
from src.dao.certificate_repository import CertificateRepository
6
from src.dao.private_key_repository import PrivateKeyRepository
7
from src.exceptions.certificate_not_found_exception import CertificateNotFoundException
8
from src.exceptions.private_key_not_found_exception import PrivateKeyNotFoundException
9
from src.services.crl.ca_index_file_line_generator import create_index_file_revoked_line, create_index_file_valid_line
10
from src.services.cryptography import CryptographyService
11
from src.utils.temporary_file import TemporaryFile
12

  
13

  
14
class CrlService:
15
    @inject
16
    def __init__(self,
17
                 certificate_repository: CertificateRepository,
18
                 key_repository: PrivateKeyRepository,
19
                 cryptography_service: CryptographyService
20
                 ):
21
        self.key_repository = key_repository
22
        self.certificate_repository = certificate_repository
23
        self.cryptography_service = cryptography_service
24

  
25
    def create_revoked_index(self, ca_id) -> str:
26
        """
27
        Queries the certificate repository and looks for all certificates revoked by the certificate authority given
28
        by the passed ID. Found certificates are then put into a string representing the CA's database index file.
29
        
30
        :param ca_id: ID of the CA whose revoked certificates should be put into the index file
31
        :return: a str representing the content of a CA index file
32
        """
33
        # get issuing certificate
34
        certificate = self.certificate_repository.read(ca_id)
35
        if certificate is None:
36
            raise CertificateNotFoundException(ca_id)
37

  
38
        # get subject and notAfter of the issuer
39
        subject, _, not_after = self.cryptography_service.parse_cert_pem(certificate.pem_data)
40

  
41
        index_lines = [create_index_file_valid_line(certificate, subject, not_after)]
42
        # iterate over revoked certificates of the CA given by an ID
43
        for certificate in self.certificate_repository.get_all_revoked_by(ca_id):
44
            # extract the complete subject information and not_after date field
45
            subject, _, not_after = self.cryptography_service.parse_cert_pem(certificate.pem_data)
46

  
47
            line = create_index_file_revoked_line(certificate,
48
                                                  subject,
49
                                                  # parse revocation date from unix timestamp to struct_time
50
                                                  datetime.utcfromtimestamp(int(certificate.revocation_date)).timetuple(),
51
                                                  not_after)
52

  
53
            # append it to the list of lines
54
            index_lines.append(line)
55

  
56
        # join all lines with a new line
57
        return "\n".join(index_lines)
58

  
59
    def create_index(self, ca_id) -> str:
60
        """
61
        Queries the certificate repository and looks for all certificates issued by the certificate authority given
62
        by the passed ID. Found certificates are then put into a string representing the CA's database index file.
63

  
64
        :param ca_id: ID of the CA whose issued (=child) certificates should be put into the index file
65
        :return: a str representing the content of a CA index file
66
        """
67
        # get issuing certificate
68
        certificate = self.certificate_repository.read(ca_id)
69
        if certificate is None:
70
            raise CertificateNotFoundException(ca_id)
71

  
72
        # get subject and notAfter of the issuer
73
        subject, _, not_after = self.cryptography_service.parse_cert_pem(certificate.pem_data)
74

  
75
        index_lines = [create_index_file_valid_line(certificate, subject, not_after)]
76
        # iterate over revoked certificates of the CA given by an ID
77
        for certificate in self.certificate_repository.get_all_issued_by(ca_id):
78
            # extract the complete subject information and not_after date field
79
            subject, _, not_after = self.cryptography_service.parse_cert_pem(certificate.pem_data)
80
            if len(certificate.revocation_reason) > 0:
81
                line = create_index_file_revoked_line(certificate,
82
                                                      subject,
83
                                                      # parse revocation date from unix timestamp to struct_time
84
                                                      datetime.utcfromtimestamp(
85
                                                          int(certificate.revocation_date)).timetuple(),
86
                                                      not_after)
87
            else:
88
                line = create_index_file_valid_line(certificate, subject, not_after)
89

  
90
            # append it to the list of lines
91
            index_lines.append(line)
92

  
93
        # join all lines with a new line
94
        return "\n".join(index_lines)
95

  
96
    def generate_crl_response(self, ca_id: int) -> str:
97
        """
98
        Generate a CRL for the given certificate authority
99
        that contains all revoked certificates
100

  
101
        :param ca_id: ID of a CA whose CRL shall be generated
102
        :return: CRL in PEM format
103
        """
104
        # get cert and check if the requested CA exists and if not throw an exception
105
        cert = self.certificate_repository.read(ca_id)
106
        if cert is None:
107
            raise CertificateNotFoundException(ca_id)
108

  
109
        # get key and check if it exists
110
        key = self.key_repository.read(cert.private_key_id)
111
        if key is None:
112
            raise PrivateKeyNotFoundException(ca_id)
113

  
114
        # Create an index file and call cryptography service to generate CRL
115
        with TemporaryFile("crl.index", f"{self.create_revoked_index(ca_id)}\n") as index_path:
116
            crl_content = self.cryptography_service.generate_crl(cert, key, index_path)
117

  
118
        return crl_content
119

  
120
    def generate_ocsp_response(self, ca_id: int, der_ocsp_request: bytes):
121
        # get cert and check if the requested CA exists and if not throw an exception
122
        cert = self.certificate_repository.read(ca_id)
123
        if cert is None:
124
            raise CertificateNotFoundException(ca_id)
125

  
126
        # get key and check if it exists
127
        key = self.key_repository.read(cert.private_key_id)
128
        if key is None:
129
            raise PrivateKeyNotFoundException(ca_id)
130

  
131
        # Create an index file and call cryptography service to generate the OCSP response
132
        with TemporaryFile("crl.index", f"{self.create_index(ca_id)}\n") as index_path:
133
            ocsp_content = self.cryptography_service.generate_ocsp(cert, key, index_path, der_ocsp_request)
134

  
135
        return ocsp_content
src/services/crl_ocsp/ca_index_file_line_generator.py
1
import time
2
from time import struct_time
3

  
4
from src.model.certificate import Certificate
5
from src.model.subject import Subject
6

  
7
SRL_LEN = 8  # number of hex digits in serial number
8
TAB_CHAR = "\t"
9
INDEX_FILE_DATE_ENTRY_FORMAT = "%y%m%d%H%M%SZ"
10

  
11

  
12
def get_index_file_time_entry(date: struct_time):
13
    # convert the time to the format of openssl CA index file
14
    return time.strftime(INDEX_FILE_DATE_ENTRY_FORMAT, date)
15

  
16

  
17
def get_distinguished_name(subject: Subject):
18
    # convert subject class instance to the distinguished name in the openssl CA index file format
19
    return "".join([f"/{key}={value}" if value is not None else "" for key, value in subject.to_dict().items()])
20

  
21

  
22
def create_index_file_revoked_line(revoked_certificate: Certificate, subject: Subject, revocation_date: struct_time,
23
                                   valid_to: struct_time) -> str:
24
    # converts the given certificate as well as the subject and revocation / valid_to dates to a line of openssl CA
25
    # index file format
26
    items = [
27
        # certificate status flag (R stands for revoked)
28
        "R",
29
        # followed by the expiration date field
30
        f"{get_index_file_time_entry(valid_to)}",
31
        # followed by the revocation date field
32
        f"{get_index_file_time_entry(revocation_date)},{revoked_certificate.revocation_reason}",
33
        # followed by the serial number of the certificate in hex format
34
        __get_serial(revoked_certificate.certificate_id),
35
        # certificate filename ("unknown" literal used for unknown file names)
36
        "unknown",
37
        # certificate distinguished name
38
        get_distinguished_name(subject)
39
    ]
40

  
41
    return TAB_CHAR.join(items)
42

  
43
def create_index_file_valid_line(certificate: Certificate, subject: Subject, valid_to: struct_time) -> str:
44
    # converts the given certificate as well as the subject and revocation / valid_to dates to a line of openssl CA
45
    # index file format
46
    items = [
47
        # certificate status flag (R stands for revoked)
48
        "V",
49
        # followed by the expiration date field
50
        f"{get_index_file_time_entry(valid_to)}",
51
        # followed by the revocation date field
52
        f"",
53
        # followed by the serial number of the certificate in hex format
54
        __get_serial(certificate.certificate_id),
55
        # certificate filename ("unknown" literal used for unknown file names)
56
        "unknown",
57
        # certificate distinguished name
58
        get_distinguished_name(subject)
59
    ]
60

  
61
    return TAB_CHAR.join(items)
62

  
63
def __get_serial(cert_id) -> str:
64
    srl = hex(cert_id).replace("0x", "")
65
    srl = "0"*(SRL_LEN - len(srl)) + srl  # generate exactly SRL_LEN digits
66
    return srl.upper()
src/services/crl_ocsp/crl_ocsp_service.py
1
from datetime import datetime
2

  
3
from injector import inject
4

  
5
from src.dao.certificate_repository import CertificateRepository
6
from src.dao.private_key_repository import PrivateKeyRepository
7
from src.exceptions.certificate_not_found_exception import CertificateNotFoundException
8
from src.exceptions.unknown_exception import UnknownException
9
from src.services.crl.ca_index_file_line_generator import create_index_file_revoked_line, create_index_file_valid_line
10
from src.services.cryptography import CryptographyService
11
from src.utils.temporary_file import TemporaryFile
12

  
13

  
14
class CrlOcspService:
15
    @inject
16
    def __init__(self,
17
                 certificate_repository: CertificateRepository,
18
                 key_repository: PrivateKeyRepository,
19
                 cryptography_service: CryptographyService
20
                 ):
21
        self.key_repository = key_repository
22
        self.certificate_repository = certificate_repository
23
        self.cryptography_service = cryptography_service
24

  
25
    def create_revoked_index(self, ca_id) -> str:
26
        """
27
        Queries the certificate repository and looks for all certificates revoked by the certificate authority given
28
        by the passed ID. Found certificates are then put into a string representing the CA's database index file.
29
        
30
        :param ca_id: ID of the CA whose revoked certificates should be put into the index file
31
        :return: a str representing the content of a CA index file
32
        """
33
        # get issuing certificate
34
        certificate = self.certificate_repository.read(ca_id)
35
        if certificate is None:
36
            raise CertificateNotFoundException(ca_id)
37

  
38
        # get subject and notAfter of the issuer
39
        subject, _, not_after = self.cryptography_service.parse_cert_pem(certificate.pem_data)
40

  
41
        index_lines = [create_index_file_valid_line(certificate, subject, not_after)]
42
        # iterate over revoked certificates of the CA given by an ID
43
        for certificate in self.certificate_repository.get_all_revoked_by(ca_id):
44
            # extract the complete subject information and not_after date field
45
            subject, _, not_after = self.cryptography_service.parse_cert_pem(certificate.pem_data)
46

  
47
            line = create_index_file_revoked_line(certificate,
48
                                                  subject,
49
                                                  # parse revocation date from unix timestamp to struct_time
50
                                                  datetime.utcfromtimestamp(int(certificate.revocation_date)).timetuple(),
51
                                                  not_after)
52

  
53
            # append it to the list of lines
54
            index_lines.append(line)
55

  
56
        # join all lines with a new line
57
        return "\n".join(index_lines)
58

  
59
    def create_index(self, ca_id) -> str:
60
        """
61
        Queries the certificate repository and looks for all certificates issued by the certificate authority given
62
        by the passed ID. Found certificates are then put into a string representing the CA's database index file.
63

  
64
        :param ca_id: ID of the CA whose issued (=child) certificates should be put into the index file
65
        :return: a str representing the content of a CA index file
66
        """
67
        # get issuing certificate
68
        certificate = self.certificate_repository.read(ca_id)
69
        if certificate is None:
70
            raise CertificateNotFoundException(ca_id)
71

  
72
        # get subject and notAfter of the issuer
73
        subject, _, not_after = self.cryptography_service.parse_cert_pem(certificate.pem_data)
74

  
75
        index_lines = [create_index_file_valid_line(certificate, subject, not_after)]
76
        # iterate over revoked certificates of the CA given by an ID
77
        for certificate in self.certificate_repository.get_all_issued_by(ca_id):
78
            # extract the complete subject information and not_after date field
79
            subject, _, not_after = self.cryptography_service.parse_cert_pem(certificate.pem_data)
80
            if len(certificate.revocation_reason) > 0:
81
                line = create_index_file_revoked_line(certificate,
82
                                                      subject,
83
                                                      # parse revocation date from unix timestamp to struct_time
84
                                                      datetime.utcfromtimestamp(
85
                                                          int(certificate.revocation_date)).timetuple(),
86
                                                      not_after)
87
            else:
88
                line = create_index_file_valid_line(certificate, subject, not_after)
89

  
90
            # append it to the list of lines
91
            index_lines.append(line)
92

  
93
        # join all lines with a new line
94
        return "\n".join(index_lines)
95

  
96
    def generate_crl_response(self, ca_id: int) -> str:
97
        """
98
        Generate a CRL for the given certificate authority
99
        that contains all revoked certificates
100

  
101
        :param ca_id: ID of a CA whose CRL shall be generated
102
        :return: CRL in PEM format
103
        """
104
        # get cert and check if the requested CA exists and if not throw an exception
105
        cert, key = self.check_ca_and_key(ca_id)
106

  
107
        # Create an index file and call cryptography service to generate CRL
108
        with TemporaryFile("crl.index", f"{self.create_revoked_index(ca_id)}\n") as index_path:
109
            crl_content = self.cryptography_service.generate_crl(cert, key, index_path)
110

  
111
        return crl_content
112

  
113
    def generate_ocsp_response(self, ca_id: int, der_ocsp_request: bytes):
114
        """
115
        Generates an OCSP Response from a DER encoded OCSP Request given a CA id.
116
        :param ca_id: certificate authority ID
117
        :param der_ocsp_request: DER encoded OCSP Request
118
        :return: DER encoded OCSP Response
119
        """
120
        cert, key = self.check_ca_and_key(ca_id)
121

  
122
        # Create an index file and call cryptography service to generate the OCSP response
123
        with TemporaryFile("crl.index", f"{self.create_index(ca_id)}\n") as index_path:
124
            ocsp_content = self.cryptography_service.generate_ocsp(cert, key, index_path, der_ocsp_request)
125

  
126
        return ocsp_content
127

  
128
    def check_ca_and_key(self, ca_id):
129
        """
130
        Checks whether a certificate with a given ID exists, if it does, returns its contents, including the
131
        associated key.
132
        :param ca_id: certificate authority ID
133
        :return: certificate data & associated key
134
        """
135
        # get cert and check if the requested CA exists and if not throw an exception
136
        cert = self.certificate_repository.read(ca_id)
137
        if cert is None:
138
            raise CertificateNotFoundException(ca_id)
139
        # get key and check if it exists
140
        key = self.key_repository.read(cert.private_key_id)
141
        if key is None:
142
            raise UnknownException("Corrupted database, expected PrivateKey not found.")
143
            # TODO log this
144
        return cert, key

Také k dispozici: Unified diff