Projekt

Obecné

Profil

Stáhnout (6.87 KB) Statistiky
| Větev: | Tag: | Revize:
1 b3c80ccb David Friesecký
from sqlite3 import Connection, Error, DatabaseError, IntegrityError, ProgrammingError, OperationalError, NotSupportedError
2 d6ccc8ca David Friesecký
from typing import List
3 e9e55282 David Friesecký
4 1d2add74 Jan Pašek
from injector import inject
5
6 d65b022d David Friesecký
from src.exceptions.database_exception import DatabaseException
7 181e1196 Jan Pašek
from src.model.private_key import PrivateKey
8 9d3eded8 David Friesecký
from src.constants import *
9 b3c80ccb David Friesecký
from src.utils.logger import Logger
10
11
INTEGRITY_ERROR_MSG = "Database relational integrity corrupted."
12
PROGRAMMING_ERROR_MSG = "Exception raised for programming errors (etc. SQL statement)."
13
OPERATIONAL_ERROR_MSG = "Exception raised for errors that are related to the database’s operation."
14
NOT_SUPPORTED_ERROR_MSG = "Method or database API was used which is not supported by the database"
15
DATABASE_ERROR_MSG = "Unknown exception that are related to the database."
16
ERROR_MSG = "Unknown exception."
17 e9e55282 David Friesecký
18 25053504 David Friesecký
19 9d3eded8 David Friesecký
class PrivateKeyRepository:
20 a0602bad David Friesecký
21 1d2add74 Jan Pašek
    @inject
22
    def __init__(self, connection: Connection):
23 9d3eded8 David Friesecký
        """
24
        Constructor of the PrivateKeyRepository object
25 a0602bad David Friesecký
26 9d3eded8 David Friesecký
        :param connection: Instance of the Connection object
27
        :param cursor: Instance of the Cursor object
28 a0602bad David Friesecký
        """
29
30 9d3eded8 David Friesecký
        self.connection = connection
31 1d2add74 Jan Pašek
        self.cursor = connection.cursor()
32 e9e55282 David Friesecký
33 805077f5 David Friesecký
    def create(self, private_key: PrivateKey):
34 a0602bad David Friesecký
        """
35 9d3eded8 David Friesecký
        Creates a private key.
36 a0602bad David Friesecký
37 9d3eded8 David Friesecký
        :param private_key: Instance of the PrivateKey object
38 a0602bad David Friesecký
39 9d3eded8 David Friesecký
        :return: the result of whether the creation was successful
40 a0602bad David Friesecký
        """
41
42 9d3eded8 David Friesecký
        try:
43
            sql = (f"INSERT INTO {TAB_PRIVATE_KEYS} "
44
                   f"({COL_PRIVATE_KEY},"
45
                   f"{COL_PASSWORD}) "
46
                   f"VALUES(?,?)")
47
            values = [private_key.private_key,
48
                      private_key.password]
49
            self.cursor.execute(sql, values)
50 805077f5 David Friesecký
            last_id = self.cursor.lastrowid
51 9d3eded8 David Friesecký
            self.connection.commit()
52 445886fb David Friesecký
53
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
54
            Logger.error(str(e))
55
            raise DatabaseException(e)
56 e9e55282 David Friesecký
57 805077f5 David Friesecký
        return last_id
58 e9e55282 David Friesecký
59 9d3eded8 David Friesecký
    def read(self, private_key_id: int):
60
        """
61
        Reads (selects) a private key.
62 e9e55282 David Friesecký
63 9d3eded8 David Friesecký
        :param private_key_id: ID of specific private key
64 e9e55282 David Friesecký
65 9d3eded8 David Friesecký
        :return: instance of the PrivateKey object
66
        """
67 e9e55282 David Friesecký
68 9d3eded8 David Friesecký
        try:
69
            sql = (f"SELECT * FROM {TAB_PRIVATE_KEYS} "
70
                   f"WHERE {COL_ID} = ?")
71
            values = [private_key_id]
72
            self.cursor.execute(sql, values)
73 f9711600 Stanislav Král
            private_key_row = self.cursor.fetchone()
74 e9e55282 David Friesecký
75 2525db58 Captain_Trojan
            if private_key_row is None:
76
                return None
77
78
            private_key: PrivateKey = PrivateKey(private_key_row[0],
79
                                                 private_key_row[1],
80
                                                 private_key_row[2])
81 445886fb David Friesecký
82
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
83
            Logger.error(str(e))
84
            raise DatabaseException(e)
85 2525db58 Captain_Trojan
86
        return private_key
87
88
    def find_pk(self, private_key_pem: str):
89
        """
90
        Tries to find an existing private key by its PEM.
91
        :param private_key_pem: target key PEM
92
        :return: corresponding PrivateKey or None if not found
93
        """
94
95
        try:
96
            sql = (f"SELECT * FROM {TAB_PRIVATE_KEYS} "
97
                   f"WHERE {COL_PK} = ?")
98
            values = (private_key_pem, )
99
            self.cursor.execute(sql, values)
100
            private_key_row = self.cursor.fetchone()
101
102 d65b022d David Friesecký
            if private_key_row is None:
103
                return None
104
105 9d3eded8 David Friesecký
            private_key: PrivateKey = PrivateKey(private_key_row[0],
106
                                                 private_key_row[1],
107
                                                 private_key_row[2])
108 b3c80ccb David Friesecký
        except IntegrityError:
109
            Logger.error(INTEGRITY_ERROR_MSG)
110
            raise DatabaseException(INTEGRITY_ERROR_MSG)
111
        except ProgrammingError:
112
            Logger.error(PROGRAMMING_ERROR_MSG)
113
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
114
        except OperationalError:
115
            Logger.error(OPERATIONAL_ERROR_MSG)
116
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
117
        except NotSupportedError:
118
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
119
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
120
        except DatabaseError:
121
            Logger.error(DATABASE_ERROR_MSG)
122
            raise DatabaseException(DATABASE_ERROR_MSG)
123
        except Error:
124
            Logger.error(ERROR_MSG)
125
            raise DatabaseException(ERROR_MSG)
126 e9e55282 David Friesecký
127 d65b022d David Friesecký
        return private_key
128 9d3eded8 David Friesecký
129 d6ccc8ca David Friesecký
    def read_all(self):
130
        """
131
        Reads (selects) all private keys.
132
133
        :return: list of private keys
134
        """
135
136
        try:
137
            sql = f"SELECT * FROM {TAB_PRIVATE_KEYS}"
138
            self.cursor.execute(sql)
139
            private_key_rows = self.cursor.fetchall()
140
141
            private_keys: List[PrivateKey] = []
142
            for private_key_row in private_key_rows:
143
                private_keys.append(PrivateKey(private_key_row[0],
144
                                               private_key_row[1],
145
                                               private_key_row[2]))
146 445886fb David Friesecký
147
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
148
            Logger.error(str(e))
149
            raise DatabaseException(e)
150 d6ccc8ca David Friesecký
151 d65b022d David Friesecký
        return private_keys
152 d6ccc8ca David Friesecký
153 9d3eded8 David Friesecký
    def update(self, private_key_id: int, private_key: PrivateKey) -> bool:
154 a0602bad David Friesecký
        """
155 9d3eded8 David Friesecký
        Updates a private key.
156 a0602bad David Friesecký
157
        :param private_key_id: ID of specific private key
158 9d3eded8 David Friesecký
        :param private_key: Instance of the PrivateKey object
159 a0602bad David Friesecký
160
        :return: the result of whether the updation was successful
161
        """
162
163 9d3eded8 David Friesecký
        try:
164
            sql = (f"UPDATE {TAB_PRIVATE_KEYS} "
165
                   f"SET {COL_PRIVATE_KEY} = ?, "
166
                   f"{COL_PASSWORD} = ? "
167
                   f"WHERE {COL_ID} = ?")
168
            values = [private_key.private_key,
169 805077f5 David Friesecký
                      private_key.password,
170
                      private_key_id]
171 9d3eded8 David Friesecký
            self.cursor.execute(sql, values)
172
            self.connection.commit()
173 445886fb David Friesecký
174
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
175
            Logger.error(str(e))
176
            raise DatabaseException(e)
177 9d3eded8 David Friesecký
178 d65b022d David Friesecký
        return self.cursor.rowcount > 0
179 e9e55282 David Friesecký
180
    def delete(self, private_key_id: int) -> bool:
181 a0602bad David Friesecký
        """
182
        Deletes a private key
183
184
        :param private_key_id: ID of specific private key
185
186
        :return: the result of whether the deletion was successful
187
        """
188
189 9d3eded8 David Friesecký
        try:
190
            sql = (f"DELETE FROM {TAB_PRIVATE_KEYS} "
191
                   f"WHERE {COL_ID} = ?")
192
            values = [private_key_id]
193
            self.cursor.execute(sql, values)
194
            self.connection.commit()
195 445886fb David Friesecký
196
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
197
            Logger.error(str(e))
198
            raise DatabaseException(e)
199 9d3eded8 David Friesecký
200 45744020 Stanislav Král
        return self.cursor.rowcount > 0