Projekt

Obecné

Profil

Stáhnout (15 KB) Statistiky
| Větev: | Tag: | Revize:
1 9e22e20c David Friesecký
from typing import Dict, List
2 f8b23532 David Friesecký
from sqlite3 import Connection, Cursor, Error
3 1636aefe David Friesecký
4 181e1196 Jan Pašek
from src.model.certificate import Certificate
5 f8b23532 David Friesecký
from src.constants import *
6 e9e55282 David Friesecký
7
8 f8b23532 David Friesecký
class CertificateRepository:
9 25053504 David Friesecký
10 f8b23532 David Friesecký
    def __init__(self, connection: Connection, cursor: Cursor):
11 a0602bad David Friesecký
        """
12 f8b23532 David Friesecký
        Constructor of the CertificateRepository object
13 a0602bad David Friesecký
14 f8b23532 David Friesecký
        :param connection: Instance of the Connection object
15
        :param cursor: Instance of the Cursor object
16 a0602bad David Friesecký
        """
17
18 f8b23532 David Friesecký
        self.connection = connection
19
        self.cursor = cursor
20 e9e55282 David Friesecký
21 805077f5 David Friesecký
    def create(self, certificate: Certificate):
22 a0602bad David Friesecký
        """
23 f8b23532 David Friesecký
        Creates a certificate.
24
        For root certificate (CA) the parent certificate id is modified to the same id (id == parent_id).
25 a0602bad David Friesecký
26 f8b23532 David Friesecký
        :param certificate: Instance of the Certificate object
27 a0602bad David Friesecký
28 f8b23532 David Friesecký
        :return: the result of whether the creation was successful
29 a0602bad David Friesecký
        """
30
31 f8b23532 David Friesecký
        try:
32
            sql = (f"INSERT INTO {TAB_CERTIFICATES} "
33
                   f"({COL_COMMON_NAME},"
34
                   f"{COL_VALID_FROM},"
35
                   f"{COL_VALID_TO},"
36
                   f"{COL_PEM_DATA},"
37 58051326 David Friesecký
                   f"{COL_REVOCATION_DATE},"
38
                   f"{COL_REVOCATION_REASON}"
39 f8b23532 David Friesecký
                   f"{COL_PRIVATE_KEY_ID},"
40
                   f"{COL_TYPE_ID},"
41
                   f"{COL_PARENT_ID})"
42
                   f"VALUES(?,?,?,?,?,?,?)")
43
            values = [certificate.common_name,
44
                      certificate.valid_from,
45
                      certificate.valid_to,
46
                      certificate.pem_data,
47 58051326 David Friesecký
                      certificate.revocation_date,
48
                      certificate.revocation_reason,
49 f8b23532 David Friesecký
                      certificate.private_key_id,
50
                      certificate.type_id,
51
                      certificate.parent_id]
52
            self.cursor.execute(sql, values)
53
            self.connection.commit()
54
55
            last_id: int = self.cursor.lastrowid
56
57 f3125948 Stanislav Král
            # TODO assure that this is correct
58
            if certificate.type_id == ROOT_CA_ID:
59 f8b23532 David Friesecký
                certificate.parent_id = last_id
60 093d06df Stanislav Král
                self.update(last_id, certificate)
61 f8b23532 David Friesecký
            else:
62 fa72c969 Stanislav Král
                for usage_id, usage_value in certificate.usages.items():
63 f8b23532 David Friesecký
                    if usage_value:
64
                        sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
65
                               f"({COL_CERTIFICATE_ID},"
66
                               f"{COL_USAGE_TYPE_ID}) "
67
                               f"VALUES (?,?)")
68
                        values = [last_id, usage_id]
69
                        self.cursor.execute(sql, values)
70
                        self.connection.commit()
71
        except Error as e:
72 58051326 David Friesecký
            return e
73 f8b23532 David Friesecký
74 805077f5 David Friesecký
        return last_id
75 f8b23532 David Friesecký
76 805077f5 David Friesecký
    def read(self, certificate_id: int):
77 f8b23532 David Friesecký
        """
78
        Reads (selects) a certificate.
79 e9e55282 David Friesecký
80 f8b23532 David Friesecký
        :param certificate_id: ID of specific certificate
81 e9e55282 David Friesecký
82 f8b23532 David Friesecký
        :return: instance of the Certificate object
83
        """
84 e9e55282 David Friesecký
85 f8b23532 David Friesecký
        try:
86
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
87
                   f"WHERE {COL_ID} = ?")
88
            values = [certificate_id]
89
            self.cursor.execute(sql, values)
90 a7411982 Stanislav Král
            certificate_row = self.cursor.fetchone()
91 e9e55282 David Friesecký
92 6f4a5f24 Captain_Trojan
            if certificate_row is None:
93
                return None
94
95 f8b23532 David Friesecký
            sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
96
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
97
            self.cursor.execute(sql, values)
98
            usage_rows = self.cursor.fetchall()
99 1636aefe David Friesecký
100
            usage_dict: Dict[int, bool] = {}
101
            for usage_row in usage_rows:
102 f8b23532 David Friesecký
                usage_dict[usage_row[2]] = True
103
104
            certificate: Certificate = Certificate(certificate_row[0],
105
                                                   certificate_row[1],
106
                                                   certificate_row[2],
107
                                                   certificate_row[3],
108
                                                   certificate_row[4],
109
                                                   certificate_row[7],
110 58051326 David Friesecký
                                                   certificate_row[8],
111
                                                   certificate_row[9],
112
                                                   usage_dict,
113
                                                   certificate_row[5],
114
                                                   certificate_row[6])
115 f8b23532 David Friesecký
        except Error as e:
116 58051326 David Friesecký
            return e
117 f8b23532 David Friesecký
118 805077f5 David Friesecký
        if len(certificate_row) > 0:
119
            return certificate
120
        else:
121
            return None
122 f8b23532 David Friesecký
123 58051326 David Friesecký
    def read_all(self, filter_type: int = None):
124 9e22e20c David Friesecký
        """
125
        Reads (selects) all certificates (with type).
126
127
        :param filter_type: ID of certificate type from CertificateTypes table
128
129
        :return: list of certificates
130
        """
131
132
        try:
133
            sql_extension = ""
134
            values = []
135
            if filter_type is not None:
136
                sql_extension = (f" WHERE {COL_TYPE_ID} = ("
137 6f64f062 Stanislav Král
                                 f"SELECT {COL_ID} FROM {TAB_CERTIFICATE_TYPES} WHERE {COL_ID} = ?)")
138 9e22e20c David Friesecký
                values = [filter_type]
139
140
            sql = (f"SELECT * FROM {TAB_CERTIFICATES}{sql_extension}")
141
            self.cursor.execute(sql, values)
142
            certificate_rows = self.cursor.fetchall()
143
144
            certificates: List[Certificate] = []
145
            for certificate_row in certificate_rows:
146
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
147
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
148
                values = [certificate_row[0]]
149
                self.cursor.execute(sql, values)
150
                usage_rows = self.cursor.fetchall()
151
152
                usage_dict: Dict[int, bool] = {}
153
                for usage_row in usage_rows:
154
                    usage_dict[usage_row[2]] = True
155
156
                certificates.append(Certificate(certificate_row[0],
157
                                                certificate_row[1],
158
                                                certificate_row[2],
159
                                                certificate_row[3],
160
                                                certificate_row[4],
161
                                                certificate_row[7],
162 58051326 David Friesecký
                                                certificate_row[8],
163
                                                certificate_row[9],
164
                                                usage_dict,
165
                                                certificate_row[5],
166
                                                certificate_row[6]))
167 9e22e20c David Friesecký
        except Error as e:
168 58051326 David Friesecký
            return e
169 9e22e20c David Friesecký
170 58051326 David Friesecký
        if len(certificates) > 0:
171
            return certificates
172
        else:
173
            return None
174 9e22e20c David Friesecký
175 58051326 David Friesecký
    def update(self, certificate_id: int, certificate: Certificate):
176 a0602bad David Friesecký
        """
177 f8b23532 David Friesecký
        Updates a certificate.
178
        If the parameter of certificate (Certificate object) is not to be changed,
179
        the same value must be specified.
180 a0602bad David Friesecký
181
        :param certificate_id: ID of specific certificate
182 f8b23532 David Friesecký
        :param certificate: Instance of the Certificate object
183 a0602bad David Friesecký
184
        :return: the result of whether the updation was successful
185
        """
186
187 f8b23532 David Friesecký
        try:
188
            sql = (f"UPDATE {TAB_CERTIFICATES} "
189
                   f"SET {COL_COMMON_NAME} = ?, "
190
                   f"{COL_VALID_FROM} = ?, "
191
                   f"{COL_VALID_TO} = ?, "
192
                   f"{COL_PEM_DATA} = ?, "
193 58051326 David Friesecký
                   f"{COL_REVOCATION_DATE} = ?, "
194
                   f"{COL_REVOCATION_REASON} = ?, "
195 f8b23532 David Friesecký
                   f"{COL_PRIVATE_KEY_ID} = ?, "
196
                   f"{COL_TYPE_ID} = ?, "
197
                   f"{COL_PARENT_ID} = ? "
198
                   f"WHERE {COL_ID} = ?")
199
            values = [certificate.common_name,
200
                      certificate.valid_from,
201
                      certificate.valid_to,
202
                      certificate.pem_data,
203 58051326 David Friesecký
                      certificate.revocation_date,
204
                      certificate.revocation_reason,
205 f8b23532 David Friesecký
                      certificate.private_key_id,
206
                      certificate.type_id,
207 805077f5 David Friesecký
                      certificate.parent_id,
208
                      certificate_id]
209 f8b23532 David Friesecký
            self.cursor.execute(sql, values)
210
            self.connection.commit()
211
212
            sql = (f"DELETE FROM {TAB_CERTIFICATE_USAGES} "
213
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
214
            values = [certificate_id]
215
            self.cursor.execute(sql, values)
216
            self.connection.commit()
217
218 f3125948 Stanislav Král
            # iterate over usage pairs
219
            for usage_id, usage_value in certificate.usages.items():
220 f8b23532 David Friesecký
                if usage_value:
221
                    sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
222
                           f"({COL_CERTIFICATE_ID},"
223
                           f"{COL_USAGE_TYPE_ID}) "
224
                           f"VALUES (?,?)")
225
                    values = [certificate_id, usage_id]
226
                    self.cursor.execute(sql, values)
227
                    self.connection.commit()
228
        except Error as e:
229 58051326 David Friesecký
            return e
230 f8b23532 David Friesecký
231
        return True
232 e9e55282 David Friesecký
233 58051326 David Friesecký
    def delete(self, certificate_id: int):
234 a0602bad David Friesecký
        """
235
        Deletes a certificate
236
237
        :param certificate_id: ID of specific certificate
238
239
        :return: the result of whether the deletion was successful
240
        """
241
242 f8b23532 David Friesecký
        try:
243
            sql = (f"DELETE FROM {TAB_CERTIFICATES} "
244
                   f"WHERE {COL_ID} = ?")
245
            values = [certificate_id]
246
            self.cursor.execute(sql, values)
247
            self.connection.commit()
248
        except Error as e:
249 58051326 David Friesecký
            return e
250 f8b23532 David Friesecký
251 45744020 Stanislav Král
        return self.cursor.rowcount > 0
252 58051326 David Friesecký
253
    def set_certificate_revoked(
254
            self, certificate_id: int, revocation_date: str, revocation_reason: str = "unspecified"):
255
        """
256
        Revoke a certificate
257
258
        :param certificate_id: ID of specific certificate
259
        :param revocation_date: Date, when the certificate is revoked
260
        :param revocation_reason: Reason of the revocation
261
262
        :return: the result of whether the revocation was successful
263
        """
264
265
        try:
266
            sql = (f"UPDATE {TAB_CERTIFICATES} "
267
                   f"SET {COL_REVOCATION_DATE} = ?, "
268
                   f"{COL_REVOCATION_REASON} = ?, "
269
                   f"WHERE {COL_ID} = ? AND ({COL_REVOCATION_DATE} IS NULL OR {COL_REVOCATION_DATE} = '')")
270
            values = [revocation_date,
271
                      revocation_reason,
272
                      certificate_id]
273
            self.cursor.execute(sql, values)
274
            self.connection.commit()
275
        except Error as e:
276
            return e
277
278
        return True
279
280
    def clear_certificate_revocation(self, certificate_id: int):
281
        """
282
        Clear revocation of a certificate
283
284
        :param certificate_id: ID of specific certificate
285
286
        :return: the result of whether the clear revocation was successful
287
        """
288
289
        try:
290
            sql = (f"UPDATE {TAB_CERTIFICATES} "
291
                   f"SET {COL_REVOCATION_DATE} = '', "
292
                   f"{COL_REVOCATION_REASON} = '', "
293
                   f"WHERE {COL_ID} = ?")
294
            values = [certificate_id]
295
            self.cursor.execute(sql, values)
296
            self.connection.commit()
297
        except Error as e:
298
            return e
299
300
        return True
301
302
    def get_all_revoked_by(self, certificate_id: int):
303
        """
304
        Get list of the revoked certificates that are direct descendants of the certificate with the ID
305
306
        :param certificate_id: ID of specific certificate
307
308
        :return:
309
            # list of the certificates
310
            # None if the list is empty
311
            # sqlite3.Error if an exception is thrown
312
        """
313
314
        try:
315
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
316
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_REVOCATION_DATE} IS NOT NULL AND {COL_REVOCATION_DATE} != ''")
317
            values = [certificate_id]
318
            self.cursor.execute(sql, values)
319
            certificate_rows = self.cursor.fetchall()
320
321
            certificates: List[Certificate] = []
322
            for certificate_row in certificate_rows:
323
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
324
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
325
                values = [certificate_row[0]]
326
                self.cursor.execute(sql, values)
327
                usage_rows = self.cursor.fetchall()
328
329
                usage_dict: Dict[int, bool] = {}
330
                for usage_row in usage_rows:
331
                    usage_dict[usage_row[2]] = True
332
333
                certificates.append(Certificate(certificate_row[0],
334
                                                certificate_row[1],
335
                                                certificate_row[2],
336
                                                certificate_row[3],
337
                                                certificate_row[4],
338
                                                certificate_row[7],
339
                                                certificate_row[8],
340
                                                certificate_row[9],
341
                                                usage_dict,
342
                                                certificate_row[5],
343
                                                certificate_row[6]))
344
        except Error as e:
345
            return e
346
347
        if len(certificates) > 0:
348
            return certificates
349
        else:
350
            return None
351
352
    def get_all_issued_by(self, certificate_id: int):
353
        """
354
        Get list of the certificates that are direct descendants of the certificate with the ID
355
356
        :param certificate_id: ID of specific certificate
357
358
        :return:
359
            # list of the certificates
360
            # None if the list is empty
361
            # sqlite3.Error if an exception is thrown
362
        """
363
364
        try:
365
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
366
                   f"WHERE {COL_PARENT_ID} = ?")
367
            values = [certificate_id]
368
            self.cursor.execute(sql, values)
369
            certificate_rows = self.cursor.fetchall()
370
371
            certificates: List[Certificate] = []
372
            for certificate_row in certificate_rows:
373
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
374
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
375
                values = [certificate_row[0]]
376
                self.cursor.execute(sql, values)
377
                usage_rows = self.cursor.fetchall()
378
379
                usage_dict: Dict[int, bool] = {}
380
                for usage_row in usage_rows:
381
                    usage_dict[usage_row[2]] = True
382
383
                certificates.append(Certificate(certificate_row[0],
384
                                                certificate_row[1],
385
                                                certificate_row[2],
386
                                                certificate_row[3],
387
                                                certificate_row[4],
388
                                                certificate_row[7],
389
                                                certificate_row[8],
390
                                                certificate_row[9],
391
                                                usage_dict,
392
                                                certificate_row[5],
393
                                                certificate_row[6]))
394
        except Error as e:
395
            return e
396
397
        if len(certificates) > 0:
398
            return certificates
399
        else:
400
            return None