Revize cf247eaa
Přidáno uživatelem Michal Seják před téměř 4 roky(ů)
src/dao/certificate_repository.py | ||
---|---|---|
1 | 1 |
import time |
2 |
from sqlite3 import Connection, Error, DatabaseError, IntegrityError, ProgrammingError, OperationalError, NotSupportedError |
|
3 |
from typing import Dict, List |
|
2 |
from sqlite3 import Connection, Error, DatabaseError, IntegrityError, ProgrammingError, OperationalError, \ |
|
3 |
NotSupportedError |
|
4 |
from typing import Dict, List, Set |
|
4 | 5 |
|
5 | 6 |
from src.exceptions.database_exception import DatabaseException |
6 | 7 |
from injector import inject |
... | ... | |
227 | 228 |
|
228 | 229 |
return certificates |
229 | 230 |
|
231 |
def read_all_filter(self, target_types: Set[int], target_usages: Set[int], target_cn_substring: str, page: int, |
|
232 |
per_page: int): |
|
233 |
""" |
|
234 |
Reads (selects) all certificates according to a specific filtering and pagination options. |
|
235 |
:param target_types: certificate types (filter) |
|
236 |
:param target_usages: certificate usages (filter) |
|
237 |
:param target_cn_substring: certificate CN substring (filter) |
|
238 |
:param page: target page |
|
239 |
:param per_page: target page size |
|
240 |
:return: list of certificates |
|
241 |
""" |
|
242 |
|
|
243 |
Logger.debug("Function launched.") |
|
244 |
|
|
245 |
try: |
|
246 |
values = [] |
|
247 |
values += list(target_types) |
|
248 |
values += list(target_usages) |
|
249 |
|
|
250 |
sql = ( |
|
251 |
f"SELECT a.{COL_ID}, {COL_COMMON_NAME}, {COL_VALID_FROM}, {COL_VALID_TO}, {COL_PEM_DATA}, " |
|
252 |
f"{COL_REVOCATION_DATE}, {COL_REVOCATION_REASON}, {COL_PRIVATE_KEY_ID}, {COL_TYPE_ID}, {COL_PARENT_ID} " |
|
253 |
f"FROM {TAB_CERTIFICATES} a " |
|
254 |
f"WHERE (a.{COL_DELETION_DATE} IS NULL OR a.{COL_DELETION_DATE} = '') " |
|
255 |
f"AND a.{COL_TYPE_ID} IN ({','.join('?' * len(target_types))}) " |
|
256 |
f"AND (SELECT COUNT(*) FROM {TAB_CERTIFICATE_USAGES} " |
|
257 |
f"WHERE {COL_CERTIFICATE_ID} = a.{COL_ID} " |
|
258 |
f"AND {COL_USAGE_TYPE_ID} IN ({','.join('?' * len(target_usages))}) " |
|
259 |
f") > 0 " |
|
260 |
) |
|
261 |
|
|
262 |
if target_cn_substring is not None: |
|
263 |
sql += f" AND a.{COL_COMMON_NAME} LIKE '%' || ? || '%'" |
|
264 |
|
|
265 |
values += [target_cn_substring] |
|
266 |
|
|
267 |
if page is not None and per_page is not None: |
|
268 |
sql += f" LIMIT {per_page} OFFSET {page * per_page}" |
|
269 |
|
|
270 |
self.cursor.execute(sql, values) |
|
271 |
certificate_rows = self.cursor.fetchall() |
|
272 |
|
|
273 |
certificates: List[Certificate] = [] |
|
274 |
for certificate_row in certificate_rows: |
|
275 |
sql = (f"SELECT {COL_USAGE_TYPE_ID} FROM {TAB_CERTIFICATE_USAGES} " |
|
276 |
f"WHERE {COL_CERTIFICATE_ID} = ?") |
|
277 |
types = [certificate_row[0]] |
|
278 |
self.cursor.execute(sql, types) |
|
279 |
usage_rows = self.cursor.fetchall() |
|
280 |
|
|
281 |
usage_dict: Dict[int, bool] = {} |
|
282 |
for usage_row in usage_rows: |
|
283 |
usage_dict[usage_row[0]] = True |
|
284 |
|
|
285 |
certificates.append(Certificate(certificate_row[0], |
|
286 |
certificate_row[1], |
|
287 |
certificate_row[2], |
|
288 |
certificate_row[3], |
|
289 |
certificate_row[4], |
|
290 |
certificate_row[7], |
|
291 |
certificate_row[8], |
|
292 |
certificate_row[9], |
|
293 |
usage_dict, |
|
294 |
certificate_row[5], |
|
295 |
certificate_row[6])) |
|
296 |
except IntegrityError: |
|
297 |
Logger.error(INTEGRITY_ERROR_MSG) |
|
298 |
raise DatabaseException(INTEGRITY_ERROR_MSG) |
|
299 |
except ProgrammingError: |
|
300 |
Logger.error(PROGRAMMING_ERROR_MSG) |
|
301 |
raise DatabaseException(PROGRAMMING_ERROR_MSG) |
|
302 |
except OperationalError: |
|
303 |
Logger.error(OPERATIONAL_ERROR_MSG) |
|
304 |
raise DatabaseException(OPERATIONAL_ERROR_MSG) |
|
305 |
except NotSupportedError: |
|
306 |
Logger.error(NOT_SUPPORTED_ERROR_MSG) |
|
307 |
raise DatabaseException(NOT_SUPPORTED_ERROR_MSG) |
|
308 |
except DatabaseError: |
|
309 |
Logger.error(DATABASE_ERROR_MSG) |
|
310 |
raise DatabaseException(DATABASE_ERROR_MSG) |
|
311 |
except Error: |
|
312 |
Logger.error(ERROR_MSG) |
|
313 |
raise DatabaseException(ERROR_MSG) |
|
314 |
|
|
315 |
return certificates |
|
316 |
|
|
230 | 317 |
def update(self, certificate_id: int, certificate: Certificate): |
231 | 318 |
""" |
232 | 319 |
Updates a certificate. |
... | ... | |
567 | 654 |
|
568 | 655 |
return certificates |
569 | 656 |
|
657 |
def get_all_issued_by_filter(self, issuer_id, target_types, target_usages, target_cn_substring, page, per_page): |
|
658 |
""" |
|
659 |
Get list of the certificates that are direct descendants of the certificate with the ID |
|
660 |
|
|
661 |
:param target_types: certificate types (filter) |
|
662 |
:param target_usages: certificate usages (filter) |
|
663 |
:param target_cn_substring: certificate CN substring (filter) |
|
664 |
:param page: target page |
|
665 |
:param per_page: target page size |
|
666 |
:param issuer_id: ID of specific certificate |
|
667 |
|
|
668 |
:return: |
|
669 |
list of the certificates OR |
|
670 |
None if the list is empty |
|
671 |
""" |
|
672 |
|
|
673 |
Logger.debug("Function launched.") |
|
674 |
|
|
675 |
try: |
|
676 |
values = [issuer_id, issuer_id] |
|
677 |
values += list(target_types) |
|
678 |
values += list(target_usages) |
|
679 |
|
|
680 |
sql = ( |
|
681 |
f"SELECT a.{COL_ID}, {COL_COMMON_NAME}, {COL_VALID_FROM}, {COL_VALID_TO}, {COL_PEM_DATA}, " |
|
682 |
f"{COL_REVOCATION_DATE}, {COL_REVOCATION_REASON}, {COL_PRIVATE_KEY_ID}, {COL_TYPE_ID}, {COL_PARENT_ID} " |
|
683 |
f"FROM {TAB_CERTIFICATES} a " |
|
684 |
f"WHERE (a.{COL_DELETION_DATE} IS NULL OR a.{COL_DELETION_DATE} = '') " |
|
685 |
f"AND a.{COL_PARENT_ID} = ? AND a.{COL_ID} != ? " |
|
686 |
f"AND a.{COL_TYPE_ID} IN ({','.join('?' * len(target_types))}) " |
|
687 |
f"AND (SELECT COUNT(*) FROM {TAB_CERTIFICATE_USAGES} " |
|
688 |
f"WHERE {COL_CERTIFICATE_ID} = a.{COL_ID} " |
|
689 |
f"AND {COL_USAGE_TYPE_ID} IN ({','.join('?' * len(target_usages))}) " |
|
690 |
f") > 0 " |
|
691 |
) |
|
692 |
|
|
693 |
if target_cn_substring is not None: |
|
694 |
sql += f" AND a.{COL_COMMON_NAME} LIKE '%' || ? || '%'" |
|
695 |
|
|
696 |
values += [target_cn_substring] |
|
697 |
|
|
698 |
if page is not None and per_page is not None: |
|
699 |
sql += f" LIMIT {per_page} OFFSET {page * per_page}" |
|
700 |
|
|
701 |
self.cursor.execute(sql, values) |
|
702 |
certificate_rows = self.cursor.fetchall() |
|
703 |
|
|
704 |
certificates: List[Certificate] = [] |
|
705 |
for certificate_row in certificate_rows: |
|
706 |
sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} " |
|
707 |
f"WHERE {COL_CERTIFICATE_ID} = ?") |
|
708 |
values = [certificate_row[0]] |
|
709 |
self.cursor.execute(sql, values) |
|
710 |
usage_rows = self.cursor.fetchall() |
|
711 |
|
|
712 |
usage_dict: Dict[int, bool] = {} |
|
713 |
for usage_row in usage_rows: |
|
714 |
usage_dict[usage_row[2]] = True |
|
715 |
|
|
716 |
certificates.append(Certificate(certificate_row[0], |
|
717 |
certificate_row[1], |
|
718 |
certificate_row[2], |
|
719 |
certificate_row[3], |
|
720 |
certificate_row[4], |
|
721 |
certificate_row[7], |
|
722 |
certificate_row[8], |
|
723 |
certificate_row[9], |
|
724 |
usage_dict, |
|
725 |
certificate_row[5], |
|
726 |
certificate_row[6])) |
|
727 |
except IntegrityError: |
|
728 |
Logger.error(INTEGRITY_ERROR_MSG) |
|
729 |
raise DatabaseException(INTEGRITY_ERROR_MSG) |
|
730 |
except ProgrammingError: |
|
731 |
Logger.error(PROGRAMMING_ERROR_MSG) |
|
732 |
raise DatabaseException(PROGRAMMING_ERROR_MSG) |
|
733 |
except OperationalError: |
|
734 |
Logger.error(OPERATIONAL_ERROR_MSG) |
|
735 |
raise DatabaseException(OPERATIONAL_ERROR_MSG) |
|
736 |
except NotSupportedError: |
|
737 |
Logger.error(NOT_SUPPORTED_ERROR_MSG) |
|
738 |
raise DatabaseException(NOT_SUPPORTED_ERROR_MSG) |
|
739 |
except DatabaseError: |
|
740 |
Logger.error(DATABASE_ERROR_MSG) |
|
741 |
raise DatabaseException(DATABASE_ERROR_MSG) |
|
742 |
except Error: |
|
743 |
Logger.error(ERROR_MSG) |
|
744 |
raise DatabaseException(ERROR_MSG) |
|
745 |
|
|
746 |
return certificates |
|
747 |
|
|
570 | 748 |
def get_all_descendants_of(self, certificate_id: int): |
571 | 749 |
""" |
572 | 750 |
Get a list of all certificates C such that the certificate identified by "certificate_id" belongs to a trust chain |
Také k dispozici: Unified diff
Re #8702 - Added filtering methods to CertRepository.