Projekt

Obecné

Profil

Stáhnout (16.4 KB) Statistiky
| Větev: | Tag: | Revize:
1 1d2add74 Jan Pašek
from sqlite3 import Connection, Error
2 9e22e20c David Friesecký
from typing import Dict, List
3 1636aefe David Friesecký
4 d65b022d David Friesecký
from src.exceptions.database_exception import DatabaseException
5 1d2add74 Jan Pašek
from injector import inject
6 f8b23532 David Friesecký
from src.constants import *
7 1d2add74 Jan Pašek
from src.model.certificate import Certificate
8 5e31b492 David Friesecký
from src.utils.logger import Logger
9 e9e55282 David Friesecký
10
11 f8b23532 David Friesecký
class CertificateRepository:
12 25053504 David Friesecký
13 1d2add74 Jan Pašek
    @inject
14
    def __init__(self, connection: Connection):
15 a0602bad David Friesecký
        """
16 f8b23532 David Friesecký
        Constructor of the CertificateRepository object
17 a0602bad David Friesecký
18 f8b23532 David Friesecký
        :param connection: Instance of the Connection object
19 a0602bad David Friesecký
        """
20 f8b23532 David Friesecký
        self.connection = connection
21 1d2add74 Jan Pašek
        self.cursor = connection.cursor()
22 e9e55282 David Friesecký
23 805077f5 David Friesecký
    def create(self, certificate: Certificate):
24 a0602bad David Friesecký
        """
25 f8b23532 David Friesecký
        Creates a certificate.
26
        For root certificate (CA) the parent certificate id is modified to the same id (id == parent_id).
27 a0602bad David Friesecký
28 f8b23532 David Friesecký
        :param certificate: Instance of the Certificate object
29 a0602bad David Friesecký
30 f8b23532 David Friesecký
        :return: the result of whether the creation was successful
31 a0602bad David Friesecký
        """
32
33 f8b23532 David Friesecký
        try:
34
            sql = (f"INSERT INTO {TAB_CERTIFICATES} "
35
                   f"({COL_COMMON_NAME},"
36
                   f"{COL_VALID_FROM},"
37
                   f"{COL_VALID_TO},"
38
                   f"{COL_PEM_DATA},"
39
                   f"{COL_PRIVATE_KEY_ID},"
40
                   f"{COL_TYPE_ID},"
41
                   f"{COL_PARENT_ID})"
42
                   f"VALUES(?,?,?,?,?,?,?)")
43
            values = [certificate.common_name,
44
                      certificate.valid_from,
45
                      certificate.valid_to,
46
                      certificate.pem_data,
47
                      certificate.private_key_id,
48
                      certificate.type_id,
49
                      certificate.parent_id]
50
            self.cursor.execute(sql, values)
51
            self.connection.commit()
52
53
            last_id: int = self.cursor.lastrowid
54
55 f3125948 Stanislav Král
            # TODO assure that this is correct
56
            if certificate.type_id == ROOT_CA_ID:
57 f8b23532 David Friesecký
                certificate.parent_id = last_id
58 093d06df Stanislav Král
                self.update(last_id, certificate)
59 f8b23532 David Friesecký
            else:
60 fa72c969 Stanislav Král
                for usage_id, usage_value in certificate.usages.items():
61 f8b23532 David Friesecký
                    if usage_value:
62
                        sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
63
                               f"({COL_CERTIFICATE_ID},"
64
                               f"{COL_USAGE_TYPE_ID}) "
65
                               f"VALUES (?,?)")
66
                        values = [last_id, usage_id]
67
                        self.cursor.execute(sql, values)
68
                        self.connection.commit()
69
        except Error as e:
70 d65b022d David Friesecký
            raise DatabaseException(e)
71 f8b23532 David Friesecký
72 805077f5 David Friesecký
        return last_id
73 f8b23532 David Friesecký
74 805077f5 David Friesecký
    def read(self, certificate_id: int):
75 f8b23532 David Friesecký
        """
76
        Reads (selects) a certificate.
77 e9e55282 David Friesecký
78 f8b23532 David Friesecký
        :param certificate_id: ID of specific certificate
79 e9e55282 David Friesecký
80 f8b23532 David Friesecký
        :return: instance of the Certificate object
81
        """
82 e9e55282 David Friesecký
83 f8b23532 David Friesecký
        try:
84
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
85
                   f"WHERE {COL_ID} = ?")
86
            values = [certificate_id]
87
            self.cursor.execute(sql, values)
88 a7411982 Stanislav Král
            certificate_row = self.cursor.fetchone()
89 e9e55282 David Friesecký
90 6f4a5f24 Captain_Trojan
            if certificate_row is None:
91
                return None
92
93 f8b23532 David Friesecký
            sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
94
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
95
            self.cursor.execute(sql, values)
96
            usage_rows = self.cursor.fetchall()
97 1636aefe David Friesecký
98
            usage_dict: Dict[int, bool] = {}
99
            for usage_row in usage_rows:
100 f8b23532 David Friesecký
                usage_dict[usage_row[2]] = True
101
102
            certificate: Certificate = Certificate(certificate_row[0],
103
                                                   certificate_row[1],
104
                                                   certificate_row[2],
105
                                                   certificate_row[3],
106
                                                   certificate_row[4],
107
                                                   certificate_row[7],
108 58051326 David Friesecký
                                                   certificate_row[8],
109
                                                   certificate_row[9],
110
                                                   usage_dict,
111
                                                   certificate_row[5],
112
                                                   certificate_row[6])
113 f8b23532 David Friesecký
        except Error as e:
114 d65b022d David Friesecký
            raise DatabaseException(e)
115 f8b23532 David Friesecký
116 d65b022d David Friesecký
        return certificate
117 f8b23532 David Friesecký
118 58051326 David Friesecký
    def read_all(self, filter_type: int = None):
119 9e22e20c David Friesecký
        """
120
        Reads (selects) all certificates (with type).
121
122
        :param filter_type: ID of certificate type from CertificateTypes table
123
124
        :return: list of certificates
125
        """
126
127
        try:
128
            sql_extension = ""
129
            values = []
130
            if filter_type is not None:
131
                sql_extension = (f" WHERE {COL_TYPE_ID} = ("
132 6f64f062 Stanislav Král
                                 f"SELECT {COL_ID} FROM {TAB_CERTIFICATE_TYPES} WHERE {COL_ID} = ?)")
133 9e22e20c David Friesecký
                values = [filter_type]
134
135 8b049f43 David Friesecký
            sql = f"SELECT * FROM {TAB_CERTIFICATES}{sql_extension}"
136 9e22e20c David Friesecký
            self.cursor.execute(sql, values)
137
            certificate_rows = self.cursor.fetchall()
138
139
            certificates: List[Certificate] = []
140
            for certificate_row in certificate_rows:
141
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
142
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
143
                values = [certificate_row[0]]
144
                self.cursor.execute(sql, values)
145
                usage_rows = self.cursor.fetchall()
146
147
                usage_dict: Dict[int, bool] = {}
148
                for usage_row in usage_rows:
149
                    usage_dict[usage_row[2]] = True
150
151
                certificates.append(Certificate(certificate_row[0],
152
                                                certificate_row[1],
153
                                                certificate_row[2],
154
                                                certificate_row[3],
155
                                                certificate_row[4],
156
                                                certificate_row[7],
157 58051326 David Friesecký
                                                certificate_row[8],
158
                                                certificate_row[9],
159
                                                usage_dict,
160
                                                certificate_row[5],
161
                                                certificate_row[6]))
162 9e22e20c David Friesecký
        except Error as e:
163 d65b022d David Friesecký
            raise DatabaseException(e)
164 9e22e20c David Friesecký
165 0f3af523 Stanislav Král
        return certificates
166 9e22e20c David Friesecký
167 58051326 David Friesecký
    def update(self, certificate_id: int, certificate: Certificate):
168 a0602bad David Friesecký
        """
169 f8b23532 David Friesecký
        Updates a certificate.
170
        If the parameter of certificate (Certificate object) is not to be changed,
171
        the same value must be specified.
172 a0602bad David Friesecký
173
        :param certificate_id: ID of specific certificate
174 f8b23532 David Friesecký
        :param certificate: Instance of the Certificate object
175 a0602bad David Friesecký
176
        :return: the result of whether the updation was successful
177
        """
178
179 f8b23532 David Friesecký
        try:
180
            sql = (f"UPDATE {TAB_CERTIFICATES} "
181
                   f"SET {COL_COMMON_NAME} = ?, "
182
                   f"{COL_VALID_FROM} = ?, "
183
                   f"{COL_VALID_TO} = ?, "
184
                   f"{COL_PEM_DATA} = ?, "
185
                   f"{COL_PRIVATE_KEY_ID} = ?, "
186
                   f"{COL_TYPE_ID} = ?, "
187
                   f"{COL_PARENT_ID} = ? "
188
                   f"WHERE {COL_ID} = ?")
189
            values = [certificate.common_name,
190
                      certificate.valid_from,
191
                      certificate.valid_to,
192
                      certificate.pem_data,
193
                      certificate.private_key_id,
194
                      certificate.type_id,
195 805077f5 David Friesecký
                      certificate.parent_id,
196
                      certificate_id]
197 f8b23532 David Friesecký
            self.cursor.execute(sql, values)
198
            self.connection.commit()
199
200
            sql = (f"DELETE FROM {TAB_CERTIFICATE_USAGES} "
201
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
202
            values = [certificate_id]
203
            self.cursor.execute(sql, values)
204
            self.connection.commit()
205
206 f3125948 Stanislav Král
            # iterate over usage pairs
207
            for usage_id, usage_value in certificate.usages.items():
208 f8b23532 David Friesecký
                if usage_value:
209
                    sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
210
                           f"({COL_CERTIFICATE_ID},"
211
                           f"{COL_USAGE_TYPE_ID}) "
212
                           f"VALUES (?,?)")
213
                    values = [certificate_id, usage_id]
214
                    self.cursor.execute(sql, values)
215
                    self.connection.commit()
216
        except Error as e:
217 d65b022d David Friesecký
            raise DatabaseException(e)
218 f8b23532 David Friesecký
219 d65b022d David Friesecký
        return self.cursor.rowcount > 0
220 e9e55282 David Friesecký
221 58051326 David Friesecký
    def delete(self, certificate_id: int):
222 a0602bad David Friesecký
        """
223
        Deletes a certificate
224
225
        :param certificate_id: ID of specific certificate
226
227
        :return: the result of whether the deletion was successful
228
        """
229
230 f8b23532 David Friesecký
        try:
231
            sql = (f"DELETE FROM {TAB_CERTIFICATES} "
232
                   f"WHERE {COL_ID} = ?")
233
            values = [certificate_id]
234
            self.cursor.execute(sql, values)
235
            self.connection.commit()
236
        except Error as e:
237 d65b022d David Friesecký
            raise DatabaseException(e)
238 f8b23532 David Friesecký
239 45744020 Stanislav Král
        return self.cursor.rowcount > 0
240 58051326 David Friesecký
241
    def set_certificate_revoked(
242 8b049f43 David Friesecký
            self, certificate_id: int, revocation_date: str, revocation_reason: str = REV_REASON_UNSPECIFIED):
243 58051326 David Friesecký
        """
244
        Revoke a certificate
245
246
        :param certificate_id: ID of specific certificate
247
        :param revocation_date: Date, when the certificate is revoked
248
        :param revocation_reason: Reason of the revocation
249
250 8b049f43 David Friesecký
        :return:
251
            the result of whether the revocation was successful OR
252
            sqlite3.Error if an exception is thrown
253 58051326 David Friesecký
        """
254
255
        try:
256 8b049f43 David Friesecký
            if revocation_date != "" and revocation_reason == "":
257
                revocation_reason = REV_REASON_UNSPECIFIED
258
            elif revocation_date == "":
259
                return False
260
261 58051326 David Friesecký
            sql = (f"UPDATE {TAB_CERTIFICATES} "
262
                   f"SET {COL_REVOCATION_DATE} = ?, "
263 8b049f43 David Friesecký
                   f"{COL_REVOCATION_REASON} = ? "
264 58051326 David Friesecký
                   f"WHERE {COL_ID} = ? AND ({COL_REVOCATION_DATE} IS NULL OR {COL_REVOCATION_DATE} = '')")
265
            values = [revocation_date,
266
                      revocation_reason,
267
                      certificate_id]
268
            self.cursor.execute(sql, values)
269
            self.connection.commit()
270
        except Error as e:
271 d65b022d David Friesecký
            raise DatabaseException(e)
272 8b049f43 David Friesecký
273 d65b022d David Friesecký
        return self.cursor.rowcount > 0
274 58051326 David Friesecký
275
    def clear_certificate_revocation(self, certificate_id: int):
276
        """
277
        Clear revocation of a certificate
278
279
        :param certificate_id: ID of specific certificate
280
281 8b049f43 David Friesecký
        :return:
282
            the result of whether the clear revocation was successful OR
283
            sqlite3.Error if an exception is thrown
284 58051326 David Friesecký
        """
285
286
        try:
287
            sql = (f"UPDATE {TAB_CERTIFICATES} "
288
                   f"SET {COL_REVOCATION_DATE} = '', "
289 8b049f43 David Friesecký
                   f"{COL_REVOCATION_REASON} = '' "
290 58051326 David Friesecký
                   f"WHERE {COL_ID} = ?")
291
            values = [certificate_id]
292
            self.cursor.execute(sql, values)
293
            self.connection.commit()
294
        except Error as e:
295 d65b022d David Friesecký
            raise DatabaseException(e)
296 f8b23532 David Friesecký
297 45744020 Stanislav Král
        return self.cursor.rowcount > 0
298 58051326 David Friesecký
299
    def get_all_revoked_by(self, certificate_id: int):
300
        """
301
        Get list of the revoked certificates that are direct descendants of the certificate with the ID
302
303
        :param certificate_id: ID of specific certificate
304
305
        :return:
306 8b049f43 David Friesecký
            list of the certificates OR
307
            None if the list is empty OR
308
            sqlite3.Error if an exception is thrown
309 58051326 David Friesecký
        """
310
311
        try:
312
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
313
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_REVOCATION_DATE} IS NOT NULL AND {COL_REVOCATION_DATE} != ''")
314
            values = [certificate_id]
315
            self.cursor.execute(sql, values)
316
            certificate_rows = self.cursor.fetchall()
317
318
            certificates: List[Certificate] = []
319
            for certificate_row in certificate_rows:
320
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
321
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
322
                values = [certificate_row[0]]
323
                self.cursor.execute(sql, values)
324
                usage_rows = self.cursor.fetchall()
325
326
                usage_dict: Dict[int, bool] = {}
327
                for usage_row in usage_rows:
328
                    usage_dict[usage_row[2]] = True
329
330
                certificates.append(Certificate(certificate_row[0],
331
                                                certificate_row[1],
332
                                                certificate_row[2],
333
                                                certificate_row[3],
334
                                                certificate_row[4],
335
                                                certificate_row[7],
336
                                                certificate_row[8],
337
                                                certificate_row[9],
338
                                                usage_dict,
339
                                                certificate_row[5],
340
                                                certificate_row[6]))
341
        except Error as e:
342 d65b022d David Friesecký
            raise DatabaseException(e)
343 58051326 David Friesecký
344 d65b022d David Friesecký
        return certificates
345 58051326 David Friesecký
346
    def get_all_issued_by(self, certificate_id: int):
347
        """
348
        Get list of the certificates that are direct descendants of the certificate with the ID
349
350
        :param certificate_id: ID of specific certificate
351
352
        :return:
353 8b049f43 David Friesecký
            list of the certificates OR
354
            None if the list is empty OR
355
            sqlite3.Error if an exception is thrown
356 58051326 David Friesecký
        """
357
358
        try:
359
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
360 8b049f43 David Friesecký
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_ID} != ?")
361
            values = [certificate_id, certificate_id]
362 58051326 David Friesecký
            self.cursor.execute(sql, values)
363
            certificate_rows = self.cursor.fetchall()
364
365
            certificates: List[Certificate] = []
366
            for certificate_row in certificate_rows:
367
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
368
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
369
                values = [certificate_row[0]]
370
                self.cursor.execute(sql, values)
371
                usage_rows = self.cursor.fetchall()
372
373
                usage_dict: Dict[int, bool] = {}
374
                for usage_row in usage_rows:
375
                    usage_dict[usage_row[2]] = True
376
377
                certificates.append(Certificate(certificate_row[0],
378
                                                certificate_row[1],
379
                                                certificate_row[2],
380
                                                certificate_row[3],
381
                                                certificate_row[4],
382
                                                certificate_row[7],
383
                                                certificate_row[8],
384
                                                certificate_row[9],
385
                                                usage_dict,
386
                                                certificate_row[5],
387
                                                certificate_row[6]))
388
        except Error as e:
389 d65b022d David Friesecký
            raise DatabaseException(e)
390 58051326 David Friesecký
391 d65b022d David Friesecký
        return certificates
392 94f6d8b8 Jan Pašek
393 85003184 Captain_Trojan
    def get_all_descendants_of(self, certificate_id: int):
394
        """
395
        Get a list of all certificates C such that the certificate identified by "certificate_id" belongs to a trust chain
396
        between C and its root certificate authority (i.e. is an ancestor of C).
397
        :param certificate_id: target certificate ID
398
        :return: list of all descendants
399
        """
400
        def dfs(children_of, this, collection: list):
401
            for child in children_of(this.certificate_id):
402
                dfs(children_of, child, collection)
403
            collection.append(this)
404
405
        subtree_root = self.read(certificate_id)
406
        if subtree_root is None:
407
            return None
408
409
        all_certs = []
410
        dfs(self.get_all_issued_by, subtree_root, all_certs)
411
        return all_certs
412
413 94f6d8b8 Jan Pašek
    def get_next_id(self) -> int:
414
        """
415
        Get identifier of the next certificate that will be inserted into the database
416
        :return: identifier of the next certificate that will be added into the database
417
        """
418
        # get next IDs of all tables
419
        self.cursor.execute("SELECT * FROM SQLITE_SEQUENCE")
420
        results = self.cursor.fetchall()
421
422
        # search for next ID in Certificates table and return it
423
        for result in results:
424
            if result[0] == TAB_CERTIFICATES:
425
                return result[1] + 1  # current last id + 1
426
        # if certificates table is not present in the query results, return 1
427
        return 1