Projekt

Obecné

Profil

Stáhnout (23.9 KB) Statistiky
| Větev: | Tag: | Revize:
1
import re
2
import subprocess
3
import time
4
import random
5
from typing import List
6

    
7
from src.constants import CRL_CONFIG
8
from src.model.certificate import Certificate
9
from src.model.private_key import PrivateKey
10
from src.model.subject import Subject
11
from src.utils.logger import Logger
12
from src.utils.temporary_file import TemporaryFile
13

    
14
# the prefix of an rsa key
15
KEY_PREFIX = b"-----BEGIN RSA PRIVATE KEY-----"
16

    
17
# what every encrypted key contains
18
GUARANTEED_SUBSTRING_OF_ENCRYPTED_KEYS = "ENCRYPTED"
19

    
20
# encryption method to be used when generating private keys
21
PRIVATE_KEY_ENCRYPTION_METHOD = "-aes256"
22

    
23
# openssl executable name
24
OPENSSL_EXECUTABLE = "openssl"
25

    
26
# format of NOT_BEFORE NOT_AFTER date fields
27
NOT_AFTER_BEFORE_DATE_FORMAT = "%b %d %H:%M:%S %Y %Z"
28

    
29
# minimal configuration file to be used for openssl req command
30
# specifies distinguished_name that references empty section only
31
# openssl requires this option to be present
32
MINIMAL_CONFIG_FILE = "[req]\ndistinguished_name = req_distinguished_name\n[req_distinguished_name]\n\n"
33

    
34
# section to be used to specify extensions when creating a SSCRT
35
SSCRT_SECTION = "sscrt_ext"
36

    
37
CA_EXTENSIONS = "basicConstraints=critical,CA:TRUE"
38

    
39
# upper bound of the range of random serial numbers to be generated
40
MAX_SN = 4294967296
41

    
42

    
43
class CryptographyService:
44

    
45
    @staticmethod
46
    def __subject_to_param_format(subject):
47
        """
48
        Converts the given subject to a dictionary containing openssl field names mapped to subject's fields
49
        :param subject: subject to be converted
50
        :return: a dictionary containing openssl field names mapped to subject's fields
51
        """
52

    
53
        Logger.debug("Function launched.")
54

    
55
        subj_dict = {}
56
        if subject.common_name is not None:
57
            subj_dict["CN"] = subject.common_name
58
        if subject.country is not None:
59
            subj_dict["C"] = subject.country
60
        if subject.locality is not None:
61
            subj_dict["L"] = subject.locality
62
        if subject.state is not None:
63
            subj_dict["ST"] = subject.state
64
        if subject.organization is not None:
65
            subj_dict["O"] = subject.organization
66
        if subject.organization_unit is not None:
67
            subj_dict["OU"] = subject.organization_unit
68
        if subject.email_address is not None:
69
            subj_dict["emailAddress"] = subject.email_address
70

    
71
        # merge the subject into a "subj" parameter format
72
        return "".join([f"/{key}={value}" for key, value in subj_dict.items()])
73

    
74
    @staticmethod
75
    def __run_for_output(args=None, proc_input=None, executable=OPENSSL_EXECUTABLE):
76
        """
77
        Launches a new process in which the given executable is run. STDIN and process arguments can be set.
78
        If the process ends with a non-zero then <CryptographyException> is raised.
79

    
80
        :param args: Arguments to be passed to the program.
81
        :param proc_input: String input to be passed to the stdin of the created process.
82
        :param executable: Executable to be run (defaults to openssl)
83
        :return: If the process ends with a zero return code then the STDOUT of the process is returned as a byte array.
84
        """
85

    
86
        Logger.debug("Function launched.")
87

    
88
        if args is None:
89
            args = []
90
        try:
91
            # prepend the name of the executable
92
            args.insert(0, executable)
93

    
94
            # create a new process
95
            proc = subprocess.Popen(args, stdin=subprocess.PIPE if proc_input is not None else None,
96
                                    stdout=subprocess.PIPE,
97
                                    stderr=subprocess.PIPE)
98

    
99
            out, err = proc.communicate(proc_input)
100

    
101
            if proc.returncode != 0:
102
                # if the process did not result in zero result code, then raise an exception
103
                if err is not None and len(err) > 0:
104
                    Logger.error("CryptographyException")
105
                    raise CryptographyException(executable, args, err.decode())
106
                else:
107
                    Logger.error("CryptographyException")
108
                    raise CryptographyException(executable, args,
109
                                                f""""Execution resulted in non-zero argument""")
110

    
111
            return out
112
        except FileNotFoundError:
113
            Logger.error("CryptographyException")
114
            raise CryptographyException(executable, args, f""""{executable}" not found in the current PATH.""")
115

    
116
    def create_private_key(self, passphrase=None):
117
        """
118
        Creates a private key with the option to encrypt it using a passphrase.
119
        :param passphrase: A passphrase to be used when encrypting the key (if none is passed then the key is not
120
        encrypted at all). Empty passphrase ("") also results in a key that is not encrypted.
121
        :return: string containing the generated private key in PEM format
122
        """
123

    
124
        Logger.debug("Function launched.")
125

    
126
        if passphrase is None or len(passphrase) == 0:
127
            return self.__run_for_output(["genrsa", "2048"]).decode()
128
        else:
129
            return self.__run_for_output(
130
                ["genrsa", PRIVATE_KEY_ENCRYPTION_METHOD, "-passout", f"pass:{passphrase}", "2048"]).decode()
131

    
132
    def create_sscrt(self, subject, key, config="", extensions="", key_pass=None, days=30, sn: int = None):
133
        """
134
        Creates a self signed certificate
135

    
136
        :param subject: an instance of <Subject> representing the subject to be added to the certificate
137
        :param key: private key of the CA to be used
138
        :param config: string containing the configuration to be used
139
        :param extensions: name of the section in the configuration representing extensions
140
        :param key_pass: passphrase of the private key
141
        :param days: number of days for which the certificate will be valid
142
        :param sn: serial number to be set, when "None" is set a random serial number is generated
143

    
144
        :return: string containing the generated certificate in PEM format
145
        """
146

    
147
        Logger.debug("Function launched.")
148

    
149
        assert key is not None
150
        assert subject is not None
151

    
152
        subj = self.__subject_to_param_format(subject)
153

    
154
        # To specify extension for creating a SSCRT, one has to use a configuration
155
        # file instead of an extension file. Therefore the following code creates
156
        # the most basic configuration file with sscrt_ext section, that is later
157
        # reference in openssl req command using -extensions option.
158
        if len(config) == 0:
159
            config += MINIMAL_CONFIG_FILE
160
        config += "\n[ " + SSCRT_SECTION + " ]" + "\n" + extensions
161

    
162
        with TemporaryFile("openssl.conf", config) as conf_path:
163
            args = ["req", "-x509", "-new", "-subj", subj, "-days", f"{days}",
164
                    "-key", "-"]
165

    
166
            # serial number passed, use it when generating the certificate,
167
            # without passing it openssl generates a random one
168
            if sn is not None:
169
                args.extend(["-set_serial", str(sn)])
170

    
171
            if len(config) > 0:
172
                args.extend(["-config", conf_path])
173
            if len(extensions) > 0:
174
                args.extend(["-extensions", SSCRT_SECTION])  # when creating SSCRT, section references section in config
175

    
176
            # it would be best to not send the pass phrase at all, but for some reason pytest then prompts for
177
            # the pass phrase (this does not happen when run from pycharm)
178

    
179
            #  add the passphrase even when None is passed. Otherwise when running tests with pytest some tests freeze
180
            # waiting for the passphrase to be typed in
181
            args.extend(["-passin", f"pass:{key_pass}"])
182

    
183
            return self.__run_for_output(args, proc_input=bytes(key, encoding="utf-8")).decode()
184

    
185
    def __create_csr(self, subject, key, key_pass=""):
186
        """
187
        Creates a CSR (Certificate Signing Request)
188

    
189
        :param subject: an instance of <Subject> representing the subject to be added to the CSR
190
        :param key: the private key of the subject to be used to generate the CSR
191
        :param key_pass: passphrase of the subject's private key
192
        :return: string containing the generated certificate signing request in PEM format
193
        """
194

    
195
        Logger.debug("Function launched.")
196

    
197
        subj_param = self.__subject_to_param_format(subject)
198

    
199
        args = ["req", "-new", "-subj", subj_param, "-key", "-"]
200

    
201
        # add the passphrase even when None is passed. Otherwise when running tests with pytest some tests freeze
202
        # waiting for the passphrase to be typed in
203
        args.extend(["-passin", f"pass:{key_pass}"])
204

    
205
        return self.__run_for_output(args, proc_input=bytes(key, encoding="utf-8")).decode()
206

    
207
    def __sign_csr(self, csr, issuer_pem, issuer_key, issuer_key_pass=None, extensions="", days=30, sn: int = None):
208
        """
209
        Signs the given CSR by the given issuer CA
210

    
211
        :param csr: a string containing the CSR to be signed
212
        :param issuer_pem: string containing the certificate of the issuer that will sign this CSR in PEM format
213
        :param issuer_key: string containing the private key of the issuer's certificate in PEM format
214
        :param issuer_key_pass: string containing the passphrase of the private key of the issuer's certificate in PEM
215
        format
216
        :param extensions: extensions to be applied when signing the CSR
217
        :param days: number of days for which the certificate will be valid
218
        :param sn: serial number to be set, when "None" is set a random serial number is generated
219
        :return: string containing the generated and signed certificate in PEM format
220
        """
221

    
222
        Logger.debug("Function launched.")
223

    
224
        # concatenate CSR, issuer certificate and issuer's key (will be used in the openssl call)
225
        proc_input = csr + issuer_pem + issuer_key
226

    
227
        # TODO find a better way to generate a random serial number or let openssl generate a .srl file
228
        # when serial number is not passed generate a random one
229
        if sn is None:
230
            sn = random.randint(0, MAX_SN)
231

    
232
        # prepare openssl parameters...
233
        # CSR, CA and CA's private key will be passed via stdin (that's the meaning of the '-' symbol)
234
        params = ["x509", "-req", "-in", "-", "-CA", "-", "-CAkey", "-", "-CAcreateserial", "-days", str(days),
235
                  "-set_serial", str(sn)]
236

    
237
        with TemporaryFile("extensions.conf", extensions) as ext_path:
238
            # add the passphrase even when None is passed. Otherwise when running tests with pytest some tests freeze
239
            # waiting for the passphrase to be typed in
240
            params.extend(["-passin", f"pass:{issuer_key_pass}"])
241

    
242
            if len(extensions) > 0:
243
                params.extend(["-extfile", ext_path])
244

    
245
            return self.__run_for_output(params, proc_input=(bytes(proc_input, encoding="utf-8"))).decode()
246

    
247
    def create_crt(self, subject, subject_key, issuer_pem, issuer_key, subject_key_pass=None, issuer_key_pass=None,
248
                   extensions="",
249
                   days=30,
250
                   sn: int = None):
251
        """
252
        Creates a certificate by using the given subject, subject's key, issuer and its key.
253

    
254
        :param subject: subject to be added to the created certificate
255
        :param subject_key: string containing the private key to be used when creating the certificate in PEM format
256
        :param issuer_key: string containing the private key of the issuer's certificate in PEM format
257
        :param issuer_pem: string containing the certificate of the issuer that will sign this CSR in PEM format
258
        :param subject_key_pass: string containing the passphrase of the private key used when creating the certificate
259
        in PEM format
260
        :param issuer_key_pass: string containing the passphrase of the private key of the issuer's certificate in PEM
261
        format
262
        :param extensions: extensions to be applied when creating the certificate
263
        :param days: number of days for which the certificate will be valid
264
        :param sn: serial number to be set, when "None" is set a random serial number is generated
265
        :return: string containing the generated certificate in PEM format
266
        """
267

    
268
        Logger.debug("Function launched.")
269

    
270
        csr = self.__create_csr(subject, subject_key, key_pass=subject_key_pass)
271
        return self.__sign_csr(csr, issuer_pem, issuer_key, issuer_key_pass=issuer_key_pass, extensions=extensions,
272
                               days=days, sn=sn)
273

    
274
    @staticmethod
275
    def verify_cert(certificate):
276
        """
277
        Verifies whether the given certificate is not expired.
278

    
279
        :param certificate: certificate to be verified in PEM format
280
        :return: Returns `true` if the certificate is not expired, `false` when expired.
281
        """
282

    
283
        Logger.debug("Function launched.")
284

    
285
        # call openssl to check whether the certificate is valid to this date
286
        args = [OPENSSL_EXECUTABLE, "x509", "-checkend", "0", "-noout", "-text", "-in", "-"]
287

    
288
        # create a new process
289
        proc = subprocess.Popen(args, stdin=subprocess.PIPE,
290
                                stdout=subprocess.PIPE,
291
                                stderr=subprocess.PIPE)
292

    
293
        out, err = proc.communicate(bytes(certificate, encoding="utf-8"))
294

    
295
        # zero return code means that the certificate is valid
296
        if proc.returncode == 0:
297
            return True
298
        elif proc.returncode == 1 and "Certificate will expire" in out.decode():
299
            # 1 return code means that the certificate is invalid but such message has to be present in the proc output
300
            return False
301
        else:
302
            # the process failed because of some other reason (incorrect cert format)
303
            Logger.error("CryptographyException")
304
            raise CryptographyException(OPENSSL_EXECUTABLE, args, err.decode())
305

    
306
    def extract_public_key_from_private_key(self, private_key_pem: str, passphrase=None) -> str:
307
        """
308
        Extracts a public key from the given private key passed in PEM format
309
        :param private_key_pem: PEM data representing the private key from which a public key should be extracted
310
        :param passphrase: passphrase to be provided when the supplied private key is encrypted
311
        :return: a string containing the extracted public key in PEM format
312
        """
313

    
314
        Logger.debug("Function launched.")
315

    
316
        args = ["rsa", "-in", "-", "-pubout"]
317
        if passphrase is not None:
318
            args.extend(["-passin", f"pass:{passphrase}"])
319
        return self.__run_for_output(args, proc_input=bytes(private_key_pem, encoding="utf-8")).decode()
320

    
321
    def extract_public_key_from_certificate(self, cert_pem: str) -> str:
322
        """
323
        Extracts a public key from the given certificate passed in PEM format
324
        :param cert_pem: PEM data representing a certificate from which a public key should be extracted
325
        :return: a string containing the extracted public key in PEM format
326
        """
327

    
328
        Logger.debug("Function launched.")
329

    
330
        # extracting public key from a certificate does not seem to require a passphrase even when
331
        # signed using an encrypted PK
332
        args = ["x509", "-in", "-", "-noout", "-pubkey"]
333
        return self.__run_for_output(args, proc_input=bytes(cert_pem, encoding="utf-8")).decode()
334

    
335
    def parse_cert_pem(self, cert_pem):
336
        """
337
        Parses the given certificate in PEM format and returns the subject of the certificate and it's NOT_BEFORE
338
        and NOT_AFTER field
339
        :param cert_pem: a certificated in a PEM format to be parsed
340
        :return: a tuple containing a subject, NOT_BEFORE and NOT_AFTER dates
341
        """
342

    
343
        Logger.debug("Function launched.")
344

    
345
        # run openssl x509 to view certificate content
346
        args = ["x509", "-noout", "-subject", "-startdate", "-enddate", "-in", "-"]
347

    
348
        cert_info_raw = self.__run_for_output(args, proc_input=bytes(cert_pem, encoding="utf-8")).decode()
349

    
350
        # split lines
351
        results = re.split("\n", cert_info_raw)
352
        subj_line = results[0].strip()
353
        not_before_line = results[1].strip()
354
        not_after_line = results[2].strip()
355

    
356
        # attempt to extract subject via regex
357
        match = re.search(r"subject=(.*)", subj_line)
358
        if match is None:
359
            # TODO use logger
360
            print(f"Could not find subject to parse: {subj_line}")
361
            return None
362
        else:
363
            # find all attributes (key = value)
364
            found = re.findall(r"\s?([^c=\s]+)\s?=\s?([^,\n]+)", match.group(1))
365
            subj = Subject()
366
            for key, value in found:
367
                if key == "C":
368
                    subj.country = value.strip()
369
                elif key == "ST":
370
                    subj.state = value.strip()
371
                elif key == "L":
372
                    subj.locality = value.strip()
373
                elif key == "O":
374
                    subj.organization = value.strip()
375
                elif key == "OU":
376
                    subj.organization_unit = value.strip()
377
                elif key == "CN":
378
                    subj.common_name = value.strip()
379
                elif key == "emailAddress":
380
                    subj.email_address = value.strip()
381

    
382
        # extract notBefore and notAfter date fields
383
        not_before = re.search(r"notBefore=(.*)", not_before_line)
384
        not_after = re.search(r"notAfter=(.*)", not_after_line)
385

    
386
        # if date fields are found parse them into date objects
387
        if not_before is not None:
388
            not_before = time.strptime(not_before.group(1).strip(), NOT_AFTER_BEFORE_DATE_FORMAT)
389
        if not_after is not None:
390
            not_after = time.strptime(not_after.group(1).strip(), NOT_AFTER_BEFORE_DATE_FORMAT)
391

    
392
        # TODO wrapper class?
393
        # return it as a tuple
394
        return subj, not_before, not_after
395

    
396
    def get_openssl_version(self) -> str:
397
        """
398
        Get version of the OpenSSL installed on the system
399
        :return: version of the OpenSSL as returned from the process
400
        """
401

    
402
        Logger.debug("Function launched.")
403

    
404
        return self.__run_for_output(["version"]).decode("utf-8")
405

    
406
    def generate_crl(self, cert: Certificate, key: PrivateKey, index_file_path: str) -> str:
407
        """
408
        Generate a CertificateRevocationList for a specified
409
        certificate authority.
410

    
411
        :param key: key that is used to sign the CRL (must belong to the given certificate)
412
        :param cert: Certificate of the certificate authority that issue the CRL
413
        :param index_file_path: path to a file that contains the openssl index with all revoked certificates
414
        :return: CRL encoded in PEM format string
415
        """
416

    
417
        Logger.debug("Function launched.")
418

    
419
        # openssl ca requires the .srl file to exists, therefore a dummy, unused file is created
420
        with TemporaryFile("serial.srl", "0") as serial_file, \
421
                TemporaryFile("crl.conf", CRL_CONFIG % (index_file_path, serial_file)) as config_file, \
422
                TemporaryFile("certificate.pem", cert.pem_data) as cert_file, \
423
                TemporaryFile("private_key.pem", key.private_key) as key_file:
424
            args = ["ca", "-config", config_file, "-gencrl", "-keyfile", key_file, "-cert", cert_file, "-outdir", "."]
425

    
426
            if key.password is not None and key.password != "":
427
                args.extend(["-passin", f"pass:{key.password}"])
428

    
429
            return self.__run_for_output(args).decode("utf-8")
430

    
431
    def generate_ocsp(self, cert, key, index_path, der_ocsp_request):
432
        """
433
        Generate an OCSP Response from an OCSP Request given the issuer cert, issuer cert key and the index file.
434
        The OSCP Response is signed by the CA itself (recommended way according to multiple sources).
435

    
436
        :param cert: issuer certificate
437
        :param key: corresponding key
438
        :param index_path: path/to/the/generated/index/file
439
        :param der_ocsp_request: DER encoded OCSP Request
440
        :return: DER encoded OCSP Response
441
        """
442

    
443
        Logger.debug("Function launched.")
444

    
445
        with TemporaryFile("certificate.pem", cert.pem_data) as ca_certificate, \
446
                TemporaryFile("private_key.pem", key.private_key) as key_file, \
447
                TemporaryFile("request.der", der_ocsp_request) as request_file:
448
            args = ["ocsp", "-index", index_path, "-CA", ca_certificate, "-rsigner", ca_certificate, "-rkey", key_file,
449
                    "-reqin", request_file, "-respout", "-"]
450

    
451
            if key.password is not None and key.password != "":
452
                args.extend(["-passin", f"pass:{key.password}"])
453

    
454
            return self.__run_for_output(args)
455

    
456
    def verify_key(self, key, passphrase):
457
        """
458
        Verifies whether the provided key is encrypted by the provided passphrase. If passphrase is none, verifies
459
        that the provided key is unencrypted.
460
        :param key: target key
461
        :param passphrase: target passphrase or None
462
        :return: True if the condition is fulfilled, else False
463
        """
464
        if passphrase is None:
465
            if re.search(GUARANTEED_SUBSTRING_OF_ENCRYPTED_KEYS, key) is not None:
466
                return False
467
            else:
468
                try:
469
                    with TemporaryFile("tested_key.pem", key) as f:
470
                        ret = self.__run_for_output(["rsa", "-in", f, "-inform", "PEM"])
471
                    return ret.startswith(KEY_PREFIX)
472
                except CryptographyException:
473
                    return False
474
        else:
475
            if re.search(GUARANTEED_SUBSTRING_OF_ENCRYPTED_KEYS, key) is None:
476
                return False
477
            else:
478
                try:
479
                    with TemporaryFile("tested_key.pem", key) as f:
480
                        ret = self.__run_for_output(["rsa", "-in", f, "-passin", f"pass:{passphrase}"])
481
                    return ret.startswith(KEY_PREFIX)
482
                except CryptographyException:
483
                    return False
484

    
485
    def generate_pkcs_identity(self, cert_pem: str, cert_key_pem: str, identity_name: str, identity_passphrase: str,
486
                               chain_of_trust_pems: List[str], cert_key_passphrase: str = None):
487
        """
488
        Generates a PKCS12 identity of the given child certificate while including the given chain of trust.
489

    
490
        :param cert_pem: PEM of the certificate whose identity should be created
491
        :param cert_key_pem: PEM of the private key used to sign the certificate whose identity should be created
492
        :param identity_name: the name to be given to the identity created
493
        :param chain_of_trust_pems: list of PEMs representing certificates present in the chain of trust of the certificate
494
        whose identity should be created
495
        :param identity_passphrase: passphrase to be used when encrypting the identity
496
        :param cert_key_passphrase: passphrase of the key used to sign the certificate whose identity should be created
497
        :return: byte array containing the generated identity
498
        """
499
        with TemporaryFile("cert_key.pem", cert_key_pem) as cert_key_pem_file:
500
            if identity_passphrase is None:
501
                identity_passphrase = ""
502

    
503
            args = ["pkcs12", "-export", "-name", identity_name, "-in", "-", "-inkey", cert_key_pem_file, "-passout", f"pass:{identity_passphrase}", "-passin", f"pass:{cert_key_passphrase}"]
504
            proc_input = cert_pem
505
            # when the chain of trust is not empty append the -CAfile argument and the concatenated list of CoT PEMs
506
            # to the input of the process to be launched
507
            if len(chain_of_trust_pems) > 0:
508
                args.extend(["-CAfile", "-", ])
509
                proc_input += "".join(chain_of_trust_pems)
510
            return self.__run_for_output(args,
511
                                         proc_input=bytes(proc_input, encoding="utf-8"))
512

    
513

    
514
class CryptographyException(Exception):
515

    
516
    def __init__(self, executable, args, message):
517
        self.executable = executable
518
        self.args = args
519
        self.message = message
520

    
521
    def __str__(self):
522
        # TODO check log is valid here
523
        # TODO Standa does not think so...
524
        msg = f"""
525
        EXECUTABLE: {self.executable}
526
        ARGS: {self.args}
527
        MESSAGE: {self.message}
528
        """
529

    
530
        Logger.error(msg)
531
        return msg
(3-3/4)