Projekt

Obecné

Profil

Stáhnout (15.6 KB) Statistiky
| Větev: | Tag: | Revize:
1
from typing import Dict, List
2
from sqlite3 import Connection, Cursor, Error
3

    
4
from src.model.certificate import Certificate
5
from src.constants import *
6

    
7

    
8
class CertificateRepository:
9

    
10
    def __init__(self, connection: Connection, cursor: Cursor):
11
        """
12
        Constructor of the CertificateRepository object
13

    
14
        :param connection: Instance of the Connection object
15
        :param cursor: Instance of the Cursor object
16
        """
17

    
18
        self.connection = connection
19
        self.cursor = cursor
20

    
21
    def create(self, certificate: Certificate):
22
        """
23
        Creates a certificate.
24
        For root certificate (CA) the parent certificate id is modified to the same id (id == parent_id).
25

    
26
        :param certificate: Instance of the Certificate object
27

    
28
        :return: the result of whether the creation was successful
29
        """
30

    
31
        try:
32
            if certificate.revocation_date != "" and certificate.revocation_reason == "":
33
                certificate.revocation_reason = REV_REASON_UNSPECIFIED
34
            elif certificate.revocation_date == "":
35
                certificate.revocation_reason = ""
36

    
37
            sql = (f"INSERT INTO {TAB_CERTIFICATES} "
38
                   f"({COL_COMMON_NAME},"
39
                   f"{COL_VALID_FROM},"
40
                   f"{COL_VALID_TO},"
41
                   f"{COL_PEM_DATA},"
42
                   f"{COL_PRIVATE_KEY_ID},"
43
                   f"{COL_TYPE_ID},"
44
                   f"{COL_PARENT_ID})"
45
                   f"VALUES(?,?,?,?,?,?,?)")
46
            values = [certificate.common_name,
47
                      certificate.valid_from,
48
                      certificate.valid_to,
49
                      certificate.pem_data,
50
                      certificate.private_key_id,
51
                      certificate.type_id,
52
                      certificate.parent_id]
53
            self.cursor.execute(sql, values)
54
            self.connection.commit()
55

    
56
            last_id: int = self.cursor.lastrowid
57

    
58
            # TODO assure that this is correct
59
            if certificate.type_id == ROOT_CA_ID:
60
                certificate.parent_id = last_id
61
                self.update(last_id, certificate)
62
            else:
63
                for usage_id, usage_value in certificate.usages.items():
64
                    if usage_value:
65
                        sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
66
                               f"({COL_CERTIFICATE_ID},"
67
                               f"{COL_USAGE_TYPE_ID}) "
68
                               f"VALUES (?,?)")
69
                        values = [last_id, usage_id]
70
                        self.cursor.execute(sql, values)
71
                        self.connection.commit()
72
        except Error as e:
73
            return e
74

    
75
        return last_id
76

    
77
    def read(self, certificate_id: int):
78
        """
79
        Reads (selects) a certificate.
80

    
81
        :param certificate_id: ID of specific certificate
82

    
83
        :return: instance of the Certificate object
84
        """
85

    
86
        try:
87
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
88
                   f"WHERE {COL_ID} = ?")
89
            values = [certificate_id]
90
            self.cursor.execute(sql, values)
91
            certificate_row = self.cursor.fetchone()
92

    
93
            if certificate_row is None:
94
                return None
95

    
96
            sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
97
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
98
            self.cursor.execute(sql, values)
99
            usage_rows = self.cursor.fetchall()
100

    
101
            usage_dict: Dict[int, bool] = {}
102
            for usage_row in usage_rows:
103
                usage_dict[usage_row[2]] = True
104

    
105
            certificate: Certificate = Certificate(certificate_row[0],
106
                                                   certificate_row[1],
107
                                                   certificate_row[2],
108
                                                   certificate_row[3],
109
                                                   certificate_row[4],
110
                                                   certificate_row[7],
111
                                                   certificate_row[8],
112
                                                   certificate_row[9],
113
                                                   usage_dict,
114
                                                   certificate_row[5],
115
                                                   certificate_row[6])
116
        except Error as e:
117
            return e
118

    
119
        if len(certificate_row) > 0:
120
            return certificate
121
        else:
122
            return None
123

    
124
    def read_all(self, filter_type: int = None):
125
        """
126
        Reads (selects) all certificates (with type).
127

    
128
        :param filter_type: ID of certificate type from CertificateTypes table
129

    
130
        :return: list of certificates
131
        """
132

    
133
        try:
134
            sql_extension = ""
135
            values = []
136
            if filter_type is not None:
137
                sql_extension = (f" WHERE {COL_TYPE_ID} = ("
138
                                 f"SELECT {COL_ID} FROM {TAB_CERTIFICATE_TYPES} WHERE {COL_ID} = ?)")
139
                values = [filter_type]
140

    
141
            sql = f"SELECT * FROM {TAB_CERTIFICATES}{sql_extension}"
142
            self.cursor.execute(sql, values)
143
            certificate_rows = self.cursor.fetchall()
144

    
145
            certificates: List[Certificate] = []
146
            for certificate_row in certificate_rows:
147
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
148
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
149
                values = [certificate_row[0]]
150
                self.cursor.execute(sql, values)
151
                usage_rows = self.cursor.fetchall()
152

    
153
                usage_dict: Dict[int, bool] = {}
154
                for usage_row in usage_rows:
155
                    usage_dict[usage_row[2]] = True
156

    
157
                certificates.append(Certificate(certificate_row[0],
158
                                                certificate_row[1],
159
                                                certificate_row[2],
160
                                                certificate_row[3],
161
                                                certificate_row[4],
162
                                                certificate_row[7],
163
                                                certificate_row[8],
164
                                                certificate_row[9],
165
                                                usage_dict,
166
                                                certificate_row[5],
167
                                                certificate_row[6]))
168
        except Error as e:
169
            return e
170

    
171
        if len(certificates) > 0:
172
            return certificates
173
        else:
174
            return None
175

    
176
    def update(self, certificate_id: int, certificate: Certificate):
177
        """
178
        Updates a certificate.
179
        If the parameter of certificate (Certificate object) is not to be changed,
180
        the same value must be specified.
181

    
182
        :param certificate_id: ID of specific certificate
183
        :param certificate: Instance of the Certificate object
184

    
185
        :return: the result of whether the updation was successful
186
        """
187

    
188
        try:
189
            if certificate.revocation_date != "" and certificate.revocation_reason == "":
190
                certificate.revocation_reason = REV_REASON_UNSPECIFIED
191
            elif certificate.revocation_date == "":
192
                certificate.revocation_reason = ""
193

    
194
            sql = (f"UPDATE {TAB_CERTIFICATES} "
195
                   f"SET {COL_COMMON_NAME} = ?, "
196
                   f"{COL_VALID_FROM} = ?, "
197
                   f"{COL_VALID_TO} = ?, "
198
                   f"{COL_PEM_DATA} = ?, "
199
                   f"{COL_PRIVATE_KEY_ID} = ?, "
200
                   f"{COL_TYPE_ID} = ?, "
201
                   f"{COL_PARENT_ID} = ? "
202
                   f"WHERE {COL_ID} = ?")
203
            values = [certificate.common_name,
204
                      certificate.valid_from,
205
                      certificate.valid_to,
206
                      certificate.pem_data,
207
                      certificate.private_key_id,
208
                      certificate.type_id,
209
                      certificate.parent_id,
210
                      certificate_id]
211
            self.cursor.execute(sql, values)
212
            self.connection.commit()
213

    
214
            sql = (f"DELETE FROM {TAB_CERTIFICATE_USAGES} "
215
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
216
            values = [certificate_id]
217
            self.cursor.execute(sql, values)
218
            self.connection.commit()
219

    
220
            # iterate over usage pairs
221
            for usage_id, usage_value in certificate.usages.items():
222
                if usage_value:
223
                    sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
224
                           f"({COL_CERTIFICATE_ID},"
225
                           f"{COL_USAGE_TYPE_ID}) "
226
                           f"VALUES (?,?)")
227
                    values = [certificate_id, usage_id]
228
                    self.cursor.execute(sql, values)
229
                    self.connection.commit()
230
        except Error as e:
231
            return e
232

    
233
        if self.cursor.rowcount > 0:
234
            return True
235

    
236
        return False
237

    
238
    def delete(self, certificate_id: int):
239
        """
240
        Deletes a certificate
241

    
242
        :param certificate_id: ID of specific certificate
243

    
244
        :return: the result of whether the deletion was successful
245
        """
246

    
247
        try:
248
            sql = (f"DELETE FROM {TAB_CERTIFICATES} "
249
                   f"WHERE {COL_ID} = ?")
250
            values = [certificate_id]
251
            self.cursor.execute(sql, values)
252
            self.connection.commit()
253
        except Error as e:
254
            return e
255

    
256
        return self.cursor.rowcount > 0
257

    
258
    def set_certificate_revoked(
259
            self, certificate_id: int, revocation_date: str, revocation_reason: str = REV_REASON_UNSPECIFIED):
260
        """
261
        Revoke a certificate
262

    
263
        :param certificate_id: ID of specific certificate
264
        :param revocation_date: Date, when the certificate is revoked
265
        :param revocation_reason: Reason of the revocation
266

    
267
        :return:
268
            the result of whether the revocation was successful OR
269
            sqlite3.Error if an exception is thrown
270
        """
271

    
272
        try:
273
            if revocation_date != "" and revocation_reason == "":
274
                revocation_reason = REV_REASON_UNSPECIFIED
275
            elif revocation_date == "":
276
                return False
277

    
278
            sql = (f"UPDATE {TAB_CERTIFICATES} "
279
                   f"SET {COL_REVOCATION_DATE} = ?, "
280
                   f"{COL_REVOCATION_REASON} = ? "
281
                   f"WHERE {COL_ID} = ? AND ({COL_REVOCATION_DATE} IS NULL OR {COL_REVOCATION_DATE} = '')")
282
            values = [revocation_date,
283
                      revocation_reason,
284
                      certificate_id]
285
            self.cursor.execute(sql, values)
286
            self.connection.commit()
287
        except Error as e:
288
            return e
289

    
290
        if self.cursor.rowcount > 0:
291
            return True
292

    
293
        return False
294

    
295
    def clear_certificate_revocation(self, certificate_id: int):
296
        """
297
        Clear revocation of a certificate
298

    
299
        :param certificate_id: ID of specific certificate
300

    
301
        :return:
302
            the result of whether the clear revocation was successful OR
303
            sqlite3.Error if an exception is thrown
304
        """
305

    
306
        try:
307
            sql = (f"UPDATE {TAB_CERTIFICATES} "
308
                   f"SET {COL_REVOCATION_DATE} = '', "
309
                   f"{COL_REVOCATION_REASON} = '' "
310
                   f"WHERE {COL_ID} = ?")
311
            values = [certificate_id]
312
            self.cursor.execute(sql, values)
313
            self.connection.commit()
314
        except Error as e:
315
            return e
316

    
317
        return True
318

    
319
    def get_all_revoked_by(self, certificate_id: int):
320
        """
321
        Get list of the revoked certificates that are direct descendants of the certificate with the ID
322

    
323
        :param certificate_id: ID of specific certificate
324

    
325
        :return:
326
            list of the certificates OR
327
            None if the list is empty OR
328
            sqlite3.Error if an exception is thrown
329
        """
330

    
331
        try:
332
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
333
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_REVOCATION_DATE} IS NOT NULL AND {COL_REVOCATION_DATE} != ''")
334
            values = [certificate_id]
335
            self.cursor.execute(sql, values)
336
            certificate_rows = self.cursor.fetchall()
337

    
338
            certificates: List[Certificate] = []
339
            for certificate_row in certificate_rows:
340
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
341
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
342
                values = [certificate_row[0]]
343
                self.cursor.execute(sql, values)
344
                usage_rows = self.cursor.fetchall()
345

    
346
                usage_dict: Dict[int, bool] = {}
347
                for usage_row in usage_rows:
348
                    usage_dict[usage_row[2]] = True
349

    
350
                certificates.append(Certificate(certificate_row[0],
351
                                                certificate_row[1],
352
                                                certificate_row[2],
353
                                                certificate_row[3],
354
                                                certificate_row[4],
355
                                                certificate_row[7],
356
                                                certificate_row[8],
357
                                                certificate_row[9],
358
                                                usage_dict,
359
                                                certificate_row[5],
360
                                                certificate_row[6]))
361
        except Error as e:
362
            return e
363

    
364
        if len(certificates) > 0:
365
            return certificates
366
        else:
367
            return None
368

    
369
    def get_all_issued_by(self, certificate_id: int):
370
        """
371
        Get list of the certificates that are direct descendants of the certificate with the ID
372

    
373
        :param certificate_id: ID of specific certificate
374

    
375
        :return:
376
            list of the certificates OR
377
            None if the list is empty OR
378
            sqlite3.Error if an exception is thrown
379
        """
380

    
381
        try:
382
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
383
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_ID} != ?")
384
            values = [certificate_id, certificate_id]
385
            self.cursor.execute(sql, values)
386
            certificate_rows = self.cursor.fetchall()
387

    
388
            certificates: List[Certificate] = []
389
            for certificate_row in certificate_rows:
390
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
391
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
392
                values = [certificate_row[0]]
393
                self.cursor.execute(sql, values)
394
                usage_rows = self.cursor.fetchall()
395

    
396
                usage_dict: Dict[int, bool] = {}
397
                for usage_row in usage_rows:
398
                    usage_dict[usage_row[2]] = True
399

    
400
                certificates.append(Certificate(certificate_row[0],
401
                                                certificate_row[1],
402
                                                certificate_row[2],
403
                                                certificate_row[3],
404
                                                certificate_row[4],
405
                                                certificate_row[7],
406
                                                certificate_row[8],
407
                                                certificate_row[9],
408
                                                usage_dict,
409
                                                certificate_row[5],
410
                                                certificate_row[6]))
411
        except Error as e:
412
            return e
413

    
414
        if len(certificates) > 0:
415
            return certificates
416
        else:
417
            return None
(2-2/3)