Projekt

Obecné

Profil

Stáhnout (1.53 KB) Statistiky
| Větev: | Tag: | Revize:
1
import json
2
import logging
3
from time import sleep
4

    
5
from .usb_reader import read_connected_devices
6
from config_manager import scan_period_seconds, connected_devices_filename
7

    
8

    
9
_listeners_connected = []
10
_listeners_disconnected = []
11

    
12

    
13
def register_listener(callback, connected: bool = True):
14
    if connected is True:
15
        _listeners_connected.append(callback)
16
    else:
17
        _listeners_disconnected.append(callback)
18

    
19

    
20
def _notify_listeners(listeners: list, devices: list):
21
    for callback in listeners:
22
        for device in devices:
23
            callback(device)
24

    
25

    
26
def _store_connected_devices(devices: list):
27
    with open(connected_devices_filename, "w") as file:
28
        json.dump(devices, file)
29

    
30

    
31
def _load_last_connected_devices() -> list:
32
    try:
33
        with open(connected_devices_filename, "r") as file:
34
            return json.loads(file.read())
35
    except IOError:
36
        return []
37

    
38

    
39
def usb_detector_run():
40
    logging.info("USB device detector is now running")
41

    
42
    while True:
43
        last_connected_devices = _load_last_connected_devices()
44
        detected_devices = read_connected_devices()
45

    
46
        connected_devices = [device for device in detected_devices if device not in last_connected_devices]
47
        disconnected_devices = [device for device in last_connected_devices if device not in detected_devices]
48

    
49
        _notify_listeners(_listeners_connected, connected_devices)
50
        _notify_listeners(_listeners_disconnected, disconnected_devices)
51

    
52
        _store_connected_devices(detected_devices)
53
        sleep(scan_period_seconds)
(2-2/4)