Projekt

Obecné

Profil

« Předchozí | Další » 

Revize f7fb8759

Přidáno uživatelem Jakub Šilhavý před více než 2 roky(ů)

re #9305 Added a python module for USB detection

Zobrazit rozdíly:

client/main.py
1 1
import logging
2
from threading import Thread
2 3

  
3 4
from tendo import singleton
4 5

  
5 6
from config_manager import logger_format, logger_level
7
from usb_detector.detector import register_listener, usb_detector_run
8
from usb_detector.event_listener import usb_connected_callback, usb_disconnected_callback
9

  
6 10

  
7 11
if __name__ == "__main__":
8 12
    app_instance = singleton.SingleInstance()
9 13

  
10 14
    logging.basicConfig(format=logger_format, level=logger_level)
11 15

  
16
    register_listener(callback=usb_connected_callback, connected=True)
17
    register_listener(callback=usb_disconnected_callback, connected=False)
18

  
19
    usb_detector_thread = Thread(target=usb_detector_run)
20
    usb_detector_thread.setDaemon(True)
21
    usb_detector_thread.start()
12 22

  
23
    usb_detector_thread.join()
client/usb_detector/detector.py
1
import os
2
import json
3
import logging
4
from time import sleep
5

  
6
from .usb_reader import read_connected_devices
7
from config_manager import scan_period_seconds, connected_devices_filename
8

  
9

  
10
_listeners_connected = []
11
_listeners_disconnected = []
12

  
13

  
14
def register_listener(callback, connected: bool = True):
15
    if connected is True:
16
        _listeners_connected.append(callback)
17
    else:
18
        _listeners_disconnected.append(callback)
19

  
20

  
21
def _notify_listeners(listeners: list, devices: list):
22
    for callback in listeners:
23
        for device in devices:
24
            callback(device)
25

  
26

  
27
def _store_connected_devices(devices: list):
28
    with open(connected_devices_filename, "w") as file:
29
        json.dump(devices, file)
30

  
31

  
32
def _load_last_connected_devices() -> list:
33
    try:
34
        with open(connected_devices_filename, "r") as file:
35
            return json.loads(file.read())
36
    except IOError:
37
        return []
38

  
39

  
40
def usb_detector_run():
41
    logging.info("USB device detector is now running")
42

  
43
    if not os.path.exists(connected_devices_filename):
44
        dir_name = os.path.dirname(connected_devices_filename)
45
        os.makedirs(dir_name)
46

  
47
    while True:
48
        last_connected_devices = _load_last_connected_devices()
49
        detected_devices = read_connected_devices()
50

  
51
        connected_devices = [device for device in detected_devices if device not in last_connected_devices]
52
        disconnected_devices = [device for device in last_connected_devices if device not in detected_devices]
53

  
54
        _notify_listeners(_listeners_connected, connected_devices)
55
        _notify_listeners(_listeners_disconnected, disconnected_devices)
56

  
57
        _store_connected_devices(detected_devices)
58
        sleep(scan_period_seconds)
client/usb_detector/event_listener.py
1
import platform
2
import logging
3
import getpass
4
from datetime import datetime
5

  
6

  
7
def _get_metadata() -> dict:
8
    return {
9
        "username": getpass.getuser(),
10
        "hostname": platform.uname().node,
11
        "timestamp": str(datetime.now())
12
    }
13

  
14

  
15
def _send_payload_to_server(device: dict, status: str):
16
    payload = _get_metadata()
17
    payload["device"] = device
18
    payload["status"] = status
19

  
20
    # TODO send the payload off to the server
21

  
22

  
23
def usb_connected_callback(device: dict):
24
    logging.info(f"Device {device} has been connected")
25
    _send_payload_to_server(device, "connected")
26

  
27

  
28
def usb_disconnected_callback(device: dict):
29
    logging.info(f"Device {device} has been disconnected")
30
    _send_payload_to_server(device, "disconnected")
client/usb_detector/usb_reader.py
1
import usb.core
2
import usb.util
3

  
4

  
5
def read_connected_devices():
6
    detected_devices = []
7

  
8
    busses = usb.busses()
9

  
10
    for bus in busses:
11
        devices = bus.devices
12
        for dev in devices:
13
            detected_devices.append({
14
                "vendor_id": dev.idVendor,
15
                "product_id": dev.idProduct
16
            })
17

  
18
    return detected_devices

Také k dispozici: Unified diff