Projekt

Obecné

Profil

Stáhnout (22.4 KB) Statistiky
| Větev: | Tag: | Revize:
1
import re
2
import subprocess
3
import time
4
import random
5
from typing import List
6

    
7
from src.constants import CRL_CONFIG
8
from src.model.certificate import Certificate
9
from src.model.private_key import PrivateKey
10
from src.model.subject import Subject
11
from src.utils.logger import Logger
12
from src.utils.temporary_file import TemporaryFile
13

    
14
# encryption method to be used when generating private keys
15
PRIVATE_KEY_ENCRYPTION_METHOD = "-aes256"
16

    
17
# openssl executable name
18
OPENSSL_EXECUTABLE = "openssl"
19

    
20
# format of NOT_BEFORE NOT_AFTER date fields
21
NOT_AFTER_BEFORE_DATE_FORMAT = "%b %d %H:%M:%S %Y %Z"
22

    
23
# minimal configuration file to be used for openssl req command
24
# specifies distinguished_name that references empty section only
25
# openssl requires this option to be present
26
MINIMAL_CONFIG_FILE = "[req]\ndistinguished_name = req_distinguished_name\n[req_distinguished_name]\n\n"
27

    
28
# section to be used to specify extensions when creating a SSCRT
29
SSCRT_SECTION = "sscrt_ext"
30

    
31
CA_EXTENSIONS = "basicConstraints=critical,CA:TRUE"
32

    
33
# upper bound of the range of random serial numbers to be generated
34
MAX_SN = 4294967296
35

    
36

    
37
class CryptographyService:
38

    
39
    @staticmethod
40
    def __subject_to_param_format(subject):
41
        """
42
        Converts the given subject to a dictionary containing openssl field names mapped to subject's fields
43
        :param subject: subject to be converted
44
        :return: a dictionary containing openssl field names mapped to subject's fields
45
        """
46

    
47
        Logger.debug("Function launched.")
48

    
49
        subj_dict = {}
50
        if subject.common_name is not None:
51
            subj_dict["CN"] = subject.common_name
52
        if subject.country is not None:
53
            subj_dict["C"] = subject.country
54
        if subject.locality is not None:
55
            subj_dict["L"] = subject.locality
56
        if subject.state is not None:
57
            subj_dict["ST"] = subject.state
58
        if subject.organization is not None:
59
            subj_dict["O"] = subject.organization
60
        if subject.organization_unit is not None:
61
            subj_dict["OU"] = subject.organization_unit
62
        if subject.email_address is not None:
63
            subj_dict["emailAddress"] = subject.email_address
64

    
65
        # merge the subject into a "subj" parameter format
66
        return "".join([f"/{key}={value}" for key, value in subj_dict.items()])
67

    
68
    @staticmethod
69
    def __run_for_output(args=None, proc_input=None, executable=OPENSSL_EXECUTABLE):
70
        """
71
        Launches a new process in which the given executable is run. STDIN and process arguments can be set.
72
        If the process ends with a non-zero then <CryptographyException> is raised.
73

    
74
        :param args: Arguments to be passed to the program.
75
        :param proc_input: String input to be passed to the stdin of the created process.
76
        :param executable: Executable to be run (defaults to openssl)
77
        :return: If the process ends with a zero return code then the STDOUT of the process is returned as a byte array.
78
        """
79

    
80
        Logger.debug("Function launched.")
81

    
82
        if args is None:
83
            args = []
84
        try:
85
            # prepend the name of the executable
86
            args.insert(0, executable)
87

    
88
            # create a new process
89
            proc = subprocess.Popen(args, stdin=subprocess.PIPE if proc_input is not None else None,
90
                                    stdout=subprocess.PIPE,
91
                                    stderr=subprocess.PIPE)
92

    
93
            out, err = proc.communicate(proc_input)
94

    
95
            if proc.returncode != 0:
96
                # if the process did not result in zero result code, then raise an exception
97
                if err is not None and len(err) > 0:
98
                    Logger.error("CryptographyException")
99
                    raise CryptographyException(executable, args, err.decode())
100
                else:
101
                    Logger.error("CryptographyException")
102
                    raise CryptographyException(executable, args,
103
                                                f""""Execution resulted in non-zero argument""")
104

    
105
            return out
106
        except FileNotFoundError:
107
            Logger.error("CryptographyException")
108
            raise CryptographyException(executable, args, f""""{executable}" not found in the current PATH.""")
109

    
110
    def create_private_key(self, passphrase=None):
111
        """
112
        Creates a private key with the option to encrypt it using a passphrase.
113
        :param passphrase: A passphrase to be used when encrypting the key (if none is passed then the key is not
114
        encrypted at all). Empty passphrase ("") also results in a key that is not encrypted.
115
        :return: string containing the generated private key in PEM format
116
        """
117

    
118
        Logger.debug("Function launched.")
119

    
120
        if passphrase is None or len(passphrase) == 0:
121
            return self.__run_for_output(["genrsa", "2048"]).decode()
122
        else:
123
            return self.__run_for_output(
124
                ["genrsa", PRIVATE_KEY_ENCRYPTION_METHOD, "-passout", f"pass:{passphrase}", "2048"]).decode()
125

    
126
    def create_sscrt(self, subject, key, config="", extensions="", key_pass=None, days=30, sn: int = None):
127
        """
128
        Creates a self signed certificate
129

    
130
        :param subject: an instance of <Subject> representing the subject to be added to the certificate
131
        :param key: private key of the CA to be used
132
        :param config: string containing the configuration to be used
133
        :param extensions: name of the section in the configuration representing extensions
134
        :param key_pass: passphrase of the private key
135
        :param days: number of days for which the certificate will be valid
136
        :param sn: serial number to be set, when "None" is set a random serial number is generated
137

    
138
        :return: string containing the generated certificate in PEM format
139
        """
140

    
141
        Logger.debug("Function launched.")
142

    
143
        assert key is not None
144
        assert subject is not None
145

    
146
        subj = self.__subject_to_param_format(subject)
147

    
148
        # To specify extension for creating a SSCRT, one has to use a configuration
149
        # file instead of an extension file. Therefore the following code creates
150
        # the most basic configuration file with sscrt_ext section, that is later
151
        # reference in openssl req command using -extensions option.
152
        if len(config) == 0:
153
            config += MINIMAL_CONFIG_FILE
154
        config += "\n[ " + SSCRT_SECTION + " ]" + "\n" + extensions
155

    
156
        with TemporaryFile("openssl.conf", config) as conf_path:
157
            args = ["req", "-x509", "-new", "-subj", subj, "-days", f"{days}",
158
                    "-key", "-"]
159

    
160
            # serial number passed, use it when generating the certificate,
161
            # without passing it openssl generates a random one
162
            if sn is not None:
163
                args.extend(["-set_serial", str(sn)])
164

    
165
            if len(config) > 0:
166
                args.extend(["-config", conf_path])
167
            if len(extensions) > 0:
168
                args.extend(["-extensions", SSCRT_SECTION])  # when creating SSCRT, section references section in config
169

    
170
            # it would be best to not send the pass phrase at all, but for some reason pytest then prompts for
171
            # the pass phrase (this does not happen when run from pycharm)
172

    
173
            #  add the passphrase even when None is passed. Otherwise when running tests with pytest some tests freeze
174
            # waiting for the passphrase to be typed in
175
            args.extend(["-passin", f"pass:{key_pass}"])
176

    
177
            return self.__run_for_output(args, proc_input=bytes(key, encoding="utf-8")).decode()
178

    
179
    def __create_csr(self, subject, key, key_pass=""):
180
        """
181
        Creates a CSR (Certificate Signing Request)
182

    
183
        :param subject: an instance of <Subject> representing the subject to be added to the CSR
184
        :param key: the private key of the subject to be used to generate the CSR
185
        :param key_pass: passphrase of the subject's private key
186
        :return: string containing the generated certificate signing request in PEM format
187
        """
188

    
189
        Logger.debug("Function launched.")
190

    
191
        subj_param = self.__subject_to_param_format(subject)
192

    
193
        args = ["req", "-new", "-subj", subj_param, "-key", "-"]
194

    
195
        # add the passphrase even when None is passed. Otherwise when running tests with pytest some tests freeze
196
        # waiting for the passphrase to be typed in
197
        args.extend(["-passin", f"pass:{key_pass}"])
198

    
199
        return self.__run_for_output(args, proc_input=bytes(key, encoding="utf-8")).decode()
200

    
201
    def __sign_csr(self, csr, issuer_pem, issuer_key, issuer_key_pass=None, extensions="", days=30, sn: int = None):
202
        """
203
        Signs the given CSR by the given issuer CA
204

    
205
        :param csr: a string containing the CSR to be signed
206
        :param issuer_pem: string containing the certificate of the issuer that will sign this CSR in PEM format
207
        :param issuer_key: string containing the private key of the issuer's certificate in PEM format
208
        :param issuer_key_pass: string containing the passphrase of the private key of the issuer's certificate in PEM
209
        format
210
        :param extensions: extensions to be applied when signing the CSR
211
        :param days: number of days for which the certificate will be valid
212
        :param sn: serial number to be set, when "None" is set a random serial number is generated
213
        :return: string containing the generated and signed certificate in PEM format
214
        """
215

    
216
        Logger.debug("Function launched.")
217

    
218
        # concatenate CSR, issuer certificate and issuer's key (will be used in the openssl call)
219
        proc_input = csr + issuer_pem + issuer_key
220

    
221
        # TODO find a better way to generate a random serial number or let openssl generate a .srl file
222
        # when serial number is not passed generate a random one
223
        if sn is None:
224
            sn = random.randint(0, MAX_SN)
225

    
226
        # prepare openssl parameters...
227
        # CSR, CA and CA's private key will be passed via stdin (that's the meaning of the '-' symbol)
228
        params = ["x509", "-req", "-in", "-", "-CA", "-", "-CAkey", "-", "-CAcreateserial", "-days", str(days),
229
                  "-set_serial", str(sn)]
230

    
231
        with TemporaryFile("extensions.conf", extensions) as ext_path:
232
            # add the passphrase even when None is passed. Otherwise when running tests with pytest some tests freeze
233
            # waiting for the passphrase to be typed in
234
            params.extend(["-passin", f"pass:{issuer_key_pass}"])
235

    
236
            if len(extensions) > 0:
237
                params.extend(["-extfile", ext_path])
238

    
239
            return self.__run_for_output(params, proc_input=(bytes(proc_input, encoding="utf-8"))).decode()
240

    
241
    def create_crt(self, subject, subject_key, issuer_pem, issuer_key, subject_key_pass=None, issuer_key_pass=None,
242
                   extensions="",
243
                   days=30,
244
                   sn: int = None):
245
        """
246
        Creates a certificate by using the given subject, subject's key, issuer and its key.
247

    
248
        :param subject: subject to be added to the created certificate
249
        :param subject_key: string containing the private key to be used when creating the certificate in PEM format
250
        :param issuer_key: string containing the private key of the issuer's certificate in PEM format
251
        :param issuer_pem: string containing the certificate of the issuer that will sign this CSR in PEM format
252
        :param subject_key_pass: string containing the passphrase of the private key used when creating the certificate
253
        in PEM format
254
        :param issuer_key_pass: string containing the passphrase of the private key of the issuer's certificate in PEM
255
        format
256
        :param extensions: extensions to be applied when creating the certificate
257
        :param days: number of days for which the certificate will be valid
258
        :param sn: serial number to be set, when "None" is set a random serial number is generated
259
        :return: string containing the generated certificate in PEM format
260
        """
261

    
262
        Logger.debug("Function launched.")
263

    
264
        csr = self.__create_csr(subject, subject_key, key_pass=subject_key_pass)
265
        return self.__sign_csr(csr, issuer_pem, issuer_key, issuer_key_pass=issuer_key_pass, extensions=extensions,
266
                               days=days, sn=sn)
267

    
268
    @staticmethod
269
    def verify_cert(certificate):
270
        """
271
        Verifies whether the given certificate is not expired.
272

    
273
        :param certificate: certificate to be verified in PEM format
274
        :return: Returns `true` if the certificate is not expired, `false` when expired.
275
        """
276

    
277
        Logger.debug("Function launched.")
278

    
279
        # call openssl to check whether the certificate is valid to this date
280
        args = [OPENSSL_EXECUTABLE, "x509", "-checkend", "0", "-noout", "-text", "-in", "-"]
281

    
282
        # create a new process
283
        proc = subprocess.Popen(args, stdin=subprocess.PIPE,
284
                                stdout=subprocess.PIPE,
285
                                stderr=subprocess.PIPE)
286

    
287
        out, err = proc.communicate(bytes(certificate, encoding="utf-8"))
288

    
289
        # zero return code means that the certificate is valid
290
        if proc.returncode == 0:
291
            return True
292
        elif proc.returncode == 1 and "Certificate will expire" in out.decode():
293
            # 1 return code means that the certificate is invalid but such message has to be present in the proc output
294
            return False
295
        else:
296
            # the process failed because of some other reason (incorrect cert format)
297
            Logger.error("CryptographyException")
298
            raise CryptographyException(OPENSSL_EXECUTABLE, args, err.decode())
299

    
300
    def extract_public_key_from_private_key(self, private_key_pem: str, passphrase=None) -> str:
301
        """
302
        Extracts a public key from the given private key passed in PEM format
303
        :param private_key_pem: PEM data representing the private key from which a public key should be extracted
304
        :param passphrase: passphrase to be provided when the supplied private key is encrypted
305
        :return: a string containing the extracted public key in PEM format
306
        """
307

    
308
        Logger.debug("Function launched.")
309

    
310
        args = ["rsa", "-in", "-", "-pubout"]
311
        if passphrase is not None:
312
            args.extend(["-passin", f"pass:{passphrase}"])
313
        return self.__run_for_output(args, proc_input=bytes(private_key_pem, encoding="utf-8")).decode()
314

    
315
    def extract_public_key_from_certificate(self, cert_pem: str) -> str:
316
        """
317
        Extracts a public key from the given certificate passed in PEM format
318
        :param cert_pem: PEM data representing a certificate from which a public key should be extracted
319
        :return: a string containing the extracted public key in PEM format
320
        """
321

    
322
        Logger.debug("Function launched.")
323

    
324
        # extracting public key from a certificate does not seem to require a passphrase even when
325
        # signed using an encrypted PK
326
        args = ["x509", "-in", "-", "-noout", "-pubkey"]
327
        return self.__run_for_output(args, proc_input=bytes(cert_pem, encoding="utf-8")).decode()
328

    
329
    def parse_cert_pem(self, cert_pem):
330
        """
331
        Parses the given certificate in PEM format and returns the subject of the certificate and it's NOT_BEFORE
332
        and NOT_AFTER field
333
        :param cert_pem: a certificated in a PEM format to be parsed
334
        :return: a tuple containing a subject, NOT_BEFORE and NOT_AFTER dates
335
        """
336

    
337
        Logger.debug("Function launched.")
338

    
339
        # run openssl x509 to view certificate content
340
        args = ["x509", "-noout", "-subject", "-startdate", "-enddate", "-in", "-"]
341

    
342
        cert_info_raw = self.__run_for_output(args, proc_input=bytes(cert_pem, encoding="utf-8")).decode()
343

    
344
        # split lines
345
        results = re.split("\n", cert_info_raw)
346
        subj_line = results[0].strip()
347
        not_before_line = results[1].strip()
348
        not_after_line = results[2].strip()
349

    
350
        # attempt to extract subject via regex
351
        match = re.search(r"subject=(.*)", subj_line)
352
        if match is None:
353
            # TODO use logger
354
            print(f"Could not find subject to parse: {subj_line}")
355
            return None
356
        else:
357
            # find all attributes (key = value)
358
            found = re.findall(r"\s?([^c=\s]+)\s?=\s?([^,\n]+)", match.group(1))
359
            subj = Subject()
360
            for key, value in found:
361
                if key == "C":
362
                    subj.country = value.strip()
363
                elif key == "ST":
364
                    subj.state = value.strip()
365
                elif key == "L":
366
                    subj.locality = value.strip()
367
                elif key == "O":
368
                    subj.organization = value.strip()
369
                elif key == "OU":
370
                    subj.organization_unit = value.strip()
371
                elif key == "CN":
372
                    subj.common_name = value.strip()
373
                elif key == "emailAddress":
374
                    subj.email_address = value.strip()
375

    
376
        # extract notBefore and notAfter date fields
377
        not_before = re.search(r"notBefore=(.*)", not_before_line)
378
        not_after = re.search(r"notAfter=(.*)", not_after_line)
379

    
380
        # if date fields are found parse them into date objects
381
        if not_before is not None:
382
            not_before = time.strptime(not_before.group(1).strip(), NOT_AFTER_BEFORE_DATE_FORMAT)
383
        if not_after is not None:
384
            not_after = time.strptime(not_after.group(1).strip(), NOT_AFTER_BEFORE_DATE_FORMAT)
385

    
386
        # TODO wrapper class?
387
        # return it as a tuple
388
        return subj, not_before, not_after
389

    
390
    def get_openssl_version(self) -> str:
391
        """
392
        Get version of the OpenSSL installed on the system
393
        :return: version of the OpenSSL as returned from the process
394
        """
395

    
396
        Logger.debug("Function launched.")
397

    
398
        return self.__run_for_output(["version"]).decode("utf-8")
399

    
400
    def generate_crl(self, cert: Certificate, key: PrivateKey, index_file_path: str) -> str:
401
        """
402
        Generate a CertificateRevocationList for a specified
403
        certificate authority.
404

    
405
        :param key: key that is used to sign the CRL (must belong to the given certificate)
406
        :param cert: Certificate of the certificate authority that issue the CRL
407
        :param index_file_path: path to a file that contains the openssl index with all revoked certificates
408
        :return: CRL encoded in PEM format string
409
        """
410

    
411
        Logger.debug("Function launched.")
412

    
413
        # openssl ca requires the .srl file to exists, therefore a dummy, unused file is created
414
        with TemporaryFile("serial.srl", "0") as serial_file, \
415
                TemporaryFile("crl.conf", CRL_CONFIG % (index_file_path, serial_file)) as config_file, \
416
                TemporaryFile("certificate.pem", cert.pem_data) as cert_file, \
417
                TemporaryFile("private_key.pem", key.private_key) as key_file:
418
            args = ["ca", "-config", config_file, "-gencrl", "-keyfile", key_file, "-cert", cert_file, "-outdir", "."]
419

    
420
            if key.password is not None and key.password != "":
421
                args.extend(["-passin", f"pass:{key.password}"])
422

    
423
            return self.__run_for_output(args).decode("utf-8")
424

    
425
    def generate_ocsp(self, cert, key, index_path, der_ocsp_request):
426
        """
427
        Generate an OCSP Response from an OCSP Request given the issuer cert, issuer cert key and the index file.
428
        The OSCP Response is signed by the CA itself (recommended way according to multiple sources).
429

    
430
        :param cert: issuer certificate
431
        :param key: corresponding key
432
        :param index_path: path/to/the/generated/index/file
433
        :param der_ocsp_request: DER encoded OCSP Request
434
        :return: DER encoded OCSP Response
435
        """
436

    
437
        Logger.debug("Function launched.")
438

    
439
        with TemporaryFile("certificate.pem", cert.pem_data) as ca_certificate, \
440
                TemporaryFile("private_key.pem", key.private_key) as key_file, \
441
                TemporaryFile("request.der", der_ocsp_request) as request_file:
442
            args = ["ocsp", "-index", index_path, "-CA", ca_certificate, "-rsigner", ca_certificate, "-rkey", key_file,
443
                    "-reqin", request_file, "-respout", "-"]
444

    
445
            if key.password is not None and key.password != "":
446
                args.extend(["-passin", f"pass:{key.password}"])
447

    
448
            return self.__run_for_output(args)
449

    
450
    def generate_pkcs_identity(self, cert_pem: str, cert_key_pem: str, identity_name: str, identity_passphrase: str,
451
                               chain_of_trust_pems: List[str], cert_key_passphrase: str = None):
452
        """
453
        Generates a PKCS12 identity of the given child certificate while including the given chain of trust.
454

    
455
        :param cert_pem: PEM of the certificate whose identity should be created
456
        :param cert_key_pem: PEM of the private key used to sign the certificate whose identity should be created
457
        :param identity_name: the name to be given to the identity created
458
        :param chain_of_trust_pems: list of PEMs representing certificates present in the chain of trust of the certificate
459
        whose identity should be created
460
        :param identity_passphrase: passphrase to be used when encrypting the identity
461
        :param cert_key_passphrase: passphrase of the key used to sign the certificate whose identity should be created
462
        :return: byte array containing the generated identity
463
        """
464
        with TemporaryFile("cert_key.pem", cert_key_pem) as cert_key_pem_file:
465
            if identity_passphrase is None:
466
                identity_passphrase = ""
467

    
468
            args = ["pkcs12", "-export", "-name", identity_name, "-in", "-", "-inkey", cert_key_pem_file, "-passout", f"pass:{identity_passphrase}", "-passin", f"pass:{cert_key_passphrase}"]
469
            proc_input = cert_pem
470
            # when the chain of trust is not empty append the -CAfile argument and the concatenated list of CoT PEMs
471
            # to the input of the process to be launched
472
            if len(chain_of_trust_pems) > 0:
473
                args.extend(["-CAfile", "-", ])
474
                proc_input += "".join(chain_of_trust_pems)
475
            return self.__run_for_output(args,
476
                                         proc_input=bytes(proc_input, encoding="utf-8"))
477

    
478

    
479
class CryptographyException(Exception):
480

    
481
    def __init__(self, executable, args, message):
482
        self.executable = executable
483
        self.args = args
484
        self.message = message
485

    
486
    def __str__(self):
487
        # TODO check log is valid here
488
        msg = f"""
489
        EXECUTABLE: {self.executable}
490
        ARGS: {self.args}
491
        MESSAGE: {self.message}
492
        """
493

    
494
        Logger.error(msg)
495
        return msg
(3-3/4)