Projekt

Obecné

Profil

Stáhnout (22 KB) Statistiky
| Větev: | Tag: | Revize:
1
import re
2
import subprocess
3
import time
4
import random
5

    
6
from src.constants import CRL_CONFIG
7
from src.model.certificate import Certificate
8
from src.model.private_key import PrivateKey
9
from src.model.subject import Subject
10
from src.utils.logger import Logger
11
from src.utils.temporary_file import TemporaryFile
12

    
13
# the prefix of an rsa key
14
KEY_PREFIX = b"-----BEGIN RSA PRIVATE KEY-----"
15

    
16
# what every encrypted key contains
17
GUARANTEED_SUBSTRING_OF_ENCRYPTED_KEYS = "ENCRYPTED"
18

    
19
# encryption method to be used when generating private keys
20
PRIVATE_KEY_ENCRYPTION_METHOD = "-aes256"
21

    
22
# openssl executable name
23
OPENSSL_EXECUTABLE = "openssl"
24

    
25
# format of NOT_BEFORE NOT_AFTER date fields
26
NOT_AFTER_BEFORE_DATE_FORMAT = "%b %d %H:%M:%S %Y %Z"
27

    
28
# minimal configuration file to be used for openssl req command
29
# specifies distinguished_name that references empty section only
30
# openssl requires this option to be present
31
MINIMAL_CONFIG_FILE = "[req]\ndistinguished_name = req_distinguished_name\n[req_distinguished_name]\n\n"
32

    
33
# section to be used to specify extensions when creating a SSCRT
34
SSCRT_SECTION = "sscrt_ext"
35

    
36
CA_EXTENSIONS = "basicConstraints=critical,CA:TRUE"
37

    
38
# upper bound of the range of random serial numbers to be generated
39
MAX_SN = 4294967296
40

    
41

    
42
class CryptographyService:
43

    
44
    @staticmethod
45
    def __subject_to_param_format(subject):
46
        """
47
        Converts the given subject to a dictionary containing openssl field names mapped to subject's fields
48
        :param subject: subject to be converted
49
        :return: a dictionary containing openssl field names mapped to subject's fields
50
        """
51

    
52
        Logger.debug("Function launched.")
53

    
54
        subj_dict = {}
55
        if subject.common_name is not None:
56
            subj_dict["CN"] = subject.common_name
57
        if subject.country is not None:
58
            subj_dict["C"] = subject.country
59
        if subject.locality is not None:
60
            subj_dict["L"] = subject.locality
61
        if subject.state is not None:
62
            subj_dict["ST"] = subject.state
63
        if subject.organization is not None:
64
            subj_dict["O"] = subject.organization
65
        if subject.organization_unit is not None:
66
            subj_dict["OU"] = subject.organization_unit
67
        if subject.email_address is not None:
68
            subj_dict["emailAddress"] = subject.email_address
69

    
70
        # merge the subject into a "subj" parameter format
71
        return "".join([f"/{key}={value}" for key, value in subj_dict.items()])
72

    
73
    @staticmethod
74
    def __run_for_output(args=None, proc_input=None, executable=OPENSSL_EXECUTABLE):
75
        """
76
        Launches a new process in which the given executable is run. STDIN and process arguments can be set.
77
        If the process ends with a non-zero then <CryptographyException> is raised.
78

    
79
        :param args: Arguments to be passed to the program.
80
        :param proc_input: String input to be passed to the stdin of the created process.
81
        :param executable: Executable to be run (defaults to openssl)
82
        :return: If the process ends with a zero return code then the STDOUT of the process is returned as a byte array.
83
        """
84

    
85
        Logger.debug("Function launched.")
86

    
87
        if args is None:
88
            args = []
89
        try:
90
            # prepend the name of the executable
91
            args.insert(0, executable)
92

    
93
            # create a new process
94
            proc = subprocess.Popen(args, stdin=subprocess.PIPE if proc_input is not None else None,
95
                                    stdout=subprocess.PIPE,
96
                                    stderr=subprocess.PIPE)
97

    
98
            out, err = proc.communicate(proc_input)
99

    
100
            if proc.returncode != 0:
101
                # if the process did not result in zero result code, then raise an exception
102
                if err is not None and len(err) > 0:
103
                    Logger.error("CryptographyException")
104
                    raise CryptographyException(executable, args, err.decode())
105
                else:
106
                    Logger.error("CryptographyException")
107
                    raise CryptographyException(executable, args,
108
                                                f""""Execution resulted in non-zero argument""")
109

    
110
            return out
111
        except FileNotFoundError:
112
            Logger.error("CryptographyException")
113
            raise CryptographyException(executable, args, f""""{executable}" not found in the current PATH.""")
114

    
115
    def create_private_key(self, passphrase=None):
116
        """
117
        Creates a private key with the option to encrypt it using a passphrase.
118
        :param passphrase: A passphrase to be used when encrypting the key (if none is passed then the key is not
119
        encrypted at all). Empty passphrase ("") also results in a key that is not encrypted.
120
        :return: string containing the generated private key in PEM format
121
        """
122

    
123
        Logger.debug("Function launched.")
124

    
125
        if passphrase is None or len(passphrase) == 0:
126
            return self.__run_for_output(["genrsa", "2048"]).decode()
127
        else:
128
            return self.__run_for_output(
129
                ["genrsa", PRIVATE_KEY_ENCRYPTION_METHOD, "-passout", f"pass:{passphrase}", "2048"]).decode()
130

    
131
    def create_sscrt(self, subject, key, config="", extensions="", key_pass=None, days=30, sn: int = None):
132
        """
133
        Creates a self signed certificate
134

    
135
        :param subject: an instance of <Subject> representing the subject to be added to the certificate
136
        :param key: private key of the CA to be used
137
        :param config: string containing the configuration to be used
138
        :param extensions: name of the section in the configuration representing extensions
139
        :param key_pass: passphrase of the private key
140
        :param days: number of days for which the certificate will be valid
141
        :param sn: serial number to be set, when "None" is set a random serial number is generated
142

    
143
        :return: string containing the generated certificate in PEM format
144
        """
145

    
146
        Logger.debug("Function launched.")
147

    
148
        assert key is not None
149
        assert subject is not None
150

    
151
        subj = self.__subject_to_param_format(subject)
152

    
153
        # To specify extension for creating a SSCRT, one has to use a configuration
154
        # file instead of an extension file. Therefore the following code creates
155
        # the most basic configuration file with sscrt_ext section, that is later
156
        # reference in openssl req command using -extensions option.
157
        if len(config) == 0:
158
            config += MINIMAL_CONFIG_FILE
159
        config += "\n[ " + SSCRT_SECTION + " ]" + "\n" + extensions
160

    
161
        with TemporaryFile("openssl.conf", config) as conf_path:
162
            args = ["req", "-x509", "-new", "-subj", subj, "-days", f"{days}",
163
                    "-key", "-"]
164

    
165
            # serial number passed, use it when generating the certificate,
166
            # without passing it openssl generates a random one
167
            if sn is not None:
168
                args.extend(["-set_serial", str(sn)])
169

    
170
            if len(config) > 0:
171
                args.extend(["-config", conf_path])
172
            if len(extensions) > 0:
173
                args.extend(["-extensions", SSCRT_SECTION])  # when creating SSCRT, section references section in config
174

    
175
            # it would be best to not send the pass phrase at all, but for some reason pytest then prompts for
176
            # the pass phrase (this does not happen when run from pycharm)
177

    
178
            #  add the passphrase even when None is passed. Otherwise when running tests with pytest some tests freeze
179
            # waiting for the passphrase to be typed in
180
            args.extend(["-passin", f"pass:{key_pass}"])
181

    
182
            return self.__run_for_output(args, proc_input=bytes(key, encoding="utf-8")).decode()
183

    
184
    def __create_csr(self, subject, key, key_pass=""):
185
        """
186
        Creates a CSR (Certificate Signing Request)
187

    
188
        :param subject: an instance of <Subject> representing the subject to be added to the CSR
189
        :param key: the private key of the subject to be used to generate the CSR
190
        :param key_pass: passphrase of the subject's private key
191
        :return: string containing the generated certificate signing request in PEM format
192
        """
193

    
194
        Logger.debug("Function launched.")
195

    
196
        subj_param = self.__subject_to_param_format(subject)
197

    
198
        args = ["req", "-new", "-subj", subj_param, "-key", "-"]
199

    
200
        # add the passphrase even when None is passed. Otherwise when running tests with pytest some tests freeze
201
        # waiting for the passphrase to be typed in
202
        args.extend(["-passin", f"pass:{key_pass}"])
203

    
204
        return self.__run_for_output(args, proc_input=bytes(key, encoding="utf-8")).decode()
205

    
206
    def __sign_csr(self, csr, issuer_pem, issuer_key, issuer_key_pass=None, extensions="", days=30, sn: int = None):
207
        """
208
        Signs the given CSR by the given issuer CA
209

    
210
        :param csr: a string containing the CSR to be signed
211
        :param issuer_pem: string containing the certificate of the issuer that will sign this CSR in PEM format
212
        :param issuer_key: string containing the private key of the issuer's certificate in PEM format
213
        :param issuer_key_pass: string containing the passphrase of the private key of the issuer's certificate in PEM
214
        format
215
        :param extensions: extensions to be applied when signing the CSR
216
        :param days: number of days for which the certificate will be valid
217
        :param sn: serial number to be set, when "None" is set a random serial number is generated
218
        :return: string containing the generated and signed certificate in PEM format
219
        """
220

    
221
        Logger.debug("Function launched.")
222

    
223
        # concatenate CSR, issuer certificate and issuer's key (will be used in the openssl call)
224
        proc_input = csr + issuer_pem + issuer_key
225

    
226
        # TODO find a better way to generate a random serial number or let openssl generate a .srl file
227
        # when serial number is not passed generate a random one
228
        if sn is None:
229
            sn = random.randint(0, MAX_SN)
230

    
231
        # prepare openssl parameters...
232
        # CSR, CA and CA's private key will be passed via stdin (that's the meaning of the '-' symbol)
233
        params = ["x509", "-req", "-in", "-", "-CA", "-", "-CAkey", "-", "-CAcreateserial", "-days", str(days),
234
                  "-set_serial", str(sn)]
235

    
236
        with TemporaryFile("extensions.conf", extensions) as ext_path:
237
            # add the passphrase even when None is passed. Otherwise when running tests with pytest some tests freeze
238
            # waiting for the passphrase to be typed in
239
            params.extend(["-passin", f"pass:{issuer_key_pass}"])
240

    
241
            if len(extensions) > 0:
242
                params.extend(["-extfile", ext_path])
243

    
244
            return self.__run_for_output(params, proc_input=(bytes(proc_input, encoding="utf-8"))).decode()
245

    
246
    def create_crt(self, subject, subject_key, issuer_pem, issuer_key, subject_key_pass=None, issuer_key_pass=None,
247
                   extensions="",
248
                   days=30,
249
                   sn: int = None):
250
        """
251
        Creates a certificate by using the given subject, subject's key, issuer and its key.
252

    
253
        :param subject: subject to be added to the created certificate
254
        :param subject_key: string containing the private key to be used when creating the certificate in PEM format
255
        :param issuer_key: string containing the private key of the issuer's certificate in PEM format
256
        :param issuer_pem: string containing the certificate of the issuer that will sign this CSR in PEM format
257
        :param subject_key_pass: string containing the passphrase of the private key used when creating the certificate
258
        in PEM format
259
        :param issuer_key_pass: string containing the passphrase of the private key of the issuer's certificate in PEM
260
        format
261
        :param extensions: extensions to be applied when creating the certificate
262
        :param days: number of days for which the certificate will be valid
263
        :param sn: serial number to be set, when "None" is set a random serial number is generated
264
        :return: string containing the generated certificate in PEM format
265
        """
266

    
267
        Logger.debug("Function launched.")
268

    
269
        csr = self.__create_csr(subject, subject_key, key_pass=subject_key_pass)
270
        return self.__sign_csr(csr, issuer_pem, issuer_key, issuer_key_pass=issuer_key_pass, extensions=extensions,
271
                               days=days, sn=sn)
272

    
273
    @staticmethod
274
    def verify_cert(certificate):
275
        """
276
        Verifies whether the given certificate is not expired.
277

    
278
        :param certificate: certificate to be verified in PEM format
279
        :return: Returns `true` if the certificate is not expired, `false` when expired.
280
        """
281

    
282
        Logger.debug("Function launched.")
283

    
284
        # call openssl to check whether the certificate is valid to this date
285
        args = [OPENSSL_EXECUTABLE, "x509", "-checkend", "0", "-noout", "-text", "-in", "-"]
286

    
287
        # create a new process
288
        proc = subprocess.Popen(args, stdin=subprocess.PIPE,
289
                                stdout=subprocess.PIPE,
290
                                stderr=subprocess.PIPE)
291

    
292
        out, err = proc.communicate(bytes(certificate, encoding="utf-8"))
293

    
294
        # zero return code means that the certificate is valid
295
        if proc.returncode == 0:
296
            return True
297
        elif proc.returncode == 1 and "Certificate will expire" in out.decode():
298
            # 1 return code means that the certificate is invalid but such message has to be present in the proc output
299
            return False
300
        else:
301
            # the process failed because of some other reason (incorrect cert format)
302
            Logger.error("CryptographyException")
303
            raise CryptographyException(OPENSSL_EXECUTABLE, args, err.decode())
304

    
305
    def extract_public_key_from_private_key(self, private_key_pem: str, passphrase=None) -> str:
306
        """
307
        Extracts a public key from the given private key passed in PEM format
308
        :param private_key_pem: PEM data representing the private key from which a public key should be extracted
309
        :param passphrase: passphrase to be provided when the supplied private key is encrypted
310
        :return: a string containing the extracted public key in PEM format
311
        """
312

    
313
        Logger.debug("Function launched.")
314

    
315
        args = ["rsa", "-in", "-", "-pubout"]
316
        if passphrase is not None:
317
            args.extend(["-passin", f"pass:{passphrase}"])
318
        return self.__run_for_output(args, proc_input=bytes(private_key_pem, encoding="utf-8")).decode()
319

    
320
    def extract_public_key_from_certificate(self, cert_pem: str) -> str:
321
        """
322
        Extracts a public key from the given certificate passed in PEM format
323
        :param cert_pem: PEM data representing a certificate from which a public key should be extracted
324
        :return: a string containing the extracted public key in PEM format
325
        """
326

    
327
        Logger.debug("Function launched.")
328

    
329
        # extracting public key from a certificate does not seem to require a passphrase even when
330
        # signed using an encrypted PK
331
        args = ["x509", "-in", "-", "-noout", "-pubkey"]
332
        return self.__run_for_output(args, proc_input=bytes(cert_pem, encoding="utf-8")).decode()
333

    
334
    def parse_cert_pem(self, cert_pem):
335
        """
336
        Parses the given certificate in PEM format and returns the subject of the certificate and it's NOT_BEFORE
337
        and NOT_AFTER field
338
        :param cert_pem: a certificated in a PEM format to be parsed
339
        :return: a tuple containing a subject, NOT_BEFORE and NOT_AFTER dates
340
        """
341

    
342
        Logger.debug("Function launched.")
343

    
344
        # run openssl x509 to view certificate content
345
        args = ["x509", "-noout", "-subject", "-startdate", "-enddate", "-in", "-"]
346

    
347
        cert_info_raw = self.__run_for_output(args, proc_input=bytes(cert_pem, encoding="utf-8")).decode()
348

    
349
        # split lines
350
        results = re.split("\n", cert_info_raw)
351
        subj_line = results[0].strip()
352
        not_before_line = results[1].strip()
353
        not_after_line = results[2].strip()
354

    
355
        # attempt to extract subject via regex
356
        match = re.search(r"subject=(.*)", subj_line)
357
        if match is None:
358
            # TODO use logger
359
            print(f"Could not find subject to parse: {subj_line}")
360
            return None
361
        else:
362
            # find all attributes (key = value)
363
            found = re.findall(r"\s?([^c=\s]+)\s?=\s?([^,\n]+)", match.group(1))
364
            subj = Subject()
365
            for key, value in found:
366
                if key == "C":
367
                    subj.country = value.strip()
368
                elif key == "ST":
369
                    subj.state = value.strip()
370
                elif key == "L":
371
                    subj.locality = value.strip()
372
                elif key == "O":
373
                    subj.organization = value.strip()
374
                elif key == "OU":
375
                    subj.organization_unit = value.strip()
376
                elif key == "CN":
377
                    subj.common_name = value.strip()
378
                elif key == "emailAddress":
379
                    subj.email_address = value.strip()
380

    
381
        # extract notBefore and notAfter date fields
382
        not_before = re.search(r"notBefore=(.*)", not_before_line)
383
        not_after = re.search(r"notAfter=(.*)", not_after_line)
384

    
385
        # if date fields are found parse them into date objects
386
        if not_before is not None:
387
            not_before = time.strptime(not_before.group(1).strip(), NOT_AFTER_BEFORE_DATE_FORMAT)
388
        if not_after is not None:
389
            not_after = time.strptime(not_after.group(1).strip(), NOT_AFTER_BEFORE_DATE_FORMAT)
390

    
391
        # TODO wrapper class?
392
        # return it as a tuple
393
        return subj, not_before, not_after
394

    
395
    def get_openssl_version(self) -> str:
396
        """
397
        Get version of the OpenSSL installed on the system
398
        :return: version of the OpenSSL as returned from the process
399
        """
400

    
401
        Logger.debug("Function launched.")
402

    
403
        return self.__run_for_output(["version"]).decode("utf-8")
404

    
405
    def generate_crl(self, cert: Certificate, key: PrivateKey, index_file_path: str) -> str:
406
        """
407
        Generate a CertificateRevocationList for a specified
408
        certificate authority.
409

    
410
        :param key: key that is used to sign the CRL (must belong to the given certificate)
411
        :param cert: Certificate of the certificate authority that issue the CRL
412
        :param index_file_path: path to a file that contains the openssl index with all revoked certificates
413
        :return: CRL encoded in PEM format string
414
        """
415

    
416
        Logger.debug("Function launched.")
417

    
418
        # openssl ca requires the .srl file to exists, therefore a dummy, unused file is created
419
        with TemporaryFile("serial.srl", "0") as serial_file, \
420
                TemporaryFile("crl.conf", CRL_CONFIG % (index_file_path, serial_file)) as config_file, \
421
                TemporaryFile("certificate.pem", cert.pem_data) as cert_file, \
422
                TemporaryFile("private_key.pem", key.private_key) as key_file:
423
            args = ["ca", "-config", config_file, "-gencrl", "-keyfile", key_file, "-cert", cert_file, "-outdir", "."]
424

    
425
            if key.password is not None and key.password != "":
426
                args.extend(["-passin", f"pass:{key.password}"])
427

    
428
            return self.__run_for_output(args).decode("utf-8")
429

    
430
    def generate_ocsp(self, cert, key, index_path, der_ocsp_request):
431
        """
432
        Generate an OCSP Response from an OCSP Request given the issuer cert, issuer cert key and the index file.
433
        The OSCP Response is signed by the CA itself (recommended way according to multiple sources).
434

    
435
        :param cert: issuer certificate
436
        :param key: corresponding key
437
        :param index_path: path/to/the/generated/index/file
438
        :param der_ocsp_request: DER encoded OCSP Request
439
        :return: DER encoded OCSP Response
440
        """
441

    
442
        Logger.debug("Function launched.")
443

    
444
        with TemporaryFile("certificate.pem", cert.pem_data) as ca_certificate, \
445
                TemporaryFile("private_key.pem", key.private_key) as key_file, \
446
                TemporaryFile("request.der", der_ocsp_request) as request_file:
447
            args = ["ocsp", "-index", index_path, "-CA", ca_certificate, "-rsigner", ca_certificate, "-rkey", key_file,
448
                    "-reqin", request_file, "-respout", "-"]
449

    
450
            if key.password is not None and key.password != "":
451
                args.extend(["-passin", f"pass:{key.password}"])
452

    
453
            return self.__run_for_output(args)
454

    
455
    def verify_key(self, key, passphrase):
456
        """
457
        Verifies whether the provided key is encrypted by the provided passphrase. If passphrase is none, verifies
458
        that the provided key is unencrypted.
459
        :param key: target key
460
        :param passphrase: target passphrase or None
461
        :return: True if the condition is fulfilled, else False
462
        """
463
        if passphrase is None:
464
            if re.search(GUARANTEED_SUBSTRING_OF_ENCRYPTED_KEYS, key) is not None:
465
                return False
466
            else:
467
                try:
468
                    with TemporaryFile("tested_key.pem", key) as f:
469
                        ret = self.__run_for_output(["rsa", "-in", f, "-inform", "PEM"])
470
                    return ret.startswith(KEY_PREFIX)
471
                except CryptographyException:
472
                    return False
473
        else:
474
            if re.search(GUARANTEED_SUBSTRING_OF_ENCRYPTED_KEYS, key) is None:
475
                return False
476
            else:
477
                try:
478
                    with TemporaryFile("tested_key.pem", key) as f:
479
                        ret = self.__run_for_output(["rsa", "-in", f, "-passin", f"pass:{passphrase}"])
480
                    return ret.startswith(KEY_PREFIX)
481
                except CryptographyException:
482
                    return False
483

    
484

    
485
class CryptographyException(Exception):
486

    
487
    def __init__(self, executable, args, message):
488
        self.executable = executable
489
        self.args = args
490
        self.message = message
491

    
492
    def __str__(self):
493
        # TODO check log is valid here
494
        msg = f"""
495
        EXECUTABLE: {self.executable}
496
        ARGS: {self.args}
497
        MESSAGE: {self.message}
498
        """
499

    
500
        Logger.error(msg)
501
        return msg
(3-3/4)