Projekt

Obecné

Profil

« Předchozí | Další » 

Revize 9247d70a

Přidáno uživatelem Michal Seják před asi 4 roky(ů)

Re #8476 - Heavy refactoring and added comments.

Zobrazit rozdíly:

src/controllers/certificates_controller.py
4 4
from src.dao.private_key_repository import PrivateKeyRepository
5 5
from src.model.subject import Subject
6 6
from src.services.certificate_service import CertificateService
7
from src.dao.certificate_repository import CertificateRepository  # TODO not the Controller's responsibility. 1
8
from src.services.cryptography import CryptographyService  # TODO not the Controller's responsibility. 2
9
from sqlite3 import Connection  # TODO not the Controller's responsibility. 3
10
from src.constants import DICT_USAGES, CA_ID, \
7
from src.dao.certificate_repository import CertificateRepository        # TODO not the Controller's responsibility.
8
from src.services.cryptography import CryptographyService               # TODO not the Controller's responsibility.
9
from sqlite3 import Connection                                          # TODO not the Controller's responsibility.
10
from src.constants import CA_ID, \
11 11
    DATABASE_FILE_LOCATION, SSL_ID, SIGNATURE_ID, AUTHENTICATION_ID, \
12
    DATETIME_FORMAT, ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID  # TODO DATABASE_FILE - not the Controller's
13
#  responsibility. 4
12
    DATETIME_FORMAT, ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID     # TODO DATABASE_FILE - not the Controller's
13
                                                                        #  responsibility.
14 14
from src.services.key_service import KeyService
15 15

  
16
# _ = Connection("../" + DATABASE_FILE)                                 # TODO not the Controller's responsibility. 5
17
# cursor = _.cursor()                                                   # TODO responsibility of the
18
                                                                        #  CertificateRepository. It makes no sense to
19
                                                                        #  supply a different cursor than precisely
20
                                                                        #  the one corresponding to the connection.
21
                                                                        #  The cursor can always be generated from the
22
                                                                        #  connection instance.
16
TREE_NODE_TYPE_COUNT = 3
17

  
23 18
FILTERING = "filtering"
24 19
ISSUER = "issuer"
25 20
US = "usage"
......
27 22
NOT_BEFORE = "notBefore"
28 23
COMMON_NAME = "CN"
29 24
ID = "id"
30
E_NO_ISSUER_FOUND = {"success": False,
31
                           "data": "No certificate authority with such unique ID exists."}
25
USAGE = "usage"
26
SUBJECT = "subject"
27
VALIDITY_DAYS = "validityDays"
28
CA = "CA"
29

  
30
E_NO_ISSUER_FOUND = {"success": False, "data": "No certificate authority with such unique ID exists."}
32 31
E_NO_CERTIFICATES_FOUND = {"success": False, "data": "No such certificate found."}
33 32
E_NOT_JSON_FORMAT = {"success": False, "data": "The request must be JSON-formatted."}
34 33
E_CORRUPTED_DATABASE = {"success": False, "data": "Internal server error (corrupted database)."}
35 34
E_GENERAL_ERROR = {"success": False, "data": "Internal server error (unknown origin)."}
36 35
E_MISSING_PARAMETERS = {"success": False, "data": "Invalid request, missing parameters."}
37 36
E_WRONG_PARAMETERS = {"success": False, "data": "Invalid request, wrong parameters."}
38
USAGE = "usage"
39
SUBJECT = "subject"
40
VALIDITY_DAYS = "validityDays"
41
CA = "CA"
42 37

  
43
__ = CryptographyService()  # TODO not the Controller's responsibility. 6
44
CERTIFICATE_SERVICE = CertificateService(__, CertificateRepository(None, None))
45
# TODO open for discussion. Expected:
46
#  CS = CertificateService.get_instance()
47
#  or something like that.
38
C_CREATED_SUCCESSFULLY = 201
39
C_BAD_REQUEST = 400
40
C_NO_DATA = 205                                                         # TODO related to 204 issue
41
C_INTERNAL_SERVER_ERROR = 500
42
C_SUCCESS = 200
48 43

  
49
KEY_SERVICE = KeyService(__, PrivateKeyRepository(None, None))  # TODO as above
44
__ = CryptographyService()                                              # TODO not the Controller's responsibility.
45
CERTIFICATE_SERVICE = CertificateService(__, CertificateRepository(None, None))
46
KEY_SERVICE = KeyService(__, PrivateKeyRepository(None, None))          # TODO not the Controller's responsibility.
50 47

  
51 48

  
52 49
class CertController:
......
66 63
        KEY_SERVICE.private_key_repository.cursor = _.cursor()
67 64

  
68 65
    @staticmethod
69
    def create_certificate():  # noqa: E501
66
    def create_certificate():
70 67
        """create new certificate
71 68

  
72
        Create a new certificate based on given information # noqa: E501
69
        Create a new certificate based on given information
73 70

  
74 71
        :param body: Certificate data to be created
75 72
        :type body: dict | bytes
76 73

  
77 74
        :rtype: CreatedResponse
78 75
        """
79
        CertController.setup()  # TODO remove after issue fixed
76
        CertController.setup()                                                      # TODO remove after issue fixed
80 77

  
81
        required_keys = {SUBJECT, USAGE, VALIDITY_DAYS}
78
        required_keys = {SUBJECT, USAGE, VALIDITY_DAYS}                             # required fields of the POST req
82 79

  
83
        if request.is_json:
80
        if request.is_json:                                                         # accept JSON only
84 81
            body = request.get_json()
85
            if not all(k in body for k in required_keys):
86
                return E_MISSING_PARAMETERS, 400
82
            if not all(k in body for k in required_keys):                           # verify that all keys are present
83
                return E_MISSING_PARAMETERS, C_BAD_REQUEST
87 84

  
88
            if not isinstance(body[VALIDITY_DAYS], int):
89
                return E_WRONG_PARAMETERS, 400
85
            if not isinstance(body[VALIDITY_DAYS], int):                            # type checking
86
                return E_WRONG_PARAMETERS, C_BAD_REQUEST
90 87

  
91
            subject = Subject.from_dict(body[SUBJECT])
88
            subject = Subject.from_dict(body[SUBJECT])                              # generate Subject from passed dict
92 89

  
93
            if subject is None:
94
                return E_WRONG_PARAMETERS, 400
90
            if subject is None:                                                     # if the format is incorrect
91
                return E_WRONG_PARAMETERS, C_BAD_REQUEST
95 92

  
96 93
            usages_dict = {}
97 94

  
98
            if not isinstance(body[USAGE], dict):
99
                return E_WRONG_PARAMETERS, 400
95
            if not isinstance(body[USAGE], dict):                                   # type checking
96
                return E_WRONG_PARAMETERS, C_BAD_REQUEST
100 97

  
101
            for k, v in body[USAGE].items():
102
                if k not in CertController.KEY_MAP:
103
                    return E_WRONG_PARAMETERS, 400
104
                usages_dict[CertController.KEY_MAP[k]] = v
98
            for k, v in body[USAGE].items():                                        # for each usage
99
                if k not in CertController.KEY_MAP:                                 # check that it is a valid usage
100
                    return E_WRONG_PARAMETERS, C_BAD_REQUEST                        # and throw if it is not
101
                usages_dict[CertController.KEY_MAP[k]] = v                          # otherwise translate key and set
105 102

  
106
            key = KEY_SERVICE.create_new_key()  # TODO pass key
103
            key = KEY_SERVICE.create_new_key()                                      # TODO pass key
107 104

  
108
            if CA not in body:
109
                cert = CERTIFICATE_SERVICE.create_root_ca(
105
            if CA not in body:                                                      # if issuer omitted (legal)
106
                cert = CERTIFICATE_SERVICE.create_root_ca(                          # create a root CA
110 107
                    key,
111 108
                    subject,
112
                    usages=usages_dict,
109
                    usages=usages_dict,                                             # TODO ignoring usages -> discussion
113 110
                    days=body[VALIDITY_DAYS]
114 111
                )
115 112
            else:
116
                issuer = CERTIFICATE_SERVICE.get_certificate(body[CA])
113
                issuer = CERTIFICATE_SERVICE.get_certificate(body[CA])              # get base issuer info
117 114

  
118
                if issuer is None:
119
                    KEY_SERVICE.delete_key(key.private_key_id)
120
                    return E_NO_ISSUER_FOUND, 400
115
                if issuer is None:                                                  # if such issuer does not exist
116
                    KEY_SERVICE.delete_key(key.private_key_id)                      # free
117
                    return E_NO_ISSUER_FOUND, C_BAD_REQUEST                         # and throw
121 118

  
122
                issuer_key = KEY_SERVICE.get_key(issuer.private_key_id)
119
                issuer_key = KEY_SERVICE.get_key(issuer.private_key_id)             # get issuer's key, which must exist
123 120

  
124
                if issuer_key is None:
125
                    KEY_SERVICE.delete_key(key.private_key_id)
126
                    return E_CORRUPTED_DATABASE, 500
121
                if issuer_key is None:                                              # if it does not
122
                    KEY_SERVICE.delete_key(key.private_key_id)                      # free
123
                    return E_CORRUPTED_DATABASE, C_INTERNAL_SERVER_ERROR            # and throw
127 124

  
128 125
                f = CERTIFICATE_SERVICE.create_ca if CA_ID in usages_dict and usages_dict[CA_ID] else \
129 126
                    CERTIFICATE_SERVICE.create_end_cert
130 127

  
131
                cert = f(
132
                    key,
133
                    subject,
128
                # noinspection PyTypeChecker
129
                cert = f(                                                           # create inter CA or end cert
130
                    key,                                                            # according to whether 'CA' is among
131
                    subject,                                                        # the usages' fields
134 132
                    issuer,
135 133
                    issuer_key,
136 134
                    usages=usages_dict,
......
139 137

  
140 138
            if cert is not None:
141 139
                return {"success": True,
142
                        "data": cert.certificate_id}, 201
143
            else:
144
                KEY_SERVICE.delete_key(key.private_key_id)
145
                return {"success": False,
146
                        "data": "Internal error: The certificate could not have been created."}, 400
140
                        "data": cert.certificate_id}, C_CREATED_SUCCESSFULLY
141
            else:                                                                   # if this fails, then
142
                KEY_SERVICE.delete_key(key.private_key_id)                          # free
143
                return {"success": False,                                           # and wonder what the cause is,
144
                        "data": "Internal error: The certificate could not have been created."}, C_BAD_REQUEST
145
                                                                                    # as obj/None carries only one bit
146
                                                                                    # of error information
147 147
        else:
148
            return E_NOT_JSON_FORMAT, 400
148
            return E_NOT_JSON_FORMAT, C_BAD_REQUEST                                 # throw in case of non-JSON format
149 149

  
150 150
    @staticmethod
151
    def get_certificate_by_id(id):  # noqa: E501
151
    def get_certificate_by_id(id):
152 152
        """get certificate by ID
153 153

  
154
        Get certificate in PEM format by ID # noqa: E501
154
        Get certificate in PEM format by ID
155 155

  
156 156
        :param id: ID of a certificate to be queried
157 157
        :type id: dict | bytes
158 158

  
159 159
        :rtype: PemResponse
160 160
        """
161
        CertController.setup()  # TODO remove after issue fixed
161
        CertController.setup()                                                      # TODO remove after issue fixed
162 162

  
163 163
        try:
164 164
            v = int(id)
165 165
        except ValueError:
166
            return E_WRONG_PARAMETERS, 400
166
            return E_WRONG_PARAMETERS, C_BAD_REQUEST
167 167

  
168 168
        cert = CERTIFICATE_SERVICE.get_certificate(v)
169 169

  
170 170
        if cert is None:
171
            return E_NO_CERTIFICATES_FOUND, 205                 # TODO related to 204 issue
171
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA                               
172 172
        else:
173
            return {"success": True, "data": cert.pem_data}
173
            return {"success": True, "data": cert.pem_data}, C_SUCCESS
174 174

  
175 175
    @staticmethod
176
    def get_certificate_details_by_id(id):  # noqa: E501
176
    def get_certificate_details_by_id(id):
177 177
        """get certificate's details by ID
178 178

  
179
        Get certificate details by ID # noqa: E501
179
        Get certificate details by ID
180 180

  
181 181
        :param id: ID of a certificate whose details are to be queried
182 182
        :type id: dict | bytes
183 183

  
184 184
        :rtype: CertificateResponse
185 185
        """
186
        CertController.setup()  # TODO remove after issue fixed
186
        CertController.setup()                                                      # TODO remove after issue fixed
187 187

  
188 188
        try:
189 189
            v = int(id)
190 190
        except ValueError:
191
            return E_WRONG_PARAMETERS, 400
191
            return E_WRONG_PARAMETERS, C_BAD_REQUEST
192 192

  
193 193
        cert = CERTIFICATE_SERVICE.get_certificate(v)
194 194

  
195 195
        if cert is None:
196
            return E_NO_CERTIFICATES_FOUND, 205  # TODO related to 204 issue
196
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA                               
197 197
        else:
198 198
            data = CertController.cert_to_dict_full(cert)
199 199
            if data is None:
200
                return E_CORRUPTED_DATABASE, 500
201
            return {"success": True, "data": data}
200
                return E_CORRUPTED_DATABASE, C_INTERNAL_SERVER_ERROR
201
            return {"success": True, "data": data}, C_SUCCESS
202 202

  
203 203
    @staticmethod
204
    def get_certificate_list():  # noqa: E501
204
    def get_certificate_list():
205 205
        """get list of certificates
206 206

  
207
        Lists certificates based on provided filtering options # noqa: E501
207
        Lists certificates based on provided filtering options
208 208

  
209 209
        :param filtering: Filter certificate type to be queried
210 210
        :type filtering: dict | bytes
211 211

  
212 212
        :rtype: CertificateListResponse
213 213
        """
214
        CertController.setup()  # TODO remove after issue fixed
215

  
216
        targets = {ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID}
217
        if request.is_json:
218
            data = request.get_json()
219
            if FILTERING in data:
220
                if isinstance(data[FILTERING], dict):
221
                    if CA in data[FILTERING]:
222
                        if isinstance(data[FILTERING][CA], bool):
223
                            if data[FILTERING][CA]:
214
        CertController.setup()                                                      # TODO remove after issue fixed
215

  
216
        targets = {ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID}                  # all targets
217
        if request.is_json:                                                         # if the request carries JSON data
218
            data = request.get_json()                                               # get it
219
            if FILTERING in data:                                                   # if the 'filtering' field exists
220
                if isinstance(data[FILTERING], dict):                               # and it is also a 'dict'
221
                    if CA in data[FILTERING]:                                       # containing 'CA'
222
                        if isinstance(data[FILTERING][CA], bool):                   # which is a 'bool',
223
                            if data[FILTERING][CA]:                                 # then filter according to 'CA'.
224 224
                                targets.remove(CERTIFICATE_ID)
225 225
                            else:
226 226
                                targets.remove(ROOT_CA_ID)
227 227
                                targets.remove(INTERMEDIATE_CA_ID)
228 228
                        else:
229
                            return E_WRONG_PARAMETERS, 400
229
                            return E_WRONG_PARAMETERS, C_BAD_REQUEST
230 230
                else:
231
                    return E_WRONG_PARAMETERS, 400
231
                    return E_WRONG_PARAMETERS, C_BAD_REQUEST
232 232

  
233
        if len(targets) == 3:                               # potentially risky
234
            certs = CERTIFICATE_SERVICE.get_certificates()
235
        else:
233
        if len(targets) == TREE_NODE_TYPE_COUNT:                                    # = 3 -> root node,
234
                                                                                    # intermediate node,
235
                                                                                    # or leaf node
236

  
237
                                                                                    # if filtering did not change the
238
                                                                                    # targets,
239
            certs = CERTIFICATE_SERVICE.get_certificates()                          # fetch everything
240
        else:                                                                       # otherwise fetch targets only
236 241
            certs = list(chain(*(CERTIFICATE_SERVICE.get_certificates(target) for target in targets)))
237 242

  
238 243
        if certs is None:
239
            return E_GENERAL_ERROR, 500
244
            return E_GENERAL_ERROR, C_INTERNAL_SERVER_ERROR
240 245
        elif len(certs) == 0:
241
            return E_NO_CERTIFICATES_FOUND, 205         # TODO related to 204 issue
246
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA                               
242 247
        else:
243 248
            ret = []
244 249
            for c in certs:
245 250
                data = CertController.cert_to_dict_partial(c)
246 251
                if data is None:
247
                    return E_CORRUPTED_DATABASE, 500
252
                    return E_CORRUPTED_DATABASE, C_INTERNAL_SERVER_ERROR
248 253
                ret.append(
249 254
                    data
250 255
                )
251
            return {"success": True, "data": ret}
256
            return {"success": True, "data": ret}, C_SUCCESS
252 257

  
253 258
    @staticmethod
254
    def get_certificate_root_by_id(id):  # noqa: E501
259
    def get_certificate_root_by_id(id):
255 260
        """get certificate's root of trust chain by ID
256 261

  
257
        Get certificate's root of trust chain in PEM format by ID # noqa: E501
262
        Get certificate's root of trust chain in PEM format by ID
258 263

  
259 264
        :param id: ID of a child certificate whose root is to be queried
260 265
        :type id: dict | bytes
261 266

  
262 267
        :rtype: PemResponse
263 268
        """
264
        CertController.setup()  # TODO remove after issue fixed
269
        CertController.setup()                                                      # TODO remove after issue fixed
265 270

  
266 271
        try:
267 272
            v = int(id)
268 273
        except ValueError:
269
            return E_WRONG_PARAMETERS, 400
274
            return E_WRONG_PARAMETERS, C_BAD_REQUEST
270 275

  
271 276
        cert = CERTIFICATE_SERVICE.get_certificate(v)
272 277

  
273 278
        if cert is None:
274
            return E_NO_CERTIFICATES_FOUND, 205  # TODO related to 204 issue
279
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA                               
275 280

  
276 281
        while cert.parent_id != cert.certificate_id:
277 282
            cert = CERTIFICATE_SERVICE.get_certificate(cert.parent_id)
278 283
            if cert is None:
279
                return E_CORRUPTED_DATABASE, 500
284
                return E_CORRUPTED_DATABASE, C_INTERNAL_SERVER_ERROR
280 285

  
281
        return {"success": True, "data": cert.pem_data}
286
        return {"success": True, "data": cert.pem_data}, C_SUCCESS
282 287

  
283 288
    @staticmethod
284
    def get_certificate_trust_chain_by_id(id):  # noqa: E501
289
    def get_certificate_trust_chain_by_id(id):
285 290
        """get certificate's trust chain by ID
286 291

  
287
        Get certificate trust chain in PEM format by ID # noqa: E501
292
        Get certificate trust chain in PEM format by ID
288 293

  
289 294
        :param id: ID of a child certificate whose chain is to be queried
290 295
        :type id: dict | bytes
291 296

  
292 297
        :rtype: PemResponse
293 298
        """
294
        CertController.setup()  # TODO remove after issue fixed
299
        CertController.setup()                                                      # TODO remove after issue fixed
295 300

  
296 301
        try:
297 302
            v = int(id)
298 303
        except ValueError:
299
            return E_WRONG_PARAMETERS, 400
304
            return E_WRONG_PARAMETERS, C_BAD_REQUEST
300 305

  
301 306
        cert = CERTIFICATE_SERVICE.get_certificate(v)
302 307

  
303 308
        if cert is None:
304
            return E_NO_CERTIFICATES_FOUND, 205  # TODO related to 204 issue
309
            return E_NO_CERTIFICATES_FOUND, C_NO_DATA                               
305 310

  
306 311
        ret = []
307 312

  
308 313
        while True:
309 314
            cert = CERTIFICATE_SERVICE.get_certificate(cert.parent_id)
310 315
            if cert is None:
311
                return E_CORRUPTED_DATABASE, 500
316
                return E_CORRUPTED_DATABASE, C_INTERNAL_SERVER_ERROR
312 317
            elif cert.parent_id == cert.certificate_id:
313 318
                break
314 319
            ret.append(cert.pem_data)
315 320

  
316
        return {"success": True, "data": "".join(ret)}
321
        return {"success": True, "data": "".join(ret)}, C_SUCCESS
317 322

  
318 323
    @staticmethod
319 324
    def cert_to_dict_partial(c):
325
        """
326
        Dictionarizes a certificate directly fetched from the database. Contains partial information.
327
        :param c: target cert
328
        :return: certificate dict (compliant with some parts of the REST API)
329
        """
320 330
        c_issuer = CERTIFICATE_SERVICE.get_certificate(c.parent_id)
321 331
        if c_issuer is None:
322 332
            return None
......
335 345

  
336 346
    @staticmethod
337 347
    def cert_to_dict_full(c):
348
        """
349
        Dictionarizes a certificate directly fetched from the database, but adds subject info.
350
        Contains full information.
351
        :param c: target cert
352
        :return: certificate dict (compliant with some parts of the REST API)
353
        """
338 354
        subj = CERTIFICATE_SERVICE.get_subject_from_certificate(c)
339 355
        c_issuer = CERTIFICATE_SERVICE.get_certificate(c.parent_id)
340 356
        if c_issuer is None:

Také k dispozici: Unified diff