Projekt

Obecné

Profil

Stáhnout (30.4 KB) Statistiky
| Větev: | Tag: | Revize:
1 6425fa36 David Friesecký
import time
2 cf247eaa Captain_Trojan
from sqlite3 import Connection, Error, DatabaseError, IntegrityError, ProgrammingError, OperationalError, \
3
    NotSupportedError
4
from typing import Dict, List, Set
5 1636aefe David Friesecký
6 d65b022d David Friesecký
from src.exceptions.database_exception import DatabaseException
7 1d2add74 Jan Pašek
from injector import inject
8 f8b23532 David Friesecký
from src.constants import *
9 1d2add74 Jan Pašek
from src.model.certificate import Certificate
10 5e31b492 David Friesecký
from src.utils.logger import Logger
11 e9e55282 David Friesecký
12
13 f8b23532 David Friesecký
class CertificateRepository:
14 25053504 David Friesecký
15 1d2add74 Jan Pašek
    @inject
16
    def __init__(self, connection: Connection):
17 a0602bad David Friesecký
        """
18 f8b23532 David Friesecký
        Constructor of the CertificateRepository object
19 a0602bad David Friesecký
20 f8b23532 David Friesecký
        :param connection: Instance of the Connection object
21 a0602bad David Friesecký
        """
22 f8b23532 David Friesecký
        self.connection = connection
23 1d2add74 Jan Pašek
        self.cursor = connection.cursor()
24 e9e55282 David Friesecký
25 805077f5 David Friesecký
    def create(self, certificate: Certificate):
26 a0602bad David Friesecký
        """
27 f8b23532 David Friesecký
        Creates a certificate.
28
        For root certificate (CA) the parent certificate id is modified to the same id (id == parent_id).
29 a0602bad David Friesecký
30 f8b23532 David Friesecký
        :param certificate: Instance of the Certificate object
31 a0602bad David Friesecký
32 f8b23532 David Friesecký
        :return: the result of whether the creation was successful
33 a0602bad David Friesecký
        """
34
35 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
36
37 f8b23532 David Friesecký
        try:
38
            sql = (f"INSERT INTO {TAB_CERTIFICATES} "
39 0e7c3096 David Friesecký
                   f"({COL_VALID_FROM},"
40 f8b23532 David Friesecký
                   f"{COL_VALID_TO},"
41
                   f"{COL_PEM_DATA},"
42 0e7c3096 David Friesecký
                   f"{COL_COMMON_NAME},"
43
                   f"{COL_COUNTRY_CODE},"
44
                   f"{COL_LOCALITY},"
45
                   f"{COL_PROVINCE},"
46
                   f"{COL_ORGANIZATION},"
47
                   f"{COL_ORGANIZATIONAL_UNIT},"
48
                   f"{COL_EMAIL_ADDRESS},"
49 f8b23532 David Friesecký
                   f"{COL_TYPE_ID},"
50 0e7c3096 David Friesecký
                   f"{COL_PARENT_ID},"
51
                   f"{COL_PRIVATE_KEY_ID}) "
52
                   f"VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?)")
53
            values = [certificate.valid_from,
54 f8b23532 David Friesecký
                      certificate.valid_to,
55
                      certificate.pem_data,
56 0e7c3096 David Friesecký
                      certificate.common_name,
57
                      certificate.country_code,
58
                      certificate.locality,
59
                      certificate.province,
60
                      certificate.organization,
61
                      certificate.organizational_unit,
62
                      certificate.email_address,
63 f8b23532 David Friesecký
                      certificate.type_id,
64 0e7c3096 David Friesecký
                      certificate.parent_id,
65
                      certificate.private_key_id]
66 f8b23532 David Friesecký
            self.cursor.execute(sql, values)
67
            self.connection.commit()
68
69
            last_id: int = self.cursor.lastrowid
70
71 f3125948 Stanislav Král
            if certificate.type_id == ROOT_CA_ID:
72 f8b23532 David Friesecký
                certificate.parent_id = last_id
73 093d06df Stanislav Král
                self.update(last_id, certificate)
74 f8b23532 David Friesecký
            else:
75 fa72c969 Stanislav Král
                for usage_id, usage_value in certificate.usages.items():
76 f8b23532 David Friesecký
                    if usage_value:
77
                        sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
78
                               f"({COL_CERTIFICATE_ID},"
79
                               f"{COL_USAGE_TYPE_ID}) "
80
                               f"VALUES (?,?)")
81
                        values = [last_id, usage_id]
82
                        self.cursor.execute(sql, values)
83
                        self.connection.commit()
84 445886fb David Friesecký
85
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
86 9c6005f2 Stanislav Král
            Logger.error(str(e))
87 445886fb David Friesecký
            raise DatabaseException(e)
88 f8b23532 David Friesecký
89 805077f5 David Friesecký
        return last_id
90 f8b23532 David Friesecký
91 805077f5 David Friesecký
    def read(self, certificate_id: int):
92 f8b23532 David Friesecký
        """
93
        Reads (selects) a certificate.
94 e9e55282 David Friesecký
95 f8b23532 David Friesecký
        :param certificate_id: ID of specific certificate
96 e9e55282 David Friesecký
97 f8b23532 David Friesecký
        :return: instance of the Certificate object
98
        """
99 e9e55282 David Friesecký
100 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
101
102 f8b23532 David Friesecký
        try:
103
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
104 6425fa36 David Friesecký
                   f"WHERE {COL_ID} = ? AND ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
105 f8b23532 David Friesecký
            values = [certificate_id]
106
            self.cursor.execute(sql, values)
107 a7411982 Stanislav Král
            certificate_row = self.cursor.fetchone()
108 e9e55282 David Friesecký
109 6f4a5f24 Captain_Trojan
            if certificate_row is None:
110
                return None
111
112 f8b23532 David Friesecký
            sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
113
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
114
            self.cursor.execute(sql, values)
115
            usage_rows = self.cursor.fetchall()
116 1636aefe David Friesecký
117
            usage_dict: Dict[int, bool] = {}
118
            for usage_row in usage_rows:
119 f8b23532 David Friesecký
                usage_dict[usage_row[2]] = True
120
121 0e7c3096 David Friesecký
            certificate: Certificate = Certificate(certificate_row[0],      # ID
122
                                                   certificate_row[1],      # valid from
123
                                                   certificate_row[2],      # valid to
124
                                                   certificate_row[3],      # pem data
125
                                                   certificate_row[14],     # type ID
126
                                                   certificate_row[15],     # parent ID
127
                                                   certificate_row[16],     # private key ID
128 58051326 David Friesecký
                                                   usage_dict,
129 0e7c3096 David Friesecký
                                                   certificate_row[4],      # common name
130
                                                   certificate_row[5],      # country code
131
                                                   certificate_row[6],      # locality
132
                                                   certificate_row[7],      # province
133
                                                   certificate_row[8],      # organization
134
                                                   certificate_row[9],      # organizational unit
135
                                                   certificate_row[10],     # email address
136
                                                   certificate_row[11],     # revocation date
137
                                                   certificate_row[12])     # revocation reason
138
139 445886fb David Friesecký
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
140
            Logger.error(str(e))
141
            raise DatabaseException(e)
142 f8b23532 David Friesecký
143 d65b022d David Friesecký
        return certificate
144 f8b23532 David Friesecký
145 58051326 David Friesecký
    def read_all(self, filter_type: int = None):
146 9e22e20c David Friesecký
        """
147
        Reads (selects) all certificates (with type).
148
149
        :param filter_type: ID of certificate type from CertificateTypes table
150
151
        :return: list of certificates
152
        """
153
154 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
155
156 9e22e20c David Friesecký
        try:
157
            sql_extension = ""
158
            values = []
159
            if filter_type is not None:
160 6425fa36 David Friesecký
                sql_extension = (f" AND {COL_TYPE_ID} = ("
161 6f64f062 Stanislav Král
                                 f"SELECT {COL_ID} FROM {TAB_CERTIFICATE_TYPES} WHERE {COL_ID} = ?)")
162 9e22e20c David Friesecký
                values = [filter_type]
163
164 6425fa36 David Friesecký
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
165
                   f"WHERE ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = ''){sql_extension}")
166 9e22e20c David Friesecký
            self.cursor.execute(sql, values)
167
            certificate_rows = self.cursor.fetchall()
168
169
            certificates: List[Certificate] = []
170
            for certificate_row in certificate_rows:
171
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
172
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
173
                values = [certificate_row[0]]
174
                self.cursor.execute(sql, values)
175
                usage_rows = self.cursor.fetchall()
176
177
                usage_dict: Dict[int, bool] = {}
178
                for usage_row in usage_rows:
179
                    usage_dict[usage_row[2]] = True
180
181 0e7c3096 David Friesecký
                certificates.append(Certificate(certificate_row[0],      # ID
182
                                                certificate_row[1],      # valid from
183
                                                certificate_row[2],      # valid to
184
                                                certificate_row[3],      # pem data
185
                                                certificate_row[14],     # type ID
186
                                                certificate_row[15],     # parent ID
187
                                                certificate_row[16],     # private key ID
188 58051326 David Friesecký
                                                usage_dict,
189 0e7c3096 David Friesecký
                                                certificate_row[4],      # common name
190
                                                certificate_row[5],      # country code
191
                                                certificate_row[6],      # locality
192
                                                certificate_row[7],      # province
193
                                                certificate_row[8],      # organization
194
                                                certificate_row[9],      # organizational unit
195
                                                certificate_row[10],     # email address
196
                                                certificate_row[11],     # revocation date
197
                                                certificate_row[12]))    # revocation reason
198 445886fb David Friesecký
199
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
200
            Logger.error(str(e))
201
            raise DatabaseException(e)
202 9e22e20c David Friesecký
203 0f3af523 Stanislav Král
        return certificates
204 9e22e20c David Friesecký
205 cf247eaa Captain_Trojan
    def read_all_filter(self, target_types: Set[int], target_usages: Set[int], target_cn_substring: str, page: int,
206
                        per_page: int):
207
        """
208
        Reads (selects) all certificates according to a specific filtering and pagination options.
209
        :param target_types: certificate types (filter)
210
        :param target_usages: certificate usages (filter)
211
        :param target_cn_substring: certificate CN substring (filter)
212
        :param page: target page
213
        :param per_page: target page size
214
        :return: list of certificates
215
        """
216
217
        Logger.debug("Function launched.")
218
219
        try:
220
            values = []
221
            values += list(target_types)
222
223
            sql = (
224 bb4a9d1f Captain_Trojan
                f"SELECT * "
225 cf247eaa Captain_Trojan
                f"FROM {TAB_CERTIFICATES} a "
226
                f"WHERE (a.{COL_DELETION_DATE} IS NULL OR a.{COL_DELETION_DATE} = '') "
227
                f"AND a.{COL_TYPE_ID} IN ({','.join('?' * len(target_types))}) "
228
            )
229 e77c14f9 Michal Sejak
            
230
            if target_usages is not None:
231
                sql += f"AND (SELECT COUNT(*) FROM {TAB_CERTIFICATE_USAGES} WHERE {COL_CERTIFICATE_ID} = a.{COL_ID} " \
232
                    f"AND {COL_USAGE_TYPE_ID} IN ({','.join('?' * len(target_usages))})) > 0 "
233
                values += list(target_usages)
234 cf247eaa Captain_Trojan
235
            if target_cn_substring is not None:
236
                sql += f" AND a.{COL_COMMON_NAME} LIKE '%' || ? || '%'"
237
238
                values += [target_cn_substring]
239
240
            if page is not None and per_page is not None:
241
                sql += f" LIMIT {per_page} OFFSET {page * per_page}"
242
243
            self.cursor.execute(sql, values)
244
            certificate_rows = self.cursor.fetchall()
245
246
            certificates: List[Certificate] = []
247
            for certificate_row in certificate_rows:
248
                sql = (f"SELECT {COL_USAGE_TYPE_ID} FROM {TAB_CERTIFICATE_USAGES} "
249
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
250
                types = [certificate_row[0]]
251
                self.cursor.execute(sql, types)
252
                usage_rows = self.cursor.fetchall()
253
254
                usage_dict: Dict[int, bool] = {}
255
                for usage_row in usage_rows:
256
                    usage_dict[usage_row[0]] = True
257
258 bb4a9d1f Captain_Trojan
                certificates.append(Certificate(certificate_row[0],  # ID
259
                                                certificate_row[1],  # valid from
260
                                                certificate_row[2],  # valid to
261
                                                certificate_row[3],  # pem data
262
                                                certificate_row[14],  # type ID
263
                                                certificate_row[15],  # parent ID
264
                                                certificate_row[16],  # private key ID
265 cf247eaa Captain_Trojan
                                                usage_dict,
266 bb4a9d1f Captain_Trojan
                                                certificate_row[4],  # common name
267
                                                certificate_row[5],  # country code
268
                                                certificate_row[6],  # locality
269
                                                certificate_row[7],  # province
270
                                                certificate_row[8],  # organization
271
                                                certificate_row[9],  # organizational unit
272
                                                certificate_row[10],  # email address
273
                                                certificate_row[11],  # revocation date
274
                                                certificate_row[12]))  # revocation reason
275 445886fb David Friesecký
276
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
277
            Logger.error(str(e))
278
            raise DatabaseException(e)
279 cf247eaa Captain_Trojan
280
        return certificates
281
282 58051326 David Friesecký
    def update(self, certificate_id: int, certificate: Certificate):
283 a0602bad David Friesecký
        """
284 f8b23532 David Friesecký
        Updates a certificate.
285
        If the parameter of certificate (Certificate object) is not to be changed,
286
        the same value must be specified.
287 a0602bad David Friesecký
288
        :param certificate_id: ID of specific certificate
289 f8b23532 David Friesecký
        :param certificate: Instance of the Certificate object
290 a0602bad David Friesecký
291
        :return: the result of whether the updation was successful
292
        """
293
294 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
295
296 f8b23532 David Friesecký
        try:
297
            sql = (f"UPDATE {TAB_CERTIFICATES} "
298 0e7c3096 David Friesecký
                   f"SET {COL_VALID_FROM} = ?, "
299 f8b23532 David Friesecký
                   f"{COL_VALID_TO} = ?, "
300
                   f"{COL_PEM_DATA} = ?, "
301 0e7c3096 David Friesecký
                   f"{COL_COMMON_NAME} = ?, "
302
                   f"{COL_COUNTRY_CODE} = ?, "
303
                   f"{COL_LOCALITY} = ?, "
304
                   f"{COL_PROVINCE} = ?, "
305
                   f"{COL_ORGANIZATION} = ?, "
306
                   f"{COL_ORGANIZATIONAL_UNIT} = ?, "
307
                   f"{COL_EMAIL_ADDRESS} = ?, "
308 f8b23532 David Friesecký
                   f"{COL_TYPE_ID} = ?, "
309 0e7c3096 David Friesecký
                   f"{COL_PARENT_ID} = ?, "
310
                   f"{COL_PRIVATE_KEY_ID} = ? "
311 f8b23532 David Friesecký
                   f"WHERE {COL_ID} = ?")
312 0e7c3096 David Friesecký
            values = [certificate.valid_from,
313 f8b23532 David Friesecký
                      certificate.valid_to,
314
                      certificate.pem_data,
315 0e7c3096 David Friesecký
                      certificate.common_name,
316
                      certificate.country_code,
317
                      certificate.locality,
318
                      certificate.province,
319
                      certificate.organization,
320
                      certificate.organizational_unit,
321
                      certificate.email_address,
322 f8b23532 David Friesecký
                      certificate.type_id,
323 805077f5 David Friesecký
                      certificate.parent_id,
324 0e7c3096 David Friesecký
                      certificate.private_key_id,
325 805077f5 David Friesecký
                      certificate_id]
326 6425fa36 David Friesecký
327 f8b23532 David Friesecký
            self.cursor.execute(sql, values)
328
            self.connection.commit()
329
330
            sql = (f"DELETE FROM {TAB_CERTIFICATE_USAGES} "
331
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
332
            values = [certificate_id]
333
            self.cursor.execute(sql, values)
334
            self.connection.commit()
335
336 c36d3299 David Friesecký
            check_updated = self.cursor.rowcount > 0
337
338 f3125948 Stanislav Král
            # iterate over usage pairs
339
            for usage_id, usage_value in certificate.usages.items():
340 f8b23532 David Friesecký
                if usage_value:
341
                    sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
342
                           f"({COL_CERTIFICATE_ID},"
343
                           f"{COL_USAGE_TYPE_ID}) "
344
                           f"VALUES (?,?)")
345
                    values = [certificate_id, usage_id]
346
                    self.cursor.execute(sql, values)
347
                    self.connection.commit()
348 445886fb David Friesecký
349
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
350
            Logger.error(str(e))
351
            raise DatabaseException(e)
352 f8b23532 David Friesecký
353 c36d3299 David Friesecký
        return check_updated
354 e9e55282 David Friesecký
355 58051326 David Friesecký
    def delete(self, certificate_id: int):
356 a0602bad David Friesecký
        """
357
        Deletes a certificate
358
359
        :param certificate_id: ID of specific certificate
360
361
        :return: the result of whether the deletion was successful
362
        """
363
364 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
365
366 f8b23532 David Friesecký
        try:
367 6425fa36 David Friesecký
            sql = (f"UPDATE {TAB_CERTIFICATES} "
368
                   f"SET {COL_DELETION_DATE} = ? "
369
                   f"WHERE {COL_ID} = ? AND ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
370
            values = [int(time.time()),
371
                      certificate_id]
372 f8b23532 David Friesecký
            self.cursor.execute(sql, values)
373
            self.connection.commit()
374 445886fb David Friesecký
375
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
376
            Logger.error(str(e))
377
            raise DatabaseException(e)
378 f8b23532 David Friesecký
379 45744020 Stanislav Král
        return self.cursor.rowcount > 0
380 58051326 David Friesecký
381
    def set_certificate_revoked(
382 8b049f43 David Friesecký
            self, certificate_id: int, revocation_date: str, revocation_reason: str = REV_REASON_UNSPECIFIED):
383 58051326 David Friesecký
        """
384
        Revoke a certificate
385
386
        :param certificate_id: ID of specific certificate
387
        :param revocation_date: Date, when the certificate is revoked
388
        :param revocation_reason: Reason of the revocation
389
390 8b049f43 David Friesecký
        :return:
391
            the result of whether the revocation was successful OR
392
            sqlite3.Error if an exception is thrown
393 58051326 David Friesecký
        """
394
395 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
396
397 58051326 David Friesecký
        try:
398 8b049f43 David Friesecký
            if revocation_date != "" and revocation_reason == "":
399
                revocation_reason = REV_REASON_UNSPECIFIED
400
            elif revocation_date == "":
401
                return False
402
403 58051326 David Friesecký
            sql = (f"UPDATE {TAB_CERTIFICATES} "
404
                   f"SET {COL_REVOCATION_DATE} = ?, "
405 8b049f43 David Friesecký
                   f"{COL_REVOCATION_REASON} = ? "
406 58051326 David Friesecký
                   f"WHERE {COL_ID} = ? AND ({COL_REVOCATION_DATE} IS NULL OR {COL_REVOCATION_DATE} = '')")
407
            values = [revocation_date,
408
                      revocation_reason,
409
                      certificate_id]
410
            self.cursor.execute(sql, values)
411
            self.connection.commit()
412 445886fb David Friesecký
413
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
414
            Logger.error(str(e))
415
            raise DatabaseException(e)
416 8b049f43 David Friesecký
417 d65b022d David Friesecký
        return self.cursor.rowcount > 0
418 58051326 David Friesecký
419
    def clear_certificate_revocation(self, certificate_id: int):
420
        """
421
        Clear revocation of a certificate
422
423
        :param certificate_id: ID of specific certificate
424
425 8b049f43 David Friesecký
        :return:
426
            the result of whether the clear revocation was successful OR
427
            sqlite3.Error if an exception is thrown
428 58051326 David Friesecký
        """
429
430 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
431
432 58051326 David Friesecký
        try:
433
            sql = (f"UPDATE {TAB_CERTIFICATES} "
434 0e7c3096 David Friesecký
                   f"SET {COL_REVOCATION_DATE} = NULL, "
435
                   f"{COL_REVOCATION_REASON} = NULL "
436 58051326 David Friesecký
                   f"WHERE {COL_ID} = ?")
437
            values = [certificate_id]
438
            self.cursor.execute(sql, values)
439
            self.connection.commit()
440 445886fb David Friesecký
441
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
442
            Logger.error(str(e))
443
            raise DatabaseException(e)
444 f8b23532 David Friesecký
445 45744020 Stanislav Král
        return self.cursor.rowcount > 0
446 58051326 David Friesecký
447
    def get_all_revoked_by(self, certificate_id: int):
448
        """
449
        Get list of the revoked certificates that are direct descendants of the certificate with the ID
450
451
        :param certificate_id: ID of specific certificate
452
453
        :return:
454 8b049f43 David Friesecký
            list of the certificates OR
455
            None if the list is empty OR
456
            sqlite3.Error if an exception is thrown
457 58051326 David Friesecký
        """
458
459 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
460
461 58051326 David Friesecký
        try:
462
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
463
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_REVOCATION_DATE} IS NOT NULL AND {COL_REVOCATION_DATE} != ''")
464
            values = [certificate_id]
465
            self.cursor.execute(sql, values)
466
            certificate_rows = self.cursor.fetchall()
467
468
            certificates: List[Certificate] = []
469
            for certificate_row in certificate_rows:
470
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
471
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
472
                values = [certificate_row[0]]
473
                self.cursor.execute(sql, values)
474
                usage_rows = self.cursor.fetchall()
475
476
                usage_dict: Dict[int, bool] = {}
477
                for usage_row in usage_rows:
478
                    usage_dict[usage_row[2]] = True
479
480 0e7c3096 David Friesecký
                certificates.append(Certificate(certificate_row[0],      # ID
481
                                                certificate_row[1],      # valid from
482
                                                certificate_row[2],      # valid to
483
                                                certificate_row[3],      # pem data
484
                                                certificate_row[14],     # type ID
485
                                                certificate_row[15],     # parent ID
486
                                                certificate_row[16],     # private key ID
487 58051326 David Friesecký
                                                usage_dict,
488 0e7c3096 David Friesecký
                                                certificate_row[4],      # common name
489
                                                certificate_row[5],      # country code
490
                                                certificate_row[6],      # locality
491
                                                certificate_row[7],      # province
492
                                                certificate_row[8],      # organization
493
                                                certificate_row[9],      # organizational unit
494
                                                certificate_row[10],     # email address
495
                                                certificate_row[11],     # revocation date
496
                                                certificate_row[12]))    # revocation reason
497 445886fb David Friesecký
498
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
499
            Logger.error(str(e))
500
            raise DatabaseException(e)
501 58051326 David Friesecký
502 d65b022d David Friesecký
        return certificates
503 58051326 David Friesecký
504
    def get_all_issued_by(self, certificate_id: int):
505
        """
506
        Get list of the certificates that are direct descendants of the certificate with the ID
507
508
        :param certificate_id: ID of specific certificate
509
510
        :return:
511 8b049f43 David Friesecký
            list of the certificates OR
512
            None if the list is empty OR
513
            sqlite3.Error if an exception is thrown
514 58051326 David Friesecký
        """
515
516 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
517
518 58051326 David Friesecký
        try:
519
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
520 6425fa36 David Friesecký
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_ID} != ? AND "
521
                   f"({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
522 8b049f43 David Friesecký
            values = [certificate_id, certificate_id]
523 58051326 David Friesecký
            self.cursor.execute(sql, values)
524
            certificate_rows = self.cursor.fetchall()
525
526
            certificates: List[Certificate] = []
527
            for certificate_row in certificate_rows:
528
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
529
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
530
                values = [certificate_row[0]]
531
                self.cursor.execute(sql, values)
532
                usage_rows = self.cursor.fetchall()
533
534
                usage_dict: Dict[int, bool] = {}
535
                for usage_row in usage_rows:
536
                    usage_dict[usage_row[2]] = True
537
538 0e7c3096 David Friesecký
                certificates.append(Certificate(certificate_row[0],      # ID
539
                                                certificate_row[1],      # valid from
540
                                                certificate_row[2],      # valid to
541
                                                certificate_row[3],      # pem data
542
                                                certificate_row[14],     # type ID
543
                                                certificate_row[15],     # parent ID
544
                                                certificate_row[16],     # private key ID
545 58051326 David Friesecký
                                                usage_dict,
546 0e7c3096 David Friesecký
                                                certificate_row[4],      # common name
547
                                                certificate_row[5],      # country code
548
                                                certificate_row[6],      # locality
549
                                                certificate_row[7],      # province
550
                                                certificate_row[8],      # organization
551
                                                certificate_row[9],      # organizational unit
552
                                                certificate_row[10],     # email address
553
                                                certificate_row[11],     # revocation date
554
                                                certificate_row[12]))    # revocation reason
555 445886fb David Friesecký
556
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
557
            Logger.error(str(e))
558
            raise DatabaseException(e)
559 58051326 David Friesecký
560 d65b022d David Friesecký
        return certificates
561 94f6d8b8 Jan Pašek
562 cf247eaa Captain_Trojan
    def get_all_issued_by_filter(self, issuer_id, target_types, target_usages, target_cn_substring, page, per_page):
563
        """
564
        Get list of the certificates that are direct descendants of the certificate with the ID
565
566
        :param target_types: certificate types (filter)
567
        :param target_usages: certificate usages (filter)
568
        :param target_cn_substring: certificate CN substring (filter)
569
        :param page: target page
570
        :param per_page: target page size
571
        :param issuer_id: ID of specific certificate
572
573
        :return:
574
            list of the certificates OR
575
            None if the list is empty
576
        """
577
578
        Logger.debug("Function launched.")
579
580
        try:
581
            values = [issuer_id, issuer_id]
582
            values += list(target_types)
583
584
            sql = (
585 bb4a9d1f Captain_Trojan
                f"SELECT * "
586 cf247eaa Captain_Trojan
                f"FROM {TAB_CERTIFICATES} a "
587
                f"WHERE (a.{COL_DELETION_DATE} IS NULL OR a.{COL_DELETION_DATE} = '') "
588
                f"AND a.{COL_PARENT_ID} = ? AND a.{COL_ID} != ? "
589
                f"AND a.{COL_TYPE_ID} IN ({','.join('?' * len(target_types))}) "
590 e77c14f9 Michal Sejak
                
591 cf247eaa Captain_Trojan
            )
592 e77c14f9 Michal Sejak
            
593
            if target_usages is not None:
594
                sql += f"AND (SELECT COUNT(*) FROM {TAB_CERTIFICATE_USAGES} WHERE {COL_CERTIFICATE_ID} = a.{COL_ID} " \
595
                    f"AND {COL_USAGE_TYPE_ID} IN ({','.join('?' * len(target_usages))})) > 0 "
596
                values += list(target_usages)
597 cf247eaa Captain_Trojan
598
            if target_cn_substring is not None:
599
                sql += f" AND a.{COL_COMMON_NAME} LIKE '%' || ? || '%'"
600
                values += [target_cn_substring]
601
602
            if page is not None and per_page is not None:
603
                sql += f" LIMIT {per_page} OFFSET {page * per_page}"
604
605
            self.cursor.execute(sql, values)
606
            certificate_rows = self.cursor.fetchall()
607
608
            certificates: List[Certificate] = []
609
            for certificate_row in certificate_rows:
610
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
611
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
612
                values = [certificate_row[0]]
613
                self.cursor.execute(sql, values)
614
                usage_rows = self.cursor.fetchall()
615
616
                usage_dict: Dict[int, bool] = {}
617
                for usage_row in usage_rows:
618
                    usage_dict[usage_row[2]] = True
619
620 bb4a9d1f Captain_Trojan
                certificates.append(Certificate(certificate_row[0],  # ID
621
                                                certificate_row[1],  # valid from
622
                                                certificate_row[2],  # valid to
623
                                                certificate_row[3],  # pem data
624
                                                certificate_row[14],  # type ID
625
                                                certificate_row[15],  # parent ID
626
                                                certificate_row[16],  # private key ID
627 cf247eaa Captain_Trojan
                                                usage_dict,
628 bb4a9d1f Captain_Trojan
                                                certificate_row[4],  # common name
629
                                                certificate_row[5],  # country code
630
                                                certificate_row[6],  # locality
631
                                                certificate_row[7],  # province
632
                                                certificate_row[8],  # organization
633
                                                certificate_row[9],  # organizational unit
634
                                                certificate_row[10],  # email address
635
                                                certificate_row[11],  # revocation date
636
                                                certificate_row[12]))  # revocation reason
637 445886fb David Friesecký
638
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
639
            Logger.error(str(e))
640
            raise DatabaseException(e)
641 58051326 David Friesecký
642 d65b022d David Friesecký
        return certificates
643 94f6d8b8 Jan Pašek
644 85003184 Captain_Trojan
    def get_all_descendants_of(self, certificate_id: int):
645
        """
646
        Get a list of all certificates C such that the certificate identified by "certificate_id" belongs to a trust chain
647
        between C and its root certificate authority (i.e. is an ancestor of C).
648
        :param certificate_id: target certificate ID
649
        :return: list of all descendants
650
        """
651
652 b3c80ccb David Friesecký
        Logger.debug("Function launched.")
653
654
        try:
655
            def dfs(children_of, this, collection: list):
656
                for child in children_of(this.certificate_id):
657
                    dfs(children_of, child, collection)
658
                collection.append(this)
659
660
            subtree_root = self.read(certificate_id)
661
            if subtree_root is None:
662
                return None
663
664
            all_certs = []
665
            dfs(self.get_all_issued_by, subtree_root, all_certs)
666 445886fb David Friesecký
667
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
668
            Logger.error(str(e))
669
            raise DatabaseException(e)
670 85003184 Captain_Trojan
671
        return all_certs
672
673 94f6d8b8 Jan Pašek
    def get_next_id(self) -> int:
674
        """
675
        Get identifier of the next certificate that will be inserted into the database
676
        :return: identifier of the next certificate that will be added into the database
677
        """
678 b3c80ccb David Friesecký
679
        Logger.debug("Function launched.")
680
681
        try:
682
            # get next IDs of all tables
683
            self.cursor.execute("SELECT * FROM SQLITE_SEQUENCE")
684
            results = self.cursor.fetchall()
685
686
            # search for next ID in Certificates table and return it
687
            for result in results:
688
                if result[0] == TAB_CERTIFICATES:
689
                    return result[1] + 1  # current last id + 1
690
            # if certificates table is not present in the query results, return 1
691 445886fb David Friesecký
692
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
693
            Logger.error(str(e))
694
            raise DatabaseException(e)
695 b3c80ccb David Friesecký
696 94f6d8b8 Jan Pašek
        return 1