Projekt

Obecné

Profil

Stáhnout (20.5 KB) Statistiky
| Větev: | Tag: | Revize:
1
import re
2
import subprocess
3
import time
4
import random
5

    
6
from src.constants import CRL_CONFIG
7
from src.model.certificate import Certificate
8
from src.model.private_key import PrivateKey
9
from src.model.subject import Subject
10
from src.utils.logger import Logger
11
from src.utils.temporary_file import TemporaryFile
12

    
13
# encryption method to be used when generating private keys
14
PRIVATE_KEY_ENCRYPTION_METHOD = "-aes256"
15

    
16
# openssl executable name
17
OPENSSL_EXECUTABLE = "openssl"
18

    
19
# format of NOT_BEFORE NOT_AFTER date fields
20
NOT_AFTER_BEFORE_DATE_FORMAT = "%b %d %H:%M:%S %Y %Z"
21

    
22
# minimal configuration file to be used for openssl req command
23
# specifies distinguished_name that references empty section only
24
# openssl requires this option to be present
25
MINIMAL_CONFIG_FILE = "[req]\ndistinguished_name = req_distinguished_name\n[req_distinguished_name]\n\n"
26

    
27
# section to be used to specify extensions when creating a SSCRT
28
SSCRT_SECTION = "sscrt_ext"
29

    
30
CA_EXTENSIONS = "basicConstraints=critical,CA:TRUE"
31

    
32
# upper bound of the range of random serial numbers to be generated
33
MAX_SN = 4294967296
34

    
35

    
36
class CryptographyService:
37

    
38
    @staticmethod
39
    def __subject_to_param_format(subject):
40
        """
41
        Converts the given subject to a dictionary containing openssl field names mapped to subject's fields
42
        :param subject: subject to be converted
43
        :return: a dictionary containing openssl field names mapped to subject's fields
44
        """
45

    
46
        Logger.debug("Function launched.")
47

    
48
        subj_dict = {}
49
        if subject.common_name is not None:
50
            subj_dict["CN"] = subject.common_name
51
        if subject.country is not None:
52
            subj_dict["C"] = subject.country
53
        if subject.locality is not None:
54
            subj_dict["L"] = subject.locality
55
        if subject.state is not None:
56
            subj_dict["ST"] = subject.state
57
        if subject.organization is not None:
58
            subj_dict["O"] = subject.organization
59
        if subject.organization_unit is not None:
60
            subj_dict["OU"] = subject.organization_unit
61
        if subject.email_address is not None:
62
            subj_dict["emailAddress"] = subject.email_address
63

    
64
        # merge the subject into a "subj" parameter format
65
        return "".join([f"/{key}={value}" for key, value in subj_dict.items()])
66

    
67
    @staticmethod
68
    def __run_for_output(args=None, proc_input=None, executable=OPENSSL_EXECUTABLE):
69
        """
70
        Launches a new process in which the given executable is run. STDIN and process arguments can be set.
71
        If the process ends with a non-zero then <CryptographyException> is raised.
72

    
73
        :param args: Arguments to be passed to the program.
74
        :param proc_input: String input to be passed to the stdin of the created process.
75
        :param executable: Executable to be run (defaults to openssl)
76
        :return: If the process ends with a zero return code then the STDOUT of the process is returned as a byte array.
77
        """
78

    
79
        Logger.debug("Function launched.")
80

    
81
        if args is None:
82
            args = []
83
        try:
84
            # prepend the name of the executable
85
            args.insert(0, executable)
86

    
87
            # create a new process
88
            proc = subprocess.Popen(args, stdin=subprocess.PIPE if proc_input is not None else None,
89
                                    stdout=subprocess.PIPE,
90
                                    stderr=subprocess.PIPE)
91

    
92
            out, err = proc.communicate(proc_input)
93

    
94
            if proc.returncode != 0:
95
                # if the process did not result in zero result code, then raise an exception
96
                if err is not None and len(err) > 0:
97
                    Logger.error("CryptographyException")
98
                    raise CryptographyException(executable, args, err.decode())
99
                else:
100
                    Logger.error("CryptographyException")
101
                    raise CryptographyException(executable, args,
102
                                                f""""Execution resulted in non-zero argument""")
103

    
104
            return out
105
        except FileNotFoundError:
106
            Logger.error("CryptographyException")
107
            raise CryptographyException(executable, args, f""""{executable}" not found in the current PATH.""")
108

    
109
    def create_private_key(self, passphrase=None):
110
        """
111
        Creates a private key with the option to encrypt it using a passphrase.
112
        :param passphrase: A passphrase to be used when encrypting the key (if none is passed then the key is not
113
        encrypted at all). Empty passphrase ("") also results in a key that is not encrypted.
114
        :return: string containing the generated private key in PEM format
115
        """
116

    
117
        Logger.debug("Function launched.")
118

    
119
        if passphrase is None or len(passphrase) == 0:
120
            return self.__run_for_output(["genrsa", "2048"]).decode()
121
        else:
122
            return self.__run_for_output(
123
                ["genrsa", PRIVATE_KEY_ENCRYPTION_METHOD, "-passout", f"pass:{passphrase}", "2048"]).decode()
124

    
125
    def create_sscrt(self, subject, key, config="", extensions="", key_pass=None, days=30, sn: int = None):
126
        """
127
        Creates a root CA
128

    
129
        :param subject: an instance of <Subject> representing the subject to be added to the certificate
130
        :param key: private key of the CA to be used
131
        :param config: string containing the configuration to be used
132
        :param extensions: name of the section in the configuration representing extensions
133
        :param key_pass: passphrase of the private key
134
        :param days: number of days for which the certificate will be valid
135
        :param sn: serial number to be set, when "None" is set a random serial number is generated
136

    
137
        :return: string containing the generated certificate in PEM format
138
        """
139

    
140
        Logger.debug("Function launched.")
141

    
142
        assert key is not None
143
        assert subject is not None
144

    
145
        subj = self.__subject_to_param_format(subject)
146

    
147
        # To specify extension for creating a SSCRT, one has to use a configuration
148
        # file instead of an extension file. Therefore the following code creates
149
        # the most basic configuration file with sscrt_ext section, that is later
150
        # reference in openssl req command using -extensions option.
151
        extensions += "\n" + CA_EXTENSIONS
152
        if len(config) == 0:
153
            config += MINIMAL_CONFIG_FILE
154
        config += "\n[ " + SSCRT_SECTION + " ]" + "\n" + extensions
155

    
156
        with TemporaryFile("openssl.conf", config) as conf_path:
157
            args = ["req", "-x509", "-new", "-subj", subj, "-days", f"{days}",
158
                    "-key", "-"]
159

    
160
            # serial number passed, use it when generating the certificate,
161
            # without passing it openssl generates a random one
162
            if sn is not None:
163
                args.extend(["-set_serial", str(sn)])
164

    
165
            if len(config) > 0:
166
                args.extend(["-config", conf_path])
167
            if len(extensions) > 0:
168
                args.extend(["-extensions", SSCRT_SECTION])  # when creating SSCRT, section references section in config
169

    
170
            # it would be best to not send the pass phrase at all, but for some reason pytest then prompts for
171
            # the pass phrase (this does not happen when run from pycharm)
172

    
173
            #  add the passphrase even when None is passed. Otherwise when running tests with pytest some tests freeze
174
            # waiting for the passphrase to be typed in
175
            args.extend(["-passin", f"pass:{key_pass}"])
176

    
177
            return self.__run_for_output(args, proc_input=bytes(key, encoding="utf-8")).decode()
178

    
179
    def __create_csr(self, subject, key, key_pass=""):
180
        """
181
        Creates a CSR (Certificate Signing Request)
182

    
183
        :param subject: an instance of <Subject> representing the subject to be added to the CSR
184
        :param key: the private key of the subject to be used to generate the CSR
185
        :param key_pass: passphrase of the subject's private key
186
        :return: string containing the generated certificate signing request in PEM format
187
        """
188

    
189
        Logger.debug("Function launched.")
190

    
191
        subj_param = self.__subject_to_param_format(subject)
192

    
193
        args = ["req", "-new", "-subj", subj_param, "-key", "-"]
194

    
195
        # add the passphrase even when None is passed. Otherwise when running tests with pytest some tests freeze
196
        # waiting for the passphrase to be typed in
197
        args.extend(["-passin", f"pass:{key_pass}"])
198

    
199
        return self.__run_for_output(args, proc_input=bytes(key, encoding="utf-8")).decode()
200

    
201
    def __sign_csr(self, csr, issuer_pem, issuer_key, issuer_key_pass=None, extensions="", days=30, sn: int = None):
202
        """
203
        Signs the given CSR by the given issuer CA
204

    
205
        :param csr: a string containing the CSR to be signed
206
        :param issuer_pem: string containing the certificate of the issuer that will sign this CSR in PEM format
207
        :param issuer_key: string containing the private key of the issuer's certificate in PEM format
208
        :param issuer_key_pass: string containing the passphrase of the private key of the issuer's certificate in PEM
209
        format
210
        :param extensions: extensions to be applied when signing the CSR
211
        :param days: number of days for which the certificate will be valid
212
        :param sn: serial number to be set, when "None" is set a random serial number is generated
213
        :return: string containing the generated and signed certificate in PEM format
214
        """
215

    
216
        Logger.debug("Function launched.")
217

    
218
        # concatenate CSR, issuer certificate and issuer's key (will be used in the openssl call)
219
        proc_input = csr + issuer_pem + issuer_key
220

    
221
        # TODO find a better way to generate a random serial number or let openssl generate a .srl file
222
        # when serial number is not passed generate a random one
223
        if sn is None:
224
            sn = random.randint(0, MAX_SN)
225

    
226
        # prepare openssl parameters...
227
        # CSR, CA and CA's private key will be passed via stdin (that's the meaning of the '-' symbol)
228
        params = ["x509", "-req", "-in", "-", "-CA", "-", "-CAkey", "-", "-CAcreateserial", "-days", str(days),
229
                  "-set_serial", str(sn)]
230

    
231
        with TemporaryFile("extensions.conf", extensions) as ext_path:
232
            # add the passphrase even when None is passed. Otherwise when running tests with pytest some tests freeze
233
            # waiting for the passphrase to be typed in
234
            params.extend(["-passin", f"pass:{issuer_key_pass}"])
235

    
236
            if len(extensions) > 0:
237
                params.extend(["-extfile", ext_path])
238

    
239
            return self.__run_for_output(params, proc_input=(bytes(proc_input, encoding="utf-8"))).decode()
240

    
241
    def create_crt(self, subject, subject_key, issuer_pem, issuer_key, subject_key_pass=None, issuer_key_pass=None,
242
                   extensions="",
243
                   days=30,
244
                   sn: int = None):
245
        """
246
        Creates a certificate by using the given subject, subject's key, issuer and its key.
247

    
248
        :param subject: subject to be added to the created certificate
249
        :param subject_key: string containing the private key to be used when creating the certificate in PEM format
250
        :param issuer_key: string containing the private key of the issuer's certificate in PEM format
251
        :param issuer_pem: string containing the certificate of the issuer that will sign this CSR in PEM format
252
        :param subject_key_pass: string containing the passphrase of the private key used when creating the certificate
253
        in PEM format
254
        :param issuer_key_pass: string containing the passphrase of the private key of the issuer's certificate in PEM
255
        format
256
        :param extensions: extensions to be applied when creating the certificate
257
        :param days: number of days for which the certificate will be valid
258
        :param sn: serial number to be set, when "None" is set a random serial number is generated
259
        :return: string containing the generated certificate in PEM format
260
        """
261

    
262
        Logger.debug("Function launched.")
263

    
264
        csr = self.__create_csr(subject, subject_key, key_pass=subject_key_pass)
265
        return self.__sign_csr(csr, issuer_pem, issuer_key, issuer_key_pass=issuer_key_pass, extensions=extensions,
266
                               days=days, sn=sn)
267

    
268
    @staticmethod
269
    def verify_cert(certificate):
270
        """
271
        Verifies whether the given certificate is not expired.
272

    
273
        :param certificate: certificate to be verified in PEM format
274
        :return: Returns `true` if the certificate is not expired, `false` when expired.
275
        """
276

    
277
        Logger.debug("Function launched.")
278

    
279
        # call openssl to check whether the certificate is valid to this date
280
        args = [OPENSSL_EXECUTABLE, "x509", "-checkend", "0", "-noout", "-text", "-in", "-"]
281

    
282
        # create a new process
283
        proc = subprocess.Popen(args, stdin=subprocess.PIPE,
284
                                stdout=subprocess.PIPE,
285
                                stderr=subprocess.PIPE)
286

    
287
        out, err = proc.communicate(bytes(certificate, encoding="utf-8"))
288

    
289
        # zero return code means that the certificate is valid
290
        if proc.returncode == 0:
291
            return True
292
        elif proc.returncode == 1 and "Certificate will expire" in out.decode():
293
            # 1 return code means that the certificate is invalid but such message has to be present in the proc output
294
            return False
295
        else:
296
            # the process failed because of some other reason (incorrect cert format)
297
            Logger.error("CryptographyException")
298
            raise CryptographyException(OPENSSL_EXECUTABLE, args, err.decode())
299

    
300
    def extract_public_key_from_private_key(self, private_key_pem: str, passphrase=None) -> str:
301
        """
302
        Extracts a public key from the given private key passed in PEM format
303
        :param private_key_pem: PEM data representing the private key from which a public key should be extracted
304
        :param passphrase: passphrase to be provided when the supplied private key is encrypted
305
        :return: a string containing the extracted public key in PEM format
306
        """
307

    
308
        Logger.debug("Function launched.")
309

    
310
        args = ["rsa", "-in", "-", "-pubout"]
311
        if passphrase is not None:
312
            args.extend(["-passin", f"pass:{passphrase}"])
313
        return self.__run_for_output(args, proc_input=bytes(private_key_pem, encoding="utf-8")).decode()
314

    
315
    def extract_public_key_from_certificate(self, cert_pem: str) -> str:
316
        """
317
        Extracts a public key from the given certificate passed in PEM format
318
        :param cert_pem: PEM data representing a certificate from which a public key should be extracted
319
        :return: a string containing the extracted public key in PEM format
320
        """
321

    
322
        Logger.debug("Function launched.")
323

    
324
        # extracting public key from a certificate does not seem to require a passphrase even when
325
        # signed using an encrypted PK
326
        args = ["x509", "-in", "-", "-noout", "-pubkey"]
327
        return self.__run_for_output(args, proc_input=bytes(cert_pem, encoding="utf-8")).decode()
328

    
329
    def parse_cert_pem(self, cert_pem):
330
        """
331
        Parses the given certificate in PEM format and returns the subject of the certificate and it's NOT_BEFORE
332
        and NOT_AFTER field
333
        :param cert_pem: a certificated in a PEM format to be parsed
334
        :return: a tuple containing a subject, NOT_BEFORE and NOT_AFTER dates
335
        """
336

    
337
        Logger.debug("Function launched.")
338

    
339
        # run openssl x509 to view certificate content
340
        args = ["x509", "-noout", "-subject", "-startdate", "-enddate", "-in", "-"]
341

    
342
        cert_info_raw = self.__run_for_output(args, proc_input=bytes(cert_pem, encoding="utf-8")).decode()
343

    
344
        # split lines
345
        results = re.split("\n", cert_info_raw)
346
        subj_line = results[0].strip()
347
        not_before_line = results[1].strip()
348
        not_after_line = results[2].strip()
349

    
350
        # attempt to extract subject via regex
351
        match = re.search(r"subject=(.*)", subj_line)
352
        if match is None:
353
            # TODO use logger
354
            print(f"Could not find subject to parse: {subj_line}")
355
            return None
356
        else:
357
            # find all attributes (key = value)
358
            found = re.findall(r"\s?([^c=\s]+)\s?=\s?([^,\n]+)", match.group(1))
359
            subj = Subject()
360
            for key, value in found:
361
                if key == "C":
362
                    subj.country = value.strip()
363
                elif key == "ST":
364
                    subj.state = value.strip()
365
                elif key == "L":
366
                    subj.locality = value.strip()
367
                elif key == "O":
368
                    subj.organization = value.strip()
369
                elif key == "OU":
370
                    subj.organization_unit = value.strip()
371
                elif key == "CN":
372
                    subj.common_name = value.strip()
373
                elif key == "emailAddress":
374
                    subj.email_address = value.strip()
375

    
376
        # extract notBefore and notAfter date fields
377
        not_before = re.search(r"notBefore=(.*)", not_before_line)
378
        not_after = re.search(r"notAfter=(.*)", not_after_line)
379

    
380
        # if date fields are found parse them into date objects
381
        if not_before is not None:
382
            not_before = time.strptime(not_before.group(1).strip(), NOT_AFTER_BEFORE_DATE_FORMAT)
383
        if not_after is not None:
384
            not_after = time.strptime(not_after.group(1).strip(), NOT_AFTER_BEFORE_DATE_FORMAT)
385

    
386
        # TODO wrapper class?
387
        # return it as a tuple
388
        return subj, not_before, not_after
389

    
390
    def get_openssl_version(self) -> str:
391
        """
392
        Get version of the OpenSSL installed on the system
393
        :return: version of the OpenSSL as returned from the process
394
        """
395

    
396
        Logger.debug("Function launched.")
397

    
398
        return self.__run_for_output(["version"]).decode("utf-8")
399

    
400
    def generate_crl(self, cert: Certificate, key: PrivateKey, index_file_path: str) -> str:
401
        """
402
        Generate a CertificateRevocationList for a specified
403
        certificate authority.
404

    
405
        :param key: key that is used to sign the CRL (must belong to the given certificate)
406
        :param cert: Certificate of the certificate authority that issue the CRL
407
        :param index_file_path: path to a file that contains the openssl index with all revoked certificates
408
        :return: CRL encoded in PEM format string
409
        """
410

    
411
        Logger.debug("Function launched.")
412

    
413
        # openssl ca requires the .srl file to exists, therefore a dummy, unused file is created
414
        with TemporaryFile("serial.srl", "0") as serial_file, \
415
             TemporaryFile("crl.conf", CRL_CONFIG % (index_file_path, serial_file)) as config_file, \
416
             TemporaryFile("certificate.pem", cert.pem_data) as cert_file, \
417
             TemporaryFile("private_key.pem", key.private_key) as key_file:
418

    
419
            args = ["ca", "-config", config_file, "-gencrl", "-keyfile", key_file, "-cert", cert_file, "-outdir", "."]
420

    
421
            if key.password is not None and key.password != "":
422
                args.extend(["-passin", f"pass:{key.password}"])
423

    
424
            return self.__run_for_output(args).decode("utf-8")
425

    
426
    def generate_ocsp(self, cert, key, index_path, der_ocsp_request):
427
        """
428
        Generate an OCSP Response from an OCSP Request given the issuer cert, issuer cert key and the index file.
429
        The OSCP Response is signed by the CA itself (recommended way according to multiple sources).
430

    
431
        :param cert: issuer certificate
432
        :param key: corresponding key
433
        :param index_path: path/to/the/generated/index/file
434
        :param der_ocsp_request: DER encoded OCSP Request
435
        :return: DER encoded OCSP Response
436
        """
437

    
438
        Logger.debug("Function launched.")
439

    
440
        with TemporaryFile("certificate.pem", cert.pem_data) as ca_certificate, \
441
             TemporaryFile("private_key.pem", key.private_key) as key_file, \
442
             TemporaryFile("request.der", der_ocsp_request) as request_file:
443

    
444
            args = ["ocsp", "-index", index_path, "-CA", ca_certificate, "-rsigner", ca_certificate, "-rkey", key_file,
445
                    "-reqin", request_file, "-respout", "-"]
446

    
447
            if key.password is not None and key.password != "":
448
                args.extend(["-passin", f"pass:{key.password}"])
449

    
450
            return self.__run_for_output(args)
451

    
452

    
453
class CryptographyException(Exception):
454

    
455
    def __init__(self, executable, args, message):
456
        self.executable = executable
457
        self.args = args
458
        self.message = message
459

    
460
    def __str__(self):
461
        # TODO check log is valid here
462
        msg = f"""
463
        EXECUTABLE: {self.executable}
464
        ARGS: {self.args}
465
        MESSAGE: {self.message}
466
        """
467

    
468
        Logger.error(msg)
469
        return msg
(3-3/4)