Projekt

Obecné

Profil

Stáhnout (30.4 KB) Statistiky
| Větev: | Tag: | Revize:
1
import time
2
from sqlite3 import Connection, Error, DatabaseError, IntegrityError, ProgrammingError, OperationalError, \
3
    NotSupportedError
4
from typing import Dict, List, Set
5

    
6
from src.exceptions.database_exception import DatabaseException
7
from injector import inject
8
from src.constants import *
9
from src.model.certificate import Certificate
10
from src.utils.logger import Logger
11

    
12

    
13
class CertificateRepository:
14

    
15
    @inject
16
    def __init__(self, connection: Connection):
17
        """
18
        Constructor of the CertificateRepository object
19

    
20
        :param connection: Instance of the Connection object
21
        """
22
        self.connection = connection
23
        self.cursor = connection.cursor()
24

    
25
    def create(self, certificate: Certificate):
26
        """
27
        Creates a certificate.
28
        For root certificate (CA) the parent certificate id is modified to the same id (id == parent_id).
29

    
30
        :param certificate: Instance of the Certificate object
31

    
32
        :return: the result of whether the creation was successful
33
        """
34

    
35
        Logger.debug("Function launched.")
36

    
37
        try:
38
            sql = (f"INSERT INTO {TAB_CERTIFICATES} "
39
                   f"({COL_VALID_FROM},"
40
                   f"{COL_VALID_TO},"
41
                   f"{COL_PEM_DATA},"
42
                   f"{COL_COMMON_NAME},"
43
                   f"{COL_COUNTRY_CODE},"
44
                   f"{COL_LOCALITY},"
45
                   f"{COL_PROVINCE},"
46
                   f"{COL_ORGANIZATION},"
47
                   f"{COL_ORGANIZATIONAL_UNIT},"
48
                   f"{COL_EMAIL_ADDRESS},"
49
                   f"{COL_TYPE_ID},"
50
                   f"{COL_PARENT_ID},"
51
                   f"{COL_PRIVATE_KEY_ID}) "
52
                   f"VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?)")
53
            values = [certificate.valid_from,
54
                      certificate.valid_to,
55
                      certificate.pem_data,
56
                      certificate.common_name,
57
                      certificate.country_code,
58
                      certificate.locality,
59
                      certificate.province,
60
                      certificate.organization,
61
                      certificate.organizational_unit,
62
                      certificate.email_address,
63
                      certificate.type_id,
64
                      certificate.parent_id,
65
                      certificate.private_key_id]
66
            self.cursor.execute(sql, values)
67
            self.connection.commit()
68

    
69
            last_id: int = self.cursor.lastrowid
70

    
71
            if certificate.type_id == ROOT_CA_ID:
72
                certificate.parent_id = last_id
73
                self.update(last_id, certificate)
74
            else:
75
                for usage_id, usage_value in certificate.usages.items():
76
                    if usage_value:
77
                        sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
78
                               f"({COL_CERTIFICATE_ID},"
79
                               f"{COL_USAGE_TYPE_ID}) "
80
                               f"VALUES (?,?)")
81
                        values = [last_id, usage_id]
82
                        self.cursor.execute(sql, values)
83
                        self.connection.commit()
84

    
85
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
86
            Logger.error(str(e))
87
            raise DatabaseException(e)
88

    
89
        return last_id
90

    
91
    def read(self, certificate_id: int):
92
        """
93
        Reads (selects) a certificate.
94

    
95
        :param certificate_id: ID of specific certificate
96

    
97
        :return: instance of the Certificate object
98
        """
99

    
100
        Logger.debug("Function launched.")
101

    
102
        try:
103
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
104
                   f"WHERE {COL_ID} = ? AND ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
105
            values = [certificate_id]
106
            self.cursor.execute(sql, values)
107
            certificate_row = self.cursor.fetchone()
108

    
109
            if certificate_row is None:
110
                return None
111

    
112
            sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
113
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
114
            self.cursor.execute(sql, values)
115
            usage_rows = self.cursor.fetchall()
116

    
117
            usage_dict: Dict[int, bool] = {}
118
            for usage_row in usage_rows:
119
                usage_dict[usage_row[2]] = True
120

    
121
            certificate: Certificate = Certificate(certificate_row[0],      # ID
122
                                                   certificate_row[1],      # valid from
123
                                                   certificate_row[2],      # valid to
124
                                                   certificate_row[3],      # pem data
125
                                                   certificate_row[14],     # type ID
126
                                                   certificate_row[15],     # parent ID
127
                                                   certificate_row[16],     # private key ID
128
                                                   usage_dict,
129
                                                   certificate_row[4],      # common name
130
                                                   certificate_row[5],      # country code
131
                                                   certificate_row[6],      # locality
132
                                                   certificate_row[7],      # province
133
                                                   certificate_row[8],      # organization
134
                                                   certificate_row[9],      # organizational unit
135
                                                   certificate_row[10],     # email address
136
                                                   certificate_row[11],     # revocation date
137
                                                   certificate_row[12])     # revocation reason
138

    
139
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
140
            Logger.error(str(e))
141
            raise DatabaseException(e)
142

    
143
        return certificate
144

    
145
    def read_all(self, filter_type: int = None):
146
        """
147
        Reads (selects) all certificates (with type).
148

    
149
        :param filter_type: ID of certificate type from CertificateTypes table
150

    
151
        :return: list of certificates
152
        """
153

    
154
        Logger.debug("Function launched.")
155

    
156
        try:
157
            sql_extension = ""
158
            values = []
159
            if filter_type is not None:
160
                sql_extension = (f" AND {COL_TYPE_ID} = ("
161
                                 f"SELECT {COL_ID} FROM {TAB_CERTIFICATE_TYPES} WHERE {COL_ID} = ?)")
162
                values = [filter_type]
163

    
164
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
165
                   f"WHERE ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = ''){sql_extension}")
166
            self.cursor.execute(sql, values)
167
            certificate_rows = self.cursor.fetchall()
168

    
169
            certificates: List[Certificate] = []
170
            for certificate_row in certificate_rows:
171
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
172
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
173
                values = [certificate_row[0]]
174
                self.cursor.execute(sql, values)
175
                usage_rows = self.cursor.fetchall()
176

    
177
                usage_dict: Dict[int, bool] = {}
178
                for usage_row in usage_rows:
179
                    usage_dict[usage_row[2]] = True
180

    
181
                certificates.append(Certificate(certificate_row[0],      # ID
182
                                                certificate_row[1],      # valid from
183
                                                certificate_row[2],      # valid to
184
                                                certificate_row[3],      # pem data
185
                                                certificate_row[14],     # type ID
186
                                                certificate_row[15],     # parent ID
187
                                                certificate_row[16],     # private key ID
188
                                                usage_dict,
189
                                                certificate_row[4],      # common name
190
                                                certificate_row[5],      # country code
191
                                                certificate_row[6],      # locality
192
                                                certificate_row[7],      # province
193
                                                certificate_row[8],      # organization
194
                                                certificate_row[9],      # organizational unit
195
                                                certificate_row[10],     # email address
196
                                                certificate_row[11],     # revocation date
197
                                                certificate_row[12]))    # revocation reason
198

    
199
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
200
            Logger.error(str(e))
201
            raise DatabaseException(e)
202

    
203
        return certificates
204

    
205
    def read_all_filter(self, target_types: Set[int], target_usages: Set[int], target_cn_substring: str, page: int,
206
                        per_page: int):
207
        """
208
        Reads (selects) all certificates according to a specific filtering and pagination options.
209
        :param target_types: certificate types (filter)
210
        :param target_usages: certificate usages (filter)
211
        :param target_cn_substring: certificate CN substring (filter)
212
        :param page: target page
213
        :param per_page: target page size
214
        :return: list of certificates
215
        """
216

    
217
        Logger.debug("Function launched.")
218

    
219
        try:
220
            values = []
221
            values += list(target_types)
222

    
223
            sql = (
224
                f"SELECT * "
225
                f"FROM {TAB_CERTIFICATES} a "
226
                f"WHERE (a.{COL_DELETION_DATE} IS NULL OR a.{COL_DELETION_DATE} = '') "
227
                f"AND a.{COL_TYPE_ID} IN ({','.join('?' * len(target_types))}) "
228
            )
229
            
230
            if target_usages is not None:
231
                sql += f"AND (SELECT COUNT(*) FROM {TAB_CERTIFICATE_USAGES} WHERE {COL_CERTIFICATE_ID} = a.{COL_ID} " \
232
                    f"AND {COL_USAGE_TYPE_ID} IN ({','.join('?' * len(target_usages))})) > 0 "
233
                values += list(target_usages)
234

    
235
            if target_cn_substring is not None:
236
                sql += f" AND a.{COL_COMMON_NAME} LIKE '%' || ? || '%'"
237

    
238
                values += [target_cn_substring]
239

    
240
            if page is not None and per_page is not None:
241
                sql += f" LIMIT {per_page} OFFSET {page * per_page}"
242

    
243
            self.cursor.execute(sql, values)
244
            certificate_rows = self.cursor.fetchall()
245

    
246
            certificates: List[Certificate] = []
247
            for certificate_row in certificate_rows:
248
                sql = (f"SELECT {COL_USAGE_TYPE_ID} FROM {TAB_CERTIFICATE_USAGES} "
249
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
250
                types = [certificate_row[0]]
251
                self.cursor.execute(sql, types)
252
                usage_rows = self.cursor.fetchall()
253

    
254
                usage_dict: Dict[int, bool] = {}
255
                for usage_row in usage_rows:
256
                    usage_dict[usage_row[0]] = True
257

    
258
                certificates.append(Certificate(certificate_row[0],  # ID
259
                                                certificate_row[1],  # valid from
260
                                                certificate_row[2],  # valid to
261
                                                certificate_row[3],  # pem data
262
                                                certificate_row[14],  # type ID
263
                                                certificate_row[15],  # parent ID
264
                                                certificate_row[16],  # private key ID
265
                                                usage_dict,
266
                                                certificate_row[4],  # common name
267
                                                certificate_row[5],  # country code
268
                                                certificate_row[6],  # locality
269
                                                certificate_row[7],  # province
270
                                                certificate_row[8],  # organization
271
                                                certificate_row[9],  # organizational unit
272
                                                certificate_row[10],  # email address
273
                                                certificate_row[11],  # revocation date
274
                                                certificate_row[12]))  # revocation reason
275

    
276
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
277
            Logger.error(str(e))
278
            raise DatabaseException(e)
279

    
280
        return certificates
281

    
282
    def update(self, certificate_id: int, certificate: Certificate):
283
        """
284
        Updates a certificate.
285
        If the parameter of certificate (Certificate object) is not to be changed,
286
        the same value must be specified.
287

    
288
        :param certificate_id: ID of specific certificate
289
        :param certificate: Instance of the Certificate object
290

    
291
        :return: the result of whether the updation was successful
292
        """
293

    
294
        Logger.debug("Function launched.")
295

    
296
        try:
297
            sql = (f"UPDATE {TAB_CERTIFICATES} "
298
                   f"SET {COL_VALID_FROM} = ?, "
299
                   f"{COL_VALID_TO} = ?, "
300
                   f"{COL_PEM_DATA} = ?, "
301
                   f"{COL_COMMON_NAME} = ?, "
302
                   f"{COL_COUNTRY_CODE} = ?, "
303
                   f"{COL_LOCALITY} = ?, "
304
                   f"{COL_PROVINCE} = ?, "
305
                   f"{COL_ORGANIZATION} = ?, "
306
                   f"{COL_ORGANIZATIONAL_UNIT} = ?, "
307
                   f"{COL_EMAIL_ADDRESS} = ?, "
308
                   f"{COL_TYPE_ID} = ?, "
309
                   f"{COL_PARENT_ID} = ?, "
310
                   f"{COL_PRIVATE_KEY_ID} = ? "
311
                   f"WHERE {COL_ID} = ?")
312
            values = [certificate.valid_from,
313
                      certificate.valid_to,
314
                      certificate.pem_data,
315
                      certificate.common_name,
316
                      certificate.country_code,
317
                      certificate.locality,
318
                      certificate.province,
319
                      certificate.organization,
320
                      certificate.organizational_unit,
321
                      certificate.email_address,
322
                      certificate.type_id,
323
                      certificate.parent_id,
324
                      certificate.private_key_id,
325
                      certificate_id]
326

    
327
            self.cursor.execute(sql, values)
328
            self.connection.commit()
329

    
330
            sql = (f"DELETE FROM {TAB_CERTIFICATE_USAGES} "
331
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
332
            values = [certificate_id]
333
            self.cursor.execute(sql, values)
334
            self.connection.commit()
335

    
336
            check_updated = self.cursor.rowcount > 0
337

    
338
            # iterate over usage pairs
339
            for usage_id, usage_value in certificate.usages.items():
340
                if usage_value:
341
                    sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
342
                           f"({COL_CERTIFICATE_ID},"
343
                           f"{COL_USAGE_TYPE_ID}) "
344
                           f"VALUES (?,?)")
345
                    values = [certificate_id, usage_id]
346
                    self.cursor.execute(sql, values)
347
                    self.connection.commit()
348

    
349
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
350
            Logger.error(str(e))
351
            raise DatabaseException(e)
352

    
353
        return check_updated
354

    
355
    def delete(self, certificate_id: int):
356
        """
357
        Deletes a certificate
358

    
359
        :param certificate_id: ID of specific certificate
360

    
361
        :return: the result of whether the deletion was successful
362
        """
363

    
364
        Logger.debug("Function launched.")
365

    
366
        try:
367
            sql = (f"UPDATE {TAB_CERTIFICATES} "
368
                   f"SET {COL_DELETION_DATE} = ? "
369
                   f"WHERE {COL_ID} = ? AND ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
370
            values = [int(time.time()),
371
                      certificate_id]
372
            self.cursor.execute(sql, values)
373
            self.connection.commit()
374

    
375
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
376
            Logger.error(str(e))
377
            raise DatabaseException(e)
378

    
379
        return self.cursor.rowcount > 0
380

    
381
    def set_certificate_revoked(
382
            self, certificate_id: int, revocation_date: str, revocation_reason: str = REV_REASON_UNSPECIFIED):
383
        """
384
        Revoke a certificate
385

    
386
        :param certificate_id: ID of specific certificate
387
        :param revocation_date: Date, when the certificate is revoked
388
        :param revocation_reason: Reason of the revocation
389

    
390
        :return:
391
            the result of whether the revocation was successful OR
392
            sqlite3.Error if an exception is thrown
393
        """
394

    
395
        Logger.debug("Function launched.")
396

    
397
        try:
398
            if revocation_date != "" and revocation_reason == "":
399
                revocation_reason = REV_REASON_UNSPECIFIED
400
            elif revocation_date == "":
401
                return False
402

    
403
            sql = (f"UPDATE {TAB_CERTIFICATES} "
404
                   f"SET {COL_REVOCATION_DATE} = ?, "
405
                   f"{COL_REVOCATION_REASON} = ? "
406
                   f"WHERE {COL_ID} = ? AND ({COL_REVOCATION_DATE} IS NULL OR {COL_REVOCATION_DATE} = '')")
407
            values = [revocation_date,
408
                      revocation_reason,
409
                      certificate_id]
410
            self.cursor.execute(sql, values)
411
            self.connection.commit()
412

    
413
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
414
            Logger.error(str(e))
415
            raise DatabaseException(e)
416

    
417
        return self.cursor.rowcount > 0
418

    
419
    def clear_certificate_revocation(self, certificate_id: int):
420
        """
421
        Clear revocation of a certificate
422

    
423
        :param certificate_id: ID of specific certificate
424

    
425
        :return:
426
            the result of whether the clear revocation was successful OR
427
            sqlite3.Error if an exception is thrown
428
        """
429

    
430
        Logger.debug("Function launched.")
431

    
432
        try:
433
            sql = (f"UPDATE {TAB_CERTIFICATES} "
434
                   f"SET {COL_REVOCATION_DATE} = NULL, "
435
                   f"{COL_REVOCATION_REASON} = NULL "
436
                   f"WHERE {COL_ID} = ?")
437
            values = [certificate_id]
438
            self.cursor.execute(sql, values)
439
            self.connection.commit()
440

    
441
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
442
            Logger.error(str(e))
443
            raise DatabaseException(e)
444

    
445
        return self.cursor.rowcount > 0
446

    
447
    def get_all_revoked_by(self, certificate_id: int):
448
        """
449
        Get list of the revoked certificates that are direct descendants of the certificate with the ID
450

    
451
        :param certificate_id: ID of specific certificate
452

    
453
        :return:
454
            list of the certificates OR
455
            None if the list is empty OR
456
            sqlite3.Error if an exception is thrown
457
        """
458

    
459
        Logger.debug("Function launched.")
460

    
461
        try:
462
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
463
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_REVOCATION_DATE} IS NOT NULL AND {COL_REVOCATION_DATE} != ''")
464
            values = [certificate_id]
465
            self.cursor.execute(sql, values)
466
            certificate_rows = self.cursor.fetchall()
467

    
468
            certificates: List[Certificate] = []
469
            for certificate_row in certificate_rows:
470
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
471
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
472
                values = [certificate_row[0]]
473
                self.cursor.execute(sql, values)
474
                usage_rows = self.cursor.fetchall()
475

    
476
                usage_dict: Dict[int, bool] = {}
477
                for usage_row in usage_rows:
478
                    usage_dict[usage_row[2]] = True
479

    
480
                certificates.append(Certificate(certificate_row[0],      # ID
481
                                                certificate_row[1],      # valid from
482
                                                certificate_row[2],      # valid to
483
                                                certificate_row[3],      # pem data
484
                                                certificate_row[14],     # type ID
485
                                                certificate_row[15],     # parent ID
486
                                                certificate_row[16],     # private key ID
487
                                                usage_dict,
488
                                                certificate_row[4],      # common name
489
                                                certificate_row[5],      # country code
490
                                                certificate_row[6],      # locality
491
                                                certificate_row[7],      # province
492
                                                certificate_row[8],      # organization
493
                                                certificate_row[9],      # organizational unit
494
                                                certificate_row[10],     # email address
495
                                                certificate_row[11],     # revocation date
496
                                                certificate_row[12]))    # revocation reason
497

    
498
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
499
            Logger.error(str(e))
500
            raise DatabaseException(e)
501

    
502
        return certificates
503

    
504
    def get_all_issued_by(self, certificate_id: int):
505
        """
506
        Get list of the certificates that are direct descendants of the certificate with the ID
507

    
508
        :param certificate_id: ID of specific certificate
509

    
510
        :return:
511
            list of the certificates OR
512
            None if the list is empty OR
513
            sqlite3.Error if an exception is thrown
514
        """
515

    
516
        Logger.debug("Function launched.")
517

    
518
        try:
519
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
520
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_ID} != ? AND "
521
                   f"({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
522
            values = [certificate_id, certificate_id]
523
            self.cursor.execute(sql, values)
524
            certificate_rows = self.cursor.fetchall()
525

    
526
            certificates: List[Certificate] = []
527
            for certificate_row in certificate_rows:
528
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
529
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
530
                values = [certificate_row[0]]
531
                self.cursor.execute(sql, values)
532
                usage_rows = self.cursor.fetchall()
533

    
534
                usage_dict: Dict[int, bool] = {}
535
                for usage_row in usage_rows:
536
                    usage_dict[usage_row[2]] = True
537

    
538
                certificates.append(Certificate(certificate_row[0],      # ID
539
                                                certificate_row[1],      # valid from
540
                                                certificate_row[2],      # valid to
541
                                                certificate_row[3],      # pem data
542
                                                certificate_row[14],     # type ID
543
                                                certificate_row[15],     # parent ID
544
                                                certificate_row[16],     # private key ID
545
                                                usage_dict,
546
                                                certificate_row[4],      # common name
547
                                                certificate_row[5],      # country code
548
                                                certificate_row[6],      # locality
549
                                                certificate_row[7],      # province
550
                                                certificate_row[8],      # organization
551
                                                certificate_row[9],      # organizational unit
552
                                                certificate_row[10],     # email address
553
                                                certificate_row[11],     # revocation date
554
                                                certificate_row[12]))    # revocation reason
555

    
556
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
557
            Logger.error(str(e))
558
            raise DatabaseException(e)
559

    
560
        return certificates
561

    
562
    def get_all_issued_by_filter(self, issuer_id, target_types, target_usages, target_cn_substring, page, per_page):
563
        """
564
        Get list of the certificates that are direct descendants of the certificate with the ID
565

    
566
        :param target_types: certificate types (filter)
567
        :param target_usages: certificate usages (filter)
568
        :param target_cn_substring: certificate CN substring (filter)
569
        :param page: target page
570
        :param per_page: target page size
571
        :param issuer_id: ID of specific certificate
572

    
573
        :return:
574
            list of the certificates OR
575
            None if the list is empty
576
        """
577

    
578
        Logger.debug("Function launched.")
579

    
580
        try:
581
            values = [issuer_id, issuer_id]
582
            values += list(target_types)
583

    
584
            sql = (
585
                f"SELECT * "
586
                f"FROM {TAB_CERTIFICATES} a "
587
                f"WHERE (a.{COL_DELETION_DATE} IS NULL OR a.{COL_DELETION_DATE} = '') "
588
                f"AND a.{COL_PARENT_ID} = ? AND a.{COL_ID} != ? "
589
                f"AND a.{COL_TYPE_ID} IN ({','.join('?' * len(target_types))}) "
590
                
591
            )
592
            
593
            if target_usages is not None:
594
                sql += f"AND (SELECT COUNT(*) FROM {TAB_CERTIFICATE_USAGES} WHERE {COL_CERTIFICATE_ID} = a.{COL_ID} " \
595
                    f"AND {COL_USAGE_TYPE_ID} IN ({','.join('?' * len(target_usages))})) > 0 "
596
                values += list(target_usages)
597

    
598
            if target_cn_substring is not None:
599
                sql += f" AND a.{COL_COMMON_NAME} LIKE '%' || ? || '%'"
600
                values += [target_cn_substring]
601

    
602
            if page is not None and per_page is not None:
603
                sql += f" LIMIT {per_page} OFFSET {page * per_page}"
604

    
605
            self.cursor.execute(sql, values)
606
            certificate_rows = self.cursor.fetchall()
607

    
608
            certificates: List[Certificate] = []
609
            for certificate_row in certificate_rows:
610
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
611
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
612
                values = [certificate_row[0]]
613
                self.cursor.execute(sql, values)
614
                usage_rows = self.cursor.fetchall()
615

    
616
                usage_dict: Dict[int, bool] = {}
617
                for usage_row in usage_rows:
618
                    usage_dict[usage_row[2]] = True
619

    
620
                certificates.append(Certificate(certificate_row[0],  # ID
621
                                                certificate_row[1],  # valid from
622
                                                certificate_row[2],  # valid to
623
                                                certificate_row[3],  # pem data
624
                                                certificate_row[14],  # type ID
625
                                                certificate_row[15],  # parent ID
626
                                                certificate_row[16],  # private key ID
627
                                                usage_dict,
628
                                                certificate_row[4],  # common name
629
                                                certificate_row[5],  # country code
630
                                                certificate_row[6],  # locality
631
                                                certificate_row[7],  # province
632
                                                certificate_row[8],  # organization
633
                                                certificate_row[9],  # organizational unit
634
                                                certificate_row[10],  # email address
635
                                                certificate_row[11],  # revocation date
636
                                                certificate_row[12]))  # revocation reason
637

    
638
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
639
            Logger.error(str(e))
640
            raise DatabaseException(e)
641

    
642
        return certificates
643

    
644
    def get_all_descendants_of(self, certificate_id: int):
645
        """
646
        Get a list of all certificates C such that the certificate identified by "certificate_id" belongs to a trust chain
647
        between C and its root certificate authority (i.e. is an ancestor of C).
648
        :param certificate_id: target certificate ID
649
        :return: list of all descendants
650
        """
651

    
652
        Logger.debug("Function launched.")
653

    
654
        try:
655
            def dfs(children_of, this, collection: list):
656
                for child in children_of(this.certificate_id):
657
                    dfs(children_of, child, collection)
658
                collection.append(this)
659

    
660
            subtree_root = self.read(certificate_id)
661
            if subtree_root is None:
662
                return None
663

    
664
            all_certs = []
665
            dfs(self.get_all_issued_by, subtree_root, all_certs)
666

    
667
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
668
            Logger.error(str(e))
669
            raise DatabaseException(e)
670

    
671
        return all_certs
672

    
673
    def get_next_id(self) -> int:
674
        """
675
        Get identifier of the next certificate that will be inserted into the database
676
        :return: identifier of the next certificate that will be added into the database
677
        """
678

    
679
        Logger.debug("Function launched.")
680

    
681
        try:
682
            # get next IDs of all tables
683
            self.cursor.execute("SELECT * FROM SQLITE_SEQUENCE")
684
            results = self.cursor.fetchall()
685

    
686
            # search for next ID in Certificates table and return it
687
            for result in results:
688
                if result[0] == TAB_CERTIFICATES:
689
                    return result[1] + 1  # current last id + 1
690
            # if certificates table is not present in the query results, return 1
691

    
692
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
693
            Logger.error(str(e))
694
            raise DatabaseException(e)
695

    
696
        return 1
(2-2/3)