From a7ab3cef6984d44135ea8493c37bd105a85d0bf9 Mon Sep 17 00:00:00 2001 From: Hassan Eslami Date: Thu, 15 Jun 2017 17:44:32 -0700 Subject: [PATCH] This diff contains the following fixes: MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Once the OOC is at OFFLAODING state, it may offload data to disk indefinitely. We should call GC manually and get OOC out of OFFLOADING if anything has changed in computation/communication pattern. Fixes a "if" statement which entirely disabled regression for memory estimation. * Interleaving of in resetting and calculating memory estimation potentially can cause data race. * The superstep count in memory estimator was not coherent throughout the calculation. * Sometime the memory estimator's accuracy is not good. We should fall back to a threshold-based scheme relying on a more pessimistic memory usage report (such as the one given by JVM – memory usage includes garbage data too, but it can be used as a pessimistic estimate, as it is currently used in ThresholdBasedOracle). More tuning is needed for a smooth memory estimation mechanism. "mvn clean install" passes all tests and checks. --- .../apache/giraph/ooc/OutOfCoreEngine.java | 9 +- .../ooc/policy/MemoryEstimatorOracle.java | 321 ++++++++++++------ 2 files changed, 228 insertions(+), 102 deletions(-) diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java index 82a55f12d..bcff2a6bb 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java @@ -426,11 +426,14 @@ public void ioCommandCompleted(IOCommand command) { * threads. * * @param fraction the fraction of processing threads to remain active. This - * number is in range [0, 1] + * number is in range (0, 1] */ public void updateActiveThreadsFraction(double fraction) { checkState(fraction >= 0 && fraction <= 1); - int numActiveThreads = (int) (numProcessingThreads * fraction); + // Making sure the number of active threads is not set to 0 to ensure + // progress in computation + int numActiveThreads = Math.max( + (int) (Math.ceil(numProcessingThreads * fraction)), 1); if (LOG.isInfoEnabled()) { LOG.info("updateActiveThreadsFraction: updating the number of active " + "threads to " + numActiveThreads); @@ -489,7 +492,7 @@ public void processingThreadFinish() { */ public void updateRequestsCreditFraction(double fraction) { checkState(fraction >= 0 && fraction <= 1); - short newCredit = (short) (maxRequestsCredit * fraction); + short newCredit = (short) (Math.ceil(maxRequestsCredit * fraction)); if (LOG.isInfoEnabled()) { LOG.info("updateRequestsCreditFraction: updating the credit to " + newCredit); diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/MemoryEstimatorOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/MemoryEstimatorOracle.java index 1233183e6..7ff647145 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/MemoryEstimatorOracle.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/MemoryEstimatorOracle.java @@ -63,6 +63,10 @@ * x4: bytes received due to messages * x5: bytes loaded/stored from/to disk due to OOC. * + * If the estimation error is high, the oracle falls back to a threshold-based + * mechanism based on JVM's pessimistic memory usage report (refer to + * {@link ThresholdBasedOracle} to see how an oracle can be entirely depend on + * such reports) */ public class MemoryEstimatorOracle implements OutOfCoreOracle { /** Memory check interval in msec */ @@ -78,6 +82,14 @@ public class MemoryEstimatorOracle implements OutOfCoreOracle { new FloatConfOption("giraph.garbageEstimator.manualGCPressure", 0.95f, "The threshold above which GC is called manually if Full GC has not " + "happened in a while"); + /** + * If full GC is not called within this interval, in case of tight memory + * pressure, we should call GC manually + */ + public static final LongConfOption MANUAL_GC_INTERVAL = + new LongConfOption("giraph.memoryEstimator.manualGCInterval", 20000, + "The amount of time passed from the last full GC to call GC " + + "manually if memory is tight (in milliseconds)"); /** Used to detect a high memory pressure situation */ public static final FloatConfOption GC_MINIMUM_RECLAIM_FRACTION = new FloatConfOption("giraph.garbageEstimator.gcReclaimFraction", 0.05f, @@ -103,11 +115,32 @@ public class MemoryEstimatorOracle implements OutOfCoreOracle { public static final FloatConfOption CREDIT_LOW_THRESHOLD = new FloatConfOption("giraph.creditLowThreshold", 0.90f, "If mem-usage is below this threshold, credit is set to max"); - /** OOC starts if mem-usage is above this threshold */ + /** OOC starts being aggressive if mem-usage is above this threshold */ + public static final FloatConfOption OOC_THRESHOLD_STORE = + new FloatConfOption("giraph.oocThresholdStore", 0.95f, + "If mem-usage is above this threshold, out of core threads " + + "starts aggressively writing data to disk"); + /** + * OOC threads start using disk if mem-usage is above this threshold. If + * mem-usage is below this threshold, OOC threads bring critical (unprocessed) + * data to memory + */ public static final FloatConfOption OOC_THRESHOLD = new FloatConfOption("giraph.oocThreshold", 0.90f, "If mem-usage is above this threshold, out of core threads starts " + - "writing data to disk"); + "writing data to disk, but they wont be aggressive"); + /** + * OOC threads start bringing data back to memory if mem-usage is below this + * threshold + */ + public static final FloatConfOption OOC_THRESHOLD_LOAD = + new FloatConfOption("giraph.oocThresholdLoad", 0.85f, + "If mem-usage is below this threshold, out of core threads " + + "starts reading data from disk"); + /** Maximum tolerable error by memory estimator mechanism */ + public static final FloatConfOption MEM_ESTIMATOR_ACCURACY = + new FloatConfOption("giraph.MemEstimatorAccuracy", 0.1f, + "Maximum tolerable error by memory estimation mechanism"); /** Logger */ private static final Logger LOG = @@ -127,6 +160,12 @@ public class MemoryEstimatorOracle implements OutOfCoreOracle { private final float creditLowThreshold; /** Cached value for {@link #OOC_THRESHOLD} */ private final float oocThreshold; + /** Cached value for {@link #OOC_THRESHOLD_LOAD} */ + private final float oocThresholdLoad; + /** Cached value for {@link #OOC_THRESHOLD_STORE} */ + private final float oocThresholdStore; + /** Cached value for {@link #MEM_ESTIMATOR_ACCURACY} */ + private final float memEstAcc; /** Reference to running OOC engine */ private final OutOfCoreEngine oocEngine; @@ -134,21 +173,33 @@ public class MemoryEstimatorOracle implements OutOfCoreOracle { private final MemoryEstimator memoryEstimator; /** Keeps track of the number of bytes stored/loaded by OOC */ private final AtomicLong oocBytesInjected = new AtomicLong(0); - /** How many bytes to offload */ - private final AtomicLong numBytesToOffload = new AtomicLong(0); - /** Current state of the OOC */ - private volatile State state = State.STABLE; /** Timestamp of the last major GC */ private volatile long lastMajorGCTime = 0; + /** + * Lock to avoid interleaving of resetting data structures in memory estimator + * with modifying those data structures + */ + private Lock lock = new ReentrantLock(); + /** State of the oracle */ + private volatile OracleState oracleState = OracleState.NEUTRAL; + /** Whether OOC has offloaded any data since the beginning of the superstep */ + private volatile boolean hasOffloaded = false; + /** The last time where memory usage was not identified as 'high' */ + private volatile long lastNonHighPressureTimeMillis = Long.MAX_VALUE; /** - * Different states the OOC can be in. + * Different states the oracle can be in. */ - private enum State { - /** No offloading */ - STABLE, - /** Current offloading */ - OFFLOADING, + private enum OracleState { + /** No offloading yet, no estimation exists */ + NEUTRAL, + /** An estimation exists and the accuracy is good enough */ + MEM_ESTIMATOR, + /** + * There has been data offloaded to disk since the beginning of the + * superstep, but a good estimation does not exist + */ + THRESHOLD_BASED, } /** @@ -170,38 +221,68 @@ public MemoryEstimatorOracle(ImmutableClassesGiraphConfiguration conf, this.creditHighThreshold = CREDIT_HIGH_THRESHOLD.get(conf); this.creditLowThreshold = CREDIT_LOW_THRESHOLD.get(conf); this.oocThreshold = OOC_THRESHOLD.get(conf); + this.oocThresholdLoad = OOC_THRESHOLD_LOAD.get(conf); + this.oocThresholdStore = OOC_THRESHOLD_STORE.get(conf); + this.memEstAcc = MEM_ESTIMATOR_ACCURACY.get(conf); final long checkMemoryInterval = CHECK_MEMORY_INTERVAL.get(conf); + final long manualGCInterval = MANUAL_GC_INTERVAL.get(conf); ThreadUtils.startThread(new Runnable() { @Override public void run() { while (true) { - long oldGenUsageEstimate = memoryEstimator.getUsageEstimate(); MemoryUsage usage = getOldGenUsed(); - if (oldGenUsageEstimate > 0) { - updateRates(oldGenUsageEstimate, usage.getMax()); - } else { - long time = System.currentTimeMillis(); - if (time - lastMajorGCTime >= 10000) { - double used = (double) usage.getUsed() / usage.getMax(); - if (used > manualGCMemoryPressure) { - if (LOG.isInfoEnabled()) { - LOG.info( - "High memory pressure with no full GC from the JVM. " + - "Calling GC manually. Used fraction of old-gen is " + - String.format("%.2f", used) + "."); - } - System.gc(); - time = System.currentTimeMillis() - time; - usage = getOldGenUsed(); - used = (double) usage.getUsed() / usage.getMax(); - if (LOG.isInfoEnabled()) { - LOG.info("Manual GC done. It took " + - String.format("%.2f", time / 1000.0) + - " seconds. Used fraction of old-gen is " + + // If we are following an OOC policy (based on either + // memory-estimation or threshold-based), we update the + // number of active threads and network credits. + if (oracleState == OracleState.MEM_ESTIMATOR) { + updateRates(memoryEstimator.getUsageEstimate(), usage.getMax()); + } else if (oracleState == OracleState.THRESHOLD_BASED) { + updateRates(usage.getUsed(), usage.getMax()); + } + + // We check intervals of major GCs. If a long enough interval has + // passed and either of the following + // satisfies: + // - Memory usage in old-gen is high, + // - We have constantly been at high memory pressure (i.e. + // offloading data if possible), + // we call manual GC. + + long time = System.currentTimeMillis(); + if (time - lastMajorGCTime >= manualGCInterval) { + double used = (double) usage.getUsed() / usage.getMax(); + if ( + // Memory usage is at a critical point and JVM's GC is almost + // about to kick in. + used > manualGCMemoryPressure || + // Or, memory pressure is high (i.e. enough to start offloading + // to disk), and it has been high for a long enough period. + ((double) getUsageEstimate(usage) / usage.getMax() > + oocThreshold && + time - lastNonHighPressureTimeMillis >= 3 * manualGCInterval) || + // Or, oracle is following the ThresholdBased scheme and long + // enough has passed since last major GC. We should call manual + // GC hoping for memory estimation accuracy to be good this + // time. + (oracleState == OracleState.THRESHOLD_BASED && + time - lastMajorGCTime >= 5 * manualGCInterval)) { + if (LOG.isInfoEnabled()) { + LOG.info( + "High memory pressure with no full GC from the JVM. " + + "Calling GC manually. Used fraction of old-gen is " + String.format("%.2f", used) + "."); - } + } + System.gc(); + time = System.currentTimeMillis() - time; + usage = getOldGenUsed(); + used = (double) usage.getUsed() / usage.getMax(); + if (LOG.isInfoEnabled()) { + LOG.info("Manual GC done. It took " + + String.format("%.2f", time / 1000.0) + + " seconds. Used fraction of old-gen is " + + String.format("%.2f", used) + "."); } } } @@ -217,6 +298,27 @@ public void run() { .createUncaughtExceptionHandler()); } + /** + * Finds the best possible estimation of usage at the moment based on the + * oracle's state. If the memory-estimation mechanism is accurate enough, we + * rely on that. If ThresholdBased scheme is used, we solely rely on JVM's + * report. Otherwise, we don't have any good estimation and return -1. + * + * @param oldGenUsage MemoryUsage object obtained from JVM in case we are + * using ThresholdBased scheme + * @return memory usage estimate + */ + private long getUsageEstimate(MemoryUsage oldGenUsage) { + if (oracleState == OracleState.MEM_ESTIMATOR) { + return memoryEstimator.getUsageEstimate(); + } else if (oracleState == OracleState.THRESHOLD_BASED) { + return oldGenUsage.getUsed(); + } else { + return -1; + } + } + + /** * Resets all the counters used in the memory estimation. This is called at * the beginning of a new superstep. @@ -231,8 +333,16 @@ public void run() { public void startIteration() { AbstractEdgeStore.PROGRESS_COUNTER.reset(); oocBytesInjected.set(0); - memoryEstimator.clear(); - memoryEstimator.setCurrentSuperstep(oocEngine.getSuperstep()); + oracleState = OracleState.NEUTRAL; + hasOffloaded = false; + lastNonHighPressureTimeMillis = Long.MAX_VALUE; + lock.lock(); + try { + memoryEstimator.clear(); + memoryEstimator.setCurrentSuperstep(oocEngine.getSuperstep()); + } finally { + lock.unlock(); + } oocEngine.updateRequestsCreditFraction(1); oocEngine.updateActiveThreadsFraction(1); } @@ -240,28 +350,45 @@ public void startIteration() { @Override public IOAction[] getNextIOActions() { - if (state == State.OFFLOADING) { - return new IOAction[]{ - IOAction.STORE_MESSAGES_AND_BUFFERS, IOAction.STORE_PARTITION}; - } - long oldGenUsage = memoryEstimator.getUsageEstimate(); + long oldGenUsage = -1; MemoryUsage usage = getOldGenUsed(); + if (oracleState == OracleState.MEM_ESTIMATOR) { + oldGenUsage = memoryEstimator.getUsageEstimate(); + } else if (oracleState == OracleState.THRESHOLD_BASED) { + oldGenUsage = usage.getUsed(); + } if (oldGenUsage > 0) { double usageEstimate = (double) oldGenUsage / usage.getMax(); - if (usageEstimate > oocThreshold) { + if (usageEstimate < oocThreshold) { + lastNonHighPressureTimeMillis = System.currentTimeMillis(); + } + if (usageEstimate < oocThresholdLoad) { + return new IOAction[]{IOAction.LOAD_PARTITION}; + } else if (usageEstimate < oocThreshold) { + return new IOAction[]{ + IOAction.LOAD_UNPROCESSED_PARTITION, + IOAction.LOAD_PARTITION}; + } else if (usageEstimate < oocThresholdStore) { return new IOAction[]{ IOAction.STORE_MESSAGES_AND_BUFFERS, - IOAction.STORE_PARTITION}; + IOAction.STORE_PROCESSED_PARTITION}; } else { - return new IOAction[]{IOAction.LOAD_PARTITION}; + return new IOAction[]{ + IOAction.STORE_MESSAGES_AND_BUFFERS, + IOAction.STORE_PARTITION}; } } else { + lastNonHighPressureTimeMillis = System.currentTimeMillis(); return new IOAction[]{IOAction.LOAD_PARTITION}; } } @Override public boolean approve(IOCommand command) { + if (!(command instanceof WaitIOCommand) && + !(command instanceof LoadPartitionIOCommand) && !hasOffloaded) { + hasOffloaded = true; + } return true; } @@ -269,20 +396,8 @@ public boolean approve(IOCommand command) { public void commandCompleted(IOCommand command) { if (command instanceof LoadPartitionIOCommand) { oocBytesInjected.getAndAdd(command.bytesTransferred()); - if (state == State.OFFLOADING) { - numBytesToOffload.getAndAdd(command.bytesTransferred()); - } } else if (!(command instanceof WaitIOCommand)) { oocBytesInjected.getAndAdd(0 - command.bytesTransferred()); - if (state == State.OFFLOADING) { - numBytesToOffload.getAndAdd(0 - command.bytesTransferred()); - } - } - - if (state == State.OFFLOADING && numBytesToOffload.get() <= 0) { - numBytesToOffload.set(0); - state = State.STABLE; - updateRates(-1, 1); } } @@ -323,52 +438,64 @@ public synchronized void gcCompleted( long usedMemoryEstimate = memoryEstimator.getUsageEstimate(); long usedMemoryReal = after.getUsed(); if (usedMemoryEstimate >= 0) { + double error = (double) Math.abs(usedMemoryEstimate - usedMemoryReal) / + usedMemoryReal; if (LOG.isInfoEnabled()) { LOG.info("gcCompleted: estimate=" + usedMemoryEstimate + " real=" + - usedMemoryReal + " error=" + - ((double) Math.abs(usedMemoryEstimate - usedMemoryReal) / - usedMemoryReal * 100)); + usedMemoryReal + " error=" + (error * 100)); + } + if (error < memEstAcc) { + oracleState = OracleState.MEM_ESTIMATOR; + } else { + if (oracleState == OracleState.MEM_ESTIMATOR) { + if (hasOffloaded) { + oracleState = OracleState.THRESHOLD_BASED; + } else { + oracleState = OracleState.NEUTRAL; + } + } } } - // Number of edges loaded so far (if in input superstep) - long edgesLoaded = oocEngine.getSuperstep() >= 0 ? 0 : - EdgeInputSplitsCallable.getTotalEdgesLoadedMeter().count(); - // Number of vertices loaded so far (if in input superstep) - long verticesLoaded = oocEngine.getSuperstep() >= 0 ? 0 : - VertexInputSplitsCallable.getTotalVerticesLoadedMeter().count(); - // Number of vertices computed (if either in compute or store phase) - long verticesComputed = WorkerProgress.get().getVerticesComputed() + - WorkerProgress.get().getVerticesStored() + - AbstractEdgeStore.PROGRESS_COUNTER.getProgress(); - // Number of bytes received - long receivedBytes = - oocEngine.getNetworkMetrics().getBytesReceivedPerSuperstep(); - // Number of OOC bytes - long oocBytes = oocBytesInjected.get(); - - memoryEstimator.addRecord(getOldGenUsed().getUsed(), edgesLoaded, - verticesLoaded, verticesComputed, receivedBytes, oocBytes); + lock.lock(); + try { + // Number of edges loaded so far (if in input superstep) + long edgesLoaded = memoryEstimator.getCurrentSuperstep() >= 0 ? 0 : + EdgeInputSplitsCallable.getTotalEdgesLoadedMeter().count(); + // Number of vertices loaded so far (if in input superstep) + long verticesLoaded = memoryEstimator.getCurrentSuperstep() >= 0 ? 0 : + VertexInputSplitsCallable.getTotalVerticesLoadedMeter().count(); + // Number of vertices computed (if either in compute or store phase) + long verticesComputed = WorkerProgress.get().getVerticesComputed() + + WorkerProgress.get().getVerticesStored() + + AbstractEdgeStore.PROGRESS_COUNTER.getProgress(); + // Number of bytes received + long receivedBytes = + oocEngine.getNetworkMetrics().getBytesReceivedPerSuperstep(); + // Number of OOC bytes + long oocBytes = oocBytesInjected.get(); + + memoryEstimator.addRecord(getOldGenUsed().getUsed(), edgesLoaded, + verticesLoaded, verticesComputed, receivedBytes, oocBytes); + } finally { + lock.unlock(); + } long garbage = before.getUsed() - after.getUsed(); long maxMem = after.getMax(); long memUsed = after.getUsed(); - boolean isTight = (maxMem - memUsed) < 2 * gcReclaimFraction * maxMem && - garbage < gcReclaimFraction * maxMem; + boolean highMemoryUsage = + (maxMem - memUsed) < 2 * gcReclaimFraction * maxMem; + boolean isTight = highMemoryUsage && garbage < gcReclaimFraction * maxMem; boolean predictionExist = memoryEstimator.getUsageEstimate() > 0; - if (isTight && !predictionExist) { + if (isTight && !predictionExist && oracleState == OracleState.NEUTRAL) { + oracleState = OracleState.THRESHOLD_BASED; if (LOG.isInfoEnabled()) { - LOG.info("gcCompleted: garbage=" + garbage + " memUsed=" + - memUsed + " maxMem=" + maxMem); + LOG.info("gcCompleted: Tight memory usage, no prediction exists, " + + "fall back to threshold-based until prediction is valid. garbage=" + + garbage + " memUsed=" + memUsed + " maxMem=" + maxMem); } - numBytesToOffload.set((long) (2 * gcReclaimFraction * maxMem) - - (maxMem - memUsed)); - if (LOG.isInfoEnabled()) { - LOG.info("gcCompleted: tight memory usage. Starting to offload " + - "until " + numBytesToOffload.get() + " bytes are offloaded"); - } - state = State.OFFLOADING; - updateRates(1, 1); + updateRates(memUsed, maxMem); } } } @@ -483,6 +610,10 @@ public void setCurrentSuperstep(long superstep) { this.currentSuperstep = superstep; } + public long getCurrentSuperstep() { + return this.currentSuperstep; + } + /** * Given the current state of computation (i.e. current edges loaded, * vertices computed etc) and the current model (i.e. the regression @@ -787,14 +918,6 @@ private Boolean refineCoefficient(int coefIndex, double lowerBound, */ private static boolean calculateRegression(double[] coefficient, List validColumnIndices, OLSMultipleLinearRegression mlr) { - - if (coefficient.length != validColumnIndices.size()) { - LOG.warn("There are " + coefficient.length + - " coefficients, but " + validColumnIndices.size() + - " valid columns in the regression"); - return false; - } - double[] beta = mlr.estimateRegressionParameters(); Arrays.fill(coefficient, 0); for (int i = 0; i < validColumnIndices.size(); ++i) {