Projekt

Obecné

Profil

« Předchozí | Další » 

Revize c0aed2f5

Přidáno uživatelem Stanislav Král před asi 4 roky(ů)

Re #8472 - Added create_sscrt method that creates a self signed certificate

Added 5 unit tests verifying the validity of the certificate generation.

Zobrazit rozdíly:

proj/model/subject.py
1
class Subject:
2

  
3
    def __init__(self, common_name=None, country=None, locality=None, state=None, organization=None,
4
                 organization_unit=None, email_address=None):
5
        self.common_name = common_name
6
        self.country = country
7
        self.locality = locality
8
        self.state = state
9
        self.organization = organization
10
        self.organization_unit = organization_unit
11
        self.email_address = email_address
proj/services/cryptography.py
1
import subprocess
2

  
3
# encryption method to be used when generating private keys
4
from proj.utils.temporary_file import TemporaryFile
5

  
6
PRIVATE_KEY_ENCRYPTION_METHOD = "-aes256"
7

  
8
# openssl executable name
9
OPENSSL_EXECUTABLE = "openssl"
10

  
11

  
12
class CryptographyService:
13

  
14
    @staticmethod
15
    def _run_for_output(args=None, stdin=None, executable=OPENSSL_EXECUTABLE):
16
        """
17
        Launches a new process in which the given executable is run. STDIN and process arguments can be set.
18
        If the process ends with a non-zero then <CryptographyException> is raised.
19
        :param args: Arguments to be passed to the program.
20
        :param stdin: String input to be passed to the stdin of the created process.
21
        :param executable: Executable to be run (defaults to openssl)
22
        :return: If the process ends with a zero return code then the STDOUT of the process is returned as a byte array.
23
        """
24
        if args is None:
25
            args = []
26
        try:
27
            # prepend the name of the executable
28
            args.insert(0, executable)
29

  
30
            # create a new process
31
            proc = subprocess.Popen(args, stdin=subprocess.PIPE if stdin is not None else None, stdout=subprocess.PIPE,
32
                                    stderr=subprocess.PIPE)
33

  
34
            out, err = proc.communicate(stdin)
35

  
36
            if proc.returncode != 0:
37
                # if the process did not result in zero result code, then raise an exception
38
                if err is not None:
39
                    raise CryptographyException(executable, args, err.decode())
40
                else:
41
                    raise CryptographyException(executable, args,
42
                                                f""""Execution resulted in non-zero argument""")
43

  
44
            return out
45
        except FileNotFoundError:
46
            raise CryptographyException(executable, args, f""""{executable}" not found in the current PATH.""")
47

  
48
    def create_private_key(self, passphrase=None):
49
        """
50
        Creates a private key with the option to encrypt it using a passphrase.
51
        :param passphrase: A passphrase to be used when encrypting the key (if none is passed then the key is not
52
        encrypted at all). Empty passphrase ("") also results in a key that is not encrypted.
53
        :return: A text representation of the generated private key.
54
        """
55
        if passphrase is None or len(passphrase) == 0:
56
            return self._run_for_output(["genrsa", "2048"]).decode()
57
        else:
58
            return self._run_for_output(
59
                ["genrsa", PRIVATE_KEY_ENCRYPTION_METHOD, "-passout", f"pass:{passphrase}", "2048"]).decode()
60

  
61
    def create_sscrt(self, key, subject, config="", extensions="", key_passphrase=None):
62
        """
63
        Creates a root CA
64

  
65
        :param key: private key of the CA to be used
66
        :param subject: an instance of <Subject> representing the subject to be added to the certificate
67
        :param config: string containing the configuration to be used
68
        :param extensions: name of the section in the configuration representing extensions
69
        :param key_passphrase: passphrase of the private key
70

  
71
        :return: byte array containing the generated certificate
72
        """
73
        assert key is not None
74
        assert subject is not None
75

  
76
        subj_dict = {}
77
        if subject.common_name is not None:
78
            subj_dict["CN"] = subject.common_name
79
        if subject.country is not None:
80
            subj_dict["C"] = subject.country
81
        if subject.locality is not None:
82
            subj_dict["L"] = subject.locality
83
        if subject.state is not None:
84
            subj_dict["ST"] = subject.state
85
        if subject.organization is not None:
86
            subj_dict["O"] = subject.organization
87
        if subject.organization_unit is not None:
88
            subj_dict["OU"] = subject.organization_unit
89
        if subject.email_address is not None:
90
            subj_dict["emailAddress"] = subject.email_address
91

  
92
        # merge the subject into a "subj" parameter
93
        subj = "".join([f"/{key}={value}" for key, value in subj_dict.items()])
94

  
95
        with TemporaryFile("openssl.conf", config) as conf_path:
96
            args = ["req", "-x509", "-new", "-subj", subj,
97
                    "-key", "-"]
98
            if len(config) > 0:
99
                args.extend(["-config", conf_path])
100

  
101
            if len(extensions) > 0:
102
                args.extend(["-extensions", extensions])
103

  
104
            # it would be best to not send the pass phrase at all, but for some reason pytest then prompts for
105
            # the pass phrase (this does not happen when run from pycharm)
106

  
107
            # if key_passphrase is not None:
108
            args.extend(["-passin", f"pass:{key_passphrase}"])
109

  
110
            return self._run_for_output(args, stdin=bytes(key, encoding="utf-8")).decode()
111

  
112

  
113
class CryptographyException(Exception):
114

  
115
    def __init__(self, executable, args, message):
116
        self.executable = executable
117
        self.args = args
118
        self.message = message
119

  
120
    def __str__(self):
121
        return f"""
122
        EXECUTABLE: {self.executable}
123
        ARGS: {self.args}
124
        MESSAGE: {self.message}
125
        """
proj/tests/services/cryptography_test.py
1
import pytest
2
import subprocess
3

  
4
from proj.model.subject import Subject
5
from proj.services.cryptography import CryptographyService, CryptographyException
6

  
7

  
8
@pytest.fixture
9
def service():
10
    # provide a CryptographyService fixture
11
    return CryptographyService()
12

  
13

  
14
def test_private_key(service):
15
    private_key = service.create_private_key()
16

  
17
    # verify the private key
18
    subprocess.check_output(["openssl", "rsa", "-in", "-", "-check"], input=bytes(private_key, encoding="utf-8"),
19
                            stderr=subprocess.STDOUT)
20

  
21

  
22
def test_encrypted_private_key(service):
23
    private_key = service.create_private_key(passphrase="foobar")
24

  
25
    # verify the private key providing a correct passphrase
26
    subprocess.check_output(["openssl", "rsa", "-in", "-", "-passin", "pass:foobar", "-check"],
27
                            input=bytes(private_key, encoding="utf-8"), stderr=subprocess.STDOUT)
28

  
29

  
30
def test_encrypted_private_key_incorrect_pass(service):
31
    private_key = service.create_private_key(passphrase="foobar")
32

  
33
    # incorrect passphrase provided
34
    with pytest.raises(subprocess.CalledProcessError):
35
        subprocess.check_output(["openssl", "rsa", "-in", "-", "-passin", "pass:bazbaz", "-check"],
36
                                input=bytes(private_key, encoding="utf-8"), stderr=subprocess.STDOUT)
37

  
38

  
39
def test_create_sscrt(service):
40
    # create a self signed certificate using configuration and extensions
41
    private_key = service.create_private_key(passphrase="foobar")
42

  
43
    # distinguished_name is always required
44
    config = """
45
    # Simple Root CA
46

  
47
    [ req ]
48
    distinguished_name      = ca_dn                 # DN section
49

  
50
    [ ca_dn ]
51

  
52
    [ root_ca_ext ]
53
    keyUsage                = critical,keyCertSign,cRLSign
54
    basicConstraints        = critical,CA:true
55
    subjectKeyIdentifier    = hash
56
    authorityKeyIdentifier  = keyid:always
57
    """
58

  
59
    cert = service.create_sscrt(private_key,
60
                                Subject(common_name="Topnax",
61
                                        country="CZ",
62
                                        locality="My Locality",
63
                                        state="My state",
64
                                        organization="Mysterious Org.",
65
                                        organization_unit="Department of Mysteries",
66
                                        email_address="mysterious@box.cz"),
67
                                config=config,
68
                                extensions="root_ca_ext",
69
                                key_passphrase="foobar")
70

  
71
    cert_printed = subprocess.check_output(["openssl", "x509", "-noout", "-text", "-in", "-"],
72
                                           input=bytes(cert, encoding="utf-8"), stderr=subprocess.STDOUT).decode()
73

  
74
    assert "Certificate Sign, CRL Sign" in cert_printed
75
    assert "X509v3 Key Usage: critical" in cert_printed
76
    assert "CA:TRUE" in cert_printed
77

  
78
    assert "Issuer: CN = Topnax, C = CZ, L = My Locality, ST = My state, O = Mysterious Org., OU = Department of Mysteries, emailAddress = mysterious@box.cz" in cert_printed
79
    assert "Subject: CN = Topnax, C = CZ, L = My Locality, ST = My state, O = Mysterious Org., OU = Department of Mysteries, emailAddress = mysterious@box.cz" in cert_printed
80

  
81

  
82
def test_create_sscrt_config_without_extensions(service):
83
    # create a self signed certificate without specifying extensions
84
    private_key = service.create_private_key()
85

  
86
    config = """
87
    # Simple Root CA
88

  
89
    [ req ]
90
    distinguished_name      = ca_dn                 # DN section
91

  
92
    [ ca_dn ]
93

  
94
    """
95

  
96
    cert = service.create_sscrt(private_key, Subject(common_name="Topnax", country="CZ"), config=config)
97

  
98
    cert_printed = subprocess.check_output(["openssl", "x509", "-noout", "-text", "-in", "-"],
99
                                           input=bytes(cert, encoding="utf-8"), stderr=subprocess.STDOUT).decode()
100

  
101
    # TODO pass something in the configuration that can be asserted
102
    assert "Issuer: CN = Topnax, C = CZ" in cert_printed
103
    assert "Subject: CN = Topnax, C = CZ" in cert_printed
104

  
105

  
106
def test_create_sscrt_plain(service):
107
    # create a self signed certificate without configuration
108
    private_key = service.create_private_key()
109

  
110
    cert = service.create_sscrt(private_key, Subject(common_name="Topnax", country="CZ"))
111

  
112
    cert_printed = subprocess.check_output(["openssl", "x509", "-noout", "-text", "-in", "-"],
113
                                           input=bytes(cert, encoding="utf-8"), stderr=subprocess.STDOUT).decode()
114

  
115
    assert "Issuer: CN = Topnax, C = CZ" in cert_printed
116
    assert "Subject: CN = Topnax, C = CZ" in cert_printed
117

  
118

  
119
def test_create_sscrt_passphrase(service):
120
    # create a self signed certificate with a PK that is protected by a passphrase
121
    private_key = service.create_private_key(passphrase="foobar")
122

  
123
    cert = service.create_sscrt(private_key, Subject(common_name="Topnax", country="CZ"), key_passphrase="foobar")
124

  
125
    cert_printed = subprocess.check_output(["openssl", "x509", "-noout", "-text", "-in", "-"],
126
                                           input=bytes(cert, encoding="utf-8"), stderr=subprocess.STDOUT).decode()
127

  
128
    assert "Issuer: CN = Topnax, C = CZ" in cert_printed
129
    assert "Subject: CN = Topnax, C = CZ" in cert_printed
130

  
131

  
132
def test_create_sscrt_incorrect_passphrase(service):
133
    # make an attempt to create a self signed certificate using a private key with specifying wrong key passphrase or
134
    # no passphrase at all
135
    private_key = service.create_private_key(passphrase="foobar")
136

  
137
    # incorrect passphrase provided when using a protected private key
138
    with pytest.raises(CryptographyException) as e:
139
        service.create_sscrt(private_key, Subject(common_name="Topnax", country="CZ"), key_passphrase="bazfoo")
140
    assert "bad decrypt" in e.value.message
141

  
142
    # no passphrase provided when using a protected private key
143
    with pytest.raises(CryptographyException) as e:
144
        service.create_sscrt(private_key, Subject(common_name="Topnax", country="CZ"))
145
    assert "bad decrypt" in e.value.message
proj/utils/temporary_file.py
1
import os
2

  
3

  
4
# allows the caller, when using "with" keyword, access a temporary file with the desired content that is deleted
5
# when "with" context is destroyed
6
class TemporaryFile(object):
7
    def __init__(self, file_name, content):
8
        self.file_name = file_name
9
        self.content = content
10

  
11
    def __enter__(self):
12
        # if content was passed, write it to a new file specified by the given file name
13
        if len(self.content) > 0:
14
            with open(self.file_name, "w") as file:
15
                file.write(self.content)
16
        return self.file_name
17

  
18
    def __exit__(self, type_, value, traceback):
19
        if len(self.content) > 0:
20
            # if content was passed then the file has been created => delete it.
21
            os.remove(self.file_name)
services/cryptography.py
1
import subprocess
2

  
3
# encryption method to be used when generating private keys
4
PRIVATE_KEY_ENCRYPTION_METHOD = "-aes256"
5

  
6
# openssl executable name
7
OPENSSL_EXECUTABLE = "openssl"
8

  
9

  
10
class CryptographyService:
11

  
12
    @staticmethod
13
    def _run_for_output(args=None, stdin=None, executable=OPENSSL_EXECUTABLE):
14
        """
15
        Launches a new process in which the given executable is run. STDIN and process arguments can be set.
16
        If the process ends with a non-zero then <CryptographyException> is raised.
17
        :param args: Arguments to be passed to the program.
18
        :param stdin: Arguments to be passed to the process.
19
        :param executable: Executable to be run (defaults to openssl)
20
        :return: If the process ends with a zero return code then the STDOUT of the process is returned as a byte array.
21
        """
22
        if args is None:
23
            args = []
24
        try:
25
            # prepend the name of the executable
26
            args.insert(0, executable)
27

  
28
            # create a new process
29
            proc = subprocess.Popen(args, stdin=stdin, stdout=subprocess.PIPE,
30
                                    stderr=subprocess.PIPE)
31

  
32
            # get process result
33
            out, err = proc.communicate()
34

  
35
            if proc.returncode != 0:
36
                # if the process did not result in zero result code, then raise an exception
37
                if err is not None:
38
                    raise CryptographyException(err.decode())
39
                else:
40
                    raise CryptographyException(
41
                        f""""{executable} with parameters {args} resulted in non-zero argument""")
42

  
43
            return out
44
        except FileNotFoundError:
45
            raise CryptographyException(f""""{executable}" not found in the current PATH.""")
46

  
47
    def create_private_key(self, passphrase=None):
48
        """
49
        Creates a private key with the option to encrypt it using a passphrase.
50
        :param passphrase: A passphrase to be used when encrypting the key (if none is passed then the key is not
51
        encrypted at all). Empty passphrase ("") also results in a key that is not encrypted.
52
        :return: A text representation of the generated private key.
53
        """
54
        if passphrase is None or len(passphrase) == 0:
55
            return self._run_for_output(["genrsa", "2048"]).decode()
56
        else:
57
            return self._run_for_output(
58
                ["genrsa", PRIVATE_KEY_ENCRYPTION_METHOD, "-passout", f"pass:{passphrase}", "2048"]).decode()
59

  
60

  
61
class CryptographyException(Exception):
62

  
63
    def __init__(self, message):
64
        self.message = message
tests/services/cryptography_test.py
1
import pytest
2
from services.cryptography import CryptographyService, CryptographyException
3
import subprocess
4

  
5

  
6
@pytest.fixture
7
def service():
8
    # provide a CryptographyService fixture
9
    return CryptographyService()
10

  
11

  
12
def test_private_key(service):
13
    private_key = service.create_private_key()
14

  
15
    # verify the private key
16
    subprocess.check_output(["openssl", "rsa", "-in", "-", "-check"], input=bytes(private_key, encoding="utf-8"),
17
                            stderr=subprocess.STDOUT)
18

  
19

  
20
def test_encrypted_private_key(service):
21
    private_key = service.create_private_key(passphrase="foobar")
22

  
23
    # verify the private key providing a correct passphrase
24
    subprocess.check_output(["openssl", "rsa", "-in", "-", "-passin", "pass:foobar", "-check"],
25
                            input=bytes(private_key, encoding="utf-8"), stderr=subprocess.STDOUT)
26

  
27

  
28
def test_encrypted_private_key_incorrect_pass(service):
29
    private_key = service.create_private_key(passphrase="foobar")
30

  
31
    # incorrect passphrase provided
32
    with pytest.raises(subprocess.CalledProcessError) as e:
33
        subprocess.check_output(["openssl", "rsa", "-in", "-", "-passin", "pass:bazbaz", "-check"],
34
                                input=bytes(private_key, encoding="utf-8"), stderr=subprocess.STDOUT)
35
    # no passphrase provided
36
    with pytest.raises(subprocess.CalledProcessError) as e:
37
        subprocess.check_output(["openssl", "rsa", "-in", "-", "-check"],
38
                                input=bytes(private_key, encoding="utf-8"), stderr=subprocess.STDOUT)

Také k dispozici: Unified diff