Projekt

Obecné

Profil

« Předchozí | Další » 

Revize 6425fa36

Přidáno uživatelem David Friesecký před téměř 4 roky(ů)

Re #8670 - Modified deletion for historical storage of certificates

Zobrazit rozdíly:

SQLite_database.sql
1 1
/* ---------------------------------------------------- */
2 2
/*  Generated by Enterprise Architect Version 13.5 		*/
3
/*  Created On : 11-dub-2021 0:19:51 				*/
3
/*  Created On : 27-dub-2021 11:31:54 				*/
4 4
/*  DBMS       : SQLite 								*/
5 5
/* ---------------------------------------------------- */
6 6

  
7 7
/* Drop Tables */
8 8

  
9
DROP TABLE IF EXISTS 'PrivateKeys'
9
DROP TABLE IF EXISTS 'Certificates'
10 10
;
11 11

  
12 12
DROP TABLE IF EXISTS 'CertificateTypes'
13 13
;
14 14

  
15
DROP TABLE IF EXISTS 'UsageTypes'
16
;
17

  
18
DROP TABLE IF EXISTS 'Certificates'
19
;
20

  
21 15
DROP TABLE IF EXISTS 'CertificateUsages'
22 16
;
23 17

  
24
/* Create Tables with Primary and Foreign Keys, Check and Unique Constraints */
25

  
26
CREATE TABLE 'PrivateKeys'
27
(
28
	'id' INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
29
	'private_key' TEXT NOT NULL,
30
	'password' TEXT NULL
31
)
18
DROP TABLE IF EXISTS 'PrivateKeys'
32 19
;
33 20

  
34
CREATE TABLE 'CertificateTypes'
35
(
36
	'id' INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
37
	'certificate_type' TEXT NOT NULL
38
)
21
DROP TABLE IF EXISTS 'UsageTypes'
39 22
;
40 23

  
41
CREATE TABLE 'UsageTypes'
42
(
43
	'id' INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
44
	'usage_type' TEXT NOT NULL
45
)
46
;
24
/* Create Tables with Primary and Foreign Keys, Check and Unique Constraints */
47 25

  
48 26
CREATE TABLE 'Certificates'
49 27
(
......
54 32
	'pem_data' TEXT NOT NULL,
55 33
	'revocation_date' TEXT NULL,
56 34
	'revocation_reason' TEXT NULL,
35
	'deletion_date' TEXT NULL,
57 36
	'private_key_id' INTEGER NOT NULL,
58 37
	'certificate_type_id' INTEGER NOT NULL,
59 38
	'parent_certificate_id' INTEGER NOT NULL,
......
63 42
)
64 43
;
65 44

  
45
CREATE TABLE 'CertificateTypes'
46
(
47
	'id' INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
48
	'certificate_type' TEXT NOT NULL
49
)
50
;
51

  
66 52
CREATE TABLE 'CertificateUsages'
67 53
(
68 54
	'id' INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
......
72 58
	CONSTRAINT 'FK_UsageTypes' FOREIGN KEY ('usage_type_id') REFERENCES 'UsageTypes' ('id') ON DELETE No Action ON UPDATE No Action
73 59
)
74 60
;
61

  
62
CREATE TABLE 'PrivateKeys'
63
(
64
	'id' INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
65
	'private_key' TEXT NOT NULL,
66
	'password' TEXT NULL
67
)
68
;
69

  
70
CREATE TABLE 'UsageTypes'
71
(
72
	'id' INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
73
	'usage_type' TEXT NOT NULL
74
)
75
;
src/constants.py
37 37
COL_PEM_DATA = "pem_data"
38 38
COL_REVOCATION_DATE = "revocation_date"
39 39
COL_REVOCATION_REASON = "revocation_reason"
40
COL_DELETION_DATE = "deletion_date"
40 41
COL_PRIVATE_KEY_ID = "private_key_id"
41 42
COL_TYPE_ID = "certificate_type_id"
42 43
COL_PARENT_ID = "parent_certificate_id"
src/dao/certificate_repository.py
1
import time
1 2
from sqlite3 import Connection, Error
2 3
from typing import Dict, List
3 4

  
......
81 82

  
82 83
        try:
83 84
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
84
                   f"WHERE {COL_ID} = ?")
85
                   f"WHERE {COL_ID} = ? AND ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
85 86
            values = [certificate_id]
86 87
            self.cursor.execute(sql, values)
87 88
            certificate_row = self.cursor.fetchone()
......
103 104
                                                   certificate_row[2],
104 105
                                                   certificate_row[3],
105 106
                                                   certificate_row[4],
106
                                                   certificate_row[7],
107 107
                                                   certificate_row[8],
108 108
                                                   certificate_row[9],
109
                                                   certificate_row[10],
109 110
                                                   usage_dict,
110 111
                                                   certificate_row[5],
111 112
                                                   certificate_row[6])
......
127 128
            sql_extension = ""
128 129
            values = []
129 130
            if filter_type is not None:
130
                sql_extension = (f" WHERE {COL_TYPE_ID} = ("
131
                sql_extension = (f" AND {COL_TYPE_ID} = ("
131 132
                                 f"SELECT {COL_ID} FROM {TAB_CERTIFICATE_TYPES} WHERE {COL_ID} = ?)")
132 133
                values = [filter_type]
133 134

  
134
            sql = f"SELECT * FROM {TAB_CERTIFICATES}{sql_extension}"
135
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
136
                   f"WHERE ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = ''){sql_extension}")
135 137
            self.cursor.execute(sql, values)
136 138
            certificate_rows = self.cursor.fetchall()
137 139

  
......
152 154
                                                certificate_row[2],
153 155
                                                certificate_row[3],
154 156
                                                certificate_row[4],
155
                                                certificate_row[7],
156 157
                                                certificate_row[8],
157 158
                                                certificate_row[9],
159
                                                certificate_row[10],
158 160
                                                usage_dict,
159 161
                                                certificate_row[5],
160 162
                                                certificate_row[6]))
......
193 195
                      certificate.type_id,
194 196
                      certificate.parent_id,
195 197
                      certificate_id]
198

  
199
            print(f"Parent: {certificate.parent_id}")
200

  
196 201
            self.cursor.execute(sql, values)
197 202
            self.connection.commit()
198 203

  
......
227 232
        """
228 233

  
229 234
        try:
230
            sql = (f"DELETE FROM {TAB_CERTIFICATES} "
231
                   f"WHERE {COL_ID} = ?")
232
            values = [certificate_id]
235
            sql = (f"UPDATE {TAB_CERTIFICATES} "
236
                   f"SET {COL_DELETION_DATE} = ? "
237
                   f"WHERE {COL_ID} = ? AND ({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
238
            values = [int(time.time()),
239
                      certificate_id]
233 240
            self.cursor.execute(sql, values)
234 241
            self.connection.commit()
235 242
        except Error as e:
......
331 338
                                                certificate_row[2],
332 339
                                                certificate_row[3],
333 340
                                                certificate_row[4],
334
                                                certificate_row[7],
335 341
                                                certificate_row[8],
336 342
                                                certificate_row[9],
343
                                                certificate_row[10],
337 344
                                                usage_dict,
338 345
                                                certificate_row[5],
339 346
                                                certificate_row[6]))
......
356 363

  
357 364
        try:
358 365
            sql = (f"SELECT * FROM {TAB_CERTIFICATES} "
359
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_ID} != ?")
366
                   f"WHERE {COL_PARENT_ID} = ? AND {COL_ID} != ? AND "
367
                   f"({COL_DELETION_DATE} IS NULL OR {COL_DELETION_DATE} = '')")
360 368
            values = [certificate_id, certificate_id]
361 369
            self.cursor.execute(sql, values)
362 370
            certificate_rows = self.cursor.fetchall()
......
378 386
                                                certificate_row[2],
379 387
                                                certificate_row[3],
380 388
                                                certificate_row[4],
381
                                                certificate_row[7],
382 389
                                                certificate_row[8],
383 390
                                                certificate_row[9],
391
                                                certificate_row[10],
384 392
                                                usage_dict,
385 393
                                                certificate_row[5],
386 394
                                                certificate_row[6]))
src/db/init_queries.py
1 1
SCHEMA_SQL = """
2 2
/* ---------------------------------------------------- */
3 3
/*  Generated by Enterprise Architect Version 13.5 		*/
4
/*  Created On : 01-dub-2021 15:16:53 				*/
4
/*  Created On : 27-dub-2021 11:31:54 				*/
5 5
/*  DBMS       : SQLite 								*/
6 6
/* ---------------------------------------------------- */
7 7

  
8 8
/* Drop Tables */
9 9

  
10
DROP TABLE IF EXISTS 'PrivateKeys'
10
DROP TABLE IF EXISTS 'Certificates'
11 11
;
12 12

  
13 13
DROP TABLE IF EXISTS 'CertificateTypes'
14 14
;
15 15

  
16
DROP TABLE IF EXISTS 'UsageTypes'
17
;
18

  
19
DROP TABLE IF EXISTS 'Certificates'
20
;
21

  
22 16
DROP TABLE IF EXISTS 'CertificateUsages'
23 17
;
24 18

  
25
/* Create Tables with Primary and Foreign Keys, Check and Unique Constraints */
26

  
27
CREATE TABLE 'PrivateKeys'
28
(
29
	'id' INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
30
	'private_key' TEXT NOT NULL,
31
	'password' TEXT NULL
32
)
19
DROP TABLE IF EXISTS 'PrivateKeys'
33 20
;
34 21

  
35
CREATE TABLE 'CertificateTypes'
36
(
37
	'id' INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
38
	'certificate_type' TEXT NOT NULL
39
)
22
DROP TABLE IF EXISTS 'UsageTypes'
40 23
;
41 24

  
42
CREATE TABLE 'UsageTypes'
43
(
44
	'id' INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
45
	'usage_type' TEXT NOT NULL
46
)
47
;
25
/* Create Tables with Primary and Foreign Keys, Check and Unique Constraints */
48 26

  
49 27
CREATE TABLE 'Certificates'
50 28
(
......
55 33
	'pem_data' TEXT NOT NULL,
56 34
	'revocation_date' TEXT NULL,
57 35
	'revocation_reason' TEXT NULL,
36
	'deletion_date' TEXT NULL,
58 37
	'private_key_id' INTEGER NOT NULL,
59 38
	'certificate_type_id' INTEGER NOT NULL,
60 39
	'parent_certificate_id' INTEGER NOT NULL,
......
64 43
)
65 44
;
66 45

  
46
CREATE TABLE 'CertificateTypes'
47
(
48
	'id' INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
49
	'certificate_type' TEXT NOT NULL
50
)
51
;
52

  
67 53
CREATE TABLE 'CertificateUsages'
68 54
(
69 55
	'id' INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
......
74 60
)
75 61
;
76 62

  
63
CREATE TABLE 'PrivateKeys'
64
(
65
	'id' INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
66
	'private_key' TEXT NOT NULL,
67
	'password' TEXT NULL
68
)
69
;
70

  
71
CREATE TABLE 'UsageTypes'
72
(
73
	'id' INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
74
	'usage_type' TEXT NOT NULL
75
)
76
;
77

  
77 78
"""
78 79

  
79 80
DEFAULT_VALUES_SQL = """
tests/integration_tests/dao/certificate_repository_test.py
138 138
    assert len(certificates) == 2
139 139
    assert certificates[0].certificate_id == child_1_id and certificates[1].certificate_id == child_3_id
140 140

  
141
    certificate_repository.delete(certificate_id=sec_tree_child_1)
142

  
143
    certificates = certificate_repository.get_all_revoked_by(certificate_id=certificate_id)
144

  
145
    assert len(certificates) == 2
146
    assert certificates[0].certificate_id == child_1_id and certificates[1].certificate_id == child_3_id
141 147

  
142 148
def test_get_all_issued_by(certificate_repository, private_key_repository_unique):
143 149
    private_key = PrivateKey(private_key_id=-1, private_key="test_pk", password="test_password")
......
191 197
    certificates = certificate_repository.get_all_issued_by(certificate_id=certificate_id)
192 198

  
193 199
    assert len(certificates) == 3
194
    assert certificates[0].certificate_id == child_1_id and \
195
           certificates[1].certificate_id == child_2_id and \
196
           certificates[2].certificate_id == child_3_id
200
    assert certificates[0].certificate_id == child_1_id
201
    assert certificates[1].certificate_id == child_2_id
202
    assert certificates[2].certificate_id == child_3_id
203

  
204
    certificate_repository.delete(certificate_id=child_1_id)
205

  
206
    certificates = certificate_repository.get_all_issued_by(certificate_id=certificate_id)
207

  
208
    assert len(certificates) == 2
209
    assert certificates[0].certificate_id == child_2_id
210
    assert certificates[1].certificate_id == child_3_id
211

  

Také k dispozici: Unified diff