Projekt

Obecné

Profil

« Předchozí | Další » 

Revize 1526cb8b

Přidáno uživatelem Jakub Šilhavý před více než 2 roky(ů)

re #9422 Commented detector.py, event_listener.py, and usb_reader.py

Zobrazit rozdíly:

client/src/usb_detector/detector.py
5 5
from .usb_reader import read_connected_devices
6 6

  
7 7

  
8
_listeners_connected = []
9
_listeners_disconnected = []
10
_last_connected_devices = []
11
_config = None
8
_listeners_connected = []       # list of listeners (USB devices is connected)
9
_listeners_disconnected = []    # list of listeners (USB devices is disconnected)
10

  
11
_last_connected_devices = []    # list of the lastly connected USB devices
12
_config = None                  # instance of Config (config manager)
12 13

  
13 14

  
14 15
def usb_detector_set_config(config):
16
    """Initializes the usb detector module (file).
17

  
18
    This function is meant to be called prior to calling
19
    any other function of the detector module. It stores
20
    an instance of the Config class which is then used
21
    by other functions within this file.
22

  
23
    :param config: instance of Config (config manager)
24
    """
25
    # Store the instance into the global variable.
15 26
    global _config
16 27
    _config = config
17 28

  
18 29

  
19 30
def register_listener(callback, connected: bool = True):
31
    """Registers a new event listener.
32

  
33
    The caller is supposed to passed in a function they
34
    wish to be called whenever an event occurs. What kind
35
    of event will trigger (call) the callback function is
36
    determined by the second parameter.
37

  
38
    :param callback: Function that is called whenever the desired event happens.
39
    :param connected: If the value is set to True, the callback function
40
                      will be called whenever a USB device is connected.
41
                      If it is set to False, it will be triggered whenever a
42
                      USB device is disconnected.
43
    """
20 44
    logging.info(f"Registering callback: {callback}.")
45

  
21 46
    if connected is True:
47
        # Register the callback for "connected devices"
22 48
        _listeners_connected.append(callback)
23 49
    else:
50
        # Register the callback for "disconnected devices"
24 51
        _listeners_disconnected.append(callback)
25 52

  
26 53

  
27 54
def _notify_listeners(listeners: list, devices: list):
55
    """ Notifies (calls) all listeners based on the even that just occurred.
56

  
57
    This function is called whenever a USB device is plugged or unplugged.
58
    Based on the type of the event, the corresponding list of listeners
59
    is passed in along with a list of all devices involved in the event.
60

  
61
    :param listeners: List of listeners registered for the event.
62
    :param devices: List of all USB devices involved in the event
63
                    (usually a single device).
64
    """
65
    # Make sure both lists are not None
28 66
    if listeners is None or devices is None:
29 67
        return
68

  
69
    # Iterate over the listeners and notify them
70
    # of all USB devices involved in the event.
30 71
    for callback in listeners:
31 72
        for device in devices:
32 73
            callback(device)
33 74

  
34 75

  
35 76
def _store_connected_devices(devices: list):
77
    """Stores the list of currently connected USB devices into a file.
78

  
79
    This function is called whenever a device is connected or disconnected.
80
    Its main purpose is to dump the list of the currently plugged devices
81
    on the disk (so it's not kept in RAM when the computer shuts down). The
82
    list is then loaded upon every start of the application.
83

  
84
    :param devices: List of the devices that are currently connected to the PC.
85
    """
36 86
    logging.debug("storing newly connected devices")
87

  
88
    # Dump the list into a JSON format.
37 89
    with open(_config.connected_devices_filename, "w") as file:
38 90
        json.dump(devices, file)
39 91

  
40 92

  
41 93
def _load_last_connected_devices() -> list:
94
    """Loads the list of the connected devices from the disk.
95

  
96
    This function is called with every start of the application.
97
    It ensures that the application remembers the USB devices
98
    that were connected to the PC before it was turned off
99
    (persistent memory).
100

  
101
    :return: List of the lastly connected USB devices.
102
    """
42 103
    logging.debug("loading last connected devices")
43 104
    try:
44 105
        with open(_config.connected_devices_filename, "r") as file:
......
49 110

  
50 111

  
51 112
def _get_connected_devices(detected_devices: list, last_connected_devices: list) -> list:
113
    """Returns a list of USB devices that were just plugged into the computer.
114

  
115
    Using the two lists passed in as parameters, it figures out what devices
116
    were just connected to the PC. Essentially, any device of the detected_devices list
117
    that does not apper in the last_connected_devices list must have been just plugged in.
118

  
119
    :param detected_devices: list of currently plugged USB devices
120
    :param last_connected_devices: list of the lastly connected USB devices
121
    :return: list of all USB devices that were just plugged into the computer
122
    """
123
    # If there is no previous record of what USB devices where connected to the PC,
124
    # all newly-connected devices are treated as if they were just plugged in.
52 125
    if last_connected_devices is None and detected_devices is not None:
53 126
        return detected_devices
127

  
128
    # Return an empty list if no devices were detected.
54 129
    if detected_devices is None:
55 130
        return []
131

  
132
    # Return a list of all devices that were just plugged into the PC.
56 133
    return [device for device in detected_devices if device not in last_connected_devices]
57 134

  
58 135

  
59 136
def _get_disconnected_devices(detected_devices: list, last_connected_devices: list) -> list:
137
    """Returns a list of USB devices that were just disconnected from the computer.
138

  
139
    Using the two lists passed in as parameters, it figures out what devices where just
140
    disconnected from the computer. Basically, any device that was seen connected to the PC
141
    and does not apper in the detected_devices list must have been just unplugged from the PC.
142

  
143
    :param detected_devices: list of currently plugged USB devices
144
    :param last_connected_devices: list of the lastly connected USB devices
145
    :return: list of all USB devices that were just disconnected from the PC
146
    """
147
    # If there is no previous record of what USB devices where connected to the PC,
148
    # no devices were unplugged
60 149
    if last_connected_devices is None:
61 150
        return []
151

  
152
    # Return last_connected_devices if no devices were detected.
62 153
    if detected_devices is None:
63 154
        return last_connected_devices
155

  
156
    # Return a list of all devices that were just disconnected.
64 157
    return [device for device in last_connected_devices if device not in detected_devices]
65 158

  
66 159

  
67 160
def _update():
161
    """Updates the USB detector.
162

  
163
    This function is periodically called from the usb_detector_run function.
164
    It uses the other functions of this file to figure out if there have
165
    been any changes since the last time this function was called - what
166
    USB devices have been connected/disconnected.
167
    """
168
    # Retrieve a list of the currently plugged USB devices
169
    # and store it globally within the file.
68 170
    global _last_connected_devices
69 171
    detected_devices = read_connected_devices()
70 172

  
173
    # Figure out what USB devices were connected to the PC since
174
    # the last time this function was called.
71 175
    connected_devices = _get_connected_devices(detected_devices, _last_connected_devices)
176

  
177
    # Figure out what USB devices were disconnected from the PC since
178
    # the last time this function was called.
72 179
    disconnected_devices = _get_disconnected_devices(detected_devices, _last_connected_devices)
73 180

  
181
    # Notify both kinds of listeners (call the registered callback functions).
74 182
    _notify_listeners(_listeners_connected, connected_devices)
75 183
    _notify_listeners(_listeners_disconnected, disconnected_devices)
76 184

  
185
    # If there have been any changes, update the file on the disk (persistent memory).
77 186
    if len(connected_devices) > 0 or len(disconnected_devices) > 0:
78 187
        _store_connected_devices(detected_devices)
79 188
        _last_connected_devices = detected_devices
80 189

  
81 190

  
82 191
def usb_detector_run():
192
    """Keeps detecting what USB devices were plugged/unplugged.
193

  
194
    This function is instantiated as a thread that periodically scans
195
    what USB devices are connected to the PC. The scan period can be
196
    found and modified in the configuration file of the application.
197
    """
83 198
    logging.info("USB device detector is now running")
84 199

  
200
    # Read the list of the lastly connected USB devices from the disk (once).
85 201
    global _last_connected_devices
86 202
    _last_connected_devices = _load_last_connected_devices()
87 203

  
88 204
    while True:
205
        # Update the USB detector.
89 206
        _update()
207

  
208
        # Sleep for a predefined amount of seconds
90 209
        sleep(_config.scan_period_seconds)
client/src/usb_detector/event_listener.py
7 7

  
8 8

  
9 9
def _get_metadata() -> dict:
10
    """Returns metadata associated with the computer.
11

  
12
    This metadata is sent to the server as a part
13
    of each payload. It includes the username, hostname,
14
    and timestamp.
15

  
16
    :return: metadata associated with the PC
17
    """
10 18
    logging.debug("getting computer metadata")
11 19
    return {
12
        "username": getpass.getuser(),
13
        "hostname": platform.uname().node,
14
        "timestamp": str(datetime.now()).split('.')[0]
20
        "username": getpass.getuser(),                  # username
21
        "hostname": platform.uname().node,              # hostname
22
        "timestamp": str(datetime.now()).split('.')[0]  # timestamp (format: 2022-04-07 12:11:02)
15 23
    }
16 24

  
17 25

  
18 26
def _send_payload_to_server(device: dict, status: str):
27
    """ Creates a payload and calls the send_data function to send it to the server.
28

  
29
    Each payload consists of metadata, status (connected/disconnected), and device,
30
    which contains a vendor id, product id, and serial number.
31

  
32
    :param device: USB device that has been detected
33
    :param status: status of the USB device (connected/disconnected)
34
    """
19 35
    logging.debug("payload send preparation")
36

  
37
    # Get metadata that will be put into the payload.
20 38
    payload = _get_metadata()
39

  
40
    # Add information about the USB device.
21 41
    payload["device"] = device
42

  
43
    # Add the status of the USB device (connected/disconnected).
22 44
    payload["status"] = status
45

  
46
    # Send the payload off to the server.
23 47
    send_data(payload)
24 48

  
25 49

  
26 50
def usb_connected_callback(device: dict):
51
    """Callback function for a connected USB device.
52

  
53
    This function gets called whenever a USB device is
54
    plugged into the computer (it is registered as a listener in
55
    the USB detector). The device consists of a vendor id, product id,
56
    and serial number.
57

  
58
    :param device: USB device that was just plugged into the PC.
59
    """
27 60
    logging.info(f"Device {device} has been connected")
61

  
62
    # Create a payload and send it to the API (server).
28 63
    _send_payload_to_server(device, "connected")
29 64

  
30 65

  
31 66
def usb_disconnected_callback(device: dict):
67
    """Callback function for a disconnected USB device.
68

  
69
    This function gets called whenever a USB device is
70
    disconnected from the computer (it is registered as a listener in
71
    the USB detector). The device consists of a vendor id, product id,
72
    and serial number.
73

  
74
    :param device: USB device that was just disconnected from the PC.
75
    """
32 76
    logging.info(f"Device {device} has been disconnected")
77

  
78
    # Create a payload and send it to the API (server).
33 79
    _send_payload_to_server(device, "disconnected")
client/src/usb_detector/usb_reader.py
4 4
import usb.util
5 5

  
6 6

  
7
# list of devices from which the application
8
# could not retrieve a serial number
7 9
_invalid_devices = []
8 10

  
9 11

  
10 12
def read_connected_devices():
13
    """Reads and returns all USB devices currently connected to the computer.
14

  
15
    It iterates over devices connected to individual buses and for each of
16
    them, it tries to retrieve its vendor id, product id, and serial number.
17
    If the application fails to retrieve the serial number from a device, it
18
    will be stored into an in-RAM list to prevent "spam" logs. Once the application
19
    successes to read the serial number, the device will be removed from the list.
20

  
21
    :return: list of all USB devices connected to the PC
22
    """
11 23
    logging.debug("reading all currently connected devices")
24

  
25
    # Create an empty list of USB devices.
12 26
    detected_devices = []
13 27

  
28
    # Get a list of all buses.
14 29
    busses = usb.busses()
15 30

  
16 31
    for bus in busses:
32
        # Get all devices connected to the current bus.
17 33
        devices = bus.devices
18 34
        for dev in devices:
35
            # Create a record of the device.
19 36
            device = {
20 37
                "vendor_id": dev.idVendor,
21 38
                "product_id": dev.idProduct
22 39
            }
40

  
41
            # Try to retrieve the serial number of the device.
23 42
            serial_number = None
24 43
            device_info = usb.core.find(idProduct=dev.idProduct)
25 44
            try:
26 45
                serial_number = usb.util.get_string(device_info, device_info.iSerialNumber)
27 46
            except ValueError:
47
                # If you fail, store the device into the in-RAM list (if it's
48
                # not already there).
28 49
                if device not in _invalid_devices:
29 50
                    logging.warning(f"Could not retrieve serial number from device {device}")
30 51
                    _invalid_devices.append(device)
31 52

  
32 53
            if serial_number is not None:
54
                # If you manage to read the serial number of a USB device
55
                # that was previously stored into the list of "failures", remove it.
33 56
                if device in _invalid_devices:
34 57
                    _invalid_devices.remove(device)
35 58

  
59
                # Add the serial number into to USB device record.
36 60
                device["serial_number"] = serial_number
61

  
62
                # Append the record into the list of the connected USB devices.
37 63
                detected_devices.append(device)
38 64

  
65
    # Return the list of currently plugged USB devices.
39 66
    return detected_devices

Také k dispozici: Unified diff