Projekt

Obecné

Profil

Stáhnout (9.61 KB) Statistiky
| Větev: | Tag: | Revize:
1
from datetime import datetime
2

    
3
import connexion
4
import six
5

    
6
from src.dao.private_key_repository import PrivateKeyRepository
7
from src.model.subject import Subject
8
from src.services.certificate_service import CertificateService
9
from src.dao.certificate_repository import CertificateRepository    # TODO not the Controller's responsibility. 1
10
from src.services.cryptography import CryptographyService           # TODO not the Controller's responsibility. 2
11
from sqlite3 import Connection                                      # TODO not the Controller's responsibility. 3
12
from src.constants import DICT_USAGES, CA_ID, \
13
    DATABASE_FILE_LOCATION, SSL_ID, SIGNATURE_ID, AUTHENTICATION_ID, \
14
    DATETIME_FORMAT  # TODO DATABASE_FILE - not the Controller's
15
                                                                    #  responsibility. 4
16
from src.services.key_service import KeyService
17
from swagger_server.models import CertificateRequest, CertificateListItem, CAUsage, IssuerListItem
18

    
19
from swagger_server.models.certificate import Certificate  # noqa: E501
20
from swagger_server.models.certificate_list_response import CertificateListResponse  # noqa: E501
21
from swagger_server.models.certificate_response import CertificateResponse  # noqa: E501
22
from swagger_server.models.created_response import CreatedResponse  # noqa: E501
23
from swagger_server.models.error_response import ErrorResponse  # noqa: E501
24
from swagger_server.models.filtering import Filtering  # noqa: E501
25
from swagger_server.models.id_parameter import IdParameter  # noqa: E501
26
from swagger_server.models.pem_response import PemResponse  # noqa: E501
27
from swagger_server import util
28

    
29

    
30
# _ = Connection("../" + DATABASE_FILE)                                 # TODO not the Controller's responsibility. 5
31
# cursor = _.cursor()                                                   # TODO responsibility of the
32
                                                                        #  CertificateRepository. It makes no sense to
33
                                                                        #  supply a different cursor than precisely
34
                                                                        #  the one corresponding to the connection.
35
                                                                        #  The cursor can always be generated from the
36
                                                                        #  connection instance.
37
GENERAL_ERROR = "An error occured during processing of the request."
38
CORRUPTED_DATABASE = "Internal server error (corrupted database)."
39

    
40
__ = CryptographyService()                                              # TODO not the Controller's responsibility. 6
41
CERTIFICATE_SERVICE = CertificateService(__, CertificateRepository(None, None))
42
                                                                        # TODO open for discussion. Expected:
43
                                                                        #  CS = CertificateService.get_instance()
44
                                                                        #  or something like that.
45

    
46
KEY_SERVICE = KeyService(__, PrivateKeyRepository(None, None))          # TODO as above
47

    
48

    
49

    
50
def setup():
51
    """
52
    SQLite3 thread issue hack.
53
    :return:
54
    """
55
    _ = Connection(DATABASE_FILE_LOCATION.shortest_relative_path())
56
    CERTIFICATE_SERVICE.certificate_repository.connection = _
57
    CERTIFICATE_SERVICE.certificate_repository.cursor = _.cursor()
58
    KEY_SERVICE.private_key_repository.connection = _
59
    KEY_SERVICE.private_key_repository.cursor = _.cursor()
60

    
61

    
62
def create_certificate(body=None):  # noqa: E501
63
    """create new certificate
64

    
65
    Create a new certificate based on given information # noqa: E501
66

    
67
    :param body: Certificate data to be created
68
    :type body: dict | bytes
69

    
70
    :rtype: CreatedResponse
71
    """
72
    setup()                                                             # TODO remove after issue fixed
73

    
74
    if connexion.request.is_json:
75
        body = CertificateRequest.from_dict(connexion.request.get_json())  # noqa: E501
76
        # if body.subject is None or body.usage is None or body.validity_days is None:
77
        #     return 400
78

    
79
        key = KEY_SERVICE.create_new_key()                              # TODO pass key
80
        subject = Subject(
81
            common_name=body.subject.cn,
82
            country=body.subject.c,
83
            locality=body.subject.l,
84
            state=body.subject.st,
85
            organization=body.subject.o,
86
            organization_unit=body.subject.ou,
87
            email_address=body.subject.email_address
88
        )
89
        usages_dict = DICT_USAGES.copy()
90

    
91
        if body.ca is None:
92
            cert = CERTIFICATE_SERVICE.create_root_ca(
93
                key,
94
                subject,
95
                usages=usages_dict,
96
                days=body.validity_days
97
            )
98
        else:
99
            issuer = CERTIFICATE_SERVICE.get_certificate(body.ca)
100

    
101
            if issuer is None:
102
                return ErrorResponse(
103
                    success=False,
104
                    data="No certificate authority with such unique ID exists."
105
                ), 400
106

    
107
            issuer_key = KEY_SERVICE.get_key(issuer.private_key_id)
108

    
109
            if issuer_key is None:
110
                return ErrorResponse(
111
                    success=False,
112
                    data=CORRUPTED_DATABASE
113
                ), 400
114

    
115
            f = CERTIFICATE_SERVICE.create_ca if CA_ID in usages_dict and usages_dict[CA_ID] else \
116
                CERTIFICATE_SERVICE.create_end_cert
117

    
118
            cert = f(
119
                key,
120
                subject,
121
                issuer,
122
                issuer_key,
123
                usages=usages_dict,
124
                days=body.validity_days
125
            )
126

    
127
        if cert is not None:
128
            return CreatedResponse(
129
                success=True,
130
                data=cert.certificate_id
131
            ), 201
132
        else:
133
            return ErrorResponse(
134
                success=False,
135
                data="The certificate could not have been created."
136
            ), 400
137
    else:
138
        return ErrorResponse(success=False, data="The request must be JSON-formatted."), 400
139

    
140

    
141
def get_certificate_by_id(id):  # noqa: E501
142
    """get certificate by ID
143

    
144
    Get certificate in PEM format by ID # noqa: E501
145

    
146
    :param id: ID of a certificate to be queried
147
    :type id: dict | bytes
148

    
149
    :rtype: PemResponse
150
    """
151
    if connexion.request.is_json:
152
        id = IdParameter.from_dict(connexion.request.get_json())  # noqa: E501
153
    return 'do some magic!'
154

    
155

    
156
def get_certificate_details_by_id(id):  # noqa: E501
157
    """get certificate's details by ID
158

    
159
    Get certificate details by ID # noqa: E501
160

    
161
    :param id: ID of a certificate whose details are to be queried
162
    :type id: dict | bytes
163

    
164
    :rtype: CertificateResponse
165
    """
166
    if connexion.request.is_json:
167
        id = IdParameter.from_dict(connexion.request.get_json())  # noqa: E501
168
    return 'do some magic!'
169

    
170

    
171
def get_certificate_list(filtering=None):  # noqa: E501
172
    """get list of certificates
173

    
174
    Lists certificates based on provided filtering options # noqa: E501
175

    
176
    :param filtering: Filter certificate type to be queried
177
    :type filtering: dict | bytes
178

    
179
    :rtype: CertificateListResponse
180
    """
181
    setup()                                                             # TODO remove after issue fixed
182

    
183
    key_map = {CA_ID: 'ca', SSL_ID: 'ssl', SIGNATURE_ID: 'digital_signature', AUTHENTICATION_ID: 'authentication'}
184

    
185
    if len(connexion.request.data) == 0:
186
        certs = CERTIFICATE_SERVICE.get_certificates()
187
        if certs is None:
188
            return ErrorResponse(success=False, data=GENERAL_ERROR), 500
189
        elif len(certs) == 0:
190
            return ErrorResponse(success=False, data="No certificates found."), 204
191
        else:
192
            ret = []
193
            for c in certs:
194
                c_issuer = CERTIFICATE_SERVICE.get_certificate(c.parent_id)
195
                if c_issuer is None:
196
                    return ErrorResponse(success=False, data=CORRUPTED_DATABASE)
197

    
198
                ret.append(
199
                    CertificateListItem(
200
                        id=c.certificate_id,
201
                        cn=c.common_name,
202
                        not_before=datetime.strptime(c.valid_from, DATETIME_FORMAT).date(),
203
                        not_after=datetime.strptime(c.valid_to, DATETIME_FORMAT).date(),
204
                        usage=CAUsage(**{key_map[k]: v for k, v in c.usages.items()}),
205
                        issuer=IssuerListItem(
206
                            id=c_issuer.certificate_id,
207
                            cn=c_issuer.common_name
208
                        )
209
                    )
210
                )
211
            return CertificateListResponse(success=True, data=ret)
212
    else:
213
        # TODO fix filtering issue (somehow)
214
        return ErrorResponse(success=False, data="The request must be JSON-formatted."), 400
215

    
216

    
217
def get_certificate_root_by_id(id):  # noqa: E501
218
    """get certificate's root of trust chain by ID
219

    
220
    Get certificate's root of trust chain in PEM format by ID # noqa: E501
221

    
222
    :param id: ID of a child certificate whose root is to be queried
223
    :type id: dict | bytes
224

    
225
    :rtype: PemResponse
226
    """
227
    if connexion.request.is_json:
228
        id = IdParameter.from_dict(connexion.request.get_json())  # noqa: E501
229
    return 'do some magic!'
230

    
231

    
232
def get_certificate_trust_chain_by_id(id):  # noqa: E501
233
    """get certificate's trust chain by ID
234

    
235
    Get certificate trust chain in PEM format by ID # noqa: E501
236

    
237
    :param id: ID of a child certificate whose chain is to be queried
238
    :type id: dict | bytes
239

    
240
    :rtype: PemResponse
241
    """
242
    if connexion.request.is_json:
243
        id = IdParameter.from_dict(connexion.request.get_json())  # noqa: E501
244
    return 'do some magic!'
(3-3/3)