Projekt

Obecné

Profil

« Předchozí | Další » 

Revize c7214bba

Přidáno uživatelem David Friesecký před asi 4 roky(ů)

Re #8471 - Modified source
- deleted redundat files
- deleted '_impl' from repositories filename

Zobrazit rozdíly:

src/dao/certificate_repository.py
1
from typing import List, Dict
2

  
3
from ..db_objects.certificate import Certificate
4
from ..dao.repository import IRepository
5
from db_manager import DBManager
6
from ..constants import *
7

  
8

  
9
class CertificateRepositoryImpl(IRepository):
10

  
11
    def create(self, common_name: str, valid_from: str, valid_to: str, pem_data: str,
12
               private_key_id: int, type_id: int, parent_id: int, usages: Dict[int, bool]) -> bool:
13
        """
14
        Creates a certificate
15

  
16
        :param common_name: private key of the certificate
17
        :param valid_from: string of date (time) of validation after (from)
18
        :param valid_to: string of date (time) of validation before (to)
19
        :param pem_data: data of certificate in a PEM format
20
        :param private_key_id: ID from PrivateKeys table for specific private key
21
        :param type_id: ID from CertificateTypes table (ROOT_CA_ID=1, INTERMEDIATE_CA_ID=2, CERTIFICATE_ID=3)
22
        :param parent_id: ID from Certificates table for specific parent certificate
23
            for root CA use e.g. -1 (ID will be changed after created certificate)
24
        :param usages: dictionary of usages IDs which certificate use
25
            Dictionary[Integer, Boolean]
26
                - Key = ID (CA_ID=1, SSL_ID=2, SIGNATURE_ID=3, AUTHENTICATION_ID=4)
27
                - Value = used=True/unused=False
28

  
29
        :return: the result of whether the creation was successful
30
        """
31

  
32
        sql = (f"INSERT INTO {TAB_CERTIFICATES} ({COL_COMMON_NAME},{COL_VALID_FROM},{COL_VALID_TO},{COL_PEM_DATA},"
33
               f"{COL_PRIVATE_KEY_ID},{COL_TYPE_ID},{COL_PARENT_ID})"
34
               f"VALUES(?,?,?,?,?,?,?)")
35
        result = DBManager.create(sql, [common_name, valid_from, valid_to, pem_data,
36
                                        private_key_id, type_id, parent_id], usages)
37
        return result
38

  
39
    def read(self, certificate_id: int = None):
40
        """
41
        Reads (selects) a certificate or certificates
42

  
43
        :param certificate_id: ID of specific certificate
44

  
45
        :return: instance of Certificate class if ID was used otherwise list of this instances
46
        """
47

  
48
        sql_extend = ""
49

  
50
        if certificate_id is not None:
51
            sql_extend = f" WHERE {COL_ID} = ?"
52

  
53
        sql = f"SELECT * FROM {TAB_CERTIFICATES}{sql_extend}"
54
        certificate_rows = DBManager.read(sql, certificate_id)
55

  
56
        certificates: list = []
57

  
58
        for certificate_row in certificate_rows:
59
            sql = f"SELECT * FROM {TAB_CERTIFICATE_USAGES} WHERE {COL_CERTIFICATE_ID} = ?"
60
            usage_rows = DBManager.read(sql, certificate_row[0])
61

  
62
            usage_dict: Dict[int, bool] = {}
63
            for usage_row in usage_rows:
64
                usage_dict[usage_row[1]] = True
65

  
66
            certificates.append(Certificate(certificate_row[0],
67
                                            certificate_row[1],
68
                                            certificate_row[2],
69
                                            certificate_row[3],
70
                                            certificate_row[4],
71
                                            certificate_row[5],
72
                                            certificate_row[6],
73
                                            certificate_row[7],
74
                                            usage_dict))
75

  
76
        if certificate_id is not None:
77
            return certificates[0]
78

  
79
        return certificates
80

  
81
    def update(self, certificate_id: int, common_name: str = None, valid_from: str = None, valid_to: str = None,
82
               pem_data: str = None, private_key_id: int = None, type_id: int = None, parent_id: int = None,
83
               usages: Dict[int, bool] = None) -> bool:
84
        """
85
        Updates a certificate
86

  
87
        :param certificate_id: ID of specific certificate
88
        :param common_name: private key of the certificate
89
        :param valid_from: string of date (time) of validation after (from)
90
        :param valid_to: string of date (time) of validation before (to)
91
        :param pem_data: data of certificate in a PEM format
92
        :param private_key_id: ID from PrivateKeys table for specific private key
93
        :param type_id: ID from CertificateTypes table (ROOT_CA_ID=1, INTERMEDIATE_CA_ID=2, CERTIFICATE_ID=3)
94
        :param parent_id: ID from Certificates table for specific parent certificate
95
            for root CA use e.g. -1 (ID will be changed after created certificate)
96
        :param usages: dictionary of usages IDs which certificate use
97
            Dictionary[Integer, Boolean]
98
                - Key = ID (CA_ID=1, SSL_ID=2, SIGNATURE_ID=3, AUTHENTICATION_ID=4)
99
                - Value = used=True/unused=False
100

  
101
        :return: the result of whether the updation was successful
102
        """
103

  
104
        updated_list = []
105
        values = []
106
        if common_name is not None:
107
            updated_list.append(f"{COL_COMMON_NAME} = ?")
108
            values.append(common_name)
109
        if valid_from is not None:
110
            updated_list.append(f"{COL_VALID_FROM} = ?")
111
            values.append(valid_from)
112
        if valid_to is not None:
113
            updated_list.append(f"{COL_VALID_TO} = ?")
114
            values.append(valid_to)
115
        if pem_data is not None:
116
            updated_list.append(f"{COL_PEM_DATA} = ?")
117
            values.append(pem_data)
118
        if private_key_id is not None:
119
            updated_list.append(f"{COL_PRIVATE_KEY_ID} = ?")
120
            values.append(private_key_id)
121
        if type_id is not None:
122
            updated_list.append(f"{COL_TYPE_ID} = ?")
123
            values.append(type_id)
124
        if parent_id is not None:
125
            updated_list.append(f"{COL_PARENT_ID} = ?")
126
            values.append(parent_id)
127

  
128
        updated_str = ", ".join(updated_list)
129
        sql = f"UPDATE {TAB_CERTIFICATES} SET {updated_str} WHERE {COL_ID} = ?"
130
        result = DBManager.update(sql, certificate_id, values, usages)
131
        return result
132

  
133
    def delete(self, certificate_id: int) -> bool:
134
        """
135
        Deletes a certificate
136

  
137
        :param certificate_id: ID of specific certificate
138

  
139
        :return: the result of whether the deletion was successful
140
        """
141

  
142
        sql = f"DELETE FROM {TAB_CERTIFICATES} WHERE {COL_ID} = ?"
143
        result = DBManager.delete(sql, certificate_id, True)
144
        return result
src/dao/private_key_repository.py
1
from ..db_objects.private_key import PrivateKey
2
from ..dao.repository import IRepository
3
from db_manager import DBManager
4
from ..constants import *
5

  
6

  
7
class PrivateKeyRepositoryImpl(IRepository):
8

  
9
    def create(self, private_key: str, password: str) -> bool:
10
        """
11
        Creates a private key
12

  
13
        :param private_key: private key
14
        :param password: passphrase for encode private key
15

  
16
        :return: the result of whether the creation was successful
17
        """
18

  
19
        sql = f"INSERT INTO {TAB_PRIVATE_KEYS} ({COL_PRIVATE_KEY},{COL_PASSWORD}) VALUES(?,?)"
20
        result = DBManager.create(sql, [private_key, password])
21
        return result
22

  
23
    def read(self, private_key_id: int = None):
24
        """
25
        Reads (selects) a private key
26

  
27
        :param private_key_id: ID of specific private
28

  
29
        :return: instance of PrivateKey class if ID was used otherwise list of this instances
30
        """
31

  
32
        sql_extend = ""
33

  
34
        if private_key_id is not None:
35
            sql_extend = f" WHERE {COL_ID} = ?"
36

  
37
        sql = f"SELECT * FROM {TAB_PRIVATE_KEYS}{sql_extend}"
38
        rows = DBManager.read(sql, private_key_id)
39

  
40
        private_keys: list = []
41

  
42
        for row in rows:
43
            private_keys.append(PrivateKey(row))
44

  
45
        if private_key_id is not None:
46
            return private_keys[0]
47

  
48
        return private_keys
49

  
50
    def update(self, private_key_id: int, private_key: str = None, password: str = None) -> bool:
51
        """
52
        Updates a private key
53

  
54
        :param private_key_id: ID of specific private key
55
        :param private_key: private key
56
        :param password: passphrase for encode private key
57

  
58
        :return: the result of whether the updation was successful
59
        """
60

  
61
        updated_list = []
62
        values = []
63
        if private_key is not None:
64
            updated_list.append(f"{COL_PRIVATE_KEY} = ?")
65
            values.append(private_key)
66
        if password is not None:
67
            updated_list.append(f"{COL_PASSWORD} = ?")
68
            values.append(password)
69

  
70
        updated_str = ", ".join(updated_list)
71
        sql = f"UPDATE {TAB_PRIVATE_KEYS} SET {updated_str} WHERE {COL_ID} = ?"
72
        result = DBManager.update(sql, private_key_id, values)
73
        return result
74

  
75
    def delete(self, private_key_id: int) -> bool:
76
        """
77
        Deletes a private key
78

  
79
        :param private_key_id: ID of specific private key
80

  
81
        :return: the result of whether the deletion was successful
82
        """
83

  
84
        sql = f"DELETE FROM {TAB_PRIVATE_KEYS} WHERE {COL_ID} = ?"
85
        result = DBManager.delete(sql, private_key_id, False)
86
        return result
src/dao/repository.py
1
class IRepository:
2
    def create(self, *args) -> bool:
3
        pass
4

  
5
    def read(self, *args):
6
        pass
7

  
8
    def update(self, *args) -> bool:
9
        pass
10

  
11
    def delete(self, *args) -> bool:
12
        pass
src/impl/certificate_repository_impl.py
1
from typing import List, Dict
2

  
3
from ..db_objects.certificate import Certificate
4
from ..dao.repository import IRepository
5
from db_manager import DBManager
6
from ..constants import *
7

  
8

  
9
class CertificateRepositoryImpl(IRepository):
10

  
11
    def create(self, common_name: str, valid_from: str, valid_to: str, pem_data: str,
12
               private_key_id: int, type_id: int, parent_id: int, usages: Dict[int, bool]) -> bool:
13
        """
14
        Creates a certificate
15

  
16
        :param common_name: private key of the certificate
17
        :param valid_from: string of date (time) of validation after (from)
18
        :param valid_to: string of date (time) of validation before (to)
19
        :param pem_data: data of certificate in a PEM format
20
        :param private_key_id: ID from PrivateKeys table for specific private key
21
        :param type_id: ID from CertificateTypes table (ROOT_CA_ID=1, INTERMEDIATE_CA_ID=2, CERTIFICATE_ID=3)
22
        :param parent_id: ID from Certificates table for specific parent certificate
23
            for root CA use e.g. -1 (ID will be changed after created certificate)
24
        :param usages: dictionary of usages IDs which certificate use
25
            Dictionary[Integer, Boolean]
26
                - Key = ID (CA_ID=1, SSL_ID=2, SIGNATURE_ID=3, AUTHENTICATION_ID=4)
27
                - Value = used=True/unused=False
28

  
29
        :return: the result of whether the creation was successful
30
        """
31

  
32
        sql = (f"INSERT INTO {TAB_CERTIFICATES} ({COL_COMMON_NAME},{COL_VALID_FROM},{COL_VALID_TO},{COL_PEM_DATA},"
33
               f"{COL_PRIVATE_KEY_ID},{COL_TYPE_ID},{COL_PARENT_ID})"
34
               f"VALUES(?,?,?,?,?,?,?)")
35
        result = DBManager.create(sql, [common_name, valid_from, valid_to, pem_data,
36
                                        private_key_id, type_id, parent_id], usages)
37
        return result
38

  
39
    def read(self, certificate_id: int = None):
40
        """
41
        Reads (selects) a certificate or certificates
42

  
43
        :param certificate_id: ID of specific certificate
44

  
45
        :return: instance of Certificate class if ID was used otherwise list of this instances
46
        """
47

  
48
        sql_extend = ""
49

  
50
        if certificate_id is not None:
51
            sql_extend = f" WHERE {COL_ID} = ?"
52

  
53
        sql = f"SELECT * FROM {TAB_CERTIFICATES}{sql_extend}"
54
        certificate_rows = DBManager.read(sql, certificate_id)
55

  
56
        certificates: list = []
57

  
58
        for certificate_row in certificate_rows:
59
            sql = f"SELECT * FROM {TAB_CERTIFICATE_USAGES} WHERE {COL_CERTIFICATE_ID} = ?"
60
            usage_rows = DBManager.read(sql, certificate_row[0])
61

  
62
            usage_dict: Dict[int, bool] = {}
63
            for usage_row in usage_rows:
64
                usage_dict[usage_row[1]] = True
65

  
66
            certificates.append(Certificate(certificate_row[0],
67
                                            certificate_row[1],
68
                                            certificate_row[2],
69
                                            certificate_row[3],
70
                                            certificate_row[4],
71
                                            certificate_row[5],
72
                                            certificate_row[6],
73
                                            certificate_row[7],
74
                                            usage_dict))
75

  
76
        if certificate_id is not None:
77
            return certificates[0]
78

  
79
        return certificates
80

  
81
    def update(self, certificate_id: int, common_name: str = None, valid_from: str = None, valid_to: str = None,
82
               pem_data: str = None, private_key_id: int = None, type_id: int = None, parent_id: int = None,
83
               usages: Dict[int, bool] = None) -> bool:
84
        """
85
        Updates a certificate
86

  
87
        :param certificate_id: ID of specific certificate
88
        :param common_name: private key of the certificate
89
        :param valid_from: string of date (time) of validation after (from)
90
        :param valid_to: string of date (time) of validation before (to)
91
        :param pem_data: data of certificate in a PEM format
92
        :param private_key_id: ID from PrivateKeys table for specific private key
93
        :param type_id: ID from CertificateTypes table (ROOT_CA_ID=1, INTERMEDIATE_CA_ID=2, CERTIFICATE_ID=3)
94
        :param parent_id: ID from Certificates table for specific parent certificate
95
            for root CA use e.g. -1 (ID will be changed after created certificate)
96
        :param usages: dictionary of usages IDs which certificate use
97
            Dictionary[Integer, Boolean]
98
                - Key = ID (CA_ID=1, SSL_ID=2, SIGNATURE_ID=3, AUTHENTICATION_ID=4)
99
                - Value = used=True/unused=False
100

  
101
        :return: the result of whether the updation was successful
102
        """
103

  
104
        updated_list = []
105
        values = []
106
        if common_name is not None:
107
            updated_list.append(f"{COL_COMMON_NAME} = ?")
108
            values.append(common_name)
109
        if valid_from is not None:
110
            updated_list.append(f"{COL_VALID_FROM} = ?")
111
            values.append(valid_from)
112
        if valid_to is not None:
113
            updated_list.append(f"{COL_VALID_TO} = ?")
114
            values.append(valid_to)
115
        if pem_data is not None:
116
            updated_list.append(f"{COL_PEM_DATA} = ?")
117
            values.append(pem_data)
118
        if private_key_id is not None:
119
            updated_list.append(f"{COL_PRIVATE_KEY_ID} = ?")
120
            values.append(private_key_id)
121
        if type_id is not None:
122
            updated_list.append(f"{COL_TYPE_ID} = ?")
123
            values.append(type_id)
124
        if parent_id is not None:
125
            updated_list.append(f"{COL_PARENT_ID} = ?")
126
            values.append(parent_id)
127

  
128
        updated_str = ", ".join(updated_list)
129
        sql = f"UPDATE {TAB_CERTIFICATES} SET {updated_str} WHERE {COL_ID} = ?"
130
        result = DBManager.update(sql, certificate_id, values, usages)
131
        return result
132

  
133
    def delete(self, certificate_id: int) -> bool:
134
        """
135
        Deletes a certificate
136

  
137
        :param certificate_id: ID of specific certificate
138

  
139
        :return: the result of whether the deletion was successful
140
        """
141

  
142
        sql = f"DELETE FROM {TAB_CERTIFICATES} WHERE {COL_ID} = ?"
143
        result = DBManager.delete(sql, certificate_id, True)
144
        return result
src/impl/db_manager.py
1
import sqlite3
2
from sqlite3 import Error
3
from typing import List, Dict
4

  
5
from ..constants import *
6
from ..dao.repository import IRepository
7

  
8

  
9
class DBManager(IRepository):
10

  
11
    @staticmethod
12
    def create(sql: str, values: List, usages: Dict[int, bool] = None) -> bool:
13
        """
14
        Creates an item in database
15

  
16
        :param sql: SQL query for creation (parameter values are replaced by question mark)
17
        :param values: list of parameter values
18
        :param usages: dictionary of usages IDs which certificate use
19
            Dictionary[Integer, Boolean]
20
                - Key = ID (CA_ID=1, SSL_ID=2, SIGNATURE_ID=3, AUTHENTICATION_ID=4)
21
                - Value = used=True/unused=False
22

  
23
        :return: the result of whether the creation was successful
24
        """
25

  
26
        conn = None
27
        try:
28
            conn = sqlite3.connect(DATABASE_FILE)
29
            c = conn.cursor()
30
            c.execute(sql, values)
31

  
32
            if usages is not None:
33
                for usage_id, usage_value in usages:
34
                    if usage_value:
35
                        c.execute(f"INSERT INTO {TAB_CERTIFICATE_USAGES} ({COL_CERTIFICATE_ID},{COL_USAGE_TYPE_ID}) "
36
                                  f"VALUES (?,?)",
37
                                  [c.lastrowid, usage_id])
38

  
39
            conn.commit()
40
            c.close()
41
        except Error as e:
42
            print(e)
43
            return False
44
        finally:
45
            if conn:
46
                conn.close()
47
        return True
48

  
49
    @staticmethod
50
    def read(sql: str, item_id: int = None):
51
        """
52
        Reads (selecs) an item or items from database
53

  
54
        :param sql: SQL query for selection (parameter values are replaced by question mark)
55
        :param item_id: ID of specific item
56

  
57
        :return: list of rows selected from database
58
        """
59

  
60
        conn = None
61
        try:
62
            conn = sqlite3.connect(DATABASE_FILE)
63
            c = conn.cursor()
64

  
65
            if item_id is not None:
66
                c.execute(sql, [item_id])
67
            else:
68
                c.execute(sql)
69

  
70
            records = c.fetchall()
71
            c.close()
72
        except Error as e:
73
            print(e)
74
            return []
75
        finally:
76
            if conn:
77
                conn.close()
78
        return records
79

  
80
    @staticmethod
81
    def update(sql: str, item_id: int, values: List, usages: Dict[int, bool] = None) -> bool:
82
        """
83
        Updates an item in database
84

  
85
        :param sql: SQL query for creation (parameter values are replaced by question mark)
86
        :param item_id: ID of specific item
87
        :param values: list of parameter values
88
        :param usages: dictionary of usages IDs which certificate use
89
            Dictionary[Integer, Boolean]
90
                - Key = ID (CA_ID=1, SSL_ID=2, SIGNATURE_ID=3, AUTHENTICATION_ID=4)
91
                - Value = used=True/unused=False
92

  
93
        :return: the result of whether the updation was successful
94
        """
95

  
96
        conn = None
97
        try:
98
            values.append(item_id)
99

  
100
            conn = sqlite3.connect(DATABASE_FILE)
101
            c = conn.cursor()
102
            c.execute(sql, values)
103

  
104
            if usages is not None:
105
                for usage_id, usage_value in usages:
106
                    c.execute(f"DELETE FROM {TAB_CERTIFICATE_USAGES} "
107
                              f"WHERE {COL_CERTIFICATE_ID} = ? AND {COL_USAGE_TYPE_ID} = ?",
108
                              [item_id, usage_id])
109

  
110
                    if usage_value:
111
                        c.execute(f"INSERT INTO {TAB_CERTIFICATE_USAGES} ({COL_CERTIFICATE_ID},{COL_USAGE_TYPE_ID}) "
112
                                  f"VALUES (?,?)",
113
                                  [item_id, usage_id])
114

  
115
            conn.commit()
116
            c.close()
117
        except Error as e:
118
            print(e)
119
            return False
120
        finally:
121
            if conn:
122
                conn.close()
123
        return True
124

  
125
    @staticmethod
126
    def delete(sql: str, item_id: int, delete_usages: bool) -> bool:
127
        """
128
        Deletes an item from database
129

  
130
        :param sql: SQL query for selection (parameter values are replaced by question mark)
131
        :param item_id: ID of specific item
132
        :param delete_usages: flag specifying whether to erase rows with specific id
133
            from the table of usages (CertificateUsages)
134

  
135
        :return: the result of whether the deletion was successful
136
        """
137

  
138
        conn = None
139
        try:
140
            conn = sqlite3.connect(DATABASE_FILE)
141
            c = conn.cursor()
142

  
143
            if delete_usages:
144
                c.execute(f"DELETE FROM {TAB_CERTIFICATE_USAGES} WHERE {COL_CERTIFICATE_ID} = ?", [item_id])
145

  
146
            c.execute(sql, [item_id])
147
            conn.commit()
148
            c.close()
149
        except Error as e:
150
            print(e)
151
            return False
152
        finally:
153
            if conn:
154
                conn.close()
155
        return True
src/impl/private_key_repository_impl.py
1
from ..db_objects.private_key import PrivateKey
2
from ..dao.repository import IRepository
3
from db_manager import DBManager
4
from ..constants import *
5

  
6

  
7
class PrivateKeyRepositoryImpl(IRepository):
8

  
9
    def create(self, private_key: str, password: str) -> bool:
10
        """
11
        Creates a private key
12

  
13
        :param private_key: private key
14
        :param password: passphrase for encode private key
15

  
16
        :return: the result of whether the creation was successful
17
        """
18

  
19
        sql = f"INSERT INTO {TAB_PRIVATE_KEYS} ({COL_PRIVATE_KEY},{COL_PASSWORD}) VALUES(?,?)"
20
        result = DBManager.create(sql, [private_key, password])
21
        return result
22

  
23
    def read(self, private_key_id: int = None):
24
        """
25
        Reads (selects) a private key
26

  
27
        :param private_key_id: ID of specific private
28

  
29
        :return: instance of PrivateKey class if ID was used otherwise list of this instances
30
        """
31

  
32
        sql_extend = ""
33

  
34
        if private_key_id is not None:
35
            sql_extend = f" WHERE {COL_ID} = ?"
36

  
37
        sql = f"SELECT * FROM {TAB_PRIVATE_KEYS}{sql_extend}"
38
        rows = DBManager.read(sql, private_key_id)
39

  
40
        private_keys: list = []
41

  
42
        for row in rows:
43
            private_keys.append(PrivateKey(row))
44

  
45
        if private_key_id is not None:
46
            return private_keys[0]
47

  
48
        return private_keys
49

  
50
    def update(self, private_key_id: int, private_key: str = None, password: str = None) -> bool:
51
        """
52
        Updates a private key
53

  
54
        :param private_key_id: ID of specific private key
55
        :param private_key: private key
56
        :param password: passphrase for encode private key
57

  
58
        :return: the result of whether the updation was successful
59
        """
60

  
61
        updated_list = []
62
        values = []
63
        if private_key is not None:
64
            updated_list.append(f"{COL_PRIVATE_KEY} = ?")
65
            values.append(private_key)
66
        if password is not None:
67
            updated_list.append(f"{COL_PASSWORD} = ?")
68
            values.append(password)
69

  
70
        updated_str = ", ".join(updated_list)
71
        sql = f"UPDATE {TAB_PRIVATE_KEYS} SET {updated_str} WHERE {COL_ID} = ?"
72
        result = DBManager.update(sql, private_key_id, values)
73
        return result
74

  
75
    def delete(self, private_key_id: int) -> bool:
76
        """
77
        Deletes a private key
78

  
79
        :param private_key_id: ID of specific private key
80

  
81
        :return: the result of whether the deletion was successful
82
        """
83

  
84
        sql = f"DELETE FROM {TAB_PRIVATE_KEYS} WHERE {COL_ID} = ?"
85
        result = DBManager.delete(sql, private_key_id, False)
86
        return result
src/tests/impl/db_manager/conftest.py
1
from src.impl.db_manager import DBManager
2
import pytest
3

  
4

  
5
@pytest.fixture
6
def repository():
7
    return DBManager()
src/tests/impl/db_manager/transaction_certificates.py
1
import pytest
2
from src.constants import *
3

  
4

  
5
def test_create_first(repository):
6
    sql = f"INSERT INTO {TAB_PRIVATE_KEYS} ({COL_PRIVATE_KEY},{COL_PASSWORD}) VALUES (?,?)"
7
    repository.create(sql, ["test_pk_1", "test_pwd_1"])
8

  
9
    sql = f"SELECT * FROM {TAB_PRIVATE_KEYS} WHERE {COL_PRIVATE_KEY} LIKE 'test%'"
10
    rows_private_keys = repository.read(sql)
11

  
12
    sql = f"INSERT INTO {TAB_CERTIFICATES} ({COL_COMMON_NAME},{COL_VALID_FROM},{COL_VALID_TO},{COL_PEM_DATA}," \
13
          f"{COL_PRIVATE_KEY_ID},{COL_TYPE_ID},{COL_PARENT_ID}) VALUES (?,?,?,?,?,?,?)"
14
    check_create = repository.create(sql, ["test_common_name", "test_valid_from", "test_valid_to", "test_pem_data",
15
                                           rows_private_keys[0][1], CA_ID, -1])
16

  
17
    sql = f"SELECT * FROM {TAB_CERTIFICATES} WHERE {COL_COMMON_NAME} LIKE 'test%'"
18
    rows_select = repository.read(sql)
19

  
20
    assert check_create
21
    assert rows_select[0][1] == "test_common_name" \
22
           and rows_select[0][2] == "test_valid_from" \
23
           and rows_select[0][3] == "test_valid_to" \
24
           and rows_select[0][4] == "test_pem_data" \
25
           and rows_select[0][5] == rows_private_keys[0][1] \
26
           and rows_select[0][6] == ROOT_CA_ID \
27
           and rows_select[0][7] == -1
28

  
29

  
30
def test_update(repository):
31
    sql = f"SELECT * FROM {TAB_CERTIFICATES} WHERE {COL_COMMON_NAME} LIKE 'test%'"
32
    rows_select = repository.read(sql)
33

  
34
    sql = f"UPDATE {TAB_CERTIFICATES} SET {COL_PARENT_ID} = ? WHERE {COL_ID} = ?"
35
    check_update: bool = repository.update(sql, rows_select[0][0], [rows_select[0][0]])
36

  
37
    sql = f"SELECT * FROM {TAB_CERTIFICATES} WHERE {COL_COMMON_NAME} LIKE 'test%'"
38
    rows_select = repository.read(sql)
39

  
40
    assert check_update
41
    assert rows_select[0][7] == rows_select[0][0]
42

  
43

  
44
def test_delete(repository):
45
    sql = f"SELECT * FROM {TAB_CERTIFICATES} WHERE {COL_COMMON_NAME} LIKE 'test%'"
46
    rows_select = repository.read(sql)
47

  
48
    i = 1
49
    print("\n")
50
    for row in rows_select:
51
        sql = f"DELETE FROM {TAB_CERTIFICATES} WHERE {COL_ID} = ?"
52
        check_delete = repository.delete(sql, row[0], False)
53
        assert check_delete
54

  
55
    sql = f"SELECT * FROM {TAB_CERTIFICATES} WHERE {COL_COMMON_NAME} LIKE 'test%'"
56
    rows_select = repository.read(sql)
57

  
58
    assert len(rows_select) == 0
src/tests/impl/db_manager/transaction_private_keys.py
1
import pytest
2
from src.constants import *
3

  
4

  
5
def test_create_first(repository):
6
    sql = f"INSERT INTO {TAB_PRIVATE_KEYS} ({COL_PRIVATE_KEY},{COL_PASSWORD}) VALUES (?,?)"
7
    check_create: bool = repository.create(sql, ["test_pk_1", "test_pwd_1"])
8

  
9
    sql = f"SELECT * FROM {TAB_PRIVATE_KEYS} WHERE {COL_PRIVATE_KEY} LIKE 'test%'"
10
    rows_select = repository.read(sql)
11

  
12
    assert check_create
13
    assert rows_select[0][1] == "test_pk_1" and rows_select[0][2] == "test_pwd_1"
14

  
15

  
16
def test_create_count(repository):
17
    sql = f"INSERT INTO {TAB_PRIVATE_KEYS} ({COL_PRIVATE_KEY},{COL_PASSWORD}) VALUES (?,?)"
18
    check_create: bool = repository.create(sql, ["test_pk_1", "test_pwd_1"])
19

  
20
    sql = f"SELECT * FROM {TAB_PRIVATE_KEYS} WHERE {COL_PRIVATE_KEY} LIKE 'test%'"
21
    rows_select = repository.read(sql)
22

  
23
    assert check_create
24
    assert len(rows_select) == 2
25

  
26

  
27
def test_update(repository):
28
    sql = f"SELECT * FROM {TAB_PRIVATE_KEYS} WHERE {COL_PRIVATE_KEY} LIKE 'test%'"
29
    rows_select = repository.read(sql)
30

  
31
    sql = f"UPDATE {TAB_PRIVATE_KEYS} SET {COL_PASSWORD} = ? WHERE {COL_ID} = ?"
32
    check_update: bool = repository.update(sql, rows_select[0][0], ["test_pwd_updated"])
33

  
34
    sql = f"SELECT * FROM {TAB_PRIVATE_KEYS} WHERE {COL_PRIVATE_KEY} LIKE 'test%'"
35
    rows_select = repository.read(sql)
36

  
37
    assert check_update
38
    assert rows_select[0][2] == "test_pwd_updated"
39

  
40

  
41
def test_delete(repository):
42
    sql = f"SELECT * FROM {TAB_PRIVATE_KEYS} WHERE {COL_PRIVATE_KEY} LIKE 'test%'"
43
    rows_select = repository.read(sql)
44

  
45
    i = 1
46
    print("\n")
47
    for row in rows_select:
48
        sql = f"DELETE FROM {TAB_PRIVATE_KEYS} WHERE {COL_ID} = ?"
49
        check_delete = repository.delete(sql, row[0], False)
50
        assert check_delete
51

  
52
    sql = f"SELECT * FROM {TAB_PRIVATE_KEYS} WHERE {COL_PRIVATE_KEY} LIKE 'test%'"
53
    rows_select = repository.read(sql)
54

  
55
    assert len(rows_select) == 0

Také k dispozici: Unified diff