Projekt

Obecné

Profil

Stáhnout (20.5 KB) Statistiky
| Větev: | Tag: | Revize:
1
import re
2
import subprocess
3
import time
4
import random
5

    
6
from src.constants import CRL_CONFIG
7
from src.model.certificate import Certificate
8
from src.model.private_key import PrivateKey
9
from src.model.subject import Subject
10
from src.utils.logger import Logger
11
from src.utils.temporary_file import TemporaryFile
12

    
13
# encryption method to be used when generating private keys
14
PRIVATE_KEY_ENCRYPTION_METHOD = "-aes256"
15

    
16
# openssl executable name
17
OPENSSL_EXECUTABLE = "openssl"
18

    
19
# format of NOT_BEFORE NOT_AFTER date fields
20
NOT_AFTER_BEFORE_DATE_FORMAT = "%b %d %H:%M:%S %Y %Z"
21

    
22
# minimal configuration file to be used for openssl req command
23
# specifies distinguished_name that references empty section only
24
# openssl requires this option to be present
25
MINIMAL_CONFIG_FILE = "[req]\ndistinguished_name = req_distinguished_name\n[req_distinguished_name]\n\n"
26

    
27
# section to be used to specify extensions when creating a SSCRT
28
SSCRT_SECTION = "sscrt_ext"
29

    
30
CA_EXTENSIONS = "basicConstraints=critical,CA:TRUE"
31

    
32
# upper bound of the range of random serial numbers to be generated
33
MAX_SN = 4294967296
34

    
35

    
36
class CryptographyService:
37

    
38
    @staticmethod
39
    def __subject_to_param_format(subject):
40
        """
41
        Converts the given subject to a dictionary containing openssl field names mapped to subject's fields
42
        :param subject: subject to be converted
43
        :return: a dictionary containing openssl field names mapped to subject's fields
44
        """
45

    
46
        Logger.debug("Function launched.")
47

    
48
        subj_dict = {}
49
        if subject.common_name is not None:
50
            subj_dict["CN"] = subject.common_name
51
        if subject.country is not None:
52
            subj_dict["C"] = subject.country
53
        if subject.locality is not None:
54
            subj_dict["L"] = subject.locality
55
        if subject.state is not None:
56
            subj_dict["ST"] = subject.state
57
        if subject.organization is not None:
58
            subj_dict["O"] = subject.organization
59
        if subject.organization_unit is not None:
60
            subj_dict["OU"] = subject.organization_unit
61
        if subject.email_address is not None:
62
            subj_dict["emailAddress"] = subject.email_address
63

    
64
        # merge the subject into a "subj" parameter format
65
        return "".join([f"/{key}={value}" for key, value in subj_dict.items()])
66

    
67
    @staticmethod
68
    def __run_for_output(args=None, proc_input=None, executable=OPENSSL_EXECUTABLE):
69
        """
70
        Launches a new process in which the given executable is run. STDIN and process arguments can be set.
71
        If the process ends with a non-zero then <CryptographyException> is raised.
72

    
73
        :param args: Arguments to be passed to the program.
74
        :param proc_input: String input to be passed to the stdin of the created process.
75
        :param executable: Executable to be run (defaults to openssl)
76
        :return: If the process ends with a zero return code then the STDOUT of the process is returned as a byte array.
77
        """
78

    
79
        Logger.debug("Function launched.")
80

    
81
        if args is None:
82
            args = []
83
        try:
84
            # prepend the name of the executable
85
            args.insert(0, executable)
86

    
87
            # create a new process
88
            proc = subprocess.Popen(args, stdin=subprocess.PIPE if proc_input is not None else None,
89
                                    stdout=subprocess.PIPE,
90
                                    stderr=subprocess.PIPE)
91

    
92
            out, err = proc.communicate(proc_input)
93

    
94
            if proc.returncode != 0:
95
                # if the process did not result in zero result code, then raise an exception
96
                if err is not None and len(err) > 0:
97
                    Logger.error("CryptographyException")
98
                    raise CryptographyException(executable, args, err.decode())
99
                else:
100
                    Logger.error("CryptographyException")
101
                    raise CryptographyException(executable, args,
102
                                                f""""Execution resulted in non-zero argument""")
103

    
104
            return out
105
        except FileNotFoundError:
106
            Logger.error("CryptographyException")
107
            raise CryptographyException(executable, args, f""""{executable}" not found in the current PATH.""")
108

    
109
    def create_private_key(self, passphrase=None):
110
        """
111
        Creates a private key with the option to encrypt it using a passphrase.
112
        :param passphrase: A passphrase to be used when encrypting the key (if none is passed then the key is not
113
        encrypted at all). Empty passphrase ("") also results in a key that is not encrypted.
114
        :return: string containing the generated private key in PEM format
115
        """
116

    
117
        Logger.debug("Function launched.")
118

    
119
        if passphrase is None or len(passphrase) == 0:
120
            return self.__run_for_output(["genrsa", "2048"]).decode()
121
        else:
122
            return self.__run_for_output(
123
                ["genrsa", PRIVATE_KEY_ENCRYPTION_METHOD, "-passout", f"pass:{passphrase}", "2048"]).decode()
124

    
125
    def create_sscrt(self, subject, key, config="", extensions="", key_pass=None, days=30, sn: int = None):
126
        """
127
        Creates a self signed certificate
128

    
129
        :param subject: an instance of <Subject> representing the subject to be added to the certificate
130
        :param key: private key of the CA to be used
131
        :param config: string containing the configuration to be used
132
        :param extensions: name of the section in the configuration representing extensions
133
        :param key_pass: passphrase of the private key
134
        :param days: number of days for which the certificate will be valid
135
        :param sn: serial number to be set, when "None" is set a random serial number is generated
136

    
137
        :return: string containing the generated certificate in PEM format
138
        """
139

    
140
        Logger.debug("Function launched.")
141

    
142
        assert key is not None
143
        assert subject is not None
144

    
145
        subj = self.__subject_to_param_format(subject)
146

    
147
        # To specify extension for creating a SSCRT, one has to use a configuration
148
        # file instead of an extension file. Therefore the following code creates
149
        # the most basic configuration file with sscrt_ext section, that is later
150
        # reference in openssl req command using -extensions option.
151
        if len(config) == 0:
152
            config += MINIMAL_CONFIG_FILE
153
        config += "\n[ " + SSCRT_SECTION + " ]" + "\n" + extensions
154

    
155
        with TemporaryFile("openssl.conf", config) as conf_path:
156
            args = ["req", "-x509", "-new", "-subj", subj, "-days", f"{days}",
157
                    "-key", "-"]
158

    
159
            # serial number passed, use it when generating the certificate,
160
            # without passing it openssl generates a random one
161
            if sn is not None:
162
                args.extend(["-set_serial", str(sn)])
163

    
164
            if len(config) > 0:
165
                args.extend(["-config", conf_path])
166
            if len(extensions) > 0:
167
                args.extend(["-extensions", SSCRT_SECTION])  # when creating SSCRT, section references section in config
168

    
169
            # it would be best to not send the pass phrase at all, but for some reason pytest then prompts for
170
            # the pass phrase (this does not happen when run from pycharm)
171

    
172
            #  add the passphrase even when None is passed. Otherwise when running tests with pytest some tests freeze
173
            # waiting for the passphrase to be typed in
174
            args.extend(["-passin", f"pass:{key_pass}"])
175

    
176
            return self.__run_for_output(args, proc_input=bytes(key, encoding="utf-8")).decode()
177

    
178
    def __create_csr(self, subject, key, key_pass=""):
179
        """
180
        Creates a CSR (Certificate Signing Request)
181

    
182
        :param subject: an instance of <Subject> representing the subject to be added to the CSR
183
        :param key: the private key of the subject to be used to generate the CSR
184
        :param key_pass: passphrase of the subject's private key
185
        :return: string containing the generated certificate signing request in PEM format
186
        """
187

    
188
        Logger.debug("Function launched.")
189

    
190
        subj_param = self.__subject_to_param_format(subject)
191

    
192
        args = ["req", "-new", "-subj", subj_param, "-key", "-"]
193

    
194
        # add the passphrase even when None is passed. Otherwise when running tests with pytest some tests freeze
195
        # waiting for the passphrase to be typed in
196
        args.extend(["-passin", f"pass:{key_pass}"])
197

    
198
        return self.__run_for_output(args, proc_input=bytes(key, encoding="utf-8")).decode()
199

    
200
    def __sign_csr(self, csr, issuer_pem, issuer_key, issuer_key_pass=None, extensions="", days=30, sn: int = None):
201
        """
202
        Signs the given CSR by the given issuer CA
203

    
204
        :param csr: a string containing the CSR to be signed
205
        :param issuer_pem: string containing the certificate of the issuer that will sign this CSR in PEM format
206
        :param issuer_key: string containing the private key of the issuer's certificate in PEM format
207
        :param issuer_key_pass: string containing the passphrase of the private key of the issuer's certificate in PEM
208
        format
209
        :param extensions: extensions to be applied when signing the CSR
210
        :param days: number of days for which the certificate will be valid
211
        :param sn: serial number to be set, when "None" is set a random serial number is generated
212
        :return: string containing the generated and signed certificate in PEM format
213
        """
214

    
215
        Logger.debug("Function launched.")
216

    
217
        # concatenate CSR, issuer certificate and issuer's key (will be used in the openssl call)
218
        proc_input = csr + issuer_pem + issuer_key
219

    
220
        # TODO find a better way to generate a random serial number or let openssl generate a .srl file
221
        # when serial number is not passed generate a random one
222
        if sn is None:
223
            sn = random.randint(0, MAX_SN)
224

    
225
        # prepare openssl parameters...
226
        # CSR, CA and CA's private key will be passed via stdin (that's the meaning of the '-' symbol)
227
        params = ["x509", "-req", "-in", "-", "-CA", "-", "-CAkey", "-", "-CAcreateserial", "-days", str(days),
228
                  "-set_serial", str(sn)]
229

    
230
        with TemporaryFile("extensions.conf", extensions) as ext_path:
231
            # add the passphrase even when None is passed. Otherwise when running tests with pytest some tests freeze
232
            # waiting for the passphrase to be typed in
233
            params.extend(["-passin", f"pass:{issuer_key_pass}"])
234

    
235
            if len(extensions) > 0:
236
                params.extend(["-extfile", ext_path])
237

    
238
            return self.__run_for_output(params, proc_input=(bytes(proc_input, encoding="utf-8"))).decode()
239

    
240
    def create_crt(self, subject, subject_key, issuer_pem, issuer_key, subject_key_pass=None, issuer_key_pass=None,
241
                   extensions="",
242
                   days=30,
243
                   sn: int = None):
244
        """
245
        Creates a certificate by using the given subject, subject's key, issuer and its key.
246

    
247
        :param subject: subject to be added to the created certificate
248
        :param subject_key: string containing the private key to be used when creating the certificate in PEM format
249
        :param issuer_key: string containing the private key of the issuer's certificate in PEM format
250
        :param issuer_pem: string containing the certificate of the issuer that will sign this CSR in PEM format
251
        :param subject_key_pass: string containing the passphrase of the private key used when creating the certificate
252
        in PEM format
253
        :param issuer_key_pass: string containing the passphrase of the private key of the issuer's certificate in PEM
254
        format
255
        :param extensions: extensions to be applied when creating the certificate
256
        :param days: number of days for which the certificate will be valid
257
        :param sn: serial number to be set, when "None" is set a random serial number is generated
258
        :return: string containing the generated certificate in PEM format
259
        """
260

    
261
        Logger.debug("Function launched.")
262

    
263
        csr = self.__create_csr(subject, subject_key, key_pass=subject_key_pass)
264
        return self.__sign_csr(csr, issuer_pem, issuer_key, issuer_key_pass=issuer_key_pass, extensions=extensions,
265
                               days=days, sn=sn)
266

    
267
    @staticmethod
268
    def verify_cert(certificate):
269
        """
270
        Verifies whether the given certificate is not expired.
271

    
272
        :param certificate: certificate to be verified in PEM format
273
        :return: Returns `true` if the certificate is not expired, `false` when expired.
274
        """
275

    
276
        Logger.debug("Function launched.")
277

    
278
        # call openssl to check whether the certificate is valid to this date
279
        args = [OPENSSL_EXECUTABLE, "x509", "-checkend", "0", "-noout", "-text", "-in", "-"]
280

    
281
        # create a new process
282
        proc = subprocess.Popen(args, stdin=subprocess.PIPE,
283
                                stdout=subprocess.PIPE,
284
                                stderr=subprocess.PIPE)
285

    
286
        out, err = proc.communicate(bytes(certificate, encoding="utf-8"))
287

    
288
        # zero return code means that the certificate is valid
289
        if proc.returncode == 0:
290
            return True
291
        elif proc.returncode == 1 and "Certificate will expire" in out.decode():
292
            # 1 return code means that the certificate is invalid but such message has to be present in the proc output
293
            return False
294
        else:
295
            # the process failed because of some other reason (incorrect cert format)
296
            Logger.error("CryptographyException")
297
            raise CryptographyException(OPENSSL_EXECUTABLE, args, err.decode())
298

    
299
    def extract_public_key_from_private_key(self, private_key_pem: str, passphrase=None) -> str:
300
        """
301
        Extracts a public key from the given private key passed in PEM format
302
        :param private_key_pem: PEM data representing the private key from which a public key should be extracted
303
        :param passphrase: passphrase to be provided when the supplied private key is encrypted
304
        :return: a string containing the extracted public key in PEM format
305
        """
306

    
307
        Logger.debug("Function launched.")
308

    
309
        args = ["rsa", "-in", "-", "-pubout"]
310
        if passphrase is not None:
311
            args.extend(["-passin", f"pass:{passphrase}"])
312
        return self.__run_for_output(args, proc_input=bytes(private_key_pem, encoding="utf-8")).decode()
313

    
314
    def extract_public_key_from_certificate(self, cert_pem: str) -> str:
315
        """
316
        Extracts a public key from the given certificate passed in PEM format
317
        :param cert_pem: PEM data representing a certificate from which a public key should be extracted
318
        :return: a string containing the extracted public key in PEM format
319
        """
320

    
321
        Logger.debug("Function launched.")
322

    
323
        # extracting public key from a certificate does not seem to require a passphrase even when
324
        # signed using an encrypted PK
325
        args = ["x509", "-in", "-", "-noout", "-pubkey"]
326
        return self.__run_for_output(args, proc_input=bytes(cert_pem, encoding="utf-8")).decode()
327

    
328
    def parse_cert_pem(self, cert_pem):
329
        """
330
        Parses the given certificate in PEM format and returns the subject of the certificate and it's NOT_BEFORE
331
        and NOT_AFTER field
332
        :param cert_pem: a certificated in a PEM format to be parsed
333
        :return: a tuple containing a subject, NOT_BEFORE and NOT_AFTER dates
334
        """
335

    
336
        Logger.debug("Function launched.")
337

    
338
        # run openssl x509 to view certificate content
339
        args = ["x509", "-noout", "-subject", "-startdate", "-enddate", "-in", "-"]
340

    
341
        cert_info_raw = self.__run_for_output(args, proc_input=bytes(cert_pem, encoding="utf-8")).decode()
342

    
343
        # split lines
344
        results = re.split("\n", cert_info_raw)
345
        subj_line = results[0].strip()
346
        not_before_line = results[1].strip()
347
        not_after_line = results[2].strip()
348

    
349
        # attempt to extract subject via regex
350
        match = re.search(r"subject=(.*)", subj_line)
351
        if match is None:
352
            # TODO use logger
353
            print(f"Could not find subject to parse: {subj_line}")
354
            return None
355
        else:
356
            # find all attributes (key = value)
357
            found = re.findall(r"\s?([^c=\s]+)\s?=\s?([^,\n]+)", match.group(1))
358
            subj = Subject()
359
            for key, value in found:
360
                if key == "C":
361
                    subj.country = value.strip()
362
                elif key == "ST":
363
                    subj.state = value.strip()
364
                elif key == "L":
365
                    subj.locality = value.strip()
366
                elif key == "O":
367
                    subj.organization = value.strip()
368
                elif key == "OU":
369
                    subj.organization_unit = value.strip()
370
                elif key == "CN":
371
                    subj.common_name = value.strip()
372
                elif key == "emailAddress":
373
                    subj.email_address = value.strip()
374

    
375
        # extract notBefore and notAfter date fields
376
        not_before = re.search(r"notBefore=(.*)", not_before_line)
377
        not_after = re.search(r"notAfter=(.*)", not_after_line)
378

    
379
        # if date fields are found parse them into date objects
380
        if not_before is not None:
381
            not_before = time.strptime(not_before.group(1).strip(), NOT_AFTER_BEFORE_DATE_FORMAT)
382
        if not_after is not None:
383
            not_after = time.strptime(not_after.group(1).strip(), NOT_AFTER_BEFORE_DATE_FORMAT)
384

    
385
        # TODO wrapper class?
386
        # return it as a tuple
387
        return subj, not_before, not_after
388

    
389
    def get_openssl_version(self) -> str:
390
        """
391
        Get version of the OpenSSL installed on the system
392
        :return: version of the OpenSSL as returned from the process
393
        """
394

    
395
        Logger.debug("Function launched.")
396

    
397
        return self.__run_for_output(["version"]).decode("utf-8")
398

    
399
    def generate_crl(self, cert: Certificate, key: PrivateKey, index_file_path: str) -> str:
400
        """
401
        Generate a CertificateRevocationList for a specified
402
        certificate authority.
403

    
404
        :param key: key that is used to sign the CRL (must belong to the given certificate)
405
        :param cert: Certificate of the certificate authority that issue the CRL
406
        :param index_file_path: path to a file that contains the openssl index with all revoked certificates
407
        :return: CRL encoded in PEM format string
408
        """
409

    
410
        Logger.debug("Function launched.")
411

    
412
        # openssl ca requires the .srl file to exists, therefore a dummy, unused file is created
413
        with TemporaryFile("serial.srl", "0") as serial_file, \
414
             TemporaryFile("crl.conf", CRL_CONFIG % (index_file_path, serial_file)) as config_file, \
415
             TemporaryFile("certificate.pem", cert.pem_data) as cert_file, \
416
             TemporaryFile("private_key.pem", key.private_key) as key_file:
417

    
418
            args = ["ca", "-config", config_file, "-gencrl", "-keyfile", key_file, "-cert", cert_file, "-outdir", "."]
419

    
420
            if key.password is not None and key.password != "":
421
                args.extend(["-passin", f"pass:{key.password}"])
422

    
423
            return self.__run_for_output(args).decode("utf-8")
424

    
425
    def generate_ocsp(self, cert, key, index_path, der_ocsp_request):
426
        """
427
        Generate an OCSP Response from an OCSP Request given the issuer cert, issuer cert key and the index file.
428
        The OSCP Response is signed by the CA itself (recommended way according to multiple sources).
429

    
430
        :param cert: issuer certificate
431
        :param key: corresponding key
432
        :param index_path: path/to/the/generated/index/file
433
        :param der_ocsp_request: DER encoded OCSP Request
434
        :return: DER encoded OCSP Response
435
        """
436

    
437
        Logger.debug("Function launched.")
438

    
439
        with TemporaryFile("certificate.pem", cert.pem_data) as ca_certificate, \
440
             TemporaryFile("private_key.pem", key.private_key) as key_file, \
441
             TemporaryFile("request.der", der_ocsp_request) as request_file:
442

    
443
            args = ["ocsp", "-index", index_path, "-CA", ca_certificate, "-rsigner", ca_certificate, "-rkey", key_file,
444
                    "-reqin", request_file, "-respout", "-"]
445

    
446
            if key.password is not None and key.password != "":
447
                args.extend(["-passin", f"pass:{key.password}"])
448

    
449
            return self.__run_for_output(args)
450

    
451

    
452
class CryptographyException(Exception):
453

    
454
    def __init__(self, executable, args, message):
455
        self.executable = executable
456
        self.args = args
457
        self.message = message
458

    
459
    def __str__(self):
460
        # TODO check log is valid here
461
        msg = f"""
462
        EXECUTABLE: {self.executable}
463
        ARGS: {self.args}
464
        MESSAGE: {self.message}
465
        """
466

    
467
        Logger.error(msg)
468
        return msg
(3-3/4)