Projekt

Obecné

Profil

Stáhnout (16.8 KB) Statistiky
| Větev: | Tag: | Revize:
1 6425fa36 David Friesecký
import time
2 1d2add74 Jan Pašek
from sqlite3 import Connection, Error
3 9e22e20c David Friesecký
from typing import Dict, List
4 1636aefe David Friesecký
5 d65b022d David Friesecký
from src.exceptions.database_exception import DatabaseException
6 1d2add74 Jan Pašek
from injector import inject
7 f8b23532 David Friesecký
from src.constants import *
8 1d2add74 Jan Pašek
from src.model.certificate import Certificate
9 e9e55282 David Friesecký
10
11 f8b23532 David Friesecký
class CertificateRepository:
12 25053504 David Friesecký
13 1d2add74 Jan Pašek
    @inject
14
    def __init__(self, connection: Connection):
15 a0602bad David Friesecký
        """
16 f8b23532 David Friesecký
        Constructor of the CertificateRepository object
17 a0602bad David Friesecký
18 f8b23532 David Friesecký
        :param connection: Instance of the Connection object
19 a0602bad David Friesecký
        """
20 f8b23532 David Friesecký
        self.connection = connection
21 1d2add74 Jan Pašek
        self.cursor = connection.cursor()
22 e9e55282 David Friesecký
23 805077f5 David Friesecký
    def create(self, certificate: Certificate):
24 a0602bad David Friesecký
        """
25 f8b23532 David Friesecký
        Creates a certificate.
26
        For root certificate (CA) the parent certificate id is modified to the same id (id == parent_id).
27 a0602bad David Friesecký
28 f8b23532 David Friesecký
        :param certificate: Instance of the Certificate object
29 a0602bad David Friesecký
30 f8b23532 David Friesecký
        :return: the result of whether the creation was successful
31 a0602bad David Friesecký
        """
32
33 f8b23532 David Friesecký
        try:
34
            sql = (f"INSERT INTO {TAB_CERTIFICATES} "
35
                   f"({COL_COMMON_NAME},"
36
                   f"{COL_VALID_FROM},"
37
                   f"{COL_VALID_TO},"
38
                   f"{COL_PEM_DATA},"
39
                   f"{COL_PRIVATE_KEY_ID},"
40
                   f"{COL_TYPE_ID},"
41
                   f"{COL_PARENT_ID})"
42
                   f"VALUES(?,?,?,?,?,?,?)")
43
            values = [certificate.common_name,
44
                      certificate.valid_from,
45
                      certificate.valid_to,
46
                      certificate.pem_data,
47
                      certificate.private_key_id,
48
                      certificate.type_id,
49
                      certificate.parent_id]
50
            self.cursor.execute(sql, values)
51
            self.connection.commit()
52
53
            last_id: int = self.cursor.lastrowid
54
55 f3125948 Stanislav Král
            # TODO assure that this is correct
56
            if certificate.type_id == ROOT_CA_ID:
57 f8b23532 David Friesecký
                certificate.parent_id = last_id
58 093d06df Stanislav Král
                self.update(last_id, certificate)
59 f8b23532 David Friesecký
            else:
60 fa72c969 Stanislav Král
                for usage_id, usage_value in certificate.usages.items():
61 f8b23532 David Friesecký
                    if usage_value:
62
                        sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
63
                               f"({COL_CERTIFICATE_ID},"
64
                               f"{COL_USAGE_TYPE_ID}) "
65
                               f"VALUES (?,?)")
66
                        values = [last_id, usage_id]
67
                        self.cursor.execute(sql, values)
68
                        self.connection.commit()
69
        except Error as e:
70 d65b022d David Friesecký
            raise DatabaseException(e)
71 f8b23532 David Friesecký
72 805077f5 David Friesecký
        return last_id
73 f8b23532 David Friesecký
74 805077f5 David Friesecký
    def read(self, certificate_id: int):
75 f8b23532 David Friesecký
        """
76
        Reads (selects) a certificate.
77 e9e55282 David Friesecký
78 f8b23532 David Friesecký
        :param certificate_id: ID of specific certificate
79 e9e55282 David Friesecký
80 f8b23532 David Friesecký
        :return: instance of the Certificate object
81
        """
82 e9e55282 David Friesecký
83 f8b23532 David Friesecký
        try:
84
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
85 6425fa36 David Friesecký
                   f"WHERE {COL_ID} = ? AND ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
86 f8b23532 David Friesecký
            values = [certificate_id]
87
            self.cursor.execute(sql, values)
88 a7411982 Stanislav Král
            certificate_row = self.cursor.fetchone()
89 e9e55282 David Friesecký
90 6f4a5f24 Captain_Trojan
            if certificate_row is None:
91
                return None
92
93 f8b23532 David Friesecký
            sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
94
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
95
            self.cursor.execute(sql, values)
96
            usage_rows = self.cursor.fetchall()
97 1636aefe David Friesecký
98
            usage_dict: Dict[int, bool] = {}
99
            for usage_row in usage_rows:
100 f8b23532 David Friesecký
                usage_dict[usage_row[2]] = True
101
102
            certificate: Certificate = Certificate(certificate_row[0],
103
                                                   certificate_row[1],
104
                                                   certificate_row[2],
105
                                                   certificate_row[3],
106
                                                   certificate_row[4],
107 58051326 David Friesecký
                                                   certificate_row[8],
108
                                                   certificate_row[9],
109 6425fa36 David Friesecký
                                                   certificate_row[10],
110 58051326 David Friesecký
                                                   usage_dict,
111
                                                   certificate_row[5],
112
                                                   certificate_row[6])
113 f8b23532 David Friesecký
        except Error as e:
114 d65b022d David Friesecký
            raise DatabaseException(e)
115 f8b23532 David Friesecký
116 d65b022d David Friesecký
        return certificate
117 f8b23532 David Friesecký
118 58051326 David Friesecký
    def read_all(self, filter_type: int = None):
119 9e22e20c David Friesecký
        """
120
        Reads (selects) all certificates (with type).
121
122
        :param filter_type: ID of certificate type from CertificateTypes table
123
124
        :return: list of certificates
125
        """
126
127
        try:
128
            sql_extension = ""
129
            values = []
130
            if filter_type is not None:
131 6425fa36 David Friesecký
                sql_extension = (f" AND {COL_TYPE_ID} = ("
132 6f64f062 Stanislav Král
                                 f"SELECT {COL_ID} FROM {TAB_CERTIFICATE_TYPES} WHERE {COL_ID} = ?)")
133 9e22e20c David Friesecký
                values = [filter_type]
134
135 6425fa36 David Friesecký
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
136
                   f"WHERE ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = ''){sql_extension}")
137 9e22e20c David Friesecký
            self.cursor.execute(sql, values)
138
            certificate_rows = self.cursor.fetchall()
139
140
            certificates: List[Certificate] = []
141
            for certificate_row in certificate_rows:
142
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
143
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
144
                values = [certificate_row[0]]
145
                self.cursor.execute(sql, values)
146
                usage_rows = self.cursor.fetchall()
147
148
                usage_dict: Dict[int, bool] = {}
149
                for usage_row in usage_rows:
150
                    usage_dict[usage_row[2]] = True
151
152
                certificates.append(Certificate(certificate_row[0],
153
                                                certificate_row[1],
154
                                                certificate_row[2],
155
                                                certificate_row[3],
156
                                                certificate_row[4],
157 58051326 David Friesecký
                                                certificate_row[8],
158
                                                certificate_row[9],
159 6425fa36 David Friesecký
                                                certificate_row[10],
160 58051326 David Friesecký
                                                usage_dict,
161
                                                certificate_row[5],
162
                                                certificate_row[6]))
163 9e22e20c David Friesecký
        except Error as e:
164 d65b022d David Friesecký
            raise DatabaseException(e)
165 9e22e20c David Friesecký
166 0f3af523 Stanislav Král
        return certificates
167 9e22e20c David Friesecký
168 58051326 David Friesecký
    def update(self, certificate_id: int, certificate: Certificate):
169 a0602bad David Friesecký
        """
170 f8b23532 David Friesecký
        Updates a certificate.
171
        If the parameter of certificate (Certificate object) is not to be changed,
172
        the same value must be specified.
173 a0602bad David Friesecký
174
        :param certificate_id: ID of specific certificate
175 f8b23532 David Friesecký
        :param certificate: Instance of the Certificate object
176 a0602bad David Friesecký
177
        :return: the result of whether the updation was successful
178
        """
179
180 f8b23532 David Friesecký
        try:
181
            sql = (f"UPDATE {TAB_CERTIFICATES} "
182
                   f"SET {COL_COMMON_NAME} = ?, "
183
                   f"{COL_VALID_FROM} = ?, "
184
                   f"{COL_VALID_TO} = ?, "
185
                   f"{COL_PEM_DATA} = ?, "
186
                   f"{COL_PRIVATE_KEY_ID} = ?, "
187
                   f"{COL_TYPE_ID} = ?, "
188
                   f"{COL_PARENT_ID} = ? "
189
                   f"WHERE {COL_ID} = ?")
190
            values = [certificate.common_name,
191
                      certificate.valid_from,
192
                      certificate.valid_to,
193
                      certificate.pem_data,
194
                      certificate.private_key_id,
195
                      certificate.type_id,
196 805077f5 David Friesecký
                      certificate.parent_id,
197
                      certificate_id]
198 6425fa36 David Friesecký
199 f8b23532 David Friesecký
            self.cursor.execute(sql, values)
200
            self.connection.commit()
201
202
            sql = (f"DELETE FROM {TAB_CERTIFICATE_USAGES} "
203
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
204
            values = [certificate_id]
205
            self.cursor.execute(sql, values)
206
            self.connection.commit()
207
208 c36d3299 David Friesecký
            check_updated = self.cursor.rowcount > 0
209
210 f3125948 Stanislav Král
            # iterate over usage pairs
211
            for usage_id, usage_value in certificate.usages.items():
212 f8b23532 David Friesecký
                if usage_value:
213
                    sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
214
                           f"({COL_CERTIFICATE_ID},"
215
                           f"{COL_USAGE_TYPE_ID}) "
216
                           f"VALUES (?,?)")
217
                    values = [certificate_id, usage_id]
218
                    self.cursor.execute(sql, values)
219
                    self.connection.commit()
220
        except Error as e:
221 d65b022d David Friesecký
            raise DatabaseException(e)
222 f8b23532 David Friesecký
223 c36d3299 David Friesecký
        return check_updated
224 e9e55282 David Friesecký
225 58051326 David Friesecký
    def delete(self, certificate_id: int):
226 a0602bad David Friesecký
        """
227
        Deletes a certificate
228
229
        :param certificate_id: ID of specific certificate
230
231
        :return: the result of whether the deletion was successful
232
        """
233
234 f8b23532 David Friesecký
        try:
235 6425fa36 David Friesecký
            sql = (f"UPDATE {TAB_CERTIFICATES} "
236
                   f"SET {COL_DELETION_DATE} = ? "
237
                   f"WHERE {COL_ID} = ? AND ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
238
            values = [int(time.time()),
239
                      certificate_id]
240 f8b23532 David Friesecký
            self.cursor.execute(sql, values)
241
            self.connection.commit()
242
        except Error as e:
243 d65b022d David Friesecký
            raise DatabaseException(e)
244 f8b23532 David Friesecký
245 45744020 Stanislav Král
        return self.cursor.rowcount > 0
246 58051326 David Friesecký
247
    def set_certificate_revoked(
248 8b049f43 David Friesecký
            self, certificate_id: int, revocation_date: str, revocation_reason: str = REV_REASON_UNSPECIFIED):
249 58051326 David Friesecký
        """
250
        Revoke a certificate
251
252
        :param certificate_id: ID of specific certificate
253
        :param revocation_date: Date, when the certificate is revoked
254
        :param revocation_reason: Reason of the revocation
255
256 8b049f43 David Friesecký
        :return:
257
            the result of whether the revocation was successful OR
258
            sqlite3.Error if an exception is thrown
259 58051326 David Friesecký
        """
260
261
        try:
262 8b049f43 David Friesecký
            if revocation_date != "" and revocation_reason == "":
263
                revocation_reason = REV_REASON_UNSPECIFIED
264
            elif revocation_date == "":
265
                return False
266
267 58051326 David Friesecký
            sql = (f"UPDATE {TAB_CERTIFICATES} "
268
                   f"SET {COL_REVOCATION_DATE} = ?, "
269 8b049f43 David Friesecký
                   f"{COL_REVOCATION_REASON} = ? "
270 58051326 David Friesecký
                   f"WHERE {COL_ID} = ? AND ({COL_REVOCATION_DATE} IS NULL OR {COL_REVOCATION_DATE} = '')")
271
            values = [revocation_date,
272
                      revocation_reason,
273
                      certificate_id]
274
            self.cursor.execute(sql, values)
275
            self.connection.commit()
276
        except Error as e:
277 d65b022d David Friesecký
            raise DatabaseException(e)
278 8b049f43 David Friesecký
279 d65b022d David Friesecký
        return self.cursor.rowcount > 0
280 58051326 David Friesecký
281
    def clear_certificate_revocation(self, certificate_id: int):
282
        """
283
        Clear revocation of a certificate
284
285
        :param certificate_id: ID of specific certificate
286
287 8b049f43 David Friesecký
        :return:
288
            the result of whether the clear revocation was successful OR
289
            sqlite3.Error if an exception is thrown
290 58051326 David Friesecký
        """
291
292
        try:
293
            sql = (f"UPDATE {TAB_CERTIFICATES} "
294
                   f"SET {COL_REVOCATION_DATE} = '', "
295 8b049f43 David Friesecký
                   f"{COL_REVOCATION_REASON} = '' "
296 58051326 David Friesecký
                   f"WHERE {COL_ID} = ?")
297
            values = [certificate_id]
298
            self.cursor.execute(sql, values)
299
            self.connection.commit()
300
        except Error as e:
301 d65b022d David Friesecký
            raise DatabaseException(e)
302 f8b23532 David Friesecký
303 45744020 Stanislav Král
        return self.cursor.rowcount > 0
304 58051326 David Friesecký
305
    def get_all_revoked_by(self, certificate_id: int):
306
        """
307
        Get list of the revoked certificates that are direct descendants of the certificate with the ID
308
309
        :param certificate_id: ID of specific certificate
310
311
        :return:
312 8b049f43 David Friesecký
            list of the certificates OR
313
            None if the list is empty OR
314
            sqlite3.Error if an exception is thrown
315 58051326 David Friesecký
        """
316
317
        try:
318
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
319
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_REVOCATION_DATE} IS NOT NULL AND {COL_REVOCATION_DATE} != ''")
320
            values = [certificate_id]
321
            self.cursor.execute(sql, values)
322
            certificate_rows = self.cursor.fetchall()
323
324
            certificates: List[Certificate] = []
325
            for certificate_row in certificate_rows:
326
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
327
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
328
                values = [certificate_row[0]]
329
                self.cursor.execute(sql, values)
330
                usage_rows = self.cursor.fetchall()
331
332
                usage_dict: Dict[int, bool] = {}
333
                for usage_row in usage_rows:
334
                    usage_dict[usage_row[2]] = True
335
336
                certificates.append(Certificate(certificate_row[0],
337
                                                certificate_row[1],
338
                                                certificate_row[2],
339
                                                certificate_row[3],
340
                                                certificate_row[4],
341
                                                certificate_row[8],
342
                                                certificate_row[9],
343 6425fa36 David Friesecký
                                                certificate_row[10],
344 58051326 David Friesecký
                                                usage_dict,
345
                                                certificate_row[5],
346
                                                certificate_row[6]))
347
        except Error as e:
348 d65b022d David Friesecký
            raise DatabaseException(e)
349 58051326 David Friesecký
350 d65b022d David Friesecký
        return certificates
351 58051326 David Friesecký
352
    def get_all_issued_by(self, certificate_id: int):
353
        """
354
        Get list of the certificates that are direct descendants of the certificate with the ID
355
356
        :param certificate_id: ID of specific certificate
357
358
        :return:
359 8b049f43 David Friesecký
            list of the certificates OR
360
            None if the list is empty OR
361
            sqlite3.Error if an exception is thrown
362 58051326 David Friesecký
        """
363
364
        try:
365
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
366 6425fa36 David Friesecký
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_ID} != ? AND "
367
                   f"({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
368 8b049f43 David Friesecký
            values = [certificate_id, certificate_id]
369 58051326 David Friesecký
            self.cursor.execute(sql, values)
370
            certificate_rows = self.cursor.fetchall()
371
372
            certificates: List[Certificate] = []
373
            for certificate_row in certificate_rows:
374
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
375
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
376
                values = [certificate_row[0]]
377
                self.cursor.execute(sql, values)
378
                usage_rows = self.cursor.fetchall()
379
380
                usage_dict: Dict[int, bool] = {}
381
                for usage_row in usage_rows:
382
                    usage_dict[usage_row[2]] = True
383
384
                certificates.append(Certificate(certificate_row[0],
385
                                                certificate_row[1],
386
                                                certificate_row[2],
387
                                                certificate_row[3],
388
                                                certificate_row[4],
389
                                                certificate_row[8],
390
                                                certificate_row[9],
391 6425fa36 David Friesecký
                                                certificate_row[10],
392 58051326 David Friesecký
                                                usage_dict,
393
                                                certificate_row[5],
394
                                                certificate_row[6]))
395
        except Error as e:
396 d65b022d David Friesecký
            raise DatabaseException(e)
397 58051326 David Friesecký
398 d65b022d David Friesecký
        return certificates
399 94f6d8b8 Jan Pašek
400 85003184 Captain_Trojan
    def get_all_descendants_of(self, certificate_id: int):
401
        """
402
        Get a list of all certificates C such that the certificate identified by "certificate_id" belongs to a trust chain
403
        between C and its root certificate authority (i.e. is an ancestor of C).
404
        :param certificate_id: target certificate ID
405
        :return: list of all descendants
406
        """
407
        def dfs(children_of, this, collection: list):
408
            for child in children_of(this.certificate_id):
409
                dfs(children_of, child, collection)
410
            collection.append(this)
411
412
        subtree_root = self.read(certificate_id)
413
        if subtree_root is None:
414
            return None
415
416
        all_certs = []
417
        dfs(self.get_all_issued_by, subtree_root, all_certs)
418
        return all_certs
419
420 94f6d8b8 Jan Pašek
    def get_next_id(self) -> int:
421
        """
422
        Get identifier of the next certificate that will be inserted into the database
423
        :return: identifier of the next certificate that will be added into the database
424
        """
425
        # get next IDs of all tables
426
        self.cursor.execute("SELECT * FROM SQLITE_SEQUENCE")
427
        results = self.cursor.fetchall()
428
429
        # search for next ID in Certificates table and return it
430
        for result in results:
431
            if result[0] == TAB_CERTIFICATES:
432
                return result[1] + 1  # current last id + 1
433
        # if certificates table is not present in the query results, return 1
434
        return 1