Revize 1fa243ca
Přidáno uživatelem Jan Pašek před asi 4 roky(ů)
src/controllers/certificates_controller.py | ||
---|---|---|
4 | 4 |
from json import JSONDecodeError |
5 | 5 |
|
6 | 6 |
from flask import request |
7 |
from src.dao.private_key_repository import PrivateKeyRepository |
|
7 |
from injector import inject |
|
8 |
|
|
9 |
from src.constants import CA_ID, \ |
|
10 |
SSL_ID, SIGNATURE_ID, AUTHENTICATION_ID, \ |
|
11 |
DATETIME_FORMAT, ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID # TODO DATABASE_FILE - not the Controller's |
|
8 | 12 |
from src.model.subject import Subject |
9 | 13 |
from src.services.certificate_service import CertificateService |
10 |
from src.dao.certificate_repository import CertificateRepository # TODO not the Controller's responsibility. |
|
11 |
from src.services.cryptography import CryptographyService # TODO not the Controller's responsibility. |
|
12 |
from sqlite3 import Connection # TODO not the Controller's responsibility. |
|
13 |
from src.constants import CA_ID, \ |
|
14 |
DATABASE_FILE_LOCATION, SSL_ID, SIGNATURE_ID, AUTHENTICATION_ID, \ |
|
15 |
DATETIME_FORMAT, ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID # TODO DATABASE_FILE - not the Controller's |
|
16 |
# responsibility. |
|
14 |
# responsibility. |
|
17 | 15 |
from src.services.key_service import KeyService |
18 | 16 |
|
19 | 17 |
TREE_NODE_TYPE_COUNT = 3 |
... | ... | |
44 | 42 |
C_INTERNAL_SERVER_ERROR = 500 |
45 | 43 |
C_SUCCESS = 200 |
46 | 44 |
|
47 |
__ = CryptographyService() # TODO not the Controller's responsibility. |
|
48 |
CERTIFICATE_SERVICE = CertificateService(__, CertificateRepository(None, None)) |
|
49 |
KEY_SERVICE = KeyService(__, PrivateKeyRepository(None, None)) # TODO not the Controller's responsibility. |
|
50 |
|
|
51 | 45 |
|
52 | 46 |
class CertController: |
53 | 47 |
KEY_MAP = {'CA': CA_ID, 'SSL': SSL_ID, 'digitalSignature': SIGNATURE_ID, 'authentication': AUTHENTICATION_ID} |
54 | 48 |
INVERSE_KEY_MAP = {k: v for v, k in KEY_MAP.items()} |
55 | 49 |
|
56 |
@staticmethod |
|
57 |
def setup(): |
|
58 |
""" |
|
59 |
SQLite3 thread issue hack. |
|
60 |
:return: |
|
61 |
""" |
|
62 |
_ = Connection("db/database_sqlite.db") |
|
63 |
CERTIFICATE_SERVICE.certificate_repository.connection = _ |
|
64 |
CERTIFICATE_SERVICE.certificate_repository.cursor = _.cursor() |
|
65 |
KEY_SERVICE.private_key_repository.connection = _ |
|
66 |
KEY_SERVICE.private_key_repository.cursor = _.cursor() |
|
67 |
|
|
68 |
@staticmethod |
|
69 |
def create_certificate(): |
|
50 |
@inject |
|
51 |
def __init__(self, certificate_service: CertificateService, key_service: KeyService): |
|
52 |
self.certificate_service = certificate_service |
|
53 |
self.key_service = key_service |
|
54 |
|
|
55 |
def create_certificate(self): |
|
70 | 56 |
"""create new certificate |
71 | 57 |
|
72 | 58 |
Create a new certificate based on given information |
... | ... | |
76 | 62 |
|
77 | 63 |
:rtype: CreatedResponse |
78 | 64 |
""" |
79 |
CertController.setup() # TODO remove after issue fixed |
|
80 |
|
|
81 | 65 |
required_keys = {SUBJECT, USAGE, VALIDITY_DAYS} # required fields of the POST req |
82 | 66 |
|
83 | 67 |
if request.is_json: # accept JSON only |
... | ... | |
103 | 87 |
return E_WRONG_PARAMETERS, C_BAD_REQUEST # and throw if it is not |
104 | 88 |
usages_dict[CertController.KEY_MAP[k]] = v # otherwise translate key and set |
105 | 89 |
|
106 |
key = KEY_SERVICE.create_new_key() # TODO pass key
|
|
90 |
key = self.key_service.create_new_key() # TODO pass key
|
|
107 | 91 |
|
108 | 92 |
if CA not in body or body[CA] is None: # if issuer omitted (legal) or none |
109 |
cert = CERTIFICATE_SERVICE.create_root_ca( # create a root CA
|
|
93 |
cert = self.certificate_service.create_root_ca( # create a root CA
|
|
110 | 94 |
key, |
111 | 95 |
subject, |
112 | 96 |
usages=usages_dict, # TODO ignoring usages -> discussion |
113 | 97 |
days=body[VALIDITY_DAYS] |
114 | 98 |
) |
115 | 99 |
else: |
116 |
issuer = CERTIFICATE_SERVICE.get_certificate(body[CA]) # get base issuer info
|
|
100 |
issuer = self.certificate_service.get_certificate(body[CA]) # get base issuer info
|
|
117 | 101 |
|
118 | 102 |
if issuer is None: # if such issuer does not exist |
119 |
KEY_SERVICE.delete_key(key.private_key_id) # free
|
|
103 |
self.key_service.delete_key(key.private_key_id) # free
|
|
120 | 104 |
return E_NO_ISSUER_FOUND, C_BAD_REQUEST # and throw |
121 | 105 |
|
122 |
issuer_key = KEY_SERVICE.get_key(issuer.private_key_id) # get issuer's key, which must exist
|
|
106 |
issuer_key = self.key_service.get_key(issuer.private_key_id) # get issuer's key, which must exist
|
|
123 | 107 |
|
124 | 108 |
if issuer_key is None: # if it does not |
125 |
KEY_SERVICE.delete_key(key.private_key_id) # free
|
|
109 |
self.key_service.delete_key(key.private_key_id) # free
|
|
126 | 110 |
return E_CORRUPTED_DATABASE, C_INTERNAL_SERVER_ERROR # and throw |
127 | 111 |
|
128 |
f = CERTIFICATE_SERVICE.create_ca if CA_ID in usages_dict and usages_dict[CA_ID] else \
|
|
129 |
CERTIFICATE_SERVICE.create_end_cert
|
|
112 |
f = self.certificate_service.create_ca if CA_ID in usages_dict and usages_dict[CA_ID] else \
|
|
113 |
self.certificate_service.create_end_cert
|
|
130 | 114 |
|
131 | 115 |
# noinspection PyTypeChecker |
132 | 116 |
cert = f( # create inter CA or end cert |
... | ... | |
142 | 126 |
return {"success": True, |
143 | 127 |
"data": cert.certificate_id}, C_CREATED_SUCCESSFULLY |
144 | 128 |
else: # if this fails, then |
145 |
KEY_SERVICE.delete_key(key.private_key_id) # free
|
|
129 |
self.key_service.delete_key(key.private_key_id) # free
|
|
146 | 130 |
return {"success": False, # and wonder what the cause is, |
147 | 131 |
"data": "Internal error: The certificate could not have been created."}, C_BAD_REQUEST |
148 | 132 |
# as obj/None carries only one bit |
... | ... | |
150 | 134 |
else: |
151 | 135 |
return E_NOT_JSON_FORMAT, C_BAD_REQUEST # throw in case of non-JSON format |
152 | 136 |
|
153 |
@staticmethod |
|
154 |
def get_certificate_by_id(id): |
|
137 |
def get_certificate_by_id(self, id): |
|
155 | 138 |
"""get certificate by ID |
156 | 139 |
|
157 | 140 |
Get certificate in PEM format by ID |
... | ... | |
161 | 144 |
|
162 | 145 |
:rtype: PemResponse |
163 | 146 |
""" |
164 |
CertController.setup() # TODO remove after issue fixed |
|
165 |
|
|
166 | 147 |
try: |
167 | 148 |
v = int(id) |
168 | 149 |
except ValueError: |
169 | 150 |
return E_WRONG_PARAMETERS, C_BAD_REQUEST |
170 | 151 |
|
171 |
cert = CERTIFICATE_SERVICE.get_certificate(v)
|
|
152 |
cert = self.certificate_service.get_certificate(v)
|
|
172 | 153 |
|
173 | 154 |
if cert is None: |
174 | 155 |
return E_NO_CERTIFICATES_FOUND, C_NO_DATA |
175 | 156 |
else: |
176 | 157 |
return {"success": True, "data": cert.pem_data}, C_SUCCESS |
177 | 158 |
|
178 |
@staticmethod |
|
179 |
def get_certificate_details_by_id(id): |
|
159 |
def get_certificate_details_by_id(self, id): |
|
180 | 160 |
"""get certificate's details by ID |
181 | 161 |
|
182 | 162 |
Get certificate details by ID |
... | ... | |
186 | 166 |
|
187 | 167 |
:rtype: CertificateResponse |
188 | 168 |
""" |
189 |
CertController.setup() # TODO remove after issue fixed |
|
190 |
|
|
191 | 169 |
try: |
192 | 170 |
v = int(id) |
193 | 171 |
except ValueError: |
194 | 172 |
return E_WRONG_PARAMETERS, C_BAD_REQUEST |
195 | 173 |
|
196 |
cert = CERTIFICATE_SERVICE.get_certificate(v)
|
|
174 |
cert = self.certificate_service.get_certificate(v)
|
|
197 | 175 |
|
198 | 176 |
if cert is None: |
199 | 177 |
return E_NO_CERTIFICATES_FOUND, C_NO_DATA |
200 | 178 |
else: |
201 |
data = CertController.cert_to_dict_full(cert)
|
|
179 |
data = self.cert_to_dict_full(cert)
|
|
202 | 180 |
if data is None: |
203 | 181 |
return E_CORRUPTED_DATABASE, C_INTERNAL_SERVER_ERROR |
204 | 182 |
return {"success": True, "data": data}, C_SUCCESS |
205 | 183 |
|
206 |
@staticmethod |
|
207 |
def get_certificate_list(): |
|
184 |
def get_certificate_list(self): |
|
208 | 185 |
"""get list of certificates |
209 | 186 |
|
210 | 187 |
Lists certificates based on provided filtering options |
... | ... | |
214 | 191 |
|
215 | 192 |
:rtype: CertificateListResponse |
216 | 193 |
""" |
217 |
CertController.setup() # TODO remove after issue fixed |
|
218 |
|
|
219 | 194 |
targets = {ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID} # all targets |
220 | 195 |
|
221 | 196 |
# the filtering parameter can be read as URL argument or as a request body |
... | ... | |
248 | 223 |
|
249 | 224 |
# if filtering did not change the |
250 | 225 |
# targets, |
251 |
certs = CERTIFICATE_SERVICE.get_certificates() # fetch everything
|
|
226 |
certs = self.certificate_service.get_certificates() # fetch everything
|
|
252 | 227 |
else: # otherwise fetch targets only |
253 |
certs = list(chain(*(CERTIFICATE_SERVICE.get_certificates(target) for target in targets)))
|
|
228 |
certs = list(chain(*(self.certificate_service.get_certificates(target) for target in targets)))
|
|
254 | 229 |
|
255 | 230 |
if certs is None: |
256 | 231 |
return E_GENERAL_ERROR, C_INTERNAL_SERVER_ERROR |
... | ... | |
259 | 234 |
else: |
260 | 235 |
ret = [] |
261 | 236 |
for c in certs: |
262 |
data = CertController.cert_to_dict_partial(c)
|
|
237 |
data = self.cert_to_dict_partial(c)
|
|
263 | 238 |
if data is None: |
264 | 239 |
return E_CORRUPTED_DATABASE, C_INTERNAL_SERVER_ERROR |
265 | 240 |
ret.append( |
... | ... | |
267 | 242 |
) |
268 | 243 |
return {"success": True, "data": ret}, C_SUCCESS |
269 | 244 |
|
270 |
@staticmethod |
|
271 |
def get_certificate_root_by_id(id): |
|
245 |
def get_certificate_root_by_id(self, id): |
|
272 | 246 |
"""get certificate's root of trust chain by ID |
273 | 247 |
|
274 | 248 |
Get certificate's root of trust chain in PEM format by ID |
... | ... | |
278 | 252 |
|
279 | 253 |
:rtype: PemResponse |
280 | 254 |
""" |
281 |
CertController.setup() # TODO remove after issue fixed |
|
282 |
|
|
283 | 255 |
try: |
284 | 256 |
v = int(id) |
285 | 257 |
except ValueError: |
286 | 258 |
return E_WRONG_PARAMETERS, C_BAD_REQUEST |
287 | 259 |
|
288 |
cert = CERTIFICATE_SERVICE.get_certificate(v)
|
|
260 |
cert = self.certificate_service.get_certificate(v)
|
|
289 | 261 |
|
290 | 262 |
if cert is None: |
291 | 263 |
return E_NO_CERTIFICATES_FOUND, C_NO_DATA |
292 | 264 |
|
293 |
trust_chain = CERTIFICATE_SERVICE.get_chain_of_trust(cert.parent_id, exclude_root=False)
|
|
294 |
if trust_chain is None or len(trust_chain) is 0:
|
|
265 |
trust_chain = self.certificate_service.get_chain_of_trust(cert.parent_id, exclude_root=False)
|
|
266 |
if trust_chain is None or len(trust_chain) == 0:
|
|
295 | 267 |
return E_NO_CERTIFICATES_FOUND, C_NO_DATA |
296 | 268 |
|
297 | 269 |
return {"success": True, "data": trust_chain[-1].pem_data}, C_SUCCESS |
298 | 270 |
|
299 |
@staticmethod |
|
300 |
def get_certificate_trust_chain_by_id(id): |
|
271 |
def get_certificate_trust_chain_by_id(self, id): |
|
301 | 272 |
"""get certificate's trust chain by ID |
302 | 273 |
|
303 | 274 |
Get certificate trust chain in PEM format by ID |
... | ... | |
307 | 278 |
|
308 | 279 |
:rtype: PemResponse |
309 | 280 |
""" |
310 |
CertController.setup() # TODO remove after issue fixed |
|
311 | 281 |
|
312 | 282 |
try: |
313 | 283 |
v = int(id) |
314 | 284 |
except ValueError: |
315 | 285 |
return E_WRONG_PARAMETERS, C_BAD_REQUEST |
316 | 286 |
|
317 |
cert = CERTIFICATE_SERVICE.get_certificate(v)
|
|
287 |
cert = self.certificate_service.get_certificate(v)
|
|
318 | 288 |
|
319 | 289 |
if cert is None: |
320 | 290 |
return E_NO_CERTIFICATES_FOUND, C_NO_DATA |
... | ... | |
322 | 292 |
if cert.parent_id is None: |
323 | 293 |
return E_NO_CERTIFICATES_FOUND, C_NO_DATA |
324 | 294 |
|
325 |
trust_chain = CERTIFICATE_SERVICE.get_chain_of_trust(cert.parent_id)
|
|
295 |
trust_chain = self.certificate_service.get_chain_of_trust(cert.parent_id)
|
|
326 | 296 |
|
327 | 297 |
ret = [] |
328 | 298 |
for intermediate in trust_chain: |
... | ... | |
330 | 300 |
|
331 | 301 |
return {"success": True, "data": "".join(ret)}, C_SUCCESS |
332 | 302 |
|
333 |
@staticmethod |
|
334 |
def cert_to_dict_partial(c): |
|
303 |
def cert_to_dict_partial(self, c): |
|
335 | 304 |
""" |
336 | 305 |
Dictionarizes a certificate directly fetched from the database. Contains partial information. |
337 | 306 |
:param c: target cert |
338 | 307 |
:return: certificate dict (compliant with some parts of the REST API) |
339 | 308 |
""" |
340 |
c_issuer = CERTIFICATE_SERVICE.get_certificate(c.parent_id)
|
|
309 |
c_issuer = self.certificate_service.get_certificate(c.parent_id)
|
|
341 | 310 |
if c_issuer is None: |
342 | 311 |
return None |
343 | 312 |
|
... | ... | |
353 | 322 |
} |
354 | 323 |
} |
355 | 324 |
|
356 |
@staticmethod |
|
357 |
def cert_to_dict_full(c): |
|
325 |
def cert_to_dict_full(self, c): |
|
358 | 326 |
""" |
359 | 327 |
Dictionarizes a certificate directly fetched from the database, but adds subject info. |
360 | 328 |
Contains full information. |
361 | 329 |
:param c: target cert |
362 | 330 |
:return: certificate dict (compliant with some parts of the REST API) |
363 | 331 |
""" |
364 |
subj = CERTIFICATE_SERVICE.get_subject_from_certificate(c)
|
|
365 |
c_issuer = CERTIFICATE_SERVICE.get_certificate(c.parent_id)
|
|
332 |
subj = self.certificate_service.get_subject_from_certificate(c)
|
|
333 |
c_issuer = self.certificate_service.get_certificate(c.parent_id)
|
|
366 | 334 |
if c_issuer is None: |
367 | 335 |
return None |
368 | 336 |
|
Také k dispozici: Unified diff
Re #8569 Added application initialization and running dependency injection