Projekt

Obecné

Profil

Stáhnout (8.5 KB) Statistiky
| Větev: | Tag: | Revize:
1
using System.Diagnostics;
2
using System.Net.Http.Json;
3
using System.Text;
4
using System.Text.Json;
5
using System.Text.Json.Serialization;
6
using DiskQueue;
7
using LDClient.network.data;
8

    
9
namespace LDClient.network {
10
    
11
    /// <summary>
12
    /// This class implements IApiClient which is an interface
13
    /// defining all the functionality required from an API client.
14
    /// </summary>
15
    public sealed class ApiClient : IApiClient {
16
        
17
        /// <summary>
18
        /// Instance of an HTTP client the is used to send data off to the server 
19
        /// </summary>
20
        public IHttpClient _client;
21

    
22
        /// <summary>
23
        /// Flag used to stop the client (periodical retrieval from the cache)
24
        /// </summary>
25
        public bool ClientRunning;
26

    
27
        /// <summary>
28
        /// Number of milliseconds after which the class tries to resend failed payloads to the server.
29
        /// </summary>
30
        private readonly uint _retryPeriod;
31
        
32
        /// <summary>
33
        /// Maximum number of entries (payloads) that can be sent to the server within one period (_retryPeriod).
34
        /// </summary>
35
        private readonly uint _maxEntries;
36
        
37
        /// <summary>
38
        /// Maximum number of failed payloads to be kept in the file-based cache (FIFO - when the maximum number is reached)
39
        /// </summary>
40
        private readonly uint _maxRetries;
41
        private readonly IPersistentQueue _cache;
42

    
43
        /// <summary>
44
        /// Creates an instance of the class.
45
        /// </summary>
46
        /// <param name="url">IP address of the server (url in case a DNS server is being used)</param>
47
        /// <param name="port">port that the API is running on</param>
48
        /// <param name="path">path of the API e.g. /api/v1/lg-logs</param>
49
        /// <param name="retryPeriod">number of milliseconds after which the class tries to resend failed payloads to the server</param>
50
        /// <param name="maxEntries">maximum number of entries (payloads) that can be sent to the server within one period</param>
51
        /// <param name="maxRetries">maximum number of failed payloads to be kept in the file-based cache</param>
52
        /// <param name="cache">instance of a persistent cache for storing failed payloads</param>
53
        public ApiClient(string url, uint port, string path, uint retryPeriod, uint maxEntries, uint maxRetries, IPersistentQueue cache) {
54
            // Construct the entire path to the API.
55
            var uri = $"{url}:{port}{path}";
56
            
57
            // Store the values into class variables.
58
            _retryPeriod = retryPeriod;
59
            _maxEntries = maxEntries;
60
            _maxRetries = maxRetries;
61
            _cache = cache;
62

    
63
            // Create an instance of a HttpClient which takes care of
64
            // establishing a connection to the server;
65
            _client = new HttpClient(uri);
66
        }
67

    
68
        /// <summary>
69
        /// Sends a payload to the server (API).
70
        /// </summary>
71
        /// <param name="payload">instance of a payload to be sent off to the server</param>
72
        public async Task SendPayloadAsync(Payload payload) {
73
            try {
74
                // Create an instance of Stopwatch (to measure how much
75
                // the action took).
76
                Stopwatch stopWatch = new();
77
                
78
                // Send the payload to the server.
79
                stopWatch.Start();
80
                var response = await _client.PostAsJsonAsync(payload);
81
                stopWatch.Stop();
82
                
83
                // Create a log message.
84
                CreateRequestLog(payload, response, stopWatch.ElapsedMilliseconds);
85
                
86
                // Make sure the request was successful.
87
                response.EnsureSuccessStatusCode();
88
            } catch (Exception e) {
89
                Program.DefaultLogger.Error($"Failed to send {payload} to the server. Due to: {e.Message}");
90
                CachePayload(payload);
91
            }
92
        }
93

    
94
        /// <summary>
95
        /// Creates a request log message.
96
        /// </summary>
97
        /// <param name="payload">payload involved in the process of sending data to the server</param>
98
        /// <param name="response">response from the server</param>
99
        /// <param name="durationMs">duration in milliseconds (how much time it took to send off the payload)</param>
100
        private static void CreateRequestLog(Payload payload, HttpResponseMessage response, long durationMs) {
101
            // Create the log message.
102
            var responseToLog = new {
103
                statusCode = response.StatusCode,
104
                content = response.Content,
105
                headers = response.Headers,
106
                errorMessage = response.RequestMessage,
107
            };
108
            
109
            // Log the message using the logger defined in Program (main class).
110
            Program.DefaultLogger.Info($"Request completed in {durationMs} ms,\n" +
111
                                       $"Request body: {payload},\n" +
112
                                       $"Response: {responseToLog}");
113
        }
114
        
115
        /// <summary>
116
        /// Resends unsuccessful payloads to the server.
117
        /// </summary>
118
        private async Task ResendPayloadsAsync() {
119
            // Calculate the maximum number of payloads to be sent to the server.
120
            var numberOfPayloadsToResend = Math.Min(_maxRetries, _cache.EstimatedCountOfItemsInQueue);
121
            
122
            // Create a list for those payloads
123
            var payloads = new List<Payload>();
124
            
125
            // Retrieve the payloads from the cache.
126
            if (numberOfPayloadsToResend > 0) {
127
                // Open up a session to the cache.
128
                using var session = _cache.OpenSession();
129
                
130
                // Pop out payloads, deserialize them, and store them into the list.
131
                for (var i = 0; i < numberOfPayloadsToResend; i++) {
132
                    var rawBytes = session.Dequeue();
133
                    var payload = JsonSerializer.Deserialize<Payload>(rawBytes);
134
                    if (payload is not null) {
135
                        payloads.Add(payload);
136
                    }
137
                }
138
                // Flush the changes.
139
                session.Flush();
140
            }
141
            
142
            // If there are some payloads to be resent to the server.
143
            if (payloads.Count > 0) {
144
                Program.DefaultLogger.Debug($"ResendPayloadAsync -> {payloads.Count} unsent payloads");
145
                var tasks = new List<Task>();
146
                
147
                // Create a separate task for each payload - resend them to the server.
148
                foreach (var payload in payloads) {
149
                    Program.DefaultLogger.Info($"Resending {payload}.");
150
                    tasks.Add(SendPayloadAsync(payload));
151
                }
152
                // Wait until all tasks are finished. 
153
                await Task.WhenAll(tasks);
154
            }
155
        }
156
        
157
        /// <summary>
158
        /// Stores a failed payload into a persistent cache.
159
        /// </summary>
160
        /// <param name="payload"></param>
161
        private void CachePayload(Payload payload) {
162
            Program.DefaultLogger.Info($"Storing {payload} into the cache.");
163
            
164
            // Number of payloads stored in the cache.
165
            var numberOfCachedPayloads = _cache.EstimatedCountOfItemsInQueue;
166
            
167
            // Open up a session to the cache.
168
            using var session = _cache.OpenSession();
169
            
170
            // If the cache is "full", make room for the latest failed
171
            // payload by discarding the oldest one.
172
            if (numberOfCachedPayloads >= _maxEntries) {
173
                session.Dequeue();
174
            }
175
            
176
            // Store the payload into the cache.
177
            var payloadJson = JsonSerializer.Serialize(payload);
178
            session.Enqueue(Encoding.UTF8.GetBytes(payloadJson));
179
            
180
            // Flush the changes.
181
            session.Flush();
182
        }
183

    
184
        /// <summary>
185
        /// Runs the periodical retrieval of failed payloads stored
186
        /// in a file-based cache. This method is instantiated as a thread.
187
        /// </summary>
188
        public async void Run() {
189
            Program.DefaultLogger.Info("Api Client thread has started");
190
            
191
            // Keep the thread running.
192
            ClientRunning = true;
193
            
194
            // Keep resending failed payloads to the server.
195
            while (ClientRunning) {
196
                await ResendPayloadsAsync();
197
                Thread.Sleep((int) _retryPeriod);
198
            }
199
        }
200
    }
201
}
(1-1/4)