Revize 9e6f791a
Přidáno uživatelem Jan Pašek před asi 4 roky(ů)
src/controllers/certificates_controller.py | ||
---|---|---|
12 | 12 |
from src.exceptions.database_exception import DatabaseException |
13 | 13 |
from src.model.subject import Subject |
14 | 14 |
from src.services.certificate_service import CertificateService, RevocationReasonInvalidException, \ |
15 |
CertificateStatusInvalidException, CertificateNotFoundException |
|
15 |
CertificateStatusInvalidException, CertificateNotFoundException, CertificateAlreadyRevokedException
|
|
16 | 16 |
# responsibility. |
17 | 17 |
from src.services.key_service import KeyService |
18 | 18 |
|
... | ... | |
35 | 35 |
|
36 | 36 |
E_NO_ISSUER_FOUND = {"success": False, "data": "No certificate authority with such unique ID exists."} |
37 | 37 |
E_NO_CERTIFICATES_FOUND = {"success": False, "data": "No such certificate found."} |
38 |
E_NO_CERTIFICATE_ALREADY_REVOKED = {"success": False, "data": "Certificate is already revoked."} |
|
38 | 39 |
E_NO_CERT_PRIVATE_KEY_FOUND = {"success": False, |
39 | 40 |
"data": "Internal server error (certificate's private key cannot be found)."} |
40 | 41 |
E_NOT_JSON_FORMAT = {"success": False, "data": "The request must be JSON-formatted."} |
... | ... | |
340 | 341 |
try: |
341 | 342 |
# set certificate status using certificate_service |
342 | 343 |
self.certificate_service.set_certificate_revocation_status(identifier, status, reason) |
343 |
except (RevocationReasonInvalidException, CertificateStatusInvalidException, CertificateNotFoundException):
|
|
344 |
except (RevocationReasonInvalidException, CertificateStatusInvalidException): |
|
344 | 345 |
# these exceptions are thrown in case invalid status or revocation reason is passed to the controller |
345 | 346 |
return E_WRONG_PARAMETERS, C_BAD_REQUEST |
347 |
except CertificateAlreadyRevokedException: |
|
348 |
return E_NO_CERTIFICATE_ALREADY_REVOKED, C_BAD_REQUEST |
|
349 |
except CertificateNotFoundException: |
|
350 |
return E_NO_CERTIFICATES_FOUND, C_NOT_FOUND |
|
346 | 351 |
return {"success": True, |
347 | 352 |
"data": "Certificate status updated successfully."}, C_SUCCESS |
348 | 353 |
# throw an error in case the request does not contain a json body |
src/exceptions/unknown_exception.py | ||
---|---|---|
1 |
class UnknownException(Exception): |
|
2 |
def __init__(self, message): |
|
3 |
self.message = message |
|
4 |
|
|
5 |
def __str__(self): |
|
6 |
return self.message |
src/services/certificate_service.py | ||
---|---|---|
6 | 6 |
from src.constants import ROOT_CA_ID, INTERMEDIATE_CA_ID, CA_ID, CERTIFICATE_ID, CERTIFICATE_STATES, \ |
7 | 7 |
CERTIFICATE_REVOCATION_REASONS |
8 | 8 |
from src.dao.certificate_repository import CertificateRepository |
9 |
from src.exceptions.unknown_exception import UnknownException |
|
9 | 10 |
from src.model.certificate import Certificate |
10 | 11 |
from src.model.private_key import PrivateKey |
11 | 12 |
from src.model.subject import Subject |
... | ... | |
279 | 280 |
if reason not in CERTIFICATE_REVOCATION_REASONS: |
280 | 281 |
raise RevocationReasonInvalidException(reason) |
281 | 282 |
|
283 |
# check whether the certificate exists |
|
284 |
certificate = self.certificate_repository.read(id) |
|
285 |
if certificate is None: |
|
286 |
raise CertificateNotFoundException(id) |
|
287 |
|
|
282 | 288 |
updated = False |
283 | 289 |
if status == STATUS_VALID: |
284 | 290 |
updated = self.certificate_repository.clear_certificate_revocation(id) |
285 | 291 |
elif status == STATUS_REVOKED: |
292 |
# check if the certificate is not revoked already |
|
293 |
revoked = self.certificate_repository.get_all_revoked_by(certificate.parent_id) |
|
294 |
if certificate.certificate_id in [x.certificate_id for x in revoked]: |
|
295 |
raise CertificateAlreadyRevokedException(id) |
|
296 |
|
|
286 | 297 |
revocation_timestamp = int(time.time()) |
287 | 298 |
updated = self.certificate_repository.set_certificate_revoked(id, str(revocation_timestamp), reason) |
288 | 299 |
|
289 | 300 |
if not updated: |
290 |
raise CertificateNotFoundException(id) |
|
301 |
# TODO log this |
|
302 |
raise UnknownException("Repository returned 'false' from clear_certificate_revocation() " |
|
303 |
"or set_certificate_revoked().") |
|
291 | 304 |
|
292 | 305 |
def get_subject_from_certificate(self, certificate: Certificate) -> Subject: |
293 | 306 |
""" |
... | ... | |
357 | 370 |
class CertificateNotFoundException(Exception): |
358 | 371 |
""" |
359 | 372 |
Exception that denotes that the caller was trying to set |
360 |
a certificate to an invalid state
|
|
373 |
work with non-existing certificate
|
|
361 | 374 |
""" |
362 | 375 |
|
363 | 376 |
def __init__(self, id): |
... | ... | |
365 | 378 |
|
366 | 379 |
def __str__(self): |
367 | 380 |
return f"Certificate id '{self.id}' does not exist." |
381 |
|
|
382 |
|
|
383 |
class CertificateAlreadyRevokedException(Exception): |
|
384 |
""" |
|
385 |
Exception that denotes that the caller was trying to revoke |
|
386 |
a certificate that is already revoked |
|
387 |
""" |
|
388 |
|
|
389 |
def __init__(self, id): |
|
390 |
self.id = id |
|
391 |
|
|
392 |
def __str__(self): |
|
393 |
return f"Certificate id '{self.id}' is already revoked." |
src/services/cryptography.py | ||
---|---|---|
126 | 126 |
# reference in openssl req command using -extensions option. |
127 | 127 |
extensions += "\n"+CA_EXTENSIONS |
128 | 128 |
if len(config) == 0: |
129 |
config += MINIMAL_CONFIG_FILE+"[ " + SSCRT_SECTION + " ]"+"\n"+extensions
|
|
129 |
config += MINIMAL_CONFIG_FILE |
|
130 | 130 |
config += "\n[ " + SSCRT_SECTION + " ]" + "\n" + extensions |
131 | 131 |
|
132 | 132 |
with TemporaryFile("openssl.conf", config) as conf_path: |
tests/integration_tests/rest_api/certificates_test.py | ||
---|---|---|
634 | 634 |
assert "success" in revoke_ret.json |
635 | 635 |
assert revoke_ret.json["success"] |
636 | 636 |
|
637 |
revoke_ret = server.patch(f"/api/certificates/{cert_id}", content_type="application/json", json=revocation_body) |
|
638 |
|
|
639 |
assert revoke_ret.status_code == 400 |
|
640 |
assert "data" in revoke_ret.json |
|
641 |
assert "success" in revoke_ret.json |
|
642 |
assert not revoke_ret.json["success"] |
|
643 |
|
|
637 | 644 |
# set to valid again |
638 | 645 |
valid_body = { |
639 | 646 |
"status": "valid" |
... | ... | |
687 | 694 |
} |
688 | 695 |
revoke_ret = server.patch(f"/api/certificates/54791", content_type="application/json", json=revocation_body) |
689 | 696 |
|
690 |
assert revoke_ret.status_code == 400
|
|
697 |
assert revoke_ret.status_code == 404
|
|
691 | 698 |
assert "data" in revoke_ret.json |
692 | 699 |
assert "success" in revoke_ret.json |
693 | 700 |
assert not revoke_ret.json["success"] |
tests/integration_tests/services/certificate_service_test.py | ||
---|---|---|
5 | 5 |
from src.constants import SSL_ID, CA_ID, AUTHENTICATION_ID, INTERMEDIATE_CA_ID, ROOT_CA_ID, CERTIFICATE_ID, SIGNATURE_ID |
6 | 6 |
from src.model.subject import Subject |
7 | 7 |
from src.services.certificate_service import RevocationReasonInvalidException, CertificateStatusInvalidException, \ |
8 |
CertificateNotFoundException |
|
8 |
CertificateNotFoundException, CertificateAlreadyRevokedException
|
|
9 | 9 |
|
10 | 10 |
|
11 | 11 |
def export_crt(crt): |
... | ... | |
321 | 321 |
|
322 | 322 |
with pytest.raises(CertificateNotFoundException) as e: |
323 | 323 |
certificate_service_unique.set_certificate_revocation_status(5974, "revoked", "unspecified") |
324 |
|
|
325 |
certificate_service_unique.set_certificate_revocation_status(root_ca_cert.certificate_id, "revoked", "unspecified") |
|
326 |
|
|
327 |
with pytest.raises(CertificateAlreadyRevokedException) as e: |
|
328 |
certificate_service_unique.set_certificate_revocation_status(root_ca_cert.certificate_id, "revoked", |
|
329 |
"unspecified") |
|
330 |
|
Také k dispozici: Unified diff
Re #8571 - Fixed problems during walk-through code review