|
@@ -17,9 +17,11 @@
|
|
|
*/
|
|
|
package org.apache.hadoop.hdfs.server.namenode;
|
|
|
|
|
|
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_OWNER;
|
|
|
import static org.junit.Assert.assertEquals;
|
|
|
import static org.junit.Assert.assertNull;
|
|
|
import static org.junit.Assert.assertTrue;
|
|
|
+import static org.mockito.Mockito.argThat;
|
|
|
import static org.mockito.Mockito.doAnswer;
|
|
|
import static org.mockito.Mockito.spy;
|
|
|
|
|
@@ -53,13 +55,15 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
|
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
|
|
|
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.OpInstanceCache;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetOwnerOp;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
|
|
import org.apache.hadoop.ipc.RemoteException;
|
|
|
import org.apache.hadoop.test.GenericTestUtils;
|
|
|
import org.apache.hadoop.util.Time;
|
|
|
-import org.mockito.Mockito;
|
|
|
+import org.mockito.ArgumentMatcher;
|
|
|
import org.slf4j.event.Level;
|
|
|
import org.junit.Test;
|
|
|
import org.junit.runner.RunWith;
|
|
@@ -285,12 +289,12 @@ public class TestEditLogRace {
|
|
|
|
|
|
File editFile = new File(sd.getCurrentDir(), logFileName);
|
|
|
|
|
|
- System.out.println("Verifying file: " + editFile);
|
|
|
+ LOG.info("Verifying file: " + editFile);
|
|
|
FSEditLogLoader loader = new FSEditLogLoader(namesystem, startTxId);
|
|
|
long numEditsThisLog = loader.loadFSEdits(
|
|
|
new EditLogFileInputStream(editFile), startTxId);
|
|
|
|
|
|
- System.out.println("Number of edits: " + numEditsThisLog);
|
|
|
+ LOG.info("Number of edits: " + numEditsThisLog);
|
|
|
assertTrue(numEdits == -1 || numEditsThisLog == numEdits);
|
|
|
numEdits = numEditsThisLog;
|
|
|
}
|
|
@@ -575,9 +579,29 @@ public class TestEditLogRace {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ static SetOwnerOp getSetOwnerOp(OpInstanceCache cache, String group) {
|
|
|
+ return ((SetOwnerOp)cache.get(OP_SET_OWNER))
|
|
|
+ .setSource("/").setUser("u").setGroup(group);
|
|
|
+ }
|
|
|
+
|
|
|
+ static class BlockingOpMatcher extends ArgumentMatcher<FSEditLogOp> {
|
|
|
+ @Override
|
|
|
+ public boolean matches(Object o) {
|
|
|
+ if(o instanceof FSEditLogOp.SetOwnerOp) {
|
|
|
+ FSEditLogOp.SetOwnerOp op = (FSEditLogOp.SetOwnerOp)o;
|
|
|
+ if("b".equals(op.groupname)) {
|
|
|
+ LOG.info("Blocking op: " + op);
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Test(timeout=180000)
|
|
|
public void testDeadlock() throws Throwable {
|
|
|
- GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.INFO);
|
|
|
+ GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.DEBUG);
|
|
|
+ GenericTestUtils.setLogLevel(FSEditLogAsync.LOG, Level.DEBUG);
|
|
|
|
|
|
Configuration conf = getConf();
|
|
|
NameNode.initMetrics(conf, NamenodeRole.NAMENODE);
|
|
@@ -590,21 +614,17 @@ public class TestEditLogRace {
|
|
|
|
|
|
ExecutorService executor = Executors.newCachedThreadPool();
|
|
|
try {
|
|
|
- final FSEditLog editLog = namesystem.getEditLog();
|
|
|
+ final FSEditLog editLog = spy(namesystem.getEditLog());
|
|
|
+ DFSTestUtil.setEditLogForTesting(namesystem, editLog);
|
|
|
|
|
|
- FSEditLogOp.OpInstanceCache cache = editLog.cache.get();
|
|
|
- final FSEditLogOp op = FSEditLogOp.SetOwnerOp.getInstance(cache)
|
|
|
- .setSource("/").setUser("u").setGroup("g");
|
|
|
- // don't reset fields so instance can be reused.
|
|
|
- final FSEditLogOp reuseOp = Mockito.spy(op);
|
|
|
- Mockito.doNothing().when(reuseOp).reset();
|
|
|
+ final OpInstanceCache cache = editLog.cache.get();
|
|
|
|
|
|
// only job is spam edits. it will fill the queue when the test
|
|
|
// loop injects the blockingOp.
|
|
|
- Future[] logSpammers = new Future[16];
|
|
|
+ Future<?>[] logSpammers = new Future<?>[16];
|
|
|
for (int i=0; i < logSpammers.length; i++) {
|
|
|
final int ii = i;
|
|
|
- logSpammers[i] = executor.submit(new Callable() {
|
|
|
+ logSpammers[i] = executor.submit(new Callable<Void>() {
|
|
|
@Override
|
|
|
public Void call() throws Exception {
|
|
|
Thread.currentThread().setName("Log spammer " + ii);
|
|
@@ -612,7 +632,7 @@ public class TestEditLogRace {
|
|
|
startSpamLatch.await();
|
|
|
for (int i = 0; !done.get() && i < 1000000; i++) {
|
|
|
// do not logSync here because we need to congest the queue.
|
|
|
- editLog.logEdit(reuseOp);
|
|
|
+ editLog.logEdit(getSetOwnerOp(cache, "g"));
|
|
|
if (i % 2048 == 0) {
|
|
|
LOG.info("thread[" + ii +"] edits=" + i);
|
|
|
}
|
|
@@ -623,10 +643,9 @@ public class TestEditLogRace {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- // the tx id is set while the edit log monitor is held, so this will
|
|
|
- // effectively stall the async processing thread which will cause the
|
|
|
- // edit queue to fill up.
|
|
|
- final FSEditLogOp blockingOp = Mockito.spy(op);
|
|
|
+ // doEditTransaction is set while the edit log monitor is held, so this
|
|
|
+ // will effectively stall the async processing thread which will cause
|
|
|
+ // the edit queue to fill up.
|
|
|
doAnswer(
|
|
|
new Answer<Void>() {
|
|
|
@Override
|
|
@@ -640,9 +659,7 @@ public class TestEditLogRace {
|
|
|
return null;
|
|
|
}
|
|
|
}
|
|
|
- ).when(blockingOp).setTransactionId(Mockito.anyLong());
|
|
|
- // don't reset fields so instance can be reused.
|
|
|
- Mockito.doNothing().when(blockingOp).reset();
|
|
|
+ ).when(editLog).doEditTransaction(argThat(new BlockingOpMatcher()));
|
|
|
|
|
|
// repeatedly overflow the queue and verify it doesn't deadlock.
|
|
|
for (int i = 0; i < 8; i++) {
|
|
@@ -650,10 +667,11 @@ public class TestEditLogRace {
|
|
|
// spammers to overflow the edit queue, then waits for a permit
|
|
|
// from blockerSemaphore that will be released at the bottom of
|
|
|
// this loop.
|
|
|
- Future blockingEdit = executor.submit(new Callable() {
|
|
|
+ Future<Void> blockingEdit = executor.submit(new Callable<Void>() {
|
|
|
@Override
|
|
|
public Void call() throws Exception {
|
|
|
Thread.currentThread().setName("Log blocker");
|
|
|
+ final FSEditLogOp blockingOp = getSetOwnerOp(cache, "b");
|
|
|
editLog.logEdit(blockingOp);
|
|
|
editLog.logSync();
|
|
|
return null;
|
|
@@ -684,7 +702,7 @@ public class TestEditLogRace {
|
|
|
// what log rolling does), unblock the op currently holding the
|
|
|
// monitor, and ensure deadlock does not occur.
|
|
|
CountDownLatch readyLatch = new CountDownLatch(1);
|
|
|
- Future synchedEdits = executor.submit(new Callable() {
|
|
|
+ Future<Void> synchedEdits = executor.submit(new Callable<Void>() {
|
|
|
@Override
|
|
|
public Void call() throws Exception {
|
|
|
Thread.currentThread().setName("Log synchronizer");
|
|
@@ -692,7 +710,7 @@ public class TestEditLogRace {
|
|
|
// log rolling to deadlock when queue is full.
|
|
|
readyLatch.countDown();
|
|
|
synchronized (editLog) {
|
|
|
- editLog.logEdit(reuseOp);
|
|
|
+ editLog.logEdit(getSetOwnerOp(cache, "g"));
|
|
|
editLog.logSync();
|
|
|
}
|
|
|
return null;
|