Projekt

Obecné

Profil

Stáhnout (2.31 KB) Statistiky
| Větev: | Tag: | Revize:
1 f7fb8759 silhavyj
import json
2
import logging
3
from time import sleep
4
5
from .usb_reader import read_connected_devices
6
from config_manager import scan_period_seconds, connected_devices_filename
7
8
9
_listeners_connected = []
10
_listeners_disconnected = []
11 e22c7a67 silhavyj
_last_connected_devices = []
12 f7fb8759 silhavyj
13
14
def register_listener(callback, connected: bool = True):
15 0b96f10c Pultak
    logging.info(f'Registering callback: {callback}.')
16
17 f7fb8759 silhavyj
    if connected is True:
18
        _listeners_connected.append(callback)
19
    else:
20
        _listeners_disconnected.append(callback)
21
22
23
def _notify_listeners(listeners: list, devices: list):
24
    for callback in listeners:
25
        for device in devices:
26
            callback(device)
27
28
29
def _store_connected_devices(devices: list):
30 0b96f10c Pultak
    logging.debug('storing newly connected devices')
31 f7fb8759 silhavyj
    with open(connected_devices_filename, "w") as file:
32
        json.dump(devices, file)
33
34
35
def _load_last_connected_devices() -> list:
36 0b96f10c Pultak
    logging.debug('loading last connected devices')
37 f7fb8759 silhavyj
    try:
38
        with open(connected_devices_filename, "r") as file:
39
            return json.loads(file.read())
40
    except IOError:
41 0b96f10c Pultak
        logging.error('loading of last connected devices failed')
42 f7fb8759 silhavyj
        return []
43
44
45 e22c7a67 silhavyj
def _get_connected_devices(detected_devices: list, last_connected_devices: list) -> list:
46
    return [device for device in detected_devices if device not in last_connected_devices]
47 f7fb8759 silhavyj
48
49 e22c7a67 silhavyj
def _get_disconnected_devices(detected_devices: list, last_connected_devices: list) -> list:
50
    return [device for device in last_connected_devices if device not in detected_devices]
51
52 f7fb8759 silhavyj
53 e22c7a67 silhavyj
def _update():
54
    global _last_connected_devices
55
    detected_devices = read_connected_devices()
56 f7fb8759 silhavyj
57 e22c7a67 silhavyj
    connected_devices = _get_connected_devices(detected_devices, _last_connected_devices)
58
    disconnected_devices = _get_disconnected_devices(detected_devices, _last_connected_devices)
59
60
    _notify_listeners(_listeners_connected, connected_devices)
61
    _notify_listeners(_listeners_disconnected, disconnected_devices)
62
63
    if len(connected_devices) > 0 or len(disconnected_devices) > 0:
64 f7fb8759 silhavyj
        _store_connected_devices(detected_devices)
65 e22c7a67 silhavyj
        _last_connected_devices = detected_devices
66
67
68
def usb_detector_run():
69
    logging.info("USB device detector is now running")
70
71
    global _last_connected_devices
72
    _last_connected_devices = _load_last_connected_devices()
73
74
    while True:
75
        _update()
76 f7fb8759 silhavyj
        sleep(scan_period_seconds)