Projekt

Obecné

Profil

« Předchozí | Další » 

Revize 1e07432d

Přidáno uživatelem Michal Seják před asi 4 roky(ů)

Re #8476 - Implemented `list_certificates` with unit tests.

Zobrazit rozdíly:

swagger_server/controllers/certificates_controller.py
1
from datetime import datetime
2

  
1 3
import connexion
2 4
import six
3 5

  
......
8 10
from src.services.cryptography import CryptographyService           # TODO not the Controller's responsibility. 2
9 11
from sqlite3 import Connection                                      # TODO not the Controller's responsibility. 3
10 12
from src.constants import DICT_USAGES, CA_ID, \
11
    DATABASE_FILE_LOCATION                                          # TODO DATABASE_FILE - not the Controller's
13
    DATABASE_FILE_LOCATION, SSL_ID, SIGNATURE_ID, AUTHENTICATION_ID, \
14
    DATETIME_FORMAT  # TODO DATABASE_FILE - not the Controller's
12 15
                                                                    #  responsibility. 4
13 16
from src.services.key_service import KeyService
14
from swagger_server.models import CertificateRequest
17
from swagger_server.models import CertificateRequest, CertificateListItem, CAUsage, IssuerListItem
15 18

  
16 19
from swagger_server.models.certificate import Certificate  # noqa: E501
17 20
from swagger_server.models.certificate_list_response import CertificateListResponse  # noqa: E501
......
31 34
                                                                        #  the one corresponding to the connection.
32 35
                                                                        #  The cursor can always be generated from the
33 36
                                                                        #  connection instance.
37
GENERAL_ERROR = "An error occured during processing of the request."
38
CORRUPTED_DATABASE = "Internal server error (corrupted database)."
34 39

  
35 40
__ = CryptographyService()                                              # TODO not the Controller's responsibility. 6
36 41
CERTIFICATE_SERVICE = CertificateService(__, CertificateRepository(None, None))
......
104 109
            if issuer_key is None:
105 110
                return ErrorResponse(
106 111
                    success=False,
107
                    data="Internal server error (corrupted database)."
112
                    data=CORRUPTED_DATABASE
108 113
                ), 400
109 114

  
110 115
            f = CERTIFICATE_SERVICE.create_ca if CA_ID in usages_dict and usages_dict[CA_ID] else \
......
130 135
                data="The certificate could not have been created."
131 136
            ), 400
132 137
    else:
133
        return 400
138
        return ErrorResponse(success=False, data="The request must be JSON-formatted."), 400
134 139

  
135 140

  
136 141
def get_certificate_by_id(id):  # noqa: E501
......
175 180
    """
176 181
    setup()                                                             # TODO remove after issue fixed
177 182

  
178
    if connexion.request.is_json:
179
        filtering = Filtering.from_dict(connexion.request.get_json())  # noqa: E501
180
        print(filtering)
181
        return 200
183
    key_map = {CA_ID: 'ca', SSL_ID: 'ssl', SIGNATURE_ID: 'digital_signature', AUTHENTICATION_ID: 'authentication'}
184

  
185
    if len(connexion.request.data) == 0:
186
        certs = CERTIFICATE_SERVICE.get_certificates()
187
        if certs is None:
188
            return ErrorResponse(success=False, data=GENERAL_ERROR), 500
189
        elif len(certs) == 0:
190
            return ErrorResponse(success=False, data="No certificates found."), 204
191
        else:
192
            ret = []
193
            for c in certs:
194
                c_issuer = CERTIFICATE_SERVICE.get_certificate(c.parent_id)
195
                if c_issuer is None:
196
                    return ErrorResponse(success=False, data=CORRUPTED_DATABASE)
197

  
198
                ret.append(
199
                    CertificateListItem(
200
                        id=c.certificate_id,
201
                        cn=c.common_name,
202
                        not_before=datetime.strptime(c.valid_from, DATETIME_FORMAT).date(),
203
                        not_after=datetime.strptime(c.valid_to, DATETIME_FORMAT).date(),
204
                        usage=CAUsage(**{key_map[k]: v for k, v in c.usages.items()}),
205
                        issuer=IssuerListItem(
206
                            id=c_issuer.certificate_id,
207
                            cn=c_issuer.common_name
208
                        )
209
                    )
210
                )
211
            return CertificateListResponse(success=True, data=ret)
182 212
    else:
183
        return 400
213
        # TODO fix filtering issue (somehow)
214
        return ErrorResponse(success=False, data="The request must be JSON-formatted."), 400
184 215

  
185 216

  
186 217
def get_certificate_root_by_id(id):  # noqa: E501

Také k dispozici: Unified diff