Revize 5b57121e
Přidáno uživatelem Michal Seják před asi 4 roky(ů)
app.py | ||
---|---|---|
1 | 1 |
from flask import Flask |
2 | 2 |
import os |
3 |
from src.controllers.certificates_controller import CertController |
|
3 | 4 |
|
4 | 5 |
app = Flask(__name__) |
5 | 6 |
|
... | ... | |
9 | 10 |
return 'Welcome to the X.509 management application homepage!' |
10 | 11 |
|
11 | 12 |
|
13 |
@app.route('/api/certificates', methods=["POST"]) |
|
14 |
def create_certificate(): |
|
15 |
return CertController.create_certificate() |
|
16 |
|
|
17 |
|
|
18 |
@app.route('/api/certificates', methods=["GET"]) |
|
19 |
def get_cert_list(): |
|
20 |
return CertController.get_certificate_list() |
|
21 |
|
|
22 |
|
|
12 | 23 |
if __name__ == '__main__': |
13 | 24 |
host = "0.0.0.0" |
14 | 25 |
port = 5000 |
... | ... | |
20 | 31 |
if "FLASK_PORT" in os.environ: |
21 | 32 |
host = os.environ["FLASK_PORT"] |
22 | 33 |
|
23 |
app.run(host=host, port=port) |
|
24 |
|
|
34 |
app.run(host=host, port=port) |
requirements.txt | ||
---|---|---|
1 | 1 |
Flask==1.1.2 |
2 |
pytest~=6.2.3 |
|
3 |
|
|
4 |
connexion~=2.7.0 |
|
5 |
connexion[swagger-ui] |
|
6 |
six~=1.15.0 |
|
2 |
pytest~=6.2.3 |
src/controllers/certificates_controller.py | ||
---|---|---|
1 |
from datetime import datetime |
|
2 |
from itertools import chain |
|
3 |
from flask import request |
|
4 |
from src.dao.private_key_repository import PrivateKeyRepository |
|
5 |
from src.model.subject import Subject |
|
6 |
from src.services.certificate_service import CertificateService |
|
7 |
from src.dao.certificate_repository import CertificateRepository # TODO not the Controller's responsibility. 1 |
|
8 |
from src.services.cryptography import CryptographyService # TODO not the Controller's responsibility. 2 |
|
9 |
from sqlite3 import Connection # TODO not the Controller's responsibility. 3 |
|
10 |
from src.constants import DICT_USAGES, CA_ID, \ |
|
11 |
DATABASE_FILE_LOCATION, SSL_ID, SIGNATURE_ID, AUTHENTICATION_ID, \ |
|
12 |
DATETIME_FORMAT, ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID # TODO DATABASE_FILE - not the Controller's |
|
13 |
# responsibility. 4 |
|
14 |
from src.services.key_service import KeyService |
|
15 |
|
|
16 |
# _ = Connection("../" + DATABASE_FILE) # TODO not the Controller's responsibility. 5 |
|
17 |
# cursor = _.cursor() # TODO responsibility of the |
|
18 |
# CertificateRepository. It makes no sense to |
|
19 |
# supply a different cursor than precisely |
|
20 |
# the one corresponding to the connection. |
|
21 |
# The cursor can always be generated from the |
|
22 |
# connection instance. |
|
23 |
FILTERING = "filtering" |
|
24 |
ISSUER = "issuer" |
|
25 |
US = "usage" |
|
26 |
NOT_AFTER = "notAfter" |
|
27 |
NOT_BEFORE = "notBefore" |
|
28 |
COMMON_NAME = "CN" |
|
29 |
ID = "id" |
|
30 |
E_NO_ISSUER_FOUND = {"success": False, |
|
31 |
"data": "No certificate authority with such unique ID exists."} |
|
32 |
E_NO_CERTIFICATES_FOUND = {"success": False, "data": "No certificates found."} |
|
33 |
E_NOT_JSON_FORMAT = {"success": False, "data": "The request must be JSON-formatted."} |
|
34 |
E_CORRUPTED_DATABASE = {"success": False, "data": "Internal server error (corrupted database)."} |
|
35 |
E_GENERAL_ERROR = {"success": False, "data": "Internal server error (unknown origin)."} |
|
36 |
E_MISSING_PARAMETERS = {"success": False, "data": "Invalid request, missing parameters."} |
|
37 |
E_WRONG_PARAMETERS = {"success": False, "data": "Invalid request, wrong parameters."} |
|
38 |
USAGE = "usage" |
|
39 |
SUBJECT = "subject" |
|
40 |
VALIDITY_DAYS = "validityDays" |
|
41 |
CA = "CA" |
|
42 |
|
|
43 |
__ = CryptographyService() # TODO not the Controller's responsibility. 6 |
|
44 |
CERTIFICATE_SERVICE = CertificateService(__, CertificateRepository(None, None)) |
|
45 |
# TODO open for discussion. Expected: |
|
46 |
# CS = CertificateService.get_instance() |
|
47 |
# or something like that. |
|
48 |
|
|
49 |
KEY_SERVICE = KeyService(__, PrivateKeyRepository(None, None)) # TODO as above |
|
50 |
|
|
51 |
|
|
52 |
class CertController: |
|
53 |
|
|
54 |
@staticmethod |
|
55 |
def setup(): |
|
56 |
""" |
|
57 |
SQLite3 thread issue hack. |
|
58 |
:return: |
|
59 |
""" |
|
60 |
_ = Connection(DATABASE_FILE_LOCATION.shortest_relative_path()) |
|
61 |
CERTIFICATE_SERVICE.certificate_repository.connection = _ |
|
62 |
CERTIFICATE_SERVICE.certificate_repository.cursor = _.cursor() |
|
63 |
KEY_SERVICE.private_key_repository.connection = _ |
|
64 |
KEY_SERVICE.private_key_repository.cursor = _.cursor() |
|
65 |
|
|
66 |
@staticmethod |
|
67 |
def create_certificate(): # noqa: E501 |
|
68 |
"""create new certificate |
|
69 |
|
|
70 |
Create a new certificate based on given information # noqa: E501 |
|
71 |
|
|
72 |
:param body: Certificate data to be created |
|
73 |
:type body: dict | bytes |
|
74 |
|
|
75 |
:rtype: CreatedResponse |
|
76 |
""" |
|
77 |
CertController.setup() # TODO remove after issue fixed |
|
78 |
|
|
79 |
key_map = {'CA': CA_ID, 'SSL': SSL_ID, 'digitalSignature': SIGNATURE_ID, 'authentication': AUTHENTICATION_ID} |
|
80 |
required_keys = {SUBJECT, USAGE, VALIDITY_DAYS} |
|
81 |
|
|
82 |
if request.is_json: |
|
83 |
body = request.get_json() |
|
84 |
if not all(k in body for k in required_keys): |
|
85 |
return E_MISSING_PARAMETERS, 400 |
|
86 |
|
|
87 |
if not isinstance(body[VALIDITY_DAYS], int): |
|
88 |
return E_WRONG_PARAMETERS, 400 |
|
89 |
|
|
90 |
subject = Subject.from_dict(body[SUBJECT]) |
|
91 |
|
|
92 |
if subject is None: |
|
93 |
return E_WRONG_PARAMETERS, 400 |
|
94 |
|
|
95 |
usages_dict = DICT_USAGES.copy() |
|
96 |
|
|
97 |
if not isinstance(body[USAGE], dict): |
|
98 |
return E_WRONG_PARAMETERS, 400 |
|
99 |
|
|
100 |
for k, v in body[USAGE].items(): |
|
101 |
if k not in key_map: |
|
102 |
return E_WRONG_PARAMETERS, 400 |
|
103 |
usages_dict[key_map[k]] = v |
|
104 |
|
|
105 |
key = KEY_SERVICE.create_new_key() # TODO pass key |
|
106 |
|
|
107 |
if CA not in body: |
|
108 |
cert = CERTIFICATE_SERVICE.create_root_ca( |
|
109 |
key, |
|
110 |
subject, |
|
111 |
usages=usages_dict, |
|
112 |
days=body[VALIDITY_DAYS] |
|
113 |
) |
|
114 |
else: |
|
115 |
issuer = CERTIFICATE_SERVICE.get_certificate(body[CA]) |
|
116 |
|
|
117 |
if issuer is None: |
|
118 |
return E_NO_ISSUER_FOUND, 400 |
|
119 |
|
|
120 |
issuer_key = KEY_SERVICE.get_key(issuer.private_key_id) |
|
121 |
|
|
122 |
if issuer_key is None: |
|
123 |
return E_CORRUPTED_DATABASE, 500 |
|
124 |
|
|
125 |
f = CERTIFICATE_SERVICE.create_ca if CA_ID in usages_dict and usages_dict[CA_ID] else \ |
|
126 |
CERTIFICATE_SERVICE.create_end_cert |
|
127 |
|
|
128 |
cert = f( |
|
129 |
key, |
|
130 |
subject, |
|
131 |
issuer, |
|
132 |
issuer_key, |
|
133 |
usages=usages_dict, |
|
134 |
days=body[VALIDITY_DAYS] |
|
135 |
) |
|
136 |
|
|
137 |
if cert is not None: |
|
138 |
return {"success": True, |
|
139 |
"data": cert.certificate_id}, 201 |
|
140 |
else: |
|
141 |
return {"success": False, |
|
142 |
"data": "Internal error: The certificate could not have been created."}, 400 |
|
143 |
else: |
|
144 |
return E_NOT_JSON_FORMAT, 400 |
|
145 |
|
|
146 |
@staticmethod |
|
147 |
def get_certificate_by_id(id): # noqa: E501 |
|
148 |
"""get certificate by ID |
|
149 |
|
|
150 |
Get certificate in PEM format by ID # noqa: E501 |
|
151 |
|
|
152 |
:param id: ID of a certificate to be queried |
|
153 |
:type id: dict | bytes |
|
154 |
|
|
155 |
:rtype: PemResponse |
|
156 |
""" |
|
157 |
CertController.setup() # TODO remove after issue fixed |
|
158 |
|
|
159 |
if connexion.request.is_json: |
|
160 |
id = IdParameter.from_dict(connexion.request.get_json()) # noqa: E501 |
|
161 |
return 'do some magic!' |
|
162 |
|
|
163 |
@staticmethod |
|
164 |
def get_certificate_details_by_id(id): # noqa: E501 |
|
165 |
"""get certificate's details by ID |
|
166 |
|
|
167 |
Get certificate details by ID # noqa: E501 |
|
168 |
|
|
169 |
:param id: ID of a certificate whose details are to be queried |
|
170 |
:type id: dict | bytes |
|
171 |
|
|
172 |
:rtype: CertificateResponse |
|
173 |
""" |
|
174 |
CertController.setup() # TODO remove after issue fixed |
|
175 |
|
|
176 |
if connexion.request.is_json: |
|
177 |
id = IdParameter.from_dict(connexion.request.get_json()) # noqa: E501 |
|
178 |
return 'do some magic!' |
|
179 |
|
|
180 |
@staticmethod |
|
181 |
def get_certificate_list(): # noqa: E501 |
|
182 |
"""get list of certificates |
|
183 |
|
|
184 |
Lists certificates based on provided filtering options # noqa: E501 |
|
185 |
|
|
186 |
:param filtering: Filter certificate type to be queried |
|
187 |
:type filtering: dict | bytes |
|
188 |
|
|
189 |
:rtype: CertificateListResponse |
|
190 |
""" |
|
191 |
CertController.setup() # TODO remove after issue fixed |
|
192 |
|
|
193 |
key_map = {CA_ID: 'CA', SSL_ID: 'SSL', SIGNATURE_ID: 'digitalSignature', AUTHENTICATION_ID: 'authentication'} |
|
194 |
|
|
195 |
targets = {ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID} |
|
196 |
if request.is_json: |
|
197 |
data = request.get_json() |
|
198 |
if FILTERING in data: |
|
199 |
if isinstance(data[FILTERING], dict): |
|
200 |
if CA in data[FILTERING]: |
|
201 |
if isinstance(data[FILTERING][CA], bool): |
|
202 |
if data[FILTERING][CA]: |
|
203 |
targets.remove(CERTIFICATE_ID) |
|
204 |
else: |
|
205 |
targets.remove(ROOT_CA_ID) |
|
206 |
targets.remove(INTERMEDIATE_CA_ID) |
|
207 |
else: |
|
208 |
return E_WRONG_PARAMETERS, 400 |
|
209 |
else: |
|
210 |
return E_WRONG_PARAMETERS, 400 |
|
211 |
|
|
212 |
if len(targets) == 3: |
|
213 |
certs = CERTIFICATE_SERVICE.get_certificates() |
|
214 |
else: |
|
215 |
certs = list(chain(CERTIFICATE_SERVICE.get_certificates(target) for target in targets)) |
|
216 |
|
|
217 |
if certs is None: |
|
218 |
return E_GENERAL_ERROR, 500 |
|
219 |
elif len(certs) == 0: |
|
220 |
return E_NO_CERTIFICATES_FOUND, 204 |
|
221 |
else: |
|
222 |
ret = [] |
|
223 |
for c in certs: |
|
224 |
c_issuer = CERTIFICATE_SERVICE.get_certificate(c.parent_id) |
|
225 |
if c_issuer is None: |
|
226 |
return E_CORRUPTED_DATABASE, 500 |
|
227 |
|
|
228 |
ret.append( |
|
229 |
{ |
|
230 |
ID: c.certificate_id, |
|
231 |
COMMON_NAME: c.common_name, |
|
232 |
NOT_BEFORE: datetime.strptime(c.valid_from, DATETIME_FORMAT).date(), |
|
233 |
NOT_AFTER: datetime.strptime(c.valid_to, DATETIME_FORMAT).date(), |
|
234 |
USAGE: {key_map[k]: v for k, v in c.usages.items()}, |
|
235 |
ISSUER: { |
|
236 |
ID: c_issuer.certificate_id, |
|
237 |
COMMON_NAME: c_issuer.common_name |
|
238 |
} |
|
239 |
} |
|
240 |
) |
|
241 |
return {"success": True, "data": ret} |
|
242 |
|
|
243 |
|
|
244 |
|
|
245 |
@staticmethod |
|
246 |
def get_certificate_root_by_id(id): # noqa: E501 |
|
247 |
"""get certificate's root of trust chain by ID |
|
248 |
|
|
249 |
Get certificate's root of trust chain in PEM format by ID # noqa: E501 |
|
250 |
|
|
251 |
:param id: ID of a child certificate whose root is to be queried |
|
252 |
:type id: dict | bytes |
|
253 |
|
|
254 |
:rtype: PemResponse |
|
255 |
""" |
|
256 |
CertController.setup() # TODO remove after issue fixed |
|
257 |
|
|
258 |
if connexion.request.is_json: |
|
259 |
id = IdParameter.from_dict(connexion.request.get_json()) # noqa: E501 |
|
260 |
return 'do some magic!' |
|
261 |
|
|
262 |
@staticmethod |
|
263 |
def get_certificate_trust_chain_by_id(id): # noqa: E501 |
|
264 |
"""get certificate's trust chain by ID |
|
265 |
|
|
266 |
Get certificate trust chain in PEM format by ID # noqa: E501 |
|
267 |
|
|
268 |
:param id: ID of a child certificate whose chain is to be queried |
|
269 |
:type id: dict | bytes |
|
270 |
|
|
271 |
:rtype: PemResponse |
|
272 |
""" |
|
273 |
CertController.setup() # TODO remove after issue fixed |
|
274 |
|
|
275 |
if connexion.request.is_json: |
|
276 |
id = IdParameter.from_dict(connexion.request.get_json()) # noqa: E501 |
|
277 |
return 'do some magic!' |
src/model/subject.py | ||
---|---|---|
1 | 1 |
class Subject: |
2 |
ATTR_MAP = {"C": "common_name", "ST": "state", "L": "locality", "CN": "common_name", "O": "organization", |
|
3 |
"OU": "organization_unit", "emailAddress": "email_address"} |
|
2 | 4 |
|
3 | 5 |
def __init__(self, common_name=None, country=None, locality=None, state=None, organization=None, |
4 | 6 |
organization_unit=None, email_address=None): |
... | ... | |
9 | 11 |
self.organization = organization |
10 | 12 |
self.organization_unit = organization_unit |
11 | 13 |
self.email_address = email_address |
14 |
|
|
15 |
@staticmethod |
|
16 |
def from_dict(d): |
|
17 |
if not isinstance(d, dict): |
|
18 |
return None |
|
19 |
|
|
20 |
s = Subject() |
|
21 |
for k, v in Subject.ATTR_MAP.items(): |
|
22 |
if k in d: |
|
23 |
if not isinstance(d[k], str): |
|
24 |
return None |
|
25 |
s.__setattr__(v, d[k]) |
|
26 |
|
|
27 |
return s |
src/swagger.yaml | ||
---|---|---|
1 |
openapi: 3.0.0 |
|
2 |
info: |
|
3 |
title: X.509 certificate management |
|
4 |
description: API for certificate management created for YOSO company |
|
5 |
version: 1.0.1 |
|
6 |
servers: |
|
7 |
- url: https://virtserver.swaggerhub.com/janpasek97/X509_management/1.0.1 |
|
8 |
description: X509 management API |
|
9 |
tags: |
|
10 |
- name: certificates |
|
11 |
description: API for creating and querying certificates |
|
12 |
paths: |
|
13 |
/api/certificates/{id}: |
|
14 |
get: |
|
15 |
tags: |
|
16 |
- certificates |
|
17 |
summary: get certificate by ID |
|
18 |
description: Get certificate in PEM format by ID |
|
19 |
operationId: get_certificate_by_id |
|
20 |
parameters: |
|
21 |
- name: id |
|
22 |
in: path |
|
23 |
description: ID of a certificate to be queried |
|
24 |
required: true |
|
25 |
style: simple |
|
26 |
explode: false |
|
27 |
schema: |
|
28 |
$ref: '#/components/schemas/IdParameter' |
|
29 |
responses: |
|
30 |
"200": |
|
31 |
description: returning the certificate |
|
32 |
content: |
|
33 |
application/json: |
|
34 |
schema: |
|
35 |
$ref: '#/components/schemas/PemResponse' |
|
36 |
"204": |
|
37 |
description: the certificate was not found |
|
38 |
content: |
|
39 |
application/json: |
|
40 |
schema: |
|
41 |
$ref: '#/components/schemas/ErrorResponse' |
|
42 |
"400": |
|
43 |
description: bad request |
|
44 |
content: |
|
45 |
application/json: |
|
46 |
schema: |
|
47 |
$ref: '#/components/schemas/ErrorResponse' |
|
48 |
x-openapi-router-controller: swagger_server.controllers.certificates_controller |
|
49 |
/api/certificates/{id}/chain: |
|
50 |
get: |
|
51 |
tags: |
|
52 |
- certificates |
|
53 |
summary: get certificate's trust chain by ID |
|
54 |
description: Get certificate trust chain in PEM format by ID |
|
55 |
operationId: get_certificate_trust_chain_by_id |
|
56 |
parameters: |
|
57 |
- name: id |
|
58 |
in: path |
|
59 |
description: ID of a child certificate whose chain is to be queried |
|
60 |
required: true |
|
61 |
style: simple |
|
62 |
explode: false |
|
63 |
schema: |
|
64 |
$ref: '#/components/schemas/IdParameter' |
|
65 |
responses: |
|
66 |
"200": |
|
67 |
description: returning the trust chain |
|
68 |
content: |
|
69 |
application/json: |
|
70 |
schema: |
|
71 |
$ref: '#/components/schemas/PemResponse' |
|
72 |
"204": |
|
73 |
description: the certificate was not found |
|
74 |
content: |
|
75 |
application/json: |
|
76 |
schema: |
|
77 |
$ref: '#/components/schemas/ErrorResponse' |
|
78 |
"400": |
|
79 |
description: bad request |
|
80 |
content: |
|
81 |
application/json: |
|
82 |
schema: |
|
83 |
$ref: '#/components/schemas/ErrorResponse' |
|
84 |
x-openapi-router-controller: swagger_server.controllers.certificates_controller |
|
85 |
/api/certificates/{id}/root: |
|
86 |
get: |
|
87 |
tags: |
|
88 |
- certificates |
|
89 |
summary: get certificate's root of trust chain by ID |
|
90 |
description: Get certificate's root of trust chain in PEM format by ID |
|
91 |
operationId: get_certificate_root_by_id |
|
92 |
parameters: |
|
93 |
- name: id |
|
94 |
in: path |
|
95 |
description: ID of a child certificate whose root is to be queried |
|
96 |
required: true |
|
97 |
style: simple |
|
98 |
explode: false |
|
99 |
schema: |
|
100 |
$ref: '#/components/schemas/IdParameter' |
|
101 |
responses: |
|
102 |
"200": |
|
103 |
description: returning the root of trust chain |
|
104 |
content: |
|
105 |
application/json: |
|
106 |
schema: |
|
107 |
$ref: '#/components/schemas/PemResponse' |
|
108 |
"204": |
|
109 |
description: the certificate was not found |
|
110 |
content: |
|
111 |
application/json: |
|
112 |
schema: |
|
113 |
$ref: '#/components/schemas/ErrorResponse' |
|
114 |
"400": |
|
115 |
description: bad request |
|
116 |
content: |
|
117 |
application/json: |
|
118 |
schema: |
|
119 |
$ref: '#/components/schemas/ErrorResponse' |
|
120 |
x-openapi-router-controller: swagger_server.controllers.certificates_controller |
|
121 |
/api/certificates/{id}/details: |
|
122 |
get: |
|
123 |
tags: |
|
124 |
- certificates |
|
125 |
summary: get certificate's details by ID |
|
126 |
description: Get certificate details by ID |
|
127 |
operationId: get_certificate_details_by_id |
|
128 |
parameters: |
|
129 |
- name: id |
|
130 |
in: path |
|
131 |
description: ID of a certificate whose details are to be queried |
|
132 |
required: true |
|
133 |
style: simple |
|
134 |
explode: false |
|
135 |
schema: |
|
136 |
$ref: '#/components/schemas/IdParameter' |
|
137 |
responses: |
|
138 |
"200": |
|
139 |
description: returning the certificate details |
|
140 |
content: |
|
141 |
application/json: |
|
142 |
schema: |
|
143 |
$ref: '#/components/schemas/CertificateResponse' |
|
144 |
"204": |
|
145 |
description: the certificate was not found |
|
146 |
content: |
|
147 |
application/json: |
|
148 |
schema: |
|
149 |
$ref: '#/components/schemas/ErrorResponse' |
|
150 |
"400": |
|
151 |
description: bad request |
|
152 |
content: |
|
153 |
application/json: |
|
154 |
schema: |
|
155 |
$ref: '#/components/schemas/ErrorResponse' |
|
156 |
x-openapi-router-controller: swagger_server.controllers.certificates_controller |
|
157 |
/api/certificates: |
|
158 |
get: |
|
159 |
tags: |
|
160 |
- certificates |
|
161 |
summary: get list of certificates |
|
162 |
description: Lists certificates based on provided filtering options |
|
163 |
operationId: get_certificate_list |
|
164 |
parameters: |
|
165 |
- name: filtering |
|
166 |
in: query |
|
167 |
description: Filter certificate type to be queried |
|
168 |
required: false |
|
169 |
style: form |
|
170 |
explode: true |
|
171 |
schema: |
|
172 |
$ref: '#/components/schemas/Filtering' |
|
173 |
responses: |
|
174 |
"200": |
|
175 |
description: returning results matching filtering criteria |
|
176 |
content: |
|
177 |
application/json: |
|
178 |
schema: |
|
179 |
$ref: '#/components/schemas/CertificateListResponse' |
|
180 |
"204": |
|
181 |
description: no certificates found |
|
182 |
content: |
|
183 |
application/json: |
|
184 |
schema: |
|
185 |
$ref: '#/components/schemas/ErrorResponse' |
|
186 |
"400": |
|
187 |
description: bad request |
|
188 |
content: |
|
189 |
application/json: |
|
190 |
schema: |
|
191 |
$ref: '#/components/schemas/ErrorResponse' |
|
192 |
x-openapi-router-controller: swagger_server.controllers.certificates_controller |
|
193 |
post: |
|
194 |
tags: |
|
195 |
- certificates |
|
196 |
summary: create new certificate |
|
197 |
description: Create a new certificate based on given information |
|
198 |
operationId: create_certificate |
|
199 |
requestBody: |
|
200 |
description: Certificate data to be created |
|
201 |
content: |
|
202 |
application/json: |
|
203 |
schema: |
|
204 |
$ref: '#/components/schemas/CertificateRequest' |
|
205 |
responses: |
|
206 |
"201": |
|
207 |
description: item created |
|
208 |
content: |
|
209 |
application/json: |
|
210 |
schema: |
|
211 |
$ref: '#/components/schemas/CreatedResponse' |
|
212 |
"400": |
|
213 |
description: "invalid input, object invalid" |
|
214 |
content: |
|
215 |
application/json: |
|
216 |
schema: |
|
217 |
$ref: '#/components/schemas/ErrorResponse' |
|
218 |
"409": |
|
219 |
description: an existing item already exists |
|
220 |
content: |
|
221 |
application/json: |
|
222 |
schema: |
|
223 |
$ref: '#/components/schemas/ErrorResponse' |
|
224 |
x-openapi-router-controller: swagger_server.controllers.certificates_controller |
|
225 |
components: |
|
226 |
schemas: |
|
227 |
CAUsage: |
|
228 |
required: |
|
229 |
- CA |
|
230 |
- SSL |
|
231 |
- authentication |
|
232 |
- digitalSignature |
|
233 |
properties: |
|
234 |
CA: |
|
235 |
type: boolean |
|
236 |
authentication: |
|
237 |
type: boolean |
|
238 |
digitalSignature: |
|
239 |
type: boolean |
|
240 |
SSL: |
|
241 |
type: boolean |
|
242 |
example: |
|
243 |
digitalSignature: true |
|
244 |
SSL: true |
|
245 |
CA: true |
|
246 |
authentication: true |
|
247 |
IssuerListItem: |
|
248 |
required: |
|
249 |
- CN |
|
250 |
- id |
|
251 |
properties: |
|
252 |
id: |
|
253 |
type: integer |
|
254 |
example: 547 |
|
255 |
CN: |
|
256 |
type: string |
|
257 |
example: Root CA s.r.o. |
|
258 |
example: |
|
259 |
id: 547 |
|
260 |
CN: Root CA s.r.o. |
|
261 |
CertificateListResponse: |
|
262 |
properties: |
|
263 |
success: |
|
264 |
type: boolean |
|
265 |
example: true |
|
266 |
data: |
|
267 |
type: array |
|
268 |
items: |
|
269 |
$ref: '#/components/schemas/CertificateListItem' |
|
270 |
example: |
|
271 |
data: |
|
272 |
- notAfter: 2021-07-01T00:00:00.000+00:00 |
|
273 |
usage: |
|
274 |
digitalSignature: true |
|
275 |
SSL: true |
|
276 |
CA: true |
|
277 |
authentication: true |
|
278 |
id: 547 |
|
279 |
CN: Root CA s.r.o. |
|
280 |
notBefore: 2021-03-31T00:00:00.000+00:00 |
|
281 |
issuer: |
|
282 |
id: 547 |
|
283 |
CN: Root CA s.r.o. |
|
284 |
- notAfter: 2021-07-01T00:00:00.000+00:00 |
|
285 |
usage: |
|
286 |
digitalSignature: true |
|
287 |
SSL: true |
|
288 |
CA: true |
|
289 |
authentication: true |
|
290 |
id: 547 |
|
291 |
CN: Root CA s.r.o. |
|
292 |
notBefore: 2021-03-31T00:00:00.000+00:00 |
|
293 |
issuer: |
|
294 |
id: 547 |
|
295 |
CN: Root CA s.r.o. |
|
296 |
success: true |
|
297 |
CertificateListItem: |
|
298 |
properties: |
|
299 |
id: |
|
300 |
type: integer |
|
301 |
example: 547 |
|
302 |
CN: |
|
303 |
type: string |
|
304 |
example: Root CA s.r.o. |
|
305 |
notBefore: |
|
306 |
type: string |
|
307 |
format: date |
|
308 |
example: 2021-03-31 |
|
309 |
notAfter: |
|
310 |
type: string |
|
311 |
format: date |
|
312 |
example: 2021-07-01 |
|
313 |
usage: |
|
314 |
$ref: '#/components/schemas/CAUsage' |
|
315 |
issuer: |
|
316 |
$ref: '#/components/schemas/IssuerListItem' |
|
317 |
example: |
|
318 |
notAfter: 2021-07-01T00:00:00.000+00:00 |
|
319 |
usage: |
|
320 |
digitalSignature: true |
|
321 |
SSL: true |
|
322 |
CA: true |
|
323 |
authentication: true |
|
324 |
id: 547 |
|
325 |
CN: Root CA s.r.o. |
|
326 |
notBefore: 2021-03-31T00:00:00.000+00:00 |
|
327 |
issuer: |
|
328 |
id: 547 |
|
329 |
CN: Root CA s.r.o. |
|
330 |
Filtering: |
|
331 |
properties: |
|
332 |
CA: |
|
333 |
type: boolean |
|
334 |
Subject: |
|
335 |
required: |
|
336 |
- CN |
|
337 |
properties: |
|
338 |
C: |
|
339 |
type: string |
|
340 |
description: Country code |
|
341 |
example: CZ |
|
342 |
ST: |
|
343 |
type: string |
|
344 |
description: State/Province |
|
345 |
example: Pilsen Region |
|
346 |
L: |
|
347 |
type: string |
|
348 |
description: Locality |
|
349 |
example: Pilsen |
|
350 |
CN: |
|
351 |
type: string |
|
352 |
description: Common name |
|
353 |
example: Root CA s.r.o. |
|
354 |
O: |
|
355 |
type: string |
|
356 |
description: Organization |
|
357 |
example: Root CA s.r.o. |
|
358 |
OU: |
|
359 |
type: string |
|
360 |
description: Organization Unit |
|
361 |
example: IT department |
|
362 |
emailAddress: |
|
363 |
type: string |
|
364 |
description: Email Address |
|
365 |
example: root@ca.com |
|
366 |
example: |
|
367 |
ST: Pilsen Region |
|
368 |
emailAddress: root@ca.com |
|
369 |
C: CZ |
|
370 |
OU: IT department |
|
371 |
CN: Root CA s.r.o. |
|
372 |
L: Pilsen |
|
373 |
O: Root CA s.r.o. |
|
374 |
Certificate: |
|
375 |
required: |
|
376 |
- notAfter |
|
377 |
- notBefore |
|
378 |
- subject |
|
379 |
- usage |
|
380 |
properties: |
|
381 |
subject: |
|
382 |
$ref: '#/components/schemas/Subject' |
|
383 |
notBefore: |
|
384 |
type: string |
|
385 |
format: date |
|
386 |
example: 2021-03-31 |
|
387 |
notAfter: |
|
388 |
type: string |
|
389 |
format: date |
|
390 |
example: 2021-07-01 |
|
391 |
usage: |
|
392 |
$ref: '#/components/schemas/CAUsage' |
|
393 |
CA: |
|
394 |
type: integer |
|
395 |
description: ID of the new item |
|
396 |
example: 547 |
|
397 |
example: |
|
398 |
notAfter: 2021-07-01T00:00:00.000+00:00 |
|
399 |
subject: |
|
400 |
ST: Pilsen Region |
|
401 |
emailAddress: root@ca.com |
|
402 |
C: CZ |
|
403 |
OU: IT department |
|
404 |
CN: Root CA s.r.o. |
|
405 |
L: Pilsen |
|
406 |
O: Root CA s.r.o. |
|
407 |
usage: |
|
408 |
digitalSignature: true |
|
409 |
SSL: true |
|
410 |
CA: true |
|
411 |
authentication: true |
|
412 |
notBefore: 2021-03-31T00:00:00.000+00:00 |
|
413 |
CA: 547 |
|
414 |
CertificateRequest: |
|
415 |
required: |
|
416 |
- subject |
|
417 |
- usage |
|
418 |
- validityDays |
|
419 |
properties: |
|
420 |
subject: |
|
421 |
$ref: '#/components/schemas/Subject' |
|
422 |
validityDays: |
|
423 |
type: integer |
|
424 |
example: 30 |
|
425 |
usage: |
|
426 |
$ref: '#/components/schemas/CAUsage' |
|
427 |
CA: |
|
428 |
type: integer |
|
429 |
description: ID of the new item |
|
430 |
example: 547 |
|
431 |
CreatedResponse: |
|
432 |
required: |
|
433 |
- data |
|
434 |
- success |
|
435 |
properties: |
|
436 |
success: |
|
437 |
type: boolean |
|
438 |
example: true |
|
439 |
data: |
|
440 |
type: integer |
|
441 |
example: 457 |
|
442 |
description: Item was created |
|
443 |
example: |
|
444 |
data: 457 |
|
445 |
success: true |
|
446 |
ErrorResponse: |
|
447 |
required: |
|
448 |
- data |
|
449 |
- success |
|
450 |
properties: |
|
451 |
success: |
|
452 |
type: boolean |
|
453 |
example: false |
|
454 |
data: |
|
455 |
type: string |
|
456 |
example: An error occured |
|
457 |
CertificateResponse: |
|
458 |
required: |
|
459 |
- data |
|
460 |
- success |
|
461 |
properties: |
|
462 |
success: |
|
463 |
type: boolean |
|
464 |
example: true |
|
465 |
data: |
|
466 |
$ref: '#/components/schemas/Certificate' |
|
467 |
example: |
|
468 |
data: |
|
469 |
notAfter: 2021-07-01T00:00:00.000+00:00 |
|
470 |
subject: |
|
471 |
ST: Pilsen Region |
|
472 |
emailAddress: root@ca.com |
|
473 |
C: CZ |
|
474 |
OU: IT department |
|
475 |
CN: Root CA s.r.o. |
|
476 |
L: Pilsen |
|
477 |
O: Root CA s.r.o. |
|
478 |
usage: |
|
479 |
digitalSignature: true |
|
480 |
SSL: true |
|
481 |
CA: true |
|
482 |
authentication: true |
|
483 |
notBefore: 2021-03-31T00:00:00.000+00:00 |
|
484 |
CA: 547 |
|
485 |
success: true |
|
486 |
PemResponse: |
|
487 |
required: |
|
488 |
- data |
|
489 |
- success |
|
490 |
properties: |
|
491 |
success: |
|
492 |
type: boolean |
|
493 |
example: true |
|
494 |
data: |
|
495 |
type: string |
|
496 |
description: Single PEM file or concatenation of multiple PEM formatted |
|
497 |
certificates |
|
498 |
example: '-----BEGIN CERTIFICATE-----MIICLDCCAdKgAwIBAgIBADAKBggqhkjOPQQDAjB9MQswCQYDVQQGEwJCRTEPMA0GA1UEChMGR251VExTMSUwIwYDVQQ...etc-----END |
|
499 |
CERTIFICATE-----' |
|
500 |
example: |
|
501 |
data: '-----BEGIN CERTIFICATE-----MIICLDCCAdKgAwIBAgIBADAKBggqhkjOPQQDAjB9MQswCQYDVQQGEwJCRTEPMA0GA1UEChMGR251VExTMSUwIwYDVQQ...etc-----END |
|
502 |
CERTIFICATE-----' |
|
503 |
success: true |
|
504 |
IdParameter: |
|
505 |
required: |
|
506 |
- id |
|
507 |
properties: |
|
508 |
id: |
|
509 |
type: integer |
|
510 |
example: 444 |
|
511 |
|
src/utils/util.py | ||
---|---|---|
1 |
import datetime |
|
2 |
|
|
3 |
import six |
|
4 |
import typing |
|
5 |
|
|
6 |
|
|
7 |
def _deserialize(data, klass): |
|
8 |
"""Deserializes dict, list, str into an object. |
|
9 |
|
|
10 |
:param data: dict, list or str. |
|
11 |
:param klass: class literal, or string of class name. |
|
12 |
|
|
13 |
:return: object. |
|
14 |
""" |
|
15 |
if data is None: |
|
16 |
return None |
|
17 |
|
|
18 |
if klass in six.integer_types or klass in (float, str, bool): |
|
19 |
return _deserialize_primitive(data, klass) |
|
20 |
elif klass == object: |
|
21 |
return _deserialize_object(data) |
|
22 |
elif klass == datetime.date: |
|
23 |
return deserialize_date(data) |
|
24 |
elif klass == datetime.datetime: |
|
25 |
return deserialize_datetime(data) |
|
26 |
elif hasattr(klass, '__origin__'): |
|
27 |
if klass.__origin__ == list: |
|
28 |
return _deserialize_list(data, klass.__args__[0]) |
|
29 |
if klass.__origin__ == dict: |
|
30 |
return _deserialize_dict(data, klass.__args__[1]) |
|
31 |
else: |
|
32 |
return deserialize_model(data, klass) |
|
33 |
|
|
34 |
|
|
35 |
def _deserialize_primitive(data, klass): |
|
36 |
"""Deserializes to primitive type. |
|
37 |
|
|
38 |
:param data: data to deserialize. |
|
39 |
:param klass: class literal. |
|
40 |
|
|
41 |
:return: int, long, float, str, bool. |
|
42 |
:rtype: int | long | float | str | bool |
|
43 |
""" |
|
44 |
try: |
|
45 |
value = klass(data) |
|
46 |
except UnicodeEncodeError: |
|
47 |
value = six.u(data) |
|
48 |
except TypeError: |
|
49 |
value = data |
|
50 |
return value |
|
51 |
|
|
52 |
|
|
53 |
def _deserialize_object(value): |
|
54 |
"""Return a original value. |
|
55 |
|
|
56 |
:return: object. |
|
57 |
""" |
|
58 |
return value |
|
59 |
|
|
60 |
|
|
61 |
def deserialize_date(string): |
|
62 |
"""Deserializes string to date. |
|
63 |
|
|
64 |
:param string: str. |
|
65 |
:type string: str |
|
66 |
:return: date. |
|
67 |
:rtype: date |
|
68 |
""" |
|
69 |
try: |
|
70 |
from dateutil.parser import parse |
|
71 |
return parse(string).date() |
|
72 |
except ImportError: |
|
73 |
return string |
|
74 |
|
|
75 |
|
|
76 |
def deserialize_datetime(string): |
|
77 |
"""Deserializes string to datetime. |
|
78 |
|
|
79 |
The string should be in iso8601 datetime format. |
|
80 |
|
|
81 |
:param string: str. |
|
82 |
:type string: str |
|
83 |
:return: datetime. |
|
84 |
:rtype: datetime |
|
85 |
""" |
|
86 |
try: |
|
87 |
from dateutil.parser import parse |
|
88 |
return parse(string) |
|
89 |
except ImportError: |
|
90 |
return string |
|
91 |
|
|
92 |
|
|
93 |
def deserialize_model(data, klass): |
|
94 |
"""Deserializes list or dict to model. |
|
95 |
|
|
96 |
:param data: dict, list. |
|
97 |
:type data: dict | list |
|
98 |
:param klass: class literal. |
|
99 |
:return: model object. |
|
100 |
""" |
|
101 |
instance = klass() |
|
102 |
|
|
103 |
if not instance.swagger_types: |
|
104 |
return data |
|
105 |
|
|
106 |
for attr, attr_type in six.iteritems(instance.swagger_types): |
|
107 |
if data is not None \ |
|
108 |
and instance.attribute_map[attr] in data \ |
|
109 |
and isinstance(data, (list, dict)): |
|
110 |
value = data[instance.attribute_map[attr]] |
|
111 |
setattr(instance, attr, _deserialize(value, attr_type)) |
|
112 |
|
|
113 |
return instance |
|
114 |
|
|
115 |
|
|
116 |
def _deserialize_list(data, boxed_type): |
|
117 |
"""Deserializes a list and its elements. |
|
118 |
|
|
119 |
:param data: list to deserialize. |
|
120 |
:type data: list |
|
121 |
:param boxed_type: class literal. |
|
122 |
|
|
123 |
:return: deserialized list. |
|
124 |
:rtype: list |
|
125 |
""" |
|
126 |
return [_deserialize(sub_data, boxed_type) |
|
127 |
for sub_data in data] |
|
128 |
|
|
129 |
|
|
130 |
def _deserialize_dict(data, boxed_type): |
|
131 |
"""Deserializes a dict and its elements. |
|
132 |
|
|
133 |
:param data: dict to deserialize. |
|
134 |
:type data: dict |
|
135 |
:param boxed_type: class literal. |
|
136 |
|
|
137 |
:return: deserialized dict. |
|
138 |
:rtype: dict |
|
139 |
""" |
|
140 |
return {k: _deserialize(v, boxed_type) |
|
141 |
for k, v in six.iteritems(data)} |
swagger_server/__main__.py | ||
---|---|---|
1 |
#!/usr/bin/env python3 |
|
2 |
import logging |
|
3 |
|
|
4 |
import connexion |
|
5 |
|
|
6 |
from swagger_server import encoder |
|
7 |
|
|
8 |
|
|
9 |
app = connexion.App(__name__, specification_dir='./swagger/', debug=False) |
|
10 |
PORT = 8080 |
|
11 |
|
|
12 |
|
|
13 |
def setup(a): |
|
14 |
a.app.json_encoder = encoder.JSONEncoder |
|
15 |
a.add_api('swagger.yaml', arguments={'title': 'X.509 certificate management'}, base_path="/", pythonic_params=True) |
|
16 |
# a.run(port=PORT) |
|
17 |
|
|
18 |
|
|
19 |
if __name__ == '__main__': |
|
20 |
setup(app) |
|
21 |
app.run(PORT) |
swagger_server/controllers/authorization_controller.py | ||
---|---|---|
1 |
from typing import List |
|
2 |
""" |
|
3 |
controller generated to handled auth operation described at: |
|
4 |
https://connexion.readthedocs.io/en/latest/security.html |
|
5 |
""" |
|
6 |
|
swagger_server/controllers/certificates_controller.py | ||
---|---|---|
1 |
from datetime import datetime |
|
2 |
|
|
3 |
import connexion |
|
4 |
import six |
|
5 |
|
|
6 |
from src.dao.private_key_repository import PrivateKeyRepository |
|
7 |
from src.model.subject import Subject |
|
8 |
from src.services.certificate_service import CertificateService |
|
9 |
from src.dao.certificate_repository import CertificateRepository # TODO not the Controller's responsibility. 1 |
|
10 |
from src.services.cryptography import CryptographyService # TODO not the Controller's responsibility. 2 |
|
11 |
from sqlite3 import Connection # TODO not the Controller's responsibility. 3 |
|
12 |
from src.constants import DICT_USAGES, CA_ID, \ |
|
13 |
DATABASE_FILE_LOCATION, SSL_ID, SIGNATURE_ID, AUTHENTICATION_ID, \ |
|
14 |
DATETIME_FORMAT # TODO DATABASE_FILE - not the Controller's |
|
15 |
# responsibility. 4 |
|
16 |
from src.services.key_service import KeyService |
|
17 |
from swagger_server.models import CertificateRequest, CertificateListItem, CAUsage, IssuerListItem |
|
18 |
|
|
19 |
from swagger_server.models.certificate import Certificate # noqa: E501 |
|
20 |
from swagger_server.models.certificate_list_response import CertificateListResponse # noqa: E501 |
|
21 |
from swagger_server.models.certificate_response import CertificateResponse # noqa: E501 |
|
22 |
from swagger_server.models.created_response import CreatedResponse # noqa: E501 |
|
23 |
from swagger_server.models.error_response import ErrorResponse # noqa: E501 |
|
24 |
from swagger_server.models.filtering import Filtering # noqa: E501 |
|
25 |
from swagger_server.models.id_parameter import IdParameter # noqa: E501 |
|
26 |
from swagger_server.models.pem_response import PemResponse # noqa: E501 |
|
27 |
from swagger_server import util |
|
28 |
|
|
29 |
|
|
30 |
# _ = Connection("../" + DATABASE_FILE) # TODO not the Controller's responsibility. 5 |
|
31 |
# cursor = _.cursor() # TODO responsibility of the |
|
32 |
# CertificateRepository. It makes no sense to |
|
33 |
# supply a different cursor than precisely |
|
34 |
# the one corresponding to the connection. |
|
35 |
# The cursor can always be generated from the |
|
36 |
# connection instance. |
|
37 |
GENERAL_ERROR = "An error occured during processing of the request." |
|
38 |
CORRUPTED_DATABASE = "Internal server error (corrupted database)." |
|
39 |
|
|
40 |
__ = CryptographyService() # TODO not the Controller's responsibility. 6 |
|
41 |
CERTIFICATE_SERVICE = CertificateService(__, CertificateRepository(None, None)) |
|
42 |
# TODO open for discussion. Expected: |
|
43 |
# CS = CertificateService.get_instance() |
|
44 |
# or something like that. |
|
45 |
|
|
46 |
KEY_SERVICE = KeyService(__, PrivateKeyRepository(None, None)) # TODO as above |
|
47 |
|
|
48 |
|
|
49 |
|
|
50 |
def setup(): |
|
51 |
""" |
|
52 |
SQLite3 thread issue hack. |
|
53 |
:return: |
|
54 |
""" |
|
55 |
_ = Connection(DATABASE_FILE_LOCATION.shortest_relative_path()) |
|
56 |
CERTIFICATE_SERVICE.certificate_repository.connection = _ |
|
57 |
CERTIFICATE_SERVICE.certificate_repository.cursor = _.cursor() |
|
58 |
KEY_SERVICE.private_key_repository.connection = _ |
|
59 |
KEY_SERVICE.private_key_repository.cursor = _.cursor() |
|
60 |
|
|
61 |
|
|
62 |
def create_certificate(body=None): # noqa: E501 |
|
63 |
"""create new certificate |
|
64 |
|
|
65 |
Create a new certificate based on given information # noqa: E501 |
|
66 |
|
|
67 |
:param body: Certificate data to be created |
|
68 |
:type body: dict | bytes |
|
69 |
|
|
70 |
:rtype: CreatedResponse |
|
71 |
""" |
|
72 |
setup() # TODO remove after issue fixed |
|
73 |
|
|
74 |
if connexion.request.is_json: |
|
75 |
body = CertificateRequest.from_dict(connexion.request.get_json()) # noqa: E501 |
|
76 |
# if body.subject is None or body.usage is None or body.validity_days is None: |
|
77 |
# return 400 |
|
78 |
|
|
79 |
key = KEY_SERVICE.create_new_key() # TODO pass key |
|
80 |
subject = Subject( |
|
81 |
common_name=body.subject.cn, |
|
82 |
country=body.subject.c, |
|
83 |
locality=body.subject.l, |
|
84 |
state=body.subject.st, |
|
85 |
organization=body.subject.o, |
|
86 |
organization_unit=body.subject.ou, |
|
87 |
email_address=body.subject.email_address |
|
88 |
) |
|
89 |
usages_dict = DICT_USAGES.copy() |
|
90 |
|
|
91 |
if body.ca is None: |
|
92 |
cert = CERTIFICATE_SERVICE.create_root_ca( |
|
93 |
key, |
|
94 |
subject, |
|
95 |
usages=usages_dict, |
|
96 |
days=body.validity_days |
|
97 |
) |
|
98 |
else: |
|
99 |
issuer = CERTIFICATE_SERVICE.get_certificate(body.ca) |
|
100 |
|
|
101 |
if issuer is None: |
|
102 |
return ErrorResponse( |
|
103 |
success=False, |
|
104 |
data="No certificate authority with such unique ID exists." |
|
105 |
), 400 |
|
106 |
|
|
107 |
issuer_key = KEY_SERVICE.get_key(issuer.private_key_id) |
|
108 |
|
|
109 |
if issuer_key is None: |
|
110 |
return ErrorResponse( |
|
111 |
success=False, |
|
112 |
data=CORRUPTED_DATABASE |
|
113 |
), 400 |
|
114 |
|
|
115 |
f = CERTIFICATE_SERVICE.create_ca if CA_ID in usages_dict and usages_dict[CA_ID] else \ |
|
116 |
CERTIFICATE_SERVICE.create_end_cert |
|
117 |
|
|
118 |
cert = f( |
|
119 |
key, |
|
120 |
subject, |
|
121 |
issuer, |
|
122 |
issuer_key, |
|
123 |
usages=usages_dict, |
|
124 |
days=body.validity_days |
|
125 |
) |
|
126 |
|
|
127 |
if cert is not None: |
|
128 |
return CreatedResponse( |
|
129 |
success=True, |
|
130 |
data=cert.certificate_id |
|
131 |
), 201 |
|
132 |
else: |
|
133 |
return ErrorResponse( |
|
134 |
success=False, |
|
135 |
data="The certificate could not have been created." |
|
136 |
), 400 |
|
137 |
else: |
|
138 |
return ErrorResponse(success=False, data="The request must be JSON-formatted."), 400 |
|
139 |
|
|
140 |
|
|
141 |
def get_certificate_by_id(id): # noqa: E501 |
|
142 |
"""get certificate by ID |
|
143 |
|
|
144 |
Get certificate in PEM format by ID # noqa: E501 |
|
145 |
|
|
146 |
:param id: ID of a certificate to be queried |
|
147 |
:type id: dict | bytes |
|
148 |
|
|
149 |
:rtype: PemResponse |
|
150 |
""" |
|
151 |
if connexion.request.is_json: |
|
152 |
id = IdParameter.from_dict(connexion.request.get_json()) # noqa: E501 |
|
153 |
return 'do some magic!' |
|
154 |
|
|
155 |
|
|
156 |
def get_certificate_details_by_id(id): # noqa: E501 |
|
157 |
"""get certificate's details by ID |
|
158 |
|
|
159 |
Get certificate details by ID # noqa: E501 |
|
160 |
|
|
161 |
:param id: ID of a certificate whose details are to be queried |
|
162 |
:type id: dict | bytes |
|
163 |
|
|
164 |
:rtype: CertificateResponse |
|
165 |
""" |
|
166 |
if connexion.request.is_json: |
|
167 |
id = IdParameter.from_dict(connexion.request.get_json()) # noqa: E501 |
|
168 |
return 'do some magic!' |
|
169 |
|
|
170 |
|
|
171 |
def get_certificate_list(filtering=None): # noqa: E501 |
|
172 |
"""get list of certificates |
|
173 |
|
|
174 |
Lists certificates based on provided filtering options # noqa: E501 |
|
175 |
|
|
176 |
:param filtering: Filter certificate type to be queried |
|
177 |
:type filtering: dict | bytes |
|
178 |
|
|
179 |
:rtype: CertificateListResponse |
|
180 |
""" |
|
181 |
setup() # TODO remove after issue fixed |
|
182 |
|
|
183 |
key_map = {CA_ID: 'ca', SSL_ID: 'ssl', SIGNATURE_ID: 'digital_signature', AUTHENTICATION_ID: 'authentication'} |
|
184 |
|
|
185 |
if len(connexion.request.data) == 0: |
|
186 |
certs = CERTIFICATE_SERVICE.get_certificates() |
|
187 |
if certs is None: |
|
188 |
return ErrorResponse(success=False, data=GENERAL_ERROR), 500 |
|
189 |
elif len(certs) == 0: |
|
190 |
return ErrorResponse(success=False, data="No certificates found."), 204 |
|
191 |
else: |
|
192 |
ret = [] |
|
193 |
for c in certs: |
|
194 |
c_issuer = CERTIFICATE_SERVICE.get_certificate(c.parent_id) |
|
195 |
if c_issuer is None: |
|
196 |
return ErrorResponse(success=False, data=CORRUPTED_DATABASE) |
|
197 |
|
|
198 |
ret.append( |
|
199 |
CertificateListItem( |
|
200 |
id=c.certificate_id, |
|
201 |
cn=c.common_name, |
|
202 |
not_before=datetime.strptime(c.valid_from, DATETIME_FORMAT).date(), |
|
203 |
not_after=datetime.strptime(c.valid_to, DATETIME_FORMAT).date(), |
|
204 |
usage=CAUsage(**{key_map[k]: v for k, v in c.usages.items()}), |
|
205 |
issuer=IssuerListItem( |
|
206 |
id=c_issuer.certificate_id, |
|
207 |
cn=c_issuer.common_name |
|
208 |
) |
|
209 |
) |
|
210 |
) |
|
211 |
return CertificateListResponse(success=True, data=ret) |
|
212 |
else: |
|
213 |
# TODO fix filtering issue (somehow) |
|
214 |
return ErrorResponse(success=False, data="The request must be JSON-formatted."), 400 |
|
215 |
|
|
216 |
|
|
217 |
def get_certificate_root_by_id(id): # noqa: E501 |
|
218 |
"""get certificate's root of trust chain by ID |
|
219 |
|
|
220 |
Get certificate's root of trust chain in PEM format by ID # noqa: E501 |
|
221 |
|
|
222 |
:param id: ID of a child certificate whose root is to be queried |
|
223 |
:type id: dict | bytes |
|
224 |
|
|
225 |
:rtype: PemResponse |
|
226 |
""" |
|
227 |
if connexion.request.is_json: |
|
228 |
id = IdParameter.from_dict(connexion.request.get_json()) # noqa: E501 |
|
229 |
return 'do some magic!' |
|
230 |
|
|
231 |
|
|
232 |
def get_certificate_trust_chain_by_id(id): # noqa: E501 |
|
233 |
"""get certificate's trust chain by ID |
|
234 |
|
|
235 |
Get certificate trust chain in PEM format by ID # noqa: E501 |
|
236 |
|
|
237 |
:param id: ID of a child certificate whose chain is to be queried |
|
238 |
:type id: dict | bytes |
|
239 |
|
|
240 |
:rtype: PemResponse |
|
241 |
""" |
|
242 |
if connexion.request.is_json: |
|
243 |
id = IdParameter.from_dict(connexion.request.get_json()) # noqa: E501 |
|
244 |
return 'do some magic!' |
swagger_server/encoder.py | ||
---|---|---|
1 |
from connexion.apps.flask_app import FlaskJSONEncoder |
|
2 |
import six |
|
3 |
|
|
4 |
from swagger_server.models.base_model_ import Model |
|
5 |
|
|
6 |
|
|
7 |
class JSONEncoder(FlaskJSONEncoder): |
|
8 |
include_nulls = False |
|
9 |
|
|
10 |
def default(self, o): |
|
11 |
if isinstance(o, Model): |
|
12 |
dikt = {} |
|
13 |
for attr, _ in six.iteritems(o.swagger_types): |
|
14 |
value = getattr(o, attr) |
|
15 |
if value is None and not self.include_nulls: |
|
16 |
continue |
|
17 |
attr = o.attribute_map[attr] |
|
18 |
dikt[attr] = value |
|
19 |
return dikt |
|
20 |
return FlaskJSONEncoder.default(self, o) |
swagger_server/models/__init__.py | ||
---|---|---|
1 |
# coding: utf-8 |
|
2 |
|
|
3 |
# flake8: noqa |
|
4 |
from __future__ import absolute_import |
|
5 |
# import models into model package |
|
6 |
from swagger_server.models.ca_usage import CAUsage |
|
7 |
from swagger_server.models.certificate import Certificate |
|
8 |
from swagger_server.models.certificate_list_item import CertificateListItem |
|
9 |
from swagger_server.models.certificate_list_response import CertificateListResponse |
|
10 |
from swagger_server.models.certificate_request import CertificateRequest |
|
11 |
from swagger_server.models.certificate_response import CertificateResponse |
|
12 |
from swagger_server.models.created_response import CreatedResponse |
|
13 |
from swagger_server.models.error_response import ErrorResponse |
|
14 |
from swagger_server.models.filtering import Filtering |
|
15 |
from swagger_server.models.id_parameter import IdParameter |
|
16 |
from swagger_server.models.issuer_list_item import IssuerListItem |
|
17 |
from swagger_server.models.pem_response import PemResponse |
|
18 |
from swagger_server.models.subject import Subject |
swagger_server/models/base_model_.py | ||
---|---|---|
1 |
import pprint |
|
2 |
|
|
3 |
import six |
|
4 |
import typing |
|
5 |
|
|
6 |
from swagger_server import util |
|
7 |
|
|
8 |
T = typing.TypeVar('T') |
|
9 |
|
|
10 |
|
|
11 |
class Model(object): |
|
12 |
# swaggerTypes: The key is attribute name and the |
|
13 |
# value is attribute type. |
|
14 |
swagger_types = {} |
|
15 |
|
|
16 |
# attributeMap: The key is attribute name and the |
|
17 |
# value is json key in definition. |
|
18 |
attribute_map = {} |
|
19 |
|
|
20 |
@classmethod |
|
21 |
def from_dict(cls: typing.Type[T], dikt) -> T: |
|
22 |
"""Returns the dict as a model""" |
|
23 |
return util.deserialize_model(dikt, cls) |
|
24 |
|
|
25 |
def to_dict(self): |
|
26 |
"""Returns the model properties as a dict |
|
27 |
|
|
28 |
:rtype: dict |
|
29 |
""" |
|
30 |
result = {} |
|
31 |
|
|
32 |
for attr, _ in six.iteritems(self.swagger_types): |
|
33 |
value = getattr(self, attr) |
|
34 |
if isinstance(value, list): |
|
35 |
result[attr] = list(map( |
|
36 |
lambda x: x.to_dict() if hasattr(x, "to_dict") else x, |
|
37 |
value |
|
38 |
)) |
|
39 |
elif hasattr(value, "to_dict"): |
|
40 |
result[attr] = value.to_dict() |
|
41 |
elif isinstance(value, dict): |
|
42 |
result[attr] = dict(map( |
|
43 |
lambda item: (item[0], item[1].to_dict()) |
|
44 |
if hasattr(item[1], "to_dict") else item, |
|
45 |
value.items() |
|
46 |
)) |
|
47 |
else: |
|
48 |
result[attr] = value |
|
49 |
|
|
50 |
return result |
|
51 |
|
|
52 |
def to_str(self): |
|
53 |
"""Returns the string representation of the model |
|
54 |
|
|
55 |
:rtype: str |
|
56 |
""" |
|
57 |
return pprint.pformat(self.to_dict()) |
|
58 |
|
|
59 |
def __repr__(self): |
|
60 |
"""For `print` and `pprint`""" |
|
61 |
return self.to_str() |
|
62 |
|
|
63 |
def __eq__(self, other): |
|
64 |
"""Returns true if both objects are equal""" |
|
65 |
return self.__dict__ == other.__dict__ |
|
66 |
|
|
67 |
def __ne__(self, other): |
|
68 |
"""Returns true if both objects are not equal""" |
|
69 |
return not self == other |
swagger_server/models/ca_usage.py | ||
---|---|---|
1 |
# coding: utf-8 |
|
2 |
|
|
3 |
from __future__ import absolute_import |
|
4 |
from datetime import date, datetime # noqa: F401 |
|
5 |
|
|
6 |
from typing import List, Dict # noqa: F401 |
|
7 |
|
|
8 |
from swagger_server.models.base_model_ import Model |
|
9 |
from swagger_server import util |
|
10 |
|
|
11 |
|
|
12 |
class CAUsage(Model): |
|
13 |
"""NOTE: This class is auto generated by the swagger code generator program. |
|
14 |
|
|
15 |
Do not edit the class manually. |
|
16 |
""" |
|
17 |
def __init__(self, ca: bool=None, authentication: bool=None, digital_signature: bool=None, ssl: bool=None): # noqa: E501 |
|
18 |
"""CAUsage - a model defined in Swagger |
|
19 |
|
|
20 |
:param ca: The ca of this CAUsage. # noqa: E501 |
|
21 |
:type ca: bool |
|
22 |
:param authentication: The authentication of this CAUsage. # noqa: E501 |
|
23 |
:type authentication: bool |
|
24 |
:param digital_signature: The digital_signature of this CAUsage. # noqa: E501 |
|
25 |
:type digital_signature: bool |
|
26 |
:param ssl: The ssl of this CAUsage. # noqa: E501 |
|
27 |
:type ssl: bool |
|
28 |
""" |
|
29 |
self.swagger_types = { |
|
30 |
'ca': bool, |
|
31 |
'authentication': bool, |
|
32 |
'digital_signature': bool, |
|
33 |
'ssl': bool |
|
34 |
} |
|
35 |
|
|
36 |
self.attribute_map = { |
|
37 |
'ca': 'CA', |
|
38 |
'authentication': 'authentication', |
|
39 |
'digital_signature': 'digitalSignature', |
|
40 |
'ssl': 'SSL' |
Také k dispozici: Unified diff
Re #8476 - Refactored the application; swagger is not used for code generation anymore, REST API is being implemented from scratch. Migration only, fixing tests.