Projekt

Obecné

Profil

Stáhnout (8.57 KB) Statistiky
| Větev: | Tag: | Revize:
1
using System.Diagnostics;
2
using System.Net.Http.Json;
3
using System.Text;
4
using System.Text.Json;
5
using System.Text.Json.Serialization;
6
using DiskQueue;
7
using LDClient.network.data;
8

    
9
namespace LDClient.network {
10
    
11
    /// <summary>
12
    /// This class implements IApiClient which is an interface
13
    /// defining all the functionality required from an API client.
14
    /// </summary>
15
    public sealed class ApiClient : IApiClient {
16
        
17
        /// <summary>
18
        /// Instance of an HTTP client the is used to send data off to the server 
19
        /// </summary>
20
        public IHttpClient _client;
21

    
22
        /// <summary>
23
        /// Flag used to stop the client (periodical retrieval from the cache)
24
        /// </summary>
25
        public bool ClientRunning;
26

    
27
        /// <summary>
28
        /// Number of milliseconds after which the class tries to resend failed payloads to the server.
29
        /// </summary>
30
        private readonly uint _retryPeriod;
31
        
32
        /// <summary>
33
        /// Maximum number of entries (payloads) that can be sent to the server within one period (_retryPeriod).
34
        /// </summary>
35
        private readonly uint _maxEntries;
36
        
37
        /// <summary>
38
        /// Maximum number of failed payloads to be kept in the file-based cache (FIFO - when the maximum number is reached)
39
        /// </summary>
40
        private readonly uint _maxRetries;
41
        private readonly IPersistentQueue _cache;
42

    
43
        /// <summary>
44
        /// Creates an instance of the class.
45
        /// </summary>
46
        /// <param name="url">IP address of the server (url in case a DNS server is being used)</param>
47
        /// <param name="port">port that the API is running on</param>
48
        /// <param name="path">path of the API e.g. /api/v1/lg-logs</param>
49
        /// <param name="retryPeriod">number of milliseconds after which the class tries to resend failed payloads to the server</param>
50
        /// <param name="maxEntries">maximum number of entries (payloads) that can be sent to the server within one period</param>
51
        /// <param name="maxRetries">maximum number of failed payloads to be kept in the file-based cache</param>
52
        /// <param name="cache">instance of a persistent cache for storing failed payloads</param>
53
        public ApiClient(string url, uint port, string path, uint retryPeriod, uint maxEntries, uint maxRetries, IPersistentQueue cache) {
54
            // Construct the entire path to the API.
55
            var uri = $"{url}:{port}{path}";
56
            
57
            // Store the values into class variables.
58
            _retryPeriod = retryPeriod;
59
            _maxEntries = maxEntries;
60
            _maxRetries = maxRetries;
61
            _cache = cache;
62

    
63
            // Create an instance of a HttpClient which takes care of
64
            // establishing a connection to the server;
65
            _client = new HttpClient(uri);
66
        }
67

    
68
        /// <summary>
69
        /// Sends a payload to the server (API).
70
        /// </summary>
71
        /// <param name="payload">instance of a payload to be sent off to the server</param>
72
        public async Task SendPayloadAsync(Payload payload) {
73
            Program.DefaultLogger.Debug("SendPayloadAsync called.");
74
            try {
75
                // Create an instance of Stopwatch (to measure how much
76
                // the action took).
77
                Stopwatch stopWatch = new();
78
                
79
                // Send the payload to the server.
80
                stopWatch.Start();
81
                var response = await _client.PostAsJsonAsync(payload);
82
                stopWatch.Stop();
83
                
84
                // Create a log message.
85
                CreateRequestLog(payload, response, stopWatch.ElapsedMilliseconds);
86
                
87
                // Make sure the request was successful.
88
                response.EnsureSuccessStatusCode();
89
            } catch (Exception e) {
90
                Program.DefaultLogger.Error($"Failed to send {payload} to the server. Due to: {e.Message}");
91
                CachePayload(payload);
92
            }
93
        }
94

    
95
        /// <summary>
96
        /// Creates a request log message.
97
        /// </summary>
98
        /// <param name="payload">payload involved in the process of sending data to the server</param>
99
        /// <param name="response">response from the server</param>
100
        /// <param name="durationMs">duration in milliseconds (how much time it took to send off the payload)</param>
101
        private static void CreateRequestLog(Payload payload, HttpResponseMessage response, long durationMs) {
102
            // Create the log message.
103
            var responseToLog = new {
104
                statusCode = response.StatusCode,
105
                content = response.Content,
106
                headers = response.Headers,
107
                errorMessage = response.RequestMessage,
108
            };
109
            
110
            // Log the message using the logger defined in Program (main class).
111
            Program.DefaultLogger.Info($"Request completed in {durationMs} ms,\n" +
112
                                       $"Request body: {payload},\n" +
113
                                       $"Response: {responseToLog}");
114
        }
115
        
116
        /// <summary>
117
        /// Resends unsuccessful payloads to the server.
118
        /// </summary>
119
        private async Task ResendPayloadsAsync() {
120
            // Calculate the maximum number of payloads to be sent to the server.
121
            var numberOfPayloadsToResend = Math.Min(_maxRetries, _cache.EstimatedCountOfItemsInQueue);
122
            
123
            // Create a list for those payloads
124
            var payloads = new List<Payload>();
125
            
126
            // Retrieve the payloads from the cache.
127
            if (numberOfPayloadsToResend > 0) {
128
                // Open up a session to the cache.
129
                using var session = _cache.OpenSession();
130
                
131
                // Pop out payloads, deserialize them, and store them into the list.
132
                for (var i = 0; i < numberOfPayloadsToResend; i++) {
133
                    var rawBytes = session.Dequeue();
134
                    var payload = JsonSerializer.Deserialize<Payload>(rawBytes);
135
                    if (payload is not null) {
136
                        payloads.Add(payload);
137
                    }
138
                }
139
                // Flush the changes.
140
                session.Flush();
141
            }
142
            
143
            // If there are some payloads to be resent to the server.
144
            if (payloads.Count > 0) {
145
                Program.DefaultLogger.Debug($"ResendPayloadAsync -> {payloads.Count} unsent payloads");
146
                var tasks = new List<Task>();
147
                
148
                // Create a separate task for each payload - resend them to the server.
149
                foreach (var payload in payloads) {
150
                    Program.DefaultLogger.Info($"Resending {payload}.");
151
                    tasks.Add(SendPayloadAsync(payload));
152
                }
153
                // Wait until all tasks are finished. 
154
                await Task.WhenAll(tasks);
155
            }
156
        }
157
        
158
        /// <summary>
159
        /// Stores a failed payload into a persistent cache.
160
        /// </summary>
161
        /// <param name="payload"></param>
162
        private void CachePayload(Payload payload) {
163
            Program.DefaultLogger.Info($"Storing {payload} into the cache.");
164
            
165
            // Number of payloads stored in the cache.
166
            var numberOfCachedPayloads = _cache.EstimatedCountOfItemsInQueue;
167
            
168
            // Open up a session to the cache.
169
            using var session = _cache.OpenSession();
170
            
171
            // If the cache is "full", make room for the latest failed
172
            // payload by discarding the oldest one.
173
            if (numberOfCachedPayloads >= _maxEntries) {
174
                session.Dequeue();
175
            }
176
            
177
            // Store the payload into the cache.
178
            var payloadJson = JsonSerializer.Serialize(payload);
179
            session.Enqueue(Encoding.UTF8.GetBytes(payloadJson));
180
            
181
            // Flush the changes.
182
            session.Flush();
183
        }
184

    
185
        /// <summary>
186
        /// Runs the periodical retrieval of failed payloads stored
187
        /// in a file-based cache. This method is instantiated as a thread.
188
        /// </summary>
189
        public async void Run() {
190
            Program.DefaultLogger.Info("Api Client thread has started");
191
            
192
            // Keep the thread running.
193
            ClientRunning = true;
194
            
195
            // Keep resending failed payloads to the server.
196
            while (ClientRunning) {
197
                await ResendPayloadsAsync();
198
                Thread.Sleep((int) _retryPeriod);
199
            }
200
        }
201
    }
202
}
(1-1/4)