Revize 1e07432d
Přidáno uživatelem Michal Seják před asi 4 roky(ů)
src/constants.py | ||
---|---|---|
2 | 2 |
|
3 | 3 |
DATABASE_FILE = "db/database_sqlite.db" |
4 | 4 |
DATABASE_FILE_LOCATION = FileAnchor("aswi2021jmsd", DATABASE_FILE) |
5 |
|
|
5 |
DATETIME_FORMAT = "%m.%d.%Y %H:%M:%S" |
|
6 | 6 |
|
7 | 7 |
# Types of certificates |
8 | 8 |
ROOT_CA_ID = 1 |
swagger_server/controllers/certificates_controller.py | ||
---|---|---|
1 |
from datetime import datetime |
|
2 |
|
|
1 | 3 |
import connexion |
2 | 4 |
import six |
3 | 5 |
|
... | ... | |
8 | 10 |
from src.services.cryptography import CryptographyService # TODO not the Controller's responsibility. 2 |
9 | 11 |
from sqlite3 import Connection # TODO not the Controller's responsibility. 3 |
10 | 12 |
from src.constants import DICT_USAGES, CA_ID, \ |
11 |
DATABASE_FILE_LOCATION # TODO DATABASE_FILE - not the Controller's |
|
13 |
DATABASE_FILE_LOCATION, SSL_ID, SIGNATURE_ID, AUTHENTICATION_ID, \ |
|
14 |
DATETIME_FORMAT # TODO DATABASE_FILE - not the Controller's |
|
12 | 15 |
# responsibility. 4 |
13 | 16 |
from src.services.key_service import KeyService |
14 |
from swagger_server.models import CertificateRequest |
|
17 |
from swagger_server.models import CertificateRequest, CertificateListItem, CAUsage, IssuerListItem
|
|
15 | 18 |
|
16 | 19 |
from swagger_server.models.certificate import Certificate # noqa: E501 |
17 | 20 |
from swagger_server.models.certificate_list_response import CertificateListResponse # noqa: E501 |
... | ... | |
31 | 34 |
# the one corresponding to the connection. |
32 | 35 |
# The cursor can always be generated from the |
33 | 36 |
# connection instance. |
37 |
GENERAL_ERROR = "An error occured during processing of the request." |
|
38 |
CORRUPTED_DATABASE = "Internal server error (corrupted database)." |
|
34 | 39 |
|
35 | 40 |
__ = CryptographyService() # TODO not the Controller's responsibility. 6 |
36 | 41 |
CERTIFICATE_SERVICE = CertificateService(__, CertificateRepository(None, None)) |
... | ... | |
104 | 109 |
if issuer_key is None: |
105 | 110 |
return ErrorResponse( |
106 | 111 |
success=False, |
107 |
data="Internal server error (corrupted database)."
|
|
112 |
data=CORRUPTED_DATABASE
|
|
108 | 113 |
), 400 |
109 | 114 |
|
110 | 115 |
f = CERTIFICATE_SERVICE.create_ca if CA_ID in usages_dict and usages_dict[CA_ID] else \ |
... | ... | |
130 | 135 |
data="The certificate could not have been created." |
131 | 136 |
), 400 |
132 | 137 |
else: |
133 |
return 400 |
|
138 |
return ErrorResponse(success=False, data="The request must be JSON-formatted."), 400
|
|
134 | 139 |
|
135 | 140 |
|
136 | 141 |
def get_certificate_by_id(id): # noqa: E501 |
... | ... | |
175 | 180 |
""" |
176 | 181 |
setup() # TODO remove after issue fixed |
177 | 182 |
|
178 |
if connexion.request.is_json: |
|
179 |
filtering = Filtering.from_dict(connexion.request.get_json()) # noqa: E501 |
|
180 |
print(filtering) |
|
181 |
return 200 |
|
183 |
key_map = {CA_ID: 'ca', SSL_ID: 'ssl', SIGNATURE_ID: 'digital_signature', AUTHENTICATION_ID: 'authentication'} |
|
184 |
|
|
185 |
if len(connexion.request.data) == 0: |
|
186 |
certs = CERTIFICATE_SERVICE.get_certificates() |
|
187 |
if certs is None: |
|
188 |
return ErrorResponse(success=False, data=GENERAL_ERROR), 500 |
|
189 |
elif len(certs) == 0: |
|
190 |
return ErrorResponse(success=False, data="No certificates found."), 204 |
|
191 |
else: |
|
192 |
ret = [] |
|
193 |
for c in certs: |
|
194 |
c_issuer = CERTIFICATE_SERVICE.get_certificate(c.parent_id) |
|
195 |
if c_issuer is None: |
|
196 |
return ErrorResponse(success=False, data=CORRUPTED_DATABASE) |
|
197 |
|
|
198 |
ret.append( |
|
199 |
CertificateListItem( |
|
200 |
id=c.certificate_id, |
|
201 |
cn=c.common_name, |
|
202 |
not_before=datetime.strptime(c.valid_from, DATETIME_FORMAT).date(), |
|
203 |
not_after=datetime.strptime(c.valid_to, DATETIME_FORMAT).date(), |
|
204 |
usage=CAUsage(**{key_map[k]: v for k, v in c.usages.items()}), |
|
205 |
issuer=IssuerListItem( |
|
206 |
id=c_issuer.certificate_id, |
|
207 |
cn=c_issuer.common_name |
|
208 |
) |
|
209 |
) |
|
210 |
) |
|
211 |
return CertificateListResponse(success=True, data=ret) |
|
182 | 212 |
else: |
183 |
return 400 |
|
213 |
# TODO fix filtering issue (somehow) |
|
214 |
return ErrorResponse(success=False, data="The request must be JSON-formatted."), 400 |
|
184 | 215 |
|
185 | 216 |
|
186 | 217 |
def get_certificate_root_by_id(id): # noqa: E501 |
tests/unit_tests/rest_api/certificates.py | ||
---|---|---|
123 | 123 |
ret = server.post("/api/certificates", content_type="application/json", json={ |
124 | 124 |
"subject": { |
125 | 125 |
"C": "CZ", |
126 |
"CN": "Root CA s.r.o.", |
|
126 |
"CN": "Another Root CA s.r.o.",
|
|
127 | 127 |
"L": "Pilsen", |
128 |
"O": "Root CA s.r.o.", |
|
128 |
"O": "Another Root CA s.r.o.",
|
|
129 | 129 |
"OU": "IT department", |
130 | 130 |
"ST": "Pilsen Region", |
131 | 131 |
"emailAddress": "root@ca.com" |
... | ... | |
149 | 149 |
assert d["success"] |
150 | 150 |
|
151 | 151 |
ret = server.post("/api/certificates", content_type="application/json", json={ |
152 |
"CA": 2, |
|
152 | 153 |
"subject": { |
153 |
"CA": 2, |
|
154 | 154 |
"C": "CZ", |
155 |
"CN": "Root CA s.r.o.",
|
|
155 |
"CN": "Intermediate CA s.r.o.",
|
|
156 | 156 |
"L": "Pilsen", |
157 |
"O": "Root CA s.r.o.",
|
|
157 |
"O": "Intermediate CA s.r.o.",
|
|
158 | 158 |
"OU": "IT department", |
159 | 159 |
"ST": "Pilsen Region", |
160 |
"emailAddress": "root@ca.com"
|
|
160 |
"emailAddress": "inter@ca.com"
|
|
161 | 161 |
}, |
162 | 162 |
"usage": { |
163 | 163 |
"CA": True, |
... | ... | |
175 | 175 |
assert "data" in d |
176 | 176 |
assert d["data"] == 3 |
177 | 177 |
assert "success" in d |
178 |
assert d["success"] |
|
179 |
|
|
180 |
|
|
181 |
def test_list_of_certificates(server): |
|
182 |
ret = server.get("/api/certificates") |
|
183 |
|
|
184 |
assert ret.status_code == 200 |
|
185 |
|
|
186 |
usage_dict = {'CA': True, 'SSL': True, 'authentication': True, 'digitalSignature': True} |
|
187 |
s1 = "Root CA s.r.o." |
|
188 |
s2 = "Another Root CA s.r.o." |
|
189 |
s3 = "Intermediate CA s.r.o." |
|
190 |
|
|
191 |
assert "data" in ret.json |
|
192 |
assert "success" in ret.json |
|
193 |
assert ret.json["success"] |
|
194 |
|
|
195 |
d = ret.json["data"] |
|
196 |
|
|
197 |
# print(d) |
|
198 |
|
|
199 |
assert d[0]['CN'] == s1 |
|
200 |
assert d[1]['CN'] == s2 |
|
201 |
assert d[2]['CN'] == s3 |
|
202 |
|
|
203 |
assert all(d[i]['id'] == i + 1 for i in range(3)) |
|
204 |
assert all(d[i]['usage'] == usage_dict for i in range(3)) |
|
205 |
|
|
206 |
assert d[0]['issuer']['CN'] == s1 |
|
207 |
assert d[1]['issuer']['CN'] == s2 |
|
208 |
assert d[2]['issuer']['CN'] == s2 |
|
209 |
|
|
210 |
assert d[0]['issuer']['id'] == 1 |
|
211 |
assert d[1]['issuer']['id'] == 2 |
|
212 |
assert d[2]['issuer']['id'] == 2 |
|
213 |
|
|
214 |
|
|
215 |
def test_sign_by_non_ca(server): |
|
216 |
ret = server.post("/api/certificates", content_type="application/json", json={ |
|
217 |
"CA": 2, |
|
218 |
"subject": { |
|
219 |
"C": "CZ", |
|
220 |
"CN": "Fake Intermediate CA s.r.o.", |
|
221 |
"L": "Pilsen", |
|
222 |
"O": "Fake Intermediate CA s.r.o.", |
|
223 |
"OU": "IT department", |
|
224 |
"ST": "Pilsen Region", |
|
225 |
"emailAddress": "inter@ca.com" |
|
226 |
}, |
|
227 |
"usage": { |
|
228 |
"CA": False, |
|
229 |
"SSL": True, |
|
230 |
"authentication": True, |
|
231 |
"digitalSignature": True |
|
232 |
}, |
|
233 |
"validityDays": 30 |
|
234 |
}) |
|
235 |
|
|
236 |
assert ret.status_code == 201 |
|
237 |
|
|
238 |
d = ret.json |
|
239 |
|
|
240 |
assert "data" in d |
|
241 |
assert d["data"] == 4 |
|
242 |
assert "success" in d |
|
243 |
assert d["success"] |
|
244 |
|
|
245 |
ret = server.post("/api/certificates", content_type="application/json", json={ |
|
246 |
"CA": 2, |
|
247 |
"subject": { |
|
248 |
"C": "CZ", |
|
249 |
"CN": "End certificate signed by end certificate s.r.o.", |
|
250 |
"L": "Pilsen", |
|
251 |
"O": "This is probably bad s.r.o.", |
|
252 |
"OU": "IT department", |
|
253 |
"ST": "Pilsen Region", |
|
254 |
"emailAddress": "end@ca.com" |
|
255 |
}, |
|
256 |
"usage": { |
|
257 |
"CA": False, |
|
258 |
"SSL": True, |
|
259 |
"authentication": True, |
|
260 |
"digitalSignature": True |
|
261 |
}, |
|
262 |
"validityDays": 30 |
|
263 |
}) |
|
264 |
|
|
265 |
# TODO discussion -> assert ret.status_code == 400 |
|
266 |
assert ret.status_code == 201 |
|
267 |
|
|
268 |
d = ret.json |
|
269 |
|
|
270 |
assert "data" in d |
|
271 |
assert d["data"] == 5 |
|
272 |
assert "success" in d |
|
178 | 273 |
assert d["success"] |
Také k dispozici: Unified diff
Re #8476 - Implemented `list_certificates` with unit tests.