1
|
import json
|
2
|
import logging
|
3
|
from time import sleep
|
4
|
|
5
|
from .usb_reader import read_connected_devices
|
6
|
|
7
|
|
8
|
_listeners_connected = []
|
9
|
_listeners_disconnected = []
|
10
|
_last_connected_devices = []
|
11
|
_config = None
|
12
|
|
13
|
|
14
|
def usb_detector_set_config(config):
|
15
|
global _config
|
16
|
_config = config
|
17
|
|
18
|
|
19
|
def register_listener(callback, connected: bool = True):
|
20
|
logging.info(f"Registering callback: {callback}.")
|
21
|
if connected is True:
|
22
|
_listeners_connected.append(callback)
|
23
|
else:
|
24
|
_listeners_disconnected.append(callback)
|
25
|
|
26
|
|
27
|
def _notify_listeners(listeners: list, devices: list):
|
28
|
if listeners is None or devices is None:
|
29
|
return
|
30
|
for callback in listeners:
|
31
|
for device in devices:
|
32
|
callback(device)
|
33
|
|
34
|
|
35
|
def _store_connected_devices(devices: list):
|
36
|
logging.debug("storing newly connected devices")
|
37
|
with open(_config.connected_devices_filename, "w") as file:
|
38
|
json.dump(devices, file)
|
39
|
|
40
|
|
41
|
def _load_last_connected_devices() -> list:
|
42
|
logging.debug("loading last connected devices")
|
43
|
try:
|
44
|
with open(_config.connected_devices_filename, "r") as file:
|
45
|
return json.loads(file.read())
|
46
|
except IOError:
|
47
|
logging.error("loading of last connected devices failed")
|
48
|
return []
|
49
|
|
50
|
|
51
|
def _get_connected_devices(detected_devices: list, last_connected_devices: list) -> list:
|
52
|
if last_connected_devices is None and detected_devices is not None:
|
53
|
return detected_devices
|
54
|
if detected_devices is None:
|
55
|
return []
|
56
|
return [device for device in detected_devices if device not in last_connected_devices]
|
57
|
|
58
|
|
59
|
def _get_disconnected_devices(detected_devices: list, last_connected_devices: list) -> list:
|
60
|
if last_connected_devices is None:
|
61
|
return []
|
62
|
if detected_devices is None:
|
63
|
return last_connected_devices
|
64
|
return [device for device in last_connected_devices if device not in detected_devices]
|
65
|
|
66
|
|
67
|
def _update():
|
68
|
global _last_connected_devices
|
69
|
detected_devices = read_connected_devices()
|
70
|
|
71
|
connected_devices = _get_connected_devices(detected_devices, _last_connected_devices)
|
72
|
disconnected_devices = _get_disconnected_devices(detected_devices, _last_connected_devices)
|
73
|
|
74
|
_notify_listeners(_listeners_connected, connected_devices)
|
75
|
_notify_listeners(_listeners_disconnected, disconnected_devices)
|
76
|
|
77
|
if len(connected_devices) > 0 or len(disconnected_devices) > 0:
|
78
|
_store_connected_devices(detected_devices)
|
79
|
_last_connected_devices = detected_devices
|
80
|
|
81
|
|
82
|
def usb_detector_run():
|
83
|
logging.info("USB device detector is now running")
|
84
|
|
85
|
global _last_connected_devices
|
86
|
_last_connected_devices = _load_last_connected_devices()
|
87
|
|
88
|
while True:
|
89
|
_update()
|
90
|
sleep(_config.scan_period_seconds)
|