Projekt

Obecné

Profil

Stáhnout (8.33 KB) Statistiky
| Větev: | Tag: | Revize:
1
from typing import List
2

    
3
from src.constants import ROOT_CA_ID, INTERMEDIATE_CA_ID, CA_ID, CERTIFICATE_ID
4
from src.dao.certificate_repository import CertificateRepository
5
from src.model.certificate import Certificate
6
from src.model.private_key import PrivateKey
7
from src.model.subject import Subject
8
from src.services.cryptography import CryptographyService
9

    
10
import time
11

    
12
DATE_FORMAT = "%d.%m.%Y %H:%M:%S"
13
CA_EXTENSIONS = "basicConstraints=critical,CA:TRUE"
14

    
15

    
16
class CertificateService:
17

    
18
    def __init__(self, cryptography_service: CryptographyService, certificate_repository: CertificateRepository):
19
        self.cryptography_service = cryptography_service
20
        self.certificate_repository = certificate_repository
21

    
22
    # TODO usages present in method parameters but not in class diagram
23
    def create_root_ca(self, key: PrivateKey, subject: Subject, extensions: str = "", config: str = "",
24
                       usages=None):
25
        if usages is None:
26
            usages = {}
27

    
28
        # create a new self signed  certificate
29
        cert_pem = self.cryptography_service.create_sscrt(subject, key.private_key, key_pass=key.password,
30
                                                          extensions=extensions, config=config)
31
        # specify CA usage
32
        usages[CA_ID] = True
33

    
34
        # wrap into Certificate class
35
        certificate = self.__create_wrapper(cert_pem, key.private_key_id, subject.common_name, usages, 0,
36
                                            ROOT_CA_ID)
37

    
38
        # store the wrapper into the repository
39
        created_id = self.certificate_repository.create(certificate)
40

    
41
        # assign the generated ID to the inserted certificate
42
        certificate.certificate_id = created_id
43

    
44
        return certificate
45

    
46
    def __create_wrapper(self, cert_pem, private_key_id, common_name, usages, parent_id, cert_type):
47
        # parse the generated pem for subject and notBefore/notAfter fields
48
        subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem)
49
        # format the parsed date
50
        not_before_formatted = time.strftime(DATE_FORMAT, not_before)
51
        not_after_formatted = time.strftime(DATE_FORMAT, not_after)
52

    
53
        # create a certificate wrapper
54
        certificate = Certificate(-1, common_name, not_before_formatted, not_after_formatted, cert_pem,
55
                                  private_key_id, cert_type, parent_id, usages)
56

    
57
        return certificate
58

    
59
    # TODO config parameter present in class diagram but not here (unused)
60
    def create_ca(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate, issuer_key: PrivateKey,
61
                  extensions: str = "", days: int = 30, usages=None):
62
        if usages is None:
63
            usages = {}
64

    
65
        extensions = extensions + "\n" + CA_EXTENSIONS
66
        # TODO implement AIA URI via extensions
67
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
68
                                                        issuer_key.private_key,
69
                                                        subject_key_pass=subject_key.password,
70
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
71
                                                        days=days)
72

    
73
        # specify CA usage
74
        usages[CA_ID] = True
75

    
76
        # wrap into Certificate class
77
        self.__create_wrapper(cert_pem, subject_key.private_key_id, subject.common_name, usages,
78
                              issuer_cert.certificate_id, INTERMEDIATE_CA_ID)
79

    
80
        # parse the generated pem for subject and notBefore/notAfter fields
81
        subj, not_before, not_after = self.cryptography_service.parse_cert_pem(cert_pem)
82

    
83
        # format the parsed date
84
        not_before_formatted = time.strftime(DATE_FORMAT, not_before)
85
        not_after_formatted = time.strftime(DATE_FORMAT, not_after)
86

    
87
        # specify CA usage
88
        usages[CA_ID] = True
89

    
90
        # create a certificate wrapper
91
        certificate = Certificate(-1, subject.common_name, not_before_formatted, not_after_formatted, cert_pem,
92
                                  subject_key.private_key_id, INTERMEDIATE_CA_ID, issuer_cert.certificate_id, usages)
93

    
94
        # store the wrapper into the repository
95
        created_id = self.certificate_repository.create(certificate)
96

    
97
        # assign the generated ID to the inserted certificate
98
        certificate.certificate_id = created_id
99

    
100
        return certificate
101

    
102
    def create_end_cert(self, subject_key: PrivateKey, subject: Subject, issuer_cert: Certificate,
103
                        issuer_key: PrivateKey,
104
                        extensions: str = "", days: int = 30, usages=None):
105
        if usages is None:
106
            usages = {}
107

    
108
        # generate a new certificate
109
        cert_pem = self.cryptography_service.create_crt(subject, subject_key.private_key, issuer_cert.pem_data,
110
                                                        issuer_key.private_key,
111
                                                        subject_key_pass=subject_key.password,
112
                                                        issuer_key_pass=issuer_key.password, extensions=extensions,
113
                                                        days=days)
114

    
115
        # wrap the generated certificate using Certificate class
116
        certificate = self.__create_wrapper(cert_pem, subject_key.private_key_id, subject.common_name, usages,
117
                                            issuer_cert.certificate_id, CERTIFICATE_ID)
118

    
119
        created_id = self.certificate_repository.create(certificate)
120

    
121
        certificate.certificate_id = created_id
122

    
123
        return certificate
124

    
125
    def get_certificate(self, unique_id: int) -> Certificate:
126
        return self.certificate_repository.read(unique_id)
127

    
128
    def get_certificates(self, cert_type=None) -> List[Certificate]:
129
        return self.certificate_repository.read_all(cert_type)
130

    
131
    def get_chain_of_trust(self, from_id: int, to_id: int = -1, exclude_root=True) -> List[Certificate]:
132
        """
133
        Traverses the certificate hierarchy tree upwards till a certificate with the `to_id` ID is found or till a
134
        root CA certificate is found. Root certificates are excluded from the chain by default.
135
        :param from_id: ID of the first certificate to be included in the chain of trust
136
        :param to_id: ID of the last certificate to be included in the chain of trust
137
        :param exclude_root: a flag indicating whether root CA certificate should be excluded
138
        :return: a list of certificates representing the chain of trust starting with the certificate given by `from_id`
139
        ID
140
        """
141
        # read the first certificate of the chain
142
        start_cert = self.certificate_repository.read(from_id)
143

    
144
        # if no cert is found or the current cert is root CA and root CAs should be excluded, then return an empty list
145
        if start_cert is None or (start_cert.type_id == ROOT_CA_ID and exclude_root):
146
            return []
147

    
148
        current_cert = start_cert
149
        chain_of_trust = [current_cert]
150

    
151
        # TODO could possibly be simplified
152
        if start_cert.type_id == ROOT_CA_ID:
153
            # the first cert found is a root ca
154
            return chain_of_trust
155

    
156
        while True:
157
            parent_cert = self.certificate_repository.read(current_cert.parent_id)
158

    
159
            # check whether parent certificate exists
160
            if parent_cert is None:
161
                break
162

    
163
            # check whether the found certificate is a root certificate
164
            if parent_cert.type_id == ROOT_CA_ID:
165
                if not exclude_root:
166
                    # append the found root cert only if root certificates should not be excluded from the CoT
167
                    chain_of_trust.append(parent_cert)
168
                break
169

    
170
            # append the certificate
171
            chain_of_trust.append(parent_cert)
172

    
173
            # stop iterating over certificates if the id of the found certificate matches `to_id` method parameter
174
            if parent_cert.certificate_id == to_id:
175
                break
176

    
177
            current_cert = parent_cert
178

    
179
        return chain_of_trust
180

    
181
    def delete_certificate(self, unique_id):
182
        """
183
        Deletes a certificate
184

    
185
        :param unique_id: ID of specific certificate
186

    
187
        :return: the result of whether the deletion was successful
188
        """
189
        # TODO delete children?
190
        return self.certificate_repository.delete(unique_id)
(2-2/4)