Projekt

Obecné

Profil

Stáhnout (19.6 KB) Statistiky
| Větev: | Tag: | Revize:
1
import re
2
import subprocess
3
import time
4
import random
5

    
6
from src.constants import CRL_CONFIG
7
from src.model.certificate import Certificate
8
from src.model.private_key import PrivateKey
9
from src.model.subject import Subject
10
from src.utils.temporary_file import TemporaryFile
11

    
12
# encryption method to be used when generating private keys
13
PRIVATE_KEY_ENCRYPTION_METHOD = "-aes256"
14

    
15
# openssl executable name
16
OPENSSL_EXECUTABLE = "openssl"
17

    
18
# format of NOT_BEFORE NOT_AFTER date fields
19
NOT_AFTER_BEFORE_DATE_FORMAT = "%b %d %H:%M:%S %Y %Z"
20

    
21
# minimal configuration file to be used for openssl req command
22
# specifies distinguished_name that references empty section only
23
# openssl requires this option to be present
24
MINIMAL_CONFIG_FILE = "[req]\ndistinguished_name = req_distinguished_name\n[req_distinguished_name]\n\n"
25

    
26
# section to be used to specify extensions when creating a SSCRT
27
SSCRT_SECTION = "sscrt_ext"
28

    
29
CA_EXTENSIONS = "basicConstraints=critical,CA:TRUE"
30

    
31
# upper bound of the range of random serial numbers to be generated
32
MAX_SN = 4294967296
33

    
34

    
35
class CryptographyService:
36

    
37
    @staticmethod
38
    def __subject_to_param_format(subject):
39
        """
40
        Converts the given subject to a dictionary containing openssl field names mapped to subject's fields
41
        :param subject: subject to be converted
42
        :return: a dictionary containing openssl field names mapped to subject's fields
43
        """
44
        subj_dict = {}
45
        if subject.common_name is not None:
46
            subj_dict["CN"] = subject.common_name
47
        if subject.country is not None:
48
            subj_dict["C"] = subject.country
49
        if subject.locality is not None:
50
            subj_dict["L"] = subject.locality
51
        if subject.state is not None:
52
            subj_dict["ST"] = subject.state
53
        if subject.organization is not None:
54
            subj_dict["O"] = subject.organization
55
        if subject.organization_unit is not None:
56
            subj_dict["OU"] = subject.organization_unit
57
        if subject.email_address is not None:
58
            subj_dict["emailAddress"] = subject.email_address
59

    
60
        # merge the subject into a "subj" parameter format
61
        return "".join([f"/{key}={value}" for key, value in subj_dict.items()])
62

    
63
    @staticmethod
64
    def __run_for_output(args=None, proc_input=None, executable=OPENSSL_EXECUTABLE):
65
        """
66
        Launches a new process in which the given executable is run. STDIN and process arguments can be set.
67
        If the process ends with a non-zero then <CryptographyException> is raised.
68

    
69
        :param args: Arguments to be passed to the program.
70
        :param proc_input: String input to be passed to the stdin of the created process.
71
        :param executable: Executable to be run (defaults to openssl)
72
        :return: If the process ends with a zero return code then the STDOUT of the process is returned as a byte array.
73
        """
74
        if args is None:
75
            args = []
76
        try:
77
            # prepend the name of the executable
78
            args.insert(0, executable)
79

    
80
            # create a new process
81
            proc = subprocess.Popen(args, stdin=subprocess.PIPE if proc_input is not None else None,
82
                                    stdout=subprocess.PIPE,
83
                                    stderr=subprocess.PIPE)
84

    
85
            out, err = proc.communicate(proc_input)
86

    
87
            if proc.returncode != 0:
88
                # if the process did not result in zero result code, then raise an exception
89
                if err is not None and len(err) > 0:
90
                    raise CryptographyException(executable, args, err.decode())
91
                else:
92
                    raise CryptographyException(executable, args,
93
                                                f""""Execution resulted in non-zero argument""")
94

    
95
            return out
96
        except FileNotFoundError:
97
            raise CryptographyException(executable, args, f""""{executable}" not found in the current PATH.""")
98

    
99
    def create_private_key(self, passphrase=None):
100
        """
101
        Creates a private key with the option to encrypt it using a passphrase.
102
        :param passphrase: A passphrase to be used when encrypting the key (if none is passed then the key is not
103
        encrypted at all). Empty passphrase ("") also results in a key that is not encrypted.
104
        :return: string containing the generated private key in PEM format
105
        """
106
        if passphrase is None or len(passphrase) == 0:
107
            return self.__run_for_output(["genrsa", "2048"]).decode()
108
        else:
109
            return self.__run_for_output(
110
                ["genrsa", PRIVATE_KEY_ENCRYPTION_METHOD, "-passout", f"pass:{passphrase}", "2048"]).decode()
111

    
112
    def create_sscrt(self, subject, key, config="", extensions="", key_pass=None, days=30, sn: int = None):
113
        """
114
        Creates a self signed certificate
115

    
116
        :param subject: an instance of <Subject> representing the subject to be added to the certificate
117
        :param key: private key of the CA to be used
118
        :param config: string containing the configuration to be used
119
        :param extensions: name of the section in the configuration representing extensions
120
        :param key_pass: passphrase of the private key
121
        :param days: number of days for which the certificate will be valid
122
        :param sn: serial number to be set, when "None" is set a random serial number is generated
123

    
124
        :return: string containing the generated certificate in PEM format
125
        """
126
        assert key is not None
127
        assert subject is not None
128

    
129
        subj = self.__subject_to_param_format(subject)
130

    
131
        # To specify extension for creating a SSCRT, one has to use a configuration
132
        # file instead of an extension file. Therefore the following code creates
133
        # the most basic configuration file with sscrt_ext section, that is later
134
        # reference in openssl req command using -extensions option.
135
        if len(config) == 0:
136
            config += MINIMAL_CONFIG_FILE
137
        config += "\n[ " + SSCRT_SECTION + " ]" + "\n" + extensions
138

    
139
        with TemporaryFile("openssl.conf", config) as conf_path:
140
            args = ["req", "-x509", "-new", "-subj", subj, "-days", f"{days}",
141
                    "-key", "-"]
142

    
143
            # serial number passed, use it when generating the certificate,
144
            # without passing it openssl generates a random one
145
            if sn is not None:
146
                args.extend(["-set_serial", str(sn)])
147

    
148
            if len(config) > 0:
149
                args.extend(["-config", conf_path])
150
            if len(extensions) > 0:
151
                args.extend(["-extensions", SSCRT_SECTION])  # when creating SSCRT, section references section in config
152

    
153
            # it would be best to not send the pass phrase at all, but for some reason pytest then prompts for
154
            # the pass phrase (this does not happen when run from pycharm)
155

    
156
            #  add the passphrase even when None is passed. Otherwise when running tests with pytest some tests freeze
157
            # waiting for the passphrase to be typed in
158
            args.extend(["-passin", f"pass:{key_pass}"])
159

    
160
            return self.__run_for_output(args, proc_input=bytes(key, encoding="utf-8")).decode()
161

    
162
    def __create_csr(self, subject, key, key_pass=""):
163
        """
164
        Creates a CSR (Certificate Signing Request)
165

    
166
        :param subject: an instance of <Subject> representing the subject to be added to the CSR
167
        :param key: the private key of the subject to be used to generate the CSR
168
        :param key_pass: passphrase of the subject's private key
169
        :return: string containing the generated certificate signing request in PEM format
170
        """
171

    
172
        subj_param = self.__subject_to_param_format(subject)
173

    
174
        args = ["req", "-new", "-subj", subj_param, "-key", "-"]
175

    
176
        # add the passphrase even when None is passed. Otherwise when running tests with pytest some tests freeze
177
        # waiting for the passphrase to be typed in
178
        args.extend(["-passin", f"pass:{key_pass}"])
179

    
180
        return self.__run_for_output(args, proc_input=bytes(key, encoding="utf-8")).decode()
181

    
182
    def __sign_csr(self, csr, issuer_pem, issuer_key, issuer_key_pass=None, extensions="", days=30, sn: int = None):
183
        """
184
        Signs the given CSR by the given issuer CA
185

    
186
        :param csr: a string containing the CSR to be signed
187
        :param issuer_pem: string containing the certificate of the issuer that will sign this CSR in PEM format
188
        :param issuer_key: string containing the private key of the issuer's certificate in PEM format
189
        :param issuer_key_pass: string containing the passphrase of the private key of the issuer's certificate in PEM
190
        format
191
        :param extensions: extensions to be applied when signing the CSR
192
        :param days: number of days for which the certificate will be valid
193
        :param sn: serial number to be set, when "None" is set a random serial number is generated
194
        :return: string containing the generated and signed certificate in PEM format
195
        """
196

    
197
        # concatenate CSR, issuer certificate and issuer's key (will be used in the openssl call)
198
        proc_input = csr + issuer_pem + issuer_key
199

    
200
        # TODO find a better way to generate a random serial number or let openssl generate a .srl file
201
        # when serial number is not passed generate a random one
202
        if sn is None:
203
            sn = random.randint(0, MAX_SN)
204

    
205
        # prepare openssl parameters...
206
        # CSR, CA and CA's private key will be passed via stdin (that's the meaning of the '-' symbol)
207
        params = ["x509", "-req", "-in", "-", "-CA", "-", "-CAkey", "-", "-CAcreateserial", "-days", str(days),
208
                  "-set_serial", str(sn)]
209

    
210
        with TemporaryFile("extensions.conf", extensions) as ext_path:
211
            # add the passphrase even when None is passed. Otherwise when running tests with pytest some tests freeze
212
            # waiting for the passphrase to be typed in
213
            params.extend(["-passin", f"pass:{issuer_key_pass}"])
214

    
215
            if len(extensions) > 0:
216
                params.extend(["-extfile", ext_path])
217

    
218
            return self.__run_for_output(params, proc_input=(bytes(proc_input, encoding="utf-8"))).decode()
219

    
220
    def create_crt(self, subject, subject_key, issuer_pem, issuer_key, subject_key_pass=None, issuer_key_pass=None,
221
                   extensions="",
222
                   days=30,
223
                   sn: int = None):
224
        """
225
        Creates a certificate by using the given subject, subject's key, issuer and its key.
226

    
227
        :param subject: subject to be added to the created certificate
228
        :param subject_key: string containing the private key to be used when creating the certificate in PEM format
229
        :param issuer_key: string containing the private key of the issuer's certificate in PEM format
230
        :param issuer_pem: string containing the certificate of the issuer that will sign this CSR in PEM format
231
        :param subject_key_pass: string containing the passphrase of the private key used when creating the certificate
232
        in PEM format
233
        :param issuer_key_pass: string containing the passphrase of the private key of the issuer's certificate in PEM
234
        format
235
        :param extensions: extensions to be applied when creating the certificate
236
        :param days: number of days for which the certificate will be valid
237
        :param sn: serial number to be set, when "None" is set a random serial number is generated
238
        :return: string containing the generated certificate in PEM format
239
        """
240
        csr = self.__create_csr(subject, subject_key, key_pass=subject_key_pass)
241
        return self.__sign_csr(csr, issuer_pem, issuer_key, issuer_key_pass=issuer_key_pass, extensions=extensions,
242
                               days=days, sn=sn)
243

    
244
    @staticmethod
245
    def verify_cert(certificate):
246
        """
247
        Verifies whether the given certificate is not expired.
248

    
249
        :param certificate: certificate to be verified in PEM format
250
        :return: Returns `true` if the certificate is not expired, `false` when expired.
251
        """
252
        # call openssl to check whether the certificate is valid to this date
253
        args = [OPENSSL_EXECUTABLE, "x509", "-checkend", "0", "-noout", "-text", "-in", "-"]
254

    
255
        # create a new process
256
        proc = subprocess.Popen(args, stdin=subprocess.PIPE,
257
                                stdout=subprocess.PIPE,
258
                                stderr=subprocess.PIPE)
259

    
260
        out, err = proc.communicate(bytes(certificate, encoding="utf-8"))
261

    
262
        # zero return code means that the certificate is valid
263
        if proc.returncode == 0:
264
            return True
265
        elif proc.returncode == 1 and "Certificate will expire" in out.decode():
266
            # 1 return code means that the certificate is invalid but such message has to be present in the proc output
267
            return False
268
        else:
269
            # the process failed because of some other reason (incorrect cert format)
270
            raise CryptographyException(OPENSSL_EXECUTABLE, args, err.decode())
271

    
272
    def extract_public_key_from_private_key(self, private_key_pem: str, passphrase=None) -> str:
273
        """
274
        Extracts a public key from the given private key passed in PEM format
275
        :param private_key_pem: PEM data representing the private key from which a public key should be extracted
276
        :param passphrase: passphrase to be provided when the supplied private key is encrypted
277
        :return: a string containing the extracted public key in PEM format
278
        """
279
        args = ["rsa", "-in", "-", "-pubout"]
280
        if passphrase is not None:
281
            args.extend(["-passin", f"pass:{passphrase}"])
282
        return self.__run_for_output(args, proc_input=bytes(private_key_pem, encoding="utf-8")).decode()
283

    
284
    def extract_public_key_from_certificate(self, cert_pem: str) -> str:
285
        """
286
        Extracts a public key from the given certificate passed in PEM format
287
        :param cert_pem: PEM data representing a certificate from which a public key should be extracted
288
        :return: a string containing the extracted public key in PEM format
289
        """
290
        # extracting public key from a certificate does not seem to require a passphrase even when
291
        # signed using an encrypted PK
292
        args = ["x509", "-in", "-", "-noout", "-pubkey"]
293
        return self.__run_for_output(args, proc_input=bytes(cert_pem, encoding="utf-8")).decode()
294

    
295
    def parse_cert_pem(self, cert_pem):
296
        """
297
        Parses the given certificate in PEM format and returns the subject of the certificate and it's NOT_BEFORE
298
        and NOT_AFTER field
299
        :param cert_pem: a certificated in a PEM format to be parsed
300
        :return: a tuple containing a subject, NOT_BEFORE and NOT_AFTER dates
301
        """
302
        # run openssl x509 to view certificate content
303
        args = ["x509", "-noout", "-subject", "-startdate", "-enddate", "-in", "-"]
304

    
305
        cert_info_raw = self.__run_for_output(args, proc_input=bytes(cert_pem, encoding="utf-8")).decode()
306

    
307
        # split lines
308
        results = re.split("\n", cert_info_raw)
309
        subj_line = results[0].strip()
310
        not_before_line = results[1].strip()
311
        not_after_line = results[2].strip()
312

    
313
        # attempt to extract subject via regex
314
        match = re.search(r"subject=(.*)", subj_line)
315
        if match is None:
316
            # TODO use logger
317
            print(f"Could not find subject to parse: {subj_line}")
318
            return None
319
        else:
320
            # find all attributes (key = value)
321
            found = re.findall(r"\s?([^c=\s]+)\s?=\s?([^,\n]+)", match.group(1))
322
            subj = Subject()
323
            for key, value in found:
324
                if key == "C":
325
                    subj.country = value.strip()
326
                elif key == "ST":
327
                    subj.state = value.strip()
328
                elif key == "L":
329
                    subj.locality = value.strip()
330
                elif key == "O":
331
                    subj.organization = value.strip()
332
                elif key == "OU":
333
                    subj.organization_unit = value.strip()
334
                elif key == "CN":
335
                    subj.common_name = value.strip()
336
                elif key == "emailAddress":
337
                    subj.email_address = value.strip()
338

    
339
        # extract notBefore and notAfter date fields
340
        not_before = re.search(r"notBefore=(.*)", not_before_line)
341
        not_after = re.search(r"notAfter=(.*)", not_after_line)
342

    
343
        # if date fields are found parse them into date objects
344
        if not_before is not None:
345
            not_before = time.strptime(not_before.group(1).strip(), NOT_AFTER_BEFORE_DATE_FORMAT)
346
        if not_after is not None:
347
            not_after = time.strptime(not_after.group(1).strip(), NOT_AFTER_BEFORE_DATE_FORMAT)
348

    
349
        # TODO wrapper class?
350
        # return it as a tuple
351
        return subj, not_before, not_after
352

    
353
    def get_openssl_version(self) -> str:
354
        """
355
        Get version of the OpenSSL installed on the system
356
        :return: version of the OpenSSL as returned from the process
357
        """
358
        return self.__run_for_output(["version"]).decode("utf-8")
359

    
360
    def generate_crl(self, cert: Certificate, key: PrivateKey, index_file_path: str) -> str:
361
        """
362
        Generate a CertificateRevocationList for a specified
363
        certificate authority.
364

    
365
        :param key: key that is used to sign the CRL (must belong to the given certificate)
366
        :param cert: Certificate of the certificate authority that issue the CRL
367
        :param index_file_path: path to a file that contains the openssl index with all revoked certificates
368
        :return: CRL encoded in PEM format string
369
        """
370
        # openssl ca requires the .srl file to exists, therefore a dummy, unused file is created
371
        with TemporaryFile("serial.srl", "0") as serial_file, \
372
             TemporaryFile("crl.conf", CRL_CONFIG % (index_file_path, serial_file)) as config_file, \
373
             TemporaryFile("certificate.pem", cert.pem_data) as cert_file, \
374
             TemporaryFile("private_key.pem", key.private_key) as key_file:
375

    
376
            args = ["ca", "-config", config_file, "-gencrl", "-keyfile", key_file, "-cert", cert_file, "-outdir", "."]
377

    
378
            if key.password is not None and key.password != "":
379
                args.extend(["-passin", f"pass:{key.password}"])
380

    
381
            return self.__run_for_output(args).decode("utf-8")
382

    
383
    def generate_ocsp(self, cert, key, index_path, der_ocsp_request):
384
        """
385
        Generate an OCSP Response from an OCSP Request given the issuer cert, issuer cert key and the index file.
386
        The OSCP Response is signed by the CA itself (recommended way according to multiple sources).
387

    
388
        :param cert: issuer certificate
389
        :param key: corresponding key
390
        :param index_path: path/to/the/generated/index/file
391
        :param der_ocsp_request: DER encoded OCSP Request
392
        :return: DER encoded OCSP Response
393
        """
394
        with TemporaryFile("certificate.pem", cert.pem_data) as ca_certificate, \
395
             TemporaryFile("private_key.pem", key.private_key) as key_file, \
396
             TemporaryFile("request.der", der_ocsp_request) as request_file:
397

    
398
            args = ["ocsp", "-index", index_path, "-CA", ca_certificate, "-rsigner", ca_certificate, "-rkey", key_file,
399
                    "-reqin", request_file, "-respout", "-"]
400

    
401
            if key.password is not None and key.password != "":
402
                args.extend(["-passin", f"pass:{key.password}"])
403

    
404
            return self.__run_for_output(args)
405

    
406

    
407
class CryptographyException(Exception):
408

    
409
    def __init__(self, executable, args, message):
410
        self.executable = executable
411
        self.args = args
412
        self.message = message
413

    
414
    def __str__(self):
415
        return f"""
416
        EXECUTABLE: {self.executable}
417
        ARGS: {self.args}
418
        MESSAGE: {self.message}
419
        """
(3-3/4)