Projekt

Obecné

Profil

Stáhnout (4.04 KB) Statistiky
| Větev: | Tag: | Revize:
1 3c82c2ad Pultak
using System.Diagnostics;
2
using System.Net.Http.Json;
3 88eab964 Pultak
using System.Text;
4 3c82c2ad Pultak
using System.Text.Json;
5
using System.Text.Json.Serialization;
6 88eab964 Pultak
using DiskQueue;
7 3c82c2ad Pultak
using LDClient.network.data;
8
9
namespace LDClient.network {
10 f281acac silhavyj
    
11 33c231a4 silhavyj
    public sealed class ApiClient : IApiClient {
12 3c82c2ad Pultak
        
13 afafbc22 Pultak
        public IHttpClient _client;
14 fa084fc4 Pultak
15
        public bool ClientRunning;
16
17 3c82c2ad Pultak
        private readonly uint _retryPeriod;
18 88eab964 Pultak
        private readonly uint _maxEntries;
19
        private readonly uint _maxRetries;
20 fa084fc4 Pultak
        private readonly IPersistentQueue _cache;
21
22
        public ApiClient(string url, uint port, string path, uint retryPeriod, uint maxEntries, uint maxRetries, IPersistentQueue cache) {
23 afafbc22 Pultak
            var uri = $"{url}:{port}{path}";
24 3c82c2ad Pultak
            _retryPeriod = retryPeriod;
25 88eab964 Pultak
            _maxEntries = maxEntries;
26
            _maxRetries = maxRetries;
27
28 afafbc22 Pultak
            _client = new HttpClient(uri);
29 fa084fc4 Pultak
            _cache = cache;
30 3c82c2ad Pultak
        }
31
32
        public async Task SendPayloadAsync(Payload payload) {
33
            try {
34
                Stopwatch stopWatch = new();
35
                stopWatch.Start();
36 522ba9b4 Pultak
                
37 afafbc22 Pultak
                var response = await _client.PostAsJsonAsync(payload);
38 3c82c2ad Pultak
                stopWatch.Stop();
39 522ba9b4 Pultak
                CreateRequestLog(payload, response, stopWatch.ElapsedMilliseconds);
40 3c82c2ad Pultak
41
                response.EnsureSuccessStatusCode();
42
            } catch (Exception e) {
43
                Program.DefaultLogger.Error($"Failed to send {payload} to the server. Due to: {e.Message}");
44 522ba9b4 Pultak
                CachePayload(payload);
45 3c82c2ad Pultak
            }
46
        }
47
48 522ba9b4 Pultak
        private static void CreateRequestLog(Payload payload, HttpResponseMessage response, long durationMs) {
49 0ba79cc3 Pultak
            var responseToLog = new {
50
                statusCode = response.StatusCode,
51
                content = response.Content,
52
                headers = response.Headers,
53
                errorMessage = response.RequestMessage,
54
            };
55
56
            Program.DefaultLogger.Info($"Request completed in {durationMs} ms,\n" +
57 522ba9b4 Pultak
                                 $"Request body: {payload},\n" +
58
                                 $"Response: {responseToLog}");
59
        }
60
        
61 eda1e8c7 Pultak
        private async Task ResendPayloadsAsync() {
62
            var numberOfPayloadsToResend = Math.Min(_maxRetries, _cache.EstimatedCountOfItemsInQueue);
63
            var payloads = new List<Payload>();
64 fa084fc4 Pultak
            if (numberOfPayloadsToResend > 0) {
65
                using var session = _cache.OpenSession();
66 eda1e8c7 Pultak
                for (var i = 0; i < numberOfPayloadsToResend; i++) {
67
                    var rawBytes = session.Dequeue();
68
                    var payload = JsonSerializer.Deserialize<Payload>(rawBytes);
69
                    if (payload is not null) {
70
                        payloads.Add(payload);
71
                    }
72
                }
73
                session.Flush();
74
            }
75
76
            if (payloads.Count > 0) {
77
                Program.DefaultLogger.Debug($"ResendPayloadAsync -> {payloads.Count} unsent payloads");
78
                var tasks = new List<Task>();
79
                foreach (var payload in payloads) {
80
                    Program.DefaultLogger.Info($"Resending {payload}.");
81
                    tasks.Add(SendPayloadAsync(payload));
82
                }
83
                await Task.WhenAll(tasks);
84
            }
85
        }
86 f281acac silhavyj
        
87 eda1e8c7 Pultak
        private void CachePayload(Payload payload) {
88
            Program.DefaultLogger.Info($"Storing {payload} into the cache.");
89
            var numberOfCachedPayloads = _cache.EstimatedCountOfItemsInQueue;
90
            using var session = _cache.OpenSession();
91
            if (numberOfCachedPayloads >= _maxEntries) {
92
                session.Dequeue();
93
            }
94
            var payloadJson = JsonSerializer.Serialize(payload);
95
            session.Enqueue(Encoding.UTF8.GetBytes(payloadJson));
96
            session.Flush();
97
        }
98
99 88eab964 Pultak
        public async void Run() {
100
            Program.DefaultLogger.Info("Api Client thread has started");
101 fa084fc4 Pultak
            ClientRunning = true;
102
            while (ClientRunning) {
103 88eab964 Pultak
                await ResendPayloadsAsync();
104 f281acac silhavyj
                Thread.Sleep((int) _retryPeriod);
105 88eab964 Pultak
            }
106 0ba79cc3 Pultak
        }
107 3c82c2ad Pultak
    }
108
}