Projekt

Obecné

Profil

Stáhnout (29.7 KB) Statistiky
| Větev: | Tag: | Revize:
1 6425fa36 David Friesecký
import time
2 b3c80ccb David Friesecký
from sqlite3 import Connection, Error, DatabaseError, IntegrityError, ProgrammingError, OperationalError, NotSupportedError
3 9e22e20c David Friesecký
from typing import Dict, List
4 1636aefe David Friesecký
5 d65b022d David Friesecký
from src.exceptions.database_exception import DatabaseException
6 1d2add74 Jan Pašek
from injector import inject
7 f8b23532 David Friesecký
from src.constants import *
8 1d2add74 Jan Pašek
from src.model.certificate import Certificate
9 5e31b492 David Friesecký
from src.utils.logger import Logger
10 e9e55282 David Friesecký
11 b3c80ccb David Friesecký
INTEGRITY_ERROR_MSG = "Database relational integrity corrupted."
12
PROGRAMMING_ERROR_MSG = "Exception raised for programming errors (etc. SQL statement)."
13
OPERATIONAL_ERROR_MSG = "Exception raised for errors that are related to the database’s operation."
14
NOT_SUPPORTED_ERROR_MSG = "Method or database API was used which is not supported by the database"
15
DATABASE_ERROR_MSG = "Unknown exception that are related to the database."
16
ERROR_MSG = "Unknown exception."
17 e9e55282 David Friesecký
18
19 f8b23532 David Friesecký
class CertificateRepository:
20 25053504 David Friesecký
21 1d2add74 Jan Pašek
    @inject
22
    def __init__(self, connection: Connection):
23 a0602bad David Friesecký
        """
24 f8b23532 David Friesecký
        Constructor of the CertificateRepository object
25 a0602bad David Friesecký
26 f8b23532 David Friesecký
        :param connection: Instance of the Connection object
27 a0602bad David Friesecký
        """
28 f8b23532 David Friesecký
        self.connection = connection
29 1d2add74 Jan Pašek
        self.cursor = connection.cursor()
30 e9e55282 David Friesecký
31 805077f5 David Friesecký
    def create(self, certificate: Certificate):
32 a0602bad David Friesecký
        """
33 f8b23532 David Friesecký
        Creates a certificate.
34
        For root certificate (CA) the parent certificate id is modified to the same id (id == parent_id).
35 a0602bad David Friesecký
36 f8b23532 David Friesecký
        :param certificate: Instance of the Certificate object
37 a0602bad David Friesecký
38 f8b23532 David Friesecký
        :return: the result of whether the creation was successful
39 a0602bad David Friesecký
        """
40
41 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
42
43 f8b23532 David Friesecký
        try:
44
            sql = (f"INSERT INTO {TAB_CERTIFICATES} "
45 0e7c3096 David Friesecký
                   f"({COL_VALID_FROM},"
46 f8b23532 David Friesecký
                   f"{COL_VALID_TO},"
47
                   f"{COL_PEM_DATA},"
48 0e7c3096 David Friesecký
                   f"{COL_COMMON_NAME},"
49
                   f"{COL_COUNTRY_CODE},"
50
                   f"{COL_LOCALITY},"
51
                   f"{COL_PROVINCE},"
52
                   f"{COL_ORGANIZATION},"
53
                   f"{COL_ORGANIZATIONAL_UNIT},"
54
                   f"{COL_EMAIL_ADDRESS},"
55 f8b23532 David Friesecký
                   f"{COL_TYPE_ID},"
56 0e7c3096 David Friesecký
                   f"{COL_PARENT_ID},"
57
                   f"{COL_PRIVATE_KEY_ID}) "
58
                   f"VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?)")
59
            values = [certificate.valid_from,
60 f8b23532 David Friesecký
                      certificate.valid_to,
61
                      certificate.pem_data,
62 0e7c3096 David Friesecký
                      certificate.common_name,
63
                      certificate.country_code,
64
                      certificate.locality,
65
                      certificate.province,
66
                      certificate.organization,
67
                      certificate.organizational_unit,
68
                      certificate.email_address,
69 f8b23532 David Friesecký
                      certificate.type_id,
70 0e7c3096 David Friesecký
                      certificate.parent_id,
71
                      certificate.private_key_id]
72 f8b23532 David Friesecký
            self.cursor.execute(sql, values)
73
            self.connection.commit()
74
75
            last_id: int = self.cursor.lastrowid
76
77 f3125948 Stanislav Král
            if certificate.type_id == ROOT_CA_ID:
78 f8b23532 David Friesecký
                certificate.parent_id = last_id
79 093d06df Stanislav Král
                self.update(last_id, certificate)
80 f8b23532 David Friesecký
            else:
81 fa72c969 Stanislav Král
                for usage_id, usage_value in certificate.usages.items():
82 f8b23532 David Friesecký
                    if usage_value:
83
                        sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
84
                               f"({COL_CERTIFICATE_ID},"
85
                               f"{COL_USAGE_TYPE_ID}) "
86
                               f"VALUES (?,?)")
87
                        values = [last_id, usage_id]
88
                        self.cursor.execute(sql, values)
89
                        self.connection.commit()
90 b3c80ccb David Friesecký
        except IntegrityError:
91
            Logger.error(INTEGRITY_ERROR_MSG)
92
            raise DatabaseException(INTEGRITY_ERROR_MSG)
93
        except ProgrammingError:
94
            Logger.error(PROGRAMMING_ERROR_MSG)
95
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
96
        except OperationalError:
97
            Logger.error(OPERATIONAL_ERROR_MSG)
98
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
99
        except NotSupportedError:
100
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
101
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
102
        except DatabaseError:
103
            Logger.error(DATABASE_ERROR_MSG)
104
            raise DatabaseException(DATABASE_ERROR_MSG)
105
        except Error:
106
            Logger.error(ERROR_MSG)
107
            raise DatabaseException(ERROR_MSG)
108 f8b23532 David Friesecký
109 805077f5 David Friesecký
        return last_id
110 f8b23532 David Friesecký
111 805077f5 David Friesecký
    def read(self, certificate_id: int):
112 f8b23532 David Friesecký
        """
113
        Reads (selects) a certificate.
114 e9e55282 David Friesecký
115 f8b23532 David Friesecký
        :param certificate_id: ID of specific certificate
116 e9e55282 David Friesecký
117 f8b23532 David Friesecký
        :return: instance of the Certificate object
118
        """
119 e9e55282 David Friesecký
120 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
121
122 f8b23532 David Friesecký
        try:
123
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
124 6425fa36 David Friesecký
                   f"WHERE {COL_ID} = ? AND ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
125 f8b23532 David Friesecký
            values = [certificate_id]
126
            self.cursor.execute(sql, values)
127 a7411982 Stanislav Král
            certificate_row = self.cursor.fetchone()
128 e9e55282 David Friesecký
129 6f4a5f24 Captain_Trojan
            if certificate_row is None:
130
                return None
131
132 f8b23532 David Friesecký
            sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
133
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
134
            self.cursor.execute(sql, values)
135
            usage_rows = self.cursor.fetchall()
136 1636aefe David Friesecký
137
            usage_dict: Dict[int, bool] = {}
138
            for usage_row in usage_rows:
139 f8b23532 David Friesecký
                usage_dict[usage_row[2]] = True
140
141 0e7c3096 David Friesecký
            certificate: Certificate = Certificate(certificate_row[0],      # ID
142
                                                   certificate_row[1],      # valid from
143
                                                   certificate_row[2],      # valid to
144
                                                   certificate_row[3],      # pem data
145
                                                   certificate_row[14],     # type ID
146
                                                   certificate_row[15],     # parent ID
147
                                                   certificate_row[16],     # private key ID
148 58051326 David Friesecký
                                                   usage_dict,
149 0e7c3096 David Friesecký
                                                   certificate_row[4],      # common name
150
                                                   certificate_row[5],      # country code
151
                                                   certificate_row[6],      # locality
152
                                                   certificate_row[7],      # province
153
                                                   certificate_row[8],      # organization
154
                                                   certificate_row[9],      # organizational unit
155
                                                   certificate_row[10],     # email address
156
                                                   certificate_row[11],     # revocation date
157
                                                   certificate_row[12])     # revocation reason
158
159 b3c80ccb David Friesecký
        except IntegrityError:
160
            Logger.error(INTEGRITY_ERROR_MSG)
161
            raise DatabaseException(INTEGRITY_ERROR_MSG)
162
        except ProgrammingError:
163
            Logger.error(PROGRAMMING_ERROR_MSG)
164
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
165
        except OperationalError:
166
            Logger.error(OPERATIONAL_ERROR_MSG)
167
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
168
        except NotSupportedError:
169
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
170
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
171
        except DatabaseError:
172
            Logger.error(DATABASE_ERROR_MSG)
173
            raise DatabaseException(DATABASE_ERROR_MSG)
174
        except Error:
175
            Logger.error(ERROR_MSG)
176
            raise DatabaseException(ERROR_MSG)
177 f8b23532 David Friesecký
178 d65b022d David Friesecký
        return certificate
179 f8b23532 David Friesecký
180 58051326 David Friesecký
    def read_all(self, filter_type: int = None):
181 9e22e20c David Friesecký
        """
182
        Reads (selects) all certificates (with type).
183
184
        :param filter_type: ID of certificate type from CertificateTypes table
185
186
        :return: list of certificates
187
        """
188
189 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
190
191 9e22e20c David Friesecký
        try:
192
            sql_extension = ""
193
            values = []
194
            if filter_type is not None:
195 6425fa36 David Friesecký
                sql_extension = (f" AND {COL_TYPE_ID} = ("
196 6f64f062 Stanislav Král
                                 f"SELECT {COL_ID} FROM {TAB_CERTIFICATE_TYPES} WHERE {COL_ID} = ?)")
197 9e22e20c David Friesecký
                values = [filter_type]
198
199 6425fa36 David Friesecký
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
200
                   f"WHERE ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = ''){sql_extension}")
201 9e22e20c David Friesecký
            self.cursor.execute(sql, values)
202
            certificate_rows = self.cursor.fetchall()
203
204
            certificates: List[Certificate] = []
205
            for certificate_row in certificate_rows:
206
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
207
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
208
                values = [certificate_row[0]]
209
                self.cursor.execute(sql, values)
210
                usage_rows = self.cursor.fetchall()
211
212
                usage_dict: Dict[int, bool] = {}
213
                for usage_row in usage_rows:
214
                    usage_dict[usage_row[2]] = True
215
216 0e7c3096 David Friesecký
                certificates.append(Certificate(certificate_row[0],      # ID
217
                                                certificate_row[1],      # valid from
218
                                                certificate_row[2],      # valid to
219
                                                certificate_row[3],      # pem data
220
                                                certificate_row[14],     # type ID
221
                                                certificate_row[15],     # parent ID
222
                                                certificate_row[16],     # private key ID
223 58051326 David Friesecký
                                                usage_dict,
224 0e7c3096 David Friesecký
                                                certificate_row[4],      # common name
225
                                                certificate_row[5],      # country code
226
                                                certificate_row[6],      # locality
227
                                                certificate_row[7],      # province
228
                                                certificate_row[8],      # organization
229
                                                certificate_row[9],      # organizational unit
230
                                                certificate_row[10],     # email address
231
                                                certificate_row[11],     # revocation date
232
                                                certificate_row[12]))    # revocation reason
233 b3c80ccb David Friesecký
        except IntegrityError:
234
            Logger.error(INTEGRITY_ERROR_MSG)
235
            raise DatabaseException(INTEGRITY_ERROR_MSG)
236
        except ProgrammingError:
237
            Logger.error(PROGRAMMING_ERROR_MSG)
238
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
239
        except OperationalError:
240
            Logger.error(OPERATIONAL_ERROR_MSG)
241
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
242
        except NotSupportedError:
243
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
244
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
245
        except DatabaseError:
246
            Logger.error(DATABASE_ERROR_MSG)
247
            raise DatabaseException(DATABASE_ERROR_MSG)
248
        except Error:
249
            Logger.error(ERROR_MSG)
250
            raise DatabaseException(ERROR_MSG)
251 9e22e20c David Friesecký
252 0f3af523 Stanislav Král
        return certificates
253 9e22e20c David Friesecký
254 58051326 David Friesecký
    def update(self, certificate_id: int, certificate: Certificate):
255 a0602bad David Friesecký
        """
256 f8b23532 David Friesecký
        Updates a certificate.
257
        If the parameter of certificate (Certificate object) is not to be changed,
258
        the same value must be specified.
259 a0602bad David Friesecký
260
        :param certificate_id: ID of specific certificate
261 f8b23532 David Friesecký
        :param certificate: Instance of the Certificate object
262 a0602bad David Friesecký
263
        :return: the result of whether the updation was successful
264
        """
265
266 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
267
268 f8b23532 David Friesecký
        try:
269
            sql = (f"UPDATE {TAB_CERTIFICATES} "
270 0e7c3096 David Friesecký
                   f"SET {COL_VALID_FROM} = ?, "
271 f8b23532 David Friesecký
                   f"{COL_VALID_TO} = ?, "
272
                   f"{COL_PEM_DATA} = ?, "
273 0e7c3096 David Friesecký
                   f"{COL_COMMON_NAME} = ?, "
274
                   f"{COL_COUNTRY_CODE} = ?, "
275
                   f"{COL_LOCALITY} = ?, "
276
                   f"{COL_PROVINCE} = ?, "
277
                   f"{COL_ORGANIZATION} = ?, "
278
                   f"{COL_ORGANIZATIONAL_UNIT} = ?, "
279
                   f"{COL_EMAIL_ADDRESS} = ?, "
280 f8b23532 David Friesecký
                   f"{COL_TYPE_ID} = ?, "
281 0e7c3096 David Friesecký
                   f"{COL_PARENT_ID} = ?, "
282
                   f"{COL_PRIVATE_KEY_ID} = ? "
283 f8b23532 David Friesecký
                   f"WHERE {COL_ID} = ?")
284 0e7c3096 David Friesecký
            values = [certificate.valid_from,
285 f8b23532 David Friesecký
                      certificate.valid_to,
286
                      certificate.pem_data,
287 0e7c3096 David Friesecký
                      certificate.common_name,
288
                      certificate.country_code,
289
                      certificate.locality,
290
                      certificate.province,
291
                      certificate.organization,
292
                      certificate.organizational_unit,
293
                      certificate.email_address,
294 f8b23532 David Friesecký
                      certificate.type_id,
295 805077f5 David Friesecký
                      certificate.parent_id,
296 0e7c3096 David Friesecký
                      certificate.private_key_id,
297 805077f5 David Friesecký
                      certificate_id]
298 6425fa36 David Friesecký
299 f8b23532 David Friesecký
            self.cursor.execute(sql, values)
300
            self.connection.commit()
301
302
            sql = (f"DELETE FROM {TAB_CERTIFICATE_USAGES} "
303
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
304
            values = [certificate_id]
305
            self.cursor.execute(sql, values)
306
            self.connection.commit()
307
308 c36d3299 David Friesecký
            check_updated = self.cursor.rowcount > 0
309
310 f3125948 Stanislav Král
            # iterate over usage pairs
311
            for usage_id, usage_value in certificate.usages.items():
312 f8b23532 David Friesecký
                if usage_value:
313
                    sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
314
                           f"({COL_CERTIFICATE_ID},"
315
                           f"{COL_USAGE_TYPE_ID}) "
316
                           f"VALUES (?,?)")
317
                    values = [certificate_id, usage_id]
318
                    self.cursor.execute(sql, values)
319
                    self.connection.commit()
320 b3c80ccb David Friesecký
        except IntegrityError:
321
            Logger.error(INTEGRITY_ERROR_MSG)
322
            raise DatabaseException(INTEGRITY_ERROR_MSG)
323
        except ProgrammingError:
324
            Logger.error(PROGRAMMING_ERROR_MSG)
325
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
326
        except OperationalError:
327
            Logger.error(OPERATIONAL_ERROR_MSG)
328
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
329
        except NotSupportedError:
330
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
331
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
332
        except DatabaseError:
333
            Logger.error(DATABASE_ERROR_MSG)
334
            raise DatabaseException(DATABASE_ERROR_MSG)
335
        except Error:
336
            Logger.error(ERROR_MSG)
337
            raise DatabaseException(ERROR_MSG)
338 f8b23532 David Friesecký
339 c36d3299 David Friesecký
        return check_updated
340 e9e55282 David Friesecký
341 58051326 David Friesecký
    def delete(self, certificate_id: int):
342 a0602bad David Friesecký
        """
343
        Deletes a certificate
344
345
        :param certificate_id: ID of specific certificate
346
347
        :return: the result of whether the deletion was successful
348
        """
349
350 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
351
352 f8b23532 David Friesecký
        try:
353 6425fa36 David Friesecký
            sql = (f"UPDATE {TAB_CERTIFICATES} "
354
                   f"SET {COL_DELETION_DATE} = ? "
355
                   f"WHERE {COL_ID} = ? AND ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
356
            values = [int(time.time()),
357
                      certificate_id]
358 f8b23532 David Friesecký
            self.cursor.execute(sql, values)
359
            self.connection.commit()
360 b3c80ccb David Friesecký
        except IntegrityError:
361
            Logger.error(INTEGRITY_ERROR_MSG)
362
            raise DatabaseException(INTEGRITY_ERROR_MSG)
363
        except ProgrammingError:
364
            Logger.error(PROGRAMMING_ERROR_MSG)
365
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
366
        except OperationalError:
367
            Logger.error(OPERATIONAL_ERROR_MSG)
368
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
369
        except NotSupportedError:
370
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
371
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
372
        except DatabaseError:
373
            Logger.error(DATABASE_ERROR_MSG)
374
            raise DatabaseException(DATABASE_ERROR_MSG)
375
        except Error:
376
            Logger.error(ERROR_MSG)
377
            raise DatabaseException(ERROR_MSG)
378 f8b23532 David Friesecký
379 45744020 Stanislav Král
        return self.cursor.rowcount > 0
380 58051326 David Friesecký
381
    def set_certificate_revoked(
382 8b049f43 David Friesecký
            self, certificate_id: int, revocation_date: str, revocation_reason: str = REV_REASON_UNSPECIFIED):
383 58051326 David Friesecký
        """
384
        Revoke a certificate
385
386
        :param certificate_id: ID of specific certificate
387
        :param revocation_date: Date, when the certificate is revoked
388
        :param revocation_reason: Reason of the revocation
389
390 8b049f43 David Friesecký
        :return:
391
            the result of whether the revocation was successful OR
392
            sqlite3.Error if an exception is thrown
393 58051326 David Friesecký
        """
394
395 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
396
397 58051326 David Friesecký
        try:
398 8b049f43 David Friesecký
            if revocation_date != "" and revocation_reason == "":
399
                revocation_reason = REV_REASON_UNSPECIFIED
400
            elif revocation_date == "":
401
                return False
402
403 58051326 David Friesecký
            sql = (f"UPDATE {TAB_CERTIFICATES} "
404
                   f"SET {COL_REVOCATION_DATE} = ?, "
405 8b049f43 David Friesecký
                   f"{COL_REVOCATION_REASON} = ? "
406 58051326 David Friesecký
                   f"WHERE {COL_ID} = ? AND ({COL_REVOCATION_DATE} IS NULL OR {COL_REVOCATION_DATE} = '')")
407
            values = [revocation_date,
408
                      revocation_reason,
409
                      certificate_id]
410
            self.cursor.execute(sql, values)
411
            self.connection.commit()
412 b3c80ccb David Friesecký
        except IntegrityError:
413
            Logger.error(INTEGRITY_ERROR_MSG)
414
            raise DatabaseException(INTEGRITY_ERROR_MSG)
415
        except ProgrammingError:
416
            Logger.error(PROGRAMMING_ERROR_MSG)
417
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
418
        except OperationalError:
419
            Logger.error(OPERATIONAL_ERROR_MSG)
420
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
421
        except NotSupportedError:
422
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
423
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
424
        except DatabaseError:
425
            Logger.error(DATABASE_ERROR_MSG)
426
            raise DatabaseException(DATABASE_ERROR_MSG)
427
        except Error:
428
            Logger.error(ERROR_MSG)
429
            raise DatabaseException(ERROR_MSG)
430 8b049f43 David Friesecký
431 d65b022d David Friesecký
        return self.cursor.rowcount > 0
432 58051326 David Friesecký
433
    def clear_certificate_revocation(self, certificate_id: int):
434
        """
435
        Clear revocation of a certificate
436
437
        :param certificate_id: ID of specific certificate
438
439 8b049f43 David Friesecký
        :return:
440
            the result of whether the clear revocation was successful OR
441
            sqlite3.Error if an exception is thrown
442 58051326 David Friesecký
        """
443
444 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
445
446 58051326 David Friesecký
        try:
447
            sql = (f"UPDATE {TAB_CERTIFICATES} "
448 0e7c3096 David Friesecký
                   f"SET {COL_REVOCATION_DATE} = NULL, "
449
                   f"{COL_REVOCATION_REASON} = NULL "
450 58051326 David Friesecký
                   f"WHERE {COL_ID} = ?")
451
            values = [certificate_id]
452
            self.cursor.execute(sql, values)
453
            self.connection.commit()
454 b3c80ccb David Friesecký
        except IntegrityError:
455
            Logger.error(INTEGRITY_ERROR_MSG)
456
            raise DatabaseException(INTEGRITY_ERROR_MSG)
457
        except ProgrammingError:
458
            Logger.error(PROGRAMMING_ERROR_MSG)
459
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
460
        except OperationalError:
461
            Logger.error(OPERATIONAL_ERROR_MSG)
462
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
463
        except NotSupportedError:
464
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
465
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
466
        except DatabaseError:
467
            Logger.error(DATABASE_ERROR_MSG)
468
            raise DatabaseException(DATABASE_ERROR_MSG)
469
        except Error:
470
            Logger.error(ERROR_MSG)
471
            raise DatabaseException(ERROR_MSG)
472 f8b23532 David Friesecký
473 45744020 Stanislav Král
        return self.cursor.rowcount > 0
474 58051326 David Friesecký
475
    def get_all_revoked_by(self, certificate_id: int):
476
        """
477
        Get list of the revoked certificates that are direct descendants of the certificate with the ID
478
479
        :param certificate_id: ID of specific certificate
480
481
        :return:
482 8b049f43 David Friesecký
            list of the certificates OR
483
            None if the list is empty OR
484
            sqlite3.Error if an exception is thrown
485 58051326 David Friesecký
        """
486
487 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
488
489 58051326 David Friesecký
        try:
490
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
491
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_REVOCATION_DATE} IS NOT NULL AND {COL_REVOCATION_DATE} != ''")
492
            values = [certificate_id]
493
            self.cursor.execute(sql, values)
494
            certificate_rows = self.cursor.fetchall()
495
496
            certificates: List[Certificate] = []
497
            for certificate_row in certificate_rows:
498
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
499
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
500
                values = [certificate_row[0]]
501
                self.cursor.execute(sql, values)
502
                usage_rows = self.cursor.fetchall()
503
504
                usage_dict: Dict[int, bool] = {}
505
                for usage_row in usage_rows:
506
                    usage_dict[usage_row[2]] = True
507
508 0e7c3096 David Friesecký
                certificates.append(Certificate(certificate_row[0],      # ID
509
                                                certificate_row[1],      # valid from
510
                                                certificate_row[2],      # valid to
511
                                                certificate_row[3],      # pem data
512
                                                certificate_row[14],     # type ID
513
                                                certificate_row[15],     # parent ID
514
                                                certificate_row[16],     # private key ID
515 58051326 David Friesecký
                                                usage_dict,
516 0e7c3096 David Friesecký
                                                certificate_row[4],      # common name
517
                                                certificate_row[5],      # country code
518
                                                certificate_row[6],      # locality
519
                                                certificate_row[7],      # province
520
                                                certificate_row[8],      # organization
521
                                                certificate_row[9],      # organizational unit
522
                                                certificate_row[10],     # email address
523
                                                certificate_row[11],     # revocation date
524
                                                certificate_row[12]))    # revocation reason
525 b3c80ccb David Friesecký
        except IntegrityError:
526
            Logger.error(INTEGRITY_ERROR_MSG)
527
            raise DatabaseException(INTEGRITY_ERROR_MSG)
528
        except ProgrammingError:
529
            Logger.error(PROGRAMMING_ERROR_MSG)
530
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
531
        except OperationalError:
532
            Logger.error(OPERATIONAL_ERROR_MSG)
533
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
534
        except NotSupportedError:
535
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
536
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
537
        except DatabaseError:
538
            Logger.error(DATABASE_ERROR_MSG)
539
            raise DatabaseException(DATABASE_ERROR_MSG)
540
        except Error:
541
            Logger.error(ERROR_MSG)
542
            raise DatabaseException(ERROR_MSG)
543 58051326 David Friesecký
544 d65b022d David Friesecký
        return certificates
545 58051326 David Friesecký
546
    def get_all_issued_by(self, certificate_id: int):
547
        """
548
        Get list of the certificates that are direct descendants of the certificate with the ID
549
550
        :param certificate_id: ID of specific certificate
551
552
        :return:
553 8b049f43 David Friesecký
            list of the certificates OR
554
            None if the list is empty OR
555
            sqlite3.Error if an exception is thrown
556 58051326 David Friesecký
        """
557
558 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
559
560 58051326 David Friesecký
        try:
561
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
562 6425fa36 David Friesecký
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_ID} != ? AND "
563
                   f"({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
564 8b049f43 David Friesecký
            values = [certificate_id, certificate_id]
565 58051326 David Friesecký
            self.cursor.execute(sql, values)
566
            certificate_rows = self.cursor.fetchall()
567
568
            certificates: List[Certificate] = []
569
            for certificate_row in certificate_rows:
570
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
571
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
572
                values = [certificate_row[0]]
573
                self.cursor.execute(sql, values)
574
                usage_rows = self.cursor.fetchall()
575
576
                usage_dict: Dict[int, bool] = {}
577
                for usage_row in usage_rows:
578
                    usage_dict[usage_row[2]] = True
579
580 0e7c3096 David Friesecký
                certificates.append(Certificate(certificate_row[0],      # ID
581
                                                certificate_row[1],      # valid from
582
                                                certificate_row[2],      # valid to
583
                                                certificate_row[3],      # pem data
584
                                                certificate_row[14],     # type ID
585
                                                certificate_row[15],     # parent ID
586
                                                certificate_row[16],     # private key ID
587 58051326 David Friesecký
                                                usage_dict,
588 0e7c3096 David Friesecký
                                                certificate_row[4],      # common name
589
                                                certificate_row[5],      # country code
590
                                                certificate_row[6],      # locality
591
                                                certificate_row[7],      # province
592
                                                certificate_row[8],      # organization
593
                                                certificate_row[9],      # organizational unit
594
                                                certificate_row[10],     # email address
595
                                                certificate_row[11],     # revocation date
596
                                                certificate_row[12]))    # revocation reason
597 b3c80ccb David Friesecký
        except IntegrityError:
598
            Logger.error(INTEGRITY_ERROR_MSG)
599
            raise DatabaseException(INTEGRITY_ERROR_MSG)
600
        except ProgrammingError:
601
            Logger.error(PROGRAMMING_ERROR_MSG)
602
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
603
        except OperationalError:
604
            Logger.error(OPERATIONAL_ERROR_MSG)
605
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
606
        except NotSupportedError:
607
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
608
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
609
        except DatabaseError:
610
            Logger.error(DATABASE_ERROR_MSG)
611
            raise DatabaseException(DATABASE_ERROR_MSG)
612
        except Error:
613
            Logger.error(ERROR_MSG)
614
            raise DatabaseException(ERROR_MSG)
615 58051326 David Friesecký
616 d65b022d David Friesecký
        return certificates
617 94f6d8b8 Jan Pašek
618 85003184 Captain_Trojan
    def get_all_descendants_of(self, certificate_id: int):
619
        """
620
        Get a list of all certificates C such that the certificate identified by "certificate_id" belongs to a trust chain
621
        between C and its root certificate authority (i.e. is an ancestor of C).
622
        :param certificate_id: target certificate ID
623
        :return: list of all descendants
624
        """
625
626 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
627
628
        try:
629
            def dfs(children_of, this, collection: list):
630
                for child in children_of(this.certificate_id):
631
                    dfs(children_of, child, collection)
632
                collection.append(this)
633
634
            subtree_root = self.read(certificate_id)
635
            if subtree_root is None:
636
                return None
637
638
            all_certs = []
639
            dfs(self.get_all_issued_by, subtree_root, all_certs)
640
        except IntegrityError:
641
            Logger.error(INTEGRITY_ERROR_MSG)
642
            raise DatabaseException(INTEGRITY_ERROR_MSG)
643
        except ProgrammingError:
644
            Logger.error(PROGRAMMING_ERROR_MSG)
645
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
646
        except OperationalError:
647
            Logger.error(OPERATIONAL_ERROR_MSG)
648
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
649
        except NotSupportedError:
650
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
651
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
652
        except DatabaseError:
653
            Logger.error(DATABASE_ERROR_MSG)
654
            raise DatabaseException(DATABASE_ERROR_MSG)
655
        except Error:
656
            Logger.error(ERROR_MSG)
657
            raise DatabaseException(ERROR_MSG)
658 85003184 Captain_Trojan
659
        return all_certs
660
661 94f6d8b8 Jan Pašek
    def get_next_id(self) -> int:
662
        """
663
        Get identifier of the next certificate that will be inserted into the database
664
        :return: identifier of the next certificate that will be added into the database
665
        """
666 b3c80ccb David Friesecký
667
        Logger.debug("Function launched.")
668
669
        try:
670
            # get next IDs of all tables
671
            self.cursor.execute("SELECT * FROM SQLITE_SEQUENCE")
672
            results = self.cursor.fetchall()
673
674
            # search for next ID in Certificates table and return it
675
            for result in results:
676
                if result[0] == TAB_CERTIFICATES:
677
                    return result[1] + 1  # current last id + 1
678
            # if certificates table is not present in the query results, return 1
679
        except IntegrityError:
680
            Logger.error(INTEGRITY_ERROR_MSG)
681
            raise DatabaseException(INTEGRITY_ERROR_MSG)
682
        except ProgrammingError:
683
            Logger.error(PROGRAMMING_ERROR_MSG)
684
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
685
        except OperationalError:
686
            Logger.error(OPERATIONAL_ERROR_MSG)
687
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
688
        except NotSupportedError:
689
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
690
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
691
        except DatabaseError:
692
            Logger.error(DATABASE_ERROR_MSG)
693
            raise DatabaseException(DATABASE_ERROR_MSG)
694
        except Error:
695
            Logger.error(ERROR_MSG)
696
            raise DatabaseException(ERROR_MSG)
697
698 94f6d8b8 Jan Pašek
        return 1