Revize a0602bad
Přidáno uživatelem David Friesecký před asi 4 roky(ů)
src/constants.py | ||
---|---|---|
1 | 1 |
DATABASE_FILE = "../../../../db/database_sqlite.db" |
2 | 2 |
|
3 |
# Types of certificates |
|
3 | 4 |
ROOT_CA_ID = 1 |
4 | 5 |
INTERMEDIATE_CA_ID = 2 |
5 | 6 |
CERTIFICATE_ID = 3 |
6 | 7 |
|
8 |
# Usage types of certificates |
|
7 | 9 |
CA_ID = 1 |
8 | 10 |
SSL_ID = 2 |
9 | 11 |
SIGNATURE_ID = 3 |
src/impl/certificate_repository_impl.py | ||
---|---|---|
10 | 10 |
|
11 | 11 |
def create(self, common_name: str, valid_from: str, valid_to: str, pem_data: str, |
12 | 12 |
private_key_id: int, type_id: int, parent_id: int, usages: Dict[int, bool]) -> bool: |
13 |
""" |
|
14 |
Creates a certificate |
|
15 |
|
|
16 |
:param common_name: private key of the certificate |
|
17 |
:param valid_from: string of date (time) of validation after (from) |
|
18 |
:param valid_to: string of date (time) of validation before (to) |
|
19 |
:param pem_data: data of certificate in a PEM format |
|
20 |
:param private_key_id: ID from PrivateKeys table for specific private key |
|
21 |
:param type_id: ID from CertificateTypes table (ROOT_CA_ID=1, INTERMEDIATE_CA_ID=2, CERTIFICATE_ID=3) |
|
22 |
:param parent_id: ID from Certificates table for specific parent certificate |
|
23 |
for root CA use e.g. -1 (ID will be changed after created certificate) |
|
24 |
:param usages: dictionary of usages IDs which certificate use |
|
25 |
Dictionary[Integer, Boolean] |
|
26 |
- Key = ID (CA_ID=1, SSL_ID=2, SIGNATURE_ID=3, AUTHENTICATION_ID=4) |
|
27 |
- Value = used=True/unused=False |
|
28 |
|
|
29 |
:return: the result of whether the creation was successful |
|
30 |
""" |
|
31 |
|
|
13 | 32 |
sql = (f"INSERT INTO {TAB_CERTIFICATES} ({COL_COMMON_NAME},{COL_VALID_FROM},{COL_VALID_TO},{COL_PEM_DATA}," |
14 | 33 |
f"{COL_PRIVATE_KEY_ID},{COL_TYPE_ID},{COL_PARENT_ID})" |
15 | 34 |
f"VALUES(?,?,?,?,?,?,?)") |
... | ... | |
18 | 37 |
return result |
19 | 38 |
|
20 | 39 |
def read(self, certificate_id: int = None): |
40 |
""" |
|
41 |
Reads (selects) a certificate or certificates |
|
42 |
|
|
43 |
:param certificate_id: ID of specific certificate |
|
44 |
|
|
45 |
:return: instance of Certificate class if ID was used otherwise list of this instances |
|
46 |
""" |
|
47 |
|
|
21 | 48 |
sql_extend = "" |
22 | 49 |
|
23 | 50 |
if certificate_id is not None: |
... | ... | |
54 | 81 |
def update(self, certificate_id: int, common_name: str = None, valid_from: str = None, valid_to: str = None, |
55 | 82 |
pem_data: str = None, private_key_id: int = None, type_id: int = None, parent_id: int = None, |
56 | 83 |
usages: Dict[int, bool] = None) -> bool: |
84 |
""" |
|
85 |
Updates a certificate |
|
86 |
|
|
87 |
:param certificate_id: ID of specific certificate |
|
88 |
:param common_name: private key of the certificate |
|
89 |
:param valid_from: string of date (time) of validation after (from) |
|
90 |
:param valid_to: string of date (time) of validation before (to) |
|
91 |
:param pem_data: data of certificate in a PEM format |
|
92 |
:param private_key_id: ID from PrivateKeys table for specific private key |
|
93 |
:param type_id: ID from CertificateTypes table (ROOT_CA_ID=1, INTERMEDIATE_CA_ID=2, CERTIFICATE_ID=3) |
|
94 |
:param parent_id: ID from Certificates table for specific parent certificate |
|
95 |
for root CA use e.g. -1 (ID will be changed after created certificate) |
|
96 |
:param usages: dictionary of usages IDs which certificate use |
|
97 |
Dictionary[Integer, Boolean] |
|
98 |
- Key = ID (CA_ID=1, SSL_ID=2, SIGNATURE_ID=3, AUTHENTICATION_ID=4) |
|
99 |
- Value = used=True/unused=False |
|
100 |
|
|
101 |
:return: the result of whether the updation was successful |
|
102 |
""" |
|
103 |
|
|
57 | 104 |
updated_list = [] |
58 | 105 |
values = [] |
59 | 106 |
if common_name is not None: |
... | ... | |
84 | 131 |
return result |
85 | 132 |
|
86 | 133 |
def delete(self, certificate_id: int) -> bool: |
134 |
""" |
|
135 |
Deletes a certificate |
|
136 |
|
|
137 |
:param certificate_id: ID of specific certificate |
|
138 |
|
|
139 |
:return: the result of whether the deletion was successful |
|
140 |
""" |
|
141 |
|
|
87 | 142 |
sql = f"DELETE FROM {TAB_CERTIFICATES} WHERE {COL_ID} = ?" |
88 | 143 |
result = DBManager.delete(sql, certificate_id, True) |
89 | 144 |
return result |
src/impl/db_manager.py | ||
---|---|---|
10 | 10 |
|
11 | 11 |
@staticmethod |
12 | 12 |
def create(sql: str, values: List, usages: Dict[int, bool] = None) -> bool: |
13 |
""" |
|
14 |
Creates an item in database |
|
15 |
|
|
16 |
:param sql: SQL query for creation (parameter values are replaced by question mark) |
|
17 |
:param values: list of parameter values |
|
18 |
:param usages: dictionary of usages IDs which certificate use |
|
19 |
Dictionary[Integer, Boolean] |
|
20 |
- Key = ID (CA_ID=1, SSL_ID=2, SIGNATURE_ID=3, AUTHENTICATION_ID=4) |
|
21 |
- Value = used=True/unused=False |
|
22 |
|
|
23 |
:return: the result of whether the creation was successful |
|
24 |
""" |
|
25 |
|
|
13 | 26 |
conn = None |
14 | 27 |
try: |
15 | 28 |
conn = sqlite3.connect(DATABASE_FILE) |
... | ... | |
35 | 48 |
|
36 | 49 |
@staticmethod |
37 | 50 |
def read(sql: str, item_id: int = None): |
51 |
""" |
|
52 |
Reads (selecs) an item or items from database |
|
53 |
|
|
54 |
:param sql: SQL query for selection (parameter values are replaced by question mark) |
|
55 |
:param item_id: ID of specific item |
|
56 |
|
|
57 |
:return: list of rows selected from database |
|
58 |
""" |
|
59 |
|
|
38 | 60 |
conn = None |
39 | 61 |
try: |
40 | 62 |
conn = sqlite3.connect(DATABASE_FILE) |
... | ... | |
57 | 79 |
|
58 | 80 |
@staticmethod |
59 | 81 |
def update(sql: str, item_id: int, values: List, usages: Dict[int, bool] = None) -> bool: |
82 |
""" |
|
83 |
Updates an item in database |
|
84 |
|
|
85 |
:param sql: SQL query for creation (parameter values are replaced by question mark) |
|
86 |
:param item_id: ID of specific item |
|
87 |
:param values: list of parameter values |
|
88 |
:param usages: dictionary of usages IDs which certificate use |
|
89 |
Dictionary[Integer, Boolean] |
|
90 |
- Key = ID (CA_ID=1, SSL_ID=2, SIGNATURE_ID=3, AUTHENTICATION_ID=4) |
|
91 |
- Value = used=True/unused=False |
|
92 |
|
|
93 |
:return: the result of whether the updation was successful |
|
94 |
""" |
|
95 |
|
|
60 | 96 |
conn = None |
61 | 97 |
try: |
62 | 98 |
values.append(item_id) |
... | ... | |
88 | 124 |
|
89 | 125 |
@staticmethod |
90 | 126 |
def delete(sql: str, item_id: int, delete_usages: bool) -> bool: |
127 |
""" |
|
128 |
Deletes an item from database |
|
129 |
|
|
130 |
:param sql: SQL query for selection (parameter values are replaced by question mark) |
|
131 |
:param item_id: ID of specific item |
|
132 |
:param delete_usages: flag specifying whether to erase rows with specific id |
|
133 |
from the table of usages (CertificateUsages) |
|
134 |
|
|
135 |
:return: the result of whether the deletion was successful |
|
136 |
""" |
|
137 |
|
|
91 | 138 |
conn = None |
92 | 139 |
try: |
93 | 140 |
conn = sqlite3.connect(DATABASE_FILE) |
src/impl/private_key_repository_impl.py | ||
---|---|---|
7 | 7 |
class PrivateKeyRepositoryImpl(IRepository): |
8 | 8 |
|
9 | 9 |
def create(self, private_key: str, password: str) -> bool: |
10 |
""" |
|
11 |
Creates a private key |
|
12 |
|
|
13 |
:param private_key: private key |
|
14 |
:param password: passphrase for encode private key |
|
15 |
|
|
16 |
:return: the result of whether the creation was successful |
|
17 |
""" |
|
18 |
|
|
10 | 19 |
sql = f"INSERT INTO {TAB_PRIVATE_KEYS} ({COL_PRIVATE_KEY},{COL_PASSWORD}) VALUES(?,?)" |
11 | 20 |
result = DBManager.create(sql, [private_key, password]) |
12 | 21 |
return result |
13 | 22 |
|
14 | 23 |
def read(self, private_key_id: int = None): |
24 |
""" |
|
25 |
Reads (selects) a private key |
|
26 |
|
|
27 |
:param private_key_id: ID of specific private |
|
28 |
|
|
29 |
:return: instance of PrivateKey class if ID was used otherwise list of this instances |
|
30 |
""" |
|
31 |
|
|
15 | 32 |
sql_extend = "" |
16 | 33 |
|
17 | 34 |
if private_key_id is not None: |
... | ... | |
31 | 48 |
return private_keys |
32 | 49 |
|
33 | 50 |
def update(self, private_key_id: int, private_key: str = None, password: str = None) -> bool: |
51 |
""" |
|
52 |
Updates a private key |
|
53 |
|
|
54 |
:param private_key_id: ID of specific private key |
|
55 |
:param private_key: private key |
|
56 |
:param password: passphrase for encode private key |
|
57 |
|
|
58 |
:return: the result of whether the updation was successful |
|
59 |
""" |
|
60 |
|
|
34 | 61 |
updated_list = [] |
35 | 62 |
values = [] |
36 | 63 |
if private_key is not None: |
... | ... | |
46 | 73 |
return result |
47 | 74 |
|
48 | 75 |
def delete(self, private_key_id: int) -> bool: |
76 |
""" |
|
77 |
Deletes a private key |
|
78 |
|
|
79 |
:param private_key_id: ID of specific private key |
|
80 |
|
|
81 |
:return: the result of whether the deletion was successful |
|
82 |
""" |
|
83 |
|
|
49 | 84 |
sql = f"DELETE FROM {TAB_PRIVATE_KEYS} WHERE {COL_ID} = ?" |
50 | 85 |
result = DBManager.delete(sql, private_key_id, False) |
51 | 86 |
return result |
Také k dispozici: Unified diff
Re #8471 - Added comments for CRUD functions