Projekt

Obecné

Profil

Stáhnout (38.6 KB) Statistiky
| Větev: | Tag: | Revize:
1 6425fa36 David Friesecký
import time
2 cf247eaa Captain_Trojan
from sqlite3 import Connection, Error, DatabaseError, IntegrityError, ProgrammingError, OperationalError, \
3
    NotSupportedError
4
from typing import Dict, List, Set
5 1636aefe David Friesecký
6 d65b022d David Friesecký
from src.exceptions.database_exception import DatabaseException
7 1d2add74 Jan Pašek
from injector import inject
8 f8b23532 David Friesecký
from src.constants import *
9 1d2add74 Jan Pašek
from src.model.certificate import Certificate
10 5e31b492 David Friesecký
from src.utils.logger import Logger
11 e9e55282 David Friesecký
12 b3c80ccb David Friesecký
INTEGRITY_ERROR_MSG = "Database relational integrity corrupted."
13
PROGRAMMING_ERROR_MSG = "Exception raised for programming errors (etc. SQL statement)."
14
OPERATIONAL_ERROR_MSG = "Exception raised for errors that are related to the database’s operation."
15
NOT_SUPPORTED_ERROR_MSG = "Method or database API was used which is not supported by the database"
16
DATABASE_ERROR_MSG = "Unknown exception that are related to the database."
17
ERROR_MSG = "Unknown exception."
18 e9e55282 David Friesecký
19
20 f8b23532 David Friesecký
class CertificateRepository:
21 25053504 David Friesecký
22 1d2add74 Jan Pašek
    @inject
23
    def __init__(self, connection: Connection):
24 a0602bad David Friesecký
        """
25 f8b23532 David Friesecký
        Constructor of the CertificateRepository object
26 a0602bad David Friesecký
27 f8b23532 David Friesecký
        :param connection: Instance of the Connection object
28 a0602bad David Friesecký
        """
29 f8b23532 David Friesecký
        self.connection = connection
30 1d2add74 Jan Pašek
        self.cursor = connection.cursor()
31 e9e55282 David Friesecký
32 805077f5 David Friesecký
    def create(self, certificate: Certificate):
33 a0602bad David Friesecký
        """
34 f8b23532 David Friesecký
        Creates a certificate.
35
        For root certificate (CA) the parent certificate id is modified to the same id (id == parent_id).
36 a0602bad David Friesecký
37 f8b23532 David Friesecký
        :param certificate: Instance of the Certificate object
38 a0602bad David Friesecký
39 f8b23532 David Friesecký
        :return: the result of whether the creation was successful
40 a0602bad David Friesecký
        """
41
42 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
43
44 f8b23532 David Friesecký
        try:
45
            sql = (f"INSERT INTO {TAB_CERTIFICATES} "
46 0e7c3096 David Friesecký
                   f"({COL_VALID_FROM},"
47 f8b23532 David Friesecký
                   f"{COL_VALID_TO},"
48
                   f"{COL_PEM_DATA},"
49 0e7c3096 David Friesecký
                   f"{COL_COMMON_NAME},"
50
                   f"{COL_COUNTRY_CODE},"
51
                   f"{COL_LOCALITY},"
52
                   f"{COL_PROVINCE},"
53
                   f"{COL_ORGANIZATION},"
54
                   f"{COL_ORGANIZATIONAL_UNIT},"
55
                   f"{COL_EMAIL_ADDRESS},"
56 f8b23532 David Friesecký
                   f"{COL_TYPE_ID},"
57 0e7c3096 David Friesecký
                   f"{COL_PARENT_ID},"
58
                   f"{COL_PRIVATE_KEY_ID}) "
59
                   f"VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?)")
60
            values = [certificate.valid_from,
61 f8b23532 David Friesecký
                      certificate.valid_to,
62
                      certificate.pem_data,
63 0e7c3096 David Friesecký
                      certificate.common_name,
64
                      certificate.country_code,
65
                      certificate.locality,
66
                      certificate.province,
67
                      certificate.organization,
68
                      certificate.organizational_unit,
69
                      certificate.email_address,
70 f8b23532 David Friesecký
                      certificate.type_id,
71 0e7c3096 David Friesecký
                      certificate.parent_id,
72
                      certificate.private_key_id]
73 f8b23532 David Friesecký
            self.cursor.execute(sql, values)
74
            self.connection.commit()
75
76
            last_id: int = self.cursor.lastrowid
77
78 f3125948 Stanislav Král
            if certificate.type_id == ROOT_CA_ID:
79 f8b23532 David Friesecký
                certificate.parent_id = last_id
80 093d06df Stanislav Král
                self.update(last_id, certificate)
81 f8b23532 David Friesecký
            else:
82 fa72c969 Stanislav Král
                for usage_id, usage_value in certificate.usages.items():
83 f8b23532 David Friesecký
                    if usage_value:
84
                        sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
85
                               f"({COL_CERTIFICATE_ID},"
86
                               f"{COL_USAGE_TYPE_ID}) "
87
                               f"VALUES (?,?)")
88
                        values = [last_id, usage_id]
89
                        self.cursor.execute(sql, values)
90
                        self.connection.commit()
91 b3c80ccb David Friesecký
        except IntegrityError:
92
            Logger.error(INTEGRITY_ERROR_MSG)
93
            raise DatabaseException(INTEGRITY_ERROR_MSG)
94
        except ProgrammingError:
95
            Logger.error(PROGRAMMING_ERROR_MSG)
96
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
97 9c6005f2 Stanislav Král
        except OperationalError as e:
98 b3c80ccb David Friesecký
            Logger.error(OPERATIONAL_ERROR_MSG)
99 9c6005f2 Stanislav Král
            # TODO will be improved in #8884
100
            Logger.error(str(e))
101 b3c80ccb David Friesecký
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
102
        except NotSupportedError:
103
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
104
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
105
        except DatabaseError:
106
            Logger.error(DATABASE_ERROR_MSG)
107
            raise DatabaseException(DATABASE_ERROR_MSG)
108
        except Error:
109
            Logger.error(ERROR_MSG)
110
            raise DatabaseException(ERROR_MSG)
111 f8b23532 David Friesecký
112 805077f5 David Friesecký
        return last_id
113 f8b23532 David Friesecký
114 805077f5 David Friesecký
    def read(self, certificate_id: int):
115 f8b23532 David Friesecký
        """
116
        Reads (selects) a certificate.
117 e9e55282 David Friesecký
118 f8b23532 David Friesecký
        :param certificate_id: ID of specific certificate
119 e9e55282 David Friesecký
120 f8b23532 David Friesecký
        :return: instance of the Certificate object
121
        """
122 e9e55282 David Friesecký
123 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
124
125 f8b23532 David Friesecký
        try:
126
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
127 6425fa36 David Friesecký
                   f"WHERE {COL_ID} = ? AND ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
128 f8b23532 David Friesecký
            values = [certificate_id]
129
            self.cursor.execute(sql, values)
130 a7411982 Stanislav Král
            certificate_row = self.cursor.fetchone()
131 e9e55282 David Friesecký
132 6f4a5f24 Captain_Trojan
            if certificate_row is None:
133
                return None
134
135 f8b23532 David Friesecký
            sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
136
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
137
            self.cursor.execute(sql, values)
138
            usage_rows = self.cursor.fetchall()
139 1636aefe David Friesecký
140
            usage_dict: Dict[int, bool] = {}
141
            for usage_row in usage_rows:
142 f8b23532 David Friesecký
                usage_dict[usage_row[2]] = True
143
144 0e7c3096 David Friesecký
            certificate: Certificate = Certificate(certificate_row[0],      # ID
145
                                                   certificate_row[1],      # valid from
146
                                                   certificate_row[2],      # valid to
147
                                                   certificate_row[3],      # pem data
148
                                                   certificate_row[14],     # type ID
149
                                                   certificate_row[15],     # parent ID
150
                                                   certificate_row[16],     # private key ID
151 58051326 David Friesecký
                                                   usage_dict,
152 0e7c3096 David Friesecký
                                                   certificate_row[4],      # common name
153
                                                   certificate_row[5],      # country code
154
                                                   certificate_row[6],      # locality
155
                                                   certificate_row[7],      # province
156
                                                   certificate_row[8],      # organization
157
                                                   certificate_row[9],      # organizational unit
158
                                                   certificate_row[10],     # email address
159
                                                   certificate_row[11],     # revocation date
160
                                                   certificate_row[12])     # revocation reason
161
162 b3c80ccb David Friesecký
        except IntegrityError:
163
            Logger.error(INTEGRITY_ERROR_MSG)
164
            raise DatabaseException(INTEGRITY_ERROR_MSG)
165
        except ProgrammingError:
166
            Logger.error(PROGRAMMING_ERROR_MSG)
167
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
168
        except OperationalError:
169
            Logger.error(OPERATIONAL_ERROR_MSG)
170
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
171
        except NotSupportedError:
172
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
173
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
174
        except DatabaseError:
175
            Logger.error(DATABASE_ERROR_MSG)
176
            raise DatabaseException(DATABASE_ERROR_MSG)
177
        except Error:
178
            Logger.error(ERROR_MSG)
179
            raise DatabaseException(ERROR_MSG)
180 f8b23532 David Friesecký
181 d65b022d David Friesecký
        return certificate
182 f8b23532 David Friesecký
183 58051326 David Friesecký
    def read_all(self, filter_type: int = None):
184 9e22e20c David Friesecký
        """
185
        Reads (selects) all certificates (with type).
186
187
        :param filter_type: ID of certificate type from CertificateTypes table
188
189
        :return: list of certificates
190
        """
191
192 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
193
194 9e22e20c David Friesecký
        try:
195
            sql_extension = ""
196
            values = []
197
            if filter_type is not None:
198 6425fa36 David Friesecký
                sql_extension = (f" AND {COL_TYPE_ID} = ("
199 6f64f062 Stanislav Král
                                 f"SELECT {COL_ID} FROM {TAB_CERTIFICATE_TYPES} WHERE {COL_ID} = ?)")
200 9e22e20c David Friesecký
                values = [filter_type]
201
202 6425fa36 David Friesecký
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
203
                   f"WHERE ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = ''){sql_extension}")
204 9e22e20c David Friesecký
            self.cursor.execute(sql, values)
205
            certificate_rows = self.cursor.fetchall()
206
207
            certificates: List[Certificate] = []
208
            for certificate_row in certificate_rows:
209
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
210
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
211
                values = [certificate_row[0]]
212
                self.cursor.execute(sql, values)
213
                usage_rows = self.cursor.fetchall()
214
215
                usage_dict: Dict[int, bool] = {}
216
                for usage_row in usage_rows:
217
                    usage_dict[usage_row[2]] = True
218
219 0e7c3096 David Friesecký
                certificates.append(Certificate(certificate_row[0],      # ID
220
                                                certificate_row[1],      # valid from
221
                                                certificate_row[2],      # valid to
222
                                                certificate_row[3],      # pem data
223
                                                certificate_row[14],     # type ID
224
                                                certificate_row[15],     # parent ID
225
                                                certificate_row[16],     # private key ID
226 58051326 David Friesecký
                                                usage_dict,
227 0e7c3096 David Friesecký
                                                certificate_row[4],      # common name
228
                                                certificate_row[5],      # country code
229
                                                certificate_row[6],      # locality
230
                                                certificate_row[7],      # province
231
                                                certificate_row[8],      # organization
232
                                                certificate_row[9],      # organizational unit
233
                                                certificate_row[10],     # email address
234
                                                certificate_row[11],     # revocation date
235
                                                certificate_row[12]))    # revocation reason
236 b3c80ccb David Friesecký
        except IntegrityError:
237
            Logger.error(INTEGRITY_ERROR_MSG)
238
            raise DatabaseException(INTEGRITY_ERROR_MSG)
239
        except ProgrammingError:
240
            Logger.error(PROGRAMMING_ERROR_MSG)
241
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
242
        except OperationalError:
243
            Logger.error(OPERATIONAL_ERROR_MSG)
244
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
245
        except NotSupportedError:
246
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
247
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
248
        except DatabaseError:
249
            Logger.error(DATABASE_ERROR_MSG)
250
            raise DatabaseException(DATABASE_ERROR_MSG)
251
        except Error:
252
            Logger.error(ERROR_MSG)
253
            raise DatabaseException(ERROR_MSG)
254 9e22e20c David Friesecký
255 0f3af523 Stanislav Král
        return certificates
256 9e22e20c David Friesecký
257 cf247eaa Captain_Trojan
    def read_all_filter(self, target_types: Set[int], target_usages: Set[int], target_cn_substring: str, page: int,
258
                        per_page: int):
259
        """
260
        Reads (selects) all certificates according to a specific filtering and pagination options.
261
        :param target_types: certificate types (filter)
262
        :param target_usages: certificate usages (filter)
263
        :param target_cn_substring: certificate CN substring (filter)
264
        :param page: target page
265
        :param per_page: target page size
266
        :return: list of certificates
267
        """
268
269
        Logger.debug("Function launched.")
270
271
        try:
272
            values = []
273
            values += list(target_types)
274
275
            sql = (
276 bb4a9d1f Captain_Trojan
                f"SELECT * "
277 cf247eaa Captain_Trojan
                f"FROM {TAB_CERTIFICATES} a "
278
                f"WHERE (a.{COL_DELETION_DATE} IS NULL OR a.{COL_DELETION_DATE} = '') "
279
                f"AND a.{COL_TYPE_ID} IN ({','.join('?' * len(target_types))}) "
280
            )
281 e77c14f9 Michal Sejak
            
282
            if target_usages is not None:
283
                sql += f"AND (SELECT COUNT(*) FROM {TAB_CERTIFICATE_USAGES} WHERE {COL_CERTIFICATE_ID} = a.{COL_ID} " \
284
                    f"AND {COL_USAGE_TYPE_ID} IN ({','.join('?' * len(target_usages))})) > 0 "
285
                values += list(target_usages)
286 cf247eaa Captain_Trojan
287
            if target_cn_substring is not None:
288
                sql += f" AND a.{COL_COMMON_NAME} LIKE '%' || ? || '%'"
289
290
                values += [target_cn_substring]
291
292
            if page is not None and per_page is not None:
293
                sql += f" LIMIT {per_page} OFFSET {page * per_page}"
294
295
            self.cursor.execute(sql, values)
296
            certificate_rows = self.cursor.fetchall()
297
298
            certificates: List[Certificate] = []
299
            for certificate_row in certificate_rows:
300
                sql = (f"SELECT {COL_USAGE_TYPE_ID} FROM {TAB_CERTIFICATE_USAGES} "
301
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
302
                types = [certificate_row[0]]
303
                self.cursor.execute(sql, types)
304
                usage_rows = self.cursor.fetchall()
305
306
                usage_dict: Dict[int, bool] = {}
307
                for usage_row in usage_rows:
308
                    usage_dict[usage_row[0]] = True
309
310 bb4a9d1f Captain_Trojan
                certificates.append(Certificate(certificate_row[0],  # ID
311
                                                certificate_row[1],  # valid from
312
                                                certificate_row[2],  # valid to
313
                                                certificate_row[3],  # pem data
314
                                                certificate_row[14],  # type ID
315
                                                certificate_row[15],  # parent ID
316
                                                certificate_row[16],  # private key ID
317 cf247eaa Captain_Trojan
                                                usage_dict,
318 bb4a9d1f Captain_Trojan
                                                certificate_row[4],  # common name
319
                                                certificate_row[5],  # country code
320
                                                certificate_row[6],  # locality
321
                                                certificate_row[7],  # province
322
                                                certificate_row[8],  # organization
323
                                                certificate_row[9],  # organizational unit
324
                                                certificate_row[10],  # email address
325
                                                certificate_row[11],  # revocation date
326
                                                certificate_row[12]))  # revocation reason
327 cf247eaa Captain_Trojan
        except IntegrityError:
328
            Logger.error(INTEGRITY_ERROR_MSG)
329
            raise DatabaseException(INTEGRITY_ERROR_MSG)
330
        except ProgrammingError:
331
            Logger.error(PROGRAMMING_ERROR_MSG)
332
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
333
        except OperationalError:
334
            Logger.error(OPERATIONAL_ERROR_MSG)
335
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
336
        except NotSupportedError:
337
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
338
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
339
        except DatabaseError:
340
            Logger.error(DATABASE_ERROR_MSG)
341
            raise DatabaseException(DATABASE_ERROR_MSG)
342
        except Error:
343
            Logger.error(ERROR_MSG)
344
            raise DatabaseException(ERROR_MSG)
345
346
        return certificates
347
348 58051326 David Friesecký
    def update(self, certificate_id: int, certificate: Certificate):
349 a0602bad David Friesecký
        """
350 f8b23532 David Friesecký
        Updates a certificate.
351
        If the parameter of certificate (Certificate object) is not to be changed,
352
        the same value must be specified.
353 a0602bad David Friesecký
354
        :param certificate_id: ID of specific certificate
355 f8b23532 David Friesecký
        :param certificate: Instance of the Certificate object
356 a0602bad David Friesecký
357
        :return: the result of whether the updation was successful
358
        """
359
360 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
361
362 f8b23532 David Friesecký
        try:
363
            sql = (f"UPDATE {TAB_CERTIFICATES} "
364 0e7c3096 David Friesecký
                   f"SET {COL_VALID_FROM} = ?, "
365 f8b23532 David Friesecký
                   f"{COL_VALID_TO} = ?, "
366
                   f"{COL_PEM_DATA} = ?, "
367 0e7c3096 David Friesecký
                   f"{COL_COMMON_NAME} = ?, "
368
                   f"{COL_COUNTRY_CODE} = ?, "
369
                   f"{COL_LOCALITY} = ?, "
370
                   f"{COL_PROVINCE} = ?, "
371
                   f"{COL_ORGANIZATION} = ?, "
372
                   f"{COL_ORGANIZATIONAL_UNIT} = ?, "
373
                   f"{COL_EMAIL_ADDRESS} = ?, "
374 f8b23532 David Friesecký
                   f"{COL_TYPE_ID} = ?, "
375 0e7c3096 David Friesecký
                   f"{COL_PARENT_ID} = ?, "
376
                   f"{COL_PRIVATE_KEY_ID} = ? "
377 f8b23532 David Friesecký
                   f"WHERE {COL_ID} = ?")
378 0e7c3096 David Friesecký
            values = [certificate.valid_from,
379 f8b23532 David Friesecký
                      certificate.valid_to,
380
                      certificate.pem_data,
381 0e7c3096 David Friesecký
                      certificate.common_name,
382
                      certificate.country_code,
383
                      certificate.locality,
384
                      certificate.province,
385
                      certificate.organization,
386
                      certificate.organizational_unit,
387
                      certificate.email_address,
388 f8b23532 David Friesecký
                      certificate.type_id,
389 805077f5 David Friesecký
                      certificate.parent_id,
390 0e7c3096 David Friesecký
                      certificate.private_key_id,
391 805077f5 David Friesecký
                      certificate_id]
392 6425fa36 David Friesecký
393 f8b23532 David Friesecký
            self.cursor.execute(sql, values)
394
            self.connection.commit()
395
396
            sql = (f"DELETE FROM {TAB_CERTIFICATE_USAGES} "
397
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
398
            values = [certificate_id]
399
            self.cursor.execute(sql, values)
400
            self.connection.commit()
401
402 c36d3299 David Friesecký
            check_updated = self.cursor.rowcount > 0
403
404 f3125948 Stanislav Král
            # iterate over usage pairs
405
            for usage_id, usage_value in certificate.usages.items():
406 f8b23532 David Friesecký
                if usage_value:
407
                    sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
408
                           f"({COL_CERTIFICATE_ID},"
409
                           f"{COL_USAGE_TYPE_ID}) "
410
                           f"VALUES (?,?)")
411
                    values = [certificate_id, usage_id]
412
                    self.cursor.execute(sql, values)
413
                    self.connection.commit()
414 b3c80ccb David Friesecký
        except IntegrityError:
415
            Logger.error(INTEGRITY_ERROR_MSG)
416
            raise DatabaseException(INTEGRITY_ERROR_MSG)
417
        except ProgrammingError:
418
            Logger.error(PROGRAMMING_ERROR_MSG)
419
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
420
        except OperationalError:
421
            Logger.error(OPERATIONAL_ERROR_MSG)
422
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
423
        except NotSupportedError:
424
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
425
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
426
        except DatabaseError:
427
            Logger.error(DATABASE_ERROR_MSG)
428
            raise DatabaseException(DATABASE_ERROR_MSG)
429
        except Error:
430
            Logger.error(ERROR_MSG)
431
            raise DatabaseException(ERROR_MSG)
432 f8b23532 David Friesecký
433 c36d3299 David Friesecký
        return check_updated
434 e9e55282 David Friesecký
435 58051326 David Friesecký
    def delete(self, certificate_id: int):
436 a0602bad David Friesecký
        """
437
        Deletes a certificate
438
439
        :param certificate_id: ID of specific certificate
440
441
        :return: the result of whether the deletion was successful
442
        """
443
444 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
445
446 f8b23532 David Friesecký
        try:
447 6425fa36 David Friesecký
            sql = (f"UPDATE {TAB_CERTIFICATES} "
448
                   f"SET {COL_DELETION_DATE} = ? "
449
                   f"WHERE {COL_ID} = ? AND ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
450
            values = [int(time.time()),
451
                      certificate_id]
452 f8b23532 David Friesecký
            self.cursor.execute(sql, values)
453
            self.connection.commit()
454 b3c80ccb David Friesecký
        except IntegrityError:
455
            Logger.error(INTEGRITY_ERROR_MSG)
456
            raise DatabaseException(INTEGRITY_ERROR_MSG)
457
        except ProgrammingError:
458
            Logger.error(PROGRAMMING_ERROR_MSG)
459
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
460
        except OperationalError:
461
            Logger.error(OPERATIONAL_ERROR_MSG)
462
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
463
        except NotSupportedError:
464
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
465
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
466
        except DatabaseError:
467
            Logger.error(DATABASE_ERROR_MSG)
468
            raise DatabaseException(DATABASE_ERROR_MSG)
469
        except Error:
470
            Logger.error(ERROR_MSG)
471
            raise DatabaseException(ERROR_MSG)
472 f8b23532 David Friesecký
473 45744020 Stanislav Král
        return self.cursor.rowcount > 0
474 58051326 David Friesecký
475
    def set_certificate_revoked(
476 8b049f43 David Friesecký
            self, certificate_id: int, revocation_date: str, revocation_reason: str = REV_REASON_UNSPECIFIED):
477 58051326 David Friesecký
        """
478
        Revoke a certificate
479
480
        :param certificate_id: ID of specific certificate
481
        :param revocation_date: Date, when the certificate is revoked
482
        :param revocation_reason: Reason of the revocation
483
484 8b049f43 David Friesecký
        :return:
485
            the result of whether the revocation was successful OR
486
            sqlite3.Error if an exception is thrown
487 58051326 David Friesecký
        """
488
489 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
490
491 58051326 David Friesecký
        try:
492 8b049f43 David Friesecký
            if revocation_date != "" and revocation_reason == "":
493
                revocation_reason = REV_REASON_UNSPECIFIED
494
            elif revocation_date == "":
495
                return False
496
497 58051326 David Friesecký
            sql = (f"UPDATE {TAB_CERTIFICATES} "
498
                   f"SET {COL_REVOCATION_DATE} = ?, "
499 8b049f43 David Friesecký
                   f"{COL_REVOCATION_REASON} = ? "
500 58051326 David Friesecký
                   f"WHERE {COL_ID} = ? AND ({COL_REVOCATION_DATE} IS NULL OR {COL_REVOCATION_DATE} = '')")
501
            values = [revocation_date,
502
                      revocation_reason,
503
                      certificate_id]
504
            self.cursor.execute(sql, values)
505
            self.connection.commit()
506 b3c80ccb David Friesecký
        except IntegrityError:
507
            Logger.error(INTEGRITY_ERROR_MSG)
508
            raise DatabaseException(INTEGRITY_ERROR_MSG)
509
        except ProgrammingError:
510
            Logger.error(PROGRAMMING_ERROR_MSG)
511
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
512
        except OperationalError:
513
            Logger.error(OPERATIONAL_ERROR_MSG)
514
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
515
        except NotSupportedError:
516
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
517
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
518
        except DatabaseError:
519
            Logger.error(DATABASE_ERROR_MSG)
520
            raise DatabaseException(DATABASE_ERROR_MSG)
521
        except Error:
522
            Logger.error(ERROR_MSG)
523
            raise DatabaseException(ERROR_MSG)
524 8b049f43 David Friesecký
525 d65b022d David Friesecký
        return self.cursor.rowcount > 0
526 58051326 David Friesecký
527
    def clear_certificate_revocation(self, certificate_id: int):
528
        """
529
        Clear revocation of a certificate
530
531
        :param certificate_id: ID of specific certificate
532
533 8b049f43 David Friesecký
        :return:
534
            the result of whether the clear revocation was successful OR
535
            sqlite3.Error if an exception is thrown
536 58051326 David Friesecký
        """
537
538 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
539
540 58051326 David Friesecký
        try:
541
            sql = (f"UPDATE {TAB_CERTIFICATES} "
542 0e7c3096 David Friesecký
                   f"SET {COL_REVOCATION_DATE} = NULL, "
543
                   f"{COL_REVOCATION_REASON} = NULL "
544 58051326 David Friesecký
                   f"WHERE {COL_ID} = ?")
545
            values = [certificate_id]
546
            self.cursor.execute(sql, values)
547
            self.connection.commit()
548 b3c80ccb David Friesecký
        except IntegrityError:
549
            Logger.error(INTEGRITY_ERROR_MSG)
550
            raise DatabaseException(INTEGRITY_ERROR_MSG)
551
        except ProgrammingError:
552
            Logger.error(PROGRAMMING_ERROR_MSG)
553
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
554
        except OperationalError:
555
            Logger.error(OPERATIONAL_ERROR_MSG)
556
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
557
        except NotSupportedError:
558
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
559
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
560
        except DatabaseError:
561
            Logger.error(DATABASE_ERROR_MSG)
562
            raise DatabaseException(DATABASE_ERROR_MSG)
563
        except Error:
564
            Logger.error(ERROR_MSG)
565
            raise DatabaseException(ERROR_MSG)
566 f8b23532 David Friesecký
567 45744020 Stanislav Král
        return self.cursor.rowcount > 0
568 58051326 David Friesecký
569
    def get_all_revoked_by(self, certificate_id: int):
570
        """
571
        Get list of the revoked certificates that are direct descendants of the certificate with the ID
572
573
        :param certificate_id: ID of specific certificate
574
575
        :return:
576 8b049f43 David Friesecký
            list of the certificates OR
577
            None if the list is empty OR
578
            sqlite3.Error if an exception is thrown
579 58051326 David Friesecký
        """
580
581 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
582
583 58051326 David Friesecký
        try:
584
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
585
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_REVOCATION_DATE} IS NOT NULL AND {COL_REVOCATION_DATE} != ''")
586
            values = [certificate_id]
587
            self.cursor.execute(sql, values)
588
            certificate_rows = self.cursor.fetchall()
589
590
            certificates: List[Certificate] = []
591
            for certificate_row in certificate_rows:
592
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
593
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
594
                values = [certificate_row[0]]
595
                self.cursor.execute(sql, values)
596
                usage_rows = self.cursor.fetchall()
597
598
                usage_dict: Dict[int, bool] = {}
599
                for usage_row in usage_rows:
600
                    usage_dict[usage_row[2]] = True
601
602 0e7c3096 David Friesecký
                certificates.append(Certificate(certificate_row[0],      # ID
603
                                                certificate_row[1],      # valid from
604
                                                certificate_row[2],      # valid to
605
                                                certificate_row[3],      # pem data
606
                                                certificate_row[14],     # type ID
607
                                                certificate_row[15],     # parent ID
608
                                                certificate_row[16],     # private key ID
609 58051326 David Friesecký
                                                usage_dict,
610 0e7c3096 David Friesecký
                                                certificate_row[4],      # common name
611
                                                certificate_row[5],      # country code
612
                                                certificate_row[6],      # locality
613
                                                certificate_row[7],      # province
614
                                                certificate_row[8],      # organization
615
                                                certificate_row[9],      # organizational unit
616
                                                certificate_row[10],     # email address
617
                                                certificate_row[11],     # revocation date
618
                                                certificate_row[12]))    # revocation reason
619 b3c80ccb David Friesecký
        except IntegrityError:
620
            Logger.error(INTEGRITY_ERROR_MSG)
621
            raise DatabaseException(INTEGRITY_ERROR_MSG)
622
        except ProgrammingError:
623
            Logger.error(PROGRAMMING_ERROR_MSG)
624
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
625
        except OperationalError:
626
            Logger.error(OPERATIONAL_ERROR_MSG)
627
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
628
        except NotSupportedError:
629
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
630
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
631
        except DatabaseError:
632
            Logger.error(DATABASE_ERROR_MSG)
633
            raise DatabaseException(DATABASE_ERROR_MSG)
634
        except Error:
635
            Logger.error(ERROR_MSG)
636
            raise DatabaseException(ERROR_MSG)
637 58051326 David Friesecký
638 d65b022d David Friesecký
        return certificates
639 58051326 David Friesecký
640
    def get_all_issued_by(self, certificate_id: int):
641
        """
642
        Get list of the certificates that are direct descendants of the certificate with the ID
643
644
        :param certificate_id: ID of specific certificate
645
646
        :return:
647 8b049f43 David Friesecký
            list of the certificates OR
648
            None if the list is empty OR
649
            sqlite3.Error if an exception is thrown
650 58051326 David Friesecký
        """
651
652 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
653
654 58051326 David Friesecký
        try:
655
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
656 6425fa36 David Friesecký
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_ID} != ? AND "
657
                   f"({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
658 8b049f43 David Friesecký
            values = [certificate_id, certificate_id]
659 58051326 David Friesecký
            self.cursor.execute(sql, values)
660
            certificate_rows = self.cursor.fetchall()
661
662
            certificates: List[Certificate] = []
663
            for certificate_row in certificate_rows:
664
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
665
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
666
                values = [certificate_row[0]]
667
                self.cursor.execute(sql, values)
668
                usage_rows = self.cursor.fetchall()
669
670
                usage_dict: Dict[int, bool] = {}
671
                for usage_row in usage_rows:
672
                    usage_dict[usage_row[2]] = True
673
674 0e7c3096 David Friesecký
                certificates.append(Certificate(certificate_row[0],      # ID
675
                                                certificate_row[1],      # valid from
676
                                                certificate_row[2],      # valid to
677
                                                certificate_row[3],      # pem data
678
                                                certificate_row[14],     # type ID
679
                                                certificate_row[15],     # parent ID
680
                                                certificate_row[16],     # private key ID
681 58051326 David Friesecký
                                                usage_dict,
682 0e7c3096 David Friesecký
                                                certificate_row[4],      # common name
683
                                                certificate_row[5],      # country code
684
                                                certificate_row[6],      # locality
685
                                                certificate_row[7],      # province
686
                                                certificate_row[8],      # organization
687
                                                certificate_row[9],      # organizational unit
688
                                                certificate_row[10],     # email address
689
                                                certificate_row[11],     # revocation date
690
                                                certificate_row[12]))    # revocation reason
691 b3c80ccb David Friesecký
        except IntegrityError:
692
            Logger.error(INTEGRITY_ERROR_MSG)
693
            raise DatabaseException(INTEGRITY_ERROR_MSG)
694
        except ProgrammingError:
695
            Logger.error(PROGRAMMING_ERROR_MSG)
696
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
697
        except OperationalError:
698
            Logger.error(OPERATIONAL_ERROR_MSG)
699
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
700
        except NotSupportedError:
701
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
702
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
703
        except DatabaseError:
704
            Logger.error(DATABASE_ERROR_MSG)
705
            raise DatabaseException(DATABASE_ERROR_MSG)
706
        except Error:
707
            Logger.error(ERROR_MSG)
708
            raise DatabaseException(ERROR_MSG)
709 58051326 David Friesecký
710 d65b022d David Friesecký
        return certificates
711 94f6d8b8 Jan Pašek
712 cf247eaa Captain_Trojan
    def get_all_issued_by_filter(self, issuer_id, target_types, target_usages, target_cn_substring, page, per_page):
713
        """
714
        Get list of the certificates that are direct descendants of the certificate with the ID
715
716
        :param target_types: certificate types (filter)
717
        :param target_usages: certificate usages (filter)
718
        :param target_cn_substring: certificate CN substring (filter)
719
        :param page: target page
720
        :param per_page: target page size
721
        :param issuer_id: ID of specific certificate
722
723
        :return:
724
            list of the certificates OR
725
            None if the list is empty
726
        """
727
728
        Logger.debug("Function launched.")
729
730
        try:
731
            values = [issuer_id, issuer_id]
732
            values += list(target_types)
733
734
            sql = (
735 bb4a9d1f Captain_Trojan
                f"SELECT * "
736 cf247eaa Captain_Trojan
                f"FROM {TAB_CERTIFICATES} a "
737
                f"WHERE (a.{COL_DELETION_DATE} IS NULL OR a.{COL_DELETION_DATE} = '') "
738
                f"AND a.{COL_PARENT_ID} = ? AND a.{COL_ID} != ? "
739
                f"AND a.{COL_TYPE_ID} IN ({','.join('?' * len(target_types))}) "
740 e77c14f9 Michal Sejak
                
741 cf247eaa Captain_Trojan
            )
742 e77c14f9 Michal Sejak
            
743
            if target_usages is not None:
744
                sql += f"AND (SELECT COUNT(*) FROM {TAB_CERTIFICATE_USAGES} WHERE {COL_CERTIFICATE_ID} = a.{COL_ID} " \
745
                    f"AND {COL_USAGE_TYPE_ID} IN ({','.join('?' * len(target_usages))})) > 0 "
746
                values += list(target_usages)
747 cf247eaa Captain_Trojan
748
            if target_cn_substring is not None:
749
                sql += f" AND a.{COL_COMMON_NAME} LIKE '%' || ? || '%'"
750
                values += [target_cn_substring]
751
752
            if page is not None and per_page is not None:
753
                sql += f" LIMIT {per_page} OFFSET {page * per_page}"
754
755
            self.cursor.execute(sql, values)
756
            certificate_rows = self.cursor.fetchall()
757
758
            certificates: List[Certificate] = []
759
            for certificate_row in certificate_rows:
760
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
761
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
762
                values = [certificate_row[0]]
763
                self.cursor.execute(sql, values)
764
                usage_rows = self.cursor.fetchall()
765
766
                usage_dict: Dict[int, bool] = {}
767
                for usage_row in usage_rows:
768
                    usage_dict[usage_row[2]] = True
769
770 bb4a9d1f Captain_Trojan
                certificates.append(Certificate(certificate_row[0],  # ID
771
                                                certificate_row[1],  # valid from
772
                                                certificate_row[2],  # valid to
773
                                                certificate_row[3],  # pem data
774
                                                certificate_row[14],  # type ID
775
                                                certificate_row[15],  # parent ID
776
                                                certificate_row[16],  # private key ID
777 cf247eaa Captain_Trojan
                                                usage_dict,
778 bb4a9d1f Captain_Trojan
                                                certificate_row[4],  # common name
779
                                                certificate_row[5],  # country code
780
                                                certificate_row[6],  # locality
781
                                                certificate_row[7],  # province
782
                                                certificate_row[8],  # organization
783
                                                certificate_row[9],  # organizational unit
784
                                                certificate_row[10],  # email address
785
                                                certificate_row[11],  # revocation date
786
                                                certificate_row[12]))  # revocation reason
787 b3c80ccb David Friesecký
        except IntegrityError:
788
            Logger.error(INTEGRITY_ERROR_MSG)
789
            raise DatabaseException(INTEGRITY_ERROR_MSG)
790
        except ProgrammingError:
791
            Logger.error(PROGRAMMING_ERROR_MSG)
792
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
793
        except OperationalError:
794
            Logger.error(OPERATIONAL_ERROR_MSG)
795
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
796
        except NotSupportedError:
797
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
798
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
799
        except DatabaseError:
800
            Logger.error(DATABASE_ERROR_MSG)
801
            raise DatabaseException(DATABASE_ERROR_MSG)
802
        except Error:
803
            Logger.error(ERROR_MSG)
804
            raise DatabaseException(ERROR_MSG)
805 58051326 David Friesecký
806 d65b022d David Friesecký
        return certificates
807 94f6d8b8 Jan Pašek
808 85003184 Captain_Trojan
    def get_all_descendants_of(self, certificate_id: int):
809
        """
810
        Get a list of all certificates C such that the certificate identified by "certificate_id" belongs to a trust chain
811
        between C and its root certificate authority (i.e. is an ancestor of C).
812
        :param certificate_id: target certificate ID
813
        :return: list of all descendants
814
        """
815
816 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
817
818
        try:
819
            def dfs(children_of, this, collection: list):
820
                for child in children_of(this.certificate_id):
821
                    dfs(children_of, child, collection)
822
                collection.append(this)
823
824
            subtree_root = self.read(certificate_id)
825
            if subtree_root is None:
826
                return None
827
828
            all_certs = []
829
            dfs(self.get_all_issued_by, subtree_root, all_certs)
830
        except IntegrityError:
831
            Logger.error(INTEGRITY_ERROR_MSG)
832
            raise DatabaseException(INTEGRITY_ERROR_MSG)
833
        except ProgrammingError:
834
            Logger.error(PROGRAMMING_ERROR_MSG)
835
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
836
        except OperationalError:
837
            Logger.error(OPERATIONAL_ERROR_MSG)
838
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
839
        except NotSupportedError:
840
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
841
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
842
        except DatabaseError:
843
            Logger.error(DATABASE_ERROR_MSG)
844
            raise DatabaseException(DATABASE_ERROR_MSG)
845
        except Error:
846
            Logger.error(ERROR_MSG)
847
            raise DatabaseException(ERROR_MSG)
848 85003184 Captain_Trojan
849
        return all_certs
850
851 94f6d8b8 Jan Pašek
    def get_next_id(self) -> int:
852
        """
853
        Get identifier of the next certificate that will be inserted into the database
854
        :return: identifier of the next certificate that will be added into the database
855
        """
856 b3c80ccb David Friesecký
857
        Logger.debug("Function launched.")
858
859
        try:
860
            # get next IDs of all tables
861
            self.cursor.execute("SELECT * FROM SQLITE_SEQUENCE")
862
            results = self.cursor.fetchall()
863
864
            # search for next ID in Certificates table and return it
865
            for result in results:
866
                if result[0] == TAB_CERTIFICATES:
867
                    return result[1] + 1  # current last id + 1
868
            # if certificates table is not present in the query results, return 1
869
        except IntegrityError:
870
            Logger.error(INTEGRITY_ERROR_MSG)
871
            raise DatabaseException(INTEGRITY_ERROR_MSG)
872
        except ProgrammingError:
873
            Logger.error(PROGRAMMING_ERROR_MSG)
874
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
875
        except OperationalError:
876
            Logger.error(OPERATIONAL_ERROR_MSG)
877
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
878
        except NotSupportedError:
879
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
880
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
881
        except DatabaseError:
882
            Logger.error(DATABASE_ERROR_MSG)
883
            raise DatabaseException(DATABASE_ERROR_MSG)
884
        except Error:
885
            Logger.error(ERROR_MSG)
886
            raise DatabaseException(ERROR_MSG)
887
888 94f6d8b8 Jan Pašek
        return 1