Projekt

Obecné

Profil

Stáhnout (15.5 KB) Statistiky
| Větev: | Tag: | Revize:
1
from typing import Dict, List
2
from sqlite3 import Connection, Cursor, Error
3

    
4
from src.exceptions.database_exception import DatabaseException
5
from src.model.certificate import Certificate
6
from src.constants import *
7

    
8

    
9
class CertificateRepository:
10

    
11
    def __init__(self, connection: Connection, cursor: Cursor):
12
        """
13
        Constructor of the CertificateRepository object
14

    
15
        :param connection: Instance of the Connection object
16
        :param cursor: Instance of the Cursor object
17
        """
18

    
19
        self.connection = connection
20
        self.cursor = cursor
21

    
22
    def create(self, certificate: Certificate):
23
        """
24
        Creates a certificate.
25
        For root certificate (CA) the parent certificate id is modified to the same id (id == parent_id).
26

    
27
        :param certificate: Instance of the Certificate object
28

    
29
        :return: the result of whether the creation was successful
30
        """
31

    
32
        try:
33
            if certificate.revocation_date != "" and certificate.revocation_reason == "":
34
                certificate.revocation_reason = REV_REASON_UNSPECIFIED
35
            elif certificate.revocation_date == "":
36
                certificate.revocation_reason = ""
37

    
38
            sql = (f"INSERT INTO {TAB_CERTIFICATES} "
39
                   f"({COL_COMMON_NAME},"
40
                   f"{COL_VALID_FROM},"
41
                   f"{COL_VALID_TO},"
42
                   f"{COL_PEM_DATA},"
43
                   f"{COL_PRIVATE_KEY_ID},"
44
                   f"{COL_TYPE_ID},"
45
                   f"{COL_PARENT_ID})"
46
                   f"VALUES(?,?,?,?,?,?,?)")
47
            values = [certificate.common_name,
48
                      certificate.valid_from,
49
                      certificate.valid_to,
50
                      certificate.pem_data,
51
                      certificate.private_key_id,
52
                      certificate.type_id,
53
                      certificate.parent_id]
54
            self.cursor.execute(sql, values)
55
            self.connection.commit()
56

    
57
            last_id: int = self.cursor.lastrowid
58

    
59
            # TODO assure that this is correct
60
            if certificate.type_id == ROOT_CA_ID:
61
                certificate.parent_id = last_id
62
                self.update(last_id, certificate)
63
            else:
64
                for usage_id, usage_value in certificate.usages.items():
65
                    if usage_value:
66
                        sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
67
                               f"({COL_CERTIFICATE_ID},"
68
                               f"{COL_USAGE_TYPE_ID}) "
69
                               f"VALUES (?,?)")
70
                        values = [last_id, usage_id]
71
                        self.cursor.execute(sql, values)
72
                        self.connection.commit()
73
        except Error as e:
74
            raise DatabaseException(e)
75

    
76
        return last_id
77

    
78
    def read(self, certificate_id: int):
79
        """
80
        Reads (selects) a certificate.
81

    
82
        :param certificate_id: ID of specific certificate
83

    
84
        :return: instance of the Certificate object
85
        """
86

    
87
        try:
88
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
89
                   f"WHERE {COL_ID} = ?")
90
            values = [certificate_id]
91
            self.cursor.execute(sql, values)
92
            certificate_row = self.cursor.fetchone()
93

    
94
            if certificate_row is None:
95
                return None
96

    
97
            sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
98
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
99
            self.cursor.execute(sql, values)
100
            usage_rows = self.cursor.fetchall()
101

    
102
            usage_dict: Dict[int, bool] = {}
103
            for usage_row in usage_rows:
104
                usage_dict[usage_row[2]] = True
105

    
106
            certificate: Certificate = Certificate(certificate_row[0],
107
                                                   certificate_row[1],
108
                                                   certificate_row[2],
109
                                                   certificate_row[3],
110
                                                   certificate_row[4],
111
                                                   certificate_row[7],
112
                                                   certificate_row[8],
113
                                                   certificate_row[9],
114
                                                   usage_dict,
115
                                                   certificate_row[5],
116
                                                   certificate_row[6])
117
        except Error as e:
118
            raise DatabaseException(e)
119

    
120
        return certificate
121

    
122
    def read_all(self, filter_type: int = None):
123
        """
124
        Reads (selects) all certificates (with type).
125

    
126
        :param filter_type: ID of certificate type from CertificateTypes table
127

    
128
        :return: list of certificates
129
        """
130

    
131
        try:
132
            sql_extension = ""
133
            values = []
134
            if filter_type is not None:
135
                sql_extension = (f" WHERE {COL_TYPE_ID} = ("
136
                                 f"SELECT {COL_ID} FROM {TAB_CERTIFICATE_TYPES} WHERE {COL_ID} = ?)")
137
                values = [filter_type]
138

    
139
            sql = f"SELECT * FROM {TAB_CERTIFICATES}{sql_extension}"
140
            self.cursor.execute(sql, values)
141
            certificate_rows = self.cursor.fetchall()
142

    
143
            certificates: List[Certificate] = []
144
            for certificate_row in certificate_rows:
145
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
146
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
147
                values = [certificate_row[0]]
148
                self.cursor.execute(sql, values)
149
                usage_rows = self.cursor.fetchall()
150

    
151
                usage_dict: Dict[int, bool] = {}
152
                for usage_row in usage_rows:
153
                    usage_dict[usage_row[2]] = True
154

    
155
                certificates.append(Certificate(certificate_row[0],
156
                                                certificate_row[1],
157
                                                certificate_row[2],
158
                                                certificate_row[3],
159
                                                certificate_row[4],
160
                                                certificate_row[7],
161
                                                certificate_row[8],
162
                                                certificate_row[9],
163
                                                usage_dict,
164
                                                certificate_row[5],
165
                                                certificate_row[6]))
166
        except Error as e:
167
            raise DatabaseException(e)
168

    
169
        return certificates
170

    
171
    def update(self, certificate_id: int, certificate: Certificate):
172
        """
173
        Updates a certificate.
174
        If the parameter of certificate (Certificate object) is not to be changed,
175
        the same value must be specified.
176

    
177
        :param certificate_id: ID of specific certificate
178
        :param certificate: Instance of the Certificate object
179

    
180
        :return: the result of whether the updation was successful
181
        """
182

    
183
        try:
184
            if certificate.revocation_date != "" and certificate.revocation_reason == "":
185
                certificate.revocation_reason = REV_REASON_UNSPECIFIED
186
            elif certificate.revocation_date == "":
187
                certificate.revocation_reason = ""
188

    
189
            sql = (f"UPDATE {TAB_CERTIFICATES} "
190
                   f"SET {COL_COMMON_NAME} = ?, "
191
                   f"{COL_VALID_FROM} = ?, "
192
                   f"{COL_VALID_TO} = ?, "
193
                   f"{COL_PEM_DATA} = ?, "
194
                   f"{COL_PRIVATE_KEY_ID} = ?, "
195
                   f"{COL_TYPE_ID} = ?, "
196
                   f"{COL_PARENT_ID} = ? "
197
                   f"WHERE {COL_ID} = ?")
198
            values = [certificate.common_name,
199
                      certificate.valid_from,
200
                      certificate.valid_to,
201
                      certificate.pem_data,
202
                      certificate.private_key_id,
203
                      certificate.type_id,
204
                      certificate.parent_id,
205
                      certificate_id]
206
            self.cursor.execute(sql, values)
207
            self.connection.commit()
208

    
209
            sql = (f"DELETE FROM {TAB_CERTIFICATE_USAGES} "
210
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
211
            values = [certificate_id]
212
            self.cursor.execute(sql, values)
213
            self.connection.commit()
214

    
215
            # iterate over usage pairs
216
            for usage_id, usage_value in certificate.usages.items():
217
                if usage_value:
218
                    sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
219
                           f"({COL_CERTIFICATE_ID},"
220
                           f"{COL_USAGE_TYPE_ID}) "
221
                           f"VALUES (?,?)")
222
                    values = [certificate_id, usage_id]
223
                    self.cursor.execute(sql, values)
224
                    self.connection.commit()
225
        except Error as e:
226
            raise DatabaseException(e)
227

    
228
        return self.cursor.rowcount > 0
229

    
230
    def delete(self, certificate_id: int):
231
        """
232
        Deletes a certificate
233

    
234
        :param certificate_id: ID of specific certificate
235

    
236
        :return: the result of whether the deletion was successful
237
        """
238

    
239
        try:
240
            sql = (f"DELETE FROM {TAB_CERTIFICATES} "
241
                   f"WHERE {COL_ID} = ?")
242
            values = [certificate_id]
243
            self.cursor.execute(sql, values)
244
            self.connection.commit()
245
        except Error as e:
246
            raise DatabaseException(e)
247

    
248
        return self.cursor.rowcount > 0
249

    
250
    def set_certificate_revoked(
251
            self, certificate_id: int, revocation_date: str, revocation_reason: str = REV_REASON_UNSPECIFIED):
252
        """
253
        Revoke a certificate
254

    
255
        :param certificate_id: ID of specific certificate
256
        :param revocation_date: Date, when the certificate is revoked
257
        :param revocation_reason: Reason of the revocation
258

    
259
        :return:
260
            the result of whether the revocation was successful OR
261
            sqlite3.Error if an exception is thrown
262
        """
263

    
264
        try:
265
            if revocation_date != "" and revocation_reason == "":
266
                revocation_reason = REV_REASON_UNSPECIFIED
267
            elif revocation_date == "":
268
                return False
269

    
270
            sql = (f"UPDATE {TAB_CERTIFICATES} "
271
                   f"SET {COL_REVOCATION_DATE} = ?, "
272
                   f"{COL_REVOCATION_REASON} = ? "
273
                   f"WHERE {COL_ID} = ? AND ({COL_REVOCATION_DATE} IS NULL OR {COL_REVOCATION_DATE} = '')")
274
            values = [revocation_date,
275
                      revocation_reason,
276
                      certificate_id]
277
            self.cursor.execute(sql, values)
278
            self.connection.commit()
279
        except Error as e:
280
            raise DatabaseException(e)
281

    
282
        return self.cursor.rowcount > 0
283

    
284
    def clear_certificate_revocation(self, certificate_id: int):
285
        """
286
        Clear revocation of a certificate
287

    
288
        :param certificate_id: ID of specific certificate
289

    
290
        :return:
291
            the result of whether the clear revocation was successful OR
292
            sqlite3.Error if an exception is thrown
293
        """
294

    
295
        try:
296
            sql = (f"UPDATE {TAB_CERTIFICATES} "
297
                   f"SET {COL_REVOCATION_DATE} = '', "
298
                   f"{COL_REVOCATION_REASON} = '' "
299
                   f"WHERE {COL_ID} = ?")
300
            values = [certificate_id]
301
            self.cursor.execute(sql, values)
302
            self.connection.commit()
303
        except Error as e:
304
            raise DatabaseException(e)
305

    
306
        return self.cursor.rowcount > 0
307

    
308
    def get_all_revoked_by(self, certificate_id: int):
309
        """
310
        Get list of the revoked certificates that are direct descendants of the certificate with the ID
311

    
312
        :param certificate_id: ID of specific certificate
313

    
314
        :return:
315
            list of the certificates OR
316
            None if the list is empty OR
317
            sqlite3.Error if an exception is thrown
318
        """
319

    
320
        try:
321
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
322
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_REVOCATION_DATE} IS NOT NULL AND {COL_REVOCATION_DATE} != ''")
323
            values = [certificate_id]
324
            self.cursor.execute(sql, values)
325
            certificate_rows = self.cursor.fetchall()
326

    
327
            certificates: List[Certificate] = []
328
            for certificate_row in certificate_rows:
329
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
330
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
331
                values = [certificate_row[0]]
332
                self.cursor.execute(sql, values)
333
                usage_rows = self.cursor.fetchall()
334

    
335
                usage_dict: Dict[int, bool] = {}
336
                for usage_row in usage_rows:
337
                    usage_dict[usage_row[2]] = True
338

    
339
                certificates.append(Certificate(certificate_row[0],
340
                                                certificate_row[1],
341
                                                certificate_row[2],
342
                                                certificate_row[3],
343
                                                certificate_row[4],
344
                                                certificate_row[7],
345
                                                certificate_row[8],
346
                                                certificate_row[9],
347
                                                usage_dict,
348
                                                certificate_row[5],
349
                                                certificate_row[6]))
350
        except Error as e:
351
            raise DatabaseException(e)
352

    
353
        return certificates
354

    
355
    def get_all_issued_by(self, certificate_id: int):
356
        """
357
        Get list of the certificates that are direct descendants of the certificate with the ID
358

    
359
        :param certificate_id: ID of specific certificate
360

    
361
        :return:
362
            list of the certificates OR
363
            None if the list is empty OR
364
            sqlite3.Error if an exception is thrown
365
        """
366

    
367
        try:
368
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
369
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_ID} != ?")
370
            values = [certificate_id, certificate_id]
371
            self.cursor.execute(sql, values)
372
            certificate_rows = self.cursor.fetchall()
373

    
374
            certificates: List[Certificate] = []
375
            for certificate_row in certificate_rows:
376
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
377
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
378
                values = [certificate_row[0]]
379
                self.cursor.execute(sql, values)
380
                usage_rows = self.cursor.fetchall()
381

    
382
                usage_dict: Dict[int, bool] = {}
383
                for usage_row in usage_rows:
384
                    usage_dict[usage_row[2]] = True
385

    
386
                certificates.append(Certificate(certificate_row[0],
387
                                                certificate_row[1],
388
                                                certificate_row[2],
389
                                                certificate_row[3],
390
                                                certificate_row[4],
391
                                                certificate_row[7],
392
                                                certificate_row[8],
393
                                                certificate_row[9],
394
                                                usage_dict,
395
                                                certificate_row[5],
396
                                                certificate_row[6]))
397
        except Error as e:
398
            raise DatabaseException(e)
399

    
400
        return certificates
(2-2/3)