Projekt

Obecné

Profil

Stáhnout (25.4 KB) Statistiky
| Větev: | Tag: | Revize:
1
from sqlite3 import Connection, Error, DatabaseError, IntegrityError, ProgrammingError, OperationalError, NotSupportedError
2
from typing import Dict, List
3

    
4
from src.exceptions.database_exception import DatabaseException
5
from injector import inject
6
from src.constants import *
7
from src.model.certificate import Certificate
8
from src.utils.logger import Logger
9

    
10
INTEGRITY_ERROR_MSG = "Database relational integrity corrupted."
11
PROGRAMMING_ERROR_MSG = "Exception raised for programming errors (etc. SQL statement)."
12
OPERATIONAL_ERROR_MSG = "Exception raised for errors that are related to the database’s operation."
13
NOT_SUPPORTED_ERROR_MSG = "Method or database API was used which is not supported by the database"
14
DATABASE_ERROR_MSG = "Unknown exception that are related to the database."
15
ERROR_MSG = "Unknown exception."
16

    
17

    
18
class CertificateRepository:
19

    
20
    @inject
21
    def __init__(self, connection: Connection):
22
        """
23
        Constructor of the CertificateRepository object
24

    
25
        :param connection: Instance of the Connection object
26
        """
27
        self.connection = connection
28
        self.cursor = connection.cursor()
29

    
30
    def create(self, certificate: Certificate):
31
        """
32
        Creates a certificate.
33
        For root certificate (CA) the parent certificate id is modified to the same id (id == parent_id).
34

    
35
        :param certificate: Instance of the Certificate object
36

    
37
        :return: the result of whether the creation was successful
38
        """
39

    
40
        Logger.debug("Function launched.")
41

    
42
        try:
43
            sql = (f"INSERT INTO {TAB_CERTIFICATES} "
44
                   f"({COL_COMMON_NAME},"
45
                   f"{COL_VALID_FROM},"
46
                   f"{COL_VALID_TO},"
47
                   f"{COL_PEM_DATA},"
48
                   f"{COL_PRIVATE_KEY_ID},"
49
                   f"{COL_TYPE_ID},"
50
                   f"{COL_PARENT_ID})"
51
                   f"VALUES(?,?,?,?,?,?,?)")
52
            values = [certificate.common_name,
53
                      certificate.valid_from,
54
                      certificate.valid_to,
55
                      certificate.pem_data,
56
                      certificate.private_key_id,
57
                      certificate.type_id,
58
                      certificate.parent_id]
59
            self.cursor.execute(sql, values)
60
            self.connection.commit()
61

    
62
            last_id: int = self.cursor.lastrowid
63

    
64
            # TODO assure that this is correct
65
            if certificate.type_id == ROOT_CA_ID:
66
                certificate.parent_id = last_id
67
                self.update(last_id, certificate)
68
            else:
69
                for usage_id, usage_value in certificate.usages.items():
70
                    if usage_value:
71
                        sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
72
                               f"({COL_CERTIFICATE_ID},"
73
                               f"{COL_USAGE_TYPE_ID}) "
74
                               f"VALUES (?,?)")
75
                        values = [last_id, usage_id]
76
                        self.cursor.execute(sql, values)
77
                        self.connection.commit()
78
        except IntegrityError:
79
            Logger.error(INTEGRITY_ERROR_MSG)
80
            raise DatabaseException(INTEGRITY_ERROR_MSG)
81
        except ProgrammingError:
82
            Logger.error(PROGRAMMING_ERROR_MSG)
83
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
84
        except OperationalError:
85
            Logger.error(OPERATIONAL_ERROR_MSG)
86
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
87
        except NotSupportedError:
88
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
89
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
90
        except DatabaseError:
91
            Logger.error(DATABASE_ERROR_MSG)
92
            raise DatabaseException(DATABASE_ERROR_MSG)
93
        except Error:
94
            Logger.error(ERROR_MSG)
95
            raise DatabaseException(ERROR_MSG)
96

    
97
        return last_id
98

    
99
    def read(self, certificate_id: int):
100
        """
101
        Reads (selects) a certificate.
102

    
103
        :param certificate_id: ID of specific certificate
104

    
105
        :return: instance of the Certificate object
106
        """
107

    
108
        Logger.debug("Function launched.")
109

    
110
        try:
111
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
112
                   f"WHERE {COL_ID} = ?")
113
            values = [certificate_id]
114
            self.cursor.execute(sql, values)
115
            certificate_row = self.cursor.fetchone()
116

    
117
            if certificate_row is None:
118
                return None
119

    
120
            sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
121
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
122
            self.cursor.execute(sql, values)
123
            usage_rows = self.cursor.fetchall()
124

    
125
            usage_dict: Dict[int, bool] = {}
126
            for usage_row in usage_rows:
127
                usage_dict[usage_row[2]] = True
128

    
129
            certificate: Certificate = Certificate(certificate_row[0],
130
                                                   certificate_row[1],
131
                                                   certificate_row[2],
132
                                                   certificate_row[3],
133
                                                   certificate_row[4],
134
                                                   certificate_row[7],
135
                                                   certificate_row[8],
136
                                                   certificate_row[9],
137
                                                   usage_dict,
138
                                                   certificate_row[5],
139
                                                   certificate_row[6])
140
        except IntegrityError:
141
            Logger.error(INTEGRITY_ERROR_MSG)
142
            raise DatabaseException(INTEGRITY_ERROR_MSG)
143
        except ProgrammingError:
144
            Logger.error(PROGRAMMING_ERROR_MSG)
145
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
146
        except OperationalError:
147
            Logger.error(OPERATIONAL_ERROR_MSG)
148
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
149
        except NotSupportedError:
150
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
151
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
152
        except DatabaseError:
153
            Logger.error(DATABASE_ERROR_MSG)
154
            raise DatabaseException(DATABASE_ERROR_MSG)
155
        except Error:
156
            Logger.error(ERROR_MSG)
157
            raise DatabaseException(ERROR_MSG)
158

    
159
        return certificate
160

    
161
    def read_all(self, filter_type: int = None):
162
        """
163
        Reads (selects) all certificates (with type).
164

    
165
        :param filter_type: ID of certificate type from CertificateTypes table
166

    
167
        :return: list of certificates
168
        """
169

    
170
        Logger.debug("Function launched.")
171

    
172
        try:
173
            sql_extension = ""
174
            values = []
175
            if filter_type is not None:
176
                sql_extension = (f" WHERE {COL_TYPE_ID} = ("
177
                                 f"SELECT {COL_ID} FROM {TAB_CERTIFICATE_TYPES} WHERE {COL_ID} = ?)")
178
                values = [filter_type]
179

    
180
            sql = f"SELECT * FROM {TAB_CERTIFICATES}{sql_extension}"
181
            self.cursor.execute(sql, values)
182
            certificate_rows = self.cursor.fetchall()
183

    
184
            certificates: List[Certificate] = []
185
            for certificate_row in certificate_rows:
186
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
187
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
188
                values = [certificate_row[0]]
189
                self.cursor.execute(sql, values)
190
                usage_rows = self.cursor.fetchall()
191

    
192
                usage_dict: Dict[int, bool] = {}
193
                for usage_row in usage_rows:
194
                    usage_dict[usage_row[2]] = True
195

    
196
                certificates.append(Certificate(certificate_row[0],
197
                                                certificate_row[1],
198
                                                certificate_row[2],
199
                                                certificate_row[3],
200
                                                certificate_row[4],
201
                                                certificate_row[7],
202
                                                certificate_row[8],
203
                                                certificate_row[9],
204
                                                usage_dict,
205
                                                certificate_row[5],
206
                                                certificate_row[6]))
207
        except IntegrityError:
208
            Logger.error(INTEGRITY_ERROR_MSG)
209
            raise DatabaseException(INTEGRITY_ERROR_MSG)
210
        except ProgrammingError:
211
            Logger.error(PROGRAMMING_ERROR_MSG)
212
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
213
        except OperationalError:
214
            Logger.error(OPERATIONAL_ERROR_MSG)
215
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
216
        except NotSupportedError:
217
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
218
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
219
        except DatabaseError:
220
            Logger.error(DATABASE_ERROR_MSG)
221
            raise DatabaseException(DATABASE_ERROR_MSG)
222
        except Error:
223
            Logger.error(ERROR_MSG)
224
            raise DatabaseException(ERROR_MSG)
225

    
226
        return certificates
227

    
228
    def update(self, certificate_id: int, certificate: Certificate):
229
        """
230
        Updates a certificate.
231
        If the parameter of certificate (Certificate object) is not to be changed,
232
        the same value must be specified.
233

    
234
        :param certificate_id: ID of specific certificate
235
        :param certificate: Instance of the Certificate object
236

    
237
        :return: the result of whether the updation was successful
238
        """
239

    
240
        Logger.debug("Function launched.")
241

    
242
        try:
243
            sql = (f"UPDATE {TAB_CERTIFICATES} "
244
                   f"SET {COL_COMMON_NAME} = ?, "
245
                   f"{COL_VALID_FROM} = ?, "
246
                   f"{COL_VALID_TO} = ?, "
247
                   f"{COL_PEM_DATA} = ?, "
248
                   f"{COL_PRIVATE_KEY_ID} = ?, "
249
                   f"{COL_TYPE_ID} = ?, "
250
                   f"{COL_PARENT_ID} = ? "
251
                   f"WHERE {COL_ID} = ?")
252
            values = [certificate.common_name,
253
                      certificate.valid_from,
254
                      certificate.valid_to,
255
                      certificate.pem_data,
256
                      certificate.private_key_id,
257
                      certificate.type_id,
258
                      certificate.parent_id,
259
                      certificate_id]
260
            self.cursor.execute(sql, values)
261
            self.connection.commit()
262

    
263
            sql = (f"DELETE FROM {TAB_CERTIFICATE_USAGES} "
264
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
265
            values = [certificate_id]
266
            self.cursor.execute(sql, values)
267
            self.connection.commit()
268

    
269
            # iterate over usage pairs
270
            for usage_id, usage_value in certificate.usages.items():
271
                if usage_value:
272
                    sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
273
                           f"({COL_CERTIFICATE_ID},"
274
                           f"{COL_USAGE_TYPE_ID}) "
275
                           f"VALUES (?,?)")
276
                    values = [certificate_id, usage_id]
277
                    self.cursor.execute(sql, values)
278
                    self.connection.commit()
279
        except IntegrityError:
280
            Logger.error(INTEGRITY_ERROR_MSG)
281
            raise DatabaseException(INTEGRITY_ERROR_MSG)
282
        except ProgrammingError:
283
            Logger.error(PROGRAMMING_ERROR_MSG)
284
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
285
        except OperationalError:
286
            Logger.error(OPERATIONAL_ERROR_MSG)
287
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
288
        except NotSupportedError:
289
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
290
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
291
        except DatabaseError:
292
            Logger.error(DATABASE_ERROR_MSG)
293
            raise DatabaseException(DATABASE_ERROR_MSG)
294
        except Error:
295
            Logger.error(ERROR_MSG)
296
            raise DatabaseException(ERROR_MSG)
297

    
298
        return self.cursor.rowcount > 0
299

    
300
    def delete(self, certificate_id: int):
301
        """
302
        Deletes a certificate
303

    
304
        :param certificate_id: ID of specific certificate
305

    
306
        :return: the result of whether the deletion was successful
307
        """
308

    
309
        Logger.debug("Function launched.")
310

    
311
        try:
312
            sql = (f"DELETE FROM {TAB_CERTIFICATES} "
313
                   f"WHERE {COL_ID} = ?")
314
            values = [certificate_id]
315
            self.cursor.execute(sql, values)
316
            self.connection.commit()
317
        except IntegrityError:
318
            Logger.error(INTEGRITY_ERROR_MSG)
319
            raise DatabaseException(INTEGRITY_ERROR_MSG)
320
        except ProgrammingError:
321
            Logger.error(PROGRAMMING_ERROR_MSG)
322
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
323
        except OperationalError:
324
            Logger.error(OPERATIONAL_ERROR_MSG)
325
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
326
        except NotSupportedError:
327
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
328
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
329
        except DatabaseError:
330
            Logger.error(DATABASE_ERROR_MSG)
331
            raise DatabaseException(DATABASE_ERROR_MSG)
332
        except Error:
333
            Logger.error(ERROR_MSG)
334
            raise DatabaseException(ERROR_MSG)
335

    
336
        return self.cursor.rowcount > 0
337

    
338
    def set_certificate_revoked(
339
            self, certificate_id: int, revocation_date: str, revocation_reason: str = REV_REASON_UNSPECIFIED):
340
        """
341
        Revoke a certificate
342

    
343
        :param certificate_id: ID of specific certificate
344
        :param revocation_date: Date, when the certificate is revoked
345
        :param revocation_reason: Reason of the revocation
346

    
347
        :return:
348
            the result of whether the revocation was successful OR
349
            sqlite3.Error if an exception is thrown
350
        """
351

    
352
        Logger.debug("Function launched.")
353

    
354
        try:
355
            if revocation_date != "" and revocation_reason == "":
356
                revocation_reason = REV_REASON_UNSPECIFIED
357
            elif revocation_date == "":
358
                return False
359

    
360
            sql = (f"UPDATE {TAB_CERTIFICATES} "
361
                   f"SET {COL_REVOCATION_DATE} = ?, "
362
                   f"{COL_REVOCATION_REASON} = ? "
363
                   f"WHERE {COL_ID} = ? AND ({COL_REVOCATION_DATE} IS NULL OR {COL_REVOCATION_DATE} = '')")
364
            values = [revocation_date,
365
                      revocation_reason,
366
                      certificate_id]
367
            self.cursor.execute(sql, values)
368
            self.connection.commit()
369
        except IntegrityError:
370
            Logger.error(INTEGRITY_ERROR_MSG)
371
            raise DatabaseException(INTEGRITY_ERROR_MSG)
372
        except ProgrammingError:
373
            Logger.error(PROGRAMMING_ERROR_MSG)
374
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
375
        except OperationalError:
376
            Logger.error(OPERATIONAL_ERROR_MSG)
377
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
378
        except NotSupportedError:
379
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
380
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
381
        except DatabaseError:
382
            Logger.error(DATABASE_ERROR_MSG)
383
            raise DatabaseException(DATABASE_ERROR_MSG)
384
        except Error:
385
            Logger.error(ERROR_MSG)
386
            raise DatabaseException(ERROR_MSG)
387

    
388
        return self.cursor.rowcount > 0
389

    
390
    def clear_certificate_revocation(self, certificate_id: int):
391
        """
392
        Clear revocation of a certificate
393

    
394
        :param certificate_id: ID of specific certificate
395

    
396
        :return:
397
            the result of whether the clear revocation was successful OR
398
            sqlite3.Error if an exception is thrown
399
        """
400

    
401
        Logger.debug("Function launched.")
402

    
403
        try:
404
            sql = (f"UPDATE {TAB_CERTIFICATES} "
405
                   f"SET {COL_REVOCATION_DATE} = '', "
406
                   f"{COL_REVOCATION_REASON} = '' "
407
                   f"WHERE {COL_ID} = ?")
408
            values = [certificate_id]
409
            self.cursor.execute(sql, values)
410
            self.connection.commit()
411
        except IntegrityError:
412
            Logger.error(INTEGRITY_ERROR_MSG)
413
            raise DatabaseException(INTEGRITY_ERROR_MSG)
414
        except ProgrammingError:
415
            Logger.error(PROGRAMMING_ERROR_MSG)
416
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
417
        except OperationalError:
418
            Logger.error(OPERATIONAL_ERROR_MSG)
419
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
420
        except NotSupportedError:
421
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
422
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
423
        except DatabaseError:
424
            Logger.error(DATABASE_ERROR_MSG)
425
            raise DatabaseException(DATABASE_ERROR_MSG)
426
        except Error:
427
            Logger.error(ERROR_MSG)
428
            raise DatabaseException(ERROR_MSG)
429

    
430
        return self.cursor.rowcount > 0
431

    
432
    def get_all_revoked_by(self, certificate_id: int):
433
        """
434
        Get list of the revoked certificates that are direct descendants of the certificate with the ID
435

    
436
        :param certificate_id: ID of specific certificate
437

    
438
        :return:
439
            list of the certificates OR
440
            None if the list is empty OR
441
            sqlite3.Error if an exception is thrown
442
        """
443

    
444
        Logger.debug("Function launched.")
445

    
446
        try:
447
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
448
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_REVOCATION_DATE} IS NOT NULL AND {COL_REVOCATION_DATE} != ''")
449
            values = [certificate_id]
450
            self.cursor.execute(sql, values)
451
            certificate_rows = self.cursor.fetchall()
452

    
453
            certificates: List[Certificate] = []
454
            for certificate_row in certificate_rows:
455
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
456
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
457
                values = [certificate_row[0]]
458
                self.cursor.execute(sql, values)
459
                usage_rows = self.cursor.fetchall()
460

    
461
                usage_dict: Dict[int, bool] = {}
462
                for usage_row in usage_rows:
463
                    usage_dict[usage_row[2]] = True
464

    
465
                certificates.append(Certificate(certificate_row[0],
466
                                                certificate_row[1],
467
                                                certificate_row[2],
468
                                                certificate_row[3],
469
                                                certificate_row[4],
470
                                                certificate_row[7],
471
                                                certificate_row[8],
472
                                                certificate_row[9],
473
                                                usage_dict,
474
                                                certificate_row[5],
475
                                                certificate_row[6]))
476
        except IntegrityError:
477
            Logger.error(INTEGRITY_ERROR_MSG)
478
            raise DatabaseException(INTEGRITY_ERROR_MSG)
479
        except ProgrammingError:
480
            Logger.error(PROGRAMMING_ERROR_MSG)
481
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
482
        except OperationalError:
483
            Logger.error(OPERATIONAL_ERROR_MSG)
484
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
485
        except NotSupportedError:
486
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
487
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
488
        except DatabaseError:
489
            Logger.error(DATABASE_ERROR_MSG)
490
            raise DatabaseException(DATABASE_ERROR_MSG)
491
        except Error:
492
            Logger.error(ERROR_MSG)
493
            raise DatabaseException(ERROR_MSG)
494

    
495
        return certificates
496

    
497
    def get_all_issued_by(self, certificate_id: int):
498
        """
499
        Get list of the certificates that are direct descendants of the certificate with the ID
500

    
501
        :param certificate_id: ID of specific certificate
502

    
503
        :return:
504
            list of the certificates OR
505
            None if the list is empty OR
506
            sqlite3.Error if an exception is thrown
507
        """
508

    
509
        Logger.debug("Function launched.")
510

    
511
        try:
512
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
513
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_ID} != ?")
514
            values = [certificate_id, certificate_id]
515
            self.cursor.execute(sql, values)
516
            certificate_rows = self.cursor.fetchall()
517

    
518
            certificates: List[Certificate] = []
519
            for certificate_row in certificate_rows:
520
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
521
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
522
                values = [certificate_row[0]]
523
                self.cursor.execute(sql, values)
524
                usage_rows = self.cursor.fetchall()
525

    
526
                usage_dict: Dict[int, bool] = {}
527
                for usage_row in usage_rows:
528
                    usage_dict[usage_row[2]] = True
529

    
530
                certificates.append(Certificate(certificate_row[0],
531
                                                certificate_row[1],
532
                                                certificate_row[2],
533
                                                certificate_row[3],
534
                                                certificate_row[4],
535
                                                certificate_row[7],
536
                                                certificate_row[8],
537
                                                certificate_row[9],
538
                                                usage_dict,
539
                                                certificate_row[5],
540
                                                certificate_row[6]))
541
        except IntegrityError:
542
            Logger.error(INTEGRITY_ERROR_MSG)
543
            raise DatabaseException(INTEGRITY_ERROR_MSG)
544
        except ProgrammingError:
545
            Logger.error(PROGRAMMING_ERROR_MSG)
546
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
547
        except OperationalError:
548
            Logger.error(OPERATIONAL_ERROR_MSG)
549
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
550
        except NotSupportedError:
551
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
552
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
553
        except DatabaseError:
554
            Logger.error(DATABASE_ERROR_MSG)
555
            raise DatabaseException(DATABASE_ERROR_MSG)
556
        except Error:
557
            Logger.error(ERROR_MSG)
558
            raise DatabaseException(ERROR_MSG)
559

    
560
        return certificates
561

    
562
    def get_all_descendants_of(self, certificate_id: int):
563
        """
564
        Get a list of all certificates C such that the certificate identified by "certificate_id" belongs to a trust chain
565
        between C and its root certificate authority (i.e. is an ancestor of C).
566
        :param certificate_id: target certificate ID
567
        :return: list of all descendants
568
        """
569

    
570
        Logger.debug("Function launched.")
571

    
572
        try:
573
            def dfs(children_of, this, collection: list):
574
                for child in children_of(this.certificate_id):
575
                    dfs(children_of, child, collection)
576
                collection.append(this)
577

    
578
            subtree_root = self.read(certificate_id)
579
            if subtree_root is None:
580
                return None
581

    
582
            all_certs = []
583
            dfs(self.get_all_issued_by, subtree_root, all_certs)
584
        except IntegrityError:
585
            Logger.error(INTEGRITY_ERROR_MSG)
586
            raise DatabaseException(INTEGRITY_ERROR_MSG)
587
        except ProgrammingError:
588
            Logger.error(PROGRAMMING_ERROR_MSG)
589
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
590
        except OperationalError:
591
            Logger.error(OPERATIONAL_ERROR_MSG)
592
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
593
        except NotSupportedError:
594
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
595
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
596
        except DatabaseError:
597
            Logger.error(DATABASE_ERROR_MSG)
598
            raise DatabaseException(DATABASE_ERROR_MSG)
599
        except Error:
600
            Logger.error(ERROR_MSG)
601
            raise DatabaseException(ERROR_MSG)
602

    
603
        return all_certs
604

    
605
    def get_next_id(self) -> int:
606
        """
607
        Get identifier of the next certificate that will be inserted into the database
608
        :return: identifier of the next certificate that will be added into the database
609
        """
610

    
611
        Logger.debug("Function launched.")
612

    
613
        try:
614
            # get next IDs of all tables
615
            self.cursor.execute("SELECT * FROM SQLITE_SEQUENCE")
616
            results = self.cursor.fetchall()
617

    
618
            # search for next ID in Certificates table and return it
619
            for result in results:
620
                if result[0] == TAB_CERTIFICATES:
621
                    return result[1] + 1  # current last id + 1
622
            # if certificates table is not present in the query results, return 1
623
        except IntegrityError:
624
            Logger.error(INTEGRITY_ERROR_MSG)
625
            raise DatabaseException(INTEGRITY_ERROR_MSG)
626
        except ProgrammingError:
627
            Logger.error(PROGRAMMING_ERROR_MSG)
628
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
629
        except OperationalError:
630
            Logger.error(OPERATIONAL_ERROR_MSG)
631
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
632
        except NotSupportedError:
633
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
634
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
635
        except DatabaseError:
636
            Logger.error(DATABASE_ERROR_MSG)
637
            raise DatabaseException(DATABASE_ERROR_MSG)
638
        except Error:
639
            Logger.error(ERROR_MSG)
640
            raise DatabaseException(ERROR_MSG)
641

    
642
        return 1
(2-2/3)