Projekt

Obecné

Profil

Stáhnout (10.7 KB) Statistiky
| Větev: | Tag: | Revize:
1 5b57121e Captain_Trojan
from datetime import datetime
2
from itertools import chain
3
from flask import request
4
from src.dao.private_key_repository import PrivateKeyRepository
5
from src.model.subject import Subject
6
from src.services.certificate_service import CertificateService
7
from src.dao.certificate_repository import CertificateRepository  # TODO not the Controller's responsibility. 1
8
from src.services.cryptography import CryptographyService  # TODO not the Controller's responsibility. 2
9
from sqlite3 import Connection  # TODO not the Controller's responsibility. 3
10
from src.constants import DICT_USAGES, CA_ID, \
11
    DATABASE_FILE_LOCATION, SSL_ID, SIGNATURE_ID, AUTHENTICATION_ID, \
12
    DATETIME_FORMAT, ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID  # TODO DATABASE_FILE - not the Controller's
13
#  responsibility. 4
14
from src.services.key_service import KeyService
15
16
# _ = Connection("../" + DATABASE_FILE)                                 # TODO not the Controller's responsibility. 5
17
# cursor = _.cursor()                                                   # TODO responsibility of the
18
                                                                        #  CertificateRepository. It makes no sense to
19
                                                                        #  supply a different cursor than precisely
20
                                                                        #  the one corresponding to the connection.
21
                                                                        #  The cursor can always be generated from the
22
                                                                        #  connection instance.
23
FILTERING = "filtering"
24
ISSUER = "issuer"
25
US = "usage"
26
NOT_AFTER = "notAfter"
27
NOT_BEFORE = "notBefore"
28
COMMON_NAME = "CN"
29
ID = "id"
30
E_NO_ISSUER_FOUND = {"success": False,
31
                           "data": "No certificate authority with such unique ID exists."}
32
E_NO_CERTIFICATES_FOUND = {"success": False, "data": "No certificates found."}
33
E_NOT_JSON_FORMAT = {"success": False, "data": "The request must be JSON-formatted."}
34
E_CORRUPTED_DATABASE = {"success": False, "data": "Internal server error (corrupted database)."}
35
E_GENERAL_ERROR = {"success": False, "data": "Internal server error (unknown origin)."}
36
E_MISSING_PARAMETERS = {"success": False, "data": "Invalid request, missing parameters."}
37
E_WRONG_PARAMETERS = {"success": False, "data": "Invalid request, wrong parameters."}
38
USAGE = "usage"
39
SUBJECT = "subject"
40
VALIDITY_DAYS = "validityDays"
41
CA = "CA"
42
43
__ = CryptographyService()  # TODO not the Controller's responsibility. 6
44
CERTIFICATE_SERVICE = CertificateService(__, CertificateRepository(None, None))
45
# TODO open for discussion. Expected:
46
#  CS = CertificateService.get_instance()
47
#  or something like that.
48
49
KEY_SERVICE = KeyService(__, PrivateKeyRepository(None, None))  # TODO as above
50
51
52
class CertController:
53
54
    @staticmethod
55
    def setup():
56
        """
57
        SQLite3 thread issue hack.
58
        :return:
59
        """
60
        _ = Connection(DATABASE_FILE_LOCATION.shortest_relative_path())
61
        CERTIFICATE_SERVICE.certificate_repository.connection = _
62
        CERTIFICATE_SERVICE.certificate_repository.cursor = _.cursor()
63
        KEY_SERVICE.private_key_repository.connection = _
64
        KEY_SERVICE.private_key_repository.cursor = _.cursor()
65
66
    @staticmethod
67
    def create_certificate():  # noqa: E501
68
        """create new certificate
69
70
        Create a new certificate based on given information # noqa: E501
71
72
        :param body: Certificate data to be created
73
        :type body: dict | bytes
74
75
        :rtype: CreatedResponse
76
        """
77
        CertController.setup()  # TODO remove after issue fixed
78
79
        key_map = {'CA': CA_ID, 'SSL': SSL_ID, 'digitalSignature': SIGNATURE_ID, 'authentication': AUTHENTICATION_ID}
80
        required_keys = {SUBJECT, USAGE, VALIDITY_DAYS}
81
82
        if request.is_json:
83
            body = request.get_json()
84
            if not all(k in body for k in required_keys):
85
                return E_MISSING_PARAMETERS, 400
86
87
            if not isinstance(body[VALIDITY_DAYS], int):
88
                return E_WRONG_PARAMETERS, 400
89
90
            subject = Subject.from_dict(body[SUBJECT])
91
92
            if subject is None:
93
                return E_WRONG_PARAMETERS, 400
94
95
            usages_dict = DICT_USAGES.copy()
96
97
            if not isinstance(body[USAGE], dict):
98
                return E_WRONG_PARAMETERS, 400
99
100
            for k, v in body[USAGE].items():
101
                if k not in key_map:
102
                    return E_WRONG_PARAMETERS, 400
103
                usages_dict[key_map[k]] = v
104
105
            key = KEY_SERVICE.create_new_key()  # TODO pass key
106
107
            if CA not in body:
108
                cert = CERTIFICATE_SERVICE.create_root_ca(
109
                    key,
110
                    subject,
111
                    usages=usages_dict,
112
                    days=body[VALIDITY_DAYS]
113
                )
114
            else:
115
                issuer = CERTIFICATE_SERVICE.get_certificate(body[CA])
116
117
                if issuer is None:
118
                    return E_NO_ISSUER_FOUND, 400
119
120
                issuer_key = KEY_SERVICE.get_key(issuer.private_key_id)
121
122
                if issuer_key is None:
123
                    return E_CORRUPTED_DATABASE, 500
124
125
                f = CERTIFICATE_SERVICE.create_ca if CA_ID in usages_dict and usages_dict[CA_ID] else \
126
                    CERTIFICATE_SERVICE.create_end_cert
127
128
                cert = f(
129
                    key,
130
                    subject,
131
                    issuer,
132
                    issuer_key,
133
                    usages=usages_dict,
134
                    days=body[VALIDITY_DAYS]
135
                )
136
137
            if cert is not None:
138
                return {"success": True,
139
                        "data": cert.certificate_id}, 201
140
            else:
141
                return {"success": False,
142
                        "data": "Internal error: The certificate could not have been created."}, 400
143
        else:
144
            return E_NOT_JSON_FORMAT, 400
145
146
    @staticmethod
147
    def get_certificate_by_id(id):  # noqa: E501
148
        """get certificate by ID
149
150
        Get certificate in PEM format by ID # noqa: E501
151
152
        :param id: ID of a certificate to be queried
153
        :type id: dict | bytes
154
155
        :rtype: PemResponse
156
        """
157
        CertController.setup()  # TODO remove after issue fixed
158
159 fb987403 Captain_Trojan
        try:
160
            v = int(id)
161
        except ValueError:
162
            return E_WRONG_PARAMETERS, 400
163
164
        cert = CERTIFICATE_SERVICE.get_certificate(v)
165
166
        if cert is None:
167
            return E_NO_CERTIFICATES_FOUND, 205                 # TODO related to 204 issue
168
        else:
169
            return {"success": True, "data": cert.pem_data}
170 5b57121e Captain_Trojan
171
    @staticmethod
172
    def get_certificate_details_by_id(id):  # noqa: E501
173
        """get certificate's details by ID
174
175
        Get certificate details by ID # noqa: E501
176
177
        :param id: ID of a certificate whose details are to be queried
178
        :type id: dict | bytes
179
180
        :rtype: CertificateResponse
181
        """
182
        CertController.setup()  # TODO remove after issue fixed
183
184
        if connexion.request.is_json:
185
            id = IdParameter.from_dict(connexion.request.get_json())  # noqa: E501
186
        return 'do some magic!'
187
188
    @staticmethod
189
    def get_certificate_list():  # noqa: E501
190
        """get list of certificates
191
192
        Lists certificates based on provided filtering options # noqa: E501
193
194
        :param filtering: Filter certificate type to be queried
195
        :type filtering: dict | bytes
196
197
        :rtype: CertificateListResponse
198
        """
199
        CertController.setup()  # TODO remove after issue fixed
200
201
        key_map = {CA_ID: 'CA', SSL_ID: 'SSL', SIGNATURE_ID: 'digitalSignature', AUTHENTICATION_ID: 'authentication'}
202
203
        targets = {ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID}
204
        if request.is_json:
205
            data = request.get_json()
206
            if FILTERING in data:
207
                if isinstance(data[FILTERING], dict):
208
                    if CA in data[FILTERING]:
209
                        if isinstance(data[FILTERING][CA], bool):
210
                            if data[FILTERING][CA]:
211
                                targets.remove(CERTIFICATE_ID)
212
                            else:
213
                                targets.remove(ROOT_CA_ID)
214
                                targets.remove(INTERMEDIATE_CA_ID)
215
                        else:
216
                            return E_WRONG_PARAMETERS, 400
217
                else:
218
                    return E_WRONG_PARAMETERS, 400
219
220
        if len(targets) == 3:
221
            certs = CERTIFICATE_SERVICE.get_certificates()
222
        else:
223 49f22fd9 Captain_Trojan
            certs = list(chain(*(CERTIFICATE_SERVICE.get_certificates(target) for target in targets)))
224 5b57121e Captain_Trojan
225
        if certs is None:
226
            return E_GENERAL_ERROR, 500
227
        elif len(certs) == 0:
228 fb987403 Captain_Trojan
            return E_NO_CERTIFICATES_FOUND, 205         # TODO related to 204 issue
229 5b57121e Captain_Trojan
        else:
230
            ret = []
231
            for c in certs:
232
                c_issuer = CERTIFICATE_SERVICE.get_certificate(c.parent_id)
233
                if c_issuer is None:
234
                    return E_CORRUPTED_DATABASE, 500
235
236
                ret.append(
237
                    {
238
                        ID: c.certificate_id,
239
                        COMMON_NAME: c.common_name,
240
                        NOT_BEFORE: datetime.strptime(c.valid_from, DATETIME_FORMAT).date(),
241
                        NOT_AFTER: datetime.strptime(c.valid_to, DATETIME_FORMAT).date(),
242
                        USAGE: {key_map[k]: v for k, v in c.usages.items()},
243
                        ISSUER: {
244
                            ID: c_issuer.certificate_id,
245
                            COMMON_NAME: c_issuer.common_name
246
                        }
247
                    }
248
                )
249
            return {"success": True, "data": ret}
250
251
252
253
    @staticmethod
254
    def get_certificate_root_by_id(id):  # noqa: E501
255
        """get certificate's root of trust chain by ID
256
257
        Get certificate's root of trust chain in PEM format by ID # noqa: E501
258
259
        :param id: ID of a child certificate whose root is to be queried
260
        :type id: dict | bytes
261
262
        :rtype: PemResponse
263
        """
264
        CertController.setup()  # TODO remove after issue fixed
265
266
        if connexion.request.is_json:
267
            id = IdParameter.from_dict(connexion.request.get_json())  # noqa: E501
268
        return 'do some magic!'
269
270
    @staticmethod
271
    def get_certificate_trust_chain_by_id(id):  # noqa: E501
272
        """get certificate's trust chain by ID
273
274
        Get certificate trust chain in PEM format by ID # noqa: E501
275
276
        :param id: ID of a child certificate whose chain is to be queried
277
        :type id: dict | bytes
278
279
        :rtype: PemResponse
280
        """
281
        CertController.setup()  # TODO remove after issue fixed
282
283
        if connexion.request.is_json:
284
            id = IdParameter.from_dict(connexion.request.get_json())  # noqa: E501
285
        return 'do some magic!'