
Merging r1533208 through r1534278 from trunk to branch HDFS-2832

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1534279 13f79535-47bb-0310-9956-ffa450edef68
Arpit Agarwal, 11 years ago
parent
commit 62f95b003b
29 changed files with 1472 additions and 189 deletions
  1. +3 -0    hadoop-common-project/hadoop-common/CHANGES.txt
  2. +275 -28 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
  3. +34 -0   hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FCStatisticsBaseTest.java
  4. +10 -0   hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  5. +7 -1    hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
  6. +5 -0    hadoop-hdfs-project/hadoop-hdfs/pom.xml
  7. +10 -1   hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt
  8. +12 -18  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
  9. +2 -1    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java
  10. +4 -3   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/JMXGet.java
  11. +16 -7  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/snapshot/SnapshotDiff.java
  12. +1 -0   hadoop-hdfs-project/hadoop-hdfs/src/main/native/util/posix_util.c
  13. +358 -0 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestDelegationTokenRemoteFetcher.java
  14. +23 -1  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestJMXGet.java
  15. +160 -0 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestTools.java
  16. +5 -0   hadoop-mapreduce-project/CHANGES.txt
  17. +49 -22 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextOutputFormat.java
  18. +7 -0   hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/io/IdentifierResolver.java
  19. +35 -0  hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/io/KeyOnlyTextInputWriter.java
  20. +90 -0  hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/io/KeyOnlyTextOutputReader.java
  21. +2 -2   hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/io/TextInputWriter.java
  22. +51 -0  hadoop-tools/hadoop-streaming/src/test/java/org/apache/hadoop/streaming/TestStreamingOutputOnlyKeys.java
  23. +68 -0  hadoop-tools/hadoop-streaming/src/test/java/org/apache/hadoop/streaming/io/TestKeyOnlyTextOutputReader.java
  24. +3 -0   hadoop-yarn-project/CHANGES.txt
  25. +30 -1  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
  26. +3 -100 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
  27. +120 -0 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
  28. +80 -0  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
  29. +9 -4   hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -363,6 +363,9 @@ Release 2.3.0 - UNRELEASED
     HADOOP-9078. enhance unit-test coverage of class
     org.apache.hadoop.fs.FileContext (Ivan A. Veselovsky via jeagles)
 
+    HDFS-5276. FileSystem.Statistics should use thread-local counters to avoid
+    multi-threaded performance issues on read/write.  (Colin Patrick McCabe)
+
   OPTIMIZATIONS
 
     HADOOP-9748. Reduce blocking on UGI.ensureInitialized (daryn)

+ 275 - 28
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.fs;
 import java.io.Closeable;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.lang.ref.WeakReference;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.security.PrivilegedExceptionAction;
@@ -31,6 +32,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.IdentityHashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
@@ -2501,28 +2503,149 @@ public abstract class FileSystem extends Configured implements Closeable {
     }
   }
   
+  /**
+   * Tracks statistics about how many reads, writes, and so forth have been
+   * done in a FileSystem.
+   * 
+   * Since there is only one of these objects per FileSystem, there will 
+   * typically be many threads writing to this object.  Almost every operation
+   * on an open file will involve a write to this object.  In contrast, reading
+   * statistics is done infrequently by most programs, and not at all by others.
+   * Hence, this is optimized for writes.
+   * 
+   * Each thread writes to its own thread-local area of memory.  This removes 
+   * contention and allows us to scale up to many, many threads.  To read
+   * statistics, the reader thread totals up the contents of all of the 
+   * thread-local data areas.
+   */
   public static final class Statistics {
+    /**
+     * Statistics data.
+     * 
+     * There is only a single writer to thread-local StatisticsData objects.
+     * Hence, volatile is adequate here; we do not need AtomicLong or similar
+     * to prevent lost updates.
+     * The Java specification guarantees that updates to volatile longs will
+     * be perceived as atomic with respect to other threads, which is all we
+     * need.
+     */
+    private static class StatisticsData {
+      volatile long bytesRead;
+      volatile long bytesWritten;
+      volatile int readOps;
+      volatile int largeReadOps;
+      volatile int writeOps;
+      /**
+       * Stores a weak reference to the thread owning this StatisticsData.
+       * This allows us to remove StatisticsData objects that pertain to
+       * threads that no longer exist.
+       */
+      final WeakReference<Thread> owner;
+
+      StatisticsData(WeakReference<Thread> owner) {
+        this.owner = owner;
+      }
+
+      /**
+       * Add another StatisticsData object to this one.
+       */
+      void add(StatisticsData other) {
+        this.bytesRead += other.bytesRead;
+        this.bytesWritten += other.bytesWritten;
+        this.readOps += other.readOps;
+        this.largeReadOps += other.largeReadOps;
+        this.writeOps += other.writeOps;
+      }
+
+      /**
+       * Negate the values of all statistics.
+       */
+      void negate() {
+        this.bytesRead = -this.bytesRead;
+        this.bytesWritten = -this.bytesWritten;
+        this.readOps = -this.readOps;
+        this.largeReadOps = -this.largeReadOps;
+        this.writeOps = -this.writeOps;
+      }
+
+      @Override
+      public String toString() {
+        return bytesRead + " bytes read, " + bytesWritten + " bytes written, "
+            + readOps + " read ops, " + largeReadOps + " large read ops, "
+            + writeOps + " write ops";
+      }
+    }
+
+    private interface StatisticsAggregator<T> {
+      void accept(StatisticsData data);
+      T aggregate();
+    }
+
     private final String scheme;
-    private AtomicLong bytesRead = new AtomicLong();
-    private AtomicLong bytesWritten = new AtomicLong();
-    private AtomicInteger readOps = new AtomicInteger();
-    private AtomicInteger largeReadOps = new AtomicInteger();
-    private AtomicInteger writeOps = new AtomicInteger();
+
+    /**
+     * rootData is data that doesn't belong to any thread, but will be added
+     * to the totals.  This is useful for making copies of Statistics objects,
+     * and for storing data that pertains to threads that have been garbage
+     * collected.  Protected by the Statistics lock.
+     */
+    private final StatisticsData rootData;
+
+    /**
+     * Thread-local data.
+     */
+    private final ThreadLocal<StatisticsData> threadData;
     
+    /**
+     * List of all thread-local data areas.  Protected by the Statistics lock.
+     */
+    private LinkedList<StatisticsData> allData;
+
     public Statistics(String scheme) {
       this.scheme = scheme;
+      this.rootData = new StatisticsData(null);
+      this.threadData = new ThreadLocal<StatisticsData>();
+      this.allData = null;
     }
 
     /**
      * Copy constructor.
      * 
-     * @param st
-     *          The input Statistics object which is cloned.
+     * @param other    The input Statistics object which is cloned.
+     */
+    public Statistics(Statistics other) {
+      this.scheme = other.scheme;
+      this.rootData = new StatisticsData(null);
+      other.visitAll(new StatisticsAggregator<Void>() {
+        @Override
+        public void accept(StatisticsData data) {
+          rootData.add(data);
+        }
+
+        public Void aggregate() {
+          return null;
+        }
+      });
+      this.threadData = new ThreadLocal<StatisticsData>();
+    }
+
+    /**
+     * Get or create the thread-local data associated with the current thread.
      */
-    public Statistics(Statistics st) {
-      this.scheme = st.scheme;
-      this.bytesRead = new AtomicLong(st.bytesRead.longValue());
-      this.bytesWritten = new AtomicLong(st.bytesWritten.longValue());
+    private StatisticsData getThreadData() {
+      StatisticsData data = threadData.get();
+      if (data == null) {
+        data = new StatisticsData(
+            new WeakReference<Thread>(Thread.currentThread()));
+        threadData.set(data);
+        synchronized(this) {
+          if (allData == null) {
+            allData = new LinkedList<StatisticsData>();
+          }
+          allData.add(data);
+        }
+      }
+      return data;
     }
 
     /**
@@ -2530,7 +2653,7 @@ public abstract class FileSystem extends Configured implements Closeable {
      * @param newBytes the additional bytes read
      */
     public void incrementBytesRead(long newBytes) {
-      bytesRead.getAndAdd(newBytes);
+      getThreadData().bytesRead += newBytes;
     }
     
     /**
@@ -2538,7 +2661,7 @@ public abstract class FileSystem extends Configured implements Closeable {
      * @param newBytes the additional bytes written
      */
     public void incrementBytesWritten(long newBytes) {
-      bytesWritten.getAndAdd(newBytes);
+      getThreadData().bytesWritten += newBytes;
     }
     
     /**
@@ -2546,7 +2669,7 @@ public abstract class FileSystem extends Configured implements Closeable {
      * @param count number of read operations
      */
     public void incrementReadOps(int count) {
-      readOps.getAndAdd(count);
+      getThreadData().readOps += count;
     }
 
     /**
@@ -2554,7 +2677,7 @@ public abstract class FileSystem extends Configured implements Closeable {
      * @param count number of large read operations
      */
     public void incrementLargeReadOps(int count) {
-      largeReadOps.getAndAdd(count);
+      getThreadData().largeReadOps += count;
     }
 
     /**
@@ -2562,7 +2685,38 @@ public abstract class FileSystem extends Configured implements Closeable {
      * @param count number of write operations
      */
     public void incrementWriteOps(int count) {
-      writeOps.getAndAdd(count);
+      getThreadData().writeOps += count;
+    }
+
+    /**
+     * Apply the given aggregator to all StatisticsData objects associated with
+     * this Statistics object.
+     *
+     * For each StatisticsData object, we will call accept on the visitor.
+     * Finally, at the end, we will call aggregate to get the final total. 
+     *
+     * @param visitor The visitor to use.
+     * @return        The total.
+     */
+    private synchronized <T> T visitAll(StatisticsAggregator<T> visitor) {
+      visitor.accept(rootData);
+      if (allData != null) {
+        for (Iterator<StatisticsData> iter = allData.iterator();
+            iter.hasNext(); ) {
+          StatisticsData data = iter.next();
+          visitor.accept(data);
+          if (data.owner.get() == null) {
+            /*
+             * If the thread that created this thread-local data no
+             * longer exists, remove the StatisticsData from our list
+             * and fold the values into rootData.
+             */
+            rootData.add(data);
+            iter.remove();
+          }
+        }
+      }
+      return visitor.aggregate();
     }
 
     /**
@@ -2570,7 +2724,18 @@ public abstract class FileSystem extends Configured implements Closeable {
      * @return the number of bytes
      */
     public long getBytesRead() {
-      return bytesRead.get();
+      return visitAll(new StatisticsAggregator<Long>() {
+        private long bytesRead = 0;
+
+        @Override
+        public void accept(StatisticsData data) {
+          bytesRead += data.bytesRead;
+        }
+
+        public Long aggregate() {
+          return bytesRead;
+        }
+      });
     }
     
     /**
@@ -2578,7 +2743,18 @@ public abstract class FileSystem extends Configured implements Closeable {
      * @return the number of bytes
      */
     public long getBytesWritten() {
-      return bytesWritten.get();
+      return visitAll(new StatisticsAggregator<Long>() {
+        private long bytesWritten = 0;
+
+        @Override
+        public void accept(StatisticsData data) {
+          bytesWritten += data.bytesWritten;
+        }
+
+        public Long aggregate() {
+          return bytesWritten;
+        }
+      });
     }
     
     /**
@@ -2586,7 +2762,19 @@ public abstract class FileSystem extends Configured implements Closeable {
      * @return number of read operations
      */
     public int getReadOps() {
-      return readOps.get() + largeReadOps.get();
+      return visitAll(new StatisticsAggregator<Integer>() {
+        private int readOps = 0;
+
+        @Override
+        public void accept(StatisticsData data) {
+          readOps += data.readOps;
+          readOps += data.largeReadOps;
+        }
+
+        public Integer aggregate() {
+          return readOps;
+        }
+      });
     }
 
     /**
@@ -2595,7 +2783,18 @@ public abstract class FileSystem extends Configured implements Closeable {
      * @return number of large read operations
      */
     public int getLargeReadOps() {
-      return largeReadOps.get();
+      return visitAll(new StatisticsAggregator<Integer>() {
+        private int largeReadOps = 0;
+
+        @Override
+        public void accept(StatisticsData data) {
+          largeReadOps += data.largeReadOps;
+        }
+
+        public Integer aggregate() {
+          return largeReadOps;
+        }
+      });
     }
 
     /**
@@ -2604,22 +2803,70 @@ public abstract class FileSystem extends Configured implements Closeable {
      * @return number of write operations
      */
     public int getWriteOps() {
-      return writeOps.get();
+      return visitAll(new StatisticsAggregator<Integer>() {
+        private int writeOps = 0;
+
+        @Override
+        public void accept(StatisticsData data) {
+          writeOps += data.writeOps;
+        }
+
+        public Integer aggregate() {
+          return writeOps;
+        }
+      });
     }
 
+
     @Override
     public String toString() {
-      return bytesRead + " bytes read, " + bytesWritten + " bytes written, "
-          + readOps + " read ops, " + largeReadOps + " large read ops, "
-          + writeOps + " write ops";
+      return visitAll(new StatisticsAggregator<String>() {
+        private StatisticsData total = new StatisticsData(null);
+
+        @Override
+        public void accept(StatisticsData data) {
+          total.add(data);
+        }
+
+        public String aggregate() {
+          return total.toString();
+        }
+      });
     }
-    
+
     /**
-     * Reset the counts of bytes to 0.
+     * Resets all statistics to 0.
+     *
+     * In order to reset, we add up all the thread-local statistics data, and
+     * set rootData to the negative of that.
+     *
+     * This may seem like a counterintuitive way to reset the statistics.  Why
+     * can't we just zero out all the thread-local data?  Well, thread-local
+     * data can only be modified by the thread that owns it.  If we tried to
+     * modify the thread-local data from this thread, our modification might get
+     * interleaved with a read-modify-write operation done by the thread that
+     * owns the data.  That would result in our update getting lost.
+     *
+     * The approach used here avoids this problem because it only ever reads
+     * (not writes) the thread-local data.  Both reads and writes to rootData
+     * are done under the lock, so we're free to modify rootData from any thread
+     * that holds the lock.
      */
     public void reset() {
-      bytesWritten.set(0);
-      bytesRead.set(0);
+      visitAll(new StatisticsAggregator<Void>() {
+        private StatisticsData total = new StatisticsData(null);
+
+        @Override
+        public void accept(StatisticsData data) {
+          total.add(data);
+        }
+
+        public Void aggregate() {
+          total.negate();
+          rootData.add(total);
+          return null;
+        }
+      });
     }
     
     /**
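
The change above replaces the shared AtomicLong/AtomicInteger counters with per-thread StatisticsData cells that a reader sums on demand. Below is a minimal standalone sketch of that pattern; the class and method names are illustrative only, not part of the Hadoop API:

    import java.lang.ref.WeakReference;
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    /**
     * Sketch of the write-optimized counter pattern used by
     * FileSystem.Statistics: each thread increments its own cell, and
     * readers sum all cells under the lock.
     */
    class PerThreadCounter {
      private static final class Cell {
        volatile long value;                  // single writer: the owning thread
        final WeakReference<Thread> owner;
        Cell(Thread t) { owner = new WeakReference<Thread>(t); }
      }

      private long rootValue;                 // data folded in from dead threads
      private final List<Cell> cells = new ArrayList<Cell>();
      private final ThreadLocal<Cell> local = new ThreadLocal<Cell>();

      void increment(long delta) {
        Cell c = local.get();
        if (c == null) {
          c = new Cell(Thread.currentThread());
          local.set(c);
          synchronized (this) { cells.add(c); }
        }
        c.value += delta;                     // no CAS; no cross-thread contention
      }

      synchronized long sum() {
        long total = rootValue;
        for (Iterator<Cell> it = cells.iterator(); it.hasNext(); ) {
          Cell c = it.next();
          total += c.value;
          if (c.owner.get() == null) {        // owning thread is gone:
            rootValue += c.value;             // fold its total into rootValue
            it.remove();
          }
        }
        return total;
      }
    }

Resetting works the same way as in the patch: sum everything and add the negation to rootValue, so the reading thread never writes another thread's cell.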

+ 34 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FCStatisticsBaseTest.java

@@ -27,6 +27,8 @@ import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.google.common.util.concurrent.Uninterruptibles;
+
 import static org.apache.hadoop.fs.FileContextTestHelper.*;
 
 /**
@@ -44,6 +46,38 @@ public abstract class FCStatisticsBaseTest {
   //fc should be set appropriately by the deriving test.
   protected static FileContext fc = null;
   
+  @Test(timeout=60000)
+  public void testStatisticsOperations() throws Exception {
+    final Statistics stats = new Statistics("file");
+    Assert.assertEquals(0L, stats.getBytesRead());
+    Assert.assertEquals(0L, stats.getBytesWritten());
+    Assert.assertEquals(0, stats.getWriteOps());
+    stats.incrementBytesWritten(1000);
+    Assert.assertEquals(1000L, stats.getBytesWritten());
+    Assert.assertEquals(0, stats.getWriteOps());
+    stats.incrementWriteOps(123);
+    Assert.assertEquals(123, stats.getWriteOps());
+    
+    Thread thread = new Thread() {
+      @Override
+      public void run() {
+        stats.incrementWriteOps(1);
+      }
+    };
+    thread.start();
+    Uninterruptibles.joinUninterruptibly(thread);
+    Assert.assertEquals(124, stats.getWriteOps());
+    // Test copy constructor and reset function
+    Statistics stats2 = new Statistics(stats);
+    stats.reset();
+    Assert.assertEquals(0, stats.getWriteOps());
+    Assert.assertEquals(0L, stats.getBytesWritten());
+    Assert.assertEquals(0L, stats.getBytesRead());
+    Assert.assertEquals(124, stats2.getWriteOps());
+    Assert.assertEquals(1000L, stats2.getBytesWritten());
+    Assert.assertEquals(0L, stats2.getBytesRead());
+  }
+
   @Test
   public void testStatistics() throws IOException, URISyntaxException {
     URI fsUri = getFsUri();

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -324,6 +324,11 @@ Release 2.3.0 - UNRELEASED
     HDFS-5130. Add test for snapshot related FsShell and DFSAdmin commands.
     (Binglin Chang via jing9)
 
+    HDFS-5374. Remove deadcode in DFSOutputStream. (suresh)
+
+    HDFS-4511. Cover package org.apache.hadoop.hdfs.tools with unit test
+    (Andrey Klochkov via jeagles)
+
   OPTIMIZATIONS
 
     HDFS-5239.  Allow FSNamesystem lock fairness to be configurable (daryn)
@@ -375,6 +380,9 @@ Release 2.2.1 - UNRELEASED
     HDFS-5360. Improvement of usage message of renameSnapshot and
     deleteSnapshot. (Shinichi Yamashita via wang)
 
+    HDFS-5331. make SnapshotDiff.java to a o.a.h.util.Tool interface implementation. 
+    (Vinayakumar B via umamahesh)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -410,6 +418,8 @@ Release 2.2.1 - UNRELEASED
     HDFS-5370. Typo in Error Message: different between range in condition
     and range in error message. (Kousuke Saruta via suresh)
 
+    HDFS-5365. Fix libhdfs compile error on FreeBSD9. (Radim Kolar via cnauroth)
+
 Release 2.2.0 - 2013-10-13
 
   INCOMPATIBLE CHANGES

+ 7 - 1
hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml

@@ -83,7 +83,7 @@
        <Class name="org.apache.hadoop.mapred.Task$TaskReporter" />
        <Method name="run" />
        <Bug pattern="DM_EXIT" />
-     </Match>
+     </Match>     
      <!--
        We need to cast objects between old and new api objects
      -->
@@ -325,6 +325,12 @@
        <Field name="modification" />
        <Bug pattern="VO_VOLATILE_INCREMENT" />
      </Match>
+      <!-- Replace System.exit() call with ExitUtil.terminate() -->
+     <Match>
+       <Class name="org.apache.hadoop.hdfs.tools.JMXGet"/>
+       <Method name="main" />
+       <Bug pattern="NP_NULL_ON_SOME_PATH" />
+     </Match>    
      <Match>
        <Class name="org.apache.hadoop.hdfs.server.datanode.ReplicaInfo" />
        <Method name="setDirInternal" />

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/pom.xml

@@ -176,6 +176,11 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
       <artifactId>xmlenc</artifactId>
       <scope>compile</scope>
     </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

+ 10 - 1
hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt

@@ -62,6 +62,11 @@ endfunction()
 INCLUDE(CheckCSourceCompiles)
 CHECK_C_SOURCE_COMPILES("int main(void) { static __thread int i = 0; return 0; }" HAVE_BETTER_TLS)
 
+# Check if we need to link dl library to get dlopen.
+# dlopen on Linux is in separate library but on FreeBSD its in libc
+INCLUDE(CheckLibraryExists)
+CHECK_LIBRARY_EXISTS(dl dlopen "" NEED_LINK_DL)
+
 find_package(JNI REQUIRED)
 if (NOT GENERATED_JAVAH)
     # Must identify where the generated headers have been placed
@@ -89,9 +94,13 @@ add_dual_library(hdfs
     main/native/libhdfs/jni_helper.c
     main/native/libhdfs/hdfs.c
 )
+if (NEED_LINK_DL)
+   set(LIB_DL dl)
+endif(NEED_LINK_DL)
+
 target_link_dual_libraries(hdfs
     ${JAVA_JVM_LIBRARY}
-    dl
+    ${LIB_DL}
     pthread
 )
 dual_output_directory(hdfs target/usr/local/lib)

+ 12 - 18
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java

@@ -47,7 +47,6 @@ import org.apache.hadoop.fs.FSOutputSummer;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Syncable;
-import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
@@ -138,7 +137,7 @@ public class DFSOutputStream extends FSOutputSummer
   private long currentSeqno = 0;
   private long lastQueuedSeqno = -1;
   private long lastAckedSeqno = -1;
-  private long bytesCurBlock = 0; // bytes writen in current block
+  private long bytesCurBlock = 0; // bytes written in current block
   private int packetSize = 0; // write packet size, not including the header.
   private int chunksPerPacket = 0;
   private final AtomicReference<IOException> lastException = new AtomicReference<IOException>();
@@ -460,8 +459,7 @@ public class DFSOutputStream extends FSOutputSummer
           }
         }
 
-        Packet one = null;
-
+        Packet one;
         try {
           // process datanode IO errors if any
           boolean doSleep = false;
@@ -506,7 +504,7 @@ public class DFSOutputStream extends FSOutputSummer
             if(DFSClient.LOG.isDebugEnabled()) {
               DFSClient.LOG.debug("Allocating new block");
             }
-            nodes = nextBlockOutputStream(src);
+            nodes = nextBlockOutputStream();
             initDataStreaming();
           } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
             if(DFSClient.LOG.isDebugEnabled()) {
@@ -571,9 +569,6 @@ public class DFSOutputStream extends FSOutputSummer
           }
           lastPacket = Time.now();
           
-          if (one.isHeartbeatPacket()) {  //heartbeat packet
-          }
-          
           // update bytesSent
           long tmpBytesSent = one.getLastByteOffsetBlock();
           if (bytesSent < tmpBytesSent) {
@@ -692,7 +687,7 @@ public class DFSOutputStream extends FSOutputSummer
     }
 
     //
-    // Processes reponses from the datanodes.  A packet is removed 
+    // Processes responses from the datanodes.  A packet is removed
     // from the ackQueue when its response arrives.
     //
     private class ResponseProcessor extends Daemon {
@@ -734,18 +729,18 @@ public class DFSOutputStream extends FSOutputSummer
             }
             
             assert seqno != PipelineAck.UNKOWN_SEQNO : 
-              "Ack for unkown seqno should be a failed ack: " + ack;
+              "Ack for unknown seqno should be a failed ack: " + ack;
             if (seqno == Packet.HEART_BEAT_SEQNO) {  // a heartbeat ack
               continue;
             }
 
             // a success ack for a data packet
-            Packet one = null;
+            Packet one;
             synchronized (dataQueue) {
               one = ackQueue.getFirst();
             }
             if (one.seqno != seqno) {
-              throw new IOException("Responseprocessor: Expecting seqno " +
+              throw new IOException("ResponseProcessor: Expecting seqno " +
                                     " for block " + block +
                                     one.seqno + " but received " + seqno);
             }
@@ -1056,7 +1051,7 @@ public class DFSOutputStream extends FSOutputSummer
      * Must get block ID and the IDs of the destinations from the namenode.
      * Returns the list of target datanodes.
      */
-    private DatanodeInfo[] nextBlockOutputStream(String client) throws IOException {
+    private DatanodeInfo[] nextBlockOutputStream() throws IOException {
       LocatedBlock lb = null;
       DatanodeInfo[] nodes = null;
       int count = dfsClient.getConf().nBlockWriteRetry;
@@ -1214,8 +1209,7 @@ public class DFSOutputStream extends FSOutputSummer
     }
 
     private LocatedBlock locateFollowingBlock(long start,
-        DatanodeInfo[] excludedNodes) 
-        throws IOException, UnresolvedLinkException {
+        DatanodeInfo[] excludedNodes)  throws IOException {
       int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
       long sleeptime = 400;
       while (true) {
@@ -1287,7 +1281,7 @@ public class DFSOutputStream extends FSOutputSummer
    * Create a socket for a write pipeline
    * @param first the first datanode 
    * @param length the pipeline length
-   * @param client
+   * @param client client
    * @return the socket connected to the first datanode
    */
   static Socket createSocketForPipeline(final DatanodeInfo first,
@@ -1479,7 +1473,7 @@ public class DFSOutputStream extends FSOutputSummer
           //
           // Rather than wait around for space in the queue, we should instead try to
           // return to the caller as soon as possible, even though we slightly overrun
-          // the MAX_PACKETS iength.
+          // the MAX_PACKETS length.
           Thread.currentThread().interrupt();
           break;
         }
@@ -1700,7 +1694,7 @@ public class DFSOutputStream extends FSOutputSummer
         }
       }
       // If 1) any new blocks were allocated since the last flush, or 2) to
-      // update length in NN is requried, then persist block locations on
+      // update length in NN is required, then persist block locations on
       // namenode.
       if (persistBlocks.getAndSet(false) || updateLength) {
         try {

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java

@@ -53,6 +53,7 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.GenericOptionsParser;
 
 import com.google.common.base.Charsets;
@@ -86,7 +87,7 @@ public class DelegationTokenFetcher {
     err.println("  --print             Print the delegation token");
     err.println();
     GenericOptionsParser.printGenericCommandUsage(err);
-    System.exit(1);
+    ExitUtil.terminate(1);    
   }
 
   private static Collection<Token<?>> readTokens(Path file, Configuration conf)

+ 4 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/JMXGet.java

@@ -43,6 +43,7 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.util.ExitUtil;
 
 /**
  * tool to get data from NameNode or DataNode using MBeans currently the
@@ -295,7 +296,7 @@ public class JMXGet {
       // invalid arguments
       err("Invalid args");
       printUsage(opts);
-      System.exit(-1);
+      ExitUtil.terminate(-1);      
     }
 
     JMXGet jm = new JMXGet();
@@ -317,7 +318,7 @@ public class JMXGet {
 
     if (commandLine.hasOption("help")) {
       printUsage(opts);
-      System.exit(0);
+      ExitUtil.terminate(0);
     }
 
     // rest of args
@@ -342,6 +343,6 @@ public class JMXGet {
       res = -1;
     }
 
-    System.exit(res);
+    ExitUtil.terminate(res);
   }
 }
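
With System.exit() replaced by ExitUtil.terminate(), tests can intercept the exit instead of killing the JVM. A hedged sketch of that test-side idiom (the same one TestTools below relies on):

    import org.apache.hadoop.util.ExitUtil;
    import org.apache.hadoop.util.ExitUtil.ExitException;

    public class ExitUtilSketch {
      public static void main(String[] args) {
        ExitUtil.disableSystemExit();         // terminate() now throws ExitException
        try {
          ExitUtil.terminate(1);              // in production this calls System.exit(1)
        } catch (ExitException e) {
          System.out.println("intercepted exit, status=" + e.status);
          ExitUtil.resetFirstExitException(); // let later terminate() calls throw again
        }
      }
    }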

+ 16 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/snapshot/SnapshotDiff.java

@@ -20,12 +20,14 @@ package org.apache.hadoop.hdfs.tools.snapshot;
 import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
 
 /**
  * A tool used to get the difference report between two snapshots, or between
@@ -38,7 +40,7 @@ import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
  * </pre>
  */
 @InterfaceAudience.Private
-public class SnapshotDiff {
+public class SnapshotDiff extends Configured implements Tool {
   private static String getSnapshotName(String name) {
     if (Path.CUR_DIR.equals(name)) { // current directory
       return "";
@@ -57,7 +59,8 @@ public class SnapshotDiff {
     return name.substring(i + HdfsConstants.DOT_SNAPSHOT_DIR.length() + 1);
   }
   
-  public static void main(String[] argv) throws IOException {
+  @Override
+  public int run(String[] argv) throws Exception {
     String description = "SnapshotDiff <snapshotDir> <from> <to>:\n" +
     "\tGet the difference between two snapshots, \n" + 
     "\tor between a snapshot and the current tree of a directory.\n" +
@@ -67,15 +70,14 @@ public class SnapshotDiff {
     
     if(argv.length != 3) {
       System.err.println("Usage: \n" + description);
-      System.exit(1);
+      return 1;
     }
     
-    Configuration conf = new Configuration();
-    FileSystem fs = FileSystem.get(conf);
+    FileSystem fs = FileSystem.get(getConf());
     if (! (fs instanceof DistributedFileSystem)) {
       System.err.println(
           "SnapshotDiff can only be used in DistributedFileSystem");
-      System.exit(1);
+      return 1;
     }
     DistributedFileSystem dfs = (DistributedFileSystem) fs;
     
@@ -89,7 +91,14 @@ public class SnapshotDiff {
     } catch (IOException e) {
       String[] content = e.getLocalizedMessage().split("\n");
       System.err.println("snapshotDiff: " + content[0]);
+      return 1;
     }
+    return 0;
+  }
+
+  public static void main(String[] argv) throws Exception {
+    int rc = ToolRunner.run(new SnapshotDiff(), argv);
+    System.exit(rc);
   }
 
 }
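
SnapshotDiff now follows the standard Hadoop Tool contract: ToolRunner parses the generic options (-D, -conf, -fs, ...) into a Configuration before run() is invoked, and the exit status is returned from run() rather than forced with System.exit() inside library code. A minimal sketch of the pattern, with MyTool as a placeholder name:

    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class MyTool extends Configured implements Tool {
      @Override
      public int run(String[] args) throws Exception {
        // getConf() returns the Configuration injected by ToolRunner,
        // already populated from any generic command-line options.
        System.out.println("fs.defaultFS = " + getConf().get("fs.defaultFS"));
        return 0;  // exit status, surfaced by main() below
      }

      public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new MyTool(), args));
      }
    }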

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/native/util/posix_util.c

@@ -27,6 +27,7 @@
 #include <sys/stat.h>
 #include <sys/types.h>
 #include <unistd.h>
+#include <limits.h>
 
 static pthread_mutex_t gTempdirLock = PTHREAD_MUTEX_INITIALIZER;
 

+ 358 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestDelegationTokenRemoteFetcher.java

@@ -0,0 +1,358 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;
+import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.charset.Charset;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.Executors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HftpFileSystem;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.log4j.Logger;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
+import org.jboss.netty.handler.codec.http.HttpHeaders;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.UnmodifiableIterator;
+
+public class TestDelegationTokenRemoteFetcher {
+  private static final Logger LOG = Logger
+      .getLogger(TestDelegationTokenRemoteFetcher.class);
+
+  private static final String EXP_DATE = "124123512361236";
+  private static final String tokenFile = "http.file.dta";
+
+  private int httpPort;
+  private String serviceUrl;
+  private FileSystem fileSys;
+  private Configuration conf;
+  private ServerBootstrap bootstrap;
+  private Token<DelegationTokenIdentifier> testToken;
+  private volatile AssertionError assertionError;
+  
+  @Before
+  public void init() throws Exception {
+    conf = new Configuration();
+    fileSys = FileSystem.getLocal(conf);
+    httpPort = NetUtils.getFreeSocketPort();
+    serviceUrl = "http://localhost:" + httpPort;
+    testToken = createToken(serviceUrl);
+  }
+
+  @After
+  public void clean() throws IOException {
+    if (fileSys != null)
+      fileSys.delete(new Path(tokenFile), true);
+    if (bootstrap != null)
+      bootstrap.releaseExternalResources();
+  }
+
+  /**
+   * try to fetch token without http server; expect IOException
+   */
+  @Test
+  public void testTokenFetchFail() throws Exception {
+    try {
+      DelegationTokenFetcher.main(new String[] { "-webservice=" + serviceUrl,
+          tokenFile });
+      fail("Token fetcher shouldn't start in absense of NN");
+    } catch (IOException ex) {
+    }
+  }
+  
+  /**
+   * try to renew token without http server; expect IOException
+   */
+  @Test
+  public void testTokenRenewFail() {
+    try {
+      DelegationTokenFetcher.renewDelegationToken(serviceUrl, testToken);
+      fail("Token fetcher shouldn't be able to renew tokens in absense of NN");
+    } catch (IOException ex) {
+    } 
+  }     
+  
+  /**
+   * try to cancel token without http server; expect IOException
+   */
+  @Test
+  public void expectedTokenCancelFail() {
+    try {
+      DelegationTokenFetcher.cancelDelegationToken(serviceUrl, testToken);
+      fail("Token fetcher shouldn't be able to cancel tokens in absense of NN");
+    } catch (IOException ex) {
+    } 
+  }
+  
+  /**
+   * try to renew token and get an http error response
+   */
+  @Test  
+  public void expectedTokenRenewErrorHttpResponse() {
+    bootstrap = startHttpServer(httpPort, testToken, serviceUrl);
+    try {
+      DelegationTokenFetcher.renewDelegationToken(serviceUrl + "/exception", 
+          createToken(serviceUrl));
+      fail("Token fetcher shouldn't be able to renew tokens using an invalid"
+          + " NN URL");
+    } catch (IOException ex) {
+    } 
+    if (assertionError != null)
+      throw assertionError;
+  }
+  
+  /**
+   * Call cancel token using http server
+   */
+  @Test
+  public void testCancelTokenFromHttp() throws IOException {
+    bootstrap = startHttpServer(httpPort, testToken, serviceUrl);
+    DelegationTokenFetcher.cancelDelegationToken(serviceUrl, testToken);
+    if (assertionError != null)
+      throw assertionError;
+  }
+  
+  /**
+   * Call renew token using http server; returns the new expiration time
+   */
+  @Test
+  public void testRenewTokenFromHttp() throws IOException {
+    bootstrap = startHttpServer(httpPort, testToken, serviceUrl);
+    assertTrue("testRenewTokenFromHttp error",
+        Long.valueOf(EXP_DATE) == DelegationTokenFetcher.renewDelegationToken(
+            serviceUrl, testToken));
+    if (assertionError != null)
+      throw assertionError;
+  }
+
+  /**
+   * Call fetch token using http server 
+   */
+  @Test
+  public void expectedTokenIsRetrievedFromHttp() throws Exception {
+    bootstrap = startHttpServer(httpPort, testToken, serviceUrl);
+    DelegationTokenFetcher.main(new String[] { "-webservice=" + serviceUrl,
+        tokenFile });
+    Path p = new Path(fileSys.getWorkingDirectory(), tokenFile);
+    Credentials creds = Credentials.readTokenStorageFile(p, conf);
+    Iterator<Token<?>> itr = creds.getAllTokens().iterator();
+    assertTrue("token not exist error", itr.hasNext());
+    Token<?> fetchedToken = itr.next();
+    Assert.assertArrayEquals("token wrong identifier error",
+        testToken.getIdentifier(), fetchedToken.getIdentifier());
+    Assert.assertArrayEquals("token wrong password error",
+        testToken.getPassword(), fetchedToken.getPassword());
+    if (assertionError != null)
+      throw assertionError;
+  }
+  
+  private static Token<DelegationTokenIdentifier> createToken(String serviceUri) {
+    byte[] pw = "hadoop".getBytes();
+    byte[] ident = new DelegationTokenIdentifier(new Text("owner"), new Text(
+        "renewer"), new Text("realuser")).getBytes();
+    Text service = new Text(serviceUri);
+    return new Token<DelegationTokenIdentifier>(ident, pw,
+        HftpFileSystem.TOKEN_KIND, service);
+  }
+
+  private interface Handler {
+    void handle(Channel channel, Token<DelegationTokenIdentifier> token,
+        String serviceUrl) throws IOException;
+  }
+
+  private class FetchHandler implements Handler {
+    
+    @Override
+    public void handle(Channel channel, Token<DelegationTokenIdentifier> token,
+        String serviceUrl) throws IOException {
+      Assert.assertEquals(testToken, token);
+
+      Credentials creds = new Credentials();
+      creds.addToken(new Text(serviceUrl), token);
+      DataOutputBuffer out = new DataOutputBuffer();
+      creds.write(out);
+      int fileLength = out.getData().length;
+      ChannelBuffer cbuffer = ChannelBuffers.buffer(fileLength);
+      cbuffer.writeBytes(out.getData());
+      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+      response.setHeader(HttpHeaders.Names.CONTENT_LENGTH,
+          String.valueOf(fileLength));
+      response.setContent(cbuffer);
+      channel.write(response).addListener(ChannelFutureListener.CLOSE);
+    }
+  }
+
+  private class RenewHandler implements Handler {
+    
+    @Override
+    public void handle(Channel channel, Token<DelegationTokenIdentifier> token,
+        String serviceUrl) throws IOException {
+      Assert.assertEquals(testToken, token);
+      byte[] bytes = EXP_DATE.getBytes();
+      ChannelBuffer cbuffer = ChannelBuffers.buffer(bytes.length);
+      cbuffer.writeBytes(bytes);
+      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+      response.setHeader(HttpHeaders.Names.CONTENT_LENGTH,
+          String.valueOf(bytes.length));
+      response.setContent(cbuffer);
+      channel.write(response).addListener(ChannelFutureListener.CLOSE);
+    }
+  }
+  
+  private class ExceptionHandler implements Handler {
+
+    @Override
+    public void handle(Channel channel, Token<DelegationTokenIdentifier> token,
+        String serviceUrl) throws IOException {
+      Assert.assertEquals(testToken, token);
+      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, 
+          HttpResponseStatus.METHOD_NOT_ALLOWED);
+      channel.write(response).addListener(ChannelFutureListener.CLOSE);
+    }    
+  }
+  
+  private class CancelHandler implements Handler {
+
+    @Override
+    public void handle(Channel channel, Token<DelegationTokenIdentifier> token,
+        String serviceUrl) throws IOException {
+      Assert.assertEquals(testToken, token);
+      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+      channel.write(response).addListener(ChannelFutureListener.CLOSE);
+    }    
+  }
+  
+  private final class CredentialsLogicHandler extends
+      SimpleChannelUpstreamHandler {
+
+    private final Token<DelegationTokenIdentifier> token;
+    private final String serviceUrl;
+    private ImmutableMap<String, Handler> routes = ImmutableMap.of(
+        "/exception", new ExceptionHandler(),
+        "/cancelDelegationToken", new CancelHandler(),
+        "/getDelegationToken", new FetchHandler() , 
+        "/renewDelegationToken", new RenewHandler());
+
+    public CredentialsLogicHandler(Token<DelegationTokenIdentifier> token,
+        String serviceUrl) {
+      this.token = token;
+      this.serviceUrl = serviceUrl;
+    }
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e)
+        throws Exception {
+      HttpRequest request = (HttpRequest) e.getMessage();
+      if (request.getMethod() != GET) {
+        return;
+      }
+      UnmodifiableIterator<Map.Entry<String, Handler>> iter = routes.entrySet()
+          .iterator();
+      while (iter.hasNext()) {
+        Map.Entry<String, Handler> entry = iter.next();
+        if (request.getUri().contains(entry.getKey())) {
+          Handler handler = entry.getValue();
+          try {
+            handler.handle(e.getChannel(), token, serviceUrl);
+          } catch (AssertionError ee) {
+            TestDelegationTokenRemoteFetcher.this.assertionError = ee;
+            HttpResponse response = new DefaultHttpResponse(HTTP_1_1, 
+                HttpResponseStatus.BAD_REQUEST);
+            response.setContent(ChannelBuffers.copiedBuffer(ee.getMessage(), 
+                Charset.defaultCharset()));
+            e.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+          }
+          return;
+        }
+      }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+        throws Exception {
+      Channel ch = e.getChannel();
+      Throwable cause = e.getCause();
+
+      if (LOG.isDebugEnabled())
+        LOG.debug(cause.getMessage());
+      ch.close().addListener(ChannelFutureListener.CLOSE);
+    }
+  }
+
+  private ServerBootstrap startHttpServer(int port,
+      final Token<DelegationTokenIdentifier> token, final String url) {
+    ServerBootstrap bootstrap = new ServerBootstrap(
+        new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
+            Executors.newCachedThreadPool()));
+
+    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+      @Override
+      public ChannelPipeline getPipeline() throws Exception {
+        return Channels.pipeline(new HttpRequestDecoder(),
+            new HttpChunkAggregator(65536), new HttpResponseEncoder(),
+            new CredentialsLogicHandler(token, url));
+      }
+    });
+    bootstrap.bind(new InetSocketAddress("localhost", port));
+    return bootstrap;
+  }
+  
+}

+ 23 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestJMXGet.java

@@ -21,9 +21,13 @@ package org.apache.hadoop.tools;
 import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.io.PrintStream;
 import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
@@ -92,6 +96,7 @@ public class TestJMXGet {
     //jmx.init();
     //jmx = new JMXGet();
     jmx.init(); // default lists namenode mbeans only
+    assertTrue("error printAllValues", checkPrintAllValues(jmx));
 
     //get some data from different source
     assertEquals(numDatanodes, Integer.parseInt(
@@ -103,7 +108,24 @@ public class TestJMXGet {
 
     cluster.shutdown();
   }
-
+  
+  private static boolean checkPrintAllValues(JMXGet jmx) throws Exception {
+    int size = 0; 
+    byte[] bytes = null;
+    String pattern = "List of all the available keys:";
+    PipedOutputStream pipeOut = new PipedOutputStream();
+    PipedInputStream pipeIn = new PipedInputStream(pipeOut);
+    System.setErr(new PrintStream(pipeOut));
+    jmx.printAllValues();
+    if ((size = pipeIn.available()) != 0) {
+      bytes = new byte[size];
+      pipeIn.read(bytes, 0, bytes.length);            
+    }
+    pipeOut.close();
+    pipeIn.close();
+    return bytes != null ? new String(bytes).contains(pattern) : false;
+  }
+  
   /**
    * test JMX connection to DataNode..
    * @throws Exception 
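
checkPrintAllValues above redirects System.err into a pipe and scans the bytes for the expected header. A simpler variant of the same idea, sketched here with a stand-in print instead of jmx.printAllValues(), captures into a ByteArrayOutputStream, which never blocks the writer (a PipedInputStream can stall once its fixed buffer fills) and restores the original stream afterwards:

    import java.io.ByteArrayOutputStream;
    import java.io.PrintStream;

    public class CaptureStderrSketch {
      public static void main(String[] args) {
        ByteArrayOutputStream captured = new ByteArrayOutputStream();
        PrintStream oldErr = System.err;
        System.setErr(new PrintStream(captured, true));  // autoflush
        try {
          // stand-in for jmx.printAllValues()
          System.err.println("List of all the available keys: ...");
        } finally {
          System.setErr(oldErr);  // always restore the real stream
        }
        System.out.println(
            captured.toString().contains("List of all the available keys:"));
      }
    }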

+ 160 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestTools.java

@@ -0,0 +1,160 @@
+package org.apache.hadoop.tools;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.io.PrintStream;
+import org.apache.hadoop.hdfs.tools.DFSAdmin;
+import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
+import org.apache.hadoop.hdfs.tools.JMXGet;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.util.ExitUtil.ExitException;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.io.ByteStreams;
+
+public class TestTools {
+
+  private static final int PIPE_BUFFER_SIZE = 1024 * 5;
+  private final static String INVALID_OPTION = "-invalidOption";
+  private static final String[] OPTIONS = new String[2];
+
+  @BeforeClass
+  public static void before() {
+    ExitUtil.disableSystemExit();
+    OPTIONS[1] = INVALID_OPTION;
+  }
+
+  @Test  
+  public void testDelegationTokenFetcherPrintUsage() {
+    String pattern = "Options:";
+    checkOutput(new String[] { "-help" }, pattern, System.out,
+        DelegationTokenFetcher.class);
+  }
+
+  @Test  
+  public void testDelegationTokenFetcherErrorOption() {
+    String pattern = "ERROR: Only specify cancel, renew or print.";
+    checkOutput(new String[] { "-cancel", "-renew" }, pattern, System.err,
+        DelegationTokenFetcher.class);
+  }
+
+  @Test  
+  public void testJMXToolHelp() {
+    String pattern = "usage: jmxget options are:";
+    checkOutput(new String[] { "-help" }, pattern, System.out, JMXGet.class);
+  }
+
+  @Test  
+  public void testJMXToolAdditionParameter() {
+    String pattern = "key = -addition";
+    checkOutput(new String[] { "-service=NameNode", "-server=localhost",
+        "-addition" }, pattern, System.err, JMXGet.class);
+  }
+
+  @Test
+  public void testDFSAdminInvalidUsageHelp() {
+    ImmutableSet<String> args = ImmutableSet.of("-report", "-saveNamespace",
+        "-rollEdits", "-restoreFailedStorage", "-refreshNodes",
+        "-finalizeUpgrade", "-metasave", "-refreshUserToGroupsMappings",
+        "-printTopology", "-refreshNamenodes", "-deleteBlockPool",
+        "-setBalancerBandwidth", "-fetchImage");
+    try {
+      for (String arg : args)
+        assertTrue(ToolRunner.run(new DFSAdmin(), fillArgs(arg)) == -1);
+      
+      assertTrue(ToolRunner.run(new DFSAdmin(),
+          new String[] { "-help", "-some" }) == 0);
+    } catch (Exception e) {
+      fail("testDFSAdminHelp error" + e);
+    }
+
+    String pattern = "Usage: java DFSAdmin";
+    checkOutput(new String[] { "-cancel", "-renew" }, pattern, System.err,
+        DFSAdmin.class);
+  }
+
+  private static String[] fillArgs(String arg) {
+    OPTIONS[0] = arg;
+    return OPTIONS;
+  }
+
+  private void checkOutput(String[] args, String pattern, PrintStream out,
+      Class<?> clazz) {       
+    ByteArrayOutputStream outBytes = new ByteArrayOutputStream();
+    try {
+      PipedOutputStream pipeOut = new PipedOutputStream();
+      PipedInputStream pipeIn = new PipedInputStream(pipeOut, PIPE_BUFFER_SIZE);
+      if (out == System.out) {
+        System.setOut(new PrintStream(pipeOut));
+      } else if (out == System.err) {
+        System.setErr(new PrintStream(pipeOut));
+      }
+
+      if (clazz == DelegationTokenFetcher.class) {
+        expectDelegationTokenFetcherExit(args);
+      } else if (clazz == JMXGet.class) {
+        expectJMXGetExit(args);
+      } else if (clazz == DFSAdmin.class) {
+        expectDfsAdminPrint(args);
+      }
+      pipeOut.close();
+      ByteStreams.copy(pipeIn, outBytes);      
+      pipeIn.close();
+      assertTrue(new String(outBytes.toByteArray()).contains(pattern));            
+    } catch (Exception ex) {
+      fail("checkOutput error " + ex);
+    }
+  }
+
+  private void expectDfsAdminPrint(String[] args) {
+    try {
+      ToolRunner.run(new DFSAdmin(), args);
+    } catch (Exception ex) {
+      fail("expectDelegationTokenFetcherExit ex error " + ex);
+    }
+  }
+
+  private static void expectDelegationTokenFetcherExit(String[] args) {
+    try {
+      DelegationTokenFetcher.main(args);
+      fail("should call exit");
+    } catch (ExitException e) {
+      ExitUtil.resetFirstExitException();
+    } catch (Exception ex) {
+      fail("expectDelegationTokenFetcherExit ex error " + ex);
+    }
+  }
+
+  private static void expectJMXGetExit(String[] args) {
+    try {
+      JMXGet.main(args);
+      fail("should call exit");
+    } catch (ExitException e) {
+      ExitUtil.resetFirstExitException();
+    } catch (Exception ex) {
+      fail("expectJMXGetExit ex error " + ex);
+    }
+  }
+}

+ 5 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -209,6 +209,9 @@ Release 2.2.1 - UNRELEASED
     MAPREDUCE-5463. Deprecate SLOTS_MILLIS counters (Tzuyoshi Ozawa via Sandy
     Ryza)
 
+    MAPREDUCE-5457. Add a KeyOnlyTextOutputReader to enable streaming to write
+    out text files without separators (Sandy Ryza)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -1497,6 +1500,8 @@ Release 0.23.10 - UNRELEASED
     MAPREDUCE-5586. TestCopyMapper#testCopyFailOnBlockSizeDifference fails when
     run from hadoop-tools/hadoop-distcp directory (jeagles)
 
+    MAPREDUCE-5587. TestTextOutputFormat fails on JDK7 (jeagles)
+
 Release 0.23.9 - 2013-07-08
 
   INCOMPATIBLE CHANGES

+ 49 - 22
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextOutputFormat.java

@@ -18,13 +18,24 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.*;
-import junit.framework.TestCase;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
 
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
+import org.junit.Test;
 
-public class TestTextOutputFormat extends TestCase {
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.LineReader;
+
+public class TestTextOutputFormat {
   private static JobConf defaultConf = new JobConf();
 
   private static FileSystem localFs = null;
@@ -38,12 +49,13 @@ public class TestTextOutputFormat extends TestCase {
   // A random task attempt id for testing.
   private static String attempt = "attempt_200707121733_0001_m_000000_0";
 
-  private static Path workDir = 
+  private static Path workDir =
     new Path(new Path(
-                      new Path(System.getProperty("test.build.data", "."), 
-                               "data"), 
+                      new Path(System.getProperty("test.build.data", "."),
+                               "data"),
                       FileOutputCommitter.TEMP_DIR_NAME), "_" + attempt);
 
+  @Test
   public void testFormat() throws Exception {
     JobConf job = new JobConf();
     job.set(JobContext.TASK_ATTEMPT_ID, attempt);
@@ -53,7 +65,7 @@ public class TestTextOutputFormat extends TestCase {
     if (!fs.mkdirs(workDir)) {
       fail("Failed to create output directory");
     }
-    String file = "test.txt";
+    String file = "test_format.txt";
 
     // A reporter that does nothing
     Reporter reporter = Reporter.NULL;
@@ -90,10 +102,11 @@ public class TestTextOutputFormat extends TestCase {
     expectedOutput.append(key1).append("\n");
     expectedOutput.append(key2).append('\t').append(val2).append("\n");
     String output = UtilsForTests.slurp(expectedFile);
-    assertEquals(output, expectedOutput.toString());
+    assertEquals(expectedOutput.toString(), output);
 
   }
 
+  @Test
   public void testFormatWithCustomSeparator() throws Exception {
     JobConf job = new JobConf();
     String separator = "\u0001";
@@ -105,7 +118,7 @@ public class TestTextOutputFormat extends TestCase {
     if (!fs.mkdirs(workDir)) {
       fail("Failed to create output directory");
     }
-    String file = "test.txt";
+    String file = "test_custom.txt";
 
     // A reporter that does nothing
     Reporter reporter = Reporter.NULL;
@@ -142,27 +155,27 @@ public class TestTextOutputFormat extends TestCase {
     expectedOutput.append(key1).append("\n");
     expectedOutput.append(key2).append(separator).append(val2).append("\n");
     String output = UtilsForTests.slurp(expectedFile);
-    assertEquals(output, expectedOutput.toString());
+    assertEquals(expectedOutput.toString(), output);
 
   }
+
   /**
    * test compressed file
    * @throws IOException
    */
- public void testCompress() throws IOException{
+  @Test
+  public void testCompress() throws IOException {
    JobConf job = new JobConf();
-   String separator = "\u0001";
-   job.set("mapreduce.output.textoutputformat.separator", separator);
    job.set(JobContext.TASK_ATTEMPT_ID, attempt);
    job.set(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.COMPRESS,"true");
-   
+
    FileOutputFormat.setOutputPath(job, workDir.getParent().getParent());
    FileOutputFormat.setWorkOutputPath(job, workDir);
    FileSystem fs = workDir.getFileSystem(job);
    if (!fs.mkdirs(workDir)) {
      fail("Failed to create output directory");
    }
-   String file = "test.txt";
+   String file = "test_compress.txt";
 
    // A reporter that does nothing
    Reporter reporter = Reporter.NULL;
@@ -189,16 +202,30 @@ public class TestTextOutputFormat extends TestCase {
    } finally {
      theRecordWriter.close(reporter);
    }
-   File expectedFile = new File(new Path(workDir, file).toString());
    StringBuffer expectedOutput = new StringBuffer();
-   expectedOutput.append(key1).append(separator).append(val1).append("\n");
+   expectedOutput.append(key1).append("\t").append(val1).append("\n");
    expectedOutput.append(val1).append("\n");
    expectedOutput.append(val2).append("\n");
    expectedOutput.append(key2).append("\n");
    expectedOutput.append(key1).append("\n");
-   expectedOutput.append(key2).append(separator).append(val2).append("\n");
-   String output = UtilsForTests.slurp(expectedFile);
-   assertEquals(output, expectedOutput.toString());
+   expectedOutput.append(key2).append("\t").append(val2).append("\n");
+
+   DefaultCodec codec = new DefaultCodec();
+   codec.setConf(job);
+   Path expectedFile = new Path(workDir, file + codec.getDefaultExtension());
+   final FileInputStream istream = new FileInputStream(expectedFile.toString());
+   CompressionInputStream cistream = codec.createInputStream(istream);
+   LineReader reader = new LineReader(cistream);
+
+   StringBuilder output = new StringBuilder();
+   Text out = new Text();
+   while (reader.readLine(out) > 0) {
+     output.append(out);
+     output.append("\n");
+   }
+   reader.close();
+
+   assertEquals(expectedOutput.toString(), output.toString());
  }
   public static void main(String[] args) throws Exception {
     new TestTextOutputFormat().testFormat();
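
Two things changed in testCompress: each test now writes a uniquely named file
(test_format.txt, test_custom.txt, test_compress.txt), since JDK7 runs JUnit
methods in an unspecified order and a shared test.txt could collide across
tests; and the compressed output is verified by decompressing it with the same
codec rather than slurping raw bytes. A condensed sketch of that verification
step (readCompressed and the class name are illustrative, not part of the
patch):

import java.io.FileInputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.util.LineReader;

public final class CompressedOutputCheckSketch {

  // Decompress a TextOutputFormat output file and return its lines.
  static String readCompressed(Path file, Configuration conf)
      throws IOException {
    DefaultCodec codec = new DefaultCodec();
    codec.setConf(conf);
    CompressionInputStream in =
        codec.createInputStream(new FileInputStream(file.toString()));
    LineReader reader = new LineReader(in);
    StringBuilder sb = new StringBuilder();
    Text line = new Text();
    while (reader.readLine(line) > 0) {
      sb.append(line).append("\n");
    }
    reader.close();
    return sb.toString();
  }
}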

+ 7 - 0
hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/io/IdentifierResolver.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.streaming.io;
 
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.typedbytes.TypedBytesWritable;
 
@@ -34,6 +35,7 @@ public class IdentifierResolver {
   public static final String TEXT_ID = "text";
   public static final String RAW_BYTES_ID = "rawbytes";
   public static final String TYPED_BYTES_ID = "typedbytes";
+  public static final String KEY_ONLY_TEXT_ID = "keyonlytext";
   
   private Class<? extends InputWriter> inputWriterClass = null;
   private Class<? extends OutputReader> outputReaderClass = null;
@@ -55,6 +57,11 @@ public class IdentifierResolver {
       setOutputReaderClass(TypedBytesOutputReader.class);
       setOutputKeyClass(TypedBytesWritable.class);
       setOutputValueClass(TypedBytesWritable.class);
+    } else if (identifier.equalsIgnoreCase(KEY_ONLY_TEXT_ID)) {
+      setInputWriterClass(KeyOnlyTextInputWriter.class);
+      setOutputReaderClass(KeyOnlyTextOutputReader.class);
+      setOutputKeyClass(Text.class);
+      setOutputValueClass(NullWritable.class);
     } else { // assume TEXT_ID
       setInputWriterClass(TextInputWriter.class);
       setOutputReaderClass(TextOutputReader.class);
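
A streaming job opts into the new identifier through its stream.*.input and
stream.*.output properties, which the streaming framework hands to
IdentifierResolver.resolve(). A minimal sketch of the resolution step (the
property name mirrors the TestStreamingOutputOnlyKeys test further below):

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.streaming.io.IdentifierResolver;

public class KeyOnlyResolveSketch {
  public static void main(String[] args) {
    JobConf job = new JobConf();
    job.set("stream.reduce.output", IdentifierResolver.KEY_ONLY_TEXT_ID);

    IdentifierResolver resolver = new IdentifierResolver();
    resolver.resolve(job.get("stream.reduce.output",
        IdentifierResolver.TEXT_ID));
    // Prints "KeyOnlyTextOutputReader": each reducer stdout line becomes a
    // Text key paired with a NullWritable value.
    System.out.println(resolver.getOutputReaderClass().getSimpleName());
  }
}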

+ 35 - 0
hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/io/KeyOnlyTextInputWriter.java

@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming.io;
+
+import java.io.IOException;
+
+
+public class KeyOnlyTextInputWriter extends TextInputWriter {
+
+  @Override
+  public void writeKey(Object key) throws IOException {
+    writeUTF8(key);
+    clientOut.write('\n');
+  }
+
+  @Override
+  public void writeValue(Object value) throws IOException {}
+
+}

+ 90 - 0
hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/io/KeyOnlyTextOutputReader.java

@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming.io;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.streaming.PipeMapRed;
+import org.apache.hadoop.util.LineReader;
+
+/**
+ * OutputReader that reads the client's output as text, interpreting each line
+ * as a key and outputting NullWritables for values.
+ */
+public class KeyOnlyTextOutputReader extends OutputReader<Text, NullWritable> {
+
+  private LineReader lineReader;
+  private byte[] bytes;
+  private DataInput clientIn;
+  private Configuration conf;
+  private Text key;
+  private Text line;
+  
+  @Override
+  public void initialize(PipeMapRed pipeMapRed) throws IOException {
+    super.initialize(pipeMapRed);
+    clientIn = pipeMapRed.getClientInput();
+    conf = pipeMapRed.getConfiguration();
+    lineReader = new LineReader((InputStream)clientIn, conf);
+    key = new Text();
+    line = new Text();
+  }
+  
+  @Override
+  public boolean readKeyValue() throws IOException {
+    if (lineReader.readLine(line) <= 0) {
+      return false;
+    }
+    bytes = line.getBytes();
+    key.set(bytes, 0, line.getLength());
+
+    line.clear();
+    return true;
+  }
+  
+  @Override
+  public Text getCurrentKey() throws IOException {
+    return key;
+  }
+  
+  @Override
+  public NullWritable getCurrentValue() throws IOException {
+    return NullWritable.get();
+  }
+
+  @Override
+  public String getLastOutput() {
+    if (bytes != null) {
+      try {
+        return new String(bytes, "UTF-8");
+      } catch (UnsupportedEncodingException e) {
+        return "<undecodable>";
+      }
+    } else {
+      return null;
+    }
+  }
+  
+}
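
For context, a hedged sketch of how the framework consumes this reader; the
real driver is PipeMapRed's output thread, and OutputCollector here is a
stand-in for wherever the records go:

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.streaming.io.KeyOnlyTextOutputReader;

public final class KeyOnlyDrainSketch {

  // Drains a reader the way the streaming output thread would.
  public static void drain(KeyOnlyTextOutputReader reader,
      OutputCollector<Text, NullWritable> collector) throws IOException {
    while (reader.readKeyValue()) {
      Text key = reader.getCurrentKey(); // one full line of client output
      // The value is always NullWritable, so TextOutputFormat later writes
      // the key with no trailing separator.
      collector.collect(key, reader.getCurrentValue());
    }
  }
}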

+ 2 - 2
hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/io/TextInputWriter.java

@@ -30,7 +30,7 @@ import org.apache.hadoop.streaming.PipeMapRed;
  */
 public class TextInputWriter extends InputWriter<Object, Object> {
   
-  private DataOutput clientOut;
+  protected DataOutput clientOut;
   private byte[] inputSeparator;
   
   @Override
@@ -53,7 +53,7 @@ public class TextInputWriter extends InputWriter<Object, Object> {
   }
   
   // Write an object to the output stream using UTF-8 encoding
-  private void writeUTF8(Object object) throws IOException {
+  protected void writeUTF8(Object object) throws IOException {
     byte[] bval;
     int valSize;
     if (object instanceof BytesWritable) {

+ 51 - 0
hadoop-tools/hadoop-streaming/src/test/java/org/apache/hadoop/streaming/TestStreamingOutputOnlyKeys.java

@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+public class TestStreamingOutputOnlyKeys extends TestStreaming {
+
+  public TestStreamingOutputOnlyKeys() throws IOException {
+    super();
+  }
+  
+  @Test
+  public void testOutputOnlyKeys() throws Exception {
+    args.add("-jobconf"); args.add("stream.reduce.input" +
+        "=keyonlytext");
+    args.add("-jobconf"); args.add("stream.reduce.output" +
+        "=keyonlytext");
+    super.testCommandLine();
+  }
+  
+  @Override
+  public String getExpectedOutput() {
+    return outputExpect.replaceAll("\t", "");
+  }
+  
+  @Override
+  @Test
+  public void testCommandLine() {
+    // Do nothing
+  }
+
+}

+ 68 - 0
hadoop-tools/hadoop-streaming/src/test/java/org/apache/hadoop/streaming/io/TestKeyOnlyTextOutputReader.java

@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming.io;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.junit.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.streaming.PipeMapRed;
+import org.apache.hadoop.streaming.PipeMapper;
+import org.junit.Test;
+
+public class TestKeyOnlyTextOutputReader {
+  @Test
+  public void testKeyOnlyTextOutputReader() throws IOException {
+    String text = "key,value\nkey2,value2\nnocomma\n";
+    PipeMapRed pipeMapRed = new MyPipeMapRed(text);
+    KeyOnlyTextOutputReader outputReader = new KeyOnlyTextOutputReader();
+    outputReader.initialize(pipeMapRed);
+    outputReader.readKeyValue();
+    Assert.assertEquals(new Text("key,value"), outputReader.getCurrentKey());
+    outputReader.readKeyValue();
+    Assert.assertEquals(new Text("key2,value2"), outputReader.getCurrentKey());
+    outputReader.readKeyValue();
+    Assert.assertEquals(new Text("nocomma"), outputReader.getCurrentKey());
+    Assert.assertFalse(outputReader.readKeyValue());
+  }
+  
+  private class MyPipeMapRed extends PipeMapper {
+    private DataInput clientIn;
+    private Configuration conf = new Configuration();
+    
+    public MyPipeMapRed(String text) {
+      clientIn = new DataInputStream(new ByteArrayInputStream(text.getBytes()));
+    }
+    
+    @Override
+    public DataInput getClientInput() {
+      return clientIn;
+    }
+    
+    @Override
+    public Configuration getConfiguration() {
+      return conf;
+    }
+  }
+}

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -120,6 +120,9 @@ Release 2.2.1 - UNRELEASED
     YARN-1295. In UnixLocalWrapperScriptBuilder, using bash -c can cause Text
     file busy errors (Sandy Ryza)
 
+    YARN-1185. Fixed FileSystemRMStateStore to not leave partial files that
+    prevent subsequent ResourceManager recovery. (Omkar Vinit Joshi via vinodkv)
+
 Release 2.2.0 - 2013-10-13
 
   INCOMPATIBLE CHANGES

+ 30 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java

@@ -22,6 +22,7 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -118,6 +119,9 @@ public class FileSystemRMStateStore extends RMStateStore {
         for (FileStatus childNodeStatus : fs.listStatus(appDir.getPath())) {
           assert childNodeStatus.isFile();
           String childNodeName = childNodeStatus.getPath().getName();
+          if (checkAndRemovePartialRecord(childNodeStatus.getPath())) {
+            continue;
+          }
           byte[] childData =
               readFile(childNodeStatus.getPath(), childNodeStatus.getLen());
           if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
@@ -178,12 +182,28 @@ public class FileSystemRMStateStore extends RMStateStore {
     }
   }
 
+  private boolean checkAndRemovePartialRecord(Path record) throws IOException {
+    // A name ending in .tmp indicates that a previous attempt to save state
+    // failed partway through and left a partial record behind. Such a file
+    // is deleted as part of this call.
+    if (record.getName().endsWith(".tmp")) {
+      LOG.error("incomplete rm state store entry found :"
+          + record);
+      fs.delete(record, false);
+      return true;
+    }
+    return false;
+  }
+
   private void loadRMDTSecretManagerState(RMState rmState) throws Exception {
     FileStatus[] childNodes = fs.listStatus(rmDTSecretManagerRoot);
 
     for(FileStatus childNodeStatus : childNodes) {
       assert childNodeStatus.isFile();
       String childNodeName = childNodeStatus.getPath().getName();
+      if (checkAndRemovePartialRecord(childNodeStatus.getPath())) {
+        continue;
+      }
       if(childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
         rmState.rmSecretManagerState.dtSequenceNumber =
             Integer.parseInt(childNodeName.split("_")[1]);
@@ -344,10 +364,19 @@ public class FileSystemRMStateStore extends RMStateStore {
     return data;
   }
 
+  /*
+   * To make the write atomic, we first write the data to a .tmp file and
+   * then rename it into place, assuming that rename is atomic in the
+   * underlying file system.
+   */
   private void writeFile(Path outputPath, byte[] data) throws Exception {
-    FSDataOutputStream fsOut = fs.create(outputPath, false);
+    Path tempPath =
+        new Path(outputPath.getParent(), outputPath.getName() + ".tmp");
+    FSDataOutputStream fsOut =
+        fs.create(tempPath, false);
     fsOut.write(data);
     fsOut.close();
+    fs.rename(tempPath, outputPath);
   }
 
   private boolean renameFile(Path src, Path dst) throws Exception {
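
The fix's core pattern is write-to-temporary-then-rename, which is atomic
provided the underlying FileSystem renames atomically (as HDFS does). A
standalone sketch of the pattern under that assumption; the try/finally
cleanup is an addition of this sketch, not part of the patch:

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class AtomicWriteSketch {

  public static void atomicWrite(FileSystem fs, Path outputPath, byte[] data)
      throws IOException {
    Path tempPath =
        new Path(outputPath.getParent(), outputPath.getName() + ".tmp");
    FSDataOutputStream out = fs.create(tempPath, false); // fail if .tmp exists
    try {
      out.write(data);
    } finally {
      out.close();
    }
    boolean renamed = false;
    try {
      renamed = fs.rename(tempPath, outputPath); // atomic publish
    } finally {
      if (!renamed) {
        fs.delete(tempPath, false); // leave no partial record behind
      }
    }
  }
}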

+ 3 - 100
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java

@@ -39,6 +39,7 @@ import junit.framework.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -75,9 +76,9 @@ import org.apache.zookeeper.ZooKeeper;
 
 import org.junit.Test;
 
-public class TestRMStateStore extends ClientBaseWithFixes{
+public class RMStateStoreTestBase extends ClientBaseWithFixes {
 
-  public static final Log LOG = LogFactory.getLog(TestRMStateStore.class);
+  public static final Log LOG = LogFactory.getLog(RMStateStoreTestBase.class);
 
   static class TestDispatcher implements
       Dispatcher, EventHandler<RMAppAttemptStoredEvent> {
@@ -116,104 +117,6 @@ public class TestRMStateStore extends ClientBaseWithFixes{
     boolean isFinalStateValid() throws Exception;
   }
 
-  @Test
-  public void testZKRMStateStoreRealZK() throws Exception {
-    TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
-    testRMAppStateStore(zkTester);
-    testRMDTSecretManagerStateStore(zkTester);
-  }
-
-  @Test
-  public void testFSRMStateStore() throws Exception {
-    HdfsConfiguration conf = new HdfsConfiguration();
-    MiniDFSCluster cluster =
-        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
-    try {
-      TestFSRMStateStoreTester fsTester = new TestFSRMStateStoreTester(cluster);
-      testRMAppStateStore(fsTester);
-      testRMDTSecretManagerStateStore(fsTester);
-    } finally {
-      cluster.shutdown();
-    }
-  }
-
-  class TestZKRMStateStoreTester implements RMStateStoreHelper {
-    ZooKeeper client;
-    ZKRMStateStore store;
-
-    class TestZKRMStateStore extends ZKRMStateStore {
-      public TestZKRMStateStore(Configuration conf, String workingZnode)
-          throws Exception {
-        init(conf);
-        start();
-        assertTrue(znodeWorkingPath.equals(workingZnode));
-      }
-
-      @Override
-      public ZooKeeper getNewZooKeeper() throws IOException {
-        return client;
-      }
-    }
-
-    public RMStateStore getRMStateStore() throws Exception {
-      String workingZnode = "/Test";
-      YarnConfiguration conf = new YarnConfiguration();
-      conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort);
-      conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
-      this.client = createClient();
-      this.store = new TestZKRMStateStore(conf, workingZnode);
-      return this.store;
-    }
-
-    @Override
-    public boolean isFinalStateValid() throws Exception {
-      List<String> nodes = client.getChildren(store.znodeWorkingPath, false);
-      return nodes.size() == 1;
-    }
-  }
-
-  class TestFSRMStateStoreTester implements RMStateStoreHelper {
-    Path workingDirPathURI;
-    FileSystemRMStateStore store;
-    MiniDFSCluster cluster;
-
-    class TestFileSystemRMStore extends FileSystemRMStateStore {
-      TestFileSystemRMStore(Configuration conf) throws Exception {
-        init(conf);
-        Assert.assertNull(fs);
-        assertTrue(workingDirPathURI.equals(fsWorkingPath));
-        start();
-        Assert.assertNotNull(fs);
-      }
-    }
-
-    public TestFSRMStateStoreTester(MiniDFSCluster cluster) throws Exception {
-      Path workingDirPath = new Path("/Test");
-      this.cluster = cluster;
-      FileSystem fs = cluster.getFileSystem();
-      fs.mkdirs(workingDirPath);
-      Path clusterURI = new Path(cluster.getURI());
-      workingDirPathURI = new Path(clusterURI, workingDirPath);
-      fs.close();
-    }
-
-    @Override
-    public RMStateStore getRMStateStore() throws Exception {
-      YarnConfiguration conf = new YarnConfiguration();
-      conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI,
-          workingDirPathURI.toString());
-      this.store = new TestFileSystemRMStore(conf);
-      return store;
-    }
-
-    @Override
-    public boolean isFinalStateValid() throws Exception {
-      FileSystem fs = cluster.getFileSystem();
-      FileStatus[] files = fs.listStatus(workingDirPathURI);
-      return files.length == 1;
-    }
-  }
-
   void waitNotify(TestDispatcher dispatcher) {
     long startTime = System.currentTimeMillis();
     while(!dispatcher.notified) {

+ 120 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java

@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import static org.junit.Assert.assertTrue;
+import org.junit.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.junit.Test;
+
+public class TestFSRMStateStore extends RMStateStoreTestBase {
+
+  public static final Log LOG = LogFactory.getLog(TestFSRMStateStore.class);
+
+  class TestFSRMStateStoreTester implements RMStateStoreHelper {
+
+    Path workingDirPathURI;
+    FileSystemRMStateStore store;
+    MiniDFSCluster cluster;
+
+    class TestFileSystemRMStore extends FileSystemRMStateStore {
+
+      TestFileSystemRMStore(Configuration conf) throws Exception {
+        init(conf);
+        Assert.assertNull(fs);
+        assertTrue(workingDirPathURI.equals(fsWorkingPath));
+        start();
+        Assert.assertNotNull(fs);
+      }
+    }
+
+    public TestFSRMStateStoreTester(MiniDFSCluster cluster) throws Exception {
+      Path workingDirPath = new Path("/Test");
+      this.cluster = cluster;
+      FileSystem fs = cluster.getFileSystem();
+      fs.mkdirs(workingDirPath);
+      Path clusterURI = new Path(cluster.getURI());
+      workingDirPathURI = new Path(clusterURI, workingDirPath);
+      fs.close();
+    }
+
+    @Override
+    public RMStateStore getRMStateStore() throws Exception {
+      YarnConfiguration conf = new YarnConfiguration();
+      conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI,
+          workingDirPathURI.toString());
+      this.store = new TestFileSystemRMStore(conf);
+      return store;
+    }
+
+    @Override
+    public boolean isFinalStateValid() throws Exception {
+      FileSystem fs = cluster.getFileSystem();
+      FileStatus[] files = fs.listStatus(workingDirPathURI);
+      return files.length == 1;
+    }
+  }
+
+  @Test
+  public void testFSRMStateStore() throws Exception {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    try {
+      TestFSRMStateStoreTester fsTester = new TestFSRMStateStoreTester(cluster);
+      // Plant a corrupted (partial .tmp) entry in the FileSystemRMStateStore;
+      // loading should discard the entry and remove it from the file system.
+      FSDataOutputStream fsOut = null;
+      FileSystemRMStateStore fileSystemRMStateStore =
+          (FileSystemRMStateStore) fsTester.getRMStateStore();
+      String appAttemptIdStr3 = "appattempt_1352994193343_0001_000003";
+      ApplicationAttemptId attemptId3 =
+          ConverterUtils.toApplicationAttemptId(appAttemptIdStr3);
+      Path rootDir =
+          new Path(fileSystemRMStateStore.fsWorkingPath, "FSRMStateRoot");
+      Path appRootDir = new Path(rootDir, "RMAppRoot");
+      Path appDir =
+          new Path(appRootDir, attemptId3.getApplicationId().toString());
+      Path tempAppAttemptFile =
+          new Path(appDir, attemptId3.toString() + ".tmp");
+      fsOut = fileSystemRMStateStore.fs.create(tempAppAttemptFile, false);
+      fsOut.write("Some random data ".getBytes());
+      fsOut.close();
+
+      testRMAppStateStore(fsTester);
+      Assert.assertFalse(fileSystemRMStateStore.fsWorkingPath
+          .getFileSystem(conf).exists(tempAppAttemptFile));
+      testRMDTSecretManagerStateStore(fsTester);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+}

+ 80 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java

@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.Test;
+
+public class TestZKRMStateStore extends RMStateStoreTestBase {
+
+  public static final Log LOG = LogFactory.getLog(TestZKRMStateStore.class);
+
+  class TestZKRMStateStoreTester implements RMStateStoreHelper {
+
+    ZooKeeper client;
+    ZKRMStateStore store;
+
+    class TestZKRMStateStoreInternal extends ZKRMStateStore {
+
+      public TestZKRMStateStoreInternal(Configuration conf, String workingZnode)
+          throws Exception {
+        init(conf);
+        start();
+        assertTrue(znodeWorkingPath.equals(workingZnode));
+      }
+
+      @Override
+      public ZooKeeper getNewZooKeeper() throws IOException {
+        return client;
+      }
+    }
+
+    public RMStateStore getRMStateStore() throws Exception {
+      String workingZnode = "/Test";
+      YarnConfiguration conf = new YarnConfiguration();
+      conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort);
+      conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
+      this.client = createClient();
+      this.store = new TestZKRMStateStoreInternal(conf, workingZnode);
+      return this.store;
+    }
+
+    @Override
+    public boolean isFinalStateValid() throws Exception {
+      List<String> nodes = client.getChildren(store.znodeWorkingPath, false);
+      return nodes.size() == 1;
+    }
+  }
+
+  @Test
+  public void testZKRMStateStoreRealZK() throws Exception {
+    TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
+    testRMAppStateStore(zkTester);
+    testRMDTSecretManagerStateStore(zkTester);
+  }
+}

+ 9 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java

@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ha.ClientBaseWithFixes;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.TestRMStateStore.TestDispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreTestBase.TestDispatcher;
 import org.apache.hadoop.util.ZKUtil;
 
 import org.apache.zookeeper.CreateMode;
@@ -43,17 +43,20 @@ import static org.junit.Assert.fail;
 
 public class TestZKRMStateStoreZKClientConnections extends
     ClientBaseWithFixes {
+
   private static final int ZK_OP_WAIT_TIME = 3000;
   private Log LOG =
       LogFactory.getLog(TestZKRMStateStoreZKClientConnections.class);
 
   class TestZKClient {
+
     ZKRMStateStore store;
     boolean forExpire = false;
     TestForwardingWatcher watcher;
     CyclicBarrier syncBarrier = new CyclicBarrier(2);
 
     protected class TestZKRMStateStore extends ZKRMStateStore {
+
       public TestZKRMStateStore(Configuration conf, String workingZnode)
           throws Exception {
         init(conf);
@@ -87,6 +90,7 @@ public class TestZKRMStateStoreZKClientConnections extends
 
     private class TestForwardingWatcher extends
         ClientBaseWithFixes.CountdownWatcher {
+
       public void process(WatchedEvent event) {
         super.process(event);
         try {
@@ -187,7 +191,7 @@ public class TestZKRMStateStoreZKClientConnections extends
     }
   }
 
-  @Test (timeout = 20000)
+  @Test(timeout = 20000)
   public void testSetZKAcl() {
     TestZKClient zkClientTester = new TestZKClient();
     YarnConfiguration conf = new YarnConfiguration();
@@ -196,10 +200,11 @@ public class TestZKRMStateStoreZKClientConnections extends
       zkClientTester.store.zkClient.delete(zkClientTester.store
           .znodeWorkingPath, -1);
       fail("Shouldn't be able to delete path");
-    } catch (Exception e) {/* expected behavior */}
+    } catch (Exception e) {
+      /* expected behavior */
+    }
   }
 
-  @Test (timeout = 20000)
+  @Test(timeout = 20000)
   public void testInvalidZKAclConfiguration() {
     TestZKClient zkClientTester = new TestZKClient();
     YarnConfiguration conf = new YarnConfiguration();