Projekt

Obecné

Profil

Stáhnout (16.4 KB) Statistiky
| Větev: | Tag: | Revize:
1
from sqlite3 import Connection, Error
2
from typing import Dict, List
3

    
4
from src.exceptions.database_exception import DatabaseException
5
from injector import inject
6
from src.constants import *
7
from src.model.certificate import Certificate
8
from src.utils.logger import Logger
9

    
10

    
11
class CertificateRepository:
12

    
13
    @inject
14
    def __init__(self, connection: Connection):
15
        """
16
        Constructor of the CertificateRepository object
17

    
18
        :param connection: Instance of the Connection object
19
        """
20
        self.connection = connection
21
        self.cursor = connection.cursor()
22

    
23
    def create(self, certificate: Certificate):
24
        """
25
        Creates a certificate.
26
        For root certificate (CA) the parent certificate id is modified to the same id (id == parent_id).
27

    
28
        :param certificate: Instance of the Certificate object
29

    
30
        :return: the result of whether the creation was successful
31
        """
32

    
33
        try:
34
            sql = (f"INSERT INTO {TAB_CERTIFICATES} "
35
                   f"({COL_COMMON_NAME},"
36
                   f"{COL_VALID_FROM},"
37
                   f"{COL_VALID_TO},"
38
                   f"{COL_PEM_DATA},"
39
                   f"{COL_PRIVATE_KEY_ID},"
40
                   f"{COL_TYPE_ID},"
41
                   f"{COL_PARENT_ID})"
42
                   f"VALUES(?,?,?,?,?,?,?)")
43
            values = [certificate.common_name,
44
                      certificate.valid_from,
45
                      certificate.valid_to,
46
                      certificate.pem_data,
47
                      certificate.private_key_id,
48
                      certificate.type_id,
49
                      certificate.parent_id]
50
            self.cursor.execute(sql, values)
51
            self.connection.commit()
52

    
53
            last_id: int = self.cursor.lastrowid
54

    
55
            # TODO assure that this is correct
56
            if certificate.type_id == ROOT_CA_ID:
57
                certificate.parent_id = last_id
58
                self.update(last_id, certificate)
59
            else:
60
                for usage_id, usage_value in certificate.usages.items():
61
                    if usage_value:
62
                        sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
63
                               f"({COL_CERTIFICATE_ID},"
64
                               f"{COL_USAGE_TYPE_ID}) "
65
                               f"VALUES (?,?)")
66
                        values = [last_id, usage_id]
67
                        self.cursor.execute(sql, values)
68
                        self.connection.commit()
69
        except Error as e:
70
            raise DatabaseException(e)
71

    
72
        return last_id
73

    
74
    def read(self, certificate_id: int):
75
        """
76
        Reads (selects) a certificate.
77

    
78
        :param certificate_id: ID of specific certificate
79

    
80
        :return: instance of the Certificate object
81
        """
82

    
83
        try:
84
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
85
                   f"WHERE {COL_ID} = ?")
86
            values = [certificate_id]
87
            self.cursor.execute(sql, values)
88
            certificate_row = self.cursor.fetchone()
89

    
90
            if certificate_row is None:
91
                return None
92

    
93
            sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
94
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
95
            self.cursor.execute(sql, values)
96
            usage_rows = self.cursor.fetchall()
97

    
98
            usage_dict: Dict[int, bool] = {}
99
            for usage_row in usage_rows:
100
                usage_dict[usage_row[2]] = True
101

    
102
            certificate: Certificate = Certificate(certificate_row[0],
103
                                                   certificate_row[1],
104
                                                   certificate_row[2],
105
                                                   certificate_row[3],
106
                                                   certificate_row[4],
107
                                                   certificate_row[7],
108
                                                   certificate_row[8],
109
                                                   certificate_row[9],
110
                                                   usage_dict,
111
                                                   certificate_row[5],
112
                                                   certificate_row[6])
113
        except Error as e:
114
            raise DatabaseException(e)
115

    
116
        return certificate
117

    
118
    def read_all(self, filter_type: int = None):
119
        """
120
        Reads (selects) all certificates (with type).
121

    
122
        :param filter_type: ID of certificate type from CertificateTypes table
123

    
124
        :return: list of certificates
125
        """
126

    
127
        try:
128
            sql_extension = ""
129
            values = []
130
            if filter_type is not None:
131
                sql_extension = (f" WHERE {COL_TYPE_ID} = ("
132
                                 f"SELECT {COL_ID} FROM {TAB_CERTIFICATE_TYPES} WHERE {COL_ID} = ?)")
133
                values = [filter_type]
134

    
135
            sql = f"SELECT * FROM {TAB_CERTIFICATES}{sql_extension}"
136
            self.cursor.execute(sql, values)
137
            certificate_rows = self.cursor.fetchall()
138

    
139
            certificates: List[Certificate] = []
140
            for certificate_row in certificate_rows:
141
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
142
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
143
                values = [certificate_row[0]]
144
                self.cursor.execute(sql, values)
145
                usage_rows = self.cursor.fetchall()
146

    
147
                usage_dict: Dict[int, bool] = {}
148
                for usage_row in usage_rows:
149
                    usage_dict[usage_row[2]] = True
150

    
151
                certificates.append(Certificate(certificate_row[0],
152
                                                certificate_row[1],
153
                                                certificate_row[2],
154
                                                certificate_row[3],
155
                                                certificate_row[4],
156
                                                certificate_row[7],
157
                                                certificate_row[8],
158
                                                certificate_row[9],
159
                                                usage_dict,
160
                                                certificate_row[5],
161
                                                certificate_row[6]))
162
        except Error as e:
163
            raise DatabaseException(e)
164

    
165
        return certificates
166

    
167
    def update(self, certificate_id: int, certificate: Certificate):
168
        """
169
        Updates a certificate.
170
        If the parameter of certificate (Certificate object) is not to be changed,
171
        the same value must be specified.
172

    
173
        :param certificate_id: ID of specific certificate
174
        :param certificate: Instance of the Certificate object
175

    
176
        :return: the result of whether the updation was successful
177
        """
178

    
179
        try:
180
            sql = (f"UPDATE {TAB_CERTIFICATES} "
181
                   f"SET {COL_COMMON_NAME} = ?, "
182
                   f"{COL_VALID_FROM} = ?, "
183
                   f"{COL_VALID_TO} = ?, "
184
                   f"{COL_PEM_DATA} = ?, "
185
                   f"{COL_PRIVATE_KEY_ID} = ?, "
186
                   f"{COL_TYPE_ID} = ?, "
187
                   f"{COL_PARENT_ID} = ? "
188
                   f"WHERE {COL_ID} = ?")
189
            values = [certificate.common_name,
190
                      certificate.valid_from,
191
                      certificate.valid_to,
192
                      certificate.pem_data,
193
                      certificate.private_key_id,
194
                      certificate.type_id,
195
                      certificate.parent_id,
196
                      certificate_id]
197
            self.cursor.execute(sql, values)
198
            self.connection.commit()
199

    
200
            sql = (f"DELETE FROM {TAB_CERTIFICATE_USAGES} "
201
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
202
            values = [certificate_id]
203
            self.cursor.execute(sql, values)
204
            self.connection.commit()
205

    
206
            # iterate over usage pairs
207
            for usage_id, usage_value in certificate.usages.items():
208
                if usage_value:
209
                    sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
210
                           f"({COL_CERTIFICATE_ID},"
211
                           f"{COL_USAGE_TYPE_ID}) "
212
                           f"VALUES (?,?)")
213
                    values = [certificate_id, usage_id]
214
                    self.cursor.execute(sql, values)
215
                    self.connection.commit()
216
        except Error as e:
217
            raise DatabaseException(e)
218

    
219
        return self.cursor.rowcount > 0
220

    
221
    def delete(self, certificate_id: int):
222
        """
223
        Deletes a certificate
224

    
225
        :param certificate_id: ID of specific certificate
226

    
227
        :return: the result of whether the deletion was successful
228
        """
229

    
230
        try:
231
            sql = (f"DELETE FROM {TAB_CERTIFICATES} "
232
                   f"WHERE {COL_ID} = ?")
233
            values = [certificate_id]
234
            self.cursor.execute(sql, values)
235
            self.connection.commit()
236
        except Error as e:
237
            raise DatabaseException(e)
238

    
239
        return self.cursor.rowcount > 0
240

    
241
    def set_certificate_revoked(
242
            self, certificate_id: int, revocation_date: str, revocation_reason: str = REV_REASON_UNSPECIFIED):
243
        """
244
        Revoke a certificate
245

    
246
        :param certificate_id: ID of specific certificate
247
        :param revocation_date: Date, when the certificate is revoked
248
        :param revocation_reason: Reason of the revocation
249

    
250
        :return:
251
            the result of whether the revocation was successful OR
252
            sqlite3.Error if an exception is thrown
253
        """
254

    
255
        try:
256
            if revocation_date != "" and revocation_reason == "":
257
                revocation_reason = REV_REASON_UNSPECIFIED
258
            elif revocation_date == "":
259
                return False
260

    
261
            sql = (f"UPDATE {TAB_CERTIFICATES} "
262
                   f"SET {COL_REVOCATION_DATE} = ?, "
263
                   f"{COL_REVOCATION_REASON} = ? "
264
                   f"WHERE {COL_ID} = ? AND ({COL_REVOCATION_DATE} IS NULL OR {COL_REVOCATION_DATE} = '')")
265
            values = [revocation_date,
266
                      revocation_reason,
267
                      certificate_id]
268
            self.cursor.execute(sql, values)
269
            self.connection.commit()
270
        except Error as e:
271
            raise DatabaseException(e)
272

    
273
        return self.cursor.rowcount > 0
274

    
275
    def clear_certificate_revocation(self, certificate_id: int):
276
        """
277
        Clear revocation of a certificate
278

    
279
        :param certificate_id: ID of specific certificate
280

    
281
        :return:
282
            the result of whether the clear revocation was successful OR
283
            sqlite3.Error if an exception is thrown
284
        """
285

    
286
        try:
287
            sql = (f"UPDATE {TAB_CERTIFICATES} "
288
                   f"SET {COL_REVOCATION_DATE} = '', "
289
                   f"{COL_REVOCATION_REASON} = '' "
290
                   f"WHERE {COL_ID} = ?")
291
            values = [certificate_id]
292
            self.cursor.execute(sql, values)
293
            self.connection.commit()
294
        except Error as e:
295
            raise DatabaseException(e)
296

    
297
        return self.cursor.rowcount > 0
298

    
299
    def get_all_revoked_by(self, certificate_id: int):
300
        """
301
        Get list of the revoked certificates that are direct descendants of the certificate with the ID
302

    
303
        :param certificate_id: ID of specific certificate
304

    
305
        :return:
306
            list of the certificates OR
307
            None if the list is empty OR
308
            sqlite3.Error if an exception is thrown
309
        """
310

    
311
        try:
312
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
313
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_REVOCATION_DATE} IS NOT NULL AND {COL_REVOCATION_DATE} != ''")
314
            values = [certificate_id]
315
            self.cursor.execute(sql, values)
316
            certificate_rows = self.cursor.fetchall()
317

    
318
            certificates: List[Certificate] = []
319
            for certificate_row in certificate_rows:
320
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
321
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
322
                values = [certificate_row[0]]
323
                self.cursor.execute(sql, values)
324
                usage_rows = self.cursor.fetchall()
325

    
326
                usage_dict: Dict[int, bool] = {}
327
                for usage_row in usage_rows:
328
                    usage_dict[usage_row[2]] = True
329

    
330
                certificates.append(Certificate(certificate_row[0],
331
                                                certificate_row[1],
332
                                                certificate_row[2],
333
                                                certificate_row[3],
334
                                                certificate_row[4],
335
                                                certificate_row[7],
336
                                                certificate_row[8],
337
                                                certificate_row[9],
338
                                                usage_dict,
339
                                                certificate_row[5],
340
                                                certificate_row[6]))
341
        except Error as e:
342
            raise DatabaseException(e)
343

    
344
        return certificates
345

    
346
    def get_all_issued_by(self, certificate_id: int):
347
        """
348
        Get list of the certificates that are direct descendants of the certificate with the ID
349

    
350
        :param certificate_id: ID of specific certificate
351

    
352
        :return:
353
            list of the certificates OR
354
            None if the list is empty OR
355
            sqlite3.Error if an exception is thrown
356
        """
357

    
358
        try:
359
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
360
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_ID} != ?")
361
            values = [certificate_id, certificate_id]
362
            self.cursor.execute(sql, values)
363
            certificate_rows = self.cursor.fetchall()
364

    
365
            certificates: List[Certificate] = []
366
            for certificate_row in certificate_rows:
367
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
368
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
369
                values = [certificate_row[0]]
370
                self.cursor.execute(sql, values)
371
                usage_rows = self.cursor.fetchall()
372

    
373
                usage_dict: Dict[int, bool] = {}
374
                for usage_row in usage_rows:
375
                    usage_dict[usage_row[2]] = True
376

    
377
                certificates.append(Certificate(certificate_row[0],
378
                                                certificate_row[1],
379
                                                certificate_row[2],
380
                                                certificate_row[3],
381
                                                certificate_row[4],
382
                                                certificate_row[7],
383
                                                certificate_row[8],
384
                                                certificate_row[9],
385
                                                usage_dict,
386
                                                certificate_row[5],
387
                                                certificate_row[6]))
388
        except Error as e:
389
            raise DatabaseException(e)
390

    
391
        return certificates
392

    
393
    def get_all_descendants_of(self, certificate_id: int):
394
        """
395
        Get a list of all certificates C such that the certificate identified by "certificate_id" belongs to a trust chain
396
        between C and its root certificate authority (i.e. is an ancestor of C).
397
        :param certificate_id: target certificate ID
398
        :return: list of all descendants
399
        """
400
        def dfs(children_of, this, collection: list):
401
            for child in children_of(this.certificate_id):
402
                dfs(children_of, child, collection)
403
            collection.append(this)
404

    
405
        subtree_root = self.read(certificate_id)
406
        if subtree_root is None:
407
            return None
408

    
409
        all_certs = []
410
        dfs(self.get_all_issued_by, subtree_root, all_certs)
411
        return all_certs
412

    
413
    def get_next_id(self) -> int:
414
        """
415
        Get identifier of the next certificate that will be inserted into the database
416
        :return: identifier of the next certificate that will be added into the database
417
        """
418
        # get next IDs of all tables
419
        self.cursor.execute("SELECT * FROM SQLITE_SEQUENCE")
420
        results = self.cursor.fetchall()
421

    
422
        # search for next ID in Certificates table and return it
423
        for result in results:
424
            if result[0] == TAB_CERTIFICATES:
425
                return result[1] + 1  # current last id + 1
426
        # if certificates table is not present in the query results, return 1
427
        return 1
(2-2/3)