Projekt

Obecné

Profil

Stáhnout (4.23 KB) Statistiky
| Větev: | Tag: | Revize:
1
using System.Diagnostics;
2
using System.Net.Http.Json;
3
using System.Text;
4
using System.Text.Json;
5
using System.Text.Json.Serialization;
6
using DiskQueue;
7
using LDClient.network.data;
8

    
9
namespace LDClient.network {
10
    
11
    public sealed class ApiClient : IApiClient {
12
        
13
        private readonly string _uri;
14
        private readonly HttpClient _client;
15
        private readonly IPersistentQueue _cache;
16
        private readonly uint _retryPeriod;
17
        private readonly uint _maxEntries;
18
        private readonly uint _maxRetries;
19
        
20
        public ApiClient(string url, uint port, string path, uint retryPeriod, uint maxEntries, uint maxRetries, string cacheFilename) {
21
            _uri = $"{url}:{port}{path}";
22
            _retryPeriod = retryPeriod;
23
            _maxEntries = maxEntries;
24
            _maxRetries = maxRetries;
25

    
26
            _client = new HttpClient();
27
            _cache = new PersistentQueue(cacheFilename);
28
        }
29

    
30
        public async Task SendPayloadAsync(Payload payload) {
31
            try {
32
                Stopwatch stopWatch = new();
33
                stopWatch.Start();
34
                
35
                var response = await _client.PostAsJsonAsync(_uri, payload, new JsonSerializerOptions {
36
                    Converters = {
37
                        new JsonStringEnumConverter( JsonNamingPolicy.CamelCase)
38
                    }
39
                });
40
                stopWatch.Stop();
41
                CreateRequestLog(payload, response, stopWatch.ElapsedMilliseconds);
42

    
43
                response.EnsureSuccessStatusCode();
44
            } catch (Exception e) {
45
                Program.DefaultLogger.Error($"Failed to send {payload} to the server. Due to: {e.Message}");
46
                CachePayload(payload);
47
            }
48
        }
49

    
50
        private static void CreateRequestLog(Payload payload, HttpResponseMessage response, long durationMs) {
51
            var responseToLog = new {
52
                statusCode = response.StatusCode,
53
                content = response.Content,
54
                headers = response.Headers,
55
                errorMessage = response.RequestMessage,
56
            };
57

    
58
            Program.DefaultLogger.Info($"Request completed in {durationMs} ms,\n" +
59
                                 $"Request body: {payload},\n" +
60
                                 $"Response: {responseToLog}");
61
        }
62
        
63
        private async Task ResendPayloadsAsync() {
64
            var numberOfPayloadsToResend = Math.Min(_maxRetries, _cache.EstimatedCountOfItemsInQueue);
65
            var payloads = new List<Payload>();
66

    
67
            using (var session = _cache.OpenSession()) {
68
                for (var i = 0; i < numberOfPayloadsToResend; i++) {
69
                    var rawBytes = session.Dequeue();
70
                    var payload = JsonSerializer.Deserialize<Payload>(rawBytes);
71
                    if (payload is not null) {
72
                        payloads.Add(payload);
73
                    }
74
                }
75
                session.Flush();
76
            }
77

    
78
            if (payloads.Count > 0) {
79
                Program.DefaultLogger.Debug($"ResendPayloadAsync -> {payloads.Count} unsent payloads");
80
                var tasks = new List<Task>();
81
                foreach (var payload in payloads) {
82
                    Program.DefaultLogger.Info($"Resending {payload}.");
83
                    tasks.Add(SendPayloadAsync(payload));
84
                }
85
                await Task.WhenAll(tasks);
86
            }
87
        }
88
        
89
        private void CachePayload(Payload payload) {
90
            Program.DefaultLogger.Info($"Storing {payload} into the cache.");
91
            var numberOfCachedPayloads = _cache.EstimatedCountOfItemsInQueue;
92
            using var session = _cache.OpenSession();
93
            if (numberOfCachedPayloads >= _maxEntries) {
94
                session.Dequeue();
95
            }
96
            var payloadJson = JsonSerializer.Serialize(payload);
97
            session.Enqueue(Encoding.UTF8.GetBytes(payloadJson));
98
            session.Flush();
99
        }
100

    
101
        public async void Run() {
102
            Program.DefaultLogger.Info("Api Client thread has started");
103
            while (true) {
104
                await ResendPayloadsAsync();
105
                Thread.Sleep((int) _retryPeriod);
106
            }
107
            // ReSharper disable once FunctionNeverReturns
108
        }
109
    }
110
}
(1-1/2)