1
|
import json
|
2
|
import requests
|
3
|
import logging
|
4
|
from time import sleep
|
5
|
from diskcache import Deque
|
6
|
from requests import HTTPError, ConnectionError
|
7
|
|
8
|
|
9
|
_uri = None
|
10
|
_cache = None
|
11
|
_config = None
|
12
|
|
13
|
|
14
|
def api_client_set_config(config):
|
15
|
global _config, _cache, _uri
|
16
|
_config = config
|
17
|
_cache = Deque(directory=_config.cache_dir)
|
18
|
_uri = config.server_url + ":" + config.server_port + config.server_endpoint
|
19
|
|
20
|
|
21
|
def send_data(payload: dict):
|
22
|
try:
|
23
|
logging.info(f"sending payload = {payload} to {_uri}")
|
24
|
response = requests.post(url=_uri, data=json.dumps(payload))
|
25
|
logging.info(f"response text: {response.text}")
|
26
|
except ConnectionError:
|
27
|
logging.warning(f"sending payload = {payload} to {_uri} failed")
|
28
|
_cache_failed_payload(payload)
|
29
|
except HTTPError as error:
|
30
|
logging.error(f"HTTP Error ({_uri}) payload = {payload}, {error}")
|
31
|
_cache_failed_payload(payload)
|
32
|
|
33
|
|
34
|
def _cache_failed_payload(payload: dict):
|
35
|
if len(_cache) >= _config.cache_max_entries:
|
36
|
oldest_payload = _cache.pop()
|
37
|
logging.warning(f"cache is full - discarding payload = {oldest_payload}")
|
38
|
|
39
|
logging.info(f"adding payload = {payload} into cache")
|
40
|
_cache.append(payload)
|
41
|
|
42
|
|
43
|
def _resend_cached_payloads():
|
44
|
retries = min(_config.cache_max_retries, len(_cache))
|
45
|
logging.info(f"emptying the cache ({retries} records)")
|
46
|
for _ in range(0, retries):
|
47
|
payload = _cache.pop()
|
48
|
send_data(payload)
|
49
|
|
50
|
|
51
|
def api_client_run():
|
52
|
while True:
|
53
|
_resend_cached_payloads()
|
54
|
sleep(_config.cache_retry_period_seconds)
|