Projekt

Obecné

Profil

Stáhnout (2.31 KB) Statistiky
| Větev: | Tag: | Revize:
1
import json
2
import logging
3
from time import sleep
4

    
5
from .usb_reader import read_connected_devices
6
from config_manager import scan_period_seconds, connected_devices_filename
7

    
8

    
9
_listeners_connected = []
10
_listeners_disconnected = []
11
_last_connected_devices = []
12

    
13

    
14
def register_listener(callback, connected: bool = True):
15
    logging.info(f'Registering callback: {callback}.')
16

    
17
    if connected is True:
18
        _listeners_connected.append(callback)
19
    else:
20
        _listeners_disconnected.append(callback)
21

    
22

    
23
def _notify_listeners(listeners: list, devices: list):
24
    for callback in listeners:
25
        for device in devices:
26
            callback(device)
27

    
28

    
29
def _store_connected_devices(devices: list):
30
    logging.debug('storing newly connected devices')
31
    with open(connected_devices_filename, "w") as file:
32
        json.dump(devices, file)
33

    
34

    
35
def _load_last_connected_devices() -> list:
36
    logging.debug('loading last connected devices')
37
    try:
38
        with open(connected_devices_filename, "r") as file:
39
            return json.loads(file.read())
40
    except IOError:
41
        logging.error('loading of last connected devices failed')
42
        return []
43

    
44

    
45
def _get_connected_devices(detected_devices: list, last_connected_devices: list) -> list:
46
    return [device for device in detected_devices if device not in last_connected_devices]
47

    
48

    
49
def _get_disconnected_devices(detected_devices: list, last_connected_devices: list) -> list:
50
    return [device for device in last_connected_devices if device not in detected_devices]
51

    
52

    
53
def _update():
54
    global _last_connected_devices
55
    detected_devices = read_connected_devices()
56

    
57
    connected_devices = _get_connected_devices(detected_devices, _last_connected_devices)
58
    disconnected_devices = _get_disconnected_devices(detected_devices, _last_connected_devices)
59

    
60
    _notify_listeners(_listeners_connected, connected_devices)
61
    _notify_listeners(_listeners_disconnected, disconnected_devices)
62

    
63
    if len(connected_devices) > 0 or len(disconnected_devices) > 0:
64
        _store_connected_devices(detected_devices)
65
        _last_connected_devices = detected_devices
66

    
67

    
68
def usb_detector_run():
69
    logging.info("USB device detector is now running")
70

    
71
    global _last_connected_devices
72
    _last_connected_devices = _load_last_connected_devices()
73

    
74
    while True:
75
        _update()
76
        sleep(scan_period_seconds)
(2-2/4)