Projekt

Obecné

Profil

Stáhnout (11.9 KB) Statistiky
| Větev: | Tag: | Revize:
1
from typing import List
2

    
3
from src.constants import ROOT_CA_ID, INTERMEDIATE_CA_ID, CA_ID, CERTIFICATE_ID
4
from src.dao.certificate_repository import CertificateRepository
5
from src.model.certificate import Certificate
6
from src.model.private_key import PrivateKey
7
from src.model.subject import Subject
8
from src.services.cryptography import CryptographyService
9

    
10
import time
11

    
12
DATE_FORMAT = "%d.%m.%Y %H:%M:%S"
13
CA_EXTENSIONS = "basicConstraints=critical,CA:TRUE"
14

    
15

    
16
class CertificateService:
17

    
18
    def __init__(self, cryptography_service: CryptographyService, certificate_repository: CertificateRepository):
19
        self.cryptography_service = cryptography_service
20
        self.certificate_repository = certificate_repository
21

    
22
    # TODO usages present in method parameters but not in class diagram
23
    def create_root_ca(self, key: PrivateKey, subject: Subject, extensions: str = "", config: str = "",
24
                       usages=None):
25
        """
26
        Creates a root CA certificate based on the given parameters.
27
        :param key: Private key to be used when generating the certificate
28
        :param subject: Subject to be used put into the certificate
29
        :param config: String containing the configuration to be used
30
        :param extensions: Name of the section in the configuration representing extensions
31
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
32
        :return: An instance of Certificate class representing the generated root CA cert
33
        """
34
        if usages is None:
35
            usages = {}
36

    
37
        # create a new self signed  certificate
38
        cert_pem = self.cryptography_service.create_sscrt(subject, key.private_key, key_pass=key.password,
39
                                                          extensions=extensions, config=config)
40
        # specify CA usage
41
        usages[CA_ID] = True
42

    
43
        # wrap into Certificate class
44
        certificate = self.__create_wrapper(cert_pem, key.private_key_id, usages, 0,
45
                                            ROOT_CA_ID)
46

    
47
        # store the wrapper into the repository
48
        created_id = self.certificate_repository.create(certificate)
49

    
50
        # assign the generated ID to the inserted certificate
51
        certificate.certificate_id = created_id
52

    
53
        return certificate
54

    
55
    def __create_wrapper(self, cert_pem, private_key_id, usages, parent_id, cert_type):
56
        """
57
        Wraps the given parameters using hte Certificate class. Uses CryptographyService to find out the notBefore and
58
        notAfter fields.
59
        :param cert_pem: PEM of the cert. to be wrapped
60
        :param private_key_id: ID of the private key used to create the given certificate
61
        :param usages: A dictionary containing usages of the generated certificate generated (see constants.py)
62
        :param parent_id: ID of the CA that issued this certificate
63
        :param cert_type: Type of this certificate (see constants.py)
64
        :return: An instance of the Certificate class wrapping the values passed  via method parameters
65
        """
66
        # parse the generated pem for subject and notBefore/notAfter fields
67
        # TODO this could be improved in the future in such way that calling openssl is not required to parse the dates
68
        subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem)
69
        # format the parsed date
70
        not_before_formatted = time.strftime(DATE_FORMAT, not_before)
71
        not_after_formatted = time.strftime(DATE_FORMAT, not_after)
72

    
73
        # create a certificate wrapper
74
        certificate = Certificate(-1, subj.common_name, not_before_formatted, not_after_formatted, cert_pem,
75
                                  private_key_id, cert_type, parent_id, usages)
76

    
77
        return certificate
78

    
79
    # TODO config parameter present in class diagram but not here (unused)
80
    def create_ca(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate, issuer_key: PrivateKey,
81
                  extensions: str = "", days: int = 30, usages=None):
82
        """
83
        Creates an intermediate CA certificate issued by the given parent CA.
84
        :param subject_key: Private key to be used when generating the certificate
85
        :param subject: Subject to be used put into the certificate
86
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
87
        :param issuer_key: PK used to generate the issuer certificate
88
        :param extensions: Extensions to be used when generating the certificate
89
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
90
        :param days: Number of days for which the generated cert. will be considered valid
91
        :return: An instance of Certificate class representing the generated intermediate CA cert
92
        """
93
        if usages is None:
94
            usages = {}
95

    
96
        extensions = extensions + "\n" + CA_EXTENSIONS
97
        # TODO implement AIA URI via extensions
98
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
99
                                                        issuer_key.private_key,
100
                                                        subject_key_pass=subject_key.password,
101
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
102
                                                        days=days)
103

    
104
        # specify CA usage
105
        usages[CA_ID] = True
106

    
107
        # wrap into Certificate class
108
        self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
109
                              issuer_cert.certificate_id, INTERMEDIATE_CA_ID)
110

    
111
        # parse the generated pem for subject and notBefore/notAfter fields
112
        subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem)
113

    
114
        # format the parsed date
115
        not_before_formatted = time.strftime(DATE_FORMAT, not_before)
116
        not_after_formatted = time.strftime(DATE_FORMAT, not_after)
117

    
118
        # specify CA usage
119
        usages[CA_ID] = True
120

    
121
        # create a certificate wrapper
122
        certificate = Certificate(-1, subject.common_name, not_before_formatted, not_after_formatted, cert_pem,
123
                                  subject_key.private_key_id, INTERMEDIATE_CA_ID, issuer_cert.certificate_id, usages)
124

    
125
        # store the wrapper into the repository
126
        created_id = self.certificate_repository.create(certificate)
127

    
128
        # assign the generated ID to the inserted certificate
129
        certificate.certificate_id = created_id
130

    
131
        return certificate
132

    
133
    def create_end_cert(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate,
134
                        issuer_key: PrivateKey,
135
                        extensions: str = "", days: int = 30, usages=None):
136
        """
137
        Creates an end certificate issued by the given parent CA.
138
        :param subject_key: Private key to be used when generating the certificate
139
        :param subject: Subject to be used put into the certificate
140
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
141
        :param issuer_key: PK used to generate the issuer certificate
142
        :param extensions: Extensions to be used when generating the certificate
143
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
144
        :param days: Number of days for which the generated cert. will be considered valid
145
        :return: An instance of Certificate class representing the generated cert
146
        """
147
        if usages is None:
148
            usages = {}
149

    
150
        # generate a new certificate
151
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
152
                                                        issuer_key.private_key,
153
                                                        subject_key_pass=subject_key.password,
154
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
155
                                                        days=days)
156

    
157
        # wrap the generated certificate using Certificate class
158
        certificate = self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
159
                                            issuer_cert.certificate_id, CERTIFICATE_ID)
160

    
161
        created_id = self.certificate_repository.create(certificate)
162

    
163
        certificate.certificate_id = created_id
164

    
165
        return certificate
166

    
167
    def get_certificate(self, unique_id: int) -> Certificate:
168
        """
169
        Tries to fetch a certificate from the certificate repository using a given id.
170
        :param unique_id: ID of the certificate to be fetched
171
        :return: Instance of the Certificate class containing a certificate with the given id or `None` if such
172
        certificate is not found
173
        """
174
        return self.certificate_repository.read(unique_id)
175

    
176
    def get_certificates(self, cert_type=None) -> List[Certificate]:
177
        """
178
        Tries to fetch a list of all certificates from the certificate repository. Using the `cert_type` parameter only
179
        certificates of the given type can be returned.
180
        :param cert_type: Type of certificates to be returned
181
        :return: List of instances of the Certificate class representing all certificates present in the certificate
182
        repository. An empty list is returned when no certificates are found.
183
        """
184
        return self.certificate_repository.read_all(cert_type)
185

    
186
    def get_chain_of_trust(self, from_id: int, to_id: int = -1, exclude_root=True) -> List[Certificate]:
187
        """
188
        Traverses the certificate hierarchy tree upwards till a certificate with the `to_id` ID is found or till a
189
        root CA certificate is found. Root certificates are excluded from the chain by default.
190
        :param from_id: ID of the first certificate to be included in the chain of trust
191
        :param to_id: ID of the last certificate to be included in the chain of trust
192
        :param exclude_root: a flag indicating whether root CA certificate should be excluded
193
        :return: a list of certificates representing the chain of trust starting with the certificate given by `from_id`
194
        ID
195
        """
196
        # read the first certificate of the chain
197
        start_cert = self.certificate_repository.read(from_id)
198

    
199
        # if no cert is found or the current cert is root CA and root CAs should be excluded, then return an empty list
200
        if start_cert is None or (start_cert.type_id == ROOT_CA_ID and exclude_root):
201
            return []
202

    
203
        current_cert = start_cert
204
        chain_of_trust = [current_cert]
205

    
206
        # TODO could possibly be simplified
207
        if start_cert.type_id == ROOT_CA_ID:
208
            # the first cert found is a root ca
209
            return chain_of_trust
210

    
211
        while True:
212
            parent_cert = self.certificate_repository.read(current_cert.parent_id)
213

    
214
            # check whether parent certificate exists
215
            if parent_cert is None:
216
                break
217

    
218
            # check whether the found certificate is a root certificate
219
            if parent_cert.type_id == ROOT_CA_ID:
220
                if not exclude_root:
221
                    # append the found root cert only if root certificates should not be excluded from the CoT
222
                    chain_of_trust.append(parent_cert)
223
                break
224

    
225
            # append the certificate
226
            chain_of_trust.append(parent_cert)
227

    
228
            # stop iterating over certificates if the id of the found certificate matches `to_id` method parameter
229
            if parent_cert.certificate_id == to_id:
230
                break
231

    
232
            current_cert = parent_cert
233

    
234
        return chain_of_trust
235

    
236
    def delete_certificate(self, unique_id) -> bool:
237
        """
238
        Deletes a certificate
239

    
240
        :param unique_id: ID of specific certificate
241

    
242
        :return: `True` when the deletion was successful. `False` in other case
243
        """
244
        # TODO delete children?
245
        return self.certificate_repository.delete(unique_id)
(2-2/4)