Projekt

Obecné

Profil

Stáhnout (2.41 KB) Statistiky
| Větev: | Tag: | Revize:
1 f7fb8759 silhavyj
import json
2
import logging
3
from time import sleep
4
5
from .usb_reader import read_connected_devices
6
7
8
_listeners_connected = []
9
_listeners_disconnected = []
10 e22c7a67 silhavyj
_last_connected_devices = []
11 474551ae silhavyj
_config = None
12 f7fb8759 silhavyj
13
14 474551ae silhavyj
def usb_detector_set_config(config):
15
    global _config
16
    _config = config
17
18 0b96f10c Pultak
19 474551ae silhavyj
def register_listener(callback, connected: bool = True):
20
    logging.info(f"Registering callback: {callback}.")
21 f7fb8759 silhavyj
    if connected is True:
22
        _listeners_connected.append(callback)
23
    else:
24
        _listeners_disconnected.append(callback)
25
26
27
def _notify_listeners(listeners: list, devices: list):
28 474551ae silhavyj
    if listeners is None or devices is None:
29
        return
30 f7fb8759 silhavyj
    for callback in listeners:
31
        for device in devices:
32
            callback(device)
33
34
35
def _store_connected_devices(devices: list):
36 474551ae silhavyj
    logging.debug("storing newly connected devices")
37
    with open(_config.connected_devices_filename, "w") as file:
38 f7fb8759 silhavyj
        json.dump(devices, file)
39
40
41
def _load_last_connected_devices() -> list:
42 474551ae silhavyj
    logging.debug("loading last connected devices")
43 f7fb8759 silhavyj
    try:
44 474551ae silhavyj
        with open(_config.connected_devices_filename, "r") as file:
45 f7fb8759 silhavyj
            return json.loads(file.read())
46
    except IOError:
47 474551ae silhavyj
        logging.error("loading of last connected devices failed")
48 f7fb8759 silhavyj
        return []
49
50
51 e22c7a67 silhavyj
def _get_connected_devices(detected_devices: list, last_connected_devices: list) -> list:
52
    return [device for device in detected_devices if device not in last_connected_devices]
53 f7fb8759 silhavyj
54
55 e22c7a67 silhavyj
def _get_disconnected_devices(detected_devices: list, last_connected_devices: list) -> list:
56
    return [device for device in last_connected_devices if device not in detected_devices]
57
58 f7fb8759 silhavyj
59 e22c7a67 silhavyj
def _update():
60
    global _last_connected_devices
61
    detected_devices = read_connected_devices()
62 f7fb8759 silhavyj
63 e22c7a67 silhavyj
    connected_devices = _get_connected_devices(detected_devices, _last_connected_devices)
64
    disconnected_devices = _get_disconnected_devices(detected_devices, _last_connected_devices)
65
66
    _notify_listeners(_listeners_connected, connected_devices)
67
    _notify_listeners(_listeners_disconnected, disconnected_devices)
68
69
    if len(connected_devices) > 0 or len(disconnected_devices) > 0:
70 f7fb8759 silhavyj
        _store_connected_devices(detected_devices)
71 e22c7a67 silhavyj
        _last_connected_devices = detected_devices
72
73
74
def usb_detector_run():
75
    logging.info("USB device detector is now running")
76
77
    global _last_connected_devices
78
    _last_connected_devices = _load_last_connected_devices()
79
80
    while True:
81
        _update()
82 474551ae silhavyj
        sleep(_config.scan_period_seconds)