Projekt

Obecné

Profil

« Předchozí | Další » 

Revize ed35ce72

Přidáno uživatelem David Friesecký před téměř 4 roky(ů)

Re #8570 - Rotating file settings

Zobrazit rozdíly:

src/config/configuration.py
2 2

  
3 3
import configparser
4 4
import logging
5
from logging import handlers
6

  
5 7
from injector import singleton
6 8

  
7
from src.constants import DEFAULT_CONNECTION_STRING, TEST_DATABASE_FILE, DEFAULT_SERVER_BASE_URL
9
from src.constants import DEFAULT_CONNECTION_STRING, TEST_DATABASE_FILE, DEFAULT_SERVER_BASE_URL, LOG_NAME, \
10
    LOG_DIR_LOCATION
8 11
from src.constants import LOG_FILE_LOCATION, LOG_FORMAT
9 12
from src.utils.logger import Logger
10 13

  
......
70 73

  
71 74

  
72 75
def configure_logging():
73
    logging.basicConfig(filename=LOG_FILE_LOCATION.shortest_relative_path(),
74
                        filemode='a+',
75
                        format=LOG_FORMAT,
76
                        level=logging.DEBUG)
76
    if not os.path.exists(LOG_DIR_LOCATION.shortest_relative_path()):
77
        os.makedirs(LOG_DIR_LOCATION.shortest_relative_path())
78

  
79
    handler = logging.handlers.TimedRotatingFileHandler(
80
        LOG_FILE_LOCATION.shortest_relative_path(),
81
        when='H', interval=1)
82
    formatter = logging.Formatter(LOG_FORMAT)
83
    handler.setFormatter(formatter)
84

  
85
    app_logger = logging.getLogger(LOG_NAME)
86
    app_logger.setLevel(logging.DEBUG)
87
    app_logger.addHandler(handler)
77 88

  
78 89
    # TODO check is 'valid'
79 90
    log = logging.getLogger('werkzeug')
src/constants.py
6 6
DATABASE_FILE_LOCATION = FileAnchor("aswi2021jmsd", DATABASE_FILE)
7 7
DATETIME_FORMAT = "%d.%m.%Y %H:%M:%S"
8 8

  
9
LOG_FILE = "log/application.log"
9
LOG_DIR = "logs"
10
LOG_DIR_LOCATION = FileAnchor("aswi2021jmsd", LOG_DIR)
11
LOG_FILE = f"{LOG_DIR}/application.log"
10 12
LOG_FILE_LOCATION = FileAnchor("aswi2021jmsd", LOG_FILE)
11 13
LOG_FORMAT = "%(levelname)-8s %(asctime)s - %(message)s"
14
LOG_NAME = "app_logger"
12 15

  
13 16
REV_REASON_UNSPECIFIED = "unspecified"
14 17

  
src/services/crl_ocsp/ca_index_file_line_generator.py
3 3

  
4 4
from src.model.certificate import Certificate
5 5
from src.model.subject import Subject
6
from src.utils.logger import Logger
6 7

  
7 8
SRL_LEN = 8  # number of hex digits in serial number
8 9
TAB_CHAR = "\t"
......
11 12

  
12 13
def get_index_file_time_entry(date: struct_time):
13 14
    # convert the time to the format of openssl CA index file
15

  
16
    Logger.debug("Function launched.")
17

  
14 18
    return time.strftime(INDEX_FILE_DATE_ENTRY_FORMAT, date)
15 19

  
16 20

  
17 21
def get_distinguished_name(subject: Subject):
18 22
    # convert subject class instance to the distinguished name in the openssl CA index file format
23

  
24
    Logger.debug("Function launched.")
25

  
19 26
    return "".join([f"/{key}={value}" if value is not None else "" for key, value in subject.to_dict().items()])
20 27

  
21 28

  
......
23 30
                                   valid_to: struct_time) -> str:
24 31
    # converts the given certificate as well as the subject and revocation / valid_to dates to a line of openssl CA
25 32
    # index file format
33

  
34
    Logger.debug("Function launched.")
35

  
26 36
    items = [
27 37
        # certificate status flag (R stands for revoked)
28 38
        "R",
......
43 53
def create_index_file_valid_line(certificate: Certificate, subject: Subject, valid_to: struct_time) -> str:
44 54
    # converts the given certificate as well as the subject and revocation / valid_to dates to a line of openssl CA
45 55
    # index file format
56

  
57
    Logger.debug("Function launched.")
58

  
46 59
    items = [
47 60
        # certificate status flag (R stands for revoked)
48 61
        "V",
......
61 74
    return TAB_CHAR.join(items)
62 75

  
63 76
def __get_serial(cert_id) -> str:
77

  
78
    Logger.debug("Function launched.")
79

  
64 80
    srl = hex(cert_id).replace("0x", "")
65 81
    srl = "0"*(SRL_LEN - len(srl)) + srl  # generate exactly SRL_LEN digits
66 82
    return srl.upper()
src/services/crl_ocsp/crl_ocsp_service.py
8 8
from src.exceptions.unknown_exception import UnknownException
9 9
from src.services.crl_ocsp.ca_index_file_line_generator import create_index_file_revoked_line, create_index_file_valid_line
10 10
from src.services.cryptography import CryptographyService
11
from src.utils.logger import Logger
11 12
from src.utils.temporary_file import TemporaryFile
12 13

  
13 14

  
......
30 31
        :param ca_id: ID of the CA whose revoked certificates should be put into the index file
31 32
        :return: a str representing the content of a CA index file
32 33
        """
34

  
35
        Logger.debug("Function launched.")
36

  
33 37
        # get issuing certificate
34 38
        certificate = self.certificate_repository.read(ca_id)
35 39
        if certificate is None:
40
            Logger.error(f"No such certificate found 'ID = {ca_id}'.")
36 41
            raise CertificateNotFoundException(ca_id)
37 42

  
38 43
        # get subject and notAfter of the issuer
......
64 69
        :param ca_id: ID of the CA whose issued (=child) certificates should be put into the index file
65 70
        :return: a str representing the content of a CA index file
66 71
        """
72

  
73
        Logger.debug("Function launched.")
74

  
67 75
        # get issuing certificate
68 76
        certificate = self.certificate_repository.read(ca_id)
69 77
        if certificate is None:
78
            Logger.error(f"No such certificate found 'ID = {ca_id}'.")
70 79
            raise CertificateNotFoundException(ca_id)
71 80

  
72 81
        # get subject and notAfter of the issuer
......
101 110
        :param ca_id: ID of a CA whose CRL shall be generated
102 111
        :return: CRL in PEM format
103 112
        """
113

  
114
        Logger.debug("Function launched.")
115

  
104 116
        # get cert and check if the requested CA exists and if not throw an exception
105 117
        cert, key = self.check_ca_and_key(ca_id)
106 118

  
......
117 129
        :param der_ocsp_request: DER encoded OCSP Request
118 130
        :return: DER encoded OCSP Response
119 131
        """
132

  
133
        Logger.debug("Function launched.")
134

  
120 135
        cert, key = self.check_ca_and_key(ca_id)
121 136

  
122 137
        # Create an index file and call cryptography service to generate the OCSP response
......
132 147
        :param ca_id: certificate authority ID
133 148
        :return: certificate data & associated key
134 149
        """
150

  
151
        Logger.debug("Function launched.")
152

  
135 153
        # get cert and check if the requested CA exists and if not throw an exception
136 154
        cert = self.certificate_repository.read(ca_id)
137 155
        if cert is None:
......
139 157
        # get key and check if it exists
140 158
        key = self.key_repository.read(cert.private_key_id)
141 159
        if key is None:
160
            Logger.error("Corrupted database, expected PrivateKey not found.")
142 161
            raise UnknownException("Corrupted database, expected PrivateKey not found.")
143
            # TODO log this
144 162
        return cert, key
src/utils/logger.py
2 2
import logging
3 3
from pathlib import Path
4 4

  
5
from src.constants import LOG_NAME
6

  
5 7

  
6 8
class Logger:
7 9

  
......
20 22
    @staticmethod
21 23
    def debug(message: str):
22 24
        names = Logger.get_names()
23
        logging.debug(f'{Path(names[0]).name}[{names[1]}{names[2]}()]: {message}')
25
        app_logger = logging.getLogger(LOG_NAME)
26
        app_logger.debug(f'{Path(names[0]).name}[{names[1]}{names[2]}()]: {message}')
24 27

  
25 28
    @staticmethod
26 29
    def info(message: str):
27 30
        names = Logger.get_names()
28
        logging.info(f'{Path(names[0]).name}[{names[1]}{names[2]}()]: {message}')
31
        app_logger = logging.getLogger(LOG_NAME)
32
        app_logger.info(f'{Path(names[0]).name}[{names[1]}{names[2]}()]: {message}')
29 33

  
30 34
    @staticmethod
31 35
    def warning(message: str):
32 36
        names = Logger.get_names()
33
        logging.warning(f'{Path(names[0]).name}[{names[1]}{names[2]}()]: {message}')
37
        app_logger = logging.getLogger(LOG_NAME)
38
        app_logger.warning(f'{Path(names[0]).name}[{names[1]}{names[2]}()]: {message}')
34 39

  
35 40
    @staticmethod
36 41
    def error(message: str):
37 42
        names = Logger.get_names()
38
        logging.error(f'{Path(names[0]).name}[{names[1]}.{names[2]}()]: {message}')
43
        app_logger = logging.getLogger(LOG_NAME)
44
        app_logger.error(f'{Path(names[0]).name}[{names[1]}.{names[2]}()]: {message}')
39 45

  
40 46
    @staticmethod
41 47
    def critical(message: str):
42 48
        names = Logger.get_names()
43
        logging.critical(f'{Path(names[0]).name}[{names[1]}.{names[2]}()]: {message}')
49
        app_logger = logging.getLogger(LOG_NAME)
50
        app_logger.critical(f'{Path(names[0]).name}[{names[1]}.{names[2]}()]: {message}')

Také k dispozici: Unified diff