Projekt

Obecné

Profil

Stáhnout (2.68 KB) Statistiky
| Větev: | Tag: | Revize:
1
import json
2
import logging
3
from time import sleep
4

    
5
from .usb_reader import read_connected_devices
6

    
7

    
8
_listeners_connected = []
9
_listeners_disconnected = []
10
_last_connected_devices = []
11
_config = None
12

    
13

    
14
def usb_detector_set_config(config):
15
    global _config
16
    _config = config
17

    
18

    
19
def register_listener(callback, connected: bool = True):
20
    logging.info(f"Registering callback: {callback}.")
21
    if connected is True:
22
        _listeners_connected.append(callback)
23
    else:
24
        _listeners_disconnected.append(callback)
25

    
26

    
27
def _notify_listeners(listeners: list, devices: list):
28
    if listeners is None or devices is None:
29
        return
30
    for callback in listeners:
31
        for device in devices:
32
            callback(device)
33

    
34

    
35
def _store_connected_devices(devices: list):
36
    logging.debug("storing newly connected devices")
37
    with open(_config.connected_devices_filename, "w") as file:
38
        json.dump(devices, file)
39

    
40

    
41
def _load_last_connected_devices() -> list:
42
    logging.debug("loading last connected devices")
43
    try:
44
        with open(_config.connected_devices_filename, "r") as file:
45
            return json.loads(file.read())
46
    except IOError:
47
        logging.error("loading of last connected devices failed")
48
        return []
49

    
50

    
51
def _get_connected_devices(detected_devices: list, last_connected_devices: list) -> list:
52
    if last_connected_devices is None and detected_devices is not None:
53
        return detected_devices
54
    if detected_devices is None:
55
        return []
56
    return [device for device in detected_devices if device not in last_connected_devices]
57

    
58

    
59
def _get_disconnected_devices(detected_devices: list, last_connected_devices: list) -> list:
60
    if last_connected_devices is None:
61
        return []
62
    if detected_devices is None:
63
        return last_connected_devices
64
    return [device for device in last_connected_devices if device not in detected_devices]
65

    
66

    
67
def _update():
68
    global _last_connected_devices
69
    detected_devices = read_connected_devices()
70

    
71
    connected_devices = _get_connected_devices(detected_devices, _last_connected_devices)
72
    disconnected_devices = _get_disconnected_devices(detected_devices, _last_connected_devices)
73

    
74
    _notify_listeners(_listeners_connected, connected_devices)
75
    _notify_listeners(_listeners_disconnected, disconnected_devices)
76

    
77
    if len(connected_devices) > 0 or len(disconnected_devices) > 0:
78
        _store_connected_devices(detected_devices)
79
        _last_connected_devices = detected_devices
80

    
81

    
82
def usb_detector_run():
83
    logging.info("USB device detector is now running")
84

    
85
    global _last_connected_devices
86
    _last_connected_devices = _load_last_connected_devices()
87

    
88
    while True:
89
        _update()
90
        sleep(_config.scan_period_seconds)
(2-2/4)