Projekt

Obecné

Profil

Stáhnout (33.8 KB) Statistiky
| Větev: | Tag: | Revize:
1 6425fa36 David Friesecký
import time
2 cf247eaa Captain_Trojan
from sqlite3 import Connection, Error, DatabaseError, IntegrityError, ProgrammingError, OperationalError, \
3
    NotSupportedError
4
from typing import Dict, List, Set
5 1636aefe David Friesecký
6 d65b022d David Friesecký
from src.exceptions.database_exception import DatabaseException
7 1d2add74 Jan Pašek
from injector import inject
8 f8b23532 David Friesecký
from src.constants import *
9 1d2add74 Jan Pašek
from src.model.certificate import Certificate
10 5e31b492 David Friesecký
from src.utils.logger import Logger
11 e9e55282 David Friesecký
12 b3c80ccb David Friesecký
INTEGRITY_ERROR_MSG = "Database relational integrity corrupted."
13
PROGRAMMING_ERROR_MSG = "Exception raised for programming errors (etc. SQL statement)."
14
OPERATIONAL_ERROR_MSG = "Exception raised for errors that are related to the database’s operation."
15
NOT_SUPPORTED_ERROR_MSG = "Method or database API was used which is not supported by the database"
16
DATABASE_ERROR_MSG = "Unknown exception that are related to the database."
17
ERROR_MSG = "Unknown exception."
18 e9e55282 David Friesecký
19
20 f8b23532 David Friesecký
class CertificateRepository:
21 25053504 David Friesecký
22 1d2add74 Jan Pašek
    @inject
23
    def __init__(self, connection: Connection):
24 a0602bad David Friesecký
        """
25 f8b23532 David Friesecký
        Constructor of the CertificateRepository object
26 a0602bad David Friesecký
27 f8b23532 David Friesecký
        :param connection: Instance of the Connection object
28 a0602bad David Friesecký
        """
29 f8b23532 David Friesecký
        self.connection = connection
30 1d2add74 Jan Pašek
        self.cursor = connection.cursor()
31 e9e55282 David Friesecký
32 805077f5 David Friesecký
    def create(self, certificate: Certificate):
33 a0602bad David Friesecký
        """
34 f8b23532 David Friesecký
        Creates a certificate.
35
        For root certificate (CA) the parent certificate id is modified to the same id (id == parent_id).
36 a0602bad David Friesecký
37 f8b23532 David Friesecký
        :param certificate: Instance of the Certificate object
38 a0602bad David Friesecký
39 f8b23532 David Friesecký
        :return: the result of whether the creation was successful
40 a0602bad David Friesecký
        """
41
42 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
43
44 f8b23532 David Friesecký
        try:
45
            sql = (f"INSERT INTO {TAB_CERTIFICATES} "
46
                   f"({COL_COMMON_NAME},"
47
                   f"{COL_VALID_FROM},"
48
                   f"{COL_VALID_TO},"
49
                   f"{COL_PEM_DATA},"
50
                   f"{COL_PRIVATE_KEY_ID},"
51
                   f"{COL_TYPE_ID},"
52
                   f"{COL_PARENT_ID})"
53
                   f"VALUES(?,?,?,?,?,?,?)")
54
            values = [certificate.common_name,
55
                      certificate.valid_from,
56
                      certificate.valid_to,
57
                      certificate.pem_data,
58
                      certificate.private_key_id,
59
                      certificate.type_id,
60
                      certificate.parent_id]
61
            self.cursor.execute(sql, values)
62
            self.connection.commit()
63
64
            last_id: int = self.cursor.lastrowid
65
66 f3125948 Stanislav Král
            # TODO assure that this is correct
67
            if certificate.type_id == ROOT_CA_ID:
68 f8b23532 David Friesecký
                certificate.parent_id = last_id
69 093d06df Stanislav Král
                self.update(last_id, certificate)
70 f8b23532 David Friesecký
            else:
71 fa72c969 Stanislav Král
                for usage_id, usage_value in certificate.usages.items():
72 f8b23532 David Friesecký
                    if usage_value:
73
                        sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
74
                               f"({COL_CERTIFICATE_ID},"
75
                               f"{COL_USAGE_TYPE_ID}) "
76
                               f"VALUES (?,?)")
77
                        values = [last_id, usage_id]
78
                        self.cursor.execute(sql, values)
79
                        self.connection.commit()
80 b3c80ccb David Friesecký
        except IntegrityError:
81
            Logger.error(INTEGRITY_ERROR_MSG)
82
            raise DatabaseException(INTEGRITY_ERROR_MSG)
83
        except ProgrammingError:
84
            Logger.error(PROGRAMMING_ERROR_MSG)
85
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
86
        except OperationalError:
87
            Logger.error(OPERATIONAL_ERROR_MSG)
88
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
89
        except NotSupportedError:
90
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
91
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
92
        except DatabaseError:
93
            Logger.error(DATABASE_ERROR_MSG)
94
            raise DatabaseException(DATABASE_ERROR_MSG)
95
        except Error:
96
            Logger.error(ERROR_MSG)
97
            raise DatabaseException(ERROR_MSG)
98 f8b23532 David Friesecký
99 805077f5 David Friesecký
        return last_id
100 f8b23532 David Friesecký
101 805077f5 David Friesecký
    def read(self, certificate_id: int):
102 f8b23532 David Friesecký
        """
103
        Reads (selects) a certificate.
104 e9e55282 David Friesecký
105 f8b23532 David Friesecký
        :param certificate_id: ID of specific certificate
106 e9e55282 David Friesecký
107 f8b23532 David Friesecký
        :return: instance of the Certificate object
108
        """
109 e9e55282 David Friesecký
110 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
111
112 f8b23532 David Friesecký
        try:
113
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
114 6425fa36 David Friesecký
                   f"WHERE {COL_ID} = ? AND ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
115 f8b23532 David Friesecký
            values = [certificate_id]
116
            self.cursor.execute(sql, values)
117 a7411982 Stanislav Král
            certificate_row = self.cursor.fetchone()
118 e9e55282 David Friesecký
119 6f4a5f24 Captain_Trojan
            if certificate_row is None:
120
                return None
121
122 f8b23532 David Friesecký
            sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
123
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
124
            self.cursor.execute(sql, values)
125
            usage_rows = self.cursor.fetchall()
126 1636aefe David Friesecký
127
            usage_dict: Dict[int, bool] = {}
128
            for usage_row in usage_rows:
129 f8b23532 David Friesecký
                usage_dict[usage_row[2]] = True
130
131
            certificate: Certificate = Certificate(certificate_row[0],
132
                                                   certificate_row[1],
133
                                                   certificate_row[2],
134
                                                   certificate_row[3],
135
                                                   certificate_row[4],
136 58051326 David Friesecký
                                                   certificate_row[8],
137
                                                   certificate_row[9],
138 6425fa36 David Friesecký
                                                   certificate_row[10],
139 58051326 David Friesecký
                                                   usage_dict,
140
                                                   certificate_row[5],
141
                                                   certificate_row[6])
142 b3c80ccb David Friesecký
        except IntegrityError:
143
            Logger.error(INTEGRITY_ERROR_MSG)
144
            raise DatabaseException(INTEGRITY_ERROR_MSG)
145
        except ProgrammingError:
146
            Logger.error(PROGRAMMING_ERROR_MSG)
147
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
148
        except OperationalError:
149
            Logger.error(OPERATIONAL_ERROR_MSG)
150
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
151
        except NotSupportedError:
152
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
153
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
154
        except DatabaseError:
155
            Logger.error(DATABASE_ERROR_MSG)
156
            raise DatabaseException(DATABASE_ERROR_MSG)
157
        except Error:
158
            Logger.error(ERROR_MSG)
159
            raise DatabaseException(ERROR_MSG)
160 f8b23532 David Friesecký
161 d65b022d David Friesecký
        return certificate
162 f8b23532 David Friesecký
163 58051326 David Friesecký
    def read_all(self, filter_type: int = None):
164 9e22e20c David Friesecký
        """
165
        Reads (selects) all certificates (with type).
166
167
        :param filter_type: ID of certificate type from CertificateTypes table
168
169
        :return: list of certificates
170
        """
171
172 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
173
174 9e22e20c David Friesecký
        try:
175
            sql_extension = ""
176
            values = []
177
            if filter_type is not None:
178 6425fa36 David Friesecký
                sql_extension = (f" AND {COL_TYPE_ID} = ("
179 6f64f062 Stanislav Král
                                 f"SELECT {COL_ID} FROM {TAB_CERTIFICATE_TYPES} WHERE {COL_ID} = ?)")
180 9e22e20c David Friesecký
                values = [filter_type]
181
182 6425fa36 David Friesecký
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
183
                   f"WHERE ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = ''){sql_extension}")
184 9e22e20c David Friesecký
            self.cursor.execute(sql, values)
185
            certificate_rows = self.cursor.fetchall()
186
187
            certificates: List[Certificate] = []
188
            for certificate_row in certificate_rows:
189
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
190
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
191
                values = [certificate_row[0]]
192
                self.cursor.execute(sql, values)
193
                usage_rows = self.cursor.fetchall()
194
195
                usage_dict: Dict[int, bool] = {}
196
                for usage_row in usage_rows:
197
                    usage_dict[usage_row[2]] = True
198
199
                certificates.append(Certificate(certificate_row[0],
200
                                                certificate_row[1],
201
                                                certificate_row[2],
202
                                                certificate_row[3],
203
                                                certificate_row[4],
204 58051326 David Friesecký
                                                certificate_row[8],
205
                                                certificate_row[9],
206 6425fa36 David Friesecký
                                                certificate_row[10],
207 58051326 David Friesecký
                                                usage_dict,
208
                                                certificate_row[5],
209
                                                certificate_row[6]))
210 b3c80ccb David Friesecký
        except IntegrityError:
211
            Logger.error(INTEGRITY_ERROR_MSG)
212
            raise DatabaseException(INTEGRITY_ERROR_MSG)
213
        except ProgrammingError:
214
            Logger.error(PROGRAMMING_ERROR_MSG)
215
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
216
        except OperationalError:
217
            Logger.error(OPERATIONAL_ERROR_MSG)
218
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
219
        except NotSupportedError:
220
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
221
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
222
        except DatabaseError:
223
            Logger.error(DATABASE_ERROR_MSG)
224
            raise DatabaseException(DATABASE_ERROR_MSG)
225
        except Error:
226
            Logger.error(ERROR_MSG)
227
            raise DatabaseException(ERROR_MSG)
228 9e22e20c David Friesecký
229 0f3af523 Stanislav Král
        return certificates
230 9e22e20c David Friesecký
231 cf247eaa Captain_Trojan
    def read_all_filter(self, target_types: Set[int], target_usages: Set[int], target_cn_substring: str, page: int,
232
                        per_page: int):
233
        """
234
        Reads (selects) all certificates according to a specific filtering and pagination options.
235
        :param target_types: certificate types (filter)
236
        :param target_usages: certificate usages (filter)
237
        :param target_cn_substring: certificate CN substring (filter)
238
        :param page: target page
239
        :param per_page: target page size
240
        :return: list of certificates
241
        """
242
243
        Logger.debug("Function launched.")
244
245
        try:
246
            values = []
247
            values += list(target_types)
248
            values += list(target_usages)
249
250
            sql = (
251
                f"SELECT a.{COL_ID}, {COL_COMMON_NAME}, {COL_VALID_FROM}, {COL_VALID_TO}, {COL_PEM_DATA}, "
252
                f"{COL_REVOCATION_DATE}, {COL_REVOCATION_REASON}, {COL_PRIVATE_KEY_ID}, {COL_TYPE_ID}, {COL_PARENT_ID} "
253
                f"FROM {TAB_CERTIFICATES} a "
254
                f"WHERE (a.{COL_DELETION_DATE} IS NULL OR a.{COL_DELETION_DATE} = '') "
255
                f"AND a.{COL_TYPE_ID} IN ({','.join('?' * len(target_types))}) "
256
                f"AND (SELECT COUNT(*) FROM {TAB_CERTIFICATE_USAGES} "
257
                f"WHERE {COL_CERTIFICATE_ID} = a.{COL_ID} "
258
                f"AND {COL_USAGE_TYPE_ID} IN ({','.join('?' * len(target_usages))}) "
259
                f") > 0 "
260
            )
261
262
            if target_cn_substring is not None:
263
                sql += f" AND a.{COL_COMMON_NAME} LIKE '%' || ? || '%'"
264
265
                values += [target_cn_substring]
266
267
            if page is not None and per_page is not None:
268
                sql += f" LIMIT {per_page} OFFSET {page * per_page}"
269
270
            self.cursor.execute(sql, values)
271
            certificate_rows = self.cursor.fetchall()
272
273
            certificates: List[Certificate] = []
274
            for certificate_row in certificate_rows:
275
                sql = (f"SELECT {COL_USAGE_TYPE_ID} FROM {TAB_CERTIFICATE_USAGES} "
276
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
277
                types = [certificate_row[0]]
278
                self.cursor.execute(sql, types)
279
                usage_rows = self.cursor.fetchall()
280
281
                usage_dict: Dict[int, bool] = {}
282
                for usage_row in usage_rows:
283
                    usage_dict[usage_row[0]] = True
284
285
                certificates.append(Certificate(certificate_row[0],
286
                                                certificate_row[1],
287
                                                certificate_row[2],
288
                                                certificate_row[3],
289
                                                certificate_row[4],
290
                                                certificate_row[7],
291
                                                certificate_row[8],
292
                                                certificate_row[9],
293
                                                usage_dict,
294
                                                certificate_row[5],
295
                                                certificate_row[6]))
296
        except IntegrityError:
297
            Logger.error(INTEGRITY_ERROR_MSG)
298
            raise DatabaseException(INTEGRITY_ERROR_MSG)
299
        except ProgrammingError:
300
            Logger.error(PROGRAMMING_ERROR_MSG)
301
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
302
        except OperationalError:
303
            Logger.error(OPERATIONAL_ERROR_MSG)
304
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
305
        except NotSupportedError:
306
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
307
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
308
        except DatabaseError:
309
            Logger.error(DATABASE_ERROR_MSG)
310
            raise DatabaseException(DATABASE_ERROR_MSG)
311
        except Error:
312
            Logger.error(ERROR_MSG)
313
            raise DatabaseException(ERROR_MSG)
314
315
        return certificates
316
317 58051326 David Friesecký
    def update(self, certificate_id: int, certificate: Certificate):
318 a0602bad David Friesecký
        """
319 f8b23532 David Friesecký
        Updates a certificate.
320
        If the parameter of certificate (Certificate object) is not to be changed,
321
        the same value must be specified.
322 a0602bad David Friesecký
323
        :param certificate_id: ID of specific certificate
324 f8b23532 David Friesecký
        :param certificate: Instance of the Certificate object
325 a0602bad David Friesecký
326
        :return: the result of whether the updation was successful
327
        """
328
329 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
330
331 f8b23532 David Friesecký
        try:
332
            sql = (f"UPDATE {TAB_CERTIFICATES} "
333
                   f"SET {COL_COMMON_NAME} = ?, "
334
                   f"{COL_VALID_FROM} = ?, "
335
                   f"{COL_VALID_TO} = ?, "
336
                   f"{COL_PEM_DATA} = ?, "
337
                   f"{COL_PRIVATE_KEY_ID} = ?, "
338
                   f"{COL_TYPE_ID} = ?, "
339
                   f"{COL_PARENT_ID} = ? "
340
                   f"WHERE {COL_ID} = ?")
341
            values = [certificate.common_name,
342
                      certificate.valid_from,
343
                      certificate.valid_to,
344
                      certificate.pem_data,
345
                      certificate.private_key_id,
346
                      certificate.type_id,
347 805077f5 David Friesecký
                      certificate.parent_id,
348
                      certificate_id]
349 6425fa36 David Friesecký
350 f8b23532 David Friesecký
            self.cursor.execute(sql, values)
351
            self.connection.commit()
352
353
            sql = (f"DELETE FROM {TAB_CERTIFICATE_USAGES} "
354
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
355
            values = [certificate_id]
356
            self.cursor.execute(sql, values)
357
            self.connection.commit()
358
359 c36d3299 David Friesecký
            check_updated = self.cursor.rowcount > 0
360
361 f3125948 Stanislav Král
            # iterate over usage pairs
362
            for usage_id, usage_value in certificate.usages.items():
363 f8b23532 David Friesecký
                if usage_value:
364
                    sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
365
                           f"({COL_CERTIFICATE_ID},"
366
                           f"{COL_USAGE_TYPE_ID}) "
367
                           f"VALUES (?,?)")
368
                    values = [certificate_id, usage_id]
369
                    self.cursor.execute(sql, values)
370
                    self.connection.commit()
371 b3c80ccb David Friesecký
        except IntegrityError:
372
            Logger.error(INTEGRITY_ERROR_MSG)
373
            raise DatabaseException(INTEGRITY_ERROR_MSG)
374
        except ProgrammingError:
375
            Logger.error(PROGRAMMING_ERROR_MSG)
376
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
377
        except OperationalError:
378
            Logger.error(OPERATIONAL_ERROR_MSG)
379
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
380
        except NotSupportedError:
381
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
382
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
383
        except DatabaseError:
384
            Logger.error(DATABASE_ERROR_MSG)
385
            raise DatabaseException(DATABASE_ERROR_MSG)
386
        except Error:
387
            Logger.error(ERROR_MSG)
388
            raise DatabaseException(ERROR_MSG)
389 f8b23532 David Friesecký
390 c36d3299 David Friesecký
        return check_updated
391 e9e55282 David Friesecký
392 58051326 David Friesecký
    def delete(self, certificate_id: int):
393 a0602bad David Friesecký
        """
394
        Deletes a certificate
395
396
        :param certificate_id: ID of specific certificate
397
398
        :return: the result of whether the deletion was successful
399
        """
400
401 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
402
403 f8b23532 David Friesecký
        try:
404 6425fa36 David Friesecký
            sql = (f"UPDATE {TAB_CERTIFICATES} "
405
                   f"SET {COL_DELETION_DATE} = ? "
406
                   f"WHERE {COL_ID} = ? AND ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
407
            values = [int(time.time()),
408
                      certificate_id]
409 f8b23532 David Friesecký
            self.cursor.execute(sql, values)
410
            self.connection.commit()
411 b3c80ccb David Friesecký
        except IntegrityError:
412
            Logger.error(INTEGRITY_ERROR_MSG)
413
            raise DatabaseException(INTEGRITY_ERROR_MSG)
414
        except ProgrammingError:
415
            Logger.error(PROGRAMMING_ERROR_MSG)
416
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
417
        except OperationalError:
418
            Logger.error(OPERATIONAL_ERROR_MSG)
419
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
420
        except NotSupportedError:
421
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
422
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
423
        except DatabaseError:
424
            Logger.error(DATABASE_ERROR_MSG)
425
            raise DatabaseException(DATABASE_ERROR_MSG)
426
        except Error:
427
            Logger.error(ERROR_MSG)
428
            raise DatabaseException(ERROR_MSG)
429 f8b23532 David Friesecký
430 45744020 Stanislav Král
        return self.cursor.rowcount > 0
431 58051326 David Friesecký
432
    def set_certificate_revoked(
433 8b049f43 David Friesecký
            self, certificate_id: int, revocation_date: str, revocation_reason: str = REV_REASON_UNSPECIFIED):
434 58051326 David Friesecký
        """
435
        Revoke a certificate
436
437
        :param certificate_id: ID of specific certificate
438
        :param revocation_date: Date, when the certificate is revoked
439
        :param revocation_reason: Reason of the revocation
440
441 8b049f43 David Friesecký
        :return:
442
            the result of whether the revocation was successful OR
443
            sqlite3.Error if an exception is thrown
444 58051326 David Friesecký
        """
445
446 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
447
448 58051326 David Friesecký
        try:
449 8b049f43 David Friesecký
            if revocation_date != "" and revocation_reason == "":
450
                revocation_reason = REV_REASON_UNSPECIFIED
451
            elif revocation_date == "":
452
                return False
453
454 58051326 David Friesecký
            sql = (f"UPDATE {TAB_CERTIFICATES} "
455
                   f"SET {COL_REVOCATION_DATE} = ?, "
456 8b049f43 David Friesecký
                   f"{COL_REVOCATION_REASON} = ? "
457 58051326 David Friesecký
                   f"WHERE {COL_ID} = ? AND ({COL_REVOCATION_DATE} IS NULL OR {COL_REVOCATION_DATE} = '')")
458
            values = [revocation_date,
459
                      revocation_reason,
460
                      certificate_id]
461
            self.cursor.execute(sql, values)
462
            self.connection.commit()
463 b3c80ccb David Friesecký
        except IntegrityError:
464
            Logger.error(INTEGRITY_ERROR_MSG)
465
            raise DatabaseException(INTEGRITY_ERROR_MSG)
466
        except ProgrammingError:
467
            Logger.error(PROGRAMMING_ERROR_MSG)
468
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
469
        except OperationalError:
470
            Logger.error(OPERATIONAL_ERROR_MSG)
471
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
472
        except NotSupportedError:
473
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
474
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
475
        except DatabaseError:
476
            Logger.error(DATABASE_ERROR_MSG)
477
            raise DatabaseException(DATABASE_ERROR_MSG)
478
        except Error:
479
            Logger.error(ERROR_MSG)
480
            raise DatabaseException(ERROR_MSG)
481 8b049f43 David Friesecký
482 d65b022d David Friesecký
        return self.cursor.rowcount > 0
483 58051326 David Friesecký
484
    def clear_certificate_revocation(self, certificate_id: int):
485
        """
486
        Clear revocation of a certificate
487
488
        :param certificate_id: ID of specific certificate
489
490 8b049f43 David Friesecký
        :return:
491
            the result of whether the clear revocation was successful OR
492
            sqlite3.Error if an exception is thrown
493 58051326 David Friesecký
        """
494
495 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
496
497 58051326 David Friesecký
        try:
498
            sql = (f"UPDATE {TAB_CERTIFICATES} "
499
                   f"SET {COL_REVOCATION_DATE} = '', "
500 8b049f43 David Friesecký
                   f"{COL_REVOCATION_REASON} = '' "
501 58051326 David Friesecký
                   f"WHERE {COL_ID} = ?")
502
            values = [certificate_id]
503
            self.cursor.execute(sql, values)
504
            self.connection.commit()
505 b3c80ccb David Friesecký
        except IntegrityError:
506
            Logger.error(INTEGRITY_ERROR_MSG)
507
            raise DatabaseException(INTEGRITY_ERROR_MSG)
508
        except ProgrammingError:
509
            Logger.error(PROGRAMMING_ERROR_MSG)
510
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
511
        except OperationalError:
512
            Logger.error(OPERATIONAL_ERROR_MSG)
513
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
514
        except NotSupportedError:
515
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
516
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
517
        except DatabaseError:
518
            Logger.error(DATABASE_ERROR_MSG)
519
            raise DatabaseException(DATABASE_ERROR_MSG)
520
        except Error:
521
            Logger.error(ERROR_MSG)
522
            raise DatabaseException(ERROR_MSG)
523 f8b23532 David Friesecký
524 45744020 Stanislav Král
        return self.cursor.rowcount > 0
525 58051326 David Friesecký
526
    def get_all_revoked_by(self, certificate_id: int):
527
        """
528
        Get list of the revoked certificates that are direct descendants of the certificate with the ID
529
530
        :param certificate_id: ID of specific certificate
531
532
        :return:
533 8b049f43 David Friesecký
            list of the certificates OR
534
            None if the list is empty OR
535
            sqlite3.Error if an exception is thrown
536 58051326 David Friesecký
        """
537
538 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
539
540 58051326 David Friesecký
        try:
541
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
542
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_REVOCATION_DATE} IS NOT NULL AND {COL_REVOCATION_DATE} != ''")
543
            values = [certificate_id]
544
            self.cursor.execute(sql, values)
545
            certificate_rows = self.cursor.fetchall()
546
547
            certificates: List[Certificate] = []
548
            for certificate_row in certificate_rows:
549
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
550
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
551
                values = [certificate_row[0]]
552
                self.cursor.execute(sql, values)
553
                usage_rows = self.cursor.fetchall()
554
555
                usage_dict: Dict[int, bool] = {}
556
                for usage_row in usage_rows:
557
                    usage_dict[usage_row[2]] = True
558
559
                certificates.append(Certificate(certificate_row[0],
560
                                                certificate_row[1],
561
                                                certificate_row[2],
562
                                                certificate_row[3],
563
                                                certificate_row[4],
564
                                                certificate_row[8],
565
                                                certificate_row[9],
566 6425fa36 David Friesecký
                                                certificate_row[10],
567 58051326 David Friesecký
                                                usage_dict,
568
                                                certificate_row[5],
569
                                                certificate_row[6]))
570 b3c80ccb David Friesecký
        except IntegrityError:
571
            Logger.error(INTEGRITY_ERROR_MSG)
572
            raise DatabaseException(INTEGRITY_ERROR_MSG)
573
        except ProgrammingError:
574
            Logger.error(PROGRAMMING_ERROR_MSG)
575
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
576
        except OperationalError:
577
            Logger.error(OPERATIONAL_ERROR_MSG)
578
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
579
        except NotSupportedError:
580
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
581
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
582
        except DatabaseError:
583
            Logger.error(DATABASE_ERROR_MSG)
584
            raise DatabaseException(DATABASE_ERROR_MSG)
585
        except Error:
586
            Logger.error(ERROR_MSG)
587
            raise DatabaseException(ERROR_MSG)
588 58051326 David Friesecký
589 d65b022d David Friesecký
        return certificates
590 58051326 David Friesecký
591
    def get_all_issued_by(self, certificate_id: int):
592
        """
593
        Get list of the certificates that are direct descendants of the certificate with the ID
594
595
        :param certificate_id: ID of specific certificate
596
597
        :return:
598 8b049f43 David Friesecký
            list of the certificates OR
599
            None if the list is empty OR
600
            sqlite3.Error if an exception is thrown
601 58051326 David Friesecký
        """
602
603 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
604
605 58051326 David Friesecký
        try:
606
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
607 6425fa36 David Friesecký
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_ID} != ? AND "
608
                   f"({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
609 8b049f43 David Friesecký
            values = [certificate_id, certificate_id]
610 58051326 David Friesecký
            self.cursor.execute(sql, values)
611
            certificate_rows = self.cursor.fetchall()
612
613
            certificates: List[Certificate] = []
614
            for certificate_row in certificate_rows:
615
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
616
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
617
                values = [certificate_row[0]]
618
                self.cursor.execute(sql, values)
619
                usage_rows = self.cursor.fetchall()
620
621
                usage_dict: Dict[int, bool] = {}
622
                for usage_row in usage_rows:
623
                    usage_dict[usage_row[2]] = True
624
625
                certificates.append(Certificate(certificate_row[0],
626
                                                certificate_row[1],
627
                                                certificate_row[2],
628
                                                certificate_row[3],
629
                                                certificate_row[4],
630
                                                certificate_row[8],
631
                                                certificate_row[9],
632 6425fa36 David Friesecký
                                                certificate_row[10],
633 58051326 David Friesecký
                                                usage_dict,
634
                                                certificate_row[5],
635
                                                certificate_row[6]))
636 cf247eaa Captain_Trojan
        except IntegrityError:
637
            Logger.error(INTEGRITY_ERROR_MSG)
638
            raise DatabaseException(INTEGRITY_ERROR_MSG)
639
        except ProgrammingError:
640
            Logger.error(PROGRAMMING_ERROR_MSG)
641
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
642
        except OperationalError:
643
            Logger.error(OPERATIONAL_ERROR_MSG)
644
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
645
        except NotSupportedError:
646
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
647
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
648
        except DatabaseError:
649
            Logger.error(DATABASE_ERROR_MSG)
650
            raise DatabaseException(DATABASE_ERROR_MSG)
651
        except Error:
652
            Logger.error(ERROR_MSG)
653
            raise DatabaseException(ERROR_MSG)
654
655
        return certificates
656
657
    def get_all_issued_by_filter(self, issuer_id, target_types, target_usages, target_cn_substring, page, per_page):
658
        """
659
        Get list of the certificates that are direct descendants of the certificate with the ID
660
661
        :param target_types: certificate types (filter)
662
        :param target_usages: certificate usages (filter)
663
        :param target_cn_substring: certificate CN substring (filter)
664
        :param page: target page
665
        :param per_page: target page size
666
        :param issuer_id: ID of specific certificate
667
668
        :return:
669
            list of the certificates OR
670
            None if the list is empty
671
        """
672
673
        Logger.debug("Function launched.")
674
675
        try:
676
            values = [issuer_id, issuer_id]
677
            values += list(target_types)
678
            values += list(target_usages)
679
680
            sql = (
681
                f"SELECT a.{COL_ID}, {COL_COMMON_NAME}, {COL_VALID_FROM}, {COL_VALID_TO}, {COL_PEM_DATA}, "
682
                f"{COL_REVOCATION_DATE}, {COL_REVOCATION_REASON}, {COL_PRIVATE_KEY_ID}, {COL_TYPE_ID}, {COL_PARENT_ID} "
683
                f"FROM {TAB_CERTIFICATES} a "
684
                f"WHERE (a.{COL_DELETION_DATE} IS NULL OR a.{COL_DELETION_DATE} = '') "
685
                f"AND a.{COL_PARENT_ID} = ? AND a.{COL_ID} != ? "
686
                f"AND a.{COL_TYPE_ID} IN ({','.join('?' * len(target_types))}) "
687
                f"AND (SELECT COUNT(*) FROM {TAB_CERTIFICATE_USAGES} "
688
                f"WHERE {COL_CERTIFICATE_ID} = a.{COL_ID} "
689
                f"AND {COL_USAGE_TYPE_ID} IN ({','.join('?' * len(target_usages))}) "
690
                f") > 0 "
691
            )
692
693
            if target_cn_substring is not None:
694
                sql += f" AND a.{COL_COMMON_NAME} LIKE '%' || ? || '%'"
695
696
                values += [target_cn_substring]
697
698
            if page is not None and per_page is not None:
699
                sql += f" LIMIT {per_page} OFFSET {page * per_page}"
700
701
            self.cursor.execute(sql, values)
702
            certificate_rows = self.cursor.fetchall()
703
704
            certificates: List[Certificate] = []
705
            for certificate_row in certificate_rows:
706
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
707
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
708
                values = [certificate_row[0]]
709
                self.cursor.execute(sql, values)
710
                usage_rows = self.cursor.fetchall()
711
712
                usage_dict: Dict[int, bool] = {}
713
                for usage_row in usage_rows:
714
                    usage_dict[usage_row[2]] = True
715
716
                certificates.append(Certificate(certificate_row[0],
717
                                                certificate_row[1],
718
                                                certificate_row[2],
719
                                                certificate_row[3],
720
                                                certificate_row[4],
721
                                                certificate_row[7],
722
                                                certificate_row[8],
723
                                                certificate_row[9],
724
                                                usage_dict,
725
                                                certificate_row[5],
726
                                                certificate_row[6]))
727 b3c80ccb David Friesecký
        except IntegrityError:
728
            Logger.error(INTEGRITY_ERROR_MSG)
729
            raise DatabaseException(INTEGRITY_ERROR_MSG)
730
        except ProgrammingError:
731
            Logger.error(PROGRAMMING_ERROR_MSG)
732
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
733
        except OperationalError:
734
            Logger.error(OPERATIONAL_ERROR_MSG)
735
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
736
        except NotSupportedError:
737
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
738
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
739
        except DatabaseError:
740
            Logger.error(DATABASE_ERROR_MSG)
741
            raise DatabaseException(DATABASE_ERROR_MSG)
742
        except Error:
743
            Logger.error(ERROR_MSG)
744
            raise DatabaseException(ERROR_MSG)
745 58051326 David Friesecký
746 d65b022d David Friesecký
        return certificates
747 94f6d8b8 Jan Pašek
748 85003184 Captain_Trojan
    def get_all_descendants_of(self, certificate_id: int):
749
        """
750
        Get a list of all certificates C such that the certificate identified by "certificate_id" belongs to a trust chain
751
        between C and its root certificate authority (i.e. is an ancestor of C).
752
        :param certificate_id: target certificate ID
753
        :return: list of all descendants
754
        """
755
756 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
757
758
        try:
759
            def dfs(children_of, this, collection: list):
760
                for child in children_of(this.certificate_id):
761
                    dfs(children_of, child, collection)
762
                collection.append(this)
763
764
            subtree_root = self.read(certificate_id)
765
            if subtree_root is None:
766
                return None
767
768
            all_certs = []
769
            dfs(self.get_all_issued_by, subtree_root, all_certs)
770
        except IntegrityError:
771
            Logger.error(INTEGRITY_ERROR_MSG)
772
            raise DatabaseException(INTEGRITY_ERROR_MSG)
773
        except ProgrammingError:
774
            Logger.error(PROGRAMMING_ERROR_MSG)
775
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
776
        except OperationalError:
777
            Logger.error(OPERATIONAL_ERROR_MSG)
778
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
779
        except NotSupportedError:
780
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
781
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
782
        except DatabaseError:
783
            Logger.error(DATABASE_ERROR_MSG)
784
            raise DatabaseException(DATABASE_ERROR_MSG)
785
        except Error:
786
            Logger.error(ERROR_MSG)
787
            raise DatabaseException(ERROR_MSG)
788 85003184 Captain_Trojan
789
        return all_certs
790
791 94f6d8b8 Jan Pašek
    def get_next_id(self) -> int:
792
        """
793
        Get identifier of the next certificate that will be inserted into the database
794
        :return: identifier of the next certificate that will be added into the database
795
        """
796 b3c80ccb David Friesecký
797
        Logger.debug("Function launched.")
798
799
        try:
800
            # get next IDs of all tables
801
            self.cursor.execute("SELECT * FROM SQLITE_SEQUENCE")
802
            results = self.cursor.fetchall()
803
804
            # search for next ID in Certificates table and return it
805
            for result in results:
806
                if result[0] == TAB_CERTIFICATES:
807
                    return result[1] + 1  # current last id + 1
808
            # if certificates table is not present in the query results, return 1
809
        except IntegrityError:
810
            Logger.error(INTEGRITY_ERROR_MSG)
811
            raise DatabaseException(INTEGRITY_ERROR_MSG)
812
        except ProgrammingError:
813
            Logger.error(PROGRAMMING_ERROR_MSG)
814
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
815
        except OperationalError:
816
            Logger.error(OPERATIONAL_ERROR_MSG)
817
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
818
        except NotSupportedError:
819
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
820
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
821
        except DatabaseError:
822
            Logger.error(DATABASE_ERROR_MSG)
823
            raise DatabaseException(DATABASE_ERROR_MSG)
824
        except Error:
825
            Logger.error(ERROR_MSG)
826
            raise DatabaseException(ERROR_MSG)
827
828 94f6d8b8 Jan Pašek
        return 1