Projekt

Obecné

Profil

« Předchozí | Další » 

Revize 1e07432d

Přidáno uživatelem Michal Seják před asi 4 roky(ů)

Re #8476 - Implemented `list_certificates` with unit tests.

Zobrazit rozdíly:

src/constants.py
2 2

  
3 3
DATABASE_FILE = "db/database_sqlite.db"
4 4
DATABASE_FILE_LOCATION = FileAnchor("aswi2021jmsd", DATABASE_FILE)
5

  
5
DATETIME_FORMAT = "%m.%d.%Y %H:%M:%S"
6 6

  
7 7
# Types of certificates
8 8
ROOT_CA_ID = 1
swagger_server/controllers/certificates_controller.py
1
from datetime import datetime
2

  
1 3
import connexion
2 4
import six
3 5

  
......
8 10
from src.services.cryptography import CryptographyService           # TODO not the Controller's responsibility. 2
9 11
from sqlite3 import Connection                                      # TODO not the Controller's responsibility. 3
10 12
from src.constants import DICT_USAGES, CA_ID, \
11
    DATABASE_FILE_LOCATION                                          # TODO DATABASE_FILE - not the Controller's
13
    DATABASE_FILE_LOCATION, SSL_ID, SIGNATURE_ID, AUTHENTICATION_ID, \
14
    DATETIME_FORMAT  # TODO DATABASE_FILE - not the Controller's
12 15
                                                                    #  responsibility. 4
13 16
from src.services.key_service import KeyService
14
from swagger_server.models import CertificateRequest
17
from swagger_server.models import CertificateRequest, CertificateListItem, CAUsage, IssuerListItem
15 18

  
16 19
from swagger_server.models.certificate import Certificate  # noqa: E501
17 20
from swagger_server.models.certificate_list_response import CertificateListResponse  # noqa: E501
......
31 34
                                                                        #  the one corresponding to the connection.
32 35
                                                                        #  The cursor can always be generated from the
33 36
                                                                        #  connection instance.
37
GENERAL_ERROR = "An error occured during processing of the request."
38
CORRUPTED_DATABASE = "Internal server error (corrupted database)."
34 39

  
35 40
__ = CryptographyService()                                              # TODO not the Controller's responsibility. 6
36 41
CERTIFICATE_SERVICE = CertificateService(__, CertificateRepository(None, None))
......
104 109
            if issuer_key is None:
105 110
                return ErrorResponse(
106 111
                    success=False,
107
                    data="Internal server error (corrupted database)."
112
                    data=CORRUPTED_DATABASE
108 113
                ), 400
109 114

  
110 115
            f = CERTIFICATE_SERVICE.create_ca if CA_ID in usages_dict and usages_dict[CA_ID] else \
......
130 135
                data="The certificate could not have been created."
131 136
            ), 400
132 137
    else:
133
        return 400
138
        return ErrorResponse(success=False, data="The request must be JSON-formatted."), 400
134 139

  
135 140

  
136 141
def get_certificate_by_id(id):  # noqa: E501
......
175 180
    """
176 181
    setup()                                                             # TODO remove after issue fixed
177 182

  
178
    if connexion.request.is_json:
179
        filtering = Filtering.from_dict(connexion.request.get_json())  # noqa: E501
180
        print(filtering)
181
        return 200
183
    key_map = {CA_ID: 'ca', SSL_ID: 'ssl', SIGNATURE_ID: 'digital_signature', AUTHENTICATION_ID: 'authentication'}
184

  
185
    if len(connexion.request.data) == 0:
186
        certs = CERTIFICATE_SERVICE.get_certificates()
187
        if certs is None:
188
            return ErrorResponse(success=False, data=GENERAL_ERROR), 500
189
        elif len(certs) == 0:
190
            return ErrorResponse(success=False, data="No certificates found."), 204
191
        else:
192
            ret = []
193
            for c in certs:
194
                c_issuer = CERTIFICATE_SERVICE.get_certificate(c.parent_id)
195
                if c_issuer is None:
196
                    return ErrorResponse(success=False, data=CORRUPTED_DATABASE)
197

  
198
                ret.append(
199
                    CertificateListItem(
200
                        id=c.certificate_id,
201
                        cn=c.common_name,
202
                        not_before=datetime.strptime(c.valid_from, DATETIME_FORMAT).date(),
203
                        not_after=datetime.strptime(c.valid_to, DATETIME_FORMAT).date(),
204
                        usage=CAUsage(**{key_map[k]: v for k, v in c.usages.items()}),
205
                        issuer=IssuerListItem(
206
                            id=c_issuer.certificate_id,
207
                            cn=c_issuer.common_name
208
                        )
209
                    )
210
                )
211
            return CertificateListResponse(success=True, data=ret)
182 212
    else:
183
        return 400
213
        # TODO fix filtering issue (somehow)
214
        return ErrorResponse(success=False, data="The request must be JSON-formatted."), 400
184 215

  
185 216

  
186 217
def get_certificate_root_by_id(id):  # noqa: E501
tests/unit_tests/rest_api/certificates.py
123 123
    ret = server.post("/api/certificates", content_type="application/json", json={
124 124
        "subject": {
125 125
            "C": "CZ",
126
            "CN": "Root CA s.r.o.",
126
            "CN": "Another Root CA s.r.o.",
127 127
            "L": "Pilsen",
128
            "O": "Root CA s.r.o.",
128
            "O": "Another Root CA s.r.o.",
129 129
            "OU": "IT department",
130 130
            "ST": "Pilsen Region",
131 131
            "emailAddress": "root@ca.com"
......
149 149
    assert d["success"]
150 150

  
151 151
    ret = server.post("/api/certificates", content_type="application/json", json={
152
        "CA": 2,
152 153
        "subject": {
153
            "CA": 2,
154 154
            "C": "CZ",
155
            "CN": "Root CA s.r.o.",
155
            "CN": "Intermediate CA s.r.o.",
156 156
            "L": "Pilsen",
157
            "O": "Root CA s.r.o.",
157
            "O": "Intermediate CA s.r.o.",
158 158
            "OU": "IT department",
159 159
            "ST": "Pilsen Region",
160
            "emailAddress": "root@ca.com"
160
            "emailAddress": "inter@ca.com"
161 161
        },
162 162
        "usage": {
163 163
            "CA": True,
......
175 175
    assert "data" in d
176 176
    assert d["data"] == 3
177 177
    assert "success" in d
178
    assert d["success"]
179

  
180

  
181
def test_list_of_certificates(server):
182
    ret = server.get("/api/certificates")
183

  
184
    assert ret.status_code == 200
185

  
186
    usage_dict = {'CA': True, 'SSL': True, 'authentication': True, 'digitalSignature': True}
187
    s1 = "Root CA s.r.o."
188
    s2 = "Another Root CA s.r.o."
189
    s3 = "Intermediate CA s.r.o."
190

  
191
    assert "data" in ret.json
192
    assert "success" in ret.json
193
    assert ret.json["success"]
194

  
195
    d = ret.json["data"]
196

  
197
    # print(d)
198

  
199
    assert d[0]['CN'] == s1
200
    assert d[1]['CN'] == s2
201
    assert d[2]['CN'] == s3
202

  
203
    assert all(d[i]['id'] == i + 1 for i in range(3))
204
    assert all(d[i]['usage'] == usage_dict for i in range(3))
205

  
206
    assert d[0]['issuer']['CN'] == s1
207
    assert d[1]['issuer']['CN'] == s2
208
    assert d[2]['issuer']['CN'] == s2
209

  
210
    assert d[0]['issuer']['id'] == 1
211
    assert d[1]['issuer']['id'] == 2
212
    assert d[2]['issuer']['id'] == 2
213

  
214

  
215
def test_sign_by_non_ca(server):
216
    ret = server.post("/api/certificates", content_type="application/json", json={
217
        "CA": 2,
218
        "subject": {
219
            "C": "CZ",
220
            "CN": "Fake Intermediate CA s.r.o.",
221
            "L": "Pilsen",
222
            "O": "Fake Intermediate CA s.r.o.",
223
            "OU": "IT department",
224
            "ST": "Pilsen Region",
225
            "emailAddress": "inter@ca.com"
226
        },
227
        "usage": {
228
            "CA": False,
229
            "SSL": True,
230
            "authentication": True,
231
            "digitalSignature": True
232
        },
233
        "validityDays": 30
234
    })
235

  
236
    assert ret.status_code == 201
237

  
238
    d = ret.json
239

  
240
    assert "data" in d
241
    assert d["data"] == 4
242
    assert "success" in d
243
    assert d["success"]
244

  
245
    ret = server.post("/api/certificates", content_type="application/json", json={
246
        "CA": 2,
247
        "subject": {
248
            "C": "CZ",
249
            "CN": "End certificate signed by end certificate s.r.o.",
250
            "L": "Pilsen",
251
            "O": "This is probably bad s.r.o.",
252
            "OU": "IT department",
253
            "ST": "Pilsen Region",
254
            "emailAddress": "end@ca.com"
255
        },
256
        "usage": {
257
            "CA": False,
258
            "SSL": True,
259
            "authentication": True,
260
            "digitalSignature": True
261
        },
262
        "validityDays": 30
263
    })
264

  
265
    # TODO discussion -> assert ret.status_code == 400
266
    assert ret.status_code == 201
267

  
268
    d = ret.json
269

  
270
    assert "data" in d
271
    assert d["data"] == 5
272
    assert "success" in d
178 273
    assert d["success"]

Také k dispozici: Unified diff