Projekt

Obecné

Profil

Stáhnout (4.04 KB) Statistiky
| Větev: | Tag: | Revize:
1
using System.Diagnostics;
2
using System.Net.Http.Json;
3
using System.Text;
4
using System.Text.Json;
5
using System.Text.Json.Serialization;
6
using DiskQueue;
7
using LDClient.network.data;
8

    
9
namespace LDClient.network {
10
    
11
    public sealed class ApiClient : IApiClient {
12
        
13
        public IHttpClient _client;
14

    
15
        public bool ClientRunning;
16

    
17
        private readonly uint _retryPeriod;
18
        private readonly uint _maxEntries;
19
        private readonly uint _maxRetries;
20
        private readonly IPersistentQueue _cache;
21

    
22
        public ApiClient(string url, uint port, string path, uint retryPeriod, uint maxEntries, uint maxRetries, IPersistentQueue cache) {
23
            var uri = $"{url}:{port}{path}";
24
            _retryPeriod = retryPeriod;
25
            _maxEntries = maxEntries;
26
            _maxRetries = maxRetries;
27

    
28
            _client = new HttpClient(uri);
29
            _cache = cache;
30
        }
31

    
32
        public async Task SendPayloadAsync(Payload payload) {
33
            try {
34
                Stopwatch stopWatch = new();
35
                stopWatch.Start();
36
                
37
                var response = await _client.PostAsJsonAsync(payload);
38
                stopWatch.Stop();
39
                CreateRequestLog(payload, response, stopWatch.ElapsedMilliseconds);
40

    
41
                response.EnsureSuccessStatusCode();
42
            } catch (Exception e) {
43
                Program.DefaultLogger.Error($"Failed to send {payload} to the server. Due to: {e.Message}");
44
                CachePayload(payload);
45
            }
46
        }
47

    
48
        private static void CreateRequestLog(Payload payload, HttpResponseMessage response, long durationMs) {
49
            var responseToLog = new {
50
                statusCode = response.StatusCode,
51
                content = response.Content,
52
                headers = response.Headers,
53
                errorMessage = response.RequestMessage,
54
            };
55

    
56
            Program.DefaultLogger.Info($"Request completed in {durationMs} ms,\n" +
57
                                 $"Request body: {payload},\n" +
58
                                 $"Response: {responseToLog}");
59
        }
60
        
61
        private async Task ResendPayloadsAsync() {
62
            var numberOfPayloadsToResend = Math.Min(_maxRetries, _cache.EstimatedCountOfItemsInQueue);
63
            var payloads = new List<Payload>();
64
            if (numberOfPayloadsToResend > 0) {
65
                using var session = _cache.OpenSession();
66
                for (var i = 0; i < numberOfPayloadsToResend; i++) {
67
                    var rawBytes = session.Dequeue();
68
                    var payload = JsonSerializer.Deserialize<Payload>(rawBytes);
69
                    if (payload is not null) {
70
                        payloads.Add(payload);
71
                    }
72
                }
73
                session.Flush();
74
            }
75

    
76
            if (payloads.Count > 0) {
77
                Program.DefaultLogger.Debug($"ResendPayloadAsync -> {payloads.Count} unsent payloads");
78
                var tasks = new List<Task>();
79
                foreach (var payload in payloads) {
80
                    Program.DefaultLogger.Info($"Resending {payload}.");
81
                    tasks.Add(SendPayloadAsync(payload));
82
                }
83
                await Task.WhenAll(tasks);
84
            }
85
        }
86
        
87
        private void CachePayload(Payload payload) {
88
            Program.DefaultLogger.Info($"Storing {payload} into the cache.");
89
            var numberOfCachedPayloads = _cache.EstimatedCountOfItemsInQueue;
90
            using var session = _cache.OpenSession();
91
            if (numberOfCachedPayloads >= _maxEntries) {
92
                session.Dequeue();
93
            }
94
            var payloadJson = JsonSerializer.Serialize(payload);
95
            session.Enqueue(Encoding.UTF8.GetBytes(payloadJson));
96
            session.Flush();
97
        }
98

    
99
        public async void Run() {
100
            Program.DefaultLogger.Info("Api Client thread has started");
101
            ClientRunning = true;
102
            while (ClientRunning) {
103
                await ResendPayloadsAsync();
104
                Thread.Sleep((int) _retryPeriod);
105
            }
106
        }
107
    }
108
}
(1-1/4)