Projekt

Obecné

Profil

Stáhnout (33.8 KB) Statistiky
| Větev: | Tag: | Revize:
1
import time
2
from sqlite3 import Connection, Error, DatabaseError, IntegrityError, ProgrammingError, OperationalError, \
3
    NotSupportedError
4
from typing import Dict, List, Set
5

    
6
from src.exceptions.database_exception import DatabaseException
7
from injector import inject
8
from src.constants import *
9
from src.model.certificate import Certificate
10
from src.utils.logger import Logger
11

    
12
INTEGRITY_ERROR_MSG = "Database relational integrity corrupted."
13
PROGRAMMING_ERROR_MSG = "Exception raised for programming errors (etc. SQL statement)."
14
OPERATIONAL_ERROR_MSG = "Exception raised for errors that are related to the database’s operation."
15
NOT_SUPPORTED_ERROR_MSG = "Method or database API was used which is not supported by the database"
16
DATABASE_ERROR_MSG = "Unknown exception that are related to the database."
17
ERROR_MSG = "Unknown exception."
18

    
19

    
20
class CertificateRepository:
21

    
22
    @inject
23
    def __init__(self, connection: Connection):
24
        """
25
        Constructor of the CertificateRepository object
26

    
27
        :param connection: Instance of the Connection object
28
        """
29
        self.connection = connection
30
        self.cursor = connection.cursor()
31

    
32
    def create(self, certificate: Certificate):
33
        """
34
        Creates a certificate.
35
        For root certificate (CA) the parent certificate id is modified to the same id (id == parent_id).
36

    
37
        :param certificate: Instance of the Certificate object
38

    
39
        :return: the result of whether the creation was successful
40
        """
41

    
42
        Logger.debug("Function launched.")
43

    
44
        try:
45
            sql = (f"INSERT INTO {TAB_CERTIFICATES} "
46
                   f"({COL_COMMON_NAME},"
47
                   f"{COL_VALID_FROM},"
48
                   f"{COL_VALID_TO},"
49
                   f"{COL_PEM_DATA},"
50
                   f"{COL_PRIVATE_KEY_ID},"
51
                   f"{COL_TYPE_ID},"
52
                   f"{COL_PARENT_ID})"
53
                   f"VALUES(?,?,?,?,?,?,?)")
54
            values = [certificate.common_name,
55
                      certificate.valid_from,
56
                      certificate.valid_to,
57
                      certificate.pem_data,
58
                      certificate.private_key_id,
59
                      certificate.type_id,
60
                      certificate.parent_id]
61
            self.cursor.execute(sql, values)
62
            self.connection.commit()
63

    
64
            last_id: int = self.cursor.lastrowid
65

    
66
            # TODO assure that this is correct
67
            if certificate.type_id == ROOT_CA_ID:
68
                certificate.parent_id = last_id
69
                self.update(last_id, certificate)
70
            else:
71
                for usage_id, usage_value in certificate.usages.items():
72
                    if usage_value:
73
                        sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
74
                               f"({COL_CERTIFICATE_ID},"
75
                               f"{COL_USAGE_TYPE_ID}) "
76
                               f"VALUES (?,?)")
77
                        values = [last_id, usage_id]
78
                        self.cursor.execute(sql, values)
79
                        self.connection.commit()
80
        except IntegrityError:
81
            Logger.error(INTEGRITY_ERROR_MSG)
82
            raise DatabaseException(INTEGRITY_ERROR_MSG)
83
        except ProgrammingError:
84
            Logger.error(PROGRAMMING_ERROR_MSG)
85
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
86
        except OperationalError:
87
            Logger.error(OPERATIONAL_ERROR_MSG)
88
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
89
        except NotSupportedError:
90
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
91
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
92
        except DatabaseError:
93
            Logger.error(DATABASE_ERROR_MSG)
94
            raise DatabaseException(DATABASE_ERROR_MSG)
95
        except Error:
96
            Logger.error(ERROR_MSG)
97
            raise DatabaseException(ERROR_MSG)
98

    
99
        return last_id
100

    
101
    def read(self, certificate_id: int):
102
        """
103
        Reads (selects) a certificate.
104

    
105
        :param certificate_id: ID of specific certificate
106

    
107
        :return: instance of the Certificate object
108
        """
109

    
110
        Logger.debug("Function launched.")
111

    
112
        try:
113
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
114
                   f"WHERE {COL_ID} = ? AND ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
115
            values = [certificate_id]
116
            self.cursor.execute(sql, values)
117
            certificate_row = self.cursor.fetchone()
118

    
119
            if certificate_row is None:
120
                return None
121

    
122
            sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
123
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
124
            self.cursor.execute(sql, values)
125
            usage_rows = self.cursor.fetchall()
126

    
127
            usage_dict: Dict[int, bool] = {}
128
            for usage_row in usage_rows:
129
                usage_dict[usage_row[2]] = True
130

    
131
            certificate: Certificate = Certificate(certificate_row[0],
132
                                                   certificate_row[1],
133
                                                   certificate_row[2],
134
                                                   certificate_row[3],
135
                                                   certificate_row[4],
136
                                                   certificate_row[8],
137
                                                   certificate_row[9],
138
                                                   certificate_row[10],
139
                                                   usage_dict,
140
                                                   certificate_row[5],
141
                                                   certificate_row[6])
142
        except IntegrityError:
143
            Logger.error(INTEGRITY_ERROR_MSG)
144
            raise DatabaseException(INTEGRITY_ERROR_MSG)
145
        except ProgrammingError:
146
            Logger.error(PROGRAMMING_ERROR_MSG)
147
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
148
        except OperationalError:
149
            Logger.error(OPERATIONAL_ERROR_MSG)
150
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
151
        except NotSupportedError:
152
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
153
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
154
        except DatabaseError:
155
            Logger.error(DATABASE_ERROR_MSG)
156
            raise DatabaseException(DATABASE_ERROR_MSG)
157
        except Error:
158
            Logger.error(ERROR_MSG)
159
            raise DatabaseException(ERROR_MSG)
160

    
161
        return certificate
162

    
163
    def read_all(self, filter_type: int = None):
164
        """
165
        Reads (selects) all certificates (with type).
166

    
167
        :param filter_type: ID of certificate type from CertificateTypes table
168

    
169
        :return: list of certificates
170
        """
171

    
172
        Logger.debug("Function launched.")
173

    
174
        try:
175
            sql_extension = ""
176
            values = []
177
            if filter_type is not None:
178
                sql_extension = (f" AND {COL_TYPE_ID} = ("
179
                                 f"SELECT {COL_ID} FROM {TAB_CERTIFICATE_TYPES} WHERE {COL_ID} = ?)")
180
                values = [filter_type]
181

    
182
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
183
                   f"WHERE ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = ''){sql_extension}")
184
            self.cursor.execute(sql, values)
185
            certificate_rows = self.cursor.fetchall()
186

    
187
            certificates: List[Certificate] = []
188
            for certificate_row in certificate_rows:
189
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
190
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
191
                values = [certificate_row[0]]
192
                self.cursor.execute(sql, values)
193
                usage_rows = self.cursor.fetchall()
194

    
195
                usage_dict: Dict[int, bool] = {}
196
                for usage_row in usage_rows:
197
                    usage_dict[usage_row[2]] = True
198

    
199
                certificates.append(Certificate(certificate_row[0],
200
                                                certificate_row[1],
201
                                                certificate_row[2],
202
                                                certificate_row[3],
203
                                                certificate_row[4],
204
                                                certificate_row[8],
205
                                                certificate_row[9],
206
                                                certificate_row[10],
207
                                                usage_dict,
208
                                                certificate_row[5],
209
                                                certificate_row[6]))
210
        except IntegrityError:
211
            Logger.error(INTEGRITY_ERROR_MSG)
212
            raise DatabaseException(INTEGRITY_ERROR_MSG)
213
        except ProgrammingError:
214
            Logger.error(PROGRAMMING_ERROR_MSG)
215
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
216
        except OperationalError:
217
            Logger.error(OPERATIONAL_ERROR_MSG)
218
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
219
        except NotSupportedError:
220
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
221
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
222
        except DatabaseError:
223
            Logger.error(DATABASE_ERROR_MSG)
224
            raise DatabaseException(DATABASE_ERROR_MSG)
225
        except Error:
226
            Logger.error(ERROR_MSG)
227
            raise DatabaseException(ERROR_MSG)
228

    
229
        return certificates
230

    
231
    def read_all_filter(self, target_types: Set[int], target_usages: Set[int], target_cn_substring: str, page: int,
232
                        per_page: int):
233
        """
234
        Reads (selects) all certificates according to a specific filtering and pagination options.
235
        :param target_types: certificate types (filter)
236
        :param target_usages: certificate usages (filter)
237
        :param target_cn_substring: certificate CN substring (filter)
238
        :param page: target page
239
        :param per_page: target page size
240
        :return: list of certificates
241
        """
242

    
243
        Logger.debug("Function launched.")
244

    
245
        try:
246
            values = []
247
            values += list(target_types)
248
            values += list(target_usages)
249

    
250
            sql = (
251
                f"SELECT a.{COL_ID}, {COL_COMMON_NAME}, {COL_VALID_FROM}, {COL_VALID_TO}, {COL_PEM_DATA}, "
252
                f"{COL_REVOCATION_DATE}, {COL_REVOCATION_REASON}, {COL_PRIVATE_KEY_ID}, {COL_TYPE_ID}, {COL_PARENT_ID} "
253
                f"FROM {TAB_CERTIFICATES} a "
254
                f"WHERE (a.{COL_DELETION_DATE} IS NULL OR a.{COL_DELETION_DATE} = '') "
255
                f"AND a.{COL_TYPE_ID} IN ({','.join('?' * len(target_types))}) "
256
                f"AND (SELECT COUNT(*) FROM {TAB_CERTIFICATE_USAGES} "
257
                f"WHERE {COL_CERTIFICATE_ID} = a.{COL_ID} "
258
                f"AND {COL_USAGE_TYPE_ID} IN ({','.join('?' * len(target_usages))}) "
259
                f") > 0 "
260
            )
261

    
262
            if target_cn_substring is not None:
263
                sql += f" AND a.{COL_COMMON_NAME} LIKE '%' || ? || '%'"
264

    
265
                values += [target_cn_substring]
266

    
267
            if page is not None and per_page is not None:
268
                sql += f" LIMIT {per_page} OFFSET {page * per_page}"
269

    
270
            self.cursor.execute(sql, values)
271
            certificate_rows = self.cursor.fetchall()
272

    
273
            certificates: List[Certificate] = []
274
            for certificate_row in certificate_rows:
275
                sql = (f"SELECT {COL_USAGE_TYPE_ID} FROM {TAB_CERTIFICATE_USAGES} "
276
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
277
                types = [certificate_row[0]]
278
                self.cursor.execute(sql, types)
279
                usage_rows = self.cursor.fetchall()
280

    
281
                usage_dict: Dict[int, bool] = {}
282
                for usage_row in usage_rows:
283
                    usage_dict[usage_row[0]] = True
284

    
285
                certificates.append(Certificate(certificate_row[0],
286
                                                certificate_row[1],
287
                                                certificate_row[2],
288
                                                certificate_row[3],
289
                                                certificate_row[4],
290
                                                certificate_row[7],
291
                                                certificate_row[8],
292
                                                certificate_row[9],
293
                                                usage_dict,
294
                                                certificate_row[5],
295
                                                certificate_row[6]))
296
        except IntegrityError:
297
            Logger.error(INTEGRITY_ERROR_MSG)
298
            raise DatabaseException(INTEGRITY_ERROR_MSG)
299
        except ProgrammingError:
300
            Logger.error(PROGRAMMING_ERROR_MSG)
301
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
302
        except OperationalError:
303
            Logger.error(OPERATIONAL_ERROR_MSG)
304
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
305
        except NotSupportedError:
306
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
307
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
308
        except DatabaseError:
309
            Logger.error(DATABASE_ERROR_MSG)
310
            raise DatabaseException(DATABASE_ERROR_MSG)
311
        except Error:
312
            Logger.error(ERROR_MSG)
313
            raise DatabaseException(ERROR_MSG)
314

    
315
        return certificates
316

    
317
    def update(self, certificate_id: int, certificate: Certificate):
318
        """
319
        Updates a certificate.
320
        If the parameter of certificate (Certificate object) is not to be changed,
321
        the same value must be specified.
322

    
323
        :param certificate_id: ID of specific certificate
324
        :param certificate: Instance of the Certificate object
325

    
326
        :return: the result of whether the updation was successful
327
        """
328

    
329
        Logger.debug("Function launched.")
330

    
331
        try:
332
            sql = (f"UPDATE {TAB_CERTIFICATES} "
333
                   f"SET {COL_COMMON_NAME} = ?, "
334
                   f"{COL_VALID_FROM} = ?, "
335
                   f"{COL_VALID_TO} = ?, "
336
                   f"{COL_PEM_DATA} = ?, "
337
                   f"{COL_PRIVATE_KEY_ID} = ?, "
338
                   f"{COL_TYPE_ID} = ?, "
339
                   f"{COL_PARENT_ID} = ? "
340
                   f"WHERE {COL_ID} = ?")
341
            values = [certificate.common_name,
342
                      certificate.valid_from,
343
                      certificate.valid_to,
344
                      certificate.pem_data,
345
                      certificate.private_key_id,
346
                      certificate.type_id,
347
                      certificate.parent_id,
348
                      certificate_id]
349

    
350
            self.cursor.execute(sql, values)
351
            self.connection.commit()
352

    
353
            sql = (f"DELETE FROM {TAB_CERTIFICATE_USAGES} "
354
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
355
            values = [certificate_id]
356
            self.cursor.execute(sql, values)
357
            self.connection.commit()
358

    
359
            check_updated = self.cursor.rowcount > 0
360

    
361
            # iterate over usage pairs
362
            for usage_id, usage_value in certificate.usages.items():
363
                if usage_value:
364
                    sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
365
                           f"({COL_CERTIFICATE_ID},"
366
                           f"{COL_USAGE_TYPE_ID}) "
367
                           f"VALUES (?,?)")
368
                    values = [certificate_id, usage_id]
369
                    self.cursor.execute(sql, values)
370
                    self.connection.commit()
371
        except IntegrityError:
372
            Logger.error(INTEGRITY_ERROR_MSG)
373
            raise DatabaseException(INTEGRITY_ERROR_MSG)
374
        except ProgrammingError:
375
            Logger.error(PROGRAMMING_ERROR_MSG)
376
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
377
        except OperationalError:
378
            Logger.error(OPERATIONAL_ERROR_MSG)
379
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
380
        except NotSupportedError:
381
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
382
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
383
        except DatabaseError:
384
            Logger.error(DATABASE_ERROR_MSG)
385
            raise DatabaseException(DATABASE_ERROR_MSG)
386
        except Error:
387
            Logger.error(ERROR_MSG)
388
            raise DatabaseException(ERROR_MSG)
389

    
390
        return check_updated
391

    
392
    def delete(self, certificate_id: int):
393
        """
394
        Deletes a certificate
395

    
396
        :param certificate_id: ID of specific certificate
397

    
398
        :return: the result of whether the deletion was successful
399
        """
400

    
401
        Logger.debug("Function launched.")
402

    
403
        try:
404
            sql = (f"UPDATE {TAB_CERTIFICATES} "
405
                   f"SET {COL_DELETION_DATE} = ? "
406
                   f"WHERE {COL_ID} = ? AND ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
407
            values = [int(time.time()),
408
                      certificate_id]
409
            self.cursor.execute(sql, values)
410
            self.connection.commit()
411
        except IntegrityError:
412
            Logger.error(INTEGRITY_ERROR_MSG)
413
            raise DatabaseException(INTEGRITY_ERROR_MSG)
414
        except ProgrammingError:
415
            Logger.error(PROGRAMMING_ERROR_MSG)
416
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
417
        except OperationalError:
418
            Logger.error(OPERATIONAL_ERROR_MSG)
419
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
420
        except NotSupportedError:
421
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
422
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
423
        except DatabaseError:
424
            Logger.error(DATABASE_ERROR_MSG)
425
            raise DatabaseException(DATABASE_ERROR_MSG)
426
        except Error:
427
            Logger.error(ERROR_MSG)
428
            raise DatabaseException(ERROR_MSG)
429

    
430
        return self.cursor.rowcount > 0
431

    
432
    def set_certificate_revoked(
433
            self, certificate_id: int, revocation_date: str, revocation_reason: str = REV_REASON_UNSPECIFIED):
434
        """
435
        Revoke a certificate
436

    
437
        :param certificate_id: ID of specific certificate
438
        :param revocation_date: Date, when the certificate is revoked
439
        :param revocation_reason: Reason of the revocation
440

    
441
        :return:
442
            the result of whether the revocation was successful OR
443
            sqlite3.Error if an exception is thrown
444
        """
445

    
446
        Logger.debug("Function launched.")
447

    
448
        try:
449
            if revocation_date != "" and revocation_reason == "":
450
                revocation_reason = REV_REASON_UNSPECIFIED
451
            elif revocation_date == "":
452
                return False
453

    
454
            sql = (f"UPDATE {TAB_CERTIFICATES} "
455
                   f"SET {COL_REVOCATION_DATE} = ?, "
456
                   f"{COL_REVOCATION_REASON} = ? "
457
                   f"WHERE {COL_ID} = ? AND ({COL_REVOCATION_DATE} IS NULL OR {COL_REVOCATION_DATE} = '')")
458
            values = [revocation_date,
459
                      revocation_reason,
460
                      certificate_id]
461
            self.cursor.execute(sql, values)
462
            self.connection.commit()
463
        except IntegrityError:
464
            Logger.error(INTEGRITY_ERROR_MSG)
465
            raise DatabaseException(INTEGRITY_ERROR_MSG)
466
        except ProgrammingError:
467
            Logger.error(PROGRAMMING_ERROR_MSG)
468
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
469
        except OperationalError:
470
            Logger.error(OPERATIONAL_ERROR_MSG)
471
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
472
        except NotSupportedError:
473
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
474
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
475
        except DatabaseError:
476
            Logger.error(DATABASE_ERROR_MSG)
477
            raise DatabaseException(DATABASE_ERROR_MSG)
478
        except Error:
479
            Logger.error(ERROR_MSG)
480
            raise DatabaseException(ERROR_MSG)
481

    
482
        return self.cursor.rowcount > 0
483

    
484
    def clear_certificate_revocation(self, certificate_id: int):
485
        """
486
        Clear revocation of a certificate
487

    
488
        :param certificate_id: ID of specific certificate
489

    
490
        :return:
491
            the result of whether the clear revocation was successful OR
492
            sqlite3.Error if an exception is thrown
493
        """
494

    
495
        Logger.debug("Function launched.")
496

    
497
        try:
498
            sql = (f"UPDATE {TAB_CERTIFICATES} "
499
                   f"SET {COL_REVOCATION_DATE} = '', "
500
                   f"{COL_REVOCATION_REASON} = '' "
501
                   f"WHERE {COL_ID} = ?")
502
            values = [certificate_id]
503
            self.cursor.execute(sql, values)
504
            self.connection.commit()
505
        except IntegrityError:
506
            Logger.error(INTEGRITY_ERROR_MSG)
507
            raise DatabaseException(INTEGRITY_ERROR_MSG)
508
        except ProgrammingError:
509
            Logger.error(PROGRAMMING_ERROR_MSG)
510
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
511
        except OperationalError:
512
            Logger.error(OPERATIONAL_ERROR_MSG)
513
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
514
        except NotSupportedError:
515
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
516
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
517
        except DatabaseError:
518
            Logger.error(DATABASE_ERROR_MSG)
519
            raise DatabaseException(DATABASE_ERROR_MSG)
520
        except Error:
521
            Logger.error(ERROR_MSG)
522
            raise DatabaseException(ERROR_MSG)
523

    
524
        return self.cursor.rowcount > 0
525

    
526
    def get_all_revoked_by(self, certificate_id: int):
527
        """
528
        Get list of the revoked certificates that are direct descendants of the certificate with the ID
529

    
530
        :param certificate_id: ID of specific certificate
531

    
532
        :return:
533
            list of the certificates OR
534
            None if the list is empty OR
535
            sqlite3.Error if an exception is thrown
536
        """
537

    
538
        Logger.debug("Function launched.")
539

    
540
        try:
541
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
542
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_REVOCATION_DATE} IS NOT NULL AND {COL_REVOCATION_DATE} != ''")
543
            values = [certificate_id]
544
            self.cursor.execute(sql, values)
545
            certificate_rows = self.cursor.fetchall()
546

    
547
            certificates: List[Certificate] = []
548
            for certificate_row in certificate_rows:
549
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
550
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
551
                values = [certificate_row[0]]
552
                self.cursor.execute(sql, values)
553
                usage_rows = self.cursor.fetchall()
554

    
555
                usage_dict: Dict[int, bool] = {}
556
                for usage_row in usage_rows:
557
                    usage_dict[usage_row[2]] = True
558

    
559
                certificates.append(Certificate(certificate_row[0],
560
                                                certificate_row[1],
561
                                                certificate_row[2],
562
                                                certificate_row[3],
563
                                                certificate_row[4],
564
                                                certificate_row[8],
565
                                                certificate_row[9],
566
                                                certificate_row[10],
567
                                                usage_dict,
568
                                                certificate_row[5],
569
                                                certificate_row[6]))
570
        except IntegrityError:
571
            Logger.error(INTEGRITY_ERROR_MSG)
572
            raise DatabaseException(INTEGRITY_ERROR_MSG)
573
        except ProgrammingError:
574
            Logger.error(PROGRAMMING_ERROR_MSG)
575
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
576
        except OperationalError:
577
            Logger.error(OPERATIONAL_ERROR_MSG)
578
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
579
        except NotSupportedError:
580
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
581
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
582
        except DatabaseError:
583
            Logger.error(DATABASE_ERROR_MSG)
584
            raise DatabaseException(DATABASE_ERROR_MSG)
585
        except Error:
586
            Logger.error(ERROR_MSG)
587
            raise DatabaseException(ERROR_MSG)
588

    
589
        return certificates
590

    
591
    def get_all_issued_by(self, certificate_id: int):
592
        """
593
        Get list of the certificates that are direct descendants of the certificate with the ID
594

    
595
        :param certificate_id: ID of specific certificate
596

    
597
        :return:
598
            list of the certificates OR
599
            None if the list is empty OR
600
            sqlite3.Error if an exception is thrown
601
        """
602

    
603
        Logger.debug("Function launched.")
604

    
605
        try:
606
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
607
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_ID} != ? AND "
608
                   f"({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
609
            values = [certificate_id, certificate_id]
610
            self.cursor.execute(sql, values)
611
            certificate_rows = self.cursor.fetchall()
612

    
613
            certificates: List[Certificate] = []
614
            for certificate_row in certificate_rows:
615
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
616
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
617
                values = [certificate_row[0]]
618
                self.cursor.execute(sql, values)
619
                usage_rows = self.cursor.fetchall()
620

    
621
                usage_dict: Dict[int, bool] = {}
622
                for usage_row in usage_rows:
623
                    usage_dict[usage_row[2]] = True
624

    
625
                certificates.append(Certificate(certificate_row[0],
626
                                                certificate_row[1],
627
                                                certificate_row[2],
628
                                                certificate_row[3],
629
                                                certificate_row[4],
630
                                                certificate_row[8],
631
                                                certificate_row[9],
632
                                                certificate_row[10],
633
                                                usage_dict,
634
                                                certificate_row[5],
635
                                                certificate_row[6]))
636
        except IntegrityError:
637
            Logger.error(INTEGRITY_ERROR_MSG)
638
            raise DatabaseException(INTEGRITY_ERROR_MSG)
639
        except ProgrammingError:
640
            Logger.error(PROGRAMMING_ERROR_MSG)
641
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
642
        except OperationalError:
643
            Logger.error(OPERATIONAL_ERROR_MSG)
644
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
645
        except NotSupportedError:
646
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
647
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
648
        except DatabaseError:
649
            Logger.error(DATABASE_ERROR_MSG)
650
            raise DatabaseException(DATABASE_ERROR_MSG)
651
        except Error:
652
            Logger.error(ERROR_MSG)
653
            raise DatabaseException(ERROR_MSG)
654

    
655
        return certificates
656

    
657
    def get_all_issued_by_filter(self, issuer_id, target_types, target_usages, target_cn_substring, page, per_page):
658
        """
659
        Get list of the certificates that are direct descendants of the certificate with the ID
660

    
661
        :param target_types: certificate types (filter)
662
        :param target_usages: certificate usages (filter)
663
        :param target_cn_substring: certificate CN substring (filter)
664
        :param page: target page
665
        :param per_page: target page size
666
        :param issuer_id: ID of specific certificate
667

    
668
        :return:
669
            list of the certificates OR
670
            None if the list is empty
671
        """
672

    
673
        Logger.debug("Function launched.")
674

    
675
        try:
676
            values = [issuer_id, issuer_id]
677
            values += list(target_types)
678
            values += list(target_usages)
679

    
680
            sql = (
681
                f"SELECT a.{COL_ID}, {COL_COMMON_NAME}, {COL_VALID_FROM}, {COL_VALID_TO}, {COL_PEM_DATA}, "
682
                f"{COL_REVOCATION_DATE}, {COL_REVOCATION_REASON}, {COL_PRIVATE_KEY_ID}, {COL_TYPE_ID}, {COL_PARENT_ID} "
683
                f"FROM {TAB_CERTIFICATES} a "
684
                f"WHERE (a.{COL_DELETION_DATE} IS NULL OR a.{COL_DELETION_DATE} = '') "
685
                f"AND a.{COL_PARENT_ID} = ? AND a.{COL_ID} != ? "
686
                f"AND a.{COL_TYPE_ID} IN ({','.join('?' * len(target_types))}) "
687
                f"AND (SELECT COUNT(*) FROM {TAB_CERTIFICATE_USAGES} "
688
                f"WHERE {COL_CERTIFICATE_ID} = a.{COL_ID} "
689
                f"AND {COL_USAGE_TYPE_ID} IN ({','.join('?' * len(target_usages))}) "
690
                f") > 0 "
691
            )
692

    
693
            if target_cn_substring is not None:
694
                sql += f" AND a.{COL_COMMON_NAME} LIKE '%' || ? || '%'"
695

    
696
                values += [target_cn_substring]
697

    
698
            if page is not None and per_page is not None:
699
                sql += f" LIMIT {per_page} OFFSET {page * per_page}"
700

    
701
            self.cursor.execute(sql, values)
702
            certificate_rows = self.cursor.fetchall()
703

    
704
            certificates: List[Certificate] = []
705
            for certificate_row in certificate_rows:
706
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
707
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
708
                values = [certificate_row[0]]
709
                self.cursor.execute(sql, values)
710
                usage_rows = self.cursor.fetchall()
711

    
712
                usage_dict: Dict[int, bool] = {}
713
                for usage_row in usage_rows:
714
                    usage_dict[usage_row[2]] = True
715

    
716
                certificates.append(Certificate(certificate_row[0],
717
                                                certificate_row[1],
718
                                                certificate_row[2],
719
                                                certificate_row[3],
720
                                                certificate_row[4],
721
                                                certificate_row[7],
722
                                                certificate_row[8],
723
                                                certificate_row[9],
724
                                                usage_dict,
725
                                                certificate_row[5],
726
                                                certificate_row[6]))
727
        except IntegrityError:
728
            Logger.error(INTEGRITY_ERROR_MSG)
729
            raise DatabaseException(INTEGRITY_ERROR_MSG)
730
        except ProgrammingError:
731
            Logger.error(PROGRAMMING_ERROR_MSG)
732
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
733
        except OperationalError:
734
            Logger.error(OPERATIONAL_ERROR_MSG)
735
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
736
        except NotSupportedError:
737
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
738
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
739
        except DatabaseError:
740
            Logger.error(DATABASE_ERROR_MSG)
741
            raise DatabaseException(DATABASE_ERROR_MSG)
742
        except Error:
743
            Logger.error(ERROR_MSG)
744
            raise DatabaseException(ERROR_MSG)
745

    
746
        return certificates
747

    
748
    def get_all_descendants_of(self, certificate_id: int):
749
        """
750
        Get a list of all certificates C such that the certificate identified by "certificate_id" belongs to a trust chain
751
        between C and its root certificate authority (i.e. is an ancestor of C).
752
        :param certificate_id: target certificate ID
753
        :return: list of all descendants
754
        """
755

    
756
        Logger.debug("Function launched.")
757

    
758
        try:
759
            def dfs(children_of, this, collection: list):
760
                for child in children_of(this.certificate_id):
761
                    dfs(children_of, child, collection)
762
                collection.append(this)
763

    
764
            subtree_root = self.read(certificate_id)
765
            if subtree_root is None:
766
                return None
767

    
768
            all_certs = []
769
            dfs(self.get_all_issued_by, subtree_root, all_certs)
770
        except IntegrityError:
771
            Logger.error(INTEGRITY_ERROR_MSG)
772
            raise DatabaseException(INTEGRITY_ERROR_MSG)
773
        except ProgrammingError:
774
            Logger.error(PROGRAMMING_ERROR_MSG)
775
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
776
        except OperationalError:
777
            Logger.error(OPERATIONAL_ERROR_MSG)
778
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
779
        except NotSupportedError:
780
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
781
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
782
        except DatabaseError:
783
            Logger.error(DATABASE_ERROR_MSG)
784
            raise DatabaseException(DATABASE_ERROR_MSG)
785
        except Error:
786
            Logger.error(ERROR_MSG)
787
            raise DatabaseException(ERROR_MSG)
788

    
789
        return all_certs
790

    
791
    def get_next_id(self) -> int:
792
        """
793
        Get identifier of the next certificate that will be inserted into the database
794
        :return: identifier of the next certificate that will be added into the database
795
        """
796

    
797
        Logger.debug("Function launched.")
798

    
799
        try:
800
            # get next IDs of all tables
801
            self.cursor.execute("SELECT * FROM SQLITE_SEQUENCE")
802
            results = self.cursor.fetchall()
803

    
804
            # search for next ID in Certificates table and return it
805
            for result in results:
806
                if result[0] == TAB_CERTIFICATES:
807
                    return result[1] + 1  # current last id + 1
808
            # if certificates table is not present in the query results, return 1
809
        except IntegrityError:
810
            Logger.error(INTEGRITY_ERROR_MSG)
811
            raise DatabaseException(INTEGRITY_ERROR_MSG)
812
        except ProgrammingError:
813
            Logger.error(PROGRAMMING_ERROR_MSG)
814
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
815
        except OperationalError:
816
            Logger.error(OPERATIONAL_ERROR_MSG)
817
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
818
        except NotSupportedError:
819
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
820
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
821
        except DatabaseError:
822
            Logger.error(DATABASE_ERROR_MSG)
823
            raise DatabaseException(DATABASE_ERROR_MSG)
824
        except Error:
825
            Logger.error(ERROR_MSG)
826
            raise DatabaseException(ERROR_MSG)
827

    
828
        return 1
(2-2/3)