Projekt

Obecné

Profil

Stáhnout (15 KB) Statistiky
| Větev: | Tag: | Revize:
1
from typing import Dict, List
2
from sqlite3 import Connection, Cursor, Error
3

    
4
from src.model.certificate import Certificate
5
from src.constants import *
6

    
7

    
8
class CertificateRepository:
9

    
10
    def __init__(self, connection: Connection, cursor: Cursor):
11
        """
12
        Constructor of the CertificateRepository object
13

    
14
        :param connection: Instance of the Connection object
15
        :param cursor: Instance of the Cursor object
16
        """
17

    
18
        self.connection = connection
19
        self.cursor = cursor
20

    
21
    def create(self, certificate: Certificate):
22
        """
23
        Creates a certificate.
24
        For root certificate (CA) the parent certificate id is modified to the same id (id == parent_id).
25

    
26
        :param certificate: Instance of the Certificate object
27

    
28
        :return: the result of whether the creation was successful
29
        """
30

    
31
        try:
32
            sql = (f"INSERT INTO {TAB_CERTIFICATES} "
33
                   f"({COL_COMMON_NAME},"
34
                   f"{COL_VALID_FROM},"
35
                   f"{COL_VALID_TO},"
36
                   f"{COL_PEM_DATA},"
37
                   f"{COL_REVOCATION_DATE},"
38
                   f"{COL_REVOCATION_REASON}"
39
                   f"{COL_PRIVATE_KEY_ID},"
40
                   f"{COL_TYPE_ID},"
41
                   f"{COL_PARENT_ID})"
42
                   f"VALUES(?,?,?,?,?,?,?)")
43
            values = [certificate.common_name,
44
                      certificate.valid_from,
45
                      certificate.valid_to,
46
                      certificate.pem_data,
47
                      certificate.revocation_date,
48
                      certificate.revocation_reason,
49
                      certificate.private_key_id,
50
                      certificate.type_id,
51
                      certificate.parent_id]
52
            self.cursor.execute(sql, values)
53
            self.connection.commit()
54

    
55
            last_id: int = self.cursor.lastrowid
56

    
57
            # TODO assure that this is correct
58
            if certificate.type_id == ROOT_CA_ID:
59
                certificate.parent_id = last_id
60
                self.update(last_id, certificate)
61
            else:
62
                for usage_id, usage_value in certificate.usages.items():
63
                    if usage_value:
64
                        sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
65
                               f"({COL_CERTIFICATE_ID},"
66
                               f"{COL_USAGE_TYPE_ID}) "
67
                               f"VALUES (?,?)")
68
                        values = [last_id, usage_id]
69
                        self.cursor.execute(sql, values)
70
                        self.connection.commit()
71
        except Error as e:
72
            return e
73

    
74
        return last_id
75

    
76
    def read(self, certificate_id: int):
77
        """
78
        Reads (selects) a certificate.
79

    
80
        :param certificate_id: ID of specific certificate
81

    
82
        :return: instance of the Certificate object
83
        """
84

    
85
        try:
86
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
87
                   f"WHERE {COL_ID} = ?")
88
            values = [certificate_id]
89
            self.cursor.execute(sql, values)
90
            certificate_row = self.cursor.fetchone()
91

    
92
            if certificate_row is None:
93
                return None
94

    
95
            sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
96
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
97
            self.cursor.execute(sql, values)
98
            usage_rows = self.cursor.fetchall()
99

    
100
            usage_dict: Dict[int, bool] = {}
101
            for usage_row in usage_rows:
102
                usage_dict[usage_row[2]] = True
103

    
104
            certificate: Certificate = Certificate(certificate_row[0],
105
                                                   certificate_row[1],
106
                                                   certificate_row[2],
107
                                                   certificate_row[3],
108
                                                   certificate_row[4],
109
                                                   certificate_row[7],
110
                                                   certificate_row[8],
111
                                                   certificate_row[9],
112
                                                   usage_dict,
113
                                                   certificate_row[5],
114
                                                   certificate_row[6])
115
        except Error as e:
116
            return e
117

    
118
        if len(certificate_row) > 0:
119
            return certificate
120
        else:
121
            return None
122

    
123
    def read_all(self, filter_type: int = None):
124
        """
125
        Reads (selects) all certificates (with type).
126

    
127
        :param filter_type: ID of certificate type from CertificateTypes table
128

    
129
        :return: list of certificates
130
        """
131

    
132
        try:
133
            sql_extension = ""
134
            values = []
135
            if filter_type is not None:
136
                sql_extension = (f" WHERE {COL_TYPE_ID} = ("
137
                                 f"SELECT {COL_ID} FROM {TAB_CERTIFICATE_TYPES} WHERE {COL_ID} = ?)")
138
                values = [filter_type]
139

    
140
            sql = (f"SELECT * FROM {TAB_CERTIFICATES}{sql_extension}")
141
            self.cursor.execute(sql, values)
142
            certificate_rows = self.cursor.fetchall()
143

    
144
            certificates: List[Certificate] = []
145
            for certificate_row in certificate_rows:
146
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
147
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
148
                values = [certificate_row[0]]
149
                self.cursor.execute(sql, values)
150
                usage_rows = self.cursor.fetchall()
151

    
152
                usage_dict: Dict[int, bool] = {}
153
                for usage_row in usage_rows:
154
                    usage_dict[usage_row[2]] = True
155

    
156
                certificates.append(Certificate(certificate_row[0],
157
                                                certificate_row[1],
158
                                                certificate_row[2],
159
                                                certificate_row[3],
160
                                                certificate_row[4],
161
                                                certificate_row[7],
162
                                                certificate_row[8],
163
                                                certificate_row[9],
164
                                                usage_dict,
165
                                                certificate_row[5],
166
                                                certificate_row[6]))
167
        except Error as e:
168
            return e
169

    
170
        if len(certificates) > 0:
171
            return certificates
172
        else:
173
            return None
174

    
175
    def update(self, certificate_id: int, certificate: Certificate):
176
        """
177
        Updates a certificate.
178
        If the parameter of certificate (Certificate object) is not to be changed,
179
        the same value must be specified.
180

    
181
        :param certificate_id: ID of specific certificate
182
        :param certificate: Instance of the Certificate object
183

    
184
        :return: the result of whether the updation was successful
185
        """
186

    
187
        try:
188
            sql = (f"UPDATE {TAB_CERTIFICATES} "
189
                   f"SET {COL_COMMON_NAME} = ?, "
190
                   f"{COL_VALID_FROM} = ?, "
191
                   f"{COL_VALID_TO} = ?, "
192
                   f"{COL_PEM_DATA} = ?, "
193
                   f"{COL_REVOCATION_DATE} = ?, "
194
                   f"{COL_REVOCATION_REASON} = ?, "
195
                   f"{COL_PRIVATE_KEY_ID} = ?, "
196
                   f"{COL_TYPE_ID} = ?, "
197
                   f"{COL_PARENT_ID} = ? "
198
                   f"WHERE {COL_ID} = ?")
199
            values = [certificate.common_name,
200
                      certificate.valid_from,
201
                      certificate.valid_to,
202
                      certificate.pem_data,
203
                      certificate.revocation_date,
204
                      certificate.revocation_reason,
205
                      certificate.private_key_id,
206
                      certificate.type_id,
207
                      certificate.parent_id,
208
                      certificate_id]
209
            self.cursor.execute(sql, values)
210
            self.connection.commit()
211

    
212
            sql = (f"DELETE FROM {TAB_CERTIFICATE_USAGES} "
213
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
214
            values = [certificate_id]
215
            self.cursor.execute(sql, values)
216
            self.connection.commit()
217

    
218
            # iterate over usage pairs
219
            for usage_id, usage_value in certificate.usages.items():
220
                if usage_value:
221
                    sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
222
                           f"({COL_CERTIFICATE_ID},"
223
                           f"{COL_USAGE_TYPE_ID}) "
224
                           f"VALUES (?,?)")
225
                    values = [certificate_id, usage_id]
226
                    self.cursor.execute(sql, values)
227
                    self.connection.commit()
228
        except Error as e:
229
            return e
230

    
231
        return True
232

    
233
    def delete(self, certificate_id: int):
234
        """
235
        Deletes a certificate
236

    
237
        :param certificate_id: ID of specific certificate
238

    
239
        :return: the result of whether the deletion was successful
240
        """
241

    
242
        try:
243
            sql = (f"DELETE FROM {TAB_CERTIFICATES} "
244
                   f"WHERE {COL_ID} = ?")
245
            values = [certificate_id]
246
            self.cursor.execute(sql, values)
247
            self.connection.commit()
248
        except Error as e:
249
            return e
250

    
251
        return self.cursor.rowcount > 0
252

    
253
    def set_certificate_revoked(
254
            self, certificate_id: int, revocation_date: str, revocation_reason: str = "unspecified"):
255
        """
256
        Revoke a certificate
257

    
258
        :param certificate_id: ID of specific certificate
259
        :param revocation_date: Date, when the certificate is revoked
260
        :param revocation_reason: Reason of the revocation
261

    
262
        :return: the result of whether the revocation was successful
263
        """
264

    
265
        try:
266
            sql = (f"UPDATE {TAB_CERTIFICATES} "
267
                   f"SET {COL_REVOCATION_DATE} = ?, "
268
                   f"{COL_REVOCATION_REASON} = ?, "
269
                   f"WHERE {COL_ID} = ? AND ({COL_REVOCATION_DATE} IS NULL OR {COL_REVOCATION_DATE} = '')")
270
            values = [revocation_date,
271
                      revocation_reason,
272
                      certificate_id]
273
            self.cursor.execute(sql, values)
274
            self.connection.commit()
275
        except Error as e:
276
            return e
277

    
278
        return True
279

    
280
    def clear_certificate_revocation(self, certificate_id: int):
281
        """
282
        Clear revocation of a certificate
283

    
284
        :param certificate_id: ID of specific certificate
285

    
286
        :return: the result of whether the clear revocation was successful
287
        """
288

    
289
        try:
290
            sql = (f"UPDATE {TAB_CERTIFICATES} "
291
                   f"SET {COL_REVOCATION_DATE} = '', "
292
                   f"{COL_REVOCATION_REASON} = '', "
293
                   f"WHERE {COL_ID} = ?")
294
            values = [certificate_id]
295
            self.cursor.execute(sql, values)
296
            self.connection.commit()
297
        except Error as e:
298
            return e
299

    
300
        return True
301

    
302
    def get_all_revoked_by(self, certificate_id: int):
303
        """
304
        Get list of the revoked certificates that are direct descendants of the certificate with the ID
305

    
306
        :param certificate_id: ID of specific certificate
307

    
308
        :return:
309
            # list of the certificates
310
            # None if the list is empty
311
            # sqlite3.Error if an exception is thrown
312
        """
313

    
314
        try:
315
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
316
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_REVOCATION_DATE} IS NOT NULL AND {COL_REVOCATION_DATE} != ''")
317
            values = [certificate_id]
318
            self.cursor.execute(sql, values)
319
            certificate_rows = self.cursor.fetchall()
320

    
321
            certificates: List[Certificate] = []
322
            for certificate_row in certificate_rows:
323
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
324
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
325
                values = [certificate_row[0]]
326
                self.cursor.execute(sql, values)
327
                usage_rows = self.cursor.fetchall()
328

    
329
                usage_dict: Dict[int, bool] = {}
330
                for usage_row in usage_rows:
331
                    usage_dict[usage_row[2]] = True
332

    
333
                certificates.append(Certificate(certificate_row[0],
334
                                                certificate_row[1],
335
                                                certificate_row[2],
336
                                                certificate_row[3],
337
                                                certificate_row[4],
338
                                                certificate_row[7],
339
                                                certificate_row[8],
340
                                                certificate_row[9],
341
                                                usage_dict,
342
                                                certificate_row[5],
343
                                                certificate_row[6]))
344
        except Error as e:
345
            return e
346

    
347
        if len(certificates) > 0:
348
            return certificates
349
        else:
350
            return None
351

    
352
    def get_all_issued_by(self, certificate_id: int):
353
        """
354
        Get list of the certificates that are direct descendants of the certificate with the ID
355

    
356
        :param certificate_id: ID of specific certificate
357

    
358
        :return:
359
            # list of the certificates
360
            # None if the list is empty
361
            # sqlite3.Error if an exception is thrown
362
        """
363

    
364
        try:
365
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
366
                   f"WHERE {COL_PARENT_ID} = ?")
367
            values = [certificate_id]
368
            self.cursor.execute(sql, values)
369
            certificate_rows = self.cursor.fetchall()
370

    
371
            certificates: List[Certificate] = []
372
            for certificate_row in certificate_rows:
373
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
374
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
375
                values = [certificate_row[0]]
376
                self.cursor.execute(sql, values)
377
                usage_rows = self.cursor.fetchall()
378

    
379
                usage_dict: Dict[int, bool] = {}
380
                for usage_row in usage_rows:
381
                    usage_dict[usage_row[2]] = True
382

    
383
                certificates.append(Certificate(certificate_row[0],
384
                                                certificate_row[1],
385
                                                certificate_row[2],
386
                                                certificate_row[3],
387
                                                certificate_row[4],
388
                                                certificate_row[7],
389
                                                certificate_row[8],
390
                                                certificate_row[9],
391
                                                usage_dict,
392
                                                certificate_row[5],
393
                                                certificate_row[6]))
394
        except Error as e:
395
            return e
396

    
397
        if len(certificates) > 0:
398
            return certificates
399
        else:
400
            return None
(2-2/3)