
HDFS-518. Create new tests for Append's hflush. Contributed by Konstantin Boudnik

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-265@820136 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 15 years ago
parent
commit
f3a30a0da2

+ 3 - 0
CHANGES.txt

@@ -76,6 +76,9 @@ Append branch (unreleased changes)
 
     HDFS-662. Unnecessary info message from DFSClient. (hairong)
 
+    HDFS-518. Create new tests for Append's hflush. (Konstantin Boudnik
+    via szetszwo)
+
   BUG FIXES
 
     HDFS-547. TestHDFSFileSystemContract#testOutputStreamClosedTwice

+ 65 - 0
src/test/aop/org/apache/hadoop/fi/FiHFlushTestUtil.java

@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fi;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fi.FiTestUtil.ActionContainer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+
+/** Helper methods and actions for hflush() fault injection tests */
+public class FiHFlushTestUtil extends DataTransferTestUtil {
+
+  /** Creates a new HFlushTest instance and registers it as the current pipeline test */
+  public static PipelineTest initTest() {
+    return thepipelinetest = new HFlushTest();
+  }
+  
+  /** Disk error action for fault injection tests */
+  public static class DerrAction extends DataTransferTestUtil.DataNodeAction {
+    /**
+     * @param currentTest The name of the test
+     * @param index       The index of the datanode
+     */
+    public DerrAction(String currentTest, int index) {
+      super(currentTest, index);
+    }
+
+    /** {@inheritDoc} */
+    public void run(DatanodeID id) throws IOException {
+      final Pipeline p = getPipelineTest().getPipeline(id);
+      if (p == null) {
+        FiTestUtil.LOG.info("FI: couldn't find a pipeline for " + id);
+        return;
+      }
+      if (p.contains(index, id)) {
+        final String s = super.toString(id);
+        FiTestUtil.LOG.info(s);
+        throw new DiskErrorException(s);
+      }
+    }
+  }
+  
+  /** Test class which adds a new type of action */
+  public static class HFlushTest extends DataTransferTest {
+    public final ActionContainer<DatanodeID> fiCallHFlush = 
+      new ActionContainer<DatanodeID>();
+  }
+}
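
For orientation, the sketch below shows how a test is expected to wire this utility up, mirroring what TestFiHFlush (added later in this commit) does. It is a minimal sketch, not part of the commit; the class name, test name, and datanode index are illustrative assumptions.

// A minimal sketch (not part of this commit) of how a test arms the new action.
package org.apache.hadoop.fi;

import org.apache.hadoop.fi.FiHFlushTestUtil.DerrAction;
import org.apache.hadoop.fi.FiHFlushTestUtil.HFlushTest;

public class HFlushFaultSketch {
  public static void armDiskError() {
    // Register a fresh HFlushTest as the current pipeline test
    HFlushTest hft = (HFlushTest) FiHFlushTestUtil.initTest();
    // Arm a disk-error action against the 0th datanode of the write pipeline
    hft.fiCallHFlush.set(new DerrAction("sketch", 0));
    // Any subsequent hflush() on a DFSOutputStream (with HFlushAspects woven in)
    // will now hit DiskErrorException if that datanode is part of the pipeline.
  }
}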

+ 57 - 0
src/test/aop/org/apache/hadoop/hdfs/HFlushAspects.aj

@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fi.DataTransferTestUtil;
+import org.apache.hadoop.fi.FiHFlushTestUtil.HFlushTest;
+import org.apache.hadoop.hdfs.DFSClient.DFSOutputStream;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+
+public aspect HFlushAspects {
+  public static final Log LOG = LogFactory.getLog(HFlushAspects.class);
+
+  pointcut hflushCall (DFSOutputStream outstream) :
+    execution(void DFSOutputStream.hflush(..))
+    && target (outstream); 
+  
+  /** This advice is supposed to initiate a call to the action (fiCallHFlush)
+   *  which will throw DiskErrorException if a pipeline has been created
+   *  and the datanodes used belong to that very pipeline
+   */
+  after (DFSOutputStream streamer) throws IOException : hflushCall(streamer) {
+    LOG.info("FI: hflush for any datanode");    
+    LOG.info("FI: hflush " + thisJoinPoint.getThis());
+    DatanodeInfo[] nodes = streamer.getPipeline();
+    if (nodes == null) {
+        LOG.info("No pipeline is built");
+        return;
+    }
+    if (DataTransferTestUtil.getPipelineTest() == null) {
+        LOG.info("No test has been initialized");    
+        return;
+    }
+    for (int i=0; i<nodes.length; i++) {
+        ((HFlushTest)DataTransferTestUtil.getPipelineTest()).fiCallHFlush.run(nodes[i]);
+    }
+  }
+}
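
For readers who don't follow AspectJ syntax: once this aspect is woven in, every DFSOutputStream.hflush() call is effectively followed by logic along the lines of the plain-Java sketch below. This is only a restatement of the advice above, not code from the commit; the helper name is illustrative, and "out" stands for the DFSClient.DFSOutputStream whose hflush() just returned.

// Sketch only: the effect of the woven after-advice, expressed as plain Java.
static void afterHFlushAdvice(DFSClient.DFSOutputStream out) throws IOException {
  DatanodeInfo[] nodes = out.getPipeline();        // pipeline recorded by the client, may be null
  if (nodes == null || DataTransferTestUtil.getPipelineTest() == null) {
    return;                                        // no pipeline built, or no test armed: do nothing
  }
  HFlushTest hft = (HFlushTest) DataTransferTestUtil.getPipelineTest();
  for (DatanodeInfo node : nodes) {
    // A DerrAction set on fiCallHFlush throws DiskErrorException when its datanode
    // index falls inside the current pipeline; the exception reaches the hflush() caller.
    hft.fiCallHFlush.run(node);
  }
}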

+ 174 - 0
src/test/aop/org/apache/hadoop/hdfs/TestFiHFlush.java

@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fi.FiHFlushTestUtil;
+import org.apache.hadoop.fi.FiTestUtil;
+import org.apache.hadoop.fi.FiHFlushTestUtil.DerrAction;
+import org.apache.hadoop.fi.FiHFlushTestUtil.HFlushTest;
+
+import java.io.IOException;
+
+/** This class provides basic fault injection tests according to the test plan
+    of HDFS-265
+ */
+public class TestFiHFlush {
+  
+  /** The method initializes a test and sets the required actions to be used
+   * later by the injected advice
+   * @param conf mini cluster configuration
+   * @param methodName String representation of the test method invoking this
+   * method
+   * @param block_size required size of the file's block
+   * @param a the action to be set for the test
+   * @throws IOException in case of any errors
+   */
+  private static void runDiskErrorTest (final Configuration conf, 
+      final String methodName, final int block_size, DerrAction a)
+      throws IOException {
+    FiTestUtil.LOG.info("Running " + methodName + " ...");
+    final HFlushTest hft = (HFlushTest) FiHFlushTestUtil.initTest();
+    hft.fiCallHFlush.set(a);
+    TestHFlush.doTheJob(conf, methodName, block_size, (short)3);
+  }
+  
+  /** The test calls
+   * {@link #runDiskErrorTest(Configuration, String, int, DerrAction)}
+   * to make a number of writes within a block's boundaries.
+   * Although hflush() is called, the test shouldn't expect an IOException
+   * in this case because the invocation happens after the write() call
+   * is complete, when the pipeline doesn't exist anymore.
+   * Thus, the injected fault won't be triggered for the 0th datanode
+   */
+  @Test
+  public void hFlushFi01_a() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runDiskErrorTest(new Configuration(), methodName, 
+        AppendTestUtil.BLOCK_SIZE, new DerrAction(methodName, 0));
+  }
+
+  /** The test calls
+   * {@link #runDiskErrorTest(Configuration, String, int, DerrAction)}
+   * to make a number of writes across a block's boundaries.
+   * hflush() is called after each write() during the pipeline's lifetime.
+   * Thus, the injected fault ought to be triggered for the 0th datanode
+   */
+  @Test(expected = IOException.class)
+  public void hFlushFi01_b() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    Configuration conf = new Configuration();
+    int customPerChecksumSize = 512;
+    int customBlockSize = customPerChecksumSize * 3;
+    conf.setInt("io.bytes.per.checksum", customPerChecksumSize);
+    conf.setLong("dfs.block.size", customBlockSize);
+    runDiskErrorTest(conf, methodName, 
+        customBlockSize, new DerrAction(methodName, 0));
+  }
+  
+  /** Similar to {@link #hFlushFi01_b()} but writing happens
+   * across block and checksum boundaries
+   */
+  @Test(expected = IOException.class)
+  public void hFlushFi01_c() throws IOException { 
+    final String methodName = FiTestUtil.getMethodName();
+    Configuration conf = new Configuration();
+    int customPerChecksumSize = 400;
+    int customBlockSize = customPerChecksumSize * 3;
+    conf.setInt("io.bytes.per.checksum", customPerChecksumSize);
+    conf.setLong("dfs.block.size", customBlockSize);
+    runDiskErrorTest(conf, methodName, 
+        customBlockSize, new DerrAction(methodName, 0));
+  }
+
+  /** Similar to {@link #hFlushFi01_a()} but for a pipeline's 1st datanode
+   */
+  @Test
+  public void hFlushFi02_a() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runDiskErrorTest(new Configuration(), methodName,
+        AppendTestUtil.BLOCK_SIZE, new DerrAction(methodName, 1));
+  }
+
+  /** Similar to {@link #hFlushFi01_b()} but for a pipeline's 1st datanode
+   */
+  @Test(expected = IOException.class)
+  public void hFlushFi02_b() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    Configuration conf = new Configuration();
+    int customPerChecksumSize = 512;
+    int customBlockSize = customPerChecksumSize * 3;
+    conf.setInt("io.bytes.per.checksum", customPerChecksumSize);
+    conf.setLong("dfs.block.size", customBlockSize);
+    runDiskErrorTest(conf, methodName,
+        customBlockSize, new DerrAction(methodName, 1));
+  }
+
+  /** Similar to {@link #hFlushFi01_c()} but for a pipeline's 1st datanode
+   */
+  @Test(expected = IOException.class)
+  public void hFlushFi02_c() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    Configuration conf = new Configuration();
+    int customPerChecksumSize = 400;
+    int customBlockSize = customPerChecksumSize * 3;
+    conf.setInt("io.bytes.per.checksum", customPerChecksumSize);
+    conf.setLong("dfs.block.size", customBlockSize);
+    runDiskErrorTest(conf, methodName,
+        customBlockSize, new DerrAction(methodName, 1));
+  }
+  
+  /** Similar to {@link #hFlushFi01_a()} but for a pipeline's 2nd datanode
+   */
+  @Test
+  public void hFlushFi03_a() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runDiskErrorTest(new Configuration(), methodName,
+        AppendTestUtil.BLOCK_SIZE, new DerrAction(methodName, 2));
+  }
+  
+  /** Similar to {@link #hFlushFi01_b()} but for a pipeline's 2nd datanode
+   */
+  @Test(expected = IOException.class)
+  public void hFlushFi03_b() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    Configuration conf = new Configuration();
+    int customPerChecksumSize = 512;
+    int customBlockSize = customPerChecksumSize * 3;
+    conf.setInt("io.bytes.per.checksum", customPerChecksumSize);
+    conf.setLong("dfs.block.size", customBlockSize);
+    runDiskErrorTest(conf, methodName,
+        customBlockSize, new DerrAction(methodName, 2));
+  }
+
+  /** Similar to {@link #hFlushFi01_c()} but for a pipeline's 2nd datanode
+   */
+  @Test(expected = IOException.class)
+  public void hFlushFi03_c() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    Configuration conf = new Configuration();
+    int customPerChecksumSize = 400;
+    int customBlockSize = customPerChecksumSize * 3;
+    conf.setInt("io.bytes.per.checksum", customPerChecksumSize);
+    conf.setLong("dfs.block.size", customBlockSize);
+    runDiskErrorTest(conf, methodName,
+        customBlockSize, new DerrAction(methodName, 2));
+  }
+}
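
Note that these tests live under src/test/aop and only make sense in the fault-injection build, where HFlushAspects gets woven into the HDFS classes; the regular unit-test targets won't exercise them. Assuming the branch carries the usual fault-injection targets of the FI framework (the exact target names are an assumption here; check src/test/aop/build), they would be run roughly along these lines:

ant injectfaults
ant run-test-hdfs-fault-inject -Dtestcase=TestFiHFlush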

+ 146 - 0
src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java

@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataInputStream;
+import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/** This class contains a set of tests to verify the correctness of the
+ * newly introduced {@link DFSClient.DFSOutputStream#hflush()} method */
+public class TestHFlush {
+  private final String fName = "hflushtest.dat";
+  
+  /** The test uses {@link #doTheJob(Configuration, String, long, short)}
+   * to write a file with a standard block size
+   */
+  @Test
+  public void hFlush_01() throws IOException {
+    doTheJob(new Configuration(), fName, AppendTestUtil.BLOCK_SIZE, (short)2);
+  }
+
+  /** The test uses {@link #doTheJob(Configuration, String, long, short)}
+   * to write a file with a custom block size so the writes will be
+   * happening across block boundaries
+   */
+  @Test
+  public void hFlush_02() throws IOException {
+    Configuration conf = new Configuration();
+    int customPerChecksumSize = 512;
+    int customBlockSize = customPerChecksumSize * 3;
+    // Modify default filesystem settings
+    conf.setInt("io.bytes.per.checksum", customPerChecksumSize);
+    conf.setLong("dfs.block.size", customBlockSize);
+
+    doTheJob(conf, fName, customBlockSize, (short)2);
+  }
+
+  /** The test uses {@link #doTheJob(Configuration, String, long, short)}
+   * to write a file with a custom block size so the writes will be
+   * happening across block and checksum boundaries
+   */
+  @Test
+  public void hFlush_03() throws IOException {
+    Configuration conf = new Configuration();
+    int customPerChecksumSize = 400;
+    int customBlockSize = customPerChecksumSize * 3;
+    // Modify default filesystem settings
+    conf.setInt("io.bytes.per.checksum", customPerChecksumSize);
+    conf.setLong("dfs.block.size", customBlockSize);
+
+    doTheJob(conf, fName, customBlockSize, (short)2);
+  }
+
+  /**
+    The method starts a new cluster with the given Configuration;
+    creates a file of the specified block_size and writes 10 equal sections to it;
+    it also calls hflush() after each write and throws an IOException in case of
+    an error.
+    @param conf cluster configuration
+    @param fileName name of the file to be created and processed as required
+    @param block_size block size to be used for the file's creation
+    @param replicas the number of replicas
+    @throws IOException in case of any errors
+   */
+  public static void doTheJob(Configuration conf, final String fileName,
+                              long block_size, short replicas) throws IOException {
+    byte[] fileContent;
+    final int SECTIONS = 10;
+
+    fileContent = AppendTestUtil.initBuffer(AppendTestUtil.FILE_SIZE);
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, replicas, true, null);
+    // Make sure we work with DFS in order to utilize all its functionality
+    DistributedFileSystem fileSystem =
+        (DistributedFileSystem)cluster.getFileSystem();
+
+    FSDataInputStream is;
+    try {
+      Path path = new Path(fileName);
+      FSDataOutputStream stm = fileSystem.create(path, false, 4096, replicas,
+          block_size);
+      System.out.println("Created file " + fileName);
+
+      int tenth = AppendTestUtil.FILE_SIZE/SECTIONS;
+      int rounding = AppendTestUtil.FILE_SIZE - tenth * SECTIONS;
+      for (int i=0; i<SECTIONS; i++) {
+        System.out.println("Writing " + (tenth * i) + " to " + (tenth * (i+1)) + " section to file " + fileName);
+        // write to the file
+        stm.write(fileContent, tenth * i, tenth);
+        // Wait while hflush() pushes all packets through built pipeline
+        ((DFSClient.DFSOutputStream)stm.getWrappedStream()).hflush();
+        byte [] toRead = new byte[tenth];
+        byte [] expected = new byte[tenth];
+        System.arraycopy(fileContent, tenth * i, expected, 0, tenth);
+        // Open the same file for read. A new reader has to be created after every write operation(!)
+        is = fileSystem.open(path);
+        is.read(tenth * i, toRead, 0, tenth); // positional read of the section just written
+        is.close();
+        checkData(toRead, 0, expected, "Partial verification");
+      }
+      System.out.println("Writing " + (tenth * SECTIONS) + " to " + (tenth * SECTIONS + rounding) + " section to file " + fileName);
+      stm.write(fileContent, tenth * SECTIONS, rounding);
+      stm.close();
+
+      assertEquals("File size doesn't match ", AppendTestUtil.FILE_SIZE, fileSystem.getFileStatus(path).getLen());
+      AppendTestUtil.checkFullFile(fileSystem, path, fileContent.length, fileContent, "hflush()");
+
+    } catch (IOException ioe) {
+      ioe.printStackTrace();
+      throw ioe;
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      fileSystem.close();
+      cluster.shutdown();
+    }
+  }
+  static void checkData(final byte[] actual, int from,
+                                final byte[] expected, String message) {
+    for (int idx = 0; idx < actual.length; idx++) {
+      assertEquals(message+" byte "+(from+idx)+" differs. expected "+
+                   expected[from+idx]+" actual "+actual[idx],
+                   expected[from+idx], actual[idx]);
+      actual[idx] = 0;
+    }
+  }
+}
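
To round off, here is a minimal sketch of the contract TestHFlush verifies: bytes that have been written and hflush()-ed become visible to a reader opened before the file is closed. This is illustrative code, not part of the commit; the class name, path, and payload are assumptions, while the DFSOutputStream cast mirrors the one used in doTheJob().

// Sketch only: hflush() makes written bytes visible to new readers before close().
package org.apache.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;

public class HFlushSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
    DistributedFileSystem fs = (DistributedFileSystem) cluster.getFileSystem();
    try {
      Path p = new Path("/hflush-sketch.dat");   // illustrative path
      FSDataOutputStream out = fs.create(p);
      byte[] payload = "hello, hflush".getBytes();
      out.write(payload);
      // Push all outstanding packets through the pipeline without closing the file
      ((DFSClient.DFSOutputStream) out.getWrappedStream()).hflush();
      // A reader opened after the flush can already see the bytes
      FSDataInputStream in = fs.open(p);
      byte[] readBack = new byte[payload.length];
      in.readFully(readBack);
      in.close();
      out.close();
    } finally {
      fs.close();
      cluster.shutdown();
    }
  }
}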