Projekt

Obecné

Profil

Stáhnout (25.9 KB) Statistiky
| Větev: | Tag: | Revize:
1
import time
2
from sqlite3 import Connection, Error, DatabaseError, IntegrityError, ProgrammingError, OperationalError, NotSupportedError
3
from typing import Dict, List
4

    
5
from src.exceptions.database_exception import DatabaseException
6
from injector import inject
7
from src.constants import *
8
from src.model.certificate import Certificate
9
from src.utils.logger import Logger
10

    
11
INTEGRITY_ERROR_MSG = "Database relational integrity corrupted."
12
PROGRAMMING_ERROR_MSG = "Exception raised for programming errors (etc. SQL statement)."
13
OPERATIONAL_ERROR_MSG = "Exception raised for errors that are related to the database’s operation."
14
NOT_SUPPORTED_ERROR_MSG = "Method or database API was used which is not supported by the database"
15
DATABASE_ERROR_MSG = "Unknown exception that are related to the database."
16
ERROR_MSG = "Unknown exception."
17

    
18

    
19
class CertificateRepository:
20

    
21
    @inject
22
    def __init__(self, connection: Connection):
23
        """
24
        Constructor of the CertificateRepository object
25

    
26
        :param connection: Instance of the Connection object
27
        """
28
        self.connection = connection
29
        self.cursor = connection.cursor()
30

    
31
    def create(self, certificate: Certificate):
32
        """
33
        Creates a certificate.
34
        For root certificate (CA) the parent certificate id is modified to the same id (id == parent_id).
35

    
36
        :param certificate: Instance of the Certificate object
37

    
38
        :return: the result of whether the creation was successful
39
        """
40

    
41
        Logger.debug("Function launched.")
42

    
43
        try:
44
            sql = (f"INSERT INTO {TAB_CERTIFICATES} "
45
                   f"({COL_COMMON_NAME},"
46
                   f"{COL_VALID_FROM},"
47
                   f"{COL_VALID_TO},"
48
                   f"{COL_PEM_DATA},"
49
                   f"{COL_PRIVATE_KEY_ID},"
50
                   f"{COL_TYPE_ID},"
51
                   f"{COL_PARENT_ID})"
52
                   f"VALUES(?,?,?,?,?,?,?)")
53
            values = [certificate.common_name,
54
                      certificate.valid_from,
55
                      certificate.valid_to,
56
                      certificate.pem_data,
57
                      certificate.private_key_id,
58
                      certificate.type_id,
59
                      certificate.parent_id]
60
            self.cursor.execute(sql, values)
61
            self.connection.commit()
62

    
63
            last_id: int = self.cursor.lastrowid
64

    
65
            # TODO assure that this is correct
66
            if certificate.type_id == ROOT_CA_ID:
67
                certificate.parent_id = last_id
68
                self.update(last_id, certificate)
69
            else:
70
                for usage_id, usage_value in certificate.usages.items():
71
                    if usage_value:
72
                        sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
73
                               f"({COL_CERTIFICATE_ID},"
74
                               f"{COL_USAGE_TYPE_ID}) "
75
                               f"VALUES (?,?)")
76
                        values = [last_id, usage_id]
77
                        self.cursor.execute(sql, values)
78
                        self.connection.commit()
79
        except IntegrityError:
80
            Logger.error(INTEGRITY_ERROR_MSG)
81
            raise DatabaseException(INTEGRITY_ERROR_MSG)
82
        except ProgrammingError:
83
            Logger.error(PROGRAMMING_ERROR_MSG)
84
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
85
        except OperationalError:
86
            Logger.error(OPERATIONAL_ERROR_MSG)
87
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
88
        except NotSupportedError:
89
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
90
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
91
        except DatabaseError:
92
            Logger.error(DATABASE_ERROR_MSG)
93
            raise DatabaseException(DATABASE_ERROR_MSG)
94
        except Error:
95
            Logger.error(ERROR_MSG)
96
            raise DatabaseException(ERROR_MSG)
97

    
98
        return last_id
99

    
100
    def read(self, certificate_id: int):
101
        """
102
        Reads (selects) a certificate.
103

    
104
        :param certificate_id: ID of specific certificate
105

    
106
        :return: instance of the Certificate object
107
        """
108

    
109
        Logger.debug("Function launched.")
110

    
111
        try:
112
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
113
                   f"WHERE {COL_ID} = ? AND ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
114
            values = [certificate_id]
115
            self.cursor.execute(sql, values)
116
            certificate_row = self.cursor.fetchone()
117

    
118
            if certificate_row is None:
119
                return None
120

    
121
            sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
122
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
123
            self.cursor.execute(sql, values)
124
            usage_rows = self.cursor.fetchall()
125

    
126
            usage_dict: Dict[int, bool] = {}
127
            for usage_row in usage_rows:
128
                usage_dict[usage_row[2]] = True
129

    
130
            certificate: Certificate = Certificate(certificate_row[0],
131
                                                   certificate_row[1],
132
                                                   certificate_row[2],
133
                                                   certificate_row[3],
134
                                                   certificate_row[4],
135
                                                   certificate_row[8],
136
                                                   certificate_row[9],
137
                                                   certificate_row[10],
138
                                                   usage_dict,
139
                                                   certificate_row[5],
140
                                                   certificate_row[6])
141
        except IntegrityError:
142
            Logger.error(INTEGRITY_ERROR_MSG)
143
            raise DatabaseException(INTEGRITY_ERROR_MSG)
144
        except ProgrammingError:
145
            Logger.error(PROGRAMMING_ERROR_MSG)
146
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
147
        except OperationalError:
148
            Logger.error(OPERATIONAL_ERROR_MSG)
149
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
150
        except NotSupportedError:
151
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
152
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
153
        except DatabaseError:
154
            Logger.error(DATABASE_ERROR_MSG)
155
            raise DatabaseException(DATABASE_ERROR_MSG)
156
        except Error:
157
            Logger.error(ERROR_MSG)
158
            raise DatabaseException(ERROR_MSG)
159

    
160
        return certificate
161

    
162
    def read_all(self, filter_type: int = None):
163
        """
164
        Reads (selects) all certificates (with type).
165

    
166
        :param filter_type: ID of certificate type from CertificateTypes table
167

    
168
        :return: list of certificates
169
        """
170

    
171
        Logger.debug("Function launched.")
172

    
173
        try:
174
            sql_extension = ""
175
            values = []
176
            if filter_type is not None:
177
                sql_extension = (f" AND {COL_TYPE_ID} = ("
178
                                 f"SELECT {COL_ID} FROM {TAB_CERTIFICATE_TYPES} WHERE {COL_ID} = ?)")
179
                values = [filter_type]
180

    
181
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
182
                   f"WHERE ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = ''){sql_extension}")
183
            self.cursor.execute(sql, values)
184
            certificate_rows = self.cursor.fetchall()
185

    
186
            certificates: List[Certificate] = []
187
            for certificate_row in certificate_rows:
188
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
189
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
190
                values = [certificate_row[0]]
191
                self.cursor.execute(sql, values)
192
                usage_rows = self.cursor.fetchall()
193

    
194
                usage_dict: Dict[int, bool] = {}
195
                for usage_row in usage_rows:
196
                    usage_dict[usage_row[2]] = True
197

    
198
                certificates.append(Certificate(certificate_row[0],
199
                                                certificate_row[1],
200
                                                certificate_row[2],
201
                                                certificate_row[3],
202
                                                certificate_row[4],
203
                                                certificate_row[8],
204
                                                certificate_row[9],
205
                                                certificate_row[10],
206
                                                usage_dict,
207
                                                certificate_row[5],
208
                                                certificate_row[6]))
209
        except IntegrityError:
210
            Logger.error(INTEGRITY_ERROR_MSG)
211
            raise DatabaseException(INTEGRITY_ERROR_MSG)
212
        except ProgrammingError:
213
            Logger.error(PROGRAMMING_ERROR_MSG)
214
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
215
        except OperationalError:
216
            Logger.error(OPERATIONAL_ERROR_MSG)
217
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
218
        except NotSupportedError:
219
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
220
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
221
        except DatabaseError:
222
            Logger.error(DATABASE_ERROR_MSG)
223
            raise DatabaseException(DATABASE_ERROR_MSG)
224
        except Error:
225
            Logger.error(ERROR_MSG)
226
            raise DatabaseException(ERROR_MSG)
227

    
228
        return certificates
229

    
230
    def update(self, certificate_id: int, certificate: Certificate):
231
        """
232
        Updates a certificate.
233
        If the parameter of certificate (Certificate object) is not to be changed,
234
        the same value must be specified.
235

    
236
        :param certificate_id: ID of specific certificate
237
        :param certificate: Instance of the Certificate object
238

    
239
        :return: the result of whether the updation was successful
240
        """
241

    
242
        Logger.debug("Function launched.")
243

    
244
        try:
245
            sql = (f"UPDATE {TAB_CERTIFICATES} "
246
                   f"SET {COL_COMMON_NAME} = ?, "
247
                   f"{COL_VALID_FROM} = ?, "
248
                   f"{COL_VALID_TO} = ?, "
249
                   f"{COL_PEM_DATA} = ?, "
250
                   f"{COL_PRIVATE_KEY_ID} = ?, "
251
                   f"{COL_TYPE_ID} = ?, "
252
                   f"{COL_PARENT_ID} = ? "
253
                   f"WHERE {COL_ID} = ?")
254
            values = [certificate.common_name,
255
                      certificate.valid_from,
256
                      certificate.valid_to,
257
                      certificate.pem_data,
258
                      certificate.private_key_id,
259
                      certificate.type_id,
260
                      certificate.parent_id,
261
                      certificate_id]
262

    
263
            self.cursor.execute(sql, values)
264
            self.connection.commit()
265

    
266
            sql = (f"DELETE FROM {TAB_CERTIFICATE_USAGES} "
267
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
268
            values = [certificate_id]
269
            self.cursor.execute(sql, values)
270
            self.connection.commit()
271

    
272
            check_updated = self.cursor.rowcount > 0
273

    
274
            # iterate over usage pairs
275
            for usage_id, usage_value in certificate.usages.items():
276
                if usage_value:
277
                    sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
278
                           f"({COL_CERTIFICATE_ID},"
279
                           f"{COL_USAGE_TYPE_ID}) "
280
                           f"VALUES (?,?)")
281
                    values = [certificate_id, usage_id]
282
                    self.cursor.execute(sql, values)
283
                    self.connection.commit()
284
        except IntegrityError:
285
            Logger.error(INTEGRITY_ERROR_MSG)
286
            raise DatabaseException(INTEGRITY_ERROR_MSG)
287
        except ProgrammingError:
288
            Logger.error(PROGRAMMING_ERROR_MSG)
289
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
290
        except OperationalError:
291
            Logger.error(OPERATIONAL_ERROR_MSG)
292
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
293
        except NotSupportedError:
294
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
295
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
296
        except DatabaseError:
297
            Logger.error(DATABASE_ERROR_MSG)
298
            raise DatabaseException(DATABASE_ERROR_MSG)
299
        except Error:
300
            Logger.error(ERROR_MSG)
301
            raise DatabaseException(ERROR_MSG)
302

    
303
        return check_updated
304

    
305
    def delete(self, certificate_id: int):
306
        """
307
        Deletes a certificate
308

    
309
        :param certificate_id: ID of specific certificate
310

    
311
        :return: the result of whether the deletion was successful
312
        """
313

    
314
        Logger.debug("Function launched.")
315

    
316
        try:
317
            sql = (f"UPDATE {TAB_CERTIFICATES} "
318
                   f"SET {COL_DELETION_DATE} = ? "
319
                   f"WHERE {COL_ID} = ? AND ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
320
            values = [int(time.time()),
321
                      certificate_id]
322
            self.cursor.execute(sql, values)
323
            self.connection.commit()
324
        except IntegrityError:
325
            Logger.error(INTEGRITY_ERROR_MSG)
326
            raise DatabaseException(INTEGRITY_ERROR_MSG)
327
        except ProgrammingError:
328
            Logger.error(PROGRAMMING_ERROR_MSG)
329
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
330
        except OperationalError:
331
            Logger.error(OPERATIONAL_ERROR_MSG)
332
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
333
        except NotSupportedError:
334
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
335
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
336
        except DatabaseError:
337
            Logger.error(DATABASE_ERROR_MSG)
338
            raise DatabaseException(DATABASE_ERROR_MSG)
339
        except Error:
340
            Logger.error(ERROR_MSG)
341
            raise DatabaseException(ERROR_MSG)
342

    
343
        return self.cursor.rowcount > 0
344

    
345
    def set_certificate_revoked(
346
            self, certificate_id: int, revocation_date: str, revocation_reason: str = REV_REASON_UNSPECIFIED):
347
        """
348
        Revoke a certificate
349

    
350
        :param certificate_id: ID of specific certificate
351
        :param revocation_date: Date, when the certificate is revoked
352
        :param revocation_reason: Reason of the revocation
353

    
354
        :return:
355
            the result of whether the revocation was successful OR
356
            sqlite3.Error if an exception is thrown
357
        """
358

    
359
        Logger.debug("Function launched.")
360

    
361
        try:
362
            if revocation_date != "" and revocation_reason == "":
363
                revocation_reason = REV_REASON_UNSPECIFIED
364
            elif revocation_date == "":
365
                return False
366

    
367
            sql = (f"UPDATE {TAB_CERTIFICATES} "
368
                   f"SET {COL_REVOCATION_DATE} = ?, "
369
                   f"{COL_REVOCATION_REASON} = ? "
370
                   f"WHERE {COL_ID} = ? AND ({COL_REVOCATION_DATE} IS NULL OR {COL_REVOCATION_DATE} = '')")
371
            values = [revocation_date,
372
                      revocation_reason,
373
                      certificate_id]
374
            self.cursor.execute(sql, values)
375
            self.connection.commit()
376
        except IntegrityError:
377
            Logger.error(INTEGRITY_ERROR_MSG)
378
            raise DatabaseException(INTEGRITY_ERROR_MSG)
379
        except ProgrammingError:
380
            Logger.error(PROGRAMMING_ERROR_MSG)
381
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
382
        except OperationalError:
383
            Logger.error(OPERATIONAL_ERROR_MSG)
384
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
385
        except NotSupportedError:
386
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
387
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
388
        except DatabaseError:
389
            Logger.error(DATABASE_ERROR_MSG)
390
            raise DatabaseException(DATABASE_ERROR_MSG)
391
        except Error:
392
            Logger.error(ERROR_MSG)
393
            raise DatabaseException(ERROR_MSG)
394

    
395
        return self.cursor.rowcount > 0
396

    
397
    def clear_certificate_revocation(self, certificate_id: int):
398
        """
399
        Clear revocation of a certificate
400

    
401
        :param certificate_id: ID of specific certificate
402

    
403
        :return:
404
            the result of whether the clear revocation was successful OR
405
            sqlite3.Error if an exception is thrown
406
        """
407

    
408
        Logger.debug("Function launched.")
409

    
410
        try:
411
            sql = (f"UPDATE {TAB_CERTIFICATES} "
412
                   f"SET {COL_REVOCATION_DATE} = '', "
413
                   f"{COL_REVOCATION_REASON} = '' "
414
                   f"WHERE {COL_ID} = ?")
415
            values = [certificate_id]
416
            self.cursor.execute(sql, values)
417
            self.connection.commit()
418
        except IntegrityError:
419
            Logger.error(INTEGRITY_ERROR_MSG)
420
            raise DatabaseException(INTEGRITY_ERROR_MSG)
421
        except ProgrammingError:
422
            Logger.error(PROGRAMMING_ERROR_MSG)
423
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
424
        except OperationalError:
425
            Logger.error(OPERATIONAL_ERROR_MSG)
426
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
427
        except NotSupportedError:
428
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
429
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
430
        except DatabaseError:
431
            Logger.error(DATABASE_ERROR_MSG)
432
            raise DatabaseException(DATABASE_ERROR_MSG)
433
        except Error:
434
            Logger.error(ERROR_MSG)
435
            raise DatabaseException(ERROR_MSG)
436

    
437
        return self.cursor.rowcount > 0
438

    
439
    def get_all_revoked_by(self, certificate_id: int):
440
        """
441
        Get list of the revoked certificates that are direct descendants of the certificate with the ID
442

    
443
        :param certificate_id: ID of specific certificate
444

    
445
        :return:
446
            list of the certificates OR
447
            None if the list is empty OR
448
            sqlite3.Error if an exception is thrown
449
        """
450

    
451
        Logger.debug("Function launched.")
452

    
453
        try:
454
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
455
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_REVOCATION_DATE} IS NOT NULL AND {COL_REVOCATION_DATE} != ''")
456
            values = [certificate_id]
457
            self.cursor.execute(sql, values)
458
            certificate_rows = self.cursor.fetchall()
459

    
460
            certificates: List[Certificate] = []
461
            for certificate_row in certificate_rows:
462
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
463
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
464
                values = [certificate_row[0]]
465
                self.cursor.execute(sql, values)
466
                usage_rows = self.cursor.fetchall()
467

    
468
                usage_dict: Dict[int, bool] = {}
469
                for usage_row in usage_rows:
470
                    usage_dict[usage_row[2]] = True
471

    
472
                certificates.append(Certificate(certificate_row[0],
473
                                                certificate_row[1],
474
                                                certificate_row[2],
475
                                                certificate_row[3],
476
                                                certificate_row[4],
477
                                                certificate_row[8],
478
                                                certificate_row[9],
479
                                                certificate_row[10],
480
                                                usage_dict,
481
                                                certificate_row[5],
482
                                                certificate_row[6]))
483
        except IntegrityError:
484
            Logger.error(INTEGRITY_ERROR_MSG)
485
            raise DatabaseException(INTEGRITY_ERROR_MSG)
486
        except ProgrammingError:
487
            Logger.error(PROGRAMMING_ERROR_MSG)
488
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
489
        except OperationalError:
490
            Logger.error(OPERATIONAL_ERROR_MSG)
491
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
492
        except NotSupportedError:
493
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
494
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
495
        except DatabaseError:
496
            Logger.error(DATABASE_ERROR_MSG)
497
            raise DatabaseException(DATABASE_ERROR_MSG)
498
        except Error:
499
            Logger.error(ERROR_MSG)
500
            raise DatabaseException(ERROR_MSG)
501

    
502
        return certificates
503

    
504
    def get_all_issued_by(self, certificate_id: int):
505
        """
506
        Get list of the certificates that are direct descendants of the certificate with the ID
507

    
508
        :param certificate_id: ID of specific certificate
509

    
510
        :return:
511
            list of the certificates OR
512
            None if the list is empty OR
513
            sqlite3.Error if an exception is thrown
514
        """
515

    
516
        Logger.debug("Function launched.")
517

    
518
        try:
519
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
520
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_ID} != ? AND "
521
                   f"({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
522
            values = [certificate_id, certificate_id]
523
            self.cursor.execute(sql, values)
524
            certificate_rows = self.cursor.fetchall()
525

    
526
            certificates: List[Certificate] = []
527
            for certificate_row in certificate_rows:
528
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
529
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
530
                values = [certificate_row[0]]
531
                self.cursor.execute(sql, values)
532
                usage_rows = self.cursor.fetchall()
533

    
534
                usage_dict: Dict[int, bool] = {}
535
                for usage_row in usage_rows:
536
                    usage_dict[usage_row[2]] = True
537

    
538
                certificates.append(Certificate(certificate_row[0],
539
                                                certificate_row[1],
540
                                                certificate_row[2],
541
                                                certificate_row[3],
542
                                                certificate_row[4],
543
                                                certificate_row[8],
544
                                                certificate_row[9],
545
                                                certificate_row[10],
546
                                                usage_dict,
547
                                                certificate_row[5],
548
                                                certificate_row[6]))
549
        except IntegrityError:
550
            Logger.error(INTEGRITY_ERROR_MSG)
551
            raise DatabaseException(INTEGRITY_ERROR_MSG)
552
        except ProgrammingError:
553
            Logger.error(PROGRAMMING_ERROR_MSG)
554
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
555
        except OperationalError:
556
            Logger.error(OPERATIONAL_ERROR_MSG)
557
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
558
        except NotSupportedError:
559
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
560
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
561
        except DatabaseError:
562
            Logger.error(DATABASE_ERROR_MSG)
563
            raise DatabaseException(DATABASE_ERROR_MSG)
564
        except Error:
565
            Logger.error(ERROR_MSG)
566
            raise DatabaseException(ERROR_MSG)
567

    
568
        return certificates
569

    
570
    def get_all_descendants_of(self, certificate_id: int):
571
        """
572
        Get a list of all certificates C such that the certificate identified by "certificate_id" belongs to a trust chain
573
        between C and its root certificate authority (i.e. is an ancestor of C).
574
        :param certificate_id: target certificate ID
575
        :return: list of all descendants
576
        """
577

    
578
        Logger.debug("Function launched.")
579

    
580
        try:
581
            def dfs(children_of, this, collection: list):
582
                for child in children_of(this.certificate_id):
583
                    dfs(children_of, child, collection)
584
                collection.append(this)
585

    
586
            subtree_root = self.read(certificate_id)
587
            if subtree_root is None:
588
                return None
589

    
590
            all_certs = []
591
            dfs(self.get_all_issued_by, subtree_root, all_certs)
592
        except IntegrityError:
593
            Logger.error(INTEGRITY_ERROR_MSG)
594
            raise DatabaseException(INTEGRITY_ERROR_MSG)
595
        except ProgrammingError:
596
            Logger.error(PROGRAMMING_ERROR_MSG)
597
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
598
        except OperationalError:
599
            Logger.error(OPERATIONAL_ERROR_MSG)
600
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
601
        except NotSupportedError:
602
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
603
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
604
        except DatabaseError:
605
            Logger.error(DATABASE_ERROR_MSG)
606
            raise DatabaseException(DATABASE_ERROR_MSG)
607
        except Error:
608
            Logger.error(ERROR_MSG)
609
            raise DatabaseException(ERROR_MSG)
610

    
611
        return all_certs
612

    
613
    def get_next_id(self) -> int:
614
        """
615
        Get identifier of the next certificate that will be inserted into the database
616
        :return: identifier of the next certificate that will be added into the database
617
        """
618

    
619
        Logger.debug("Function launched.")
620

    
621
        try:
622
            # get next IDs of all tables
623
            self.cursor.execute("SELECT * FROM SQLITE_SEQUENCE")
624
            results = self.cursor.fetchall()
625

    
626
            # search for next ID in Certificates table and return it
627
            for result in results:
628
                if result[0] == TAB_CERTIFICATES:
629
                    return result[1] + 1  # current last id + 1
630
            # if certificates table is not present in the query results, return 1
631
        except IntegrityError:
632
            Logger.error(INTEGRITY_ERROR_MSG)
633
            raise DatabaseException(INTEGRITY_ERROR_MSG)
634
        except ProgrammingError:
635
            Logger.error(PROGRAMMING_ERROR_MSG)
636
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
637
        except OperationalError:
638
            Logger.error(OPERATIONAL_ERROR_MSG)
639
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
640
        except NotSupportedError:
641
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
642
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
643
        except DatabaseError:
644
            Logger.error(DATABASE_ERROR_MSG)
645
            raise DatabaseException(DATABASE_ERROR_MSG)
646
        except Error:
647
            Logger.error(ERROR_MSG)
648
            raise DatabaseException(ERROR_MSG)
649

    
650
        return 1
(2-2/3)