Projekt

Obecné

Profil

Stáhnout (25.4 KB) Statistiky
| Větev: | Tag: | Revize:
1 b3c80ccb David Friesecký
from sqlite3 import Connection, Error, DatabaseError, IntegrityError, ProgrammingError, OperationalError, NotSupportedError
2 9e22e20c David Friesecký
from typing import Dict, List
3 1636aefe David Friesecký
4 d65b022d David Friesecký
from src.exceptions.database_exception import DatabaseException
5 1d2add74 Jan Pašek
from injector import inject
6 f8b23532 David Friesecký
from src.constants import *
7 1d2add74 Jan Pašek
from src.model.certificate import Certificate
8 5e31b492 David Friesecký
from src.utils.logger import Logger
9 e9e55282 David Friesecký
10 b3c80ccb David Friesecký
INTEGRITY_ERROR_MSG = "Database relational integrity corrupted."
11
PROGRAMMING_ERROR_MSG = "Exception raised for programming errors (etc. SQL statement)."
12
OPERATIONAL_ERROR_MSG = "Exception raised for errors that are related to the database’s operation."
13
NOT_SUPPORTED_ERROR_MSG = "Method or database API was used which is not supported by the database"
14
DATABASE_ERROR_MSG = "Unknown exception that are related to the database."
15
ERROR_MSG = "Unknown exception."
16
17 e9e55282 David Friesecký
18 f8b23532 David Friesecký
class CertificateRepository:
19 25053504 David Friesecký
20 1d2add74 Jan Pašek
    @inject
21
    def __init__(self, connection: Connection):
22 a0602bad David Friesecký
        """
23 f8b23532 David Friesecký
        Constructor of the CertificateRepository object
24 a0602bad David Friesecký
25 f8b23532 David Friesecký
        :param connection: Instance of the Connection object
26 a0602bad David Friesecký
        """
27 f8b23532 David Friesecký
        self.connection = connection
28 1d2add74 Jan Pašek
        self.cursor = connection.cursor()
29 e9e55282 David Friesecký
30 805077f5 David Friesecký
    def create(self, certificate: Certificate):
31 a0602bad David Friesecký
        """
32 f8b23532 David Friesecký
        Creates a certificate.
33
        For root certificate (CA) the parent certificate id is modified to the same id (id == parent_id).
34 a0602bad David Friesecký
35 f8b23532 David Friesecký
        :param certificate: Instance of the Certificate object
36 a0602bad David Friesecký
37 f8b23532 David Friesecký
        :return: the result of whether the creation was successful
38 a0602bad David Friesecký
        """
39
40 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
41
42 f8b23532 David Friesecký
        try:
43
            sql = (f"INSERT INTO {TAB_CERTIFICATES} "
44
                   f"({COL_COMMON_NAME},"
45
                   f"{COL_VALID_FROM},"
46
                   f"{COL_VALID_TO},"
47
                   f"{COL_PEM_DATA},"
48
                   f"{COL_PRIVATE_KEY_ID},"
49
                   f"{COL_TYPE_ID},"
50
                   f"{COL_PARENT_ID})"
51
                   f"VALUES(?,?,?,?,?,?,?)")
52
            values = [certificate.common_name,
53
                      certificate.valid_from,
54
                      certificate.valid_to,
55
                      certificate.pem_data,
56
                      certificate.private_key_id,
57
                      certificate.type_id,
58
                      certificate.parent_id]
59
            self.cursor.execute(sql, values)
60
            self.connection.commit()
61
62
            last_id: int = self.cursor.lastrowid
63
64 f3125948 Stanislav Král
            # TODO assure that this is correct
65
            if certificate.type_id == ROOT_CA_ID:
66 f8b23532 David Friesecký
                certificate.parent_id = last_id
67 093d06df Stanislav Král
                self.update(last_id, certificate)
68 f8b23532 David Friesecký
            else:
69 fa72c969 Stanislav Král
                for usage_id, usage_value in certificate.usages.items():
70 f8b23532 David Friesecký
                    if usage_value:
71
                        sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
72
                               f"({COL_CERTIFICATE_ID},"
73
                               f"{COL_USAGE_TYPE_ID}) "
74
                               f"VALUES (?,?)")
75
                        values = [last_id, usage_id]
76
                        self.cursor.execute(sql, values)
77
                        self.connection.commit()
78 b3c80ccb David Friesecký
        except IntegrityError:
79
            Logger.error(INTEGRITY_ERROR_MSG)
80
            raise DatabaseException(INTEGRITY_ERROR_MSG)
81
        except ProgrammingError:
82
            Logger.error(PROGRAMMING_ERROR_MSG)
83
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
84
        except OperationalError:
85
            Logger.error(OPERATIONAL_ERROR_MSG)
86
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
87
        except NotSupportedError:
88
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
89
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
90
        except DatabaseError:
91
            Logger.error(DATABASE_ERROR_MSG)
92
            raise DatabaseException(DATABASE_ERROR_MSG)
93
        except Error:
94
            Logger.error(ERROR_MSG)
95
            raise DatabaseException(ERROR_MSG)
96 f8b23532 David Friesecký
97 805077f5 David Friesecký
        return last_id
98 f8b23532 David Friesecký
99 805077f5 David Friesecký
    def read(self, certificate_id: int):
100 f8b23532 David Friesecký
        """
101
        Reads (selects) a certificate.
102 e9e55282 David Friesecký
103 f8b23532 David Friesecký
        :param certificate_id: ID of specific certificate
104 e9e55282 David Friesecký
105 f8b23532 David Friesecký
        :return: instance of the Certificate object
106
        """
107 e9e55282 David Friesecký
108 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
109
110 f8b23532 David Friesecký
        try:
111
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
112
                   f"WHERE {COL_ID} = ?")
113
            values = [certificate_id]
114
            self.cursor.execute(sql, values)
115 a7411982 Stanislav Král
            certificate_row = self.cursor.fetchone()
116 e9e55282 David Friesecký
117 6f4a5f24 Captain_Trojan
            if certificate_row is None:
118
                return None
119
120 f8b23532 David Friesecký
            sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
121
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
122
            self.cursor.execute(sql, values)
123
            usage_rows = self.cursor.fetchall()
124 1636aefe David Friesecký
125
            usage_dict: Dict[int, bool] = {}
126
            for usage_row in usage_rows:
127 f8b23532 David Friesecký
                usage_dict[usage_row[2]] = True
128
129
            certificate: Certificate = Certificate(certificate_row[0],
130
                                                   certificate_row[1],
131
                                                   certificate_row[2],
132
                                                   certificate_row[3],
133
                                                   certificate_row[4],
134
                                                   certificate_row[7],
135 58051326 David Friesecký
                                                   certificate_row[8],
136
                                                   certificate_row[9],
137
                                                   usage_dict,
138
                                                   certificate_row[5],
139
                                                   certificate_row[6])
140 b3c80ccb David Friesecký
        except IntegrityError:
141
            Logger.error(INTEGRITY_ERROR_MSG)
142
            raise DatabaseException(INTEGRITY_ERROR_MSG)
143
        except ProgrammingError:
144
            Logger.error(PROGRAMMING_ERROR_MSG)
145
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
146
        except OperationalError:
147
            Logger.error(OPERATIONAL_ERROR_MSG)
148
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
149
        except NotSupportedError:
150
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
151
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
152
        except DatabaseError:
153
            Logger.error(DATABASE_ERROR_MSG)
154
            raise DatabaseException(DATABASE_ERROR_MSG)
155
        except Error:
156
            Logger.error(ERROR_MSG)
157
            raise DatabaseException(ERROR_MSG)
158 f8b23532 David Friesecký
159 d65b022d David Friesecký
        return certificate
160 f8b23532 David Friesecký
161 58051326 David Friesecký
    def read_all(self, filter_type: int = None):
162 9e22e20c David Friesecký
        """
163
        Reads (selects) all certificates (with type).
164
165
        :param filter_type: ID of certificate type from CertificateTypes table
166
167
        :return: list of certificates
168
        """
169
170 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
171
172 9e22e20c David Friesecký
        try:
173
            sql_extension = ""
174
            values = []
175
            if filter_type is not None:
176
                sql_extension = (f" WHERE {COL_TYPE_ID} = ("
177 6f64f062 Stanislav Král
                                 f"SELECT {COL_ID} FROM {TAB_CERTIFICATE_TYPES} WHERE {COL_ID} = ?)")
178 9e22e20c David Friesecký
                values = [filter_type]
179
180 8b049f43 David Friesecký
            sql = f"SELECT * FROM {TAB_CERTIFICATES}{sql_extension}"
181 9e22e20c David Friesecký
            self.cursor.execute(sql, values)
182
            certificate_rows = self.cursor.fetchall()
183
184
            certificates: List[Certificate] = []
185
            for certificate_row in certificate_rows:
186
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
187
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
188
                values = [certificate_row[0]]
189
                self.cursor.execute(sql, values)
190
                usage_rows = self.cursor.fetchall()
191
192
                usage_dict: Dict[int, bool] = {}
193
                for usage_row in usage_rows:
194
                    usage_dict[usage_row[2]] = True
195
196
                certificates.append(Certificate(certificate_row[0],
197
                                                certificate_row[1],
198
                                                certificate_row[2],
199
                                                certificate_row[3],
200
                                                certificate_row[4],
201
                                                certificate_row[7],
202 58051326 David Friesecký
                                                certificate_row[8],
203
                                                certificate_row[9],
204
                                                usage_dict,
205
                                                certificate_row[5],
206
                                                certificate_row[6]))
207 b3c80ccb David Friesecký
        except IntegrityError:
208
            Logger.error(INTEGRITY_ERROR_MSG)
209
            raise DatabaseException(INTEGRITY_ERROR_MSG)
210
        except ProgrammingError:
211
            Logger.error(PROGRAMMING_ERROR_MSG)
212
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
213
        except OperationalError:
214
            Logger.error(OPERATIONAL_ERROR_MSG)
215
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
216
        except NotSupportedError:
217
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
218
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
219
        except DatabaseError:
220
            Logger.error(DATABASE_ERROR_MSG)
221
            raise DatabaseException(DATABASE_ERROR_MSG)
222
        except Error:
223
            Logger.error(ERROR_MSG)
224
            raise DatabaseException(ERROR_MSG)
225 9e22e20c David Friesecký
226 0f3af523 Stanislav Král
        return certificates
227 9e22e20c David Friesecký
228 58051326 David Friesecký
    def update(self, certificate_id: int, certificate: Certificate):
229 a0602bad David Friesecký
        """
230 f8b23532 David Friesecký
        Updates a certificate.
231
        If the parameter of certificate (Certificate object) is not to be changed,
232
        the same value must be specified.
233 a0602bad David Friesecký
234
        :param certificate_id: ID of specific certificate
235 f8b23532 David Friesecký
        :param certificate: Instance of the Certificate object
236 a0602bad David Friesecký
237
        :return: the result of whether the updation was successful
238
        """
239
240 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
241
242 f8b23532 David Friesecký
        try:
243
            sql = (f"UPDATE {TAB_CERTIFICATES} "
244
                   f"SET {COL_COMMON_NAME} = ?, "
245
                   f"{COL_VALID_FROM} = ?, "
246
                   f"{COL_VALID_TO} = ?, "
247
                   f"{COL_PEM_DATA} = ?, "
248
                   f"{COL_PRIVATE_KEY_ID} = ?, "
249
                   f"{COL_TYPE_ID} = ?, "
250
                   f"{COL_PARENT_ID} = ? "
251
                   f"WHERE {COL_ID} = ?")
252
            values = [certificate.common_name,
253
                      certificate.valid_from,
254
                      certificate.valid_to,
255
                      certificate.pem_data,
256
                      certificate.private_key_id,
257
                      certificate.type_id,
258 805077f5 David Friesecký
                      certificate.parent_id,
259
                      certificate_id]
260 f8b23532 David Friesecký
            self.cursor.execute(sql, values)
261
            self.connection.commit()
262
263
            sql = (f"DELETE FROM {TAB_CERTIFICATE_USAGES} "
264
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
265
            values = [certificate_id]
266
            self.cursor.execute(sql, values)
267
            self.connection.commit()
268
269 f3125948 Stanislav Král
            # iterate over usage pairs
270
            for usage_id, usage_value in certificate.usages.items():
271 f8b23532 David Friesecký
                if usage_value:
272
                    sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
273
                           f"({COL_CERTIFICATE_ID},"
274
                           f"{COL_USAGE_TYPE_ID}) "
275
                           f"VALUES (?,?)")
276
                    values = [certificate_id, usage_id]
277
                    self.cursor.execute(sql, values)
278
                    self.connection.commit()
279 b3c80ccb David Friesecký
        except IntegrityError:
280
            Logger.error(INTEGRITY_ERROR_MSG)
281
            raise DatabaseException(INTEGRITY_ERROR_MSG)
282
        except ProgrammingError:
283
            Logger.error(PROGRAMMING_ERROR_MSG)
284
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
285
        except OperationalError:
286
            Logger.error(OPERATIONAL_ERROR_MSG)
287
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
288
        except NotSupportedError:
289
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
290
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
291
        except DatabaseError:
292
            Logger.error(DATABASE_ERROR_MSG)
293
            raise DatabaseException(DATABASE_ERROR_MSG)
294
        except Error:
295
            Logger.error(ERROR_MSG)
296
            raise DatabaseException(ERROR_MSG)
297 f8b23532 David Friesecký
298 d65b022d David Friesecký
        return self.cursor.rowcount > 0
299 e9e55282 David Friesecký
300 58051326 David Friesecký
    def delete(self, certificate_id: int):
301 a0602bad David Friesecký
        """
302
        Deletes a certificate
303
304
        :param certificate_id: ID of specific certificate
305
306
        :return: the result of whether the deletion was successful
307
        """
308
309 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
310
311 f8b23532 David Friesecký
        try:
312
            sql = (f"DELETE FROM {TAB_CERTIFICATES} "
313
                   f"WHERE {COL_ID} = ?")
314
            values = [certificate_id]
315
            self.cursor.execute(sql, values)
316
            self.connection.commit()
317 b3c80ccb David Friesecký
        except IntegrityError:
318
            Logger.error(INTEGRITY_ERROR_MSG)
319
            raise DatabaseException(INTEGRITY_ERROR_MSG)
320
        except ProgrammingError:
321
            Logger.error(PROGRAMMING_ERROR_MSG)
322
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
323
        except OperationalError:
324
            Logger.error(OPERATIONAL_ERROR_MSG)
325
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
326
        except NotSupportedError:
327
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
328
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
329
        except DatabaseError:
330
            Logger.error(DATABASE_ERROR_MSG)
331
            raise DatabaseException(DATABASE_ERROR_MSG)
332
        except Error:
333
            Logger.error(ERROR_MSG)
334
            raise DatabaseException(ERROR_MSG)
335 f8b23532 David Friesecký
336 45744020 Stanislav Král
        return self.cursor.rowcount > 0
337 58051326 David Friesecký
338
    def set_certificate_revoked(
339 8b049f43 David Friesecký
            self, certificate_id: int, revocation_date: str, revocation_reason: str = REV_REASON_UNSPECIFIED):
340 58051326 David Friesecký
        """
341
        Revoke a certificate
342
343
        :param certificate_id: ID of specific certificate
344
        :param revocation_date: Date, when the certificate is revoked
345
        :param revocation_reason: Reason of the revocation
346
347 8b049f43 David Friesecký
        :return:
348
            the result of whether the revocation was successful OR
349
            sqlite3.Error if an exception is thrown
350 58051326 David Friesecký
        """
351
352 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
353
354 58051326 David Friesecký
        try:
355 8b049f43 David Friesecký
            if revocation_date != "" and revocation_reason == "":
356
                revocation_reason = REV_REASON_UNSPECIFIED
357
            elif revocation_date == "":
358
                return False
359
360 58051326 David Friesecký
            sql = (f"UPDATE {TAB_CERTIFICATES} "
361
                   f"SET {COL_REVOCATION_DATE} = ?, "
362 8b049f43 David Friesecký
                   f"{COL_REVOCATION_REASON} = ? "
363 58051326 David Friesecký
                   f"WHERE {COL_ID} = ? AND ({COL_REVOCATION_DATE} IS NULL OR {COL_REVOCATION_DATE} = '')")
364
            values = [revocation_date,
365
                      revocation_reason,
366
                      certificate_id]
367
            self.cursor.execute(sql, values)
368
            self.connection.commit()
369 b3c80ccb David Friesecký
        except IntegrityError:
370
            Logger.error(INTEGRITY_ERROR_MSG)
371
            raise DatabaseException(INTEGRITY_ERROR_MSG)
372
        except ProgrammingError:
373
            Logger.error(PROGRAMMING_ERROR_MSG)
374
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
375
        except OperationalError:
376
            Logger.error(OPERATIONAL_ERROR_MSG)
377
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
378
        except NotSupportedError:
379
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
380
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
381
        except DatabaseError:
382
            Logger.error(DATABASE_ERROR_MSG)
383
            raise DatabaseException(DATABASE_ERROR_MSG)
384
        except Error:
385
            Logger.error(ERROR_MSG)
386
            raise DatabaseException(ERROR_MSG)
387 8b049f43 David Friesecký
388 d65b022d David Friesecký
        return self.cursor.rowcount > 0
389 58051326 David Friesecký
390
    def clear_certificate_revocation(self, certificate_id: int):
391
        """
392
        Clear revocation of a certificate
393
394
        :param certificate_id: ID of specific certificate
395
396 8b049f43 David Friesecký
        :return:
397
            the result of whether the clear revocation was successful OR
398
            sqlite3.Error if an exception is thrown
399 58051326 David Friesecký
        """
400
401 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
402
403 58051326 David Friesecký
        try:
404
            sql = (f"UPDATE {TAB_CERTIFICATES} "
405
                   f"SET {COL_REVOCATION_DATE} = '', "
406 8b049f43 David Friesecký
                   f"{COL_REVOCATION_REASON} = '' "
407 58051326 David Friesecký
                   f"WHERE {COL_ID} = ?")
408
            values = [certificate_id]
409
            self.cursor.execute(sql, values)
410
            self.connection.commit()
411 b3c80ccb David Friesecký
        except IntegrityError:
412
            Logger.error(INTEGRITY_ERROR_MSG)
413
            raise DatabaseException(INTEGRITY_ERROR_MSG)
414
        except ProgrammingError:
415
            Logger.error(PROGRAMMING_ERROR_MSG)
416
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
417
        except OperationalError:
418
            Logger.error(OPERATIONAL_ERROR_MSG)
419
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
420
        except NotSupportedError:
421
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
422
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
423
        except DatabaseError:
424
            Logger.error(DATABASE_ERROR_MSG)
425
            raise DatabaseException(DATABASE_ERROR_MSG)
426
        except Error:
427
            Logger.error(ERROR_MSG)
428
            raise DatabaseException(ERROR_MSG)
429 f8b23532 David Friesecký
430 45744020 Stanislav Král
        return self.cursor.rowcount > 0
431 58051326 David Friesecký
432
    def get_all_revoked_by(self, certificate_id: int):
433
        """
434
        Get list of the revoked certificates that are direct descendants of the certificate with the ID
435
436
        :param certificate_id: ID of specific certificate
437
438
        :return:
439 8b049f43 David Friesecký
            list of the certificates OR
440
            None if the list is empty OR
441
            sqlite3.Error if an exception is thrown
442 58051326 David Friesecký
        """
443
444 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
445
446 58051326 David Friesecký
        try:
447
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
448
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_REVOCATION_DATE} IS NOT NULL AND {COL_REVOCATION_DATE} != ''")
449
            values = [certificate_id]
450
            self.cursor.execute(sql, values)
451
            certificate_rows = self.cursor.fetchall()
452
453
            certificates: List[Certificate] = []
454
            for certificate_row in certificate_rows:
455
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
456
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
457
                values = [certificate_row[0]]
458
                self.cursor.execute(sql, values)
459
                usage_rows = self.cursor.fetchall()
460
461
                usage_dict: Dict[int, bool] = {}
462
                for usage_row in usage_rows:
463
                    usage_dict[usage_row[2]] = True
464
465
                certificates.append(Certificate(certificate_row[0],
466
                                                certificate_row[1],
467
                                                certificate_row[2],
468
                                                certificate_row[3],
469
                                                certificate_row[4],
470
                                                certificate_row[7],
471
                                                certificate_row[8],
472
                                                certificate_row[9],
473
                                                usage_dict,
474
                                                certificate_row[5],
475
                                                certificate_row[6]))
476 b3c80ccb David Friesecký
        except IntegrityError:
477
            Logger.error(INTEGRITY_ERROR_MSG)
478
            raise DatabaseException(INTEGRITY_ERROR_MSG)
479
        except ProgrammingError:
480
            Logger.error(PROGRAMMING_ERROR_MSG)
481
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
482
        except OperationalError:
483
            Logger.error(OPERATIONAL_ERROR_MSG)
484
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
485
        except NotSupportedError:
486
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
487
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
488
        except DatabaseError:
489
            Logger.error(DATABASE_ERROR_MSG)
490
            raise DatabaseException(DATABASE_ERROR_MSG)
491
        except Error:
492
            Logger.error(ERROR_MSG)
493
            raise DatabaseException(ERROR_MSG)
494 58051326 David Friesecký
495 d65b022d David Friesecký
        return certificates
496 58051326 David Friesecký
497
    def get_all_issued_by(self, certificate_id: int):
498
        """
499
        Get list of the certificates that are direct descendants of the certificate with the ID
500
501
        :param certificate_id: ID of specific certificate
502
503
        :return:
504 8b049f43 David Friesecký
            list of the certificates OR
505
            None if the list is empty OR
506
            sqlite3.Error if an exception is thrown
507 58051326 David Friesecký
        """
508
509 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
510
511 58051326 David Friesecký
        try:
512
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
513 8b049f43 David Friesecký
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_ID} != ?")
514
            values = [certificate_id, certificate_id]
515 58051326 David Friesecký
            self.cursor.execute(sql, values)
516
            certificate_rows = self.cursor.fetchall()
517
518
            certificates: List[Certificate] = []
519
            for certificate_row in certificate_rows:
520
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
521
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
522
                values = [certificate_row[0]]
523
                self.cursor.execute(sql, values)
524
                usage_rows = self.cursor.fetchall()
525
526
                usage_dict: Dict[int, bool] = {}
527
                for usage_row in usage_rows:
528
                    usage_dict[usage_row[2]] = True
529
530
                certificates.append(Certificate(certificate_row[0],
531
                                                certificate_row[1],
532
                                                certificate_row[2],
533
                                                certificate_row[3],
534
                                                certificate_row[4],
535
                                                certificate_row[7],
536
                                                certificate_row[8],
537
                                                certificate_row[9],
538
                                                usage_dict,
539
                                                certificate_row[5],
540
                                                certificate_row[6]))
541 b3c80ccb David Friesecký
        except IntegrityError:
542
            Logger.error(INTEGRITY_ERROR_MSG)
543
            raise DatabaseException(INTEGRITY_ERROR_MSG)
544
        except ProgrammingError:
545
            Logger.error(PROGRAMMING_ERROR_MSG)
546
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
547
        except OperationalError:
548
            Logger.error(OPERATIONAL_ERROR_MSG)
549
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
550
        except NotSupportedError:
551
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
552
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
553
        except DatabaseError:
554
            Logger.error(DATABASE_ERROR_MSG)
555
            raise DatabaseException(DATABASE_ERROR_MSG)
556
        except Error:
557
            Logger.error(ERROR_MSG)
558
            raise DatabaseException(ERROR_MSG)
559 58051326 David Friesecký
560 d65b022d David Friesecký
        return certificates
561 94f6d8b8 Jan Pašek
562 85003184 Captain_Trojan
    def get_all_descendants_of(self, certificate_id: int):
563
        """
564
        Get a list of all certificates C such that the certificate identified by "certificate_id" belongs to a trust chain
565
        between C and its root certificate authority (i.e. is an ancestor of C).
566
        :param certificate_id: target certificate ID
567
        :return: list of all descendants
568
        """
569
570 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
571
572
        try:
573
            def dfs(children_of, this, collection: list):
574
                for child in children_of(this.certificate_id):
575
                    dfs(children_of, child, collection)
576
                collection.append(this)
577
578
            subtree_root = self.read(certificate_id)
579
            if subtree_root is None:
580
                return None
581
582
            all_certs = []
583
            dfs(self.get_all_issued_by, subtree_root, all_certs)
584
        except IntegrityError:
585
            Logger.error(INTEGRITY_ERROR_MSG)
586
            raise DatabaseException(INTEGRITY_ERROR_MSG)
587
        except ProgrammingError:
588
            Logger.error(PROGRAMMING_ERROR_MSG)
589
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
590
        except OperationalError:
591
            Logger.error(OPERATIONAL_ERROR_MSG)
592
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
593
        except NotSupportedError:
594
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
595
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
596
        except DatabaseError:
597
            Logger.error(DATABASE_ERROR_MSG)
598
            raise DatabaseException(DATABASE_ERROR_MSG)
599
        except Error:
600
            Logger.error(ERROR_MSG)
601
            raise DatabaseException(ERROR_MSG)
602 85003184 Captain_Trojan
603
        return all_certs
604
605 94f6d8b8 Jan Pašek
    def get_next_id(self) -> int:
606
        """
607
        Get identifier of the next certificate that will be inserted into the database
608
        :return: identifier of the next certificate that will be added into the database
609
        """
610 b3c80ccb David Friesecký
611
        Logger.debug("Function launched.")
612
613
        try:
614
            # get next IDs of all tables
615
            self.cursor.execute("SELECT * FROM SQLITE_SEQUENCE")
616
            results = self.cursor.fetchall()
617
618
            # search for next ID in Certificates table and return it
619
            for result in results:
620
                if result[0] == TAB_CERTIFICATES:
621
                    return result[1] + 1  # current last id + 1
622
            # if certificates table is not present in the query results, return 1
623
        except IntegrityError:
624
            Logger.error(INTEGRITY_ERROR_MSG)
625
            raise DatabaseException(INTEGRITY_ERROR_MSG)
626
        except ProgrammingError:
627
            Logger.error(PROGRAMMING_ERROR_MSG)
628
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
629
        except OperationalError:
630
            Logger.error(OPERATIONAL_ERROR_MSG)
631
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
632
        except NotSupportedError:
633
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
634
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
635
        except DatabaseError:
636
            Logger.error(DATABASE_ERROR_MSG)
637
            raise DatabaseException(DATABASE_ERROR_MSG)
638
        except Error:
639
            Logger.error(ERROR_MSG)
640
            raise DatabaseException(ERROR_MSG)
641
642 94f6d8b8 Jan Pašek
        return 1