Revize ed35ce72
Přidáno uživatelem David Friesecký před téměř 4 roky(ů)
src/config/configuration.py | ||
---|---|---|
2 | 2 |
|
3 | 3 |
import configparser |
4 | 4 |
import logging |
5 |
from logging import handlers |
|
6 |
|
|
5 | 7 |
from injector import singleton |
6 | 8 |
|
7 |
from src.constants import DEFAULT_CONNECTION_STRING, TEST_DATABASE_FILE, DEFAULT_SERVER_BASE_URL |
|
9 |
from src.constants import DEFAULT_CONNECTION_STRING, TEST_DATABASE_FILE, DEFAULT_SERVER_BASE_URL, LOG_NAME, \ |
|
10 |
LOG_DIR_LOCATION |
|
8 | 11 |
from src.constants import LOG_FILE_LOCATION, LOG_FORMAT |
9 | 12 |
from src.utils.logger import Logger |
10 | 13 |
|
... | ... | |
70 | 73 |
|
71 | 74 |
|
72 | 75 |
def configure_logging(): |
73 |
logging.basicConfig(filename=LOG_FILE_LOCATION.shortest_relative_path(), |
|
74 |
filemode='a+', |
|
75 |
format=LOG_FORMAT, |
|
76 |
level=logging.DEBUG) |
|
76 |
if not os.path.exists(LOG_DIR_LOCATION.shortest_relative_path()): |
|
77 |
os.makedirs(LOG_DIR_LOCATION.shortest_relative_path()) |
|
78 |
|
|
79 |
handler = logging.handlers.TimedRotatingFileHandler( |
|
80 |
LOG_FILE_LOCATION.shortest_relative_path(), |
|
81 |
when='H', interval=1) |
|
82 |
formatter = logging.Formatter(LOG_FORMAT) |
|
83 |
handler.setFormatter(formatter) |
|
84 |
|
|
85 |
app_logger = logging.getLogger(LOG_NAME) |
|
86 |
app_logger.setLevel(logging.DEBUG) |
|
87 |
app_logger.addHandler(handler) |
|
77 | 88 |
|
78 | 89 |
# TODO check is 'valid' |
79 | 90 |
log = logging.getLogger('werkzeug') |
src/constants.py | ||
---|---|---|
6 | 6 |
DATABASE_FILE_LOCATION = FileAnchor("aswi2021jmsd", DATABASE_FILE) |
7 | 7 |
DATETIME_FORMAT = "%d.%m.%Y %H:%M:%S" |
8 | 8 |
|
9 |
LOG_FILE = "log/application.log" |
|
9 |
LOG_DIR = "logs" |
|
10 |
LOG_DIR_LOCATION = FileAnchor("aswi2021jmsd", LOG_DIR) |
|
11 |
LOG_FILE = f"{LOG_DIR}/application.log" |
|
10 | 12 |
LOG_FILE_LOCATION = FileAnchor("aswi2021jmsd", LOG_FILE) |
11 | 13 |
LOG_FORMAT = "%(levelname)-8s %(asctime)s - %(message)s" |
14 |
LOG_NAME = "app_logger" |
|
12 | 15 |
|
13 | 16 |
REV_REASON_UNSPECIFIED = "unspecified" |
14 | 17 |
|
src/services/crl_ocsp/ca_index_file_line_generator.py | ||
---|---|---|
3 | 3 |
|
4 | 4 |
from src.model.certificate import Certificate |
5 | 5 |
from src.model.subject import Subject |
6 |
from src.utils.logger import Logger |
|
6 | 7 |
|
7 | 8 |
SRL_LEN = 8 # number of hex digits in serial number |
8 | 9 |
TAB_CHAR = "\t" |
... | ... | |
11 | 12 |
|
12 | 13 |
def get_index_file_time_entry(date: struct_time): |
13 | 14 |
# convert the time to the format of openssl CA index file |
15 |
|
|
16 |
Logger.debug("Function launched.") |
|
17 |
|
|
14 | 18 |
return time.strftime(INDEX_FILE_DATE_ENTRY_FORMAT, date) |
15 | 19 |
|
16 | 20 |
|
17 | 21 |
def get_distinguished_name(subject: Subject): |
18 | 22 |
# convert subject class instance to the distinguished name in the openssl CA index file format |
23 |
|
|
24 |
Logger.debug("Function launched.") |
|
25 |
|
|
19 | 26 |
return "".join([f"/{key}={value}" if value is not None else "" for key, value in subject.to_dict().items()]) |
20 | 27 |
|
21 | 28 |
|
... | ... | |
23 | 30 |
valid_to: struct_time) -> str: |
24 | 31 |
# converts the given certificate as well as the subject and revocation / valid_to dates to a line of openssl CA |
25 | 32 |
# index file format |
33 |
|
|
34 |
Logger.debug("Function launched.") |
|
35 |
|
|
26 | 36 |
items = [ |
27 | 37 |
# certificate status flag (R stands for revoked) |
28 | 38 |
"R", |
... | ... | |
43 | 53 |
def create_index_file_valid_line(certificate: Certificate, subject: Subject, valid_to: struct_time) -> str: |
44 | 54 |
# converts the given certificate as well as the subject and revocation / valid_to dates to a line of openssl CA |
45 | 55 |
# index file format |
56 |
|
|
57 |
Logger.debug("Function launched.") |
|
58 |
|
|
46 | 59 |
items = [ |
47 | 60 |
# certificate status flag (R stands for revoked) |
48 | 61 |
"V", |
... | ... | |
61 | 74 |
return TAB_CHAR.join(items) |
62 | 75 |
|
63 | 76 |
def __get_serial(cert_id) -> str: |
77 |
|
|
78 |
Logger.debug("Function launched.") |
|
79 |
|
|
64 | 80 |
srl = hex(cert_id).replace("0x", "") |
65 | 81 |
srl = "0"*(SRL_LEN - len(srl)) + srl # generate exactly SRL_LEN digits |
66 | 82 |
return srl.upper() |
src/services/crl_ocsp/crl_ocsp_service.py | ||
---|---|---|
8 | 8 |
from src.exceptions.unknown_exception import UnknownException |
9 | 9 |
from src.services.crl_ocsp.ca_index_file_line_generator import create_index_file_revoked_line, create_index_file_valid_line |
10 | 10 |
from src.services.cryptography import CryptographyService |
11 |
from src.utils.logger import Logger |
|
11 | 12 |
from src.utils.temporary_file import TemporaryFile |
12 | 13 |
|
13 | 14 |
|
... | ... | |
30 | 31 |
:param ca_id: ID of the CA whose revoked certificates should be put into the index file |
31 | 32 |
:return: a str representing the content of a CA index file |
32 | 33 |
""" |
34 |
|
|
35 |
Logger.debug("Function launched.") |
|
36 |
|
|
33 | 37 |
# get issuing certificate |
34 | 38 |
certificate = self.certificate_repository.read(ca_id) |
35 | 39 |
if certificate is None: |
40 |
Logger.error(f"No such certificate found 'ID = {ca_id}'.") |
|
36 | 41 |
raise CertificateNotFoundException(ca_id) |
37 | 42 |
|
38 | 43 |
# get subject and notAfter of the issuer |
... | ... | |
64 | 69 |
:param ca_id: ID of the CA whose issued (=child) certificates should be put into the index file |
65 | 70 |
:return: a str representing the content of a CA index file |
66 | 71 |
""" |
72 |
|
|
73 |
Logger.debug("Function launched.") |
|
74 |
|
|
67 | 75 |
# get issuing certificate |
68 | 76 |
certificate = self.certificate_repository.read(ca_id) |
69 | 77 |
if certificate is None: |
78 |
Logger.error(f"No such certificate found 'ID = {ca_id}'.") |
|
70 | 79 |
raise CertificateNotFoundException(ca_id) |
71 | 80 |
|
72 | 81 |
# get subject and notAfter of the issuer |
... | ... | |
101 | 110 |
:param ca_id: ID of a CA whose CRL shall be generated |
102 | 111 |
:return: CRL in PEM format |
103 | 112 |
""" |
113 |
|
|
114 |
Logger.debug("Function launched.") |
|
115 |
|
|
104 | 116 |
# get cert and check if the requested CA exists and if not throw an exception |
105 | 117 |
cert, key = self.check_ca_and_key(ca_id) |
106 | 118 |
|
... | ... | |
117 | 129 |
:param der_ocsp_request: DER encoded OCSP Request |
118 | 130 |
:return: DER encoded OCSP Response |
119 | 131 |
""" |
132 |
|
|
133 |
Logger.debug("Function launched.") |
|
134 |
|
|
120 | 135 |
cert, key = self.check_ca_and_key(ca_id) |
121 | 136 |
|
122 | 137 |
# Create an index file and call cryptography service to generate the OCSP response |
... | ... | |
132 | 147 |
:param ca_id: certificate authority ID |
133 | 148 |
:return: certificate data & associated key |
134 | 149 |
""" |
150 |
|
|
151 |
Logger.debug("Function launched.") |
|
152 |
|
|
135 | 153 |
# get cert and check if the requested CA exists and if not throw an exception |
136 | 154 |
cert = self.certificate_repository.read(ca_id) |
137 | 155 |
if cert is None: |
... | ... | |
139 | 157 |
# get key and check if it exists |
140 | 158 |
key = self.key_repository.read(cert.private_key_id) |
141 | 159 |
if key is None: |
160 |
Logger.error("Corrupted database, expected PrivateKey not found.") |
|
142 | 161 |
raise UnknownException("Corrupted database, expected PrivateKey not found.") |
143 |
# TODO log this |
|
144 | 162 |
return cert, key |
src/utils/logger.py | ||
---|---|---|
2 | 2 |
import logging |
3 | 3 |
from pathlib import Path |
4 | 4 |
|
5 |
from src.constants import LOG_NAME |
|
6 |
|
|
5 | 7 |
|
6 | 8 |
class Logger: |
7 | 9 |
|
... | ... | |
20 | 22 |
@staticmethod |
21 | 23 |
def debug(message: str): |
22 | 24 |
names = Logger.get_names() |
23 |
logging.debug(f'{Path(names[0]).name}[{names[1]}{names[2]}()]: {message}') |
|
25 |
app_logger = logging.getLogger(LOG_NAME) |
|
26 |
app_logger.debug(f'{Path(names[0]).name}[{names[1]}{names[2]}()]: {message}') |
|
24 | 27 |
|
25 | 28 |
@staticmethod |
26 | 29 |
def info(message: str): |
27 | 30 |
names = Logger.get_names() |
28 |
logging.info(f'{Path(names[0]).name}[{names[1]}{names[2]}()]: {message}') |
|
31 |
app_logger = logging.getLogger(LOG_NAME) |
|
32 |
app_logger.info(f'{Path(names[0]).name}[{names[1]}{names[2]}()]: {message}') |
|
29 | 33 |
|
30 | 34 |
@staticmethod |
31 | 35 |
def warning(message: str): |
32 | 36 |
names = Logger.get_names() |
33 |
logging.warning(f'{Path(names[0]).name}[{names[1]}{names[2]}()]: {message}') |
|
37 |
app_logger = logging.getLogger(LOG_NAME) |
|
38 |
app_logger.warning(f'{Path(names[0]).name}[{names[1]}{names[2]}()]: {message}') |
|
34 | 39 |
|
35 | 40 |
@staticmethod |
36 | 41 |
def error(message: str): |
37 | 42 |
names = Logger.get_names() |
38 |
logging.error(f'{Path(names[0]).name}[{names[1]}.{names[2]}()]: {message}') |
|
43 |
app_logger = logging.getLogger(LOG_NAME) |
|
44 |
app_logger.error(f'{Path(names[0]).name}[{names[1]}.{names[2]}()]: {message}') |
|
39 | 45 |
|
40 | 46 |
@staticmethod |
41 | 47 |
def critical(message: str): |
42 | 48 |
names = Logger.get_names() |
43 |
logging.critical(f'{Path(names[0]).name}[{names[1]}.{names[2]}()]: {message}') |
|
49 |
app_logger = logging.getLogger(LOG_NAME) |
|
50 |
app_logger.critical(f'{Path(names[0]).name}[{names[1]}.{names[2]}()]: {message}') |
Také k dispozici: Unified diff
Re #8570 - Rotating file settings