|
@@ -17,88 +17,80 @@
|
|
|
*/
|
|
|
package org.apache.hadoop.fi;
|
|
|
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.List;
|
|
|
-
|
|
|
import org.apache.hadoop.fi.FiTestUtil.Action;
|
|
|
import org.apache.hadoop.fi.FiTestUtil.ActionContainer;
|
|
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
|
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|
|
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
|
|
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
|
|
|
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.List;
|
|
|
+
|
|
|
/**
|
|
|
* Utilities for DataTransferProtocol related tests,
|
|
|
* e.g. TestFiDataTransferProtocol.
|
|
|
*/
|
|
|
public class DataTransferTestUtil {
|
|
|
- private static DataTransferTest thepipelinetest;
|
|
|
+ protected static PipelineTest thepipelinetest;
|
|
|
/** initialize pipeline test */
|
|
|
- public static DataTransferTest initTest() {
|
|
|
+ public static PipelineTest initTest() {
|
|
|
return thepipelinetest = new DataTransferTest();
|
|
|
}
|
|
|
/** get the pipeline test object */
|
|
|
- public static DataTransferTest getPipelineTest() {
|
|
|
+ public static PipelineTest getPipelineTest() {
|
|
|
return thepipelinetest;
|
|
|
}
|
|
|
+ /** get the pipeline test object cast to DataTransferTest */
|
|
|
+ public static DataTransferTest getDataTransferTest() {
|
|
|
+ return (DataTransferTest)getPipelineTest();
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
|
* The DataTransferTest class includes a pipeline
|
|
|
* and some actions.
|
|
|
*/
|
|
|
- public static class DataTransferTest {
|
|
|
- private Pipeline thepipeline;
|
|
|
+ public static class DataTransferTest implements PipelineTest {
|
|
|
+ private List<Pipeline> pipelines = new ArrayList<Pipeline>();
|
|
|
+
|
|
|
/** Simulate action for the receiverOpWriteBlock pointcut */
|
|
|
- public final ActionContainer<DataNode> fiReceiverOpWriteBlock
|
|
|
- = new ActionContainer<DataNode>();
|
|
|
+ public final ActionContainer<DatanodeID> fiReceiverOpWriteBlock
|
|
|
+ = new ActionContainer<DatanodeID>();
|
|
|
/** Simulate action for the callReceivePacket pointcut */
|
|
|
- public final ActionContainer<DataNode> fiCallReceivePacket
|
|
|
- = new ActionContainer<DataNode>();
|
|
|
+ public final ActionContainer<DatanodeID> fiCallReceivePacket
|
|
|
+ = new ActionContainer<DatanodeID>();
|
|
|
/** Simulate action for the statusRead pointcut */
|
|
|
- public final ActionContainer<DataNode> fiStatusRead
|
|
|
- = new ActionContainer<DataNode>();
|
|
|
+ public final ActionContainer<DatanodeID> fiStatusRead
|
|
|
+ = new ActionContainer<DatanodeID>();
|
|
|
|
|
|
/** Initialize the pipeline. */
|
|
|
public Pipeline initPipeline(LocatedBlock lb) {
|
|
|
- if (thepipeline != null) {
|
|
|
+ final Pipeline pl = new Pipeline(lb);
|
|
|
+ if (pipelines.contains(pl)) {
|
|
|
throw new IllegalStateException("thepipeline != null");
|
|
|
}
|
|
|
- return thepipeline = new Pipeline(lb);
|
|
|
+ pipelines.add(pl);
|
|
|
+ return pl;
|
|
|
}
|
|
|
|
|
|
/** Return the pipeline. */
|
|
|
- public Pipeline getPipeline() {
|
|
|
- if (thepipeline == null) {
|
|
|
+ public Pipeline getPipeline(DatanodeID id) {
|
|
|
+ if (pipelines == null) {
|
|
|
throw new IllegalStateException("thepipeline == null");
|
|
|
}
|
|
|
- return thepipeline;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /** A pipeline contains a list of datanodes. */
|
|
|
- public static class Pipeline {
|
|
|
- private final List<String> datanodes = new ArrayList<String>();
|
|
|
-
|
|
|
- private Pipeline(LocatedBlock lb) {
|
|
|
- for(DatanodeInfo d : lb.getLocations()) {
|
|
|
- datanodes.add(d.getName());
|
|
|
+ StringBuilder dnString = new StringBuilder();
|
|
|
+ for (Pipeline pipeline : pipelines) {
|
|
|
+ for (DatanodeInfo dni : pipeline.getDataNodes())
|
|
|
+ dnString.append(dni.getStorageID());
|
|
|
+ if (dnString.toString().contains(id.getStorageID()))
|
|
|
+ return pipeline;
|
|
|
}
|
|
|
- }
|
|
|
-
|
|
|
- /** Does the pipeline contains d at the n th position? */
|
|
|
- public boolean contains(int n, DatanodeID d) {
|
|
|
- return d.getName().equals(datanodes.get(n));
|
|
|
- }
|
|
|
-
|
|
|
- /** {@inheritDoc} */
|
|
|
- public String toString() {
|
|
|
- return getClass().getSimpleName() + datanodes;
|
|
|
+ return null;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/** Action for DataNode */
|
|
|
- public static abstract class DataNodeAction implements Action<DataNode> {
|
|
|
+ public static abstract class DataNodeAction implements Action<DatanodeID> {
|
|
|
/** The name of the test */
|
|
|
final String currentTest;
|
|
|
/** The index of the datanode */
|
|
@@ -108,7 +100,7 @@ public class DataTransferTestUtil {
|
|
|
* @param currentTest The name of the test
|
|
|
* @param index The index of the datanode
|
|
|
*/
|
|
|
- private DataNodeAction(String currentTest, int index) {
|
|
|
+ protected DataNodeAction(String currentTest, int index) {
|
|
|
this.currentTest = currentTest;
|
|
|
this.index = index;
|
|
|
}
|
|
@@ -118,10 +110,11 @@ public class DataTransferTestUtil {
|
|
|
return currentTest + ", index=" + index;
|
|
|
}
|
|
|
|
|
|
- /** {@inheritDoc} */
|
|
|
- String toString(DataNode datanode) {
|
|
|
+ /** {@inheritDoc}
|
|
|
+ * @param datanodeID*/
|
|
|
+ String toString(DatanodeID datanodeID) {
|
|
|
return "FI: " + this + ", datanode="
|
|
|
- + datanode.getDatanodeRegistration().getName();
|
|
|
+ + datanodeID.getName();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -133,10 +126,10 @@ public class DataTransferTestUtil {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void run(DataNode datanode) {
|
|
|
- final Pipeline p = getPipelineTest().getPipeline();
|
|
|
- if (p.contains(index, datanode.getDatanodeRegistration())) {
|
|
|
- final String s = toString(datanode);
|
|
|
+ public void run(DatanodeID id) {
|
|
|
+ final Pipeline p = getPipelineTest().getPipeline(id);
|
|
|
+ if (p.contains(index, id)) {
|
|
|
+ final String s = toString(id);
|
|
|
FiTestUtil.LOG.info(s);
|
|
|
throw new OutOfMemoryError(s);
|
|
|
}
|
|
@@ -151,10 +144,10 @@ public class DataTransferTestUtil {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void run(DataNode datanode) throws DiskOutOfSpaceException {
|
|
|
- final Pipeline p = getPipelineTest().getPipeline();
|
|
|
- if (p.contains(index, datanode.getDatanodeRegistration())) {
|
|
|
- final String s = toString(datanode);
|
|
|
+ public void run(DatanodeID id) throws DiskOutOfSpaceException {
|
|
|
+ final Pipeline p = getPipelineTest().getPipeline(id);
|
|
|
+ if (p.contains(index, id)) {
|
|
|
+ final String s = toString(id);
|
|
|
FiTestUtil.LOG.info(s);
|
|
|
throw new DiskOutOfSpaceException(s);
|
|
|
}
|
|
@@ -179,10 +172,10 @@ public class DataTransferTestUtil {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void run(DataNode datanode) {
|
|
|
- final Pipeline p = getPipelineTest().getPipeline();
|
|
|
- if (p.contains(index, datanode.getDatanodeRegistration())) {
|
|
|
- final String s = toString(datanode) + ", duration=" + duration;
|
|
|
+ public void run(DatanodeID id) {
|
|
|
+ final Pipeline p = getPipelineTest().getPipeline(id);
|
|
|
+ if (p.contains(index, id)) {
|
|
|
+ final String s = toString(id) + ", duration=" + duration;
|
|
|
FiTestUtil.LOG.info(s);
|
|
|
if (duration <= 0) {
|
|
|
for(; true; FiTestUtil.sleep(1000)); //sleep forever
|