Projekt

Obecné

Profil

Stáhnout (15.2 KB) Statistiky
| Větev: | Tag: | Revize:
1
package cz.zcu.kiv.backendapi.external;
2

    
3
import com.zaxxer.hikari.HikariDataSource;
4
import lombok.RequiredArgsConstructor;
5
import lombok.extern.slf4j.Slf4j;
6
import org.apache.commons.csv.CSVFormat;
7
import org.apache.commons.csv.CSVParser;
8
import org.apache.commons.csv.CSVRecord;
9
import org.apache.tomcat.util.http.fileupload.FileUtils;
10
import org.springframework.stereotype.Service;
11
import org.springframework.transaction.annotation.Transactional;
12
import org.springframework.web.multipart.MultipartFile;
13

    
14
import javax.persistence.Table;
15
import java.io.*;
16
import java.net.URL;
17
import java.nio.charset.StandardCharsets;
18
import java.nio.file.Files;
19
import java.nio.file.Paths;
20
import java.sql.Connection;
21
import java.sql.PreparedStatement;
22
import java.sql.Statement;
23
import java.util.ArrayList;
24
import java.util.List;
25
import java.util.concurrent.Callable;
26
import java.util.concurrent.ExecutorService;
27
import java.util.concurrent.Executors;
28
import java.util.stream.Collectors;
29
import java.util.zip.GZIPInputStream;
30
import java.util.zip.ZipEntry;
31
import java.util.zip.ZipInputStream;
32

    
33
/**
34
 * External catalog item service implementation
35
 */
36
@Service
37
@Transactional
38
@RequiredArgsConstructor
39
@Slf4j
40
public class ExternalCatalogItemItemServiceImpl implements IExternalCatalogItemService {
41
    /**
42
     * Buffer size
43
     */
44
    public static final int BUFFER_SIZE = 1024;
45

    
46
    /**
47
     * Directory where files for external directory will be stored
48
     */
49
    private static final String DIRECTORY_FOR_EXTERNAL_FILES = "sources";
50

    
51
    /**
52
     * URL for Pleiades file
53
     */
54
    private static final String PLEIADES_FILE_URL = "https://atlantides.org/downloads/pleiades/dumps/pleiades-names-latest.csv.gz";
55

    
56
    /**
57
     * Name of Pleiades file
58
     */
59
    private static final String PLEIADES_FILE_NAME = "pleiades-names-latest.csv";
60

    
61
    /**
62
     * URL for Pleiades file – needs to be formatted (more sources)
63
     */
64
    private static final String GEONAMES_FILE_URL = "https://download.geonames.org/export/dump/%s.zip";
65

    
66
    /**
67
     * Name of GeoNames file – needs to be formatted (more sources)
68
     */
69
    private static final String GEONAMES_FILE_NAME = "%s.txt";
70

    
71
    /**
72
     * Name of ANE file
73
     */
74
    private static final String ANE_FILE = "ANE.csv";
75

    
76
    /**
77
     * Batch size for saving items
78
     */
79
    private static final int BATCH_SIZE = 5000;
80

    
81
    /**
82
     * Regex for one arbitrary character in string
83
     */
84
    private static final String WILDCARD_CHARACTER_REGEX = "\\?";
85

    
86
    /**
87
     * Regex for one arbitrary character in string used in SQL
88
     */
89
    private static final String WILDCARD_CHARACTER_REGEX_SQL = "_";
90

    
91
    /**
92
     * Regex for any number of arbitrary characters in string
93
     */
94
    private static final String WILDCARD_CHARACTERS_REGEX = "\\*";
95

    
96
    /**
97
     * Regex for any number of arbitrary characters in string used in SQL
98
     */
99
    private static final String WILDCARD_CHARACTERS_REGEX_SQL = "%";
100

    
101
    /**
102
     * External catalog repository
103
     */
104
    private final ExternalCatalogItemRepository externalCatalogItemRepository;
105

    
106
    /**
107
     * Hikari data source
108
     */
109
    private final HikariDataSource hikariDataSource;
110

    
111
    /**
112
     * Set to string convertor
113
     */
114
    private final SetToStringConverter setToStringConverter;
115

    
116

    
117
    @Override
118
    public void updateCatalog(MultipartFile file) {
119
        log.info("Updating external catalog");
120

    
121
        try {
122
            Files.createDirectories(Paths.get(DIRECTORY_FOR_EXTERNAL_FILES)); // creates directory if not exists
123
            FileUtils.cleanDirectory(new File(DIRECTORY_FOR_EXTERNAL_FILES)); // cleans the directory
124
        } catch (Exception e) {
125
            e.printStackTrace();
126
        }
127

    
128
        // clears database – updated list will be stored later
129
        String sqlDelete = String.format("DELETE FROM %s", ExternalCatalogItem.class.getAnnotation(Table.class).name());
130
        try (Connection connection = hikariDataSource.getConnection();
131
             Statement statementDelete = connection.createStatement()) {
132
            statementDelete.execute(sqlDelete);
133
        } catch (Exception e) {
134
            e.printStackTrace();
135
        }
136

    
137
        try {
138
            addPleiadesRecords();
139
        } catch (Exception e) {
140
            e.printStackTrace();
141
        }
142

    
143
        try {
144
            addGeonamesRecords();
145
        } catch (Exception e) {
146
            e.printStackTrace();
147
        }
148

    
149
        try {
150
            if (file.getOriginalFilename() != null) {
151
                File cigsFile = new File(new File(DIRECTORY_FOR_EXTERNAL_FILES), file.getOriginalFilename());
152
                Files.copy(file.getInputStream(), cigsFile.toPath());
153
                addCigsRecords(cigsFile);
154
            }
155
        } catch (Exception e) {
156
            e.printStackTrace();
157
        }
158

    
159
        try {
160
            addAneRecords();
161
        } catch (Exception e) {
162
            e.printStackTrace();
163
        }
164

    
165
        log.info("External catalog updated");
166
    }
167

    
168
    @Override
169
    public List<ExternalCatalogItem> getCatalog(String name, ExternalSource source) {
170
        name = WILDCARD_CHARACTERS_REGEX_SQL +
171
                name.replaceAll(WILDCARD_CHARACTER_REGEX, WILDCARD_CHARACTER_REGEX_SQL).replaceAll(WILDCARD_CHARACTERS_REGEX, WILDCARD_CHARACTERS_REGEX_SQL) +
172
                WILDCARD_CHARACTERS_REGEX_SQL;
173
        return externalCatalogItemRepository.getFilteredExternalCatalog(name, source.name());
174
    }
175

    
176
    /**
177
     * Downloads, extracts and reads records from Pleiades and saves them to database
178
     */
179
    private void addPleiadesRecords() {
180
        List<ExternalCatalogItem> externalCatalogItems = new ArrayList<>();
181
        byte[] buffer = new byte[BUFFER_SIZE];
182
        File pleiadesFile = new File(new File(DIRECTORY_FOR_EXTERNAL_FILES), PLEIADES_FILE_NAME);
183
        try (InputStream fileIn = new URL(PLEIADES_FILE_URL).openStream();
184
             GZIPInputStream gZIPInputStream = new GZIPInputStream(fileIn);
185
             FileOutputStream fileOutputStream = new FileOutputStream(pleiadesFile)) {
186

    
187
            int bytes_read;
188

    
189
            while ((bytes_read = gZIPInputStream.read(buffer)) > 0) {
190
                fileOutputStream.write(buffer, 0, bytes_read);
191
            }
192

    
193
            log.info("The Pleiades file was decompressed successfully");
194

    
195
        } catch (IOException ex) {
196
            ex.printStackTrace();
197
        }
198

    
199
        try (InputStream csvData = new FileInputStream(pleiadesFile)) {
200
            CSVParser parser = CSVParser.parse(csvData, StandardCharsets.UTF_8, CSVFormat.Builder.create(CSVFormat.DEFAULT)
201
                    .setHeader()
202
                    .setSkipHeaderRecord(true)
203
                    .build());
204
            for (CSVRecord csvRecord : parser) {
205
                ExternalCatalogItem e = new ExternalCatalogItem(csvRecord.toList(), ExternalSource.PLEIADES);
206
                externalCatalogItems.add(e);
207
            }
208

    
209
        } catch (IOException ex) {
210
            ex.printStackTrace();
211
        }
212
        saveAllWithThreads(externalCatalogItems);
213

    
214
        log.info("Records from Pleiades added");
215
    }
216

    
217
    /**
218
     * Downloads, extracts and reads records from GeoNames and saves them to database
219
     */
220
    private void addGeonamesRecords() {
221
        byte[] buffer = new byte[BUFFER_SIZE];
222
        for (String countryCode : ExternalCatalogItem.COUNTRY_CODES.keySet()) {
223
            List<ExternalCatalogItem> externalCatalogItems = new ArrayList<>();
224
            // Downloads file from URL and extracts it
225
            String url = String.format(GEONAMES_FILE_URL, countryCode);
226
            try (ZipInputStream zis = new ZipInputStream(new URL(url).openStream())) {
227
                ZipEntry zipEntry = zis.getNextEntry();
228
                while (zipEntry != null) {
229
                    FileOutputStream fileOutputStream = new FileOutputStream(new File(new File(DIRECTORY_FOR_EXTERNAL_FILES), zipEntry.getName()));
230
                    int bytes_read;
231
                    while ((bytes_read = zis.read(buffer)) > 0) {
232
                        fileOutputStream.write(buffer, 0, bytes_read);
233
                    }
234
                    zipEntry = zis.getNextEntry();
235
                    fileOutputStream.close();
236
                }
237
            } catch (IOException e) {
238
                e.printStackTrace();
239
            }
240

    
241
            log.info("The GeoNames file with country code " + countryCode + " was decompressed successfully");
242

    
243
            // Reads file and adds catalog items to list
244
            File geoNamesFile = new File(new File(DIRECTORY_FOR_EXTERNAL_FILES), String.format(GEONAMES_FILE_NAME, countryCode));
245
            try (BufferedReader reader = new BufferedReader(new FileReader(geoNamesFile))) {
246
                String line;
247
                while ((line = reader.readLine()) != null) {
248
                    ExternalCatalogItem e = new ExternalCatalogItem(List.of(line.split("\t")), ExternalSource.GEONAMES);
249
                    externalCatalogItems.add(e);
250
                }
251
            } catch (IOException e) {
252
                e.printStackTrace();
253
            }
254
            saveAllWithThreads(externalCatalogItems);
255
        }
256

    
257
        log.info("Records from GeoNames added");
258
    }
259

    
260
    /**
261
     * Saves records from ANE to database
262
     */
263
    private void addAneRecords() {
264
        List<ExternalCatalogItem> externalCatalogItems = new ArrayList<>();
265
        // Reads file and adds catalog items to list
266
        try (InputStream csvData = getClass().getClassLoader().getResourceAsStream(ANE_FILE)) {
267
            if (csvData == null) {
268
                return;
269
            }
270
            CSVParser parser = CSVParser.parse(csvData, StandardCharsets.UTF_8, CSVFormat.Builder.create(CSVFormat.DEFAULT)
271
                    .setHeader()
272
                    .setSkipHeaderRecord(true)
273
                    .build());
274
            for (CSVRecord csvRecord : parser) {
275
                ExternalCatalogItem e = new ExternalCatalogItem(csvRecord.toList(), ExternalSource.ANE);
276
                externalCatalogItems.add(e);
277
            }
278
        } catch (IOException e) {
279
            e.printStackTrace();
280
        }
281
        saveAllWithThreads(externalCatalogItems);
282

    
283
        log.info("Records from ANE added");
284
    }
285

    
286

    
287
    /**
288
     * Reads records from CIGS and saves them to database
289
     *
290
     * @param cigsFile CIGS file
291
     */
292
    private void addCigsRecords(File cigsFile) {
293
        List<ExternalCatalogItem> externalCatalogItems = new ArrayList<>();
294

    
295
        try (InputStream csvData = new FileInputStream(cigsFile)) {
296
            CSVParser parser = CSVParser.parse(csvData, StandardCharsets.UTF_8, CSVFormat.Builder.create(CSVFormat.DEFAULT)
297
                    .setHeader()
298
                    .setSkipHeaderRecord(true)
299
                    .build());
300
            for (CSVRecord csvRecord : parser) {
301
                ExternalCatalogItem e = new ExternalCatalogItem(csvRecord.toList(), ExternalSource.CIGS);
302
                externalCatalogItems.add(e);
303
            }
304
        } catch (IOException e) {
305
            e.printStackTrace();
306
        }
307
        saveAllWithThreads(externalCatalogItems);
308

    
309
        log.info("Records from CIGS added");
310
    }
311

    
312
    /**
313
     * Creates list of lists of external catalog items divided by batch size
314
     *
315
     * @param externalCatalogItems list of external catalog items
316
     * @return divided list of lists of external catalog items
317
     */
318
    private List<List<ExternalCatalogItem>> createSublist(List<ExternalCatalogItem> externalCatalogItems) {
319
        List<List<ExternalCatalogItem>> listOfSubList = new ArrayList<>();
320
        for (int i = 0; i < externalCatalogItems.size(); i += BATCH_SIZE) {
321
            if (i + BATCH_SIZE <= externalCatalogItems.size()) {
322
                listOfSubList.add(externalCatalogItems.subList(i, i + BATCH_SIZE));
323
            } else {
324
                listOfSubList.add(externalCatalogItems.subList(i, externalCatalogItems.size()));
325
            }
326
        }
327
        return listOfSubList;
328
    }
329

    
330
    /**
331
     * Divides list of external catalog items to sublist, creates threads (for saving sublists in batch) and executes them
332
     *
333
     * @param externalCatalogItems list of external catalog items
334
     */
335
    private void saveAllWithThreads(List<ExternalCatalogItem> externalCatalogItems) {
336
        ExecutorService executorService = Executors.newFixedThreadPool(hikariDataSource.getMaximumPoolSize());
337
        List<List<ExternalCatalogItem>> subList = createSublist(externalCatalogItems);
338
        List<Callable<Void>> callables = subList.stream().map(sublist ->
339
                (Callable<Void>) () -> {
340
                    saveAllInBatch(sublist);
341
                    return null;
342
                }).collect(Collectors.toList());
343
        try {
344
            executorService.invokeAll(callables);
345
        } catch (InterruptedException e) {
346
            e.printStackTrace();
347
        }
348
    }
349

    
350
    /**
351
     * Saves external catalog items in batch
352
     *
353
     * @param externalCatalogItems list of external catalog items
354
     */
355
    private void saveAllInBatch(List<ExternalCatalogItem> externalCatalogItems) {
356
        String sqlExternalCatalogItem = String.format("INSERT INTO %s (id, external_source, latitude, longitude, location_precision, max_date, " +
357
                "min_date, time_period_keys, pid, names, feature_code, country, accuracy, geoname_id, pleiades_id, osm_id) " +
358
                "VALUES (?, ?, ?, ?, ?, ?, ?, ?,  ?, ?, ?, ?, ?, ?, ?, ?)", ExternalCatalogItem.class.getAnnotation(Table.class).name());
359
        try (Connection connection = hikariDataSource.getConnection();
360
             PreparedStatement statementExternalCatalogItem = connection.prepareStatement(sqlExternalCatalogItem)) {
361
            int counter = 0;
362
            for (ExternalCatalogItem item : externalCatalogItems) {
363
                statementExternalCatalogItem.clearParameters();
364
                statementExternalCatalogItem.setObject(1, item.getId());
365
                statementExternalCatalogItem.setString(2, item.getExternalSource().name());
366
                statementExternalCatalogItem.setObject(3, item.getLatitude());
367
                statementExternalCatalogItem.setObject(4, item.getLongitude());
368
                statementExternalCatalogItem.setString(5, item.getLocationPrecision());
369
                statementExternalCatalogItem.setObject(6, item.getMaxDate());
370
                statementExternalCatalogItem.setObject(7, item.getMinDate());
371
                statementExternalCatalogItem.setString(8, item.getTimePeriodKeys());
372
                statementExternalCatalogItem.setString(9, item.getPid());
373
                statementExternalCatalogItem.setString(10, setToStringConverter.convertToDatabaseColumn(item.getNames()));
374
                statementExternalCatalogItem.setString(11, item.getFeatureCode());
375
                statementExternalCatalogItem.setString(12, item.getCountry());
376
                statementExternalCatalogItem.setObject(13, item.getAccuracy());
377
                statementExternalCatalogItem.setObject(14, item.getGeonameId());
378
                statementExternalCatalogItem.setObject(15, item.getPleiadesId());
379
                statementExternalCatalogItem.setObject(16, item.getOsmId());
380
                statementExternalCatalogItem.addBatch();
381

    
382
                counter++;
383
                if (counter % BATCH_SIZE == 0 || counter == externalCatalogItems.size()) {
384
                    statementExternalCatalogItem.executeBatch();
385
                    statementExternalCatalogItem.clearBatch();
386
                }
387
            }
388
        } catch (Exception e) {
389
            e.printStackTrace();
390
        }
391
    }
392
}
(3-3/7)