Projekt

Obecné

Profil

Stáhnout (7.98 KB) Statistiky
| Větev: | Tag: | Revize:
1
import connexion
2
import six
3

    
4
from src.dao.private_key_repository import PrivateKeyRepository
5
from src.model.subject import Subject
6
from src.services.certificate_service import CertificateService
7
from src.dao.certificate_repository import CertificateRepository    # TODO not the Controller's responsibility. 1
8
from src.services.cryptography import CryptographyService           # TODO not the Controller's responsibility. 2
9
from sqlite3 import Connection                                      # TODO not the Controller's responsibility. 3
10
from src.constants import DICT_USAGES, CA_ID, \
11
    DATABASE_FILE_LOCATION                                          # TODO DATABASE_FILE - not the Controller's
12
                                                                    #  responsibility. 4
13
from src.services.key_service import KeyService
14
from swagger_server.models import CertificateRequest
15

    
16
from swagger_server.models.certificate import Certificate  # noqa: E501
17
from swagger_server.models.certificate_list_response import CertificateListResponse  # noqa: E501
18
from swagger_server.models.certificate_response import CertificateResponse  # noqa: E501
19
from swagger_server.models.created_response import CreatedResponse  # noqa: E501
20
from swagger_server.models.error_response import ErrorResponse  # noqa: E501
21
from swagger_server.models.filtering import Filtering  # noqa: E501
22
from swagger_server.models.id_parameter import IdParameter  # noqa: E501
23
from swagger_server.models.pem_response import PemResponse  # noqa: E501
24
from swagger_server import util
25

    
26

    
27
# _ = Connection("../" + DATABASE_FILE)                                 # TODO not the Controller's responsibility. 5
28
# cursor = _.cursor()                                                   # TODO responsibility of the
29
                                                                        #  CertificateRepository. It makes no sense to
30
                                                                        #  supply a different cursor than precisely
31
                                                                        #  the one corresponding to the connection.
32
                                                                        #  The cursor can always be generated from the
33
                                                                        #  connection instance.
34

    
35
__ = CryptographyService()                                              # TODO not the Controller's responsibility. 6
36
CERTIFICATE_SERVICE = CertificateService(__, CertificateRepository(None, None))
37
                                                                        # TODO open for discussion. Expected:
38
                                                                        #  CS = CertificateService.get_instance()
39
                                                                        #  or something like that.
40

    
41
KEY_SERVICE = KeyService(__, PrivateKeyRepository(None, None))          # TODO as above
42

    
43

    
44

    
45
def setup():
46
    """
47
    SQLite3 thread issue hack.
48
    :return:
49
    """
50
    _ = Connection(DATABASE_FILE_LOCATION.shortest_relative_path())
51
    CERTIFICATE_SERVICE.certificate_repository.connection = _
52
    CERTIFICATE_SERVICE.certificate_repository.cursor = _.cursor()
53
    KEY_SERVICE.private_key_repository.connection = _
54
    KEY_SERVICE.private_key_repository.cursor = _.cursor()
55

    
56

    
57
def create_certificate(body=None):  # noqa: E501
58
    """create new certificate
59

    
60
    Create a new certificate based on given information # noqa: E501
61

    
62
    :param body: Certificate data to be created
63
    :type body: dict | bytes
64

    
65
    :rtype: CreatedResponse
66
    """
67
    setup()                                                             # TODO remove after issue fixed
68

    
69
    if connexion.request.is_json:
70
        body = CertificateRequest.from_dict(connexion.request.get_json())  # noqa: E501
71
        if body.subject is None or body.usage is None or body.validity_days is None:
72
            return 400
73

    
74
        key = KEY_SERVICE.create_new_key()                              # TODO pass key
75
        subject = Subject(
76
            common_name=body.subject.cn,
77
            country=body.subject.c,
78
            locality=body.subject.l,
79
            state=body.subject.st,
80
            organization=body.subject.o,
81
            organization_unit=body.subject.ou,
82
            email_address=body.subject.email_address
83
        )
84
        usages_dict = DICT_USAGES.copy()
85

    
86
        if body.ca is None:
87
            cert = CERTIFICATE_SERVICE.create_root_ca(
88
                key,
89
                subject,
90
                usages=usages_dict,
91
                days=body.validity_days
92
            )
93
        else:
94
            issuer = CERTIFICATE_SERVICE.get_certificate(body.ca)
95

    
96
            if issuer is None:
97
                return ErrorResponse(
98
                    success=False,
99
                    data="No certificate authority with such unique ID exists."
100
                ), 400
101

    
102
            issuer_key = KEY_SERVICE.get_key(issuer.private_key_id)
103

    
104
            if issuer_key is None:
105
                return ErrorResponse(
106
                    success=False,
107
                    data="Internal server error (corrupted database)."
108
                ), 400
109

    
110
            f = CERTIFICATE_SERVICE.create_ca if CA_ID in usages_dict and usages_dict[CA_ID] else \
111
                CERTIFICATE_SERVICE.create_end_cert
112

    
113
            cert = f(
114
                key,
115
                subject,
116
                issuer,
117
                issuer_key,
118
                usages=usages_dict,
119
                days=body.validity_days
120
            )
121

    
122
        if cert is not None:
123
            return CreatedResponse(
124
                success=True,
125
                data=cert.certificate_id
126
            ), 201
127
        else:
128
            return ErrorResponse(
129
                success=False,
130
                data="The certificate could not have been created."
131
            ), 400
132
    else:
133
        return 400
134

    
135

    
136
def get_certificate_by_id(id):  # noqa: E501
137
    """get certificate by ID
138

    
139
    Get certificate in PEM format by ID # noqa: E501
140

    
141
    :param id: ID of a certificate to be queried
142
    :type id: dict | bytes
143

    
144
    :rtype: PemResponse
145
    """
146
    if connexion.request.is_json:
147
        id = IdParameter.from_dict(connexion.request.get_json())  # noqa: E501
148
    return 'do some magic!'
149

    
150

    
151
def get_certificate_details_by_id(id):  # noqa: E501
152
    """get certificate's details by ID
153

    
154
    Get certificate details by ID # noqa: E501
155

    
156
    :param id: ID of a certificate whose details are to be queried
157
    :type id: dict | bytes
158

    
159
    :rtype: CertificateResponse
160
    """
161
    if connexion.request.is_json:
162
        id = IdParameter.from_dict(connexion.request.get_json())  # noqa: E501
163
    return 'do some magic!'
164

    
165

    
166
def get_certificate_list(filtering=None):  # noqa: E501
167
    """get list of certificates
168

    
169
    Lists certificates based on provided filtering options # noqa: E501
170

    
171
    :param filtering: Filter certificate type to be queried
172
    :type filtering: dict | bytes
173

    
174
    :rtype: CertificateListResponse
175
    """
176
    setup()                                                             # TODO remove after issue fixed
177

    
178
    if connexion.request.is_json:
179
        return 400
180
        filtering = Filtering.from_dict(connexion.request.get_json())  # noqa: E501
181
    else:
182
        return 400
183

    
184

    
185
def get_certificate_root_by_id(id):  # noqa: E501
186
    """get certificate's root of trust chain by ID
187

    
188
    Get certificate's root of trust chain in PEM format by ID # noqa: E501
189

    
190
    :param id: ID of a child certificate whose root is to be queried
191
    :type id: dict | bytes
192

    
193
    :rtype: PemResponse
194
    """
195
    if connexion.request.is_json:
196
        id = IdParameter.from_dict(connexion.request.get_json())  # noqa: E501
197
    return 'do some magic!'
198

    
199

    
200
def get_certificate_trust_chain_by_id(id):  # noqa: E501
201
    """get certificate's trust chain by ID
202

    
203
    Get certificate trust chain in PEM format by ID # noqa: E501
204

    
205
    :param id: ID of a child certificate whose chain is to be queried
206
    :type id: dict | bytes
207

    
208
    :rtype: PemResponse
209
    """
210
    if connexion.request.is_json:
211
        id = IdParameter.from_dict(connexion.request.get_json())  # noqa: E501
212
    return 'do some magic!'
(3-3/3)