Projekt

Obecné

Profil

Stáhnout (8.67 KB) Statistiky
| Větev: | Tag: | Revize:
1
from sqlite3 import Connection, Error
2
from typing import Dict, List
3

    
4
from injector import inject
5

    
6
from src.constants import *
7
from src.model.certificate import Certificate
8

    
9

    
10
class CertificateRepository:
11

    
12
    @inject
13
    def __init__(self, connection: Connection):
14
        """
15
        Constructor of the CertificateRepository object
16

    
17
        :param connection: Instance of the Connection object
18
        """
19
        self.connection = connection
20
        self.cursor = connection.cursor()
21

    
22
    def create(self, certificate: Certificate):
23
        """
24
        Creates a certificate.
25
        For root certificate (CA) the parent certificate id is modified to the same id (id == parent_id).
26

    
27
        :param certificate: Instance of the Certificate object
28

    
29
        :return: the result of whether the creation was successful
30
        """
31

    
32
        try:
33
            sql = (f"INSERT INTO {TAB_CERTIFICATES} "
34
                   f"({COL_COMMON_NAME},"
35
                   f"{COL_VALID_FROM},"
36
                   f"{COL_VALID_TO},"
37
                   f"{COL_PEM_DATA},"
38
                   f"{COL_PRIVATE_KEY_ID},"
39
                   f"{COL_TYPE_ID},"
40
                   f"{COL_PARENT_ID})"
41
                   f"VALUES(?,?,?,?,?,?,?)")
42
            values = [certificate.common_name,
43
                      certificate.valid_from,
44
                      certificate.valid_to,
45
                      certificate.pem_data,
46
                      certificate.private_key_id,
47
                      certificate.type_id,
48
                      certificate.parent_id]
49
            self.cursor.execute(sql, values)
50
            self.connection.commit()
51

    
52
            last_id: int = self.cursor.lastrowid
53

    
54
            # TODO assure that this is correct
55
            if certificate.type_id == ROOT_CA_ID:
56
                certificate.parent_id = last_id
57
                self.update(last_id, certificate)
58
            else:
59
                for usage_id, usage_value in certificate.usages.items():
60
                    if usage_value:
61
                        sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
62
                               f"({COL_CERTIFICATE_ID},"
63
                               f"{COL_USAGE_TYPE_ID}) "
64
                               f"VALUES (?,?)")
65
                        values = [last_id, usage_id]
66
                        self.cursor.execute(sql, values)
67
                        self.connection.commit()
68
        except Error as e:
69
            print(e)
70
            return None
71

    
72
        return last_id
73

    
74
    def read(self, certificate_id: int):
75
        """
76
        Reads (selects) a certificate.
77

    
78
        :param certificate_id: ID of specific certificate
79

    
80
        :return: instance of the Certificate object
81
        """
82

    
83
        try:
84
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
85
                   f"WHERE {COL_ID} = ?")
86
            values = [certificate_id]
87
            self.cursor.execute(sql, values)
88
            certificate_row = self.cursor.fetchone()
89

    
90
            if certificate_row is None:
91
                return None
92

    
93
            sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
94
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
95
            self.cursor.execute(sql, values)
96
            usage_rows = self.cursor.fetchall()
97

    
98
            usage_dict: Dict[int, bool] = {}
99
            for usage_row in usage_rows:
100
                usage_dict[usage_row[2]] = True
101

    
102
            certificate: Certificate = Certificate(certificate_row[0],
103
                                                   certificate_row[1],
104
                                                   certificate_row[2],
105
                                                   certificate_row[3],
106
                                                   certificate_row[4],
107
                                                   certificate_row[5],
108
                                                   certificate_row[6],
109
                                                   certificate_row[7],
110
                                                   usage_dict)
111
        except Error as e:
112
            print(e)
113
            return None
114

    
115
        if len(certificate_row) > 0:
116
            return certificate
117
        else:
118
            return None
119

    
120
    def read_all(self, filter_type: int = None) -> List[Certificate]:
121
        """
122
        Reads (selects) all certificates (with type).
123

    
124
        :param filter_type: ID of certificate type from CertificateTypes table
125

    
126
        :return: list of certificates
127
        """
128

    
129
        try:
130
            sql_extension = ""
131
            values = []
132
            if filter_type is not None:
133
                sql_extension = (f" WHERE {COL_TYPE_ID} = ("
134
                                 f"SELECT {COL_ID} FROM {TAB_CERTIFICATE_TYPES} WHERE {COL_ID} = ?)")
135
                values = [filter_type]
136

    
137
            sql = (f"SELECT * FROM {TAB_CERTIFICATES}{sql_extension}")
138
            self.cursor.execute(sql, values)
139
            certificate_rows = self.cursor.fetchall()
140

    
141
            certificates: List[Certificate] = []
142
            for certificate_row in certificate_rows:
143
                sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
144
                       f"WHERE {COL_CERTIFICATE_ID} = ?")
145
                values = [certificate_row[0]]
146
                self.cursor.execute(sql, values)
147
                usage_rows = self.cursor.fetchall()
148

    
149
                usage_dict: Dict[int, bool] = {}
150
                for usage_row in usage_rows:
151
                    usage_dict[usage_row[2]] = True
152

    
153
                certificates.append(Certificate(certificate_row[0],
154
                                                certificate_row[1],
155
                                                certificate_row[2],
156
                                                certificate_row[3],
157
                                                certificate_row[4],
158
                                                certificate_row[5],
159
                                                certificate_row[6],
160
                                                certificate_row[7],
161
                                                usage_dict))
162
        except Error as e:
163
            print(e)
164
            return None
165

    
166
        return certificates
167

    
168
    def update(self, certificate_id: int, certificate: Certificate) -> bool:
169
        """
170
        Updates a certificate.
171
        If the parameter of certificate (Certificate object) is not to be changed,
172
        the same value must be specified.
173

    
174
        :param certificate_id: ID of specific certificate
175
        :param certificate: Instance of the Certificate object
176

    
177
        :return: the result of whether the updation was successful
178
        """
179

    
180
        try:
181
            sql = (f"UPDATE {TAB_CERTIFICATES} "
182
                   f"SET {COL_COMMON_NAME} = ?, "
183
                   f"{COL_VALID_FROM} = ?, "
184
                   f"{COL_VALID_TO} = ?, "
185
                   f"{COL_PEM_DATA} = ?, "
186
                   f"{COL_PRIVATE_KEY_ID} = ?, "
187
                   f"{COL_TYPE_ID} = ?, "
188
                   f"{COL_PARENT_ID} = ? "
189
                   f"WHERE {COL_ID} = ?")
190
            values = [certificate.common_name,
191
                      certificate.valid_from,
192
                      certificate.valid_to,
193
                      certificate.pem_data,
194
                      certificate.private_key_id,
195
                      certificate.type_id,
196
                      certificate.parent_id,
197
                      certificate_id]
198
            self.cursor.execute(sql, values)
199
            self.connection.commit()
200

    
201
            sql = (f"DELETE FROM {TAB_CERTIFICATE_USAGES} "
202
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
203
            values = [certificate_id]
204
            self.cursor.execute(sql, values)
205
            self.connection.commit()
206

    
207
            # iterate over usage pairs
208
            for usage_id, usage_value in certificate.usages.items():
209
                if usage_value:
210
                    sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
211
                           f"({COL_CERTIFICATE_ID},"
212
                           f"{COL_USAGE_TYPE_ID}) "
213
                           f"VALUES (?,?)")
214
                    values = [certificate_id, usage_id]
215
                    self.cursor.execute(sql, values)
216
                    self.connection.commit()
217
        except Error as e:
218
            print(e)
219
            return False
220

    
221
        return True
222

    
223
    def delete(self, certificate_id: int) -> bool:
224
        """
225
        Deletes a certificate
226

    
227
        :param certificate_id: ID of specific certificate
228

    
229
        :return: the result of whether the deletion was successful
230
        """
231

    
232
        try:
233
            sql = (f"DELETE FROM {TAB_CERTIFICATES} "
234
                   f"WHERE {COL_ID} = ?")
235
            values = [certificate_id]
236
            self.cursor.execute(sql, values)
237
            self.connection.commit()
238
        except Error as e:
239
            print(e)
240
            return False
241

    
242
        return self.cursor.rowcount > 0
(2-2/3)