Projekt

Obecné

Profil

Stáhnout (6.87 KB) Statistiky
| Větev: | Tag: | Revize:
1
from sqlite3 import Connection, Error, DatabaseError, IntegrityError, ProgrammingError, OperationalError, NotSupportedError
2
from typing import List
3

    
4
from injector import inject
5

    
6
from src.exceptions.database_exception import DatabaseException
7
from src.model.private_key import PrivateKey
8
from src.constants import *
9
from src.utils.logger import Logger
10

    
11
INTEGRITY_ERROR_MSG = "Database relational integrity corrupted."
12
PROGRAMMING_ERROR_MSG = "Exception raised for programming errors (etc. SQL statement)."
13
OPERATIONAL_ERROR_MSG = "Exception raised for errors that are related to the database’s operation."
14
NOT_SUPPORTED_ERROR_MSG = "Method or database API was used which is not supported by the database"
15
DATABASE_ERROR_MSG = "Unknown exception that are related to the database."
16
ERROR_MSG = "Unknown exception."
17

    
18

    
19
class PrivateKeyRepository:
20

    
21
    @inject
22
    def __init__(self, connection: Connection):
23
        """
24
        Constructor of the PrivateKeyRepository object
25

    
26
        :param connection: Instance of the Connection object
27
        :param cursor: Instance of the Cursor object
28
        """
29

    
30
        self.connection = connection
31
        self.cursor = connection.cursor()
32

    
33
    def create(self, private_key: PrivateKey):
34
        """
35
        Creates a private key.
36

    
37
        :param private_key: Instance of the PrivateKey object
38

    
39
        :return: the result of whether the creation was successful
40
        """
41

    
42
        try:
43
            sql = (f"INSERT INTO {TAB_PRIVATE_KEYS} "
44
                   f"({COL_PRIVATE_KEY},"
45
                   f"{COL_PASSWORD}) "
46
                   f"VALUES(?,?)")
47
            values = [private_key.private_key,
48
                      private_key.password]
49
            self.cursor.execute(sql, values)
50
            last_id = self.cursor.lastrowid
51
            self.connection.commit()
52

    
53
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
54
            Logger.error(str(e))
55
            raise DatabaseException(e)
56

    
57
        return last_id
58

    
59
    def read(self, private_key_id: int):
60
        """
61
        Reads (selects) a private key.
62

    
63
        :param private_key_id: ID of specific private key
64

    
65
        :return: instance of the PrivateKey object
66
        """
67

    
68
        try:
69
            sql = (f"SELECT * FROM {TAB_PRIVATE_KEYS} "
70
                   f"WHERE {COL_ID} = ?")
71
            values = [private_key_id]
72
            self.cursor.execute(sql, values)
73
            private_key_row = self.cursor.fetchone()
74

    
75
            if private_key_row is None:
76
                return None
77

    
78
            private_key: PrivateKey = PrivateKey(private_key_row[0],
79
                                                 private_key_row[1],
80
                                                 private_key_row[2])
81

    
82
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
83
            Logger.error(str(e))
84
            raise DatabaseException(e)
85

    
86
        return private_key
87

    
88
    def find_pk(self, private_key_pem: str):
89
        """
90
        Tries to find an existing private key by its PEM.
91
        :param private_key_pem: target key PEM
92
        :return: corresponding PrivateKey or None if not found
93
        """
94

    
95
        try:
96
            sql = (f"SELECT * FROM {TAB_PRIVATE_KEYS} "
97
                   f"WHERE {COL_PK} = ?")
98
            values = (private_key_pem, )
99
            self.cursor.execute(sql, values)
100
            private_key_row = self.cursor.fetchone()
101

    
102
            if private_key_row is None:
103
                return None
104

    
105
            private_key: PrivateKey = PrivateKey(private_key_row[0],
106
                                                 private_key_row[1],
107
                                                 private_key_row[2])
108
        except IntegrityError:
109
            Logger.error(INTEGRITY_ERROR_MSG)
110
            raise DatabaseException(INTEGRITY_ERROR_MSG)
111
        except ProgrammingError:
112
            Logger.error(PROGRAMMING_ERROR_MSG)
113
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
114
        except OperationalError:
115
            Logger.error(OPERATIONAL_ERROR_MSG)
116
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
117
        except NotSupportedError:
118
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
119
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
120
        except DatabaseError:
121
            Logger.error(DATABASE_ERROR_MSG)
122
            raise DatabaseException(DATABASE_ERROR_MSG)
123
        except Error:
124
            Logger.error(ERROR_MSG)
125
            raise DatabaseException(ERROR_MSG)
126

    
127
        return private_key
128

    
129
    def read_all(self):
130
        """
131
        Reads (selects) all private keys.
132

    
133
        :return: list of private keys
134
        """
135

    
136
        try:
137
            sql = f"SELECT * FROM {TAB_PRIVATE_KEYS}"
138
            self.cursor.execute(sql)
139
            private_key_rows = self.cursor.fetchall()
140

    
141
            private_keys: List[PrivateKey] = []
142
            for private_key_row in private_key_rows:
143
                private_keys.append(PrivateKey(private_key_row[0],
144
                                               private_key_row[1],
145
                                               private_key_row[2]))
146

    
147
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
148
            Logger.error(str(e))
149
            raise DatabaseException(e)
150

    
151
        return private_keys
152

    
153
    def update(self, private_key_id: int, private_key: PrivateKey) -> bool:
154
        """
155
        Updates a private key.
156

    
157
        :param private_key_id: ID of specific private key
158
        :param private_key: Instance of the PrivateKey object
159

    
160
        :return: the result of whether the updation was successful
161
        """
162

    
163
        try:
164
            sql = (f"UPDATE {TAB_PRIVATE_KEYS} "
165
                   f"SET {COL_PRIVATE_KEY} = ?, "
166
                   f"{COL_PASSWORD} = ? "
167
                   f"WHERE {COL_ID} = ?")
168
            values = [private_key.private_key,
169
                      private_key.password,
170
                      private_key_id]
171
            self.cursor.execute(sql, values)
172
            self.connection.commit()
173

    
174
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
175
            Logger.error(str(e))
176
            raise DatabaseException(e)
177

    
178
        return self.cursor.rowcount > 0
179

    
180
    def delete(self, private_key_id: int) -> bool:
181
        """
182
        Deletes a private key
183

    
184
        :param private_key_id: ID of specific private key
185

    
186
        :return: the result of whether the deletion was successful
187
        """
188

    
189
        try:
190
            sql = (f"DELETE FROM {TAB_PRIVATE_KEYS} "
191
                   f"WHERE {COL_ID} = ?")
192
            values = [private_key_id]
193
            self.cursor.execute(sql, values)
194
            self.connection.commit()
195

    
196
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
197
            Logger.error(str(e))
198
            raise DatabaseException(e)
199

    
200
        return self.cursor.rowcount > 0
(3-3/3)