Projekt

Obecné

Profil

Stáhnout (14.9 KB) Statistiky
| Větev: | Tag: | Revize:
1
package cz.zcu.kiv.backendapi.external;
2

    
3
import com.zaxxer.hikari.HikariDataSource;
4
import lombok.RequiredArgsConstructor;
5
import lombok.extern.slf4j.Slf4j;
6
import org.apache.commons.csv.CSVFormat;
7
import org.apache.commons.csv.CSVParser;
8
import org.apache.commons.csv.CSVRecord;
9
import org.apache.tomcat.util.http.fileupload.FileUtils;
10
import org.springframework.stereotype.Service;
11
import org.springframework.transaction.annotation.Transactional;
12
import org.springframework.web.multipart.MultipartFile;
13

    
14
import javax.persistence.Table;
15
import java.io.*;
16
import java.net.URL;
17
import java.nio.charset.StandardCharsets;
18
import java.nio.file.Files;
19
import java.nio.file.Paths;
20
import java.sql.Connection;
21
import java.sql.PreparedStatement;
22
import java.util.ArrayList;
23
import java.util.List;
24
import java.util.concurrent.Callable;
25
import java.util.concurrent.ExecutorService;
26
import java.util.concurrent.Executors;
27
import java.util.stream.Collectors;
28
import java.util.zip.GZIPInputStream;
29
import java.util.zip.ZipEntry;
30
import java.util.zip.ZipInputStream;
31

    
32
/**
33
 * External catalog item service implementation
34
 */
35
@Service
36
@Transactional
37
@RequiredArgsConstructor
38
@Slf4j
39
public class ExternalCatalogItemItemServiceImpl implements IExternalCatalogItemService {
40
    /**
41
     * Buffer size
42
     */
43
    public static final int BUFFER_SIZE = 1024;
44

    
45
    /**
46
     * Directory where files for external directory will be stored
47
     */
48
    private static final String DIRECTORY_FOR_EXTERNAL_FILES = "sources";
49

    
50
    /**
51
     * URL for Pleiades file
52
     */
53
    private static final String PLEIADES_FILE_URL = "https://atlantides.org/downloads/pleiades/dumps/pleiades-names-latest.csv.gz";
54

    
55
    /**
56
     * Name of Pleiades file
57
     */
58
    private static final String PLEIADES_FILE_NAME = "pleiades-names-latest.csv";
59

    
60
    /**
61
     * URL for GeoNames file – needs to be formatted (more sources)
62
     */
63
    private static final String GEONAMES_FILE_URL = "https://download.geonames.org/export/dump/%s.zip";
64

    
65
    /**
66
     * Name of GeoNames file – needs to be formatted (more sources)
67
     */
68
    private static final String GEONAMES_FILE_NAME = "%s.txt";
69

    
70
    /**
71
     * Name of CIGS file
72
     */
73
    private static final String CIGS_FILE_NAME = "CIGS.csv";
74

    
75
    /**
76
     * Name of ANE file
77
     */
78
    private static final String ANE_FILE = "ANE.csv";
79

    
80
    /**
81
     * Batch size for saving items
82
     */
83
    private static final int BATCH_SIZE = 5000;
84

    
85
    /**
86
     * Regex for one arbitrary character in string
87
     */
88
    private static final String WILDCARD_CHARACTER_REGEX = "\\?";
89

    
90
    /**
91
     * Regex for one arbitrary character in string used in SQL
92
     */
93
    private static final String WILDCARD_CHARACTER_REGEX_SQL = "_";
94

    
95
    /**
96
     * Regex for any number of arbitrary characters in string
97
     */
98
    private static final String WILDCARD_CHARACTERS_REGEX = "\\*";
99

    
100
    /**
101
     * Regex for any number of arbitrary characters in string used in SQL
102
     */
103
    private static final String WILDCARD_CHARACTERS_REGEX_SQL = "%";
104

    
105
    /**
106
     * External catalog repository
107
     */
108
    private final ExternalCatalogItemRepository externalCatalogItemRepository;
109

    
110
    /**
111
     * Hikari data source
112
     */
113
    private final HikariDataSource hikariDataSource;
114

    
115
    /**
116
     * Set to string converter
117
     */
118
    private final SetToStringConverter setToStringConverter;
119

    
120

    
121
    @Override
122
    public void updateCatalog(MultipartFile file) {
123
        log.info("Updating external catalog");
124

    
125
        try {
126
            Files.createDirectories(Paths.get(DIRECTORY_FOR_EXTERNAL_FILES)); // creates directory if not exists
127
            FileUtils.cleanDirectory(new File(DIRECTORY_FOR_EXTERNAL_FILES)); // cleans the directory
128
        } catch (Exception e) {
129
            e.printStackTrace();
130
        }
131

    
132
        externalCatalogItemRepository.deleteAll(); // clears database – updated list will be stored later
133

    
134
        try {
135
            addPleiadesRecords();
136
        } catch (Exception e) {
137
            e.printStackTrace();
138
        }
139

    
140
        try {
141
            addGeonamesRecords();
142
        } catch (Exception e) {
143
            e.printStackTrace();
144
        }
145

    
146
        try {
147
            if (file.getOriginalFilename() != null) {
148
                File cigsFile = new File(new File(DIRECTORY_FOR_EXTERNAL_FILES), file.getOriginalFilename());
149
                Files.copy(file.getInputStream(), cigsFile.toPath());
150
                addCigsRecords(cigsFile);
151
            }
152
        } catch (Exception e) {
153
            e.printStackTrace();
154
        }
155

    
156
        try {
157
            addAneRecords();
158
        } catch (Exception e) {
159
            e.printStackTrace();
160
        }
161

    
162
        log.info("External catalog updated");
163
    }
164

    
165
    @Override
166
    public List<ExternalCatalogItem> getCatalog(String name, ExternalSource source) {
167
        name = WILDCARD_CHARACTERS_REGEX_SQL +
168
                name.replaceAll(WILDCARD_CHARACTER_REGEX, WILDCARD_CHARACTER_REGEX_SQL).replaceAll(WILDCARD_CHARACTERS_REGEX, WILDCARD_CHARACTERS_REGEX_SQL) +
169
                WILDCARD_CHARACTERS_REGEX_SQL;
170
        return externalCatalogItemRepository.getFilteredExternalCatalog(name, source.name());
171
    }
172

    
173
    /**
174
     * Downloads, extracts and reads records from Pleiades and saves them to database
175
     */
176
    private void addPleiadesRecords() {
177
        List<ExternalCatalogItem> externalCatalogItems = new ArrayList<>();
178
        byte[] buffer = new byte[BUFFER_SIZE];
179
        File pleiadesFile = new File(new File(DIRECTORY_FOR_EXTERNAL_FILES), PLEIADES_FILE_NAME);
180
        try (InputStream fileIn = new URL(PLEIADES_FILE_URL).openStream();
181
             GZIPInputStream gZIPInputStream = new GZIPInputStream(fileIn);
182
             FileOutputStream fileOutputStream = new FileOutputStream(pleiadesFile)) {
183

    
184
            int bytes_read;
185

    
186
            while ((bytes_read = gZIPInputStream.read(buffer)) > 0) {
187
                fileOutputStream.write(buffer, 0, bytes_read);
188
            }
189

    
190
            log.info("The Pleiades file was decompressed successfully");
191

    
192
        } catch (IOException ex) {
193
            ex.printStackTrace();
194
        }
195

    
196
        try (InputStream csvData = new FileInputStream(pleiadesFile)) {
197
            CSVParser parser = CSVParser.parse(csvData, StandardCharsets.UTF_8, CSVFormat.Builder.create(CSVFormat.DEFAULT)
198
                    .setHeader()
199
                    .setSkipHeaderRecord(true)
200
                    .build());
201
            for (CSVRecord csvRecord : parser) {
202
                ExternalCatalogItem e = new ExternalCatalogItem(csvRecord.toList(), ExternalSource.PLEIADES);
203
                externalCatalogItems.add(e);
204
            }
205

    
206
        } catch (IOException ex) {
207
            ex.printStackTrace();
208
        }
209
        saveAllWithThreads(externalCatalogItems);
210

    
211
        log.info("Records from Pleiades added");
212
    }
213

    
214
    /**
215
     * Downloads, extracts and reads records from GeoNames and saves them to database
216
     */
217
    private void addGeonamesRecords() {
218
        byte[] buffer = new byte[BUFFER_SIZE];
219
        for (String countryCode : ExternalCatalogItem.COUNTRY_CODES.keySet()) {
220
            List<ExternalCatalogItem> externalCatalogItems = new ArrayList<>();
221
            // Downloads file from URL and extracts it
222
            String url = String.format(GEONAMES_FILE_URL, countryCode);
223
            try (ZipInputStream zis = new ZipInputStream(new URL(url).openStream())) {
224
                ZipEntry zipEntry = zis.getNextEntry();
225
                while (zipEntry != null) {
226
                    FileOutputStream fileOutputStream = new FileOutputStream(new File(new File(DIRECTORY_FOR_EXTERNAL_FILES), zipEntry.getName()));
227
                    int bytes_read;
228
                    while ((bytes_read = zis.read(buffer)) > 0) {
229
                        fileOutputStream.write(buffer, 0, bytes_read);
230
                    }
231
                    zipEntry = zis.getNextEntry();
232
                    fileOutputStream.close();
233
                }
234
            } catch (IOException e) {
235
                e.printStackTrace();
236
            }
237

    
238
            log.info("The GeoNames file with country code " + countryCode + " was decompressed successfully");
239

    
240
            // Reads file and adds catalog items to list
241
            File geoNamesFile = new File(new File(DIRECTORY_FOR_EXTERNAL_FILES), String.format(GEONAMES_FILE_NAME, countryCode));
242
            try (BufferedReader reader = new BufferedReader(new FileReader(geoNamesFile))) {
243
                String line;
244
                while ((line = reader.readLine()) != null) {
245
                    ExternalCatalogItem e = new ExternalCatalogItem(List.of(line.split("\t")), ExternalSource.GEONAMES);
246
                    externalCatalogItems.add(e);
247
                }
248
            } catch (IOException e) {
249
                e.printStackTrace();
250
            }
251
            saveAllWithThreads(externalCatalogItems);
252
        }
253

    
254
        log.info("Records from GeoNames added");
255
    }
256

    
257
    /**
258
     * Saves records from ANE to database
259
     */
260
    private void addAneRecords() {
261
        List<ExternalCatalogItem> externalCatalogItems = new ArrayList<>();
262
        // Reads file and adds catalog items to list
263
        try (InputStream csvData = getClass().getClassLoader().getResourceAsStream(ANE_FILE)) {
264
            if (csvData == null) {
265
                return;
266
            }
267
            CSVParser parser = CSVParser.parse(csvData, StandardCharsets.UTF_8, CSVFormat.Builder.create(CSVFormat.DEFAULT)
268
                    .setHeader()
269
                    .setSkipHeaderRecord(true)
270
                    .build());
271
            for (CSVRecord csvRecord : parser) {
272
                ExternalCatalogItem e = new ExternalCatalogItem(csvRecord.toList(), ExternalSource.ANE);
273
                externalCatalogItems.add(e);
274
            }
275
        } catch (IOException e) {
276
            e.printStackTrace();
277
        }
278
        saveAllWithThreads(externalCatalogItems);
279

    
280
        log.info("Records from ANE added");
281
    }
282

    
283

    
284
    /**
285
     * Reads records from CIGS and saves them to database
286
     *
287
     * @param cigsFile CIGS file
288
     */
289
    private void addCigsRecords(File cigsFile) {
290
        List<ExternalCatalogItem> externalCatalogItems = new ArrayList<>();
291

    
292
        try (InputStream csvData = new FileInputStream(cigsFile)) {
293
            CSVParser parser = CSVParser.parse(csvData, StandardCharsets.UTF_8, CSVFormat.Builder.create(CSVFormat.DEFAULT)
294
                    .setHeader()
295
                    .setSkipHeaderRecord(true)
296
                    .build());
297
            for (CSVRecord csvRecord : parser) {
298
                ExternalCatalogItem e = new ExternalCatalogItem(csvRecord.toList(), ExternalSource.CIGS);
299
                externalCatalogItems.add(e);
300
            }
301
        } catch (IOException e) {
302
            e.printStackTrace();
303
        }
304
        saveAllWithThreads(externalCatalogItems);
305

    
306
        log.info("Records from CIGS added");
307
    }
308

    
309
    /**
310
     * Creates list of lists of external catalog items divided by batch size
311
     *
312
     * @param externalCatalogItems list of external catalog items
313
     * @return divided list of lists of external catalog items
314
     */
315
    private List<List<ExternalCatalogItem>> createSublist(List<ExternalCatalogItem> externalCatalogItems) {
316
        List<List<ExternalCatalogItem>> listOfSubList = new ArrayList<>();
317
        for (int i = 0; i < externalCatalogItems.size(); i += BATCH_SIZE) {
318
            if (i + BATCH_SIZE <= externalCatalogItems.size()) {
319
                listOfSubList.add(externalCatalogItems.subList(i, i + BATCH_SIZE));
320
            } else {
321
                listOfSubList.add(externalCatalogItems.subList(i, externalCatalogItems.size()));
322
            }
323
        }
324
        return listOfSubList;
325
    }
326

    
327
    /**
328
     * Divides list of external catalog items to sublist, creates threads (for saving sublists in batch) and executes them
329
     *
330
     * @param externalCatalogItems list of external catalog items
331
     */
332
    private void saveAllWithThreads(List<ExternalCatalogItem> externalCatalogItems) {
333
        ExecutorService executorService = Executors.newFixedThreadPool(hikariDataSource.getMaximumPoolSize());
334
        List<List<ExternalCatalogItem>> subList = createSublist(externalCatalogItems);
335
        List<Callable<Void>> callables = subList.stream().map(sublist ->
336
                (Callable<Void>) () -> {
337
                    saveAllInBatch(sublist);
338
                    return null;
339
                }).collect(Collectors.toList());
340
        try {
341
            executorService.invokeAll(callables);
342
        } catch (InterruptedException e) {
343
            e.printStackTrace();
344
        }
345
    }
346

    
347
    /**
348
     * Saves external catalog items in batch
349
     *
350
     * @param externalCatalogItems list of external catalog items
351
     */
352
    private void saveAllInBatch(List<ExternalCatalogItem> externalCatalogItems) {
353
        String sqlExternalCatalogItem = String.format("INSERT INTO %s (id, external_source, latitude, longitude, location_precision, max_date, " +
354
                "min_date, time_period_keys, pid, names, feature_code, country, accuracy, geoname_id, pleiades_id, osm_id) " +
355
                "VALUES (?, ?, ?, ?, ?, ?, ?, ?,  ?, ?, ?, ?, ?, ?, ?, ?)", ExternalCatalogItem.class.getAnnotation(Table.class).name());
356
        try (Connection connection = hikariDataSource.getConnection();
357
             PreparedStatement statementExternalCatalogItem = connection.prepareStatement(sqlExternalCatalogItem)) {
358
            int counter = 0;
359
            for (ExternalCatalogItem item : externalCatalogItems) {
360
                statementExternalCatalogItem.clearParameters();
361
                statementExternalCatalogItem.setObject(1, item.getId());
362
                statementExternalCatalogItem.setString(2, item.getExternalSource().name());
363
                statementExternalCatalogItem.setObject(3, item.getLatitude());
364
                statementExternalCatalogItem.setObject(4, item.getLongitude());
365
                statementExternalCatalogItem.setString(5, item.getLocationPrecision());
366
                statementExternalCatalogItem.setObject(6, item.getMaxDate());
367
                statementExternalCatalogItem.setObject(7, item.getMinDate());
368
                statementExternalCatalogItem.setString(8, item.getTimePeriodKeys());
369
                statementExternalCatalogItem.setString(9, item.getPid());
370
                statementExternalCatalogItem.setString(10, setToStringConverter.convertToDatabaseColumn(item.getNames()));
371
                statementExternalCatalogItem.setString(11, item.getFeatureCode());
372
                statementExternalCatalogItem.setString(12, item.getCountry());
373
                statementExternalCatalogItem.setObject(13, item.getAccuracy());
374
                statementExternalCatalogItem.setObject(14, item.getGeonameId());
375
                statementExternalCatalogItem.setObject(15, item.getPleiadesId());
376
                statementExternalCatalogItem.setObject(16, item.getOsmId());
377
                statementExternalCatalogItem.addBatch();
378

    
379
                counter++;
380
                if (counter % BATCH_SIZE == 0 || counter == externalCatalogItems.size()) {
381
                    statementExternalCatalogItem.executeBatch();
382
                    statementExternalCatalogItem.clearBatch();
383
                }
384
            }
385
        } catch (Exception e) {
386
            e.printStackTrace();
387
        }
388
    }
389
}
(3-3/7)