Projekt

Obecné

Profil

Stáhnout (4.23 KB) Statistiky
| Větev: | Tag: | Revize:
1 3c82c2ad Pultak
using System.Diagnostics;
2
using System.Net.Http.Json;
3 88eab964 Pultak
using System.Text;
4 3c82c2ad Pultak
using System.Text.Json;
5
using System.Text.Json.Serialization;
6 88eab964 Pultak
using DiskQueue;
7 3c82c2ad Pultak
using LDClient.network.data;
8
9
namespace LDClient.network {
10 f281acac silhavyj
    
11 33c231a4 silhavyj
    public sealed class ApiClient : IApiClient {
12 3c82c2ad Pultak
        
13
        private readonly string _uri;
14
        private readonly HttpClient _client;
15 88eab964 Pultak
        private readonly IPersistentQueue _cache;
16 3c82c2ad Pultak
        private readonly uint _retryPeriod;
17 88eab964 Pultak
        private readonly uint _maxEntries;
18
        private readonly uint _maxRetries;
19
        
20 33c231a4 silhavyj
        public ApiClient(string url, uint port, string path, uint retryPeriod, uint maxEntries, uint maxRetries, string cacheFilename) {
21 3c82c2ad Pultak
            _uri = $"{url}:{port}{path}";
22
            _retryPeriod = retryPeriod;
23 88eab964 Pultak
            _maxEntries = maxEntries;
24
            _maxRetries = maxRetries;
25
26 3c82c2ad Pultak
            _client = new HttpClient();
27 88eab964 Pultak
            _cache = new PersistentQueue(cacheFilename);
28 3c82c2ad Pultak
        }
29
30
        public async Task SendPayloadAsync(Payload payload) {
31
            try {
32
                Stopwatch stopWatch = new();
33
                stopWatch.Start();
34 522ba9b4 Pultak
                
35 3c82c2ad Pultak
                var response = await _client.PostAsJsonAsync(_uri, payload, new JsonSerializerOptions {
36
                    Converters = {
37
                        new JsonStringEnumConverter( JsonNamingPolicy.CamelCase)
38
                    }
39
                });
40
                stopWatch.Stop();
41 522ba9b4 Pultak
                CreateRequestLog(payload, response, stopWatch.ElapsedMilliseconds);
42 3c82c2ad Pultak
43
                response.EnsureSuccessStatusCode();
44
            } catch (Exception e) {
45
                Program.DefaultLogger.Error($"Failed to send {payload} to the server. Due to: {e.Message}");
46 522ba9b4 Pultak
                CachePayload(payload);
47 3c82c2ad Pultak
            }
48
        }
49
50 522ba9b4 Pultak
        private static void CreateRequestLog(Payload payload, HttpResponseMessage response, long durationMs) {
51 0ba79cc3 Pultak
            var responseToLog = new {
52
                statusCode = response.StatusCode,
53
                content = response.Content,
54
                headers = response.Headers,
55
                errorMessage = response.RequestMessage,
56
            };
57
58
            Program.DefaultLogger.Info($"Request completed in {durationMs} ms,\n" +
59 522ba9b4 Pultak
                                 $"Request body: {payload},\n" +
60
                                 $"Response: {responseToLog}");
61
        }
62
        
63 eda1e8c7 Pultak
        private async Task ResendPayloadsAsync() {
64
            var numberOfPayloadsToResend = Math.Min(_maxRetries, _cache.EstimatedCountOfItemsInQueue);
65
            var payloads = new List<Payload>();
66
67
            using (var session = _cache.OpenSession()) {
68
                for (var i = 0; i < numberOfPayloadsToResend; i++) {
69
                    var rawBytes = session.Dequeue();
70
                    var payload = JsonSerializer.Deserialize<Payload>(rawBytes);
71
                    if (payload is not null) {
72
                        payloads.Add(payload);
73
                    }
74
                }
75
                session.Flush();
76
            }
77
78
            if (payloads.Count > 0) {
79
                Program.DefaultLogger.Debug($"ResendPayloadAsync -> {payloads.Count} unsent payloads");
80
                var tasks = new List<Task>();
81
                foreach (var payload in payloads) {
82
                    Program.DefaultLogger.Info($"Resending {payload}.");
83
                    tasks.Add(SendPayloadAsync(payload));
84
                }
85
                await Task.WhenAll(tasks);
86
            }
87
        }
88 f281acac silhavyj
        
89 eda1e8c7 Pultak
        private void CachePayload(Payload payload) {
90
            Program.DefaultLogger.Info($"Storing {payload} into the cache.");
91
            var numberOfCachedPayloads = _cache.EstimatedCountOfItemsInQueue;
92
            using var session = _cache.OpenSession();
93
            if (numberOfCachedPayloads >= _maxEntries) {
94
                session.Dequeue();
95
            }
96
            var payloadJson = JsonSerializer.Serialize(payload);
97
            session.Enqueue(Encoding.UTF8.GetBytes(payloadJson));
98
            session.Flush();
99
        }
100
101 88eab964 Pultak
        public async void Run() {
102
            Program.DefaultLogger.Info("Api Client thread has started");
103 f281acac silhavyj
            while (true) {
104 88eab964 Pultak
                await ResendPayloadsAsync();
105 f281acac silhavyj
                Thread.Sleep((int) _retryPeriod);
106 88eab964 Pultak
            }
107 f281acac silhavyj
            // ReSharper disable once FunctionNeverReturns
108 0ba79cc3 Pultak
        }
109 3c82c2ad Pultak
    }
110
}