Projekt

Obecné

Profil

Stáhnout (15.5 KB) Statistiky
| Větev: | Tag: | Revize:
1 9e22e20c David Friesecký
from typing import Dict, List
2 f8b23532 David Friesecký
from sqlite3 import Connection, Cursor, Error
3 1636aefe David Friesecký
4 d65b022d David Friesecký
from src.exceptions.database_exception import DatabaseException
5 181e1196 Jan Pašek
from src.model.certificate import Certificate
6 f8b23532 David Friesecký
from src.constants import *
7 e9e55282 David Friesecký
8
9 f8b23532 David Friesecký
class CertificateRepository:
10 25053504 David Friesecký
11 f8b23532 David Friesecký
    def __init__(self, connection: Connection, cursor: Cursor):
12 a0602bad David Friesecký
        """
13 f8b23532 David Friesecký
        Constructor of the CertificateRepository object
14 a0602bad David Friesecký
15 f8b23532 David Friesecký
        :param connection: Instance of the Connection object
16
        :param cursor: Instance of the Cursor object
17 a0602bad David Friesecký
        """
18
19 f8b23532 David Friesecký
        self.connection = connection
20
        self.cursor = cursor
21 e9e55282 David Friesecký
22 805077f5 David Friesecký
    def create(self, certificate: Certificate):
23 a0602bad David Friesecký
        """
24 f8b23532 David Friesecký
        Creates a certificate.
25
        For root certificate (CA) the parent certificate id is modified to the same id (id == parent_id).
26 a0602bad David Friesecký
27 f8b23532 David Friesecký
        :param certificate: Instance of the Certificate object
28 a0602bad David Friesecký
29 f8b23532 David Friesecký
        :return: the result of whether the creation was successful
30 a0602bad David Friesecký
        """
31
32 f8b23532 David Friesecký
        try:
33 8b049f43 David Friesecký
            if certificate.revocation_date != "" and certificate.revocation_reason == "":
34
                certificate.revocation_reason = REV_REASON_UNSPECIFIED
35
            elif certificate.revocation_date == "":
36
                certificate.revocation_reason = ""
37
38 f8b23532 David Friesecký
            sql = (f"INSERT INTO {TAB_CERTIFICATES} "
39
                   f"({COL_COMMON_NAME},"
40
                   f"{COL_VALID_FROM},"
41
                   f"{COL_VALID_TO},"
42
                   f"{COL_PEM_DATA},"
43
                   f"{COL_PRIVATE_KEY_ID},"
44
                   f"{COL_TYPE_ID},"
45
                   f"{COL_PARENT_ID})"
46
                   f"VALUES(?,?,?,?,?,?,?)")
47
            values = [certificate.common_name,
48
                      certificate.valid_from,
49
                      certificate.valid_to,
50
                      certificate.pem_data,
51
                      certificate.private_key_id,
52
                      certificate.type_id,
53
                      certificate.parent_id]
54
            self.cursor.execute(sql, values)
55
            self.connection.commit()
56
57
            last_id: int = self.cursor.lastrowid
58
59 f3125948 Stanislav Král
            # TODO assure that this is correct
60
            if certificate.type_id == ROOT_CA_ID:
61 f8b23532 David Friesecký
                certificate.parent_id = last_id
62 093d06df Stanislav Král
                self.update(last_id, certificate)
63 f8b23532 David Friesecký
            else:
64 fa72c969 Stanislav Král
                for usage_id, usage_value in certificate.usages.items():
65 f8b23532 David Friesecký
                    if usage_value:
66
                        sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
67
                               f"({COL_CERTIFICATE_ID},"
68
                               f"{COL_USAGE_TYPE_ID}) "
69
                               f"VALUES (?,?)")
70
                        values = [last_id, usage_id]
71
                        self.cursor.execute(sql, values)
72
                        self.connection.commit()
73
        except Error as e:
74 d65b022d David Friesecký
            raise DatabaseException(e)
75 f8b23532 David Friesecký
76 805077f5 David Friesecký
        return last_id
77 f8b23532 David Friesecký
78 805077f5 David Friesecký
    def read(self, certificate_id: int):
79 f8b23532 David Friesecký
        """
80
        Reads (selects) a certificate.
81 e9e55282 David Friesecký
82 f8b23532 David Friesecký
        :param certificate_id: ID of specific certificate
83 e9e55282 David Friesecký
84 f8b23532 David Friesecký
        :return: instance of the Certificate object
85
        """
86 e9e55282 David Friesecký
87 f8b23532 David Friesecký
        try:
88
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
89
                   f"WHERE {COL_ID} = ?")
90
            values = [certificate_id]
91
            self.cursor.execute(sql, values)
92 a7411982 Stanislav Král
            certificate_row = self.cursor.fetchone()
93 e9e55282 David Friesecký
94 6f4a5f24 Captain_Trojan
            if certificate_row is None:
95
                return None
96
97 f8b23532 David Friesecký
            sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
98
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
99
            self.cursor.execute(sql, values)
100
            usage_rows = self.cursor.fetchall()
101 1636aefe David Friesecký
102
            usage_dict: Dict[int, bool] = {}
103
            for usage_row in usage_rows:
104 f8b23532 David Friesecký
                usage_dict[usage_row[2]] = True
105
106
            certificate: Certificate = Certificate(certificate_row[0],
107
                                                   certificate_row[1],
108
                                                   certificate_row[2],
109
                                                   certificate_row[3],
110
                                                   certificate_row[4],
111
                                                   certificate_row[7],
112 58051326 David Friesecký
                                                   certificate_row[8],
113
                                                   certificate_row[9],
114
                                                   usage_dict,
115
                                                   certificate_row[5],
116
                                                   certificate_row[6])
117 f8b23532 David Friesecký
        except Error as e:
118 d65b022d David Friesecký
            raise DatabaseException(e)
119 f8b23532 David Friesecký
120 d65b022d David Friesecký
        return certificate
121 f8b23532 David Friesecký
122 58051326 David Friesecký
    def read_all(self, filter_type: int = None):
123 9e22e20c David Friesecký
        """
124
        Reads (selects) all certificates (with type).
125
126
        :param filter_type: ID of certificate type from CertificateTypes table
127
128
        :return: list of certificates
129
        """
130
131
        try:
132
            sql_extension = ""
133
            values = []
134
            if filter_type is not None:
135
                sql_extension = (f" WHERE {COL_TYPE_ID} = ("
136 6f64f062 Stanislav Král
                                 f"SELECT {COL_ID} FROM {TAB_CERTIFICATE_TYPES} WHERE {COL_ID} = ?)")
137 9e22e20c David Friesecký
                values = [filter_type]
138
139 8b049f43 David Friesecký
            sql = f"SELECT * FROM {TAB_CERTIFICATES}{sql_extension}"
140 9e22e20c David Friesecký
            self.cursor.execute(sql, values)
141
            certificate_rows = self.cursor.fetchall()
142
143
            certificates: List[Certificate] = []
144
            for certificate_row in certificate_rows:
145
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
146
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
147
                values = [certificate_row[0]]
148
                self.cursor.execute(sql, values)
149
                usage_rows = self.cursor.fetchall()
150
151
                usage_dict: Dict[int, bool] = {}
152
                for usage_row in usage_rows:
153
                    usage_dict[usage_row[2]] = True
154
155
                certificates.append(Certificate(certificate_row[0],
156
                                                certificate_row[1],
157
                                                certificate_row[2],
158
                                                certificate_row[3],
159
                                                certificate_row[4],
160
                                                certificate_row[7],
161 58051326 David Friesecký
                                                certificate_row[8],
162
                                                certificate_row[9],
163
                                                usage_dict,
164
                                                certificate_row[5],
165
                                                certificate_row[6]))
166 9e22e20c David Friesecký
        except Error as e:
167 d65b022d David Friesecký
            raise DatabaseException(e)
168 9e22e20c David Friesecký
169 d65b022d David Friesecký
        return certificates
170 9e22e20c David Friesecký
171 58051326 David Friesecký
    def update(self, certificate_id: int, certificate: Certificate):
172 a0602bad David Friesecký
        """
173 f8b23532 David Friesecký
        Updates a certificate.
174
        If the parameter of certificate (Certificate object) is not to be changed,
175
        the same value must be specified.
176 a0602bad David Friesecký
177
        :param certificate_id: ID of specific certificate
178 f8b23532 David Friesecký
        :param certificate: Instance of the Certificate object
179 a0602bad David Friesecký
180
        :return: the result of whether the updation was successful
181
        """
182
183 f8b23532 David Friesecký
        try:
184 8b049f43 David Friesecký
            if certificate.revocation_date != "" and certificate.revocation_reason == "":
185
                certificate.revocation_reason = REV_REASON_UNSPECIFIED
186
            elif certificate.revocation_date == "":
187
                certificate.revocation_reason = ""
188
189 f8b23532 David Friesecký
            sql = (f"UPDATE {TAB_CERTIFICATES} "
190
                   f"SET {COL_COMMON_NAME} = ?, "
191
                   f"{COL_VALID_FROM} = ?, "
192
                   f"{COL_VALID_TO} = ?, "
193
                   f"{COL_PEM_DATA} = ?, "
194
                   f"{COL_PRIVATE_KEY_ID} = ?, "
195
                   f"{COL_TYPE_ID} = ?, "
196
                   f"{COL_PARENT_ID} = ? "
197
                   f"WHERE {COL_ID} = ?")
198
            values = [certificate.common_name,
199
                      certificate.valid_from,
200
                      certificate.valid_to,
201
                      certificate.pem_data,
202
                      certificate.private_key_id,
203
                      certificate.type_id,
204 805077f5 David Friesecký
                      certificate.parent_id,
205
                      certificate_id]
206 f8b23532 David Friesecký
            self.cursor.execute(sql, values)
207
            self.connection.commit()
208
209
            sql = (f"DELETE FROM {TAB_CERTIFICATE_USAGES} "
210
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
211
            values = [certificate_id]
212
            self.cursor.execute(sql, values)
213
            self.connection.commit()
214
215 f3125948 Stanislav Král
            # iterate over usage pairs
216
            for usage_id, usage_value in certificate.usages.items():
217 f8b23532 David Friesecký
                if usage_value:
218
                    sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
219
                           f"({COL_CERTIFICATE_ID},"
220
                           f"{COL_USAGE_TYPE_ID}) "
221
                           f"VALUES (?,?)")
222
                    values = [certificate_id, usage_id]
223
                    self.cursor.execute(sql, values)
224
                    self.connection.commit()
225
        except Error as e:
226 d65b022d David Friesecký
            raise DatabaseException(e)
227 f8b23532 David Friesecký
228 d65b022d David Friesecký
        return self.cursor.rowcount > 0
229 e9e55282 David Friesecký
230 58051326 David Friesecký
    def delete(self, certificate_id: int):
231 a0602bad David Friesecký
        """
232
        Deletes a certificate
233
234
        :param certificate_id: ID of specific certificate
235
236
        :return: the result of whether the deletion was successful
237
        """
238
239 f8b23532 David Friesecký
        try:
240
            sql = (f"DELETE FROM {TAB_CERTIFICATES} "
241
                   f"WHERE {COL_ID} = ?")
242
            values = [certificate_id]
243
            self.cursor.execute(sql, values)
244
            self.connection.commit()
245
        except Error as e:
246 d65b022d David Friesecký
            raise DatabaseException(e)
247 f8b23532 David Friesecký
248 45744020 Stanislav Král
        return self.cursor.rowcount > 0
249 58051326 David Friesecký
250
    def set_certificate_revoked(
251 8b049f43 David Friesecký
            self, certificate_id: int, revocation_date: str, revocation_reason: str = REV_REASON_UNSPECIFIED):
252 58051326 David Friesecký
        """
253
        Revoke a certificate
254
255
        :param certificate_id: ID of specific certificate
256
        :param revocation_date: Date, when the certificate is revoked
257
        :param revocation_reason: Reason of the revocation
258
259 8b049f43 David Friesecký
        :return:
260
            the result of whether the revocation was successful OR
261
            sqlite3.Error if an exception is thrown
262 58051326 David Friesecký
        """
263
264
        try:
265 8b049f43 David Friesecký
            if revocation_date != "" and revocation_reason == "":
266
                revocation_reason = REV_REASON_UNSPECIFIED
267
            elif revocation_date == "":
268
                return False
269
270 58051326 David Friesecký
            sql = (f"UPDATE {TAB_CERTIFICATES} "
271
                   f"SET {COL_REVOCATION_DATE} = ?, "
272 8b049f43 David Friesecký
                   f"{COL_REVOCATION_REASON} = ? "
273 58051326 David Friesecký
                   f"WHERE {COL_ID} = ? AND ({COL_REVOCATION_DATE} IS NULL OR {COL_REVOCATION_DATE} = '')")
274
            values = [revocation_date,
275
                      revocation_reason,
276
                      certificate_id]
277
            self.cursor.execute(sql, values)
278
            self.connection.commit()
279
        except Error as e:
280 d65b022d David Friesecký
            raise DatabaseException(e)
281 8b049f43 David Friesecký
282 d65b022d David Friesecký
        return self.cursor.rowcount > 0
283 58051326 David Friesecký
284
    def clear_certificate_revocation(self, certificate_id: int):
285
        """
286
        Clear revocation of a certificate
287
288
        :param certificate_id: ID of specific certificate
289
290 8b049f43 David Friesecký
        :return:
291
            the result of whether the clear revocation was successful OR
292
            sqlite3.Error if an exception is thrown
293 58051326 David Friesecký
        """
294
295
        try:
296
            sql = (f"UPDATE {TAB_CERTIFICATES} "
297
                   f"SET {COL_REVOCATION_DATE} = '', "
298 8b049f43 David Friesecký
                   f"{COL_REVOCATION_REASON} = '' "
299 58051326 David Friesecký
                   f"WHERE {COL_ID} = ?")
300
            values = [certificate_id]
301
            self.cursor.execute(sql, values)
302
            self.connection.commit()
303
        except Error as e:
304 d65b022d David Friesecký
            raise DatabaseException(e)
305 58051326 David Friesecký
306 d65b022d David Friesecký
        return self.cursor.rowcount > 0
307 58051326 David Friesecký
308
    def get_all_revoked_by(self, certificate_id: int):
309
        """
310
        Get list of the revoked certificates that are direct descendants of the certificate with the ID
311
312
        :param certificate_id: ID of specific certificate
313
314
        :return:
315 8b049f43 David Friesecký
            list of the certificates OR
316
            None if the list is empty OR
317
            sqlite3.Error if an exception is thrown
318 58051326 David Friesecký
        """
319
320
        try:
321
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
322
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_REVOCATION_DATE} IS NOT NULL AND {COL_REVOCATION_DATE} != ''")
323
            values = [certificate_id]
324
            self.cursor.execute(sql, values)
325
            certificate_rows = self.cursor.fetchall()
326
327
            certificates: List[Certificate] = []
328
            for certificate_row in certificate_rows:
329
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
330
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
331
                values = [certificate_row[0]]
332
                self.cursor.execute(sql, values)
333
                usage_rows = self.cursor.fetchall()
334
335
                usage_dict: Dict[int, bool] = {}
336
                for usage_row in usage_rows:
337
                    usage_dict[usage_row[2]] = True
338
339
                certificates.append(Certificate(certificate_row[0],
340
                                                certificate_row[1],
341
                                                certificate_row[2],
342
                                                certificate_row[3],
343
                                                certificate_row[4],
344
                                                certificate_row[7],
345
                                                certificate_row[8],
346
                                                certificate_row[9],
347
                                                usage_dict,
348
                                                certificate_row[5],
349
                                                certificate_row[6]))
350
        except Error as e:
351 d65b022d David Friesecký
            raise DatabaseException(e)
352 58051326 David Friesecký
353 d65b022d David Friesecký
        return certificates
354 58051326 David Friesecký
355
    def get_all_issued_by(self, certificate_id: int):
356
        """
357
        Get list of the certificates that are direct descendants of the certificate with the ID
358
359
        :param certificate_id: ID of specific certificate
360
361
        :return:
362 8b049f43 David Friesecký
            list of the certificates OR
363
            None if the list is empty OR
364
            sqlite3.Error if an exception is thrown
365 58051326 David Friesecký
        """
366
367
        try:
368
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
369 8b049f43 David Friesecký
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_ID} != ?")
370
            values = [certificate_id, certificate_id]
371 58051326 David Friesecký
            self.cursor.execute(sql, values)
372
            certificate_rows = self.cursor.fetchall()
373
374
            certificates: List[Certificate] = []
375
            for certificate_row in certificate_rows:
376
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
377
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
378
                values = [certificate_row[0]]
379
                self.cursor.execute(sql, values)
380
                usage_rows = self.cursor.fetchall()
381
382
                usage_dict: Dict[int, bool] = {}
383
                for usage_row in usage_rows:
384
                    usage_dict[usage_row[2]] = True
385
386
                certificates.append(Certificate(certificate_row[0],
387
                                                certificate_row[1],
388
                                                certificate_row[2],
389
                                                certificate_row[3],
390
                                                certificate_row[4],
391
                                                certificate_row[7],
392
                                                certificate_row[8],
393
                                                certificate_row[9],
394
                                                usage_dict,
395
                                                certificate_row[5],
396
                                                certificate_row[6]))
397
        except Error as e:
398 d65b022d David Friesecký
            raise DatabaseException(e)
399 58051326 David Friesecký
400 d65b022d David Friesecký
        return certificates