
Merge -r 800618:803337 from hdfs trunk to move the change to the append trunk

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-265@803588 13f79535-47bb-0310-9956-ffa450edef68
Hairong Kuang, 16 years ago
Parent commit: 5754a8cc09
36 changed files with 1,478 additions and 285 deletions
  1. CHANGES.txt (+31, -8)
  2. ivy.xml (+4, -0)
  3. ivy/ivysettings.xml (+1, -1)
  4. lib/hadoop-core-0.21.0-dev.jar (BIN)
  5. lib/hadoop-core-test-0.21.0-dev.jar (BIN)
  6. src/docs/src/documentation/content/xdocs/faultinject_framework.xml (+390, -0)
  7. src/docs/src/documentation/content/xdocs/site.xml (+3, -0)
  8. src/docs/src/documentation/resources/images/FI-framework.gif (BIN)
  9. src/docs/src/documentation/resources/images/FI-framework.odg (BIN)
  10. src/java/org/apache/hadoop/hdfs/DFSClient.java (+36, -29)
  11. src/java/org/apache/hadoop/hdfs/HftpFileSystem.java (+2, -2)
  12. src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (+56, -17)
  13. src/java/org/apache/hadoop/hdfs/server/common/ThreadLocalDateFormat.java (+87, -0)
  14. src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (+4, -1)
  15. src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (+5, -0)
  16. src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (+62, -51)
  17. src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java (+5, -5)
  18. src/java/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java (+13, -2)
  19. src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (+8, -5)
  20. src/java/org/apache/hadoop/hdfs/server/namenode/ListPathsServlet.java (+4, -8)
  21. src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (+1, -1)
  22. src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java (+195, -0)
  23. src/test/aop/org/apache/hadoop/fi/FiTestUtil.java (+70, -0)
  24. src/test/aop/org/apache/hadoop/hdfs/protocol/ClientProtocolAspects.aj (+35, -0)
  25. src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj (+14, -9)
  26. src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj (+74, -0)
  27. src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java (+133, -0)
  28. src/test/hdfs/org/apache/hadoop/hdfs/AppendTestUtil.java (+52, -4)
  29. src/test/hdfs/org/apache/hadoop/hdfs/TestAbandonBlock.java (+2, -1)
  30. src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java (+1, -1)
  31. src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend.java (+32, -73)
  32. src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend2.java (+19, -58)
  33. src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java (+24, -6)
  34. src/test/hdfs/org/apache/hadoop/hdfs/TestFileStatus.java (+3, -2)
  35. src/test/hdfs/org/apache/hadoop/hdfs/TestGetBlocks.java (+2, -1)
  36. src/test/hdfs/org/apache/hadoop/hdfs/server/common/TestThreadLocalDateFormat.java (+110, -0)

+ 31 - 8
CHANGES.txt

@@ -13,13 +13,14 @@ Trunk (unreleased changes)
 
     HDFS-461. Tool to analyze file size distribution in HDFS. (shv)
 
-    HDFS-446. Improvements to Offline Image Viewer. (Jakob Homan via shv)
-
   IMPROVEMENTS
 
     HDFS-381. Remove blocks from DataNode maps when corresponding file
     is deleted. (Suresh Srinivas via rangadi)
 
+    HDFS-377. Separate codes which implement DataTransferProtocol.
+    (szetszwo)
+
     HDFS-396. NameNode image and edits directories are specified as URIs.
     (Luca Telloli via rangadi)
 
@@ -49,9 +50,14 @@ Trunk (unreleased changes)
     only by the run-test-*-faul-inject targets.  (Konstantin Boudnik via
     szetszwo)
 
+    HDFS-446. Improvements to Offline Image Viewer. (Jakob Homan via shv)
+
     HADOOP-6160. Fix releaseaudit target to run on specific directories.
     (gkesavan)
 
+    HDFS-501. Use enum to define the constants in DataTransferProtocol.
+    (szetszwo)
+
     HDFS-508. Factor out BlockInfo from BlocksMap. (shv)
 
     HDFS-510. Rename DatanodeBlockInfo to be ReplicaInfo.
@@ -70,7 +76,24 @@ Trunk (unreleased changes)
     HDFS-504. Update the modification time of a file when the file 
     is closed. (Chun Zhang via dhruba)
 
+    HDFS-498. Add development guide and documentation for the fault injection
+    framework.  (Konstantin Boudnik via szetszwo)
+
+    HDFS-524. Further DataTransferProtocol code refactoring.  (szetszwo)
+
+    HDFS-527. Remove/deprecate unnecessary DFSClient constructors.  (szetszwo)
+
+    HDFS-529. Use BlockInfo instead of Block to avoid redundant block searches
+    in BlockManager. (shv)
+
+    HDFS-530. Refactor TestFileAppend* to remove code duplication.
+    (Konstantin Boudnik via szetszwo)
+
+    HDFS-451. Add fault injection tests, Pipeline_Fi_06,07,14,15, for
+    DataTransferProtocol.  (szetszwo)
+
   BUG FIXES
+
     HDFS-76. Better error message to users when commands fail because of 
     lack of quota. Allow quota to be set even if the limit is lower than
     current consumption. (Boris Shkolnik via rangadi)
@@ -78,9 +101,6 @@ Trunk (unreleased changes)
     HADOOP-4687. HDFS is split from Hadoop Core. It is a subproject under 
     Hadoop (Owen O'Malley)
 
-    HDFS-377. Separate codes which implement DataTransferProtocol.
-    (szetszwo)
-
     HADOOP-6096. Fix Eclipse project and classpath files following project
     split. (tomwhite)
 
@@ -117,9 +137,6 @@ Trunk (unreleased changes)
     HDFS-484. Fix bin-package and package target to package jar files.
     (gkesavan)
 
-    HDFS-501. Use enum to define the constants in DataTransferProtocol.
-    (szetszwo)
-
     HDFS-490. Eliminate the deprecated warnings introduced by H-5438.
     (He Yongqiang via szetszwo)
 
@@ -129,9 +146,15 @@ Trunk (unreleased changes)
     HDFS-167. Fix a bug in DFSClient that caused infinite retries on write.
     (Bill Zeller via szetszwo)
 
+    HDFS-534. Include avro in ivy.  (szetszwo)
+
 Release 0.20.1 - Unreleased
 
   IMPROVEMENTS
 
     HDFS-438. Improve help message for space quota command. (Raghu Angadi)
 
+  BUG FIXES
+
+    HDFS-525. The SimpleDateFormat object in ListPathsServlet is not thread
+    safe.  (Suresh Srinivas via szetszwo)

+ 4 - 0
ivy.xml

@@ -252,6 +252,10 @@
       name="slf4j-api"
       rev="${slf4j-api.version}"
       conf="common->master"/>
+    <dependency org="org.apache.hadoop"
+      name="avro"
+      rev="1.0.0"
+      conf="common->default"/>
     <dependency org="org.eclipse.jdt"
       name="core"
       rev="${core.version}"

+ 1 - 1
ivy/ivysettings.xml

@@ -74,7 +74,7 @@
     rather than look for them online.
 
     -->
-    <module organisation="org.apache.hadoop" name=".*" resolver="internal"/>
+    <module organisation="org.apache.hadoop" name="Hadoop.*" resolver="internal"/>
     <!--until commons cli is external, we need to pull it in from the snapshot repository -if present -->
     <module organisation="org.apache.commons" name=".*" resolver="external-and-snapshots"/>
   </modules>

BIN
lib/hadoop-core-0.21.0-dev.jar


BIN
lib/hadoop-core-test-0.21.0-dev.jar


+ 390 - 0
src/docs/src/documentation/content/xdocs/faultinject_framework.xml

@@ -0,0 +1,390 @@
+<?xml version="1.0"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<!DOCTYPE document PUBLIC "-//APACHE//DTD Documentation V2.0//EN" "http://forrest.apache.org/dtd/document-v20.dtd">
+
+
+<document>
+  <header>
+    <title>Fault injection Framework and Development Guide</title>
+  </header>
+
+  <body>
+    <section>
+      <title>Introduction</title>
+      <p>The following is a brief help for Hadoop's Fault Injection (FI)
+        Framework and a Developer's Guide for those who will be developing
+        their own faults (aspects).
+      </p>
+      <p>The idea of Fault Injection (FI) is fairly simple: it is an
+        infusion of errors and exceptions into an application's logic to
+        achieve higher coverage and fault tolerance of the system.
+        Different implementations of this idea are available today.
+        Hadoop's FI framework is built on top of Aspect Oriented Paradigm
+        (AOP) implemented by AspectJ toolkit.
+      </p>
+    </section>
+    <section>
+      <title>Assumptions</title>
+      <p>The current implementation of the framework assumes that the faults it
+        will be emulating are non-deterministic in nature, i.e. the moment
+        a fault happens isn't known in advance and is decided by a
+        coin flip.
+      </p>
+    </section>
+    <section>
+      <title>Architecture of the Fault Injection Framework</title>
+      <figure src="images/FI-framework.gif" alt="Components layout" />
+      <section>
+        <title>Configuration management</title>
+        <p>This piece of the framework allows you to
+          set expectations for faults to happen. The settings can be applied
+          either statically (in advance) or at runtime. There are two ways to
+          configure the desired level of faults in the framework:
+        </p>
+        <ul>
+          <li>
+            editing the
+            <code>src/aop/fi-site.xml</code>
+            configuration file. This file is similar to other Hadoop config
+            files
+          </li>
+          <li>
+            setting system properties of JVM through VM startup parameters or in
+            <code>build.properties</code>
+            file
+          </li>
+        </ul>
+      </section>
+      <section>
+        <title>Probability model</title>
+        <p>This is fundamentally a coin flipper. The methods of this class
+          get a random number between 0.0
+          and 1.0 and then check whether the new number falls in the
+          range of
+          0.0 and the configured level for the fault in question. If that
+          condition
+          is true, then the fault will occur.
+        </p>
+        <p>Thus, to guarantee that a fault happens, one needs to set its
+          level to 1.0.
+          To completely prevent a fault from happening, its probability level
+          has to be set to 0.0.
+        </p>
+        <p><strong>Nota bene</strong>: the default probability level is set to 0
+          (zero) unless the level is changed explicitly through the
+          configuration file or at runtime. The name of the default
+          level's configuration parameter is
+          <code>fi.*</code>
+        </p>
+      </section>
+      <section>
+        <title>Fault injection mechanism: AOP and AspectJ</title>
+        <p>At the foundation of Hadoop's fault injection framework lies the
+          cross-cutting concept implemented by AspectJ. The following basic
+          terms are important to remember:
+        </p>
+        <ul>
+          <li>
+            <strong>A cross-cutting concept</strong>
+            (aspect) is behavior, and often data, that is used across the scope
+            of a piece of software
+          </li>
+          <li>In AOP, the
+            <strong>aspects</strong>
+            provide a mechanism by which a cross-cutting concern can be
+            specified in a modular way
+          </li>
+          <li>
+            <strong>Advice</strong>
+            is the
+            code that is executed when an aspect is invoked
+          </li>
+          <li>
+            <strong>Join point</strong>
+            (or pointcut) is a specific
+            point within the application that may or may not invoke some advice
+          </li>
+        </ul>
+      </section>
+      <section>
+        <title>Existing join points</title>
+        <p>
+          The following readily available join points are provided by AspectJ:
+        </p>
+        <ul>
+          <li>Join when a method is called
+          </li>
+          <li>Join during a method's execution
+          </li>
+          <li>Join when a constructor is invoked
+          </li>
+          <li>Join during a constructor's execution
+          </li>
+          <li>Join during aspect advice execution
+          </li>
+          <li>Join before an object is initialized
+          </li>
+          <li>Join during object initialization
+          </li>
+          <li>Join during static initializer execution
+          </li>
+          <li>Join when a class's field is referenced
+          </li>
+          <li>Join when a class's field is assigned
+          </li>
+          <li>Join when a handler is executed
+          </li>
+        </ul>
+      </section>
+    </section>
+    <section>
+      <title>Aspects examples</title>
+      <source>
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fi.ProbabilityModel;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.util.DiskChecker.*;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.DataOutputStream;
+
+/**
+* This aspect takes care about faults injected into datanode.BlockReceiver
+* class
+*/
+public aspect BlockReceiverAspects {
+  public static final Log LOG = LogFactory.getLog(BlockReceiverAspects.class);
+
+  public static final String BLOCK_RECEIVER_FAULT="hdfs.datanode.BlockReceiver";
+    pointcut callReceivePacket() : call (* OutputStream.write(..))
+      &amp;&amp; withincode (* BlockReceiver.receivePacket(..))
+    // to further limit the application of this aspect a very narrow 'target' can be used as follows
+    // &amp;&amp; target(DataOutputStream)
+      &amp;&amp; !within(BlockReceiverAspects +);
+
+  before () throws IOException : callReceivePacket () {
+    if (ProbabilityModel.injectCriteria(BLOCK_RECEIVER_FAULT)) {
+      LOG.info("Before the injection point");
+      Thread.dumpStack();
+      throw new DiskOutOfSpaceException ("FI: injected fault point at " +
+      thisJoinPoint.getStaticPart( ).getSourceLocation());
+    }
+  }
+}
+      </source>
+      <p>
+        The aspect has two main parts: the join point
+        <code>pointcut callReceivePacket()</code>
+        which serves as an identification mark of a specific point (in control
+        and/or data flow) in the life of an application. A call to the advice -
+        <code>before () throws IOException : callReceivePacket()</code>
+        - will be
+        <a href="#Putting+it+all+together">injected</a>
+        before that specific spot of the application's code.
+      </p>
+
+      <p>The pointcut identifies an invocation of the
+        <code>java.io.OutputStream write()</code>
+        method
+        with any number of parameters and any return type. This invocation should
+        take place within the body of the method
+        <code>receivePacket()</code>
+        from the class <code>BlockReceiver</code>.
+        The method can have any parameters and any return type. Possible
+        invocations of the
+        <code>write()</code>
+        method happening anywhere within the aspect
+        <code>BlockReceiverAspects</code>
+        or its heirs will be ignored.
+      </p>
+      <p><strong>Note 1</strong>: This short example doesn't illustrate
+        the fact that you can have more than a single injection point per
+        class. In such a case the names of the faults have to be different
+        if a developer wants to trigger them separately.
+      </p>
+      <p><strong>Note 2</strong>: After the
+        <a href="#Putting+it+all+together">injection step</a>
+        you can verify that the faults were properly injected by
+        searching for
+        <code>ajc</code>
+        keywords in a disassembled class file.
+      </p>
+
+    </section>
+    
+    <section>
+      <title>Fault naming convention &amp; namespaces</title>
+      <p>For the sake of a unified naming
+      convention, the following two types of names are recommended when
+      developing new aspects:</p>
+      <ul>
+        <li>Activity-specific notation (used
+          when we don't care about the particular location where a fault
+          happens). In this case the name of the fault is rather abstract:
+          <code>fi.hdfs.DiskError</code>
+        </li>
+        <li>Location-specific notation.
+          Here, the fault's name is mnemonic, as in:
+          <code>fi.hdfs.datanode.BlockReceiver[optional location details]</code>
+        </li>
+      </ul>
+    </section>
+
+    <section>
+      <title>Development tools</title>
+      <ul>
+        <li>Eclipse
+          <a href="http://www.eclipse.org/ajdt/">AspectJ
+            Development Toolkit
+          </a>
+          might help you in the aspects' development
+          process.
+        </li>
+        <li>IntelliJ IDEA provides AspectJ weaver and Spring-AOP plugins
+        </li>
+      </ul>
+    </section>
+
+    <section>
+      <title>Putting it all together</title>
+      <p>Faults (or aspects) have to be injected (or woven) in before
+        they can be used. Here are step-by-step instructions for how this can
+        be done.</p>
+      <p>Weaving aspects in place:</p>
+      <source>
+% ant injectfaults
+      </source>
+      <p>If you
+        have misidentified the join point of your aspect, you'll see a
+        warning similar to the one below when the 'injectfaults' target
+        completes:</p>
+        <source>
+[iajc] warning at
+src/test/aop/org/apache/hadoop/hdfs/server/datanode/ \
+          BlockReceiverAspects.aj:44::0
+advice defined in org.apache.hadoop.hdfs.server.datanode.BlockReceiverAspects
+has not been applied [Xlint:adviceDidNotMatch]
+        </source>
+      <p>It isn't an error, so the build will still report a successful result.
+
+        To prepare a dev.jar file with all your faults woven in
+      place, run (HDFS-475 pending):</p>
+        <source>
+% ant jar-fault-inject
+        </source>
+
+      <p>Test jars can be created by</p>
+        <source>
+% ant jar-test-fault-inject
+        </source>
+
+      <p>To run HDFS tests with faults injected:</p>
+        <source>
+% ant run-test-hdfs-fault-inject
+        </source>
+      <section>
+        <title>How to use fault injection framework</title>
+        <p>Faults can be triggered in the following two ways:
+        </p>
+        <ul>
+          <li>In the runtime as:
+            <source>
+% ant run-test-hdfs -Dfi.hdfs.datanode.BlockReceiver=0.12
+            </source>
+            To set a certain level, e.g. 25%, for all injected faults, one can run
+            <br/>
+            <source>
+% ant run-test-hdfs-fault-inject -Dfi.*=0.25
+            </source>
+          </li>
+          <li>or from a program as follows:
+          </li>
+        </ul>
+        <source>
+package org.apache.hadoop.fs;
+
+import org.junit.Test;
+import org.junit.Before;
+import org.junit.After;
+import junit.framework.TestCase;
+
+public class DemoFiTest extends TestCase {
+  public static final String BLOCK_RECEIVER_FAULT="hdfs.datanode.BlockReceiver";
+  @Override
+  @Before
+  public void setUp(){
+    //Setting up the test's environment as required
+  }
+
+  @Test
+  public void testFI() {
+    // It triggers the fault, assuming that there's one called 'hdfs.datanode.BlockReceiver'
+    System.setProperty("fi." + BLOCK_RECEIVER_FAULT, "0.12");
+    //
+    // The main logic of your tests goes here
+    //
+    // Now set the level back to 0 (zero) to prevent this fault from happening again
+    System.setProperty("fi." + BLOCK_RECEIVER_FAULT, "0.0");
+    // or delete its trigger completely
+    System.getProperties().remove("fi." + BLOCK_RECEIVER_FAULT);
+  }
+
+  @Override
+  @After
+  public void tearDown() {
+    //Cleaning up the test environment
+  }
+}
+        </source>
+        <p>
+          As you can see above, these two methods do the same thing: they
+          set the probability level of
+          <code>hdfs.datanode.BlockReceiver</code>
+          to 12%.
+          The difference, however, is that the programmatic approach provides more
+          flexibility and allows you to turn a fault off when a test doesn't need
+          it anymore.
+        </p>
+      </section>
+    </section>
+
+    <section>
+      <title>Additional information and contacts</title>
+      <p>These two sources of information seem to be particularly
+        interesting and worth further reading:
+      </p>
+      <ul>
+        <li>
+          <a href="http://www.eclipse.org/aspectj/doc/next/devguide/">
+            http://www.eclipse.org/aspectj/doc/next/devguide/
+          </a>
+        </li>
+        <li>AspectJ Cookbook (ISBN-13: 978-0-596-00654-9)
+        </li>
+      </ul>
+      <p>Should you have any further comments or questions for the author,
+        check
+        <a href="http://issues.apache.org/jira/browse/HDFS-435">HDFS-435</a>
+      </p>
+    </section>
+  </body>
+</document>
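
The probability model described in the guide above amounts to a property lookup plus a coin flip. The following self-contained sketch is only an illustration of that behaviour, not the org.apache.hadoop.fi.ProbabilityModel source (which is not part of this diff); the class and method names are invented. A fault fires when a random number in [0.0, 1.0) falls below the level configured through fi.<fault name>, falling back to the default fi.* level of 0.

import java.util.Random;

/** Illustration of the coin-flip behaviour described in the fault injection guide. */
public class CoinFlipSketch {
  private static final Random RANDOM = new Random();

  /** Returns true when the fault identified by faultName should fire this time. */
  static boolean shouldFire(String faultName) {
    String level = System.getProperty("fi." + faultName);
    if (level == null) {
      level = System.getProperty("fi.*", "0.0"); // default level is 0: never fire
    }
    return RANDOM.nextDouble() < Double.parseDouble(level);
  }

  public static void main(String[] args) {
    System.setProperty("fi.hdfs.datanode.BlockReceiver", "1.0"); // 1.0 guarantees the fault
    System.out.println(shouldFire("hdfs.datanode.BlockReceiver")); // prints true
  }
}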

+ 3 - 0
src/docs/src/documentation/content/xdocs/site.xml

@@ -60,6 +60,9 @@ See http://forrest.apache.org/docs/linking.html for more info.
 		<hdfs_SLG        			label="Synthetic Load Generator Guide"  href="SLG_user_guide.html" />
 		<hdfs_imageviewer						label="Offline Image Viewer Guide"	href="hdfs_imageviewer.html" />
 		<hdfs_libhdfs   				label="C API libhdfs"         						href="libhdfs.html" /> 
+                <docs label="Testing">
+                    <faultinject_framework              label="Fault Injection"                                                     href="faultinject_framework.html" />
+                </docs>
    </docs> 
    
    <docs label="HOD">

BIN
src/docs/src/documentation/resources/images/FI-framework.gif


BIN
src/docs/src/documentation/resources/images/FI-framework.odg


+ 36 - 29
src/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -127,8 +127,8 @@ public class DFSClient implements FSConstants, java.io.Closeable {
   public static final Log LOG = LogFactory.getLog(DFSClient.class);
   public static final int MAX_BLOCK_ACQUIRE_FAILURES = 3;
   private static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
-  private ClientProtocol namenode;
-  private ClientProtocol rpcNamenode;
+  private final ClientProtocol namenode;
+  private final ClientProtocol rpcNamenode;
   final UnixUserGroupInformation ugi;
   volatile boolean clientRunning = true;
   Random r = new Random();
@@ -206,41 +206,41 @@ public class DFSClient implements FSConstants, java.io.Closeable {
         ClientDatanodeProtocol.versionID, addr, conf);
   }
         
-  /** 
-   * Create a new DFSClient connected to the default namenode.
+  /**
+   * Same as this(NameNode.getAddress(conf), conf);
+   * @see #DFSClient(InetSocketAddress, Configuration)
+   * @deprecated Deprecated at 0.21
    */
+  @Deprecated
   public DFSClient(Configuration conf) throws IOException {
-    this(NameNode.getAddress(conf), conf, null);
+    this(NameNode.getAddress(conf), conf);
   }
 
-  /** 
-   * Create a new DFSClient connected to the given namenode server.
+  /**
+   * Same as this(nameNodeAddr, conf, null);
+   * @see #DFSClient(InetSocketAddress, Configuration, org.apache.hadoop.fs.FileSystem.Statistics)
+   */
+  public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf
+      ) throws IOException {
+    this(nameNodeAddr, conf, null);
+  }
+
+  /**
+   * Same as this(nameNodeAddr, null, conf, stats);
+   * @see #DFSClient(InetSocketAddress, ClientProtocol, Configuration, org.apache.hadoop.fs.FileSystem.Statistics) 
    */
   public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf,
                    FileSystem.Statistics stats)
     throws IOException {
-    this(conf, stats);
-    this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
-    this.namenode = createNamenode(this.rpcNamenode);
+    this(nameNodeAddr, null, conf, stats);
   }
 
   /** 
-   * Create a new DFSClient connected to the given namenode
-   * and rpcNamenode objects.
-   * 
-   * This constructor was written to allow easy testing of the DFSClient class.
-   * End users will most likely want to use one of the other constructors.
+   * Create a new DFSClient connected to the given nameNodeAddr or rpcNamenode.
+   * Exactly one of nameNodeAddr or rpcNamenode must be null.
    */
-  public DFSClient(ClientProtocol namenode, ClientProtocol rpcNamenode,
-                   Configuration conf, FileSystem.Statistics stats)
-    throws IOException {
-      this(conf, stats);
-      this.namenode = namenode;
-      this.rpcNamenode = rpcNamenode;
-  }
-
-  
-  private DFSClient(Configuration conf, FileSystem.Statistics stats)
+  DFSClient(InetSocketAddress nameNodeAddr, ClientProtocol rpcNamenode,
+      Configuration conf, FileSystem.Statistics stats)
     throws IOException {
     this.conf = conf;
     this.stats = stats;
@@ -271,11 +271,18 @@ public class DFSClient implements FSConstants, java.io.Closeable {
     }
     defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
     defaultReplication = (short) conf.getInt("dfs.replication", 3);
-  }
 
-  public DFSClient(InetSocketAddress nameNodeAddr, 
-                   Configuration conf) throws IOException {
-    this(nameNodeAddr, conf, null);
+    if (nameNodeAddr != null && rpcNamenode == null) {
+      this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
+      this.namenode = createNamenode(this.rpcNamenode);
+    } else if (nameNodeAddr == null && rpcNamenode != null) {
+      //This case is used for testing.
+      this.namenode = this.rpcNamenode = rpcNamenode;
+    } else {
+      throw new IllegalArgumentException(
+          "Expecting exactly one of nameNodeAddr and rpcNamenode being null: "
+          + "nameNodeAddr=" + nameNodeAddr + ", rpcNamenode=" + rpcNamenode);
+    }
   }
 
   private void checkOpen() throws IOException {
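
The constructor consolidation above means callers now pick one of two entry points: a real namenode address for production use, or an injected ClientProtocol for tests, never both. A brief hedged sketch of the caller side (hypothetical class name, and it assumes a reachable namenode configured in conf); it uses only the constructors shown in this hunk and the NameNode.getAddress(conf) call that NamenodeFsck switches to later in this commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.server.namenode.NameNode;

public class DfsClientConstructionSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Deprecated at 0.21: new DFSClient(conf) still compiles but simply
    // forwards to the address-based constructor used below.
    DFSClient client = new DFSClient(NameNode.getAddress(conf), conf);
    client.close();

    // For tests, the package-private DFSClient(null, rpcNamenode, conf, stats)
    // constructor takes a mock ClientProtocol instead of an address.
  }
}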

+ 2 - 2
src/java/org/apache/hadoop/hdfs/HftpFileSystem.java

@@ -27,7 +27,6 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.text.ParseException;
-import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.Random;
@@ -45,6 +44,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.server.common.ThreadLocalDateFormat;
 import org.apache.hadoop.hdfs.server.namenode.ListPathsServlet;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
@@ -74,7 +74,7 @@ public class HftpFileSystem extends FileSystem {
   protected UserGroupInformation ugi; 
   protected final Random ran = new Random();
 
-  protected static final SimpleDateFormat df = ListPathsServlet.df;
+  protected static final ThreadLocalDateFormat df = ListPathsServlet.df;
 
   @Override
   public void initialize(URI name, Configuration conf) throws IOException {

+ 56 - 17
src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java

@@ -249,8 +249,8 @@ public interface DataTransferProtocol {
 
   /** Receiver */
   public static abstract class Receiver {
-    /** Initialize a operation. */
-    public final Op op(DataInputStream in) throws IOException {
+    /** Read an Op.  It also checks protocol version. */
+    protected final Op readOp(DataInputStream in) throws IOException {
       final short version = in.readShort();
       if (version != DATA_TRANSFER_VERSION) {
         throw new IOException( "Version Mismatch" );
@@ -258,8 +258,32 @@ public interface DataTransferProtocol {
       return Op.read(in);
     }
 
+    /** Process op by the corresponding method. */
+    protected final void processOp(Op op, DataInputStream in
+        ) throws IOException {
+      switch(op) {
+      case READ_BLOCK:
+        opReadBlock(in);
+        break;
+      case WRITE_BLOCK:
+        opWriteBlock(in);
+        break;
+      case REPLACE_BLOCK:
+        opReplaceBlock(in);
+        break;
+      case COPY_BLOCK:
+        opCopyBlock(in);
+        break;
+      case BLOCK_CHECKSUM:
+        opBlockChecksum(in);
+        break;
+      default:
+        throw new IOException("Unknown op " + op + " in data stream");
+      }
+    }
+
     /** Receive OP_READ_BLOCK */
-    public final void opReadBlock(DataInputStream in) throws IOException {
+    private void opReadBlock(DataInputStream in) throws IOException {
       final long blockId = in.readLong();          
       final long blockGs = in.readLong();
       final long offset = in.readLong();
@@ -270,13 +294,16 @@ public interface DataTransferProtocol {
       opReadBlock(in, blockId, blockGs, offset, length, client, accesstoken);
     }
 
-    /** Abstract OP_READ_BLOCK method. */
-    public abstract void opReadBlock(DataInputStream in,
+    /**
+     * Abstract OP_READ_BLOCK method.
+     * Read a block.
+     */
+    protected abstract void opReadBlock(DataInputStream in,
         long blockId, long blockGs, long offset, long length,
         String client, AccessToken accesstoken) throws IOException;
     
     /** Receive OP_WRITE_BLOCK */
-    public final void opWriteBlock(DataInputStream in) throws IOException {
+    private void opWriteBlock(DataInputStream in) throws IOException {
       final long blockId = in.readLong();          
       final long blockGs = in.readLong();
       final int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
@@ -298,14 +325,17 @@ public interface DataTransferProtocol {
           client, src, targets, accesstoken);
     }
 
-    /** Abstract OP_WRITE_BLOCK method. */
-    public abstract void opWriteBlock(DataInputStream in,
+    /**
+     * Abstract OP_WRITE_BLOCK method. 
+     * Write a block.
+     */
+    protected abstract void opWriteBlock(DataInputStream in,
         long blockId, long blockGs, int pipelineSize, boolean isRecovery,
         String client, DatanodeInfo src, DatanodeInfo[] targets,
         AccessToken accesstoken) throws IOException;
 
     /** Receive OP_REPLACE_BLOCK */
-    public final void opReplaceBlock(DataInputStream in) throws IOException {
+    private void opReplaceBlock(DataInputStream in) throws IOException {
       final long blockId = in.readLong();          
       final long blockGs = in.readLong();
       final String sourceId = Text.readString(in); // read del hint
@@ -315,13 +345,16 @@ public interface DataTransferProtocol {
       opReplaceBlock(in, blockId, blockGs, sourceId, src, accesstoken);
     }
 
-    /** Abstract OP_REPLACE_BLOCK method. */
-    public abstract void opReplaceBlock(DataInputStream in,
+    /**
+     * Abstract OP_REPLACE_BLOCK method.
+     * It is used for balancing purpose; send to a destination
+     */
+    protected abstract void opReplaceBlock(DataInputStream in,
         long blockId, long blockGs, String sourceId, DatanodeInfo src,
         AccessToken accesstoken) throws IOException;
 
     /** Receive OP_COPY_BLOCK */
-    public final void opCopyBlock(DataInputStream in) throws IOException {
+    private void opCopyBlock(DataInputStream in) throws IOException {
       final long blockId = in.readLong();          
       final long blockGs = in.readLong();
       final AccessToken accesstoken = readAccessToken(in);
@@ -329,12 +362,15 @@ public interface DataTransferProtocol {
       opCopyBlock(in, blockId, blockGs, accesstoken);
     }
 
-    /** Abstract OP_COPY_BLOCK method. */
-    public abstract void opCopyBlock(DataInputStream in,
+    /**
+     * Abstract OP_COPY_BLOCK method.
+     * It is used for balancing purpose; send to a proxy source.
+     */
+    protected abstract void opCopyBlock(DataInputStream in,
         long blockId, long blockGs, AccessToken accesstoken) throws IOException;
 
     /** Receive OP_BLOCK_CHECKSUM */
-    public final void opBlockChecksum(DataInputStream in) throws IOException {
+    private void opBlockChecksum(DataInputStream in) throws IOException {
       final long blockId = in.readLong();          
       final long blockGs = in.readLong();
       final AccessToken accesstoken = readAccessToken(in);
@@ -342,8 +378,11 @@ public interface DataTransferProtocol {
       opBlockChecksum(in, blockId, blockGs, accesstoken);
     }
 
-    /** Abstract OP_BLOCK_CHECKSUM method. */
-    public abstract void opBlockChecksum(DataInputStream in,
+    /**
+     * Abstract OP_BLOCK_CHECKSUM method.
+     * Get the checksum of a block 
+     */
+    protected abstract void opBlockChecksum(DataInputStream in,
         long blockId, long blockGs, AccessToken accesstoken) throws IOException;
 
     /** Read an AccessToken */
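
With this change, opcode parsing and dispatch live in the base Receiver: an implementation calls readOp() to read and version-check the opcode, hands it to processOp() for routing, and only overrides the protected op*() callbacks. The sketch below is hypothetical (the real subclass in this commit is DataXceiver, shown further down) and uses only the signatures visible in this hunk.

import java.io.DataInputStream;
import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.security.AccessToken;

/** Hypothetical minimal receiver: the base class owns opcode parsing and dispatch. */
class LoggingReceiver extends DataTransferProtocol.Receiver {

  /** Serve a single request from an already-connected stream. */
  void serve(DataInputStream in) throws IOException {
    DataTransferProtocol.Op op = readOp(in); // reads version + opcode
    processOp(op, in);                       // dispatches to one of the opXxx() below
  }

  @Override
  protected void opReadBlock(DataInputStream in, long blockId, long blockGs,
      long offset, long length, String client, AccessToken accesstoken)
      throws IOException {
    System.out.println("READ_BLOCK " + blockId);
  }

  @Override
  protected void opWriteBlock(DataInputStream in, long blockId, long blockGs,
      int pipelineSize, boolean isRecovery, String client, DatanodeInfo src,
      DatanodeInfo[] targets, AccessToken accesstoken) throws IOException {
    System.out.println("WRITE_BLOCK " + blockId);
  }

  @Override
  protected void opReplaceBlock(DataInputStream in, long blockId, long blockGs,
      String sourceId, DatanodeInfo src, AccessToken accesstoken) throws IOException {
    System.out.println("REPLACE_BLOCK " + blockId);
  }

  @Override
  protected void opCopyBlock(DataInputStream in, long blockId, long blockGs,
      AccessToken accesstoken) throws IOException {
    System.out.println("COPY_BLOCK " + blockId);
  }

  @Override
  protected void opBlockChecksum(DataInputStream in, long blockId, long blockGs,
      AccessToken accesstoken) throws IOException {
    System.out.println("BLOCK_CHECKSUM " + blockId);
  }
}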

+ 87 - 0
src/java/org/apache/hadoop/hdfs/server/common/ThreadLocalDateFormat.java

@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.common;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+
+/**
+ * Thread safe implementation of {@link SimpleDateFormat} 
+ * TODO: This needs to be moved to hadoop common project.
+ */
+public class ThreadLocalDateFormat {
+  private final String format;
+
+  /**
+   * Constructs {@link ThreadLocalDateFormat} using given date format pattern
+   * @param format Date format pattern
+   */
+  public ThreadLocalDateFormat(String format) {
+    this.format = format;
+  }
+
+  /**
+   * ThreadLocal based {@link SimpleDateFormat}
+   */
+  private final ThreadLocal<SimpleDateFormat> dateFormat = 
+    new ThreadLocal<SimpleDateFormat>() {
+      @Override
+      protected SimpleDateFormat initialValue() {
+        SimpleDateFormat df = new SimpleDateFormat(format);
+        return df;
+      }
+    };
+
+  /**
+   * Format given <code>Date</code> into date/time string.
+   * @param date Date to be formatted.
+   * @return the formatted date-time string.
+   */
+  public String format(Date date) {
+    return dateFormat.get().format(date);
+  }
+
+  /**
+   * Parse the String to produce <code>Date</code>.
+   * @param source String to parse.
+   * @return Date parsed from the String.
+   * @throws ParseException
+   *           - if the beginning of the specified string cannot be parsed.
+   */
+  public Date parse(String source) throws ParseException {
+    return dateFormat.get().parse(source);
+  }
+
+  /**
+   * @param zone
+   */
+  public void setTimeZone(TimeZone zone) {
+    dateFormat.get().setTimeZone(zone);
+  }
+
+  /**
+   * Get access to underlying SimpleDateFormat.
+   * Note: Do not pass reference to this Date to other threads!
+   * @return the SimpleDateFormat for the thread.
+   */
+  SimpleDateFormat get() {
+    return dateFormat.get();
+  }
+}
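
Because each thread gets its own SimpleDateFormat underneath, a single shared ThreadLocalDateFormat can be used concurrently without external locking. A small hedged usage sketch (hypothetical class name, mirroring the pattern ListPathsServlet adopts later in this commit; note that setTimeZone only affects the calling thread's underlying format):

import java.util.Date;
import java.util.TimeZone;

import org.apache.hadoop.hdfs.server.common.ThreadLocalDateFormat;

public class ThreadLocalDateFormatUsage {
  // One shared instance for the whole process.
  static final ThreadLocalDateFormat DF =
      new ThreadLocalDateFormat("yyyy-MM-dd'T'HH:mm:ssZ");

  public static void main(String[] args) throws InterruptedException {
    Runnable task = new Runnable() {
      public void run() {
        // Each thread configures and uses its own underlying SimpleDateFormat.
        DF.setTimeZone(TimeZone.getTimeZone("UTC"));
        System.out.println(Thread.currentThread().getName() + ": " + DF.format(new Date()));
      }
    };
    Thread t1 = new Thread(task, "t1");
    Thread t2 = new Thread(task, "t2");
    t1.start();
    t2.start();
    t1.join();
    t2.join();
  }
}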

+ 4 - 1
src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -77,7 +77,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
   private String clientName;
   DatanodeInfo srcDataNode = null;
   private Checksum partialCrc = null;
-  private DataNode datanode = null;
+  private final DataNode datanode;
 
   BlockReceiver(Block block, DataInputStream in, String inAddr,
                 String myAddr, boolean isRecovery, String clientName, 
@@ -128,6 +128,9 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
     }
   }
 
+  /** Return the datanode object. */
+  DataNode getDataNode() {return datanode;}
+
   /**
    * close files.
    */

+ 5 - 0
src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -490,6 +490,11 @@ public class DataNode extends Configured
     return myMetrics;
   }
   
+  /** Return DatanodeRegistration */
+  public DatanodeRegistration getDatanodeRegistration() {
+    return dnRegistration;
+  }
+
   /**
    * Return the namenode's identifier
    */

+ 62 - 51
src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -43,6 +43,8 @@ import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.MetaDataInputSt
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessToken;
 import org.apache.hadoop.security.AccessTokenHandler;
@@ -57,24 +59,34 @@ class DataXceiver extends DataTransferProtocol.Receiver
   public static final Log LOG = DataNode.LOG;
   static final Log ClientTraceLog = DataNode.ClientTraceLog;
   
-  Socket s;
-  final String remoteAddress; // address of remote side
-  final String localAddress;  // local address of this daemon
-  DataNode datanode;
-  DataXceiverServer dataXceiverServer;
+  private final Socket s;
+  private final boolean isLocal; //is a local connection?
+  private final String remoteAddress; // address of remote side
+  private final String localAddress;  // local address of this daemon
+  private final DataNode datanode;
+  private final DataXceiverServer dataXceiverServer;
+
+  private long opStartTime; //the start time of receiving an Op
   
   public DataXceiver(Socket s, DataNode datanode, 
       DataXceiverServer dataXceiverServer) {
-    
     this.s = s;
+    this.isLocal = s.getInetAddress().equals(s.getLocalAddress());
     this.datanode = datanode;
     this.dataXceiverServer = dataXceiverServer;
     dataXceiverServer.childSockets.put(s, s);
     remoteAddress = s.getRemoteSocketAddress().toString();
     localAddress = s.getLocalSocketAddress().toString();
-    LOG.debug("Number of active connections is: " + datanode.getXceiverCount());
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Number of active connections is: "
+          + datanode.getXceiverCount());
+    }
   }
 
+  /** Return the datanode object. */
+  DataNode getDataNode() {return datanode;}
+
   /**
    * Read/write data from/to the DataXceiveServer.
    */
@@ -84,8 +96,8 @@ class DataXceiver extends DataTransferProtocol.Receiver
       in = new DataInputStream(
           new BufferedInputStream(NetUtils.getInputStream(s), 
                                   SMALL_BUFFER_SIZE));
-      final DataTransferProtocol.Op op = op(in);
-      boolean local = s.getInetAddress().equals(s.getLocalAddress());
+      final DataTransferProtocol.Op op = readOp(in);
+
       // Make sure the xciver count is not exceeded
       int curXceiverCount = datanode.getXceiverCount();
       if (curXceiverCount > dataXceiverServer.maxXceiverCount) {
@@ -93,45 +105,16 @@ class DataXceiver extends DataTransferProtocol.Receiver
                               + " exceeds the limit of concurrent xcievers "
                               + dataXceiverServer.maxXceiverCount);
       }
-      long startTime = DataNode.now();
-      switch ( op ) {
-      case READ_BLOCK:
-        opReadBlock(in);
-        datanode.myMetrics.readBlockOp.inc(DataNode.now() - startTime);
-        if (local)
-          datanode.myMetrics.readsFromLocalClient.inc();
-        else
-          datanode.myMetrics.readsFromRemoteClient.inc();
-        break;
-      case WRITE_BLOCK:
-        opWriteBlock(in);
-        datanode.myMetrics.writeBlockOp.inc(DataNode.now() - startTime);
-        if (local)
-          datanode.myMetrics.writesFromLocalClient.inc();
-        else
-          datanode.myMetrics.writesFromRemoteClient.inc();
-        break;
-      case REPLACE_BLOCK: // for balancing purpose; send to a destination
-        opReplaceBlock(in);
-        datanode.myMetrics.replaceBlockOp.inc(DataNode.now() - startTime);
-        break;
-      case COPY_BLOCK:
-            // for balancing purpose; send to a proxy source
-        opCopyBlock(in);
-        datanode.myMetrics.copyBlockOp.inc(DataNode.now() - startTime);
-        break;
-      case BLOCK_CHECKSUM: //get the checksum of a block
-        opBlockChecksum(in);
-        datanode.myMetrics.blockChecksumOp.inc(DataNode.now() - startTime);
-        break;
-      default:
-        throw new IOException("Unknown opcode " + op + " in data stream");
-      }
+
+      opStartTime = DataNode.now();
+      processOp(op, in);
     } catch (Throwable t) {
       LOG.error(datanode.dnRegistration + ":DataXceiver",t);
     } finally {
-      LOG.debug(datanode.dnRegistration + ":Number of active connections is: "
-                               + datanode.getXceiverCount());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(datanode.dnRegistration + ":Number of active connections is: "
+            + datanode.getXceiverCount());
+      }
       IOUtils.closeStream(in);
       IOUtils.closeSocket(s);
       dataXceiverServer.childSockets.remove(s);
@@ -142,7 +125,7 @@ class DataXceiver extends DataTransferProtocol.Receiver
    * Read a block from the disk.
    */
   @Override
-  public void opReadBlock(DataInputStream in,
+  protected void opReadBlock(DataInputStream in,
       long blockId, long blockGs, long startOffset, long length,
       String clientName, AccessToken accessToken) throws IOException {
     final Block block = new Block(blockId, 0 , blockGs);
@@ -213,13 +196,18 @@ class DataXceiver extends DataTransferProtocol.Receiver
       IOUtils.closeStream(out);
       IOUtils.closeStream(blockSender);
     }
+
+    //update metrics
+    updateDuration(datanode.myMetrics.readBlockOp);
+    updateCounter(datanode.myMetrics.readsFromLocalClient,
+                  datanode.myMetrics.readsFromRemoteClient);
   }
 
   /**
    * Write a block to disk.
    */
   @Override
-  public void opWriteBlock(DataInputStream in, long blockId, long blockGs,
+  protected void opWriteBlock(DataInputStream in, long blockId, long blockGs,
       int pipelineSize, boolean isRecovery,
       String client, DatanodeInfo srcDataNode, DatanodeInfo[] targets,
       AccessToken accessToken) throws IOException {
@@ -377,13 +365,18 @@ class DataXceiver extends DataTransferProtocol.Receiver
       IOUtils.closeSocket(mirrorSock);
       IOUtils.closeStream(blockReceiver);
     }
+
+    //update metrics
+    updateDuration(datanode.myMetrics.writeBlockOp);
+    updateCounter(datanode.myMetrics.writesFromLocalClient,
+                  datanode.myMetrics.writesFromRemoteClient);
   }
 
   /**
    * Get block checksum (MD5 of CRC32).
    */
   @Override
-  public void opBlockChecksum(DataInputStream in,
+  protected void opBlockChecksum(DataInputStream in,
       long blockId, long blockGs, AccessToken accessToken) throws IOException {
     final Block block = new Block(blockId, 0 , blockGs);
     DataOutputStream out = new DataOutputStream(NetUtils.getOutputStream(s,
@@ -433,13 +426,16 @@ class DataXceiver extends DataTransferProtocol.Receiver
       IOUtils.closeStream(checksumIn);
       IOUtils.closeStream(metadataIn);
     }
+
+    //update metrics
+    updateDuration(datanode.myMetrics.blockChecksumOp);
   }
 
   /**
    * Read a block from the disk and then sends it to a destination.
    */
   @Override
-  public void opCopyBlock(DataInputStream in,
+  protected void opCopyBlock(DataInputStream in,
       long blockId, long blockGs, AccessToken accessToken) throws IOException {
     // Read in the header
     Block block = new Block(blockId, 0, blockGs);
@@ -499,6 +495,9 @@ class DataXceiver extends DataTransferProtocol.Receiver
       IOUtils.closeStream(reply);
       IOUtils.closeStream(blockSender);
     }
+
+    //update metrics    
+    updateDuration(datanode.myMetrics.copyBlockOp);
   }
 
   /**
@@ -506,7 +505,7 @@ class DataXceiver extends DataTransferProtocol.Receiver
    * remove the copy from the source.
    */
   @Override
-  public void opReplaceBlock(DataInputStream in,
+  protected void opReplaceBlock(DataInputStream in,
       long blockId, long blockGs, String sourceID, DatanodeInfo proxySource,
       AccessToken accessToken) throws IOException {
     /* read header */
@@ -606,8 +605,20 @@ class DataXceiver extends DataTransferProtocol.Receiver
       IOUtils.closeStream(blockReceiver);
       IOUtils.closeStream(proxyReply);
     }
+
+    //update metrics
+    updateDuration(datanode.myMetrics.replaceBlockOp);
   }
-  
+
+  private void updateDuration(MetricsTimeVaryingRate mtvr) {
+    mtvr.inc(DataNode.now() - opStartTime);
+  }
+
+  private void updateCounter(MetricsTimeVaryingInt localCounter,
+      MetricsTimeVaryingInt remoteCounter) {
+    (isLocal? localCounter: remoteCounter).inc();
+  }
+
   /**
    * Utility function for sending a response.
    * @param s socket to write to

+ 5 - 5
src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java

@@ -893,11 +893,11 @@ public class BlockManager {
             } else {
               // new replica is larger in size than existing block.
               // Mark pre-existing replicas as corrupt.
-              int numNodes = blocksMap.numNodes(block);
+              int numNodes = storedBlock.numNodes();
               int count = 0;
               DatanodeDescriptor nodes[] = new DatanodeDescriptor[numNodes];
-              Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
-              for (; it != null && it.hasNext(); ) {
+              Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(storedBlock);
+              while (it.hasNext()) {
                 DatanodeDescriptor dd = it.next();
                 if (!dd.equals(node)) {
                   nodes[count++] = dd;
@@ -1262,9 +1262,9 @@ public class BlockManager {
     return blocksMap.size() - (int)pendingDeletionBlocksCount;
   }
 
-  DatanodeDescriptor[] getNodes(Block block) {
+  DatanodeDescriptor[] getNodes(BlockInfo block) {
     DatanodeDescriptor[] nodes =
-      new DatanodeDescriptor[blocksMap.numNodes(block)];
+      new DatanodeDescriptor[block.numNodes()];
     Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
     for (int i = 0; it != null && it.hasNext(); i++) {
       nodes[i] = it.next();

+ 13 - 2
src/java/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java

@@ -108,9 +108,20 @@ class BlocksMap {
     return map.get(b);
   }
 
-  /** Returned Iterator does not support. */
+  /**
+   * Searches for the block in the BlocksMap and 
+   * returns Iterator that iterates through the nodes the block belongs to.
+   */
   Iterator<DatanodeDescriptor> nodeIterator(Block b) {
-    return new NodeIterator(map.get(b));
+    return nodeIterator(map.get(b));
+  }
+
+  /**
+   * For a block that has already been retrieved from the BlocksMap
+   * returns Iterator that iterates through the nodes the block belongs to.
+   */
+  Iterator<DatanodeDescriptor> nodeIterator(BlockInfo storedBlock) {
+    return new NodeIterator(storedBlock);
   }
 
   /** counts number of containing nodes. Better than using iterator. */
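
The new overload exists so that callers which already hold the BlockInfo (as in the BlockManager hunk above) can iterate replica locations without a second map lookup. A hedged same-package sketch of that call pattern (hypothetical helper class; BlocksMap and BlockInfo are package-private, so real callers live in org.apache.hadoop.hdfs.server.namenode as well):

package org.apache.hadoop.hdfs.server.namenode;

import java.util.Iterator;

/** Hypothetical helper illustrating the look-up-once, iterate-later pattern. */
class ReplicaScanSketch {
  /** Count replicas of an already-resolved block on nodes other than 'excluded'. */
  static int countOtherReplicas(BlocksMap blocksMap, BlockInfo storedBlock,
      DatanodeDescriptor excluded) {
    int count = 0;
    // nodeIterator(BlockInfo) skips the redundant map lookup that
    // nodeIterator(Block) would perform.
    for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(storedBlock); it.hasNext();) {
      if (!it.next().equals(excluded)) {
        count++;
      }
    }
    return count;
  }
}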

+ 8 - 5
src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -1030,13 +1030,16 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
     synchronized (this) {
       INodeFileUnderConstruction file = (INodeFileUnderConstruction)dir.getFileINode(src);
 
-      Block[] blocks = file.getBlocks();
+      BlockInfo[] blocks = file.getBlocks();
       if (blocks != null && blocks.length > 0) {
-        Block last = blocks[blocks.length-1];
+        BlockInfo last = blocks[blocks.length-1];
+        // this is a redundant search in blocksMap
+        // should be resolved by the new implementation of append
         BlockInfo storedBlock = blockManager.getStoredBlock(last);
+        assert last == storedBlock : "last block should be in the blocksMap";
         if (file.getPreferredBlockSize() > storedBlock.getNumBytes()) {
           long fileLength = file.computeContentSummary().getLength();
-          DatanodeDescriptor[] targets = blockManager.getNodes(last);
+          DatanodeDescriptor[] targets = blockManager.getNodes(storedBlock);
           // remove the replica locations of this block from the node
           for (int i = 0; i < targets.length; i++) {
             targets[i].removeBlock(storedBlock);
@@ -1578,8 +1581,8 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
       }
       // setup the Inode.targets for the last block from the blockManager
       //
-      Block[] blocks = pendingFile.getBlocks();
-      Block last = blocks[blocks.length-1];
+      BlockInfo[] blocks = pendingFile.getBlocks();
+      BlockInfo last = blocks[blocks.length-1];
       DatanodeDescriptor[] targets = blockManager.getNodes(last);
       pendingFile.setTargets(targets);
     }

+ 4 - 8
src/java/org/apache/hadoop/hdfs/server/namenode/ListPathsServlet.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.server.common.ThreadLocalDateFormat;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.util.VersionInfo;
@@ -27,14 +28,12 @@ import org.znerd.xmlenc.*;
 
 import java.io.IOException;
 import java.io.PrintWriter;
-import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Stack;
 import java.util.TimeZone;
 import java.util.regex.Pattern;
-import java.util.regex.PatternSyntaxException;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
@@ -46,9 +45,9 @@ import javax.servlet.http.HttpServletResponse;
 public class ListPathsServlet extends DfsServlet {
   /** For java.io.Serializable */
   private static final long serialVersionUID = 1L;
+  public static final ThreadLocalDateFormat df = 
+    new ThreadLocalDateFormat("yyyy-MM-dd'T'HH:mm:ssZ");
 
-  public static final SimpleDateFormat df =
-    new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ");
   static {
     df.setTimeZone(TimeZone.getTimeZone("UTC"));
   }
@@ -163,13 +162,10 @@ public class ListPathsServlet extends DfsServlet {
         }
         catch(RemoteException re) {re.writeXml(p, doc);}
       }
-    } catch (PatternSyntaxException e) {
-      out.println(e.toString());
-    } finally {
       if (doc != null) {
         doc.endDocument();
       }
-
+    } finally {
       if (out != null) {
         out.close();
       }

+ 1 - 1
src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java

@@ -324,7 +324,7 @@ public class NamenodeFsck {
   
   private void lostFoundMove(FileStatus file, LocatedBlocks blocks)
     throws IOException {
-    DFSClient dfs = new DFSClient(conf);
+    final DFSClient dfs = new DFSClient(NameNode.getAddress(conf), conf);
     try {
     if (!lfInited) {
       lostFoundInit(dfs);

+ 195 - 0
src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java

@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fi;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fi.FiTestUtil.Action;
+import org.apache.hadoop.fi.FiTestUtil.ActionContainer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
+
+/**
+ * Utilities for DataTransferProtocol related tests,
+ * e.g. TestFiDataTransferProtocol.
+ */
+public class DataTransferTestUtil {
+  private static DataTransferTest thepipelinetest;
+  /** initialize pipeline test */
+  public static DataTransferTest initTest() {
+    return thepipelinetest = new DataTransferTest();
+  }
+  /** get the pipeline test object */
+  public static DataTransferTest getPipelineTest() {
+    return thepipelinetest;
+  }
+
+  /**
+   * The DataTransferTest class includes a pipeline
+   * and some actions.
+   */
+  public static class DataTransferTest {
+    private Pipeline thepipeline;
+    /** Simulate action for the receiverOpWriteBlock pointcut */
+    public final ActionContainer<DataNode> fiReceiverOpWriteBlock
+        = new ActionContainer<DataNode>();
+    /** Simulate action for the callReceivePacket pointcut */
+    public final ActionContainer<DataNode> fiCallReceivePacket
+        = new ActionContainer<DataNode>();
+    /** Simulate action for the statusRead pointcut */
+    public final ActionContainer<DataNode> fiStatusRead
+        = new ActionContainer<DataNode>();
+
+    /** Initialize the pipeline. */
+    public Pipeline initPipeline(LocatedBlock lb) {
+      if (thepipeline != null) {
+        throw new IllegalStateException("thepipeline != null");
+      }
+      return thepipeline = new Pipeline(lb);
+    }
+
+    /** Return the pipeline. */
+    public Pipeline getPipeline() {
+      if (thepipeline == null) {
+        throw new IllegalStateException("thepipeline == null");
+      }
+      return thepipeline;
+    }
+  }
+
+  /** A pipeline contains a list of datanodes. */
+  public static class Pipeline {
+    private final List<String> datanodes = new ArrayList<String>();
+    
+    private Pipeline(LocatedBlock lb) {
+      for(DatanodeInfo d : lb.getLocations()) {
+        datanodes.add(d.getName());
+      }
+    }
+
+    /** Does the pipeline contains d at the n th position? */
+    public boolean contains(int n, DatanodeID d) {
+      return d.getName().equals(datanodes.get(n));
+    }
+
+    /** {@inheritDoc} */
+    public String toString() {
+      return getClass().getSimpleName() + datanodes;
+    }
+  }
+
+  /** Action for DataNode */
+  public static abstract class DataNodeAction implements Action<DataNode> {
+    /** The name of the test */
+    final String currentTest;
+    /** The index of the datanode */
+    final int index;
+
+    /**
+     * @param currentTest The name of the test
+     * @param index The index of the datanode
+     */
+    private DataNodeAction(String currentTest, int index) {
+      this.currentTest = currentTest;
+      this.index = index;
+    }
+
+    /** {@inheritDoc} */
+    public String toString() {
+      return currentTest + ", index=" + index;
+    }
+
+    /** {@inheritDoc} */
+    String toString(DataNode datanode) {
+      return "FI: " + this + ", datanode="
+          + datanode.getDatanodeRegistration().getName();
+    }
+  }
+
+  /** Throws OutOfMemoryError. */
+  public static class OomAction extends DataNodeAction {
+    /** Create an action for datanode i in the pipeline. */
+    public OomAction(String currentTest, int i) {
+      super(currentTest, i);
+    }
+
+    @Override
+    public void run(DataNode datanode) {
+      final Pipeline p = getPipelineTest().getPipeline();
+      if (p.contains(index, datanode.getDatanodeRegistration())) {
+        final String s = toString(datanode);
+        FiTestUtil.LOG.info(s);
+        throw new OutOfMemoryError(s);
+      }
+    }
+  }
+
+  /** Throws DiskOutOfSpaceException. */
+  public static class DoosAction extends DataNodeAction {
+    /** Create an action for datanode i in the pipeline. */
+    public DoosAction(String currentTest, int i) {
+      super(currentTest, i);
+    }
+
+    @Override
+    public void run(DataNode datanode) throws DiskOutOfSpaceException {
+      final Pipeline p = getPipelineTest().getPipeline();
+      if (p.contains(index, datanode.getDatanodeRegistration())) {
+        final String s = toString(datanode);
+        FiTestUtil.LOG.info(s);
+        throw new DiskOutOfSpaceException(s);
+      }
+    }
+  }
+
+  /**
+   * Sleep for some period of time to slow down the datanode,
+   * or sleep forever so that the datanode becomes unresponsive.
+   */
+  public static class SleepAction extends DataNodeAction {
+    /** In milliseconds, duration <= 0 means sleeping forever.*/
+    final long duration;
+
+    /**
+     * Create an action for datanode i in the pipeline.
+     * @param duration In milliseconds, duration <= 0 means sleeping forever.
+     */
+    public SleepAction(String currentTest, int i, long duration) {
+      super(currentTest, i);
+      this.duration = duration;
+    }
+
+    @Override
+    public void run(DataNode datanode) {
+      final Pipeline p = getPipelineTest().getPipeline();
+      if (p.contains(index, datanode.getDatanodeRegistration())) {
+        final String s = toString(datanode) + ", duration=" + duration;
+        FiTestUtil.LOG.info(s);
+        if (duration <= 0) {
+          for(; true; FiTestUtil.sleep(1000)); //sleep forever
+        } else {
+          FiTestUtil.sleep(duration);
+        }
+      }
+    }
+  }
+}
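
A usage sketch for the utilities above (it mirrors how TestFiDataTransferProtocol further down wires things up; the class name, test label, and datanode indices are illustrative only):

import org.apache.hadoop.fi.DataTransferTestUtil;
import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
import org.apache.hadoop.fi.DataTransferTestUtil.DoosAction;
import org.apache.hadoop.fi.DataTransferTestUtil.SleepAction;

public class FaultInjectionSketch {
  public static void main(String[] args) {
    // Create a fresh DataTransferTest; the woven aspects find it later
    // through DataTransferTestUtil.getPipelineTest().
    final DataTransferTest t = DataTransferTestUtil.initTest();

    // Register actions on the pointcut hooks.  Each action fires only on
    // the datanode at the given pipeline position.
    t.fiCallReceivePacket.set(new DoosAction("sketch", 1)); // DN1: disk full
    t.fiStatusRead.set(new SleepAction("sketch", 0, 3000)); // DN0: slow for 3s

    // A subsequent write through DFSClient would trigger the aspects:
    // ClientProtocolAspects records the pipeline from addBlock(), and the
    // datanode aspects run the registered actions at their pointcuts.
  }
}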

+ 70 - 0
src/test/aop/org/apache/hadoop/fi/FiTestUtil.java

@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fi;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/** Test Utilities */
+public class FiTestUtil {
+  /** Logging */
+  public static final Log LOG = LogFactory.getLog(FiTestUtil.class);
+
+  /** Return the method name of the caller. */
+  public static String getMethodName() {
+    return Thread.currentThread().getStackTrace()[2].getMethodName();
+  }
+
+  /**
+   * Sleep.
+   * If there is an InterruptedException, re-throw it as a RuntimeException.
+   */
+  public static void sleep(long ms) {
+    try {
+      Thread.sleep(ms);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Action interface */
+  public static interface Action<T> {
+    /** Run the action with the parameter. */
+    public void run(T parameter) throws IOException;
+  }
+
+  /** An ActionContainer contains at most one action. */
+  public static class ActionContainer<T> {
+    private Action<T> action;
+
+    /** Create an empty container. */
+    public ActionContainer() {}
+
+    /** Set action. */
+    public void set(Action<T> a) {action = a;}
+
+    /** Run the action if it exists. */
+    public void run(T obj) throws IOException {
+      if (action != null) {
+        action.run(obj);
+      }
+    }
+  }
+}
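
The Action/ActionContainer pair above is the generic hook the pipeline tests build on: a container is a no-op until an action is set, after which run() delegates to it. A minimal self-contained sketch (the String payload is purely illustrative):

import java.io.IOException;

import org.apache.hadoop.fi.FiTestUtil.Action;
import org.apache.hadoop.fi.FiTestUtil.ActionContainer;

public class ActionContainerSketch {
  public static void main(String[] args) throws IOException {
    ActionContainer<String> container = new ActionContainer<String>();
    container.run("stage-1");   // empty container: nothing happens

    container.set(new Action<String>() {
      public void run(String parameter) throws IOException {
        throw new IOException("injected fault at " + parameter);
      }
    });
    container.run("stage-2");   // the injected fault now fires
  }
}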

+ 35 - 0
src/test/aop/org/apache/hadoop/hdfs/protocol/ClientProtocolAspects.aj

@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fi.DataTransferTestUtil;
+
+/** Aspect for ClientProtocol */
+public aspect ClientProtocolAspects {
+  public static final Log LOG = LogFactory.getLog(ClientProtocolAspects.class);
+
+  pointcut addBlock():
+    call(LocatedBlock ClientProtocol.addBlock(String, String));
+
+  after() returning(LocatedBlock lb): addBlock() {
+    LOG.info("FI: addBlock "
+        + DataTransferTestUtil.getPipelineTest().initPipeline(lb));
+  }
+}

+ 14 - 9
src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj

@@ -17,15 +17,14 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import java.io.IOException;
+import java.io.OutputStream;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fi.DataTransferTestUtil;
 import org.apache.hadoop.fi.ProbabilityModel;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.util.DiskChecker.*;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.DataOutputStream;
+import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 
 /**
  * This aspect takes care about faults injected into datanode.BlockReceiver 
@@ -34,14 +33,20 @@ import java.io.DataOutputStream;
 public aspect BlockReceiverAspects {
   public static final Log LOG = LogFactory.getLog(BlockReceiverAspects.class);
 
-  pointcut callReceivePacket() :
+  pointcut callReceivePacket(BlockReceiver blockreceiver) :
     call (* OutputStream.write(..))
       && withincode (* BlockReceiver.receivePacket(..))
 // to further limit the application of this aspect a very narrow 'target' can be used as follows
 //  && target(DataOutputStream)
-      && !within(BlockReceiverAspects +);
+      && !within(BlockReceiverAspects +)
+      && this(blockreceiver);
 	
-  before () throws IOException : callReceivePacket () {
+  before(BlockReceiver blockreceiver
+      ) throws IOException : callReceivePacket(blockreceiver) {
+    LOG.info("FI: callReceivePacket");
+    DataTransferTestUtil.getPipelineTest().fiCallReceivePacket.run(
+        blockreceiver.getDataNode());
+
     if (ProbabilityModel.injectCriteria(BlockReceiver.class.getSimpleName())) {
       LOG.info("Before the injection point");
       Thread.dumpStack();

+ 74 - 0
src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj

@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fi.DataTransferTestUtil;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Receiver;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
+
+/** Aspect for DataTransferProtocol */
+public aspect DataTransferProtocolAspects {
+  public static final Log LOG = LogFactory.getLog(
+      DataTransferProtocolAspects.class);
+  /*
+  {
+    ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)InterDatanodeProtocol.LOG).getLogger().setLevel(Level.ALL);
+  }
+  */
+
+  pointcut receiverOp(DataXceiver dataxceiver):
+    call(Op Receiver.readOp(DataInputStream)) && target(dataxceiver);
+
+  after(DataXceiver dataxceiver) returning(Op op): receiverOp(dataxceiver) {
+    LOG.info("FI: receiverOp " + op + ", datanode="
+        + dataxceiver.getDataNode().getDatanodeRegistration().getName());    
+  }
+
+  pointcut statusRead(DataXceiver dataxceiver):
+    call(Status Status.read(DataInput)) && this(dataxceiver);
+
+  after(DataXceiver dataxceiver) returning(Status status
+      ) throws IOException: statusRead(dataxceiver) {
+    final DataNode d = dataxceiver.getDataNode();
+    LOG.info("FI: statusRead " + status + ", datanode="
+        + d.getDatanodeRegistration().getName());    
+    DataTransferTestUtil.getPipelineTest().fiStatusRead.run(d);
+  }
+
+  pointcut receiverOpWriteBlock(DataXceiver dataxceiver):
+    call(void Receiver.opWriteBlock(DataInputStream)) && target(dataxceiver);
+
+  before(DataXceiver dataxceiver
+      ) throws IOException: receiverOpWriteBlock(dataxceiver) {
+    LOG.info("FI: receiverOpWriteBlock");
+    DataTransferTestUtil.getPipelineTest().fiReceiverOpWriteBlock.run(
+        dataxceiver.getDataNode());
+  }
+}

+ 133 - 0
src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java

@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fi.DataTransferTestUtil;
+import org.apache.hadoop.fi.FiTestUtil;
+import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
+import org.apache.hadoop.fi.DataTransferTestUtil.DoosAction;
+import org.apache.hadoop.fi.DataTransferTestUtil.SleepAction;
+import org.apache.hadoop.fi.FiTestUtil.Action;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+/** Test DataTransferProtocol with fault injection. */
+public class TestFiDataTransferProtocol extends junit.framework.TestCase {
+  static final short REPLICATION = 3;
+  static final long BLOCKSIZE = 1L * (1L << 20);
+
+  static final Configuration conf = new Configuration();
+  static {
+    conf.setInt("dfs.datanode.handler.count", 1);
+    conf.setInt("dfs.replication", REPLICATION);
+  }
+
+  static private FSDataOutputStream createFile(FileSystem fs, Path p
+      ) throws IOException {
+    return fs.create(p, true, fs.getConf().getInt("io.file.buffer.size", 4096),
+        REPLICATION, BLOCKSIZE);
+  }
+
+  /**
+   * 1. create a file with dfs
+   * 2. write 1 byte
+   * 3. close file
+   * 4. open the same file
+   * 5. read the 1 byte and compare results
+   */
+  private static void write1byte(String methodName) throws IOException {
+    final MiniDFSCluster cluster = new MiniDFSCluster(conf, REPLICATION, true,
+        null);
+    try {
+      final FileSystem dfs = cluster.getFileSystem();
+      final Path p = new Path("/" + methodName + "/foo");
+      final FSDataOutputStream out = createFile(dfs, p);
+      out.write(1);
+      out.close();
+      
+      final FSDataInputStream in = dfs.open(p);
+      final int b = in.read();
+      in.close();
+      assertEquals(1, b);
+    }
+    finally {
+      cluster.shutdown();
+    }
+  }
+
+  private static void runSlowDatanodeTest(String methodName, SleepAction a
+                  ) throws IOException {
+    FiTestUtil.LOG.info("Running " + methodName + " ...");
+    final DataTransferTest t = DataTransferTestUtil.initTest();
+    t.fiCallReceivePacket.set(a);
+    t.fiReceiverOpWriteBlock.set(a);
+    t.fiStatusRead.set(a);
+    write1byte(methodName);
+  }
+  
+  /**
+   * Pipeline setup with DN0 very slow, but not slow enough to cause a timeout.
+   * The client finishes setup successfully.
+   */
+  public void testPipelineFi06() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runSlowDatanodeTest(methodName, new SleepAction(methodName, 0, 3000));
+  }
+
+  /**
+   * Pipeline setup with DN1 very slow, but not slow enough to cause a timeout.
+   * The client finishes setup successfully.
+   */
+  public void testPipelineFi07() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runSlowDatanodeTest(methodName, new SleepAction(methodName, 1, 3000));
+  }
+
+  private static void runCallReceivePacketTest(String methodName,
+      Action<DataNode> a) throws IOException {
+    FiTestUtil.LOG.info("Running " + methodName + " ...");
+    DataTransferTestUtil.initTest().fiCallReceivePacket.set(a);
+    write1byte(methodName);
+  }
+
+  /**
+   * Streaming: while writing a packet, DN0 throws a DiskOutOfSpaceException
+   * when it writes the data to disk.
+   * The client gets an IOException and determines that DN0 is bad.
+   */
+  public void testPipelineFi14() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runCallReceivePacketTest(methodName, new DoosAction(methodName, 0));
+  }
+
+  /**
+   * Streaming: while writing a packet, DN1 throws a DiskOutOfSpaceException
+   * when it writes the data to disk.
+   * The client gets an IOException and determines that DN1 is bad.
+   */
+  public void testPipelineFi15() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runCallReceivePacketTest(methodName, new DoosAction(methodName, 1));
+  }
+}
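
OomAction from DataTransferTestUtil is not exercised by this test; a hypothetical companion case could be added to the same class along these lines (the method name and datanode index are illustrative, and it relies on the private write1byte() helper above):

  /**
   * Hypothetical variant (not part of this change): streaming, DN0 throws
   * an OutOfMemoryError while writing packet data to disk.
   */
  public void testPipelineOomSketch() throws IOException {
    final String methodName = FiTestUtil.getMethodName();
    FiTestUtil.LOG.info("Running " + methodName + " ...");
    DataTransferTestUtil.initTest().fiCallReceivePacket.set(
        new DataTransferTestUtil.OomAction(methodName, 0));
    write1byte(methodName);
  }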

+ 52 - 4
src/test/hdfs/org/apache/hadoop/hdfs/AppendTestUtil.java

@@ -23,13 +23,12 @@ import java.io.OutputStream;
 import java.util.Random;
 
 import junit.framework.TestCase;
+import junit.framework.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.*;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 
@@ -61,7 +60,11 @@ class AppendTestUtil {
       return r;
     }
   };
-  
+  static final int BLOCK_SIZE = 1024;
+  static final int NUM_BLOCKS = 10;
+  static final int FILE_SIZE = NUM_BLOCKS * BLOCK_SIZE + 1;
+  static long seed = -1;
+
   static int nextInt() {return RANDOM.get().nextInt();}
   static int nextInt(int n) {return RANDOM.get().nextInt(n);}
   static int nextLong() {return RANDOM.get().nextInt();}
@@ -116,4 +119,49 @@ class AppendTestUtil {
       throw new IOException("p=" + p + ", length=" + length + ", i=" + i, ioe);
     }
   }
+
+  /**
+   *  create a buffer that contains the entire test file data.
+   */
+  static byte[] initBuffer(int size) {
+    if (seed == -1)
+      seed = nextLong();
+    return randomBytes(seed, size);
+  }
+
+  /**
+   *  Creates a file but does not close it.
+   *  Make sure to call close() on the returned stream.
+   *  @throws IOException an exception might be thrown
+   */
+  static FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl)
+      throws IOException {
+    return fileSys.create(name, true,
+        fileSys.getConf().getInt("io.file.buffer.size", 4096),
+        (short) repl, (long) BLOCK_SIZE);
+  }
+
+  /**
+   *  Compare the content of a file created from FileSystem and Path with
+   *  the specified byte[] buffer's content
+   *  @throws IOException an exception might be thrown
+   */
+  static void checkFullFile(FileSystem fs, Path name, int len,
+                            final byte[] compareContent, String message) throws IOException {
+    FSDataInputStream stm = fs.open(name);
+    byte[] actual = new byte[len];
+    stm.readFully(0, actual);
+    checkData(actual, 0, compareContent, message);
+    stm.close();
+  }
+
+  private static void checkData(final byte[] actual, int from,
+                                final byte[] expected, String message) {
+    for (int idx = 0; idx < actual.length; idx++) {
+      Assert.assertEquals(message+" byte "+(from+idx)+" differs. expected "+
+                   expected[from+idx]+" actual "+actual[idx],
+                   expected[from+idx], actual[idx]);
+      actual[idx] = 0;
+    }
+  }
 }
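
The helpers consolidated into AppendTestUtil above are shared by the append tests that follow. A condensed sketch of the write/verify round trip (assumed to sit in a test class in the same org.apache.hadoop.hdfs package, with fs obtained from a running MiniDFSCluster; the path is illustrative):

  static void writeAndVerify(FileSystem fs) throws IOException {
    // Fill a buffer with deterministic random data, write it out through
    // the shared createFile() helper, then read it back with checkFullFile().
    byte[] contents = AppendTestUtil.initBuffer(AppendTestUtil.FILE_SIZE);
    Path p = new Path("/appendUtilSketch.dat");
    FSDataOutputStream out = AppendTestUtil.createFile(fs, p, 1);
    out.write(contents);
    out.close();
    AppendTestUtil.checkFullFile(fs, p, AppendTestUtil.FILE_SIZE,
        contents, "sketch read");
  }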

+ 2 - 1
src/test/hdfs/org/apache/hadoop/hdfs/TestAbandonBlock.java

@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.util.StringUtils;
 
 public class TestAbandonBlock extends junit.framework.TestCase {
@@ -49,7 +50,7 @@ public class TestAbandonBlock extends junit.framework.TestCase {
       fout.sync();
   
       //try reading the block by someone
-      DFSClient dfsclient = new DFSClient(CONF);
+      final DFSClient dfsclient = new DFSClient(NameNode.getAddress(CONF), CONF);
       LocatedBlocks blocks = dfsclient.getNamenode().getBlockLocations(src, 0, 1);
       LocatedBlock b = blocks.get(0); 
       try {
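
The same constructor change recurs in TestFileStatus and TestGetBlocks below: the DFSClient(Configuration) convenience constructor is replaced by the form that takes an explicit namenode address. A minimal sketch of the new pattern (it assumes a namenode reachable through the configuration, e.g. a running MiniDFSCluster):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.server.namenode.NameNode;

public class DfsClientConstructionSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Resolve the namenode address explicitly from the configuration
    // instead of relying on the DFSClient(Configuration) constructor.
    DFSClient client = new DFSClient(NameNode.getAddress(conf), conf);
    client.close();
  }
}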

+ 1 - 1
src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java

@@ -223,7 +223,7 @@ public class TestDFSClientRetries extends TestCase {
     conf.setInt("dfs.client.block.write.locateFollowingBlock.retries", 1);
         
     TestNameNode tnn = new TestNameNode(conf);
-    DFSClient client = new DFSClient(tnn, tnn, conf, null);
+    final DFSClient client = new DFSClient(null, tnn, conf, null);
     OutputStream os = client.create("testfile", true);
     os.write(20); // write one random byte
     

+ 32 - 73
src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend.java

@@ -26,7 +26,6 @@ import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -43,38 +42,15 @@ import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
  * support HDFS appends.
  */
 public class TestFileAppend extends TestCase {
-  static final int blockSize = 1024;
-  static final int numBlocks = 10;
-  static final int fileSize = numBlocks * blockSize + 1;
   boolean simulatedStorage = false;
 
-  private long seed;
-  private byte[] fileContents = null;
-
-  //
-  // create a buffer that contains the entire test file data.
-  //
-  private void initBuffer(int size) {
-    seed = AppendTestUtil.nextLong();
-    fileContents = AppendTestUtil.randomBytes(seed, size);
-  }
-
-  /*
-   * creates a file but does not close it
-   */ 
-  private FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl)
-    throws IOException {
-    FSDataOutputStream stm = fileSys.create(name, true,
-                                            fileSys.getConf().getInt("io.file.buffer.size", 4096),
-                                            (short)repl, (long)blockSize);
-    return stm;
-  }
+  private static byte[] fileContents = null;
 
   //
   // writes to file but does not close it
   //
   private void writeFile(FSDataOutputStream stm) throws IOException {
-    byte[] buffer = AppendTestUtil.randomBytes(seed, fileSize);
+    byte[] buffer = AppendTestUtil.initBuffer(AppendTestUtil.FILE_SIZE);
     stm.write(buffer);
   }
 
@@ -89,16 +65,16 @@ public class TestFileAppend extends TestCase {
     while (!done) {
       try {
         Thread.sleep(1000);
-      } catch (InterruptedException e) {}
+      } catch (InterruptedException e) {;}
       done = true;
       BlockLocation[] locations = fileSys.getFileBlockLocations(
-          fileSys.getFileStatus(name), 0, fileSize);
-      if (locations.length < numBlocks) {
+          fileSys.getFileStatus(name), 0, AppendTestUtil.FILE_SIZE);
+      if (locations.length < AppendTestUtil.NUM_BLOCKS) {
         System.out.println("Number of blocks found " + locations.length);
         done = false;
         continue;
       }
-      for (int idx = 0; idx < numBlocks; idx++) {
+      for (int idx = 0; idx < AppendTestUtil.NUM_BLOCKS; idx++) {
         if (locations[idx].getHosts().length < repl) {
           System.out.println("Block index " + idx + " not yet replciated.");
           done = false;
@@ -106,43 +82,24 @@ public class TestFileAppend extends TestCase {
         }
       }
     }
-    FSDataInputStream stm = fileSys.open(name);
-    byte[] expected = new byte[numBlocks * blockSize];
+    byte[] expected = 
+        new byte[AppendTestUtil.NUM_BLOCKS * AppendTestUtil.BLOCK_SIZE];
     if (simulatedStorage) {
       for (int i= 0; i < expected.length; i++) {  
         expected[i] = SimulatedFSDataset.DEFAULT_DATABYTE;
       }
     } else {
-      for (int i= 0; i < expected.length; i++) {  
-        expected[i] = fileContents[i];
-      }
+      System.arraycopy(fileContents, 0, expected, 0, expected.length);
     }
     // do a sanity check. Read the file
-    byte[] actual = new byte[numBlocks * blockSize];
-    stm.readFully(0, actual);
-    checkData(actual, 0, expected, "Read 1");
-  }
-
-  private void checkFullFile(FileSystem fs, Path name) throws IOException {
-    FSDataInputStream stm = fs.open(name);
-    byte[] actual = new byte[fileSize];
-    stm.readFully(0, actual);
-    checkData(actual, 0, fileContents, "Read 2");
-    stm.close();
+    AppendTestUtil.checkFullFile(fileSys, name,
+        AppendTestUtil.NUM_BLOCKS * AppendTestUtil.BLOCK_SIZE,
+        expected, "Read 1");
   }
 
-  private void checkData(byte[] actual, int from, byte[] expected, String message) {
-    for (int idx = 0; idx < actual.length; idx++) {
-      assertEquals(message+" byte "+(from+idx)+" differs. expected "+
-                   expected[from+idx]+" actual "+actual[idx],
-                   expected[from+idx], actual[idx]);
-      actual[idx] = 0;
-    }
-  }
-
-
   /**
    * Test that copy on write for blocks works correctly
+   * @throws IOException an exception might be thrown
    */
   public void testCopyOnWrite() throws IOException {
     Configuration conf = new Configuration();
@@ -159,7 +116,7 @@ public class TestFileAppend extends TestCase {
       // create a new file, write to it and close it.
       //
       Path file1 = new Path("/filestatus.dat");
-      FSDataOutputStream stm = createFile(fs, file1, 1);
+      FSDataOutputStream stm = AppendTestUtil.createFile(fs, file1, 1);
       writeFile(stm);
       stm.close();
 
@@ -178,11 +135,9 @@ public class TestFileAppend extends TestCase {
       //
       for (int i = 0; i < blocks.size(); i = i + 2) {
         Block b = blocks.get(i).getBlock();
-        FSDataset fsd = dataset;
-        File f = fsd.getFile(b);
+        File f = dataset.getFile(b);
         File link = new File(f.toString() + ".link");
-        System.out.println("Creating hardlink for File " + f + 
-                           " to " + link);
+        System.out.println("Creating hardlink for File " + f + " to " + link);
         HardLink.createHardLink(f, link);
       }
 
@@ -193,7 +148,7 @@ public class TestFileAppend extends TestCase {
         Block b = blocks.get(i).getBlock();
         System.out.println("testCopyOnWrite detaching block " + b);
         assertTrue("Detaching block " + b + " should have returned true",
-                   dataset.detachBlock(b, 1) == true);
+            dataset.detachBlock(b, 1));
       }
 
       // Since the blocks were already detached earlier, these calls should
@@ -203,7 +158,7 @@ public class TestFileAppend extends TestCase {
         Block b = blocks.get(i).getBlock();
         System.out.println("testCopyOnWrite detaching block " + b);
         assertTrue("Detaching block " + b + " should have returned false",
-                   dataset.detachBlock(b, 1) == false);
+            !dataset.detachBlock(b, 1));
       }
 
     } finally {
@@ -214,30 +169,31 @@ public class TestFileAppend extends TestCase {
 
   /**
    * Test a simple flush on a simple HDFS file.
+   * @throws IOException an exception might be thrown
    */
   public void testSimpleFlush() throws IOException {
     Configuration conf = new Configuration();
     if (simulatedStorage) {
       conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
     }
-    initBuffer(fileSize);
+    fileContents = AppendTestUtil.initBuffer(AppendTestUtil.FILE_SIZE);
     MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
     FileSystem fs = cluster.getFileSystem();
     try {
 
       // create a new file.
       Path file1 = new Path("/simpleFlush.dat");
-      FSDataOutputStream stm = createFile(fs, file1, 1);
+      FSDataOutputStream stm = AppendTestUtil.createFile(fs, file1, 1);
       System.out.println("Created file simpleFlush.dat");
 
       // write to file
-      int mid = fileSize/2;
+      int mid = AppendTestUtil.FILE_SIZE /2;
       stm.write(fileContents, 0, mid);
       stm.sync();
       System.out.println("Wrote and Flushed first part of file.");
 
       // write the remainder of the file
-      stm.write(fileContents, mid, fileSize - mid);
+      stm.write(fileContents, mid, AppendTestUtil.FILE_SIZE - mid);
       System.out.println("Written second part of file");
       stm.sync();
       stm.sync();
@@ -250,7 +206,8 @@ public class TestFileAppend extends TestCase {
       System.out.println("Closed file.");
 
       // verify that entire file is good
-      checkFullFile(fs, file1);
+      AppendTestUtil.checkFullFile(fs, file1, AppendTestUtil.FILE_SIZE,
+          fileContents, "Read 2");
 
     } catch (IOException e) {
       System.out.println("Exception :" + e);
@@ -267,36 +224,38 @@ public class TestFileAppend extends TestCase {
 
   /**
    * Test that file data can be flushed.
+   * @throws IOException an exception might be thrown
    */
   public void testComplexFlush() throws IOException {
     Configuration conf = new Configuration();
     if (simulatedStorage) {
       conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
     }
-    initBuffer(fileSize);
+    fileContents = AppendTestUtil.initBuffer(AppendTestUtil.FILE_SIZE);
     MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
     FileSystem fs = cluster.getFileSystem();
     try {
 
       // create a new file.
       Path file1 = new Path("/complexFlush.dat");
-      FSDataOutputStream stm = createFile(fs, file1, 1);
+      FSDataOutputStream stm = AppendTestUtil.createFile(fs, file1, 1);
       System.out.println("Created file complexFlush.dat");
 
       int start = 0;
-      for (start = 0; (start + 29) < fileSize; ) {
+      for (start = 0; (start + 29) < AppendTestUtil.FILE_SIZE; ) {
         stm.write(fileContents, start, 29);
         stm.sync();
         start += 29;
       }
-      stm.write(fileContents, start, fileSize-start);
+      stm.write(fileContents, start, AppendTestUtil.FILE_SIZE -start);
 
       // verify that full blocks are sane
       checkFile(fs, file1, 1);
       stm.close();
 
       // verify that entire file is good
-      checkFullFile(fs, file1);
+      AppendTestUtil.checkFullFile(fs, file1, AppendTestUtil.FILE_SIZE,
+          fileContents, "Read 2");
     } catch (IOException e) {
       System.out.println("Exception :" + e);
       throw e; 

+ 19 - 58
src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend2.java

@@ -24,7 +24,6 @@ import java.util.Arrays;
 import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -56,9 +55,7 @@ public class TestFileAppend2 extends TestCase {
     ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
   }
 
-  static final int blockSize = 1024;
   static final int numBlocks = 5;
-  static final int fileSize = numBlocks * blockSize + 1;
   boolean simulatedStorage = false;
 
   private byte[] fileContents = null;
@@ -73,54 +70,14 @@ public class TestFileAppend2 extends TestCase {
   int numAppendsPerThread = 2000;
 ****/
   Workload[] workload = null;
-  ArrayList<Path> testFiles = new ArrayList<Path>();
+  final ArrayList<Path> testFiles = new ArrayList<Path>();
   volatile static boolean globalStatus = true;
 
-  //
-  // create a buffer that contains the entire test file data.
-  //
-  private void initBuffer(int size) {
-    long seed = AppendTestUtil.nextLong();
-    fileContents = AppendTestUtil.randomBytes(seed, size);
-  }
-
-  /*
-   * creates a file but does not close it
-   */ 
-  private FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl)
-    throws IOException {
-    FSDataOutputStream stm = fileSys.create(name, true,
-                                            fileSys.getConf().getInt("io.file.buffer.size", 4096),
-                                            (short)repl, (long)blockSize);
-    return stm;
-  }
-
-  private void checkFile(FileSystem fs, Path name, int len) throws IOException {
-    FSDataInputStream stm = fs.open(name);
-    byte[] actual = new byte[len];
-    stm.readFully(0, actual);
-    checkData(actual, 0, fileContents, "Read 2");
-    stm.close();
-  }
-
-  private void checkFullFile(FileSystem fs, Path name) throws IOException {
-    checkFile(fs, name, fileSize);
-  }
-
-  private void checkData(byte[] actual, int from, byte[] expected, String message) {
-    for (int idx = 0; idx < actual.length; idx++) {
-      assertEquals(message+" byte "+(from+idx)+" differs. expected "+
-                   expected[from+idx]+" actual "+actual[idx],
-                   expected[from+idx], actual[idx]);
-      actual[idx] = 0;
-    }
-  }
-
-
   /**
    * Creates one file, writes a few bytes to it and then closed it.
    * Reopens the same file for appending, write all blocks and then close.
    * Verify that all data exists in file.
+   * @throws IOException an exception might be thrown
    */ 
   public void testSimpleAppend() throws IOException {
     Configuration conf = new Configuration();
@@ -129,7 +86,7 @@ public class TestFileAppend2 extends TestCase {
     }
     conf.setInt("dfs.datanode.handler.count", 50);
     conf.setBoolean("dfs.support.append", true);
-    initBuffer(fileSize);
+    fileContents = AppendTestUtil.initBuffer(AppendTestUtil.FILE_SIZE);
     MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
     FileSystem fs = cluster.getFileSystem();
     try {
@@ -137,7 +94,7 @@ public class TestFileAppend2 extends TestCase {
 
         // create a new file.
         Path file1 = new Path("/simpleAppend.dat");
-        FSDataOutputStream stm = createFile(fs, file1, 1);
+        FSDataOutputStream stm = AppendTestUtil.createFile(fs, file1, 1);
         System.out.println("Created file simpleAppend.dat");
   
         // write to file
@@ -161,14 +118,16 @@ public class TestFileAppend2 extends TestCase {
         // ensure getPos is set to reflect existing size of the file
         assertTrue(stm.getPos() > 0);
 
-        System.out.println("Writing " + (fileSize - mid2) + " bytes to file " + file1);
-        stm.write(fileContents, mid2, fileSize - mid2);
+        System.out.println("Writing " + (AppendTestUtil.FILE_SIZE - mid2) +
+            " bytes to file " + file1);
+        stm.write(fileContents, mid2, AppendTestUtil.FILE_SIZE - mid2);
         System.out.println("Written second part of file");
         stm.close();
         System.out.println("Wrote and Closed second part of file.");
   
         // verify that entire file is good
-        checkFullFile(fs, file1);
+        AppendTestUtil.checkFullFile(fs, file1, AppendTestUtil.FILE_SIZE,
+            fileContents, "Read 2");
       }
 
       { // test appending to an non-existing file.
@@ -285,7 +244,7 @@ public class TestFileAppend2 extends TestCase {
       for (int i = 0; i < numAppendsPerThread; i++) {
    
         // pick a file at random and remove it from pool
-        Path testfile = null;
+        Path testfile;
         synchronized (testFiles) {
           if (testFiles.size() == 0) {
             System.out.println("Completed write to almost all files.");
@@ -304,7 +263,7 @@ public class TestFileAppend2 extends TestCase {
           len = fs.getFileStatus(testfile).getLen();
 
           // if file is already full, then pick another file
-          if (len >= fileSize) {
+          if (len >= AppendTestUtil.FILE_SIZE) {
             System.out.println("File " + testfile + " is full.");
             continue;
           }
@@ -312,7 +271,7 @@ public class TestFileAppend2 extends TestCase {
           // do small size appends so that we can trigger multiple
           // appends to the same file.
           //
-          int left = (int)(fileSize - len)/3;
+          int left = (int)(AppendTestUtil.FILE_SIZE - len)/3;
           if (left <= 0) {
             left = 1;
           }
@@ -335,8 +294,7 @@ public class TestFileAppend2 extends TestCase {
                                  " expected size " + (len + sizeToAppend) +
                                  " waiting for namenode metadata update.");
               Thread.sleep(5000);
-            } catch (InterruptedException e) { 
-            }
+            } catch (InterruptedException e) {;}
           }
 
           assertTrue("File " + testfile + " size is " + 
@@ -344,7 +302,8 @@ public class TestFileAppend2 extends TestCase {
                      " but expected " + (len + sizeToAppend),
                     fs.getFileStatus(testfile).getLen() == (len + sizeToAppend));
 
-          checkFile(fs, testfile, (int)(len + sizeToAppend));
+          AppendTestUtil.checkFullFile(fs, testfile, (int)(len + sizeToAppend),
+              fileContents, "Read 2");
         } catch (Throwable e) {
           globalStatus = false;
           if (e != null && e.toString() != null) {
@@ -368,9 +327,10 @@ public class TestFileAppend2 extends TestCase {
 
   /**
    * Test that appends to files at random offsets.
+   * @throws IOException an exception might be thrown
    */
   public void testComplexAppend() throws IOException {
-    initBuffer(fileSize);
+    fileContents = AppendTestUtil.initBuffer(AppendTestUtil.FILE_SIZE);
     Configuration conf = new Configuration();
     conf.setInt("heartbeat.recheck.interval", 2000);
     conf.setInt("dfs.heartbeat.interval", 2);
@@ -392,7 +352,8 @@ public class TestFileAppend2 extends TestCase {
       for (int i = 0; i < numberOfFiles; i++) {
         short replication = (short)(AppendTestUtil.nextInt(numDatanodes) + 1);
         Path testFile = new Path("/" + i + ".dat");
-        FSDataOutputStream stm = createFile(fs, testFile, replication);
+        FSDataOutputStream stm =
+            AppendTestUtil.createFile(fs, testFile, replication);
         stm.close();
         testFiles.add(testFile);
       }

+ 24 - 6
src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java

@@ -66,7 +66,10 @@ public class TestFileAppend3 extends junit.framework.TestCase {
     };  
   }
 
-  /** TC1: Append on block boundary. */
+  /**
+   * TC1: Append on block boundary.
+   * @throws IOException an exception might be thrown
+   */
   public void testTC1() throws Exception {
     final Path p = new Path("/TC1/foo");
     System.out.println("p=" + p);
@@ -91,7 +94,10 @@ public class TestFileAppend3 extends junit.framework.TestCase {
     AppendTestUtil.check(fs, p, len1 + len2);
   }
 
-  /** TC2: Append on non-block boundary. */
+  /**
+   * TC2: Append on non-block boundary.
+   * @throws IOException an exception might be thrown
+   */
   public void testTC2() throws Exception {
     final Path p = new Path("/TC2/foo");
     System.out.println("p=" + p);
@@ -116,7 +122,10 @@ public class TestFileAppend3 extends junit.framework.TestCase {
     AppendTestUtil.check(fs, p, len1 + len2);
   }
 
-  /** TC5: Only one simultaneous append. */
+  /**
+   * TC5: Only one simultaneous append.
+   * @throws IOException an exception might be thrown
+   */
   public void testTC5() throws Exception {
     final Path p = new Path("/TC5/foo");
     System.out.println("p=" + p);
@@ -143,7 +152,10 @@ public class TestFileAppend3 extends junit.framework.TestCase {
     out.close();        
   }
 
-  /** TC7: Corrupted replicas are present. */
+  /**
+   * TC7: Corrupted replicas are present.
+   * @throws IOException an exception might be thrown
+   */
   public void testTC7() throws Exception {
     final short repl = 2;
     final Path p = new Path("/TC7/foo");
@@ -188,7 +200,10 @@ public class TestFileAppend3 extends junit.framework.TestCase {
     AppendTestUtil.check(fs, p, len1 + len2);
   }
 
-  /** TC11: Racing rename */
+  /**
+   * TC11: Racing rename
+   * @throws IOException an exception might be thrown
+   */
   public void testTC11() throws Exception {
     final Path p = new Path("/TC11/foo");
     System.out.println("p=" + p);
@@ -241,7 +256,10 @@ public class TestFileAppend3 extends junit.framework.TestCase {
     }
   }
 
-  /** TC12: Append to partial CRC chunk */
+  /** 
+   * TC12: Append to partial CRC chunk
+   * @throws IOException an exception might be thrown
+   */
   public void testTC12() throws Exception {
     final Path p = new Path("/TC12/foo");
     System.out.println("p=" + p);

+ 3 - 2
src/test/hdfs/org/apache/hadoop/hdfs/TestFileStatus.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.ipc.RemoteException;
 
 /**
@@ -63,7 +64,7 @@ public class TestFileStatus extends TestCase {
     Configuration conf = new Configuration();
     MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
     FileSystem fs = cluster.getFileSystem();
-    DFSClient dfsClient = new DFSClient(conf);
+    final DFSClient dfsClient = new DFSClient(NameNode.getAddress(conf), conf);
     try {
 
       //
@@ -83,7 +84,7 @@ public class TestFileStatus extends TestCase {
       // make sure getFileInfo throws the appropriate exception for non-relative
       // filenames
       try {
-        FileStatus foo = dfsClient.getFileInfo("non-relative");
+        dfsClient.getFileInfo("non-relative");
         fail("getFileInfo for a non-relative path did not thro IOException");
       } catch (RemoteException re) {
         assertTrue(re.toString().contains("Invalid file name"));

+ 2 - 1
src/test/hdfs/org/apache/hadoop/hdfs/TestGetBlocks.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -75,7 +76,7 @@ public class TestGetBlocks extends TestCase {
       DatanodeInfo[] dataNodes=null;
       boolean notWritten;
       do {
-        DFSClient dfsclient = new DFSClient(CONF);
+        final DFSClient dfsclient = new DFSClient(NameNode.getAddress(CONF), CONF);
         locatedBlocks = dfsclient.getNamenode().
           getBlockLocations("/tmp.txt", 0, fileLen).getLocatedBlocks();
         assertEquals(2, locatedBlocks.size());

+ 110 - 0
src/test/hdfs/org/apache/hadoop/hdfs/server/common/TestThreadLocalDateFormat.java

@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.common;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Random;
+import java.util.TimeZone;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.server.datanode.TestDirectoryScanner;
+import org.junit.Test;
+import org.junit.Assert;
+
+/**
+ * Test for {@link ThreadLocalDateFormat}
+ */
+public class TestThreadLocalDateFormat {
+  private static final int TOTAL_THREADS = 3;
+  private static final Log LOG = LogFactory.getLog(TestDirectoryScanner.class);
+  private static final ThreadLocalDateFormat TDF = new ThreadLocalDateFormat(
+      "dd-MM-yyyy HH:mm:ss:S Z");
+  private static volatile boolean failed = false;
+  private final static Random rand = new Random();
+
+  private static synchronized void setFailed() {
+    failed = true;
+  }
+
+  /**
+   * Run the formatting and parsing test and look for failures caused by
+   * multi-threaded access.
+   */
+  private void runTest(final SimpleDateFormat df) {
+    while (!failed) {
+      try {
+        df.setTimeZone(TimeZone.getTimeZone("UTC"));
+        Date date = new Date(rand.nextInt(Integer.MAX_VALUE));
+        String s1 = df.format(date);
+        Date parsedDate = df.parse(s1);
+        String s2 = df.format(parsedDate);
+        if (!s1.equals(s2)) {
+          LOG.warn("Parse failed, actual /" + s2 + "/ expected /" + s1 + "/");
+          setFailed();
+        }
+      } catch (ArrayIndexOutOfBoundsException e) {
+        setFailed();
+        LOG.warn("exception ", e);
+      } catch (ParseException e) {
+        LOG.warn("Parsing failed ", e);
+        setFailed();
+      } catch (Exception e) {
+        LOG.warn("Unknown exception", e);
+        setFailed();
+      }
+    }
+  }
+
+  /**
+   * {@link SimpleDateFormat}, when used from multiple threads, has the following
+   * issues:
+   * <ul>
+   * <li>format method throws {@link ArrayIndexOutOfBoundsException}
+   * <li>parse method throws {@link ParseException} or returns invalid parse
+   * </ul>
+   * This test shows that a ThreadLocal-based wrapper around
+   * {@link SimpleDateFormat} does not have these issues.
+   * 
+   * @throws InterruptedException
+   */
+  @Test
+  public void testDateFormat() throws InterruptedException {
+    for (int i = 0; i < TOTAL_THREADS; i++) {
+      Thread thread = new Thread() {
+        public void run() {
+          runTest(TDF.get());
+        }
+      };
+      thread.start();
+    }
+
+    // Wait up to 30 seconds for failure to occur
+    long endTime = System.currentTimeMillis() + 30 * 1000;
+    while (!failed && endTime > System.currentTimeMillis()) {
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException ie) {
+        LOG.debug("Exception", ie);
+      }
+    }
+    Assert.assertFalse(failed);
+  }
+}
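
The test drives TDF.get(), which hands each thread its own SimpleDateFormat. The ThreadLocalDateFormat class in org.apache.hadoop.hdfs.server.common is added elsewhere in this change and its source is not shown here, but a minimal sketch consistent with the test's usage would look roughly like:

package org.apache.hadoop.hdfs.server.common;

import java.text.SimpleDateFormat;

/** Sketch only: one SimpleDateFormat per thread, built from a fixed pattern. */
public class ThreadLocalDateFormat {
  private final String format;
  private final ThreadLocal<SimpleDateFormat> localFormat =
      new ThreadLocal<SimpleDateFormat>() {
        @Override
        protected SimpleDateFormat initialValue() {
          // Called lazily on each thread's first get(), after construction.
          return new SimpleDateFormat(format);
        }
      };

  public ThreadLocalDateFormat(String format) {
    this.format = format;
  }

  /** Return this thread's private SimpleDateFormat instance. */
  public SimpleDateFormat get() {
    return localFormat.get();
  }
}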