Projekt

Obecné

Profil

Stáhnout (8.57 KB) Statistiky
| Větev: | Tag: | Revize:
1 9e22e20c David Friesecký
from typing import Dict, List
2 f8b23532 David Friesecký
from sqlite3 import Connection, Cursor, Error
3 1636aefe David Friesecký
4 f8b23532 David Friesecký
from src.db_objects.certificate import Certificate
5
from src.constants import *
6 e9e55282 David Friesecký
7
8 f8b23532 David Friesecký
class CertificateRepository:
9 25053504 David Friesecký
10 f8b23532 David Friesecký
    def __init__(self, connection: Connection, cursor: Cursor):
11 a0602bad David Friesecký
        """
12 f8b23532 David Friesecký
        Constructor of the CertificateRepository object
13 a0602bad David Friesecký
14 f8b23532 David Friesecký
        :param connection: Instance of the Connection object
15
        :param cursor: Instance of the Cursor object
16 a0602bad David Friesecký
        """
17
18 f8b23532 David Friesecký
        self.connection = connection
19
        self.cursor = cursor
20 e9e55282 David Friesecký
21 805077f5 David Friesecký
    def create(self, certificate: Certificate):
22 a0602bad David Friesecký
        """
23 f8b23532 David Friesecký
        Creates a certificate.
24
        For root certificate (CA) the parent certificate id is modified to the same id (id == parent_id).
25 a0602bad David Friesecký
26 f8b23532 David Friesecký
        :param certificate: Instance of the Certificate object
27 a0602bad David Friesecký
28 f8b23532 David Friesecký
        :return: the result of whether the creation was successful
29 a0602bad David Friesecký
        """
30
31 f8b23532 David Friesecký
        try:
32
            sql = (f"INSERT INTO {TAB_CERTIFICATES} "
33
                   f"({COL_COMMON_NAME},"
34
                   f"{COL_VALID_FROM},"
35
                   f"{COL_VALID_TO},"
36
                   f"{COL_PEM_DATA},"
37
                   f"{COL_PRIVATE_KEY_ID},"
38
                   f"{COL_TYPE_ID},"
39
                   f"{COL_PARENT_ID})"
40
                   f"VALUES(?,?,?,?,?,?,?)")
41
            values = [certificate.common_name,
42
                      certificate.valid_from,
43
                      certificate.valid_to,
44
                      certificate.pem_data,
45
                      certificate.private_key_id,
46
                      certificate.type_id,
47
                      certificate.parent_id]
48
            self.cursor.execute(sql, values)
49
            self.connection.commit()
50
51
            last_id: int = self.cursor.lastrowid
52
53
            if certificate.usages[ROOT_CA_ID - 1]:
54
                certificate.parent_id = last_id
55
                return self.update(last_id, certificate)
56
            else:
57
                for usage_id, usage_value in certificate.usages:
58
                    if usage_value:
59
                        sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
60
                               f"({COL_CERTIFICATE_ID},"
61
                               f"{COL_USAGE_TYPE_ID}) "
62
                               f"VALUES (?,?)")
63
                        values = [last_id, usage_id]
64
                        self.cursor.execute(sql, values)
65
                        self.connection.commit()
66
        except Error as e:
67
            print(e)
68 805077f5 David Friesecký
            return None
69 f8b23532 David Friesecký
70 805077f5 David Friesecký
        return last_id
71 f8b23532 David Friesecký
72 805077f5 David Friesecký
    def read(self, certificate_id: int):
73 f8b23532 David Friesecký
        """
74
        Reads (selects) a certificate.
75 e9e55282 David Friesecký
76 f8b23532 David Friesecký
        :param certificate_id: ID of specific certificate
77 e9e55282 David Friesecký
78 f8b23532 David Friesecký
        :return: instance of the Certificate object
79
        """
80 e9e55282 David Friesecký
81 f8b23532 David Friesecký
        try:
82
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
83
                   f"WHERE {COL_ID} = ?")
84
            values = [certificate_id]
85
            self.cursor.execute(sql, values)
86
            certificate_row = self.cursor.fetchall()
87 e9e55282 David Friesecký
88 f8b23532 David Friesecký
            sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
89
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
90
            self.cursor.execute(sql, values)
91
            usage_rows = self.cursor.fetchall()
92 1636aefe David Friesecký
93
            usage_dict: Dict[int, bool] = {}
94
            for usage_row in usage_rows:
95 f8b23532 David Friesecký
                usage_dict[usage_row[2]] = True
96
97
            certificate: Certificate = Certificate(certificate_row[0],
98
                                                   certificate_row[1],
99
                                                   certificate_row[2],
100
                                                   certificate_row[3],
101
                                                   certificate_row[4],
102
                                                   certificate_row[5],
103
                                                   certificate_row[6],
104
                                                   certificate_row[7],
105
                                                   usage_dict)
106
        except Error as e:
107
            print(e)
108
            return None
109
110 805077f5 David Friesecký
        if len(certificate_row) > 0:
111
            return certificate
112
        else:
113
            return None
114 f8b23532 David Friesecký
115 9e22e20c David Friesecký
    def read_all(self, filter_type: int = None):
116
        """
117
        Reads (selects) all certificates (with type).
118
119
        :param filter_type: ID of certificate type from CertificateTypes table
120
121
        :return: list of certificates
122
        """
123
124
        try:
125
            sql_extension = ""
126
            values = []
127
            if filter_type is not None:
128
                sql_extension = (f" WHERE {COL_TYPE_ID} = ("
129
                                 f"SELECT {COL_ID} FROM {TAB_CERTIFICATE_TYPES} WHERE {COL_ID} = ?")
130
                values = [filter_type]
131
132
            sql = (f"SELECT * FROM {TAB_CERTIFICATES}{sql_extension}")
133
            self.cursor.execute(sql, values)
134
            certificate_rows = self.cursor.fetchall()
135
136
            certificates: List[Certificate] = []
137
            for certificate_row in certificate_rows:
138
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
139
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
140
                values = [certificate_row[0]]
141
                self.cursor.execute(sql, values)
142
                usage_rows = self.cursor.fetchall()
143
144
                usage_dict: Dict[int, bool] = {}
145
                for usage_row in usage_rows:
146
                    usage_dict[usage_row[2]] = True
147
148
                certificates.append(Certificate(certificate_row[0],
149
                                                certificate_row[1],
150
                                                certificate_row[2],
151
                                                certificate_row[3],
152
                                                certificate_row[4],
153
                                                certificate_row[5],
154
                                                certificate_row[6],
155
                                                certificate_row[7],
156
                                                usage_dict))
157
        except Error as e:
158
            print(e)
159
            return None
160
161
        if len(certificates) > 0:
162
            return certificates
163
        else:
164
            return None
165
166 f8b23532 David Friesecký
    def update(self, certificate_id: int, certificate: Certificate) -> bool:
167 a0602bad David Friesecký
        """
168 f8b23532 David Friesecký
        Updates a certificate.
169
        If the parameter of certificate (Certificate object) is not to be changed,
170
        the same value must be specified.
171 a0602bad David Friesecký
172
        :param certificate_id: ID of specific certificate
173 f8b23532 David Friesecký
        :param certificate: Instance of the Certificate object
174 a0602bad David Friesecký
175
        :return: the result of whether the updation was successful
176
        """
177
178 f8b23532 David Friesecký
        try:
179
            sql = (f"UPDATE {TAB_CERTIFICATES} "
180
                   f"SET {COL_COMMON_NAME} = ?, "
181
                   f"{COL_VALID_FROM} = ?, "
182
                   f"{COL_VALID_TO} = ?, "
183
                   f"{COL_PEM_DATA} = ?, "
184
                   f"{COL_PRIVATE_KEY_ID} = ?, "
185
                   f"{COL_TYPE_ID} = ?, "
186
                   f"{COL_PARENT_ID} = ? "
187
                   f"WHERE {COL_ID} = ?")
188
            values = [certificate.common_name,
189
                      certificate.valid_from,
190
                      certificate.valid_to,
191
                      certificate.pem_data,
192
                      certificate.private_key_id,
193
                      certificate.type_id,
194 805077f5 David Friesecký
                      certificate.parent_id,
195
                      certificate_id]
196 f8b23532 David Friesecký
            self.cursor.execute(sql, values)
197
            self.connection.commit()
198
199
            sql = (f"DELETE FROM {TAB_CERTIFICATE_USAGES} "
200
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
201
            values = [certificate_id]
202
            self.cursor.execute(sql, values)
203
            self.connection.commit()
204
205
            for usage_id, usage_value in certificate.usages:
206
                if usage_value:
207
                    sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
208
                           f"({COL_CERTIFICATE_ID},"
209
                           f"{COL_USAGE_TYPE_ID}) "
210
                           f"VALUES (?,?)")
211
                    values = [certificate_id, usage_id]
212
                    self.cursor.execute(sql, values)
213
                    self.connection.commit()
214
        except Error as e:
215
            print(e)
216
            return False
217
218
        return True
219 e9e55282 David Friesecký
220
    def delete(self, certificate_id: int) -> bool:
221 a0602bad David Friesecký
        """
222
        Deletes a certificate
223
224
        :param certificate_id: ID of specific certificate
225
226
        :return: the result of whether the deletion was successful
227
        """
228
229 f8b23532 David Friesecký
        try:
230
            sql = (f"DELETE FROM {TAB_CERTIFICATES} "
231
                   f"WHERE {COL_ID} = ?")
232
            values = [certificate_id]
233
            self.cursor.execute(sql, values)
234
            self.connection.commit()
235
        except Error as e:
236
            print(e)
237
            return False
238
239
        return True