Projekt

Obecné

Profil

Stáhnout (9.83 KB) Statistiky
| Větev: | Tag: | Revize:
1
from sqlite3 import Connection, Error, DatabaseError, IntegrityError, ProgrammingError, OperationalError, NotSupportedError
2
from typing import List
3

    
4
from injector import inject
5

    
6
from src.exceptions.database_exception import DatabaseException
7
from src.model.private_key import PrivateKey
8
from src.constants import *
9
from src.utils.logger import Logger
10

    
11
INTEGRITY_ERROR_MSG = "Database relational integrity corrupted."
12
PROGRAMMING_ERROR_MSG = "Exception raised for programming errors (etc. SQL statement)."
13
OPERATIONAL_ERROR_MSG = "Exception raised for errors that are related to the database’s operation."
14
NOT_SUPPORTED_ERROR_MSG = "Method or database API was used which is not supported by the database"
15
DATABASE_ERROR_MSG = "Unknown exception that are related to the database."
16
ERROR_MSG = "Unknown exception."
17

    
18

    
19
class PrivateKeyRepository:
20

    
21
    @inject
22
    def __init__(self, connection: Connection):
23
        """
24
        Constructor of the PrivateKeyRepository object
25

    
26
        :param connection: Instance of the Connection object
27
        :param cursor: Instance of the Cursor object
28
        """
29

    
30
        self.connection = connection
31
        self.cursor = connection.cursor()
32

    
33
    def create(self, private_key: PrivateKey):
34
        """
35
        Creates a private key.
36

    
37
        :param private_key: Instance of the PrivateKey object
38

    
39
        :return: the result of whether the creation was successful
40
        """
41

    
42
        try:
43
            sql = (f"INSERT INTO {TAB_PRIVATE_KEYS} "
44
                   f"({COL_PRIVATE_KEY},"
45
                   f"{COL_PASSWORD}) "
46
                   f"VALUES(?,?)")
47
            values = [private_key.private_key,
48
                      private_key.password]
49
            self.cursor.execute(sql, values)
50
            last_id = self.cursor.lastrowid
51
            self.connection.commit()
52
        except IntegrityError:
53
            Logger.error(INTEGRITY_ERROR_MSG)
54
            raise DatabaseException(INTEGRITY_ERROR_MSG)
55
        except ProgrammingError:
56
            Logger.error(PROGRAMMING_ERROR_MSG)
57
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
58
        except OperationalError:
59
            Logger.error(OPERATIONAL_ERROR_MSG)
60
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
61
        except NotSupportedError:
62
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
63
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
64
        except DatabaseError:
65
            Logger.error(DATABASE_ERROR_MSG)
66
            raise DatabaseException(DATABASE_ERROR_MSG)
67
        except Error:
68
            Logger.error(ERROR_MSG)
69
            raise DatabaseException(ERROR_MSG)
70

    
71
        return last_id
72

    
73
    def read(self, private_key_id: int):
74
        """
75
        Reads (selects) a private key.
76

    
77
        :param private_key_id: ID of specific private key
78

    
79
        :return: instance of the PrivateKey object
80
        """
81

    
82
        try:
83
            sql = (f"SELECT * FROM {TAB_PRIVATE_KEYS} "
84
                   f"WHERE {COL_ID} = ?")
85
            values = [private_key_id]
86
            self.cursor.execute(sql, values)
87
            private_key_row = self.cursor.fetchone()
88

    
89
            if private_key_row is None:
90
                return None
91

    
92
            private_key: PrivateKey = PrivateKey(private_key_row[0],
93
                                                 private_key_row[1],
94
                                                 private_key_row[2])
95
        except IntegrityError:
96
            Logger.error(INTEGRITY_ERROR_MSG)
97
            raise DatabaseException(INTEGRITY_ERROR_MSG)
98
        except ProgrammingError:
99
            Logger.error(PROGRAMMING_ERROR_MSG)
100
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
101
        except OperationalError:
102
            Logger.error(OPERATIONAL_ERROR_MSG)
103
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
104
        except NotSupportedError:
105
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
106
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
107
        except DatabaseError:
108
            Logger.error(DATABASE_ERROR_MSG)
109
            raise DatabaseException(DATABASE_ERROR_MSG)
110
        except Error:
111
            Logger.error(ERROR_MSG)
112
            raise DatabaseException(ERROR_MSG)
113

    
114
        return private_key
115

    
116
    def find_pk(self, private_key_pem: str):
117
        """
118
        Tries to find an existing private key by its PEM.
119
        :param private_key_pem: target key PEM
120
        :return: corresponding PrivateKey or None if not found
121
        """
122

    
123
        try:
124
            sql = (f"SELECT * FROM {TAB_PRIVATE_KEYS} "
125
                   f"WHERE {COL_PK} = ?")
126
            values = (private_key_pem, )
127
            self.cursor.execute(sql, values)
128
            private_key_row = self.cursor.fetchone()
129

    
130
            if private_key_row is None:
131
                return None
132

    
133
            private_key: PrivateKey = PrivateKey(private_key_row[0],
134
                                                 private_key_row[1],
135
                                                 private_key_row[2])
136
        except IntegrityError:
137
            Logger.error(INTEGRITY_ERROR_MSG)
138
            raise DatabaseException(INTEGRITY_ERROR_MSG)
139
        except ProgrammingError:
140
            Logger.error(PROGRAMMING_ERROR_MSG)
141
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
142
        except OperationalError:
143
            Logger.error(OPERATIONAL_ERROR_MSG)
144
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
145
        except NotSupportedError:
146
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
147
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
148
        except DatabaseError:
149
            Logger.error(DATABASE_ERROR_MSG)
150
            raise DatabaseException(DATABASE_ERROR_MSG)
151
        except Error:
152
            Logger.error(ERROR_MSG)
153
            raise DatabaseException(ERROR_MSG)
154

    
155
        return private_key
156

    
157
    def read_all(self):
158
        """
159
        Reads (selects) all private keys.
160

    
161
        :return: list of private keys
162
        """
163

    
164
        try:
165
            sql = f"SELECT * FROM {TAB_PRIVATE_KEYS}"
166
            self.cursor.execute(sql)
167
            private_key_rows = self.cursor.fetchall()
168

    
169
            private_keys: List[PrivateKey] = []
170
            for private_key_row in private_key_rows:
171
                private_keys.append(PrivateKey(private_key_row[0],
172
                                               private_key_row[1],
173
                                               private_key_row[2]))
174
        except IntegrityError:
175
            Logger.error(INTEGRITY_ERROR_MSG)
176
            raise DatabaseException(INTEGRITY_ERROR_MSG)
177
        except ProgrammingError:
178
            Logger.error(PROGRAMMING_ERROR_MSG)
179
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
180
        except OperationalError:
181
            Logger.error(OPERATIONAL_ERROR_MSG)
182
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
183
        except NotSupportedError:
184
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
185
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
186
        except DatabaseError:
187
            Logger.error(DATABASE_ERROR_MSG)
188
            raise DatabaseException(DATABASE_ERROR_MSG)
189
        except Error:
190
            Logger.error(ERROR_MSG)
191
            raise DatabaseException(ERROR_MSG)
192

    
193
        return private_keys
194

    
195
    def update(self, private_key_id: int, private_key: PrivateKey) -> bool:
196
        """
197
        Updates a private key.
198

    
199
        :param private_key_id: ID of specific private key
200
        :param private_key: Instance of the PrivateKey object
201

    
202
        :return: the result of whether the updation was successful
203
        """
204

    
205
        try:
206
            sql = (f"UPDATE {TAB_PRIVATE_KEYS} "
207
                   f"SET {COL_PRIVATE_KEY} = ?, "
208
                   f"{COL_PASSWORD} = ? "
209
                   f"WHERE {COL_ID} = ?")
210
            values = [private_key.private_key,
211
                      private_key.password,
212
                      private_key_id]
213
            self.cursor.execute(sql, values)
214
            self.connection.commit()
215
        except IntegrityError:
216
            Logger.error(INTEGRITY_ERROR_MSG)
217
            raise DatabaseException(INTEGRITY_ERROR_MSG)
218
        except ProgrammingError:
219
            Logger.error(PROGRAMMING_ERROR_MSG)
220
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
221
        except OperationalError:
222
            Logger.error(OPERATIONAL_ERROR_MSG)
223
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
224
        except NotSupportedError:
225
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
226
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
227
        except DatabaseError:
228
            Logger.error(DATABASE_ERROR_MSG)
229
            raise DatabaseException(DATABASE_ERROR_MSG)
230
        except Error:
231
            Logger.error(ERROR_MSG)
232
            raise DatabaseException(ERROR_MSG)
233

    
234
        return self.cursor.rowcount > 0
235

    
236
    def delete(self, private_key_id: int) -> bool:
237
        """
238
        Deletes a private key
239

    
240
        :param private_key_id: ID of specific private key
241

    
242
        :return: the result of whether the deletion was successful
243
        """
244

    
245
        try:
246
            sql = (f"DELETE FROM {TAB_PRIVATE_KEYS} "
247
                   f"WHERE {COL_ID} = ?")
248
            values = [private_key_id]
249
            self.cursor.execute(sql, values)
250
            self.connection.commit()
251
        except IntegrityError:
252
            Logger.error(INTEGRITY_ERROR_MSG)
253
            raise DatabaseException(INTEGRITY_ERROR_MSG)
254
        except ProgrammingError:
255
            Logger.error(PROGRAMMING_ERROR_MSG)
256
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
257
        except OperationalError:
258
            Logger.error(OPERATIONAL_ERROR_MSG)
259
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
260
        except NotSupportedError:
261
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
262
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
263
        except DatabaseError:
264
            Logger.error(DATABASE_ERROR_MSG)
265
            raise DatabaseException(DATABASE_ERROR_MSG)
266
        except Error:
267
            Logger.error(ERROR_MSG)
268
            raise DatabaseException(ERROR_MSG)
269

    
270
        return self.cursor.rowcount > 0
(3-3/3)