Projekt

Obecné

Profil

Stáhnout (15.6 KB) Statistiky
| Větev: | Tag: | Revize:
1 9e22e20c David Friesecký
from typing import Dict, List
2 f8b23532 David Friesecký
from sqlite3 import Connection, Cursor, Error
3 1636aefe David Friesecký
4 181e1196 Jan Pašek
from src.model.certificate import Certificate
5 f8b23532 David Friesecký
from src.constants import *
6 e9e55282 David Friesecký
7
8 f8b23532 David Friesecký
class CertificateRepository:
9 25053504 David Friesecký
10 f8b23532 David Friesecký
    def __init__(self, connection: Connection, cursor: Cursor):
11 a0602bad David Friesecký
        """
12 f8b23532 David Friesecký
        Constructor of the CertificateRepository object
13 a0602bad David Friesecký
14 f8b23532 David Friesecký
        :param connection: Instance of the Connection object
15
        :param cursor: Instance of the Cursor object
16 a0602bad David Friesecký
        """
17
18 f8b23532 David Friesecký
        self.connection = connection
19
        self.cursor = cursor
20 e9e55282 David Friesecký
21 805077f5 David Friesecký
    def create(self, certificate: Certificate):
22 a0602bad David Friesecký
        """
23 f8b23532 David Friesecký
        Creates a certificate.
24
        For root certificate (CA) the parent certificate id is modified to the same id (id == parent_id).
25 a0602bad David Friesecký
26 f8b23532 David Friesecký
        :param certificate: Instance of the Certificate object
27 a0602bad David Friesecký
28 f8b23532 David Friesecký
        :return: the result of whether the creation was successful
29 a0602bad David Friesecký
        """
30
31 f8b23532 David Friesecký
        try:
32 8b049f43 David Friesecký
            if certificate.revocation_date != "" and certificate.revocation_reason == "":
33
                certificate.revocation_reason = REV_REASON_UNSPECIFIED
34
            elif certificate.revocation_date == "":
35
                certificate.revocation_reason = ""
36
37 f8b23532 David Friesecký
            sql = (f"INSERT INTO {TAB_CERTIFICATES} "
38
                   f"({COL_COMMON_NAME},"
39
                   f"{COL_VALID_FROM},"
40
                   f"{COL_VALID_TO},"
41
                   f"{COL_PEM_DATA},"
42
                   f"{COL_PRIVATE_KEY_ID},"
43
                   f"{COL_TYPE_ID},"
44
                   f"{COL_PARENT_ID})"
45
                   f"VALUES(?,?,?,?,?,?,?)")
46
            values = [certificate.common_name,
47
                      certificate.valid_from,
48
                      certificate.valid_to,
49
                      certificate.pem_data,
50
                      certificate.private_key_id,
51
                      certificate.type_id,
52
                      certificate.parent_id]
53
            self.cursor.execute(sql, values)
54
            self.connection.commit()
55
56
            last_id: int = self.cursor.lastrowid
57
58 f3125948 Stanislav Král
            # TODO assure that this is correct
59
            if certificate.type_id == ROOT_CA_ID:
60 f8b23532 David Friesecký
                certificate.parent_id = last_id
61 093d06df Stanislav Král
                self.update(last_id, certificate)
62 f8b23532 David Friesecký
            else:
63 fa72c969 Stanislav Král
                for usage_id, usage_value in certificate.usages.items():
64 f8b23532 David Friesecký
                    if usage_value:
65
                        sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
66
                               f"({COL_CERTIFICATE_ID},"
67
                               f"{COL_USAGE_TYPE_ID}) "
68
                               f"VALUES (?,?)")
69
                        values = [last_id, usage_id]
70
                        self.cursor.execute(sql, values)
71
                        self.connection.commit()
72
        except Error as e:
73 58051326 David Friesecký
            return e
74 f8b23532 David Friesecký
75 805077f5 David Friesecký
        return last_id
76 f8b23532 David Friesecký
77 805077f5 David Friesecký
    def read(self, certificate_id: int):
78 f8b23532 David Friesecký
        """
79
        Reads (selects) a certificate.
80 e9e55282 David Friesecký
81 f8b23532 David Friesecký
        :param certificate_id: ID of specific certificate
82 e9e55282 David Friesecký
83 f8b23532 David Friesecký
        :return: instance of the Certificate object
84
        """
85 e9e55282 David Friesecký
86 f8b23532 David Friesecký
        try:
87
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
88
                   f"WHERE {COL_ID} = ?")
89
            values = [certificate_id]
90
            self.cursor.execute(sql, values)
91 a7411982 Stanislav Král
            certificate_row = self.cursor.fetchone()
92 e9e55282 David Friesecký
93 6f4a5f24 Captain_Trojan
            if certificate_row is None:
94
                return None
95
96 f8b23532 David Friesecký
            sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
97
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
98
            self.cursor.execute(sql, values)
99
            usage_rows = self.cursor.fetchall()
100 1636aefe David Friesecký
101
            usage_dict: Dict[int, bool] = {}
102
            for usage_row in usage_rows:
103 f8b23532 David Friesecký
                usage_dict[usage_row[2]] = True
104
105
            certificate: Certificate = Certificate(certificate_row[0],
106
                                                   certificate_row[1],
107
                                                   certificate_row[2],
108
                                                   certificate_row[3],
109
                                                   certificate_row[4],
110
                                                   certificate_row[7],
111 58051326 David Friesecký
                                                   certificate_row[8],
112
                                                   certificate_row[9],
113
                                                   usage_dict,
114
                                                   certificate_row[5],
115
                                                   certificate_row[6])
116 f8b23532 David Friesecký
        except Error as e:
117 58051326 David Friesecký
            return e
118 f8b23532 David Friesecký
119 805077f5 David Friesecký
        if len(certificate_row) > 0:
120
            return certificate
121
        else:
122
            return None
123 f8b23532 David Friesecký
124 58051326 David Friesecký
    def read_all(self, filter_type: int = None):
125 9e22e20c David Friesecký
        """
126
        Reads (selects) all certificates (with type).
127
128
        :param filter_type: ID of certificate type from CertificateTypes table
129
130
        :return: list of certificates
131
        """
132
133
        try:
134
            sql_extension = ""
135
            values = []
136
            if filter_type is not None:
137
                sql_extension = (f" WHERE {COL_TYPE_ID} = ("
138 6f64f062 Stanislav Král
                                 f"SELECT {COL_ID} FROM {TAB_CERTIFICATE_TYPES} WHERE {COL_ID} = ?)")
139 9e22e20c David Friesecký
                values = [filter_type]
140
141 8b049f43 David Friesecký
            sql = f"SELECT * FROM {TAB_CERTIFICATES}{sql_extension}"
142 9e22e20c David Friesecký
            self.cursor.execute(sql, values)
143
            certificate_rows = self.cursor.fetchall()
144
145
            certificates: List[Certificate] = []
146
            for certificate_row in certificate_rows:
147
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
148
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
149
                values = [certificate_row[0]]
150
                self.cursor.execute(sql, values)
151
                usage_rows = self.cursor.fetchall()
152
153
                usage_dict: Dict[int, bool] = {}
154
                for usage_row in usage_rows:
155
                    usage_dict[usage_row[2]] = True
156
157
                certificates.append(Certificate(certificate_row[0],
158
                                                certificate_row[1],
159
                                                certificate_row[2],
160
                                                certificate_row[3],
161
                                                certificate_row[4],
162
                                                certificate_row[7],
163 58051326 David Friesecký
                                                certificate_row[8],
164
                                                certificate_row[9],
165
                                                usage_dict,
166
                                                certificate_row[5],
167
                                                certificate_row[6]))
168 9e22e20c David Friesecký
        except Error as e:
169 58051326 David Friesecký
            return e
170 9e22e20c David Friesecký
171 58051326 David Friesecký
        if len(certificates) > 0:
172
            return certificates
173
        else:
174
            return None
175 9e22e20c David Friesecký
176 58051326 David Friesecký
    def update(self, certificate_id: int, certificate: Certificate):
177 a0602bad David Friesecký
        """
178 f8b23532 David Friesecký
        Updates a certificate.
179
        If the parameter of certificate (Certificate object) is not to be changed,
180
        the same value must be specified.
181 a0602bad David Friesecký
182
        :param certificate_id: ID of specific certificate
183 f8b23532 David Friesecký
        :param certificate: Instance of the Certificate object
184 a0602bad David Friesecký
185
        :return: the result of whether the updation was successful
186
        """
187
188 f8b23532 David Friesecký
        try:
189 8b049f43 David Friesecký
            if certificate.revocation_date != "" and certificate.revocation_reason == "":
190
                certificate.revocation_reason = REV_REASON_UNSPECIFIED
191
            elif certificate.revocation_date == "":
192
                certificate.revocation_reason = ""
193
194 f8b23532 David Friesecký
            sql = (f"UPDATE {TAB_CERTIFICATES} "
195
                   f"SET {COL_COMMON_NAME} = ?, "
196
                   f"{COL_VALID_FROM} = ?, "
197
                   f"{COL_VALID_TO} = ?, "
198
                   f"{COL_PEM_DATA} = ?, "
199
                   f"{COL_PRIVATE_KEY_ID} = ?, "
200
                   f"{COL_TYPE_ID} = ?, "
201
                   f"{COL_PARENT_ID} = ? "
202
                   f"WHERE {COL_ID} = ?")
203
            values = [certificate.common_name,
204
                      certificate.valid_from,
205
                      certificate.valid_to,
206
                      certificate.pem_data,
207
                      certificate.private_key_id,
208
                      certificate.type_id,
209 805077f5 David Friesecký
                      certificate.parent_id,
210
                      certificate_id]
211 f8b23532 David Friesecký
            self.cursor.execute(sql, values)
212
            self.connection.commit()
213
214
            sql = (f"DELETE FROM {TAB_CERTIFICATE_USAGES} "
215
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
216
            values = [certificate_id]
217
            self.cursor.execute(sql, values)
218
            self.connection.commit()
219
220 f3125948 Stanislav Král
            # iterate over usage pairs
221
            for usage_id, usage_value in certificate.usages.items():
222 f8b23532 David Friesecký
                if usage_value:
223
                    sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
224
                           f"({COL_CERTIFICATE_ID},"
225
                           f"{COL_USAGE_TYPE_ID}) "
226
                           f"VALUES (?,?)")
227
                    values = [certificate_id, usage_id]
228
                    self.cursor.execute(sql, values)
229
                    self.connection.commit()
230
        except Error as e:
231 58051326 David Friesecký
            return e
232 f8b23532 David Friesecký
233 8b049f43 David Friesecký
        if self.cursor.rowcount > 0:
234
            return True
235
236
        return False
237 e9e55282 David Friesecký
238 58051326 David Friesecký
    def delete(self, certificate_id: int):
239 a0602bad David Friesecký
        """
240
        Deletes a certificate
241
242
        :param certificate_id: ID of specific certificate
243
244
        :return: the result of whether the deletion was successful
245
        """
246
247 f8b23532 David Friesecký
        try:
248
            sql = (f"DELETE FROM {TAB_CERTIFICATES} "
249
                   f"WHERE {COL_ID} = ?")
250
            values = [certificate_id]
251
            self.cursor.execute(sql, values)
252
            self.connection.commit()
253
        except Error as e:
254 58051326 David Friesecký
            return e
255 f8b23532 David Friesecký
256 45744020 Stanislav Král
        return self.cursor.rowcount > 0
257 58051326 David Friesecký
258
    def set_certificate_revoked(
259 8b049f43 David Friesecký
            self, certificate_id: int, revocation_date: str, revocation_reason: str = REV_REASON_UNSPECIFIED):
260 58051326 David Friesecký
        """
261
        Revoke a certificate
262
263
        :param certificate_id: ID of specific certificate
264
        :param revocation_date: Date, when the certificate is revoked
265
        :param revocation_reason: Reason of the revocation
266
267 8b049f43 David Friesecký
        :return:
268
            the result of whether the revocation was successful OR
269
            sqlite3.Error if an exception is thrown
270 58051326 David Friesecký
        """
271
272
        try:
273 8b049f43 David Friesecký
            if revocation_date != "" and revocation_reason == "":
274
                revocation_reason = REV_REASON_UNSPECIFIED
275
            elif revocation_date == "":
276
                return False
277
278 58051326 David Friesecký
            sql = (f"UPDATE {TAB_CERTIFICATES} "
279
                   f"SET {COL_REVOCATION_DATE} = ?, "
280 8b049f43 David Friesecký
                   f"{COL_REVOCATION_REASON} = ? "
281 58051326 David Friesecký
                   f"WHERE {COL_ID} = ? AND ({COL_REVOCATION_DATE} IS NULL OR {COL_REVOCATION_DATE} = '')")
282
            values = [revocation_date,
283
                      revocation_reason,
284
                      certificate_id]
285
            self.cursor.execute(sql, values)
286
            self.connection.commit()
287
        except Error as e:
288
            return e
289
290 8b049f43 David Friesecký
        if self.cursor.rowcount > 0:
291
            return True
292
293
        return False
294 58051326 David Friesecký
295
    def clear_certificate_revocation(self, certificate_id: int):
296
        """
297
        Clear revocation of a certificate
298
299
        :param certificate_id: ID of specific certificate
300
301 8b049f43 David Friesecký
        :return:
302
            the result of whether the clear revocation was successful OR
303
            sqlite3.Error if an exception is thrown
304 58051326 David Friesecký
        """
305
306
        try:
307
            sql = (f"UPDATE {TAB_CERTIFICATES} "
308
                   f"SET {COL_REVOCATION_DATE} = '', "
309 8b049f43 David Friesecký
                   f"{COL_REVOCATION_REASON} = '' "
310 58051326 David Friesecký
                   f"WHERE {COL_ID} = ?")
311
            values = [certificate_id]
312
            self.cursor.execute(sql, values)
313
            self.connection.commit()
314
        except Error as e:
315
            return e
316
317
        return True
318
319
    def get_all_revoked_by(self, certificate_id: int):
320
        """
321
        Get list of the revoked certificates that are direct descendants of the certificate with the ID
322
323
        :param certificate_id: ID of specific certificate
324
325
        :return:
326 8b049f43 David Friesecký
            list of the certificates OR
327
            None if the list is empty OR
328
            sqlite3.Error if an exception is thrown
329 58051326 David Friesecký
        """
330
331
        try:
332
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
333
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_REVOCATION_DATE} IS NOT NULL AND {COL_REVOCATION_DATE} != ''")
334
            values = [certificate_id]
335
            self.cursor.execute(sql, values)
336
            certificate_rows = self.cursor.fetchall()
337
338
            certificates: List[Certificate] = []
339
            for certificate_row in certificate_rows:
340
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
341
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
342
                values = [certificate_row[0]]
343
                self.cursor.execute(sql, values)
344
                usage_rows = self.cursor.fetchall()
345
346
                usage_dict: Dict[int, bool] = {}
347
                for usage_row in usage_rows:
348
                    usage_dict[usage_row[2]] = True
349
350
                certificates.append(Certificate(certificate_row[0],
351
                                                certificate_row[1],
352
                                                certificate_row[2],
353
                                                certificate_row[3],
354
                                                certificate_row[4],
355
                                                certificate_row[7],
356
                                                certificate_row[8],
357
                                                certificate_row[9],
358
                                                usage_dict,
359
                                                certificate_row[5],
360
                                                certificate_row[6]))
361
        except Error as e:
362
            return e
363
364
        if len(certificates) > 0:
365
            return certificates
366
        else:
367
            return None
368
369
    def get_all_issued_by(self, certificate_id: int):
370
        """
371
        Get list of the certificates that are direct descendants of the certificate with the ID
372
373
        :param certificate_id: ID of specific certificate
374
375
        :return:
376 8b049f43 David Friesecký
            list of the certificates OR
377
            None if the list is empty OR
378
            sqlite3.Error if an exception is thrown
379 58051326 David Friesecký
        """
380
381
        try:
382
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
383 8b049f43 David Friesecký
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_ID} != ?")
384
            values = [certificate_id, certificate_id]
385 58051326 David Friesecký
            self.cursor.execute(sql, values)
386
            certificate_rows = self.cursor.fetchall()
387
388
            certificates: List[Certificate] = []
389
            for certificate_row in certificate_rows:
390
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
391
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
392
                values = [certificate_row[0]]
393
                self.cursor.execute(sql, values)
394
                usage_rows = self.cursor.fetchall()
395
396
                usage_dict: Dict[int, bool] = {}
397
                for usage_row in usage_rows:
398
                    usage_dict[usage_row[2]] = True
399
400
                certificates.append(Certificate(certificate_row[0],
401
                                                certificate_row[1],
402
                                                certificate_row[2],
403
                                                certificate_row[3],
404
                                                certificate_row[4],
405
                                                certificate_row[7],
406
                                                certificate_row[8],
407
                                                certificate_row[9],
408
                                                usage_dict,
409
                                                certificate_row[5],
410
                                                certificate_row[6]))
411
        except Error as e:
412
            return e
413
414
        if len(certificates) > 0:
415
            return certificates
416
        else:
417
            return None