Projekt

Obecné

Profil

Stáhnout (38.6 KB) Statistiky
| Větev: | Tag: | Revize:
1
import time
2
from sqlite3 import Connection, Error, DatabaseError, IntegrityError, ProgrammingError, OperationalError, \
3
    NotSupportedError
4
from typing import Dict, List, Set
5

    
6
from src.exceptions.database_exception import DatabaseException
7
from injector import inject
8
from src.constants import *
9
from src.model.certificate import Certificate
10
from src.utils.logger import Logger
11

    
12
INTEGRITY_ERROR_MSG = "Database relational integrity corrupted."
13
PROGRAMMING_ERROR_MSG = "Exception raised for programming errors (etc. SQL statement)."
14
OPERATIONAL_ERROR_MSG = "Exception raised for errors that are related to the database’s operation."
15
NOT_SUPPORTED_ERROR_MSG = "Method or database API was used which is not supported by the database"
16
DATABASE_ERROR_MSG = "Unknown exception that are related to the database."
17
ERROR_MSG = "Unknown exception."
18

    
19

    
20
class CertificateRepository:
21

    
22
    @inject
23
    def __init__(self, connection: Connection):
24
        """
25
        Constructor of the CertificateRepository object
26

    
27
        :param connection: Instance of the Connection object
28
        """
29
        self.connection = connection
30
        self.cursor = connection.cursor()
31

    
32
    def create(self, certificate: Certificate):
33
        """
34
        Creates a certificate.
35
        For root certificate (CA) the parent certificate id is modified to the same id (id == parent_id).
36

    
37
        :param certificate: Instance of the Certificate object
38

    
39
        :return: the result of whether the creation was successful
40
        """
41

    
42
        Logger.debug("Function launched.")
43

    
44
        try:
45
            sql = (f"INSERT INTO {TAB_CERTIFICATES} "
46
                   f"({COL_VALID_FROM},"
47
                   f"{COL_VALID_TO},"
48
                   f"{COL_PEM_DATA},"
49
                   f"{COL_COMMON_NAME},"
50
                   f"{COL_COUNTRY_CODE},"
51
                   f"{COL_LOCALITY},"
52
                   f"{COL_PROVINCE},"
53
                   f"{COL_ORGANIZATION},"
54
                   f"{COL_ORGANIZATIONAL_UNIT},"
55
                   f"{COL_EMAIL_ADDRESS},"
56
                   f"{COL_TYPE_ID},"
57
                   f"{COL_PARENT_ID},"
58
                   f"{COL_PRIVATE_KEY_ID}) "
59
                   f"VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?)")
60
            values = [certificate.valid_from,
61
                      certificate.valid_to,
62
                      certificate.pem_data,
63
                      certificate.common_name,
64
                      certificate.country_code,
65
                      certificate.locality,
66
                      certificate.province,
67
                      certificate.organization,
68
                      certificate.organizational_unit,
69
                      certificate.email_address,
70
                      certificate.type_id,
71
                      certificate.parent_id,
72
                      certificate.private_key_id]
73
            self.cursor.execute(sql, values)
74
            self.connection.commit()
75

    
76
            last_id: int = self.cursor.lastrowid
77

    
78
            if certificate.type_id == ROOT_CA_ID:
79
                certificate.parent_id = last_id
80
                self.update(last_id, certificate)
81
            else:
82
                for usage_id, usage_value in certificate.usages.items():
83
                    if usage_value:
84
                        sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
85
                               f"({COL_CERTIFICATE_ID},"
86
                               f"{COL_USAGE_TYPE_ID}) "
87
                               f"VALUES (?,?)")
88
                        values = [last_id, usage_id]
89
                        self.cursor.execute(sql, values)
90
                        self.connection.commit()
91
        except IntegrityError:
92
            Logger.error(INTEGRITY_ERROR_MSG)
93
            raise DatabaseException(INTEGRITY_ERROR_MSG)
94
        except ProgrammingError:
95
            Logger.error(PROGRAMMING_ERROR_MSG)
96
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
97
        except OperationalError:
98
            Logger.error(OPERATIONAL_ERROR_MSG)
99
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
100
        except NotSupportedError:
101
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
102
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
103
        except DatabaseError:
104
            Logger.error(DATABASE_ERROR_MSG)
105
            raise DatabaseException(DATABASE_ERROR_MSG)
106
        except Error:
107
            Logger.error(ERROR_MSG)
108
            raise DatabaseException(ERROR_MSG)
109

    
110
        return last_id
111

    
112
    def read(self, certificate_id: int):
113
        """
114
        Reads (selects) a certificate.
115

    
116
        :param certificate_id: ID of specific certificate
117

    
118
        :return: instance of the Certificate object
119
        """
120

    
121
        Logger.debug("Function launched.")
122

    
123
        try:
124
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
125
                   f"WHERE {COL_ID} = ? AND ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
126
            values = [certificate_id]
127
            self.cursor.execute(sql, values)
128
            certificate_row = self.cursor.fetchone()
129

    
130
            if certificate_row is None:
131
                return None
132

    
133
            sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
134
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
135
            self.cursor.execute(sql, values)
136
            usage_rows = self.cursor.fetchall()
137

    
138
            usage_dict: Dict[int, bool] = {}
139
            for usage_row in usage_rows:
140
                usage_dict[usage_row[2]] = True
141

    
142
            certificate: Certificate = Certificate(certificate_row[0],      # ID
143
                                                   certificate_row[1],      # valid from
144
                                                   certificate_row[2],      # valid to
145
                                                   certificate_row[3],      # pem data
146
                                                   certificate_row[14],     # type ID
147
                                                   certificate_row[15],     # parent ID
148
                                                   certificate_row[16],     # private key ID
149
                                                   usage_dict,
150
                                                   certificate_row[4],      # common name
151
                                                   certificate_row[5],      # country code
152
                                                   certificate_row[6],      # locality
153
                                                   certificate_row[7],      # province
154
                                                   certificate_row[8],      # organization
155
                                                   certificate_row[9],      # organizational unit
156
                                                   certificate_row[10],     # email address
157
                                                   certificate_row[11],     # revocation date
158
                                                   certificate_row[12])     # revocation reason
159

    
160
        except IntegrityError:
161
            Logger.error(INTEGRITY_ERROR_MSG)
162
            raise DatabaseException(INTEGRITY_ERROR_MSG)
163
        except ProgrammingError:
164
            Logger.error(PROGRAMMING_ERROR_MSG)
165
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
166
        except OperationalError:
167
            Logger.error(OPERATIONAL_ERROR_MSG)
168
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
169
        except NotSupportedError:
170
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
171
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
172
        except DatabaseError:
173
            Logger.error(DATABASE_ERROR_MSG)
174
            raise DatabaseException(DATABASE_ERROR_MSG)
175
        except Error:
176
            Logger.error(ERROR_MSG)
177
            raise DatabaseException(ERROR_MSG)
178

    
179
        return certificate
180

    
181
    def read_all(self, filter_type: int = None):
182
        """
183
        Reads (selects) all certificates (with type).
184

    
185
        :param filter_type: ID of certificate type from CertificateTypes table
186

    
187
        :return: list of certificates
188
        """
189

    
190
        Logger.debug("Function launched.")
191

    
192
        try:
193
            sql_extension = ""
194
            values = []
195
            if filter_type is not None:
196
                sql_extension = (f" AND {COL_TYPE_ID} = ("
197
                                 f"SELECT {COL_ID} FROM {TAB_CERTIFICATE_TYPES} WHERE {COL_ID} = ?)")
198
                values = [filter_type]
199

    
200
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
201
                   f"WHERE ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = ''){sql_extension}")
202
            self.cursor.execute(sql, values)
203
            certificate_rows = self.cursor.fetchall()
204

    
205
            certificates: List[Certificate] = []
206
            for certificate_row in certificate_rows:
207
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
208
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
209
                values = [certificate_row[0]]
210
                self.cursor.execute(sql, values)
211
                usage_rows = self.cursor.fetchall()
212

    
213
                usage_dict: Dict[int, bool] = {}
214
                for usage_row in usage_rows:
215
                    usage_dict[usage_row[2]] = True
216

    
217
                certificates.append(Certificate(certificate_row[0],      # ID
218
                                                certificate_row[1],      # valid from
219
                                                certificate_row[2],      # valid to
220
                                                certificate_row[3],      # pem data
221
                                                certificate_row[14],     # type ID
222
                                                certificate_row[15],     # parent ID
223
                                                certificate_row[16],     # private key ID
224
                                                usage_dict,
225
                                                certificate_row[4],      # common name
226
                                                certificate_row[5],      # country code
227
                                                certificate_row[6],      # locality
228
                                                certificate_row[7],      # province
229
                                                certificate_row[8],      # organization
230
                                                certificate_row[9],      # organizational unit
231
                                                certificate_row[10],     # email address
232
                                                certificate_row[11],     # revocation date
233
                                                certificate_row[12]))    # revocation reason
234
        except IntegrityError:
235
            Logger.error(INTEGRITY_ERROR_MSG)
236
            raise DatabaseException(INTEGRITY_ERROR_MSG)
237
        except ProgrammingError:
238
            Logger.error(PROGRAMMING_ERROR_MSG)
239
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
240
        except OperationalError:
241
            Logger.error(OPERATIONAL_ERROR_MSG)
242
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
243
        except NotSupportedError:
244
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
245
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
246
        except DatabaseError:
247
            Logger.error(DATABASE_ERROR_MSG)
248
            raise DatabaseException(DATABASE_ERROR_MSG)
249
        except Error:
250
            Logger.error(ERROR_MSG)
251
            raise DatabaseException(ERROR_MSG)
252

    
253
        return certificates
254

    
255
    def read_all_filter(self, target_types: Set[int], target_usages: Set[int], target_cn_substring: str, page: int,
256
                        per_page: int):
257
        """
258
        Reads (selects) all certificates according to a specific filtering and pagination options.
259
        :param target_types: certificate types (filter)
260
        :param target_usages: certificate usages (filter)
261
        :param target_cn_substring: certificate CN substring (filter)
262
        :param page: target page
263
        :param per_page: target page size
264
        :return: list of certificates
265
        """
266

    
267
        Logger.debug("Function launched.")
268

    
269
        try:
270
            values = []
271
            values += list(target_types)
272

    
273
            sql = (
274
                f"SELECT * "
275
                f"FROM {TAB_CERTIFICATES} a "
276
                f"WHERE (a.{COL_DELETION_DATE} IS NULL OR a.{COL_DELETION_DATE} = '') "
277
                f"AND a.{COL_TYPE_ID} IN ({','.join('?' * len(target_types))}) "
278
            )
279
            
280
            if target_usages is not None:
281
                sql += f"AND (SELECT COUNT(*) FROM {TAB_CERTIFICATE_USAGES} WHERE {COL_CERTIFICATE_ID} = a.{COL_ID} " \
282
                    f"AND {COL_USAGE_TYPE_ID} IN ({','.join('?' * len(target_usages))})) > 0 "
283
                values += list(target_usages)
284

    
285
            if target_cn_substring is not None:
286
                sql += f" AND a.{COL_COMMON_NAME} LIKE '%' || ? || '%'"
287

    
288
                values += [target_cn_substring]
289

    
290
            if page is not None and per_page is not None:
291
                sql += f" LIMIT {per_page} OFFSET {page * per_page}"
292

    
293
            self.cursor.execute(sql, values)
294
            certificate_rows = self.cursor.fetchall()
295

    
296
            certificates: List[Certificate] = []
297
            for certificate_row in certificate_rows:
298
                sql = (f"SELECT {COL_USAGE_TYPE_ID} FROM {TAB_CERTIFICATE_USAGES} "
299
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
300
                types = [certificate_row[0]]
301
                self.cursor.execute(sql, types)
302
                usage_rows = self.cursor.fetchall()
303

    
304
                usage_dict: Dict[int, bool] = {}
305
                for usage_row in usage_rows:
306
                    usage_dict[usage_row[0]] = True
307

    
308
                certificates.append(Certificate(certificate_row[0],  # ID
309
                                                certificate_row[1],  # valid from
310
                                                certificate_row[2],  # valid to
311
                                                certificate_row[3],  # pem data
312
                                                certificate_row[14],  # type ID
313
                                                certificate_row[15],  # parent ID
314
                                                certificate_row[16],  # private key ID
315
                                                usage_dict,
316
                                                certificate_row[4],  # common name
317
                                                certificate_row[5],  # country code
318
                                                certificate_row[6],  # locality
319
                                                certificate_row[7],  # province
320
                                                certificate_row[8],  # organization
321
                                                certificate_row[9],  # organizational unit
322
                                                certificate_row[10],  # email address
323
                                                certificate_row[11],  # revocation date
324
                                                certificate_row[12]))  # revocation reason
325
        except IntegrityError:
326
            Logger.error(INTEGRITY_ERROR_MSG)
327
            raise DatabaseException(INTEGRITY_ERROR_MSG)
328
        except ProgrammingError:
329
            Logger.error(PROGRAMMING_ERROR_MSG)
330
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
331
        except OperationalError:
332
            Logger.error(OPERATIONAL_ERROR_MSG)
333
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
334
        except NotSupportedError:
335
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
336
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
337
        except DatabaseError:
338
            Logger.error(DATABASE_ERROR_MSG)
339
            raise DatabaseException(DATABASE_ERROR_MSG)
340
        except Error:
341
            Logger.error(ERROR_MSG)
342
            raise DatabaseException(ERROR_MSG)
343

    
344
        return certificates
345

    
346
    def update(self, certificate_id: int, certificate: Certificate):
347
        """
348
        Updates a certificate.
349
        If the parameter of certificate (Certificate object) is not to be changed,
350
        the same value must be specified.
351

    
352
        :param certificate_id: ID of specific certificate
353
        :param certificate: Instance of the Certificate object
354

    
355
        :return: the result of whether the updation was successful
356
        """
357

    
358
        Logger.debug("Function launched.")
359

    
360
        try:
361
            sql = (f"UPDATE {TAB_CERTIFICATES} "
362
                   f"SET {COL_VALID_FROM} = ?, "
363
                   f"{COL_VALID_TO} = ?, "
364
                   f"{COL_PEM_DATA} = ?, "
365
                   f"{COL_COMMON_NAME} = ?, "
366
                   f"{COL_COUNTRY_CODE} = ?, "
367
                   f"{COL_LOCALITY} = ?, "
368
                   f"{COL_PROVINCE} = ?, "
369
                   f"{COL_ORGANIZATION} = ?, "
370
                   f"{COL_ORGANIZATIONAL_UNIT} = ?, "
371
                   f"{COL_EMAIL_ADDRESS} = ?, "
372
                   f"{COL_TYPE_ID} = ?, "
373
                   f"{COL_PARENT_ID} = ?, "
374
                   f"{COL_PRIVATE_KEY_ID} = ? "
375
                   f"WHERE {COL_ID} = ?")
376
            values = [certificate.valid_from,
377
                      certificate.valid_to,
378
                      certificate.pem_data,
379
                      certificate.common_name,
380
                      certificate.country_code,
381
                      certificate.locality,
382
                      certificate.province,
383
                      certificate.organization,
384
                      certificate.organizational_unit,
385
                      certificate.email_address,
386
                      certificate.type_id,
387
                      certificate.parent_id,
388
                      certificate.private_key_id,
389
                      certificate_id]
390

    
391
            self.cursor.execute(sql, values)
392
            self.connection.commit()
393

    
394
            sql = (f"DELETE FROM {TAB_CERTIFICATE_USAGES} "
395
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
396
            values = [certificate_id]
397
            self.cursor.execute(sql, values)
398
            self.connection.commit()
399

    
400
            check_updated = self.cursor.rowcount > 0
401

    
402
            # iterate over usage pairs
403
            for usage_id, usage_value in certificate.usages.items():
404
                if usage_value:
405
                    sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
406
                           f"({COL_CERTIFICATE_ID},"
407
                           f"{COL_USAGE_TYPE_ID}) "
408
                           f"VALUES (?,?)")
409
                    values = [certificate_id, usage_id]
410
                    self.cursor.execute(sql, values)
411
                    self.connection.commit()
412
        except IntegrityError:
413
            Logger.error(INTEGRITY_ERROR_MSG)
414
            raise DatabaseException(INTEGRITY_ERROR_MSG)
415
        except ProgrammingError:
416
            Logger.error(PROGRAMMING_ERROR_MSG)
417
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
418
        except OperationalError:
419
            Logger.error(OPERATIONAL_ERROR_MSG)
420
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
421
        except NotSupportedError:
422
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
423
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
424
        except DatabaseError:
425
            Logger.error(DATABASE_ERROR_MSG)
426
            raise DatabaseException(DATABASE_ERROR_MSG)
427
        except Error:
428
            Logger.error(ERROR_MSG)
429
            raise DatabaseException(ERROR_MSG)
430

    
431
        return check_updated
432

    
433
    def delete(self, certificate_id: int):
434
        """
435
        Deletes a certificate
436

    
437
        :param certificate_id: ID of specific certificate
438

    
439
        :return: the result of whether the deletion was successful
440
        """
441

    
442
        Logger.debug("Function launched.")
443

    
444
        try:
445
            sql = (f"UPDATE {TAB_CERTIFICATES} "
446
                   f"SET {COL_DELETION_DATE} = ? "
447
                   f"WHERE {COL_ID} = ? AND ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
448
            values = [int(time.time()),
449
                      certificate_id]
450
            self.cursor.execute(sql, values)
451
            self.connection.commit()
452
        except IntegrityError:
453
            Logger.error(INTEGRITY_ERROR_MSG)
454
            raise DatabaseException(INTEGRITY_ERROR_MSG)
455
        except ProgrammingError:
456
            Logger.error(PROGRAMMING_ERROR_MSG)
457
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
458
        except OperationalError:
459
            Logger.error(OPERATIONAL_ERROR_MSG)
460
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
461
        except NotSupportedError:
462
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
463
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
464
        except DatabaseError:
465
            Logger.error(DATABASE_ERROR_MSG)
466
            raise DatabaseException(DATABASE_ERROR_MSG)
467
        except Error:
468
            Logger.error(ERROR_MSG)
469
            raise DatabaseException(ERROR_MSG)
470

    
471
        return self.cursor.rowcount > 0
472

    
473
    def set_certificate_revoked(
474
            self, certificate_id: int, revocation_date: str, revocation_reason: str = REV_REASON_UNSPECIFIED):
475
        """
476
        Revoke a certificate
477

    
478
        :param certificate_id: ID of specific certificate
479
        :param revocation_date: Date, when the certificate is revoked
480
        :param revocation_reason: Reason of the revocation
481

    
482
        :return:
483
            the result of whether the revocation was successful OR
484
            sqlite3.Error if an exception is thrown
485
        """
486

    
487
        Logger.debug("Function launched.")
488

    
489
        try:
490
            if revocation_date != "" and revocation_reason == "":
491
                revocation_reason = REV_REASON_UNSPECIFIED
492
            elif revocation_date == "":
493
                return False
494

    
495
            sql = (f"UPDATE {TAB_CERTIFICATES} "
496
                   f"SET {COL_REVOCATION_DATE} = ?, "
497
                   f"{COL_REVOCATION_REASON} = ? "
498
                   f"WHERE {COL_ID} = ? AND ({COL_REVOCATION_DATE} IS NULL OR {COL_REVOCATION_DATE} = '')")
499
            values = [revocation_date,
500
                      revocation_reason,
501
                      certificate_id]
502
            self.cursor.execute(sql, values)
503
            self.connection.commit()
504
        except IntegrityError:
505
            Logger.error(INTEGRITY_ERROR_MSG)
506
            raise DatabaseException(INTEGRITY_ERROR_MSG)
507
        except ProgrammingError:
508
            Logger.error(PROGRAMMING_ERROR_MSG)
509
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
510
        except OperationalError:
511
            Logger.error(OPERATIONAL_ERROR_MSG)
512
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
513
        except NotSupportedError:
514
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
515
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
516
        except DatabaseError:
517
            Logger.error(DATABASE_ERROR_MSG)
518
            raise DatabaseException(DATABASE_ERROR_MSG)
519
        except Error:
520
            Logger.error(ERROR_MSG)
521
            raise DatabaseException(ERROR_MSG)
522

    
523
        return self.cursor.rowcount > 0
524

    
525
    def clear_certificate_revocation(self, certificate_id: int):
526
        """
527
        Clear revocation of a certificate
528

    
529
        :param certificate_id: ID of specific certificate
530

    
531
        :return:
532
            the result of whether the clear revocation was successful OR
533
            sqlite3.Error if an exception is thrown
534
        """
535

    
536
        Logger.debug("Function launched.")
537

    
538
        try:
539
            sql = (f"UPDATE {TAB_CERTIFICATES} "
540
                   f"SET {COL_REVOCATION_DATE} = NULL, "
541
                   f"{COL_REVOCATION_REASON} = NULL "
542
                   f"WHERE {COL_ID} = ?")
543
            values = [certificate_id]
544
            self.cursor.execute(sql, values)
545
            self.connection.commit()
546
        except IntegrityError:
547
            Logger.error(INTEGRITY_ERROR_MSG)
548
            raise DatabaseException(INTEGRITY_ERROR_MSG)
549
        except ProgrammingError:
550
            Logger.error(PROGRAMMING_ERROR_MSG)
551
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
552
        except OperationalError:
553
            Logger.error(OPERATIONAL_ERROR_MSG)
554
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
555
        except NotSupportedError:
556
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
557
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
558
        except DatabaseError:
559
            Logger.error(DATABASE_ERROR_MSG)
560
            raise DatabaseException(DATABASE_ERROR_MSG)
561
        except Error:
562
            Logger.error(ERROR_MSG)
563
            raise DatabaseException(ERROR_MSG)
564

    
565
        return self.cursor.rowcount > 0
566

    
567
    def get_all_revoked_by(self, certificate_id: int):
568
        """
569
        Get list of the revoked certificates that are direct descendants of the certificate with the ID
570

    
571
        :param certificate_id: ID of specific certificate
572

    
573
        :return:
574
            list of the certificates OR
575
            None if the list is empty OR
576
            sqlite3.Error if an exception is thrown
577
        """
578

    
579
        Logger.debug("Function launched.")
580

    
581
        try:
582
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
583
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_REVOCATION_DATE} IS NOT NULL AND {COL_REVOCATION_DATE} != ''")
584
            values = [certificate_id]
585
            self.cursor.execute(sql, values)
586
            certificate_rows = self.cursor.fetchall()
587

    
588
            certificates: List[Certificate] = []
589
            for certificate_row in certificate_rows:
590
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
591
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
592
                values = [certificate_row[0]]
593
                self.cursor.execute(sql, values)
594
                usage_rows = self.cursor.fetchall()
595

    
596
                usage_dict: Dict[int, bool] = {}
597
                for usage_row in usage_rows:
598
                    usage_dict[usage_row[2]] = True
599

    
600
                certificates.append(Certificate(certificate_row[0],      # ID
601
                                                certificate_row[1],      # valid from
602
                                                certificate_row[2],      # valid to
603
                                                certificate_row[3],      # pem data
604
                                                certificate_row[14],     # type ID
605
                                                certificate_row[15],     # parent ID
606
                                                certificate_row[16],     # private key ID
607
                                                usage_dict,
608
                                                certificate_row[4],      # common name
609
                                                certificate_row[5],      # country code
610
                                                certificate_row[6],      # locality
611
                                                certificate_row[7],      # province
612
                                                certificate_row[8],      # organization
613
                                                certificate_row[9],      # organizational unit
614
                                                certificate_row[10],     # email address
615
                                                certificate_row[11],     # revocation date
616
                                                certificate_row[12]))    # revocation reason
617
        except IntegrityError:
618
            Logger.error(INTEGRITY_ERROR_MSG)
619
            raise DatabaseException(INTEGRITY_ERROR_MSG)
620
        except ProgrammingError:
621
            Logger.error(PROGRAMMING_ERROR_MSG)
622
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
623
        except OperationalError:
624
            Logger.error(OPERATIONAL_ERROR_MSG)
625
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
626
        except NotSupportedError:
627
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
628
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
629
        except DatabaseError:
630
            Logger.error(DATABASE_ERROR_MSG)
631
            raise DatabaseException(DATABASE_ERROR_MSG)
632
        except Error:
633
            Logger.error(ERROR_MSG)
634
            raise DatabaseException(ERROR_MSG)
635

    
636
        return certificates
637

    
638
    def get_all_issued_by(self, certificate_id: int):
639
        """
640
        Get list of the certificates that are direct descendants of the certificate with the ID
641

    
642
        :param certificate_id: ID of specific certificate
643

    
644
        :return:
645
            list of the certificates OR
646
            None if the list is empty OR
647
            sqlite3.Error if an exception is thrown
648
        """
649

    
650
        Logger.debug("Function launched.")
651

    
652
        try:
653
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
654
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_ID} != ? AND "
655
                   f"({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
656
            values = [certificate_id, certificate_id]
657
            self.cursor.execute(sql, values)
658
            certificate_rows = self.cursor.fetchall()
659

    
660
            certificates: List[Certificate] = []
661
            for certificate_row in certificate_rows:
662
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
663
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
664
                values = [certificate_row[0]]
665
                self.cursor.execute(sql, values)
666
                usage_rows = self.cursor.fetchall()
667

    
668
                usage_dict: Dict[int, bool] = {}
669
                for usage_row in usage_rows:
670
                    usage_dict[usage_row[2]] = True
671

    
672
                certificates.append(Certificate(certificate_row[0],      # ID
673
                                                certificate_row[1],      # valid from
674
                                                certificate_row[2],      # valid to
675
                                                certificate_row[3],      # pem data
676
                                                certificate_row[14],     # type ID
677
                                                certificate_row[15],     # parent ID
678
                                                certificate_row[16],     # private key ID
679
                                                usage_dict,
680
                                                certificate_row[4],      # common name
681
                                                certificate_row[5],      # country code
682
                                                certificate_row[6],      # locality
683
                                                certificate_row[7],      # province
684
                                                certificate_row[8],      # organization
685
                                                certificate_row[9],      # organizational unit
686
                                                certificate_row[10],     # email address
687
                                                certificate_row[11],     # revocation date
688
                                                certificate_row[12]))    # revocation reason
689
        except IntegrityError:
690
            Logger.error(INTEGRITY_ERROR_MSG)
691
            raise DatabaseException(INTEGRITY_ERROR_MSG)
692
        except ProgrammingError:
693
            Logger.error(PROGRAMMING_ERROR_MSG)
694
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
695
        except OperationalError:
696
            Logger.error(OPERATIONAL_ERROR_MSG)
697
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
698
        except NotSupportedError:
699
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
700
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
701
        except DatabaseError:
702
            Logger.error(DATABASE_ERROR_MSG)
703
            raise DatabaseException(DATABASE_ERROR_MSG)
704
        except Error:
705
            Logger.error(ERROR_MSG)
706
            raise DatabaseException(ERROR_MSG)
707

    
708
        return certificates
709

    
710
    def get_all_issued_by_filter(self, issuer_id, target_types, target_usages, target_cn_substring, page, per_page):
711
        """
712
        Get list of the certificates that are direct descendants of the certificate with the ID
713

    
714
        :param target_types: certificate types (filter)
715
        :param target_usages: certificate usages (filter)
716
        :param target_cn_substring: certificate CN substring (filter)
717
        :param page: target page
718
        :param per_page: target page size
719
        :param issuer_id: ID of specific certificate
720

    
721
        :return:
722
            list of the certificates OR
723
            None if the list is empty
724
        """
725

    
726
        Logger.debug("Function launched.")
727

    
728
        try:
729
            values = [issuer_id, issuer_id]
730
            values += list(target_types)
731

    
732
            sql = (
733
                f"SELECT * "
734
                f"FROM {TAB_CERTIFICATES} a "
735
                f"WHERE (a.{COL_DELETION_DATE} IS NULL OR a.{COL_DELETION_DATE} = '') "
736
                f"AND a.{COL_PARENT_ID} = ? AND a.{COL_ID} != ? "
737
                f"AND a.{COL_TYPE_ID} IN ({','.join('?' * len(target_types))}) "
738
                
739
            )
740
            
741
            if target_usages is not None:
742
                sql += f"AND (SELECT COUNT(*) FROM {TAB_CERTIFICATE_USAGES} WHERE {COL_CERTIFICATE_ID} = a.{COL_ID} " \
743
                    f"AND {COL_USAGE_TYPE_ID} IN ({','.join('?' * len(target_usages))})) > 0 "
744
                values += list(target_usages)
745

    
746
            if target_cn_substring is not None:
747
                sql += f" AND a.{COL_COMMON_NAME} LIKE '%' || ? || '%'"
748
                values += [target_cn_substring]
749

    
750
            if page is not None and per_page is not None:
751
                sql += f" LIMIT {per_page} OFFSET {page * per_page}"
752

    
753
            self.cursor.execute(sql, values)
754
            certificate_rows = self.cursor.fetchall()
755

    
756
            certificates: List[Certificate] = []
757
            for certificate_row in certificate_rows:
758
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
759
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
760
                values = [certificate_row[0]]
761
                self.cursor.execute(sql, values)
762
                usage_rows = self.cursor.fetchall()
763

    
764
                usage_dict: Dict[int, bool] = {}
765
                for usage_row in usage_rows:
766
                    usage_dict[usage_row[2]] = True
767

    
768
                certificates.append(Certificate(certificate_row[0],  # ID
769
                                                certificate_row[1],  # valid from
770
                                                certificate_row[2],  # valid to
771
                                                certificate_row[3],  # pem data
772
                                                certificate_row[14],  # type ID
773
                                                certificate_row[15],  # parent ID
774
                                                certificate_row[16],  # private key ID
775
                                                usage_dict,
776
                                                certificate_row[4],  # common name
777
                                                certificate_row[5],  # country code
778
                                                certificate_row[6],  # locality
779
                                                certificate_row[7],  # province
780
                                                certificate_row[8],  # organization
781
                                                certificate_row[9],  # organizational unit
782
                                                certificate_row[10],  # email address
783
                                                certificate_row[11],  # revocation date
784
                                                certificate_row[12]))  # revocation reason
785
        except IntegrityError:
786
            Logger.error(INTEGRITY_ERROR_MSG)
787
            raise DatabaseException(INTEGRITY_ERROR_MSG)
788
        except ProgrammingError:
789
            Logger.error(PROGRAMMING_ERROR_MSG)
790
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
791
        except OperationalError:
792
            Logger.error(OPERATIONAL_ERROR_MSG)
793
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
794
        except NotSupportedError:
795
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
796
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
797
        except DatabaseError:
798
            Logger.error(DATABASE_ERROR_MSG)
799
            raise DatabaseException(DATABASE_ERROR_MSG)
800
        except Error:
801
            Logger.error(ERROR_MSG)
802
            raise DatabaseException(ERROR_MSG)
803

    
804
        return certificates
805

    
806
    def get_all_descendants_of(self, certificate_id: int):
807
        """
808
        Get a list of all certificates C such that the certificate identified by "certificate_id" belongs to a trust chain
809
        between C and its root certificate authority (i.e. is an ancestor of C).
810
        :param certificate_id: target certificate ID
811
        :return: list of all descendants
812
        """
813

    
814
        Logger.debug("Function launched.")
815

    
816
        try:
817
            def dfs(children_of, this, collection: list):
818
                for child in children_of(this.certificate_id):
819
                    dfs(children_of, child, collection)
820
                collection.append(this)
821

    
822
            subtree_root = self.read(certificate_id)
823
            if subtree_root is None:
824
                return None
825

    
826
            all_certs = []
827
            dfs(self.get_all_issued_by, subtree_root, all_certs)
828
        except IntegrityError:
829
            Logger.error(INTEGRITY_ERROR_MSG)
830
            raise DatabaseException(INTEGRITY_ERROR_MSG)
831
        except ProgrammingError:
832
            Logger.error(PROGRAMMING_ERROR_MSG)
833
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
834
        except OperationalError:
835
            Logger.error(OPERATIONAL_ERROR_MSG)
836
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
837
        except NotSupportedError:
838
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
839
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
840
        except DatabaseError:
841
            Logger.error(DATABASE_ERROR_MSG)
842
            raise DatabaseException(DATABASE_ERROR_MSG)
843
        except Error:
844
            Logger.error(ERROR_MSG)
845
            raise DatabaseException(ERROR_MSG)
846

    
847
        return all_certs
848

    
849
    def get_next_id(self) -> int:
850
        """
851
        Get identifier of the next certificate that will be inserted into the database
852
        :return: identifier of the next certificate that will be added into the database
853
        """
854

    
855
        Logger.debug("Function launched.")
856

    
857
        try:
858
            # get next IDs of all tables
859
            self.cursor.execute("SELECT * FROM SQLITE_SEQUENCE")
860
            results = self.cursor.fetchall()
861

    
862
            # search for next ID in Certificates table and return it
863
            for result in results:
864
                if result[0] == TAB_CERTIFICATES:
865
                    return result[1] + 1  # current last id + 1
866
            # if certificates table is not present in the query results, return 1
867
        except IntegrityError:
868
            Logger.error(INTEGRITY_ERROR_MSG)
869
            raise DatabaseException(INTEGRITY_ERROR_MSG)
870
        except ProgrammingError:
871
            Logger.error(PROGRAMMING_ERROR_MSG)
872
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
873
        except OperationalError:
874
            Logger.error(OPERATIONAL_ERROR_MSG)
875
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
876
        except NotSupportedError:
877
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
878
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
879
        except DatabaseError:
880
            Logger.error(DATABASE_ERROR_MSG)
881
            raise DatabaseException(DATABASE_ERROR_MSG)
882
        except Error:
883
            Logger.error(ERROR_MSG)
884
            raise DatabaseException(ERROR_MSG)
885

    
886
        return 1
(2-2/3)