Projekt

Obecné

Profil

Stáhnout (10.5 KB) Statistiky
| Větev: | Tag: | Revize:
1
from datetime import datetime
2
from itertools import chain
3
from flask import request
4
from src.dao.private_key_repository import PrivateKeyRepository
5
from src.model.subject import Subject
6
from src.services.certificate_service import CertificateService
7
from src.dao.certificate_repository import CertificateRepository  # TODO not the Controller's responsibility. 1
8
from src.services.cryptography import CryptographyService  # TODO not the Controller's responsibility. 2
9
from sqlite3 import Connection  # TODO not the Controller's responsibility. 3
10
from src.constants import DICT_USAGES, CA_ID, \
11
    DATABASE_FILE_LOCATION, SSL_ID, SIGNATURE_ID, AUTHENTICATION_ID, \
12
    DATETIME_FORMAT, ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID  # TODO DATABASE_FILE - not the Controller's
13
#  responsibility. 4
14
from src.services.key_service import KeyService
15

    
16
# _ = Connection("../" + DATABASE_FILE)                                 # TODO not the Controller's responsibility. 5
17
# cursor = _.cursor()                                                   # TODO responsibility of the
18
                                                                        #  CertificateRepository. It makes no sense to
19
                                                                        #  supply a different cursor than precisely
20
                                                                        #  the one corresponding to the connection.
21
                                                                        #  The cursor can always be generated from the
22
                                                                        #  connection instance.
23
FILTERING = "filtering"
24
ISSUER = "issuer"
25
US = "usage"
26
NOT_AFTER = "notAfter"
27
NOT_BEFORE = "notBefore"
28
COMMON_NAME = "CN"
29
ID = "id"
30
E_NO_ISSUER_FOUND = {"success": False,
31
                           "data": "No certificate authority with such unique ID exists."}
32
E_NO_CERTIFICATES_FOUND = {"success": False, "data": "No certificates found."}
33
E_NOT_JSON_FORMAT = {"success": False, "data": "The request must be JSON-formatted."}
34
E_CORRUPTED_DATABASE = {"success": False, "data": "Internal server error (corrupted database)."}
35
E_GENERAL_ERROR = {"success": False, "data": "Internal server error (unknown origin)."}
36
E_MISSING_PARAMETERS = {"success": False, "data": "Invalid request, missing parameters."}
37
E_WRONG_PARAMETERS = {"success": False, "data": "Invalid request, wrong parameters."}
38
USAGE = "usage"
39
SUBJECT = "subject"
40
VALIDITY_DAYS = "validityDays"
41
CA = "CA"
42

    
43
__ = CryptographyService()  # TODO not the Controller's responsibility. 6
44
CERTIFICATE_SERVICE = CertificateService(__, CertificateRepository(None, None))
45
# TODO open for discussion. Expected:
46
#  CS = CertificateService.get_instance()
47
#  or something like that.
48

    
49
KEY_SERVICE = KeyService(__, PrivateKeyRepository(None, None))  # TODO as above
50

    
51

    
52
class CertController:
53

    
54
    @staticmethod
55
    def setup():
56
        """
57
        SQLite3 thread issue hack.
58
        :return:
59
        """
60
        _ = Connection(DATABASE_FILE_LOCATION.shortest_relative_path())
61
        CERTIFICATE_SERVICE.certificate_repository.connection = _
62
        CERTIFICATE_SERVICE.certificate_repository.cursor = _.cursor()
63
        KEY_SERVICE.private_key_repository.connection = _
64
        KEY_SERVICE.private_key_repository.cursor = _.cursor()
65

    
66
    @staticmethod
67
    def create_certificate():  # noqa: E501
68
        """create new certificate
69

    
70
        Create a new certificate based on given information # noqa: E501
71

    
72
        :param body: Certificate data to be created
73
        :type body: dict | bytes
74

    
75
        :rtype: CreatedResponse
76
        """
77
        CertController.setup()  # TODO remove after issue fixed
78

    
79
        key_map = {'CA': CA_ID, 'SSL': SSL_ID, 'digitalSignature': SIGNATURE_ID, 'authentication': AUTHENTICATION_ID}
80
        required_keys = {SUBJECT, USAGE, VALIDITY_DAYS}
81

    
82
        if request.is_json:
83
            body = request.get_json()
84
            if not all(k in body for k in required_keys):
85
                return E_MISSING_PARAMETERS, 400
86

    
87
            if not isinstance(body[VALIDITY_DAYS], int):
88
                return E_WRONG_PARAMETERS, 400
89

    
90
            subject = Subject.from_dict(body[SUBJECT])
91

    
92
            if subject is None:
93
                return E_WRONG_PARAMETERS, 400
94

    
95
            usages_dict = DICT_USAGES.copy()
96

    
97
            if not isinstance(body[USAGE], dict):
98
                return E_WRONG_PARAMETERS, 400
99

    
100
            for k, v in body[USAGE].items():
101
                if k not in key_map:
102
                    return E_WRONG_PARAMETERS, 400
103
                usages_dict[key_map[k]] = v
104

    
105
            key = KEY_SERVICE.create_new_key()  # TODO pass key
106

    
107
            if CA not in body:
108
                cert = CERTIFICATE_SERVICE.create_root_ca(
109
                    key,
110
                    subject,
111
                    usages=usages_dict,
112
                    days=body[VALIDITY_DAYS]
113
                )
114
            else:
115
                issuer = CERTIFICATE_SERVICE.get_certificate(body[CA])
116

    
117
                if issuer is None:
118
                    return E_NO_ISSUER_FOUND, 400
119

    
120
                issuer_key = KEY_SERVICE.get_key(issuer.private_key_id)
121

    
122
                if issuer_key is None:
123
                    return E_CORRUPTED_DATABASE, 500
124

    
125
                f = CERTIFICATE_SERVICE.create_ca if CA_ID in usages_dict and usages_dict[CA_ID] else \
126
                    CERTIFICATE_SERVICE.create_end_cert
127

    
128
                cert = f(
129
                    key,
130
                    subject,
131
                    issuer,
132
                    issuer_key,
133
                    usages=usages_dict,
134
                    days=body[VALIDITY_DAYS]
135
                )
136

    
137
            if cert is not None:
138
                return {"success": True,
139
                        "data": cert.certificate_id}, 201
140
            else:
141
                return {"success": False,
142
                        "data": "Internal error: The certificate could not have been created."}, 400
143
        else:
144
            return E_NOT_JSON_FORMAT, 400
145

    
146
    @staticmethod
147
    def get_certificate_by_id(id):  # noqa: E501
148
        """get certificate by ID
149

    
150
        Get certificate in PEM format by ID # noqa: E501
151

    
152
        :param id: ID of a certificate to be queried
153
        :type id: dict | bytes
154

    
155
        :rtype: PemResponse
156
        """
157
        CertController.setup()  # TODO remove after issue fixed
158

    
159
        if connexion.request.is_json:
160
            id = IdParameter.from_dict(connexion.request.get_json())  # noqa: E501
161
        return 'do some magic!'
162

    
163
    @staticmethod
164
    def get_certificate_details_by_id(id):  # noqa: E501
165
        """get certificate's details by ID
166

    
167
        Get certificate details by ID # noqa: E501
168

    
169
        :param id: ID of a certificate whose details are to be queried
170
        :type id: dict | bytes
171

    
172
        :rtype: CertificateResponse
173
        """
174
        CertController.setup()  # TODO remove after issue fixed
175

    
176
        if connexion.request.is_json:
177
            id = IdParameter.from_dict(connexion.request.get_json())  # noqa: E501
178
        return 'do some magic!'
179

    
180
    @staticmethod
181
    def get_certificate_list():  # noqa: E501
182
        """get list of certificates
183

    
184
        Lists certificates based on provided filtering options # noqa: E501
185

    
186
        :param filtering: Filter certificate type to be queried
187
        :type filtering: dict | bytes
188

    
189
        :rtype: CertificateListResponse
190
        """
191
        CertController.setup()  # TODO remove after issue fixed
192

    
193
        key_map = {CA_ID: 'CA', SSL_ID: 'SSL', SIGNATURE_ID: 'digitalSignature', AUTHENTICATION_ID: 'authentication'}
194

    
195
        targets = {ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID}
196
        if request.is_json:
197
            data = request.get_json()
198
            if FILTERING in data:
199
                if isinstance(data[FILTERING], dict):
200
                    if CA in data[FILTERING]:
201
                        if isinstance(data[FILTERING][CA], bool):
202
                            if data[FILTERING][CA]:
203
                                targets.remove(CERTIFICATE_ID)
204
                            else:
205
                                targets.remove(ROOT_CA_ID)
206
                                targets.remove(INTERMEDIATE_CA_ID)
207
                        else:
208
                            return E_WRONG_PARAMETERS, 400
209
                else:
210
                    return E_WRONG_PARAMETERS, 400
211

    
212
        if len(targets) == 3:
213
            certs = CERTIFICATE_SERVICE.get_certificates()
214
        else:
215
            certs = list(chain(CERTIFICATE_SERVICE.get_certificates(target) for target in targets))
216

    
217
        if certs is None:
218
            return E_GENERAL_ERROR, 500
219
        elif len(certs) == 0:
220
            return E_NO_CERTIFICATES_FOUND, 204
221
        else:
222
            ret = []
223
            for c in certs:
224
                c_issuer = CERTIFICATE_SERVICE.get_certificate(c.parent_id)
225
                if c_issuer is None:
226
                    return E_CORRUPTED_DATABASE, 500
227

    
228
                ret.append(
229
                    {
230
                        ID: c.certificate_id,
231
                        COMMON_NAME: c.common_name,
232
                        NOT_BEFORE: datetime.strptime(c.valid_from, DATETIME_FORMAT).date(),
233
                        NOT_AFTER: datetime.strptime(c.valid_to, DATETIME_FORMAT).date(),
234
                        USAGE: {key_map[k]: v for k, v in c.usages.items()},
235
                        ISSUER: {
236
                            ID: c_issuer.certificate_id,
237
                            COMMON_NAME: c_issuer.common_name
238
                        }
239
                    }
240
                )
241
            return {"success": True, "data": ret}
242

    
243

    
244

    
245
    @staticmethod
246
    def get_certificate_root_by_id(id):  # noqa: E501
247
        """get certificate's root of trust chain by ID
248

    
249
        Get certificate's root of trust chain in PEM format by ID # noqa: E501
250

    
251
        :param id: ID of a child certificate whose root is to be queried
252
        :type id: dict | bytes
253

    
254
        :rtype: PemResponse
255
        """
256
        CertController.setup()  # TODO remove after issue fixed
257

    
258
        if connexion.request.is_json:
259
            id = IdParameter.from_dict(connexion.request.get_json())  # noqa: E501
260
        return 'do some magic!'
261

    
262
    @staticmethod
263
    def get_certificate_trust_chain_by_id(id):  # noqa: E501
264
        """get certificate's trust chain by ID
265

    
266
        Get certificate trust chain in PEM format by ID # noqa: E501
267

    
268
        :param id: ID of a child certificate whose chain is to be queried
269
        :type id: dict | bytes
270

    
271
        :rtype: PemResponse
272
        """
273
        CertController.setup()  # TODO remove after issue fixed
274

    
275
        if connexion.request.is_json:
276
            id = IdParameter.from_dict(connexion.request.get_json())  # noqa: E501
277
        return 'do some magic!'
(2-2/2)