Projekt

Obecné

Profil

« Předchozí | Další » 

Revize 5b57121e

Přidáno uživatelem Michal Seják před asi 4 roky(ů)

Re #8476 - Refactored the application; swagger is not used for code generation anymore, REST API is being implemented from scratch. Migration only, fixing tests.

Zobrazit rozdíly:

app.py
1 1
from flask import Flask
2 2
import os
3
from src.controllers.certificates_controller import CertController
3 4

  
4 5
app = Flask(__name__)
5 6

  
......
9 10
    return 'Welcome to the X.509 management application homepage!'
10 11

  
11 12

  
13
@app.route('/api/certificates', methods=["POST"])
14
def create_certificate():
15
    return CertController.create_certificate()
16

  
17

  
18
@app.route('/api/certificates', methods=["GET"])
19
def get_cert_list():
20
    return CertController.get_certificate_list()
21

  
22

  
12 23
if __name__ == '__main__':
13 24
    host = "0.0.0.0"
14 25
    port = 5000
......
20 31
    if "FLASK_PORT" in os.environ:
21 32
        host = os.environ["FLASK_PORT"]
22 33

  
23
    app.run(host=host, port=port)
24

  
34
    app.run(host=host, port=port)
requirements.txt
1 1
Flask==1.1.2
2
pytest~=6.2.3
3

  
4
connexion~=2.7.0
5
connexion[swagger-ui]
6
six~=1.15.0
2
pytest~=6.2.3
src/controllers/certificates_controller.py
1
from datetime import datetime
2
from itertools import chain
3
from flask import request
4
from src.dao.private_key_repository import PrivateKeyRepository
5
from src.model.subject import Subject
6
from src.services.certificate_service import CertificateService
7
from src.dao.certificate_repository import CertificateRepository  # TODO not the Controller's responsibility. 1
8
from src.services.cryptography import CryptographyService  # TODO not the Controller's responsibility. 2
9
from sqlite3 import Connection  # TODO not the Controller's responsibility. 3
10
from src.constants import DICT_USAGES, CA_ID, \
11
    DATABASE_FILE_LOCATION, SSL_ID, SIGNATURE_ID, AUTHENTICATION_ID, \
12
    DATETIME_FORMAT, ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID  # TODO DATABASE_FILE - not the Controller's
13
#  responsibility. 4
14
from src.services.key_service import KeyService
15

  
16
# _ = Connection("../" + DATABASE_FILE)                                 # TODO not the Controller's responsibility. 5
17
# cursor = _.cursor()                                                   # TODO responsibility of the
18
                                                                        #  CertificateRepository. It makes no sense to
19
                                                                        #  supply a different cursor than precisely
20
                                                                        #  the one corresponding to the connection.
21
                                                                        #  The cursor can always be generated from the
22
                                                                        #  connection instance.
23
FILTERING = "filtering"
24
ISSUER = "issuer"
25
US = "usage"
26
NOT_AFTER = "notAfter"
27
NOT_BEFORE = "notBefore"
28
COMMON_NAME = "CN"
29
ID = "id"
30
E_NO_ISSUER_FOUND = {"success": False,
31
                           "data": "No certificate authority with such unique ID exists."}
32
E_NO_CERTIFICATES_FOUND = {"success": False, "data": "No certificates found."}
33
E_NOT_JSON_FORMAT = {"success": False, "data": "The request must be JSON-formatted."}
34
E_CORRUPTED_DATABASE = {"success": False, "data": "Internal server error (corrupted database)."}
35
E_GENERAL_ERROR = {"success": False, "data": "Internal server error (unknown origin)."}
36
E_MISSING_PARAMETERS = {"success": False, "data": "Invalid request, missing parameters."}
37
E_WRONG_PARAMETERS = {"success": False, "data": "Invalid request, wrong parameters."}
38
USAGE = "usage"
39
SUBJECT = "subject"
40
VALIDITY_DAYS = "validityDays"
41
CA = "CA"
42

  
43
__ = CryptographyService()  # TODO not the Controller's responsibility. 6
44
CERTIFICATE_SERVICE = CertificateService(__, CertificateRepository(None, None))
45
# TODO open for discussion. Expected:
46
#  CS = CertificateService.get_instance()
47
#  or something like that.
48

  
49
KEY_SERVICE = KeyService(__, PrivateKeyRepository(None, None))  # TODO as above
50

  
51

  
52
class CertController:
53

  
54
    @staticmethod
55
    def setup():
56
        """
57
        SQLite3 thread issue hack.
58
        :return:
59
        """
60
        _ = Connection(DATABASE_FILE_LOCATION.shortest_relative_path())
61
        CERTIFICATE_SERVICE.certificate_repository.connection = _
62
        CERTIFICATE_SERVICE.certificate_repository.cursor = _.cursor()
63
        KEY_SERVICE.private_key_repository.connection = _
64
        KEY_SERVICE.private_key_repository.cursor = _.cursor()
65

  
66
    @staticmethod
67
    def create_certificate():  # noqa: E501
68
        """create new certificate
69

  
70
        Create a new certificate based on given information # noqa: E501
71

  
72
        :param body: Certificate data to be created
73
        :type body: dict | bytes
74

  
75
        :rtype: CreatedResponse
76
        """
77
        CertController.setup()  # TODO remove after issue fixed
78

  
79
        key_map = {'CA': CA_ID, 'SSL': SSL_ID, 'digitalSignature': SIGNATURE_ID, 'authentication': AUTHENTICATION_ID}
80
        required_keys = {SUBJECT, USAGE, VALIDITY_DAYS}
81

  
82
        if request.is_json:
83
            body = request.get_json()
84
            if not all(k in body for k in required_keys):
85
                return E_MISSING_PARAMETERS, 400
86

  
87
            if not isinstance(body[VALIDITY_DAYS], int):
88
                return E_WRONG_PARAMETERS, 400
89

  
90
            subject = Subject.from_dict(body[SUBJECT])
91

  
92
            if subject is None:
93
                return E_WRONG_PARAMETERS, 400
94

  
95
            usages_dict = DICT_USAGES.copy()
96

  
97
            if not isinstance(body[USAGE], dict):
98
                return E_WRONG_PARAMETERS, 400
99

  
100
            for k, v in body[USAGE].items():
101
                if k not in key_map:
102
                    return E_WRONG_PARAMETERS, 400
103
                usages_dict[key_map[k]] = v
104

  
105
            key = KEY_SERVICE.create_new_key()  # TODO pass key
106

  
107
            if CA not in body:
108
                cert = CERTIFICATE_SERVICE.create_root_ca(
109
                    key,
110
                    subject,
111
                    usages=usages_dict,
112
                    days=body[VALIDITY_DAYS]
113
                )
114
            else:
115
                issuer = CERTIFICATE_SERVICE.get_certificate(body[CA])
116

  
117
                if issuer is None:
118
                    return E_NO_ISSUER_FOUND, 400
119

  
120
                issuer_key = KEY_SERVICE.get_key(issuer.private_key_id)
121

  
122
                if issuer_key is None:
123
                    return E_CORRUPTED_DATABASE, 500
124

  
125
                f = CERTIFICATE_SERVICE.create_ca if CA_ID in usages_dict and usages_dict[CA_ID] else \
126
                    CERTIFICATE_SERVICE.create_end_cert
127

  
128
                cert = f(
129
                    key,
130
                    subject,
131
                    issuer,
132
                    issuer_key,
133
                    usages=usages_dict,
134
                    days=body[VALIDITY_DAYS]
135
                )
136

  
137
            if cert is not None:
138
                return {"success": True,
139
                        "data": cert.certificate_id}, 201
140
            else:
141
                return {"success": False,
142
                        "data": "Internal error: The certificate could not have been created."}, 400
143
        else:
144
            return E_NOT_JSON_FORMAT, 400
145

  
146
    @staticmethod
147
    def get_certificate_by_id(id):  # noqa: E501
148
        """get certificate by ID
149

  
150
        Get certificate in PEM format by ID # noqa: E501
151

  
152
        :param id: ID of a certificate to be queried
153
        :type id: dict | bytes
154

  
155
        :rtype: PemResponse
156
        """
157
        CertController.setup()  # TODO remove after issue fixed
158

  
159
        if connexion.request.is_json:
160
            id = IdParameter.from_dict(connexion.request.get_json())  # noqa: E501
161
        return 'do some magic!'
162

  
163
    @staticmethod
164
    def get_certificate_details_by_id(id):  # noqa: E501
165
        """get certificate's details by ID
166

  
167
        Get certificate details by ID # noqa: E501
168

  
169
        :param id: ID of a certificate whose details are to be queried
170
        :type id: dict | bytes
171

  
172
        :rtype: CertificateResponse
173
        """
174
        CertController.setup()  # TODO remove after issue fixed
175

  
176
        if connexion.request.is_json:
177
            id = IdParameter.from_dict(connexion.request.get_json())  # noqa: E501
178
        return 'do some magic!'
179

  
180
    @staticmethod
181
    def get_certificate_list():  # noqa: E501
182
        """get list of certificates
183

  
184
        Lists certificates based on provided filtering options # noqa: E501
185

  
186
        :param filtering: Filter certificate type to be queried
187
        :type filtering: dict | bytes
188

  
189
        :rtype: CertificateListResponse
190
        """
191
        CertController.setup()  # TODO remove after issue fixed
192

  
193
        key_map = {CA_ID: 'CA', SSL_ID: 'SSL', SIGNATURE_ID: 'digitalSignature', AUTHENTICATION_ID: 'authentication'}
194

  
195
        targets = {ROOT_CA_ID, INTERMEDIATE_CA_ID, CERTIFICATE_ID}
196
        if request.is_json:
197
            data = request.get_json()
198
            if FILTERING in data:
199
                if isinstance(data[FILTERING], dict):
200
                    if CA in data[FILTERING]:
201
                        if isinstance(data[FILTERING][CA], bool):
202
                            if data[FILTERING][CA]:
203
                                targets.remove(CERTIFICATE_ID)
204
                            else:
205
                                targets.remove(ROOT_CA_ID)
206
                                targets.remove(INTERMEDIATE_CA_ID)
207
                        else:
208
                            return E_WRONG_PARAMETERS, 400
209
                else:
210
                    return E_WRONG_PARAMETERS, 400
211

  
212
        if len(targets) == 3:
213
            certs = CERTIFICATE_SERVICE.get_certificates()
214
        else:
215
            certs = list(chain(CERTIFICATE_SERVICE.get_certificates(target) for target in targets))
216

  
217
        if certs is None:
218
            return E_GENERAL_ERROR, 500
219
        elif len(certs) == 0:
220
            return E_NO_CERTIFICATES_FOUND, 204
221
        else:
222
            ret = []
223
            for c in certs:
224
                c_issuer = CERTIFICATE_SERVICE.get_certificate(c.parent_id)
225
                if c_issuer is None:
226
                    return E_CORRUPTED_DATABASE, 500
227

  
228
                ret.append(
229
                    {
230
                        ID: c.certificate_id,
231
                        COMMON_NAME: c.common_name,
232
                        NOT_BEFORE: datetime.strptime(c.valid_from, DATETIME_FORMAT).date(),
233
                        NOT_AFTER: datetime.strptime(c.valid_to, DATETIME_FORMAT).date(),
234
                        USAGE: {key_map[k]: v for k, v in c.usages.items()},
235
                        ISSUER: {
236
                            ID: c_issuer.certificate_id,
237
                            COMMON_NAME: c_issuer.common_name
238
                        }
239
                    }
240
                )
241
            return {"success": True, "data": ret}
242

  
243

  
244

  
245
    @staticmethod
246
    def get_certificate_root_by_id(id):  # noqa: E501
247
        """get certificate's root of trust chain by ID
248

  
249
        Get certificate's root of trust chain in PEM format by ID # noqa: E501
250

  
251
        :param id: ID of a child certificate whose root is to be queried
252
        :type id: dict | bytes
253

  
254
        :rtype: PemResponse
255
        """
256
        CertController.setup()  # TODO remove after issue fixed
257

  
258
        if connexion.request.is_json:
259
            id = IdParameter.from_dict(connexion.request.get_json())  # noqa: E501
260
        return 'do some magic!'
261

  
262
    @staticmethod
263
    def get_certificate_trust_chain_by_id(id):  # noqa: E501
264
        """get certificate's trust chain by ID
265

  
266
        Get certificate trust chain in PEM format by ID # noqa: E501
267

  
268
        :param id: ID of a child certificate whose chain is to be queried
269
        :type id: dict | bytes
270

  
271
        :rtype: PemResponse
272
        """
273
        CertController.setup()  # TODO remove after issue fixed
274

  
275
        if connexion.request.is_json:
276
            id = IdParameter.from_dict(connexion.request.get_json())  # noqa: E501
277
        return 'do some magic!'
src/model/subject.py
1 1
class Subject:
2
    ATTR_MAP = {"C": "common_name", "ST": "state", "L": "locality", "CN": "common_name", "O": "organization",
3
                "OU": "organization_unit", "emailAddress": "email_address"}
2 4

  
3 5
    def __init__(self, common_name=None, country=None, locality=None, state=None, organization=None,
4 6
                 organization_unit=None, email_address=None):
......
9 11
        self.organization = organization
10 12
        self.organization_unit = organization_unit
11 13
        self.email_address = email_address
14

  
15
    @staticmethod
16
    def from_dict(d):
17
        if not isinstance(d, dict):
18
            return None
19

  
20
        s = Subject()
21
        for k, v in Subject.ATTR_MAP.items():
22
            if k in d:
23
                if not isinstance(d[k], str):
24
                    return None
25
                s.__setattr__(v, d[k])
26

  
27
        return s
src/swagger.yaml
1
openapi: 3.0.0
2
info:
3
  title: X.509 certificate management
4
  description: API for certificate management created for YOSO company
5
  version: 1.0.1
6
servers:
7
- url: https://virtserver.swaggerhub.com/janpasek97/X509_management/1.0.1
8
  description: X509 management API
9
tags:
10
- name: certificates
11
  description: API for creating and querying certificates
12
paths:
13
  /api/certificates/{id}:
14
    get:
15
      tags:
16
      - certificates
17
      summary: get certificate by ID
18
      description: Get certificate in PEM format by ID
19
      operationId: get_certificate_by_id
20
      parameters:
21
      - name: id
22
        in: path
23
        description: ID of a certificate to be queried
24
        required: true
25
        style: simple
26
        explode: false
27
        schema:
28
          $ref: '#/components/schemas/IdParameter'
29
      responses:
30
        "200":
31
          description: returning the certificate
32
          content:
33
            application/json:
34
              schema:
35
                $ref: '#/components/schemas/PemResponse'
36
        "204":
37
          description: the certificate was not found
38
          content:
39
            application/json:
40
              schema:
41
                $ref: '#/components/schemas/ErrorResponse'
42
        "400":
43
          description: bad request
44
          content:
45
            application/json:
46
              schema:
47
                $ref: '#/components/schemas/ErrorResponse'
48
      x-openapi-router-controller: swagger_server.controllers.certificates_controller
49
  /api/certificates/{id}/chain:
50
    get:
51
      tags:
52
      - certificates
53
      summary: get certificate's trust chain by ID
54
      description: Get certificate trust chain in PEM format by ID
55
      operationId: get_certificate_trust_chain_by_id
56
      parameters:
57
      - name: id
58
        in: path
59
        description: ID of a child certificate whose chain is to be queried
60
        required: true
61
        style: simple
62
        explode: false
63
        schema:
64
          $ref: '#/components/schemas/IdParameter'
65
      responses:
66
        "200":
67
          description: returning the trust chain
68
          content:
69
            application/json:
70
              schema:
71
                $ref: '#/components/schemas/PemResponse'
72
        "204":
73
          description: the certificate was not found
74
          content:
75
            application/json:
76
              schema:
77
                $ref: '#/components/schemas/ErrorResponse'
78
        "400":
79
          description: bad request
80
          content:
81
            application/json:
82
              schema:
83
                $ref: '#/components/schemas/ErrorResponse'
84
      x-openapi-router-controller: swagger_server.controllers.certificates_controller
85
  /api/certificates/{id}/root:
86
    get:
87
      tags:
88
      - certificates
89
      summary: get certificate's root of trust chain by ID
90
      description: Get certificate's root of trust chain in PEM format by ID
91
      operationId: get_certificate_root_by_id
92
      parameters:
93
      - name: id
94
        in: path
95
        description: ID of a child certificate whose root is to be queried
96
        required: true
97
        style: simple
98
        explode: false
99
        schema:
100
          $ref: '#/components/schemas/IdParameter'
101
      responses:
102
        "200":
103
          description: returning the root of trust chain
104
          content:
105
            application/json:
106
              schema:
107
                $ref: '#/components/schemas/PemResponse'
108
        "204":
109
          description: the certificate was not found
110
          content:
111
            application/json:
112
              schema:
113
                $ref: '#/components/schemas/ErrorResponse'
114
        "400":
115
          description: bad request
116
          content:
117
            application/json:
118
              schema:
119
                $ref: '#/components/schemas/ErrorResponse'
120
      x-openapi-router-controller: swagger_server.controllers.certificates_controller
121
  /api/certificates/{id}/details:
122
    get:
123
      tags:
124
      - certificates
125
      summary: get certificate's details by ID
126
      description: Get certificate details by ID
127
      operationId: get_certificate_details_by_id
128
      parameters:
129
      - name: id
130
        in: path
131
        description: ID of a certificate whose details are to be queried
132
        required: true
133
        style: simple
134
        explode: false
135
        schema:
136
          $ref: '#/components/schemas/IdParameter'
137
      responses:
138
        "200":
139
          description: returning the certificate details
140
          content:
141
            application/json:
142
              schema:
143
                $ref: '#/components/schemas/CertificateResponse'
144
        "204":
145
          description: the certificate was not found
146
          content:
147
            application/json:
148
              schema:
149
                $ref: '#/components/schemas/ErrorResponse'
150
        "400":
151
          description: bad request
152
          content:
153
            application/json:
154
              schema:
155
                $ref: '#/components/schemas/ErrorResponse'
156
      x-openapi-router-controller: swagger_server.controllers.certificates_controller
157
  /api/certificates:
158
    get:
159
      tags:
160
      - certificates
161
      summary: get list of certificates
162
      description: Lists certificates based on provided filtering options
163
      operationId: get_certificate_list
164
      parameters:
165
      - name: filtering
166
        in: query
167
        description: Filter certificate type to be queried
168
        required: false
169
        style: form
170
        explode: true
171
        schema:
172
          $ref: '#/components/schemas/Filtering'
173
      responses:
174
        "200":
175
          description: returning results matching filtering criteria
176
          content:
177
            application/json:
178
              schema:
179
                $ref: '#/components/schemas/CertificateListResponse'
180
        "204":
181
          description: no certificates found
182
          content:
183
            application/json:
184
              schema:
185
                $ref: '#/components/schemas/ErrorResponse'
186
        "400":
187
          description: bad request
188
          content:
189
            application/json:
190
              schema:
191
                $ref: '#/components/schemas/ErrorResponse'
192
      x-openapi-router-controller: swagger_server.controllers.certificates_controller
193
    post:
194
      tags:
195
      - certificates
196
      summary: create new certificate
197
      description: Create a new certificate based on given information
198
      operationId: create_certificate
199
      requestBody:
200
        description: Certificate data to be created
201
        content:
202
          application/json:
203
            schema:
204
              $ref: '#/components/schemas/CertificateRequest'
205
      responses:
206
        "201":
207
          description: item created
208
          content:
209
            application/json:
210
              schema:
211
                $ref: '#/components/schemas/CreatedResponse'
212
        "400":
213
          description: "invalid input, object invalid"
214
          content:
215
            application/json:
216
              schema:
217
                $ref: '#/components/schemas/ErrorResponse'
218
        "409":
219
          description: an existing item already exists
220
          content:
221
            application/json:
222
              schema:
223
                $ref: '#/components/schemas/ErrorResponse'
224
      x-openapi-router-controller: swagger_server.controllers.certificates_controller
225
components:
226
  schemas:
227
    CAUsage:
228
      required:
229
      - CA
230
      - SSL
231
      - authentication
232
      - digitalSignature
233
      properties:
234
        CA:
235
          type: boolean
236
        authentication:
237
          type: boolean
238
        digitalSignature:
239
          type: boolean
240
        SSL:
241
          type: boolean
242
      example:
243
        digitalSignature: true
244
        SSL: true
245
        CA: true
246
        authentication: true
247
    IssuerListItem:
248
      required:
249
      - CN
250
      - id
251
      properties:
252
        id:
253
          type: integer
254
          example: 547
255
        CN:
256
          type: string
257
          example: Root CA s.r.o.
258
      example:
259
        id: 547
260
        CN: Root CA s.r.o.
261
    CertificateListResponse:
262
      properties:
263
        success:
264
          type: boolean
265
          example: true
266
        data:
267
          type: array
268
          items:
269
            $ref: '#/components/schemas/CertificateListItem'
270
      example:
271
        data:
272
        - notAfter: 2021-07-01T00:00:00.000+00:00
273
          usage:
274
            digitalSignature: true
275
            SSL: true
276
            CA: true
277
            authentication: true
278
          id: 547
279
          CN: Root CA s.r.o.
280
          notBefore: 2021-03-31T00:00:00.000+00:00
281
          issuer:
282
            id: 547
283
            CN: Root CA s.r.o.
284
        - notAfter: 2021-07-01T00:00:00.000+00:00
285
          usage:
286
            digitalSignature: true
287
            SSL: true
288
            CA: true
289
            authentication: true
290
          id: 547
291
          CN: Root CA s.r.o.
292
          notBefore: 2021-03-31T00:00:00.000+00:00
293
          issuer:
294
            id: 547
295
            CN: Root CA s.r.o.
296
        success: true
297
    CertificateListItem:
298
      properties:
299
        id:
300
          type: integer
301
          example: 547
302
        CN:
303
          type: string
304
          example: Root CA s.r.o.
305
        notBefore:
306
          type: string
307
          format: date
308
          example: 2021-03-31
309
        notAfter:
310
          type: string
311
          format: date
312
          example: 2021-07-01
313
        usage:
314
          $ref: '#/components/schemas/CAUsage'
315
        issuer:
316
          $ref: '#/components/schemas/IssuerListItem'
317
      example:
318
        notAfter: 2021-07-01T00:00:00.000+00:00
319
        usage:
320
          digitalSignature: true
321
          SSL: true
322
          CA: true
323
          authentication: true
324
        id: 547
325
        CN: Root CA s.r.o.
326
        notBefore: 2021-03-31T00:00:00.000+00:00
327
        issuer:
328
          id: 547
329
          CN: Root CA s.r.o.
330
    Filtering:
331
      properties:
332
        CA:
333
          type: boolean
334
    Subject:
335
      required:
336
      - CN
337
      properties:
338
        C:
339
          type: string
340
          description: Country code
341
          example: CZ
342
        ST:
343
          type: string
344
          description: State/Province
345
          example: Pilsen Region
346
        L:
347
          type: string
348
          description: Locality
349
          example: Pilsen
350
        CN:
351
          type: string
352
          description: Common name
353
          example: Root CA s.r.o.
354
        O:
355
          type: string
356
          description: Organization
357
          example: Root CA s.r.o.
358
        OU:
359
          type: string
360
          description: Organization Unit
361
          example: IT department
362
        emailAddress:
363
          type: string
364
          description: Email Address
365
          example: root@ca.com
366
      example:
367
        ST: Pilsen Region
368
        emailAddress: root@ca.com
369
        C: CZ
370
        OU: IT department
371
        CN: Root CA s.r.o.
372
        L: Pilsen
373
        O: Root CA s.r.o.
374
    Certificate:
375
      required:
376
      - notAfter
377
      - notBefore
378
      - subject
379
      - usage
380
      properties:
381
        subject:
382
          $ref: '#/components/schemas/Subject'
383
        notBefore:
384
          type: string
385
          format: date
386
          example: 2021-03-31
387
        notAfter:
388
          type: string
389
          format: date
390
          example: 2021-07-01
391
        usage:
392
          $ref: '#/components/schemas/CAUsage'
393
        CA:
394
          type: integer
395
          description: ID of the new item
396
          example: 547
397
      example:
398
        notAfter: 2021-07-01T00:00:00.000+00:00
399
        subject:
400
          ST: Pilsen Region
401
          emailAddress: root@ca.com
402
          C: CZ
403
          OU: IT department
404
          CN: Root CA s.r.o.
405
          L: Pilsen
406
          O: Root CA s.r.o.
407
        usage:
408
          digitalSignature: true
409
          SSL: true
410
          CA: true
411
          authentication: true
412
        notBefore: 2021-03-31T00:00:00.000+00:00
413
        CA: 547
414
    CertificateRequest:
415
      required:
416
      - subject
417
      - usage
418
      - validityDays
419
      properties:
420
        subject:
421
          $ref: '#/components/schemas/Subject'
422
        validityDays:
423
          type: integer
424
          example: 30
425
        usage:
426
          $ref: '#/components/schemas/CAUsage'
427
        CA:
428
          type: integer
429
          description: ID of the new item
430
          example: 547
431
    CreatedResponse:
432
      required:
433
      - data
434
      - success
435
      properties:
436
        success:
437
          type: boolean
438
          example: true
439
        data:
440
          type: integer
441
          example: 457
442
      description: Item was created
443
      example:
444
        data: 457
445
        success: true
446
    ErrorResponse:
447
      required:
448
      - data
449
      - success
450
      properties:
451
        success:
452
          type: boolean
453
          example: false
454
        data:
455
          type: string
456
          example: An error occured
457
    CertificateResponse:
458
      required:
459
      - data
460
      - success
461
      properties:
462
        success:
463
          type: boolean
464
          example: true
465
        data:
466
          $ref: '#/components/schemas/Certificate'
467
      example:
468
        data:
469
          notAfter: 2021-07-01T00:00:00.000+00:00
470
          subject:
471
            ST: Pilsen Region
472
            emailAddress: root@ca.com
473
            C: CZ
474
            OU: IT department
475
            CN: Root CA s.r.o.
476
            L: Pilsen
477
            O: Root CA s.r.o.
478
          usage:
479
            digitalSignature: true
480
            SSL: true
481
            CA: true
482
            authentication: true
483
          notBefore: 2021-03-31T00:00:00.000+00:00
484
          CA: 547
485
        success: true
486
    PemResponse:
487
      required:
488
      - data
489
      - success
490
      properties:
491
        success:
492
          type: boolean
493
          example: true
494
        data:
495
          type: string
496
          description: Single PEM file or concatenation of multiple PEM formatted
497
            certificates
498
          example: '-----BEGIN CERTIFICATE-----MIICLDCCAdKgAwIBAgIBADAKBggqhkjOPQQDAjB9MQswCQYDVQQGEwJCRTEPMA0GA1UEChMGR251VExTMSUwIwYDVQQ...etc-----END
499
            CERTIFICATE-----'
500
      example:
501
        data: '-----BEGIN CERTIFICATE-----MIICLDCCAdKgAwIBAgIBADAKBggqhkjOPQQDAjB9MQswCQYDVQQGEwJCRTEPMA0GA1UEChMGR251VExTMSUwIwYDVQQ...etc-----END
502
          CERTIFICATE-----'
503
        success: true
504
    IdParameter:
505
      required:
506
      - id
507
      properties:
508
        id:
509
          type: integer
510
          example: 444
511

  
src/utils/util.py
1
import datetime
2

  
3
import six
4
import typing
5

  
6

  
7
def _deserialize(data, klass):
8
    """Deserializes dict, list, str into an object.
9

  
10
    :param data: dict, list or str.
11
    :param klass: class literal, or string of class name.
12

  
13
    :return: object.
14
    """
15
    if data is None:
16
        return None
17

  
18
    if klass in six.integer_types or klass in (float, str, bool):
19
        return _deserialize_primitive(data, klass)
20
    elif klass == object:
21
        return _deserialize_object(data)
22
    elif klass == datetime.date:
23
        return deserialize_date(data)
24
    elif klass == datetime.datetime:
25
        return deserialize_datetime(data)
26
    elif hasattr(klass, '__origin__'):
27
        if klass.__origin__ == list:
28
            return _deserialize_list(data, klass.__args__[0])
29
        if klass.__origin__ == dict:
30
            return _deserialize_dict(data, klass.__args__[1])
31
    else:
32
        return deserialize_model(data, klass)
33

  
34

  
35
def _deserialize_primitive(data, klass):
36
    """Deserializes to primitive type.
37

  
38
    :param data: data to deserialize.
39
    :param klass: class literal.
40

  
41
    :return: int, long, float, str, bool.
42
    :rtype: int | long | float | str | bool
43
    """
44
    try:
45
        value = klass(data)
46
    except UnicodeEncodeError:
47
        value = six.u(data)
48
    except TypeError:
49
        value = data
50
    return value
51

  
52

  
53
def _deserialize_object(value):
54
    """Return a original value.
55

  
56
    :return: object.
57
    """
58
    return value
59

  
60

  
61
def deserialize_date(string):
62
    """Deserializes string to date.
63

  
64
    :param string: str.
65
    :type string: str
66
    :return: date.
67
    :rtype: date
68
    """
69
    try:
70
        from dateutil.parser import parse
71
        return parse(string).date()
72
    except ImportError:
73
        return string
74

  
75

  
76
def deserialize_datetime(string):
77
    """Deserializes string to datetime.
78

  
79
    The string should be in iso8601 datetime format.
80

  
81
    :param string: str.
82
    :type string: str
83
    :return: datetime.
84
    :rtype: datetime
85
    """
86
    try:
87
        from dateutil.parser import parse
88
        return parse(string)
89
    except ImportError:
90
        return string
91

  
92

  
93
def deserialize_model(data, klass):
94
    """Deserializes list or dict to model.
95

  
96
    :param data: dict, list.
97
    :type data: dict | list
98
    :param klass: class literal.
99
    :return: model object.
100
    """
101
    instance = klass()
102

  
103
    if not instance.swagger_types:
104
        return data
105

  
106
    for attr, attr_type in six.iteritems(instance.swagger_types):
107
        if data is not None \
108
                and instance.attribute_map[attr] in data \
109
                and isinstance(data, (list, dict)):
110
            value = data[instance.attribute_map[attr]]
111
            setattr(instance, attr, _deserialize(value, attr_type))
112

  
113
    return instance
114

  
115

  
116
def _deserialize_list(data, boxed_type):
117
    """Deserializes a list and its elements.
118

  
119
    :param data: list to deserialize.
120
    :type data: list
121
    :param boxed_type: class literal.
122

  
123
    :return: deserialized list.
124
    :rtype: list
125
    """
126
    return [_deserialize(sub_data, boxed_type)
127
            for sub_data in data]
128

  
129

  
130
def _deserialize_dict(data, boxed_type):
131
    """Deserializes a dict and its elements.
132

  
133
    :param data: dict to deserialize.
134
    :type data: dict
135
    :param boxed_type: class literal.
136

  
137
    :return: deserialized dict.
138
    :rtype: dict
139
    """
140
    return {k: _deserialize(v, boxed_type)
141
            for k, v in six.iteritems(data)}
swagger_server/__main__.py
1
#!/usr/bin/env python3
2
import logging
3

  
4
import connexion
5

  
6
from swagger_server import encoder
7

  
8

  
9
app = connexion.App(__name__, specification_dir='./swagger/', debug=False)
10
PORT = 8080
11

  
12

  
13
def setup(a):
14
    a.app.json_encoder = encoder.JSONEncoder
15
    a.add_api('swagger.yaml', arguments={'title': 'X.509 certificate management'}, base_path="/", pythonic_params=True)
16
    # a.run(port=PORT)
17

  
18

  
19
if __name__ == '__main__':
20
    setup(app)
21
    app.run(PORT)
swagger_server/controllers/authorization_controller.py
1
from typing import List
2
"""
3
controller generated to handled auth operation described at:
4
https://connexion.readthedocs.io/en/latest/security.html
5
"""
6

  
swagger_server/controllers/certificates_controller.py
1
from datetime import datetime
2

  
3
import connexion
4
import six
5

  
6
from src.dao.private_key_repository import PrivateKeyRepository
7
from src.model.subject import Subject
8
from src.services.certificate_service import CertificateService
9
from src.dao.certificate_repository import CertificateRepository    # TODO not the Controller's responsibility. 1
10
from src.services.cryptography import CryptographyService           # TODO not the Controller's responsibility. 2
11
from sqlite3 import Connection                                      # TODO not the Controller's responsibility. 3
12
from src.constants import DICT_USAGES, CA_ID, \
13
    DATABASE_FILE_LOCATION, SSL_ID, SIGNATURE_ID, AUTHENTICATION_ID, \
14
    DATETIME_FORMAT  # TODO DATABASE_FILE - not the Controller's
15
                                                                    #  responsibility. 4
16
from src.services.key_service import KeyService
17
from swagger_server.models import CertificateRequest, CertificateListItem, CAUsage, IssuerListItem
18

  
19
from swagger_server.models.certificate import Certificate  # noqa: E501
20
from swagger_server.models.certificate_list_response import CertificateListResponse  # noqa: E501
21
from swagger_server.models.certificate_response import CertificateResponse  # noqa: E501
22
from swagger_server.models.created_response import CreatedResponse  # noqa: E501
23
from swagger_server.models.error_response import ErrorResponse  # noqa: E501
24
from swagger_server.models.filtering import Filtering  # noqa: E501
25
from swagger_server.models.id_parameter import IdParameter  # noqa: E501
26
from swagger_server.models.pem_response import PemResponse  # noqa: E501
27
from swagger_server import util
28

  
29

  
30
# _ = Connection("../" + DATABASE_FILE)                                 # TODO not the Controller's responsibility. 5
31
# cursor = _.cursor()                                                   # TODO responsibility of the
32
                                                                        #  CertificateRepository. It makes no sense to
33
                                                                        #  supply a different cursor than precisely
34
                                                                        #  the one corresponding to the connection.
35
                                                                        #  The cursor can always be generated from the
36
                                                                        #  connection instance.
37
GENERAL_ERROR = "An error occured during processing of the request."
38
CORRUPTED_DATABASE = "Internal server error (corrupted database)."
39

  
40
__ = CryptographyService()                                              # TODO not the Controller's responsibility. 6
41
CERTIFICATE_SERVICE = CertificateService(__, CertificateRepository(None, None))
42
                                                                        # TODO open for discussion. Expected:
43
                                                                        #  CS = CertificateService.get_instance()
44
                                                                        #  or something like that.
45

  
46
KEY_SERVICE = KeyService(__, PrivateKeyRepository(None, None))          # TODO as above
47

  
48

  
49

  
50
def setup():
51
    """
52
    SQLite3 thread issue hack.
53
    :return:
54
    """
55
    _ = Connection(DATABASE_FILE_LOCATION.shortest_relative_path())
56
    CERTIFICATE_SERVICE.certificate_repository.connection = _
57
    CERTIFICATE_SERVICE.certificate_repository.cursor = _.cursor()
58
    KEY_SERVICE.private_key_repository.connection = _
59
    KEY_SERVICE.private_key_repository.cursor = _.cursor()
60

  
61

  
62
def create_certificate(body=None):  # noqa: E501
63
    """create new certificate
64

  
65
    Create a new certificate based on given information # noqa: E501
66

  
67
    :param body: Certificate data to be created
68
    :type body: dict | bytes
69

  
70
    :rtype: CreatedResponse
71
    """
72
    setup()                                                             # TODO remove after issue fixed
73

  
74
    if connexion.request.is_json:
75
        body = CertificateRequest.from_dict(connexion.request.get_json())  # noqa: E501
76
        # if body.subject is None or body.usage is None or body.validity_days is None:
77
        #     return 400
78

  
79
        key = KEY_SERVICE.create_new_key()                              # TODO pass key
80
        subject = Subject(
81
            common_name=body.subject.cn,
82
            country=body.subject.c,
83
            locality=body.subject.l,
84
            state=body.subject.st,
85
            organization=body.subject.o,
86
            organization_unit=body.subject.ou,
87
            email_address=body.subject.email_address
88
        )
89
        usages_dict = DICT_USAGES.copy()
90

  
91
        if body.ca is None:
92
            cert = CERTIFICATE_SERVICE.create_root_ca(
93
                key,
94
                subject,
95
                usages=usages_dict,
96
                days=body.validity_days
97
            )
98
        else:
99
            issuer = CERTIFICATE_SERVICE.get_certificate(body.ca)
100

  
101
            if issuer is None:
102
                return ErrorResponse(
103
                    success=False,
104
                    data="No certificate authority with such unique ID exists."
105
                ), 400
106

  
107
            issuer_key = KEY_SERVICE.get_key(issuer.private_key_id)
108

  
109
            if issuer_key is None:
110
                return ErrorResponse(
111
                    success=False,
112
                    data=CORRUPTED_DATABASE
113
                ), 400
114

  
115
            f = CERTIFICATE_SERVICE.create_ca if CA_ID in usages_dict and usages_dict[CA_ID] else \
116
                CERTIFICATE_SERVICE.create_end_cert
117

  
118
            cert = f(
119
                key,
120
                subject,
121
                issuer,
122
                issuer_key,
123
                usages=usages_dict,
124
                days=body.validity_days
125
            )
126

  
127
        if cert is not None:
128
            return CreatedResponse(
129
                success=True,
130
                data=cert.certificate_id
131
            ), 201
132
        else:
133
            return ErrorResponse(
134
                success=False,
135
                data="The certificate could not have been created."
136
            ), 400
137
    else:
138
        return ErrorResponse(success=False, data="The request must be JSON-formatted."), 400
139

  
140

  
141
def get_certificate_by_id(id):  # noqa: E501
142
    """get certificate by ID
143

  
144
    Get certificate in PEM format by ID # noqa: E501
145

  
146
    :param id: ID of a certificate to be queried
147
    :type id: dict | bytes
148

  
149
    :rtype: PemResponse
150
    """
151
    if connexion.request.is_json:
152
        id = IdParameter.from_dict(connexion.request.get_json())  # noqa: E501
153
    return 'do some magic!'
154

  
155

  
156
def get_certificate_details_by_id(id):  # noqa: E501
157
    """get certificate's details by ID
158

  
159
    Get certificate details by ID # noqa: E501
160

  
161
    :param id: ID of a certificate whose details are to be queried
162
    :type id: dict | bytes
163

  
164
    :rtype: CertificateResponse
165
    """
166
    if connexion.request.is_json:
167
        id = IdParameter.from_dict(connexion.request.get_json())  # noqa: E501
168
    return 'do some magic!'
169

  
170

  
171
def get_certificate_list(filtering=None):  # noqa: E501
172
    """get list of certificates
173

  
174
    Lists certificates based on provided filtering options # noqa: E501
175

  
176
    :param filtering: Filter certificate type to be queried
177
    :type filtering: dict | bytes
178

  
179
    :rtype: CertificateListResponse
180
    """
181
    setup()                                                             # TODO remove after issue fixed
182

  
183
    key_map = {CA_ID: 'ca', SSL_ID: 'ssl', SIGNATURE_ID: 'digital_signature', AUTHENTICATION_ID: 'authentication'}
184

  
185
    if len(connexion.request.data) == 0:
186
        certs = CERTIFICATE_SERVICE.get_certificates()
187
        if certs is None:
188
            return ErrorResponse(success=False, data=GENERAL_ERROR), 500
189
        elif len(certs) == 0:
190
            return ErrorResponse(success=False, data="No certificates found."), 204
191
        else:
192
            ret = []
193
            for c in certs:
194
                c_issuer = CERTIFICATE_SERVICE.get_certificate(c.parent_id)
195
                if c_issuer is None:
196
                    return ErrorResponse(success=False, data=CORRUPTED_DATABASE)
197

  
198
                ret.append(
199
                    CertificateListItem(
200
                        id=c.certificate_id,
201
                        cn=c.common_name,
202
                        not_before=datetime.strptime(c.valid_from, DATETIME_FORMAT).date(),
203
                        not_after=datetime.strptime(c.valid_to, DATETIME_FORMAT).date(),
204
                        usage=CAUsage(**{key_map[k]: v for k, v in c.usages.items()}),
205
                        issuer=IssuerListItem(
206
                            id=c_issuer.certificate_id,
207
                            cn=c_issuer.common_name
208
                        )
209
                    )
210
                )
211
            return CertificateListResponse(success=True, data=ret)
212
    else:
213
        # TODO fix filtering issue (somehow)
214
        return ErrorResponse(success=False, data="The request must be JSON-formatted."), 400
215

  
216

  
217
def get_certificate_root_by_id(id):  # noqa: E501
218
    """get certificate's root of trust chain by ID
219

  
220
    Get certificate's root of trust chain in PEM format by ID # noqa: E501
221

  
222
    :param id: ID of a child certificate whose root is to be queried
223
    :type id: dict | bytes
224

  
225
    :rtype: PemResponse
226
    """
227
    if connexion.request.is_json:
228
        id = IdParameter.from_dict(connexion.request.get_json())  # noqa: E501
229
    return 'do some magic!'
230

  
231

  
232
def get_certificate_trust_chain_by_id(id):  # noqa: E501
233
    """get certificate's trust chain by ID
234

  
235
    Get certificate trust chain in PEM format by ID # noqa: E501
236

  
237
    :param id: ID of a child certificate whose chain is to be queried
238
    :type id: dict | bytes
239

  
240
    :rtype: PemResponse
241
    """
242
    if connexion.request.is_json:
243
        id = IdParameter.from_dict(connexion.request.get_json())  # noqa: E501
244
    return 'do some magic!'
swagger_server/encoder.py
1
from connexion.apps.flask_app import FlaskJSONEncoder
2
import six
3

  
4
from swagger_server.models.base_model_ import Model
5

  
6

  
7
class JSONEncoder(FlaskJSONEncoder):
8
    include_nulls = False
9

  
10
    def default(self, o):
11
        if isinstance(o, Model):
12
            dikt = {}
13
            for attr, _ in six.iteritems(o.swagger_types):
14
                value = getattr(o, attr)
15
                if value is None and not self.include_nulls:
16
                    continue
17
                attr = o.attribute_map[attr]
18
                dikt[attr] = value
19
            return dikt
20
        return FlaskJSONEncoder.default(self, o)
swagger_server/models/__init__.py
1
# coding: utf-8
2

  
3
# flake8: noqa
4
from __future__ import absolute_import
5
# import models into model package
6
from swagger_server.models.ca_usage import CAUsage
7
from swagger_server.models.certificate import Certificate
8
from swagger_server.models.certificate_list_item import CertificateListItem
9
from swagger_server.models.certificate_list_response import CertificateListResponse
10
from swagger_server.models.certificate_request import CertificateRequest
11
from swagger_server.models.certificate_response import CertificateResponse
12
from swagger_server.models.created_response import CreatedResponse
13
from swagger_server.models.error_response import ErrorResponse
14
from swagger_server.models.filtering import Filtering
15
from swagger_server.models.id_parameter import IdParameter
16
from swagger_server.models.issuer_list_item import IssuerListItem
17
from swagger_server.models.pem_response import PemResponse
18
from swagger_server.models.subject import Subject
swagger_server/models/base_model_.py
1
import pprint
2

  
3
import six
4
import typing
5

  
6
from swagger_server import util
7

  
8
T = typing.TypeVar('T')
9

  
10

  
11
class Model(object):
12
    # swaggerTypes: The key is attribute name and the
13
    # value is attribute type.
14
    swagger_types = {}
15

  
16
    # attributeMap: The key is attribute name and the
17
    # value is json key in definition.
18
    attribute_map = {}
19

  
20
    @classmethod
21
    def from_dict(cls: typing.Type[T], dikt) -> T:
22
        """Returns the dict as a model"""
23
        return util.deserialize_model(dikt, cls)
24

  
25
    def to_dict(self):
26
        """Returns the model properties as a dict
27

  
28
        :rtype: dict
29
        """
30
        result = {}
31

  
32
        for attr, _ in six.iteritems(self.swagger_types):
33
            value = getattr(self, attr)
34
            if isinstance(value, list):
35
                result[attr] = list(map(
36
                    lambda x: x.to_dict() if hasattr(x, "to_dict") else x,
37
                    value
38
                ))
39
            elif hasattr(value, "to_dict"):
40
                result[attr] = value.to_dict()
41
            elif isinstance(value, dict):
42
                result[attr] = dict(map(
43
                    lambda item: (item[0], item[1].to_dict())
44
                    if hasattr(item[1], "to_dict") else item,
45
                    value.items()
46
                ))
47
            else:
48
                result[attr] = value
49

  
50
        return result
51

  
52
    def to_str(self):
53
        """Returns the string representation of the model
54

  
55
        :rtype: str
56
        """
57
        return pprint.pformat(self.to_dict())
58

  
59
    def __repr__(self):
60
        """For `print` and `pprint`"""
61
        return self.to_str()
62

  
63
    def __eq__(self, other):
64
        """Returns true if both objects are equal"""
65
        return self.__dict__ == other.__dict__
66

  
67
    def __ne__(self, other):
68
        """Returns true if both objects are not equal"""
69
        return not self == other
swagger_server/models/ca_usage.py
1
# coding: utf-8
2

  
3
from __future__ import absolute_import
4
from datetime import date, datetime  # noqa: F401
5

  
6
from typing import List, Dict  # noqa: F401
7

  
8
from swagger_server.models.base_model_ import Model
9
from swagger_server import util
10

  
11

  
12
class CAUsage(Model):
13
    """NOTE: This class is auto generated by the swagger code generator program.
14

  
15
    Do not edit the class manually.
16
    """
17
    def __init__(self, ca: bool=None, authentication: bool=None, digital_signature: bool=None, ssl: bool=None):  # noqa: E501
18
        """CAUsage - a model defined in Swagger
19

  
20
        :param ca: The ca of this CAUsage.  # noqa: E501
21
        :type ca: bool
22
        :param authentication: The authentication of this CAUsage.  # noqa: E501
23
        :type authentication: bool
24
        :param digital_signature: The digital_signature of this CAUsage.  # noqa: E501
25
        :type digital_signature: bool
26
        :param ssl: The ssl of this CAUsage.  # noqa: E501
27
        :type ssl: bool
28
        """
29
        self.swagger_types = {
30
            'ca': bool,
31
            'authentication': bool,
32
            'digital_signature': bool,
33
            'ssl': bool
34
        }
35

  
36
        self.attribute_map = {
37
            'ca': 'CA',
38
            'authentication': 'authentication',
39
            'digital_signature': 'digitalSignature',
40
            'ssl': 'SSL'
... Rozdílový soubor je zkrácen, protože jeho délka přesahuje max. limit.

Také k dispozici: Unified diff