5 |
5 |
from .usb_reader import read_connected_devices
|
6 |
6 |
|
7 |
7 |
|
8 |
|
_listeners_connected = []
|
9 |
|
_listeners_disconnected = []
|
10 |
|
_last_connected_devices = []
|
11 |
|
_config = None
|
|
8 |
_listeners_connected = [] # list of listeners (USB devices is connected)
|
|
9 |
_listeners_disconnected = [] # list of listeners (USB devices is disconnected)
|
|
10 |
|
|
11 |
_last_connected_devices = [] # list of the lastly connected USB devices
|
|
12 |
_config = None # instance of Config (config manager)
|
12 |
13 |
|
13 |
14 |
|
14 |
15 |
def usb_detector_set_config(config):
|
|
16 |
"""Initializes the usb detector module (file).
|
|
17 |
|
|
18 |
This function is meant to be called prior to calling
|
|
19 |
any other function of the detector module. It stores
|
|
20 |
an instance of the Config class which is then used
|
|
21 |
by other functions within this file.
|
|
22 |
|
|
23 |
:param config: instance of Config (config manager)
|
|
24 |
"""
|
|
25 |
# Store the instance into the global variable.
|
15 |
26 |
global _config
|
16 |
27 |
_config = config
|
17 |
28 |
|
18 |
29 |
|
19 |
30 |
def register_listener(callback, connected: bool = True):
|
|
31 |
"""Registers a new event listener.
|
|
32 |
|
|
33 |
The caller is supposed to passed in a function they
|
|
34 |
wish to be called whenever an event occurs. What kind
|
|
35 |
of event will trigger (call) the callback function is
|
|
36 |
determined by the second parameter.
|
|
37 |
|
|
38 |
:param callback: Function that is called whenever the desired event happens.
|
|
39 |
:param connected: If the value is set to True, the callback function
|
|
40 |
will be called whenever a USB device is connected.
|
|
41 |
If it is set to False, it will be triggered whenever a
|
|
42 |
USB device is disconnected.
|
|
43 |
"""
|
20 |
44 |
logging.info(f"Registering callback: {callback}.")
|
|
45 |
|
21 |
46 |
if connected is True:
|
|
47 |
# Register the callback for "connected devices"
|
22 |
48 |
_listeners_connected.append(callback)
|
23 |
49 |
else:
|
|
50 |
# Register the callback for "disconnected devices"
|
24 |
51 |
_listeners_disconnected.append(callback)
|
25 |
52 |
|
26 |
53 |
|
27 |
54 |
def _notify_listeners(listeners: list, devices: list):
|
|
55 |
""" Notifies (calls) all listeners based on the even that just occurred.
|
|
56 |
|
|
57 |
This function is called whenever a USB device is plugged or unplugged.
|
|
58 |
Based on the type of the event, the corresponding list of listeners
|
|
59 |
is passed in along with a list of all devices involved in the event.
|
|
60 |
|
|
61 |
:param listeners: List of listeners registered for the event.
|
|
62 |
:param devices: List of all USB devices involved in the event
|
|
63 |
(usually a single device).
|
|
64 |
"""
|
|
65 |
# Make sure both lists are not None
|
28 |
66 |
if listeners is None or devices is None:
|
29 |
67 |
return
|
|
68 |
|
|
69 |
# Iterate over the listeners and notify them
|
|
70 |
# of all USB devices involved in the event.
|
30 |
71 |
for callback in listeners:
|
31 |
72 |
for device in devices:
|
32 |
73 |
callback(device)
|
33 |
74 |
|
34 |
75 |
|
35 |
76 |
def _store_connected_devices(devices: list):
|
|
77 |
"""Stores the list of currently connected USB devices into a file.
|
|
78 |
|
|
79 |
This function is called whenever a device is connected or disconnected.
|
|
80 |
Its main purpose is to dump the list of the currently plugged devices
|
|
81 |
on the disk (so it's not kept in RAM when the computer shuts down). The
|
|
82 |
list is then loaded upon every start of the application.
|
|
83 |
|
|
84 |
:param devices: List of the devices that are currently connected to the PC.
|
|
85 |
"""
|
36 |
86 |
logging.debug("storing newly connected devices")
|
|
87 |
|
|
88 |
# Dump the list into a JSON format.
|
37 |
89 |
with open(_config.connected_devices_filename, "w") as file:
|
38 |
90 |
json.dump(devices, file)
|
39 |
91 |
|
40 |
92 |
|
41 |
93 |
def _load_last_connected_devices() -> list:
|
|
94 |
"""Loads the list of the connected devices from the disk.
|
|
95 |
|
|
96 |
This function is called with every start of the application.
|
|
97 |
It ensures that the application remembers the USB devices
|
|
98 |
that were connected to the PC before it was turned off
|
|
99 |
(persistent memory).
|
|
100 |
|
|
101 |
:return: List of the lastly connected USB devices.
|
|
102 |
"""
|
42 |
103 |
logging.debug("loading last connected devices")
|
43 |
104 |
try:
|
44 |
105 |
with open(_config.connected_devices_filename, "r") as file:
|
... | ... | |
49 |
110 |
|
50 |
111 |
|
51 |
112 |
def _get_connected_devices(detected_devices: list, last_connected_devices: list) -> list:
|
|
113 |
"""Returns a list of USB devices that were just plugged into the computer.
|
|
114 |
|
|
115 |
Using the two lists passed in as parameters, it figures out what devices
|
|
116 |
were just connected to the PC. Essentially, any device of the detected_devices list
|
|
117 |
that does not apper in the last_connected_devices list must have been just plugged in.
|
|
118 |
|
|
119 |
:param detected_devices: list of currently plugged USB devices
|
|
120 |
:param last_connected_devices: list of the lastly connected USB devices
|
|
121 |
:return: list of all USB devices that were just plugged into the computer
|
|
122 |
"""
|
|
123 |
# If there is no previous record of what USB devices where connected to the PC,
|
|
124 |
# all newly-connected devices are treated as if they were just plugged in.
|
52 |
125 |
if last_connected_devices is None and detected_devices is not None:
|
53 |
126 |
return detected_devices
|
|
127 |
|
|
128 |
# Return an empty list if no devices were detected.
|
54 |
129 |
if detected_devices is None:
|
55 |
130 |
return []
|
|
131 |
|
|
132 |
# Return a list of all devices that were just plugged into the PC.
|
56 |
133 |
return [device for device in detected_devices if device not in last_connected_devices]
|
57 |
134 |
|
58 |
135 |
|
59 |
136 |
def _get_disconnected_devices(detected_devices: list, last_connected_devices: list) -> list:
|
|
137 |
"""Returns a list of USB devices that were just disconnected from the computer.
|
|
138 |
|
|
139 |
Using the two lists passed in as parameters, it figures out what devices where just
|
|
140 |
disconnected from the computer. Basically, any device that was seen connected to the PC
|
|
141 |
and does not apper in the detected_devices list must have been just unplugged from the PC.
|
|
142 |
|
|
143 |
:param detected_devices: list of currently plugged USB devices
|
|
144 |
:param last_connected_devices: list of the lastly connected USB devices
|
|
145 |
:return: list of all USB devices that were just disconnected from the PC
|
|
146 |
"""
|
|
147 |
# If there is no previous record of what USB devices where connected to the PC,
|
|
148 |
# no devices were unplugged
|
60 |
149 |
if last_connected_devices is None:
|
61 |
150 |
return []
|
|
151 |
|
|
152 |
# Return last_connected_devices if no devices were detected.
|
62 |
153 |
if detected_devices is None:
|
63 |
154 |
return last_connected_devices
|
|
155 |
|
|
156 |
# Return a list of all devices that were just disconnected.
|
64 |
157 |
return [device for device in last_connected_devices if device not in detected_devices]
|
65 |
158 |
|
66 |
159 |
|
67 |
160 |
def _update():
|
|
161 |
"""Updates the USB detector.
|
|
162 |
|
|
163 |
This function is periodically called from the usb_detector_run function.
|
|
164 |
It uses the other functions of this file to figure out if there have
|
|
165 |
been any changes since the last time this function was called - what
|
|
166 |
USB devices have been connected/disconnected.
|
|
167 |
"""
|
|
168 |
# Retrieve a list of the currently plugged USB devices
|
|
169 |
# and store it globally within the file.
|
68 |
170 |
global _last_connected_devices
|
69 |
171 |
detected_devices = read_connected_devices()
|
70 |
172 |
|
|
173 |
# Figure out what USB devices were connected to the PC since
|
|
174 |
# the last time this function was called.
|
71 |
175 |
connected_devices = _get_connected_devices(detected_devices, _last_connected_devices)
|
|
176 |
|
|
177 |
# Figure out what USB devices were disconnected from the PC since
|
|
178 |
# the last time this function was called.
|
72 |
179 |
disconnected_devices = _get_disconnected_devices(detected_devices, _last_connected_devices)
|
73 |
180 |
|
|
181 |
# Notify both kinds of listeners (call the registered callback functions).
|
74 |
182 |
_notify_listeners(_listeners_connected, connected_devices)
|
75 |
183 |
_notify_listeners(_listeners_disconnected, disconnected_devices)
|
76 |
184 |
|
|
185 |
# If there have been any changes, update the file on the disk (persistent memory).
|
77 |
186 |
if len(connected_devices) > 0 or len(disconnected_devices) > 0:
|
78 |
187 |
_store_connected_devices(detected_devices)
|
79 |
188 |
_last_connected_devices = detected_devices
|
80 |
189 |
|
81 |
190 |
|
82 |
191 |
def usb_detector_run():
|
|
192 |
"""Keeps detecting what USB devices were plugged/unplugged.
|
|
193 |
|
|
194 |
This function is instantiated as a thread that periodically scans
|
|
195 |
what USB devices are connected to the PC. The scan period can be
|
|
196 |
found and modified in the configuration file of the application.
|
|
197 |
"""
|
83 |
198 |
logging.info("USB device detector is now running")
|
84 |
199 |
|
|
200 |
# Read the list of the lastly connected USB devices from the disk (once).
|
85 |
201 |
global _last_connected_devices
|
86 |
202 |
_last_connected_devices = _load_last_connected_devices()
|
87 |
203 |
|
88 |
204 |
while True:
|
|
205 |
# Update the USB detector.
|
89 |
206 |
_update()
|
|
207 |
|
|
208 |
# Sleep for a predefined amount of seconds
|
90 |
209 |
sleep(_config.scan_period_seconds)
|
re #9422 Commented detector.py, event_listener.py, and usb_reader.py