Projekt

Obecné

Profil

Stáhnout (29.7 KB) Statistiky
| Větev: | Tag: | Revize:
1
import time
2
from sqlite3 import Connection, Error, DatabaseError, IntegrityError, ProgrammingError, OperationalError, NotSupportedError
3
from typing import Dict, List
4

    
5
from src.exceptions.database_exception import DatabaseException
6
from injector import inject
7
from src.constants import *
8
from src.model.certificate import Certificate
9
from src.utils.logger import Logger
10

    
11
INTEGRITY_ERROR_MSG = "Database relational integrity corrupted."
12
PROGRAMMING_ERROR_MSG = "Exception raised for programming errors (etc. SQL statement)."
13
OPERATIONAL_ERROR_MSG = "Exception raised for errors that are related to the database’s operation."
14
NOT_SUPPORTED_ERROR_MSG = "Method or database API was used which is not supported by the database"
15
DATABASE_ERROR_MSG = "Unknown exception that are related to the database."
16
ERROR_MSG = "Unknown exception."
17

    
18

    
19
class CertificateRepository:
20

    
21
    @inject
22
    def __init__(self, connection: Connection):
23
        """
24
        Constructor of the CertificateRepository object
25

    
26
        :param connection: Instance of the Connection object
27
        """
28
        self.connection = connection
29
        self.cursor = connection.cursor()
30

    
31
    def create(self, certificate: Certificate):
32
        """
33
        Creates a certificate.
34
        For root certificate (CA) the parent certificate id is modified to the same id (id == parent_id).
35

    
36
        :param certificate: Instance of the Certificate object
37

    
38
        :return: the result of whether the creation was successful
39
        """
40

    
41
        Logger.debug("Function launched.")
42

    
43
        try:
44
            sql = (f"INSERT INTO {TAB_CERTIFICATES} "
45
                   f"({COL_VALID_FROM},"
46
                   f"{COL_VALID_TO},"
47
                   f"{COL_PEM_DATA},"
48
                   f"{COL_COMMON_NAME},"
49
                   f"{COL_COUNTRY_CODE},"
50
                   f"{COL_LOCALITY},"
51
                   f"{COL_PROVINCE},"
52
                   f"{COL_ORGANIZATION},"
53
                   f"{COL_ORGANIZATIONAL_UNIT},"
54
                   f"{COL_EMAIL_ADDRESS},"
55
                   f"{COL_TYPE_ID},"
56
                   f"{COL_PARENT_ID},"
57
                   f"{COL_PRIVATE_KEY_ID}) "
58
                   f"VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?)")
59
            values = [certificate.valid_from,
60
                      certificate.valid_to,
61
                      certificate.pem_data,
62
                      certificate.common_name,
63
                      certificate.country_code,
64
                      certificate.locality,
65
                      certificate.province,
66
                      certificate.organization,
67
                      certificate.organizational_unit,
68
                      certificate.email_address,
69
                      certificate.type_id,
70
                      certificate.parent_id,
71
                      certificate.private_key_id]
72
            self.cursor.execute(sql, values)
73
            self.connection.commit()
74

    
75
            last_id: int = self.cursor.lastrowid
76

    
77
            if certificate.type_id == ROOT_CA_ID:
78
                certificate.parent_id = last_id
79
                self.update(last_id, certificate)
80
            else:
81
                for usage_id, usage_value in certificate.usages.items():
82
                    if usage_value:
83
                        sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
84
                               f"({COL_CERTIFICATE_ID},"
85
                               f"{COL_USAGE_TYPE_ID}) "
86
                               f"VALUES (?,?)")
87
                        values = [last_id, usage_id]
88
                        self.cursor.execute(sql, values)
89
                        self.connection.commit()
90
        except IntegrityError:
91
            Logger.error(INTEGRITY_ERROR_MSG)
92
            raise DatabaseException(INTEGRITY_ERROR_MSG)
93
        except ProgrammingError:
94
            Logger.error(PROGRAMMING_ERROR_MSG)
95
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
96
        except OperationalError:
97
            Logger.error(OPERATIONAL_ERROR_MSG)
98
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
99
        except NotSupportedError:
100
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
101
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
102
        except DatabaseError:
103
            Logger.error(DATABASE_ERROR_MSG)
104
            raise DatabaseException(DATABASE_ERROR_MSG)
105
        except Error:
106
            Logger.error(ERROR_MSG)
107
            raise DatabaseException(ERROR_MSG)
108

    
109
        return last_id
110

    
111
    def read(self, certificate_id: int):
112
        """
113
        Reads (selects) a certificate.
114

    
115
        :param certificate_id: ID of specific certificate
116

    
117
        :return: instance of the Certificate object
118
        """
119

    
120
        Logger.debug("Function launched.")
121

    
122
        try:
123
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
124
                   f"WHERE {COL_ID} = ? AND ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
125
            values = [certificate_id]
126
            self.cursor.execute(sql, values)
127
            certificate_row = self.cursor.fetchone()
128

    
129
            if certificate_row is None:
130
                return None
131

    
132
            sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
133
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
134
            self.cursor.execute(sql, values)
135
            usage_rows = self.cursor.fetchall()
136

    
137
            usage_dict: Dict[int, bool] = {}
138
            for usage_row in usage_rows:
139
                usage_dict[usage_row[2]] = True
140

    
141
            certificate: Certificate = Certificate(certificate_row[0],      # ID
142
                                                   certificate_row[1],      # valid from
143
                                                   certificate_row[2],      # valid to
144
                                                   certificate_row[3],      # pem data
145
                                                   certificate_row[14],     # type ID
146
                                                   certificate_row[15],     # parent ID
147
                                                   certificate_row[16],     # private key ID
148
                                                   usage_dict,
149
                                                   certificate_row[4],      # common name
150
                                                   certificate_row[5],      # country code
151
                                                   certificate_row[6],      # locality
152
                                                   certificate_row[7],      # province
153
                                                   certificate_row[8],      # organization
154
                                                   certificate_row[9],      # organizational unit
155
                                                   certificate_row[10],     # email address
156
                                                   certificate_row[11],     # revocation date
157
                                                   certificate_row[12])     # revocation reason
158

    
159
        except IntegrityError:
160
            Logger.error(INTEGRITY_ERROR_MSG)
161
            raise DatabaseException(INTEGRITY_ERROR_MSG)
162
        except ProgrammingError:
163
            Logger.error(PROGRAMMING_ERROR_MSG)
164
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
165
        except OperationalError:
166
            Logger.error(OPERATIONAL_ERROR_MSG)
167
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
168
        except NotSupportedError:
169
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
170
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
171
        except DatabaseError:
172
            Logger.error(DATABASE_ERROR_MSG)
173
            raise DatabaseException(DATABASE_ERROR_MSG)
174
        except Error:
175
            Logger.error(ERROR_MSG)
176
            raise DatabaseException(ERROR_MSG)
177

    
178
        return certificate
179

    
180
    def read_all(self, filter_type: int = None):
181
        """
182
        Reads (selects) all certificates (with type).
183

    
184
        :param filter_type: ID of certificate type from CertificateTypes table
185

    
186
        :return: list of certificates
187
        """
188

    
189
        Logger.debug("Function launched.")
190

    
191
        try:
192
            sql_extension = ""
193
            values = []
194
            if filter_type is not None:
195
                sql_extension = (f" AND {COL_TYPE_ID} = ("
196
                                 f"SELECT {COL_ID} FROM {TAB_CERTIFICATE_TYPES} WHERE {COL_ID} = ?)")
197
                values = [filter_type]
198

    
199
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
200
                   f"WHERE ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = ''){sql_extension}")
201
            self.cursor.execute(sql, values)
202
            certificate_rows = self.cursor.fetchall()
203

    
204
            certificates: List[Certificate] = []
205
            for certificate_row in certificate_rows:
206
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
207
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
208
                values = [certificate_row[0]]
209
                self.cursor.execute(sql, values)
210
                usage_rows = self.cursor.fetchall()
211

    
212
                usage_dict: Dict[int, bool] = {}
213
                for usage_row in usage_rows:
214
                    usage_dict[usage_row[2]] = True
215

    
216
                certificates.append(Certificate(certificate_row[0],      # ID
217
                                                certificate_row[1],      # valid from
218
                                                certificate_row[2],      # valid to
219
                                                certificate_row[3],      # pem data
220
                                                certificate_row[14],     # type ID
221
                                                certificate_row[15],     # parent ID
222
                                                certificate_row[16],     # private key ID
223
                                                usage_dict,
224
                                                certificate_row[4],      # common name
225
                                                certificate_row[5],      # country code
226
                                                certificate_row[6],      # locality
227
                                                certificate_row[7],      # province
228
                                                certificate_row[8],      # organization
229
                                                certificate_row[9],      # organizational unit
230
                                                certificate_row[10],     # email address
231
                                                certificate_row[11],     # revocation date
232
                                                certificate_row[12]))    # revocation reason
233
        except IntegrityError:
234
            Logger.error(INTEGRITY_ERROR_MSG)
235
            raise DatabaseException(INTEGRITY_ERROR_MSG)
236
        except ProgrammingError:
237
            Logger.error(PROGRAMMING_ERROR_MSG)
238
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
239
        except OperationalError:
240
            Logger.error(OPERATIONAL_ERROR_MSG)
241
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
242
        except NotSupportedError:
243
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
244
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
245
        except DatabaseError:
246
            Logger.error(DATABASE_ERROR_MSG)
247
            raise DatabaseException(DATABASE_ERROR_MSG)
248
        except Error:
249
            Logger.error(ERROR_MSG)
250
            raise DatabaseException(ERROR_MSG)
251

    
252
        return certificates
253

    
254
    def update(self, certificate_id: int, certificate: Certificate):
255
        """
256
        Updates a certificate.
257
        If the parameter of certificate (Certificate object) is not to be changed,
258
        the same value must be specified.
259

    
260
        :param certificate_id: ID of specific certificate
261
        :param certificate: Instance of the Certificate object
262

    
263
        :return: the result of whether the updation was successful
264
        """
265

    
266
        Logger.debug("Function launched.")
267

    
268
        try:
269
            sql = (f"UPDATE {TAB_CERTIFICATES} "
270
                   f"SET {COL_VALID_FROM} = ?, "
271
                   f"{COL_VALID_TO} = ?, "
272
                   f"{COL_PEM_DATA} = ?, "
273
                   f"{COL_COMMON_NAME} = ?, "
274
                   f"{COL_COUNTRY_CODE} = ?, "
275
                   f"{COL_LOCALITY} = ?, "
276
                   f"{COL_PROVINCE} = ?, "
277
                   f"{COL_ORGANIZATION} = ?, "
278
                   f"{COL_ORGANIZATIONAL_UNIT} = ?, "
279
                   f"{COL_EMAIL_ADDRESS} = ?, "
280
                   f"{COL_TYPE_ID} = ?, "
281
                   f"{COL_PARENT_ID} = ?, "
282
                   f"{COL_PRIVATE_KEY_ID} = ? "
283
                   f"WHERE {COL_ID} = ?")
284
            values = [certificate.valid_from,
285
                      certificate.valid_to,
286
                      certificate.pem_data,
287
                      certificate.common_name,
288
                      certificate.country_code,
289
                      certificate.locality,
290
                      certificate.province,
291
                      certificate.organization,
292
                      certificate.organizational_unit,
293
                      certificate.email_address,
294
                      certificate.type_id,
295
                      certificate.parent_id,
296
                      certificate.private_key_id,
297
                      certificate_id]
298

    
299
            self.cursor.execute(sql, values)
300
            self.connection.commit()
301

    
302
            sql = (f"DELETE FROM {TAB_CERTIFICATE_USAGES} "
303
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
304
            values = [certificate_id]
305
            self.cursor.execute(sql, values)
306
            self.connection.commit()
307

    
308
            check_updated = self.cursor.rowcount > 0
309

    
310
            # iterate over usage pairs
311
            for usage_id, usage_value in certificate.usages.items():
312
                if usage_value:
313
                    sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
314
                           f"({COL_CERTIFICATE_ID},"
315
                           f"{COL_USAGE_TYPE_ID}) "
316
                           f"VALUES (?,?)")
317
                    values = [certificate_id, usage_id]
318
                    self.cursor.execute(sql, values)
319
                    self.connection.commit()
320
        except IntegrityError:
321
            Logger.error(INTEGRITY_ERROR_MSG)
322
            raise DatabaseException(INTEGRITY_ERROR_MSG)
323
        except ProgrammingError:
324
            Logger.error(PROGRAMMING_ERROR_MSG)
325
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
326
        except OperationalError:
327
            Logger.error(OPERATIONAL_ERROR_MSG)
328
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
329
        except NotSupportedError:
330
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
331
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
332
        except DatabaseError:
333
            Logger.error(DATABASE_ERROR_MSG)
334
            raise DatabaseException(DATABASE_ERROR_MSG)
335
        except Error:
336
            Logger.error(ERROR_MSG)
337
            raise DatabaseException(ERROR_MSG)
338

    
339
        return check_updated
340

    
341
    def delete(self, certificate_id: int):
342
        """
343
        Deletes a certificate
344

    
345
        :param certificate_id: ID of specific certificate
346

    
347
        :return: the result of whether the deletion was successful
348
        """
349

    
350
        Logger.debug("Function launched.")
351

    
352
        try:
353
            sql = (f"UPDATE {TAB_CERTIFICATES} "
354
                   f"SET {COL_DELETION_DATE} = ? "
355
                   f"WHERE {COL_ID} = ? AND ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
356
            values = [int(time.time()),
357
                      certificate_id]
358
            self.cursor.execute(sql, values)
359
            self.connection.commit()
360
        except IntegrityError:
361
            Logger.error(INTEGRITY_ERROR_MSG)
362
            raise DatabaseException(INTEGRITY_ERROR_MSG)
363
        except ProgrammingError:
364
            Logger.error(PROGRAMMING_ERROR_MSG)
365
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
366
        except OperationalError:
367
            Logger.error(OPERATIONAL_ERROR_MSG)
368
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
369
        except NotSupportedError:
370
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
371
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
372
        except DatabaseError:
373
            Logger.error(DATABASE_ERROR_MSG)
374
            raise DatabaseException(DATABASE_ERROR_MSG)
375
        except Error:
376
            Logger.error(ERROR_MSG)
377
            raise DatabaseException(ERROR_MSG)
378

    
379
        return self.cursor.rowcount > 0
380

    
381
    def set_certificate_revoked(
382
            self, certificate_id: int, revocation_date: str, revocation_reason: str = REV_REASON_UNSPECIFIED):
383
        """
384
        Revoke a certificate
385

    
386
        :param certificate_id: ID of specific certificate
387
        :param revocation_date: Date, when the certificate is revoked
388
        :param revocation_reason: Reason of the revocation
389

    
390
        :return:
391
            the result of whether the revocation was successful OR
392
            sqlite3.Error if an exception is thrown
393
        """
394

    
395
        Logger.debug("Function launched.")
396

    
397
        try:
398
            if revocation_date != "" and revocation_reason == "":
399
                revocation_reason = REV_REASON_UNSPECIFIED
400
            elif revocation_date == "":
401
                return False
402

    
403
            sql = (f"UPDATE {TAB_CERTIFICATES} "
404
                   f"SET {COL_REVOCATION_DATE} = ?, "
405
                   f"{COL_REVOCATION_REASON} = ? "
406
                   f"WHERE {COL_ID} = ? AND ({COL_REVOCATION_DATE} IS NULL OR {COL_REVOCATION_DATE} = '')")
407
            values = [revocation_date,
408
                      revocation_reason,
409
                      certificate_id]
410
            self.cursor.execute(sql, values)
411
            self.connection.commit()
412
        except IntegrityError:
413
            Logger.error(INTEGRITY_ERROR_MSG)
414
            raise DatabaseException(INTEGRITY_ERROR_MSG)
415
        except ProgrammingError:
416
            Logger.error(PROGRAMMING_ERROR_MSG)
417
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
418
        except OperationalError:
419
            Logger.error(OPERATIONAL_ERROR_MSG)
420
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
421
        except NotSupportedError:
422
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
423
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
424
        except DatabaseError:
425
            Logger.error(DATABASE_ERROR_MSG)
426
            raise DatabaseException(DATABASE_ERROR_MSG)
427
        except Error:
428
            Logger.error(ERROR_MSG)
429
            raise DatabaseException(ERROR_MSG)
430

    
431
        return self.cursor.rowcount > 0
432

    
433
    def clear_certificate_revocation(self, certificate_id: int):
434
        """
435
        Clear revocation of a certificate
436

    
437
        :param certificate_id: ID of specific certificate
438

    
439
        :return:
440
            the result of whether the clear revocation was successful OR
441
            sqlite3.Error if an exception is thrown
442
        """
443

    
444
        Logger.debug("Function launched.")
445

    
446
        try:
447
            sql = (f"UPDATE {TAB_CERTIFICATES} "
448
                   f"SET {COL_REVOCATION_DATE} = NULL, "
449
                   f"{COL_REVOCATION_REASON} = NULL "
450
                   f"WHERE {COL_ID} = ?")
451
            values = [certificate_id]
452
            self.cursor.execute(sql, values)
453
            self.connection.commit()
454
        except IntegrityError:
455
            Logger.error(INTEGRITY_ERROR_MSG)
456
            raise DatabaseException(INTEGRITY_ERROR_MSG)
457
        except ProgrammingError:
458
            Logger.error(PROGRAMMING_ERROR_MSG)
459
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
460
        except OperationalError:
461
            Logger.error(OPERATIONAL_ERROR_MSG)
462
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
463
        except NotSupportedError:
464
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
465
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
466
        except DatabaseError:
467
            Logger.error(DATABASE_ERROR_MSG)
468
            raise DatabaseException(DATABASE_ERROR_MSG)
469
        except Error:
470
            Logger.error(ERROR_MSG)
471
            raise DatabaseException(ERROR_MSG)
472

    
473
        return self.cursor.rowcount > 0
474

    
475
    def get_all_revoked_by(self, certificate_id: int):
476
        """
477
        Get list of the revoked certificates that are direct descendants of the certificate with the ID
478

    
479
        :param certificate_id: ID of specific certificate
480

    
481
        :return:
482
            list of the certificates OR
483
            None if the list is empty OR
484
            sqlite3.Error if an exception is thrown
485
        """
486

    
487
        Logger.debug("Function launched.")
488

    
489
        try:
490
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
491
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_REVOCATION_DATE} IS NOT NULL AND {COL_REVOCATION_DATE} != ''")
492
            values = [certificate_id]
493
            self.cursor.execute(sql, values)
494
            certificate_rows = self.cursor.fetchall()
495

    
496
            certificates: List[Certificate] = []
497
            for certificate_row in certificate_rows:
498
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
499
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
500
                values = [certificate_row[0]]
501
                self.cursor.execute(sql, values)
502
                usage_rows = self.cursor.fetchall()
503

    
504
                usage_dict: Dict[int, bool] = {}
505
                for usage_row in usage_rows:
506
                    usage_dict[usage_row[2]] = True
507

    
508
                certificates.append(Certificate(certificate_row[0],      # ID
509
                                                certificate_row[1],      # valid from
510
                                                certificate_row[2],      # valid to
511
                                                certificate_row[3],      # pem data
512
                                                certificate_row[14],     # type ID
513
                                                certificate_row[15],     # parent ID
514
                                                certificate_row[16],     # private key ID
515
                                                usage_dict,
516
                                                certificate_row[4],      # common name
517
                                                certificate_row[5],      # country code
518
                                                certificate_row[6],      # locality
519
                                                certificate_row[7],      # province
520
                                                certificate_row[8],      # organization
521
                                                certificate_row[9],      # organizational unit
522
                                                certificate_row[10],     # email address
523
                                                certificate_row[11],     # revocation date
524
                                                certificate_row[12]))    # revocation reason
525
        except IntegrityError:
526
            Logger.error(INTEGRITY_ERROR_MSG)
527
            raise DatabaseException(INTEGRITY_ERROR_MSG)
528
        except ProgrammingError:
529
            Logger.error(PROGRAMMING_ERROR_MSG)
530
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
531
        except OperationalError:
532
            Logger.error(OPERATIONAL_ERROR_MSG)
533
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
534
        except NotSupportedError:
535
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
536
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
537
        except DatabaseError:
538
            Logger.error(DATABASE_ERROR_MSG)
539
            raise DatabaseException(DATABASE_ERROR_MSG)
540
        except Error:
541
            Logger.error(ERROR_MSG)
542
            raise DatabaseException(ERROR_MSG)
543

    
544
        return certificates
545

    
546
    def get_all_issued_by(self, certificate_id: int):
547
        """
548
        Get list of the certificates that are direct descendants of the certificate with the ID
549

    
550
        :param certificate_id: ID of specific certificate
551

    
552
        :return:
553
            list of the certificates OR
554
            None if the list is empty OR
555
            sqlite3.Error if an exception is thrown
556
        """
557

    
558
        Logger.debug("Function launched.")
559

    
560
        try:
561
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
562
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_ID} != ? AND "
563
                   f"({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
564
            values = [certificate_id, certificate_id]
565
            self.cursor.execute(sql, values)
566
            certificate_rows = self.cursor.fetchall()
567

    
568
            certificates: List[Certificate] = []
569
            for certificate_row in certificate_rows:
570
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
571
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
572
                values = [certificate_row[0]]
573
                self.cursor.execute(sql, values)
574
                usage_rows = self.cursor.fetchall()
575

    
576
                usage_dict: Dict[int, bool] = {}
577
                for usage_row in usage_rows:
578
                    usage_dict[usage_row[2]] = True
579

    
580
                certificates.append(Certificate(certificate_row[0],      # ID
581
                                                certificate_row[1],      # valid from
582
                                                certificate_row[2],      # valid to
583
                                                certificate_row[3],      # pem data
584
                                                certificate_row[14],     # type ID
585
                                                certificate_row[15],     # parent ID
586
                                                certificate_row[16],     # private key ID
587
                                                usage_dict,
588
                                                certificate_row[4],      # common name
589
                                                certificate_row[5],      # country code
590
                                                certificate_row[6],      # locality
591
                                                certificate_row[7],      # province
592
                                                certificate_row[8],      # organization
593
                                                certificate_row[9],      # organizational unit
594
                                                certificate_row[10],     # email address
595
                                                certificate_row[11],     # revocation date
596
                                                certificate_row[12]))    # revocation reason
597
        except IntegrityError:
598
            Logger.error(INTEGRITY_ERROR_MSG)
599
            raise DatabaseException(INTEGRITY_ERROR_MSG)
600
        except ProgrammingError:
601
            Logger.error(PROGRAMMING_ERROR_MSG)
602
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
603
        except OperationalError:
604
            Logger.error(OPERATIONAL_ERROR_MSG)
605
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
606
        except NotSupportedError:
607
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
608
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
609
        except DatabaseError:
610
            Logger.error(DATABASE_ERROR_MSG)
611
            raise DatabaseException(DATABASE_ERROR_MSG)
612
        except Error:
613
            Logger.error(ERROR_MSG)
614
            raise DatabaseException(ERROR_MSG)
615

    
616
        return certificates
617

    
618
    def get_all_descendants_of(self, certificate_id: int):
619
        """
620
        Get a list of all certificates C such that the certificate identified by "certificate_id" belongs to a trust chain
621
        between C and its root certificate authority (i.e. is an ancestor of C).
622
        :param certificate_id: target certificate ID
623
        :return: list of all descendants
624
        """
625

    
626
        Logger.debug("Function launched.")
627

    
628
        try:
629
            def dfs(children_of, this, collection: list):
630
                for child in children_of(this.certificate_id):
631
                    dfs(children_of, child, collection)
632
                collection.append(this)
633

    
634
            subtree_root = self.read(certificate_id)
635
            if subtree_root is None:
636
                return None
637

    
638
            all_certs = []
639
            dfs(self.get_all_issued_by, subtree_root, all_certs)
640
        except IntegrityError:
641
            Logger.error(INTEGRITY_ERROR_MSG)
642
            raise DatabaseException(INTEGRITY_ERROR_MSG)
643
        except ProgrammingError:
644
            Logger.error(PROGRAMMING_ERROR_MSG)
645
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
646
        except OperationalError:
647
            Logger.error(OPERATIONAL_ERROR_MSG)
648
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
649
        except NotSupportedError:
650
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
651
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
652
        except DatabaseError:
653
            Logger.error(DATABASE_ERROR_MSG)
654
            raise DatabaseException(DATABASE_ERROR_MSG)
655
        except Error:
656
            Logger.error(ERROR_MSG)
657
            raise DatabaseException(ERROR_MSG)
658

    
659
        return all_certs
660

    
661
    def get_next_id(self) -> int:
662
        """
663
        Get identifier of the next certificate that will be inserted into the database
664
        :return: identifier of the next certificate that will be added into the database
665
        """
666

    
667
        Logger.debug("Function launched.")
668

    
669
        try:
670
            # get next IDs of all tables
671
            self.cursor.execute("SELECT * FROM SQLITE_SEQUENCE")
672
            results = self.cursor.fetchall()
673

    
674
            # search for next ID in Certificates table and return it
675
            for result in results:
676
                if result[0] == TAB_CERTIFICATES:
677
                    return result[1] + 1  # current last id + 1
678
            # if certificates table is not present in the query results, return 1
679
        except IntegrityError:
680
            Logger.error(INTEGRITY_ERROR_MSG)
681
            raise DatabaseException(INTEGRITY_ERROR_MSG)
682
        except ProgrammingError:
683
            Logger.error(PROGRAMMING_ERROR_MSG)
684
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
685
        except OperationalError:
686
            Logger.error(OPERATIONAL_ERROR_MSG)
687
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
688
        except NotSupportedError:
689
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
690
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
691
        except DatabaseError:
692
            Logger.error(DATABASE_ERROR_MSG)
693
            raise DatabaseException(DATABASE_ERROR_MSG)
694
        except Error:
695
            Logger.error(ERROR_MSG)
696
            raise DatabaseException(ERROR_MSG)
697

    
698
        return 1
(2-2/3)