1
|
import os
|
2
|
import json
|
3
|
import logging
|
4
|
from time import sleep
|
5
|
|
6
|
from .usb_reader import read_connected_devices
|
7
|
from config_manager import scan_period_seconds, connected_devices_filename
|
8
|
|
9
|
|
10
|
_listeners_connected = []
|
11
|
_listeners_disconnected = []
|
12
|
|
13
|
|
14
|
def register_listener(callback, connected: bool = True):
|
15
|
if connected is True:
|
16
|
_listeners_connected.append(callback)
|
17
|
else:
|
18
|
_listeners_disconnected.append(callback)
|
19
|
|
20
|
|
21
|
def _notify_listeners(listeners: list, devices: list):
|
22
|
for callback in listeners:
|
23
|
for device in devices:
|
24
|
callback(device)
|
25
|
|
26
|
|
27
|
def _store_connected_devices(devices: list):
|
28
|
with open(connected_devices_filename, "w") as file:
|
29
|
json.dump(devices, file)
|
30
|
|
31
|
|
32
|
def _load_last_connected_devices() -> list:
|
33
|
try:
|
34
|
with open(connected_devices_filename, "r") as file:
|
35
|
return json.loads(file.read())
|
36
|
except IOError:
|
37
|
return []
|
38
|
|
39
|
|
40
|
def usb_detector_run():
|
41
|
logging.info("USB device detector is now running")
|
42
|
|
43
|
if not os.path.exists(connected_devices_filename):
|
44
|
dir_name = os.path.dirname(connected_devices_filename)
|
45
|
os.makedirs(dir_name)
|
46
|
|
47
|
while True:
|
48
|
last_connected_devices = _load_last_connected_devices()
|
49
|
detected_devices = read_connected_devices()
|
50
|
|
51
|
connected_devices = [device for device in detected_devices if device not in last_connected_devices]
|
52
|
disconnected_devices = [device for device in last_connected_devices if device not in detected_devices]
|
53
|
|
54
|
_notify_listeners(_listeners_connected, connected_devices)
|
55
|
_notify_listeners(_listeners_disconnected, disconnected_devices)
|
56
|
|
57
|
_store_connected_devices(detected_devices)
|
58
|
sleep(scan_period_seconds)
|