|
@@ -42,8 +42,10 @@ import java.util.Collection;
|
|
|
import java.util.List;
|
|
|
import java.util.concurrent.Semaphore;
|
|
|
import java.util.concurrent.ThreadLocalRandom;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.TimeoutException;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
+import java.util.concurrent.atomic.AtomicReference;
|
|
|
|
|
|
import com.google.common.collect.Iterators;
|
|
|
import org.apache.commons.logging.Log;
|
|
@@ -84,6 +86,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
|
|
import org.apache.hadoop.test.GenericTestUtils;
|
|
|
import org.apache.hadoop.util.DataChecksum;
|
|
|
+import org.apache.hadoop.util.Time;
|
|
|
import org.apache.log4j.Level;
|
|
|
import org.junit.After;
|
|
|
import org.junit.Assert;
|
|
@@ -130,7 +133,7 @@ public class TestBlockRecovery {
|
|
|
}
|
|
|
|
|
|
private final long
|
|
|
- TEST_LOCK_HOG_DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS = 1000000000L;
|
|
|
+ TEST_STOP_WORKER_XCEIVER_STOP_TIMEOUT_MILLIS = 1000000000L;
|
|
|
|
|
|
/**
|
|
|
* Starts an instance of DataNode
|
|
@@ -143,11 +146,10 @@ public class TestBlockRecovery {
|
|
|
conf.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, "0.0.0.0:0");
|
|
|
conf.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
|
|
|
conf.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "0.0.0.0:0");
|
|
|
- if (currentTestName.getMethodName().equals(
|
|
|
- "testInitReplicaRecoveryDoesNotHogLock")) {
|
|
|
+ if (currentTestName.getMethodName().contains("DoesNotHoldLock")) {
|
|
|
// This test requires a very long value for the xceiver stop timeout.
|
|
|
conf.setLong(DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
|
|
|
- TEST_LOCK_HOG_DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS);
|
|
|
+ TEST_STOP_WORKER_XCEIVER_STOP_TIMEOUT_MILLIS);
|
|
|
}
|
|
|
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
|
|
|
FileSystem.setDefaultUri(conf,
|
|
@@ -759,96 +761,216 @@ public class TestBlockRecovery {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private static class TestStopWorkerSemaphore {
|
|
|
+ final Semaphore sem;
|
|
|
+
|
|
|
+ final AtomicBoolean gotInterruption = new AtomicBoolean(false);
|
|
|
+
|
|
|
+ TestStopWorkerSemaphore() {
|
|
|
+ this.sem = new Semaphore(0);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Attempt to acquire a sempahore within a given timeout.
|
|
|
+ *
|
|
|
+ * This is useful for unit tests where we need to ignore InterruptedException
|
|
|
+ * when attempting to take a semaphore, but still want to honor the overall
|
|
|
+ * test timeout.
|
|
|
+ *
|
|
|
+ * @param timeoutMs The timeout in miliseconds.
|
|
|
+ */
|
|
|
+ private void uninterruptiblyAcquire(long timeoutMs) throws Exception {
|
|
|
+ long startTimeMs = Time.monotonicNow();
|
|
|
+ while (true) {
|
|
|
+ long remTime = startTimeMs + timeoutMs - Time.monotonicNow();
|
|
|
+ if (remTime < 0) {
|
|
|
+ throw new RuntimeException("Failed to acquire the semaphore within " +
|
|
|
+ timeoutMs + " milliseconds.");
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ if (sem.tryAcquire(1, remTime, TimeUnit.MILLISECONDS)) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ gotInterruption.set(true);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private interface TestStopWorkerRunnable {
|
|
|
+ /**
|
|
|
+ * Return the name of the operation that this runnable performs.
|
|
|
+ */
|
|
|
+ String opName();
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Perform the operation.
|
|
|
+ */
|
|
|
+ void run(RecoveringBlock recoveringBlock) throws Exception;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test(timeout=90000)
|
|
|
+ public void testInitReplicaRecoveryDoesNotHoldLock() throws Exception {
|
|
|
+ testStopWorker(new TestStopWorkerRunnable() {
|
|
|
+ @Override
|
|
|
+ public String opName() {
|
|
|
+ return "initReplicaRecovery";
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void run(RecoveringBlock recoveringBlock) throws Exception {
|
|
|
+ try {
|
|
|
+ spyDN.initReplicaRecovery(recoveringBlock);
|
|
|
+ } catch (Exception e) {
|
|
|
+ if (!e.getMessage().contains("meta does not exist")) {
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test(timeout=90000)
|
|
|
+ public void testRecoverAppendDoesNotHoldLock() throws Exception {
|
|
|
+ testStopWorker(new TestStopWorkerRunnable() {
|
|
|
+ @Override
|
|
|
+ public String opName() {
|
|
|
+ return "recoverAppend";
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void run(RecoveringBlock recoveringBlock) throws Exception {
|
|
|
+ try {
|
|
|
+ ExtendedBlock extBlock = recoveringBlock.getBlock();
|
|
|
+ spyDN.getFSDataset().recoverAppend(extBlock,
|
|
|
+ extBlock.getGenerationStamp() + 1, extBlock.getNumBytes());
|
|
|
+ } catch (Exception e) {
|
|
|
+ if (!e.getMessage().contains(
|
|
|
+ "Corrupted replica ReplicaBeingWritten")) {
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test(timeout=90000)
|
|
|
+ public void testRecoverCloseDoesNotHoldLock() throws Exception {
|
|
|
+ testStopWorker(new TestStopWorkerRunnable() {
|
|
|
+ @Override
|
|
|
+ public String opName() {
|
|
|
+ return "recoverClose";
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void run(RecoveringBlock recoveringBlock) throws Exception {
|
|
|
+ try {
|
|
|
+ ExtendedBlock extBlock = recoveringBlock.getBlock();
|
|
|
+ spyDN.getFSDataset().recoverClose(extBlock,
|
|
|
+ extBlock.getGenerationStamp() + 1, extBlock.getNumBytes());
|
|
|
+ } catch (Exception e) {
|
|
|
+ if (!e.getMessage().contains(
|
|
|
+ "Corrupted replica ReplicaBeingWritten")) {
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
- * Test that initReplicaRecovery does not hold the lock for an unreasonable
|
|
|
- * amount of time if a writer is taking a long time to stop.
|
|
|
+ * Test that an FsDatasetImpl operation does not hold the lock for an
|
|
|
+ * unreasonable amount of time if a writer is taking a long time to stop.
|
|
|
*/
|
|
|
- @Test(timeout=60000)
|
|
|
- public void testInitReplicaRecoveryDoesNotHogLock() throws Exception {
|
|
|
- if(LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Running " + GenericTestUtils.getMethodName());
|
|
|
- }
|
|
|
+ private void testStopWorker(final TestStopWorkerRunnable tswr)
|
|
|
+ throws Exception {
|
|
|
+ LOG.debug("Running " + currentTestName.getMethodName());
|
|
|
// We need a long value for the data xceiver stop timeout.
|
|
|
// Otherwise the timeout will trigger, and we will not have tested that
|
|
|
// thread join was done locklessly.
|
|
|
Assert.assertEquals(
|
|
|
- TEST_LOCK_HOG_DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS,
|
|
|
+ TEST_STOP_WORKER_XCEIVER_STOP_TIMEOUT_MILLIS,
|
|
|
dn.getDnConf().getXceiverStopTimeout());
|
|
|
- final Semaphore progressParent = new Semaphore(0);
|
|
|
- final Semaphore terminateSlowWorker = new Semaphore(0);
|
|
|
- final AtomicBoolean failure = new AtomicBoolean(false);
|
|
|
+ final TestStopWorkerSemaphore progressParent =
|
|
|
+ new TestStopWorkerSemaphore();
|
|
|
+ final TestStopWorkerSemaphore terminateSlowWriter =
|
|
|
+ new TestStopWorkerSemaphore();
|
|
|
+ final AtomicReference<String> failure =
|
|
|
+ new AtomicReference<String>(null);
|
|
|
Collection<RecoveringBlock> recoveringBlocks =
|
|
|
initRecoveringBlocks();
|
|
|
final RecoveringBlock recoveringBlock =
|
|
|
Iterators.get(recoveringBlocks.iterator(), 0);
|
|
|
final ExtendedBlock block = recoveringBlock.getBlock();
|
|
|
- Thread slowWorker = new Thread(new Runnable() {
|
|
|
+ Thread slowWriterThread = new Thread(new Runnable() {
|
|
|
@Override
|
|
|
public void run() {
|
|
|
try {
|
|
|
// Register this thread as the writer for the recoveringBlock.
|
|
|
- LOG.debug("slowWorker creating rbw");
|
|
|
+ LOG.debug("slowWriter creating rbw");
|
|
|
ReplicaHandler replicaHandler =
|
|
|
spyDN.data.createRbw(StorageType.DISK, block, false);
|
|
|
replicaHandler.close();
|
|
|
- LOG.debug("slowWorker created rbw");
|
|
|
+ LOG.debug("slowWriter created rbw");
|
|
|
// Tell the parent thread to start progressing.
|
|
|
- progressParent.release();
|
|
|
- while (true) {
|
|
|
- try {
|
|
|
- terminateSlowWorker.acquire();
|
|
|
- break;
|
|
|
- } catch (InterruptedException e) {
|
|
|
- // Ignore interrupted exceptions so that the waitingWorker thread
|
|
|
- // will have to wait for us.
|
|
|
- }
|
|
|
- }
|
|
|
- LOG.debug("slowWorker exiting");
|
|
|
+ progressParent.sem.release();
|
|
|
+ terminateSlowWriter.uninterruptiblyAcquire(60000);
|
|
|
+ LOG.debug("slowWriter exiting");
|
|
|
} catch (Throwable t) {
|
|
|
- LOG.error("slowWorker got exception", t);
|
|
|
- failure.set(true);
|
|
|
+ LOG.error("slowWriter got exception", t);
|
|
|
+ failure.compareAndSet(null, "slowWriter got exception " +
|
|
|
+ t.getMessage());
|
|
|
}
|
|
|
}
|
|
|
});
|
|
|
// Start the slow worker thread and wait for it to take ownership of the
|
|
|
// ReplicaInPipeline
|
|
|
- slowWorker.start();
|
|
|
- while (true) {
|
|
|
- try {
|
|
|
- progressParent.acquire();
|
|
|
- break;
|
|
|
- } catch (InterruptedException e) {
|
|
|
- // Ignore interrupted exceptions
|
|
|
- }
|
|
|
- }
|
|
|
+ slowWriterThread.start();
|
|
|
+ progressParent.uninterruptiblyAcquire(60000);
|
|
|
|
|
|
- // Start a worker thread which will wait for the slow worker thread.
|
|
|
- Thread waitingWorker = new Thread(new Runnable() {
|
|
|
+ // Start a worker thread which will attempt to stop the writer.
|
|
|
+ Thread stopWriterThread = new Thread(new Runnable() {
|
|
|
@Override
|
|
|
public void run() {
|
|
|
try {
|
|
|
- // Attempt to terminate the other worker thread and take ownership
|
|
|
- // of the ReplicaInPipeline.
|
|
|
- LOG.debug("waitingWorker initiating recovery");
|
|
|
- spyDN.initReplicaRecovery(recoveringBlock);
|
|
|
- LOG.debug("waitingWorker initiated recovery");
|
|
|
+ LOG.debug("initiating " + tswr.opName());
|
|
|
+ tswr.run(recoveringBlock);
|
|
|
+ LOG.debug("finished " + tswr.opName());
|
|
|
} catch (Throwable t) {
|
|
|
- GenericTestUtils.assertExceptionContains("meta does not exist", t);
|
|
|
+ LOG.error("stopWriterThread got unexpected exception for " +
|
|
|
+ tswr.opName(), t);
|
|
|
+ failure.compareAndSet(null, "stopWriterThread got unexpected " +
|
|
|
+ "exception for " + tswr.opName() + ": " + t.getMessage());
|
|
|
}
|
|
|
}
|
|
|
});
|
|
|
- waitingWorker.start();
|
|
|
+ stopWriterThread.start();
|
|
|
|
|
|
- // Do an operation that requires the lock. This should not be blocked
|
|
|
- // by the replica recovery in progress.
|
|
|
+ while (!terminateSlowWriter.gotInterruption.get()) {
|
|
|
+ // Wait until stopWriterThread attempts to stop our slow writer by sending
|
|
|
+ // it an InterruptedException.
|
|
|
+ Thread.sleep(1);
|
|
|
+ }
|
|
|
+
|
|
|
+ // We know that stopWriterThread is in the process of joining our slow
|
|
|
+ // writer. It must not hold the lock during this operation.
|
|
|
+ // In order to test that it does not, we attempt to do an operation that
|
|
|
+ // requires the lock-- getReplicaString.
|
|
|
spyDN.getFSDataset().getReplicaString(
|
|
|
recoveringBlock.getBlock().getBlockPoolId(),
|
|
|
recoveringBlock.getBlock().getBlockId());
|
|
|
|
|
|
- // Wait for the two worker threads to exit normally.
|
|
|
- terminateSlowWorker.release();
|
|
|
- slowWorker.join();
|
|
|
- waitingWorker.join();
|
|
|
- Assert.assertFalse("The slowWriter thread failed.", failure.get());
|
|
|
+ // Tell the slow writer to exit, and then wait for all threads to join.
|
|
|
+ terminateSlowWriter.sem.release();
|
|
|
+ slowWriterThread.join();
|
|
|
+ stopWriterThread.join();
|
|
|
+
|
|
|
+ // Check that our worker threads exited cleanly. This is not checked by the
|
|
|
+ // unit test framework, so we have to do it manually here.
|
|
|
+ String failureReason = failure.get();
|
|
|
+ if (failureReason != null) {
|
|
|
+ Assert.fail("Thread failure: " + failureReason);
|
|
|
+ }
|
|
|
}
|
|
|
}
|