Projekt

Obecné

Profil

Stáhnout (8.27 KB) Statistiky
| Větev: | Tag: | Revize:
1
import json
2
import logging
3
from time import sleep
4

    
5
from .usb_reader import read_connected_devices
6

    
7

    
8
_listeners_connected = []       # list of listeners (USB devices is connected)
9
_listeners_disconnected = []    # list of listeners (USB devices is disconnected)
10

    
11
_last_connected_devices = []    # list of the lastly connected USB devices
12
_config = None                  # instance of Config (config manager)
13

    
14

    
15
def usb_detector_set_config(config):
16
    """Initializes the usb detector module (file).
17

    
18
    This function is meant to be called prior to calling
19
    any other function of the detector module. It stores
20
    an instance of the Config class which is then used
21
    by other functions within this file.
22

    
23
    :param config: instance of Config (config manager)
24
    """
25
    # Store the instance into the global variable.
26
    global _config
27
    _config = config
28

    
29

    
30
def register_listener(callback, connected: bool = True):
31
    """Registers a new event listener.
32

    
33
    The caller is supposed to passed in a function they
34
    wish to be called whenever an event occurs. What kind
35
    of event will trigger (call) the callback function is
36
    determined by the second parameter.
37

    
38
    :param callback: Function that is called whenever the desired event happens.
39
    :param connected: If the value is set to True, the callback function
40
                      will be called whenever a USB device is connected.
41
                      If it is set to False, it will be triggered whenever a
42
                      USB device is disconnected.
43
    """
44
    logging.info(f"Registering callback: {callback}.")
45

    
46
    if connected is True:
47
        # Register the callback for "connected devices"
48
        _listeners_connected.append(callback)
49
    else:
50
        # Register the callback for "disconnected devices"
51
        _listeners_disconnected.append(callback)
52

    
53

    
54
def _notify_listeners(listeners: list, devices: list):
55
    """ Notifies (calls) all listeners based on the even that just occurred.
56

    
57
    This function is called whenever a USB device is plugged or unplugged.
58
    Based on the type of the event, the corresponding list of listeners
59
    is passed in along with a list of all devices involved in the event.
60

    
61
    :param listeners: List of listeners registered for the event.
62
    :param devices: List of all USB devices involved in the event
63
                    (usually a single device).
64
    """
65
    # Make sure both lists are not None
66
    if listeners is None or devices is None:
67
        return
68

    
69
    # Iterate over the listeners and notify them
70
    # of all USB devices involved in the event.
71
    for callback in listeners:
72
        for device in devices:
73
            callback(device)
74

    
75

    
76
def _store_connected_devices(devices: list):
77
    """Stores the list of currently connected USB devices into a file.
78

    
79
    This function is called whenever a device is connected or disconnected.
80
    Its main purpose is to dump the list of the currently plugged devices
81
    on the disk (so it's not kept in RAM when the computer shuts down). The
82
    list is then loaded upon every start of the application.
83

    
84
    :param devices: List of the devices that are currently connected to the PC.
85
    """
86
    logging.debug("storing newly connected devices")
87

    
88
    # Dump the list into a JSON format.
89
    with open(_config.connected_devices_filename, "w") as file:
90
        json.dump(devices, file)
91

    
92

    
93
def _load_last_connected_devices() -> list:
94
    """Loads the list of the connected devices from the disk.
95

    
96
    This function is called with every start of the application.
97
    It ensures that the application remembers the USB devices
98
    that were connected to the PC before it was turned off
99
    (persistent memory).
100

    
101
    :return: List of the lastly connected USB devices.
102
    """
103
    logging.debug("loading last connected devices")
104
    try:
105
        with open(_config.connected_devices_filename, "r") as file:
106
            return json.loads(file.read())
107
    except IOError:
108
        logging.error("loading of last connected devices failed")
109
        return []
110

    
111

    
112
def _get_connected_devices(detected_devices: list, last_connected_devices: list) -> list:
113
    """Returns a list of USB devices that were just plugged into the computer.
114

    
115
    Using the two lists passed in as parameters, it figures out what devices
116
    were just connected to the PC. Essentially, any device of the detected_devices list
117
    that does not apper in the last_connected_devices list must have been just plugged in.
118

    
119
    :param detected_devices: list of currently plugged USB devices
120
    :param last_connected_devices: list of the lastly connected USB devices
121
    :return: list of all USB devices that were just plugged into the computer
122
    """
123
    # If there is no previous record of what USB devices where connected to the PC,
124
    # all newly-connected devices are treated as if they were just plugged in.
125
    if last_connected_devices is None and detected_devices is not None:
126
        return detected_devices
127

    
128
    # Return an empty list if no devices were detected.
129
    if detected_devices is None:
130
        return []
131

    
132
    # Return a list of all devices that were just plugged into the PC.
133
    return [device for device in detected_devices if device not in last_connected_devices]
134

    
135

    
136
def _get_disconnected_devices(detected_devices: list, last_connected_devices: list) -> list:
137
    """Returns a list of USB devices that were just disconnected from the computer.
138

    
139
    Using the two lists passed in as parameters, it figures out what devices where just
140
    disconnected from the computer. Basically, any device that was seen connected to the PC
141
    and does not apper in the detected_devices list must have been just unplugged from the PC.
142

    
143
    :param detected_devices: list of currently plugged USB devices
144
    :param last_connected_devices: list of the lastly connected USB devices
145
    :return: list of all USB devices that were just disconnected from the PC
146
    """
147
    # If there is no previous record of what USB devices where connected to the PC,
148
    # no devices were unplugged
149
    if last_connected_devices is None:
150
        return []
151

    
152
    # Return last_connected_devices if no devices were detected.
153
    if detected_devices is None:
154
        return last_connected_devices
155

    
156
    # Return a list of all devices that were just disconnected.
157
    return [device for device in last_connected_devices if device not in detected_devices]
158

    
159

    
160
def _update():
161
    """Updates the USB detector.
162

    
163
    This function is periodically called from the usb_detector_run function.
164
    It uses the other functions of this file to figure out if there have
165
    been any changes since the last time this function was called - what
166
    USB devices have been connected/disconnected.
167
    """
168
    # Retrieve a list of the currently plugged USB devices
169
    # and store it globally within the file.
170
    global _last_connected_devices
171
    detected_devices = read_connected_devices()
172

    
173
    # Figure out what USB devices were connected to the PC since
174
    # the last time this function was called.
175
    connected_devices = _get_connected_devices(detected_devices, _last_connected_devices)
176

    
177
    # Figure out what USB devices were disconnected from the PC since
178
    # the last time this function was called.
179
    disconnected_devices = _get_disconnected_devices(detected_devices, _last_connected_devices)
180

    
181
    # Notify both kinds of listeners (call the registered callback functions).
182
    _notify_listeners(_listeners_connected, connected_devices)
183
    _notify_listeners(_listeners_disconnected, disconnected_devices)
184

    
185
    # If there have been any changes, update the file on the disk (persistent memory).
186
    if len(connected_devices) > 0 or len(disconnected_devices) > 0:
187
        _store_connected_devices(detected_devices)
188
        _last_connected_devices = detected_devices
189

    
190

    
191
def usb_detector_run():
192
    """Keeps detecting what USB devices were plugged/unplugged.
193

    
194
    This function is instantiated as a thread that periodically scans
195
    what USB devices are connected to the PC. The scan period can be
196
    found and modified in the configuration file of the application.
197
    """
198
    logging.info("USB device detector is now running")
199

    
200
    # Read the list of the lastly connected USB devices from the disk (once).
201
    global _last_connected_devices
202
    _last_connected_devices = _load_last_connected_devices()
203

    
204
    while True:
205
        # Update the USB detector.
206
        _update()
207

    
208
        # Sleep for a predefined amount of seconds
209
        sleep(_config.scan_period_seconds)
(3-3/5)