1
|
import json
|
2
|
import logging
|
3
|
from time import sleep
|
4
|
|
5
|
from .usb_reader import read_connected_devices
|
6
|
|
7
|
|
8
|
_listeners_connected = [] # list of listeners (USB devices is connected)
|
9
|
_listeners_disconnected = [] # list of listeners (USB devices is disconnected)
|
10
|
|
11
|
_last_connected_devices = [] # list of the lastly connected USB devices
|
12
|
_config = None # instance of Config (config manager)
|
13
|
|
14
|
|
15
|
def usb_detector_set_config(config):
|
16
|
"""Initializes the usb detector module (file).
|
17
|
|
18
|
This function is meant to be called prior to calling
|
19
|
any other function of the detector module. It stores
|
20
|
an instance of the Config class which is then used
|
21
|
by other functions within this file.
|
22
|
|
23
|
:param config: instance of Config (config manager)
|
24
|
"""
|
25
|
# Store the instance into the global variable.
|
26
|
global _config
|
27
|
_config = config
|
28
|
|
29
|
|
30
|
def register_listener(callback, connected: bool = True):
|
31
|
"""Registers a new event listener.
|
32
|
|
33
|
The caller is supposed to passed in a function they
|
34
|
wish to be called whenever an event occurs. What kind
|
35
|
of event will trigger (call) the callback function is
|
36
|
determined by the second parameter.
|
37
|
|
38
|
:param callback: Function that is called whenever the desired event happens.
|
39
|
:param connected: If the value is set to True, the callback function
|
40
|
will be called whenever a USB device is connected.
|
41
|
If it is set to False, it will be triggered whenever a
|
42
|
USB device is disconnected.
|
43
|
"""
|
44
|
logging.info(f"Registering callback: {callback}.")
|
45
|
|
46
|
if connected is True:
|
47
|
# Register the callback for "connected devices"
|
48
|
_listeners_connected.append(callback)
|
49
|
else:
|
50
|
# Register the callback for "disconnected devices"
|
51
|
_listeners_disconnected.append(callback)
|
52
|
|
53
|
|
54
|
def _notify_listeners(listeners: list, devices: list):
|
55
|
""" Notifies (calls) all listeners based on the even that just occurred.
|
56
|
|
57
|
This function is called whenever a USB device is plugged or unplugged.
|
58
|
Based on the type of the event, the corresponding list of listeners
|
59
|
is passed in along with a list of all devices involved in the event.
|
60
|
|
61
|
:param listeners: List of listeners registered for the event.
|
62
|
:param devices: List of all USB devices involved in the event
|
63
|
(usually a single device).
|
64
|
"""
|
65
|
# Make sure both lists are not None
|
66
|
if listeners is None or devices is None:
|
67
|
return
|
68
|
|
69
|
# Iterate over the listeners and notify them
|
70
|
# of all USB devices involved in the event.
|
71
|
for callback in listeners:
|
72
|
for device in devices:
|
73
|
callback(device)
|
74
|
|
75
|
|
76
|
def _store_connected_devices(devices: list):
|
77
|
"""Stores the list of currently connected USB devices into a file.
|
78
|
|
79
|
This function is called whenever a device is connected or disconnected.
|
80
|
Its main purpose is to dump the list of the currently plugged devices
|
81
|
on the disk (so it's not kept in RAM when the computer shuts down). The
|
82
|
list is then loaded upon every start of the application.
|
83
|
|
84
|
:param devices: List of the devices that are currently connected to the PC.
|
85
|
"""
|
86
|
logging.debug("storing newly connected devices")
|
87
|
|
88
|
# Dump the list into a JSON format.
|
89
|
with open(_config.connected_devices_filename, "w") as file:
|
90
|
json.dump(devices, file)
|
91
|
|
92
|
|
93
|
def _load_last_connected_devices() -> list:
|
94
|
"""Loads the list of the connected devices from the disk.
|
95
|
|
96
|
This function is called with every start of the application.
|
97
|
It ensures that the application remembers the USB devices
|
98
|
that were connected to the PC before it was turned off
|
99
|
(persistent memory).
|
100
|
|
101
|
:return: List of the lastly connected USB devices.
|
102
|
"""
|
103
|
logging.debug("loading last connected devices")
|
104
|
try:
|
105
|
with open(_config.connected_devices_filename, "r") as file:
|
106
|
return json.loads(file.read())
|
107
|
except IOError:
|
108
|
logging.error("loading of last connected devices failed")
|
109
|
return []
|
110
|
|
111
|
|
112
|
def _get_connected_devices(detected_devices: list, last_connected_devices: list) -> list:
|
113
|
"""Returns a list of USB devices that were just plugged into the computer.
|
114
|
|
115
|
Using the two lists passed in as parameters, it figures out what devices
|
116
|
were just connected to the PC. Essentially, any device of the detected_devices list
|
117
|
that does not apper in the last_connected_devices list must have been just plugged in.
|
118
|
|
119
|
:param detected_devices: list of currently plugged USB devices
|
120
|
:param last_connected_devices: list of the lastly connected USB devices
|
121
|
:return: list of all USB devices that were just plugged into the computer
|
122
|
"""
|
123
|
# If there is no previous record of what USB devices where connected to the PC,
|
124
|
# all newly-connected devices are treated as if they were just plugged in.
|
125
|
if last_connected_devices is None and detected_devices is not None:
|
126
|
return detected_devices
|
127
|
|
128
|
# Return an empty list if no devices were detected.
|
129
|
if detected_devices is None:
|
130
|
return []
|
131
|
|
132
|
# Return a list of all devices that were just plugged into the PC.
|
133
|
return [device for device in detected_devices if device not in last_connected_devices]
|
134
|
|
135
|
|
136
|
def _get_disconnected_devices(detected_devices: list, last_connected_devices: list) -> list:
|
137
|
"""Returns a list of USB devices that were just disconnected from the computer.
|
138
|
|
139
|
Using the two lists passed in as parameters, it figures out what devices where just
|
140
|
disconnected from the computer. Basically, any device that was seen connected to the PC
|
141
|
and does not apper in the detected_devices list must have been just unplugged from the PC.
|
142
|
|
143
|
:param detected_devices: list of currently plugged USB devices
|
144
|
:param last_connected_devices: list of the lastly connected USB devices
|
145
|
:return: list of all USB devices that were just disconnected from the PC
|
146
|
"""
|
147
|
# If there is no previous record of what USB devices where connected to the PC,
|
148
|
# no devices were unplugged
|
149
|
if last_connected_devices is None:
|
150
|
return []
|
151
|
|
152
|
# Return last_connected_devices if no devices were detected.
|
153
|
if detected_devices is None:
|
154
|
return last_connected_devices
|
155
|
|
156
|
# Return a list of all devices that were just disconnected.
|
157
|
return [device for device in last_connected_devices if device not in detected_devices]
|
158
|
|
159
|
|
160
|
def _update():
|
161
|
"""Updates the USB detector.
|
162
|
|
163
|
This function is periodically called from the usb_detector_run function.
|
164
|
It uses the other functions of this file to figure out if there have
|
165
|
been any changes since the last time this function was called - what
|
166
|
USB devices have been connected/disconnected.
|
167
|
"""
|
168
|
# Retrieve a list of the currently plugged USB devices
|
169
|
# and store it globally within the file.
|
170
|
global _last_connected_devices
|
171
|
detected_devices = read_connected_devices()
|
172
|
|
173
|
# Figure out what USB devices were connected to the PC since
|
174
|
# the last time this function was called.
|
175
|
connected_devices = _get_connected_devices(detected_devices, _last_connected_devices)
|
176
|
|
177
|
# Figure out what USB devices were disconnected from the PC since
|
178
|
# the last time this function was called.
|
179
|
disconnected_devices = _get_disconnected_devices(detected_devices, _last_connected_devices)
|
180
|
|
181
|
# Notify both kinds of listeners (call the registered callback functions).
|
182
|
_notify_listeners(_listeners_connected, connected_devices)
|
183
|
_notify_listeners(_listeners_disconnected, disconnected_devices)
|
184
|
|
185
|
# If there have been any changes, update the file on the disk (persistent memory).
|
186
|
if len(connected_devices) > 0 or len(disconnected_devices) > 0:
|
187
|
_store_connected_devices(detected_devices)
|
188
|
_last_connected_devices = detected_devices
|
189
|
|
190
|
|
191
|
def usb_detector_run():
|
192
|
"""Keeps detecting what USB devices were plugged/unplugged.
|
193
|
|
194
|
This function is instantiated as a thread that periodically scans
|
195
|
what USB devices are connected to the PC. The scan period can be
|
196
|
found and modified in the configuration file of the application.
|
197
|
"""
|
198
|
logging.info("USB device detector is now running")
|
199
|
|
200
|
# Read the list of the lastly connected USB devices from the disk (once).
|
201
|
global _last_connected_devices
|
202
|
_last_connected_devices = _load_last_connected_devices()
|
203
|
|
204
|
while True:
|
205
|
# Update the USB detector.
|
206
|
_update()
|
207
|
|
208
|
# Sleep for a predefined amount of seconds
|
209
|
sleep(_config.scan_period_seconds)
|