Projekt

Obecné

Profil

Stáhnout (6.42 KB) Statistiky
| Větev: | Tag: | Revize:
1 b3c80ccb David Friesecký
from sqlite3 import Connection, Error, DatabaseError, IntegrityError, ProgrammingError, OperationalError, NotSupportedError
2 d6ccc8ca David Friesecký
from typing import List
3 e9e55282 David Friesecký
4 1d2add74 Jan Pašek
from injector import inject
5
6 d65b022d David Friesecký
from src.exceptions.database_exception import DatabaseException
7 181e1196 Jan Pašek
from src.model.private_key import PrivateKey
8 9d3eded8 David Friesecký
from src.constants import *
9 b3c80ccb David Friesecký
from src.utils.logger import Logger
10
11 25053504 David Friesecký
12 9d3eded8 David Friesecký
class PrivateKeyRepository:
13 a0602bad David Friesecký
14 1d2add74 Jan Pašek
    @inject
15
    def __init__(self, connection: Connection):
16 9d3eded8 David Friesecký
        """
17
        Constructor of the PrivateKeyRepository object
18 a0602bad David Friesecký
19 9d3eded8 David Friesecký
        :param connection: Instance of the Connection object
20
        :param cursor: Instance of the Cursor object
21 a0602bad David Friesecký
        """
22
23 9d3eded8 David Friesecký
        self.connection = connection
24 1d2add74 Jan Pašek
        self.cursor = connection.cursor()
25 e9e55282 David Friesecký
26 805077f5 David Friesecký
    def create(self, private_key: PrivateKey):
27 a0602bad David Friesecký
        """
28 9d3eded8 David Friesecký
        Creates a private key.
29 a0602bad David Friesecký
30 9d3eded8 David Friesecký
        :param private_key: Instance of the PrivateKey object
31 a0602bad David Friesecký
32 9d3eded8 David Friesecký
        :return: the result of whether the creation was successful
33 a0602bad David Friesecký
        """
34
35 9d3eded8 David Friesecký
        try:
36
            sql = (f"INSERT INTO {TAB_PRIVATE_KEYS} "
37
                   f"({COL_PRIVATE_KEY},"
38
                   f"{COL_PASSWORD}) "
39
                   f"VALUES(?,?)")
40
            values = [private_key.private_key,
41
                      private_key.password]
42
            self.cursor.execute(sql, values)
43 805077f5 David Friesecký
            last_id = self.cursor.lastrowid
44 9d3eded8 David Friesecký
            self.connection.commit()
45 445886fb David Friesecký
46
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
47
            Logger.error(str(e))
48
            raise DatabaseException(e)
49 e9e55282 David Friesecký
50 805077f5 David Friesecký
        return last_id
51 e9e55282 David Friesecký
52 9d3eded8 David Friesecký
    def read(self, private_key_id: int):
53
        """
54
        Reads (selects) a private key.
55 e9e55282 David Friesecký
56 9d3eded8 David Friesecký
        :param private_key_id: ID of specific private key
57 e9e55282 David Friesecký
58 9d3eded8 David Friesecký
        :return: instance of the PrivateKey object
59
        """
60 e9e55282 David Friesecký
61 9d3eded8 David Friesecký
        try:
62
            sql = (f"SELECT * FROM {TAB_PRIVATE_KEYS} "
63
                   f"WHERE {COL_ID} = ?")
64
            values = [private_key_id]
65
            self.cursor.execute(sql, values)
66 f9711600 Stanislav Král
            private_key_row = self.cursor.fetchone()
67 e9e55282 David Friesecký
68 2525db58 Captain_Trojan
            if private_key_row is None:
69
                return None
70
71
            private_key: PrivateKey = PrivateKey(private_key_row[0],
72
                                                 private_key_row[1],
73
                                                 private_key_row[2])
74 445886fb David Friesecký
75
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
76
            Logger.error(str(e))
77
            raise DatabaseException(e)
78 2525db58 Captain_Trojan
79
        return private_key
80
81
    def find_pk(self, private_key_pem: str):
82
        """
83
        Tries to find an existing private key by its PEM.
84
        :param private_key_pem: target key PEM
85
        :return: corresponding PrivateKey or None if not found
86
        """
87
88
        try:
89
            sql = (f"SELECT * FROM {TAB_PRIVATE_KEYS} "
90
                   f"WHERE {COL_PK} = ?")
91
            values = (private_key_pem, )
92
            self.cursor.execute(sql, values)
93
            private_key_row = self.cursor.fetchone()
94
95 d65b022d David Friesecký
            if private_key_row is None:
96
                return None
97
98 9d3eded8 David Friesecký
            private_key: PrivateKey = PrivateKey(private_key_row[0],
99
                                                 private_key_row[1],
100
                                                 private_key_row[2])
101 b3c80ccb David Friesecký
        except IntegrityError:
102
            Logger.error(INTEGRITY_ERROR_MSG)
103
            raise DatabaseException(INTEGRITY_ERROR_MSG)
104
        except ProgrammingError:
105
            Logger.error(PROGRAMMING_ERROR_MSG)
106
            raise DatabaseException(PROGRAMMING_ERROR_MSG)
107
        except OperationalError:
108
            Logger.error(OPERATIONAL_ERROR_MSG)
109
            raise DatabaseException(OPERATIONAL_ERROR_MSG)
110
        except NotSupportedError:
111
            Logger.error(NOT_SUPPORTED_ERROR_MSG)
112
            raise DatabaseException(NOT_SUPPORTED_ERROR_MSG)
113
        except DatabaseError:
114
            Logger.error(DATABASE_ERROR_MSG)
115
            raise DatabaseException(DATABASE_ERROR_MSG)
116
        except Error:
117
            Logger.error(ERROR_MSG)
118
            raise DatabaseException(ERROR_MSG)
119 e9e55282 David Friesecký
120 d65b022d David Friesecký
        return private_key
121 9d3eded8 David Friesecký
122 d6ccc8ca David Friesecký
    def read_all(self):
123
        """
124
        Reads (selects) all private keys.
125
126
        :return: list of private keys
127
        """
128
129
        try:
130
            sql = f"SELECT * FROM {TAB_PRIVATE_KEYS}"
131
            self.cursor.execute(sql)
132
            private_key_rows = self.cursor.fetchall()
133
134
            private_keys: List[PrivateKey] = []
135
            for private_key_row in private_key_rows:
136
                private_keys.append(PrivateKey(private_key_row[0],
137
                                               private_key_row[1],
138
                                               private_key_row[2]))
139 445886fb David Friesecký
140
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
141
            Logger.error(str(e))
142
            raise DatabaseException(e)
143 d6ccc8ca David Friesecký
144 d65b022d David Friesecký
        return private_keys
145 d6ccc8ca David Friesecký
146 9d3eded8 David Friesecký
    def update(self, private_key_id: int, private_key: PrivateKey) -> bool:
147 a0602bad David Friesecký
        """
148 9d3eded8 David Friesecký
        Updates a private key.
149 a0602bad David Friesecký
150
        :param private_key_id: ID of specific private key
151 9d3eded8 David Friesecký
        :param private_key: Instance of the PrivateKey object
152 a0602bad David Friesecký
153
        :return: the result of whether the updation was successful
154
        """
155
156 9d3eded8 David Friesecký
        try:
157
            sql = (f"UPDATE {TAB_PRIVATE_KEYS} "
158
                   f"SET {COL_PRIVATE_KEY} = ?, "
159
                   f"{COL_PASSWORD} = ? "
160
                   f"WHERE {COL_ID} = ?")
161
            values = [private_key.private_key,
162 805077f5 David Friesecký
                      private_key.password,
163
                      private_key_id]
164 9d3eded8 David Friesecký
            self.cursor.execute(sql, values)
165
            self.connection.commit()
166 445886fb David Friesecký
167
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
168
            Logger.error(str(e))
169
            raise DatabaseException(e)
170 9d3eded8 David Friesecký
171 d65b022d David Friesecký
        return self.cursor.rowcount > 0
172 e9e55282 David Friesecký
173
    def delete(self, private_key_id: int) -> bool:
174 a0602bad David Friesecký
        """
175
        Deletes a private key
176
177
        :param private_key_id: ID of specific private key
178
179
        :return: the result of whether the deletion was successful
180
        """
181
182 9d3eded8 David Friesecký
        try:
183
            sql = (f"DELETE FROM {TAB_PRIVATE_KEYS} "
184
                   f"WHERE {COL_ID} = ?")
185
            values = [private_key_id]
186
            self.cursor.execute(sql, values)
187
            self.connection.commit()
188 445886fb David Friesecký
189
        except (IntegrityError, OperationalError, ProgrammingError, NotSupportedError, DatabaseError, Error) as e:
190
            Logger.error(str(e))
191
            raise DatabaseException(e)
192 9d3eded8 David Friesecký
193 45744020 Stanislav Král
        return self.cursor.rowcount > 0