Projekt

Obecné

Profil

Stáhnout (4.24 KB) Statistiky
| Větev: | Tag: | Revize:
1
using System.Diagnostics;
2
using System.Net.Http.Json;
3
using System.Text;
4
using System.Text.Json;
5
using System.Text.Json.Serialization;
6
using DiskQueue;
7
using LDClient.network.data;
8

    
9
namespace LDClient.network {
10
    
11
    public class ApiClient : IApiClient {
12
        
13
        private readonly string _uri;
14
        private readonly HttpClient _client;
15
        private readonly IPersistentQueue _cache;
16
        private readonly uint _retryPeriod;
17
        private readonly uint _maxEntries;
18
        private readonly uint _maxRetries;
19
        
20
        public ApiClient(string url, uint port, string path, uint retryPeriod, uint maxEntries, uint maxRetries,
21
            string cacheFilename) {
22
            _uri = $"{url}:{port}{path}";
23
            _retryPeriod = retryPeriod;
24
            _maxEntries = maxEntries;
25
            _maxRetries = maxRetries;
26

    
27
            _client = new HttpClient();
28
            _cache = new PersistentQueue(cacheFilename);
29
        }
30

    
31
        public async Task SendPayloadAsync(Payload payload) {
32
            try {
33
                Stopwatch stopWatch = new();
34
                stopWatch.Start();
35
                
36
                var response = await _client.PostAsJsonAsync(_uri, payload, new JsonSerializerOptions {
37
                    Converters = {
38
                        new JsonStringEnumConverter( JsonNamingPolicy.CamelCase)
39
                    }
40
                });
41
                stopWatch.Stop();
42
                CreateRequestLog(payload, response, stopWatch.ElapsedMilliseconds);
43

    
44
                response.EnsureSuccessStatusCode();
45
            } catch (Exception e) {
46
                Program.DefaultLogger.Error($"Failed to send {payload} to the server. Due to: {e.Message}");
47
                CachePayload(payload);
48
            }
49
        }
50

    
51
        private static void CreateRequestLog(Payload payload, HttpResponseMessage response, long durationMs) {
52
            var responseToLog = new {
53
                statusCode = response.StatusCode,
54
                content = response.Content,
55
                headers = response.Headers,
56
                errorMessage = response.RequestMessage,
57
            };
58

    
59
            Program.DefaultLogger.Info($"Request completed in {durationMs} ms,\n" +
60
                                 $"Request body: {payload},\n" +
61
                                 $"Response: {responseToLog}");
62
        }
63
        
64
        private async Task ResendPayloadsAsync() {
65
            var numberOfPayloadsToResend = Math.Min(_maxRetries, _cache.EstimatedCountOfItemsInQueue);
66
            var payloads = new List<Payload>();
67

    
68
            using (var session = _cache.OpenSession()) {
69
                for (var i = 0; i < numberOfPayloadsToResend; i++) {
70
                    var rawBytes = session.Dequeue();
71
                    var payload = JsonSerializer.Deserialize<Payload>(rawBytes);
72
                    if (payload is not null) {
73
                        payloads.Add(payload);
74
                    }
75
                }
76
                session.Flush();
77
            }
78

    
79
            if (payloads.Count > 0) {
80
                Program.DefaultLogger.Debug($"ResendPayloadAsync -> {payloads.Count} unsent payloads");
81
                var tasks = new List<Task>();
82
                foreach (var payload in payloads) {
83
                    Program.DefaultLogger.Info($"Resending {payload}.");
84
                    tasks.Add(SendPayloadAsync(payload));
85
                }
86
                await Task.WhenAll(tasks);
87
            }
88
        }
89
        
90
        private void CachePayload(Payload payload) {
91
            Program.DefaultLogger.Info($"Storing {payload} into the cache.");
92
            var numberOfCachedPayloads = _cache.EstimatedCountOfItemsInQueue;
93
            using var session = _cache.OpenSession();
94
            if (numberOfCachedPayloads >= _maxEntries) {
95
                session.Dequeue();
96
            }
97
            var payloadJson = JsonSerializer.Serialize(payload);
98
            session.Enqueue(Encoding.UTF8.GetBytes(payloadJson));
99
            session.Flush();
100
        }
101

    
102
        public async void Run() {
103
            Program.DefaultLogger.Info("Api Client thread has started");
104
            while (true) {
105
                await ResendPayloadsAsync();
106
                Thread.Sleep((int) _retryPeriod);
107
            }
108
            // ReSharper disable once FunctionNeverReturns
109
        }
110
    }
111
}
(1-1/2)