Projekt

Obecné

Profil

Stáhnout (16.5 KB) Statistiky
| Větev: | Tag: | Revize:
1 ed55c677 Jan Pašek
import json
2 5b57121e Captain_Trojan
from datetime import datetime
3
from itertools import chain
4 6422796d Stanislav Král
from json import JSONDecodeError
5
6 5b57121e Captain_Trojan
from flask import request
7
from src.dao.private_key_repository import PrivateKeyRepository
8
from src.model.subject import Subject
9
from src.services.certificate_service import CertificateService
10 9247d70a Captain_Trojan
from src.dao.certificate_repository import CertificateRepository        # TODO not the Controller's responsibility.
11
from src.services.cryptography import CryptographyService               # TODO not the Controller's responsibility.
12
from sqlite3 import Connection                                          # TODO not the Controller's responsibility.
13
from src.constants import CA_ID, \
14 5b57121e Captain_Trojan
    DATABASE_FILE_LOCATION, SSL_ID, SIGNATURE_ID, AUTHENTICATION_ID, \
15 9247d70a Captain_Trojan
    DATETIME_FORMAT, ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID     # TODO DATABASE_FILE - not the Controller's
16
                                                                        #  responsibility.
17 5b57121e Captain_Trojan
from src.services.key_service import KeyService
18
19 9247d70a Captain_Trojan
TREE_NODE_TYPE_COUNT = 3
20
21 5b57121e Captain_Trojan
FILTERING = "filtering"
22
ISSUER = "issuer"
23
US = "usage"
24
NOT_AFTER = "notAfter"
25
NOT_BEFORE = "notBefore"
26
COMMON_NAME = "CN"
27
ID = "id"
28 9247d70a Captain_Trojan
USAGE = "usage"
29
SUBJECT = "subject"
30
VALIDITY_DAYS = "validityDays"
31
CA = "CA"
32
33
E_NO_ISSUER_FOUND = {"success": False, "data": "No certificate authority with such unique ID exists."}
34 d53c2fdc Captain_Trojan
E_NO_CERTIFICATES_FOUND = {"success": False, "data": "No such certificate found."}
35 5b57121e Captain_Trojan
E_NOT_JSON_FORMAT = {"success": False, "data": "The request must be JSON-formatted."}
36
E_CORRUPTED_DATABASE = {"success": False, "data": "Internal server error (corrupted database)."}
37
E_GENERAL_ERROR = {"success": False, "data": "Internal server error (unknown origin)."}
38
E_MISSING_PARAMETERS = {"success": False, "data": "Invalid request, missing parameters."}
39
E_WRONG_PARAMETERS = {"success": False, "data": "Invalid request, wrong parameters."}
40
41 9247d70a Captain_Trojan
C_CREATED_SUCCESSFULLY = 201
42
C_BAD_REQUEST = 400
43
C_NO_DATA = 205                                                         # TODO related to 204 issue
44
C_INTERNAL_SERVER_ERROR = 500
45
C_SUCCESS = 200
46 5b57121e Captain_Trojan
47 9247d70a Captain_Trojan
__ = CryptographyService()                                              # TODO not the Controller's responsibility.
48
CERTIFICATE_SERVICE = CertificateService(__, CertificateRepository(None, None))
49
KEY_SERVICE = KeyService(__, PrivateKeyRepository(None, None))          # TODO not the Controller's responsibility.
50 5b57121e Captain_Trojan
51
52
class CertController:
53 5b6d9513 Captain_Trojan
    KEY_MAP = {'CA': CA_ID, 'SSL': SSL_ID, 'digitalSignature': SIGNATURE_ID, 'authentication': AUTHENTICATION_ID}
54
    INVERSE_KEY_MAP = {k: v for v, k in KEY_MAP.items()}
55 5b57121e Captain_Trojan
56
    @staticmethod
57
    def setup():
58
        """
59
        SQLite3 thread issue hack.
60
        :return:
61
        """
62 e80a2c00 Jan Pašek
        _ = Connection("db/database_sqlite.db")
63 5b57121e Captain_Trojan
        CERTIFICATE_SERVICE.certificate_repository.connection = _
64
        CERTIFICATE_SERVICE.certificate_repository.cursor = _.cursor()
65
        KEY_SERVICE.private_key_repository.connection = _
66
        KEY_SERVICE.private_key_repository.cursor = _.cursor()
67
68
    @staticmethod
69 9247d70a Captain_Trojan
    def create_certificate():
70 5b57121e Captain_Trojan
        """create new certificate
71
72 9247d70a Captain_Trojan
        Create a new certificate based on given information
73 5b57121e Captain_Trojan
74
        :param body: Certificate data to be created
75
        :type body: dict | bytes
76
77
        :rtype: CreatedResponse
78
        """
79 9247d70a Captain_Trojan
        CertController.setup()                                                      # TODO remove after issue fixed
80 5b57121e Captain_Trojan
81 9247d70a Captain_Trojan
        required_keys = {SUBJECT, USAGE, VALIDITY_DAYS}                             # required fields of the POST req
82 5b57121e Captain_Trojan
83 9247d70a Captain_Trojan
        if request.is_json:                                                         # accept JSON only
84 5b57121e Captain_Trojan
            body = request.get_json()
85 9247d70a Captain_Trojan
            if not all(k in body for k in required_keys):                           # verify that all keys are present
86
                return E_MISSING_PARAMETERS, C_BAD_REQUEST
87 5b57121e Captain_Trojan
88 9247d70a Captain_Trojan
            if not isinstance(body[VALIDITY_DAYS], int):                            # type checking
89
                return E_WRONG_PARAMETERS, C_BAD_REQUEST
90 5b57121e Captain_Trojan
91 9247d70a Captain_Trojan
            subject = Subject.from_dict(body[SUBJECT])                              # generate Subject from passed dict
92 5b57121e Captain_Trojan
93 9247d70a Captain_Trojan
            if subject is None:                                                     # if the format is incorrect
94
                return E_WRONG_PARAMETERS, C_BAD_REQUEST
95 5b57121e Captain_Trojan
96 5b6d9513 Captain_Trojan
            usages_dict = {}
97 5b57121e Captain_Trojan
98 9247d70a Captain_Trojan
            if not isinstance(body[USAGE], dict):                                   # type checking
99
                return E_WRONG_PARAMETERS, C_BAD_REQUEST
100 5b57121e Captain_Trojan
101 9247d70a Captain_Trojan
            for k, v in body[USAGE].items():                                        # for each usage
102
                if k not in CertController.KEY_MAP:                                 # check that it is a valid usage
103
                    return E_WRONG_PARAMETERS, C_BAD_REQUEST                        # and throw if it is not
104
                usages_dict[CertController.KEY_MAP[k]] = v                          # otherwise translate key and set
105 5b57121e Captain_Trojan
106 9247d70a Captain_Trojan
            key = KEY_SERVICE.create_new_key()                                      # TODO pass key
107 5b57121e Captain_Trojan
108 493022a0 Jan Pašek
            if CA not in body or body[CA] is None:                                  # if issuer omitted (legal) or none
109 9247d70a Captain_Trojan
                cert = CERTIFICATE_SERVICE.create_root_ca(                          # create a root CA
110 5b57121e Captain_Trojan
                    key,
111
                    subject,
112 9247d70a Captain_Trojan
                    usages=usages_dict,                                             # TODO ignoring usages -> discussion
113 5b57121e Captain_Trojan
                    days=body[VALIDITY_DAYS]
114
                )
115
            else:
116 9247d70a Captain_Trojan
                issuer = CERTIFICATE_SERVICE.get_certificate(body[CA])              # get base issuer info
117 5b57121e Captain_Trojan
118 9247d70a Captain_Trojan
                if issuer is None:                                                  # if such issuer does not exist
119
                    KEY_SERVICE.delete_key(key.private_key_id)                      # free
120
                    return E_NO_ISSUER_FOUND, C_BAD_REQUEST                         # and throw
121 5b57121e Captain_Trojan
122 9247d70a Captain_Trojan
                issuer_key = KEY_SERVICE.get_key(issuer.private_key_id)             # get issuer's key, which must exist
123 5b57121e Captain_Trojan
124 9247d70a Captain_Trojan
                if issuer_key is None:                                              # if it does not
125
                    KEY_SERVICE.delete_key(key.private_key_id)                      # free
126
                    return E_CORRUPTED_DATABASE, C_INTERNAL_SERVER_ERROR            # and throw
127 5b57121e Captain_Trojan
128
                f = CERTIFICATE_SERVICE.create_ca if CA_ID in usages_dict and usages_dict[CA_ID] else \
129
                    CERTIFICATE_SERVICE.create_end_cert
130
131 9247d70a Captain_Trojan
                # noinspection PyTypeChecker
132
                cert = f(                                                           # create inter CA or end cert
133
                    key,                                                            # according to whether 'CA' is among
134
                    subject,                                                        # the usages' fields
135 5b57121e Captain_Trojan
                    issuer,
136
                    issuer_key,
137
                    usages=usages_dict,
138
                    days=body[VALIDITY_DAYS]
139
                )
140
141
            if cert is not None:
142
                return {"success": True,
143 9247d70a Captain_Trojan
                        "data": cert.certificate_id}, C_CREATED_SUCCESSFULLY
144
            else:                                                                   # if this fails, then
145
                KEY_SERVICE.delete_key(key.private_key_id)                          # free
146
                return {"success": False,                                           # and wonder what the cause is,
147
                        "data": "Internal error: The certificate could not have been created."}, C_BAD_REQUEST
148
                                                                                    # as obj/None carries only one bit
149
                                                                                    # of error information
150 5b57121e Captain_Trojan
        else:
151 9247d70a Captain_Trojan
            return E_NOT_JSON_FORMAT, C_BAD_REQUEST                                 # throw in case of non-JSON format
152 5b57121e Captain_Trojan
153
    @staticmethod
154 9247d70a Captain_Trojan
    def get_certificate_by_id(id):
155 5b57121e Captain_Trojan
        """get certificate by ID
156
157 9247d70a Captain_Trojan
        Get certificate in PEM format by ID
158 5b57121e Captain_Trojan
159
        :param id: ID of a certificate to be queried
160
        :type id: dict | bytes
161
162
        :rtype: PemResponse
163
        """
164 9247d70a Captain_Trojan
        CertController.setup()                                                      # TODO remove after issue fixed
165 5b57121e Captain_Trojan
166 fb987403 Captain_Trojan
        try:
167
            v = int(id)
168
        except ValueError:
169 9247d70a Captain_Trojan
            return E_WRONG_PARAMETERS, C_BAD_REQUEST
170 fb987403 Captain_Trojan
171
        cert = CERTIFICATE_SERVICE.get_certificate(v)
172
173
        if cert is None:
174 9247d70a Captain_Trojan
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA                               
175 fb987403 Captain_Trojan
        else:
176 9247d70a Captain_Trojan
            return {"success": True, "data": cert.pem_data}, C_SUCCESS
177 5b57121e Captain_Trojan
178
    @staticmethod
179 9247d70a Captain_Trojan
    def get_certificate_details_by_id(id):
180 5b57121e Captain_Trojan
        """get certificate's details by ID
181
182 9247d70a Captain_Trojan
        Get certificate details by ID
183 5b57121e Captain_Trojan
184
        :param id: ID of a certificate whose details are to be queried
185
        :type id: dict | bytes
186
187
        :rtype: CertificateResponse
188
        """
189 9247d70a Captain_Trojan
        CertController.setup()                                                      # TODO remove after issue fixed
190 5b57121e Captain_Trojan
191 5b6d9513 Captain_Trojan
        try:
192
            v = int(id)
193
        except ValueError:
194 9247d70a Captain_Trojan
            return E_WRONG_PARAMETERS, C_BAD_REQUEST
195 5b6d9513 Captain_Trojan
196
        cert = CERTIFICATE_SERVICE.get_certificate(v)
197
198
        if cert is None:
199 9247d70a Captain_Trojan
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA                               
200 5b6d9513 Captain_Trojan
        else:
201
            data = CertController.cert_to_dict_full(cert)
202
            if data is None:
203 9247d70a Captain_Trojan
                return E_CORRUPTED_DATABASE, C_INTERNAL_SERVER_ERROR
204
            return {"success": True, "data": data}, C_SUCCESS
205 5b57121e Captain_Trojan
206
    @staticmethod
207 9247d70a Captain_Trojan
    def get_certificate_list():
208 5b57121e Captain_Trojan
        """get list of certificates
209
210 9247d70a Captain_Trojan
        Lists certificates based on provided filtering options
211 5b57121e Captain_Trojan
212
        :param filtering: Filter certificate type to be queried
213
        :type filtering: dict | bytes
214
215
        :rtype: CertificateListResponse
216
        """
217 9247d70a Captain_Trojan
        CertController.setup()                                                      # TODO remove after issue fixed
218
219
        targets = {ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID}                  # all targets
220 ed55c677 Jan Pašek
221
        # the filtering parameter can be read as URL argument or as a request body
222
        if request.is_json or "filtering" in request.args.keys():                   # if the request carries JSON data
223
            if request.is_json:
224
                data = request.get_json()                                           # get it
225
            else:
226 6422796d Stanislav Král
                try:
227
                    data = {"filtering": json.loads(request.args["filtering"])}
228
                except JSONDecodeError:
229
                    return E_NOT_JSON_FORMAT, C_BAD_REQUEST
230
231 9247d70a Captain_Trojan
            if FILTERING in data:                                                   # if the 'filtering' field exists
232
                if isinstance(data[FILTERING], dict):                               # and it is also a 'dict'
233
                    if CA in data[FILTERING]:                                       # containing 'CA'
234
                        if isinstance(data[FILTERING][CA], bool):                   # which is a 'bool',
235
                            if data[FILTERING][CA]:                                 # then filter according to 'CA'.
236 5b57121e Captain_Trojan
                                targets.remove(CERTIFICATE_ID)
237
                            else:
238
                                targets.remove(ROOT_CA_ID)
239
                                targets.remove(INTERMEDIATE_CA_ID)
240
                        else:
241 9247d70a Captain_Trojan
                            return E_WRONG_PARAMETERS, C_BAD_REQUEST
242 5b57121e Captain_Trojan
                else:
243 9247d70a Captain_Trojan
                    return E_WRONG_PARAMETERS, C_BAD_REQUEST
244 5b57121e Captain_Trojan
245 9247d70a Captain_Trojan
        if len(targets) == TREE_NODE_TYPE_COUNT:                                    # = 3 -> root node,
246
                                                                                    # intermediate node,
247
                                                                                    # or leaf node
248
249
                                                                                    # if filtering did not change the
250
                                                                                    # targets,
251
            certs = CERTIFICATE_SERVICE.get_certificates()                          # fetch everything
252
        else:                                                                       # otherwise fetch targets only
253 49f22fd9 Captain_Trojan
            certs = list(chain(*(CERTIFICATE_SERVICE.get_certificates(target) for target in targets)))
254 5b57121e Captain_Trojan
255
        if certs is None:
256 9247d70a Captain_Trojan
            return E_GENERAL_ERROR, C_INTERNAL_SERVER_ERROR
257 5b57121e Captain_Trojan
        elif len(certs) == 0:
258 9247d70a Captain_Trojan
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA                               
259 5b57121e Captain_Trojan
        else:
260
            ret = []
261
            for c in certs:
262 5b6d9513 Captain_Trojan
                data = CertController.cert_to_dict_partial(c)
263
                if data is None:
264 9247d70a Captain_Trojan
                    return E_CORRUPTED_DATABASE, C_INTERNAL_SERVER_ERROR
265 5b57121e Captain_Trojan
                ret.append(
266 5b6d9513 Captain_Trojan
                    data
267 5b57121e Captain_Trojan
                )
268 9247d70a Captain_Trojan
            return {"success": True, "data": ret}, C_SUCCESS
269 5b57121e Captain_Trojan
270
    @staticmethod
271 9247d70a Captain_Trojan
    def get_certificate_root_by_id(id):
272 5b57121e Captain_Trojan
        """get certificate's root of trust chain by ID
273
274 9247d70a Captain_Trojan
        Get certificate's root of trust chain in PEM format by ID
275 5b57121e Captain_Trojan
276
        :param id: ID of a child certificate whose root is to be queried
277
        :type id: dict | bytes
278
279
        :rtype: PemResponse
280
        """
281 9247d70a Captain_Trojan
        CertController.setup()                                                      # TODO remove after issue fixed
282 5b57121e Captain_Trojan
283 d53c2fdc Captain_Trojan
        try:
284
            v = int(id)
285
        except ValueError:
286 9247d70a Captain_Trojan
            return E_WRONG_PARAMETERS, C_BAD_REQUEST
287 d53c2fdc Captain_Trojan
288
        cert = CERTIFICATE_SERVICE.get_certificate(v)
289
290
        if cert is None:
291 11a90594 Jan Pašek
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA
292 d53c2fdc Captain_Trojan
293 11a90594 Jan Pašek
        trust_chain = CERTIFICATE_SERVICE.get_chain_of_trust(cert.parent_id, exclude_root=False)
294
        if trust_chain is None or len(trust_chain) is 0:
295
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA
296 d53c2fdc Captain_Trojan
297 11a90594 Jan Pašek
        return {"success": True, "data": trust_chain[-1].pem_data}, C_SUCCESS
298 5b57121e Captain_Trojan
299
    @staticmethod
300 9247d70a Captain_Trojan
    def get_certificate_trust_chain_by_id(id):
301 5b57121e Captain_Trojan
        """get certificate's trust chain by ID
302
303 9247d70a Captain_Trojan
        Get certificate trust chain in PEM format by ID
304 5b57121e Captain_Trojan
305
        :param id: ID of a child certificate whose chain is to be queried
306
        :type id: dict | bytes
307
308
        :rtype: PemResponse
309
        """
310 9247d70a Captain_Trojan
        CertController.setup()                                                      # TODO remove after issue fixed
311 5b57121e Captain_Trojan
312 aa740737 Captain_Trojan
        try:
313
            v = int(id)
314
        except ValueError:
315 9247d70a Captain_Trojan
            return E_WRONG_PARAMETERS, C_BAD_REQUEST
316 aa740737 Captain_Trojan
317
        cert = CERTIFICATE_SERVICE.get_certificate(v)
318
319
        if cert is None:
320 9247d70a Captain_Trojan
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA                               
321 aa740737 Captain_Trojan
322 11a90594 Jan Pašek
        if cert.parent_id is None:
323
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA
324 aa740737 Captain_Trojan
325 11a90594 Jan Pašek
        trust_chain = CERTIFICATE_SERVICE.get_chain_of_trust(cert.parent_id)
326
327
        ret = []
328
        for intermediate in trust_chain:
329
            ret.append(intermediate.pem_data)
330 5b6d9513 Captain_Trojan
331 9247d70a Captain_Trojan
        return {"success": True, "data": "".join(ret)}, C_SUCCESS
332 5b6d9513 Captain_Trojan
333
    @staticmethod
334
    def cert_to_dict_partial(c):
335 9247d70a Captain_Trojan
        """
336
        Dictionarizes a certificate directly fetched from the database. Contains partial information.
337
        :param c: target cert
338
        :return: certificate dict (compliant with some parts of the REST API)
339
        """
340 5b6d9513 Captain_Trojan
        c_issuer = CERTIFICATE_SERVICE.get_certificate(c.parent_id)
341
        if c_issuer is None:
342
            return None
343
344
        return {
345
            ID: c.certificate_id,
346
            COMMON_NAME: c.common_name,
347
            NOT_BEFORE: datetime.strptime(c.valid_from, DATETIME_FORMAT).date(),
348
            NOT_AFTER: datetime.strptime(c.valid_to, DATETIME_FORMAT).date(),
349
            USAGE: {CertController.INVERSE_KEY_MAP[k]: v for k, v in c.usages.items()},
350
            ISSUER: {
351
                ID: c_issuer.certificate_id,
352
                COMMON_NAME: c_issuer.common_name
353
            }
354
        }
355
356
    @staticmethod
357
    def cert_to_dict_full(c):
358 9247d70a Captain_Trojan
        """
359
        Dictionarizes a certificate directly fetched from the database, but adds subject info.
360
        Contains full information.
361
        :param c: target cert
362
        :return: certificate dict (compliant with some parts of the REST API)
363
        """
364 5b6d9513 Captain_Trojan
        subj = CERTIFICATE_SERVICE.get_subject_from_certificate(c)
365
        c_issuer = CERTIFICATE_SERVICE.get_certificate(c.parent_id)
366
        if c_issuer is None:
367
            return None
368
369
        return {
370
            SUBJECT: subj.to_dict(),
371
            NOT_BEFORE: datetime.strptime(c.valid_from, DATETIME_FORMAT).date(),
372
            NOT_AFTER: datetime.strptime(c.valid_to, DATETIME_FORMAT).date(),
373
            USAGE: {CertController.INVERSE_KEY_MAP[k]: v for k, v in c.usages.items()},
374
            CA: c_issuer.certificate_id
375
        }