
HDFS-451. Add fault injection tests, Pipeline_Fi_06,07,14,15, for DataTransferProtocol.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/trunk@802264 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 15 years ago
parent
commit
409eee88f7

+ 3 - 0
CHANGES.txt

@@ -89,6 +89,9 @@ Trunk (unreleased changes)
     HDFS-530. Refactor TestFileAppend* to remove code duplication.
     (Konstantin Boudnik via szetszwo)
 
+    HDFS-451. Add fault injection tests, Pipeline_Fi_06,07,14,15, for
+    DataTransferProtocol.  (szetszwo)
+
   BUG FIXES
 
     HDFS-76. Better error message to users when commands fail because of 

+ 4 - 1
src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -77,7 +77,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
   private String clientName;
   DatanodeInfo srcDataNode = null;
   private Checksum partialCrc = null;
-  private DataNode datanode = null;
+  private final DataNode datanode;
 
   BlockReceiver(Block block, DataInputStream in, String inAddr,
                 String myAddr, boolean isRecovery, String clientName, 
@@ -128,6 +128,9 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
     }
   }
 
+  /** Return the datanode object. */
+  DataNode getDataNode() {return datanode;}
+
   /**
    * close files.
    */

+ 5 - 0
src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -490,6 +490,11 @@ public class DataNode extends Configured
     return myMetrics;
   }
   
+  /** Return DatanodeRegistration */
+  public DatanodeRegistration getDatanodeRegistration() {
+    return dnRegistration;
+  }
+
   /**
    * Return the namenode's identifier
    */

+ 3 - 0
src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -84,6 +84,9 @@ class DataXceiver extends DataTransferProtocol.Receiver
     }
   }
 
+  /** Return the datanode object. */
+  DataNode getDataNode() {return datanode;}
+
   /**
    * Read/write data from/to the DataXceiveServer.
    */

+ 195 - 0
src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java

@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fi;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fi.FiTestUtil.Action;
+import org.apache.hadoop.fi.FiTestUtil.ActionContainer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
+
+/**
+ * Utilities for DataTransferProtocol related tests,
+ * e.g. TestFiDataTransferProtocol.
+ */
+public class DataTransferTestUtil {
+  private static DataTransferTest thepipelinetest;
+  /** initialize pipeline test */
+  public static DataTransferTest initTest() {
+    return thepipelinetest = new DataTransferTest();
+  }
+  /** get the pipeline test object */
+  public static DataTransferTest getPipelineTest() {
+    return thepipelinetest;
+  }
+
+  /**
+   * The DataTransferTest class includes a pipeline
+   * and some actions.
+   */
+  public static class DataTransferTest {
+    private Pipeline thepipeline;
+    /** Simulate action for the receiverOpWriteBlock pointcut */
+    public final ActionContainer<DataNode> fiReceiverOpWriteBlock
+        = new ActionContainer<DataNode>();
+    /** Simulate action for the callReceivePacket pointcut */
+    public final ActionContainer<DataNode> fiCallReceivePacket
+        = new ActionContainer<DataNode>();
+    /** Simulate action for the statusRead pointcut */
+    public final ActionContainer<DataNode> fiStatusRead
+        = new ActionContainer<DataNode>();
+
+    /** Initialize the pipeline. */
+    public Pipeline initPipeline(LocatedBlock lb) {
+      if (thepipeline != null) {
+        throw new IllegalStateException("thepipeline != null");
+      }
+      return thepipeline = new Pipeline(lb);
+    }
+
+    /** Return the pipeline. */
+    public Pipeline getPipeline() {
+      if (thepipeline == null) {
+        throw new IllegalStateException("thepipeline == null");
+      }
+      return thepipeline;
+    }
+  }
+
+  /** A pipeline contains a list of datanodes. */
+  public static class Pipeline {
+    private final List<String> datanodes = new ArrayList<String>();
+    
+    private Pipeline(LocatedBlock lb) {
+      for(DatanodeInfo d : lb.getLocations()) {
+        datanodes.add(d.getName());
+      }
+    }
+
+    /** Does the pipeline contain d at the n-th position? */
+    public boolean contains(int n, DatanodeID d) {
+      return d.getName().equals(datanodes.get(n));
+    }
+
+    /** {@inheritDoc} */
+    public String toString() {
+      return getClass().getSimpleName() + datanodes;
+    }
+  }
+
+  /** Action for DataNode */
+  public static abstract class DataNodeAction implements Action<DataNode> {
+    /** The name of the test */
+    final String currentTest;
+    /** The index of the datanode */
+    final int index;
+
+    /**
+     * @param currentTest The name of the test
+     * @param index The index of the datanode
+     */
+    private DataNodeAction(String currentTest, int index) {
+      this.currentTest = currentTest;
+      this.index = index;
+    }
+
+    /** {@inheritDoc} */
+    public String toString() {
+      return currentTest + ", index=" + index;
+    }
+
+    /** Return this action's description plus the datanode name. */
+    String toString(DataNode datanode) {
+      return "FI: " + this + ", datanode="
+          + datanode.getDatanodeRegistration().getName();
+    }
+  }
+
+  /** Throws OutOfMemoryError. */
+  public static class OomAction extends DataNodeAction {
+    /** Create an action for datanode i in the pipeline. */
+    public OomAction(String currentTest, int i) {
+      super(currentTest, i);
+    }
+
+    @Override
+    public void run(DataNode datanode) {
+      final Pipeline p = getPipelineTest().getPipeline();
+      if (p.contains(index, datanode.getDatanodeRegistration())) {
+        final String s = toString(datanode);
+        FiTestUtil.LOG.info(s);
+        throw new OutOfMemoryError(s);
+      }
+    }
+  }
+
+  /** Throws DiskOutOfSpaceException. */
+  public static class DoosAction extends DataNodeAction {
+    /** Create an action for datanode i in the pipeline. */
+    public DoosAction(String currentTest, int i) {
+      super(currentTest, i);
+    }
+
+    @Override
+    public void run(DataNode datanode) throws DiskOutOfSpaceException {
+      final Pipeline p = getPipelineTest().getPipeline();
+      if (p.contains(index, datanode.getDatanodeRegistration())) {
+        final String s = toString(datanode);
+        FiTestUtil.LOG.info(s);
+        throw new DiskOutOfSpaceException(s);
+      }
+    }
+  }
+
+  /**
+   * Sleep for some period of time to slow down the datanode,
+   * or sleep forever so that the datanode becomes unresponsive.
+   */
+  public static class SleepAction extends DataNodeAction {
+    /** Duration in milliseconds; duration <= 0 means sleep forever. */
+    final long duration;
+
+    /**
+     * Create an action for datanode i in the pipeline.
+     * @param duration Duration in milliseconds; duration <= 0 means sleep forever.
+     */
+    public SleepAction(String currentTest, int i, long duration) {
+      super(currentTest, i);
+      this.duration = duration;
+    }
+
+    @Override
+    public void run(DataNode datanode) {
+      final Pipeline p = getPipelineTest().getPipeline();
+      if (p.contains(index, datanode.getDatanodeRegistration())) {
+        final String s = toString(datanode) + ", duration=" + duration;
+        FiTestUtil.LOG.info(s);
+        if (duration <= 0) {
+          for(; true; FiTestUtil.sleep(1000)); //sleep forever
+        } else {
+          FiTestUtil.sleep(duration);
+        }
+      }
+    }
+  }
+}
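
To make the moving parts concrete, here is a minimal usage sketch (illustration only, not part of this commit; the test name "sketch" and index 1 are arbitrary) of registering a fault action on the shared test object:

    // Register a fault: the datanode at pipeline position 1 will throw
    // DiskOutOfSpaceException while receiving a packet.
    final DataTransferTest t = DataTransferTestUtil.initTest();
    t.fiCallReceivePacket.set(new DoosAction("sketch", 1));

    // Later, woven advice fires on every datanode in the pipeline:
    //   DataTransferTestUtil.getPipelineTest()
    //       .fiCallReceivePacket.run(blockreceiver.getDataNode());
    // DoosAction.run(..) compares the datanode against the pipeline
    // position, so only the matching datanode throws.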

+ 70 - 0
src/test/aop/org/apache/hadoop/fi/FiTestUtil.java

@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fi;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/** Test Utilities */
+public class FiTestUtil {
+  /** Logging */
+  public static final Log LOG = LogFactory.getLog(FiTestUtil.class);
+
+  /** Return the method name of the caller. */
+  public static String getMethodName() {
+    return Thread.currentThread().getStackTrace()[2].getMethodName();
+  }
+
+  /**
+   * Sleep.
+   * If there is an InterruptedException, re-throw it as a RuntimeException.
+   */
+  public static void sleep(long ms) {
+    try {
+      Thread.sleep(ms);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Action interface */
+  public static interface Action<T> {
+    /** Run the action with the parameter. */
+    public void run(T parameter) throws IOException;
+  }
+
+  /** An ActionContainer contains at most one action. */
+  public static class ActionContainer<T> {
+    private Action<T> action;
+
+    /** Create an empty container. */
+    public ActionContainer() {}
+
+    /** Set action. */
+    public void set(Action<T> a) {action = a;}
+
+    /** Run the action if it exists. */
+    public void run(T obj) throws IOException {
+      if (action != null) {
+        action.run(obj);
+      }
+    }
+  }
+}
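
A small sketch of the ActionContainer contract (illustration only, not part of this commit): run(obj) silently does nothing until an action has been set, after which the single stored action receives the parameter.

    final ActionContainer<String> c = new ActionContainer<String>();
    c.run("first");   // no action set yet, so this is a no-op
    c.set(new FiTestUtil.Action<String>() {
      public void run(String s) throws IOException {
        throw new IOException("FI: " + s);
      }
    });
    c.run("second");  // now throws IOException("FI: second")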

+ 35 - 0
src/test/aop/org/apache/hadoop/hdfs/protocol/ClientProtocolAspects.aj

@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fi.DataTransferTestUtil;
+
+/** Aspect for ClientProtocol */
+public aspect ClientProtocolAspects {
+  public static final Log LOG = LogFactory.getLog(ClientProtocolAspects.class);
+
+  pointcut addBlock():
+    call(LocatedBlock ClientProtocol.addBlock(String, String));
+
+  after() returning(LocatedBlock lb): addBlock() {
+    LOG.info("FI: addBlock "
+        + DataTransferTestUtil.getPipelineTest().initPipeline(lb));
+  }
+}
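
Read as plain Java, the after-returning advice amounts to this at every woven addBlock call site (a sketch; namenode, src, and clientName are placeholders):

    LocatedBlock lb = namenode.addBlock(src, clientName);    // the advised call
    // advice body: seed the shared Pipeline from the returned block
    DataTransferTestUtil.getPipelineTest().initPipeline(lb);

Since initPipeline throws IllegalStateException when a pipeline already exists, each test run must start from a fresh DataTransferTestUtil.initTest().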

+ 14 - 9
src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj

@@ -17,15 +17,14 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import java.io.IOException;
+import java.io.OutputStream;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fi.DataTransferTestUtil;
 import org.apache.hadoop.fi.ProbabilityModel;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.util.DiskChecker.*;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.DataOutputStream;
+import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 
 /**
  * This aspect takes care about faults injected into datanode.BlockReceiver 
@@ -34,14 +33,20 @@ import java.io.DataOutputStream;
 public aspect BlockReceiverAspects {
   public static final Log LOG = LogFactory.getLog(BlockReceiverAspects.class);
 
-  pointcut callReceivePacket() :
+  pointcut callReceivePacket(BlockReceiver blockreceiver) :
     call (* OutputStream.write(..))
       && withincode (* BlockReceiver.receivePacket(..))
 // to further limit the application of this aspect a very narrow 'target' can be used as follows
 //  && target(DataOutputStream)
-      && !within(BlockReceiverAspects +);
+      && !within(BlockReceiverAspects +)
+      && this(blockreceiver);
 	
-  before () throws IOException : callReceivePacket () {
+  before(BlockReceiver blockreceiver
+      ) throws IOException : callReceivePacket(blockreceiver) {
+    LOG.info("FI: callReceivePacket");
+    DataTransferTestUtil.getPipelineTest().fiCallReceivePacket.run(
+        blockreceiver.getDataNode());
+
     if (ProbabilityModel.injectCriteria(BlockReceiver.class.getSimpleName())) {
       LOG.info("Before the injection point");
       Thread.dumpStack();
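
The new this(blockreceiver) binding captures the BlockReceiver executing the advised write, which is what gives the advice a path to its DataNode. The fault-injection part of the advice, read as plain Java (a sketch; buf, off, and len are placeholders):

    // inside BlockReceiver.receivePacket(..), before each OutputStream.write(..):
    DataTransferTestUtil.getPipelineTest()
        .fiCallReceivePacket.run(this.getDataNode());  // may throw IOException
    out.write(buf, off, len);                          // the original call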

+ 74 - 0
src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj

@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fi.DataTransferTestUtil;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Receiver;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
+
+/** Aspect for DataTransferProtocol */
+public aspect DataTransferProtocolAspects {
+  public static final Log LOG = LogFactory.getLog(
+      DataTransferProtocolAspects.class);
+  /*
+  {
+    ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)InterDatanodeProtocol.LOG).getLogger().setLevel(Level.ALL);
+  }
+  */
+
+  pointcut receiverOp(DataXceiver dataxceiver):
+    call(Op Receiver.readOp(DataInputStream)) && target(dataxceiver);
+
+  after(DataXceiver dataxceiver) returning(Op op): receiverOp(dataxceiver) {
+    LOG.info("FI: receiverOp " + op + ", datanode="
+        + dataxceiver.getDataNode().getDatanodeRegistration().getName());    
+  }
+
+  pointcut statusRead(DataXceiver dataxceiver):
+    call(Status Status.read(DataInput)) && this(dataxceiver);
+
+  after(DataXceiver dataxceiver) returning(Status status
+      ) throws IOException: statusRead(dataxceiver) {
+    final DataNode d = dataxceiver.getDataNode();
+    LOG.info("FI: statusRead " + status + ", datanode="
+        + d.getDatanodeRegistration().getName());    
+    DataTransferTestUtil.getPipelineTest().fiStatusRead.run(d);
+  }
+
+  pointcut receiverOpWriteBlock(DataXceiver dataxceiver):
+    call(void Receiver.opWriteBlock(DataInputStream)) && target(dataxceiver);
+
+  before(DataXceiver dataxceiver
+      ) throws IOException: receiverOpWriteBlock(dataxceiver) {
+    LOG.info("FI: receiverOpWriteBlock");
+    DataTransferTestUtil.getPipelineTest().fiReceiverOpWriteBlock.run(
+        dataxceiver.getDataNode());
+  }
+}
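
Note the two binding styles above: receiverOp and receiverOpWriteBlock use target(dataxceiver) because the op methods are invoked on the DataXceiver itself (it extends DataTransferProtocol.Receiver), whereas statusRead uses this(dataxceiver) because the useful context is the DataXceiver making the Status.read call, not the Status being read. The statusRead advice as plain Java (a sketch; in is a placeholder stream):

    // inside DataXceiver, after Status.read(in) returns:
    final DataNode d = this.getDataNode();   // 'this' is the calling DataXceiver
    LOG.info("FI: statusRead " + status + ", datanode="
        + d.getDatanodeRegistration().getName());
    DataTransferTestUtil.getPipelineTest().fiStatusRead.run(d);  // may throw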

+ 133 - 0
src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java

@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fi.DataTransferTestUtil;
+import org.apache.hadoop.fi.FiTestUtil;
+import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
+import org.apache.hadoop.fi.DataTransferTestUtil.DoosAction;
+import org.apache.hadoop.fi.DataTransferTestUtil.SleepAction;
+import org.apache.hadoop.fi.FiTestUtil.Action;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+/** Test DataTransferProtocol with fault injection. */
+public class TestFiDataTransferProtocol extends junit.framework.TestCase {
+  static final short REPLICATION = 3;
+  static final long BLOCKSIZE = 1L * (1L << 20);
+
+  static final Configuration conf = new Configuration();
+  static {
+    conf.setInt("dfs.datanode.handler.count", 1);
+    conf.setInt("dfs.replication", REPLICATION);
+  }
+
+  static private FSDataOutputStream createFile(FileSystem fs, Path p
+      ) throws IOException {
+    return fs.create(p, true, fs.getConf().getInt("io.file.buffer.size", 4096),
+        REPLICATION, BLOCKSIZE);
+  }
+
+  /**
+   * 1. create a file with dfs
+   * 2. write 1 byte
+   * 3. close the file
+   * 4. reopen the same file
+   * 5. read the 1 byte back and verify it
+   */
+  private static void write1byte(String methodName) throws IOException {
+    final MiniDFSCluster cluster = new MiniDFSCluster(conf, REPLICATION, true,
+        null);
+    try {
+      final FileSystem dfs = cluster.getFileSystem();
+      final Path p = new Path("/" + methodName + "/foo");
+      final FSDataOutputStream out = createFile(dfs, p);
+      out.write(1);
+      out.close();
+      
+      final FSDataInputStream in = dfs.open(p);
+      final int b = in.read();
+      in.close();
+      assertEquals(1, b);
+    }
+    finally {
+      cluster.shutdown();
+    }
+  }
+
+  private static void runSlowDatanodeTest(String methodName, SleepAction a
+                  ) throws IOException {
+    FiTestUtil.LOG.info("Running " + methodName + " ...");
+    final DataTransferTest t = DataTransferTestUtil.initTest();
+    t.fiCallReceivePacket.set(a);
+    t.fiReceiverOpWriteBlock.set(a);
+    t.fiStatusRead.set(a);
+    write1byte(methodName);
+  }
+  
+  /**
+   * Pipeline setup: DN0 is very slow, but not slow enough to cause a timeout.
+   * The client finishes pipeline setup successfully.
+   */
+  public void testPipelineFi06() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runSlowDatanodeTest(methodName, new SleepAction(methodName, 0, 3000));
+  }
+
+  /**
+   * Pipeline setup: DN1 is very slow, but not slow enough to cause a timeout.
+   * The client finishes pipeline setup successfully.
+   */
+  public void testPipelineFi07() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runSlowDatanodeTest(methodName, new SleepAction(methodName, 1, 3000));
+  }
+
+  private static void runCallReceivePacketTest(String methodName,
+      Action<DataNode> a) throws IOException {
+    FiTestUtil.LOG.info("Running " + methodName + " ...");
+    DataTransferTestUtil.initTest().fiCallReceivePacket.set(a);
+    write1byte(methodName);
+  }
+
+  /**
+   * Streaming: while writing a packet, DN0 throws a DiskOutOfSpaceException
+   * as it writes the data to disk.
+   * The client gets an IOException and determines DN0 is bad.
+   */
+  public void testPipelineFi14() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runCallReceivePacketTest(methodName, new DoosAction(methodName, 0));
+  }
+
+  /**
+   * Streaming: while writing a packet, DN1 throws a DiskOutOfSpaceException
+   * as it writes the data to disk.
+   * The client gets an IOException and determines DN1 is bad.
+   */
+  public void testPipelineFi15() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runCallReceivePacketTest(methodName, new DoosAction(methodName, 1));
+  }
+}
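
These tests live under src/test/aop, so they only inject faults when the aspects are woven into the build. In Hadoop builds of this era that is done through the fault-injection ant targets; assuming the standard targets from the fault injection framework guide (verify against the local build.xml):

    ant injectfaults
    ant run-test-hdfs-fault-inject -Dtestcase=TestFiDataTransferProtocol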