Projekt

Obecné

Profil

« Předchozí | Další » 

Revize 1fa243ca

Přidáno uživatelem Jan Pašek před asi 4 roky(ů)

Re #8569 Added application initialization and running dependency injection

Zobrazit rozdíly:

src/controllers/certificates_controller.py
4 4
from json import JSONDecodeError
5 5

  
6 6
from flask import request
7
from src.dao.private_key_repository import PrivateKeyRepository
7
from injector import inject
8

  
9
from src.constants import CA_ID, \
10
    SSL_ID, SIGNATURE_ID, AUTHENTICATION_ID, \
11
    DATETIME_FORMAT, ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID  # TODO DATABASE_FILE - not the Controller's
8 12
from src.model.subject import Subject
9 13
from src.services.certificate_service import CertificateService
10
from src.dao.certificate_repository import CertificateRepository        # TODO not the Controller's responsibility.
11
from src.services.cryptography import CryptographyService               # TODO not the Controller's responsibility.
12
from sqlite3 import Connection                                          # TODO not the Controller's responsibility.
13
from src.constants import CA_ID, \
14
    DATABASE_FILE_LOCATION, SSL_ID, SIGNATURE_ID, AUTHENTICATION_ID, \
15
    DATETIME_FORMAT, ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID     # TODO DATABASE_FILE - not the Controller's
16
                                                                        #  responsibility.
14
#  responsibility.
17 15
from src.services.key_service import KeyService
18 16

  
19 17
TREE_NODE_TYPE_COUNT = 3
......
44 42
C_INTERNAL_SERVER_ERROR = 500
45 43
C_SUCCESS = 200
46 44

  
47
__ = CryptographyService()                                              # TODO not the Controller's responsibility.
48
CERTIFICATE_SERVICE = CertificateService(__, CertificateRepository(None, None))
49
KEY_SERVICE = KeyService(__, PrivateKeyRepository(None, None))          # TODO not the Controller's responsibility.
50

  
51 45

  
52 46
class CertController:
53 47
    KEY_MAP = {'CA': CA_ID, 'SSL': SSL_ID, 'digitalSignature': SIGNATURE_ID, 'authentication': AUTHENTICATION_ID}
54 48
    INVERSE_KEY_MAP = {k: v for v, k in KEY_MAP.items()}
55 49

  
56
    @staticmethod
57
    def setup():
58
        """
59
        SQLite3 thread issue hack.
60
        :return:
61
        """
62
        _ = Connection("db/database_sqlite.db")
63
        CERTIFICATE_SERVICE.certificate_repository.connection = _
64
        CERTIFICATE_SERVICE.certificate_repository.cursor = _.cursor()
65
        KEY_SERVICE.private_key_repository.connection = _
66
        KEY_SERVICE.private_key_repository.cursor = _.cursor()
67

  
68
    @staticmethod
69
    def create_certificate():
50
    @inject
51
    def __init__(self, certificate_service: CertificateService, key_service: KeyService):
52
        self.certificate_service = certificate_service
53
        self.key_service = key_service
54

  
55
    def create_certificate(self):
70 56
        """create new certificate
71 57

  
72 58
        Create a new certificate based on given information
......
76 62

  
77 63
        :rtype: CreatedResponse
78 64
        """
79
        CertController.setup()                                                      # TODO remove after issue fixed
80

  
81 65
        required_keys = {SUBJECT, USAGE, VALIDITY_DAYS}                             # required fields of the POST req
82 66

  
83 67
        if request.is_json:                                                         # accept JSON only
......
103 87
                    return E_WRONG_PARAMETERS, C_BAD_REQUEST                        # and throw if it is not
104 88
                usages_dict[CertController.KEY_MAP[k]] = v                          # otherwise translate key and set
105 89

  
106
            key = KEY_SERVICE.create_new_key()                                      # TODO pass key
90
            key = self.key_service.create_new_key()                                      # TODO pass key
107 91

  
108 92
            if CA not in body or body[CA] is None:                                  # if issuer omitted (legal) or none
109
                cert = CERTIFICATE_SERVICE.create_root_ca(                          # create a root CA
93
                cert = self.certificate_service.create_root_ca(                          # create a root CA
110 94
                    key,
111 95
                    subject,
112 96
                    usages=usages_dict,                                             # TODO ignoring usages -> discussion
113 97
                    days=body[VALIDITY_DAYS]
114 98
                )
115 99
            else:
116
                issuer = CERTIFICATE_SERVICE.get_certificate(body[CA])              # get base issuer info
100
                issuer = self.certificate_service.get_certificate(body[CA])              # get base issuer info
117 101

  
118 102
                if issuer is None:                                                  # if such issuer does not exist
119
                    KEY_SERVICE.delete_key(key.private_key_id)                      # free
103
                    self.key_service.delete_key(key.private_key_id)                      # free
120 104
                    return E_NO_ISSUER_FOUND, C_BAD_REQUEST                         # and throw
121 105

  
122
                issuer_key = KEY_SERVICE.get_key(issuer.private_key_id)             # get issuer's key, which must exist
106
                issuer_key = self.key_service.get_key(issuer.private_key_id)             # get issuer's key, which must exist
123 107

  
124 108
                if issuer_key is None:                                              # if it does not
125
                    KEY_SERVICE.delete_key(key.private_key_id)                      # free
109
                    self.key_service.delete_key(key.private_key_id)                      # free
126 110
                    return E_CORRUPTED_DATABASE, C_INTERNAL_SERVER_ERROR            # and throw
127 111

  
128
                f = CERTIFICATE_SERVICE.create_ca if CA_ID in usages_dict and usages_dict[CA_ID] else \
129
                    CERTIFICATE_SERVICE.create_end_cert
112
                f = self.certificate_service.create_ca if CA_ID in usages_dict and usages_dict[CA_ID] else \
113
                    self.certificate_service.create_end_cert
130 114

  
131 115
                # noinspection PyTypeChecker
132 116
                cert = f(                                                           # create inter CA or end cert
......
142 126
                return {"success": True,
143 127
                        "data": cert.certificate_id}, C_CREATED_SUCCESSFULLY
144 128
            else:                                                                   # if this fails, then
145
                KEY_SERVICE.delete_key(key.private_key_id)                          # free
129
                self.key_service.delete_key(key.private_key_id)                          # free
146 130
                return {"success": False,                                           # and wonder what the cause is,
147 131
                        "data": "Internal error: The certificate could not have been created."}, C_BAD_REQUEST
148 132
                                                                                    # as obj/None carries only one bit
......
150 134
        else:
151 135
            return E_NOT_JSON_FORMAT, C_BAD_REQUEST                                 # throw in case of non-JSON format
152 136

  
153
    @staticmethod
154
    def get_certificate_by_id(id):
137
    def get_certificate_by_id(self, id):
155 138
        """get certificate by ID
156 139

  
157 140
        Get certificate in PEM format by ID
......
161 144

  
162 145
        :rtype: PemResponse
163 146
        """
164
        CertController.setup()                                                      # TODO remove after issue fixed
165

  
166 147
        try:
167 148
            v = int(id)
168 149
        except ValueError:
169 150
            return E_WRONG_PARAMETERS, C_BAD_REQUEST
170 151

  
171
        cert = CERTIFICATE_SERVICE.get_certificate(v)
152
        cert = self.certificate_service.get_certificate(v)
172 153

  
173 154
        if cert is None:
174 155
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA                               
175 156
        else:
176 157
            return {"success": True, "data": cert.pem_data}, C_SUCCESS
177 158

  
178
    @staticmethod
179
    def get_certificate_details_by_id(id):
159
    def get_certificate_details_by_id(self, id):
180 160
        """get certificate's details by ID
181 161

  
182 162
        Get certificate details by ID
......
186 166

  
187 167
        :rtype: CertificateResponse
188 168
        """
189
        CertController.setup()                                                      # TODO remove after issue fixed
190

  
191 169
        try:
192 170
            v = int(id)
193 171
        except ValueError:
194 172
            return E_WRONG_PARAMETERS, C_BAD_REQUEST
195 173

  
196
        cert = CERTIFICATE_SERVICE.get_certificate(v)
174
        cert = self.certificate_service.get_certificate(v)
197 175

  
198 176
        if cert is None:
199 177
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA                               
200 178
        else:
201
            data = CertController.cert_to_dict_full(cert)
179
            data = self.cert_to_dict_full(cert)
202 180
            if data is None:
203 181
                return E_CORRUPTED_DATABASE, C_INTERNAL_SERVER_ERROR
204 182
            return {"success": True, "data": data}, C_SUCCESS
205 183

  
206
    @staticmethod
207
    def get_certificate_list():
184
    def get_certificate_list(self):
208 185
        """get list of certificates
209 186

  
210 187
        Lists certificates based on provided filtering options
......
214 191

  
215 192
        :rtype: CertificateListResponse
216 193
        """
217
        CertController.setup()                                                      # TODO remove after issue fixed
218

  
219 194
        targets = {ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID}                  # all targets
220 195

  
221 196
        # the filtering parameter can be read as URL argument or as a request body
......
248 223

  
249 224
                                                                                    # if filtering did not change the
250 225
                                                                                    # targets,
251
            certs = CERTIFICATE_SERVICE.get_certificates()                          # fetch everything
226
            certs = self.certificate_service.get_certificates()                          # fetch everything
252 227
        else:                                                                       # otherwise fetch targets only
253
            certs = list(chain(*(CERTIFICATE_SERVICE.get_certificates(target) for target in targets)))
228
            certs = list(chain(*(self.certificate_service.get_certificates(target) for target in targets)))
254 229

  
255 230
        if certs is None:
256 231
            return E_GENERAL_ERROR, C_INTERNAL_SERVER_ERROR
......
259 234
        else:
260 235
            ret = []
261 236
            for c in certs:
262
                data = CertController.cert_to_dict_partial(c)
237
                data = self.cert_to_dict_partial(c)
263 238
                if data is None:
264 239
                    return E_CORRUPTED_DATABASE, C_INTERNAL_SERVER_ERROR
265 240
                ret.append(
......
267 242
                )
268 243
            return {"success": True, "data": ret}, C_SUCCESS
269 244

  
270
    @staticmethod
271
    def get_certificate_root_by_id(id):
245
    def get_certificate_root_by_id(self, id):
272 246
        """get certificate's root of trust chain by ID
273 247

  
274 248
        Get certificate's root of trust chain in PEM format by ID
......
278 252

  
279 253
        :rtype: PemResponse
280 254
        """
281
        CertController.setup()                                                      # TODO remove after issue fixed
282

  
283 255
        try:
284 256
            v = int(id)
285 257
        except ValueError:
286 258
            return E_WRONG_PARAMETERS, C_BAD_REQUEST
287 259

  
288
        cert = CERTIFICATE_SERVICE.get_certificate(v)
260
        cert = self.certificate_service.get_certificate(v)
289 261

  
290 262
        if cert is None:
291 263
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA
292 264

  
293
        trust_chain = CERTIFICATE_SERVICE.get_chain_of_trust(cert.parent_id, exclude_root=False)
294
        if trust_chain is None or len(trust_chain) is 0:
265
        trust_chain = self.certificate_service.get_chain_of_trust(cert.parent_id, exclude_root=False)
266
        if trust_chain is None or len(trust_chain) == 0:
295 267
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA
296 268

  
297 269
        return {"success": True, "data": trust_chain[-1].pem_data}, C_SUCCESS
298 270

  
299
    @staticmethod
300
    def get_certificate_trust_chain_by_id(id):
271
    def get_certificate_trust_chain_by_id(self, id):
301 272
        """get certificate's trust chain by ID
302 273

  
303 274
        Get certificate trust chain in PEM format by ID
......
307 278

  
308 279
        :rtype: PemResponse
309 280
        """
310
        CertController.setup()                                                      # TODO remove after issue fixed
311 281

  
312 282
        try:
313 283
            v = int(id)
314 284
        except ValueError:
315 285
            return E_WRONG_PARAMETERS, C_BAD_REQUEST
316 286

  
317
        cert = CERTIFICATE_SERVICE.get_certificate(v)
287
        cert = self.certificate_service.get_certificate(v)
318 288

  
319 289
        if cert is None:
320 290
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA                               
......
322 292
        if cert.parent_id is None:
323 293
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA
324 294

  
325
        trust_chain = CERTIFICATE_SERVICE.get_chain_of_trust(cert.parent_id)
295
        trust_chain = self.certificate_service.get_chain_of_trust(cert.parent_id)
326 296

  
327 297
        ret = []
328 298
        for intermediate in trust_chain:
......
330 300

  
331 301
        return {"success": True, "data": "".join(ret)}, C_SUCCESS
332 302

  
333
    @staticmethod
334
    def cert_to_dict_partial(c):
303
    def cert_to_dict_partial(self, c):
335 304
        """
336 305
        Dictionarizes a certificate directly fetched from the database. Contains partial information.
337 306
        :param c: target cert
338 307
        :return: certificate dict (compliant with some parts of the REST API)
339 308
        """
340
        c_issuer = CERTIFICATE_SERVICE.get_certificate(c.parent_id)
309
        c_issuer = self.certificate_service.get_certificate(c.parent_id)
341 310
        if c_issuer is None:
342 311
            return None
343 312

  
......
353 322
            }
354 323
        }
355 324

  
356
    @staticmethod
357
    def cert_to_dict_full(c):
325
    def cert_to_dict_full(self, c):
358 326
        """
359 327
        Dictionarizes a certificate directly fetched from the database, but adds subject info.
360 328
        Contains full information.
361 329
        :param c: target cert
362 330
        :return: certificate dict (compliant with some parts of the REST API)
363 331
        """
364
        subj = CERTIFICATE_SERVICE.get_subject_from_certificate(c)
365
        c_issuer = CERTIFICATE_SERVICE.get_certificate(c.parent_id)
332
        subj = self.certificate_service.get_subject_from_certificate(c)
333
        c_issuer = self.certificate_service.get_certificate(c.parent_id)
366 334
        if c_issuer is None:
367 335
            return None
368 336

  

Také k dispozici: Unified diff