1
|
import json
|
2
|
import logging
|
3
|
from time import sleep
|
4
|
|
5
|
from .usb_reader import read_connected_devices
|
6
|
from config_manager import scan_period_seconds, connected_devices_filename
|
7
|
|
8
|
|
9
|
_listeners_connected = []
|
10
|
_listeners_disconnected = []
|
11
|
|
12
|
|
13
|
def register_listener(callback, connected: bool = True):
|
14
|
logging.info(f'Registering callback: {callback}.')
|
15
|
|
16
|
if connected is True:
|
17
|
_listeners_connected.append(callback)
|
18
|
else:
|
19
|
_listeners_disconnected.append(callback)
|
20
|
|
21
|
|
22
|
def _notify_listeners(listeners: list, devices: list):
|
23
|
for callback in listeners:
|
24
|
for device in devices:
|
25
|
callback(device)
|
26
|
|
27
|
|
28
|
def _store_connected_devices(devices: list):
|
29
|
logging.debug('storing newly connected devices')
|
30
|
with open(connected_devices_filename, "w") as file:
|
31
|
json.dump(devices, file)
|
32
|
|
33
|
|
34
|
def _load_last_connected_devices() -> list:
|
35
|
logging.debug('loading last connected devices')
|
36
|
try:
|
37
|
with open(connected_devices_filename, "r") as file:
|
38
|
return json.loads(file.read())
|
39
|
except IOError:
|
40
|
logging.error('loading of last connected devices failed')
|
41
|
return []
|
42
|
|
43
|
|
44
|
def usb_detector_run():
|
45
|
logging.info("USB device detector is now running")
|
46
|
|
47
|
while True:
|
48
|
last_connected_devices = _load_last_connected_devices()
|
49
|
detected_devices = read_connected_devices()
|
50
|
|
51
|
connected_devices = [device for device in detected_devices if device not in last_connected_devices]
|
52
|
disconnected_devices = [device for device in last_connected_devices if device not in detected_devices]
|
53
|
|
54
|
_notify_listeners(_listeners_connected, connected_devices)
|
55
|
_notify_listeners(_listeners_disconnected, disconnected_devices)
|
56
|
|
57
|
_store_connected_devices(detected_devices)
|
58
|
sleep(scan_period_seconds)
|