Projekt

Obecné

Profil

Stáhnout (14.8 KB) Statistiky
| Větev: | Tag: | Revize:
1
package cz.zcu.kiv.backendapi.external;
2

    
3
import com.zaxxer.hikari.HikariDataSource;
4
import lombok.RequiredArgsConstructor;
5
import lombok.extern.slf4j.Slf4j;
6
import org.apache.commons.csv.CSVFormat;
7
import org.apache.commons.csv.CSVParser;
8
import org.apache.commons.csv.CSVRecord;
9
import org.apache.tomcat.util.http.fileupload.FileUtils;
10
import org.springframework.stereotype.Service;
11
import org.springframework.transaction.annotation.Transactional;
12
import org.springframework.web.multipart.MultipartFile;
13

    
14
import javax.persistence.Table;
15
import java.io.*;
16
import java.net.URL;
17
import java.nio.charset.StandardCharsets;
18
import java.nio.file.Files;
19
import java.nio.file.Paths;
20
import java.sql.Connection;
21
import java.sql.PreparedStatement;
22
import java.util.ArrayList;
23
import java.util.List;
24
import java.util.concurrent.Callable;
25
import java.util.concurrent.ExecutorService;
26
import java.util.concurrent.Executors;
27
import java.util.stream.Collectors;
28
import java.util.zip.GZIPInputStream;
29
import java.util.zip.ZipEntry;
30
import java.util.zip.ZipInputStream;
31

    
32
/**
33
 * External catalog item service implementation
34
 */
35
@Service
36
@Transactional
37
@RequiredArgsConstructor
38
@Slf4j
39
public class ExternalCatalogItemItemServiceImpl implements IExternalCatalogItemService {
40
    /**
41
     * Buffer size
42
     */
43
    public static final int BUFFER_SIZE = 1024;
44

    
45
    /**
46
     * Directory where files for external directory will be stored
47
     */
48
    private static final String DIRECTORY_FOR_EXTERNAL_FILES = "sources";
49

    
50
    /**
51
     * URL for Pleiades file
52
     */
53
    private static final String PLEIADES_FILE_URL = "https://atlantides.org/downloads/pleiades/dumps/pleiades-names-latest.csv.gz";
54

    
55
    /**
56
     * Name of Pleiades file
57
     */
58
    private static final String PLEIADES_FILE_NAME = "pleiades-names-latest.csv";
59

    
60
    /**
61
     * URL for GeoNames file – needs to be formatted with a country code (more sources)
62
     */
63
    private static final String GEONAMES_FILE_URL = "https://download.geonames.org/export/dump/%s.zip";
64

    
65
    /**
66
     * Name of GeoNames file – needs to be formatted (more sources)
67
     */
68
    private static final String GEONAMES_FILE_NAME = "%s.txt";
69

    
70
    /**
71
     * Name of ANE file
72
     */
73
    private static final String ANE_FILE = "ANE.csv";
74

    
75
    /**
76
     * Batch size for saving items
77
     */
78
    private static final int BATCH_SIZE = 5000;
79

    
80
    /**
81
     * Regex for one arbitrary character in string
82
     */
83
    private static final String WILDCARD_CHARACTER_REGEX = "\\?";
84

    
85
    /**
86
     * Regex for one arbitrary character in string used in SQL
87
     */
88
    private static final String WILDCARD_CHARACTER_REGEX_SQL = "_";
89

    
90
    /**
91
     * Regex for any number of arbitrary characters in string
92
     */
93
    private static final String WILDCARD_CHARACTERS_REGEX = "\\*";
94

    
95
    /**
96
     * Regex for any number of arbitrary characters in string used in SQL
97
     */
98
    private static final String WILDCARD_CHARACTERS_REGEX_SQL = "%";
99

    
100
    /**
101
     * External catalog repository
102
     */
103
    private final ExternalCatalogItemRepository externalCatalogItemRepository;
104

    
105
    /**
106
     * Hikari data source
107
     */
108
    private final HikariDataSource hikariDataSource;
109

    
110
    /**
111
     * Set to string converter
112
     */
113
    private final SetToStringConverter setToStringConverter;
114

    
115

    
116
    @Override
117
    public void updateCatalog(MultipartFile file) {
118
        log.info("Updating external catalog");
119

    
120
        try {
121
            Files.createDirectories(Paths.get(DIRECTORY_FOR_EXTERNAL_FILES)); // creates directory if not exists
122
            FileUtils.cleanDirectory(new File(DIRECTORY_FOR_EXTERNAL_FILES)); // cleans the directory
123
        } catch (Exception e) {
124
            e.printStackTrace();
125
        }
126

    
127
        externalCatalogItemRepository.deleteAll(); // clears database – updated list will be stored later
128

    
129
        try {
130
            addPleiadesRecords();
131
        } catch (Exception e) {
132
            e.printStackTrace();
133
        }
134

    
135
        try {
136
            addGeonamesRecords();
137
        } catch (Exception e) {
138
            e.printStackTrace();
139
        }
140

    
141
        try {
142
            if (file.getOriginalFilename() != null) {
143
                File cigsFile = new File(new File(DIRECTORY_FOR_EXTERNAL_FILES), file.getOriginalFilename());
144
                Files.copy(file.getInputStream(), cigsFile.toPath());
145
                addCigsRecords(cigsFile);
146
            }
147
        } catch (Exception e) {
148
            e.printStackTrace();
149
        }
150

    
151
        try {
152
            addAneRecords();
153
        } catch (Exception e) {
154
            e.printStackTrace();
155
        }
156

    
157
        log.info("External catalog updated");
158
    }
159

    
160
    @Override
161
    public List<ExternalCatalogItem> getCatalog(String name, ExternalSource source) {
162
        name = WILDCARD_CHARACTERS_REGEX_SQL +
163
                name.replaceAll(WILDCARD_CHARACTER_REGEX, WILDCARD_CHARACTER_REGEX_SQL).replaceAll(WILDCARD_CHARACTERS_REGEX, WILDCARD_CHARACTERS_REGEX_SQL) +
164
                WILDCARD_CHARACTERS_REGEX_SQL;
165
        return externalCatalogItemRepository.getFilteredExternalCatalog(name, source.name());
166
    }
167

    
168
    /**
169
     * Downloads, extracts and reads records from Pleiades and saves them to database
170
     */
171
    private void addPleiadesRecords() {
172
        List<ExternalCatalogItem> externalCatalogItems = new ArrayList<>();
173
        byte[] buffer = new byte[BUFFER_SIZE];
174
        File pleiadesFile = new File(new File(DIRECTORY_FOR_EXTERNAL_FILES), PLEIADES_FILE_NAME);
175
        try (InputStream fileIn = new URL(PLEIADES_FILE_URL).openStream();
176
             GZIPInputStream gZIPInputStream = new GZIPInputStream(fileIn);
177
             FileOutputStream fileOutputStream = new FileOutputStream(pleiadesFile)) {
178

    
179
            int bytes_read;
180

    
181
            while ((bytes_read = gZIPInputStream.read(buffer)) > 0) {
182
                fileOutputStream.write(buffer, 0, bytes_read);
183
            }
184

    
185
            log.info("The Pleiades file was decompressed successfully");
186

    
187
        } catch (IOException ex) {
188
            ex.printStackTrace();
189
        }
190

    
191
        try (InputStream csvData = new FileInputStream(pleiadesFile)) {
192
            CSVParser parser = CSVParser.parse(csvData, StandardCharsets.UTF_8, CSVFormat.Builder.create(CSVFormat.DEFAULT)
193
                    .setHeader()
194
                    .setSkipHeaderRecord(true)
195
                    .build());
196
            for (CSVRecord csvRecord : parser) {
197
                ExternalCatalogItem e = new ExternalCatalogItem(csvRecord.toList(), ExternalSource.PLEIADES);
198
                externalCatalogItems.add(e);
199
            }
200

    
201
        } catch (IOException ex) {
202
            ex.printStackTrace();
203
        }
204
        saveAllWithThreads(externalCatalogItems);
205

    
206
        log.info("Records from Pleiades added");
207
    }
208

    
209
    /**
210
     * Downloads, extracts and reads records from GeoNames and saves them to database
211
     */
212
    private void addGeonamesRecords() {
213
        byte[] buffer = new byte[BUFFER_SIZE];
214
        for (String countryCode : ExternalCatalogItem.COUNTRY_CODES.keySet()) {
215
            List<ExternalCatalogItem> externalCatalogItems = new ArrayList<>();
216
            // Downloads file from URL and extracts it
217
            String url = String.format(GEONAMES_FILE_URL, countryCode);
218
            try (ZipInputStream zis = new ZipInputStream(new URL(url).openStream())) {
219
                ZipEntry zipEntry = zis.getNextEntry();
220
                while (zipEntry != null) {
221
                    FileOutputStream fileOutputStream = new FileOutputStream(new File(new File(DIRECTORY_FOR_EXTERNAL_FILES), zipEntry.getName()));
222
                    int bytes_read;
223
                    while ((bytes_read = zis.read(buffer)) > 0) {
224
                        fileOutputStream.write(buffer, 0, bytes_read);
225
                    }
226
                    zipEntry = zis.getNextEntry();
227
                    fileOutputStream.close();
228
                }
229
            } catch (IOException e) {
230
                e.printStackTrace();
231
            }
232

    
233
            log.info("The GeoNames file with country code " + countryCode + " was decompressed successfully");
234

    
235
            // Reads file and adds catalog items to list
236
            File geoNamesFile = new File(new File(DIRECTORY_FOR_EXTERNAL_FILES), String.format(GEONAMES_FILE_NAME, countryCode));
237
            try (BufferedReader reader = new BufferedReader(new FileReader(geoNamesFile))) {
238
                String line;
239
                while ((line = reader.readLine()) != null) {
240
                    ExternalCatalogItem e = new ExternalCatalogItem(List.of(line.split("\t")), ExternalSource.GEONAMES);
241
                    externalCatalogItems.add(e);
242
                }
243
            } catch (IOException e) {
244
                e.printStackTrace();
245
            }
246
            saveAllWithThreads(externalCatalogItems);
247
        }
248

    
249
        log.info("Records from GeoNames added");
250
    }
251

    
252
    /**
253
     * Saves records from ANE to database
254
     */
255
    private void addAneRecords() {
256
        List<ExternalCatalogItem> externalCatalogItems = new ArrayList<>();
257
        // Reads file and adds catalog items to list
258
        try (InputStream csvData = getClass().getClassLoader().getResourceAsStream(ANE_FILE)) {
259
            if (csvData == null) {
260
                return;
261
            }
262
            CSVParser parser = CSVParser.parse(csvData, StandardCharsets.UTF_8, CSVFormat.Builder.create(CSVFormat.DEFAULT)
263
                    .setHeader()
264
                    .setSkipHeaderRecord(true)
265
                    .build());
266
            for (CSVRecord csvRecord : parser) {
267
                ExternalCatalogItem e = new ExternalCatalogItem(csvRecord.toList(), ExternalSource.ANE);
268
                externalCatalogItems.add(e);
269
            }
270
        } catch (IOException e) {
271
            e.printStackTrace();
272
        }
273
        saveAllWithThreads(externalCatalogItems);
274

    
275
        log.info("Records from ANE added");
276
    }
277

    
278

    
279
    /**
280
     * Reads records from CIGS and saves them to database
281
     *
282
     * @param cigsFile CIGS file
283
     */
284
    private void addCigsRecords(File cigsFile) {
285
        List<ExternalCatalogItem> externalCatalogItems = new ArrayList<>();
286

    
287
        try (InputStream csvData = new FileInputStream(cigsFile)) {
288
            CSVParser parser = CSVParser.parse(csvData, StandardCharsets.UTF_8, CSVFormat.Builder.create(CSVFormat.DEFAULT)
289
                    .setHeader()
290
                    .setSkipHeaderRecord(true)
291
                    .build());
292
            for (CSVRecord csvRecord : parser) {
293
                ExternalCatalogItem e = new ExternalCatalogItem(csvRecord.toList(), ExternalSource.CIGS);
294
                externalCatalogItems.add(e);
295
            }
296
        } catch (IOException e) {
297
            e.printStackTrace();
298
        }
299
        saveAllWithThreads(externalCatalogItems);
300

    
301
        log.info("Records from CIGS added");
302
    }
303

    
304
    /**
305
     * Creates list of lists of external catalog items divided by batch size
306
     *
307
     * @param externalCatalogItems list of external catalog items
308
     * @return divided list of lists of external catalog items
309
     */
310
    private List<List<ExternalCatalogItem>> createSublist(List<ExternalCatalogItem> externalCatalogItems) {
311
        List<List<ExternalCatalogItem>> listOfSubList = new ArrayList<>();
312
        for (int i = 0; i < externalCatalogItems.size(); i += BATCH_SIZE) {
313
            if (i + BATCH_SIZE <= externalCatalogItems.size()) {
314
                listOfSubList.add(externalCatalogItems.subList(i, i + BATCH_SIZE));
315
            } else {
316
                listOfSubList.add(externalCatalogItems.subList(i, externalCatalogItems.size()));
317
            }
318
        }
319
        return listOfSubList;
320
    }
321

    
322
    /**
323
     * Divides list of external catalog items to sublist, creates threads (for saving sublists in batch) and executes them
324
     *
325
     * @param externalCatalogItems list of external catalog items
326
     */
327
    private void saveAllWithThreads(List<ExternalCatalogItem> externalCatalogItems) {
328
        ExecutorService executorService = Executors.newFixedThreadPool(hikariDataSource.getMaximumPoolSize());
329
        List<List<ExternalCatalogItem>> subList = createSublist(externalCatalogItems);
330
        List<Callable<Void>> callables = subList.stream().map(sublist ->
331
                (Callable<Void>) () -> {
332
                    saveAllInBatch(sublist);
333
                    return null;
334
                }).collect(Collectors.toList());
335
        try {
336
            executorService.invokeAll(callables);
337
        } catch (InterruptedException e) {
338
            e.printStackTrace();
339
        }
340
    }
341

    
342
    /**
343
     * Saves external catalog items in batch
344
     *
345
     * @param externalCatalogItems list of external catalog items
346
     */
347
    private void saveAllInBatch(List<ExternalCatalogItem> externalCatalogItems) {
348
        String sqlExternalCatalogItem = String.format("INSERT INTO %s (id, external_source, latitude, longitude, location_precision, max_date, " +
349
                "min_date, time_period_keys, pid, names, feature_code, country, accuracy, geoname_id, pleiades_id, osm_id) " +
350
                "VALUES (?, ?, ?, ?, ?, ?, ?, ?,  ?, ?, ?, ?, ?, ?, ?, ?)", ExternalCatalogItem.class.getAnnotation(Table.class).name());
351
        try (Connection connection = hikariDataSource.getConnection();
352
             PreparedStatement statementExternalCatalogItem = connection.prepareStatement(sqlExternalCatalogItem)) {
353
            int counter = 0;
354
            for (ExternalCatalogItem item : externalCatalogItems) {
355
                statementExternalCatalogItem.clearParameters();
356
                statementExternalCatalogItem.setObject(1, item.getId());
357
                statementExternalCatalogItem.setString(2, item.getExternalSource().name());
358
                statementExternalCatalogItem.setObject(3, item.getLatitude());
359
                statementExternalCatalogItem.setObject(4, item.getLongitude());
360
                statementExternalCatalogItem.setString(5, item.getLocationPrecision());
361
                statementExternalCatalogItem.setObject(6, item.getMaxDate());
362
                statementExternalCatalogItem.setObject(7, item.getMinDate());
363
                statementExternalCatalogItem.setString(8, item.getTimePeriodKeys());
364
                statementExternalCatalogItem.setString(9, item.getPid());
365
                statementExternalCatalogItem.setString(10, setToStringConverter.convertToDatabaseColumn(item.getNames()));
366
                statementExternalCatalogItem.setString(11, item.getFeatureCode());
367
                statementExternalCatalogItem.setString(12, item.getCountry());
368
                statementExternalCatalogItem.setObject(13, item.getAccuracy());
369
                statementExternalCatalogItem.setObject(14, item.getGeonameId());
370
                statementExternalCatalogItem.setObject(15, item.getPleiadesId());
371
                statementExternalCatalogItem.setObject(16, item.getOsmId());
372
                statementExternalCatalogItem.addBatch();
373

    
374
                counter++;
375
                if (counter % BATCH_SIZE == 0 || counter == externalCatalogItems.size()) {
376
                    statementExternalCatalogItem.executeBatch();
377
                    statementExternalCatalogItem.clearBatch();
378
                }
379
            }
380
        } catch (Exception e) {
381
            e.printStackTrace();
382
        }
383
    }
384
}
(3-3/7)