Projekt

Obecné

Profil

Stáhnout (25.9 KB) Statistiky
| Větev: | Tag: | Revize:
1 6425fa36 David Friesecký
import time
2 b3c80ccb David Friesecký
from sqlite3 import Connection, Error, DatabaseError, IntegrityError, ProgrammingError, OperationalError, NotSupportedError
3 9e22e20c David Friesecký
from typing import Dict, List
4 1636aefe David Friesecký
5 d65b022d David Friesecký
from src.exceptions.database_exception import DatabaseException
6 1d2add74 Jan Pašek
from injector import inject
7 f8b23532 David Friesecký
from src.constants import *
8 1d2add74 Jan Pašek
from src.model.certificate import Certificate
9 5e31b492 David Friesecký
from src.utils.logger import Logger
10 e9e55282 David Friesecký
11 b3c80ccb David Friesecký
INTEGRITY_ERROR_MSG = "Database relational integrity corrupted."
12
PROGRAMMING_ERROR_MSG = "Exception raised for programming errors (etc. SQL statement)."
13
OPERATIONAL_ERROR_MSG = "Exception raised for errors that are related to the database’s operation."
14
NOT_SUPPORTED_ERROR_MSG = "Method or database API was used which is not supported by the database"
15
DATABASE_ERROR_MSG = "Unknown exception that are related to the database."
16
ERROR_MSG = "Unknown exception."
17 e9e55282 David Friesecký
18
19 f8b23532 David Friesecký
class CertificateRepository:
20 25053504 David Friesecký
21 1d2add74 Jan Pašek
    @inject
22
    def __init__(self, connection: Connection):
23 a0602bad David Friesecký
        """
24 f8b23532 David Friesecký
        Constructor of the CertificateRepository object
25 a0602bad David Friesecký
26 f8b23532 David Friesecký
        :param connection: Instance of the Connection object
27 a0602bad David Friesecký
        """
28 f8b23532 David Friesecký
        self.connection = connection
29 1d2add74 Jan Pašek
        self.cursor = connection.cursor()
30 e9e55282 David Friesecký
31 805077f5 David Friesecký
    def create(self, certificate: Certificate):
32 a0602bad David Friesecký
        """
33 f8b23532 David Friesecký
        Creates a certificate.
34
        For root certificate (CA) the parent certificate id is modified to the same id (id == parent_id).
35 a0602bad David Friesecký
36 f8b23532 David Friesecký
        :param certificate: Instance of the Certificate object
37 a0602bad David Friesecký
38 f8b23532 David Friesecký
        :return: the result of whether the creation was successful
39 a0602bad David Friesecký
        """
40
41 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
42
43 f8b23532 David Friesecký
        try:
44
            sql = (f"INSERT INTO {TAB_CERTIFICATES} "
45
                   f"({COL_COMMON_NAME},"
46
                   f"{COL_VALID_FROM},"
47
                   f"{COL_VALID_TO},"
48
                   f"{COL_PEM_DATA},"
49
                   f"{COL_PRIVATE_KEY_ID},"
50
                   f"{COL_TYPE_ID},"
51
                   f"{COL_PARENT_ID})"
52
                   f"VALUES(?,?,?,?,?,?,?)")
53
            values = [certificate.common_name,
54
                      certificate.valid_from,
55
                      certificate.valid_to,
56
                      certificate.pem_data,
57
                      certificate.private_key_id,
58
                      certificate.type_id,
59
                      certificate.parent_id]
60
            self.cursor.execute(sql, values)
61
            self.connection.commit()
62
63
            last_id: int = self.cursor.lastrowid
64
65 f3125948 Stanislav Král
            # TODO assure that this is correct
66
            if certificate.type_id == ROOT_CA_ID:
67 f8b23532 David Friesecký
                certificate.parent_id = last_id
68 093d06df Stanislav Král
                self.update(last_id, certificate)
69 f8b23532 David Friesecký
            else:
70 fa72c969 Stanislav Král
                for usage_id, usage_value in certificate.usages.items():
71 f8b23532 David Friesecký
                    if usage_value:
72
                        sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
73
                               f"({COL_CERTIFICATE_ID},"
74
                               f"{COL_USAGE_TYPE_ID}) "
75
                               f"VALUES (?,?)")
76
                        values = [last_id, usage_id]
77
                        self.cursor.execute(sql, values)
78
                        self.connection.commit()
79 b3c80ccb David Friesecký
        except IntegrityError:
80
            Logger.error(INTEGRITY_ERROR_MSG)
81
            raise DatabaseException(INTEGRITY_ERROR_MSG)
82
        except ProgrammingError:
83
            Logger.error(PROGRAMMING_ERROR_MSG)
84
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
85
        except OperationalError:
86
            Logger.error(OPERATIONAL_ERROR_MSG)
87
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
88
        except NotSupportedError:
89
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
90
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
91
        except DatabaseError:
92
            Logger.error(DATABASE_ERROR_MSG)
93
            raise DatabaseException(DATABASE_ERROR_MSG)
94
        except Error:
95
            Logger.error(ERROR_MSG)
96
            raise DatabaseException(ERROR_MSG)
97 f8b23532 David Friesecký
98 805077f5 David Friesecký
        return last_id
99 f8b23532 David Friesecký
100 805077f5 David Friesecký
    def read(self, certificate_id: int):
101 f8b23532 David Friesecký
        """
102
        Reads (selects) a certificate.
103 e9e55282 David Friesecký
104 f8b23532 David Friesecký
        :param certificate_id: ID of specific certificate
105 e9e55282 David Friesecký
106 f8b23532 David Friesecký
        :return: instance of the Certificate object
107
        """
108 e9e55282 David Friesecký
109 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
110
111 f8b23532 David Friesecký
        try:
112
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
113 6425fa36 David Friesecký
                   f"WHERE {COL_ID} = ? AND ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
114 f8b23532 David Friesecký
            values = [certificate_id]
115
            self.cursor.execute(sql, values)
116 a7411982 Stanislav Král
            certificate_row = self.cursor.fetchone()
117 e9e55282 David Friesecký
118 6f4a5f24 Captain_Trojan
            if certificate_row is None:
119
                return None
120
121 f8b23532 David Friesecký
            sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
122
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
123
            self.cursor.execute(sql, values)
124
            usage_rows = self.cursor.fetchall()
125 1636aefe David Friesecký
126
            usage_dict: Dict[int, bool] = {}
127
            for usage_row in usage_rows:
128 f8b23532 David Friesecký
                usage_dict[usage_row[2]] = True
129
130
            certificate: Certificate = Certificate(certificate_row[0],
131
                                                   certificate_row[1],
132
                                                   certificate_row[2],
133
                                                   certificate_row[3],
134
                                                   certificate_row[4],
135 58051326 David Friesecký
                                                   certificate_row[8],
136
                                                   certificate_row[9],
137 6425fa36 David Friesecký
                                                   certificate_row[10],
138 58051326 David Friesecký
                                                   usage_dict,
139
                                                   certificate_row[5],
140
                                                   certificate_row[6])
141 b3c80ccb David Friesecký
        except IntegrityError:
142
            Logger.error(INTEGRITY_ERROR_MSG)
143
            raise DatabaseException(INTEGRITY_ERROR_MSG)
144
        except ProgrammingError:
145
            Logger.error(PROGRAMMING_ERROR_MSG)
146
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
147
        except OperationalError:
148
            Logger.error(OPERATIONAL_ERROR_MSG)
149
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
150
        except NotSupportedError:
151
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
152
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
153
        except DatabaseError:
154
            Logger.error(DATABASE_ERROR_MSG)
155
            raise DatabaseException(DATABASE_ERROR_MSG)
156
        except Error:
157
            Logger.error(ERROR_MSG)
158
            raise DatabaseException(ERROR_MSG)
159 f8b23532 David Friesecký
160 d65b022d David Friesecký
        return certificate
161 f8b23532 David Friesecký
162 58051326 David Friesecký
    def read_all(self, filter_type: int = None):
163 9e22e20c David Friesecký
        """
164
        Reads (selects) all certificates (with type).
165
166
        :param filter_type: ID of certificate type from CertificateTypes table
167
168
        :return: list of certificates
169
        """
170
171 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
172
173 9e22e20c David Friesecký
        try:
174
            sql_extension = ""
175
            values = []
176
            if filter_type is not None:
177 6425fa36 David Friesecký
                sql_extension = (f" AND {COL_TYPE_ID} = ("
178 6f64f062 Stanislav Král
                                 f"SELECT {COL_ID} FROM {TAB_CERTIFICATE_TYPES} WHERE {COL_ID} = ?)")
179 9e22e20c David Friesecký
                values = [filter_type]
180
181 6425fa36 David Friesecký
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
182
                   f"WHERE ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = ''){sql_extension}")
183 9e22e20c David Friesecký
            self.cursor.execute(sql, values)
184
            certificate_rows = self.cursor.fetchall()
185
186
            certificates: List[Certificate] = []
187
            for certificate_row in certificate_rows:
188
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
189
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
190
                values = [certificate_row[0]]
191
                self.cursor.execute(sql, values)
192
                usage_rows = self.cursor.fetchall()
193
194
                usage_dict: Dict[int, bool] = {}
195
                for usage_row in usage_rows:
196
                    usage_dict[usage_row[2]] = True
197
198
                certificates.append(Certificate(certificate_row[0],
199
                                                certificate_row[1],
200
                                                certificate_row[2],
201
                                                certificate_row[3],
202
                                                certificate_row[4],
203 58051326 David Friesecký
                                                certificate_row[8],
204
                                                certificate_row[9],
205 6425fa36 David Friesecký
                                                certificate_row[10],
206 58051326 David Friesecký
                                                usage_dict,
207
                                                certificate_row[5],
208
                                                certificate_row[6]))
209 b3c80ccb David Friesecký
        except IntegrityError:
210
            Logger.error(INTEGRITY_ERROR_MSG)
211
            raise DatabaseException(INTEGRITY_ERROR_MSG)
212
        except ProgrammingError:
213
            Logger.error(PROGRAMMING_ERROR_MSG)
214
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
215
        except OperationalError:
216
            Logger.error(OPERATIONAL_ERROR_MSG)
217
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
218
        except NotSupportedError:
219
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
220
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
221
        except DatabaseError:
222
            Logger.error(DATABASE_ERROR_MSG)
223
            raise DatabaseException(DATABASE_ERROR_MSG)
224
        except Error:
225
            Logger.error(ERROR_MSG)
226
            raise DatabaseException(ERROR_MSG)
227 9e22e20c David Friesecký
228 0f3af523 Stanislav Král
        return certificates
229 9e22e20c David Friesecký
230 58051326 David Friesecký
    def update(self, certificate_id: int, certificate: Certificate):
231 a0602bad David Friesecký
        """
232 f8b23532 David Friesecký
        Updates a certificate.
233
        If the parameter of certificate (Certificate object) is not to be changed,
234
        the same value must be specified.
235 a0602bad David Friesecký
236
        :param certificate_id: ID of specific certificate
237 f8b23532 David Friesecký
        :param certificate: Instance of the Certificate object
238 a0602bad David Friesecký
239
        :return: the result of whether the updation was successful
240
        """
241
242 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
243
244 f8b23532 David Friesecký
        try:
245
            sql = (f"UPDATE {TAB_CERTIFICATES} "
246
                   f"SET {COL_COMMON_NAME} = ?, "
247
                   f"{COL_VALID_FROM} = ?, "
248
                   f"{COL_VALID_TO} = ?, "
249
                   f"{COL_PEM_DATA} = ?, "
250
                   f"{COL_PRIVATE_KEY_ID} = ?, "
251
                   f"{COL_TYPE_ID} = ?, "
252
                   f"{COL_PARENT_ID} = ? "
253
                   f"WHERE {COL_ID} = ?")
254
            values = [certificate.common_name,
255
                      certificate.valid_from,
256
                      certificate.valid_to,
257
                      certificate.pem_data,
258
                      certificate.private_key_id,
259
                      certificate.type_id,
260 805077f5 David Friesecký
                      certificate.parent_id,
261
                      certificate_id]
262 6425fa36 David Friesecký
263 f8b23532 David Friesecký
            self.cursor.execute(sql, values)
264
            self.connection.commit()
265
266
            sql = (f"DELETE FROM {TAB_CERTIFICATE_USAGES} "
267
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
268
            values = [certificate_id]
269
            self.cursor.execute(sql, values)
270
            self.connection.commit()
271
272 c36d3299 David Friesecký
            check_updated = self.cursor.rowcount > 0
273
274 f3125948 Stanislav Král
            # iterate over usage pairs
275
            for usage_id, usage_value in certificate.usages.items():
276 f8b23532 David Friesecký
                if usage_value:
277
                    sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
278
                           f"({COL_CERTIFICATE_ID},"
279
                           f"{COL_USAGE_TYPE_ID}) "
280
                           f"VALUES (?,?)")
281
                    values = [certificate_id, usage_id]
282
                    self.cursor.execute(sql, values)
283
                    self.connection.commit()
284 b3c80ccb David Friesecký
        except IntegrityError:
285
            Logger.error(INTEGRITY_ERROR_MSG)
286
            raise DatabaseException(INTEGRITY_ERROR_MSG)
287
        except ProgrammingError:
288
            Logger.error(PROGRAMMING_ERROR_MSG)
289
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
290
        except OperationalError:
291
            Logger.error(OPERATIONAL_ERROR_MSG)
292
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
293
        except NotSupportedError:
294
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
295
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
296
        except DatabaseError:
297
            Logger.error(DATABASE_ERROR_MSG)
298
            raise DatabaseException(DATABASE_ERROR_MSG)
299
        except Error:
300
            Logger.error(ERROR_MSG)
301
            raise DatabaseException(ERROR_MSG)
302 f8b23532 David Friesecký
303 c36d3299 David Friesecký
        return check_updated
304 e9e55282 David Friesecký
305 58051326 David Friesecký
    def delete(self, certificate_id: int):
306 a0602bad David Friesecký
        """
307
        Deletes a certificate
308
309
        :param certificate_id: ID of specific certificate
310
311
        :return: the result of whether the deletion was successful
312
        """
313
314 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
315
316 f8b23532 David Friesecký
        try:
317 6425fa36 David Friesecký
            sql = (f"UPDATE {TAB_CERTIFICATES} "
318
                   f"SET {COL_DELETION_DATE} = ? "
319
                   f"WHERE {COL_ID} = ? AND ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
320
            values = [int(time.time()),
321
                      certificate_id]
322 f8b23532 David Friesecký
            self.cursor.execute(sql, values)
323
            self.connection.commit()
324 b3c80ccb David Friesecký
        except IntegrityError:
325
            Logger.error(INTEGRITY_ERROR_MSG)
326
            raise DatabaseException(INTEGRITY_ERROR_MSG)
327
        except ProgrammingError:
328
            Logger.error(PROGRAMMING_ERROR_MSG)
329
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
330
        except OperationalError:
331
            Logger.error(OPERATIONAL_ERROR_MSG)
332
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
333
        except NotSupportedError:
334
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
335
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
336
        except DatabaseError:
337
            Logger.error(DATABASE_ERROR_MSG)
338
            raise DatabaseException(DATABASE_ERROR_MSG)
339
        except Error:
340
            Logger.error(ERROR_MSG)
341
            raise DatabaseException(ERROR_MSG)
342 f8b23532 David Friesecký
343 45744020 Stanislav Král
        return self.cursor.rowcount > 0
344 58051326 David Friesecký
345
    def set_certificate_revoked(
346 8b049f43 David Friesecký
            self, certificate_id: int, revocation_date: str, revocation_reason: str = REV_REASON_UNSPECIFIED):
347 58051326 David Friesecký
        """
348
        Revoke a certificate
349
350
        :param certificate_id: ID of specific certificate
351
        :param revocation_date: Date, when the certificate is revoked
352
        :param revocation_reason: Reason of the revocation
353
354 8b049f43 David Friesecký
        :return:
355
            the result of whether the revocation was successful OR
356
            sqlite3.Error if an exception is thrown
357 58051326 David Friesecký
        """
358
359 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
360
361 58051326 David Friesecký
        try:
362 8b049f43 David Friesecký
            if revocation_date != "" and revocation_reason == "":
363
                revocation_reason = REV_REASON_UNSPECIFIED
364
            elif revocation_date == "":
365
                return False
366
367 58051326 David Friesecký
            sql = (f"UPDATE {TAB_CERTIFICATES} "
368
                   f"SET {COL_REVOCATION_DATE} = ?, "
369 8b049f43 David Friesecký
                   f"{COL_REVOCATION_REASON} = ? "
370 58051326 David Friesecký
                   f"WHERE {COL_ID} = ? AND ({COL_REVOCATION_DATE} IS NULL OR {COL_REVOCATION_DATE} = '')")
371
            values = [revocation_date,
372
                      revocation_reason,
373
                      certificate_id]
374
            self.cursor.execute(sql, values)
375
            self.connection.commit()
376 b3c80ccb David Friesecký
        except IntegrityError:
377
            Logger.error(INTEGRITY_ERROR_MSG)
378
            raise DatabaseException(INTEGRITY_ERROR_MSG)
379
        except ProgrammingError:
380
            Logger.error(PROGRAMMING_ERROR_MSG)
381
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
382
        except OperationalError:
383
            Logger.error(OPERATIONAL_ERROR_MSG)
384
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
385
        except NotSupportedError:
386
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
387
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
388
        except DatabaseError:
389
            Logger.error(DATABASE_ERROR_MSG)
390
            raise DatabaseException(DATABASE_ERROR_MSG)
391
        except Error:
392
            Logger.error(ERROR_MSG)
393
            raise DatabaseException(ERROR_MSG)
394 8b049f43 David Friesecký
395 d65b022d David Friesecký
        return self.cursor.rowcount > 0
396 58051326 David Friesecký
397
    def clear_certificate_revocation(self, certificate_id: int):
398
        """
399
        Clear revocation of a certificate
400
401
        :param certificate_id: ID of specific certificate
402
403 8b049f43 David Friesecký
        :return:
404
            the result of whether the clear revocation was successful OR
405
            sqlite3.Error if an exception is thrown
406 58051326 David Friesecký
        """
407
408 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
409
410 58051326 David Friesecký
        try:
411
            sql = (f"UPDATE {TAB_CERTIFICATES} "
412
                   f"SET {COL_REVOCATION_DATE} = '', "
413 8b049f43 David Friesecký
                   f"{COL_REVOCATION_REASON} = '' "
414 58051326 David Friesecký
                   f"WHERE {COL_ID} = ?")
415
            values = [certificate_id]
416
            self.cursor.execute(sql, values)
417
            self.connection.commit()
418 b3c80ccb David Friesecký
        except IntegrityError:
419
            Logger.error(INTEGRITY_ERROR_MSG)
420
            raise DatabaseException(INTEGRITY_ERROR_MSG)
421
        except ProgrammingError:
422
            Logger.error(PROGRAMMING_ERROR_MSG)
423
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
424
        except OperationalError:
425
            Logger.error(OPERATIONAL_ERROR_MSG)
426
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
427
        except NotSupportedError:
428
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
429
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
430
        except DatabaseError:
431
            Logger.error(DATABASE_ERROR_MSG)
432
            raise DatabaseException(DATABASE_ERROR_MSG)
433
        except Error:
434
            Logger.error(ERROR_MSG)
435
            raise DatabaseException(ERROR_MSG)
436 f8b23532 David Friesecký
437 45744020 Stanislav Král
        return self.cursor.rowcount > 0
438 58051326 David Friesecký
439
    def get_all_revoked_by(self, certificate_id: int):
440
        """
441
        Get list of the revoked certificates that are direct descendants of the certificate with the ID
442
443
        :param certificate_id: ID of specific certificate
444
445
        :return:
446 8b049f43 David Friesecký
            list of the certificates OR
447
            None if the list is empty OR
448
            sqlite3.Error if an exception is thrown
449 58051326 David Friesecký
        """
450
451 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
452
453 58051326 David Friesecký
        try:
454
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
455
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_REVOCATION_DATE} IS NOT NULL AND {COL_REVOCATION_DATE} != ''")
456
            values = [certificate_id]
457
            self.cursor.execute(sql, values)
458
            certificate_rows = self.cursor.fetchall()
459
460
            certificates: List[Certificate] = []
461
            for certificate_row in certificate_rows:
462
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
463
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
464
                values = [certificate_row[0]]
465
                self.cursor.execute(sql, values)
466
                usage_rows = self.cursor.fetchall()
467
468
                usage_dict: Dict[int, bool] = {}
469
                for usage_row in usage_rows:
470
                    usage_dict[usage_row[2]] = True
471
472
                certificates.append(Certificate(certificate_row[0],
473
                                                certificate_row[1],
474
                                                certificate_row[2],
475
                                                certificate_row[3],
476
                                                certificate_row[4],
477
                                                certificate_row[8],
478
                                                certificate_row[9],
479 6425fa36 David Friesecký
                                                certificate_row[10],
480 58051326 David Friesecký
                                                usage_dict,
481
                                                certificate_row[5],
482
                                                certificate_row[6]))
483 b3c80ccb David Friesecký
        except IntegrityError:
484
            Logger.error(INTEGRITY_ERROR_MSG)
485
            raise DatabaseException(INTEGRITY_ERROR_MSG)
486
        except ProgrammingError:
487
            Logger.error(PROGRAMMING_ERROR_MSG)
488
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
489
        except OperationalError:
490
            Logger.error(OPERATIONAL_ERROR_MSG)
491
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
492
        except NotSupportedError:
493
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
494
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
495
        except DatabaseError:
496
            Logger.error(DATABASE_ERROR_MSG)
497
            raise DatabaseException(DATABASE_ERROR_MSG)
498
        except Error:
499
            Logger.error(ERROR_MSG)
500
            raise DatabaseException(ERROR_MSG)
501 58051326 David Friesecký
502 d65b022d David Friesecký
        return certificates
503 58051326 David Friesecký
504
    def get_all_issued_by(self, certificate_id: int):
505
        """
506
        Get list of the certificates that are direct descendants of the certificate with the ID
507
508
        :param certificate_id: ID of specific certificate
509
510
        :return:
511 8b049f43 David Friesecký
            list of the certificates OR
512
            None if the list is empty OR
513
            sqlite3.Error if an exception is thrown
514 58051326 David Friesecký
        """
515
516 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
517
518 58051326 David Friesecký
        try:
519
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
520 6425fa36 David Friesecký
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_ID} != ? AND "
521
                   f"({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
522 8b049f43 David Friesecký
            values = [certificate_id, certificate_id]
523 58051326 David Friesecký
            self.cursor.execute(sql, values)
524
            certificate_rows = self.cursor.fetchall()
525
526
            certificates: List[Certificate] = []
527
            for certificate_row in certificate_rows:
528
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
529
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
530
                values = [certificate_row[0]]
531
                self.cursor.execute(sql, values)
532
                usage_rows = self.cursor.fetchall()
533
534
                usage_dict: Dict[int, bool] = {}
535
                for usage_row in usage_rows:
536
                    usage_dict[usage_row[2]] = True
537
538
                certificates.append(Certificate(certificate_row[0],
539
                                                certificate_row[1],
540
                                                certificate_row[2],
541
                                                certificate_row[3],
542
                                                certificate_row[4],
543
                                                certificate_row[8],
544
                                                certificate_row[9],
545 6425fa36 David Friesecký
                                                certificate_row[10],
546 58051326 David Friesecký
                                                usage_dict,
547
                                                certificate_row[5],
548
                                                certificate_row[6]))
549 b3c80ccb David Friesecký
        except IntegrityError:
550
            Logger.error(INTEGRITY_ERROR_MSG)
551
            raise DatabaseException(INTEGRITY_ERROR_MSG)
552
        except ProgrammingError:
553
            Logger.error(PROGRAMMING_ERROR_MSG)
554
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
555
        except OperationalError:
556
            Logger.error(OPERATIONAL_ERROR_MSG)
557
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
558
        except NotSupportedError:
559
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
560
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
561
        except DatabaseError:
562
            Logger.error(DATABASE_ERROR_MSG)
563
            raise DatabaseException(DATABASE_ERROR_MSG)
564
        except Error:
565
            Logger.error(ERROR_MSG)
566
            raise DatabaseException(ERROR_MSG)
567 58051326 David Friesecký
568 d65b022d David Friesecký
        return certificates
569 94f6d8b8 Jan Pašek
570 85003184 Captain_Trojan
    def get_all_descendants_of(self, certificate_id: int):
571
        """
572
        Get a list of all certificates C such that the certificate identified by "certificate_id" belongs to a trust chain
573
        between C and its root certificate authority (i.e. is an ancestor of C).
574
        :param certificate_id: target certificate ID
575
        :return: list of all descendants
576
        """
577
578 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
579
580
        try:
581
            def dfs(children_of, this, collection: list):
582
                for child in children_of(this.certificate_id):
583
                    dfs(children_of, child, collection)
584
                collection.append(this)
585
586
            subtree_root = self.read(certificate_id)
587
            if subtree_root is None:
588
                return None
589
590
            all_certs = []
591
            dfs(self.get_all_issued_by, subtree_root, all_certs)
592
        except IntegrityError:
593
            Logger.error(INTEGRITY_ERROR_MSG)
594
            raise DatabaseException(INTEGRITY_ERROR_MSG)
595
        except ProgrammingError:
596
            Logger.error(PROGRAMMING_ERROR_MSG)
597
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
598
        except OperationalError:
599
            Logger.error(OPERATIONAL_ERROR_MSG)
600
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
601
        except NotSupportedError:
602
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
603
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
604
        except DatabaseError:
605
            Logger.error(DATABASE_ERROR_MSG)
606
            raise DatabaseException(DATABASE_ERROR_MSG)
607
        except Error:
608
            Logger.error(ERROR_MSG)
609
            raise DatabaseException(ERROR_MSG)
610 85003184 Captain_Trojan
611
        return all_certs
612
613 94f6d8b8 Jan Pašek
    def get_next_id(self) -> int:
614
        """
615
        Get identifier of the next certificate that will be inserted into the database
616
        :return: identifier of the next certificate that will be added into the database
617
        """
618 b3c80ccb David Friesecký
619
        Logger.debug("Function launched.")
620
621
        try:
622
            # get next IDs of all tables
623
            self.cursor.execute("SELECT * FROM SQLITE_SEQUENCE")
624
            results = self.cursor.fetchall()
625
626
            # search for next ID in Certificates table and return it
627
            for result in results:
628
                if result[0] == TAB_CERTIFICATES:
629
                    return result[1] + 1  # current last id + 1
630
            # if certificates table is not present in the query results, return 1
631
        except IntegrityError:
632
            Logger.error(INTEGRITY_ERROR_MSG)
633
            raise DatabaseException(INTEGRITY_ERROR_MSG)
634
        except ProgrammingError:
635
            Logger.error(PROGRAMMING_ERROR_MSG)
636
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
637
        except OperationalError:
638
            Logger.error(OPERATIONAL_ERROR_MSG)
639
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
640
        except NotSupportedError:
641
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
642
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
643
        except DatabaseError:
644
            Logger.error(DATABASE_ERROR_MSG)
645
            raise DatabaseException(DATABASE_ERROR_MSG)
646
        except Error:
647
            Logger.error(ERROR_MSG)
648
            raise DatabaseException(ERROR_MSG)
649
650 94f6d8b8 Jan Pašek
        return 1