|
@@ -24,7 +24,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_FULL_BLOCK_REPOR
|
|
import com.google.common.base.Joiner;
|
|
import com.google.common.base.Joiner;
|
|
import com.google.common.base.Supplier;
|
|
import com.google.common.base.Supplier;
|
|
import com.google.common.util.concurrent.Uninterruptibles;
|
|
import com.google.common.util.concurrent.Uninterruptibles;
|
|
-import org.apache.commons.lang.mutable.MutableObject;
|
|
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
@@ -42,8 +41,6 @@ import org.junit.Test;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.util.HashSet;
|
|
import java.util.HashSet;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
-import java.util.concurrent.ArrayBlockingQueue;
|
|
|
|
-import java.util.concurrent.BlockingQueue;
|
|
|
|
import java.util.concurrent.Semaphore;
|
|
import java.util.concurrent.Semaphore;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
@@ -174,13 +171,11 @@ public class TestBlockReportRateLimiting {
|
|
conf.setLong(DFS_NAMENODE_FULL_BLOCK_REPORT_LEASE_LENGTH_MS, 100L);
|
|
conf.setLong(DFS_NAMENODE_FULL_BLOCK_REPORT_LEASE_LENGTH_MS, 100L);
|
|
|
|
|
|
final Semaphore gotFbrSem = new Semaphore(0);
|
|
final Semaphore gotFbrSem = new Semaphore(0);
|
|
- final AtomicReference<String> failure = new AtomicReference<String>("");
|
|
|
|
|
|
+ final AtomicReference<String> failure = new AtomicReference<>();
|
|
final AtomicReference<MiniDFSCluster> cluster =
|
|
final AtomicReference<MiniDFSCluster> cluster =
|
|
- new AtomicReference<>(null);
|
|
|
|
- final BlockingQueue<Integer> datanodeToStop =
|
|
|
|
- new ArrayBlockingQueue<Integer>(1);
|
|
|
|
|
|
+ new AtomicReference<>();
|
|
|
|
+ final AtomicReference<String> datanodeToStop = new AtomicReference<>();
|
|
final BlockManagerFaultInjector injector = new BlockManagerFaultInjector() {
|
|
final BlockManagerFaultInjector injector = new BlockManagerFaultInjector() {
|
|
- private String uuidToStop = "";
|
|
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void incomingBlockReportRpc(DatanodeID nodeID,
|
|
public void incomingBlockReportRpc(DatanodeID nodeID,
|
|
@@ -189,11 +184,9 @@ public class TestBlockReportRateLimiting {
|
|
setFailure(failure, "Got unexpected rate-limiting-" +
|
|
setFailure(failure, "Got unexpected rate-limiting-" +
|
|
"bypassing full block report RPC from " + nodeID);
|
|
"bypassing full block report RPC from " + nodeID);
|
|
}
|
|
}
|
|
- synchronized (this) {
|
|
|
|
- if (uuidToStop.equals(nodeID.getDatanodeUuid())) {
|
|
|
|
- throw new IOException("Injecting failure into block " +
|
|
|
|
- "report RPC for " + nodeID);
|
|
|
|
- }
|
|
|
|
|
|
+ if (nodeID.getXferAddr().equals(datanodeToStop.get())) {
|
|
|
|
+ throw new IOException("Injecting failure into block " +
|
|
|
|
+ "report RPC for " + nodeID);
|
|
}
|
|
}
|
|
gotFbrSem.release();
|
|
gotFbrSem.release();
|
|
}
|
|
}
|
|
@@ -204,43 +197,24 @@ public class TestBlockReportRateLimiting {
|
|
if (leaseId == 0) {
|
|
if (leaseId == 0) {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
- synchronized (this) {
|
|
|
|
- if (uuidToStop.isEmpty()) {
|
|
|
|
- MiniDFSCluster cl;
|
|
|
|
- do {
|
|
|
|
- cl = cluster.get();
|
|
|
|
- } while (cl == null);
|
|
|
|
- int datanodeIndexToStop = getDatanodeIndex(cl, node);
|
|
|
|
- uuidToStop = node.getDatanodeUuid();
|
|
|
|
- datanodeToStop.add(Integer.valueOf(datanodeIndexToStop));
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private int getDatanodeIndex(MiniDFSCluster cl,
|
|
|
|
- DatanodeDescriptor node) {
|
|
|
|
- List<DataNode> datanodes = cl.getDataNodes();
|
|
|
|
- for (int i = 0; i < datanodes.size(); i++) {
|
|
|
|
- DataNode datanode = datanodes.get(i);
|
|
|
|
- if (datanode.getDatanodeUuid().equals(node.getDatanodeUuid())) {
|
|
|
|
- return i;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- throw new RuntimeException("Failed to find UUID " +
|
|
|
|
- node.getDatanodeUuid() + " in the list of datanodes.");
|
|
|
|
|
|
+ datanodeToStop.compareAndSet(null, node.getXferAddr());
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void removeBlockReportLease(DatanodeDescriptor node, long leaseId) {
|
|
public void removeBlockReportLease(DatanodeDescriptor node, long leaseId) {
|
|
}
|
|
}
|
|
};
|
|
};
|
|
- BlockManagerFaultInjector.instance = injector;
|
|
|
|
- cluster.set(new MiniDFSCluster.Builder(conf).numDataNodes(2).build());
|
|
|
|
- cluster.get().waitActive();
|
|
|
|
- int datanodeIndexToStop = datanodeToStop.take();
|
|
|
|
- cluster.get().stopDataNode(datanodeIndexToStop);
|
|
|
|
- gotFbrSem.acquire();
|
|
|
|
- cluster.get().shutdown();
|
|
|
|
- Assert.assertEquals("", failure.get());
|
|
|
|
|
|
+ try {
|
|
|
|
+ BlockManagerFaultInjector.instance = injector;
|
|
|
|
+ cluster.set(new MiniDFSCluster.Builder(conf).numDataNodes(2).build());
|
|
|
|
+ cluster.get().waitActive();
|
|
|
|
+ Assert.assertNotNull(cluster.get().stopDataNode(datanodeToStop.get()));
|
|
|
|
+ gotFbrSem.acquire();
|
|
|
|
+ Assert.assertNull(failure.get());
|
|
|
|
+ } finally {
|
|
|
|
+ if (cluster.get() != null) {
|
|
|
|
+ cluster.get().shutdown();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|