Projekt

Obecné

Profil

Stáhnout (2.41 KB) Statistiky
| Větev: | Tag: | Revize:
1
import json
2
import logging
3
from time import sleep
4

    
5
from .usb_reader import read_connected_devices
6

    
7

    
8
_listeners_connected = []
9
_listeners_disconnected = []
10
_last_connected_devices = []
11
_config = None
12

    
13

    
14
def usb_detector_set_config(config):
15
    global _config
16
    _config = config
17

    
18

    
19
def register_listener(callback, connected: bool = True):
20
    logging.info(f"Registering callback: {callback}.")
21
    if connected is True:
22
        _listeners_connected.append(callback)
23
    else:
24
        _listeners_disconnected.append(callback)
25

    
26

    
27
def _notify_listeners(listeners: list, devices: list):
28
    if listeners is None or devices is None:
29
        return
30
    for callback in listeners:
31
        for device in devices:
32
            callback(device)
33

    
34

    
35
def _store_connected_devices(devices: list):
36
    logging.debug("storing newly connected devices")
37
    with open(_config.connected_devices_filename, "w") as file:
38
        json.dump(devices, file)
39

    
40

    
41
def _load_last_connected_devices() -> list:
42
    logging.debug("loading last connected devices")
43
    try:
44
        with open(_config.connected_devices_filename, "r") as file:
45
            return json.loads(file.read())
46
    except IOError:
47
        logging.error("loading of last connected devices failed")
48
        return []
49

    
50

    
51
def _get_connected_devices(detected_devices: list, last_connected_devices: list) -> list:
52
    return [device for device in detected_devices if device not in last_connected_devices]
53

    
54

    
55
def _get_disconnected_devices(detected_devices: list, last_connected_devices: list) -> list:
56
    return [device for device in last_connected_devices if device not in detected_devices]
57

    
58

    
59
def _update():
60
    global _last_connected_devices
61
    detected_devices = read_connected_devices()
62

    
63
    connected_devices = _get_connected_devices(detected_devices, _last_connected_devices)
64
    disconnected_devices = _get_disconnected_devices(detected_devices, _last_connected_devices)
65

    
66
    _notify_listeners(_listeners_connected, connected_devices)
67
    _notify_listeners(_listeners_disconnected, disconnected_devices)
68

    
69
    if len(connected_devices) > 0 or len(disconnected_devices) > 0:
70
        _store_connected_devices(detected_devices)
71
        _last_connected_devices = detected_devices
72

    
73

    
74
def usb_detector_run():
75
    logging.info("USB device detector is now running")
76

    
77
    global _last_connected_devices
78
    _last_connected_devices = _load_last_connected_devices()
79

    
80
    while True:
81
        _update()
82
        sleep(_config.scan_period_seconds)
(2-2/4)