Projekt

Obecné

Profil

Stáhnout (7.71 KB) Statistiky
| Větev: | Tag: | Revize:
1
import connexion
2
import six
3

    
4
from src.dao.private_key_repository import PrivateKeyRepository
5
from src.model.subject import Subject
6
from src.services.certificate_service import CertificateService
7
from src.dao.certificate_repository import CertificateRepository    # TODO not the Controller's responsibility. 1
8
from src.services.cryptography import CryptographyService           # TODO not the Controller's responsibility. 2
9
from sqlite3 import Connection, Cursor                              # TODO not the Controller's responsibility. 3
10
from src.constants import DATABASE_FILE, DICT_USAGES, CA_ID         # TODO DATABASE_FILE - not the Controller's
11
                                                                    #  responsibility. 4
12
from src.services.key_service import KeyService
13
from swagger_server.models import CertificateRequest
14

    
15
from swagger_server.models.certificate import Certificate  # noqa: E501
16
from swagger_server.models.certificate_list_response import CertificateListResponse  # noqa: E501
17
from swagger_server.models.certificate_response import CertificateResponse  # noqa: E501
18
from swagger_server.models.created_response import CreatedResponse  # noqa: E501
19
from swagger_server.models.error_response import ErrorResponse  # noqa: E501
20
from swagger_server.models.filtering import Filtering  # noqa: E501
21
from swagger_server.models.id_parameter import IdParameter  # noqa: E501
22
from swagger_server.models.pem_response import PemResponse  # noqa: E501
23
from swagger_server import util
24

    
25

    
26
_ = Connection("../" + DATABASE_FILE)                                   # TODO not the Controller's responsibility. 5
27
# cursor = _.cursor()                                                   # TODO responsibility of the
28
                                                                        #  CertificateRepository. It makes no sense to
29
                                                                        #  supply a different cursor than precisely
30
                                                                        #  the one corresponding to the connection.
31
                                                                        #  The cursor can always be generated from the
32
                                                                        #  connection instance.
33

    
34
__ = CryptographyService()                                              # TODO not the Controller's responsibility. 6
35
CERTIFICATE_SERVICE = CertificateService(__, CertificateRepository(_, _.cursor()))
36
                                                                        # TODO open for discussion. Expected:
37
                                                                        #  CS = CertificateService.get_instance()
38
                                                                        #  or something like that.
39

    
40
KEY_SERVICE = KeyService(__, PrivateKeyRepository(_, _.cursor()))       # TODO as above
41

    
42

    
43
def setup():
44
    """
45
    SQLite3 thread issue hack.
46
    :return:
47
    """
48
    _ = Connection("../" + DATABASE_FILE)
49
    CERTIFICATE_SERVICE.certificate_repository.connection = _
50
    CERTIFICATE_SERVICE.certificate_repository.cursor = _.cursor()
51
    KEY_SERVICE.private_key_repository.connection = _
52
    KEY_SERVICE.private_key_repository.cursor = _.cursor()
53

    
54

    
55
def create_certificate(body=None):  # noqa: E501
56
    """create new certificate
57

    
58
    Create a new certificate based on given information # noqa: E501
59

    
60
    :param body: Certificate data to be created
61
    :type body: dict | bytes
62

    
63
    :rtype: CreatedResponse
64
    """
65
    setup()
66

    
67
    if connexion.request.is_json:
68
        body = CertificateRequest.from_dict(connexion.request.get_json())  # noqa: E501
69
        if body.subject is None or body.usage is None or body.validity_days is None:
70
            return 400
71

    
72
        key = KEY_SERVICE.create_new_key()                              # TODO pass key
73
        subject = Subject(
74
            common_name=body.subject.cn,
75
            country=body.subject.c,
76
            locality=body.subject.l,
77
            state=body.subject.st,
78
            organization=body.subject.o,
79
            organization_unit=body.subject.ou,
80
            email_address=body.subject.email_address
81
        )
82
        usages_dict = DICT_USAGES.copy()
83

    
84
        if body.ca is None:
85
            cert = CERTIFICATE_SERVICE.create_root_ca(
86
                key,
87
                subject,
88
                usages=usages_dict,
89
                days=body.validity_days
90
            )
91
        else:
92
            issuer = CERTIFICATE_SERVICE.get_certificate(body.ca)
93

    
94
            if issuer is None:
95
                return ErrorResponse(
96
                    success=False,
97
                    data="No certificate authority with such unique ID exists."
98
                ), 400
99

    
100
            issuer_key = KEY_SERVICE.get_key(issuer.private_key_id)
101

    
102
            if issuer_key is None:
103
                return ErrorResponse(
104
                    success=False,
105
                    data="Internal server error (corrupted database)."
106
                ), 400
107

    
108
            f = CERTIFICATE_SERVICE.create_ca if CA_ID in usages_dict and usages_dict[CA_ID] else \
109
                CERTIFICATE_SERVICE.create_end_cert
110

    
111
            cert = f(
112
                key,
113
                subject,
114
                issuer,
115
                issuer_key,
116
                usages=usages_dict,
117
                days=body.validity_days
118
            )
119

    
120
        if cert is not None:
121
            return CertificateResponse(
122
                success=True,
123
                data=cert.certificate_id
124
            ), 201
125
        else:
126
            return ErrorResponse(
127
                success=False,
128
                data="The certificate could not have been created."
129
            ), 400
130
    else:
131
        return 400
132

    
133

    
134
def get_certificate_by_id(id):  # noqa: E501
135
    """get certificate by ID
136

    
137
    Get certificate in PEM format by ID # noqa: E501
138

    
139
    :param id: ID of a certificate to be queried
140
    :type id: dict | bytes
141

    
142
    :rtype: PemResponse
143
    """
144
    if connexion.request.is_json:
145
        id = IdParameter.from_dict(connexion.request.get_json())  # noqa: E501
146
    return 'do some magic!'
147

    
148

    
149
def get_certificate_details_by_id(id):  # noqa: E501
150
    """get certificate's details by ID
151

    
152
    Get certificate details by ID # noqa: E501
153

    
154
    :param id: ID of a certificate whose details are to be queried
155
    :type id: dict | bytes
156

    
157
    :rtype: CertificateResponse
158
    """
159
    if connexion.request.is_json:
160
        id = IdParameter.from_dict(connexion.request.get_json())  # noqa: E501
161
    return 'do some magic!'
162

    
163

    
164
def get_certificate_list(filtering=None):  # noqa: E501
165
    """get list of certificates
166

    
167
    Lists certificates based on provided filtering options # noqa: E501
168

    
169
    :param filtering: Filter certificate type to be queried
170
    :type filtering: dict | bytes
171

    
172
    :rtype: CertificateListResponse
173
    """
174
    if connexion.request.is_json:
175
        filtering = Filtering.from_dict(connexion.request.get_json())  # noqa: E501
176
    return 'do some magic! XD'
177

    
178

    
179
def get_certificate_root_by_id(id):  # noqa: E501
180
    """get certificate's root of trust chain by ID
181

    
182
    Get certificate's root of trust chain in PEM format by ID # noqa: E501
183

    
184
    :param id: ID of a child certificate whose root is to be queried
185
    :type id: dict | bytes
186

    
187
    :rtype: PemResponse
188
    """
189
    if connexion.request.is_json:
190
        id = IdParameter.from_dict(connexion.request.get_json())  # noqa: E501
191
    return 'do some magic!'
192

    
193

    
194
def get_certificate_trust_chain_by_id(id):  # noqa: E501
195
    """get certificate's trust chain by ID
196

    
197
    Get certificate trust chain in PEM format by ID # noqa: E501
198

    
199
    :param id: ID of a child certificate whose chain is to be queried
200
    :type id: dict | bytes
201

    
202
    :rtype: PemResponse
203
    """
204
    if connexion.request.is_json:
205
        id = IdParameter.from_dict(connexion.request.get_json())  # noqa: E501
206
    return 'do some magic!'
(3-3/3)