Projekt

Obecné

Profil

Stáhnout (38.5 KB) Statistiky
| Větev: | Tag: | Revize:
1 6425fa36 David Friesecký
import time
2 cf247eaa Captain_Trojan
from sqlite3 import Connection, Error, DatabaseError, IntegrityError, ProgrammingError, OperationalError, \
3
    NotSupportedError
4
from typing import Dict, List, Set
5 1636aefe David Friesecký
6 d65b022d David Friesecký
from src.exceptions.database_exception import DatabaseException
7 1d2add74 Jan Pašek
from injector import inject
8 f8b23532 David Friesecký
from src.constants import *
9 1d2add74 Jan Pašek
from src.model.certificate import Certificate
10 5e31b492 David Friesecký
from src.utils.logger import Logger
11 e9e55282 David Friesecký
12 b3c80ccb David Friesecký
INTEGRITY_ERROR_MSG = "Database relational integrity corrupted."
13
PROGRAMMING_ERROR_MSG = "Exception raised for programming errors (etc. SQL statement)."
14
OPERATIONAL_ERROR_MSG = "Exception raised for errors that are related to the database’s operation."
15
NOT_SUPPORTED_ERROR_MSG = "Method or database API was used which is not supported by the database"
16
DATABASE_ERROR_MSG = "Unknown exception that are related to the database."
17
ERROR_MSG = "Unknown exception."
18 e9e55282 David Friesecký
19
20 f8b23532 David Friesecký
class CertificateRepository:
21 25053504 David Friesecký
22 1d2add74 Jan Pašek
    @inject
23
    def __init__(self, connection: Connection):
24 a0602bad David Friesecký
        """
25 f8b23532 David Friesecký
        Constructor of the CertificateRepository object
26 a0602bad David Friesecký
27 f8b23532 David Friesecký
        :param connection: Instance of the Connection object
28 a0602bad David Friesecký
        """
29 f8b23532 David Friesecký
        self.connection = connection
30 1d2add74 Jan Pašek
        self.cursor = connection.cursor()
31 e9e55282 David Friesecký
32 805077f5 David Friesecký
    def create(self, certificate: Certificate):
33 a0602bad David Friesecký
        """
34 f8b23532 David Friesecký
        Creates a certificate.
35
        For root certificate (CA) the parent certificate id is modified to the same id (id == parent_id).
36 a0602bad David Friesecký
37 f8b23532 David Friesecký
        :param certificate: Instance of the Certificate object
38 a0602bad David Friesecký
39 f8b23532 David Friesecký
        :return: the result of whether the creation was successful
40 a0602bad David Friesecký
        """
41
42 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
43
44 f8b23532 David Friesecký
        try:
45
            sql = (f"INSERT INTO {TAB_CERTIFICATES} "
46 0e7c3096 David Friesecký
                   f"({COL_VALID_FROM},"
47 f8b23532 David Friesecký
                   f"{COL_VALID_TO},"
48
                   f"{COL_PEM_DATA},"
49 0e7c3096 David Friesecký
                   f"{COL_COMMON_NAME},"
50
                   f"{COL_COUNTRY_CODE},"
51
                   f"{COL_LOCALITY},"
52
                   f"{COL_PROVINCE},"
53
                   f"{COL_ORGANIZATION},"
54
                   f"{COL_ORGANIZATIONAL_UNIT},"
55
                   f"{COL_EMAIL_ADDRESS},"
56 f8b23532 David Friesecký
                   f"{COL_TYPE_ID},"
57 0e7c3096 David Friesecký
                   f"{COL_PARENT_ID},"
58
                   f"{COL_PRIVATE_KEY_ID}) "
59
                   f"VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?)")
60
            values = [certificate.valid_from,
61 f8b23532 David Friesecký
                      certificate.valid_to,
62
                      certificate.pem_data,
63 0e7c3096 David Friesecký
                      certificate.common_name,
64
                      certificate.country_code,
65
                      certificate.locality,
66
                      certificate.province,
67
                      certificate.organization,
68
                      certificate.organizational_unit,
69
                      certificate.email_address,
70 f8b23532 David Friesecký
                      certificate.type_id,
71 0e7c3096 David Friesecký
                      certificate.parent_id,
72
                      certificate.private_key_id]
73 f8b23532 David Friesecký
            self.cursor.execute(sql, values)
74
            self.connection.commit()
75
76
            last_id: int = self.cursor.lastrowid
77
78 f3125948 Stanislav Král
            if certificate.type_id == ROOT_CA_ID:
79 f8b23532 David Friesecký
                certificate.parent_id = last_id
80 093d06df Stanislav Král
                self.update(last_id, certificate)
81 f8b23532 David Friesecký
            else:
82 fa72c969 Stanislav Král
                for usage_id, usage_value in certificate.usages.items():
83 f8b23532 David Friesecký
                    if usage_value:
84
                        sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
85
                               f"({COL_CERTIFICATE_ID},"
86
                               f"{COL_USAGE_TYPE_ID}) "
87
                               f"VALUES (?,?)")
88
                        values = [last_id, usage_id]
89
                        self.cursor.execute(sql, values)
90
                        self.connection.commit()
91 b3c80ccb David Friesecký
        except IntegrityError:
92
            Logger.error(INTEGRITY_ERROR_MSG)
93
            raise DatabaseException(INTEGRITY_ERROR_MSG)
94
        except ProgrammingError:
95
            Logger.error(PROGRAMMING_ERROR_MSG)
96
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
97
        except OperationalError:
98
            Logger.error(OPERATIONAL_ERROR_MSG)
99
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
100
        except NotSupportedError:
101
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
102
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
103
        except DatabaseError:
104
            Logger.error(DATABASE_ERROR_MSG)
105
            raise DatabaseException(DATABASE_ERROR_MSG)
106
        except Error:
107
            Logger.error(ERROR_MSG)
108
            raise DatabaseException(ERROR_MSG)
109 f8b23532 David Friesecký
110 805077f5 David Friesecký
        return last_id
111 f8b23532 David Friesecký
112 805077f5 David Friesecký
    def read(self, certificate_id: int):
113 f8b23532 David Friesecký
        """
114
        Reads (selects) a certificate.
115 e9e55282 David Friesecký
116 f8b23532 David Friesecký
        :param certificate_id: ID of specific certificate
117 e9e55282 David Friesecký
118 f8b23532 David Friesecký
        :return: instance of the Certificate object
119
        """
120 e9e55282 David Friesecký
121 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
122
123 f8b23532 David Friesecký
        try:
124
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
125 6425fa36 David Friesecký
                   f"WHERE {COL_ID} = ? AND ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
126 f8b23532 David Friesecký
            values = [certificate_id]
127
            self.cursor.execute(sql, values)
128 a7411982 Stanislav Král
            certificate_row = self.cursor.fetchone()
129 e9e55282 David Friesecký
130 6f4a5f24 Captain_Trojan
            if certificate_row is None:
131
                return None
132
133 f8b23532 David Friesecký
            sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
134
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
135
            self.cursor.execute(sql, values)
136
            usage_rows = self.cursor.fetchall()
137 1636aefe David Friesecký
138
            usage_dict: Dict[int, bool] = {}
139
            for usage_row in usage_rows:
140 f8b23532 David Friesecký
                usage_dict[usage_row[2]] = True
141
142 0e7c3096 David Friesecký
            certificate: Certificate = Certificate(certificate_row[0],      # ID
143
                                                   certificate_row[1],      # valid from
144
                                                   certificate_row[2],      # valid to
145
                                                   certificate_row[3],      # pem data
146
                                                   certificate_row[14],     # type ID
147
                                                   certificate_row[15],     # parent ID
148
                                                   certificate_row[16],     # private key ID
149 58051326 David Friesecký
                                                   usage_dict,
150 0e7c3096 David Friesecký
                                                   certificate_row[4],      # common name
151
                                                   certificate_row[5],      # country code
152
                                                   certificate_row[6],      # locality
153
                                                   certificate_row[7],      # province
154
                                                   certificate_row[8],      # organization
155
                                                   certificate_row[9],      # organizational unit
156
                                                   certificate_row[10],     # email address
157
                                                   certificate_row[11],     # revocation date
158
                                                   certificate_row[12])     # revocation reason
159
160 b3c80ccb David Friesecký
        except IntegrityError:
161
            Logger.error(INTEGRITY_ERROR_MSG)
162
            raise DatabaseException(INTEGRITY_ERROR_MSG)
163
        except ProgrammingError:
164
            Logger.error(PROGRAMMING_ERROR_MSG)
165
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
166
        except OperationalError:
167
            Logger.error(OPERATIONAL_ERROR_MSG)
168
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
169
        except NotSupportedError:
170
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
171
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
172
        except DatabaseError:
173
            Logger.error(DATABASE_ERROR_MSG)
174
            raise DatabaseException(DATABASE_ERROR_MSG)
175
        except Error:
176
            Logger.error(ERROR_MSG)
177
            raise DatabaseException(ERROR_MSG)
178 f8b23532 David Friesecký
179 d65b022d David Friesecký
        return certificate
180 f8b23532 David Friesecký
181 58051326 David Friesecký
    def read_all(self, filter_type: int = None):
182 9e22e20c David Friesecký
        """
183
        Reads (selects) all certificates (with type).
184
185
        :param filter_type: ID of certificate type from CertificateTypes table
186
187
        :return: list of certificates
188
        """
189
190 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
191
192 9e22e20c David Friesecký
        try:
193
            sql_extension = ""
194
            values = []
195
            if filter_type is not None:
196 6425fa36 David Friesecký
                sql_extension = (f" AND {COL_TYPE_ID} = ("
197 6f64f062 Stanislav Král
                                 f"SELECT {COL_ID} FROM {TAB_CERTIFICATE_TYPES} WHERE {COL_ID} = ?)")
198 9e22e20c David Friesecký
                values = [filter_type]
199
200 6425fa36 David Friesecký
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
201
                   f"WHERE ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = ''){sql_extension}")
202 9e22e20c David Friesecký
            self.cursor.execute(sql, values)
203
            certificate_rows = self.cursor.fetchall()
204
205
            certificates: List[Certificate] = []
206
            for certificate_row in certificate_rows:
207
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
208
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
209
                values = [certificate_row[0]]
210
                self.cursor.execute(sql, values)
211
                usage_rows = self.cursor.fetchall()
212
213
                usage_dict: Dict[int, bool] = {}
214
                for usage_row in usage_rows:
215
                    usage_dict[usage_row[2]] = True
216
217 0e7c3096 David Friesecký
                certificates.append(Certificate(certificate_row[0],      # ID
218
                                                certificate_row[1],      # valid from
219
                                                certificate_row[2],      # valid to
220
                                                certificate_row[3],      # pem data
221
                                                certificate_row[14],     # type ID
222
                                                certificate_row[15],     # parent ID
223
                                                certificate_row[16],     # private key ID
224 58051326 David Friesecký
                                                usage_dict,
225 0e7c3096 David Friesecký
                                                certificate_row[4],      # common name
226
                                                certificate_row[5],      # country code
227
                                                certificate_row[6],      # locality
228
                                                certificate_row[7],      # province
229
                                                certificate_row[8],      # organization
230
                                                certificate_row[9],      # organizational unit
231
                                                certificate_row[10],     # email address
232
                                                certificate_row[11],     # revocation date
233
                                                certificate_row[12]))    # revocation reason
234 b3c80ccb David Friesecký
        except IntegrityError:
235
            Logger.error(INTEGRITY_ERROR_MSG)
236
            raise DatabaseException(INTEGRITY_ERROR_MSG)
237
        except ProgrammingError:
238
            Logger.error(PROGRAMMING_ERROR_MSG)
239
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
240
        except OperationalError:
241
            Logger.error(OPERATIONAL_ERROR_MSG)
242
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
243
        except NotSupportedError:
244
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
245
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
246
        except DatabaseError:
247
            Logger.error(DATABASE_ERROR_MSG)
248
            raise DatabaseException(DATABASE_ERROR_MSG)
249
        except Error:
250
            Logger.error(ERROR_MSG)
251
            raise DatabaseException(ERROR_MSG)
252 9e22e20c David Friesecký
253 0f3af523 Stanislav Král
        return certificates
254 9e22e20c David Friesecký
255 cf247eaa Captain_Trojan
    def read_all_filter(self, target_types: Set[int], target_usages: Set[int], target_cn_substring: str, page: int,
256
                        per_page: int):
257
        """
258
        Reads (selects) all certificates according to a specific filtering and pagination options.
259
        :param target_types: certificate types (filter)
260
        :param target_usages: certificate usages (filter)
261
        :param target_cn_substring: certificate CN substring (filter)
262
        :param page: target page
263
        :param per_page: target page size
264
        :return: list of certificates
265
        """
266
267
        Logger.debug("Function launched.")
268
269
        try:
270
            values = []
271
            values += list(target_types)
272
            values += list(target_usages)
273
274
            sql = (
275 bb4a9d1f Captain_Trojan
                f"SELECT * "
276 cf247eaa Captain_Trojan
                f"FROM {TAB_CERTIFICATES} a "
277
                f"WHERE (a.{COL_DELETION_DATE} IS NULL OR a.{COL_DELETION_DATE} = '') "
278
                f"AND a.{COL_TYPE_ID} IN ({','.join('?' * len(target_types))}) "
279
                f"AND (SELECT COUNT(*) FROM {TAB_CERTIFICATE_USAGES} "
280
                f"WHERE {COL_CERTIFICATE_ID} = a.{COL_ID} "
281
                f"AND {COL_USAGE_TYPE_ID} IN ({','.join('?' * len(target_usages))}) "
282
                f") > 0 "
283
            )
284
285
            if target_cn_substring is not None:
286
                sql += f" AND a.{COL_COMMON_NAME} LIKE '%' || ? || '%'"
287
288
                values += [target_cn_substring]
289
290
            if page is not None and per_page is not None:
291
                sql += f" LIMIT {per_page} OFFSET {page * per_page}"
292
293
            self.cursor.execute(sql, values)
294
            certificate_rows = self.cursor.fetchall()
295
296
            certificates: List[Certificate] = []
297
            for certificate_row in certificate_rows:
298
                sql = (f"SELECT {COL_USAGE_TYPE_ID} FROM {TAB_CERTIFICATE_USAGES} "
299
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
300
                types = [certificate_row[0]]
301
                self.cursor.execute(sql, types)
302
                usage_rows = self.cursor.fetchall()
303
304
                usage_dict: Dict[int, bool] = {}
305
                for usage_row in usage_rows:
306
                    usage_dict[usage_row[0]] = True
307
308 bb4a9d1f Captain_Trojan
                certificates.append(Certificate(certificate_row[0],  # ID
309
                                                certificate_row[1],  # valid from
310
                                                certificate_row[2],  # valid to
311
                                                certificate_row[3],  # pem data
312
                                                certificate_row[14],  # type ID
313
                                                certificate_row[15],  # parent ID
314
                                                certificate_row[16],  # private key ID
315 cf247eaa Captain_Trojan
                                                usage_dict,
316 bb4a9d1f Captain_Trojan
                                                certificate_row[4],  # common name
317
                                                certificate_row[5],  # country code
318
                                                certificate_row[6],  # locality
319
                                                certificate_row[7],  # province
320
                                                certificate_row[8],  # organization
321
                                                certificate_row[9],  # organizational unit
322
                                                certificate_row[10],  # email address
323
                                                certificate_row[11],  # revocation date
324
                                                certificate_row[12]))  # revocation reason
325 cf247eaa Captain_Trojan
        except IntegrityError:
326
            Logger.error(INTEGRITY_ERROR_MSG)
327
            raise DatabaseException(INTEGRITY_ERROR_MSG)
328
        except ProgrammingError:
329
            Logger.error(PROGRAMMING_ERROR_MSG)
330
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
331
        except OperationalError:
332
            Logger.error(OPERATIONAL_ERROR_MSG)
333
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
334
        except NotSupportedError:
335
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
336
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
337
        except DatabaseError:
338
            Logger.error(DATABASE_ERROR_MSG)
339
            raise DatabaseException(DATABASE_ERROR_MSG)
340
        except Error:
341
            Logger.error(ERROR_MSG)
342
            raise DatabaseException(ERROR_MSG)
343
344
        return certificates
345
346 58051326 David Friesecký
    def update(self, certificate_id: int, certificate: Certificate):
347 a0602bad David Friesecký
        """
348 f8b23532 David Friesecký
        Updates a certificate.
349
        If the parameter of certificate (Certificate object) is not to be changed,
350
        the same value must be specified.
351 a0602bad David Friesecký
352
        :param certificate_id: ID of specific certificate
353 f8b23532 David Friesecký
        :param certificate: Instance of the Certificate object
354 a0602bad David Friesecký
355
        :return: the result of whether the updation was successful
356
        """
357
358 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
359
360 f8b23532 David Friesecký
        try:
361
            sql = (f"UPDATE {TAB_CERTIFICATES} "
362 0e7c3096 David Friesecký
                   f"SET {COL_VALID_FROM} = ?, "
363 f8b23532 David Friesecký
                   f"{COL_VALID_TO} = ?, "
364
                   f"{COL_PEM_DATA} = ?, "
365 0e7c3096 David Friesecký
                   f"{COL_COMMON_NAME} = ?, "
366
                   f"{COL_COUNTRY_CODE} = ?, "
367
                   f"{COL_LOCALITY} = ?, "
368
                   f"{COL_PROVINCE} = ?, "
369
                   f"{COL_ORGANIZATION} = ?, "
370
                   f"{COL_ORGANIZATIONAL_UNIT} = ?, "
371
                   f"{COL_EMAIL_ADDRESS} = ?, "
372 f8b23532 David Friesecký
                   f"{COL_TYPE_ID} = ?, "
373 0e7c3096 David Friesecký
                   f"{COL_PARENT_ID} = ?, "
374
                   f"{COL_PRIVATE_KEY_ID} = ? "
375 f8b23532 David Friesecký
                   f"WHERE {COL_ID} = ?")
376 0e7c3096 David Friesecký
            values = [certificate.valid_from,
377 f8b23532 David Friesecký
                      certificate.valid_to,
378
                      certificate.pem_data,
379 0e7c3096 David Friesecký
                      certificate.common_name,
380
                      certificate.country_code,
381
                      certificate.locality,
382
                      certificate.province,
383
                      certificate.organization,
384
                      certificate.organizational_unit,
385
                      certificate.email_address,
386 f8b23532 David Friesecký
                      certificate.type_id,
387 805077f5 David Friesecký
                      certificate.parent_id,
388 0e7c3096 David Friesecký
                      certificate.private_key_id,
389 805077f5 David Friesecký
                      certificate_id]
390 6425fa36 David Friesecký
391 f8b23532 David Friesecký
            self.cursor.execute(sql, values)
392
            self.connection.commit()
393
394
            sql = (f"DELETE FROM {TAB_CERTIFICATE_USAGES} "
395
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
396
            values = [certificate_id]
397
            self.cursor.execute(sql, values)
398
            self.connection.commit()
399
400 c36d3299 David Friesecký
            check_updated = self.cursor.rowcount > 0
401
402 f3125948 Stanislav Král
            # iterate over usage pairs
403
            for usage_id, usage_value in certificate.usages.items():
404 f8b23532 David Friesecký
                if usage_value:
405
                    sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
406
                           f"({COL_CERTIFICATE_ID},"
407
                           f"{COL_USAGE_TYPE_ID}) "
408
                           f"VALUES (?,?)")
409
                    values = [certificate_id, usage_id]
410
                    self.cursor.execute(sql, values)
411
                    self.connection.commit()
412 b3c80ccb David Friesecký
        except IntegrityError:
413
            Logger.error(INTEGRITY_ERROR_MSG)
414
            raise DatabaseException(INTEGRITY_ERROR_MSG)
415
        except ProgrammingError:
416
            Logger.error(PROGRAMMING_ERROR_MSG)
417
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
418
        except OperationalError:
419
            Logger.error(OPERATIONAL_ERROR_MSG)
420
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
421
        except NotSupportedError:
422
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
423
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
424
        except DatabaseError:
425
            Logger.error(DATABASE_ERROR_MSG)
426
            raise DatabaseException(DATABASE_ERROR_MSG)
427
        except Error:
428
            Logger.error(ERROR_MSG)
429
            raise DatabaseException(ERROR_MSG)
430 f8b23532 David Friesecký
431 c36d3299 David Friesecký
        return check_updated
432 e9e55282 David Friesecký
433 58051326 David Friesecký
    def delete(self, certificate_id: int):
434 a0602bad David Friesecký
        """
435
        Deletes a certificate
436
437
        :param certificate_id: ID of specific certificate
438
439
        :return: the result of whether the deletion was successful
440
        """
441
442 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
443
444 f8b23532 David Friesecký
        try:
445 6425fa36 David Friesecký
            sql = (f"UPDATE {TAB_CERTIFICATES} "
446
                   f"SET {COL_DELETION_DATE} = ? "
447
                   f"WHERE {COL_ID} = ? AND ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
448
            values = [int(time.time()),
449
                      certificate_id]
450 f8b23532 David Friesecký
            self.cursor.execute(sql, values)
451
            self.connection.commit()
452 b3c80ccb David Friesecký
        except IntegrityError:
453
            Logger.error(INTEGRITY_ERROR_MSG)
454
            raise DatabaseException(INTEGRITY_ERROR_MSG)
455
        except ProgrammingError:
456
            Logger.error(PROGRAMMING_ERROR_MSG)
457
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
458
        except OperationalError:
459
            Logger.error(OPERATIONAL_ERROR_MSG)
460
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
461
        except NotSupportedError:
462
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
463
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
464
        except DatabaseError:
465
            Logger.error(DATABASE_ERROR_MSG)
466
            raise DatabaseException(DATABASE_ERROR_MSG)
467
        except Error:
468
            Logger.error(ERROR_MSG)
469
            raise DatabaseException(ERROR_MSG)
470 f8b23532 David Friesecký
471 45744020 Stanislav Král
        return self.cursor.rowcount > 0
472 58051326 David Friesecký
473
    def set_certificate_revoked(
474 8b049f43 David Friesecký
            self, certificate_id: int, revocation_date: str, revocation_reason: str = REV_REASON_UNSPECIFIED):
475 58051326 David Friesecký
        """
476
        Revoke a certificate
477
478
        :param certificate_id: ID of specific certificate
479
        :param revocation_date: Date, when the certificate is revoked
480
        :param revocation_reason: Reason of the revocation
481
482 8b049f43 David Friesecký
        :return:
483
            the result of whether the revocation was successful OR
484
            sqlite3.Error if an exception is thrown
485 58051326 David Friesecký
        """
486
487 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
488
489 58051326 David Friesecký
        try:
490 8b049f43 David Friesecký
            if revocation_date != "" and revocation_reason == "":
491
                revocation_reason = REV_REASON_UNSPECIFIED
492
            elif revocation_date == "":
493
                return False
494
495 58051326 David Friesecký
            sql = (f"UPDATE {TAB_CERTIFICATES} "
496
                   f"SET {COL_REVOCATION_DATE} = ?, "
497 8b049f43 David Friesecký
                   f"{COL_REVOCATION_REASON} = ? "
498 58051326 David Friesecký
                   f"WHERE {COL_ID} = ? AND ({COL_REVOCATION_DATE} IS NULL OR {COL_REVOCATION_DATE} = '')")
499
            values = [revocation_date,
500
                      revocation_reason,
501
                      certificate_id]
502
            self.cursor.execute(sql, values)
503
            self.connection.commit()
504 b3c80ccb David Friesecký
        except IntegrityError:
505
            Logger.error(INTEGRITY_ERROR_MSG)
506
            raise DatabaseException(INTEGRITY_ERROR_MSG)
507
        except ProgrammingError:
508
            Logger.error(PROGRAMMING_ERROR_MSG)
509
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
510
        except OperationalError:
511
            Logger.error(OPERATIONAL_ERROR_MSG)
512
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
513
        except NotSupportedError:
514
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
515
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
516
        except DatabaseError:
517
            Logger.error(DATABASE_ERROR_MSG)
518
            raise DatabaseException(DATABASE_ERROR_MSG)
519
        except Error:
520
            Logger.error(ERROR_MSG)
521
            raise DatabaseException(ERROR_MSG)
522 8b049f43 David Friesecký
523 d65b022d David Friesecký
        return self.cursor.rowcount > 0
524 58051326 David Friesecký
525
    def clear_certificate_revocation(self, certificate_id: int):
526
        """
527
        Clear revocation of a certificate
528
529
        :param certificate_id: ID of specific certificate
530
531 8b049f43 David Friesecký
        :return:
532
            the result of whether the clear revocation was successful OR
533
            sqlite3.Error if an exception is thrown
534 58051326 David Friesecký
        """
535
536 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
537
538 58051326 David Friesecký
        try:
539
            sql = (f"UPDATE {TAB_CERTIFICATES} "
540 0e7c3096 David Friesecký
                   f"SET {COL_REVOCATION_DATE} = NULL, "
541
                   f"{COL_REVOCATION_REASON} = NULL "
542 58051326 David Friesecký
                   f"WHERE {COL_ID} = ?")
543
            values = [certificate_id]
544
            self.cursor.execute(sql, values)
545
            self.connection.commit()
546 b3c80ccb David Friesecký
        except IntegrityError:
547
            Logger.error(INTEGRITY_ERROR_MSG)
548
            raise DatabaseException(INTEGRITY_ERROR_MSG)
549
        except ProgrammingError:
550
            Logger.error(PROGRAMMING_ERROR_MSG)
551
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
552
        except OperationalError:
553
            Logger.error(OPERATIONAL_ERROR_MSG)
554
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
555
        except NotSupportedError:
556
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
557
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
558
        except DatabaseError:
559
            Logger.error(DATABASE_ERROR_MSG)
560
            raise DatabaseException(DATABASE_ERROR_MSG)
561
        except Error:
562
            Logger.error(ERROR_MSG)
563
            raise DatabaseException(ERROR_MSG)
564 f8b23532 David Friesecký
565 45744020 Stanislav Král
        return self.cursor.rowcount > 0
566 58051326 David Friesecký
567
    def get_all_revoked_by(self, certificate_id: int):
568
        """
569
        Get list of the revoked certificates that are direct descendants of the certificate with the ID
570
571
        :param certificate_id: ID of specific certificate
572
573
        :return:
574 8b049f43 David Friesecký
            list of the certificates OR
575
            None if the list is empty OR
576
            sqlite3.Error if an exception is thrown
577 58051326 David Friesecký
        """
578
579 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
580
581 58051326 David Friesecký
        try:
582
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
583
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_REVOCATION_DATE} IS NOT NULL AND {COL_REVOCATION_DATE} != ''")
584
            values = [certificate_id]
585
            self.cursor.execute(sql, values)
586
            certificate_rows = self.cursor.fetchall()
587
588
            certificates: List[Certificate] = []
589
            for certificate_row in certificate_rows:
590
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
591
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
592
                values = [certificate_row[0]]
593
                self.cursor.execute(sql, values)
594
                usage_rows = self.cursor.fetchall()
595
596
                usage_dict: Dict[int, bool] = {}
597
                for usage_row in usage_rows:
598
                    usage_dict[usage_row[2]] = True
599
600 0e7c3096 David Friesecký
                certificates.append(Certificate(certificate_row[0],      # ID
601
                                                certificate_row[1],      # valid from
602
                                                certificate_row[2],      # valid to
603
                                                certificate_row[3],      # pem data
604
                                                certificate_row[14],     # type ID
605
                                                certificate_row[15],     # parent ID
606
                                                certificate_row[16],     # private key ID
607 58051326 David Friesecký
                                                usage_dict,
608 0e7c3096 David Friesecký
                                                certificate_row[4],      # common name
609
                                                certificate_row[5],      # country code
610
                                                certificate_row[6],      # locality
611
                                                certificate_row[7],      # province
612
                                                certificate_row[8],      # organization
613
                                                certificate_row[9],      # organizational unit
614
                                                certificate_row[10],     # email address
615
                                                certificate_row[11],     # revocation date
616
                                                certificate_row[12]))    # revocation reason
617 b3c80ccb David Friesecký
        except IntegrityError:
618
            Logger.error(INTEGRITY_ERROR_MSG)
619
            raise DatabaseException(INTEGRITY_ERROR_MSG)
620
        except ProgrammingError:
621
            Logger.error(PROGRAMMING_ERROR_MSG)
622
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
623
        except OperationalError:
624
            Logger.error(OPERATIONAL_ERROR_MSG)
625
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
626
        except NotSupportedError:
627
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
628
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
629
        except DatabaseError:
630
            Logger.error(DATABASE_ERROR_MSG)
631
            raise DatabaseException(DATABASE_ERROR_MSG)
632
        except Error:
633
            Logger.error(ERROR_MSG)
634
            raise DatabaseException(ERROR_MSG)
635 58051326 David Friesecký
636 d65b022d David Friesecký
        return certificates
637 58051326 David Friesecký
638
    def get_all_issued_by(self, certificate_id: int):
639
        """
640
        Get list of the certificates that are direct descendants of the certificate with the ID
641
642
        :param certificate_id: ID of specific certificate
643
644
        :return:
645 8b049f43 David Friesecký
            list of the certificates OR
646
            None if the list is empty OR
647
            sqlite3.Error if an exception is thrown
648 58051326 David Friesecký
        """
649
650 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
651
652 58051326 David Friesecký
        try:
653
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
654 6425fa36 David Friesecký
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_ID} != ? AND "
655
                   f"({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
656 8b049f43 David Friesecký
            values = [certificate_id, certificate_id]
657 58051326 David Friesecký
            self.cursor.execute(sql, values)
658
            certificate_rows = self.cursor.fetchall()
659
660
            certificates: List[Certificate] = []
661
            for certificate_row in certificate_rows:
662
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
663
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
664
                values = [certificate_row[0]]
665
                self.cursor.execute(sql, values)
666
                usage_rows = self.cursor.fetchall()
667
668
                usage_dict: Dict[int, bool] = {}
669
                for usage_row in usage_rows:
670
                    usage_dict[usage_row[2]] = True
671
672 0e7c3096 David Friesecký
                certificates.append(Certificate(certificate_row[0],      # ID
673
                                                certificate_row[1],      # valid from
674
                                                certificate_row[2],      # valid to
675
                                                certificate_row[3],      # pem data
676
                                                certificate_row[14],     # type ID
677
                                                certificate_row[15],     # parent ID
678
                                                certificate_row[16],     # private key ID
679 58051326 David Friesecký
                                                usage_dict,
680 0e7c3096 David Friesecký
                                                certificate_row[4],      # common name
681
                                                certificate_row[5],      # country code
682
                                                certificate_row[6],      # locality
683
                                                certificate_row[7],      # province
684
                                                certificate_row[8],      # organization
685
                                                certificate_row[9],      # organizational unit
686
                                                certificate_row[10],     # email address
687
                                                certificate_row[11],     # revocation date
688
                                                certificate_row[12]))    # revocation reason
689 b3c80ccb David Friesecký
        except IntegrityError:
690
            Logger.error(INTEGRITY_ERROR_MSG)
691
            raise DatabaseException(INTEGRITY_ERROR_MSG)
692
        except ProgrammingError:
693
            Logger.error(PROGRAMMING_ERROR_MSG)
694
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
695
        except OperationalError:
696
            Logger.error(OPERATIONAL_ERROR_MSG)
697
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
698
        except NotSupportedError:
699
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
700
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
701
        except DatabaseError:
702
            Logger.error(DATABASE_ERROR_MSG)
703
            raise DatabaseException(DATABASE_ERROR_MSG)
704
        except Error:
705
            Logger.error(ERROR_MSG)
706
            raise DatabaseException(ERROR_MSG)
707 58051326 David Friesecký
708 d65b022d David Friesecký
        return certificates
709 94f6d8b8 Jan Pašek
710 cf247eaa Captain_Trojan
    def get_all_issued_by_filter(self, issuer_id, target_types, target_usages, target_cn_substring, page, per_page):
711
        """
712
        Get list of the certificates that are direct descendants of the certificate with the ID
713
714
        :param target_types: certificate types (filter)
715
        :param target_usages: certificate usages (filter)
716
        :param target_cn_substring: certificate CN substring (filter)
717
        :param page: target page
718
        :param per_page: target page size
719
        :param issuer_id: ID of specific certificate
720
721
        :return:
722
            list of the certificates OR
723
            None if the list is empty
724
        """
725
726
        Logger.debug("Function launched.")
727
728
        try:
729
            values = [issuer_id, issuer_id]
730
            values += list(target_types)
731
            values += list(target_usages)
732
733
            sql = (
734 bb4a9d1f Captain_Trojan
                f"SELECT * "
735 cf247eaa Captain_Trojan
                f"FROM {TAB_CERTIFICATES} a "
736
                f"WHERE (a.{COL_DELETION_DATE} IS NULL OR a.{COL_DELETION_DATE} = '') "
737
                f"AND a.{COL_PARENT_ID} = ? AND a.{COL_ID} != ? "
738
                f"AND a.{COL_TYPE_ID} IN ({','.join('?' * len(target_types))}) "
739
                f"AND (SELECT COUNT(*) FROM {TAB_CERTIFICATE_USAGES} "
740
                f"WHERE {COL_CERTIFICATE_ID} = a.{COL_ID} "
741
                f"AND {COL_USAGE_TYPE_ID} IN ({','.join('?' * len(target_usages))}) "
742
                f") > 0 "
743
            )
744
745
            if target_cn_substring is not None:
746
                sql += f" AND a.{COL_COMMON_NAME} LIKE '%' || ? || '%'"
747
748
                values += [target_cn_substring]
749
750
            if page is not None and per_page is not None:
751
                sql += f" LIMIT {per_page} OFFSET {page * per_page}"
752
753
            self.cursor.execute(sql, values)
754
            certificate_rows = self.cursor.fetchall()
755
756
            certificates: List[Certificate] = []
757
            for certificate_row in certificate_rows:
758
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
759
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
760
                values = [certificate_row[0]]
761
                self.cursor.execute(sql, values)
762
                usage_rows = self.cursor.fetchall()
763
764
                usage_dict: Dict[int, bool] = {}
765
                for usage_row in usage_rows:
766
                    usage_dict[usage_row[2]] = True
767
768 bb4a9d1f Captain_Trojan
                certificates.append(Certificate(certificate_row[0],  # ID
769
                                                certificate_row[1],  # valid from
770
                                                certificate_row[2],  # valid to
771
                                                certificate_row[3],  # pem data
772
                                                certificate_row[14],  # type ID
773
                                                certificate_row[15],  # parent ID
774
                                                certificate_row[16],  # private key ID
775 cf247eaa Captain_Trojan
                                                usage_dict,
776 bb4a9d1f Captain_Trojan
                                                certificate_row[4],  # common name
777
                                                certificate_row[5],  # country code
778
                                                certificate_row[6],  # locality
779
                                                certificate_row[7],  # province
780
                                                certificate_row[8],  # organization
781
                                                certificate_row[9],  # organizational unit
782
                                                certificate_row[10],  # email address
783
                                                certificate_row[11],  # revocation date
784
                                                certificate_row[12]))  # revocation reason
785 b3c80ccb David Friesecký
        except IntegrityError:
786
            Logger.error(INTEGRITY_ERROR_MSG)
787
            raise DatabaseException(INTEGRITY_ERROR_MSG)
788
        except ProgrammingError:
789
            Logger.error(PROGRAMMING_ERROR_MSG)
790
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
791
        except OperationalError:
792
            Logger.error(OPERATIONAL_ERROR_MSG)
793
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
794
        except NotSupportedError:
795
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
796
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
797
        except DatabaseError:
798
            Logger.error(DATABASE_ERROR_MSG)
799
            raise DatabaseException(DATABASE_ERROR_MSG)
800
        except Error:
801
            Logger.error(ERROR_MSG)
802
            raise DatabaseException(ERROR_MSG)
803 58051326 David Friesecký
804 d65b022d David Friesecký
        return certificates
805 94f6d8b8 Jan Pašek
806 85003184 Captain_Trojan
    def get_all_descendants_of(self, certificate_id: int):
807
        """
808
        Get a list of all certificates C such that the certificate identified by "certificate_id" belongs to a trust chain
809
        between C and its root certificate authority (i.e. is an ancestor of C).
810
        :param certificate_id: target certificate ID
811
        :return: list of all descendants
812
        """
813
814 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
815
816
        try:
817
            def dfs(children_of, this, collection: list):
818
                for child in children_of(this.certificate_id):
819
                    dfs(children_of, child, collection)
820
                collection.append(this)
821
822
            subtree_root = self.read(certificate_id)
823
            if subtree_root is None:
824
                return None
825
826
            all_certs = []
827
            dfs(self.get_all_issued_by, subtree_root, all_certs)
828
        except IntegrityError:
829
            Logger.error(INTEGRITY_ERROR_MSG)
830
            raise DatabaseException(INTEGRITY_ERROR_MSG)
831
        except ProgrammingError:
832
            Logger.error(PROGRAMMING_ERROR_MSG)
833
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
834
        except OperationalError:
835
            Logger.error(OPERATIONAL_ERROR_MSG)
836
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
837
        except NotSupportedError:
838
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
839
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
840
        except DatabaseError:
841
            Logger.error(DATABASE_ERROR_MSG)
842
            raise DatabaseException(DATABASE_ERROR_MSG)
843
        except Error:
844
            Logger.error(ERROR_MSG)
845
            raise DatabaseException(ERROR_MSG)
846 85003184 Captain_Trojan
847
        return all_certs
848
849 94f6d8b8 Jan Pašek
    def get_next_id(self) -> int:
850
        """
851
        Get identifier of the next certificate that will be inserted into the database
852
        :return: identifier of the next certificate that will be added into the database
853
        """
854 b3c80ccb David Friesecký
855
        Logger.debug("Function launched.")
856
857
        try:
858
            # get next IDs of all tables
859
            self.cursor.execute("SELECT * FROM SQLITE_SEQUENCE")
860
            results = self.cursor.fetchall()
861
862
            # search for next ID in Certificates table and return it
863
            for result in results:
864
                if result[0] == TAB_CERTIFICATES:
865
                    return result[1] + 1  # current last id + 1
866
            # if certificates table is not present in the query results, return 1
867
        except IntegrityError:
868
            Logger.error(INTEGRITY_ERROR_MSG)
869
            raise DatabaseException(INTEGRITY_ERROR_MSG)
870
        except ProgrammingError:
871
            Logger.error(PROGRAMMING_ERROR_MSG)
872
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
873
        except OperationalError:
874
            Logger.error(OPERATIONAL_ERROR_MSG)
875
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
876
        except NotSupportedError:
877
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
878
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
879
        except DatabaseError:
880
            Logger.error(DATABASE_ERROR_MSG)
881
            raise DatabaseException(DATABASE_ERROR_MSG)
882
        except Error:
883
            Logger.error(ERROR_MSG)
884
            raise DatabaseException(ERROR_MSG)
885
886 94f6d8b8 Jan Pašek
        return 1