Projekt

Obecné

Profil

Stáhnout (9.61 KB) Statistiky
| Větev: | Tag: | Revize:
1 1e07432d Captain_Trojan
from datetime import datetime
2
3 88a68c29 Captain_Trojan
import connexion
4
import six
5
6
from src.dao.private_key_repository import PrivateKeyRepository
7
from src.model.subject import Subject
8
from src.services.certificate_service import CertificateService
9
from src.dao.certificate_repository import CertificateRepository    # TODO not the Controller's responsibility. 1
10
from src.services.cryptography import CryptographyService           # TODO not the Controller's responsibility. 2
11 eff25d7c Captain_Trojan
from sqlite3 import Connection                                      # TODO not the Controller's responsibility. 3
12
from src.constants import DICT_USAGES, CA_ID, \
13 1e07432d Captain_Trojan
    DATABASE_FILE_LOCATION, SSL_ID, SIGNATURE_ID, AUTHENTICATION_ID, \
14
    DATETIME_FORMAT  # TODO DATABASE_FILE - not the Controller's
15 08f22957 Captain_Trojan
                                                                    #  responsibility. 4
16 88a68c29 Captain_Trojan
from src.services.key_service import KeyService
17 1e07432d Captain_Trojan
from swagger_server.models import CertificateRequest, CertificateListItem, CAUsage, IssuerListItem
18 88a68c29 Captain_Trojan
19
from swagger_server.models.certificate import Certificate  # noqa: E501
20
from swagger_server.models.certificate_list_response import CertificateListResponse  # noqa: E501
21
from swagger_server.models.certificate_response import CertificateResponse  # noqa: E501
22
from swagger_server.models.created_response import CreatedResponse  # noqa: E501
23
from swagger_server.models.error_response import ErrorResponse  # noqa: E501
24
from swagger_server.models.filtering import Filtering  # noqa: E501
25
from swagger_server.models.id_parameter import IdParameter  # noqa: E501
26
from swagger_server.models.pem_response import PemResponse  # noqa: E501
27
from swagger_server import util
28
29
30 eff25d7c Captain_Trojan
# _ = Connection("../" + DATABASE_FILE)                                 # TODO not the Controller's responsibility. 5
31 88a68c29 Captain_Trojan
# cursor = _.cursor()                                                   # TODO responsibility of the
32
                                                                        #  CertificateRepository. It makes no sense to
33
                                                                        #  supply a different cursor than precisely
34
                                                                        #  the one corresponding to the connection.
35
                                                                        #  The cursor can always be generated from the
36
                                                                        #  connection instance.
37 1e07432d Captain_Trojan
GENERAL_ERROR = "An error occured during processing of the request."
38
CORRUPTED_DATABASE = "Internal server error (corrupted database)."
39 88a68c29 Captain_Trojan
40
__ = CryptographyService()                                              # TODO not the Controller's responsibility. 6
41 eff25d7c Captain_Trojan
CERTIFICATE_SERVICE = CertificateService(__, CertificateRepository(None, None))
42 88a68c29 Captain_Trojan
                                                                        # TODO open for discussion. Expected:
43
                                                                        #  CS = CertificateService.get_instance()
44
                                                                        #  or something like that.
45
46 eff25d7c Captain_Trojan
KEY_SERVICE = KeyService(__, PrivateKeyRepository(None, None))          # TODO as above
47
48 88a68c29 Captain_Trojan
49
50
def setup():
51 08f22957 Captain_Trojan
    """
52
    SQLite3 thread issue hack.
53
    :return:
54
    """
55 eff25d7c Captain_Trojan
    _ = Connection(DATABASE_FILE_LOCATION.shortest_relative_path())
56 88a68c29 Captain_Trojan
    CERTIFICATE_SERVICE.certificate_repository.connection = _
57
    CERTIFICATE_SERVICE.certificate_repository.cursor = _.cursor()
58
    KEY_SERVICE.private_key_repository.connection = _
59
    KEY_SERVICE.private_key_repository.cursor = _.cursor()
60
61
62
def create_certificate(body=None):  # noqa: E501
63
    """create new certificate
64
65
    Create a new certificate based on given information # noqa: E501
66
67
    :param body: Certificate data to be created
68
    :type body: dict | bytes
69
70
    :rtype: CreatedResponse
71
    """
72 eff25d7c Captain_Trojan
    setup()                                                             # TODO remove after issue fixed
73 88a68c29 Captain_Trojan
74
    if connexion.request.is_json:
75
        body = CertificateRequest.from_dict(connexion.request.get_json())  # noqa: E501
76 0397ef52 Captain_Trojan
        # if body.subject is None or body.usage is None or body.validity_days is None:
77
        #     return 400
78 88a68c29 Captain_Trojan
79
        key = KEY_SERVICE.create_new_key()                              # TODO pass key
80
        subject = Subject(
81
            common_name=body.subject.cn,
82
            country=body.subject.c,
83
            locality=body.subject.l,
84
            state=body.subject.st,
85
            organization=body.subject.o,
86
            organization_unit=body.subject.ou,
87
            email_address=body.subject.email_address
88
        )
89
        usages_dict = DICT_USAGES.copy()
90
91
        if body.ca is None:
92
            cert = CERTIFICATE_SERVICE.create_root_ca(
93
                key,
94
                subject,
95
                usages=usages_dict,
96
                days=body.validity_days
97
            )
98
        else:
99
            issuer = CERTIFICATE_SERVICE.get_certificate(body.ca)
100
101
            if issuer is None:
102
                return ErrorResponse(
103
                    success=False,
104
                    data="No certificate authority with such unique ID exists."
105
                ), 400
106
107
            issuer_key = KEY_SERVICE.get_key(issuer.private_key_id)
108
109
            if issuer_key is None:
110
                return ErrorResponse(
111
                    success=False,
112 1e07432d Captain_Trojan
                    data=CORRUPTED_DATABASE
113 88a68c29 Captain_Trojan
                ), 400
114
115
            f = CERTIFICATE_SERVICE.create_ca if CA_ID in usages_dict and usages_dict[CA_ID] else \
116
                CERTIFICATE_SERVICE.create_end_cert
117
118
            cert = f(
119
                key,
120
                subject,
121
                issuer,
122
                issuer_key,
123
                usages=usages_dict,
124
                days=body.validity_days
125
            )
126
127
        if cert is not None:
128 ee883227 Captain_Trojan
            return CreatedResponse(
129 88a68c29 Captain_Trojan
                success=True,
130
                data=cert.certificate_id
131
            ), 201
132
        else:
133
            return ErrorResponse(
134
                success=False,
135
                data="The certificate could not have been created."
136
            ), 400
137
    else:
138 1e07432d Captain_Trojan
        return ErrorResponse(success=False, data="The request must be JSON-formatted."), 400
139 88a68c29 Captain_Trojan
140
141
def get_certificate_by_id(id):  # noqa: E501
142
    """get certificate by ID
143
144
    Get certificate in PEM format by ID # noqa: E501
145
146
    :param id: ID of a certificate to be queried
147
    :type id: dict | bytes
148
149
    :rtype: PemResponse
150
    """
151
    if connexion.request.is_json:
152
        id = IdParameter.from_dict(connexion.request.get_json())  # noqa: E501
153
    return 'do some magic!'
154
155
156
def get_certificate_details_by_id(id):  # noqa: E501
157
    """get certificate's details by ID
158
159
    Get certificate details by ID # noqa: E501
160
161
    :param id: ID of a certificate whose details are to be queried
162
    :type id: dict | bytes
163
164
    :rtype: CertificateResponse
165
    """
166
    if connexion.request.is_json:
167
        id = IdParameter.from_dict(connexion.request.get_json())  # noqa: E501
168
    return 'do some magic!'
169
170
171
def get_certificate_list(filtering=None):  # noqa: E501
172
    """get list of certificates
173
174
    Lists certificates based on provided filtering options # noqa: E501
175
176
    :param filtering: Filter certificate type to be queried
177
    :type filtering: dict | bytes
178
179
    :rtype: CertificateListResponse
180
    """
181 eff25d7c Captain_Trojan
    setup()                                                             # TODO remove after issue fixed
182
183 1e07432d Captain_Trojan
    key_map = {CA_ID: 'ca', SSL_ID: 'ssl', SIGNATURE_ID: 'digital_signature', AUTHENTICATION_ID: 'authentication'}
184
185
    if len(connexion.request.data) == 0:
186
        certs = CERTIFICATE_SERVICE.get_certificates()
187
        if certs is None:
188
            return ErrorResponse(success=False, data=GENERAL_ERROR), 500
189
        elif len(certs) == 0:
190
            return ErrorResponse(success=False, data="No certificates found."), 204
191
        else:
192
            ret = []
193
            for c in certs:
194
                c_issuer = CERTIFICATE_SERVICE.get_certificate(c.parent_id)
195
                if c_issuer is None:
196
                    return ErrorResponse(success=False, data=CORRUPTED_DATABASE)
197
198
                ret.append(
199
                    CertificateListItem(
200
                        id=c.certificate_id,
201
                        cn=c.common_name,
202
                        not_before=datetime.strptime(c.valid_from, DATETIME_FORMAT).date(),
203
                        not_after=datetime.strptime(c.valid_to, DATETIME_FORMAT).date(),
204
                        usage=CAUsage(**{key_map[k]: v for k, v in c.usages.items()}),
205
                        issuer=IssuerListItem(
206
                            id=c_issuer.certificate_id,
207
                            cn=c_issuer.common_name
208
                        )
209
                    )
210
                )
211
            return CertificateListResponse(success=True, data=ret)
212 eff25d7c Captain_Trojan
    else:
213 1e07432d Captain_Trojan
        # TODO fix filtering issue (somehow)
214
        return ErrorResponse(success=False, data="The request must be JSON-formatted."), 400
215 88a68c29 Captain_Trojan
216
217
def get_certificate_root_by_id(id):  # noqa: E501
218
    """get certificate's root of trust chain by ID
219
220
    Get certificate's root of trust chain in PEM format by ID # noqa: E501
221
222
    :param id: ID of a child certificate whose root is to be queried
223
    :type id: dict | bytes
224
225
    :rtype: PemResponse
226
    """
227
    if connexion.request.is_json:
228
        id = IdParameter.from_dict(connexion.request.get_json())  # noqa: E501
229
    return 'do some magic!'
230
231
232
def get_certificate_trust_chain_by_id(id):  # noqa: E501
233
    """get certificate's trust chain by ID
234
235
    Get certificate trust chain in PEM format by ID # noqa: E501
236
237
    :param id: ID of a child certificate whose chain is to be queried
238
    :type id: dict | bytes
239
240
    :rtype: PemResponse
241
    """
242
    if connexion.request.is_json:
243
        id = IdParameter.from_dict(connexion.request.get_json())  # noqa: E501
244
    return 'do some magic!'