4 |
4 |
from json import JSONDecodeError
|
5 |
5 |
|
6 |
6 |
from flask import request
|
7 |
|
from src.dao.private_key_repository import PrivateKeyRepository
|
|
7 |
from injector import inject
|
|
8 |
|
|
9 |
from src.constants import CA_ID, \
|
|
10 |
SSL_ID, SIGNATURE_ID, AUTHENTICATION_ID, \
|
|
11 |
DATETIME_FORMAT, ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID # TODO DATABASE_FILE - not the Controller's
|
8 |
12 |
from src.model.subject import Subject
|
9 |
13 |
from src.services.certificate_service import CertificateService
|
10 |
|
from src.dao.certificate_repository import CertificateRepository # TODO not the Controller's responsibility.
|
11 |
|
from src.services.cryptography import CryptographyService # TODO not the Controller's responsibility.
|
12 |
|
from sqlite3 import Connection # TODO not the Controller's responsibility.
|
13 |
|
from src.constants import CA_ID, \
|
14 |
|
DATABASE_FILE_LOCATION, SSL_ID, SIGNATURE_ID, AUTHENTICATION_ID, \
|
15 |
|
DATETIME_FORMAT, ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID # TODO DATABASE_FILE - not the Controller's
|
16 |
|
# responsibility.
|
|
14 |
# responsibility.
|
17 |
15 |
from src.services.key_service import KeyService
|
18 |
16 |
|
19 |
17 |
TREE_NODE_TYPE_COUNT = 3
|
... | ... | |
44 |
42 |
C_INTERNAL_SERVER_ERROR = 500
|
45 |
43 |
C_SUCCESS = 200
|
46 |
44 |
|
47 |
|
__ = CryptographyService() # TODO not the Controller's responsibility.
|
48 |
|
CERTIFICATE_SERVICE = CertificateService(__, CertificateRepository(None, None))
|
49 |
|
KEY_SERVICE = KeyService(__, PrivateKeyRepository(None, None)) # TODO not the Controller's responsibility.
|
50 |
|
|
51 |
45 |
|
52 |
46 |
class CertController:
|
53 |
47 |
KEY_MAP = {'CA': CA_ID, 'SSL': SSL_ID, 'digitalSignature': SIGNATURE_ID, 'authentication': AUTHENTICATION_ID}
|
54 |
48 |
INVERSE_KEY_MAP = {k: v for v, k in KEY_MAP.items()}
|
55 |
49 |
|
56 |
|
@staticmethod
|
57 |
|
def setup():
|
58 |
|
"""
|
59 |
|
SQLite3 thread issue hack.
|
60 |
|
:return:
|
61 |
|
"""
|
62 |
|
_ = Connection("db/database_sqlite.db")
|
63 |
|
CERTIFICATE_SERVICE.certificate_repository.connection = _
|
64 |
|
CERTIFICATE_SERVICE.certificate_repository.cursor = _.cursor()
|
65 |
|
KEY_SERVICE.private_key_repository.connection = _
|
66 |
|
KEY_SERVICE.private_key_repository.cursor = _.cursor()
|
67 |
|
|
68 |
|
@staticmethod
|
69 |
|
def create_certificate():
|
|
50 |
@inject
|
|
51 |
def __init__(self, certificate_service: CertificateService, key_service: KeyService):
|
|
52 |
self.certificate_service = certificate_service
|
|
53 |
self.key_service = key_service
|
|
54 |
|
|
55 |
def create_certificate(self):
|
70 |
56 |
"""create new certificate
|
71 |
57 |
|
72 |
58 |
Create a new certificate based on given information
|
... | ... | |
76 |
62 |
|
77 |
63 |
:rtype: CreatedResponse
|
78 |
64 |
"""
|
79 |
|
CertController.setup() # TODO remove after issue fixed
|
80 |
|
|
81 |
65 |
required_keys = {SUBJECT, USAGE, VALIDITY_DAYS} # required fields of the POST req
|
82 |
66 |
|
83 |
67 |
if request.is_json: # accept JSON only
|
... | ... | |
103 |
87 |
return E_WRONG_PARAMETERS, C_BAD_REQUEST # and throw if it is not
|
104 |
88 |
usages_dict[CertController.KEY_MAP[k]] = v # otherwise translate key and set
|
105 |
89 |
|
106 |
|
key = KEY_SERVICE.create_new_key() # TODO pass key
|
|
90 |
key = self.key_service.create_new_key() # TODO pass key
|
107 |
91 |
|
108 |
92 |
if CA not in body or body[CA] is None: # if issuer omitted (legal) or none
|
109 |
|
cert = CERTIFICATE_SERVICE.create_root_ca( # create a root CA
|
|
93 |
cert = self.certificate_service.create_root_ca( # create a root CA
|
110 |
94 |
key,
|
111 |
95 |
subject,
|
112 |
96 |
usages=usages_dict, # TODO ignoring usages -> discussion
|
113 |
97 |
days=body[VALIDITY_DAYS]
|
114 |
98 |
)
|
115 |
99 |
else:
|
116 |
|
issuer = CERTIFICATE_SERVICE.get_certificate(body[CA]) # get base issuer info
|
|
100 |
issuer = self.certificate_service.get_certificate(body[CA]) # get base issuer info
|
117 |
101 |
|
118 |
102 |
if issuer is None: # if such issuer does not exist
|
119 |
|
KEY_SERVICE.delete_key(key.private_key_id) # free
|
|
103 |
self.key_service.delete_key(key.private_key_id) # free
|
120 |
104 |
return E_NO_ISSUER_FOUND, C_BAD_REQUEST # and throw
|
121 |
105 |
|
122 |
|
issuer_key = KEY_SERVICE.get_key(issuer.private_key_id) # get issuer's key, which must exist
|
|
106 |
issuer_key = self.key_service.get_key(issuer.private_key_id) # get issuer's key, which must exist
|
123 |
107 |
|
124 |
108 |
if issuer_key is None: # if it does not
|
125 |
|
KEY_SERVICE.delete_key(key.private_key_id) # free
|
|
109 |
self.key_service.delete_key(key.private_key_id) # free
|
126 |
110 |
return E_CORRUPTED_DATABASE, C_INTERNAL_SERVER_ERROR # and throw
|
127 |
111 |
|
128 |
|
f = CERTIFICATE_SERVICE.create_ca if CA_ID in usages_dict and usages_dict[CA_ID] else \
|
129 |
|
CERTIFICATE_SERVICE.create_end_cert
|
|
112 |
f = self.certificate_service.create_ca if CA_ID in usages_dict and usages_dict[CA_ID] else \
|
|
113 |
self.certificate_service.create_end_cert
|
130 |
114 |
|
131 |
115 |
# noinspection PyTypeChecker
|
132 |
116 |
cert = f( # create inter CA or end cert
|
... | ... | |
142 |
126 |
return {"success": True,
|
143 |
127 |
"data": cert.certificate_id}, C_CREATED_SUCCESSFULLY
|
144 |
128 |
else: # if this fails, then
|
145 |
|
KEY_SERVICE.delete_key(key.private_key_id) # free
|
|
129 |
self.key_service.delete_key(key.private_key_id) # free
|
146 |
130 |
return {"success": False, # and wonder what the cause is,
|
147 |
131 |
"data": "Internal error: The certificate could not have been created."}, C_BAD_REQUEST
|
148 |
132 |
# as obj/None carries only one bit
|
... | ... | |
150 |
134 |
else:
|
151 |
135 |
return E_NOT_JSON_FORMAT, C_BAD_REQUEST # throw in case of non-JSON format
|
152 |
136 |
|
153 |
|
@staticmethod
|
154 |
|
def get_certificate_by_id(id):
|
|
137 |
def get_certificate_by_id(self, id):
|
155 |
138 |
"""get certificate by ID
|
156 |
139 |
|
157 |
140 |
Get certificate in PEM format by ID
|
... | ... | |
161 |
144 |
|
162 |
145 |
:rtype: PemResponse
|
163 |
146 |
"""
|
164 |
|
CertController.setup() # TODO remove after issue fixed
|
165 |
|
|
166 |
147 |
try:
|
167 |
148 |
v = int(id)
|
168 |
149 |
except ValueError:
|
169 |
150 |
return E_WRONG_PARAMETERS, C_BAD_REQUEST
|
170 |
151 |
|
171 |
|
cert = CERTIFICATE_SERVICE.get_certificate(v)
|
|
152 |
cert = self.certificate_service.get_certificate(v)
|
172 |
153 |
|
173 |
154 |
if cert is None:
|
174 |
155 |
return E_NO_CERTIFICATES_FOUND, C_NO_DATA
|
175 |
156 |
else:
|
176 |
157 |
return {"success": True, "data": cert.pem_data}, C_SUCCESS
|
177 |
158 |
|
178 |
|
@staticmethod
|
179 |
|
def get_certificate_details_by_id(id):
|
|
159 |
def get_certificate_details_by_id(self, id):
|
180 |
160 |
"""get certificate's details by ID
|
181 |
161 |
|
182 |
162 |
Get certificate details by ID
|
... | ... | |
186 |
166 |
|
187 |
167 |
:rtype: CertificateResponse
|
188 |
168 |
"""
|
189 |
|
CertController.setup() # TODO remove after issue fixed
|
190 |
|
|
191 |
169 |
try:
|
192 |
170 |
v = int(id)
|
193 |
171 |
except ValueError:
|
194 |
172 |
return E_WRONG_PARAMETERS, C_BAD_REQUEST
|
195 |
173 |
|
196 |
|
cert = CERTIFICATE_SERVICE.get_certificate(v)
|
|
174 |
cert = self.certificate_service.get_certificate(v)
|
197 |
175 |
|
198 |
176 |
if cert is None:
|
199 |
177 |
return E_NO_CERTIFICATES_FOUND, C_NO_DATA
|
200 |
178 |
else:
|
201 |
|
data = CertController.cert_to_dict_full(cert)
|
|
179 |
data = self.cert_to_dict_full(cert)
|
202 |
180 |
if data is None:
|
203 |
181 |
return E_CORRUPTED_DATABASE, C_INTERNAL_SERVER_ERROR
|
204 |
182 |
return {"success": True, "data": data}, C_SUCCESS
|
205 |
183 |
|
206 |
|
@staticmethod
|
207 |
|
def get_certificate_list():
|
|
184 |
def get_certificate_list(self):
|
208 |
185 |
"""get list of certificates
|
209 |
186 |
|
210 |
187 |
Lists certificates based on provided filtering options
|
... | ... | |
214 |
191 |
|
215 |
192 |
:rtype: CertificateListResponse
|
216 |
193 |
"""
|
217 |
|
CertController.setup() # TODO remove after issue fixed
|
218 |
|
|
219 |
194 |
targets = {ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID} # all targets
|
220 |
195 |
|
221 |
196 |
# the filtering parameter can be read as URL argument or as a request body
|
... | ... | |
248 |
223 |
|
249 |
224 |
# if filtering did not change the
|
250 |
225 |
# targets,
|
251 |
|
certs = CERTIFICATE_SERVICE.get_certificates() # fetch everything
|
|
226 |
certs = self.certificate_service.get_certificates() # fetch everything
|
252 |
227 |
else: # otherwise fetch targets only
|
253 |
|
certs = list(chain(*(CERTIFICATE_SERVICE.get_certificates(target) for target in targets)))
|
|
228 |
certs = list(chain(*(self.certificate_service.get_certificates(target) for target in targets)))
|
254 |
229 |
|
255 |
230 |
if certs is None:
|
256 |
231 |
return E_GENERAL_ERROR, C_INTERNAL_SERVER_ERROR
|
... | ... | |
259 |
234 |
else:
|
260 |
235 |
ret = []
|
261 |
236 |
for c in certs:
|
262 |
|
data = CertController.cert_to_dict_partial(c)
|
|
237 |
data = self.cert_to_dict_partial(c)
|
263 |
238 |
if data is None:
|
264 |
239 |
return E_CORRUPTED_DATABASE, C_INTERNAL_SERVER_ERROR
|
265 |
240 |
ret.append(
|
... | ... | |
267 |
242 |
)
|
268 |
243 |
return {"success": True, "data": ret}, C_SUCCESS
|
269 |
244 |
|
270 |
|
@staticmethod
|
271 |
|
def get_certificate_root_by_id(id):
|
|
245 |
def get_certificate_root_by_id(self, id):
|
272 |
246 |
"""get certificate's root of trust chain by ID
|
273 |
247 |
|
274 |
248 |
Get certificate's root of trust chain in PEM format by ID
|
... | ... | |
278 |
252 |
|
279 |
253 |
:rtype: PemResponse
|
280 |
254 |
"""
|
281 |
|
CertController.setup() # TODO remove after issue fixed
|
282 |
|
|
283 |
255 |
try:
|
284 |
256 |
v = int(id)
|
285 |
257 |
except ValueError:
|
286 |
258 |
return E_WRONG_PARAMETERS, C_BAD_REQUEST
|
287 |
259 |
|
288 |
|
cert = CERTIFICATE_SERVICE.get_certificate(v)
|
|
260 |
cert = self.certificate_service.get_certificate(v)
|
289 |
261 |
|
290 |
262 |
if cert is None:
|
291 |
263 |
return E_NO_CERTIFICATES_FOUND, C_NO_DATA
|
292 |
264 |
|
293 |
|
trust_chain = CERTIFICATE_SERVICE.get_chain_of_trust(cert.parent_id, exclude_root=False)
|
294 |
|
if trust_chain is None or len(trust_chain) is 0:
|
|
265 |
trust_chain = self.certificate_service.get_chain_of_trust(cert.parent_id, exclude_root=False)
|
|
266 |
if trust_chain is None or len(trust_chain) == 0:
|
295 |
267 |
return E_NO_CERTIFICATES_FOUND, C_NO_DATA
|
296 |
268 |
|
297 |
269 |
return {"success": True, "data": trust_chain[-1].pem_data}, C_SUCCESS
|
298 |
270 |
|
299 |
|
@staticmethod
|
300 |
|
def get_certificate_trust_chain_by_id(id):
|
|
271 |
def get_certificate_trust_chain_by_id(self, id):
|
301 |
272 |
"""get certificate's trust chain by ID
|
302 |
273 |
|
303 |
274 |
Get certificate trust chain in PEM format by ID
|
... | ... | |
307 |
278 |
|
308 |
279 |
:rtype: PemResponse
|
309 |
280 |
"""
|
310 |
|
CertController.setup() # TODO remove after issue fixed
|
311 |
281 |
|
312 |
282 |
try:
|
313 |
283 |
v = int(id)
|
314 |
284 |
except ValueError:
|
315 |
285 |
return E_WRONG_PARAMETERS, C_BAD_REQUEST
|
316 |
286 |
|
317 |
|
cert = CERTIFICATE_SERVICE.get_certificate(v)
|
|
287 |
cert = self.certificate_service.get_certificate(v)
|
318 |
288 |
|
319 |
289 |
if cert is None:
|
320 |
290 |
return E_NO_CERTIFICATES_FOUND, C_NO_DATA
|
... | ... | |
322 |
292 |
if cert.parent_id is None:
|
323 |
293 |
return E_NO_CERTIFICATES_FOUND, C_NO_DATA
|
324 |
294 |
|
325 |
|
trust_chain = CERTIFICATE_SERVICE.get_chain_of_trust(cert.parent_id)
|
|
295 |
trust_chain = self.certificate_service.get_chain_of_trust(cert.parent_id)
|
326 |
296 |
|
327 |
297 |
ret = []
|
328 |
298 |
for intermediate in trust_chain:
|
... | ... | |
330 |
300 |
|
331 |
301 |
return {"success": True, "data": "".join(ret)}, C_SUCCESS
|
332 |
302 |
|
333 |
|
@staticmethod
|
334 |
|
def cert_to_dict_partial(c):
|
|
303 |
def cert_to_dict_partial(self, c):
|
335 |
304 |
"""
|
336 |
305 |
Dictionarizes a certificate directly fetched from the database. Contains partial information.
|
337 |
306 |
:param c: target cert
|
338 |
307 |
:return: certificate dict (compliant with some parts of the REST API)
|
339 |
308 |
"""
|
340 |
|
c_issuer = CERTIFICATE_SERVICE.get_certificate(c.parent_id)
|
|
309 |
c_issuer = self.certificate_service.get_certificate(c.parent_id)
|
341 |
310 |
if c_issuer is None:
|
342 |
311 |
return None
|
343 |
312 |
|
... | ... | |
353 |
322 |
}
|
354 |
323 |
}
|
355 |
324 |
|
356 |
|
@staticmethod
|
357 |
|
def cert_to_dict_full(c):
|
|
325 |
def cert_to_dict_full(self, c):
|
358 |
326 |
"""
|
359 |
327 |
Dictionarizes a certificate directly fetched from the database, but adds subject info.
|
360 |
328 |
Contains full information.
|
361 |
329 |
:param c: target cert
|
362 |
330 |
:return: certificate dict (compliant with some parts of the REST API)
|
363 |
331 |
"""
|
364 |
|
subj = CERTIFICATE_SERVICE.get_subject_from_certificate(c)
|
365 |
|
c_issuer = CERTIFICATE_SERVICE.get_certificate(c.parent_id)
|
|
332 |
subj = self.certificate_service.get_subject_from_certificate(c)
|
|
333 |
c_issuer = self.certificate_service.get_certificate(c.parent_id)
|
366 |
334 |
if c_issuer is None:
|
367 |
335 |
return None
|
368 |
336 |
|
Re #8569 Added application initialization and running dependency injection