Projekt

Obecné

Profil

Stáhnout (37.6 KB) Statistiky
| Větev: | Tag: | Revize:
1
import time
2
from sqlite3 import Connection, Error, DatabaseError, IntegrityError, ProgrammingError, OperationalError, \
3
    NotSupportedError
4
from typing import Dict, List, Set
5

    
6
from src.exceptions.database_exception import DatabaseException
7
from injector import inject
8
from src.constants import *
9
from src.model.certificate import Certificate
10
from src.utils.logger import Logger
11

    
12
INTEGRITY_ERROR_MSG = "Database relational integrity corrupted."
13
PROGRAMMING_ERROR_MSG = "Exception raised for programming errors (etc. SQL statement)."
14
OPERATIONAL_ERROR_MSG = "Exception raised for errors that are related to the database’s operation."
15
NOT_SUPPORTED_ERROR_MSG = "Method or database API was used which is not supported by the database"
16
DATABASE_ERROR_MSG = "Unknown exception that are related to the database."
17
ERROR_MSG = "Unknown exception."
18

    
19

    
20
class CertificateRepository:
21

    
22
    @inject
23
    def __init__(self, connection: Connection):
24
        """
25
        Constructor of the CertificateRepository object
26

    
27
        :param connection: Instance of the Connection object
28
        """
29
        self.connection = connection
30
        self.cursor = connection.cursor()
31

    
32
    def create(self, certificate: Certificate):
33
        """
34
        Creates a certificate.
35
        For root certificate (CA) the parent certificate id is modified to the same id (id == parent_id).
36

    
37
        :param certificate: Instance of the Certificate object
38

    
39
        :return: the result of whether the creation was successful
40
        """
41

    
42
        Logger.debug("Function launched.")
43

    
44
        try:
45
            sql = (f"INSERT INTO {TAB_CERTIFICATES} "
46
                   f"({COL_VALID_FROM},"
47
                   f"{COL_VALID_TO},"
48
                   f"{COL_PEM_DATA},"
49
                   f"{COL_COMMON_NAME},"
50
                   f"{COL_COUNTRY_CODE},"
51
                   f"{COL_LOCALITY},"
52
                   f"{COL_PROVINCE},"
53
                   f"{COL_ORGANIZATION},"
54
                   f"{COL_ORGANIZATIONAL_UNIT},"
55
                   f"{COL_EMAIL_ADDRESS},"
56
                   f"{COL_TYPE_ID},"
57
                   f"{COL_PARENT_ID},"
58
                   f"{COL_PRIVATE_KEY_ID}) "
59
                   f"VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?)")
60
            values = [certificate.valid_from,
61
                      certificate.valid_to,
62
                      certificate.pem_data,
63
                      certificate.common_name,
64
                      certificate.country_code,
65
                      certificate.locality,
66
                      certificate.province,
67
                      certificate.organization,
68
                      certificate.organizational_unit,
69
                      certificate.email_address,
70
                      certificate.type_id,
71
                      certificate.parent_id,
72
                      certificate.private_key_id]
73
            self.cursor.execute(sql, values)
74
            self.connection.commit()
75

    
76
            last_id: int = self.cursor.lastrowid
77

    
78
            if certificate.type_id == ROOT_CA_ID:
79
                certificate.parent_id = last_id
80
                self.update(last_id, certificate)
81
            else:
82
                for usage_id, usage_value in certificate.usages.items():
83
                    if usage_value:
84
                        sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
85
                               f"({COL_CERTIFICATE_ID},"
86
                               f"{COL_USAGE_TYPE_ID}) "
87
                               f"VALUES (?,?)")
88
                        values = [last_id, usage_id]
89
                        self.cursor.execute(sql, values)
90
                        self.connection.commit()
91
        except IntegrityError:
92
            Logger.error(INTEGRITY_ERROR_MSG)
93
            raise DatabaseException(INTEGRITY_ERROR_MSG)
94
        except ProgrammingError:
95
            Logger.error(PROGRAMMING_ERROR_MSG)
96
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
97
        except OperationalError:
98
            Logger.error(OPERATIONAL_ERROR_MSG)
99
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
100
        except NotSupportedError:
101
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
102
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
103
        except DatabaseError:
104
            Logger.error(DATABASE_ERROR_MSG)
105
            raise DatabaseException(DATABASE_ERROR_MSG)
106
        except Error:
107
            Logger.error(ERROR_MSG)
108
            raise DatabaseException(ERROR_MSG)
109

    
110
        return last_id
111

    
112
    def read(self, certificate_id: int):
113
        """
114
        Reads (selects) a certificate.
115

    
116
        :param certificate_id: ID of specific certificate
117

    
118
        :return: instance of the Certificate object
119
        """
120

    
121
        Logger.debug("Function launched.")
122

    
123
        try:
124
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
125
                   f"WHERE {COL_ID} = ? AND ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
126
            values = [certificate_id]
127
            self.cursor.execute(sql, values)
128
            certificate_row = self.cursor.fetchone()
129

    
130
            if certificate_row is None:
131
                return None
132

    
133
            sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
134
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
135
            self.cursor.execute(sql, values)
136
            usage_rows = self.cursor.fetchall()
137

    
138
            usage_dict: Dict[int, bool] = {}
139
            for usage_row in usage_rows:
140
                usage_dict[usage_row[2]] = True
141

    
142
            certificate: Certificate = Certificate(certificate_row[0],      # ID
143
                                                   certificate_row[1],      # valid from
144
                                                   certificate_row[2],      # valid to
145
                                                   certificate_row[3],      # pem data
146
                                                   certificate_row[14],     # type ID
147
                                                   certificate_row[15],     # parent ID
148
                                                   certificate_row[16],     # private key ID
149
                                                   usage_dict,
150
                                                   certificate_row[4],      # common name
151
                                                   certificate_row[5],      # country code
152
                                                   certificate_row[6],      # locality
153
                                                   certificate_row[7],      # province
154
                                                   certificate_row[8],      # organization
155
                                                   certificate_row[9],      # organizational unit
156
                                                   certificate_row[10],     # email address
157
                                                   certificate_row[11],     # revocation date
158
                                                   certificate_row[12])     # revocation reason
159

    
160
        except IntegrityError:
161
            Logger.error(INTEGRITY_ERROR_MSG)
162
            raise DatabaseException(INTEGRITY_ERROR_MSG)
163
        except ProgrammingError:
164
            Logger.error(PROGRAMMING_ERROR_MSG)
165
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
166
        except OperationalError:
167
            Logger.error(OPERATIONAL_ERROR_MSG)
168
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
169
        except NotSupportedError:
170
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
171
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
172
        except DatabaseError:
173
            Logger.error(DATABASE_ERROR_MSG)
174
            raise DatabaseException(DATABASE_ERROR_MSG)
175
        except Error:
176
            Logger.error(ERROR_MSG)
177
            raise DatabaseException(ERROR_MSG)
178

    
179
        return certificate
180

    
181
    def read_all(self, filter_type: int = None):
182
        """
183
        Reads (selects) all certificates (with type).
184

    
185
        :param filter_type: ID of certificate type from CertificateTypes table
186

    
187
        :return: list of certificates
188
        """
189

    
190
        Logger.debug("Function launched.")
191

    
192
        try:
193
            sql_extension = ""
194
            values = []
195
            if filter_type is not None:
196
                sql_extension = (f" AND {COL_TYPE_ID} = ("
197
                                 f"SELECT {COL_ID} FROM {TAB_CERTIFICATE_TYPES} WHERE {COL_ID} = ?)")
198
                values = [filter_type]
199

    
200
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
201
                   f"WHERE ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = ''){sql_extension}")
202
            self.cursor.execute(sql, values)
203
            certificate_rows = self.cursor.fetchall()
204

    
205
            certificates: List[Certificate] = []
206
            for certificate_row in certificate_rows:
207
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
208
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
209
                values = [certificate_row[0]]
210
                self.cursor.execute(sql, values)
211
                usage_rows = self.cursor.fetchall()
212

    
213
                usage_dict: Dict[int, bool] = {}
214
                for usage_row in usage_rows:
215
                    usage_dict[usage_row[2]] = True
216

    
217
                certificates.append(Certificate(certificate_row[0],      # ID
218
                                                certificate_row[1],      # valid from
219
                                                certificate_row[2],      # valid to
220
                                                certificate_row[3],      # pem data
221
                                                certificate_row[14],     # type ID
222
                                                certificate_row[15],     # parent ID
223
                                                certificate_row[16],     # private key ID
224
                                                usage_dict,
225
                                                certificate_row[4],      # common name
226
                                                certificate_row[5],      # country code
227
                                                certificate_row[6],      # locality
228
                                                certificate_row[7],      # province
229
                                                certificate_row[8],      # organization
230
                                                certificate_row[9],      # organizational unit
231
                                                certificate_row[10],     # email address
232
                                                certificate_row[11],     # revocation date
233
                                                certificate_row[12]))    # revocation reason
234
        except IntegrityError:
235
            Logger.error(INTEGRITY_ERROR_MSG)
236
            raise DatabaseException(INTEGRITY_ERROR_MSG)
237
        except ProgrammingError:
238
            Logger.error(PROGRAMMING_ERROR_MSG)
239
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
240
        except OperationalError:
241
            Logger.error(OPERATIONAL_ERROR_MSG)
242
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
243
        except NotSupportedError:
244
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
245
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
246
        except DatabaseError:
247
            Logger.error(DATABASE_ERROR_MSG)
248
            raise DatabaseException(DATABASE_ERROR_MSG)
249
        except Error:
250
            Logger.error(ERROR_MSG)
251
            raise DatabaseException(ERROR_MSG)
252

    
253
        return certificates
254

    
255
    def read_all_filter(self, target_types: Set[int], target_usages: Set[int], target_cn_substring: str, page: int,
256
                        per_page: int):
257
        """
258
        Reads (selects) all certificates according to a specific filtering and pagination options.
259
        :param target_types: certificate types (filter)
260
        :param target_usages: certificate usages (filter)
261
        :param target_cn_substring: certificate CN substring (filter)
262
        :param page: target page
263
        :param per_page: target page size
264
        :return: list of certificates
265
        """
266

    
267
        Logger.debug("Function launched.")
268

    
269
        try:
270
            values = []
271
            values += list(target_types)
272
            values += list(target_usages)
273

    
274
            sql = (
275
                f"SELECT a.{COL_ID}, {COL_COMMON_NAME}, {COL_VALID_FROM}, {COL_VALID_TO}, {COL_PEM_DATA}, "
276
                f"{COL_REVOCATION_DATE}, {COL_REVOCATION_REASON}, {COL_PRIVATE_KEY_ID}, {COL_TYPE_ID}, {COL_PARENT_ID} "
277
                f"FROM {TAB_CERTIFICATES} a "
278
                f"WHERE (a.{COL_DELETION_DATE} IS NULL OR a.{COL_DELETION_DATE} = '') "
279
                f"AND a.{COL_TYPE_ID} IN ({','.join('?' * len(target_types))}) "
280
                f"AND (SELECT COUNT(*) FROM {TAB_CERTIFICATE_USAGES} "
281
                f"WHERE {COL_CERTIFICATE_ID} = a.{COL_ID} "
282
                f"AND {COL_USAGE_TYPE_ID} IN ({','.join('?' * len(target_usages))}) "
283
                f") > 0 "
284
            )
285

    
286
            if target_cn_substring is not None:
287
                sql += f" AND a.{COL_COMMON_NAME} LIKE '%' || ? || '%'"
288

    
289
                values += [target_cn_substring]
290

    
291
            if page is not None and per_page is not None:
292
                sql += f" LIMIT {per_page} OFFSET {page * per_page}"
293

    
294
            self.cursor.execute(sql, values)
295
            certificate_rows = self.cursor.fetchall()
296

    
297
            certificates: List[Certificate] = []
298
            for certificate_row in certificate_rows:
299
                sql = (f"SELECT {COL_USAGE_TYPE_ID} FROM {TAB_CERTIFICATE_USAGES} "
300
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
301
                types = [certificate_row[0]]
302
                self.cursor.execute(sql, types)
303
                usage_rows = self.cursor.fetchall()
304

    
305
                usage_dict: Dict[int, bool] = {}
306
                for usage_row in usage_rows:
307
                    usage_dict[usage_row[0]] = True
308

    
309
                certificates.append(Certificate(certificate_row[0],
310
                                                certificate_row[1],
311
                                                certificate_row[2],
312
                                                certificate_row[3],
313
                                                certificate_row[4],
314
                                                certificate_row[7],
315
                                                certificate_row[8],
316
                                                certificate_row[9],
317
                                                usage_dict,
318
                                                certificate_row[5],
319
                                                certificate_row[6]))
320
        except IntegrityError:
321
            Logger.error(INTEGRITY_ERROR_MSG)
322
            raise DatabaseException(INTEGRITY_ERROR_MSG)
323
        except ProgrammingError:
324
            Logger.error(PROGRAMMING_ERROR_MSG)
325
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
326
        except OperationalError:
327
            Logger.error(OPERATIONAL_ERROR_MSG)
328
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
329
        except NotSupportedError:
330
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
331
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
332
        except DatabaseError:
333
            Logger.error(DATABASE_ERROR_MSG)
334
            raise DatabaseException(DATABASE_ERROR_MSG)
335
        except Error:
336
            Logger.error(ERROR_MSG)
337
            raise DatabaseException(ERROR_MSG)
338

    
339
        return certificates
340

    
341
    def update(self, certificate_id: int, certificate: Certificate):
342
        """
343
        Updates a certificate.
344
        If the parameter of certificate (Certificate object) is not to be changed,
345
        the same value must be specified.
346

    
347
        :param certificate_id: ID of specific certificate
348
        :param certificate: Instance of the Certificate object
349

    
350
        :return: the result of whether the updation was successful
351
        """
352

    
353
        Logger.debug("Function launched.")
354

    
355
        try:
356
            sql = (f"UPDATE {TAB_CERTIFICATES} "
357
                   f"SET {COL_VALID_FROM} = ?, "
358
                   f"{COL_VALID_TO} = ?, "
359
                   f"{COL_PEM_DATA} = ?, "
360
                   f"{COL_COMMON_NAME} = ?, "
361
                   f"{COL_COUNTRY_CODE} = ?, "
362
                   f"{COL_LOCALITY} = ?, "
363
                   f"{COL_PROVINCE} = ?, "
364
                   f"{COL_ORGANIZATION} = ?, "
365
                   f"{COL_ORGANIZATIONAL_UNIT} = ?, "
366
                   f"{COL_EMAIL_ADDRESS} = ?, "
367
                   f"{COL_TYPE_ID} = ?, "
368
                   f"{COL_PARENT_ID} = ?, "
369
                   f"{COL_PRIVATE_KEY_ID} = ? "
370
                   f"WHERE {COL_ID} = ?")
371
            values = [certificate.valid_from,
372
                      certificate.valid_to,
373
                      certificate.pem_data,
374
                      certificate.common_name,
375
                      certificate.country_code,
376
                      certificate.locality,
377
                      certificate.province,
378
                      certificate.organization,
379
                      certificate.organizational_unit,
380
                      certificate.email_address,
381
                      certificate.type_id,
382
                      certificate.parent_id,
383
                      certificate.private_key_id,
384
                      certificate_id]
385

    
386
            self.cursor.execute(sql, values)
387
            self.connection.commit()
388

    
389
            sql = (f"DELETE FROM {TAB_CERTIFICATE_USAGES} "
390
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
391
            values = [certificate_id]
392
            self.cursor.execute(sql, values)
393
            self.connection.commit()
394

    
395
            check_updated = self.cursor.rowcount > 0
396

    
397
            # iterate over usage pairs
398
            for usage_id, usage_value in certificate.usages.items():
399
                if usage_value:
400
                    sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
401
                           f"({COL_CERTIFICATE_ID},"
402
                           f"{COL_USAGE_TYPE_ID}) "
403
                           f"VALUES (?,?)")
404
                    values = [certificate_id, usage_id]
405
                    self.cursor.execute(sql, values)
406
                    self.connection.commit()
407
        except IntegrityError:
408
            Logger.error(INTEGRITY_ERROR_MSG)
409
            raise DatabaseException(INTEGRITY_ERROR_MSG)
410
        except ProgrammingError:
411
            Logger.error(PROGRAMMING_ERROR_MSG)
412
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
413
        except OperationalError:
414
            Logger.error(OPERATIONAL_ERROR_MSG)
415
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
416
        except NotSupportedError:
417
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
418
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
419
        except DatabaseError:
420
            Logger.error(DATABASE_ERROR_MSG)
421
            raise DatabaseException(DATABASE_ERROR_MSG)
422
        except Error:
423
            Logger.error(ERROR_MSG)
424
            raise DatabaseException(ERROR_MSG)
425

    
426
        return check_updated
427

    
428
    def delete(self, certificate_id: int):
429
        """
430
        Deletes a certificate
431

    
432
        :param certificate_id: ID of specific certificate
433

    
434
        :return: the result of whether the deletion was successful
435
        """
436

    
437
        Logger.debug("Function launched.")
438

    
439
        try:
440
            sql = (f"UPDATE {TAB_CERTIFICATES} "
441
                   f"SET {COL_DELETION_DATE} = ? "
442
                   f"WHERE {COL_ID} = ? AND ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
443
            values = [int(time.time()),
444
                      certificate_id]
445
            self.cursor.execute(sql, values)
446
            self.connection.commit()
447
        except IntegrityError:
448
            Logger.error(INTEGRITY_ERROR_MSG)
449
            raise DatabaseException(INTEGRITY_ERROR_MSG)
450
        except ProgrammingError:
451
            Logger.error(PROGRAMMING_ERROR_MSG)
452
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
453
        except OperationalError:
454
            Logger.error(OPERATIONAL_ERROR_MSG)
455
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
456
        except NotSupportedError:
457
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
458
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
459
        except DatabaseError:
460
            Logger.error(DATABASE_ERROR_MSG)
461
            raise DatabaseException(DATABASE_ERROR_MSG)
462
        except Error:
463
            Logger.error(ERROR_MSG)
464
            raise DatabaseException(ERROR_MSG)
465

    
466
        return self.cursor.rowcount > 0
467

    
468
    def set_certificate_revoked(
469
            self, certificate_id: int, revocation_date: str, revocation_reason: str = REV_REASON_UNSPECIFIED):
470
        """
471
        Revoke a certificate
472

    
473
        :param certificate_id: ID of specific certificate
474
        :param revocation_date: Date, when the certificate is revoked
475
        :param revocation_reason: Reason of the revocation
476

    
477
        :return:
478
            the result of whether the revocation was successful OR
479
            sqlite3.Error if an exception is thrown
480
        """
481

    
482
        Logger.debug("Function launched.")
483

    
484
        try:
485
            if revocation_date != "" and revocation_reason == "":
486
                revocation_reason = REV_REASON_UNSPECIFIED
487
            elif revocation_date == "":
488
                return False
489

    
490
            sql = (f"UPDATE {TAB_CERTIFICATES} "
491
                   f"SET {COL_REVOCATION_DATE} = ?, "
492
                   f"{COL_REVOCATION_REASON} = ? "
493
                   f"WHERE {COL_ID} = ? AND ({COL_REVOCATION_DATE} IS NULL OR {COL_REVOCATION_DATE} = '')")
494
            values = [revocation_date,
495
                      revocation_reason,
496
                      certificate_id]
497
            self.cursor.execute(sql, values)
498
            self.connection.commit()
499
        except IntegrityError:
500
            Logger.error(INTEGRITY_ERROR_MSG)
501
            raise DatabaseException(INTEGRITY_ERROR_MSG)
502
        except ProgrammingError:
503
            Logger.error(PROGRAMMING_ERROR_MSG)
504
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
505
        except OperationalError:
506
            Logger.error(OPERATIONAL_ERROR_MSG)
507
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
508
        except NotSupportedError:
509
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
510
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
511
        except DatabaseError:
512
            Logger.error(DATABASE_ERROR_MSG)
513
            raise DatabaseException(DATABASE_ERROR_MSG)
514
        except Error:
515
            Logger.error(ERROR_MSG)
516
            raise DatabaseException(ERROR_MSG)
517

    
518
        return self.cursor.rowcount > 0
519

    
520
    def clear_certificate_revocation(self, certificate_id: int):
521
        """
522
        Clear revocation of a certificate
523

    
524
        :param certificate_id: ID of specific certificate
525

    
526
        :return:
527
            the result of whether the clear revocation was successful OR
528
            sqlite3.Error if an exception is thrown
529
        """
530

    
531
        Logger.debug("Function launched.")
532

    
533
        try:
534
            sql = (f"UPDATE {TAB_CERTIFICATES} "
535
                   f"SET {COL_REVOCATION_DATE} = NULL, "
536
                   f"{COL_REVOCATION_REASON} = NULL "
537
                   f"WHERE {COL_ID} = ?")
538
            values = [certificate_id]
539
            self.cursor.execute(sql, values)
540
            self.connection.commit()
541
        except IntegrityError:
542
            Logger.error(INTEGRITY_ERROR_MSG)
543
            raise DatabaseException(INTEGRITY_ERROR_MSG)
544
        except ProgrammingError:
545
            Logger.error(PROGRAMMING_ERROR_MSG)
546
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
547
        except OperationalError:
548
            Logger.error(OPERATIONAL_ERROR_MSG)
549
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
550
        except NotSupportedError:
551
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
552
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
553
        except DatabaseError:
554
            Logger.error(DATABASE_ERROR_MSG)
555
            raise DatabaseException(DATABASE_ERROR_MSG)
556
        except Error:
557
            Logger.error(ERROR_MSG)
558
            raise DatabaseException(ERROR_MSG)
559

    
560
        return self.cursor.rowcount > 0
561

    
562
    def get_all_revoked_by(self, certificate_id: int):
563
        """
564
        Get list of the revoked certificates that are direct descendants of the certificate with the ID
565

    
566
        :param certificate_id: ID of specific certificate
567

    
568
        :return:
569
            list of the certificates OR
570
            None if the list is empty OR
571
            sqlite3.Error if an exception is thrown
572
        """
573

    
574
        Logger.debug("Function launched.")
575

    
576
        try:
577
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
578
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_REVOCATION_DATE} IS NOT NULL AND {COL_REVOCATION_DATE} != ''")
579
            values = [certificate_id]
580
            self.cursor.execute(sql, values)
581
            certificate_rows = self.cursor.fetchall()
582

    
583
            certificates: List[Certificate] = []
584
            for certificate_row in certificate_rows:
585
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
586
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
587
                values = [certificate_row[0]]
588
                self.cursor.execute(sql, values)
589
                usage_rows = self.cursor.fetchall()
590

    
591
                usage_dict: Dict[int, bool] = {}
592
                for usage_row in usage_rows:
593
                    usage_dict[usage_row[2]] = True
594

    
595
                certificates.append(Certificate(certificate_row[0],      # ID
596
                                                certificate_row[1],      # valid from
597
                                                certificate_row[2],      # valid to
598
                                                certificate_row[3],      # pem data
599
                                                certificate_row[14],     # type ID
600
                                                certificate_row[15],     # parent ID
601
                                                certificate_row[16],     # private key ID
602
                                                usage_dict,
603
                                                certificate_row[4],      # common name
604
                                                certificate_row[5],      # country code
605
                                                certificate_row[6],      # locality
606
                                                certificate_row[7],      # province
607
                                                certificate_row[8],      # organization
608
                                                certificate_row[9],      # organizational unit
609
                                                certificate_row[10],     # email address
610
                                                certificate_row[11],     # revocation date
611
                                                certificate_row[12]))    # revocation reason
612
        except IntegrityError:
613
            Logger.error(INTEGRITY_ERROR_MSG)
614
            raise DatabaseException(INTEGRITY_ERROR_MSG)
615
        except ProgrammingError:
616
            Logger.error(PROGRAMMING_ERROR_MSG)
617
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
618
        except OperationalError:
619
            Logger.error(OPERATIONAL_ERROR_MSG)
620
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
621
        except NotSupportedError:
622
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
623
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
624
        except DatabaseError:
625
            Logger.error(DATABASE_ERROR_MSG)
626
            raise DatabaseException(DATABASE_ERROR_MSG)
627
        except Error:
628
            Logger.error(ERROR_MSG)
629
            raise DatabaseException(ERROR_MSG)
630

    
631
        return certificates
632

    
633
    def get_all_issued_by(self, certificate_id: int):
634
        """
635
        Get list of the certificates that are direct descendants of the certificate with the ID
636

    
637
        :param certificate_id: ID of specific certificate
638

    
639
        :return:
640
            list of the certificates OR
641
            None if the list is empty OR
642
            sqlite3.Error if an exception is thrown
643
        """
644

    
645
        Logger.debug("Function launched.")
646

    
647
        try:
648
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
649
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_ID} != ? AND "
650
                   f"({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
651
            values = [certificate_id, certificate_id]
652
            self.cursor.execute(sql, values)
653
            certificate_rows = self.cursor.fetchall()
654

    
655
            certificates: List[Certificate] = []
656
            for certificate_row in certificate_rows:
657
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
658
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
659
                values = [certificate_row[0]]
660
                self.cursor.execute(sql, values)
661
                usage_rows = self.cursor.fetchall()
662

    
663
                usage_dict: Dict[int, bool] = {}
664
                for usage_row in usage_rows:
665
                    usage_dict[usage_row[2]] = True
666

    
667
                certificates.append(Certificate(certificate_row[0],      # ID
668
                                                certificate_row[1],      # valid from
669
                                                certificate_row[2],      # valid to
670
                                                certificate_row[3],      # pem data
671
                                                certificate_row[14],     # type ID
672
                                                certificate_row[15],     # parent ID
673
                                                certificate_row[16],     # private key ID
674
                                                usage_dict,
675
                                                certificate_row[4],      # common name
676
                                                certificate_row[5],      # country code
677
                                                certificate_row[6],      # locality
678
                                                certificate_row[7],      # province
679
                                                certificate_row[8],      # organization
680
                                                certificate_row[9],      # organizational unit
681
                                                certificate_row[10],     # email address
682
                                                certificate_row[11],     # revocation date
683
                                                certificate_row[12]))    # revocation reason
684
        except IntegrityError:
685
            Logger.error(INTEGRITY_ERROR_MSG)
686
            raise DatabaseException(INTEGRITY_ERROR_MSG)
687
        except ProgrammingError:
688
            Logger.error(PROGRAMMING_ERROR_MSG)
689
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
690
        except OperationalError:
691
            Logger.error(OPERATIONAL_ERROR_MSG)
692
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
693
        except NotSupportedError:
694
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
695
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
696
        except DatabaseError:
697
            Logger.error(DATABASE_ERROR_MSG)
698
            raise DatabaseException(DATABASE_ERROR_MSG)
699
        except Error:
700
            Logger.error(ERROR_MSG)
701
            raise DatabaseException(ERROR_MSG)
702

    
703
        return certificates
704

    
705
    def get_all_issued_by_filter(self, issuer_id, target_types, target_usages, target_cn_substring, page, per_page):
706
        """
707
        Get list of the certificates that are direct descendants of the certificate with the ID
708

    
709
        :param target_types: certificate types (filter)
710
        :param target_usages: certificate usages (filter)
711
        :param target_cn_substring: certificate CN substring (filter)
712
        :param page: target page
713
        :param per_page: target page size
714
        :param issuer_id: ID of specific certificate
715

    
716
        :return:
717
            list of the certificates OR
718
            None if the list is empty
719
        """
720

    
721
        Logger.debug("Function launched.")
722

    
723
        try:
724
            values = [issuer_id, issuer_id]
725
            values += list(target_types)
726
            values += list(target_usages)
727

    
728
            sql = (
729
                f"SELECT a.{COL_ID}, {COL_COMMON_NAME}, {COL_VALID_FROM}, {COL_VALID_TO}, {COL_PEM_DATA}, "
730
                f"{COL_REVOCATION_DATE}, {COL_REVOCATION_REASON}, {COL_PRIVATE_KEY_ID}, {COL_TYPE_ID}, {COL_PARENT_ID} "
731
                f"FROM {TAB_CERTIFICATES} a "
732
                f"WHERE (a.{COL_DELETION_DATE} IS NULL OR a.{COL_DELETION_DATE} = '') "
733
                f"AND a.{COL_PARENT_ID} = ? AND a.{COL_ID} != ? "
734
                f"AND a.{COL_TYPE_ID} IN ({','.join('?' * len(target_types))}) "
735
                f"AND (SELECT COUNT(*) FROM {TAB_CERTIFICATE_USAGES} "
736
                f"WHERE {COL_CERTIFICATE_ID} = a.{COL_ID} "
737
                f"AND {COL_USAGE_TYPE_ID} IN ({','.join('?' * len(target_usages))}) "
738
                f") > 0 "
739
            )
740

    
741
            if target_cn_substring is not None:
742
                sql += f" AND a.{COL_COMMON_NAME} LIKE '%' || ? || '%'"
743

    
744
                values += [target_cn_substring]
745

    
746
            if page is not None and per_page is not None:
747
                sql += f" LIMIT {per_page} OFFSET {page * per_page}"
748

    
749
            self.cursor.execute(sql, values)
750
            certificate_rows = self.cursor.fetchall()
751

    
752
            certificates: List[Certificate] = []
753
            for certificate_row in certificate_rows:
754
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
755
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
756
                values = [certificate_row[0]]
757
                self.cursor.execute(sql, values)
758
                usage_rows = self.cursor.fetchall()
759

    
760
                usage_dict: Dict[int, bool] = {}
761
                for usage_row in usage_rows:
762
                    usage_dict[usage_row[2]] = True
763

    
764
                certificates.append(Certificate(certificate_row[0],
765
                                                certificate_row[1],
766
                                                certificate_row[2],
767
                                                certificate_row[3],
768
                                                certificate_row[4],
769
                                                certificate_row[7],
770
                                                certificate_row[8],
771
                                                certificate_row[9],
772
                                                usage_dict,
773
                                                certificate_row[5],
774
                                                certificate_row[6]))
775
        except IntegrityError:
776
            Logger.error(INTEGRITY_ERROR_MSG)
777
            raise DatabaseException(INTEGRITY_ERROR_MSG)
778
        except ProgrammingError:
779
            Logger.error(PROGRAMMING_ERROR_MSG)
780
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
781
        except OperationalError:
782
            Logger.error(OPERATIONAL_ERROR_MSG)
783
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
784
        except NotSupportedError:
785
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
786
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
787
        except DatabaseError:
788
            Logger.error(DATABASE_ERROR_MSG)
789
            raise DatabaseException(DATABASE_ERROR_MSG)
790
        except Error:
791
            Logger.error(ERROR_MSG)
792
            raise DatabaseException(ERROR_MSG)
793

    
794
        return certificates
795

    
796
    def get_all_descendants_of(self, certificate_id: int):
797
        """
798
        Get a list of all certificates C such that the certificate identified by "certificate_id" belongs to a trust chain
799
        between C and its root certificate authority (i.e. is an ancestor of C).
800
        :param certificate_id: target certificate ID
801
        :return: list of all descendants
802
        """
803

    
804
        Logger.debug("Function launched.")
805

    
806
        try:
807
            def dfs(children_of, this, collection: list):
808
                for child in children_of(this.certificate_id):
809
                    dfs(children_of, child, collection)
810
                collection.append(this)
811

    
812
            subtree_root = self.read(certificate_id)
813
            if subtree_root is None:
814
                return None
815

    
816
            all_certs = []
817
            dfs(self.get_all_issued_by, subtree_root, all_certs)
818
        except IntegrityError:
819
            Logger.error(INTEGRITY_ERROR_MSG)
820
            raise DatabaseException(INTEGRITY_ERROR_MSG)
821
        except ProgrammingError:
822
            Logger.error(PROGRAMMING_ERROR_MSG)
823
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
824
        except OperationalError:
825
            Logger.error(OPERATIONAL_ERROR_MSG)
826
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
827
        except NotSupportedError:
828
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
829
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
830
        except DatabaseError:
831
            Logger.error(DATABASE_ERROR_MSG)
832
            raise DatabaseException(DATABASE_ERROR_MSG)
833
        except Error:
834
            Logger.error(ERROR_MSG)
835
            raise DatabaseException(ERROR_MSG)
836

    
837
        return all_certs
838

    
839
    def get_next_id(self) -> int:
840
        """
841
        Get identifier of the next certificate that will be inserted into the database
842
        :return: identifier of the next certificate that will be added into the database
843
        """
844

    
845
        Logger.debug("Function launched.")
846

    
847
        try:
848
            # get next IDs of all tables
849
            self.cursor.execute("SELECT * FROM SQLITE_SEQUENCE")
850
            results = self.cursor.fetchall()
851

    
852
            # search for next ID in Certificates table and return it
853
            for result in results:
854
                if result[0] == TAB_CERTIFICATES:
855
                    return result[1] + 1  # current last id + 1
856
            # if certificates table is not present in the query results, return 1
857
        except IntegrityError:
858
            Logger.error(INTEGRITY_ERROR_MSG)
859
            raise DatabaseException(INTEGRITY_ERROR_MSG)
860
        except ProgrammingError:
861
            Logger.error(PROGRAMMING_ERROR_MSG)
862
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
863
        except OperationalError:
864
            Logger.error(OPERATIONAL_ERROR_MSG)
865
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
866
        except NotSupportedError:
867
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
868
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
869
        except DatabaseError:
870
            Logger.error(DATABASE_ERROR_MSG)
871
            raise DatabaseException(DATABASE_ERROR_MSG)
872
        except Error:
873
            Logger.error(ERROR_MSG)
874
            raise DatabaseException(ERROR_MSG)
875

    
876
        return 1
(2-2/4)