Projekt

Obecné

Profil

Stáhnout (4 KB) Statistiky
| Větev: | Tag: | Revize:
1
using System.Diagnostics;
2
using System.Net.Http.Json;
3
using System.Text;
4
using System.Text.Json;
5
using System.Text.Json.Serialization;
6
using DiskQueue;
7
using LDClient.network.data;
8

    
9
namespace LDClient.network {
10
    
11
    public sealed class ApiClient : IApiClient {
12
        
13
        public IHttpClient _client;
14
        public IPersistentQueue _cache;
15
        
16
        private readonly uint _retryPeriod;
17
        private readonly uint _maxEntries;
18
        private readonly uint _maxRetries;
19
        
20
        public ApiClient(string url, uint port, string path, uint retryPeriod, uint maxEntries, uint maxRetries, string cacheFilename) {
21
            var uri = $"{url}:{port}{path}";
22
            _retryPeriod = retryPeriod;
23
            _maxEntries = maxEntries;
24
            _maxRetries = maxRetries;
25

    
26
            _client = new HttpClient(uri);
27
            _cache = new PersistentQueue(cacheFilename);
28
        }
29

    
30
        public async Task SendPayloadAsync(Payload payload) {
31
            try {
32
                Stopwatch stopWatch = new();
33
                stopWatch.Start();
34
                
35
                var response = await _client.PostAsJsonAsync(payload);
36
                stopWatch.Stop();
37
                CreateRequestLog(payload, response, stopWatch.ElapsedMilliseconds);
38

    
39
                response.EnsureSuccessStatusCode();
40
            } catch (Exception e) {
41
                Program.DefaultLogger.Error($"Failed to send {payload} to the server. Due to: {e.Message}");
42
                CachePayload(payload);
43
            }
44
        }
45

    
46
        private static void CreateRequestLog(Payload payload, HttpResponseMessage response, long durationMs) {
47
            var responseToLog = new {
48
                statusCode = response.StatusCode,
49
                content = response.Content,
50
                headers = response.Headers,
51
                errorMessage = response.RequestMessage,
52
            };
53

    
54
            Program.DefaultLogger.Info($"Request completed in {durationMs} ms,\n" +
55
                                 $"Request body: {payload},\n" +
56
                                 $"Response: {responseToLog}");
57
        }
58
        
59
        private async Task ResendPayloadsAsync() {
60
            var numberOfPayloadsToResend = Math.Min(_maxRetries, _cache.EstimatedCountOfItemsInQueue);
61
            var payloads = new List<Payload>();
62

    
63
            using (var session = _cache.OpenSession()) {
64
                for (var i = 0; i < numberOfPayloadsToResend; i++) {
65
                    var rawBytes = session.Dequeue();
66
                    var payload = JsonSerializer.Deserialize<Payload>(rawBytes);
67
                    if (payload is not null) {
68
                        payloads.Add(payload);
69
                    }
70
                }
71
                session.Flush();
72
            }
73

    
74
            if (payloads.Count > 0) {
75
                Program.DefaultLogger.Debug($"ResendPayloadAsync -> {payloads.Count} unsent payloads");
76
                var tasks = new List<Task>();
77
                foreach (var payload in payloads) {
78
                    Program.DefaultLogger.Info($"Resending {payload}.");
79
                    tasks.Add(SendPayloadAsync(payload));
80
                }
81
                await Task.WhenAll(tasks);
82
            }
83
        }
84
        
85
        private void CachePayload(Payload payload) {
86
            Program.DefaultLogger.Info($"Storing {payload} into the cache.");
87
            var numberOfCachedPayloads = _cache.EstimatedCountOfItemsInQueue;
88
            using var session = _cache.OpenSession();
89
            if (numberOfCachedPayloads >= _maxEntries) {
90
                session.Dequeue();
91
            }
92
            var payloadJson = JsonSerializer.Serialize(payload);
93
            session.Enqueue(Encoding.UTF8.GetBytes(payloadJson));
94
            session.Flush();
95
        }
96

    
97
        public async void Run() {
98
            Program.DefaultLogger.Info("Api Client thread has started");
99
            while (true) {
100
                await ResendPayloadsAsync();
101
                Thread.Sleep((int) _retryPeriod);
102
            }
103
            // ReSharper disable once FunctionNeverReturns
104
        }
105
    }
106
}
(1-1/4)