Revize 329216fe
Přidáno uživatelem Stanislav Král před asi 4 roky(ů)
src/services/certificate_service.py | ||
---|---|---|
4 | 4 | |
5 | 5 |
from src.config.configuration import Configuration |
6 | 6 |
from src.constants import ROOT_CA_ID, INTERMEDIATE_CA_ID, CA_ID, CERTIFICATE_ID, CERTIFICATE_STATES, \ |
7 |
CERTIFICATE_REVOCATION_REASONS |
|
7 |
CERTIFICATE_REVOCATION_REASONS, SSL_ID, SIGNATURE_ID, AUTHENTICATION_ID
|
|
8 | 8 |
from src.dao.certificate_repository import CertificateRepository |
9 | 9 |
from src.exceptions.certificate_not_found_exception import CertificateNotFoundException |
10 | 10 |
from src.exceptions.database_exception import DatabaseException |
... | ... | |
16 | 16 | |
17 | 17 |
import time |
18 | 18 | |
19 |
from src.utils.usages_to_extensions import usages_to_extension_lines, ExtensionFieldFlags, CRITICAL, KEY_CERT_SIGN, \ |
|
20 |
CRL_SIGN, CA, DIGITAL_SIGNATURE, KEY_ENCIPHERMENT, KEY_AGREEMENT, SERVER_AUTH, NON_REPUDIATION, TIME_STAMPING, \ |
|
21 |
CLIENT_AUTH |
|
22 | ||
19 | 23 |
VALID_FROM_TO_DATE_FORMAT = "%d.%m.%Y %H:%M:%S" |
20 | 24 |
CA_EXTENSIONS = "basicConstraints=critical,CA:TRUE" |
21 | 25 |
CRL_EXTENSION = "crlDistributionPoints=URI:" |
... | ... | |
23 | 27 |
STATUS_REVOKED = "revoked" |
24 | 28 |
STATUS_VALID = "valid" |
25 | 29 | |
30 |
# define which flags are required for various usages |
|
31 |
REQUIRED_USAGE_EXTENSION_FLAGS = { |
|
32 |
CA_ID: ExtensionFieldFlags({CRITICAL, KEY_CERT_SIGN, CRL_SIGN}, {}, {CRITICAL, CA}), |
|
33 |
SSL_ID: ExtensionFieldFlags({DIGITAL_SIGNATURE, KEY_ENCIPHERMENT, KEY_AGREEMENT}, {SERVER_AUTH}, {}), |
|
34 |
SIGNATURE_ID: ExtensionFieldFlags({DIGITAL_SIGNATURE, NON_REPUDIATION}, {TIME_STAMPING}, {}), |
|
35 |
AUTHENTICATION_ID: ExtensionFieldFlags({DIGITAL_SIGNATURE}, {CLIENT_AUTH}, {})} |
|
36 | ||
26 | 37 | |
27 | 38 |
class CertificateService: |
28 | 39 | |
... | ... | |
52 | 63 | |
53 | 64 |
cert_id = self.certificate_repository.get_next_id() |
54 | 65 | |
66 |
# specify CA usage |
|
67 |
usages[CA_ID] = True |
|
68 | ||
69 |
# generate extension configuration lines based on the specified usages |
|
70 |
extensions = extensions + "\n" + "\n".join(usages_to_extension_lines(usages, REQUIRED_USAGE_EXTENSION_FLAGS)) |
|
71 | ||
55 | 72 |
# create a new self signed certificate |
56 | 73 |
cert_pem = self.cryptography_service.create_sscrt(subject, key.private_key, key_pass=key.password, |
57 | 74 |
extensions=extensions, config=config, days=days, sn=cert_id) |
58 |
# specify CA usage |
|
59 |
usages[CA_ID] = True |
|
60 | 75 | |
61 | 76 |
# wrap into Certificate class |
62 | 77 |
certificate = self.__create_wrapper(cert_pem, key.private_key_id, usages, 0, |
... | ... | |
111 | 126 |
if usages is None: |
112 | 127 |
usages = {} |
113 | 128 | |
114 |
extensions = extensions + "\n" + CA_EXTENSIONS |
|
129 |
# specify CA usage |
|
130 |
usages[CA_ID] = True |
|
131 | ||
132 |
# generate extension configuration lines based on the specified usages |
|
133 |
extensions = extensions + "\n" + "\n".join(usages_to_extension_lines(usages, REQUIRED_USAGE_EXTENSION_FLAGS)) |
|
134 | ||
115 | 135 |
# Add CRL and OCSP distribution point to certificate extensions |
116 | 136 |
cert_id = self.certificate_repository.get_next_id() |
117 | 137 |
extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(issuer_cert.certificate_id) |
... | ... | |
125 | 145 |
days=days, |
126 | 146 |
sn=cert_id) |
127 | 147 | |
128 |
# specify CA usage |
|
129 |
usages[CA_ID] = True |
|
130 | ||
131 | 148 |
# wrap into Certificate class |
132 | 149 |
self.__create_wrapper(cert_pem, subject_key.private_key_id, usages, |
133 | 150 |
issuer_cert.certificate_id, INTERMEDIATE_CA_ID) |
... | ... | |
139 | 156 |
not_before_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_before) |
140 | 157 |
not_after_formatted = time.strftime(VALID_FROM_TO_DATE_FORMAT, not_after) |
141 | 158 | |
142 |
# specify CA usage |
|
143 |
usages[CA_ID] = True |
|
144 | ||
145 | 159 |
# create a certificate wrapper |
146 | 160 |
certificate = Certificate(-1, subject.common_name, not_before_formatted, not_after_formatted, cert_pem, |
147 | 161 |
subject_key.private_key_id, INTERMEDIATE_CA_ID, issuer_cert.certificate_id, usages) |
... | ... | |
174 | 188 |
# get the next certificate ID in order to be able to specify the serial number |
175 | 189 |
cert_id = self.certificate_repository.get_next_id() |
176 | 190 | |
191 |
# generate extension configuration lines based on the specified usages |
|
192 |
extensions = extensions + "\n" + "\n".join(usages_to_extension_lines(usages, REQUIRED_USAGE_EXTENSION_FLAGS)) |
|
193 | ||
177 | 194 |
# Add CRL and OCSP distribution point to certificate extensions |
178 | 195 |
extensions = extensions + "\n" + CRL_EXTENSION + " " + self.__get_crl_endpoint(issuer_cert.certificate_id) |
179 | 196 |
extensions = extensions + "\n" + OCSP_EXTENSION + " " + self.__get_ocsp_endpoint(issuer_cert.certificate_id) |
src/services/cryptography.py | ||
---|---|---|
111 | 111 | |
112 | 112 |
def create_sscrt(self, subject, key, config="", extensions="", key_pass=None, days=30, sn: int = None): |
113 | 113 |
""" |
114 |
Creates a root CA
|
|
114 |
Creates a self signed certificate
|
|
115 | 115 | |
116 | 116 |
:param subject: an instance of <Subject> representing the subject to be added to the certificate |
117 | 117 |
:param key: private key of the CA to be used |
... | ... | |
132 | 132 |
# file instead of an extension file. Therefore the following code creates |
133 | 133 |
# the most basic configuration file with sscrt_ext section, that is later |
134 | 134 |
# reference in openssl req command using -extensions option. |
135 |
extensions += "\n" + CA_EXTENSIONS |
|
136 | 135 |
if len(config) == 0: |
137 | 136 |
config += MINIMAL_CONFIG_FILE |
138 | 137 |
config += "\n[ " + SSCRT_SECTION + " ]" + "\n" + extensions |
src/utils/usages_to_extensions.py | ||
---|---|---|
1 |
from functools import cmp_to_key |
|
2 | ||
3 |
# basic constraints |
|
4 |
from typing import Dict |
|
5 | ||
6 |
BASIC_CONSTRAINTS_KEY = "basicConstraints" |
|
7 | ||
8 |
CRITICAL = "critical" |
|
9 |
CA = "CA:TRUE" |
|
10 | ||
11 |
# key usages |
|
12 |
KEY_USAGES_KEY = "keyUsage" |
|
13 | ||
14 |
DIGITAL_SIGNATURE = "digitalSignature" |
|
15 |
NON_REPUDIATION = "nonRepudiation" |
|
16 |
KEY_ENCIPHERMENT = "keyEncipherment" |
|
17 |
DATA_ENCIPHERMENT = "dataEncipherment" |
|
18 |
KEY_AGREEMENT = "keyAgreement" |
|
19 |
KEY_CERT_SIGN = "keyCertSign" |
|
20 |
CRL_SIGN = "cRLSign" |
|
21 |
ENCIPHER_ONLY = "encipherOnly" |
|
22 |
DECIPHER_ONLY = "decipherOnly" |
|
23 | ||
24 |
# extended key usages |
|
25 |
EXTENDED_KEY_USAGE_KEY = "extendedKeyUsage" |
|
26 | ||
27 |
SERVER_AUTH = "serverAuth" |
|
28 |
CLIENT_AUTH = "clientAuth" |
|
29 |
CODE_SIGNING = "codeSigning" |
|
30 |
EMAIL_PROTECTION = "emailProtection" |
|
31 |
TIME_STAMPING = "timeStamping" |
|
32 |
OCSP_SIGNING = "OCSPSigning" |
|
33 | ||
34 | ||
35 |
class ExtensionFieldFlags: |
|
36 |
def __init__(self, key_usages_flags, extended_key_usages_flags, basic_constraints_flags): |
|
37 |
self.key_usages_flags = key_usages_flags |
|
38 |
self.extended_key_usages_flags = extended_key_usages_flags |
|
39 |
self.basic_constraints_flags = basic_constraints_flags |
|
40 | ||
41 | ||
42 |
def __compare_with_critical_prioritized(item1, item2): |
|
43 |
# compare in such way, that the CRITICAL str is always considered to be less than any other string |
|
44 |
if item1 == CRITICAL: |
|
45 |
# first item is CRITICAL, return -1 |
|
46 |
return -1 |
|
47 |
elif item2 == CRITICAL: |
|
48 |
# second item is CRITICAL, return 2 |
|
49 |
return 1 |
|
50 |
elif item1 == item2: |
|
51 |
# items are same, return 0 |
|
52 |
return 0 |
|
53 |
else: |
|
54 |
# none of the items is CRITICAL and they are not equal, compare via < str overloaded operator |
|
55 |
return -1 if item1 < item2 else 1 |
|
56 | ||
57 | ||
58 |
def usages_to_extension_lines(usages, required_extension_flags: Dict[int, ExtensionFieldFlags]): |
|
59 |
""" |
|
60 |
Converts usages dictionary to a configuration lines to be put in the extensions section |
|
61 | ||
62 |
:param usages: a dictionary containing usages to be converted into ext. configuration lines |
|
63 |
:param required_extension_flags: an object containing an information about which flags are required to be present |
|
64 |
in the extension configuration lines to be generated |
|
65 |
:return: a list of configuration lines |
|
66 |
""" |
|
67 |
# initialize sets that will represent values of all required fields (keyUsages, extendedKeyUsage, basicConstraints) |
|
68 |
key_usages = set() |
|
69 |
extended_key_usages = set() |
|
70 |
basic_constraints = set() |
|
71 | ||
72 |
# iterate over given usages |
|
73 |
for usage, value in usages.items(): |
|
74 |
# check whether the usage is set to true |
|
75 |
if value: |
|
76 |
# load required extension fields |
|
77 |
key_usage = required_extension_flags[usage] |
|
78 | ||
79 |
# append the required keyUsage flags |
|
80 |
key_usages = key_usages.union(key_usage.key_usages_flags) |
|
81 | ||
82 |
# append the required extendedKeyUsage flags |
|
83 |
extended_key_usages = extended_key_usages.union(key_usage.extended_key_usages_flags) |
|
84 | ||
85 |
# append the required basicConstraints flags |
|
86 |
basic_constraints = basic_constraints.union(key_usage.basic_constraints_flags) |
|
87 | ||
88 |
lines = [] |
|
89 | ||
90 |
# sort flags so their order is consistent and therefore testable |
|
91 |
# after sorting the flags generate an extension line joining the flags with "," character |
|
92 |
if len(basic_constraints) > 0: |
|
93 |
# "basicConstraints" (maybe) require the "critical" flag to be at the head of the list if present |
|
94 |
lines.append( |
|
95 |
f"""{BASIC_CONSTRAINTS_KEY}={",".join(sorted(basic_constraints, key=cmp_to_key(__compare_with_critical_prioritized)))}""") |
|
96 | ||
97 |
if len(key_usages) > 0: |
|
98 |
# "keyUsages" (maybe) require the "critical" flag to be at the head of the list if present |
|
99 |
lines.append( |
|
100 |
f"""{KEY_USAGES_KEY}={",".join(sorted(key_usages, key=cmp_to_key(__compare_with_critical_prioritized)))}""") |
|
101 | ||
102 |
if len(extended_key_usages) > 0: |
|
103 |
lines.append(f"""{EXTENDED_KEY_USAGE_KEY}={",".join(sorted(extended_key_usages))}""") |
|
104 | ||
105 |
return lines |
tests/unit_tests/services/cryptography/self_signed_cert_test.py | ||
---|---|---|
41 | 41 |
assert "Certificate Sign, CRL Sign" in cert_printed |
42 | 42 |
assert "X509v3 Key Usage: critical" in cert_printed |
43 | 43 |
assert "CA:TRUE" in cert_printed |
44 |
assert "X509v3 Key Usage: critical" in cert_printed |
|
45 |
assert "Certificate Sign, CRL Sign" in cert_printed |
|
44 | 46 |
assert "X509v3 CRL Distribution Points:" in cert_printed |
45 | 47 |
assert "URI:http://localhost/api/crl/0" in cert_printed |
46 | 48 |
assert "Authority Information Access:" in cert_printed |
... | ... | |
73 | 75 |
assert "Certificate Sign, CRL Sign" in cert_printed |
74 | 76 |
assert "X509v3 Key Usage: critical" in cert_printed |
75 | 77 |
assert "CA:TRUE" in cert_printed |
78 |
assert "X509v3 Key Usage: critical" in cert_printed |
|
79 |
assert "Certificate Sign, CRL Sign" in cert_printed |
|
76 | 80 |
assert "X509v3 CRL Distribution Points:" in cert_printed |
77 | 81 |
assert "URI:http://localhost/api/crl/0" in cert_printed |
78 | 82 |
assert "Authority Information Access:" in cert_printed |
tests/unit_tests/utils/usages_to_extensions_test.py | ||
---|---|---|
1 |
from src.constants import CA_ID, SSL_ID, SIGNATURE_ID, AUTHENTICATION_ID, DICT_USAGES |
|
2 |
from src.services.certificate_service import REQUIRED_USAGE_EXTENSION_FLAGS |
|
3 |
from src.utils.usages_to_extensions import usages_to_extension_lines |
|
4 | ||
5 | ||
6 |
def test_usages_to_extensions(): |
|
7 |
usages = DICT_USAGES.copy() |
|
8 |
usages[CA_ID] = True |
|
9 |
usages[SSL_ID] = True |
|
10 | ||
11 |
expected = """basicConstraints=critical,CA:TRUE |
|
12 |
keyUsage=critical,cRLSign,digitalSignature,keyAgreement,keyCertSign,keyEncipherment |
|
13 |
extendedKeyUsage=serverAuth""" |
|
14 | ||
15 |
assert "\n".join(usages_to_extension_lines(usages, REQUIRED_USAGE_EXTENSION_FLAGS)) == expected |
|
16 | ||
17 | ||
18 |
def test_usages_to_extensions_2(): |
|
19 |
usages = DICT_USAGES.copy() |
|
20 |
usages[SIGNATURE_ID] = True |
|
21 |
usages[AUTHENTICATION_ID] = True |
|
22 | ||
23 |
expected = """keyUsage=digitalSignature,nonRepudiation |
|
24 |
extendedKeyUsage=clientAuth,timeStamping""" |
|
25 | ||
26 |
assert "\n".join(usages_to_extension_lines(usages, REQUIRED_USAGE_EXTENSION_FLAGS)) == expected |
Také k dispozici: Unified diff
Re #8585 - Fixed an issue where required extensions based on cert. usages were not present in the generated certificate
Created an utility that converts usages to extension configuration lines
Defined which extension field flags are required for certain usages