Projekt

Obecné

Profil

Stáhnout (30.9 KB) Statistiky
| Větev: | Tag: | Revize:
1
import time
2
from sqlite3 import Connection, Error, DatabaseError, IntegrityError, ProgrammingError, OperationalError, \
3
    NotSupportedError
4
from typing import Dict, List, Set
5

    
6
from src.exceptions.database_exception import DatabaseException
7
from injector import inject
8
from src.constants import *
9
from src.model.certificate import Certificate
10
from src.utils.logger import Logger
11

    
12
INTEGRITY_ERROR_MSG = "Database relational integrity corrupted."
13
PROGRAMMING_ERROR_MSG = "Exception raised for programming errors (etc. SQL statement)."
14
OPERATIONAL_ERROR_MSG = "Exception raised for errors that are related to the database’s operation."
15
NOT_SUPPORTED_ERROR_MSG = "Method or database API was used which is not supported by the database"
16
DATABASE_ERROR_MSG = "Unknown exception that are related to the database."
17
ERROR_MSG = "Unknown exception."
18

    
19

    
20
class CertificateRepository:
21

    
22
    @inject
23
    def __init__(self, connection: Connection):
24
        """
25
        Constructor of the CertificateRepository object
26

    
27
        :param connection: Instance of the Connection object
28
        """
29
        self.connection = connection
30
        self.cursor = connection.cursor()
31

    
32
    def create(self, certificate: Certificate):
33
        """
34
        Creates a certificate.
35
        For root certificate (CA) the parent certificate id is modified to the same id (id == parent_id).
36

    
37
        :param certificate: Instance of the Certificate object
38

    
39
        :return: the result of whether the creation was successful
40
        """
41

    
42
        Logger.debug("Function launched.")
43

    
44
        try:
45
            sql = (f"INSERT INTO {TAB_CERTIFICATES} "
46
                   f"({COL_VALID_FROM},"
47
                   f"{COL_VALID_TO},"
48
                   f"{COL_PEM_DATA},"
49
                   f"{COL_COMMON_NAME},"
50
                   f"{COL_COUNTRY_CODE},"
51
                   f"{COL_LOCALITY},"
52
                   f"{COL_PROVINCE},"
53
                   f"{COL_ORGANIZATION},"
54
                   f"{COL_ORGANIZATIONAL_UNIT},"
55
                   f"{COL_EMAIL_ADDRESS},"
56
                   f"{COL_TYPE_ID},"
57
                   f"{COL_PARENT_ID},"
58
                   f"{COL_PRIVATE_KEY_ID}) "
59
                   f"VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?)")
60
            values = [certificate.valid_from,
61
                      certificate.valid_to,
62
                      certificate.pem_data,
63
                      certificate.common_name,
64
                      certificate.country_code,
65
                      certificate.locality,
66
                      certificate.province,
67
                      certificate.organization,
68
                      certificate.organizational_unit,
69
                      certificate.email_address,
70
                      certificate.type_id,
71
                      certificate.parent_id,
72
                      certificate.private_key_id]
73
            self.cursor.execute(sql, values)
74
            self.connection.commit()
75

    
76
            last_id: int = self.cursor.lastrowid
77

    
78
            if certificate.type_id == ROOT_CA_ID:
79
                certificate.parent_id = last_id
80
                self.update(last_id, certificate)
81
            else:
82
                for usage_id, usage_value in certificate.usages.items():
83
                    if usage_value:
84
                        sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
85
                               f"({COL_CERTIFICATE_ID},"
86
                               f"{COL_USAGE_TYPE_ID}) "
87
                               f"VALUES (?,?)")
88
                        values = [last_id, usage_id]
89
                        self.cursor.execute(sql, values)
90
                        self.connection.commit()
91

    
92
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
93
            Logger.error(str(e))
94
            raise DatabaseException(e)
95

    
96
        return last_id
97

    
98
    def read(self, certificate_id: int):
99
        """
100
        Reads (selects) a certificate.
101

    
102
        :param certificate_id: ID of specific certificate
103

    
104
        :return: instance of the Certificate object
105
        """
106

    
107
        Logger.debug("Function launched.")
108

    
109
        try:
110
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
111
                   f"WHERE {COL_ID} = ? AND ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
112
            values = [certificate_id]
113
            self.cursor.execute(sql, values)
114
            certificate_row = self.cursor.fetchone()
115

    
116
            if certificate_row is None:
117
                return None
118

    
119
            sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
120
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
121
            self.cursor.execute(sql, values)
122
            usage_rows = self.cursor.fetchall()
123

    
124
            usage_dict: Dict[int, bool] = {}
125
            for usage_row in usage_rows:
126
                usage_dict[usage_row[2]] = True
127

    
128
            certificate: Certificate = Certificate(certificate_row[0],      # ID
129
                                                   certificate_row[1],      # valid from
130
                                                   certificate_row[2],      # valid to
131
                                                   certificate_row[3],      # pem data
132
                                                   certificate_row[14],     # type ID
133
                                                   certificate_row[15],     # parent ID
134
                                                   certificate_row[16],     # private key ID
135
                                                   usage_dict,
136
                                                   certificate_row[4],      # common name
137
                                                   certificate_row[5],      # country code
138
                                                   certificate_row[6],      # locality
139
                                                   certificate_row[7],      # province
140
                                                   certificate_row[8],      # organization
141
                                                   certificate_row[9],      # organizational unit
142
                                                   certificate_row[10],     # email address
143
                                                   certificate_row[11],     # revocation date
144
                                                   certificate_row[12])     # revocation reason
145

    
146
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
147
            Logger.error(str(e))
148
            raise DatabaseException(e)
149

    
150
        return certificate
151

    
152
    def read_all(self, filter_type: int = None):
153
        """
154
        Reads (selects) all certificates (with type).
155

    
156
        :param filter_type: ID of certificate type from CertificateTypes table
157

    
158
        :return: list of certificates
159
        """
160

    
161
        Logger.debug("Function launched.")
162

    
163
        try:
164
            sql_extension = ""
165
            values = []
166
            if filter_type is not None:
167
                sql_extension = (f" AND {COL_TYPE_ID} = ("
168
                                 f"SELECT {COL_ID} FROM {TAB_CERTIFICATE_TYPES} WHERE {COL_ID} = ?)")
169
                values = [filter_type]
170

    
171
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
172
                   f"WHERE ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = ''){sql_extension}")
173
            self.cursor.execute(sql, values)
174
            certificate_rows = self.cursor.fetchall()
175

    
176
            certificates: List[Certificate] = []
177
            for certificate_row in certificate_rows:
178
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
179
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
180
                values = [certificate_row[0]]
181
                self.cursor.execute(sql, values)
182
                usage_rows = self.cursor.fetchall()
183

    
184
                usage_dict: Dict[int, bool] = {}
185
                for usage_row in usage_rows:
186
                    usage_dict[usage_row[2]] = True
187

    
188
                certificates.append(Certificate(certificate_row[0],      # ID
189
                                                certificate_row[1],      # valid from
190
                                                certificate_row[2],      # valid to
191
                                                certificate_row[3],      # pem data
192
                                                certificate_row[14],     # type ID
193
                                                certificate_row[15],     # parent ID
194
                                                certificate_row[16],     # private key ID
195
                                                usage_dict,
196
                                                certificate_row[4],      # common name
197
                                                certificate_row[5],      # country code
198
                                                certificate_row[6],      # locality
199
                                                certificate_row[7],      # province
200
                                                certificate_row[8],      # organization
201
                                                certificate_row[9],      # organizational unit
202
                                                certificate_row[10],     # email address
203
                                                certificate_row[11],     # revocation date
204
                                                certificate_row[12]))    # revocation reason
205

    
206
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
207
            Logger.error(str(e))
208
            raise DatabaseException(e)
209

    
210
        return certificates
211

    
212
    def read_all_filter(self, target_types: Set[int], target_usages: Set[int], target_cn_substring: str, page: int,
213
                        per_page: int):
214
        """
215
        Reads (selects) all certificates according to a specific filtering and pagination options.
216
        :param target_types: certificate types (filter)
217
        :param target_usages: certificate usages (filter)
218
        :param target_cn_substring: certificate CN substring (filter)
219
        :param page: target page
220
        :param per_page: target page size
221
        :return: list of certificates
222
        """
223

    
224
        Logger.debug("Function launched.")
225

    
226
        try:
227
            values = []
228
            values += list(target_types)
229

    
230
            sql = (
231
                f"SELECT * "
232
                f"FROM {TAB_CERTIFICATES} a "
233
                f"WHERE (a.{COL_DELETION_DATE} IS NULL OR a.{COL_DELETION_DATE} = '') "
234
                f"AND a.{COL_TYPE_ID} IN ({','.join('?' * len(target_types))}) "
235
            )
236
            
237
            if target_usages is not None:
238
                sql += f"AND (SELECT COUNT(*) FROM {TAB_CERTIFICATE_USAGES} WHERE {COL_CERTIFICATE_ID} = a.{COL_ID} " \
239
                    f"AND {COL_USAGE_TYPE_ID} IN ({','.join('?' * len(target_usages))})) > 0 "
240
                values += list(target_usages)
241

    
242
            if target_cn_substring is not None:
243
                sql += f" AND a.{COL_COMMON_NAME} LIKE '%' || ? || '%'"
244

    
245
                values += [target_cn_substring]
246

    
247
            if page is not None and per_page is not None:
248
                sql += f" LIMIT {per_page} OFFSET {page * per_page}"
249

    
250
            self.cursor.execute(sql, values)
251
            certificate_rows = self.cursor.fetchall()
252

    
253
            certificates: List[Certificate] = []
254
            for certificate_row in certificate_rows:
255
                sql = (f"SELECT {COL_USAGE_TYPE_ID} FROM {TAB_CERTIFICATE_USAGES} "
256
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
257
                types = [certificate_row[0]]
258
                self.cursor.execute(sql, types)
259
                usage_rows = self.cursor.fetchall()
260

    
261
                usage_dict: Dict[int, bool] = {}
262
                for usage_row in usage_rows:
263
                    usage_dict[usage_row[0]] = True
264

    
265
                certificates.append(Certificate(certificate_row[0],  # ID
266
                                                certificate_row[1],  # valid from
267
                                                certificate_row[2],  # valid to
268
                                                certificate_row[3],  # pem data
269
                                                certificate_row[14],  # type ID
270
                                                certificate_row[15],  # parent ID
271
                                                certificate_row[16],  # private key ID
272
                                                usage_dict,
273
                                                certificate_row[4],  # common name
274
                                                certificate_row[5],  # country code
275
                                                certificate_row[6],  # locality
276
                                                certificate_row[7],  # province
277
                                                certificate_row[8],  # organization
278
                                                certificate_row[9],  # organizational unit
279
                                                certificate_row[10],  # email address
280
                                                certificate_row[11],  # revocation date
281
                                                certificate_row[12]))  # revocation reason
282

    
283
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
284
            Logger.error(str(e))
285
            raise DatabaseException(e)
286

    
287
        return certificates
288

    
289
    def update(self, certificate_id: int, certificate: Certificate):
290
        """
291
        Updates a certificate.
292
        If the parameter of certificate (Certificate object) is not to be changed,
293
        the same value must be specified.
294

    
295
        :param certificate_id: ID of specific certificate
296
        :param certificate: Instance of the Certificate object
297

    
298
        :return: the result of whether the updation was successful
299
        """
300

    
301
        Logger.debug("Function launched.")
302

    
303
        try:
304
            sql = (f"UPDATE {TAB_CERTIFICATES} "
305
                   f"SET {COL_VALID_FROM} = ?, "
306
                   f"{COL_VALID_TO} = ?, "
307
                   f"{COL_PEM_DATA} = ?, "
308
                   f"{COL_COMMON_NAME} = ?, "
309
                   f"{COL_COUNTRY_CODE} = ?, "
310
                   f"{COL_LOCALITY} = ?, "
311
                   f"{COL_PROVINCE} = ?, "
312
                   f"{COL_ORGANIZATION} = ?, "
313
                   f"{COL_ORGANIZATIONAL_UNIT} = ?, "
314
                   f"{COL_EMAIL_ADDRESS} = ?, "
315
                   f"{COL_TYPE_ID} = ?, "
316
                   f"{COL_PARENT_ID} = ?, "
317
                   f"{COL_PRIVATE_KEY_ID} = ? "
318
                   f"WHERE {COL_ID} = ?")
319
            values = [certificate.valid_from,
320
                      certificate.valid_to,
321
                      certificate.pem_data,
322
                      certificate.common_name,
323
                      certificate.country_code,
324
                      certificate.locality,
325
                      certificate.province,
326
                      certificate.organization,
327
                      certificate.organizational_unit,
328
                      certificate.email_address,
329
                      certificate.type_id,
330
                      certificate.parent_id,
331
                      certificate.private_key_id,
332
                      certificate_id]
333

    
334
            self.cursor.execute(sql, values)
335
            self.connection.commit()
336

    
337
            sql = (f"DELETE FROM {TAB_CERTIFICATE_USAGES} "
338
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
339
            values = [certificate_id]
340
            self.cursor.execute(sql, values)
341
            self.connection.commit()
342

    
343
            check_updated = self.cursor.rowcount > 0
344

    
345
            # iterate over usage pairs
346
            for usage_id, usage_value in certificate.usages.items():
347
                if usage_value:
348
                    sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
349
                           f"({COL_CERTIFICATE_ID},"
350
                           f"{COL_USAGE_TYPE_ID}) "
351
                           f"VALUES (?,?)")
352
                    values = [certificate_id, usage_id]
353
                    self.cursor.execute(sql, values)
354
                    self.connection.commit()
355

    
356
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
357
            Logger.error(str(e))
358
            raise DatabaseException(e)
359

    
360
        return check_updated
361

    
362
    def delete(self, certificate_id: int):
363
        """
364
        Deletes a certificate
365

    
366
        :param certificate_id: ID of specific certificate
367

    
368
        :return: the result of whether the deletion was successful
369
        """
370

    
371
        Logger.debug("Function launched.")
372

    
373
        try:
374
            sql = (f"UPDATE {TAB_CERTIFICATES} "
375
                   f"SET {COL_DELETION_DATE} = ? "
376
                   f"WHERE {COL_ID} = ? AND ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
377
            values = [int(time.time()),
378
                      certificate_id]
379
            self.cursor.execute(sql, values)
380
            self.connection.commit()
381

    
382
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
383
            Logger.error(str(e))
384
            raise DatabaseException(e)
385

    
386
        return self.cursor.rowcount > 0
387

    
388
    def set_certificate_revoked(
389
            self, certificate_id: int, revocation_date: str, revocation_reason: str = REV_REASON_UNSPECIFIED):
390
        """
391
        Revoke a certificate
392

    
393
        :param certificate_id: ID of specific certificate
394
        :param revocation_date: Date, when the certificate is revoked
395
        :param revocation_reason: Reason of the revocation
396

    
397
        :return:
398
            the result of whether the revocation was successful OR
399
            sqlite3.Error if an exception is thrown
400
        """
401

    
402
        Logger.debug("Function launched.")
403

    
404
        try:
405
            if revocation_date != "" and revocation_reason == "":
406
                revocation_reason = REV_REASON_UNSPECIFIED
407
            elif revocation_date == "":
408
                return False
409

    
410
            sql = (f"UPDATE {TAB_CERTIFICATES} "
411
                   f"SET {COL_REVOCATION_DATE} = ?, "
412
                   f"{COL_REVOCATION_REASON} = ? "
413
                   f"WHERE {COL_ID} = ? AND ({COL_REVOCATION_DATE} IS NULL OR {COL_REVOCATION_DATE} = '')")
414
            values = [revocation_date,
415
                      revocation_reason,
416
                      certificate_id]
417
            self.cursor.execute(sql, values)
418
            self.connection.commit()
419

    
420
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
421
            Logger.error(str(e))
422
            raise DatabaseException(e)
423

    
424
        return self.cursor.rowcount > 0
425

    
426
    def clear_certificate_revocation(self, certificate_id: int):
427
        """
428
        Clear revocation of a certificate
429

    
430
        :param certificate_id: ID of specific certificate
431

    
432
        :return:
433
            the result of whether the clear revocation was successful OR
434
            sqlite3.Error if an exception is thrown
435
        """
436

    
437
        Logger.debug("Function launched.")
438

    
439
        try:
440
            sql = (f"UPDATE {TAB_CERTIFICATES} "
441
                   f"SET {COL_REVOCATION_DATE} = NULL, "
442
                   f"{COL_REVOCATION_REASON} = NULL "
443
                   f"WHERE {COL_ID} = ?")
444
            values = [certificate_id]
445
            self.cursor.execute(sql, values)
446
            self.connection.commit()
447

    
448
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
449
            Logger.error(str(e))
450
            raise DatabaseException(e)
451

    
452
        return self.cursor.rowcount > 0
453

    
454
    def get_all_revoked_by(self, certificate_id: int):
455
        """
456
        Get list of the revoked certificates that are direct descendants of the certificate with the ID
457

    
458
        :param certificate_id: ID of specific certificate
459

    
460
        :return:
461
            list of the certificates OR
462
            None if the list is empty OR
463
            sqlite3.Error if an exception is thrown
464
        """
465

    
466
        Logger.debug("Function launched.")
467

    
468
        try:
469
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
470
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_REVOCATION_DATE} IS NOT NULL AND {COL_REVOCATION_DATE} != ''")
471
            values = [certificate_id]
472
            self.cursor.execute(sql, values)
473
            certificate_rows = self.cursor.fetchall()
474

    
475
            certificates: List[Certificate] = []
476
            for certificate_row in certificate_rows:
477
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
478
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
479
                values = [certificate_row[0]]
480
                self.cursor.execute(sql, values)
481
                usage_rows = self.cursor.fetchall()
482

    
483
                usage_dict: Dict[int, bool] = {}
484
                for usage_row in usage_rows:
485
                    usage_dict[usage_row[2]] = True
486

    
487
                certificates.append(Certificate(certificate_row[0],      # ID
488
                                                certificate_row[1],      # valid from
489
                                                certificate_row[2],      # valid to
490
                                                certificate_row[3],      # pem data
491
                                                certificate_row[14],     # type ID
492
                                                certificate_row[15],     # parent ID
493
                                                certificate_row[16],     # private key ID
494
                                                usage_dict,
495
                                                certificate_row[4],      # common name
496
                                                certificate_row[5],      # country code
497
                                                certificate_row[6],      # locality
498
                                                certificate_row[7],      # province
499
                                                certificate_row[8],      # organization
500
                                                certificate_row[9],      # organizational unit
501
                                                certificate_row[10],     # email address
502
                                                certificate_row[11],     # revocation date
503
                                                certificate_row[12]))    # revocation reason
504

    
505
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
506
            Logger.error(str(e))
507
            raise DatabaseException(e)
508

    
509
        return certificates
510

    
511
    def get_all_issued_by(self, certificate_id: int):
512
        """
513
        Get list of the certificates that are direct descendants of the certificate with the ID
514

    
515
        :param certificate_id: ID of specific certificate
516

    
517
        :return:
518
            list of the certificates OR
519
            None if the list is empty OR
520
            sqlite3.Error if an exception is thrown
521
        """
522

    
523
        Logger.debug("Function launched.")
524

    
525
        try:
526
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
527
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_ID} != ? AND "
528
                   f"({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
529
            values = [certificate_id, certificate_id]
530
            self.cursor.execute(sql, values)
531
            certificate_rows = self.cursor.fetchall()
532

    
533
            certificates: List[Certificate] = []
534
            for certificate_row in certificate_rows:
535
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
536
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
537
                values = [certificate_row[0]]
538
                self.cursor.execute(sql, values)
539
                usage_rows = self.cursor.fetchall()
540

    
541
                usage_dict: Dict[int, bool] = {}
542
                for usage_row in usage_rows:
543
                    usage_dict[usage_row[2]] = True
544

    
545
                certificates.append(Certificate(certificate_row[0],      # ID
546
                                                certificate_row[1],      # valid from
547
                                                certificate_row[2],      # valid to
548
                                                certificate_row[3],      # pem data
549
                                                certificate_row[14],     # type ID
550
                                                certificate_row[15],     # parent ID
551
                                                certificate_row[16],     # private key ID
552
                                                usage_dict,
553
                                                certificate_row[4],      # common name
554
                                                certificate_row[5],      # country code
555
                                                certificate_row[6],      # locality
556
                                                certificate_row[7],      # province
557
                                                certificate_row[8],      # organization
558
                                                certificate_row[9],      # organizational unit
559
                                                certificate_row[10],     # email address
560
                                                certificate_row[11],     # revocation date
561
                                                certificate_row[12]))    # revocation reason
562

    
563
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
564
            Logger.error(str(e))
565
            raise DatabaseException(e)
566

    
567
        return certificates
568

    
569
    def get_all_issued_by_filter(self, issuer_id, target_types, target_usages, target_cn_substring, page, per_page):
570
        """
571
        Get list of the certificates that are direct descendants of the certificate with the ID
572

    
573
        :param target_types: certificate types (filter)
574
        :param target_usages: certificate usages (filter)
575
        :param target_cn_substring: certificate CN substring (filter)
576
        :param page: target page
577
        :param per_page: target page size
578
        :param issuer_id: ID of specific certificate
579

    
580
        :return:
581
            list of the certificates OR
582
            None if the list is empty
583
        """
584

    
585
        Logger.debug("Function launched.")
586

    
587
        try:
588
            values = [issuer_id, issuer_id]
589
            values += list(target_types)
590

    
591
            sql = (
592
                f"SELECT * "
593
                f"FROM {TAB_CERTIFICATES} a "
594
                f"WHERE (a.{COL_DELETION_DATE} IS NULL OR a.{COL_DELETION_DATE} = '') "
595
                f"AND a.{COL_PARENT_ID} = ? AND a.{COL_ID} != ? "
596
                f"AND a.{COL_TYPE_ID} IN ({','.join('?' * len(target_types))}) "
597
                
598
            )
599
            
600
            if target_usages is not None:
601
                sql += f"AND (SELECT COUNT(*) FROM {TAB_CERTIFICATE_USAGES} WHERE {COL_CERTIFICATE_ID} = a.{COL_ID} " \
602
                    f"AND {COL_USAGE_TYPE_ID} IN ({','.join('?' * len(target_usages))})) > 0 "
603
                values += list(target_usages)
604

    
605
            if target_cn_substring is not None:
606
                sql += f" AND a.{COL_COMMON_NAME} LIKE '%' || ? || '%'"
607
                values += [target_cn_substring]
608

    
609
            if page is not None and per_page is not None:
610
                sql += f" LIMIT {per_page} OFFSET {page * per_page}"
611

    
612
            self.cursor.execute(sql, values)
613
            certificate_rows = self.cursor.fetchall()
614

    
615
            certificates: List[Certificate] = []
616
            for certificate_row in certificate_rows:
617
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
618
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
619
                values = [certificate_row[0]]
620
                self.cursor.execute(sql, values)
621
                usage_rows = self.cursor.fetchall()
622

    
623
                usage_dict: Dict[int, bool] = {}
624
                for usage_row in usage_rows:
625
                    usage_dict[usage_row[2]] = True
626

    
627
                certificates.append(Certificate(certificate_row[0],  # ID
628
                                                certificate_row[1],  # valid from
629
                                                certificate_row[2],  # valid to
630
                                                certificate_row[3],  # pem data
631
                                                certificate_row[14],  # type ID
632
                                                certificate_row[15],  # parent ID
633
                                                certificate_row[16],  # private key ID
634
                                                usage_dict,
635
                                                certificate_row[4],  # common name
636
                                                certificate_row[5],  # country code
637
                                                certificate_row[6],  # locality
638
                                                certificate_row[7],  # province
639
                                                certificate_row[8],  # organization
640
                                                certificate_row[9],  # organizational unit
641
                                                certificate_row[10],  # email address
642
                                                certificate_row[11],  # revocation date
643
                                                certificate_row[12]))  # revocation reason
644

    
645
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
646
            Logger.error(str(e))
647
            raise DatabaseException(e)
648

    
649
        return certificates
650

    
651
    def get_all_descendants_of(self, certificate_id: int):
652
        """
653
        Get a list of all certificates C such that the certificate identified by "certificate_id" belongs to a trust chain
654
        between C and its root certificate authority (i.e. is an ancestor of C).
655
        :param certificate_id: target certificate ID
656
        :return: list of all descendants
657
        """
658

    
659
        Logger.debug("Function launched.")
660

    
661
        try:
662
            def dfs(children_of, this, collection: list):
663
                for child in children_of(this.certificate_id):
664
                    dfs(children_of, child, collection)
665
                collection.append(this)
666

    
667
            subtree_root = self.read(certificate_id)
668
            if subtree_root is None:
669
                return None
670

    
671
            all_certs = []
672
            dfs(self.get_all_issued_by, subtree_root, all_certs)
673

    
674
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
675
            Logger.error(str(e))
676
            raise DatabaseException(e)
677

    
678
        return all_certs
679

    
680
    def get_next_id(self) -> int:
681
        """
682
        Get identifier of the next certificate that will be inserted into the database
683
        :return: identifier of the next certificate that will be added into the database
684
        """
685

    
686
        Logger.debug("Function launched.")
687

    
688
        try:
689
            # get next IDs of all tables
690
            self.cursor.execute("SELECT * FROM SQLITE_SEQUENCE")
691
            results = self.cursor.fetchall()
692

    
693
            # search for next ID in Certificates table and return it
694
            for result in results:
695
                if result[0] == TAB_CERTIFICATES:
696
                    return result[1] + 1  # current last id + 1
697
            # if certificates table is not present in the query results, return 1
698

    
699
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
700
            Logger.error(str(e))
701
            raise DatabaseException(e)
702

    
703
        return 1
(2-2/3)