From 62645489e56d400b03c7e288ac96fdb7d638507e Mon Sep 17 00:00:00 2001 From: Al Niessner Date: Fri, 10 Jan 2025 14:35:12 -0800 Subject: [PATCH] code clean up and show duplicate lidvids --- .../gov/nasa/pds/harvest/cmd/HarvestCmd.java | 2 + .../pds/harvest/crawler/FilesProcessor.java | 27 +- .../nasa/pds/harvest/dao/MetadataWriter.java | 255 ++++++++---------- .../pds/harvest/dao/RegistryDocBatch.java | 28 +- 4 files changed, 155 insertions(+), 157 deletions(-) diff --git a/src/main/java/gov/nasa/pds/harvest/cmd/HarvestCmd.java b/src/main/java/gov/nasa/pds/harvest/cmd/HarvestCmd.java index 837fb6a9..a10bbd50 100644 --- a/src/main/java/gov/nasa/pds/harvest/cmd/HarvestCmd.java +++ b/src/main/java/gov/nasa/pds/harvest/cmd/HarvestCmd.java @@ -14,6 +14,7 @@ import gov.nasa.pds.harvest.crawler.FilesProcessor; import gov.nasa.pds.harvest.crawler.ProductProcessor; import gov.nasa.pds.harvest.crawler.RefsCache; +import gov.nasa.pds.harvest.dao.RegistryDocBatch; import gov.nasa.pds.harvest.dao.RegistryManager; import gov.nasa.pds.harvest.meta.XPathCacheLoader; import gov.nasa.pds.harvest.util.CounterMap; @@ -277,6 +278,7 @@ private void printSummary() { Counter counter = RegistryManager.getInstance().getCounter(); + RegistryDocBatch.showDuplicates(); log.log(LogUtils.LEVEL_SUMMARY, "Summary:"); int processedCount = counter.prodCounters.getTotal(); diff --git a/src/main/java/gov/nasa/pds/harvest/crawler/FilesProcessor.java b/src/main/java/gov/nasa/pds/harvest/crawler/FilesProcessor.java index eb36047f..5d4b58bf 100644 --- a/src/main/java/gov/nasa/pds/harvest/crawler/FilesProcessor.java +++ b/src/main/java/gov/nasa/pds/harvest/crawler/FilesProcessor.java @@ -84,24 +84,17 @@ public boolean test(Path path, BasicFileAttributes attrs) * @param dir Directory with PDS4 labels * @throws Exception Generic exception */ - public void processDirectory(File dir) throws Exception - { - Stream stream = null; - - try - { - stream = Files.find(dir.toPath(), Integer.MAX_VALUE, new FileMatcher(), FileVisitOption.FOLLOW_LINKS); - Iterator it = stream.iterator(); - - while(it.hasNext()) - { - onFile(it.next().toFile()); - } - } - finally - { - CloseUtils.close(stream); + public void processDirectory(File dir) throws Exception { + try (Stream stream = Files.find( + dir.toPath(), + Integer.MAX_VALUE, + new FileMatcher(), FileVisitOption.FOLLOW_LINKS)) { + Iterator it = stream.iterator(); + + while(it.hasNext()) { + onFile(it.next().toFile()); } + } } diff --git a/src/main/java/gov/nasa/pds/harvest/dao/MetadataWriter.java b/src/main/java/gov/nasa/pds/harvest/dao/MetadataWriter.java index b0700c22..e7d055be 100644 --- a/src/main/java/gov/nasa/pds/harvest/dao/MetadataWriter.java +++ b/src/main/java/gov/nasa/pds/harvest/dao/MetadataWriter.java @@ -17,154 +17,131 @@ import gov.nasa.pds.registry.common.meta.Metadata; -public class MetadataWriter implements Closeable -{ - private final static String WARN_SKIP_PRE = "Skipping registered product "; - private final static String WARN_SKIP_POST = " (LIDVID/LID already exists in registry database)"; - private final static int ES_DOC_BATCH_SIZE = 50; - private final ConnectionFactory conFact; - private Logger log; - - private RegistryDao registryDao; - private DataLoader loader; - private RegistryDocBatch docBatch; - private String jobId; - - private int totalRecords; - private boolean overwriteExisting = false; - - private Counter counter; - - /** - * Constructor - * @param cfg registry configuration - * @throws Exception an exception - */ - public MetadataWriter(ConnectionFactory conFact, RegistryDao dao, Counter counter) throws Exception - { - this.conFact = conFact; - log = LogManager.getLogger(this.getClass()); - loader = new DataLoader(conFact); - docBatch = new RegistryDocBatch(); - jobId = PackageIdGenerator.getInstance().getPackageId(); - - this.registryDao = dao; - this.counter = counter; - } +public class MetadataWriter implements Closeable { + private final static String WARN_SKIP_PRE = "Skipping registered product "; + private final static String WARN_SKIP_POST = " (LIDVID/LID already exists in registry database)"; + private final static int ES_DOC_BATCH_SIZE = 50; + private final ConnectionFactory conFact; + private Logger log; - - public void setOverwriteExisting(boolean b) - { - this.overwriteExisting = b; - } - - - public void write(Metadata meta) throws Exception - { - docBatch.write(this.conFact, meta, jobId); - - if(docBatch.size() % ES_DOC_BATCH_SIZE == 0) - { - writeBatch(); - } + private RegistryDao registryDao; + private DataLoader loader; + private RegistryDocBatch docBatch; + private String jobId; + + private int totalRecords; + private boolean overwriteExisting = false; + + private Counter counter; + + /** + * Constructor + * + * @param cfg registry configuration + * @throws Exception an exception + */ + public MetadataWriter(ConnectionFactory conFact, RegistryDao dao, Counter counter) + throws Exception { + this.conFact = conFact; + log = LogManager.getLogger(this.getClass()); + loader = new DataLoader(conFact); + docBatch = new RegistryDocBatch(); + jobId = PackageIdGenerator.getInstance().getPackageId(); + this.registryDao = dao; + this.counter = counter; + } + + + public void setOverwriteExisting(boolean b) { + this.overwriteExisting = b; + } + + + public void write(Metadata meta) throws Exception { + docBatch.write(this.conFact, meta, jobId); + + if (docBatch.size() % ES_DOC_BATCH_SIZE == 0) { + writeBatch(); } + } - - private void writeBatch() throws Exception - { - if(docBatch.isEmpty()) return; - - Set nonRegisteredIds = null; - if(!overwriteExisting) - { - List batchLidVids = docBatch.getLidVids(); - nonRegisteredIds = registryDao.getNonExistingIds(batchLidVids); - if(nonRegisteredIds == null || nonRegisteredIds.isEmpty()) - { - for(String lidvid: batchLidVids) - { - log.warn(WARN_SKIP_PRE + lidvid + WARN_SKIP_POST); - counter.skippedFileCount++; - } - - docBatch.clear(); - return; - } - } - // Build JSON documents for Elasticsearch - List data = new ArrayList<>(); - - for(RegistryDocBatch.NJsonItem item: docBatch.getItems()) - { - if(nonRegisteredIds == null) - { - addItem(data, item); - } - else - { - if(nonRegisteredIds.contains(item.lidvid)) - { - addItem(data, item); - } - else - { - log.warn(WARN_SKIP_PRE + item.lidvid + WARN_SKIP_POST); - counter.skippedFileCount++; - } - } - } - - // Load batch - Set failedIds = new TreeSet<>(); - totalRecords += loader.loadBatch(data, failedIds); - log.info("Wrote " + totalRecords + " product(s)"); - - // Update failed counter - counter.failedFileCount += failedIds.size(); - - // Update product counters - for(RegistryDocBatch.NJsonItem item: docBatch.getItems()) - { - if((nonRegisteredIds == null && !failedIds.contains(item.lidvid)) || - (nonRegisteredIds != null && nonRegisteredIds.contains(item.lidvid) && !failedIds.contains(item.lidvid))) - { - counter.prodCounters.inc(item.prodClass); - } + private void writeBatch() throws Exception { + if (docBatch.isEmpty()) + return; + + Set nonRegisteredIds = null; + if (!overwriteExisting) { + List batchLidVids = docBatch.getLidVids(); + nonRegisteredIds = registryDao.getNonExistingIds(batchLidVids); + if (nonRegisteredIds == null || nonRegisteredIds.isEmpty()) { + for (String lidvid : batchLidVids) { + log.warn(WARN_SKIP_PRE + lidvid + WARN_SKIP_POST); + counter.skippedFileCount++; } - - // Clear batch docBatch.clear(); + return; + } } - - private void addItem(List data, RegistryDocBatch.NJsonItem item) - { - data.add(item.pkJson); - data.add(item.dataJson); + // Build JSON documents for Elasticsearch + List data = new ArrayList<>(); + + for (RegistryDocBatch.NJsonItem item : docBatch.getItems()) { + if (nonRegisteredIds == null) { + addItem(data, item); + } else { + if (nonRegisteredIds.contains(item.lidvid)) { + addItem(data, item); + } else { + log.warn(WARN_SKIP_PRE + item.lidvid + WARN_SKIP_POST); + counter.skippedFileCount++; + } + } } - - - public void flush() throws Exception - { - writeBatch(); + + // Load batch + Set failedIds = new TreeSet<>(); + totalRecords += loader.loadBatch(data, failedIds); + log.info("Wrote " + totalRecords + " product(s)"); + + // Update failed counter + counter.failedFileCount += failedIds.size(); + + // Update product counters + for (RegistryDocBatch.NJsonItem item : docBatch.getItems()) { + if ((nonRegisteredIds == null && !failedIds.contains(item.lidvid)) + || (nonRegisteredIds != null && nonRegisteredIds.contains(item.lidvid) + && !failedIds.contains(item.lidvid))) { + counter.prodCounters.inc(item.prodClass); + } } - - - @Override - public void close() throws IOException - { - try - { - flush(); - } - catch(IOException ex) - { - throw ex; - } - catch(Exception ex) - { - throw new IOException(ex); - } + + // Clear batch + docBatch.clear(); + } + + + private void addItem(List data, RegistryDocBatch.NJsonItem item) { + RegistryDocBatch.increment(item.lidvid); + data.add(item.pkJson); + data.add(item.dataJson); + } + + + public void flush() throws Exception { + writeBatch(); + } + + + @Override + public void close() throws IOException { + try { + flush(); + } catch (IOException ex) { + throw ex; + } catch (Exception ex) { + throw new IOException(ex); } + } } diff --git a/src/main/java/gov/nasa/pds/harvest/dao/RegistryDocBatch.java b/src/main/java/gov/nasa/pds/harvest/dao/RegistryDocBatch.java index 48b3c30b..d1e80de3 100644 --- a/src/main/java/gov/nasa/pds/harvest/dao/RegistryDocBatch.java +++ b/src/main/java/gov/nasa/pds/harvest/dao/RegistryDocBatch.java @@ -1,8 +1,10 @@ package gov.nasa.pds.harvest.dao; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map.Entry; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import gov.nasa.pds.registry.common.ConnectionFactory; @@ -26,7 +28,8 @@ public static class NJsonItem } final private static HashSet alreadyLearned = new HashSet(); - final private Logger log = LogManager.getLogger(RegistryDocBatch.class); + final private static HashMap history = new HashMap(); + final private static Logger log = LogManager.getLogger(RegistryDocBatch.class); private List items; @@ -109,4 +112,27 @@ public List getLidVids() items.forEach((item) -> { ids.add(item.lidvid); } ); return ids; } + static public void increment(String lidvid) { + Integer count = history.containsKey(lidvid) ? history.get(lidvid) : 0; + history.put(lidvid, ++count); + } + static public void showDuplicates() { + boolean first = true; + for (Entry entry : history.entrySet()) { + if (entry.getValue() > 1) { + if (first) { + log.fatal("The harvested collection has duplicate lidvids. Double check content of these lidvids:"); + first = false; + } + log.fatal(" Found " + entry.getValue() + " of lidvid " + entry.getKey()); + } + } + if (!first) { + int total = 0; + for (Integer count : history.values()) { + total += count; + } + log.fatal(" Total number of duplicate lidvids: " + (total - history.size())); + } + } }