Projekt

Obecné

Profil

Stáhnout (7.55 KB) Statistiky
| Větev: | Tag: | Revize:
1 88a68c29 Captain_Trojan
import connexion
2
import six
3
4
from src.dao.private_key_repository import PrivateKeyRepository
5
from src.model.subject import Subject
6
from src.services.certificate_service import CertificateService
7
from src.dao.certificate_repository import CertificateRepository    # TODO not the Controller's responsibility. 1
8
from src.services.cryptography import CryptographyService           # TODO not the Controller's responsibility. 2
9
from sqlite3 import Connection, Cursor                              # TODO not the Controller's responsibility. 3
10
from src.constants import DATABASE_FILE, DICT_USAGES, CA_ID  # TODO not the Controller's responsibility. 4
11
from src.services.key_service import KeyService
12
from swagger_server.models import CertificateRequest
13
14
from swagger_server.models.certificate import Certificate  # noqa: E501
15
from swagger_server.models.certificate_list_response import CertificateListResponse  # noqa: E501
16
from swagger_server.models.certificate_response import CertificateResponse  # noqa: E501
17
from swagger_server.models.created_response import CreatedResponse  # noqa: E501
18
from swagger_server.models.error_response import ErrorResponse  # noqa: E501
19
from swagger_server.models.filtering import Filtering  # noqa: E501
20
from swagger_server.models.id_parameter import IdParameter  # noqa: E501
21
from swagger_server.models.pem_response import PemResponse  # noqa: E501
22
from swagger_server import util
23
24
25
_ = Connection("../" + DATABASE_FILE)                                   # TODO not the Controller's responsibility. 5
26
# cursor = _.cursor()                                                   # TODO responsibility of the
27
                                                                        #  CertificateRepository. It makes no sense to
28
                                                                        #  supply a different cursor than precisely
29
                                                                        #  the one corresponding to the connection.
30
                                                                        #  The cursor can always be generated from the
31
                                                                        #  connection instance.
32
33
__ = CryptographyService()                                              # TODO not the Controller's responsibility. 6
34
CERTIFICATE_SERVICE = CertificateService(__, CertificateRepository(_, _.cursor()))
35
                                                                        # TODO open for discussion. Expected:
36
                                                                        #  CS = CertificateService.get_instance()
37
                                                                        #  or something like that.
38
39
KEY_SERVICE = KeyService(__, PrivateKeyRepository(_, _.cursor()))       # TODO as above
40
41
42
def setup():
43
    _ = Connection("../" + DATABASE_FILE)
44
    CERTIFICATE_SERVICE.certificate_repository.connection = _
45
    CERTIFICATE_SERVICE.certificate_repository.cursor = _.cursor()
46
    KEY_SERVICE.private_key_repository.connection = _
47
    KEY_SERVICE.private_key_repository.cursor = _.cursor()
48
49
50
def create_certificate(body=None):  # noqa: E501
51
    """create new certificate
52
53
    Create a new certificate based on given information # noqa: E501
54
55
    :param body: Certificate data to be created
56
    :type body: dict | bytes
57
58
    :rtype: CreatedResponse
59
    """
60
    setup()
61
62
    if connexion.request.is_json:
63
        body = CertificateRequest.from_dict(connexion.request.get_json())  # noqa: E501
64
        if body.subject is None or body.usage is None or body.validity_days is None:
65
            return 400
66
67
        key = KEY_SERVICE.create_new_key()                              # TODO pass key
68
        subject = Subject(
69
            common_name=body.subject.cn,
70
            country=body.subject.c,
71
            locality=body.subject.l,
72
            state=body.subject.st,
73
            organization=body.subject.o,
74
            organization_unit=body.subject.ou,
75
            email_address=body.subject.email_address
76
        )
77
        usages_dict = DICT_USAGES.copy()
78
79
        if body.ca is None:
80
            cert = CERTIFICATE_SERVICE.create_root_ca(
81
                key,
82
                subject,
83
                usages=usages_dict,
84
                days=body.validity_days
85
            )
86
        else:
87
            issuer = CERTIFICATE_SERVICE.get_certificate(body.ca)
88
89
            if issuer is None:
90
                return ErrorResponse(
91
                    success=False,
92
                    data="No certificate authority with such unique ID exists."
93
                ), 400
94
95
            issuer_key = KEY_SERVICE.get_key(issuer.private_key_id)
96
97
            if issuer_key is None:
98
                return ErrorResponse(
99
                    success=False,
100
                    data="Internal server error (corrupted database)."
101
                ), 400
102
103
            f = CERTIFICATE_SERVICE.create_ca if CA_ID in usages_dict and usages_dict[CA_ID] else \
104
                CERTIFICATE_SERVICE.create_end_cert
105
106
            cert = f(
107
                key,
108
                subject,
109
                issuer,
110
                issuer_key,
111
                usages=usages_dict,
112
                days=body.validity_days
113
            )
114
115
        if cert is not None:
116
            return CertificateResponse(
117
                success=True,
118
                data=cert.certificate_id
119
            ), 201
120
        else:
121
            return ErrorResponse(
122
                success=False,
123
                data="The certificate could not have been created."
124
            ), 400
125
    else:
126
        return 400
127
128
129
def get_certificate_by_id(id):  # noqa: E501
130
    """get certificate by ID
131
132
    Get certificate in PEM format by ID # noqa: E501
133
134
    :param id: ID of a certificate to be queried
135
    :type id: dict | bytes
136
137
    :rtype: PemResponse
138
    """
139
    if connexion.request.is_json:
140
        id = IdParameter.from_dict(connexion.request.get_json())  # noqa: E501
141
    return 'do some magic!'
142
143
144
def get_certificate_details_by_id(id):  # noqa: E501
145
    """get certificate's details by ID
146
147
    Get certificate details by ID # noqa: E501
148
149
    :param id: ID of a certificate whose details are to be queried
150
    :type id: dict | bytes
151
152
    :rtype: CertificateResponse
153
    """
154
    if connexion.request.is_json:
155
        id = IdParameter.from_dict(connexion.request.get_json())  # noqa: E501
156
    return 'do some magic!'
157
158
159
def get_certificate_list(filtering=None):  # noqa: E501
160
    """get list of certificates
161
162
    Lists certificates based on provided filtering options # noqa: E501
163
164
    :param filtering: Filter certificate type to be queried
165
    :type filtering: dict | bytes
166
167
    :rtype: CertificateListResponse
168
    """
169
    if connexion.request.is_json:
170
        filtering = Filtering.from_dict(connexion.request.get_json())  # noqa: E501
171
    return 'do some magic! XD'
172
173
174
def get_certificate_root_by_id(id):  # noqa: E501
175
    """get certificate's root of trust chain by ID
176
177
    Get certificate's root of trust chain in PEM format by ID # noqa: E501
178
179
    :param id: ID of a child certificate whose root is to be queried
180
    :type id: dict | bytes
181
182
    :rtype: PemResponse
183
    """
184
    if connexion.request.is_json:
185
        id = IdParameter.from_dict(connexion.request.get_json())  # noqa: E501
186
    return 'do some magic!'
187
188
189
def get_certificate_trust_chain_by_id(id):  # noqa: E501
190
    """get certificate's trust chain by ID
191
192
    Get certificate trust chain in PEM format by ID # noqa: E501
193
194
    :param id: ID of a child certificate whose chain is to be queried
195
    :type id: dict | bytes
196
197
    :rtype: PemResponse
198
    """
199
    if connexion.request.is_json:
200
        id = IdParameter.from_dict(connexion.request.get_json())  # noqa: E501
201
    return 'do some magic!'