Projekt

Obecné

Profil

Stáhnout (8.01 KB) Statistiky
| Větev: | Tag: | Revize:
1 88a68c29 Captain_Trojan
import connexion
2
import six
3
4
from src.dao.private_key_repository import PrivateKeyRepository
5
from src.model.subject import Subject
6
from src.services.certificate_service import CertificateService
7
from src.dao.certificate_repository import CertificateRepository    # TODO not the Controller's responsibility. 1
8
from src.services.cryptography import CryptographyService           # TODO not the Controller's responsibility. 2
9 eff25d7c Captain_Trojan
from sqlite3 import Connection                                      # TODO not the Controller's responsibility. 3
10
from src.constants import DICT_USAGES, CA_ID, \
11
    DATABASE_FILE_LOCATION                                          # TODO DATABASE_FILE - not the Controller's
12 08f22957 Captain_Trojan
                                                                    #  responsibility. 4
13 88a68c29 Captain_Trojan
from src.services.key_service import KeyService
14
from swagger_server.models import CertificateRequest
15
16
from swagger_server.models.certificate import Certificate  # noqa: E501
17
from swagger_server.models.certificate_list_response import CertificateListResponse  # noqa: E501
18
from swagger_server.models.certificate_response import CertificateResponse  # noqa: E501
19
from swagger_server.models.created_response import CreatedResponse  # noqa: E501
20
from swagger_server.models.error_response import ErrorResponse  # noqa: E501
21
from swagger_server.models.filtering import Filtering  # noqa: E501
22
from swagger_server.models.id_parameter import IdParameter  # noqa: E501
23
from swagger_server.models.pem_response import PemResponse  # noqa: E501
24
from swagger_server import util
25
26
27 eff25d7c Captain_Trojan
# _ = Connection("../" + DATABASE_FILE)                                 # TODO not the Controller's responsibility. 5
28 88a68c29 Captain_Trojan
# cursor = _.cursor()                                                   # TODO responsibility of the
29
                                                                        #  CertificateRepository. It makes no sense to
30
                                                                        #  supply a different cursor than precisely
31
                                                                        #  the one corresponding to the connection.
32
                                                                        #  The cursor can always be generated from the
33
                                                                        #  connection instance.
34
35
__ = CryptographyService()                                              # TODO not the Controller's responsibility. 6
36 eff25d7c Captain_Trojan
CERTIFICATE_SERVICE = CertificateService(__, CertificateRepository(None, None))
37 88a68c29 Captain_Trojan
                                                                        # TODO open for discussion. Expected:
38
                                                                        #  CS = CertificateService.get_instance()
39
                                                                        #  or something like that.
40
41 eff25d7c Captain_Trojan
KEY_SERVICE = KeyService(__, PrivateKeyRepository(None, None))          # TODO as above
42
43 88a68c29 Captain_Trojan
44
45
def setup():
46 08f22957 Captain_Trojan
    """
47
    SQLite3 thread issue hack.
48
    :return:
49
    """
50 eff25d7c Captain_Trojan
    _ = Connection(DATABASE_FILE_LOCATION.shortest_relative_path())
51 88a68c29 Captain_Trojan
    CERTIFICATE_SERVICE.certificate_repository.connection = _
52
    CERTIFICATE_SERVICE.certificate_repository.cursor = _.cursor()
53
    KEY_SERVICE.private_key_repository.connection = _
54
    KEY_SERVICE.private_key_repository.cursor = _.cursor()
55
56
57
def create_certificate(body=None):  # noqa: E501
58
    """create new certificate
59
60
    Create a new certificate based on given information # noqa: E501
61
62
    :param body: Certificate data to be created
63
    :type body: dict | bytes
64
65
    :rtype: CreatedResponse
66
    """
67 eff25d7c Captain_Trojan
    setup()                                                             # TODO remove after issue fixed
68 88a68c29 Captain_Trojan
69
    if connexion.request.is_json:
70
        body = CertificateRequest.from_dict(connexion.request.get_json())  # noqa: E501
71 0397ef52 Captain_Trojan
        # if body.subject is None or body.usage is None or body.validity_days is None:
72
        #     return 400
73 88a68c29 Captain_Trojan
74
        key = KEY_SERVICE.create_new_key()                              # TODO pass key
75
        subject = Subject(
76
            common_name=body.subject.cn,
77
            country=body.subject.c,
78
            locality=body.subject.l,
79
            state=body.subject.st,
80
            organization=body.subject.o,
81
            organization_unit=body.subject.ou,
82
            email_address=body.subject.email_address
83
        )
84
        usages_dict = DICT_USAGES.copy()
85
86
        if body.ca is None:
87
            cert = CERTIFICATE_SERVICE.create_root_ca(
88
                key,
89
                subject,
90
                usages=usages_dict,
91
                days=body.validity_days
92
            )
93
        else:
94
            issuer = CERTIFICATE_SERVICE.get_certificate(body.ca)
95
96
            if issuer is None:
97
                return ErrorResponse(
98
                    success=False,
99
                    data="No certificate authority with such unique ID exists."
100
                ), 400
101
102
            issuer_key = KEY_SERVICE.get_key(issuer.private_key_id)
103
104
            if issuer_key is None:
105
                return ErrorResponse(
106
                    success=False,
107
                    data="Internal server error (corrupted database)."
108
                ), 400
109
110
            f = CERTIFICATE_SERVICE.create_ca if CA_ID in usages_dict and usages_dict[CA_ID] else \
111
                CERTIFICATE_SERVICE.create_end_cert
112
113
            cert = f(
114
                key,
115
                subject,
116
                issuer,
117
                issuer_key,
118
                usages=usages_dict,
119
                days=body.validity_days
120
            )
121
122
        if cert is not None:
123 ee883227 Captain_Trojan
            return CreatedResponse(
124 88a68c29 Captain_Trojan
                success=True,
125
                data=cert.certificate_id
126
            ), 201
127
        else:
128
            return ErrorResponse(
129
                success=False,
130
                data="The certificate could not have been created."
131
            ), 400
132
    else:
133
        return 400
134
135
136
def get_certificate_by_id(id):  # noqa: E501
137
    """get certificate by ID
138
139
    Get certificate in PEM format by ID # noqa: E501
140
141
    :param id: ID of a certificate to be queried
142
    :type id: dict | bytes
143
144
    :rtype: PemResponse
145
    """
146
    if connexion.request.is_json:
147
        id = IdParameter.from_dict(connexion.request.get_json())  # noqa: E501
148
    return 'do some magic!'
149
150
151
def get_certificate_details_by_id(id):  # noqa: E501
152
    """get certificate's details by ID
153
154
    Get certificate details by ID # noqa: E501
155
156
    :param id: ID of a certificate whose details are to be queried
157
    :type id: dict | bytes
158
159
    :rtype: CertificateResponse
160
    """
161
    if connexion.request.is_json:
162
        id = IdParameter.from_dict(connexion.request.get_json())  # noqa: E501
163
    return 'do some magic!'
164
165
166
def get_certificate_list(filtering=None):  # noqa: E501
167
    """get list of certificates
168
169
    Lists certificates based on provided filtering options # noqa: E501
170
171
    :param filtering: Filter certificate type to be queried
172
    :type filtering: dict | bytes
173
174
    :rtype: CertificateListResponse
175
    """
176 eff25d7c Captain_Trojan
    setup()                                                             # TODO remove after issue fixed
177
178 88a68c29 Captain_Trojan
    if connexion.request.is_json:
179
        filtering = Filtering.from_dict(connexion.request.get_json())  # noqa: E501
180 0397ef52 Captain_Trojan
        print(filtering)
181
        return 200
182 eff25d7c Captain_Trojan
    else:
183
        return 400
184 88a68c29 Captain_Trojan
185
186
def get_certificate_root_by_id(id):  # noqa: E501
187
    """get certificate's root of trust chain by ID
188
189
    Get certificate's root of trust chain in PEM format by ID # noqa: E501
190
191
    :param id: ID of a child certificate whose root is to be queried
192
    :type id: dict | bytes
193
194
    :rtype: PemResponse
195
    """
196
    if connexion.request.is_json:
197
        id = IdParameter.from_dict(connexion.request.get_json())  # noqa: E501
198
    return 'do some magic!'
199
200
201
def get_certificate_trust_chain_by_id(id):  # noqa: E501
202
    """get certificate's trust chain by ID
203
204
    Get certificate trust chain in PEM format by ID # noqa: E501
205
206
    :param id: ID of a child certificate whose chain is to be queried
207
    :type id: dict | bytes
208
209
    :rtype: PemResponse
210
    """
211
    if connexion.request.is_json:
212
        id = IdParameter.from_dict(connexion.request.get_json())  # noqa: E501
213
    return 'do some magic!'