Revize 1e07432d
Přidáno uživatelem Michal Seják před asi 4 roky(ů)
swagger_server/controllers/certificates_controller.py | ||
---|---|---|
1 |
from datetime import datetime |
|
2 |
|
|
1 | 3 |
import connexion |
2 | 4 |
import six |
3 | 5 |
|
... | ... | |
8 | 10 |
from src.services.cryptography import CryptographyService # TODO not the Controller's responsibility. 2 |
9 | 11 |
from sqlite3 import Connection # TODO not the Controller's responsibility. 3 |
10 | 12 |
from src.constants import DICT_USAGES, CA_ID, \ |
11 |
DATABASE_FILE_LOCATION # TODO DATABASE_FILE - not the Controller's |
|
13 |
DATABASE_FILE_LOCATION, SSL_ID, SIGNATURE_ID, AUTHENTICATION_ID, \ |
|
14 |
DATETIME_FORMAT # TODO DATABASE_FILE - not the Controller's |
|
12 | 15 |
# responsibility. 4 |
13 | 16 |
from src.services.key_service import KeyService |
14 |
from swagger_server.models import CertificateRequest |
|
17 |
from swagger_server.models import CertificateRequest, CertificateListItem, CAUsage, IssuerListItem
|
|
15 | 18 |
|
16 | 19 |
from swagger_server.models.certificate import Certificate # noqa: E501 |
17 | 20 |
from swagger_server.models.certificate_list_response import CertificateListResponse # noqa: E501 |
... | ... | |
31 | 34 |
# the one corresponding to the connection. |
32 | 35 |
# The cursor can always be generated from the |
33 | 36 |
# connection instance. |
37 |
GENERAL_ERROR = "An error occured during processing of the request." |
|
38 |
CORRUPTED_DATABASE = "Internal server error (corrupted database)." |
|
34 | 39 |
|
35 | 40 |
__ = CryptographyService() # TODO not the Controller's responsibility. 6 |
36 | 41 |
CERTIFICATE_SERVICE = CertificateService(__, CertificateRepository(None, None)) |
... | ... | |
104 | 109 |
if issuer_key is None: |
105 | 110 |
return ErrorResponse( |
106 | 111 |
success=False, |
107 |
data="Internal server error (corrupted database)."
|
|
112 |
data=CORRUPTED_DATABASE
|
|
108 | 113 |
), 400 |
109 | 114 |
|
110 | 115 |
f = CERTIFICATE_SERVICE.create_ca if CA_ID in usages_dict and usages_dict[CA_ID] else \ |
... | ... | |
130 | 135 |
data="The certificate could not have been created." |
131 | 136 |
), 400 |
132 | 137 |
else: |
133 |
return 400 |
|
138 |
return ErrorResponse(success=False, data="The request must be JSON-formatted."), 400
|
|
134 | 139 |
|
135 | 140 |
|
136 | 141 |
def get_certificate_by_id(id): # noqa: E501 |
... | ... | |
175 | 180 |
""" |
176 | 181 |
setup() # TODO remove after issue fixed |
177 | 182 |
|
178 |
if connexion.request.is_json: |
|
179 |
filtering = Filtering.from_dict(connexion.request.get_json()) # noqa: E501 |
|
180 |
print(filtering) |
|
181 |
return 200 |
|
183 |
key_map = {CA_ID: 'ca', SSL_ID: 'ssl', SIGNATURE_ID: 'digital_signature', AUTHENTICATION_ID: 'authentication'} |
|
184 |
|
|
185 |
if len(connexion.request.data) == 0: |
|
186 |
certs = CERTIFICATE_SERVICE.get_certificates() |
|
187 |
if certs is None: |
|
188 |
return ErrorResponse(success=False, data=GENERAL_ERROR), 500 |
|
189 |
elif len(certs) == 0: |
|
190 |
return ErrorResponse(success=False, data="No certificates found."), 204 |
|
191 |
else: |
|
192 |
ret = [] |
|
193 |
for c in certs: |
|
194 |
c_issuer = CERTIFICATE_SERVICE.get_certificate(c.parent_id) |
|
195 |
if c_issuer is None: |
|
196 |
return ErrorResponse(success=False, data=CORRUPTED_DATABASE) |
|
197 |
|
|
198 |
ret.append( |
|
199 |
CertificateListItem( |
|
200 |
id=c.certificate_id, |
|
201 |
cn=c.common_name, |
|
202 |
not_before=datetime.strptime(c.valid_from, DATETIME_FORMAT).date(), |
|
203 |
not_after=datetime.strptime(c.valid_to, DATETIME_FORMAT).date(), |
|
204 |
usage=CAUsage(**{key_map[k]: v for k, v in c.usages.items()}), |
|
205 |
issuer=IssuerListItem( |
|
206 |
id=c_issuer.certificate_id, |
|
207 |
cn=c_issuer.common_name |
|
208 |
) |
|
209 |
) |
|
210 |
) |
|
211 |
return CertificateListResponse(success=True, data=ret) |
|
182 | 212 |
else: |
183 |
return 400 |
|
213 |
# TODO fix filtering issue (somehow) |
|
214 |
return ErrorResponse(success=False, data="The request must be JSON-formatted."), 400 |
|
184 | 215 |
|
185 | 216 |
|
186 | 217 |
def get_certificate_root_by_id(id): # noqa: E501 |
Také k dispozici: Unified diff
Re #8476 - Implemented `list_certificates` with unit tests.