Projekt

Obecné

Profil

Stáhnout (12.5 KB) Statistiky
| Větev: | Tag: | Revize:
1
from typing import List
2

    
3
from injector import inject
4

    
5
from src.constants import ROOT_CA_ID, INTERMEDIATE_CA_ID, CA_ID, CERTIFICATE_ID
6
from src.dao.certificate_repository import CertificateRepository
7
from src.model.certificate import Certificate
8
from src.model.private_key import PrivateKey
9
from src.model.subject import Subject
10
from src.services.cryptography import CryptographyService
11

    
12
import time
13

    
14
DATE_FORMAT = "%d.%m.%Y %H:%M:%S"
15
CA_EXTENSIONS = "basicConstraints=critical,CA:TRUE"
16

    
17

    
18
class CertificateService:
19

    
20
    @inject
21
    def __init__(self, cryptography_service: CryptographyService, certificate_repository: CertificateRepository):
22
        self.cryptography_service = cryptography_service
23
        self.certificate_repository = certificate_repository
24

    
25
    # TODO usages present in method parameters but not in class diagram
26
    def create_root_ca(self, key: PrivateKey, subject: Subject, extensions: str = "", config: str = "",
27
                       usages=None, days=30):
28
        """
29
        Creates a root CA certificate based on the given parameters.
30
        :param key: Private key to be used when generating the certificate
31
        :param subject: Subject to be used put into the certificate
32
        :param config: String containing the configuration to be used
33
        :param extensions: Name of the section in the configuration representing extensions
34
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
35
        :param days: Number of days for which the generated cert. will be considered valid
36
        :return: An instance of Certificate class representing the generated root CA cert
37
        """
38
        if usages is None:
39
            usages = {}
40

    
41
        # create a new self signed  certificate
42
        cert_pem = self.cryptography_service.create_sscrt(subject, key.private_key, key_pass=key.password,
43
                                                          extensions=extensions, config=config, days=days)
44
        # specify CA usage
45
        usages[CA_ID] = True
46

    
47
        # wrap into Certificate class
48
        certificate = self.__create_wrapper(cert_pem, key.private_key_id, usages, 0,
49
                                            ROOT_CA_ID)
50

    
51
        # store the wrapper into the repository
52
        created_id = self.certificate_repository.create(certificate)
53

    
54
        # assign the generated ID to the inserted certificate
55
        certificate.certificate_id = created_id
56

    
57
        return certificate
58

    
59
    def __create_wrapper(self, cert_pem, private_key_id, usages, parent_id, cert_type):
60
        """
61
        Wraps the given parameters using the Certificate class. Uses CryptographyService to find out the notBefore and
62
        notAfter fields.
63
        :param cert_pem: PEM of the cert. to be wrapped
64
        :param private_key_id: ID of the private key used to create the given certificate
65
        :param usages: A dictionary containing usages of the generated certificate generated (see constants.py)
66
        :param parent_id: ID of the CA that issued this certificate
67
        :param cert_type: Type of this certificate (see constants.py)
68
        :return: An instance of the Certificate class wrapping the values passed  via method parameters
69
        """
70
        # parse the generated pem for subject and notBefore/notAfter fields
71
        # TODO this could be improved in the future in such way that calling openssl is not required to parse the dates
72
        subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem)
73
        # format the parsed date
74
        not_before_formatted = time.strftime(DATE_FORMAT, not_before)
75
        not_after_formatted = time.strftime(DATE_FORMAT, not_after)
76

    
77
        # create a certificate wrapper
78
        certificate = Certificate(-1, subj.common_name, not_before_formatted, not_after_formatted, cert_pem,
79
                                  private_key_id, cert_type, parent_id, usages)
80

    
81
        return certificate
82

    
83
    # TODO config parameter present in class diagram but not here (unused)
84
    def create_ca(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate, issuer_key: PrivateKey,
85
                  extensions: str = "", days: int = 30, usages=None):
86
        """
87
        Creates an intermediate CA certificate issued by the given parent CA.
88
        :param subject_key: Private key to be used when generating the certificate
89
        :param subject: Subject to be used put into the certificate
90
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
91
        :param issuer_key: PK used to generate the issuer certificate
92
        :param extensions: Extensions to be used when generating the certificate
93
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
94
        :param days: Number of days for which the generated cert. will be considered valid
95
        :return: An instance of Certificate class representing the generated intermediate CA cert
96
        """
97
        if usages is None:
98
            usages = {}
99

    
100
        extensions = extensions + "\n" + CA_EXTENSIONS
101
        # TODO implement AIA URI via extensions
102
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
103
                                                        issuer_key.private_key,
104
                                                        subject_key_pass=subject_key.password,
105
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
106
                                                        days=days)
107

    
108
        # specify CA usage
109
        usages[CA_ID] = True
110

    
111
        # wrap into Certificate class
112
        self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
113
                              issuer_cert.certificate_id, INTERMEDIATE_CA_ID)
114

    
115
        # parse the generated pem for subject and notBefore/notAfter fields
116
        subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem)
117

    
118
        # format the parsed date
119
        not_before_formatted = time.strftime(DATE_FORMAT, not_before)
120
        not_after_formatted = time.strftime(DATE_FORMAT, not_after)
121

    
122
        # specify CA usage
123
        usages[CA_ID] = True
124

    
125
        # create a certificate wrapper
126
        certificate = Certificate(-1, subject.common_name, not_before_formatted, not_after_formatted, cert_pem,
127
                                  subject_key.private_key_id, INTERMEDIATE_CA_ID, issuer_cert.certificate_id, usages)
128

    
129
        # store the wrapper into the repository
130
        created_id = self.certificate_repository.create(certificate)
131

    
132
        # assign the generated ID to the inserted certificate
133
        certificate.certificate_id = created_id
134

    
135
        return certificate
136

    
137
    def create_end_cert(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate,
138
                        issuer_key: PrivateKey,
139
                        extensions: str = "", days: int = 30, usages=None):
140
        """
141
        Creates an end certificate issued by the given parent CA.
142
        :param subject_key: Private key to be used when generating the certificate
143
        :param subject: Subject to be used put into the certificate
144
        :param issuer_cert: Issuer certificate that will sign the CSR required to create an intermediate CA
145
        :param issuer_key: PK used to generate the issuer certificate
146
        :param extensions: Extensions to be used when generating the certificate
147
        :param usages: A dictionary containing usages of the certificate to be generated (see constants.py)
148
        :param days: Number of days for which the generated cert. will be considered valid
149
        :return: An instance of Certificate class representing the generated cert
150
        """
151
        if usages is None:
152
            usages = {}
153

    
154
        # generate a new certificate
155
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
156
                                                        issuer_key.private_key,
157
                                                        subject_key_pass=subject_key.password,
158
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
159
                                                        days=days)
160

    
161
        # wrap the generated certificate using Certificate class
162
        certificate = self.__create_wrapper(cert_pem, subject_key.private_key_id, usages,
163
                                            issuer_cert.certificate_id, CERTIFICATE_ID)
164

    
165
        created_id = self.certificate_repository.create(certificate)
166

    
167
        certificate.certificate_id = created_id
168

    
169
        return certificate
170

    
171
    def get_certificate(self, unique_id: int) -> Certificate:
172
        """
173
        Tries to fetch a certificate from the certificate repository using a given id.
174
        :param unique_id: ID of the certificate to be fetched
175
        :return: Instance of the Certificate class containing a certificate with the given id or `None` if such
176
        certificate is not found
177
        """
178
        return self.certificate_repository.read(unique_id)
179

    
180
    def get_certificates(self, cert_type=None) -> List[Certificate]:
181
        """
182
        Tries to fetch a list of all certificates from the certificate repository. Using the `cert_type` parameter only
183
        certificates of the given type can be returned.
184
        :param cert_type: Type of certificates to be returned
185
        :return: List of instances of the Certificate class representing all certificates present in the certificate
186
        repository. An empty list is returned when no certificates are found.
187
        """
188
        return self.certificate_repository.read_all(cert_type)
189

    
190
    def get_chain_of_trust(self, from_id: int, to_id: int = -1, exclude_root=True) -> List[Certificate]:
191
        """
192
        Traverses the certificate hierarchy tree upwards till a certificate with the `to_id` ID is found or till a
193
        root CA certificate is found. Root certificates are excluded from the chain by default.
194
        :param from_id: ID of the first certificate to be included in the chain of trust
195
        :param to_id: ID of the last certificate to be included in the chain of trust
196
        :param exclude_root: a flag indicating whether root CA certificate should be excluded
197
        :return: a list of certificates representing the chain of trust starting with the certificate given by `from_id`
198
        ID
199
        """
200
        # read the first certificate of the chain
201
        start_cert = self.certificate_repository.read(from_id)
202

    
203
        # if no cert is found or the current cert is root CA and root CAs should be excluded, then return an empty list
204
        if start_cert is None or (start_cert.type_id == ROOT_CA_ID and exclude_root):
205
            return []
206

    
207
        current_cert = start_cert
208
        chain_of_trust = [current_cert]
209

    
210
        # TODO could possibly be simplified
211
        if start_cert.type_id == ROOT_CA_ID:
212
            # the first cert found is a root ca
213
            return chain_of_trust
214

    
215
        while True:
216
            parent_cert = self.certificate_repository.read(current_cert.parent_id)
217

    
218
            # check whether parent certificate exists
219
            if parent_cert is None:
220
                break
221

    
222
            # check whether the found certificate is a root certificate
223
            if parent_cert.type_id == ROOT_CA_ID:
224
                if not exclude_root:
225
                    # append the found root cert only if root certificates should not be excluded from the CoT
226
                    chain_of_trust.append(parent_cert)
227
                break
228

    
229
            # append the certificate
230
            chain_of_trust.append(parent_cert)
231

    
232
            # stop iterating over certificates if the id of the found certificate matches `to_id` method parameter
233
            if parent_cert.certificate_id == to_id:
234
                break
235

    
236
            current_cert = parent_cert
237

    
238
        return chain_of_trust
239

    
240
    def delete_certificate(self, unique_id) -> bool:
241
        """
242
        Deletes a certificate
243

    
244
        :param unique_id: ID of specific certificate
245

    
246
        :return: `True` when the deletion was successful. `False` in other case
247
        """
248
        # TODO delete children?
249
        return self.certificate_repository.delete(unique_id)
250

    
251
    def get_subject_from_certificate(self, certificate: Certificate) -> Subject:
252
        """
253
        Get Subject distinguished name from a Certificate
254
        :param certificate: certificate instance whose Subject shall be parsed
255
        :return: instance of Subject class containing resulting distinguished name
256
        """
257
        (subject, _, _) = self.cryptography_service.parse_cert_pem(certificate.pem_data)
258
        return subject
(2-2/4)