Projekt

Obecné

Profil

Stáhnout (6.42 KB) Statistiky
| Větev: | Tag: | Revize:
1
from sqlite3 import Connection, Error, DatabaseError, IntegrityError, ProgrammingError, OperationalError, NotSupportedError
2
from typing import List
3

    
4
from injector import inject
5

    
6
from src.exceptions.database_exception import DatabaseException
7
from src.model.private_key import PrivateKey
8
from src.constants import *
9
from src.utils.logger import Logger
10

    
11

    
12
class PrivateKeyRepository:
13

    
14
    @inject
15
    def __init__(self, connection: Connection):
16
        """
17
        Constructor of the PrivateKeyRepository object
18

    
19
        :param connection: Instance of the Connection object
20
        :param cursor: Instance of the Cursor object
21
        """
22

    
23
        self.connection = connection
24
        self.cursor = connection.cursor()
25

    
26
    def create(self, private_key: PrivateKey):
27
        """
28
        Creates a private key.
29

    
30
        :param private_key: Instance of the PrivateKey object
31

    
32
        :return: the result of whether the creation was successful
33
        """
34

    
35
        try:
36
            sql = (f"INSERT INTO {TAB_PRIVATE_KEYS} "
37
                   f"({COL_PRIVATE_KEY},"
38
                   f"{COL_PASSWORD}) "
39
                   f"VALUES(?,?)")
40
            values = [private_key.private_key,
41
                      private_key.password]
42
            self.cursor.execute(sql, values)
43
            last_id = self.cursor.lastrowid
44
            self.connection.commit()
45

    
46
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
47
            Logger.error(str(e))
48
            raise DatabaseException(e)
49

    
50
        return last_id
51

    
52
    def read(self, private_key_id: int):
53
        """
54
        Reads (selects) a private key.
55

    
56
        :param private_key_id: ID of specific private key
57

    
58
        :return: instance of the PrivateKey object
59
        """
60

    
61
        try:
62
            sql = (f"SELECT * FROM {TAB_PRIVATE_KEYS} "
63
                   f"WHERE {COL_ID} = ?")
64
            values = [private_key_id]
65
            self.cursor.execute(sql, values)
66
            private_key_row = self.cursor.fetchone()
67

    
68
            if private_key_row is None:
69
                return None
70

    
71
            private_key: PrivateKey = PrivateKey(private_key_row[0],
72
                                                 private_key_row[1],
73
                                                 private_key_row[2])
74

    
75
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
76
            Logger.error(str(e))
77
            raise DatabaseException(e)
78

    
79
        return private_key
80

    
81
    def find_pk(self, private_key_pem: str):
82
        """
83
        Tries to find an existing private key by its PEM.
84
        :param private_key_pem: target key PEM
85
        :return: corresponding PrivateKey or None if not found
86
        """
87

    
88
        try:
89
            sql = (f"SELECT * FROM {TAB_PRIVATE_KEYS} "
90
                   f"WHERE {COL_PK} = ?")
91
            values = (private_key_pem, )
92
            self.cursor.execute(sql, values)
93
            private_key_row = self.cursor.fetchone()
94

    
95
            if private_key_row is None:
96
                return None
97

    
98
            private_key: PrivateKey = PrivateKey(private_key_row[0],
99
                                                 private_key_row[1],
100
                                                 private_key_row[2])
101
        except IntegrityError:
102
            Logger.error(INTEGRITY_ERROR_MSG)
103
            raise DatabaseException(INTEGRITY_ERROR_MSG)
104
        except ProgrammingError:
105
            Logger.error(PROGRAMMING_ERROR_MSG)
106
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
107
        except OperationalError:
108
            Logger.error(OPERATIONAL_ERROR_MSG)
109
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
110
        except NotSupportedError:
111
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
112
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
113
        except DatabaseError:
114
            Logger.error(DATABASE_ERROR_MSG)
115
            raise DatabaseException(DATABASE_ERROR_MSG)
116
        except Error:
117
            Logger.error(ERROR_MSG)
118
            raise DatabaseException(ERROR_MSG)
119

    
120
        return private_key
121

    
122
    def read_all(self):
123
        """
124
        Reads (selects) all private keys.
125

    
126
        :return: list of private keys
127
        """
128

    
129
        try:
130
            sql = f"SELECT * FROM {TAB_PRIVATE_KEYS}"
131
            self.cursor.execute(sql)
132
            private_key_rows = self.cursor.fetchall()
133

    
134
            private_keys: List[PrivateKey] = []
135
            for private_key_row in private_key_rows:
136
                private_keys.append(PrivateKey(private_key_row[0],
137
                                               private_key_row[1],
138
                                               private_key_row[2]))
139

    
140
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
141
            Logger.error(str(e))
142
            raise DatabaseException(e)
143

    
144
        return private_keys
145

    
146
    def update(self, private_key_id: int, private_key: PrivateKey) -> bool:
147
        """
148
        Updates a private key.
149

    
150
        :param private_key_id: ID of specific private key
151
        :param private_key: Instance of the PrivateKey object
152

    
153
        :return: the result of whether the updation was successful
154
        """
155

    
156
        try:
157
            sql = (f"UPDATE {TAB_PRIVATE_KEYS} "
158
                   f"SET {COL_PRIVATE_KEY} = ?, "
159
                   f"{COL_PASSWORD} = ? "
160
                   f"WHERE {COL_ID} = ?")
161
            values = [private_key.private_key,
162
                      private_key.password,
163
                      private_key_id]
164
            self.cursor.execute(sql, values)
165
            self.connection.commit()
166

    
167
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
168
            Logger.error(str(e))
169
            raise DatabaseException(e)
170

    
171
        return self.cursor.rowcount > 0
172

    
173
    def delete(self, private_key_id: int) -> bool:
174
        """
175
        Deletes a private key
176

    
177
        :param private_key_id: ID of specific private key
178

    
179
        :return: the result of whether the deletion was successful
180
        """
181

    
182
        try:
183
            sql = (f"DELETE FROM {TAB_PRIVATE_KEYS} "
184
                   f"WHERE {COL_ID} = ?")
185
            values = [private_key_id]
186
            self.cursor.execute(sql, values)
187
            self.connection.commit()
188

    
189
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
190
            Logger.error(str(e))
191
            raise DatabaseException(e)
192

    
193
        return self.cursor.rowcount > 0
(3-3/3)