Projekt

Obecné

Profil

Stáhnout (12.2 KB) Statistiky
| Větev: | Tag: | Revize:
1
from datetime import datetime
2
from itertools import chain
3
from flask import request
4
from src.dao.private_key_repository import PrivateKeyRepository
5
from src.model.subject import Subject
6
from src.services.certificate_service import CertificateService
7
from src.dao.certificate_repository import CertificateRepository  # TODO not the Controller's responsibility. 1
8
from src.services.cryptography import CryptographyService  # TODO not the Controller's responsibility. 2
9
from sqlite3 import Connection  # TODO not the Controller's responsibility. 3
10
from src.constants import DICT_USAGES, CA_ID, \
11
    DATABASE_FILE_LOCATION, SSL_ID, SIGNATURE_ID, AUTHENTICATION_ID, \
12
    DATETIME_FORMAT, ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID  # TODO DATABASE_FILE - not the Controller's
13
#  responsibility. 4
14
from src.services.key_service import KeyService
15

    
16
# _ = Connection("../" + DATABASE_FILE)                                 # TODO not the Controller's responsibility. 5
17
# cursor = _.cursor()                                                   # TODO responsibility of the
18
                                                                        #  CertificateRepository. It makes no sense to
19
                                                                        #  supply a different cursor than precisely
20
                                                                        #  the one corresponding to the connection.
21
                                                                        #  The cursor can always be generated from the
22
                                                                        #  connection instance.
23
FILTERING = "filtering"
24
ISSUER = "issuer"
25
US = "usage"
26
NOT_AFTER = "notAfter"
27
NOT_BEFORE = "notBefore"
28
COMMON_NAME = "CN"
29
ID = "id"
30
E_NO_ISSUER_FOUND = {"success": False,
31
                           "data": "No certificate authority with such unique ID exists."}
32
E_NO_CERTIFICATES_FOUND = {"success": False, "data": "No such certificate found."}
33
E_NOT_JSON_FORMAT = {"success": False, "data": "The request must be JSON-formatted."}
34
E_CORRUPTED_DATABASE = {"success": False, "data": "Internal server error (corrupted database)."}
35
E_GENERAL_ERROR = {"success": False, "data": "Internal server error (unknown origin)."}
36
E_MISSING_PARAMETERS = {"success": False, "data": "Invalid request, missing parameters."}
37
E_WRONG_PARAMETERS = {"success": False, "data": "Invalid request, wrong parameters."}
38
USAGE = "usage"
39
SUBJECT = "subject"
40
VALIDITY_DAYS = "validityDays"
41
CA = "CA"
42

    
43
__ = CryptographyService()  # TODO not the Controller's responsibility. 6
44
CERTIFICATE_SERVICE = CertificateService(__, CertificateRepository(None, None))
45
# TODO open for discussion. Expected:
46
#  CS = CertificateService.get_instance()
47
#  or something like that.
48

    
49
KEY_SERVICE = KeyService(__, PrivateKeyRepository(None, None))  # TODO as above
50

    
51

    
52
class CertController:
53
    KEY_MAP = {'CA': CA_ID, 'SSL': SSL_ID, 'digitalSignature': SIGNATURE_ID, 'authentication': AUTHENTICATION_ID}
54
    INVERSE_KEY_MAP = {k: v for v, k in KEY_MAP.items()}
55

    
56
    @staticmethod
57
    def setup():
58
        """
59
        SQLite3 thread issue hack.
60
        :return:
61
        """
62
        _ = Connection(DATABASE_FILE_LOCATION.shortest_relative_path())
63
        CERTIFICATE_SERVICE.certificate_repository.connection = _
64
        CERTIFICATE_SERVICE.certificate_repository.cursor = _.cursor()
65
        KEY_SERVICE.private_key_repository.connection = _
66
        KEY_SERVICE.private_key_repository.cursor = _.cursor()
67

    
68
    @staticmethod
69
    def create_certificate():  # noqa: E501
70
        """create new certificate
71

    
72
        Create a new certificate based on given information # noqa: E501
73

    
74
        :param body: Certificate data to be created
75
        :type body: dict | bytes
76

    
77
        :rtype: CreatedResponse
78
        """
79
        CertController.setup()  # TODO remove after issue fixed
80

    
81
        required_keys = {SUBJECT, USAGE, VALIDITY_DAYS}
82

    
83
        if request.is_json:
84
            body = request.get_json()
85
            if not all(k in body for k in required_keys):
86
                return E_MISSING_PARAMETERS, 400
87

    
88
            if not isinstance(body[VALIDITY_DAYS], int):
89
                return E_WRONG_PARAMETERS, 400
90

    
91
            subject = Subject.from_dict(body[SUBJECT])
92

    
93
            if subject is None:
94
                return E_WRONG_PARAMETERS, 400
95

    
96
            usages_dict = {}
97

    
98
            if not isinstance(body[USAGE], dict):
99
                return E_WRONG_PARAMETERS, 400
100

    
101
            for k, v in body[USAGE].items():
102
                if k not in CertController.KEY_MAP:
103
                    return E_WRONG_PARAMETERS, 400
104
                usages_dict[CertController.KEY_MAP[k]] = v
105

    
106
            key = KEY_SERVICE.create_new_key()  # TODO pass key
107

    
108
            if CA not in body:
109
                cert = CERTIFICATE_SERVICE.create_root_ca(
110
                    key,
111
                    subject,
112
                    usages=usages_dict,
113
                    days=body[VALIDITY_DAYS]
114
                )
115
            else:
116
                issuer = CERTIFICATE_SERVICE.get_certificate(body[CA])
117

    
118
                if issuer is None:
119
                    KEY_SERVICE.delete_key(key.private_key_id)
120
                    return E_NO_ISSUER_FOUND, 400
121

    
122
                issuer_key = KEY_SERVICE.get_key(issuer.private_key_id)
123

    
124
                if issuer_key is None:
125
                    KEY_SERVICE.delete_key(key.private_key_id)
126
                    return E_CORRUPTED_DATABASE, 500
127

    
128
                f = CERTIFICATE_SERVICE.create_ca if CA_ID in usages_dict and usages_dict[CA_ID] else \
129
                    CERTIFICATE_SERVICE.create_end_cert
130

    
131
                cert = f(
132
                    key,
133
                    subject,
134
                    issuer,
135
                    issuer_key,
136
                    usages=usages_dict,
137
                    days=body[VALIDITY_DAYS]
138
                )
139

    
140
            if cert is not None:
141
                return {"success": True,
142
                        "data": cert.certificate_id}, 201
143
            else:
144
                KEY_SERVICE.delete_key(key.private_key_id)
145
                return {"success": False,
146
                        "data": "Internal error: The certificate could not have been created."}, 400
147
        else:
148
            return E_NOT_JSON_FORMAT, 400
149

    
150
    @staticmethod
151
    def get_certificate_by_id(id):  # noqa: E501
152
        """get certificate by ID
153

    
154
        Get certificate in PEM format by ID # noqa: E501
155

    
156
        :param id: ID of a certificate to be queried
157
        :type id: dict | bytes
158

    
159
        :rtype: PemResponse
160
        """
161
        CertController.setup()  # TODO remove after issue fixed
162

    
163
        try:
164
            v = int(id)
165
        except ValueError:
166
            return E_WRONG_PARAMETERS, 400
167

    
168
        cert = CERTIFICATE_SERVICE.get_certificate(v)
169

    
170
        if cert is None:
171
            return E_NO_CERTIFICATES_FOUND, 205                 # TODO related to 204 issue
172
        else:
173
            return {"success": True, "data": cert.pem_data}
174

    
175
    @staticmethod
176
    def get_certificate_details_by_id(id):  # noqa: E501
177
        """get certificate's details by ID
178

    
179
        Get certificate details by ID # noqa: E501
180

    
181
        :param id: ID of a certificate whose details are to be queried
182
        :type id: dict | bytes
183

    
184
        :rtype: CertificateResponse
185
        """
186
        CertController.setup()  # TODO remove after issue fixed
187

    
188
        try:
189
            v = int(id)
190
        except ValueError:
191
            return E_WRONG_PARAMETERS, 400
192

    
193
        cert = CERTIFICATE_SERVICE.get_certificate(v)
194

    
195
        if cert is None:
196
            return E_NO_CERTIFICATES_FOUND, 205  # TODO related to 204 issue
197
        else:
198
            data = CertController.cert_to_dict_full(cert)
199
            if data is None:
200
                return E_CORRUPTED_DATABASE, 500
201
            return {"success": True, "data": data}
202

    
203
    @staticmethod
204
    def get_certificate_list():  # noqa: E501
205
        """get list of certificates
206

    
207
        Lists certificates based on provided filtering options # noqa: E501
208

    
209
        :param filtering: Filter certificate type to be queried
210
        :type filtering: dict | bytes
211

    
212
        :rtype: CertificateListResponse
213
        """
214
        CertController.setup()  # TODO remove after issue fixed
215

    
216
        targets = {ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID}
217
        if request.is_json:
218
            data = request.get_json()
219
            if FILTERING in data:
220
                if isinstance(data[FILTERING], dict):
221
                    if CA in data[FILTERING]:
222
                        if isinstance(data[FILTERING][CA], bool):
223
                            if data[FILTERING][CA]:
224
                                targets.remove(CERTIFICATE_ID)
225
                            else:
226
                                targets.remove(ROOT_CA_ID)
227
                                targets.remove(INTERMEDIATE_CA_ID)
228
                        else:
229
                            return E_WRONG_PARAMETERS, 400
230
                else:
231
                    return E_WRONG_PARAMETERS, 400
232

    
233
        if len(targets) == 3:                               # potentially risky
234
            certs = CERTIFICATE_SERVICE.get_certificates()
235
        else:
236
            certs = list(chain(*(CERTIFICATE_SERVICE.get_certificates(target) for target in targets)))
237

    
238
        if certs is None:
239
            return E_GENERAL_ERROR, 500
240
        elif len(certs) == 0:
241
            return E_NO_CERTIFICATES_FOUND, 205         # TODO related to 204 issue
242
        else:
243
            ret = []
244
            for c in certs:
245
                data = CertController.cert_to_dict_partial(c)
246
                if data is None:
247
                    return E_CORRUPTED_DATABASE, 500
248
                ret.append(
249
                    data
250
                )
251
            return {"success": True, "data": ret}
252

    
253
    @staticmethod
254
    def get_certificate_root_by_id(id):  # noqa: E501
255
        """get certificate's root of trust chain by ID
256

    
257
        Get certificate's root of trust chain in PEM format by ID # noqa: E501
258

    
259
        :param id: ID of a child certificate whose root is to be queried
260
        :type id: dict | bytes
261

    
262
        :rtype: PemResponse
263
        """
264
        CertController.setup()  # TODO remove after issue fixed
265

    
266
        try:
267
            v = int(id)
268
        except ValueError:
269
            return E_WRONG_PARAMETERS, 400
270

    
271
        cert = CERTIFICATE_SERVICE.get_certificate(v)
272

    
273
        if cert is None:
274
            return E_NO_CERTIFICATES_FOUND, 205  # TODO related to 204 issue
275

    
276
        while cert.parent_id != cert.certificate_id:
277
            cert = CERTIFICATE_SERVICE.get_certificate(cert.parent_id)
278
            if cert is None:
279
                return E_CORRUPTED_DATABASE, 500
280

    
281
        return {"success": True, "data": cert.pem_data}
282

    
283
    @staticmethod
284
    def get_certificate_trust_chain_by_id(id):  # noqa: E501
285
        """get certificate's trust chain by ID
286

    
287
        Get certificate trust chain in PEM format by ID # noqa: E501
288

    
289
        :param id: ID of a child certificate whose chain is to be queried
290
        :type id: dict | bytes
291

    
292
        :rtype: PemResponse
293
        """
294
        CertController.setup()  # TODO remove after issue fixed
295

    
296
        if connexion.request.is_json:
297
            id = IdParameter.from_dict(connexion.request.get_json())  # noqa: E501
298
        return 'do some magic!'
299

    
300

    
301
    @staticmethod
302
    def cert_to_dict_partial(c):
303
        c_issuer = CERTIFICATE_SERVICE.get_certificate(c.parent_id)
304
        if c_issuer is None:
305
            return None
306

    
307
        return {
308
            ID: c.certificate_id,
309
            COMMON_NAME: c.common_name,
310
            NOT_BEFORE: datetime.strptime(c.valid_from, DATETIME_FORMAT).date(),
311
            NOT_AFTER: datetime.strptime(c.valid_to, DATETIME_FORMAT).date(),
312
            USAGE: {CertController.INVERSE_KEY_MAP[k]: v for k, v in c.usages.items()},
313
            ISSUER: {
314
                ID: c_issuer.certificate_id,
315
                COMMON_NAME: c_issuer.common_name
316
            }
317
        }
318

    
319
    @staticmethod
320
    def cert_to_dict_full(c):
321
        subj = CERTIFICATE_SERVICE.get_subject_from_certificate(c)
322
        c_issuer = CERTIFICATE_SERVICE.get_certificate(c.parent_id)
323
        if c_issuer is None:
324
            return None
325

    
326
        return {
327
            SUBJECT: subj.to_dict(),
328
            NOT_BEFORE: datetime.strptime(c.valid_from, DATETIME_FORMAT).date(),
329
            NOT_AFTER: datetime.strptime(c.valid_to, DATETIME_FORMAT).date(),
330
            USAGE: {CertController.INVERSE_KEY_MAP[k]: v for k, v in c.usages.items()},
331
            CA: c_issuer.certificate_id
332
        }
(2-2/2)