Projekt

Obecné

Profil

Stáhnout (16.3 KB) Statistiky
| Větev: | Tag: | Revize:
1
import json
2
from datetime import datetime
3
from itertools import chain
4
from flask import request
5
from src.dao.private_key_repository import PrivateKeyRepository
6
from src.model.subject import Subject
7
from src.services.certificate_service import CertificateService
8
from src.dao.certificate_repository import CertificateRepository        # TODO not the Controller's responsibility.
9
from src.services.cryptography import CryptographyService               # TODO not the Controller's responsibility.
10
from sqlite3 import Connection                                          # TODO not the Controller's responsibility.
11
from src.constants import CA_ID, \
12
    DATABASE_FILE_LOCATION, SSL_ID, SIGNATURE_ID, AUTHENTICATION_ID, \
13
    DATETIME_FORMAT, ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID     # TODO DATABASE_FILE - not the Controller's
14
                                                                        #  responsibility.
15
from src.services.key_service import KeyService
16

    
17
TREE_NODE_TYPE_COUNT = 3
18

    
19
FILTERING = "filtering"
20
ISSUER = "issuer"
21
US = "usage"
22
NOT_AFTER = "notAfter"
23
NOT_BEFORE = "notBefore"
24
COMMON_NAME = "CN"
25
ID = "id"
26
USAGE = "usage"
27
SUBJECT = "subject"
28
VALIDITY_DAYS = "validityDays"
29
CA = "CA"
30

    
31
E_NO_ISSUER_FOUND = {"success": False, "data": "No certificate authority with such unique ID exists."}
32
E_NO_CERTIFICATES_FOUND = {"success": False, "data": "No such certificate found."}
33
E_NOT_JSON_FORMAT = {"success": False, "data": "The request must be JSON-formatted."}
34
E_CORRUPTED_DATABASE = {"success": False, "data": "Internal server error (corrupted database)."}
35
E_GENERAL_ERROR = {"success": False, "data": "Internal server error (unknown origin)."}
36
E_MISSING_PARAMETERS = {"success": False, "data": "Invalid request, missing parameters."}
37
E_WRONG_PARAMETERS = {"success": False, "data": "Invalid request, wrong parameters."}
38

    
39
C_CREATED_SUCCESSFULLY = 201
40
C_BAD_REQUEST = 400
41
C_NO_DATA = 205                                                         # TODO related to 204 issue
42
C_INTERNAL_SERVER_ERROR = 500
43
C_SUCCESS = 200
44

    
45
__ = CryptographyService()                                              # TODO not the Controller's responsibility.
46
CERTIFICATE_SERVICE = CertificateService(__, CertificateRepository(None, None))
47
KEY_SERVICE = KeyService(__, PrivateKeyRepository(None, None))          # TODO not the Controller's responsibility.
48

    
49

    
50
class CertController:
51
    KEY_MAP = {'CA': CA_ID, 'SSL': SSL_ID, 'digitalSignature': SIGNATURE_ID, 'authentication': AUTHENTICATION_ID}
52
    INVERSE_KEY_MAP = {k: v for v, k in KEY_MAP.items()}
53

    
54
    @staticmethod
55
    def setup():
56
        """
57
        SQLite3 thread issue hack.
58
        :return:
59
        """
60
        _ = Connection(DATABASE_FILE_LOCATION.shortest_relative_path())
61
        CERTIFICATE_SERVICE.certificate_repository.connection = _
62
        CERTIFICATE_SERVICE.certificate_repository.cursor = _.cursor()
63
        KEY_SERVICE.private_key_repository.connection = _
64
        KEY_SERVICE.private_key_repository.cursor = _.cursor()
65

    
66
    @staticmethod
67
    def create_certificate():
68
        """create new certificate
69

    
70
        Create a new certificate based on given information
71

    
72
        :param body: Certificate data to be created
73
        :type body: dict | bytes
74

    
75
        :rtype: CreatedResponse
76
        """
77
        CertController.setup()                                                      # TODO remove after issue fixed
78

    
79
        required_keys = {SUBJECT, USAGE, VALIDITY_DAYS}                             # required fields of the POST req
80

    
81
        if request.is_json:                                                         # accept JSON only
82
            body = request.get_json()
83
            if not all(k in body for k in required_keys):                           # verify that all keys are present
84
                return E_MISSING_PARAMETERS, C_BAD_REQUEST
85

    
86
            if not isinstance(body[VALIDITY_DAYS], int):                            # type checking
87
                return E_WRONG_PARAMETERS, C_BAD_REQUEST
88

    
89
            subject = Subject.from_dict(body[SUBJECT])                              # generate Subject from passed dict
90

    
91
            if subject is None:                                                     # if the format is incorrect
92
                return E_WRONG_PARAMETERS, C_BAD_REQUEST
93

    
94
            usages_dict = {}
95

    
96
            if not isinstance(body[USAGE], dict):                                   # type checking
97
                return E_WRONG_PARAMETERS, C_BAD_REQUEST
98

    
99
            for k, v in body[USAGE].items():                                        # for each usage
100
                if k not in CertController.KEY_MAP:                                 # check that it is a valid usage
101
                    return E_WRONG_PARAMETERS, C_BAD_REQUEST                        # and throw if it is not
102
                usages_dict[CertController.KEY_MAP[k]] = v                          # otherwise translate key and set
103

    
104
            key = KEY_SERVICE.create_new_key()                                      # TODO pass key
105

    
106
            if CA not in body or body[CA] is None:                                  # if issuer omitted (legal) or none
107
                cert = CERTIFICATE_SERVICE.create_root_ca(                          # create a root CA
108
                    key,
109
                    subject,
110
                    usages=usages_dict,                                             # TODO ignoring usages -> discussion
111
                    days=body[VALIDITY_DAYS]
112
                )
113
            else:
114
                issuer = CERTIFICATE_SERVICE.get_certificate(body[CA])              # get base issuer info
115

    
116
                if issuer is None:                                                  # if such issuer does not exist
117
                    KEY_SERVICE.delete_key(key.private_key_id)                      # free
118
                    return E_NO_ISSUER_FOUND, C_BAD_REQUEST                         # and throw
119

    
120
                issuer_key = KEY_SERVICE.get_key(issuer.private_key_id)             # get issuer's key, which must exist
121

    
122
                if issuer_key is None:                                              # if it does not
123
                    KEY_SERVICE.delete_key(key.private_key_id)                      # free
124
                    return E_CORRUPTED_DATABASE, C_INTERNAL_SERVER_ERROR            # and throw
125

    
126
                f = CERTIFICATE_SERVICE.create_ca if CA_ID in usages_dict and usages_dict[CA_ID] else \
127
                    CERTIFICATE_SERVICE.create_end_cert
128

    
129
                # noinspection PyTypeChecker
130
                cert = f(                                                           # create inter CA or end cert
131
                    key,                                                            # according to whether 'CA' is among
132
                    subject,                                                        # the usages' fields
133
                    issuer,
134
                    issuer_key,
135
                    usages=usages_dict,
136
                    days=body[VALIDITY_DAYS]
137
                )
138

    
139
            if cert is not None:
140
                return {"success": True,
141
                        "data": cert.certificate_id}, C_CREATED_SUCCESSFULLY
142
            else:                                                                   # if this fails, then
143
                KEY_SERVICE.delete_key(key.private_key_id)                          # free
144
                return {"success": False,                                           # and wonder what the cause is,
145
                        "data": "Internal error: The certificate could not have been created."}, C_BAD_REQUEST
146
                                                                                    # as obj/None carries only one bit
147
                                                                                    # of error information
148
        else:
149
            return E_NOT_JSON_FORMAT, C_BAD_REQUEST                                 # throw in case of non-JSON format
150

    
151
    @staticmethod
152
    def get_certificate_by_id(id):
153
        """get certificate by ID
154

    
155
        Get certificate in PEM format by ID
156

    
157
        :param id: ID of a certificate to be queried
158
        :type id: dict | bytes
159

    
160
        :rtype: PemResponse
161
        """
162
        CertController.setup()                                                      # TODO remove after issue fixed
163

    
164
        try:
165
            v = int(id)
166
        except ValueError:
167
            return E_WRONG_PARAMETERS, C_BAD_REQUEST
168

    
169
        cert = CERTIFICATE_SERVICE.get_certificate(v)
170

    
171
        if cert is None:
172
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA                               
173
        else:
174
            return {"success": True, "data": cert.pem_data}, C_SUCCESS
175

    
176
    @staticmethod
177
    def get_certificate_details_by_id(id):
178
        """get certificate's details by ID
179

    
180
        Get certificate details by ID
181

    
182
        :param id: ID of a certificate whose details are to be queried
183
        :type id: dict | bytes
184

    
185
        :rtype: CertificateResponse
186
        """
187
        CertController.setup()                                                      # TODO remove after issue fixed
188

    
189
        try:
190
            v = int(id)
191
        except ValueError:
192
            return E_WRONG_PARAMETERS, C_BAD_REQUEST
193

    
194
        cert = CERTIFICATE_SERVICE.get_certificate(v)
195

    
196
        if cert is None:
197
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA                               
198
        else:
199
            data = CertController.cert_to_dict_full(cert)
200
            if data is None:
201
                return E_CORRUPTED_DATABASE, C_INTERNAL_SERVER_ERROR
202
            return {"success": True, "data": data}, C_SUCCESS
203

    
204
    @staticmethod
205
    def get_certificate_list():
206
        """get list of certificates
207

    
208
        Lists certificates based on provided filtering options
209

    
210
        :param filtering: Filter certificate type to be queried
211
        :type filtering: dict | bytes
212

    
213
        :rtype: CertificateListResponse
214
        """
215
        CertController.setup()                                                      # TODO remove after issue fixed
216

    
217
        targets = {ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID}                  # all targets
218

    
219
        # the filtering parameter can be read as URL argument or as a request body
220
        if request.is_json or "filtering" in request.args.keys():                   # if the request carries JSON data
221
            if request.is_json:
222
                data = request.get_json()                                           # get it
223
            else:
224
                data = {"filtering": json.loads(request.args["filtering"])}
225
            if FILTERING in data:                                                   # if the 'filtering' field exists
226
                if isinstance(data[FILTERING], dict):                               # and it is also a 'dict'
227
                    if CA in data[FILTERING]:                                       # containing 'CA'
228
                        if isinstance(data[FILTERING][CA], bool):                   # which is a 'bool',
229
                            if data[FILTERING][CA]:                                 # then filter according to 'CA'.
230
                                targets.remove(CERTIFICATE_ID)
231
                            else:
232
                                targets.remove(ROOT_CA_ID)
233
                                targets.remove(INTERMEDIATE_CA_ID)
234
                        else:
235
                            return E_WRONG_PARAMETERS, C_BAD_REQUEST
236
                else:
237
                    return E_WRONG_PARAMETERS, C_BAD_REQUEST
238

    
239
        if len(targets) == TREE_NODE_TYPE_COUNT:                                    # = 3 -> root node,
240
                                                                                    # intermediate node,
241
                                                                                    # or leaf node
242

    
243
                                                                                    # if filtering did not change the
244
                                                                                    # targets,
245
            certs = CERTIFICATE_SERVICE.get_certificates()                          # fetch everything
246
        else:                                                                       # otherwise fetch targets only
247
            certs = list(chain(*(CERTIFICATE_SERVICE.get_certificates(target) for target in targets)))
248

    
249
        if certs is None:
250
            return E_GENERAL_ERROR, C_INTERNAL_SERVER_ERROR
251
        elif len(certs) == 0:
252
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA                               
253
        else:
254
            ret = []
255
            for c in certs:
256
                data = CertController.cert_to_dict_partial(c)
257
                if data is None:
258
                    return E_CORRUPTED_DATABASE, C_INTERNAL_SERVER_ERROR
259
                ret.append(
260
                    data
261
                )
262
            return {"success": True, "data": ret}, C_SUCCESS
263

    
264
    @staticmethod
265
    def get_certificate_root_by_id(id):
266
        """get certificate's root of trust chain by ID
267

    
268
        Get certificate's root of trust chain in PEM format by ID
269

    
270
        :param id: ID of a child certificate whose root is to be queried
271
        :type id: dict | bytes
272

    
273
        :rtype: PemResponse
274
        """
275
        CertController.setup()                                                      # TODO remove after issue fixed
276

    
277
        try:
278
            v = int(id)
279
        except ValueError:
280
            return E_WRONG_PARAMETERS, C_BAD_REQUEST
281

    
282
        cert = CERTIFICATE_SERVICE.get_certificate(v)
283

    
284
        if cert is None:
285
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA
286

    
287
        trust_chain = CERTIFICATE_SERVICE.get_chain_of_trust(cert.parent_id, exclude_root=False)
288
        if trust_chain is None or len(trust_chain) is 0:
289
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA
290

    
291
        return {"success": True, "data": trust_chain[-1].pem_data}, C_SUCCESS
292

    
293
    @staticmethod
294
    def get_certificate_trust_chain_by_id(id):
295
        """get certificate's trust chain by ID
296

    
297
        Get certificate trust chain in PEM format by ID
298

    
299
        :param id: ID of a child certificate whose chain is to be queried
300
        :type id: dict | bytes
301

    
302
        :rtype: PemResponse
303
        """
304
        CertController.setup()                                                      # TODO remove after issue fixed
305

    
306
        try:
307
            v = int(id)
308
        except ValueError:
309
            return E_WRONG_PARAMETERS, C_BAD_REQUEST
310

    
311
        cert = CERTIFICATE_SERVICE.get_certificate(v)
312

    
313
        if cert is None:
314
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA                               
315

    
316
        if cert.parent_id is None:
317
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA
318

    
319
        trust_chain = CERTIFICATE_SERVICE.get_chain_of_trust(cert.parent_id)
320

    
321
        ret = []
322
        for intermediate in trust_chain:
323
            ret.append(intermediate.pem_data)
324

    
325
        return {"success": True, "data": "".join(ret)}, C_SUCCESS
326

    
327
    @staticmethod
328
    def cert_to_dict_partial(c):
329
        """
330
        Dictionarizes a certificate directly fetched from the database. Contains partial information.
331
        :param c: target cert
332
        :return: certificate dict (compliant with some parts of the REST API)
333
        """
334
        c_issuer = CERTIFICATE_SERVICE.get_certificate(c.parent_id)
335
        if c_issuer is None:
336
            return None
337

    
338
        return {
339
            ID: c.certificate_id,
340
            COMMON_NAME: c.common_name,
341
            NOT_BEFORE: datetime.strptime(c.valid_from, DATETIME_FORMAT).date(),
342
            NOT_AFTER: datetime.strptime(c.valid_to, DATETIME_FORMAT).date(),
343
            USAGE: {CertController.INVERSE_KEY_MAP[k]: v for k, v in c.usages.items()},
344
            ISSUER: {
345
                ID: c_issuer.certificate_id,
346
                COMMON_NAME: c_issuer.common_name
347
            }
348
        }
349

    
350
    @staticmethod
351
    def cert_to_dict_full(c):
352
        """
353
        Dictionarizes a certificate directly fetched from the database, but adds subject info.
354
        Contains full information.
355
        :param c: target cert
356
        :return: certificate dict (compliant with some parts of the REST API)
357
        """
358
        subj = CERTIFICATE_SERVICE.get_subject_from_certificate(c)
359
        c_issuer = CERTIFICATE_SERVICE.get_certificate(c.parent_id)
360
        if c_issuer is None:
361
            return None
362

    
363
        return {
364
            SUBJECT: subj.to_dict(),
365
            NOT_BEFORE: datetime.strptime(c.valid_from, DATETIME_FORMAT).date(),
366
            NOT_AFTER: datetime.strptime(c.valid_to, DATETIME_FORMAT).date(),
367
            USAGE: {CertController.INVERSE_KEY_MAP[k]: v for k, v in c.usages.items()},
368
            CA: c_issuer.certificate_id
369
        }
(2-2/2)