Projekt

Obecné

Profil

Stáhnout (8.69 KB) Statistiky
| Větev: | Tag: | Revize:
1 9e22e20c David Friesecký
from typing import Dict, List
2 f8b23532 David Friesecký
from sqlite3 import Connection, Cursor, Error
3 1636aefe David Friesecký
4 181e1196 Jan Pašek
from src.model.certificate import Certificate
5 f8b23532 David Friesecký
from src.constants import *
6 e9e55282 David Friesecký
7
8 f8b23532 David Friesecký
class CertificateRepository:
9 25053504 David Friesecký
10 f8b23532 David Friesecký
    def __init__(self, connection: Connection, cursor: Cursor):
11 a0602bad David Friesecký
        """
12 f8b23532 David Friesecký
        Constructor of the CertificateRepository object
13 a0602bad David Friesecký
14 f8b23532 David Friesecký
        :param connection: Instance of the Connection object
15
        :param cursor: Instance of the Cursor object
16 a0602bad David Friesecký
        """
17
18 f8b23532 David Friesecký
        self.connection = connection
19
        self.cursor = cursor
20 e9e55282 David Friesecký
21 805077f5 David Friesecký
    def create(self, certificate: Certificate):
22 a0602bad David Friesecký
        """
23 f8b23532 David Friesecký
        Creates a certificate.
24
        For root certificate (CA) the parent certificate id is modified to the same id (id == parent_id).
25 a0602bad David Friesecký
26 f8b23532 David Friesecký
        :param certificate: Instance of the Certificate object
27 a0602bad David Friesecký
28 f8b23532 David Friesecký
        :return: the result of whether the creation was successful
29 a0602bad David Friesecký
        """
30
31 f8b23532 David Friesecký
        try:
32
            sql = (f"INSERT INTO {TAB_CERTIFICATES} "
33
                   f"({COL_COMMON_NAME},"
34
                   f"{COL_VALID_FROM},"
35
                   f"{COL_VALID_TO},"
36
                   f"{COL_PEM_DATA},"
37
                   f"{COL_PRIVATE_KEY_ID},"
38
                   f"{COL_TYPE_ID},"
39
                   f"{COL_PARENT_ID})"
40
                   f"VALUES(?,?,?,?,?,?,?)")
41
            values = [certificate.common_name,
42
                      certificate.valid_from,
43
                      certificate.valid_to,
44
                      certificate.pem_data,
45
                      certificate.private_key_id,
46
                      certificate.type_id,
47
                      certificate.parent_id]
48
            self.cursor.execute(sql, values)
49
            self.connection.commit()
50
51
            last_id: int = self.cursor.lastrowid
52
53 f3125948 Stanislav Král
            # TODO assure that this is correct
54
            if certificate.type_id == ROOT_CA_ID:
55 f8b23532 David Friesecký
                certificate.parent_id = last_id
56 093d06df Stanislav Král
                self.update(last_id, certificate)
57 f8b23532 David Friesecký
            else:
58 fa72c969 Stanislav Král
                for usage_id, usage_value in certificate.usages.items():
59 f8b23532 David Friesecký
                    if usage_value:
60
                        sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
61
                               f"({COL_CERTIFICATE_ID},"
62
                               f"{COL_USAGE_TYPE_ID}) "
63
                               f"VALUES (?,?)")
64
                        values = [last_id, usage_id]
65
                        self.cursor.execute(sql, values)
66
                        self.connection.commit()
67
        except Error as e:
68
            print(e)
69 805077f5 David Friesecký
            return None
70 f8b23532 David Friesecký
71 805077f5 David Friesecký
        return last_id
72 f8b23532 David Friesecký
73 805077f5 David Friesecký
    def read(self, certificate_id: int):
74 f8b23532 David Friesecký
        """
75
        Reads (selects) a certificate.
76 e9e55282 David Friesecký
77 f8b23532 David Friesecký
        :param certificate_id: ID of specific certificate
78 e9e55282 David Friesecký
79 f8b23532 David Friesecký
        :return: instance of the Certificate object
80
        """
81 e9e55282 David Friesecký
82 f8b23532 David Friesecký
        try:
83
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
84
                   f"WHERE {COL_ID} = ?")
85
            values = [certificate_id]
86
            self.cursor.execute(sql, values)
87 a7411982 Stanislav Král
            certificate_row = self.cursor.fetchone()
88 e9e55282 David Friesecký
89 6f4a5f24 Captain_Trojan
            if certificate_row is None:
90
                return None
91
92 f8b23532 David Friesecký
            sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
93
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
94
            self.cursor.execute(sql, values)
95
            usage_rows = self.cursor.fetchall()
96 1636aefe David Friesecký
97
            usage_dict: Dict[int, bool] = {}
98
            for usage_row in usage_rows:
99 f8b23532 David Friesecký
                usage_dict[usage_row[2]] = True
100
101
            certificate: Certificate = Certificate(certificate_row[0],
102
                                                   certificate_row[1],
103
                                                   certificate_row[2],
104
                                                   certificate_row[3],
105
                                                   certificate_row[4],
106
                                                   certificate_row[5],
107
                                                   certificate_row[6],
108
                                                   certificate_row[7],
109
                                                   usage_dict)
110
        except Error as e:
111
            print(e)
112
            return None
113
114 805077f5 David Friesecký
        if len(certificate_row) > 0:
115
            return certificate
116
        else:
117
            return None
118 f8b23532 David Friesecký
119 ef65f488 Stanislav Král
    def read_all(self, filter_type: int = None) -> List[Certificate]:
120 9e22e20c David Friesecký
        """
121
        Reads (selects) all certificates (with type).
122
123
        :param filter_type: ID of certificate type from CertificateTypes table
124
125
        :return: list of certificates
126
        """
127
128
        try:
129
            sql_extension = ""
130
            values = []
131
            if filter_type is not None:
132
                sql_extension = (f" WHERE {COL_TYPE_ID} = ("
133 6f64f062 Stanislav Král
                                 f"SELECT {COL_ID} FROM {TAB_CERTIFICATE_TYPES} WHERE {COL_ID} = ?)")
134 9e22e20c David Friesecký
                values = [filter_type]
135
136
            sql = (f"SELECT * FROM {TAB_CERTIFICATES}{sql_extension}")
137
            self.cursor.execute(sql, values)
138
            certificate_rows = self.cursor.fetchall()
139
140
            certificates: List[Certificate] = []
141
            for certificate_row in certificate_rows:
142
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
143
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
144
                values = [certificate_row[0]]
145
                self.cursor.execute(sql, values)
146
                usage_rows = self.cursor.fetchall()
147
148
                usage_dict: Dict[int, bool] = {}
149
                for usage_row in usage_rows:
150
                    usage_dict[usage_row[2]] = True
151
152
                certificates.append(Certificate(certificate_row[0],
153
                                                certificate_row[1],
154
                                                certificate_row[2],
155
                                                certificate_row[3],
156
                                                certificate_row[4],
157
                                                certificate_row[5],
158
                                                certificate_row[6],
159
                                                certificate_row[7],
160
                                                usage_dict))
161
        except Error as e:
162
            print(e)
163
            return None
164
165 0f3af523 Stanislav Král
        return certificates
166 9e22e20c David Friesecký
167 f8b23532 David Friesecký
    def update(self, certificate_id: int, certificate: Certificate) -> bool:
168 a0602bad David Friesecký
        """
169 f8b23532 David Friesecký
        Updates a certificate.
170
        If the parameter of certificate (Certificate object) is not to be changed,
171
        the same value must be specified.
172 a0602bad David Friesecký
173
        :param certificate_id: ID of specific certificate
174 f8b23532 David Friesecký
        :param certificate: Instance of the Certificate object
175 a0602bad David Friesecký
176
        :return: the result of whether the updation was successful
177
        """
178
179 f8b23532 David Friesecký
        try:
180
            sql = (f"UPDATE {TAB_CERTIFICATES} "
181
                   f"SET {COL_COMMON_NAME} = ?, "
182
                   f"{COL_VALID_FROM} = ?, "
183
                   f"{COL_VALID_TO} = ?, "
184
                   f"{COL_PEM_DATA} = ?, "
185
                   f"{COL_PRIVATE_KEY_ID} = ?, "
186
                   f"{COL_TYPE_ID} = ?, "
187
                   f"{COL_PARENT_ID} = ? "
188
                   f"WHERE {COL_ID} = ?")
189
            values = [certificate.common_name,
190
                      certificate.valid_from,
191
                      certificate.valid_to,
192
                      certificate.pem_data,
193
                      certificate.private_key_id,
194
                      certificate.type_id,
195 805077f5 David Friesecký
                      certificate.parent_id,
196
                      certificate_id]
197 f8b23532 David Friesecký
            self.cursor.execute(sql, values)
198
            self.connection.commit()
199
200
            sql = (f"DELETE FROM {TAB_CERTIFICATE_USAGES} "
201
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
202
            values = [certificate_id]
203
            self.cursor.execute(sql, values)
204
            self.connection.commit()
205
206 f3125948 Stanislav Král
            # iterate over usage pairs
207
            for usage_id, usage_value in certificate.usages.items():
208 f8b23532 David Friesecký
                if usage_value:
209
                    sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
210
                           f"({COL_CERTIFICATE_ID},"
211
                           f"{COL_USAGE_TYPE_ID}) "
212
                           f"VALUES (?,?)")
213
                    values = [certificate_id, usage_id]
214
                    self.cursor.execute(sql, values)
215
                    self.connection.commit()
216
        except Error as e:
217
            print(e)
218
            return False
219
220
        return True
221 e9e55282 David Friesecký
222
    def delete(self, certificate_id: int) -> bool:
223 a0602bad David Friesecký
        """
224
        Deletes a certificate
225
226
        :param certificate_id: ID of specific certificate
227
228
        :return: the result of whether the deletion was successful
229
        """
230
231 f8b23532 David Friesecký
        try:
232
            sql = (f"DELETE FROM {TAB_CERTIFICATES} "
233
                   f"WHERE {COL_ID} = ?")
234
            values = [certificate_id]
235
            self.cursor.execute(sql, values)
236
            self.connection.commit()
237
        except Error as e:
238
            print(e)
239
            return False
240
241 45744020 Stanislav Král
        return self.cursor.rowcount > 0