Revize b3c80ccb
Přidáno uživatelem David Friesecký před téměř 4 roky(ů)
src/services/cryptography.py | ||
---|---|---|
7 | 7 |
from src.model.certificate import Certificate |
8 | 8 |
from src.model.private_key import PrivateKey |
9 | 9 |
from src.model.subject import Subject |
10 |
from src.utils.logger import Logger |
|
10 | 11 |
from src.utils.temporary_file import TemporaryFile |
11 | 12 |
|
12 | 13 |
# encryption method to be used when generating private keys |
... | ... | |
41 | 42 |
:param subject: subject to be converted |
42 | 43 |
:return: a dictionary containing openssl field names mapped to subject's fields |
43 | 44 |
""" |
45 |
|
|
46 |
Logger.debug("Function launched.") |
|
47 |
|
|
44 | 48 |
subj_dict = {} |
45 | 49 |
if subject.common_name is not None: |
46 | 50 |
subj_dict["CN"] = subject.common_name |
... | ... | |
71 | 75 |
:param executable: Executable to be run (defaults to openssl) |
72 | 76 |
:return: If the process ends with a zero return code then the STDOUT of the process is returned as a byte array. |
73 | 77 |
""" |
78 |
|
|
79 |
Logger.debug("Function launched.") |
|
80 |
|
|
74 | 81 |
if args is None: |
75 | 82 |
args = [] |
76 | 83 |
try: |
... | ... | |
87 | 94 |
if proc.returncode != 0: |
88 | 95 |
# if the process did not result in zero result code, then raise an exception |
89 | 96 |
if err is not None and len(err) > 0: |
97 |
Logger.error("CryptographyException") |
|
90 | 98 |
raise CryptographyException(executable, args, err.decode()) |
91 | 99 |
else: |
100 |
Logger.error("CryptographyException") |
|
92 | 101 |
raise CryptographyException(executable, args, |
93 | 102 |
f""""Execution resulted in non-zero argument""") |
94 | 103 |
|
95 | 104 |
return out |
96 | 105 |
except FileNotFoundError: |
106 |
Logger.error("CryptographyException") |
|
97 | 107 |
raise CryptographyException(executable, args, f""""{executable}" not found in the current PATH.""") |
98 | 108 |
|
99 | 109 |
def create_private_key(self, passphrase=None): |
... | ... | |
103 | 113 |
encrypted at all). Empty passphrase ("") also results in a key that is not encrypted. |
104 | 114 |
:return: string containing the generated private key in PEM format |
105 | 115 |
""" |
116 |
|
|
117 |
Logger.debug("Function launched.") |
|
118 |
|
|
106 | 119 |
if passphrase is None or len(passphrase) == 0: |
107 | 120 |
return self.__run_for_output(["genrsa", "2048"]).decode() |
108 | 121 |
else: |
... | ... | |
123 | 136 |
|
124 | 137 |
:return: string containing the generated certificate in PEM format |
125 | 138 |
""" |
139 |
|
|
140 |
Logger.debug("Function launched.") |
|
141 |
|
|
126 | 142 |
assert key is not None |
127 | 143 |
assert subject is not None |
128 | 144 |
|
... | ... | |
170 | 186 |
:return: string containing the generated certificate signing request in PEM format |
171 | 187 |
""" |
172 | 188 |
|
189 |
Logger.debug("Function launched.") |
|
190 |
|
|
173 | 191 |
subj_param = self.__subject_to_param_format(subject) |
174 | 192 |
|
175 | 193 |
args = ["req", "-new", "-subj", subj_param, "-key", "-"] |
... | ... | |
195 | 213 |
:return: string containing the generated and signed certificate in PEM format |
196 | 214 |
""" |
197 | 215 |
|
216 |
Logger.debug("Function launched.") |
|
217 |
|
|
198 | 218 |
# concatenate CSR, issuer certificate and issuer's key (will be used in the openssl call) |
199 | 219 |
proc_input = csr + issuer_pem + issuer_key |
200 | 220 |
|
... | ... | |
238 | 258 |
:param sn: serial number to be set, when "None" is set a random serial number is generated |
239 | 259 |
:return: string containing the generated certificate in PEM format |
240 | 260 |
""" |
261 |
|
|
262 |
Logger.debug("Function launched.") |
|
263 |
|
|
241 | 264 |
csr = self.__create_csr(subject, subject_key, key_pass=subject_key_pass) |
242 | 265 |
return self.__sign_csr(csr, issuer_pem, issuer_key, issuer_key_pass=issuer_key_pass, extensions=extensions, |
243 | 266 |
days=days, sn=sn) |
... | ... | |
250 | 273 |
:param certificate: certificate to be verified in PEM format |
251 | 274 |
:return: Returns `true` if the certificate is not expired, `false` when expired. |
252 | 275 |
""" |
276 |
|
|
277 |
Logger.debug("Function launched.") |
|
278 |
|
|
253 | 279 |
# call openssl to check whether the certificate is valid to this date |
254 | 280 |
args = [OPENSSL_EXECUTABLE, "x509", "-checkend", "0", "-noout", "-text", "-in", "-"] |
255 | 281 |
|
... | ... | |
268 | 294 |
return False |
269 | 295 |
else: |
270 | 296 |
# the process failed because of some other reason (incorrect cert format) |
297 |
Logger.error("CryptographyException") |
|
271 | 298 |
raise CryptographyException(OPENSSL_EXECUTABLE, args, err.decode()) |
272 | 299 |
|
273 | 300 |
def extract_public_key_from_private_key(self, private_key_pem: str, passphrase=None) -> str: |
... | ... | |
277 | 304 |
:param passphrase: passphrase to be provided when the supplied private key is encrypted |
278 | 305 |
:return: a string containing the extracted public key in PEM format |
279 | 306 |
""" |
307 |
|
|
308 |
Logger.debug("Function launched.") |
|
309 |
|
|
280 | 310 |
args = ["rsa", "-in", "-", "-pubout"] |
281 | 311 |
if passphrase is not None: |
282 | 312 |
args.extend(["-passin", f"pass:{passphrase}"]) |
... | ... | |
288 | 318 |
:param cert_pem: PEM data representing a certificate from which a public key should be extracted |
289 | 319 |
:return: a string containing the extracted public key in PEM format |
290 | 320 |
""" |
321 |
|
|
322 |
Logger.debug("Function launched.") |
|
323 |
|
|
291 | 324 |
# extracting public key from a certificate does not seem to require a passphrase even when |
292 | 325 |
# signed using an encrypted PK |
293 | 326 |
args = ["x509", "-in", "-", "-noout", "-pubkey"] |
... | ... | |
300 | 333 |
:param cert_pem: a certificated in a PEM format to be parsed |
301 | 334 |
:return: a tuple containing a subject, NOT_BEFORE and NOT_AFTER dates |
302 | 335 |
""" |
336 |
|
|
337 |
Logger.debug("Function launched.") |
|
338 |
|
|
303 | 339 |
# run openssl x509 to view certificate content |
304 | 340 |
args = ["x509", "-noout", "-subject", "-startdate", "-enddate", "-in", "-"] |
305 | 341 |
|
... | ... | |
356 | 392 |
Get version of the OpenSSL installed on the system |
357 | 393 |
:return: version of the OpenSSL as returned from the process |
358 | 394 |
""" |
395 |
|
|
396 |
Logger.debug("Function launched.") |
|
397 |
|
|
359 | 398 |
return self.__run_for_output(["version"]).decode("utf-8") |
360 | 399 |
|
361 | 400 |
def generate_crl(self, cert: Certificate, key: PrivateKey, index_file_path: str) -> str: |
... | ... | |
368 | 407 |
:param index_file_path: path to a file that contains the openssl index with all revoked certificates |
369 | 408 |
:return: CRL encoded in PEM format string |
370 | 409 |
""" |
410 |
|
|
411 |
Logger.debug("Function launched.") |
|
412 |
|
|
371 | 413 |
# openssl ca requires the .srl file to exists, therefore a dummy, unused file is created |
372 | 414 |
with TemporaryFile("serial.srl", "0") as serial_file, \ |
373 | 415 |
TemporaryFile("crl.conf", CRL_CONFIG % (index_file_path, serial_file)) as config_file, \ |
... | ... | |
392 | 434 |
:param der_ocsp_request: DER encoded OCSP Request |
393 | 435 |
:return: DER encoded OCSP Response |
394 | 436 |
""" |
437 |
|
|
438 |
Logger.debug("Function launched.") |
|
439 |
|
|
395 | 440 |
with TemporaryFile("certificate.pem", cert.pem_data) as ca_certificate, \ |
396 | 441 |
TemporaryFile("private_key.pem", key.private_key) as key_file, \ |
397 | 442 |
TemporaryFile("request.der", der_ocsp_request) as request_file: |
... | ... | |
413 | 458 |
self.message = message |
414 | 459 |
|
415 | 460 |
def __str__(self): |
416 |
return f""" |
|
461 |
# TODO check log is valid here |
|
462 |
msg = f""" |
|
417 | 463 |
EXECUTABLE: {self.executable} |
418 | 464 |
ARGS: {self.args} |
419 | 465 |
MESSAGE: {self.message} |
420 | 466 |
""" |
467 |
|
|
468 |
Logger.error(msg) |
|
469 |
return msg |
Také k dispozici: Unified diff
Re #8570 - Added logs