Projekt

Obecné

Profil

Stáhnout (30.9 KB) Statistiky
| Větev: | Tag: | Revize:
1 6425fa36 David Friesecký
import time
2 cf247eaa Captain_Trojan
from sqlite3 import Connection, Error, DatabaseError, IntegrityError, ProgrammingError, OperationalError, \
3
    NotSupportedError
4
from typing import Dict, List, Set
5 1636aefe David Friesecký
6 d65b022d David Friesecký
from src.exceptions.database_exception import DatabaseException
7 1d2add74 Jan Pašek
from injector import inject
8 f8b23532 David Friesecký
from src.constants import *
9 1d2add74 Jan Pašek
from src.model.certificate import Certificate
10 5e31b492 David Friesecký
from src.utils.logger import Logger
11 e9e55282 David Friesecký
12 b3c80ccb David Friesecký
INTEGRITY_ERROR_MSG = "Database relational integrity corrupted."
13
PROGRAMMING_ERROR_MSG = "Exception raised for programming errors (etc. SQL statement)."
14
OPERATIONAL_ERROR_MSG = "Exception raised for errors that are related to the database’s operation."
15
NOT_SUPPORTED_ERROR_MSG = "Method or database API was used which is not supported by the database"
16
DATABASE_ERROR_MSG = "Unknown exception that are related to the database."
17
ERROR_MSG = "Unknown exception."
18 e9e55282 David Friesecký
19
20 f8b23532 David Friesecký
class CertificateRepository:
21 25053504 David Friesecký
22 1d2add74 Jan Pašek
    @inject
23
    def __init__(self, connection: Connection):
24 a0602bad David Friesecký
        """
25 f8b23532 David Friesecký
        Constructor of the CertificateRepository object
26 a0602bad David Friesecký
27 f8b23532 David Friesecký
        :param connection: Instance of the Connection object
28 a0602bad David Friesecký
        """
29 f8b23532 David Friesecký
        self.connection = connection
30 1d2add74 Jan Pašek
        self.cursor = connection.cursor()
31 e9e55282 David Friesecký
32 805077f5 David Friesecký
    def create(self, certificate: Certificate):
33 a0602bad David Friesecký
        """
34 f8b23532 David Friesecký
        Creates a certificate.
35
        For root certificate (CA) the parent certificate id is modified to the same id (id == parent_id).
36 a0602bad David Friesecký
37 f8b23532 David Friesecký
        :param certificate: Instance of the Certificate object
38 a0602bad David Friesecký
39 f8b23532 David Friesecký
        :return: the result of whether the creation was successful
40 a0602bad David Friesecký
        """
41
42 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
43
44 f8b23532 David Friesecký
        try:
45
            sql = (f"INSERT INTO {TAB_CERTIFICATES} "
46 0e7c3096 David Friesecký
                   f"({COL_VALID_FROM},"
47 f8b23532 David Friesecký
                   f"{COL_VALID_TO},"
48
                   f"{COL_PEM_DATA},"
49 0e7c3096 David Friesecký
                   f"{COL_COMMON_NAME},"
50
                   f"{COL_COUNTRY_CODE},"
51
                   f"{COL_LOCALITY},"
52
                   f"{COL_PROVINCE},"
53
                   f"{COL_ORGANIZATION},"
54
                   f"{COL_ORGANIZATIONAL_UNIT},"
55
                   f"{COL_EMAIL_ADDRESS},"
56 f8b23532 David Friesecký
                   f"{COL_TYPE_ID},"
57 0e7c3096 David Friesecký
                   f"{COL_PARENT_ID},"
58
                   f"{COL_PRIVATE_KEY_ID}) "
59
                   f"VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?)")
60
            values = [certificate.valid_from,
61 f8b23532 David Friesecký
                      certificate.valid_to,
62
                      certificate.pem_data,
63 0e7c3096 David Friesecký
                      certificate.common_name,
64
                      certificate.country_code,
65
                      certificate.locality,
66
                      certificate.province,
67
                      certificate.organization,
68
                      certificate.organizational_unit,
69
                      certificate.email_address,
70 f8b23532 David Friesecký
                      certificate.type_id,
71 0e7c3096 David Friesecký
                      certificate.parent_id,
72
                      certificate.private_key_id]
73 f8b23532 David Friesecký
            self.cursor.execute(sql, values)
74
            self.connection.commit()
75
76
            last_id: int = self.cursor.lastrowid
77
78 f3125948 Stanislav Král
            if certificate.type_id == ROOT_CA_ID:
79 f8b23532 David Friesecký
                certificate.parent_id = last_id
80 093d06df Stanislav Král
                self.update(last_id, certificate)
81 f8b23532 David Friesecký
            else:
82 fa72c969 Stanislav Král
                for usage_id, usage_value in certificate.usages.items():
83 f8b23532 David Friesecký
                    if usage_value:
84
                        sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
85
                               f"({COL_CERTIFICATE_ID},"
86
                               f"{COL_USAGE_TYPE_ID}) "
87
                               f"VALUES (?,?)")
88
                        values = [last_id, usage_id]
89
                        self.cursor.execute(sql, values)
90
                        self.connection.commit()
91 445886fb David Friesecký
92
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
93 9c6005f2 Stanislav Král
            Logger.error(str(e))
94 445886fb David Friesecký
            raise DatabaseException(e)
95 f8b23532 David Friesecký
96 805077f5 David Friesecký
        return last_id
97 f8b23532 David Friesecký
98 805077f5 David Friesecký
    def read(self, certificate_id: int):
99 f8b23532 David Friesecký
        """
100
        Reads (selects) a certificate.
101 e9e55282 David Friesecký
102 f8b23532 David Friesecký
        :param certificate_id: ID of specific certificate
103 e9e55282 David Friesecký
104 f8b23532 David Friesecký
        :return: instance of the Certificate object
105
        """
106 e9e55282 David Friesecký
107 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
108
109 f8b23532 David Friesecký
        try:
110
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
111 6425fa36 David Friesecký
                   f"WHERE {COL_ID} = ? AND ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
112 f8b23532 David Friesecký
            values = [certificate_id]
113
            self.cursor.execute(sql, values)
114 a7411982 Stanislav Král
            certificate_row = self.cursor.fetchone()
115 e9e55282 David Friesecký
116 6f4a5f24 Captain_Trojan
            if certificate_row is None:
117
                return None
118
119 f8b23532 David Friesecký
            sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
120
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
121
            self.cursor.execute(sql, values)
122
            usage_rows = self.cursor.fetchall()
123 1636aefe David Friesecký
124
            usage_dict: Dict[int, bool] = {}
125
            for usage_row in usage_rows:
126 f8b23532 David Friesecký
                usage_dict[usage_row[2]] = True
127
128 0e7c3096 David Friesecký
            certificate: Certificate = Certificate(certificate_row[0],      # ID
129
                                                   certificate_row[1],      # valid from
130
                                                   certificate_row[2],      # valid to
131
                                                   certificate_row[3],      # pem data
132
                                                   certificate_row[14],     # type ID
133
                                                   certificate_row[15],     # parent ID
134
                                                   certificate_row[16],     # private key ID
135 58051326 David Friesecký
                                                   usage_dict,
136 0e7c3096 David Friesecký
                                                   certificate_row[4],      # common name
137
                                                   certificate_row[5],      # country code
138
                                                   certificate_row[6],      # locality
139
                                                   certificate_row[7],      # province
140
                                                   certificate_row[8],      # organization
141
                                                   certificate_row[9],      # organizational unit
142
                                                   certificate_row[10],     # email address
143
                                                   certificate_row[11],     # revocation date
144
                                                   certificate_row[12])     # revocation reason
145
146 445886fb David Friesecký
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
147
            Logger.error(str(e))
148
            raise DatabaseException(e)
149 f8b23532 David Friesecký
150 d65b022d David Friesecký
        return certificate
151 f8b23532 David Friesecký
152 58051326 David Friesecký
    def read_all(self, filter_type: int = None):
153 9e22e20c David Friesecký
        """
154
        Reads (selects) all certificates (with type).
155
156
        :param filter_type: ID of certificate type from CertificateTypes table
157
158
        :return: list of certificates
159
        """
160
161 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
162
163 9e22e20c David Friesecký
        try:
164
            sql_extension = ""
165
            values = []
166
            if filter_type is not None:
167 6425fa36 David Friesecký
                sql_extension = (f" AND {COL_TYPE_ID} = ("
168 6f64f062 Stanislav Král
                                 f"SELECT {COL_ID} FROM {TAB_CERTIFICATE_TYPES} WHERE {COL_ID} = ?)")
169 9e22e20c David Friesecký
                values = [filter_type]
170
171 6425fa36 David Friesecký
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
172
                   f"WHERE ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = ''){sql_extension}")
173 9e22e20c David Friesecký
            self.cursor.execute(sql, values)
174
            certificate_rows = self.cursor.fetchall()
175
176
            certificates: List[Certificate] = []
177
            for certificate_row in certificate_rows:
178
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
179
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
180
                values = [certificate_row[0]]
181
                self.cursor.execute(sql, values)
182
                usage_rows = self.cursor.fetchall()
183
184
                usage_dict: Dict[int, bool] = {}
185
                for usage_row in usage_rows:
186
                    usage_dict[usage_row[2]] = True
187
188 0e7c3096 David Friesecký
                certificates.append(Certificate(certificate_row[0],      # ID
189
                                                certificate_row[1],      # valid from
190
                                                certificate_row[2],      # valid to
191
                                                certificate_row[3],      # pem data
192
                                                certificate_row[14],     # type ID
193
                                                certificate_row[15],     # parent ID
194
                                                certificate_row[16],     # private key ID
195 58051326 David Friesecký
                                                usage_dict,
196 0e7c3096 David Friesecký
                                                certificate_row[4],      # common name
197
                                                certificate_row[5],      # country code
198
                                                certificate_row[6],      # locality
199
                                                certificate_row[7],      # province
200
                                                certificate_row[8],      # organization
201
                                                certificate_row[9],      # organizational unit
202
                                                certificate_row[10],     # email address
203
                                                certificate_row[11],     # revocation date
204
                                                certificate_row[12]))    # revocation reason
205 445886fb David Friesecký
206
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
207
            Logger.error(str(e))
208
            raise DatabaseException(e)
209 9e22e20c David Friesecký
210 0f3af523 Stanislav Král
        return certificates
211 9e22e20c David Friesecký
212 cf247eaa Captain_Trojan
    def read_all_filter(self, target_types: Set[int], target_usages: Set[int], target_cn_substring: str, page: int,
213
                        per_page: int):
214
        """
215
        Reads (selects) all certificates according to a specific filtering and pagination options.
216
        :param target_types: certificate types (filter)
217
        :param target_usages: certificate usages (filter)
218
        :param target_cn_substring: certificate CN substring (filter)
219
        :param page: target page
220
        :param per_page: target page size
221
        :return: list of certificates
222
        """
223
224
        Logger.debug("Function launched.")
225
226
        try:
227
            values = []
228
            values += list(target_types)
229
230
            sql = (
231 bb4a9d1f Captain_Trojan
                f"SELECT * "
232 cf247eaa Captain_Trojan
                f"FROM {TAB_CERTIFICATES} a "
233
                f"WHERE (a.{COL_DELETION_DATE} IS NULL OR a.{COL_DELETION_DATE} = '') "
234
                f"AND a.{COL_TYPE_ID} IN ({','.join('?' * len(target_types))}) "
235
            )
236 e77c14f9 Michal Sejak
            
237
            if target_usages is not None:
238
                sql += f"AND (SELECT COUNT(*) FROM {TAB_CERTIFICATE_USAGES} WHERE {COL_CERTIFICATE_ID} = a.{COL_ID} " \
239
                    f"AND {COL_USAGE_TYPE_ID} IN ({','.join('?' * len(target_usages))})) > 0 "
240
                values += list(target_usages)
241 cf247eaa Captain_Trojan
242
            if target_cn_substring is not None:
243
                sql += f" AND a.{COL_COMMON_NAME} LIKE '%' || ? || '%'"
244
245
                values += [target_cn_substring]
246
247
            if page is not None and per_page is not None:
248
                sql += f" LIMIT {per_page} OFFSET {page * per_page}"
249
250
            self.cursor.execute(sql, values)
251
            certificate_rows = self.cursor.fetchall()
252
253
            certificates: List[Certificate] = []
254
            for certificate_row in certificate_rows:
255
                sql = (f"SELECT {COL_USAGE_TYPE_ID} FROM {TAB_CERTIFICATE_USAGES} "
256
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
257
                types = [certificate_row[0]]
258
                self.cursor.execute(sql, types)
259
                usage_rows = self.cursor.fetchall()
260
261
                usage_dict: Dict[int, bool] = {}
262
                for usage_row in usage_rows:
263
                    usage_dict[usage_row[0]] = True
264
265 bb4a9d1f Captain_Trojan
                certificates.append(Certificate(certificate_row[0],  # ID
266
                                                certificate_row[1],  # valid from
267
                                                certificate_row[2],  # valid to
268
                                                certificate_row[3],  # pem data
269
                                                certificate_row[14],  # type ID
270
                                                certificate_row[15],  # parent ID
271
                                                certificate_row[16],  # private key ID
272 cf247eaa Captain_Trojan
                                                usage_dict,
273 bb4a9d1f Captain_Trojan
                                                certificate_row[4],  # common name
274
                                                certificate_row[5],  # country code
275
                                                certificate_row[6],  # locality
276
                                                certificate_row[7],  # province
277
                                                certificate_row[8],  # organization
278
                                                certificate_row[9],  # organizational unit
279
                                                certificate_row[10],  # email address
280
                                                certificate_row[11],  # revocation date
281
                                                certificate_row[12]))  # revocation reason
282 445886fb David Friesecký
283
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
284
            Logger.error(str(e))
285
            raise DatabaseException(e)
286 cf247eaa Captain_Trojan
287
        return certificates
288
289 58051326 David Friesecký
    def update(self, certificate_id: int, certificate: Certificate):
290 a0602bad David Friesecký
        """
291 f8b23532 David Friesecký
        Updates a certificate.
292
        If the parameter of certificate (Certificate object) is not to be changed,
293
        the same value must be specified.
294 a0602bad David Friesecký
295
        :param certificate_id: ID of specific certificate
296 f8b23532 David Friesecký
        :param certificate: Instance of the Certificate object
297 a0602bad David Friesecký
298
        :return: the result of whether the updation was successful
299
        """
300
301 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
302
303 f8b23532 David Friesecký
        try:
304
            sql = (f"UPDATE {TAB_CERTIFICATES} "
305 0e7c3096 David Friesecký
                   f"SET {COL_VALID_FROM} = ?, "
306 f8b23532 David Friesecký
                   f"{COL_VALID_TO} = ?, "
307
                   f"{COL_PEM_DATA} = ?, "
308 0e7c3096 David Friesecký
                   f"{COL_COMMON_NAME} = ?, "
309
                   f"{COL_COUNTRY_CODE} = ?, "
310
                   f"{COL_LOCALITY} = ?, "
311
                   f"{COL_PROVINCE} = ?, "
312
                   f"{COL_ORGANIZATION} = ?, "
313
                   f"{COL_ORGANIZATIONAL_UNIT} = ?, "
314
                   f"{COL_EMAIL_ADDRESS} = ?, "
315 f8b23532 David Friesecký
                   f"{COL_TYPE_ID} = ?, "
316 0e7c3096 David Friesecký
                   f"{COL_PARENT_ID} = ?, "
317
                   f"{COL_PRIVATE_KEY_ID} = ? "
318 f8b23532 David Friesecký
                   f"WHERE {COL_ID} = ?")
319 0e7c3096 David Friesecký
            values = [certificate.valid_from,
320 f8b23532 David Friesecký
                      certificate.valid_to,
321
                      certificate.pem_data,
322 0e7c3096 David Friesecký
                      certificate.common_name,
323
                      certificate.country_code,
324
                      certificate.locality,
325
                      certificate.province,
326
                      certificate.organization,
327
                      certificate.organizational_unit,
328
                      certificate.email_address,
329 f8b23532 David Friesecký
                      certificate.type_id,
330 805077f5 David Friesecký
                      certificate.parent_id,
331 0e7c3096 David Friesecký
                      certificate.private_key_id,
332 805077f5 David Friesecký
                      certificate_id]
333 6425fa36 David Friesecký
334 f8b23532 David Friesecký
            self.cursor.execute(sql, values)
335
            self.connection.commit()
336
337
            sql = (f"DELETE FROM {TAB_CERTIFICATE_USAGES} "
338
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
339
            values = [certificate_id]
340
            self.cursor.execute(sql, values)
341
            self.connection.commit()
342
343 c36d3299 David Friesecký
            check_updated = self.cursor.rowcount > 0
344
345 f3125948 Stanislav Král
            # iterate over usage pairs
346
            for usage_id, usage_value in certificate.usages.items():
347 f8b23532 David Friesecký
                if usage_value:
348
                    sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
349
                           f"({COL_CERTIFICATE_ID},"
350
                           f"{COL_USAGE_TYPE_ID}) "
351
                           f"VALUES (?,?)")
352
                    values = [certificate_id, usage_id]
353
                    self.cursor.execute(sql, values)
354
                    self.connection.commit()
355 445886fb David Friesecký
356
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
357
            Logger.error(str(e))
358
            raise DatabaseException(e)
359 f8b23532 David Friesecký
360 c36d3299 David Friesecký
        return check_updated
361 e9e55282 David Friesecký
362 58051326 David Friesecký
    def delete(self, certificate_id: int):
363 a0602bad David Friesecký
        """
364
        Deletes a certificate
365
366
        :param certificate_id: ID of specific certificate
367
368
        :return: the result of whether the deletion was successful
369
        """
370
371 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
372
373 f8b23532 David Friesecký
        try:
374 6425fa36 David Friesecký
            sql = (f"UPDATE {TAB_CERTIFICATES} "
375
                   f"SET {COL_DELETION_DATE} = ? "
376
                   f"WHERE {COL_ID} = ? AND ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
377
            values = [int(time.time()),
378
                      certificate_id]
379 f8b23532 David Friesecký
            self.cursor.execute(sql, values)
380
            self.connection.commit()
381 445886fb David Friesecký
382
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
383
            Logger.error(str(e))
384
            raise DatabaseException(e)
385 f8b23532 David Friesecký
386 45744020 Stanislav Král
        return self.cursor.rowcount > 0
387 58051326 David Friesecký
388
    def set_certificate_revoked(
389 8b049f43 David Friesecký
            self, certificate_id: int, revocation_date: str, revocation_reason: str = REV_REASON_UNSPECIFIED):
390 58051326 David Friesecký
        """
391
        Revoke a certificate
392
393
        :param certificate_id: ID of specific certificate
394
        :param revocation_date: Date, when the certificate is revoked
395
        :param revocation_reason: Reason of the revocation
396
397 8b049f43 David Friesecký
        :return:
398
            the result of whether the revocation was successful OR
399
            sqlite3.Error if an exception is thrown
400 58051326 David Friesecký
        """
401
402 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
403
404 58051326 David Friesecký
        try:
405 8b049f43 David Friesecký
            if revocation_date != "" and revocation_reason == "":
406
                revocation_reason = REV_REASON_UNSPECIFIED
407
            elif revocation_date == "":
408
                return False
409
410 58051326 David Friesecký
            sql = (f"UPDATE {TAB_CERTIFICATES} "
411
                   f"SET {COL_REVOCATION_DATE} = ?, "
412 8b049f43 David Friesecký
                   f"{COL_REVOCATION_REASON} = ? "
413 58051326 David Friesecký
                   f"WHERE {COL_ID} = ? AND ({COL_REVOCATION_DATE} IS NULL OR {COL_REVOCATION_DATE} = '')")
414
            values = [revocation_date,
415
                      revocation_reason,
416
                      certificate_id]
417
            self.cursor.execute(sql, values)
418
            self.connection.commit()
419 445886fb David Friesecký
420
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
421
            Logger.error(str(e))
422
            raise DatabaseException(e)
423 8b049f43 David Friesecký
424 d65b022d David Friesecký
        return self.cursor.rowcount > 0
425 58051326 David Friesecký
426
    def clear_certificate_revocation(self, certificate_id: int):
427
        """
428
        Clear revocation of a certificate
429
430
        :param certificate_id: ID of specific certificate
431
432 8b049f43 David Friesecký
        :return:
433
            the result of whether the clear revocation was successful OR
434
            sqlite3.Error if an exception is thrown
435 58051326 David Friesecký
        """
436
437 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
438
439 58051326 David Friesecký
        try:
440
            sql = (f"UPDATE {TAB_CERTIFICATES} "
441 0e7c3096 David Friesecký
                   f"SET {COL_REVOCATION_DATE} = NULL, "
442
                   f"{COL_REVOCATION_REASON} = NULL "
443 58051326 David Friesecký
                   f"WHERE {COL_ID} = ?")
444
            values = [certificate_id]
445
            self.cursor.execute(sql, values)
446
            self.connection.commit()
447 445886fb David Friesecký
448
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
449
            Logger.error(str(e))
450
            raise DatabaseException(e)
451 f8b23532 David Friesecký
452 45744020 Stanislav Král
        return self.cursor.rowcount > 0
453 58051326 David Friesecký
454
    def get_all_revoked_by(self, certificate_id: int):
455
        """
456
        Get list of the revoked certificates that are direct descendants of the certificate with the ID
457
458
        :param certificate_id: ID of specific certificate
459
460
        :return:
461 8b049f43 David Friesecký
            list of the certificates OR
462
            None if the list is empty OR
463
            sqlite3.Error if an exception is thrown
464 58051326 David Friesecký
        """
465
466 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
467
468 58051326 David Friesecký
        try:
469
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
470
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_REVOCATION_DATE} IS NOT NULL AND {COL_REVOCATION_DATE} != ''")
471
            values = [certificate_id]
472
            self.cursor.execute(sql, values)
473
            certificate_rows = self.cursor.fetchall()
474
475
            certificates: List[Certificate] = []
476
            for certificate_row in certificate_rows:
477
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
478
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
479
                values = [certificate_row[0]]
480
                self.cursor.execute(sql, values)
481
                usage_rows = self.cursor.fetchall()
482
483
                usage_dict: Dict[int, bool] = {}
484
                for usage_row in usage_rows:
485
                    usage_dict[usage_row[2]] = True
486
487 0e7c3096 David Friesecký
                certificates.append(Certificate(certificate_row[0],      # ID
488
                                                certificate_row[1],      # valid from
489
                                                certificate_row[2],      # valid to
490
                                                certificate_row[3],      # pem data
491
                                                certificate_row[14],     # type ID
492
                                                certificate_row[15],     # parent ID
493
                                                certificate_row[16],     # private key ID
494 58051326 David Friesecký
                                                usage_dict,
495 0e7c3096 David Friesecký
                                                certificate_row[4],      # common name
496
                                                certificate_row[5],      # country code
497
                                                certificate_row[6],      # locality
498
                                                certificate_row[7],      # province
499
                                                certificate_row[8],      # organization
500
                                                certificate_row[9],      # organizational unit
501
                                                certificate_row[10],     # email address
502
                                                certificate_row[11],     # revocation date
503
                                                certificate_row[12]))    # revocation reason
504 445886fb David Friesecký
505
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
506
            Logger.error(str(e))
507
            raise DatabaseException(e)
508 58051326 David Friesecký
509 d65b022d David Friesecký
        return certificates
510 58051326 David Friesecký
511
    def get_all_issued_by(self, certificate_id: int):
512
        """
513
        Get list of the certificates that are direct descendants of the certificate with the ID
514
515
        :param certificate_id: ID of specific certificate
516
517
        :return:
518 8b049f43 David Friesecký
            list of the certificates OR
519
            None if the list is empty OR
520
            sqlite3.Error if an exception is thrown
521 58051326 David Friesecký
        """
522
523 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
524
525 58051326 David Friesecký
        try:
526
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
527 6425fa36 David Friesecký
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_ID} != ? AND "
528
                   f"({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
529 8b049f43 David Friesecký
            values = [certificate_id, certificate_id]
530 58051326 David Friesecký
            self.cursor.execute(sql, values)
531
            certificate_rows = self.cursor.fetchall()
532
533
            certificates: List[Certificate] = []
534
            for certificate_row in certificate_rows:
535
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
536
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
537
                values = [certificate_row[0]]
538
                self.cursor.execute(sql, values)
539
                usage_rows = self.cursor.fetchall()
540
541
                usage_dict: Dict[int, bool] = {}
542
                for usage_row in usage_rows:
543
                    usage_dict[usage_row[2]] = True
544
545 0e7c3096 David Friesecký
                certificates.append(Certificate(certificate_row[0],      # ID
546
                                                certificate_row[1],      # valid from
547
                                                certificate_row[2],      # valid to
548
                                                certificate_row[3],      # pem data
549
                                                certificate_row[14],     # type ID
550
                                                certificate_row[15],     # parent ID
551
                                                certificate_row[16],     # private key ID
552 58051326 David Friesecký
                                                usage_dict,
553 0e7c3096 David Friesecký
                                                certificate_row[4],      # common name
554
                                                certificate_row[5],      # country code
555
                                                certificate_row[6],      # locality
556
                                                certificate_row[7],      # province
557
                                                certificate_row[8],      # organization
558
                                                certificate_row[9],      # organizational unit
559
                                                certificate_row[10],     # email address
560
                                                certificate_row[11],     # revocation date
561
                                                certificate_row[12]))    # revocation reason
562 445886fb David Friesecký
563
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
564
            Logger.error(str(e))
565
            raise DatabaseException(e)
566 58051326 David Friesecký
567 d65b022d David Friesecký
        return certificates
568 94f6d8b8 Jan Pašek
569 cf247eaa Captain_Trojan
    def get_all_issued_by_filter(self, issuer_id, target_types, target_usages, target_cn_substring, page, per_page):
570
        """
571
        Get list of the certificates that are direct descendants of the certificate with the ID
572
573
        :param target_types: certificate types (filter)
574
        :param target_usages: certificate usages (filter)
575
        :param target_cn_substring: certificate CN substring (filter)
576
        :param page: target page
577
        :param per_page: target page size
578
        :param issuer_id: ID of specific certificate
579
580
        :return:
581
            list of the certificates OR
582
            None if the list is empty
583
        """
584
585
        Logger.debug("Function launched.")
586
587
        try:
588
            values = [issuer_id, issuer_id]
589
            values += list(target_types)
590
591
            sql = (
592 bb4a9d1f Captain_Trojan
                f"SELECT * "
593 cf247eaa Captain_Trojan
                f"FROM {TAB_CERTIFICATES} a "
594
                f"WHERE (a.{COL_DELETION_DATE} IS NULL OR a.{COL_DELETION_DATE} = '') "
595
                f"AND a.{COL_PARENT_ID} = ? AND a.{COL_ID} != ? "
596
                f"AND a.{COL_TYPE_ID} IN ({','.join('?' * len(target_types))}) "
597 e77c14f9 Michal Sejak
                
598 cf247eaa Captain_Trojan
            )
599 e77c14f9 Michal Sejak
            
600
            if target_usages is not None:
601
                sql += f"AND (SELECT COUNT(*) FROM {TAB_CERTIFICATE_USAGES} WHERE {COL_CERTIFICATE_ID} = a.{COL_ID} " \
602
                    f"AND {COL_USAGE_TYPE_ID} IN ({','.join('?' * len(target_usages))})) > 0 "
603
                values += list(target_usages)
604 cf247eaa Captain_Trojan
605
            if target_cn_substring is not None:
606
                sql += f" AND a.{COL_COMMON_NAME} LIKE '%' || ? || '%'"
607
                values += [target_cn_substring]
608
609
            if page is not None and per_page is not None:
610
                sql += f" LIMIT {per_page} OFFSET {page * per_page}"
611
612
            self.cursor.execute(sql, values)
613
            certificate_rows = self.cursor.fetchall()
614
615
            certificates: List[Certificate] = []
616
            for certificate_row in certificate_rows:
617
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
618
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
619
                values = [certificate_row[0]]
620
                self.cursor.execute(sql, values)
621
                usage_rows = self.cursor.fetchall()
622
623
                usage_dict: Dict[int, bool] = {}
624
                for usage_row in usage_rows:
625
                    usage_dict[usage_row[2]] = True
626
627 bb4a9d1f Captain_Trojan
                certificates.append(Certificate(certificate_row[0],  # ID
628
                                                certificate_row[1],  # valid from
629
                                                certificate_row[2],  # valid to
630
                                                certificate_row[3],  # pem data
631
                                                certificate_row[14],  # type ID
632
                                                certificate_row[15],  # parent ID
633
                                                certificate_row[16],  # private key ID
634 cf247eaa Captain_Trojan
                                                usage_dict,
635 bb4a9d1f Captain_Trojan
                                                certificate_row[4],  # common name
636
                                                certificate_row[5],  # country code
637
                                                certificate_row[6],  # locality
638
                                                certificate_row[7],  # province
639
                                                certificate_row[8],  # organization
640
                                                certificate_row[9],  # organizational unit
641
                                                certificate_row[10],  # email address
642
                                                certificate_row[11],  # revocation date
643
                                                certificate_row[12]))  # revocation reason
644 445886fb David Friesecký
645
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
646
            Logger.error(str(e))
647
            raise DatabaseException(e)
648 58051326 David Friesecký
649 d65b022d David Friesecký
        return certificates
650 94f6d8b8 Jan Pašek
651 85003184 Captain_Trojan
    def get_all_descendants_of(self, certificate_id: int):
652
        """
653
        Get a list of all certificates C such that the certificate identified by "certificate_id" belongs to a trust chain
654
        between C and its root certificate authority (i.e. is an ancestor of C).
655
        :param certificate_id: target certificate ID
656
        :return: list of all descendants
657
        """
658
659 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
660
661
        try:
662
            def dfs(children_of, this, collection: list):
663
                for child in children_of(this.certificate_id):
664
                    dfs(children_of, child, collection)
665
                collection.append(this)
666
667
            subtree_root = self.read(certificate_id)
668
            if subtree_root is None:
669
                return None
670
671
            all_certs = []
672
            dfs(self.get_all_issued_by, subtree_root, all_certs)
673 445886fb David Friesecký
674
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
675
            Logger.error(str(e))
676
            raise DatabaseException(e)
677 85003184 Captain_Trojan
678
        return all_certs
679
680 94f6d8b8 Jan Pašek
    def get_next_id(self) -> int:
681
        """
682
        Get identifier of the next certificate that will be inserted into the database
683
        :return: identifier of the next certificate that will be added into the database
684
        """
685 b3c80ccb David Friesecký
686
        Logger.debug("Function launched.")
687
688
        try:
689
            # get next IDs of all tables
690
            self.cursor.execute("SELECT * FROM SQLITE_SEQUENCE")
691
            results = self.cursor.fetchall()
692
693
            # search for next ID in Certificates table and return it
694
            for result in results:
695
                if result[0] == TAB_CERTIFICATES:
696
                    return result[1] + 1  # current last id + 1
697
            # if certificates table is not present in the query results, return 1
698 445886fb David Friesecký
699
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
700
            Logger.error(str(e))
701
            raise DatabaseException(e)
702 b3c80ccb David Friesecký
703 94f6d8b8 Jan Pašek
        return 1