Projekt

Obecné

Profil

Stáhnout (38.4 KB) Statistiky
| Větev: | Tag: | Revize:
1
import time
2
from sqlite3 import Connection, Error, DatabaseError, IntegrityError, ProgrammingError, OperationalError, \
3
    NotSupportedError
4
from typing import Dict, List, Set
5

    
6
from src.exceptions.database_exception import DatabaseException
7
from injector import inject
8
from src.constants import *
9
from src.model.certificate import Certificate
10
from src.utils.logger import Logger
11

    
12
INTEGRITY_ERROR_MSG = "Database relational integrity corrupted."
13
PROGRAMMING_ERROR_MSG = "Exception raised for programming errors (etc. SQL statement)."
14
OPERATIONAL_ERROR_MSG = "Exception raised for errors that are related to the database’s operation."
15
NOT_SUPPORTED_ERROR_MSG = "Method or database API was used which is not supported by the database"
16
DATABASE_ERROR_MSG = "Unknown exception that are related to the database."
17
ERROR_MSG = "Unknown exception."
18

    
19

    
20
class CertificateRepository:
21

    
22
    @inject
23
    def __init__(self, connection: Connection):
24
        """
25
        Constructor of the CertificateRepository object
26

    
27
        :param connection: Instance of the Connection object
28
        """
29
        self.connection = connection
30
        self.cursor = connection.cursor()
31

    
32
    def create(self, certificate: Certificate):
33
        """
34
        Creates a certificate.
35
        For root certificate (CA) the parent certificate id is modified to the same id (id == parent_id).
36

    
37
        :param certificate: Instance of the Certificate object
38

    
39
        :return: the result of whether the creation was successful
40
        """
41

    
42
        Logger.debug("Function launched.")
43

    
44
        try:
45
            sql = (f"INSERT INTO {TAB_CERTIFICATES} "
46
                   f"({COL_VALID_FROM},"
47
                   f"{COL_VALID_TO},"
48
                   f"{COL_PEM_DATA},"
49
                   f"{COL_COMMON_NAME},"
50
                   f"{COL_COUNTRY_CODE},"
51
                   f"{COL_LOCALITY},"
52
                   f"{COL_PROVINCE},"
53
                   f"{COL_ORGANIZATION},"
54
                   f"{COL_ORGANIZATIONAL_UNIT},"
55
                   f"{COL_EMAIL_ADDRESS},"
56
                   f"{COL_TYPE_ID},"
57
                   f"{COL_PARENT_ID},"
58
                   f"{COL_PRIVATE_KEY_ID}) "
59
                   f"VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?)")
60
            values = [certificate.valid_from,
61
                      certificate.valid_to,
62
                      certificate.pem_data,
63
                      certificate.common_name,
64
                      certificate.country_code,
65
                      certificate.locality,
66
                      certificate.province,
67
                      certificate.organization,
68
                      certificate.organizational_unit,
69
                      certificate.email_address,
70
                      certificate.type_id,
71
                      certificate.parent_id,
72
                      certificate.private_key_id]
73
            self.cursor.execute(sql, values)
74
            self.connection.commit()
75

    
76
            last_id: int = self.cursor.lastrowid
77

    
78
            if certificate.type_id == ROOT_CA_ID:
79
                certificate.parent_id = last_id
80
                self.update(last_id, certificate)
81
            else:
82
                for usage_id, usage_value in certificate.usages.items():
83
                    if usage_value:
84
                        sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
85
                               f"({COL_CERTIFICATE_ID},"
86
                               f"{COL_USAGE_TYPE_ID}) "
87
                               f"VALUES (?,?)")
88
                        values = [last_id, usage_id]
89
                        self.cursor.execute(sql, values)
90
                        self.connection.commit()
91
        except IntegrityError:
92
            Logger.error(INTEGRITY_ERROR_MSG)
93
            raise DatabaseException(INTEGRITY_ERROR_MSG)
94
        except ProgrammingError:
95
            Logger.error(PROGRAMMING_ERROR_MSG)
96
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
97
        except OperationalError as e:
98
            Logger.error(OPERATIONAL_ERROR_MSG)
99
            # TODO will be improved in #8884
100
            Logger.error(str(e))
101
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
102
        except NotSupportedError:
103
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
104
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
105
        except DatabaseError:
106
            Logger.error(DATABASE_ERROR_MSG)
107
            raise DatabaseException(DATABASE_ERROR_MSG)
108
        except Error:
109
            Logger.error(ERROR_MSG)
110
            raise DatabaseException(ERROR_MSG)
111

    
112
        return last_id
113

    
114
    def read(self, certificate_id: int):
115
        """
116
        Reads (selects) a certificate.
117

    
118
        :param certificate_id: ID of specific certificate
119

    
120
        :return: instance of the Certificate object
121
        """
122

    
123
        Logger.debug("Function launched.")
124

    
125
        try:
126
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
127
                   f"WHERE {COL_ID} = ? AND ({COL_DELETION_DATE} IS NULL)")
128
            values = [certificate_id]
129
            self.cursor.execute(sql, values)
130
            certificate_row = self.cursor.fetchone()
131

    
132
            if certificate_row is None:
133
                return None
134

    
135
            sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
136
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
137
            self.cursor.execute(sql, values)
138
            usage_rows = self.cursor.fetchall()
139

    
140
            usage_dict: Dict[int, bool] = {}
141
            for usage_row in usage_rows:
142
                usage_dict[usage_row[2]] = True
143

    
144
            certificate: Certificate = Certificate(certificate_row[0],      # ID
145
                                                   certificate_row[1],      # valid from
146
                                                   certificate_row[2],      # valid to
147
                                                   certificate_row[3],      # pem data
148
                                                   certificate_row[14],     # type ID
149
                                                   certificate_row[15],     # parent ID
150
                                                   certificate_row[16],     # private key ID
151
                                                   usage_dict,
152
                                                   certificate_row[4],      # common name
153
                                                   certificate_row[5],      # country code
154
                                                   certificate_row[6],      # locality
155
                                                   certificate_row[7],      # province
156
                                                   certificate_row[8],      # organization
157
                                                   certificate_row[9],      # organizational unit
158
                                                   certificate_row[10],     # email address
159
                                                   certificate_row[11],     # revocation date
160
                                                   certificate_row[12])     # revocation reason
161

    
162
        except IntegrityError:
163
            Logger.error(INTEGRITY_ERROR_MSG)
164
            raise DatabaseException(INTEGRITY_ERROR_MSG)
165
        except ProgrammingError:
166
            Logger.error(PROGRAMMING_ERROR_MSG)
167
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
168
        except OperationalError:
169
            Logger.error(OPERATIONAL_ERROR_MSG)
170
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
171
        except NotSupportedError:
172
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
173
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
174
        except DatabaseError:
175
            Logger.error(DATABASE_ERROR_MSG)
176
            raise DatabaseException(DATABASE_ERROR_MSG)
177
        except Error:
178
            Logger.error(ERROR_MSG)
179
            raise DatabaseException(ERROR_MSG)
180

    
181
        return certificate
182

    
183
    def read_all(self, filter_type: int = None):
184
        """
185
        Reads (selects) all certificates (with type).
186

    
187
        :param filter_type: ID of certificate type from CertificateTypes table
188

    
189
        :return: list of certificates
190
        """
191

    
192
        Logger.debug("Function launched.")
193

    
194
        try:
195
            sql_extension = ""
196
            values = []
197
            if filter_type is not None:
198
                sql_extension = (f" AND {COL_TYPE_ID} = ("
199
                                 f"SELECT {COL_ID} FROM {TAB_CERTIFICATE_TYPES} WHERE {COL_ID} = ?)")
200
                values = [filter_type]
201

    
202
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
203
                   f"WHERE ({COL_DELETION_DATE} IS NULL){sql_extension}")
204
            self.cursor.execute(sql, values)
205
            certificate_rows = self.cursor.fetchall()
206

    
207
            certificates: List[Certificate] = []
208
            for certificate_row in certificate_rows:
209
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
210
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
211
                values = [certificate_row[0]]
212
                self.cursor.execute(sql, values)
213
                usage_rows = self.cursor.fetchall()
214

    
215
                usage_dict: Dict[int, bool] = {}
216
                for usage_row in usage_rows:
217
                    usage_dict[usage_row[2]] = True
218

    
219
                certificates.append(Certificate(certificate_row[0],      # ID
220
                                                certificate_row[1],      # valid from
221
                                                certificate_row[2],      # valid to
222
                                                certificate_row[3],      # pem data
223
                                                certificate_row[14],     # type ID
224
                                                certificate_row[15],     # parent ID
225
                                                certificate_row[16],     # private key ID
226
                                                usage_dict,
227
                                                certificate_row[4],      # common name
228
                                                certificate_row[5],      # country code
229
                                                certificate_row[6],      # locality
230
                                                certificate_row[7],      # province
231
                                                certificate_row[8],      # organization
232
                                                certificate_row[9],      # organizational unit
233
                                                certificate_row[10],     # email address
234
                                                certificate_row[11],     # revocation date
235
                                                certificate_row[12]))    # revocation reason
236
        except IntegrityError:
237
            Logger.error(INTEGRITY_ERROR_MSG)
238
            raise DatabaseException(INTEGRITY_ERROR_MSG)
239
        except ProgrammingError:
240
            Logger.error(PROGRAMMING_ERROR_MSG)
241
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
242
        except OperationalError:
243
            Logger.error(OPERATIONAL_ERROR_MSG)
244
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
245
        except NotSupportedError:
246
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
247
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
248
        except DatabaseError:
249
            Logger.error(DATABASE_ERROR_MSG)
250
            raise DatabaseException(DATABASE_ERROR_MSG)
251
        except Error:
252
            Logger.error(ERROR_MSG)
253
            raise DatabaseException(ERROR_MSG)
254

    
255
        return certificates
256

    
257
    def read_all_filter(self, target_types: Set[int], target_usages: Set[int], target_cn_substring: str, page: int,
258
                        per_page: int):
259
        """
260
        Reads (selects) all certificates according to a specific filtering and pagination options.
261
        :param target_types: certificate types (filter)
262
        :param target_usages: certificate usages (filter)
263
        :param target_cn_substring: certificate CN substring (filter)
264
        :param page: target page
265
        :param per_page: target page size
266
        :return: list of certificates
267
        """
268

    
269
        Logger.debug("Function launched.")
270

    
271
        try:
272
            values = []
273
            values += list(target_types)
274

    
275
            sql = (
276
                f"SELECT * "
277
                f"FROM {TAB_CERTIFICATES} a "
278
                f"WHERE (a.{COL_DELETION_DATE} IS NULL) "
279
                f"AND a.{COL_TYPE_ID} IN ({','.join('?' * len(target_types))}) "
280
            )
281
            
282
            if target_usages is not None:
283
                sql += f"AND (SELECT COUNT(*) FROM {TAB_CERTIFICATE_USAGES} WHERE {COL_CERTIFICATE_ID} = a.{COL_ID} " \
284
                    f"AND {COL_USAGE_TYPE_ID} IN ({','.join('?' * len(target_usages))})) > 0 "
285
                values += list(target_usages)
286

    
287
            if target_cn_substring is not None:
288
                sql += f" AND a.{COL_COMMON_NAME} LIKE '%' || ? || '%'"
289

    
290
                values += [target_cn_substring]
291

    
292
            if page is not None and per_page is not None:
293
                sql += f" LIMIT {per_page} OFFSET {page * per_page}"
294

    
295
            self.cursor.execute(sql, values)
296
            certificate_rows = self.cursor.fetchall()
297

    
298
            certificates: List[Certificate] = []
299
            for certificate_row in certificate_rows:
300
                sql = (f"SELECT {COL_USAGE_TYPE_ID} FROM {TAB_CERTIFICATE_USAGES} "
301
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
302
                types = [certificate_row[0]]
303
                self.cursor.execute(sql, types)
304
                usage_rows = self.cursor.fetchall()
305

    
306
                usage_dict: Dict[int, bool] = {}
307
                for usage_row in usage_rows:
308
                    usage_dict[usage_row[0]] = True
309

    
310
                certificates.append(Certificate(certificate_row[0],  # ID
311
                                                certificate_row[1],  # valid from
312
                                                certificate_row[2],  # valid to
313
                                                certificate_row[3],  # pem data
314
                                                certificate_row[14],  # type ID
315
                                                certificate_row[15],  # parent ID
316
                                                certificate_row[16],  # private key ID
317
                                                usage_dict,
318
                                                certificate_row[4],  # common name
319
                                                certificate_row[5],  # country code
320
                                                certificate_row[6],  # locality
321
                                                certificate_row[7],  # province
322
                                                certificate_row[8],  # organization
323
                                                certificate_row[9],  # organizational unit
324
                                                certificate_row[10],  # email address
325
                                                certificate_row[11],  # revocation date
326
                                                certificate_row[12]))  # revocation reason
327
        except IntegrityError:
328
            Logger.error(INTEGRITY_ERROR_MSG)
329
            raise DatabaseException(INTEGRITY_ERROR_MSG)
330
        except ProgrammingError:
331
            Logger.error(PROGRAMMING_ERROR_MSG)
332
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
333
        except OperationalError:
334
            Logger.error(OPERATIONAL_ERROR_MSG)
335
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
336
        except NotSupportedError:
337
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
338
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
339
        except DatabaseError:
340
            Logger.error(DATABASE_ERROR_MSG)
341
            raise DatabaseException(DATABASE_ERROR_MSG)
342
        except Error:
343
            Logger.error(ERROR_MSG)
344
            raise DatabaseException(ERROR_MSG)
345

    
346
        return certificates
347

    
348
    def update(self, certificate_id: int, certificate: Certificate):
349
        """
350
        Updates a certificate.
351
        If the parameter of certificate (Certificate object) is not to be changed,
352
        the same value must be specified.
353

    
354
        :param certificate_id: ID of specific certificate
355
        :param certificate: Instance of the Certificate object
356

    
357
        :return: the result of whether the updation was successful
358
        """
359

    
360
        Logger.debug("Function launched.")
361

    
362
        try:
363
            sql = (f"UPDATE {TAB_CERTIFICATES} "
364
                   f"SET {COL_VALID_FROM} = ?, "
365
                   f"{COL_VALID_TO} = ?, "
366
                   f"{COL_PEM_DATA} = ?, "
367
                   f"{COL_COMMON_NAME} = ?, "
368
                   f"{COL_COUNTRY_CODE} = ?, "
369
                   f"{COL_LOCALITY} = ?, "
370
                   f"{COL_PROVINCE} = ?, "
371
                   f"{COL_ORGANIZATION} = ?, "
372
                   f"{COL_ORGANIZATIONAL_UNIT} = ?, "
373
                   f"{COL_EMAIL_ADDRESS} = ?, "
374
                   f"{COL_TYPE_ID} = ?, "
375
                   f"{COL_PARENT_ID} = ?, "
376
                   f"{COL_PRIVATE_KEY_ID} = ? "
377
                   f"WHERE {COL_ID} = ?")
378
            values = [certificate.valid_from,
379
                      certificate.valid_to,
380
                      certificate.pem_data,
381
                      certificate.common_name,
382
                      certificate.country_code,
383
                      certificate.locality,
384
                      certificate.province,
385
                      certificate.organization,
386
                      certificate.organizational_unit,
387
                      certificate.email_address,
388
                      certificate.type_id,
389
                      certificate.parent_id,
390
                      certificate.private_key_id,
391
                      certificate_id]
392

    
393
            self.cursor.execute(sql, values)
394
            self.connection.commit()
395

    
396
            sql = (f"DELETE FROM {TAB_CERTIFICATE_USAGES} "
397
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
398
            values = [certificate_id]
399
            self.cursor.execute(sql, values)
400
            self.connection.commit()
401

    
402
            check_updated = self.cursor.rowcount > 0
403

    
404
            # iterate over usage pairs
405
            for usage_id, usage_value in certificate.usages.items():
406
                if usage_value:
407
                    sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
408
                           f"({COL_CERTIFICATE_ID},"
409
                           f"{COL_USAGE_TYPE_ID}) "
410
                           f"VALUES (?,?)")
411
                    values = [certificate_id, usage_id]
412
                    self.cursor.execute(sql, values)
413
                    self.connection.commit()
414
        except IntegrityError:
415
            Logger.error(INTEGRITY_ERROR_MSG)
416
            raise DatabaseException(INTEGRITY_ERROR_MSG)
417
        except ProgrammingError:
418
            Logger.error(PROGRAMMING_ERROR_MSG)
419
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
420
        except OperationalError:
421
            Logger.error(OPERATIONAL_ERROR_MSG)
422
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
423
        except NotSupportedError:
424
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
425
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
426
        except DatabaseError:
427
            Logger.error(DATABASE_ERROR_MSG)
428
            raise DatabaseException(DATABASE_ERROR_MSG)
429
        except Error:
430
            Logger.error(ERROR_MSG)
431
            raise DatabaseException(ERROR_MSG)
432

    
433
        return check_updated
434

    
435
    def delete(self, certificate_id: int):
436
        """
437
        Deletes a certificate
438

    
439
        :param certificate_id: ID of specific certificate
440

    
441
        :return: the result of whether the deletion was successful
442
        """
443

    
444
        Logger.debug("Function launched.")
445

    
446
        try:
447
            sql = (f"UPDATE {TAB_CERTIFICATES} "
448
                   f"SET {COL_DELETION_DATE} = ? "
449
                   f"WHERE {COL_ID} = ? AND ({COL_DELETION_DATE} IS NULL)")
450
            values = [int(time.time()),
451
                      certificate_id]
452
            self.cursor.execute(sql, values)
453
            self.connection.commit()
454
        except IntegrityError:
455
            Logger.error(INTEGRITY_ERROR_MSG)
456
            raise DatabaseException(INTEGRITY_ERROR_MSG)
457
        except ProgrammingError:
458
            Logger.error(PROGRAMMING_ERROR_MSG)
459
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
460
        except OperationalError:
461
            Logger.error(OPERATIONAL_ERROR_MSG)
462
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
463
        except NotSupportedError:
464
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
465
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
466
        except DatabaseError:
467
            Logger.error(DATABASE_ERROR_MSG)
468
            raise DatabaseException(DATABASE_ERROR_MSG)
469
        except Error:
470
            Logger.error(ERROR_MSG)
471
            raise DatabaseException(ERROR_MSG)
472

    
473
        return self.cursor.rowcount > 0
474

    
475
    def set_certificate_revoked(
476
            self, certificate_id: int, revocation_date: int, revocation_reason: str = REV_REASON_UNSPECIFIED):
477
        """
478
        Revoke a certificate
479

    
480
        :param certificate_id: ID of specific certificate
481
        :param revocation_date: Date, when the certificate is revoked
482
        :param revocation_reason: Reason of the revocation
483

    
484
        :return:
485
            the result of whether the revocation was successful OR
486
            sqlite3.Error if an exception is thrown
487
        """
488

    
489
        Logger.debug("Function launched.")
490

    
491
        try:
492
            if revocation_date != "" and revocation_reason == "":
493
                revocation_reason = REV_REASON_UNSPECIFIED
494
            elif revocation_date == "":
495
                return False
496

    
497
            sql = (f"UPDATE {TAB_CERTIFICATES} "
498
                   f"SET {COL_REVOCATION_DATE} = ?, "
499
                   f"{COL_REVOCATION_REASON} = ? "
500
                   f"WHERE {COL_ID} = ? AND ({COL_REVOCATION_DATE} IS NULL)")
501
            values = [revocation_date,
502
                      revocation_reason,
503
                      certificate_id]
504
            self.cursor.execute(sql, values)
505
            self.connection.commit()
506
        except IntegrityError:
507
            Logger.error(INTEGRITY_ERROR_MSG)
508
            raise DatabaseException(INTEGRITY_ERROR_MSG)
509
        except ProgrammingError:
510
            Logger.error(PROGRAMMING_ERROR_MSG)
511
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
512
        except OperationalError:
513
            Logger.error(OPERATIONAL_ERROR_MSG)
514
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
515
        except NotSupportedError:
516
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
517
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
518
        except DatabaseError:
519
            Logger.error(DATABASE_ERROR_MSG)
520
            raise DatabaseException(DATABASE_ERROR_MSG)
521
        except Error:
522
            Logger.error(ERROR_MSG)
523
            raise DatabaseException(ERROR_MSG)
524

    
525
        return self.cursor.rowcount > 0
526

    
527
    def clear_certificate_revocation(self, certificate_id: int):
528
        """
529
        Clear revocation of a certificate
530

    
531
        :param certificate_id: ID of specific certificate
532

    
533
        :return:
534
            the result of whether the clear revocation was successful OR
535
            sqlite3.Error if an exception is thrown
536
        """
537

    
538
        Logger.debug("Function launched.")
539

    
540
        try:
541
            sql = (f"UPDATE {TAB_CERTIFICATES} "
542
                   f"SET {COL_REVOCATION_DATE} = NULL, "
543
                   f"{COL_REVOCATION_REASON} = NULL "
544
                   f"WHERE {COL_ID} = ?")
545
            values = [certificate_id]
546
            self.cursor.execute(sql, values)
547
            self.connection.commit()
548
        except IntegrityError:
549
            Logger.error(INTEGRITY_ERROR_MSG)
550
            raise DatabaseException(INTEGRITY_ERROR_MSG)
551
        except ProgrammingError:
552
            Logger.error(PROGRAMMING_ERROR_MSG)
553
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
554
        except OperationalError:
555
            Logger.error(OPERATIONAL_ERROR_MSG)
556
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
557
        except NotSupportedError:
558
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
559
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
560
        except DatabaseError:
561
            Logger.error(DATABASE_ERROR_MSG)
562
            raise DatabaseException(DATABASE_ERROR_MSG)
563
        except Error:
564
            Logger.error(ERROR_MSG)
565
            raise DatabaseException(ERROR_MSG)
566

    
567
        return self.cursor.rowcount > 0
568

    
569
    def get_all_revoked_by(self, certificate_id: int):
570
        """
571
        Get list of the revoked certificates that are direct descendants of the certificate with the ID
572

    
573
        :param certificate_id: ID of specific certificate
574

    
575
        :return:
576
            list of the certificates OR
577
            None if the list is empty OR
578
            sqlite3.Error if an exception is thrown
579
        """
580

    
581
        Logger.debug("Function launched.")
582

    
583
        try:
584
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
585
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_REVOCATION_DATE} IS NOT NULL")
586
            values = [certificate_id]
587
            self.cursor.execute(sql, values)
588
            certificate_rows = self.cursor.fetchall()
589

    
590
            certificates: List[Certificate] = []
591
            for certificate_row in certificate_rows:
592
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
593
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
594
                values = [certificate_row[0]]
595
                self.cursor.execute(sql, values)
596
                usage_rows = self.cursor.fetchall()
597

    
598
                usage_dict: Dict[int, bool] = {}
599
                for usage_row in usage_rows:
600
                    usage_dict[usage_row[2]] = True
601

    
602
                certificates.append(Certificate(certificate_row[0],      # ID
603
                                                certificate_row[1],      # valid from
604
                                                certificate_row[2],      # valid to
605
                                                certificate_row[3],      # pem data
606
                                                certificate_row[14],     # type ID
607
                                                certificate_row[15],     # parent ID
608
                                                certificate_row[16],     # private key ID
609
                                                usage_dict,
610
                                                certificate_row[4],      # common name
611
                                                certificate_row[5],      # country code
612
                                                certificate_row[6],      # locality
613
                                                certificate_row[7],      # province
614
                                                certificate_row[8],      # organization
615
                                                certificate_row[9],      # organizational unit
616
                                                certificate_row[10],     # email address
617
                                                certificate_row[11],     # revocation date
618
                                                certificate_row[12]))    # revocation reason
619
        except IntegrityError:
620
            Logger.error(INTEGRITY_ERROR_MSG)
621
            raise DatabaseException(INTEGRITY_ERROR_MSG)
622
        except ProgrammingError:
623
            Logger.error(PROGRAMMING_ERROR_MSG)
624
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
625
        except OperationalError:
626
            Logger.error(OPERATIONAL_ERROR_MSG)
627
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
628
        except NotSupportedError:
629
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
630
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
631
        except DatabaseError:
632
            Logger.error(DATABASE_ERROR_MSG)
633
            raise DatabaseException(DATABASE_ERROR_MSG)
634
        except Error:
635
            Logger.error(ERROR_MSG)
636
            raise DatabaseException(ERROR_MSG)
637

    
638
        return certificates
639

    
640
    def get_all_issued_by(self, certificate_id: int):
641
        """
642
        Get list of the certificates that are direct descendants of the certificate with the ID
643

    
644
        :param certificate_id: ID of specific certificate
645

    
646
        :return:
647
            list of the certificates OR
648
            None if the list is empty OR
649
            sqlite3.Error if an exception is thrown
650
        """
651

    
652
        Logger.debug("Function launched.")
653

    
654
        try:
655
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
656
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_ID} != ? AND "
657
                   f"({COL_DELETION_DATE} IS NULL)")
658
            values = [certificate_id, certificate_id]
659
            self.cursor.execute(sql, values)
660
            certificate_rows = self.cursor.fetchall()
661

    
662
            certificates: List[Certificate] = []
663
            for certificate_row in certificate_rows:
664
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
665
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
666
                values = [certificate_row[0]]
667
                self.cursor.execute(sql, values)
668
                usage_rows = self.cursor.fetchall()
669

    
670
                usage_dict: Dict[int, bool] = {}
671
                for usage_row in usage_rows:
672
                    usage_dict[usage_row[2]] = True
673

    
674
                certificates.append(Certificate(certificate_row[0],      # ID
675
                                                certificate_row[1],      # valid from
676
                                                certificate_row[2],      # valid to
677
                                                certificate_row[3],      # pem data
678
                                                certificate_row[14],     # type ID
679
                                                certificate_row[15],     # parent ID
680
                                                certificate_row[16],     # private key ID
681
                                                usage_dict,
682
                                                certificate_row[4],      # common name
683
                                                certificate_row[5],      # country code
684
                                                certificate_row[6],      # locality
685
                                                certificate_row[7],      # province
686
                                                certificate_row[8],      # organization
687
                                                certificate_row[9],      # organizational unit
688
                                                certificate_row[10],     # email address
689
                                                certificate_row[11],     # revocation date
690
                                                certificate_row[12]))    # revocation reason
691
        except IntegrityError:
692
            Logger.error(INTEGRITY_ERROR_MSG)
693
            raise DatabaseException(INTEGRITY_ERROR_MSG)
694
        except ProgrammingError:
695
            Logger.error(PROGRAMMING_ERROR_MSG)
696
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
697
        except OperationalError:
698
            Logger.error(OPERATIONAL_ERROR_MSG)
699
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
700
        except NotSupportedError:
701
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
702
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
703
        except DatabaseError:
704
            Logger.error(DATABASE_ERROR_MSG)
705
            raise DatabaseException(DATABASE_ERROR_MSG)
706
        except Error:
707
            Logger.error(ERROR_MSG)
708
            raise DatabaseException(ERROR_MSG)
709

    
710
        return certificates
711

    
712
    def get_all_issued_by_filter(self, issuer_id, target_types, target_usages, target_cn_substring, page, per_page):
713
        """
714
        Get list of the certificates that are direct descendants of the certificate with the ID
715

    
716
        :param target_types: certificate types (filter)
717
        :param target_usages: certificate usages (filter)
718
        :param target_cn_substring: certificate CN substring (filter)
719
        :param page: target page
720
        :param per_page: target page size
721
        :param issuer_id: ID of specific certificate
722

    
723
        :return:
724
            list of the certificates OR
725
            None if the list is empty
726
        """
727

    
728
        Logger.debug("Function launched.")
729

    
730
        try:
731
            values = [issuer_id, issuer_id]
732
            values += list(target_types)
733

    
734
            sql = (
735
                f"SELECT * "
736
                f"FROM {TAB_CERTIFICATES} a "
737
                f"WHERE (a.{COL_DELETION_DATE} IS NULL) "
738
                f"AND a.{COL_PARENT_ID} = ? AND a.{COL_ID} != ? "
739
                f"AND a.{COL_TYPE_ID} IN ({','.join('?' * len(target_types))}) "
740
                
741
            )
742
            
743
            if target_usages is not None:
744
                sql += f"AND (SELECT COUNT(*) FROM {TAB_CERTIFICATE_USAGES} WHERE {COL_CERTIFICATE_ID} = a.{COL_ID} " \
745
                    f"AND {COL_USAGE_TYPE_ID} IN ({','.join('?' * len(target_usages))})) > 0 "
746
                values += list(target_usages)
747

    
748
            if target_cn_substring is not None:
749
                sql += f" AND a.{COL_COMMON_NAME} LIKE '%' || ? || '%'"
750
                values += [target_cn_substring]
751

    
752
            if page is not None and per_page is not None:
753
                sql += f" LIMIT {per_page} OFFSET {page * per_page}"
754

    
755
            self.cursor.execute(sql, values)
756
            certificate_rows = self.cursor.fetchall()
757

    
758
            certificates: List[Certificate] = []
759
            for certificate_row in certificate_rows:
760
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
761
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
762
                values = [certificate_row[0]]
763
                self.cursor.execute(sql, values)
764
                usage_rows = self.cursor.fetchall()
765

    
766
                usage_dict: Dict[int, bool] = {}
767
                for usage_row in usage_rows:
768
                    usage_dict[usage_row[2]] = True
769

    
770
                certificates.append(Certificate(certificate_row[0],  # ID
771
                                                certificate_row[1],  # valid from
772
                                                certificate_row[2],  # valid to
773
                                                certificate_row[3],  # pem data
774
                                                certificate_row[14],  # type ID
775
                                                certificate_row[15],  # parent ID
776
                                                certificate_row[16],  # private key ID
777
                                                usage_dict,
778
                                                certificate_row[4],  # common name
779
                                                certificate_row[5],  # country code
780
                                                certificate_row[6],  # locality
781
                                                certificate_row[7],  # province
782
                                                certificate_row[8],  # organization
783
                                                certificate_row[9],  # organizational unit
784
                                                certificate_row[10],  # email address
785
                                                certificate_row[11],  # revocation date
786
                                                certificate_row[12]))  # revocation reason
787
        except IntegrityError:
788
            Logger.error(INTEGRITY_ERROR_MSG)
789
            raise DatabaseException(INTEGRITY_ERROR_MSG)
790
        except ProgrammingError:
791
            Logger.error(PROGRAMMING_ERROR_MSG)
792
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
793
        except OperationalError:
794
            Logger.error(OPERATIONAL_ERROR_MSG)
795
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
796
        except NotSupportedError:
797
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
798
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
799
        except DatabaseError:
800
            Logger.error(DATABASE_ERROR_MSG)
801
            raise DatabaseException(DATABASE_ERROR_MSG)
802
        except Error:
803
            Logger.error(ERROR_MSG)
804
            raise DatabaseException(ERROR_MSG)
805

    
806
        return certificates
807

    
808
    def get_all_descendants_of(self, certificate_id: int):
809
        """
810
        Get a list of all certificates C such that the certificate identified by "certificate_id" belongs to a trust chain
811
        between C and its root certificate authority (i.e. is an ancestor of C).
812
        :param certificate_id: target certificate ID
813
        :return: list of all descendants
814
        """
815

    
816
        Logger.debug("Function launched.")
817

    
818
        try:
819
            def dfs(children_of, this, collection: list):
820
                for child in children_of(this.certificate_id):
821
                    dfs(children_of, child, collection)
822
                collection.append(this)
823

    
824
            subtree_root = self.read(certificate_id)
825
            if subtree_root is None:
826
                return None
827

    
828
            all_certs = []
829
            dfs(self.get_all_issued_by, subtree_root, all_certs)
830
        except IntegrityError:
831
            Logger.error(INTEGRITY_ERROR_MSG)
832
            raise DatabaseException(INTEGRITY_ERROR_MSG)
833
        except ProgrammingError:
834
            Logger.error(PROGRAMMING_ERROR_MSG)
835
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
836
        except OperationalError:
837
            Logger.error(OPERATIONAL_ERROR_MSG)
838
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
839
        except NotSupportedError:
840
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
841
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
842
        except DatabaseError:
843
            Logger.error(DATABASE_ERROR_MSG)
844
            raise DatabaseException(DATABASE_ERROR_MSG)
845
        except Error:
846
            Logger.error(ERROR_MSG)
847
            raise DatabaseException(ERROR_MSG)
848

    
849
        return all_certs
850

    
851
    def get_next_id(self) -> int:
852
        """
853
        Get identifier of the next certificate that will be inserted into the database
854
        :return: identifier of the next certificate that will be added into the database
855
        """
856

    
857
        Logger.debug("Function launched.")
858

    
859
        try:
860
            # get next IDs of all tables
861
            self.cursor.execute("SELECT * FROM SQLITE_SEQUENCE")
862
            results = self.cursor.fetchall()
863

    
864
            # search for next ID in Certificates table and return it
865
            for result in results:
866
                if result[0] == TAB_CERTIFICATES:
867
                    return result[1] + 1  # current last id + 1
868
            # if certificates table is not present in the query results, return 1
869
        except IntegrityError:
870
            Logger.error(INTEGRITY_ERROR_MSG)
871
            raise DatabaseException(INTEGRITY_ERROR_MSG)
872
        except ProgrammingError:
873
            Logger.error(PROGRAMMING_ERROR_MSG)
874
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
875
        except OperationalError:
876
            Logger.error(OPERATIONAL_ERROR_MSG)
877
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
878
        except NotSupportedError:
879
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
880
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
881
        except DatabaseError:
882
            Logger.error(DATABASE_ERROR_MSG)
883
            raise DatabaseException(DATABASE_ERROR_MSG)
884
        except Error:
885
            Logger.error(ERROR_MSG)
886
            raise DatabaseException(ERROR_MSG)
887

    
888
        return 1
(2-2/3)