Projekt

Obecné

Profil

Stáhnout (11.9 KB) Statistiky
| Větev: | Tag: | Revize:
1
package cz.zcu.kiv.backendapi.external;
2

    
3
import com.zaxxer.hikari.HikariDataSource;
4
import lombok.RequiredArgsConstructor;
5
import lombok.extern.slf4j.Slf4j;
6
import org.apache.commons.csv.CSVFormat;
7
import org.apache.commons.csv.CSVParser;
8
import org.apache.commons.csv.CSVRecord;
9
import org.apache.tomcat.util.http.fileupload.FileUtils;
10
import org.springframework.stereotype.Service;
11
import org.springframework.transaction.annotation.Transactional;
12

    
13
import javax.persistence.Table;
14
import java.io.*;
15
import java.net.URL;
16
import java.nio.charset.StandardCharsets;
17
import java.nio.file.Files;
18
import java.nio.file.Paths;
19
import java.sql.Connection;
20
import java.sql.PreparedStatement;
21
import java.util.ArrayList;
22
import java.util.List;
23
import java.util.concurrent.Callable;
24
import java.util.concurrent.ExecutorService;
25
import java.util.concurrent.Executors;
26
import java.util.stream.Collectors;
27
import java.util.zip.GZIPInputStream;
28
import java.util.zip.ZipEntry;
29
import java.util.zip.ZipInputStream;
30

    
31
/**
32
 * External catalog service implementation
33
 */
34
@Service
35
@Transactional
36
@RequiredArgsConstructor
37
@Slf4j
38
public class ExternalCatalogServiceImpl implements IExternalCatalogService {
39
    /**
40
     * Buffer size
41
     */
42
    public static final int BUFFER_SIZE = 1024;
43
    /**
44
     * Directory where files for external directory will be stored
45
     */
46
    private static final String DIRECTORY_FOR_EXTERNAL_FILES = "sources";
47
    /**
48
     * URL for Pleiades file
49
     */
50
    private static final String PLEIADES_FILE_URL = "https://atlantides.org/downloads/pleiades/dumps/pleiades-names-latest.csv.gz";
51
    /**
52
     * Name of Pleiades file
53
     */
54
    private static final String PLEIADES_FILE_NAME = "pleiades-names-latest.csv";
55
    /**
56
     * URL for Pleiades file – needs to be formatted (more sources)
57
     */
58
    private static final String GEONAMES_FILE_URL = "https://download.geonames.org/export/dump/%s.zip";
59
    /**
60
     * Name of GeoNames file – needs to be formatted (more sources)
61
     */
62
    private static final String GEONAMES_FILE_NAME = "%s.txt";
63
    /**
64
     * URL for CIGS file
65
     */
66
    private static final String CIGS_FILE_URL = "https://zenodo.org/record/5642899/files/CIGS_v1_4_20211101.csv";
67
    /**
68
     * Name of CIGS file
69
     */
70
    private static final String CIGS_FILE_NAME = "CIGS_v1_4_20211101.csv";
71
    /**
72
     * Batch size for saving items
73
     */
74
    private static final int BATCH_SIZE = 5000;
75

    
76
    /**
77
     * External catalog repository
78
     */
79
    private final ExternalCatalogRepository externalCatalogRepository;
80

    
81
    /**
82
     * Hikari data source
83
     */
84
    private final HikariDataSource hikariDataSource;
85

    
86

    
87
    @Override
88
    public void updateCatalog() {
89
        log.info("Updating external catalog");
90
        try {
91
            Files.createDirectories(Paths.get(DIRECTORY_FOR_EXTERNAL_FILES)); // creates directory if not exists
92
            FileUtils.cleanDirectory(new File(DIRECTORY_FOR_EXTERNAL_FILES)); // cleans the directory
93
            externalCatalogRepository.deleteAll(); // clears database – updated list will be stored later
94
            addPleiadesSource();
95
            addGeonamesSources();
96
            addCigsSources();
97
        } catch (IOException e) {
98
            e.printStackTrace();
99
        }
100
        log.info("External catalog updated");
101
    }
102

    
103
    /**
104
     * Downloads, extracts and reads Pleiades sources and saves them to database
105
     */
106
    private void addPleiadesSource() {
107
        List<ExternalCatalogItem> externalCatalogItems = new ArrayList<>();
108
        byte[] buffer = new byte[BUFFER_SIZE];
109
        File pleiadesFile = new File(new File(DIRECTORY_FOR_EXTERNAL_FILES), PLEIADES_FILE_NAME);
110
        try (InputStream fileIn = new URL(PLEIADES_FILE_URL).openStream();
111
             GZIPInputStream gZIPInputStream = new GZIPInputStream(fileIn);
112
             FileOutputStream fileOutputStream = new FileOutputStream(pleiadesFile)) {
113

    
114
            int bytes_read;
115

    
116
            while ((bytes_read = gZIPInputStream.read(buffer)) > 0) {
117
                fileOutputStream.write(buffer, 0, bytes_read);
118
            }
119

    
120
            log.info("The Pleiades file was decompressed successfully");
121

    
122
        } catch (IOException ex) {
123
            ex.printStackTrace();
124
        }
125

    
126
        try (InputStream csvData = new FileInputStream(pleiadesFile)) {
127
            CSVParser parser = CSVParser.parse(csvData, StandardCharsets.UTF_8, CSVFormat.Builder.create(CSVFormat.DEFAULT)
128
                    .setHeader()
129
                    .setSkipHeaderRecord(true)
130
                    .build());
131
            for (CSVRecord csvRecord : parser) {
132
                externalCatalogItems.add(new ExternalCatalogItem(csvRecord.toList(), ExternalSource.PLEIADES));
133
            }
134

    
135
        } catch (IOException ex) {
136
            ex.printStackTrace();
137
        }
138
        saveAllWithThreads(externalCatalogItems);
139
    }
140

    
141
    /**
142
     * Downloads, extracts and reads GeoNames sources and saves them to database
143
     */
144
    private void addGeonamesSources() {
145
        byte[] buffer = new byte[BUFFER_SIZE];
146
        for (String countryCode : ExternalCatalogItem.COUNTRY_CODES.keySet()) {
147
            List<ExternalCatalogItem> externalCatalogItems = new ArrayList<>();
148
            // Downloads file from URL and extracts it
149
            String url = String.format(GEONAMES_FILE_URL, countryCode);
150
            try (ZipInputStream zis = new ZipInputStream(new URL(url).openStream())) {
151
                ZipEntry zipEntry = zis.getNextEntry();
152
                while (zipEntry != null) {
153
                    FileOutputStream fileOutputStream = new FileOutputStream(new File(new File(DIRECTORY_FOR_EXTERNAL_FILES), zipEntry.getName()));
154
                    int bytes_read;
155
                    while ((bytes_read = zis.read(buffer)) > 0) {
156
                        fileOutputStream.write(buffer, 0, bytes_read);
157
                    }
158
                    zipEntry = zis.getNextEntry();
159
                    fileOutputStream.close();
160
                }
161
            } catch (IOException e) {
162
                e.printStackTrace();
163
            }
164

    
165
            log.info("The Geonames file with country code " + countryCode + " was decompressed successfully");
166

    
167
            // Reads file and adds catalog items to list
168
            File geoNamesFile = new File(new File(DIRECTORY_FOR_EXTERNAL_FILES), String.format(GEONAMES_FILE_NAME, countryCode));
169
            try (BufferedReader reader = new BufferedReader(new FileReader(geoNamesFile))) {
170
                String line;
171
                while ((line = reader.readLine()) != null) {
172
                    externalCatalogItems.add(new ExternalCatalogItem(List.of(line.split("\t")), ExternalSource.GEONAMES));
173
                }
174
            } catch (IOException e) {
175
                e.printStackTrace();
176
            }
177
            saveAllWithThreads(externalCatalogItems);
178
        }
179
    }
180

    
181
    /**
182
     * Downloads and reads CIGS sources and saves them to database
183
     */
184
    private void addCigsSources() {
185
        List<ExternalCatalogItem> externalCatalogItems = new ArrayList<>();
186
        byte[] buffer = new byte[BUFFER_SIZE];
187
        File cigsFile = new File(new File(DIRECTORY_FOR_EXTERNAL_FILES), CIGS_FILE_NAME);
188

    
189
        try (InputStream inputStream = new URL(CIGS_FILE_URL).openStream();
190
             FileOutputStream fileOutputStream = new FileOutputStream(cigsFile)) {
191
            int bytes_read;
192

    
193
            while ((bytes_read = inputStream.read(buffer)) > 0) {
194

    
195
                fileOutputStream.write(buffer, 0, bytes_read);
196
            }
197
        } catch (IOException e) {
198
            e.printStackTrace();
199
        }
200

    
201
        log.info("The CIGS file was downloaded successfully");
202

    
203
        try (InputStream csvData = new FileInputStream(cigsFile)) {
204
            CSVParser parser = CSVParser.parse(csvData, StandardCharsets.UTF_8, CSVFormat.Builder.create(CSVFormat.DEFAULT)
205
                    .setHeader()
206
                    .setSkipHeaderRecord(true)
207
                    .build());
208
            for (CSVRecord csvRecord : parser) {
209
                externalCatalogItems.add(new ExternalCatalogItem(csvRecord.toList(), ExternalSource.CIGS));
210
            }
211
        } catch (IOException e) {
212
            e.printStackTrace();
213
        }
214
        saveAllWithThreads(externalCatalogItems);
215
    }
216

    
217
    /**
218
     * Creates list of lists of external catalog items divided by batch size
219
     *
220
     * @param externalCatalogItems list of external catalog items
221
     * @return divided list of lists of external catalog items
222
     */
223
    private List<List<ExternalCatalogItem>> createSublist(List<ExternalCatalogItem> externalCatalogItems) {
224
        List<List<ExternalCatalogItem>> listOfSubList = new ArrayList<>();
225
        for (int i = 0; i < externalCatalogItems.size(); i += BATCH_SIZE) {
226
            if (i + BATCH_SIZE <= externalCatalogItems.size()) {
227
                listOfSubList.add(externalCatalogItems.subList(i, i + BATCH_SIZE));
228
            } else {
229
                listOfSubList.add(externalCatalogItems.subList(i, externalCatalogItems.size()));
230
            }
231
        }
232
        return listOfSubList;
233
    }
234

    
235
    /**
236
     * Divides list of external catalog items to sublist, creates threads (for saving sublists in batch) and executes them
237
     *
238
     * @param externalCatalogItems list of external catalog items
239
     */
240
    private void saveAllWithThreads(List<ExternalCatalogItem> externalCatalogItems) {
241
        ExecutorService executorService = Executors.newFixedThreadPool(hikariDataSource.getMaximumPoolSize());
242
        List<List<ExternalCatalogItem>> subList = createSublist(externalCatalogItems);
243
        List<Callable<Void>> callables = subList.stream().map(sublist ->
244
                (Callable<Void>) () -> {
245
                    saveAllInBatch(sublist);
246
                    return null;
247
                }).collect(Collectors.toList());
248
        try {
249
            executorService.invokeAll(callables);
250
        } catch (InterruptedException e) {
251
            e.printStackTrace();
252
        }
253
    }
254

    
255
    /**
256
     * Saves external catalog items in batch
257
     *
258
     * @param externalCatalogItems list of external catalog items
259
     */
260
    private void saveAllInBatch(List<ExternalCatalogItem> externalCatalogItems) {
261
        String sql = String.format("INSERT INTO %s (id, external_source, latitude, longitude, location_precision, max_date, " +
262
                "min_date, time_period_keys, pid, names, feature_code, country, accuracy, geoname_id, pleiades_id, osm_id) " +
263
                "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ExternalCatalogItem.class.getAnnotation(Table.class).name());
264
        try (Connection connection = hikariDataSource.getConnection();
265
             PreparedStatement statement = connection.prepareStatement(sql)) {
266
            int counter = 0;
267
            for (ExternalCatalogItem item : externalCatalogItems) {
268
                statement.clearParameters();
269
                statement.setObject(1, item.getId());
270
                statement.setString(2, item.getExternalSource().name());
271
                statement.setObject(3, item.getLatitude());
272
                statement.setObject(4, item.getLongitude());
273
                statement.setString(5, item.getLocationPrecision());
274
                statement.setObject(6, item.getMaxDate());
275
                statement.setObject(7, item.getMinDate());
276
                statement.setString(8, item.getTimePeriodKeys());
277
                statement.setString(9, item.getPid());
278
                statement.setString(10, String.join(",", item.getNames()));
279
                statement.setString(11, item.getFeatureCode());
280
                statement.setString(12, item.getCountry());
281
                statement.setObject(13, item.getAccuracy());
282
                statement.setObject(14, item.getGeonameId());
283
                statement.setObject(15, item.getPleiadesId());
284
                statement.setObject(16, item.getOsmId());
285
                statement.addBatch();
286
                counter++;
287
                if (counter % BATCH_SIZE == 0 || counter == externalCatalogItems.size()) {
288
                    statement.executeBatch();
289
                    statement.clearBatch();
290
                }
291
            }
292
        } catch (Exception e) {
293
            e.printStackTrace();
294
        }
295
    }
296
}
(4-4/7)