Projekt

Obecné

Profil

Stáhnout (37.6 KB) Statistiky
| Větev: | Tag: | Revize:
1 9a6c9613 Jan Pašek
import time
2
from sqlite3 import Connection, Error, DatabaseError, IntegrityError, ProgrammingError, OperationalError, \
3
    NotSupportedError
4
from typing import Dict, List, Set
5 6422796d Stanislav Král
6 2cecaf70 Jan Pašek
from src.exceptions.database_exception import DatabaseException
7 9a6c9613 Jan Pašek
from injector import inject
8
from src.constants import *
9
from src.model.certificate import Certificate
10 5e31b492 David Friesecký
from src.utils.logger import Logger
11 5b57121e Captain_Trojan
12 9a6c9613 Jan Pašek
INTEGRITY_ERROR_MSG = "Database relational integrity corrupted."
13
PROGRAMMING_ERROR_MSG = "Exception raised for programming errors (etc. SQL statement)."
14
OPERATIONAL_ERROR_MSG = "Exception raised for errors that are related to the database’s operation."
15
NOT_SUPPORTED_ERROR_MSG = "Method or database API was used which is not supported by the database"
16
DATABASE_ERROR_MSG = "Unknown exception that are related to the database."
17
ERROR_MSG = "Unknown exception."
18 5b57121e Captain_Trojan
19
20 9a6c9613 Jan Pašek
class CertificateRepository:
21 5b57121e Captain_Trojan
22 9a6c9613 Jan Pašek
    @inject
23
    def __init__(self, connection: Connection):
24 5b57121e Captain_Trojan
        """
25 9a6c9613 Jan Pašek
        Constructor of the CertificateRepository object
26 5e31b492 David Friesecký
27 9a6c9613 Jan Pašek
        :param connection: Instance of the Connection object
28 5b57121e Captain_Trojan
        """
29 9a6c9613 Jan Pašek
        self.connection = connection
30
        self.cursor = connection.cursor()
31 5e31b492 David Friesecký
32 9a6c9613 Jan Pašek
    def create(self, certificate: Certificate):
33
        """
34
        Creates a certificate.
35
        For root certificate (CA) the parent certificate id is modified to the same id (id == parent_id).
36 5b57121e Captain_Trojan
37 9a6c9613 Jan Pašek
        :param certificate: Instance of the Certificate object
38 5b57121e Captain_Trojan
39 9a6c9613 Jan Pašek
        :return: the result of whether the creation was successful
40 5b57121e Captain_Trojan
        """
41 5e31b492 David Friesecký
42 9a6c9613 Jan Pašek
        Logger.debug("Function launched.")
43 5e31b492 David Friesecký
44 5b6d9513 Captain_Trojan
        try:
45 9a6c9613 Jan Pašek
            sql = (f"INSERT INTO {TAB_CERTIFICATES} "
46
                   f"({COL_VALID_FROM},"
47
                   f"{COL_VALID_TO},"
48
                   f"{COL_PEM_DATA},"
49
                   f"{COL_COMMON_NAME},"
50
                   f"{COL_COUNTRY_CODE},"
51
                   f"{COL_LOCALITY},"
52
                   f"{COL_PROVINCE},"
53
                   f"{COL_ORGANIZATION},"
54
                   f"{COL_ORGANIZATIONAL_UNIT},"
55
                   f"{COL_EMAIL_ADDRESS},"
56
                   f"{COL_TYPE_ID},"
57
                   f"{COL_PARENT_ID},"
58
                   f"{COL_PRIVATE_KEY_ID}) "
59
                   f"VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?)")
60
            values = [certificate.valid_from,
61
                      certificate.valid_to,
62
                      certificate.pem_data,
63
                      certificate.common_name,
64
                      certificate.country_code,
65
                      certificate.locality,
66
                      certificate.province,
67
                      certificate.organization,
68
                      certificate.organizational_unit,
69
                      certificate.email_address,
70
                      certificate.type_id,
71
                      certificate.parent_id,
72
                      certificate.private_key_id]
73
            self.cursor.execute(sql, values)
74
            self.connection.commit()
75
76
            last_id: int = self.cursor.lastrowid
77
78
            if certificate.type_id == ROOT_CA_ID:
79
                certificate.parent_id = last_id
80
                self.update(last_id, certificate)
81
            else:
82
                for usage_id, usage_value in certificate.usages.items():
83
                    if usage_value:
84
                        sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
85
                               f"({COL_CERTIFICATE_ID},"
86
                               f"{COL_USAGE_TYPE_ID}) "
87
                               f"VALUES (?,?)")
88
                        values = [last_id, usage_id]
89
                        self.cursor.execute(sql, values)
90
                        self.connection.commit()
91
        except IntegrityError:
92
            Logger.error(INTEGRITY_ERROR_MSG)
93
            raise DatabaseException(INTEGRITY_ERROR_MSG)
94
        except ProgrammingError:
95
            Logger.error(PROGRAMMING_ERROR_MSG)
96
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
97
        except OperationalError:
98
            Logger.error(OPERATIONAL_ERROR_MSG)
99
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
100
        except NotSupportedError:
101
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
102
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
103
        except DatabaseError:
104
            Logger.error(DATABASE_ERROR_MSG)
105
            raise DatabaseException(DATABASE_ERROR_MSG)
106
        except Error:
107
            Logger.error(ERROR_MSG)
108
            raise DatabaseException(ERROR_MSG)
109
110
        return last_id
111
112
    def read(self, certificate_id: int):
113
        """
114
        Reads (selects) a certificate.
115 5b6d9513 Captain_Trojan
116 9a6c9613 Jan Pašek
        :param certificate_id: ID of specific certificate
117 5b6d9513 Captain_Trojan
118 9a6c9613 Jan Pašek
        :return: instance of the Certificate object
119
        """
120 a53e5aef Jan Pašek
121 9a6c9613 Jan Pašek
        Logger.debug("Function launched.")
122 a53e5aef Jan Pašek
123
        try:
124 9a6c9613 Jan Pašek
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
125
                   f"WHERE {COL_ID} = ? AND ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
126
            values = [certificate_id]
127
            self.cursor.execute(sql, values)
128
            certificate_row = self.cursor.fetchone()
129
130
            if certificate_row is None:
131
                return None
132
133
            sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
134
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
135
            self.cursor.execute(sql, values)
136
            usage_rows = self.cursor.fetchall()
137
138
            usage_dict: Dict[int, bool] = {}
139
            for usage_row in usage_rows:
140
                usage_dict[usage_row[2]] = True
141
142
            certificate: Certificate = Certificate(certificate_row[0],      # ID
143
                                                   certificate_row[1],      # valid from
144
                                                   certificate_row[2],      # valid to
145
                                                   certificate_row[3],      # pem data
146
                                                   certificate_row[14],     # type ID
147
                                                   certificate_row[15],     # parent ID
148
                                                   certificate_row[16],     # private key ID
149
                                                   usage_dict,
150
                                                   certificate_row[4],      # common name
151
                                                   certificate_row[5],      # country code
152
                                                   certificate_row[6],      # locality
153
                                                   certificate_row[7],      # province
154
                                                   certificate_row[8],      # organization
155
                                                   certificate_row[9],      # organizational unit
156
                                                   certificate_row[10],     # email address
157
                                                   certificate_row[11],     # revocation date
158
                                                   certificate_row[12])     # revocation reason
159
160
        except IntegrityError:
161
            Logger.error(INTEGRITY_ERROR_MSG)
162
            raise DatabaseException(INTEGRITY_ERROR_MSG)
163
        except ProgrammingError:
164
            Logger.error(PROGRAMMING_ERROR_MSG)
165
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
166
        except OperationalError:
167
            Logger.error(OPERATIONAL_ERROR_MSG)
168
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
169
        except NotSupportedError:
170
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
171
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
172
        except DatabaseError:
173
            Logger.error(DATABASE_ERROR_MSG)
174
            raise DatabaseException(DATABASE_ERROR_MSG)
175
        except Error:
176
            Logger.error(ERROR_MSG)
177
            raise DatabaseException(ERROR_MSG)
178
179
        return certificate
180
181
    def read_all(self, filter_type: int = None):
182 5b57121e Captain_Trojan
        """
183 9a6c9613 Jan Pašek
        Reads (selects) all certificates (with type).
184 5e31b492 David Friesecký
185 9a6c9613 Jan Pašek
        :param filter_type: ID of certificate type from CertificateTypes table
186 5b57121e Captain_Trojan
187 9a6c9613 Jan Pašek
        :return: list of certificates
188 5b57121e Captain_Trojan
        """
189 5e31b492 David Friesecký
190 9a6c9613 Jan Pašek
        Logger.debug("Function launched.")
191 5e31b492 David Friesecký
192 d53c2fdc Captain_Trojan
        try:
193 9a6c9613 Jan Pašek
            sql_extension = ""
194
            values = []
195
            if filter_type is not None:
196
                sql_extension = (f" AND {COL_TYPE_ID} = ("
197
                                 f"SELECT {COL_ID} FROM {TAB_CERTIFICATE_TYPES} WHERE {COL_ID} = ?)")
198
                values = [filter_type]
199
200
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
201
                   f"WHERE ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = ''){sql_extension}")
202
            self.cursor.execute(sql, values)
203
            certificate_rows = self.cursor.fetchall()
204
205
            certificates: List[Certificate] = []
206
            for certificate_row in certificate_rows:
207
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
208
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
209
                values = [certificate_row[0]]
210
                self.cursor.execute(sql, values)
211
                usage_rows = self.cursor.fetchall()
212
213
                usage_dict: Dict[int, bool] = {}
214
                for usage_row in usage_rows:
215
                    usage_dict[usage_row[2]] = True
216
217
                certificates.append(Certificate(certificate_row[0],      # ID
218
                                                certificate_row[1],      # valid from
219
                                                certificate_row[2],      # valid to
220
                                                certificate_row[3],      # pem data
221
                                                certificate_row[14],     # type ID
222
                                                certificate_row[15],     # parent ID
223
                                                certificate_row[16],     # private key ID
224
                                                usage_dict,
225
                                                certificate_row[4],      # common name
226
                                                certificate_row[5],      # country code
227
                                                certificate_row[6],      # locality
228
                                                certificate_row[7],      # province
229
                                                certificate_row[8],      # organization
230
                                                certificate_row[9],      # organizational unit
231
                                                certificate_row[10],     # email address
232
                                                certificate_row[11],     # revocation date
233
                                                certificate_row[12]))    # revocation reason
234
        except IntegrityError:
235
            Logger.error(INTEGRITY_ERROR_MSG)
236
            raise DatabaseException(INTEGRITY_ERROR_MSG)
237
        except ProgrammingError:
238
            Logger.error(PROGRAMMING_ERROR_MSG)
239
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
240
        except OperationalError:
241
            Logger.error(OPERATIONAL_ERROR_MSG)
242
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
243
        except NotSupportedError:
244
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
245
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
246
        except DatabaseError:
247
            Logger.error(DATABASE_ERROR_MSG)
248
            raise DatabaseException(DATABASE_ERROR_MSG)
249
        except Error:
250
            Logger.error(ERROR_MSG)
251
            raise DatabaseException(ERROR_MSG)
252
253
        return certificates
254
255
    def read_all_filter(self, target_types: Set[int], target_usages: Set[int], target_cn_substring: str, page: int,
256
                        per_page: int):
257
        """
258
        Reads (selects) all certificates according to a specific filtering and pagination options.
259
        :param target_types: certificate types (filter)
260
        :param target_usages: certificate usages (filter)
261
        :param target_cn_substring: certificate CN substring (filter)
262
        :param page: target page
263
        :param per_page: target page size
264
        :return: list of certificates
265
        """
266 d53c2fdc Captain_Trojan
267 9a6c9613 Jan Pašek
        Logger.debug("Function launched.")
268 5b57121e Captain_Trojan
269 9a6c9613 Jan Pašek
        try:
270
            values = []
271
            values += list(target_types)
272
            values += list(target_usages)
273
274
            sql = (
275
                f"SELECT a.{COL_ID}, {COL_COMMON_NAME}, {COL_VALID_FROM}, {COL_VALID_TO}, {COL_PEM_DATA}, "
276
                f"{COL_REVOCATION_DATE}, {COL_REVOCATION_REASON}, {COL_PRIVATE_KEY_ID}, {COL_TYPE_ID}, {COL_PARENT_ID} "
277
                f"FROM {TAB_CERTIFICATES} a "
278
                f"WHERE (a.{COL_DELETION_DATE} IS NULL OR a.{COL_DELETION_DATE} = '') "
279
                f"AND a.{COL_TYPE_ID} IN ({','.join('?' * len(target_types))}) "
280
                f"AND (SELECT COUNT(*) FROM {TAB_CERTIFICATE_USAGES} "
281
                f"WHERE {COL_CERTIFICATE_ID} = a.{COL_ID} "
282
                f"AND {COL_USAGE_TYPE_ID} IN ({','.join('?' * len(target_usages))}) "
283
                f") > 0 "
284
            )
285 5b57121e Captain_Trojan
286 9a6c9613 Jan Pašek
            if target_cn_substring is not None:
287
                sql += f" AND a.{COL_COMMON_NAME} LIKE '%' || ? || '%'"
288
289
                values += [target_cn_substring]
290
291
            if page is not None and per_page is not None:
292
                sql += f" LIMIT {per_page} OFFSET {page * per_page}"
293
294
            self.cursor.execute(sql, values)
295
            certificate_rows = self.cursor.fetchall()
296
297
            certificates: List[Certificate] = []
298
            for certificate_row in certificate_rows:
299
                sql = (f"SELECT {COL_USAGE_TYPE_ID} FROM {TAB_CERTIFICATE_USAGES} "
300
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
301
                types = [certificate_row[0]]
302
                self.cursor.execute(sql, types)
303
                usage_rows = self.cursor.fetchall()
304
305
                usage_dict: Dict[int, bool] = {}
306
                for usage_row in usage_rows:
307
                    usage_dict[usage_row[0]] = True
308
309
                certificates.append(Certificate(certificate_row[0],
310
                                                certificate_row[1],
311
                                                certificate_row[2],
312
                                                certificate_row[3],
313
                                                certificate_row[4],
314
                                                certificate_row[7],
315
                                                certificate_row[8],
316
                                                certificate_row[9],
317
                                                usage_dict,
318
                                                certificate_row[5],
319
                                                certificate_row[6]))
320
        except IntegrityError:
321
            Logger.error(INTEGRITY_ERROR_MSG)
322
            raise DatabaseException(INTEGRITY_ERROR_MSG)
323
        except ProgrammingError:
324
            Logger.error(PROGRAMMING_ERROR_MSG)
325
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
326
        except OperationalError:
327
            Logger.error(OPERATIONAL_ERROR_MSG)
328
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
329
        except NotSupportedError:
330
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
331
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
332
        except DatabaseError:
333
            Logger.error(DATABASE_ERROR_MSG)
334
            raise DatabaseException(DATABASE_ERROR_MSG)
335
        except Error:
336
            Logger.error(ERROR_MSG)
337
            raise DatabaseException(ERROR_MSG)
338
339
        return certificates
340
341
    def update(self, certificate_id: int, certificate: Certificate):
342
        """
343
        Updates a certificate.
344
        If the parameter of certificate (Certificate object) is not to be changed,
345
        the same value must be specified.
346 5b57121e Captain_Trojan
347 9a6c9613 Jan Pašek
        :param certificate_id: ID of specific certificate
348
        :param certificate: Instance of the Certificate object
349 5b57121e Captain_Trojan
350 9a6c9613 Jan Pašek
        :return: the result of whether the updation was successful
351 5b57121e Captain_Trojan
        """
352
353 9a6c9613 Jan Pašek
        Logger.debug("Function launched.")
354 5e31b492 David Friesecký
355 aa740737 Captain_Trojan
        try:
356 9a6c9613 Jan Pašek
            sql = (f"UPDATE {TAB_CERTIFICATES} "
357
                   f"SET {COL_VALID_FROM} = ?, "
358
                   f"{COL_VALID_TO} = ?, "
359
                   f"{COL_PEM_DATA} = ?, "
360
                   f"{COL_COMMON_NAME} = ?, "
361
                   f"{COL_COUNTRY_CODE} = ?, "
362
                   f"{COL_LOCALITY} = ?, "
363
                   f"{COL_PROVINCE} = ?, "
364
                   f"{COL_ORGANIZATION} = ?, "
365
                   f"{COL_ORGANIZATIONAL_UNIT} = ?, "
366
                   f"{COL_EMAIL_ADDRESS} = ?, "
367
                   f"{COL_TYPE_ID} = ?, "
368
                   f"{COL_PARENT_ID} = ?, "
369
                   f"{COL_PRIVATE_KEY_ID} = ? "
370
                   f"WHERE {COL_ID} = ?")
371
            values = [certificate.valid_from,
372
                      certificate.valid_to,
373
                      certificate.pem_data,
374
                      certificate.common_name,
375
                      certificate.country_code,
376
                      certificate.locality,
377
                      certificate.province,
378
                      certificate.organization,
379
                      certificate.organizational_unit,
380
                      certificate.email_address,
381
                      certificate.type_id,
382
                      certificate.parent_id,
383
                      certificate.private_key_id,
384
                      certificate_id]
385
386
            self.cursor.execute(sql, values)
387
            self.connection.commit()
388
389
            sql = (f"DELETE FROM {TAB_CERTIFICATE_USAGES} "
390
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
391
            values = [certificate_id]
392
            self.cursor.execute(sql, values)
393
            self.connection.commit()
394
395
            check_updated = self.cursor.rowcount > 0
396
397
            # iterate over usage pairs
398
            for usage_id, usage_value in certificate.usages.items():
399
                if usage_value:
400
                    sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
401
                           f"({COL_CERTIFICATE_ID},"
402
                           f"{COL_USAGE_TYPE_ID}) "
403
                           f"VALUES (?,?)")
404
                    values = [certificate_id, usage_id]
405
                    self.cursor.execute(sql, values)
406
                    self.connection.commit()
407
        except IntegrityError:
408
            Logger.error(INTEGRITY_ERROR_MSG)
409
            raise DatabaseException(INTEGRITY_ERROR_MSG)
410
        except ProgrammingError:
411
            Logger.error(PROGRAMMING_ERROR_MSG)
412
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
413
        except OperationalError:
414
            Logger.error(OPERATIONAL_ERROR_MSG)
415
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
416
        except NotSupportedError:
417
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
418
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
419
        except DatabaseError:
420
            Logger.error(DATABASE_ERROR_MSG)
421
            raise DatabaseException(DATABASE_ERROR_MSG)
422
        except Error:
423
            Logger.error(ERROR_MSG)
424
            raise DatabaseException(ERROR_MSG)
425
426
        return check_updated
427
428
    def delete(self, certificate_id: int):
429
        """
430
        Deletes a certificate
431 aa740737 Captain_Trojan
432 9a6c9613 Jan Pašek
        :param certificate_id: ID of specific certificate
433 aa740737 Captain_Trojan
434 9a6c9613 Jan Pašek
        :return: the result of whether the deletion was successful
435
        """
436 aa740737 Captain_Trojan
437 9a6c9613 Jan Pašek
        Logger.debug("Function launched.")
438 11a90594 Jan Pašek
439 9a6c9613 Jan Pašek
        try:
440
            sql = (f"UPDATE {TAB_CERTIFICATES} "
441
                   f"SET {COL_DELETION_DATE} = ? "
442
                   f"WHERE {COL_ID} = ? AND ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
443
            values = [int(time.time()),
444
                      certificate_id]
445
            self.cursor.execute(sql, values)
446
            self.connection.commit()
447
        except IntegrityError:
448
            Logger.error(INTEGRITY_ERROR_MSG)
449
            raise DatabaseException(INTEGRITY_ERROR_MSG)
450
        except ProgrammingError:
451
            Logger.error(PROGRAMMING_ERROR_MSG)
452
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
453
        except OperationalError:
454
            Logger.error(OPERATIONAL_ERROR_MSG)
455
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
456
        except NotSupportedError:
457
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
458
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
459
        except DatabaseError:
460
            Logger.error(DATABASE_ERROR_MSG)
461
            raise DatabaseException(DATABASE_ERROR_MSG)
462
        except Error:
463
            Logger.error(ERROR_MSG)
464
            raise DatabaseException(ERROR_MSG)
465
466
        return self.cursor.rowcount > 0
467
468
    def set_certificate_revoked(
469
            self, certificate_id: int, revocation_date: str, revocation_reason: str = REV_REASON_UNSPECIFIED):
470
        """
471
        Revoke a certificate
472 5b6d9513 Captain_Trojan
473 9a6c9613 Jan Pašek
        :param certificate_id: ID of specific certificate
474
        :param revocation_date: Date, when the certificate is revoked
475
        :param revocation_reason: Reason of the revocation
476 5b6d9513 Captain_Trojan
477 9a6c9613 Jan Pašek
        :return:
478
            the result of whether the revocation was successful OR
479
            sqlite3.Error if an exception is thrown
480 2cecaf70 Jan Pašek
        """
481
482 9a6c9613 Jan Pašek
        Logger.debug("Function launched.")
483 5e31b492 David Friesecký
484 9a6c9613 Jan Pašek
        try:
485
            if revocation_date != "" and revocation_reason == "":
486
                revocation_reason = REV_REASON_UNSPECIFIED
487
            elif revocation_date == "":
488
                return False
489
490
            sql = (f"UPDATE {TAB_CERTIFICATES} "
491
                   f"SET {COL_REVOCATION_DATE} = ?, "
492
                   f"{COL_REVOCATION_REASON} = ? "
493
                   f"WHERE {COL_ID} = ? AND ({COL_REVOCATION_DATE} IS NULL OR {COL_REVOCATION_DATE} = '')")
494
            values = [revocation_date,
495
                      revocation_reason,
496
                      certificate_id]
497
            self.cursor.execute(sql, values)
498
            self.connection.commit()
499
        except IntegrityError:
500
            Logger.error(INTEGRITY_ERROR_MSG)
501
            raise DatabaseException(INTEGRITY_ERROR_MSG)
502
        except ProgrammingError:
503
            Logger.error(PROGRAMMING_ERROR_MSG)
504
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
505
        except OperationalError:
506
            Logger.error(OPERATIONAL_ERROR_MSG)
507
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
508
        except NotSupportedError:
509
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
510
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
511
        except DatabaseError:
512
            Logger.error(DATABASE_ERROR_MSG)
513
            raise DatabaseException(DATABASE_ERROR_MSG)
514
        except Error:
515
            Logger.error(ERROR_MSG)
516
            raise DatabaseException(ERROR_MSG)
517
518
        return self.cursor.rowcount > 0
519
520
    def clear_certificate_revocation(self, certificate_id: int):
521 9247d70a Captain_Trojan
        """
522 9a6c9613 Jan Pašek
        Clear revocation of a certificate
523 5e31b492 David Friesecký
524 9a6c9613 Jan Pašek
        :param certificate_id: ID of specific certificate
525 5e31b492 David Friesecký
526 9a6c9613 Jan Pašek
        :return:
527
            the result of whether the clear revocation was successful OR
528
            sqlite3.Error if an exception is thrown
529
        """
530 5b6d9513 Captain_Trojan
531 9a6c9613 Jan Pašek
        Logger.debug("Function launched.")
532 cfda1725 Stanislav Král
533 9a6c9613 Jan Pašek
        try:
534
            sql = (f"UPDATE {TAB_CERTIFICATES} "
535
                   f"SET {COL_REVOCATION_DATE} = NULL, "
536
                   f"{COL_REVOCATION_REASON} = NULL "
537
                   f"WHERE {COL_ID} = ?")
538
            values = [certificate_id]
539
            self.cursor.execute(sql, values)
540
            self.connection.commit()
541
        except IntegrityError:
542
            Logger.error(INTEGRITY_ERROR_MSG)
543
            raise DatabaseException(INTEGRITY_ERROR_MSG)
544
        except ProgrammingError:
545
            Logger.error(PROGRAMMING_ERROR_MSG)
546
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
547
        except OperationalError:
548
            Logger.error(OPERATIONAL_ERROR_MSG)
549
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
550
        except NotSupportedError:
551
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
552
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
553
        except DatabaseError:
554
            Logger.error(DATABASE_ERROR_MSG)
555
            raise DatabaseException(DATABASE_ERROR_MSG)
556
        except Error:
557
            Logger.error(ERROR_MSG)
558
            raise DatabaseException(ERROR_MSG)
559
560
        return self.cursor.rowcount > 0
561
562
    def get_all_revoked_by(self, certificate_id: int):
563 cfda1725 Stanislav Král
        """
564 9a6c9613 Jan Pašek
        Get list of the revoked certificates that are direct descendants of the certificate with the ID
565 cfda1725 Stanislav Král
566 9a6c9613 Jan Pašek
        :param certificate_id: ID of specific certificate
567 ce8b9aaf Stanislav Král
568 9a6c9613 Jan Pašek
        :return:
569
            list of the certificates OR
570
            None if the list is empty OR
571
            sqlite3.Error if an exception is thrown
572 ce8b9aaf Stanislav Král
        """
573
574 9a6c9613 Jan Pašek
        Logger.debug("Function launched.")
575 5e31b492 David Friesecký
576 ce8b9aaf Stanislav Král
        try:
577 9a6c9613 Jan Pašek
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
578
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_REVOCATION_DATE} IS NOT NULL AND {COL_REVOCATION_DATE} != ''")
579
            values = [certificate_id]
580
            self.cursor.execute(sql, values)
581
            certificate_rows = self.cursor.fetchall()
582
583
            certificates: List[Certificate] = []
584
            for certificate_row in certificate_rows:
585
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
586
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
587
                values = [certificate_row[0]]
588
                self.cursor.execute(sql, values)
589
                usage_rows = self.cursor.fetchall()
590
591
                usage_dict: Dict[int, bool] = {}
592
                for usage_row in usage_rows:
593
                    usage_dict[usage_row[2]] = True
594
595
                certificates.append(Certificate(certificate_row[0],      # ID
596
                                                certificate_row[1],      # valid from
597
                                                certificate_row[2],      # valid to
598
                                                certificate_row[3],      # pem data
599
                                                certificate_row[14],     # type ID
600
                                                certificate_row[15],     # parent ID
601
                                                certificate_row[16],     # private key ID
602
                                                usage_dict,
603
                                                certificate_row[4],      # common name
604
                                                certificate_row[5],      # country code
605
                                                certificate_row[6],      # locality
606
                                                certificate_row[7],      # province
607
                                                certificate_row[8],      # organization
608
                                                certificate_row[9],      # organizational unit
609
                                                certificate_row[10],     # email address
610
                                                certificate_row[11],     # revocation date
611
                                                certificate_row[12]))    # revocation reason
612
        except IntegrityError:
613
            Logger.error(INTEGRITY_ERROR_MSG)
614
            raise DatabaseException(INTEGRITY_ERROR_MSG)
615
        except ProgrammingError:
616
            Logger.error(PROGRAMMING_ERROR_MSG)
617
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
618
        except OperationalError:
619
            Logger.error(OPERATIONAL_ERROR_MSG)
620
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
621
        except NotSupportedError:
622
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
623
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
624
        except DatabaseError:
625
            Logger.error(DATABASE_ERROR_MSG)
626
            raise DatabaseException(DATABASE_ERROR_MSG)
627
        except Error:
628
            Logger.error(ERROR_MSG)
629
            raise DatabaseException(ERROR_MSG)
630
631
        return certificates
632
633
    def get_all_issued_by(self, certificate_id: int):
634 ce8b9aaf Stanislav Král
        """
635 9a6c9613 Jan Pašek
        Get list of the certificates that are direct descendants of the certificate with the ID
636 ce8b9aaf Stanislav Král
637 9a6c9613 Jan Pašek
        :param certificate_id: ID of specific certificate
638 cfda1725 Stanislav Král
639 9a6c9613 Jan Pašek
        :return:
640
            list of the certificates OR
641
            None if the list is empty OR
642
            sqlite3.Error if an exception is thrown
643 cfda1725 Stanislav Král
        """
644
645 9a6c9613 Jan Pašek
        Logger.debug("Function launched.")
646 5e31b492 David Friesecký
647 cfda1725 Stanislav Král
        try:
648 9a6c9613 Jan Pašek
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
649
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_ID} != ? AND "
650
                   f"({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
651
            values = [certificate_id, certificate_id]
652
            self.cursor.execute(sql, values)
653
            certificate_rows = self.cursor.fetchall()
654
655
            certificates: List[Certificate] = []
656
            for certificate_row in certificate_rows:
657
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
658
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
659
                values = [certificate_row[0]]
660
                self.cursor.execute(sql, values)
661
                usage_rows = self.cursor.fetchall()
662
663
                usage_dict: Dict[int, bool] = {}
664
                for usage_row in usage_rows:
665
                    usage_dict[usage_row[2]] = True
666
667
                certificates.append(Certificate(certificate_row[0],      # ID
668
                                                certificate_row[1],      # valid from
669
                                                certificate_row[2],      # valid to
670
                                                certificate_row[3],      # pem data
671
                                                certificate_row[14],     # type ID
672
                                                certificate_row[15],     # parent ID
673
                                                certificate_row[16],     # private key ID
674
                                                usage_dict,
675
                                                certificate_row[4],      # common name
676
                                                certificate_row[5],      # country code
677
                                                certificate_row[6],      # locality
678
                                                certificate_row[7],      # province
679
                                                certificate_row[8],      # organization
680
                                                certificate_row[9],      # organizational unit
681
                                                certificate_row[10],     # email address
682
                                                certificate_row[11],     # revocation date
683
                                                certificate_row[12]))    # revocation reason
684
        except IntegrityError:
685
            Logger.error(INTEGRITY_ERROR_MSG)
686
            raise DatabaseException(INTEGRITY_ERROR_MSG)
687
        except ProgrammingError:
688
            Logger.error(PROGRAMMING_ERROR_MSG)
689
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
690
        except OperationalError:
691
            Logger.error(OPERATIONAL_ERROR_MSG)
692
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
693
        except NotSupportedError:
694
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
695
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
696
        except DatabaseError:
697
            Logger.error(DATABASE_ERROR_MSG)
698
            raise DatabaseException(DATABASE_ERROR_MSG)
699
        except Error:
700
            Logger.error(ERROR_MSG)
701
            raise DatabaseException(ERROR_MSG)
702
703
        return certificates
704
705
    def get_all_issued_by_filter(self, issuer_id, target_types, target_usages, target_cn_substring, page, per_page):
706 9cf9a19d Captain_Trojan
        """
707 9a6c9613 Jan Pašek
        Get list of the certificates that are direct descendants of the certificate with the ID
708
709
        :param target_types: certificate types (filter)
710
        :param target_usages: certificate usages (filter)
711
        :param target_cn_substring: certificate CN substring (filter)
712
        :param page: target page
713
        :param per_page: target page size
714
        :param issuer_id: ID of specific certificate
715
716
        :return:
717
            list of the certificates OR
718
            None if the list is empty
719 9cf9a19d Captain_Trojan
        """
720 5e31b492 David Friesecký
721 9a6c9613 Jan Pašek
        Logger.debug("Function launched.")
722 5e31b492 David Friesecký
723 9cf9a19d Captain_Trojan
        try:
724 9a6c9613 Jan Pašek
            values = [issuer_id, issuer_id]
725
            values += list(target_types)
726
            values += list(target_usages)
727
728
            sql = (
729
                f"SELECT a.{COL_ID}, {COL_COMMON_NAME}, {COL_VALID_FROM}, {COL_VALID_TO}, {COL_PEM_DATA}, "
730
                f"{COL_REVOCATION_DATE}, {COL_REVOCATION_REASON}, {COL_PRIVATE_KEY_ID}, {COL_TYPE_ID}, {COL_PARENT_ID} "
731
                f"FROM {TAB_CERTIFICATES} a "
732
                f"WHERE (a.{COL_DELETION_DATE} IS NULL OR a.{COL_DELETION_DATE} = '') "
733
                f"AND a.{COL_PARENT_ID} = ? AND a.{COL_ID} != ? "
734
                f"AND a.{COL_TYPE_ID} IN ({','.join('?' * len(target_types))}) "
735
                f"AND (SELECT COUNT(*) FROM {TAB_CERTIFICATE_USAGES} "
736
                f"WHERE {COL_CERTIFICATE_ID} = a.{COL_ID} "
737
                f"AND {COL_USAGE_TYPE_ID} IN ({','.join('?' * len(target_usages))}) "
738
                f") > 0 "
739
            )
740 9cf9a19d Captain_Trojan
741 9a6c9613 Jan Pašek
            if target_cn_substring is not None:
742
                sql += f" AND a.{COL_COMMON_NAME} LIKE '%' || ? || '%'"
743
744
                values += [target_cn_substring]
745
746
            if page is not None and per_page is not None:
747
                sql += f" LIMIT {per_page} OFFSET {page * per_page}"
748
749
            self.cursor.execute(sql, values)
750
            certificate_rows = self.cursor.fetchall()
751
752
            certificates: List[Certificate] = []
753
            for certificate_row in certificate_rows:
754
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
755
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
756
                values = [certificate_row[0]]
757
                self.cursor.execute(sql, values)
758
                usage_rows = self.cursor.fetchall()
759
760
                usage_dict: Dict[int, bool] = {}
761
                for usage_row in usage_rows:
762
                    usage_dict[usage_row[2]] = True
763
764
                certificates.append(Certificate(certificate_row[0],
765
                                                certificate_row[1],
766
                                                certificate_row[2],
767
                                                certificate_row[3],
768
                                                certificate_row[4],
769
                                                certificate_row[7],
770
                                                certificate_row[8],
771
                                                certificate_row[9],
772
                                                usage_dict,
773
                                                certificate_row[5],
774
                                                certificate_row[6]))
775
        except IntegrityError:
776
            Logger.error(INTEGRITY_ERROR_MSG)
777
            raise DatabaseException(INTEGRITY_ERROR_MSG)
778
        except ProgrammingError:
779
            Logger.error(PROGRAMMING_ERROR_MSG)
780
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
781
        except OperationalError:
782
            Logger.error(OPERATIONAL_ERROR_MSG)
783
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
784
        except NotSupportedError:
785
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
786
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
787
        except DatabaseError:
788
            Logger.error(DATABASE_ERROR_MSG)
789
            raise DatabaseException(DATABASE_ERROR_MSG)
790
        except Error:
791
            Logger.error(ERROR_MSG)
792
            raise DatabaseException(ERROR_MSG)
793
794
        return certificates
795
796
    def get_all_descendants_of(self, certificate_id: int):
797 1d8ff0a3 Stanislav Král
        """
798 9a6c9613 Jan Pašek
        Get a list of all certificates C such that the certificate identified by "certificate_id" belongs to a trust chain
799
        between C and its root certificate authority (i.e. is an ancestor of C).
800
        :param certificate_id: target certificate ID
801
        :return: list of all descendants
802 1d8ff0a3 Stanislav Král
        """
803
804 9a6c9613 Jan Pašek
        Logger.debug("Function launched.")
805 1d8ff0a3 Stanislav Král
806
        try:
807 9a6c9613 Jan Pašek
            def dfs(children_of, this, collection: list):
808
                for child in children_of(this.certificate_id):
809
                    dfs(children_of, child, collection)
810
                collection.append(this)
811
812
            subtree_root = self.read(certificate_id)
813
            if subtree_root is None:
814
                return None
815
816
            all_certs = []
817
            dfs(self.get_all_issued_by, subtree_root, all_certs)
818
        except IntegrityError:
819
            Logger.error(INTEGRITY_ERROR_MSG)
820
            raise DatabaseException(INTEGRITY_ERROR_MSG)
821
        except ProgrammingError:
822
            Logger.error(PROGRAMMING_ERROR_MSG)
823
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
824
        except OperationalError:
825
            Logger.error(OPERATIONAL_ERROR_MSG)
826
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
827
        except NotSupportedError:
828
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
829
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
830
        except DatabaseError:
831
            Logger.error(DATABASE_ERROR_MSG)
832
            raise DatabaseException(DATABASE_ERROR_MSG)
833
        except Error:
834
            Logger.error(ERROR_MSG)
835
            raise DatabaseException(ERROR_MSG)
836
837
        return all_certs
838
839
    def get_next_id(self) -> int:
840
        """
841
        Get identifier of the next certificate that will be inserted into the database
842
        :return: identifier of the next certificate that will be added into the database
843
        """
844 1d8ff0a3 Stanislav Král
845 9a6c9613 Jan Pašek
        Logger.debug("Function launched.")
846 1d8ff0a3 Stanislav Král
847 9a6c9613 Jan Pašek
        try:
848
            # get next IDs of all tables
849
            self.cursor.execute("SELECT * FROM SQLITE_SEQUENCE")
850
            results = self.cursor.fetchall()
851
852
            # search for next ID in Certificates table and return it
853
            for result in results:
854
                if result[0] == TAB_CERTIFICATES:
855
                    return result[1] + 1  # current last id + 1
856
            # if certificates table is not present in the query results, return 1
857
        except IntegrityError:
858
            Logger.error(INTEGRITY_ERROR_MSG)
859
            raise DatabaseException(INTEGRITY_ERROR_MSG)
860
        except ProgrammingError:
861
            Logger.error(PROGRAMMING_ERROR_MSG)
862
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
863
        except OperationalError:
864
            Logger.error(OPERATIONAL_ERROR_MSG)
865
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
866
        except NotSupportedError:
867
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
868
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
869
        except DatabaseError:
870
            Logger.error(DATABASE_ERROR_MSG)
871
            raise DatabaseException(DATABASE_ERROR_MSG)
872
        except Error:
873
            Logger.error(ERROR_MSG)
874
            raise DatabaseException(ERROR_MSG)
875
876
        return 1