1
|
import json
|
2
|
import logging
|
3
|
from time import sleep
|
4
|
|
5
|
from .usb_reader import read_connected_devices
|
6
|
from config_manager import scan_period_seconds, connected_devices_filename
|
7
|
|
8
|
|
9
|
_listeners_connected = []
|
10
|
_listeners_disconnected = []
|
11
|
_last_connected_devices = []
|
12
|
|
13
|
|
14
|
def register_listener(callback, connected: bool = True):
|
15
|
logging.info(f'Registering callback: {callback}.')
|
16
|
|
17
|
if connected is True:
|
18
|
_listeners_connected.append(callback)
|
19
|
else:
|
20
|
_listeners_disconnected.append(callback)
|
21
|
|
22
|
|
23
|
def _notify_listeners(listeners: list, devices: list):
|
24
|
for callback in listeners:
|
25
|
for device in devices:
|
26
|
callback(device)
|
27
|
|
28
|
|
29
|
def _store_connected_devices(devices: list):
|
30
|
logging.debug('storing newly connected devices')
|
31
|
with open(connected_devices_filename, "w") as file:
|
32
|
json.dump(devices, file)
|
33
|
|
34
|
|
35
|
def _load_last_connected_devices() -> list:
|
36
|
logging.debug('loading last connected devices')
|
37
|
try:
|
38
|
with open(connected_devices_filename, "r") as file:
|
39
|
return json.loads(file.read())
|
40
|
except IOError:
|
41
|
logging.error('loading of last connected devices failed')
|
42
|
return []
|
43
|
|
44
|
|
45
|
def _get_connected_devices(detected_devices: list, last_connected_devices: list) -> list:
|
46
|
return [device for device in detected_devices if device not in last_connected_devices]
|
47
|
|
48
|
|
49
|
def _get_disconnected_devices(detected_devices: list, last_connected_devices: list) -> list:
|
50
|
return [device for device in last_connected_devices if device not in detected_devices]
|
51
|
|
52
|
|
53
|
def _update():
|
54
|
global _last_connected_devices
|
55
|
detected_devices = read_connected_devices()
|
56
|
|
57
|
connected_devices = _get_connected_devices(detected_devices, _last_connected_devices)
|
58
|
disconnected_devices = _get_disconnected_devices(detected_devices, _last_connected_devices)
|
59
|
|
60
|
_notify_listeners(_listeners_connected, connected_devices)
|
61
|
_notify_listeners(_listeners_disconnected, disconnected_devices)
|
62
|
|
63
|
if len(connected_devices) > 0 or len(disconnected_devices) > 0:
|
64
|
_store_connected_devices(detected_devices)
|
65
|
_last_connected_devices = detected_devices
|
66
|
|
67
|
|
68
|
def usb_detector_run():
|
69
|
logging.info("USB device detector is now running")
|
70
|
|
71
|
global _last_connected_devices
|
72
|
_last_connected_devices = _load_last_connected_devices()
|
73
|
|
74
|
while True:
|
75
|
_update()
|
76
|
sleep(scan_period_seconds)
|