Projekt

Obecné

Profil

« Předchozí | Další » 

Revize 1636aefe

Přidáno uživatelem David Friesecký před asi 4 roky(ů)

Re #8471 - Edited files for DB communication

Zobrazit rozdíly:

src/impl/certificate_repository_impl.py
1
from typing import List, Dict
2

  
1 3
from ..db_objects.certificate import Certificate
2 4
from ..dao.repository import IRepository
3 5
from db_manager import DBManager
......
7 9
class CertificateRepositoryImpl(IRepository):
8 10

  
9 11
    def create(self, common_name: str, valid_from: str, valid_to: str, pem_data: str,
10
               type_id: int, private_key_id: int, usage_id: int) -> bool:
11
        sql = (f"INSERT INTO {TAB_CERTIFICATE} ({COL_COMMON_NAME},{COL_VALID_FROM},{COL_VALID_TO},{COL_PEM_DATA},"
12
               f"{COL_TYPE_ID},{COL_PRIVATE_KEY_ID},{COL_USAGE_ID})"
12
               private_key_id: int, type_id: int, usages: Dict[int, bool]) -> bool:
13
        sql = (f"INSERT INTO {TAB_CERTIFICATES} ({COL_COMMON_NAME},{COL_VALID_FROM},{COL_VALID_TO},{COL_PEM_DATA},"
14
               f"{COL_PRIVATE_KEY_ID},{COL_TYPE_ID})"
13 15
               f"VALUES(?,?,?,?,?,?,?)")
14
        result = DBManager.create(sql, common_name, valid_from, valid_to, pem_data, type_id, private_key_id, usage_id)
16
        result = DBManager.create(sql, [common_name, valid_from, valid_to, pem_data, private_key_id, type_id], usages)
15 17
        return result
16 18

  
17 19
    def read(self, certificate_id: int = None):
......
20 22
        if certificate_id is not None:
21 23
            sql_extend = f" WHERE {COL_ID} = ?"
22 24

  
23
        sql = f"SELECT * FROM {TAB_CERTIFICATE}{sql_extend}"
24
        rows = DBManager.read(sql, certificate_id)
25
        sql = f"SELECT * FROM {TAB_CERTIFICATES}{sql_extend}"
26
        certificate_rows = DBManager.read(sql, certificate_id)
25 27

  
26 28
        certificates: list = []
27 29

  
28
        for row in rows:
29
            certificates.append(Certificate(row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7]))
30
        for certificate_row in certificate_rows:
31
            sql = f"SELECT * FROM {TAB_CERTIFICATE_USAGES} WHERE {COL_CERTIFICATE_ID} = ?"
32
            usage_rows = DBManager.read(sql, certificate_row[0])
33

  
34
            usage_dict: Dict[int, bool] = {}
35
            for usage_row in usage_rows:
36
                usage_dict[usage_row[1]] = True
37

  
38
            certificates.append(Certificate(certificate_row[0],
39
                                            certificate_row[1],
40
                                            certificate_row[2],
41
                                            certificate_row[3],
42
                                            certificate_row[4],
43
                                            certificate_row[5],
44
                                            certificate_row[6],
45
                                            usage_dict))
30 46

  
31 47
        if certificate_id is not None:
32 48
            return certificates[0]
......
34 50
        return certificates
35 51

  
36 52
    def update(self, certificate_id: int, common_name: str = None, valid_from: str = None, valid_to: str = None,
37
               pem_data: str = None, type_id: int = None, private_key_id: int = None, usage_id: int = None) -> bool:
53
               pem_data: str = None, private_key_id: int = None, type_id: int = None,
54
               usages: Dict[int, bool] = None) -> bool:
38 55
        updated_list = []
39 56
        values = []
40 57
        if common_name is not None:
......
49 66
        if pem_data is not None:
50 67
            updated_list.append(f"{COL_PEM_DATA} = ?")
51 68
            values.append(pem_data)
52
        if type_id is not None:
53
            updated_list.append(f"{COL_TYPE_ID} = ?")
54
            values.append(type_id)
55 69
        if private_key_id is not None:
56 70
            updated_list.append(f"{COL_PRIVATE_KEY_ID} = ?")
57 71
            values.append(private_key_id)
58
        if usage_id is not None:
59
            updated_list.append(f"{COL_USAGE_ID} = ?")
60
            values.append(usage_id)
61

  
62
        values.append(certificate_id)
72
        if type_id is not None:
73
            updated_list.append(f"{COL_TYPE_ID} = ?")
74
            values.append(type_id)
63 75

  
64 76
        updated_str = ", ".join(updated_list)
65
        sql = f"UPDATE {TAB_CERTIFICATE} SET {updated_str} WHERE {COL_ID} = ?"
66
        result = DBManager.update(sql, values)
77
        sql = f"UPDATE {TAB_CERTIFICATES} SET {updated_str} WHERE {COL_ID} = ?"
78
        result = DBManager.update(sql, certificate_id, values, usages)
67 79
        return result
68 80

  
69 81
    def delete(self, certificate_id: int) -> bool:
70
        sql = f"DELETE FROM {TAB_CERTIFICATE} WHERE {COL_ID} = ?"
71
        result = DBManager.delete(sql, certificate_id)
82
        sql = f"DELETE FROM {TAB_CERTIFICATES} WHERE {COL_ID} = ?"
83
        result = DBManager.delete(sql, certificate_id, True)
72 84
        return result
src/impl/db_manager.py
1 1
import sqlite3
2 2
from sqlite3 import Error
3
from typing import List, Dict
3 4

  
4
from ..constants import DATABASE_FILE
5
from ..constants import *
5 6
from ..dao.repository import IRepository
6 7

  
7 8

  
8 9
class DBManager(IRepository):
9 10

  
10 11
    @staticmethod
11
    def create(sql: str, *args) -> bool:
12
    def create(sql: str, values: List, usages: Dict[int, bool] = None) -> bool:
12 13
        conn = None
13 14
        try:
14
            sql: str = args[0]
15
            values = args[1:]
16

  
17 15
            conn = sqlite3.connect(DATABASE_FILE)
18 16
            c = conn.cursor()
19 17
            c.execute(sql, values)
18

  
19
            if usages is not None:
20
                for usage_id, usage_value in usages:
21
                    if usage_value:
22
                        c.execute(f"INSERT INTO {TAB_CERTIFICATE_USAGES} ({COL_CERTIFICATE_ID},{COL_USAGE_TYPE_ID}) "
23
                                  f"VALUES (?,?)",
24
                                  [c.lastrowid, usage_id])
25

  
20 26
            conn.commit()
21 27
            c.close()
22 28
        except Error as e:
......
35 41
            c = conn.cursor()
36 42

  
37 43
            if item_id is not None:
38
                list_item_id = [item_id]
39
                c.execute(sql, list_item_id)
44
                c.execute(sql, [item_id])
40 45
            else:
41 46
                c.execute(sql)
42 47

  
......
51 56
        return records
52 57

  
53 58
    @staticmethod
54
    def update(sql: str, *args) -> bool:
59
    def update(sql: str, item_id: int, values: List, usages: Dict[int, bool] = None) -> bool:
55 60
        conn = None
56 61
        try:
62
            values.append(item_id)
63

  
57 64
            conn = sqlite3.connect(DATABASE_FILE)
58 65
            c = conn.cursor()
59
            c.execute(sql, args)
66
            c.execute(sql, values)
67

  
68
            if usages is not None:
69
                for usage_id, usage_value in usages:
70
                    c.execute(f"DELETE FROM {TAB_CERTIFICATE_USAGES} "
71
                              f"WHERE {COL_CERTIFICATE_ID} = ? AND {COL_USAGE_TYPE_ID} = ?",
72
                              [item_id, usage_id])
73

  
74
                    if usage_value:
75
                        c.execute(f"INSERT INTO {TAB_CERTIFICATE_USAGES} ({COL_CERTIFICATE_ID},{COL_USAGE_TYPE_ID}) "
76
                                  f"VALUES (?,?)",
77
                                  [item_id, usage_id])
78

  
60 79
            conn.commit()
61 80
            c.close()
62 81
        except Error as e:
......
68 87
        return True
69 88

  
70 89
    @staticmethod
71
    def delete(sql: str, item_id: int) -> bool:
90
    def delete(sql: str, item_id: int, delete_usages: bool) -> bool:
72 91
        conn = None
73 92
        try:
74 93
            conn = sqlite3.connect(DATABASE_FILE)
75 94
            c = conn.cursor()
76
            c.execute(sql, (item_id,))
95

  
96
            if delete_usages:
97
                c.execute(f"DELETE FROM {TAB_CERTIFICATE_USAGES} WHERE {COL_CERTIFICATE_ID} = ?", [item_id])
98

  
99
            c.execute(sql, [item_id])
77 100
            conn.commit()
78 101
            c.close()
79 102
        except Error as e:
src/impl/private_key_repository_impl.py
7 7
class PrivateKeyRepositoryImpl(IRepository):
8 8

  
9 9
    def create(self, private_key: str, password: str) -> bool:
10
        sql = f"INSERT INTO {TAB_PRIVATE_KEY} ({COL_PRIVATE_KEY},{COL_PASSWORD}) VALUES(?,?)"
11
        result = DBManager.create(sql, private_key, password)
10
        sql = f"INSERT INTO {TAB_PRIVATE_KEYS} ({COL_PRIVATE_KEY},{COL_PASSWORD}) VALUES(?,?)"
11
        result = DBManager.create(sql, [private_key, password])
12 12
        return result
13 13

  
14 14
    def read(self, private_key_id: int = None):
......
17 17
        if private_key_id is not None:
18 18
            sql_extend = f" WHERE {COL_ID} = ?"
19 19

  
20
        sql = f"SELECT * FROM {TAB_PRIVATE_KEY}{sql_extend}"
20
        sql = f"SELECT * FROM {TAB_PRIVATE_KEYS}{sql_extend}"
21 21
        rows = DBManager.read(sql, private_key_id)
22 22

  
23 23
        private_keys: list = []
......
40 40
            updated_list.append(f"{COL_PASSWORD} = ?")
41 41
            values.append(password)
42 42

  
43
        values.append(private_key_id)
44

  
45 43
        updated_str = ", ".join(updated_list)
46
        sql = f"UPDATE {TAB_PRIVATE_KEY} SET {updated_str} WHERE {COL_ID} = ?"
47
        result = DBManager.update(sql, values)
44
        sql = f"UPDATE {TAB_PRIVATE_KEYS} SET {updated_str} WHERE {COL_ID} = ?"
45
        result = DBManager.update(sql, private_key_id, values)
48 46
        return result
49 47

  
50 48
    def delete(self, private_key_id: int) -> bool:
51
        sql = f"DELETE FROM {TAB_PRIVATE_KEY} WHERE {COL_ID} = ?"
52
        result = DBManager.delete(sql, private_key_id)
49
        sql = f"DELETE FROM {TAB_PRIVATE_KEYS} WHERE {COL_ID} = ?"
50
        result = DBManager.delete(sql, private_key_id, False)
53 51
        return result

Také k dispozici: Unified diff