Projekt

Obecné

Profil

« Předchozí | Další » 

Revize cf247eaa

Přidáno uživatelem Michal Seják před téměř 4 roky(ů)

Re #8702 - Added filtering methods to CertRepository.

Zobrazit rozdíly:

src/dao/certificate_repository.py
1 1
import time
2
from sqlite3 import Connection, Error, DatabaseError, IntegrityError, ProgrammingError, OperationalError, NotSupportedError
3
from typing import Dict, List
2
from sqlite3 import Connection, Error, DatabaseError, IntegrityError, ProgrammingError, OperationalError, \
3
    NotSupportedError
4
from typing import Dict, List, Set
4 5

  
5 6
from src.exceptions.database_exception import DatabaseException
6 7
from injector import inject
......
227 228

  
228 229
        return certificates
229 230

  
231
    def read_all_filter(self, target_types: Set[int], target_usages: Set[int], target_cn_substring: str, page: int,
232
                        per_page: int):
233
        """
234
        Reads (selects) all certificates according to a specific filtering and pagination options.
235
        :param target_types: certificate types (filter)
236
        :param target_usages: certificate usages (filter)
237
        :param target_cn_substring: certificate CN substring (filter)
238
        :param page: target page
239
        :param per_page: target page size
240
        :return: list of certificates
241
        """
242

  
243
        Logger.debug("Function launched.")
244

  
245
        try:
246
            values = []
247
            values += list(target_types)
248
            values += list(target_usages)
249

  
250
            sql = (
251
                f"SELECT a.{COL_ID}, {COL_COMMON_NAME}, {COL_VALID_FROM}, {COL_VALID_TO}, {COL_PEM_DATA}, "
252
                f"{COL_REVOCATION_DATE}, {COL_REVOCATION_REASON}, {COL_PRIVATE_KEY_ID}, {COL_TYPE_ID}, {COL_PARENT_ID} "
253
                f"FROM {TAB_CERTIFICATES} a "
254
                f"WHERE (a.{COL_DELETION_DATE} IS NULL OR a.{COL_DELETION_DATE} = '') "
255
                f"AND a.{COL_TYPE_ID} IN ({','.join('?' * len(target_types))}) "
256
                f"AND (SELECT COUNT(*) FROM {TAB_CERTIFICATE_USAGES} "
257
                f"WHERE {COL_CERTIFICATE_ID} = a.{COL_ID} "
258
                f"AND {COL_USAGE_TYPE_ID} IN ({','.join('?' * len(target_usages))}) "
259
                f") > 0 "
260
            )
261

  
262
            if target_cn_substring is not None:
263
                sql += f" AND a.{COL_COMMON_NAME} LIKE '%' || ? || '%'"
264

  
265
                values += [target_cn_substring]
266

  
267
            if page is not None and per_page is not None:
268
                sql += f" LIMIT {per_page} OFFSET {page * per_page}"
269

  
270
            self.cursor.execute(sql, values)
271
            certificate_rows = self.cursor.fetchall()
272

  
273
            certificates: List[Certificate] = []
274
            for certificate_row in certificate_rows:
275
                sql = (f"SELECT {COL_USAGE_TYPE_ID} FROM {TAB_CERTIFICATE_USAGES} "
276
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
277
                types = [certificate_row[0]]
278
                self.cursor.execute(sql, types)
279
                usage_rows = self.cursor.fetchall()
280

  
281
                usage_dict: Dict[int, bool] = {}
282
                for usage_row in usage_rows:
283
                    usage_dict[usage_row[0]] = True
284

  
285
                certificates.append(Certificate(certificate_row[0],
286
                                                certificate_row[1],
287
                                                certificate_row[2],
288
                                                certificate_row[3],
289
                                                certificate_row[4],
290
                                                certificate_row[7],
291
                                                certificate_row[8],
292
                                                certificate_row[9],
293
                                                usage_dict,
294
                                                certificate_row[5],
295
                                                certificate_row[6]))
296
        except IntegrityError:
297
            Logger.error(INTEGRITY_ERROR_MSG)
298
            raise DatabaseException(INTEGRITY_ERROR_MSG)
299
        except ProgrammingError:
300
            Logger.error(PROGRAMMING_ERROR_MSG)
301
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
302
        except OperationalError:
303
            Logger.error(OPERATIONAL_ERROR_MSG)
304
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
305
        except NotSupportedError:
306
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
307
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
308
        except DatabaseError:
309
            Logger.error(DATABASE_ERROR_MSG)
310
            raise DatabaseException(DATABASE_ERROR_MSG)
311
        except Error:
312
            Logger.error(ERROR_MSG)
313
            raise DatabaseException(ERROR_MSG)
314

  
315
        return certificates
316

  
230 317
    def update(self, certificate_id: int, certificate: Certificate):
231 318
        """
232 319
        Updates a certificate.
......
567 654

  
568 655
        return certificates
569 656

  
657
    def get_all_issued_by_filter(self, issuer_id, target_types, target_usages, target_cn_substring, page, per_page):
658
        """
659
        Get list of the certificates that are direct descendants of the certificate with the ID
660

  
661
        :param target_types: certificate types (filter)
662
        :param target_usages: certificate usages (filter)
663
        :param target_cn_substring: certificate CN substring (filter)
664
        :param page: target page
665
        :param per_page: target page size
666
        :param issuer_id: ID of specific certificate
667

  
668
        :return:
669
            list of the certificates OR
670
            None if the list is empty
671
        """
672

  
673
        Logger.debug("Function launched.")
674

  
675
        try:
676
            values = [issuer_id, issuer_id]
677
            values += list(target_types)
678
            values += list(target_usages)
679

  
680
            sql = (
681
                f"SELECT a.{COL_ID}, {COL_COMMON_NAME}, {COL_VALID_FROM}, {COL_VALID_TO}, {COL_PEM_DATA}, "
682
                f"{COL_REVOCATION_DATE}, {COL_REVOCATION_REASON}, {COL_PRIVATE_KEY_ID}, {COL_TYPE_ID}, {COL_PARENT_ID} "
683
                f"FROM {TAB_CERTIFICATES} a "
684
                f"WHERE (a.{COL_DELETION_DATE} IS NULL OR a.{COL_DELETION_DATE} = '') "
685
                f"AND a.{COL_PARENT_ID} = ? AND a.{COL_ID} != ? "
686
                f"AND a.{COL_TYPE_ID} IN ({','.join('?' * len(target_types))}) "
687
                f"AND (SELECT COUNT(*) FROM {TAB_CERTIFICATE_USAGES} "
688
                f"WHERE {COL_CERTIFICATE_ID} = a.{COL_ID} "
689
                f"AND {COL_USAGE_TYPE_ID} IN ({','.join('?' * len(target_usages))}) "
690
                f") > 0 "
691
            )
692

  
693
            if target_cn_substring is not None:
694
                sql += f" AND a.{COL_COMMON_NAME} LIKE '%' || ? || '%'"
695

  
696
                values += [target_cn_substring]
697

  
698
            if page is not None and per_page is not None:
699
                sql += f" LIMIT {per_page} OFFSET {page * per_page}"
700

  
701
            self.cursor.execute(sql, values)
702
            certificate_rows = self.cursor.fetchall()
703

  
704
            certificates: List[Certificate] = []
705
            for certificate_row in certificate_rows:
706
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
707
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
708
                values = [certificate_row[0]]
709
                self.cursor.execute(sql, values)
710
                usage_rows = self.cursor.fetchall()
711

  
712
                usage_dict: Dict[int, bool] = {}
713
                for usage_row in usage_rows:
714
                    usage_dict[usage_row[2]] = True
715

  
716
                certificates.append(Certificate(certificate_row[0],
717
                                                certificate_row[1],
718
                                                certificate_row[2],
719
                                                certificate_row[3],
720
                                                certificate_row[4],
721
                                                certificate_row[7],
722
                                                certificate_row[8],
723
                                                certificate_row[9],
724
                                                usage_dict,
725
                                                certificate_row[5],
726
                                                certificate_row[6]))
727
        except IntegrityError:
728
            Logger.error(INTEGRITY_ERROR_MSG)
729
            raise DatabaseException(INTEGRITY_ERROR_MSG)
730
        except ProgrammingError:
731
            Logger.error(PROGRAMMING_ERROR_MSG)
732
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
733
        except OperationalError:
734
            Logger.error(OPERATIONAL_ERROR_MSG)
735
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
736
        except NotSupportedError:
737
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
738
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
739
        except DatabaseError:
740
            Logger.error(DATABASE_ERROR_MSG)
741
            raise DatabaseException(DATABASE_ERROR_MSG)
742
        except Error:
743
            Logger.error(ERROR_MSG)
744
            raise DatabaseException(ERROR_MSG)
745

  
746
        return certificates
747

  
570 748
    def get_all_descendants_of(self, certificate_id: int):
571 749
        """
572 750
        Get a list of all certificates C such that the certificate identified by "certificate_id" belongs to a trust chain

Také k dispozici: Unified diff