Revize 88a68c29
Přidáno uživatelem Michal Seják před asi 4 roky(ů)
swagger_server/controllers/certificates_controller.py | ||
---|---|---|
1 |
import connexion |
|
2 |
import six |
|
3 |
|
|
4 |
from src.dao.private_key_repository import PrivateKeyRepository |
|
5 |
from src.model.subject import Subject |
|
6 |
from src.services.certificate_service import CertificateService |
|
7 |
from src.dao.certificate_repository import CertificateRepository # TODO not the Controller's responsibility. 1 |
|
8 |
from src.services.cryptography import CryptographyService # TODO not the Controller's responsibility. 2 |
|
9 |
from sqlite3 import Connection, Cursor # TODO not the Controller's responsibility. 3 |
|
10 |
from src.constants import DATABASE_FILE, DICT_USAGES, CA_ID # TODO not the Controller's responsibility. 4 |
|
11 |
from src.services.key_service import KeyService |
|
12 |
from swagger_server.models import CertificateRequest |
|
13 |
|
|
14 |
from swagger_server.models.certificate import Certificate # noqa: E501 |
|
15 |
from swagger_server.models.certificate_list_response import CertificateListResponse # noqa: E501 |
|
16 |
from swagger_server.models.certificate_response import CertificateResponse # noqa: E501 |
|
17 |
from swagger_server.models.created_response import CreatedResponse # noqa: E501 |
|
18 |
from swagger_server.models.error_response import ErrorResponse # noqa: E501 |
|
19 |
from swagger_server.models.filtering import Filtering # noqa: E501 |
|
20 |
from swagger_server.models.id_parameter import IdParameter # noqa: E501 |
|
21 |
from swagger_server.models.pem_response import PemResponse # noqa: E501 |
|
22 |
from swagger_server import util |
|
23 |
|
|
24 |
|
|
25 |
_ = Connection("../" + DATABASE_FILE) # TODO not the Controller's responsibility. 5 |
|
26 |
# cursor = _.cursor() # TODO responsibility of the |
|
27 |
# CertificateRepository. It makes no sense to |
|
28 |
# supply a different cursor than precisely |
|
29 |
# the one corresponding to the connection. |
|
30 |
# The cursor can always be generated from the |
|
31 |
# connection instance. |
|
32 |
|
|
33 |
__ = CryptographyService() # TODO not the Controller's responsibility. 6 |
|
34 |
CERTIFICATE_SERVICE = CertificateService(__, CertificateRepository(_, _.cursor())) |
|
35 |
# TODO open for discussion. Expected: |
|
36 |
# CS = CertificateService.get_instance() |
|
37 |
# or something like that. |
|
38 |
|
|
39 |
KEY_SERVICE = KeyService(__, PrivateKeyRepository(_, _.cursor())) # TODO as above |
|
40 |
|
|
41 |
|
|
42 |
def setup(): |
|
43 |
_ = Connection("../" + DATABASE_FILE) |
|
44 |
CERTIFICATE_SERVICE.certificate_repository.connection = _ |
|
45 |
CERTIFICATE_SERVICE.certificate_repository.cursor = _.cursor() |
|
46 |
KEY_SERVICE.private_key_repository.connection = _ |
|
47 |
KEY_SERVICE.private_key_repository.cursor = _.cursor() |
|
48 |
|
|
49 |
|
|
50 |
def create_certificate(body=None): # noqa: E501 |
|
51 |
"""create new certificate |
|
52 |
|
|
53 |
Create a new certificate based on given information # noqa: E501 |
|
54 |
|
|
55 |
:param body: Certificate data to be created |
|
56 |
:type body: dict | bytes |
|
57 |
|
|
58 |
:rtype: CreatedResponse |
|
59 |
""" |
|
60 |
setup() |
|
61 |
|
|
62 |
if connexion.request.is_json: |
|
63 |
body = CertificateRequest.from_dict(connexion.request.get_json()) # noqa: E501 |
|
64 |
if body.subject is None or body.usage is None or body.validity_days is None: |
|
65 |
return 400 |
|
66 |
|
|
67 |
key = KEY_SERVICE.create_new_key() # TODO pass key |
|
68 |
subject = Subject( |
|
69 |
common_name=body.subject.cn, |
|
70 |
country=body.subject.c, |
|
71 |
locality=body.subject.l, |
|
72 |
state=body.subject.st, |
|
73 |
organization=body.subject.o, |
|
74 |
organization_unit=body.subject.ou, |
|
75 |
email_address=body.subject.email_address |
|
76 |
) |
|
77 |
usages_dict = DICT_USAGES.copy() |
|
78 |
|
|
79 |
if body.ca is None: |
|
80 |
cert = CERTIFICATE_SERVICE.create_root_ca( |
|
81 |
key, |
|
82 |
subject, |
|
83 |
usages=usages_dict, |
|
84 |
days=body.validity_days |
|
85 |
) |
|
86 |
else: |
|
87 |
issuer = CERTIFICATE_SERVICE.get_certificate(body.ca) |
|
88 |
|
|
89 |
if issuer is None: |
|
90 |
return ErrorResponse( |
|
91 |
success=False, |
|
92 |
data="No certificate authority with such unique ID exists." |
|
93 |
), 400 |
|
94 |
|
|
95 |
issuer_key = KEY_SERVICE.get_key(issuer.private_key_id) |
|
96 |
|
|
97 |
if issuer_key is None: |
|
98 |
return ErrorResponse( |
|
99 |
success=False, |
|
100 |
data="Internal server error (corrupted database)." |
|
101 |
), 400 |
|
102 |
|
|
103 |
f = CERTIFICATE_SERVICE.create_ca if CA_ID in usages_dict and usages_dict[CA_ID] else \ |
|
104 |
CERTIFICATE_SERVICE.create_end_cert |
|
105 |
|
|
106 |
cert = f( |
|
107 |
key, |
|
108 |
subject, |
|
109 |
issuer, |
|
110 |
issuer_key, |
|
111 |
usages=usages_dict, |
|
112 |
days=body.validity_days |
|
113 |
) |
|
114 |
|
|
115 |
if cert is not None: |
|
116 |
return CertificateResponse( |
|
117 |
success=True, |
|
118 |
data=cert.certificate_id |
|
119 |
), 201 |
|
120 |
else: |
|
121 |
return ErrorResponse( |
|
122 |
success=False, |
|
123 |
data="The certificate could not have been created." |
|
124 |
), 400 |
|
125 |
else: |
|
126 |
return 400 |
|
127 |
|
|
128 |
|
|
129 |
def get_certificate_by_id(id): # noqa: E501 |
|
130 |
"""get certificate by ID |
|
131 |
|
|
132 |
Get certificate in PEM format by ID # noqa: E501 |
|
133 |
|
|
134 |
:param id: ID of a certificate to be queried |
|
135 |
:type id: dict | bytes |
|
136 |
|
|
137 |
:rtype: PemResponse |
|
138 |
""" |
|
139 |
if connexion.request.is_json: |
|
140 |
id = IdParameter.from_dict(connexion.request.get_json()) # noqa: E501 |
|
141 |
return 'do some magic!' |
|
142 |
|
|
143 |
|
|
144 |
def get_certificate_details_by_id(id): # noqa: E501 |
|
145 |
"""get certificate's details by ID |
|
146 |
|
|
147 |
Get certificate details by ID # noqa: E501 |
|
148 |
|
|
149 |
:param id: ID of a certificate whose details are to be queried |
|
150 |
:type id: dict | bytes |
|
151 |
|
|
152 |
:rtype: CertificateResponse |
|
153 |
""" |
|
154 |
if connexion.request.is_json: |
|
155 |
id = IdParameter.from_dict(connexion.request.get_json()) # noqa: E501 |
|
156 |
return 'do some magic!' |
|
157 |
|
|
158 |
|
|
159 |
def get_certificate_list(filtering=None): # noqa: E501 |
|
160 |
"""get list of certificates |
|
161 |
|
|
162 |
Lists certificates based on provided filtering options # noqa: E501 |
|
163 |
|
|
164 |
:param filtering: Filter certificate type to be queried |
|
165 |
:type filtering: dict | bytes |
|
166 |
|
|
167 |
:rtype: CertificateListResponse |
|
168 |
""" |
|
169 |
if connexion.request.is_json: |
|
170 |
filtering = Filtering.from_dict(connexion.request.get_json()) # noqa: E501 |
|
171 |
return 'do some magic! XD' |
|
172 |
|
|
173 |
|
|
174 |
def get_certificate_root_by_id(id): # noqa: E501 |
|
175 |
"""get certificate's root of trust chain by ID |
|
176 |
|
|
177 |
Get certificate's root of trust chain in PEM format by ID # noqa: E501 |
|
178 |
|
|
179 |
:param id: ID of a child certificate whose root is to be queried |
|
180 |
:type id: dict | bytes |
|
181 |
|
|
182 |
:rtype: PemResponse |
|
183 |
""" |
|
184 |
if connexion.request.is_json: |
|
185 |
id = IdParameter.from_dict(connexion.request.get_json()) # noqa: E501 |
|
186 |
return 'do some magic!' |
|
187 |
|
|
188 |
|
|
189 |
def get_certificate_trust_chain_by_id(id): # noqa: E501 |
|
190 |
"""get certificate's trust chain by ID |
|
191 |
|
|
192 |
Get certificate trust chain in PEM format by ID # noqa: E501 |
|
193 |
|
|
194 |
:param id: ID of a child certificate whose chain is to be queried |
|
195 |
:type id: dict | bytes |
|
196 |
|
|
197 |
:rtype: PemResponse |
|
198 |
""" |
|
199 |
if connexion.request.is_json: |
|
200 |
id = IdParameter.from_dict(connexion.request.get_json()) # noqa: E501 |
|
201 |
return 'do some magic!' |
Také k dispozici: Unified diff
Re #8476 - Added as a part of the swagger-generated server stub. Imported required objects, SQLite3 thread issue hack (performance impact negligible, violates encapsulation), implemented `create_certificate`.