Projekt

Obecné

Profil

Stáhnout (16.8 KB) Statistiky
| Větev: | Tag: | Revize:
1
import time
2
from sqlite3 import Connection, Error
3
from typing import Dict, List
4

    
5
from src.exceptions.database_exception import DatabaseException
6
from injector import inject
7
from src.constants import *
8
from src.model.certificate import Certificate
9

    
10

    
11
class CertificateRepository:
12

    
13
    @inject
14
    def __init__(self, connection: Connection):
15
        """
16
        Constructor of the CertificateRepository object
17

    
18
        :param connection: Instance of the Connection object
19
        """
20
        self.connection = connection
21
        self.cursor = connection.cursor()
22

    
23
    def create(self, certificate: Certificate):
24
        """
25
        Creates a certificate.
26
        For root certificate (CA) the parent certificate id is modified to the same id (id == parent_id).
27

    
28
        :param certificate: Instance of the Certificate object
29

    
30
        :return: the result of whether the creation was successful
31
        """
32

    
33
        try:
34
            sql = (f"INSERT INTO {TAB_CERTIFICATES} "
35
                   f"({COL_COMMON_NAME},"
36
                   f"{COL_VALID_FROM},"
37
                   f"{COL_VALID_TO},"
38
                   f"{COL_PEM_DATA},"
39
                   f"{COL_PRIVATE_KEY_ID},"
40
                   f"{COL_TYPE_ID},"
41
                   f"{COL_PARENT_ID})"
42
                   f"VALUES(?,?,?,?,?,?,?)")
43
            values = [certificate.common_name,
44
                      certificate.valid_from,
45
                      certificate.valid_to,
46
                      certificate.pem_data,
47
                      certificate.private_key_id,
48
                      certificate.type_id,
49
                      certificate.parent_id]
50
            self.cursor.execute(sql, values)
51
            self.connection.commit()
52

    
53
            last_id: int = self.cursor.lastrowid
54

    
55
            # TODO assure that this is correct
56
            if certificate.type_id == ROOT_CA_ID:
57
                certificate.parent_id = last_id
58
                self.update(last_id, certificate)
59
            else:
60
                for usage_id, usage_value in certificate.usages.items():
61
                    if usage_value:
62
                        sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
63
                               f"({COL_CERTIFICATE_ID},"
64
                               f"{COL_USAGE_TYPE_ID}) "
65
                               f"VALUES (?,?)")
66
                        values = [last_id, usage_id]
67
                        self.cursor.execute(sql, values)
68
                        self.connection.commit()
69
        except Error as e:
70
            raise DatabaseException(e)
71

    
72
        return last_id
73

    
74
    def read(self, certificate_id: int):
75
        """
76
        Reads (selects) a certificate.
77

    
78
        :param certificate_id: ID of specific certificate
79

    
80
        :return: instance of the Certificate object
81
        """
82

    
83
        try:
84
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
85
                   f"WHERE {COL_ID} = ? AND ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
86
            values = [certificate_id]
87
            self.cursor.execute(sql, values)
88
            certificate_row = self.cursor.fetchone()
89

    
90
            if certificate_row is None:
91
                return None
92

    
93
            sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
94
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
95
            self.cursor.execute(sql, values)
96
            usage_rows = self.cursor.fetchall()
97

    
98
            usage_dict: Dict[int, bool] = {}
99
            for usage_row in usage_rows:
100
                usage_dict[usage_row[2]] = True
101

    
102
            certificate: Certificate = Certificate(certificate_row[0],
103
                                                   certificate_row[1],
104
                                                   certificate_row[2],
105
                                                   certificate_row[3],
106
                                                   certificate_row[4],
107
                                                   certificate_row[8],
108
                                                   certificate_row[9],
109
                                                   certificate_row[10],
110
                                                   usage_dict,
111
                                                   certificate_row[5],
112
                                                   certificate_row[6])
113
        except Error as e:
114
            raise DatabaseException(e)
115

    
116
        return certificate
117

    
118
    def read_all(self, filter_type: int = None):
119
        """
120
        Reads (selects) all certificates (with type).
121

    
122
        :param filter_type: ID of certificate type from CertificateTypes table
123

    
124
        :return: list of certificates
125
        """
126

    
127
        try:
128
            sql_extension = ""
129
            values = []
130
            if filter_type is not None:
131
                sql_extension = (f" AND {COL_TYPE_ID} = ("
132
                                 f"SELECT {COL_ID} FROM {TAB_CERTIFICATE_TYPES} WHERE {COL_ID} = ?)")
133
                values = [filter_type]
134

    
135
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
136
                   f"WHERE ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = ''){sql_extension}")
137
            self.cursor.execute(sql, values)
138
            certificate_rows = self.cursor.fetchall()
139

    
140
            certificates: List[Certificate] = []
141
            for certificate_row in certificate_rows:
142
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
143
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
144
                values = [certificate_row[0]]
145
                self.cursor.execute(sql, values)
146
                usage_rows = self.cursor.fetchall()
147

    
148
                usage_dict: Dict[int, bool] = {}
149
                for usage_row in usage_rows:
150
                    usage_dict[usage_row[2]] = True
151

    
152
                certificates.append(Certificate(certificate_row[0],
153
                                                certificate_row[1],
154
                                                certificate_row[2],
155
                                                certificate_row[3],
156
                                                certificate_row[4],
157
                                                certificate_row[8],
158
                                                certificate_row[9],
159
                                                certificate_row[10],
160
                                                usage_dict,
161
                                                certificate_row[5],
162
                                                certificate_row[6]))
163
        except Error as e:
164
            raise DatabaseException(e)
165

    
166
        return certificates
167

    
168
    def update(self, certificate_id: int, certificate: Certificate):
169
        """
170
        Updates a certificate.
171
        If the parameter of certificate (Certificate object) is not to be changed,
172
        the same value must be specified.
173

    
174
        :param certificate_id: ID of specific certificate
175
        :param certificate: Instance of the Certificate object
176

    
177
        :return: the result of whether the updation was successful
178
        """
179

    
180
        try:
181
            sql = (f"UPDATE {TAB_CERTIFICATES} "
182
                   f"SET {COL_COMMON_NAME} = ?, "
183
                   f"{COL_VALID_FROM} = ?, "
184
                   f"{COL_VALID_TO} = ?, "
185
                   f"{COL_PEM_DATA} = ?, "
186
                   f"{COL_PRIVATE_KEY_ID} = ?, "
187
                   f"{COL_TYPE_ID} = ?, "
188
                   f"{COL_PARENT_ID} = ? "
189
                   f"WHERE {COL_ID} = ?")
190
            values = [certificate.common_name,
191
                      certificate.valid_from,
192
                      certificate.valid_to,
193
                      certificate.pem_data,
194
                      certificate.private_key_id,
195
                      certificate.type_id,
196
                      certificate.parent_id,
197
                      certificate_id]
198

    
199
            self.cursor.execute(sql, values)
200
            self.connection.commit()
201

    
202
            sql = (f"DELETE FROM {TAB_CERTIFICATE_USAGES} "
203
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
204
            values = [certificate_id]
205
            self.cursor.execute(sql, values)
206
            self.connection.commit()
207

    
208
            check_updated = self.cursor.rowcount > 0
209

    
210
            # iterate over usage pairs
211
            for usage_id, usage_value in certificate.usages.items():
212
                if usage_value:
213
                    sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
214
                           f"({COL_CERTIFICATE_ID},"
215
                           f"{COL_USAGE_TYPE_ID}) "
216
                           f"VALUES (?,?)")
217
                    values = [certificate_id, usage_id]
218
                    self.cursor.execute(sql, values)
219
                    self.connection.commit()
220
        except Error as e:
221
            raise DatabaseException(e)
222

    
223
        return check_updated
224

    
225
    def delete(self, certificate_id: int):
226
        """
227
        Deletes a certificate
228

    
229
        :param certificate_id: ID of specific certificate
230

    
231
        :return: the result of whether the deletion was successful
232
        """
233

    
234
        try:
235
            sql = (f"UPDATE {TAB_CERTIFICATES} "
236
                   f"SET {COL_DELETION_DATE} = ? "
237
                   f"WHERE {COL_ID} = ? AND ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
238
            values = [int(time.time()),
239
                      certificate_id]
240
            self.cursor.execute(sql, values)
241
            self.connection.commit()
242
        except Error as e:
243
            raise DatabaseException(e)
244

    
245
        return self.cursor.rowcount > 0
246

    
247
    def set_certificate_revoked(
248
            self, certificate_id: int, revocation_date: str, revocation_reason: str = REV_REASON_UNSPECIFIED):
249
        """
250
        Revoke a certificate
251

    
252
        :param certificate_id: ID of specific certificate
253
        :param revocation_date: Date, when the certificate is revoked
254
        :param revocation_reason: Reason of the revocation
255

    
256
        :return:
257
            the result of whether the revocation was successful OR
258
            sqlite3.Error if an exception is thrown
259
        """
260

    
261
        try:
262
            if revocation_date != "" and revocation_reason == "":
263
                revocation_reason = REV_REASON_UNSPECIFIED
264
            elif revocation_date == "":
265
                return False
266

    
267
            sql = (f"UPDATE {TAB_CERTIFICATES} "
268
                   f"SET {COL_REVOCATION_DATE} = ?, "
269
                   f"{COL_REVOCATION_REASON} = ? "
270
                   f"WHERE {COL_ID} = ? AND ({COL_REVOCATION_DATE} IS NULL OR {COL_REVOCATION_DATE} = '')")
271
            values = [revocation_date,
272
                      revocation_reason,
273
                      certificate_id]
274
            self.cursor.execute(sql, values)
275
            self.connection.commit()
276
        except Error as e:
277
            raise DatabaseException(e)
278

    
279
        return self.cursor.rowcount > 0
280

    
281
    def clear_certificate_revocation(self, certificate_id: int):
282
        """
283
        Clear revocation of a certificate
284

    
285
        :param certificate_id: ID of specific certificate
286

    
287
        :return:
288
            the result of whether the clear revocation was successful OR
289
            sqlite3.Error if an exception is thrown
290
        """
291

    
292
        try:
293
            sql = (f"UPDATE {TAB_CERTIFICATES} "
294
                   f"SET {COL_REVOCATION_DATE} = '', "
295
                   f"{COL_REVOCATION_REASON} = '' "
296
                   f"WHERE {COL_ID} = ?")
297
            values = [certificate_id]
298
            self.cursor.execute(sql, values)
299
            self.connection.commit()
300
        except Error as e:
301
            raise DatabaseException(e)
302

    
303
        return self.cursor.rowcount > 0
304

    
305
    def get_all_revoked_by(self, certificate_id: int):
306
        """
307
        Get list of the revoked certificates that are direct descendants of the certificate with the ID
308

    
309
        :param certificate_id: ID of specific certificate
310

    
311
        :return:
312
            list of the certificates OR
313
            None if the list is empty OR
314
            sqlite3.Error if an exception is thrown
315
        """
316

    
317
        try:
318
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
319
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_REVOCATION_DATE} IS NOT NULL AND {COL_REVOCATION_DATE} != ''")
320
            values = [certificate_id]
321
            self.cursor.execute(sql, values)
322
            certificate_rows = self.cursor.fetchall()
323

    
324
            certificates: List[Certificate] = []
325
            for certificate_row in certificate_rows:
326
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
327
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
328
                values = [certificate_row[0]]
329
                self.cursor.execute(sql, values)
330
                usage_rows = self.cursor.fetchall()
331

    
332
                usage_dict: Dict[int, bool] = {}
333
                for usage_row in usage_rows:
334
                    usage_dict[usage_row[2]] = True
335

    
336
                certificates.append(Certificate(certificate_row[0],
337
                                                certificate_row[1],
338
                                                certificate_row[2],
339
                                                certificate_row[3],
340
                                                certificate_row[4],
341
                                                certificate_row[8],
342
                                                certificate_row[9],
343
                                                certificate_row[10],
344
                                                usage_dict,
345
                                                certificate_row[5],
346
                                                certificate_row[6]))
347
        except Error as e:
348
            raise DatabaseException(e)
349

    
350
        return certificates
351

    
352
    def get_all_issued_by(self, certificate_id: int):
353
        """
354
        Get list of the certificates that are direct descendants of the certificate with the ID
355

    
356
        :param certificate_id: ID of specific certificate
357

    
358
        :return:
359
            list of the certificates OR
360
            None if the list is empty OR
361
            sqlite3.Error if an exception is thrown
362
        """
363

    
364
        try:
365
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
366
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_ID} != ? AND "
367
                   f"({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
368
            values = [certificate_id, certificate_id]
369
            self.cursor.execute(sql, values)
370
            certificate_rows = self.cursor.fetchall()
371

    
372
            certificates: List[Certificate] = []
373
            for certificate_row in certificate_rows:
374
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
375
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
376
                values = [certificate_row[0]]
377
                self.cursor.execute(sql, values)
378
                usage_rows = self.cursor.fetchall()
379

    
380
                usage_dict: Dict[int, bool] = {}
381
                for usage_row in usage_rows:
382
                    usage_dict[usage_row[2]] = True
383

    
384
                certificates.append(Certificate(certificate_row[0],
385
                                                certificate_row[1],
386
                                                certificate_row[2],
387
                                                certificate_row[3],
388
                                                certificate_row[4],
389
                                                certificate_row[8],
390
                                                certificate_row[9],
391
                                                certificate_row[10],
392
                                                usage_dict,
393
                                                certificate_row[5],
394
                                                certificate_row[6]))
395
        except Error as e:
396
            raise DatabaseException(e)
397

    
398
        return certificates
399

    
400
    def get_all_descendants_of(self, certificate_id: int):
401
        """
402
        Get a list of all certificates C such that the certificate identified by "certificate_id" belongs to a trust chain
403
        between C and its root certificate authority (i.e. is an ancestor of C).
404
        :param certificate_id: target certificate ID
405
        :return: list of all descendants
406
        """
407
        def dfs(children_of, this, collection: list):
408
            for child in children_of(this.certificate_id):
409
                dfs(children_of, child, collection)
410
            collection.append(this)
411

    
412
        subtree_root = self.read(certificate_id)
413
        if subtree_root is None:
414
            return None
415

    
416
        all_certs = []
417
        dfs(self.get_all_issued_by, subtree_root, all_certs)
418
        return all_certs
419

    
420
    def get_next_id(self) -> int:
421
        """
422
        Get identifier of the next certificate that will be inserted into the database
423
        :return: identifier of the next certificate that will be added into the database
424
        """
425
        # get next IDs of all tables
426
        self.cursor.execute("SELECT * FROM SQLITE_SEQUENCE")
427
        results = self.cursor.fetchall()
428

    
429
        # search for next ID in Certificates table and return it
430
        for result in results:
431
            if result[0] == TAB_CERTIFICATES:
432
                return result[1] + 1  # current last id + 1
433
        # if certificates table is not present in the query results, return 1
434
        return 1
(2-2/3)