Projekt

Obecné

Profil

Stáhnout (16.2 KB) Statistiky
| Větev: | Tag: | Revize:
1 5b57121e Captain_Trojan
from datetime import datetime
2
from itertools import chain
3
from flask import request
4
from src.dao.private_key_repository import PrivateKeyRepository
5
from src.model.subject import Subject
6
from src.services.certificate_service import CertificateService
7 9247d70a Captain_Trojan
from src.dao.certificate_repository import CertificateRepository        # TODO not the Controller's responsibility.
8
from src.services.cryptography import CryptographyService               # TODO not the Controller's responsibility.
9
from sqlite3 import Connection                                          # TODO not the Controller's responsibility.
10
from src.constants import CA_ID, \
11 5b57121e Captain_Trojan
    DATABASE_FILE_LOCATION, SSL_ID, SIGNATURE_ID, AUTHENTICATION_ID, \
12 9247d70a Captain_Trojan
    DATETIME_FORMAT, ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID     # TODO DATABASE_FILE - not the Controller's
13
                                                                        #  responsibility.
14 5b57121e Captain_Trojan
from src.services.key_service import KeyService
15
16 9247d70a Captain_Trojan
TREE_NODE_TYPE_COUNT = 3
17
18 5b57121e Captain_Trojan
FILTERING = "filtering"
19
ISSUER = "issuer"
20
US = "usage"
21
NOT_AFTER = "notAfter"
22
NOT_BEFORE = "notBefore"
23
COMMON_NAME = "CN"
24
ID = "id"
25 9247d70a Captain_Trojan
USAGE = "usage"
26
SUBJECT = "subject"
27
VALIDITY_DAYS = "validityDays"
28
CA = "CA"
29
30
E_NO_ISSUER_FOUND = {"success": False, "data": "No certificate authority with such unique ID exists."}
31 d53c2fdc Captain_Trojan
E_NO_CERTIFICATES_FOUND = {"success": False, "data": "No such certificate found."}
32 5b57121e Captain_Trojan
E_NOT_JSON_FORMAT = {"success": False, "data": "The request must be JSON-formatted."}
33
E_CORRUPTED_DATABASE = {"success": False, "data": "Internal server error (corrupted database)."}
34
E_GENERAL_ERROR = {"success": False, "data": "Internal server error (unknown origin)."}
35
E_MISSING_PARAMETERS = {"success": False, "data": "Invalid request, missing parameters."}
36
E_WRONG_PARAMETERS = {"success": False, "data": "Invalid request, wrong parameters."}
37
38 9247d70a Captain_Trojan
C_CREATED_SUCCESSFULLY = 201
39
C_BAD_REQUEST = 400
40
C_NO_DATA = 205                                                         # TODO related to 204 issue
41
C_INTERNAL_SERVER_ERROR = 500
42
C_SUCCESS = 200
43 5b57121e Captain_Trojan
44 9247d70a Captain_Trojan
__ = CryptographyService()                                              # TODO not the Controller's responsibility.
45
CERTIFICATE_SERVICE = CertificateService(__, CertificateRepository(None, None))
46
KEY_SERVICE = KeyService(__, PrivateKeyRepository(None, None))          # TODO not the Controller's responsibility.
47 5b57121e Captain_Trojan
48
49
class CertController:
50 5b6d9513 Captain_Trojan
    KEY_MAP = {'CA': CA_ID, 'SSL': SSL_ID, 'digitalSignature': SIGNATURE_ID, 'authentication': AUTHENTICATION_ID}
51
    INVERSE_KEY_MAP = {k: v for v, k in KEY_MAP.items()}
52 5b57121e Captain_Trojan
53
    @staticmethod
54
    def setup():
55
        """
56
        SQLite3 thread issue hack.
57
        :return:
58
        """
59
        _ = Connection(DATABASE_FILE_LOCATION.shortest_relative_path())
60
        CERTIFICATE_SERVICE.certificate_repository.connection = _
61
        CERTIFICATE_SERVICE.certificate_repository.cursor = _.cursor()
62
        KEY_SERVICE.private_key_repository.connection = _
63
        KEY_SERVICE.private_key_repository.cursor = _.cursor()
64
65
    @staticmethod
66 9247d70a Captain_Trojan
    def create_certificate():
67 5b57121e Captain_Trojan
        """create new certificate
68
69 9247d70a Captain_Trojan
        Create a new certificate based on given information
70 5b57121e Captain_Trojan
71
        :param body: Certificate data to be created
72
        :type body: dict | bytes
73
74
        :rtype: CreatedResponse
75
        """
76 9247d70a Captain_Trojan
        CertController.setup()                                                      # TODO remove after issue fixed
77 5b57121e Captain_Trojan
78 9247d70a Captain_Trojan
        required_keys = {SUBJECT, USAGE, VALIDITY_DAYS}                             # required fields of the POST req
79 5b57121e Captain_Trojan
80 9247d70a Captain_Trojan
        if request.is_json:                                                         # accept JSON only
81 5b57121e Captain_Trojan
            body = request.get_json()
82 9247d70a Captain_Trojan
            if not all(k in body for k in required_keys):                           # verify that all keys are present
83
                return E_MISSING_PARAMETERS, C_BAD_REQUEST
84 5b57121e Captain_Trojan
85 9247d70a Captain_Trojan
            if not isinstance(body[VALIDITY_DAYS], int):                            # type checking
86
                return E_WRONG_PARAMETERS, C_BAD_REQUEST
87 5b57121e Captain_Trojan
88 9247d70a Captain_Trojan
            subject = Subject.from_dict(body[SUBJECT])                              # generate Subject from passed dict
89 5b57121e Captain_Trojan
90 9247d70a Captain_Trojan
            if subject is None:                                                     # if the format is incorrect
91
                return E_WRONG_PARAMETERS, C_BAD_REQUEST
92 5b57121e Captain_Trojan
93 5b6d9513 Captain_Trojan
            usages_dict = {}
94 5b57121e Captain_Trojan
95 9247d70a Captain_Trojan
            if not isinstance(body[USAGE], dict):                                   # type checking
96
                return E_WRONG_PARAMETERS, C_BAD_REQUEST
97 5b57121e Captain_Trojan
98 9247d70a Captain_Trojan
            for k, v in body[USAGE].items():                                        # for each usage
99
                if k not in CertController.KEY_MAP:                                 # check that it is a valid usage
100
                    return E_WRONG_PARAMETERS, C_BAD_REQUEST                        # and throw if it is not
101
                usages_dict[CertController.KEY_MAP[k]] = v                          # otherwise translate key and set
102 5b57121e Captain_Trojan
103 9247d70a Captain_Trojan
            key = KEY_SERVICE.create_new_key()                                      # TODO pass key
104 5b57121e Captain_Trojan
105 493022a0 Jan Pašek
            if CA not in body or body[CA] is None:                                  # if issuer omitted (legal) or none
106 9247d70a Captain_Trojan
                cert = CERTIFICATE_SERVICE.create_root_ca(                          # create a root CA
107 5b57121e Captain_Trojan
                    key,
108
                    subject,
109 9247d70a Captain_Trojan
                    usages=usages_dict,                                             # TODO ignoring usages -> discussion
110 5b57121e Captain_Trojan
                    days=body[VALIDITY_DAYS]
111
                )
112
            else:
113 9247d70a Captain_Trojan
                issuer = CERTIFICATE_SERVICE.get_certificate(body[CA])              # get base issuer info
114 5b57121e Captain_Trojan
115 9247d70a Captain_Trojan
                if issuer is None:                                                  # if such issuer does not exist
116
                    KEY_SERVICE.delete_key(key.private_key_id)                      # free
117
                    return E_NO_ISSUER_FOUND, C_BAD_REQUEST                         # and throw
118 5b57121e Captain_Trojan
119 9247d70a Captain_Trojan
                issuer_key = KEY_SERVICE.get_key(issuer.private_key_id)             # get issuer's key, which must exist
120 5b57121e Captain_Trojan
121 9247d70a Captain_Trojan
                if issuer_key is None:                                              # if it does not
122
                    KEY_SERVICE.delete_key(key.private_key_id)                      # free
123
                    return E_CORRUPTED_DATABASE, C_INTERNAL_SERVER_ERROR            # and throw
124 5b57121e Captain_Trojan
125
                f = CERTIFICATE_SERVICE.create_ca if CA_ID in usages_dict and usages_dict[CA_ID] else \
126
                    CERTIFICATE_SERVICE.create_end_cert
127
128 9247d70a Captain_Trojan
                # noinspection PyTypeChecker
129
                cert = f(                                                           # create inter CA or end cert
130
                    key,                                                            # according to whether 'CA' is among
131
                    subject,                                                        # the usages' fields
132 5b57121e Captain_Trojan
                    issuer,
133
                    issuer_key,
134
                    usages=usages_dict,
135
                    days=body[VALIDITY_DAYS]
136
                )
137
138
            if cert is not None:
139
                return {"success": True,
140 9247d70a Captain_Trojan
                        "data": cert.certificate_id}, C_CREATED_SUCCESSFULLY
141
            else:                                                                   # if this fails, then
142
                KEY_SERVICE.delete_key(key.private_key_id)                          # free
143
                return {"success": False,                                           # and wonder what the cause is,
144
                        "data": "Internal error: The certificate could not have been created."}, C_BAD_REQUEST
145
                                                                                    # as obj/None carries only one bit
146
                                                                                    # of error information
147 5b57121e Captain_Trojan
        else:
148 9247d70a Captain_Trojan
            return E_NOT_JSON_FORMAT, C_BAD_REQUEST                                 # throw in case of non-JSON format
149 5b57121e Captain_Trojan
150
    @staticmethod
151 9247d70a Captain_Trojan
    def get_certificate_by_id(id):
152 5b57121e Captain_Trojan
        """get certificate by ID
153
154 9247d70a Captain_Trojan
        Get certificate in PEM format by ID
155 5b57121e Captain_Trojan
156
        :param id: ID of a certificate to be queried
157
        :type id: dict | bytes
158
159
        :rtype: PemResponse
160
        """
161 9247d70a Captain_Trojan
        CertController.setup()                                                      # TODO remove after issue fixed
162 5b57121e Captain_Trojan
163 fb987403 Captain_Trojan
        try:
164
            v = int(id)
165
        except ValueError:
166 9247d70a Captain_Trojan
            return E_WRONG_PARAMETERS, C_BAD_REQUEST
167 fb987403 Captain_Trojan
168
        cert = CERTIFICATE_SERVICE.get_certificate(v)
169
170
        if cert is None:
171 9247d70a Captain_Trojan
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA                               
172 fb987403 Captain_Trojan
        else:
173 9247d70a Captain_Trojan
            return {"success": True, "data": cert.pem_data}, C_SUCCESS
174 5b57121e Captain_Trojan
175
    @staticmethod
176 9247d70a Captain_Trojan
    def get_certificate_details_by_id(id):
177 5b57121e Captain_Trojan
        """get certificate's details by ID
178
179 9247d70a Captain_Trojan
        Get certificate details by ID
180 5b57121e Captain_Trojan
181
        :param id: ID of a certificate whose details are to be queried
182
        :type id: dict | bytes
183
184
        :rtype: CertificateResponse
185
        """
186 9247d70a Captain_Trojan
        CertController.setup()                                                      # TODO remove after issue fixed
187 5b57121e Captain_Trojan
188 5b6d9513 Captain_Trojan
        try:
189
            v = int(id)
190
        except ValueError:
191 9247d70a Captain_Trojan
            return E_WRONG_PARAMETERS, C_BAD_REQUEST
192 5b6d9513 Captain_Trojan
193
        cert = CERTIFICATE_SERVICE.get_certificate(v)
194
195
        if cert is None:
196 9247d70a Captain_Trojan
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA                               
197 5b6d9513 Captain_Trojan
        else:
198
            data = CertController.cert_to_dict_full(cert)
199
            if data is None:
200 9247d70a Captain_Trojan
                return E_CORRUPTED_DATABASE, C_INTERNAL_SERVER_ERROR
201
            return {"success": True, "data": data}, C_SUCCESS
202 5b57121e Captain_Trojan
203
    @staticmethod
204 9247d70a Captain_Trojan
    def get_certificate_list():
205 5b57121e Captain_Trojan
        """get list of certificates
206
207 9247d70a Captain_Trojan
        Lists certificates based on provided filtering options
208 5b57121e Captain_Trojan
209
        :param filtering: Filter certificate type to be queried
210
        :type filtering: dict | bytes
211
212
        :rtype: CertificateListResponse
213
        """
214 9247d70a Captain_Trojan
        CertController.setup()                                                      # TODO remove after issue fixed
215
216
        targets = {ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID}                  # all targets
217
        if request.is_json:                                                         # if the request carries JSON data
218
            data = request.get_json()                                               # get it
219
            if FILTERING in data:                                                   # if the 'filtering' field exists
220
                if isinstance(data[FILTERING], dict):                               # and it is also a 'dict'
221
                    if CA in data[FILTERING]:                                       # containing 'CA'
222
                        if isinstance(data[FILTERING][CA], bool):                   # which is a 'bool',
223
                            if data[FILTERING][CA]:                                 # then filter according to 'CA'.
224 5b57121e Captain_Trojan
                                targets.remove(CERTIFICATE_ID)
225
                            else:
226
                                targets.remove(ROOT_CA_ID)
227
                                targets.remove(INTERMEDIATE_CA_ID)
228
                        else:
229 9247d70a Captain_Trojan
                            return E_WRONG_PARAMETERS, C_BAD_REQUEST
230 5b57121e Captain_Trojan
                else:
231 9247d70a Captain_Trojan
                    return E_WRONG_PARAMETERS, C_BAD_REQUEST
232 5b57121e Captain_Trojan
233 9247d70a Captain_Trojan
        if len(targets) == TREE_NODE_TYPE_COUNT:                                    # = 3 -> root node,
234
                                                                                    # intermediate node,
235
                                                                                    # or leaf node
236
237
                                                                                    # if filtering did not change the
238
                                                                                    # targets,
239
            certs = CERTIFICATE_SERVICE.get_certificates()                          # fetch everything
240
        else:                                                                       # otherwise fetch targets only
241 49f22fd9 Captain_Trojan
            certs = list(chain(*(CERTIFICATE_SERVICE.get_certificates(target) for target in targets)))
242 5b57121e Captain_Trojan
243
        if certs is None:
244 9247d70a Captain_Trojan
            return E_GENERAL_ERROR, C_INTERNAL_SERVER_ERROR
245 5b57121e Captain_Trojan
        elif len(certs) == 0:
246 9247d70a Captain_Trojan
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA                               
247 5b57121e Captain_Trojan
        else:
248
            ret = []
249
            for c in certs:
250 5b6d9513 Captain_Trojan
                data = CertController.cert_to_dict_partial(c)
251
                if data is None:
252 9247d70a Captain_Trojan
                    return E_CORRUPTED_DATABASE, C_INTERNAL_SERVER_ERROR
253 5b57121e Captain_Trojan
                ret.append(
254 5b6d9513 Captain_Trojan
                    data
255 5b57121e Captain_Trojan
                )
256 9247d70a Captain_Trojan
            return {"success": True, "data": ret}, C_SUCCESS
257 5b57121e Captain_Trojan
258
    @staticmethod
259 9247d70a Captain_Trojan
    def get_certificate_root_by_id(id):
260 5b57121e Captain_Trojan
        """get certificate's root of trust chain by ID
261
262 9247d70a Captain_Trojan
        Get certificate's root of trust chain in PEM format by ID
263 5b57121e Captain_Trojan
264
        :param id: ID of a child certificate whose root is to be queried
265
        :type id: dict | bytes
266
267
        :rtype: PemResponse
268
        """
269 9247d70a Captain_Trojan
        CertController.setup()                                                      # TODO remove after issue fixed
270 5b57121e Captain_Trojan
271 d53c2fdc Captain_Trojan
        try:
272
            v = int(id)
273
        except ValueError:
274 9247d70a Captain_Trojan
            return E_WRONG_PARAMETERS, C_BAD_REQUEST
275 d53c2fdc Captain_Trojan
276
        cert = CERTIFICATE_SERVICE.get_certificate(v)
277
278
        if cert is None:
279 9247d70a Captain_Trojan
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA                               
280 d53c2fdc Captain_Trojan
281
        while cert.parent_id != cert.certificate_id:
282
            cert = CERTIFICATE_SERVICE.get_certificate(cert.parent_id)
283
            if cert is None:
284 9247d70a Captain_Trojan
                return E_CORRUPTED_DATABASE, C_INTERNAL_SERVER_ERROR
285 d53c2fdc Captain_Trojan
286 9247d70a Captain_Trojan
        return {"success": True, "data": cert.pem_data}, C_SUCCESS
287 5b57121e Captain_Trojan
288
    @staticmethod
289 9247d70a Captain_Trojan
    def get_certificate_trust_chain_by_id(id):
290 5b57121e Captain_Trojan
        """get certificate's trust chain by ID
291
292 9247d70a Captain_Trojan
        Get certificate trust chain in PEM format by ID
293 5b57121e Captain_Trojan
294
        :param id: ID of a child certificate whose chain is to be queried
295
        :type id: dict | bytes
296
297
        :rtype: PemResponse
298
        """
299 9247d70a Captain_Trojan
        CertController.setup()                                                      # TODO remove after issue fixed
300 5b57121e Captain_Trojan
301 aa740737 Captain_Trojan
        try:
302
            v = int(id)
303
        except ValueError:
304 9247d70a Captain_Trojan
            return E_WRONG_PARAMETERS, C_BAD_REQUEST
305 aa740737 Captain_Trojan
306
        cert = CERTIFICATE_SERVICE.get_certificate(v)
307
308
        if cert is None:
309 9247d70a Captain_Trojan
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA                               
310 aa740737 Captain_Trojan
311
        ret = []
312
313
        while True:
314
            cert = CERTIFICATE_SERVICE.get_certificate(cert.parent_id)
315
            if cert is None:
316 9247d70a Captain_Trojan
                return E_CORRUPTED_DATABASE, C_INTERNAL_SERVER_ERROR
317 aa740737 Captain_Trojan
            elif cert.parent_id == cert.certificate_id:
318
                break
319
            ret.append(cert.pem_data)
320 5b6d9513 Captain_Trojan
321 9247d70a Captain_Trojan
        return {"success": True, "data": "".join(ret)}, C_SUCCESS
322 5b6d9513 Captain_Trojan
323
    @staticmethod
324
    def cert_to_dict_partial(c):
325 9247d70a Captain_Trojan
        """
326
        Dictionarizes a certificate directly fetched from the database. Contains partial information.
327
        :param c: target cert
328
        :return: certificate dict (compliant with some parts of the REST API)
329
        """
330 5b6d9513 Captain_Trojan
        c_issuer = CERTIFICATE_SERVICE.get_certificate(c.parent_id)
331
        if c_issuer is None:
332
            return None
333
334
        return {
335
            ID: c.certificate_id,
336
            COMMON_NAME: c.common_name,
337
            NOT_BEFORE: datetime.strptime(c.valid_from, DATETIME_FORMAT).date(),
338
            NOT_AFTER: datetime.strptime(c.valid_to, DATETIME_FORMAT).date(),
339
            USAGE: {CertController.INVERSE_KEY_MAP[k]: v for k, v in c.usages.items()},
340
            ISSUER: {
341
                ID: c_issuer.certificate_id,
342
                COMMON_NAME: c_issuer.common_name
343
            }
344
        }
345
346
    @staticmethod
347
    def cert_to_dict_full(c):
348 9247d70a Captain_Trojan
        """
349
        Dictionarizes a certificate directly fetched from the database, but adds subject info.
350
        Contains full information.
351
        :param c: target cert
352
        :return: certificate dict (compliant with some parts of the REST API)
353
        """
354 5b6d9513 Captain_Trojan
        subj = CERTIFICATE_SERVICE.get_subject_from_certificate(c)
355
        c_issuer = CERTIFICATE_SERVICE.get_certificate(c.parent_id)
356
        if c_issuer is None:
357
            return None
358
359
        return {
360
            SUBJECT: subj.to_dict(),
361
            NOT_BEFORE: datetime.strptime(c.valid_from, DATETIME_FORMAT).date(),
362
            NOT_AFTER: datetime.strptime(c.valid_to, DATETIME_FORMAT).date(),
363
            USAGE: {CertController.INVERSE_KEY_MAP[k]: v for k, v in c.usages.items()},
364
            CA: c_issuer.certificate_id
365
        }