Projekt

Obecné

Profil

Stáhnout (6.58 KB) Statistiky
| Větev: | Tag: | Revize:
1
from typing import Dict
2
from sqlite3 import Connection, Cursor, Error
3

    
4
from src.db_objects.certificate import Certificate
5
from src.constants import *
6

    
7

    
8
class CertificateRepository:
9

    
10
    def __init__(self, connection: Connection, cursor: Cursor):
11
        """
12
        Constructor of the CertificateRepository object
13

    
14
        :param connection: Instance of the Connection object
15
        :param cursor: Instance of the Cursor object
16
        """
17

    
18
        self.connection = connection
19
        self.cursor = cursor
20

    
21
    def create(self, certificate: Certificate):
22
        """
23
        Creates a certificate.
24
        For root certificate (CA) the parent certificate id is modified to the same id (id == parent_id).
25

    
26
        :param certificate: Instance of the Certificate object
27

    
28
        :return: the result of whether the creation was successful
29
        """
30

    
31
        try:
32
            sql = (f"INSERT INTO {TAB_CERTIFICATES} "
33
                   f"({COL_COMMON_NAME},"
34
                   f"{COL_VALID_FROM},"
35
                   f"{COL_VALID_TO},"
36
                   f"{COL_PEM_DATA},"
37
                   f"{COL_PRIVATE_KEY_ID},"
38
                   f"{COL_TYPE_ID},"
39
                   f"{COL_PARENT_ID})"
40
                   f"VALUES(?,?,?,?,?,?,?)")
41
            values = [certificate.common_name,
42
                      certificate.valid_from,
43
                      certificate.valid_to,
44
                      certificate.pem_data,
45
                      certificate.private_key_id,
46
                      certificate.type_id,
47
                      certificate.parent_id]
48
            self.cursor.execute(sql, values)
49
            self.connection.commit()
50

    
51
            last_id: int = self.cursor.lastrowid
52

    
53
            if certificate.usages[ROOT_CA_ID - 1]:
54
                certificate.parent_id = last_id
55
                return self.update(last_id, certificate)
56
            else:
57
                for usage_id, usage_value in certificate.usages:
58
                    if usage_value:
59
                        sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
60
                               f"({COL_CERTIFICATE_ID},"
61
                               f"{COL_USAGE_TYPE_ID}) "
62
                               f"VALUES (?,?)")
63
                        values = [last_id, usage_id]
64
                        self.cursor.execute(sql, values)
65
                        self.connection.commit()
66
        except Error as e:
67
            print(e)
68
            return None
69

    
70
        return last_id
71

    
72
    def read(self, certificate_id: int):
73
        """
74
        Reads (selects) a certificate.
75

    
76
        :param certificate_id: ID of specific certificate
77

    
78
        :return: instance of the Certificate object
79
        """
80

    
81
        try:
82
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
83
                   f"WHERE {COL_ID} = ?")
84
            values = [certificate_id]
85
            self.cursor.execute(sql, values)
86
            certificate_row = self.cursor.fetchall()
87

    
88
            sql = (f"SELECT * FROM {TAB_CERTIFICATE_USAGES} "
89
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
90
            self.cursor.execute(sql, values)
91
            usage_rows = self.cursor.fetchall()
92

    
93
            usage_dict: Dict[int, bool] = {}
94
            for usage_row in usage_rows:
95
                usage_dict[usage_row[2]] = True
96

    
97
            certificate: Certificate = Certificate(certificate_row[0],
98
                                                   certificate_row[1],
99
                                                   certificate_row[2],
100
                                                   certificate_row[3],
101
                                                   certificate_row[4],
102
                                                   certificate_row[5],
103
                                                   certificate_row[6],
104
                                                   certificate_row[7],
105
                                                   usage_dict)
106
        except Error as e:
107
            print(e)
108
            return None
109

    
110
        if len(certificate_row) > 0:
111
            return certificate
112
        else:
113
            return None
114

    
115
    def update(self, certificate_id: int, certificate: Certificate) -> bool:
116
        """
117
        Updates a certificate.
118
        If the parameter of certificate (Certificate object) is not to be changed,
119
        the same value must be specified.
120

    
121
        :param certificate_id: ID of specific certificate
122
        :param certificate: Instance of the Certificate object
123

    
124
        :return: the result of whether the updation was successful
125
        """
126

    
127
        try:
128
            sql = (f"UPDATE {TAB_CERTIFICATES} "
129
                   f"SET {COL_COMMON_NAME} = ?, "
130
                   f"{COL_VALID_FROM} = ?, "
131
                   f"{COL_VALID_TO} = ?, "
132
                   f"{COL_PEM_DATA} = ?, "
133
                   f"{COL_PRIVATE_KEY_ID} = ?, "
134
                   f"{COL_TYPE_ID} = ?, "
135
                   f"{COL_PARENT_ID} = ? "
136
                   f"WHERE {COL_ID} = ?")
137
            values = [certificate.common_name,
138
                      certificate.valid_from,
139
                      certificate.valid_to,
140
                      certificate.pem_data,
141
                      certificate.private_key_id,
142
                      certificate.type_id,
143
                      certificate.parent_id,
144
                      certificate_id]
145
            self.cursor.execute(sql, values)
146
            self.connection.commit()
147

    
148
            sql = (f"DELETE FROM {TAB_CERTIFICATE_USAGES} "
149
                   f"WHERE {COL_CERTIFICATE_ID} = ?")
150
            values = [certificate_id]
151
            self.cursor.execute(sql, values)
152
            self.connection.commit()
153

    
154
            for usage_id, usage_value in certificate.usages:
155
                if usage_value:
156
                    sql = (f"INSERT INTO {TAB_CERTIFICATE_USAGES} "
157
                           f"({COL_CERTIFICATE_ID},"
158
                           f"{COL_USAGE_TYPE_ID}) "
159
                           f"VALUES (?,?)")
160
                    values = [certificate_id, usage_id]
161
                    self.cursor.execute(sql, values)
162
                    self.connection.commit()
163
        except Error as e:
164
            print(e)
165
            return False
166

    
167
        return True
168

    
169
    def delete(self, certificate_id: int) -> bool:
170
        """
171
        Deletes a certificate
172

    
173
        :param certificate_id: ID of specific certificate
174

    
175
        :return: the result of whether the deletion was successful
176
        """
177

    
178
        try:
179
            sql = (f"DELETE FROM {TAB_CERTIFICATES} "
180
                   f"WHERE {COL_ID} = ?")
181
            values = [certificate_id]
182
            self.cursor.execute(sql, values)
183
            self.connection.commit()
184
        except Error as e:
185
            print(e)
186
            return False
187

    
188
        return True
(1-1/2)