Bladeren bron

Reverting the previous merge r1416603 which committed some extra changes

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2802@1416712 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas 12 jaren geleden
bovenliggende
commit
9821af9ce8
42 gewijzigde bestanden met toevoegingen van 333 en 1469 verwijderingen
  1. 0 4
      hadoop-common-project/hadoop-common/CHANGES.txt
  2. 0 11
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricsSystem.java
  3. 0 46
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java
  4. 3 21
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
  5. 26 3
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestGangliaMetrics.java
  6. 3 185
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java
  7. 0 10
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  8. 8 51
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
  9. 0 51
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupState.java
  10. 32 41
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
  11. 13 15
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
  12. 4 6
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
  13. 66 55
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  14. 13 46
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
  15. 21 71
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
  16. 5 11
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
  17. 3 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
  18. 8 25
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
  19. 6 6
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
  20. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
  21. 2 15
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java
  22. 3 222
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java
  23. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileWithLink.java
  24. 1 16
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java
  25. 92 11
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java
  26. 0 96
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java
  27. 0 4
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java
  28. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsLimits.java
  29. 2 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
  30. 1 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestMetaSave.java
  31. 0 18
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSaveNamespace.java
  32. 4 4
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSnapshotPathINodes.java
  33. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestINodeDirectoryWithSnapshot.java
  34. 0 3
      hadoop-mapreduce-project/CHANGES.txt
  35. 2 3
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/TaskInfo.java
  36. 1 5
      hadoop-yarn-project/CHANGES.txt
  37. 2 12
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrCompletedContainersEvent.java
  38. 5 65
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
  39. 1 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
  40. 2 11
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
  41. 0 92
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java
  42. 0 222
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java

+ 0 - 4
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -306,10 +306,6 @@ Release 2.0.3-alpha - Unreleased
 
     HADOOP-9020. Add a SASL PLAIN server (daryn via bobby)
 
-    HADOOP-9090. Support on-demand publish of metrics. (Mostafa Elhemali via
-    suresh)
-
-
   IMPROVEMENTS
 
     HADOOP-8789. Tests setLevel(Level.OFF) should be Level.ERROR.

+ 0 - 11
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricsSystem.java

@@ -90,17 +90,6 @@ public abstract class MetricsSystem implements MetricsSystemMXBean {
    */
   public abstract void register(Callback callback);
 
-  /**
-   * Requests an immediate publish of all metrics from sources to sinks.
-   * 
-   * This is a "soft" request: the expectation is that a best effort will be
-   * done to synchronously snapshot the metrics from all the sources and put
-   * them in all the sinks (including flushing the sinks) before returning to
-   * the caller. If this can't be accomplished in reasonable time it's OK to
-   * return to the caller before everything is done. 
-   */
-  public abstract void publishMetricsNow();
-
   /**
    * Shutdown the metrics system completely (usually during server shutdown.)
    * The MetricsSystemMXBean will be unregistered.

+ 0 - 46
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java

@@ -19,7 +19,6 @@
 package org.apache.hadoop.metrics2.impl;
 
 import java.util.Random;
-import java.util.concurrent.*;
 
 import static com.google.common.base.Preconditions.*;
 
@@ -49,7 +48,6 @@ class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> {
   private volatile boolean stopping = false;
   private volatile boolean inError = false;
   private final int period, firstRetryDelay, retryCount;
-  private final long oobPutTimeout;
   private final float retryBackoff;
   private final MetricsRegistry registry = new MetricsRegistry("sinkadapter");
   private final MutableStat latency;
@@ -71,8 +69,6 @@ class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> {
     this.period = checkArg(period, period > 0, "period");
     firstRetryDelay = checkArg(retryDelay, retryDelay > 0, "retry delay");
     this.retryBackoff = checkArg(retryBackoff, retryBackoff>1, "retry backoff");
-    oobPutTimeout = (long)
-        (firstRetryDelay * Math.pow(retryBackoff, retryCount) * 1000);
     this.retryCount = retryCount;
     this.queue = new SinkQueue<MetricsBuffer>(checkArg(queueCapacity,
         queueCapacity > 0, "queue capacity"));
@@ -99,23 +95,6 @@ class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> {
     }
     return true; // OK
   }
-  
-  public boolean putMetricsImmediate(MetricsBuffer buffer) {
-    WaitableMetricsBuffer waitableBuffer =
-        new WaitableMetricsBuffer(buffer);
-    if (!queue.enqueue(waitableBuffer)) {
-      LOG.warn(name + " has a full queue and can't consume the given metrics.");
-      dropped.incr();
-      return false;
-    }
-    if (!waitableBuffer.waitTillNotified(oobPutTimeout)) {
-      LOG.warn(name +
-          " couldn't fulfill an immediate putMetrics request in time." +
-          " Abandoning.");
-      return false;
-    }
-    return true;
-  }
 
   void publishMetricsFromQueue() {
     int retryDelay = firstRetryDelay;
@@ -179,9 +158,6 @@ class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> {
       sink.flush();
       latency.add(Time.now() - ts);
     }
-    if (buffer instanceof WaitableMetricsBuffer) {
-      ((WaitableMetricsBuffer)buffer).notifyAnyWaiters();
-    }
     LOG.debug("Done");
   }
 
@@ -215,26 +191,4 @@ class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> {
   MetricsSink sink() {
     return sink;
   }
-
-  static class WaitableMetricsBuffer extends MetricsBuffer {
-    private final Semaphore notificationSemaphore =
-        new Semaphore(0);
-
-    public WaitableMetricsBuffer(MetricsBuffer metricsBuffer) {
-      super(metricsBuffer);
-    }
-
-    public boolean waitTillNotified(long millisecondsToWait) {
-      try {
-        return notificationSemaphore.tryAcquire(millisecondsToWait,
-            TimeUnit.MILLISECONDS);
-      } catch (InterruptedException e) {
-        return false;
-      }
-    }
-
-    public void notifyAnyWaiters() {
-      notificationSemaphore.release();
-    }
-  }
 }

+ 3 - 21
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java

@@ -344,19 +344,9 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
   synchronized void onTimerEvent() {
     logicalTime += period;
     if (sinks.size() > 0) {
-      publishMetrics(sampleMetrics(), false);
+      publishMetrics(sampleMetrics());
     }
   }
-  
-  /**
-   * Requests an immediate publish of all metrics from sources to sinks.
-   */
-  @Override
-  public void publishMetricsNow() {
-    if (sinks.size() > 0) {
-      publishMetrics(sampleMetrics(), true);
-    }    
-  }
 
   /**
    * Sample all the sources for a snapshot of metrics/tags
@@ -390,20 +380,12 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
   /**
    * Publish a metrics snapshot to all the sinks
    * @param buffer  the metrics snapshot to publish
-   * @param immediate  indicates that we should publish metrics immediately
-   *                   instead of using a separate thread.
    */
-  synchronized void publishMetrics(MetricsBuffer buffer, boolean immediate) {
+  synchronized void publishMetrics(MetricsBuffer buffer) {
     int dropped = 0;
     for (MetricsSinkAdapter sa : sinks.values()) {
       long startTime = Time.now();
-      boolean result;
-      if (immediate) {
-        result = sa.putMetricsImmediate(buffer); 
-      } else {
-        result = sa.putMetrics(buffer, logicalTime);
-      }
-      dropped += result ? 0 : 1;
+      dropped += sa.putMetrics(buffer, logicalTime) ? 0 : 1;
       publishStat.add(Time.now() - startTime);
     }
     droppedPubAll.incr(dropped);

+ 26 - 3
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestGangliaMetrics.java

@@ -29,6 +29,8 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -113,23 +115,31 @@ public class TestGangliaMetrics {
     final int expectedCountFromGanglia30 = expectedMetrics.length;
     final int expectedCountFromGanglia31 = 2 * expectedMetrics.length;
 
+    // use latch to make sure we received required records before shutting
+    // down the MetricSystem
+    CountDownLatch latch = new CountDownLatch(
+        expectedCountFromGanglia30 + expectedCountFromGanglia31);
+
     // Setup test for GangliaSink30
     AbstractGangliaSink gsink30 = new GangliaSink30();
     gsink30.init(cb.subset("test"));
-    MockDatagramSocket mockds30 = new MockDatagramSocket();
+    MockDatagramSocket mockds30 = new MockDatagramSocket(latch);
     GangliaMetricsTestHelper.setDatagramSocket(gsink30, mockds30);
 
     // Setup test for GangliaSink31
     AbstractGangliaSink gsink31 = new GangliaSink31();
     gsink31.init(cb.subset("test"));
-    MockDatagramSocket mockds31 = new MockDatagramSocket();
+    MockDatagramSocket mockds31 = new MockDatagramSocket(latch);
     GangliaMetricsTestHelper.setDatagramSocket(gsink31, mockds31);
 
     // register the sinks
     ms.register("gsink30", "gsink30 desc", gsink30);
     ms.register("gsink31", "gsink31 desc", gsink31);
-    ms.publishMetricsNow(); // publish the metrics
+    ms.onTimerEvent();  // trigger something interesting
 
+    // wait for all records and the stop MetricSystem.  Without this
+    // sometime the ms gets shutdown before all the sinks have consumed
+    latch.await(200, TimeUnit.MILLISECONDS);
     ms.stop();
 
     // check GanfliaSink30 data
@@ -188,6 +198,7 @@ public class TestGangliaMetrics {
    */
   private class MockDatagramSocket extends DatagramSocket {
     private ArrayList<byte[]> capture;
+    private CountDownLatch latch;
 
     /**
      * @throws SocketException
@@ -196,6 +207,15 @@ public class TestGangliaMetrics {
       capture = new  ArrayList<byte[]>();
     }
 
+    /**
+     * @param latch
+     * @throws SocketException
+     */
+    public MockDatagramSocket(CountDownLatch latch) throws SocketException {
+      this();
+      this.latch = latch;
+    }
+
     /* (non-Javadoc)
      * @see java.net.DatagramSocket#send(java.net.DatagramPacket)
      */
@@ -205,6 +225,9 @@ public class TestGangliaMetrics {
       byte[] bytes = new byte[p.getLength()];
       System.arraycopy(p.getData(), p.getOffset(), bytes, 0, p.getLength());
       capture.add(bytes);
+
+      // decrement the latch
+      latch.countDown();
     }
 
     /**

+ 3 - 185
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java

@@ -18,11 +18,7 @@
 
 package org.apache.hadoop.metrics2.impl;
 
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-import javax.annotation.Nullable;
+import java.util.List;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -30,11 +26,9 @@ import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.runners.MockitoJUnitRunner;
-
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
-import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 
 import org.apache.commons.configuration.SubsetConfiguration;
@@ -42,8 +36,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.metrics2.MetricsException;
 import static org.apache.hadoop.test.MoreAsserts.*;
-
-import org.apache.hadoop.metrics2.AbstractMetric;
 import org.apache.hadoop.metrics2.MetricsRecord;
 import org.apache.hadoop.metrics2.MetricsSink;
 import org.apache.hadoop.metrics2.MetricsSource;
@@ -55,7 +47,6 @@ import org.apache.hadoop.metrics2.lib.MetricsRegistry;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import org.apache.hadoop.metrics2.lib.MutableRate;
 import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
-import org.apache.hadoop.util.StringUtils;
 
 /**
  * Test the MetricsSystemImpl class
@@ -81,7 +72,7 @@ public class TestMetricsSystemImpl {
   }
 
   @Test public void testInitFirst() throws Exception {
-    new ConfigBuilder().add("*.period", 8)
+    ConfigBuilder cb = new ConfigBuilder().add("*.period", 8)
         //.add("test.sink.plugin.urls", getPluginUrlsAsString())
         .add("test.sink.test.class", TestSink.class.getName())
         .add("test.*.source.filter.exclude", "s0")
@@ -102,9 +93,8 @@ public class TestMetricsSystemImpl {
     MetricsSink sink2 = mock(MetricsSink.class);
     ms.registerSink("sink1", "sink1 desc", sink1);
     ms.registerSink("sink2", "sink2 desc", sink2);
-    ms.publishMetricsNow(); // publish the metrics
+    ms.onTimerEvent();  // trigger something interesting
     ms.stop();
-    ms.shutdown();
 
     verify(sink1, times(2)).putMetrics(r1.capture());
     List<MetricsRecord> mr1 = r1.getAllValues();
@@ -114,177 +104,6 @@ public class TestMetricsSystemImpl {
     assertEquals("output", mr1, mr2);
   }
 
-  @Test public void testMultiThreadedPublish() throws Exception {
-    new ConfigBuilder().add("*.period", 80)
-      .add("test.sink.Collector.queue.capacity", "20")
-      .save(TestMetricsConfig.getTestFilename("hadoop-metrics2-test"));
-    final MetricsSystemImpl ms = new MetricsSystemImpl("Test");
-    ms.start();
-    final int numThreads = 10;
-    final CollectingSink sink = new CollectingSink(numThreads);
-    ms.registerSink("Collector",
-        "Collector of values from all threads.", sink);
-    final TestSource[] sources = new TestSource[numThreads];
-    final Thread[] threads = new Thread[numThreads];
-    final String[] results = new String[numThreads];
-    final CyclicBarrier barrier1 = new CyclicBarrier(numThreads),
-        barrier2 = new CyclicBarrier(numThreads);
-    for (int i = 0; i < numThreads; i++) {
-      sources[i] = ms.register("threadSource" + i,
-          "A source of my threaded goodness.",
-          new TestSource("threadSourceRec" + i));
-      threads[i] = new Thread(new Runnable() {
-        private boolean safeAwait(int mySource, CyclicBarrier barrier) {
-          try {
-            barrier1.await(2, TimeUnit.SECONDS);
-          } catch (InterruptedException e) {
-            results[mySource] = "Interrupted";
-            return false;
-          } catch (BrokenBarrierException e) {
-            results[mySource] = "Broken Barrier";
-            return false;
-          } catch (TimeoutException e) {
-            results[mySource] = "Timed out on barrier";
-            return false;
-          }
-          return true;
-        }
-        
-        @Override
-        public void run() {
-          int mySource = Integer.parseInt(Thread.currentThread().getName());
-          if (sink.collected[mySource].get() != 0L) {
-            results[mySource] = "Someone else collected my metric!";
-            return;
-          }
-          // Wait for all the threads to come here so we can hammer
-          // the system at the same time
-          if (!safeAwait(mySource, barrier1)) return;
-          sources[mySource].g1.set(230);
-          ms.publishMetricsNow();
-          // Since some other thread may have snatched my metric,
-          // I need to wait for the threads to finish before checking.
-          if (!safeAwait(mySource, barrier2)) return;
-          if (sink.collected[mySource].get() != 230L) {
-            results[mySource] = "Metric not collected!";
-            return;
-          }
-          results[mySource] = "Passed";
-        }
-      }, "" + i);
-    }
-    for (Thread t : threads)
-      t.start();
-    for (Thread t : threads)
-      t.join();
-    assertEquals(0L, ms.droppedPubAll.value());
-    assertTrue(StringUtils.join("\n", Arrays.asList(results)),
-      Iterables.all(Arrays.asList(results), new Predicate<String>() {
-        @Override
-        public boolean apply(@Nullable String input) {
-          return input.equalsIgnoreCase("Passed");
-        }
-      }));
-    ms.stop();
-    ms.shutdown();
-  }
-
-  private static class CollectingSink implements MetricsSink {
-    private final AtomicLong[] collected;
-    
-    public CollectingSink(int capacity) {
-      collected = new AtomicLong[capacity];
-      for (int i = 0; i < capacity; i++) {
-        collected[i] = new AtomicLong();
-      }
-    }
-    
-    @Override
-    public void init(SubsetConfiguration conf) {
-    }
-
-    @Override
-    public void putMetrics(MetricsRecord record) {
-      final String prefix = "threadSourceRec";
-      if (record.name().startsWith(prefix)) {
-        final int recordNumber = Integer.parseInt(
-            record.name().substring(prefix.length()));
-        ArrayList<String> names = new ArrayList<String>();
-        for (AbstractMetric m : record.metrics()) {
-          if (m.name().equalsIgnoreCase("g1")) {
-            collected[recordNumber].set(m.value().longValue());
-            return;
-          }
-          names.add(m.name());
-        }
-      }
-    }
-
-    @Override
-    public void flush() {
-    }
-  }
-
-  @Test public void testHangingSink() {
-    new ConfigBuilder().add("*.period", 8)
-      .add("test.sink.test.class", TestSink.class.getName())
-      .add("test.sink.hanging.retry.delay", "1")
-      .add("test.sink.hanging.retry.backoff", "1.01")
-      .add("test.sink.hanging.retry.count", "0")
-      .save(TestMetricsConfig.getTestFilename("hadoop-metrics2-test"));
-    MetricsSystemImpl ms = new MetricsSystemImpl("Test");
-    ms.start();
-    TestSource s = ms.register("s3", "s3 desc", new TestSource("s3rec"));
-    s.c1.incr();
-    HangingSink hanging = new HangingSink();
-    ms.registerSink("hanging", "Hang the sink!", hanging);
-    ms.publishMetricsNow();
-    assertEquals(1L, ms.droppedPubAll.value());
-    assertFalse(hanging.getInterrupted());
-    ms.stop();
-    ms.shutdown();
-    assertTrue(hanging.getInterrupted());
-    assertTrue("The sink didn't get called after its first hang " +
-               "for subsequent records.", hanging.getGotCalledSecondTime());
-  }
-
-  private static class HangingSink implements MetricsSink {
-    private volatile boolean interrupted;
-    private boolean gotCalledSecondTime;
-    private boolean firstTime = true;
-
-    public boolean getGotCalledSecondTime() {
-      return gotCalledSecondTime;
-    }
-
-    public boolean getInterrupted() {
-      return interrupted;
-    }
-
-    @Override
-    public void init(SubsetConfiguration conf) {
-    }
-
-    @Override
-    public void putMetrics(MetricsRecord record) {
-      // No need to hang every time, just the first record.
-      if (!firstTime) {
-        gotCalledSecondTime = true;
-        return;
-      }
-      firstTime = false;
-      try {
-        Thread.sleep(10 * 1000);
-      } catch (InterruptedException ex) {
-        interrupted = true;
-      }
-    }
-
-    @Override
-    public void flush() {
-    }
-  }
-
   @Test public void testRegisterDups() {
     MetricsSystem ms = new MetricsSystemImpl();
     TestSource ts1 = new TestSource("ts1");
@@ -297,7 +116,6 @@ public class TestMetricsSystemImpl {
     MetricsSource s2 = ms.getSource("ts1");
     assertNotNull(s2);
     assertNotSame(s1, s2);
-    ms.shutdown();
   }
 
   @Test(expected=MetricsException.class) public void testRegisterDupError() {

+ 0 - 10
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -176,9 +176,6 @@ Trunk (Unreleased)
     HDFS-4209. Clean up the addNode/addChild/addChildNoQuotaCheck methods in
     FSDirectory and INodeDirectory. (szetszwo)
 
-    HDFS-3358. Specify explicitly that the NN UI status total is talking
-    of persistent objects on heap. (harsh)
-
   OPTIMIZATIONS
 
   BUG FIXES
@@ -649,8 +646,6 @@ Release 2.0.3-alpha - Unreleased
     of it is undefined after the iteration or modifications of the map.
     (szetszwo)
 
-    HDFS-4231. BackupNode: Introduce BackupState. (shv)
-
 Release 2.0.2-alpha - 2012-09-07 
 
   INCOMPATIBLE CHANGES
@@ -2040,11 +2035,6 @@ Release 0.23.6 - UNRELEASED
 
   BUG FIXES
 
-    HDFS-4247. saveNamespace should be tolerant of dangling lease (daryn)
-
-    HDFS-4248. Renaming directories may incorrectly remove the paths in leases
-    under the tree.  (daryn via szetszwo)
-
 Release 0.23.5 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 8 - 51
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java

@@ -24,7 +24,6 @@ import java.net.SocketTimeoutException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.ha.ServiceFailedException;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.NameNodeProxies;
@@ -36,7 +35,6 @@ import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.Storage;
-import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
 import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
 import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
 import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
@@ -416,23 +414,14 @@ public class BackupNode extends NameNode {
       + HdfsConstants.LAYOUT_VERSION + " actual "+ nsInfo.getLayoutVersion();
     return nsInfo;
   }
-
+  
   @Override
-  protected String getNameServiceId(Configuration conf) {
-    return DFSUtil.getBackupNameServiceId(conf);
-  }
-
-  protected HAState createHAState() {
-    return new BackupState();
-  }
-
-  @Override // NameNode
   protected NameNodeHAContext createHAContext() {
     return new BNHAContext();
   }
-
+  
   private class BNHAContext extends NameNodeHAContext {
-    @Override // NameNodeHAContext
+    @Override // NameNode
     public void checkOperation(OperationCategory op)
         throws StandbyException {
       if (op == OperationCategory.UNCHECKED ||
@@ -446,42 +435,10 @@ public class BackupNode extends NameNode {
         throw new StandbyException(msg);
       }
     }
-
-    @Override // NameNodeHAContext
-    public void prepareToStopStandbyServices() throws ServiceFailedException {
-    }
-
-    /**
-     * Start services for BackupNode.
-     * <p>
-     * The following services should be muted
-     * (not run or not pass any control commands to DataNodes)
-     * on BackupNode:
-     * {@link LeaseManager.Monitor} protected by SafeMode.
-     * {@link BlockManager.ReplicationMonitor} protected by SafeMode.
-     * {@link HeartbeatManager.Monitor} protected by SafeMode.
-     * {@link DecommissionManager.Monitor} need to prohibit refreshNodes().
-     * {@link PendingReplicationBlocks.PendingReplicationMonitor} harmless,
-     * because ReplicationMonitor is muted.
-     */
-    @Override
-    public void startActiveServices() throws IOException {
-      try {
-        namesystem.startActiveServices();
-      } catch (Throwable t) {
-        doImmediateShutdown(t);
-      }
-    }
-
-    @Override
-    public void stopActiveServices() throws IOException {
-      try {
-        if (namesystem != null) {
-          namesystem.stopActiveServices();
-        }
-      } catch (Throwable t) {
-        doImmediateShutdown(t);
-      }
-    }
+  }
+  
+  @Override
+  protected String getNameServiceId(Configuration conf) {
+    return DFSUtil.getBackupNameServiceId(conf);
   }
 }

+ 0 - 51
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupState.java

@@ -1,51 +0,0 @@
-package org.apache.hadoop.hdfs.server.namenode;
-
-import java.io.IOException;
-
-import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
-import org.apache.hadoop.ha.ServiceFailedException;
-import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
-import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
-import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
-import org.apache.hadoop.ipc.StandbyException;
-
-public class BackupState extends HAState {
-
-  public BackupState() {
-    super(HAServiceState.STANDBY);
-  }
-
-  @Override // HAState
-  public void checkOperation(HAContext context, OperationCategory op)
-      throws StandbyException {
-    context.checkOperation(op);
-  }
-
-  @Override // HAState
-  public boolean shouldPopulateReplQueues() {
-    return false;
-  }
-
-  @Override
-  public void enterState(HAContext context) throws ServiceFailedException {
-    try {
-      context.startActiveServices();
-    } catch (IOException e) {
-      throw new ServiceFailedException("Failed to start backup services", e);
-    }
-  }
-
-  @Override
-  public void exitState(HAContext context) throws ServiceFailedException {
-    try {
-      context.stopActiveServices();
-    } catch (IOException e) {
-      throw new ServiceFailedException("Failed to stop backup services", e);
-    }
-  }
-
-  @Override
-  public void prepareToExitState(HAContext context) throws ServiceFailedException {
-    context.prepareToStopStandbyServices();
-  }
-}

+ 32 - 41
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -383,7 +383,7 @@ public class FSDirectory implements Closeable {
     writeLock();
     try {
       // file is closed
-      file.setModificationTime(now);
+      file.setModificationTimeForce(now);
       fsImage.getEditLog().logCloseFile(path, file);
       if (NameNode.stateChangeLog.isDebugEnabled()) {
         NameNode.stateChangeLog.debug("DIR* FSDirectory.closeFile: "
@@ -585,10 +585,8 @@ public class FSDirectory implements Closeable {
               + src + " is renamed to " + dst);
         }
         // update modification time of dst and the parent of src
-        srcInodes[srcInodes.length-2].updateModificationTime(timestamp);
-        dstInodes[dstInodes.length-2].updateModificationTime(timestamp);
-        // update moved leases with new filename
-        getFSNamesystem().unprotectedChangeLease(src, dst);        
+        srcInodes[srcInodes.length-2].setModificationTime(timestamp);
+        dstInodes[dstInodes.length-2].setModificationTime(timestamp);
         return true;
       }
     } finally {
@@ -752,10 +750,8 @@ public class FSDirectory implements Closeable {
               "DIR* FSDirectory.unprotectedRenameTo: " + src
               + " is renamed to " + dst);
         }
-        srcInodes[srcInodes.length - 2].updateModificationTime(timestamp);
-        dstInodes[dstInodes.length - 2].updateModificationTime(timestamp);
-        // update moved lease with new filename
-        getFSNamesystem().unprotectedChangeLease(src, dst);
+        srcInodes[srcInodes.length - 2].setModificationTime(timestamp);
+        dstInodes[dstInodes.length - 2].setModificationTime(timestamp);
 
         // Collect the blocks and remove the lease for previous dst
         int filesDeleted = 0;
@@ -990,12 +986,12 @@ public class FSDirectory implements Closeable {
       if(nodeToRemove == null) continue;
       
       nodeToRemove.setBlocks(null);
-      trgParent.removeChild(nodeToRemove, trgINodesInPath.getLatestSnapshot());
+      trgParent.removeChild(nodeToRemove);
       count++;
     }
     
-    trgInode.setModificationTime(timestamp);
-    trgParent.updateModificationTime(timestamp);
+    trgInode.setModificationTimeForce(timestamp);
+    trgParent.setModificationTime(timestamp);
     // update quota on the parent directory ('count' files removed, 0 space)
     unprotectedUpdateCount(trgINodesInPath, trgINodes.length-1, -count, 0);
   }
@@ -1133,7 +1129,7 @@ public class FSDirectory implements Closeable {
       return 0;
     }
     // set the parent's modification time
-    inodes[inodes.length - 2].updateModificationTime(mtime);
+    inodes[inodes.length - 2].setModificationTime(mtime);
     int filesRemoved = targetNode.collectSubtreeBlocksAndClear(collectedBlocks);
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
@@ -1169,10 +1165,10 @@ public class FSDirectory implements Closeable {
   /**
    * Replaces the specified INode.
    */
-  private void replaceINodeUnsynced(String path, INode oldnode, INode newnode,
-      Snapshot latestSnapshot) throws IOException {    
+  private void replaceINodeUnsynced(String path, INode oldnode, INode newnode
+      ) throws IOException {    
     //remove the old node from the namespace 
-    if (!oldnode.removeNode(latestSnapshot)) {
+    if (!oldnode.removeNode()) {
       final String mess = "FSDirectory.replaceINodeUnsynced: failed to remove "
           + path;
       NameNode.stateChangeLog.warn("DIR* " + mess);
@@ -1187,10 +1183,10 @@ public class FSDirectory implements Closeable {
    * Replaces the specified INodeDirectory.
    */
   public void replaceINodeDirectory(String path, INodeDirectory oldnode,
-      INodeDirectory newnode, Snapshot latestSnapshot) throws IOException {    
+      INodeDirectory newnode) throws IOException {    
     writeLock();
     try {
-      replaceINodeUnsynced(path, oldnode, newnode, latestSnapshot);
+      replaceINodeUnsynced(path, oldnode, newnode);
 
       //update children's parent directory
       for(INode i : newnode.getChildrenList(null)) {
@@ -1204,11 +1200,11 @@ public class FSDirectory implements Closeable {
   /**
    * Replaces the specified INodeFile with the specified one.
    */
-  public void replaceNode(String path, INodeFile oldnode, INodeFile newnode,
-      Snapshot latestSnapshot) throws IOException {
+  public void replaceNode(String path, INodeFile oldnode, INodeFile newnode
+      ) throws IOException {    
     writeLock();
     try {
-      replaceINodeUnsynced(path, oldnode, newnode, latestSnapshot);
+      replaceINodeUnsynced(path, oldnode, newnode);
       
       //Currently, oldnode and newnode are assumed to contain the same blocks.
       //Otherwise, blocks need to be removed from the blocksMap.
@@ -1277,9 +1273,13 @@ public class FSDirectory implements Closeable {
     String srcs = normalizePath(src);
     readLock();
     try {
-      final INodesInPath inodesInPath = rootDir.getINodesInPath(srcs, resolveLink);
-      final INode i = inodesInPath.getINode(0);
-      return i == null? null: createFileStatus(HdfsFileStatus.EMPTY_NAME, i);
+      INode targetNode = rootDir.getNode(srcs, resolveLink);
+      if (targetNode == null) {
+        return null;
+      }
+      else {
+        return createFileStatus(HdfsFileStatus.EMPTY_NAME, targetNode);
+      }
     } finally {
       readUnlock();
     }
@@ -1303,16 +1303,9 @@ public class FSDirectory implements Closeable {
    * Get {@link INode} associated with the file / directory.
    */
   public INode getINode(String src) throws UnresolvedLinkException {
-    return getINodesInPath(src).getINode(0);
-  }
-
-  /**
-   * Get {@link INode} associated with the file / directory.
-   */
-  public INodesInPath getINodesInPath(String src) throws UnresolvedLinkException {
     readLock();
     try {
-      return rootDir.getINodesInPath(src, true);
+      return rootDir.getNode(src, true);
     } finally {
       readUnlock();
     }
@@ -1807,8 +1800,7 @@ public class FSDirectory implements Closeable {
     if (inodes[pos-1] == null) {
       throw new NullPointerException("Panic: parent does not exist");
     }
-    final boolean added = ((INodeDirectory)inodes[pos-1]).addChild(child, true,
-        inodesInPath.getLatestSnapshot());
+    final boolean added = ((INodeDirectory)inodes[pos-1]).addChild(child, true);
     if (!added) {
       updateCount(inodesInPath, pos, -counts.getNsCount(), -counts.getDsCount(), true);
     }
@@ -1832,8 +1824,7 @@ public class FSDirectory implements Closeable {
   private INode removeLastINode(final INodesInPath inodesInPath) {
     final INode[] inodes = inodesInPath.getINodes();
     final int pos = inodes.length - 1;
-    INode removedNode = ((INodeDirectory)inodes[pos-1]).removeChild(inodes[pos],
-        inodesInPath.getLatestSnapshot());
+    INode removedNode = ((INodeDirectory)inodes[pos-1]).removeChild(inodes[pos]);
     if (removedNode != null) {
       INode.DirCounts counts = new INode.DirCounts();
       removedNode.spaceConsumedInTree(counts);
@@ -1974,8 +1965,8 @@ public class FSDirectory implements Closeable {
     }
     
     String srcs = normalizePath(src);
-    final INodesInPath inodesInPath = rootDir.getMutableINodesInPath(srcs, true);
-    final INode[] inodes = inodesInPath.getINodes();
+    final INode[] inodes = rootDir.getMutableINodesInPath(srcs, true)
+        .getINodes();
     INodeDirectory dirNode = INodeDirectory.valueOf(inodes[inodes.length-1], srcs);
     if (dirNode.isRoot() && nsQuota == HdfsConstants.QUOTA_RESET) {
       throw new IllegalArgumentException("Cannot clear namespace quota on root.");
@@ -1997,7 +1988,7 @@ public class FSDirectory implements Closeable {
           INodeDirectory newNode = new INodeDirectory(dirNode);
           INodeDirectory parent = (INodeDirectory)inodes[inodes.length-2];
           dirNode = newNode;
-          parent.replaceChild(newNode, inodesInPath.getLatestSnapshot());
+          parent.replaceChild(newNode);
         }
       } else {
         // a non-quota directory; so replace it with a directory with quota
@@ -2006,7 +1997,7 @@ public class FSDirectory implements Closeable {
         // non-root directory node; parent != null
         INodeDirectory parent = (INodeDirectory)inodes[inodes.length-2];
         dirNode = newNode;
-        parent.replaceChild(newNode, inodesInPath.getLatestSnapshot());
+        parent.replaceChild(newNode);
       }
       return (oldNsQuota != nsQuota || oldDsQuota != dsQuota) ? dirNode : null;
     }
@@ -2070,7 +2061,7 @@ public class FSDirectory implements Closeable {
     assert hasWriteLock();
     boolean status = false;
     if (mtime != -1) {
-      inode.setModificationTime(mtime);
+      inode.setModificationTimeForce(mtime);
       status = true;
     }
     if (atime != -1) {

+ 13 - 15
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
@@ -57,7 +58,6 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SymlinkOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
-import org.apache.hadoop.hdfs.server.namenode.INodeDirectory.INodesInPath;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
 import org.apache.hadoop.hdfs.util.Holder;
 
@@ -246,8 +246,7 @@ public class FSEditLogLoader {
       // 3. OP_ADD to open file for append
 
       // See if the file already exists (persistBlocks call)
-      final INodesInPath inodesInPath = fsDir.getINodesInPath(addCloseOp.path);
-      INodeFile oldFile = toINodeFile(inodesInPath.getINode(0), addCloseOp.path);
+      INodeFile oldFile = getINodeFile(fsDir, addCloseOp.path);
       INodeFile newFile = oldFile;
       if (oldFile == null) { // this is OP_ADD on a new file (case 1)
         // versions > 0 support per file replication
@@ -273,7 +272,7 @@ public class FSEditLogLoader {
           }
           fsNamesys.prepareFileForWrite(addCloseOp.path, oldFile,
               addCloseOp.clientName, addCloseOp.clientMachine, null,
-              false, inodesInPath.getLatestSnapshot());
+              false);
           newFile = getINodeFile(fsDir, addCloseOp.path);
         }
       }
@@ -283,7 +282,7 @@ public class FSEditLogLoader {
       
       // Update the salient file attributes.
       newFile.setAccessTime(addCloseOp.atime);
-      newFile.setModificationTime(addCloseOp.mtime);
+      newFile.setModificationTimeForce(addCloseOp.mtime);
       updateBlocks(fsDir, addCloseOp, newFile);
       break;
     }
@@ -297,8 +296,7 @@ public class FSEditLogLoader {
             " clientMachine " + addCloseOp.clientMachine);
       }
 
-      final INodesInPath inodesInPath = fsDir.getINodesInPath(addCloseOp.path);
-      INodeFile oldFile = toINodeFile(inodesInPath.getINode(0), addCloseOp.path);
+      INodeFile oldFile = getINodeFile(fsDir, addCloseOp.path);
       if (oldFile == null) {
         throw new IOException("Operation trying to close non-existent file " +
             addCloseOp.path);
@@ -306,7 +304,7 @@ public class FSEditLogLoader {
       
       // Update the salient file attributes.
       oldFile.setAccessTime(addCloseOp.atime);
-      oldFile.setModificationTime(addCloseOp.mtime);
+      oldFile.setModificationTimeForce(addCloseOp.mtime);
       updateBlocks(fsDir, addCloseOp, oldFile);
 
       // Now close the file
@@ -324,8 +322,7 @@ public class FSEditLogLoader {
         INodeFileUnderConstruction ucFile = (INodeFileUnderConstruction) oldFile;
         fsNamesys.leaseManager.removeLeaseWithPrefixPath(addCloseOp.path);
         INodeFile newFile = ucFile.convertToInodeFile();
-        fsDir.replaceNode(addCloseOp.path, ucFile, newFile,
-            inodesInPath.getLatestSnapshot());
+        fsDir.replaceNode(addCloseOp.path, ucFile, newFile);
       }
       break;
     }
@@ -363,8 +360,10 @@ public class FSEditLogLoader {
     }
     case OP_RENAME_OLD: {
       RenameOldOp renameOp = (RenameOldOp)op;
+      HdfsFileStatus dinfo = fsDir.getFileInfo(renameOp.dst, false);
       fsDir.unprotectedRenameTo(renameOp.src, renameOp.dst,
                                 renameOp.timestamp);
+      fsNamesys.unprotectedChangeLease(renameOp.src, renameOp.dst, dinfo);
       break;
     }
     case OP_DELETE: {
@@ -434,8 +433,11 @@ public class FSEditLogLoader {
     }
     case OP_RENAME: {
       RenameOp renameOp = (RenameOp)op;
+
+      HdfsFileStatus dinfo = fsDir.getFileInfo(renameOp.dst, false);
       fsDir.unprotectedRenameTo(renameOp.src, renameOp.dst,
                                 renameOp.timestamp, renameOp.options);
+      fsNamesys.unprotectedChangeLease(renameOp.src, renameOp.dst, dinfo);
       break;
     }
     case OP_GET_DELEGATION_TOKEN: {
@@ -510,11 +512,7 @@ public class FSEditLogLoader {
   
   private static INodeFile getINodeFile(FSDirectory fsDir, String path)
       throws IOException {
-    return toINodeFile(fsDir.getINode(path), path);
-  }
-
-  private static INodeFile toINodeFile(INode inode, String path)
-      throws IOException {
+    INode inode = fsDir.getINode(path);
     if (inode != null) {
       if (!(inode instanceof INodeFile)) {
         throw new IOException("Operation trying to get non-file " + path);

+ 4 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java

@@ -44,7 +44,6 @@ import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
-import org.apache.hadoop.hdfs.server.namenode.INodeDirectory.INodesInPath;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.Text;
@@ -203,7 +202,7 @@ class FSImageFormat {
     if (nsQuota != -1 || dsQuota != -1) {
       fsDir.rootDir.setQuota(nsQuota, dsQuota);
     }
-    fsDir.rootDir.cloneModificationTime(root);
+    fsDir.rootDir.setModificationTime(root.getModificationTime());
     fsDir.rootDir.clonePermissionStatus(root);    
   }
 
@@ -306,7 +305,7 @@ class FSImageFormat {
    */
   void addToParent(INodeDirectory parent, INode child) {
     // NOTE: This does not update space counts for parents
-    if (!parent.addChild(child, false, null)) {
+    if (!parent.addChild(child, false)) {
       return;
     }
     namesystem.dir.cacheName(child);
@@ -389,9 +388,8 @@ class FSImageFormat {
 
         // verify that file exists in namespace
         String path = cons.getLocalName();
-        final INodesInPath inodesInPath = fsDir.getINodesInPath(path);
-        INodeFile oldnode = INodeFile.valueOf(inodesInPath.getINode(0), path);
-        fsDir.replaceNode(path, oldnode, cons, inodesInPath.getLatestSnapshot());
+        INodeFile oldnode = INodeFile.valueOf(fsDir.getINode(path), path);
+        fsDir.replaceNode(path, oldnode, cons);
         namesystem.leaseManager.addLease(cons.getClientName(), path); 
       }
     }

+ 66 - 55
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -121,7 +121,6 @@ import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
-import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.ha.ServiceFailedException;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
@@ -166,6 +165,7 @@ import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory.INodesInPath;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
+import org.apache.hadoop.hdfs.server.namenode.ha.ActiveState;
 import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
@@ -173,9 +173,7 @@ import org.apache.hadoop.hdfs.server.namenode.ha.StandbyCheckpointer;
 import org.apache.hadoop.hdfs.server.namenode.ha.StandbyState;
 import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileWithLink;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;
 import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@@ -1038,8 +1036,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     long totalInodes = this.dir.totalInodes();
     long totalBlocks = this.getBlocksTotal();
     out.println(totalInodes + " files and directories, " + totalBlocks
-        + " blocks = " + (totalInodes + totalBlocks)
-        + " total filesystem objects");
+        + " blocks = " + (totalInodes + totalBlocks) + " total");
 
     blockManager.metaSave(out);
   }
@@ -1823,9 +1820,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     try {
       blockManager.verifyReplication(src, replication, clientMachine);
       boolean create = flag.contains(CreateFlag.CREATE);
-      
-      final INodesInPath inodesInPath = dir.getINodesInPath(src);
-      final INode myFile = inodesInPath.getINode(0);
+      final INode myFile = dir.getINode(src);
       if (myFile == null) {
         if (!create) {
           throw new FileNotFoundException("failed to overwrite or append to non-existent file "
@@ -1852,8 +1847,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
 
       if (append && myFile != null) {
         final INodeFile f = INodeFile.valueOf(myFile, src); 
-        return prepareFileForWrite(src, f, holder, clientMachine, clientNode,
-            true, inodesInPath.getLatestSnapshot());
+        return prepareFileForWrite(
+            src, f, holder, clientMachine, clientNode, true);
       } else {
        // Now we can add the name to the filesystem. This file has no
        // blocks associated with it.
@@ -1901,7 +1896,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    */
   LocatedBlock prepareFileForWrite(String src, INodeFile file,
       String leaseHolder, String clientMachine, DatanodeDescriptor clientNode,
-      boolean writeToEditLog, Snapshot latestSnapshot) throws IOException {
+      boolean writeToEditLog) throws IOException {
     //TODO SNAPSHOT: INodeFileUnderConstruction with link
     INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
                                     file.getLocalNameBytes(),
@@ -1913,7 +1908,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
                                     leaseHolder,
                                     clientMachine,
                                     clientNode);
-    dir.replaceNode(src, file, cons, latestSnapshot);
+    dir.replaceNode(src, file, cons);
     leaseManager.addLease(cons.getClientName(), src);
     
     LocatedBlock ret = blockManager.convertLastBlockToUnderConstruction(cons);
@@ -2163,8 +2158,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       // have we exceeded the configured limit of fs objects.
       checkFsObjectLimit();
 
-      final INodeFileUnderConstruction pendingFile = checkLease(
-          src, clientName, dir.getINode(src));
+      INodeFileUnderConstruction pendingFile = checkLease(src, clientName);
       BlockInfo lastBlockInFile = pendingFile.getLastBlock();
       if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
         // The block that the client claims is the current last block
@@ -2300,8 +2294,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       }
 
       //check lease
-      final INodeFileUnderConstruction file = checkLease(
-          src, clientName, dir.getINode(src));
+      final INodeFileUnderConstruction file = checkLease(src, clientName);
       clientnode = file.getClientNode();
       preferredblocksize = file.getPreferredBlockSize();
 
@@ -2347,9 +2340,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         throw new SafeModeException("Cannot abandon block " + b +
                                     " for fle" + src, safeMode);
       }
-      final INodesInPath inodesInPath = checkLease(src, holder);
-      final INodeFileUnderConstruction file
-          = (INodeFileUnderConstruction)inodesInPath.getINode(0); 
+      INodeFileUnderConstruction file = checkLease(src, holder);
       dir.removeBlock(src, file, ExtendedBlock.getLocalBlock(b));
       if(NameNode.stateChangeLog.isDebugEnabled()) {
         NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
@@ -2366,13 +2357,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     return true;
   }
   
-  /** make sure that we still have the lease on this file. */
-  private INodesInPath checkLease(String src, String holder) 
+  // make sure that we still have the lease on this file.
+  private INodeFileUnderConstruction checkLease(String src, String holder) 
       throws LeaseExpiredException, UnresolvedLinkException {
     assert hasReadOrWriteLock();
-    final INodesInPath inodesInPath = dir.getINodesInPath(src);
-    checkLease(src, holder, inodesInPath.getINode(0));
-    return inodesInPath;
+    return checkLease(src, holder, dir.getINode(src));
   }
 
   private INodeFileUnderConstruction checkLease(String src, String holder,
@@ -2435,11 +2424,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       throw new SafeModeException("Cannot complete file " + src, safeMode);
     }
 
-    final INodesInPath inodesInPath;
-    final INodeFileUnderConstruction pendingFile;
+    INodeFileUnderConstruction pendingFile;
     try {
-      inodesInPath = checkLease(src, holder);
-      pendingFile = (INodeFileUnderConstruction)inodesInPath.getINode(0); 
+      pendingFile = checkLease(src, holder);
     } catch (LeaseExpiredException lee) {
       final INode inode = dir.getINode(src);
       if (inode != null && inode instanceof INodeFile && !inode.isUnderConstruction()) {
@@ -2467,8 +2454,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       return false;
     }
 
-    finalizeINodeFileUnderConstruction(src, pendingFile,
-        inodesInPath.getLatestSnapshot());
+    finalizeINodeFileUnderConstruction(src, pendingFile);
 
     NameNode.stateChangeLog.info("DIR* completeFile: " + src + " is closed by "
         + holder);
@@ -2609,15 +2595,15 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     if (isPermissionEnabled) {
       //We should not be doing this.  This is move() not renameTo().
       //but for now,
-      //NOTE: yes, this is bad!  it's assuming much lower level behavior
-      //      of rewriting the dst
       String actualdst = dir.isDir(dst)?
           dst + Path.SEPARATOR + new Path(src).getName(): dst;
       checkParentAccess(src, FsAction.WRITE);
       checkAncestorAccess(actualdst, FsAction.WRITE);
     }
 
+    HdfsFileStatus dinfo = dir.getFileInfo(dst, false);
     if (dir.renameTo(src, dst)) {
+      unprotectedChangeLease(src, dst, dinfo);     // update lease with new filename
       return true;
     }
     return false;
@@ -2668,7 +2654,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       checkAncestorAccess(dst, FsAction.WRITE);
     }
 
+    HdfsFileStatus dinfo = dir.getFileInfo(dst, false);
     dir.renameTo(src, dst, options);
+    unprotectedChangeLease(src, dst, dinfo); // update lease with new filename
   }
   
   /**
@@ -3048,8 +3036,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot fsync file " + src, safeMode);
       }
-      final INodeFileUnderConstruction pendingFile  = checkLease(
-          src, clientName, dir.getINode(src));
+      INodeFileUnderConstruction pendingFile  = checkLease(src, clientName);
       if (lastBlockLength > 0) {
         pendingFile.updateLengthOfLastBlock(lastBlockLength);
       }
@@ -3081,9 +3068,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     assert !isInSafeMode();
     assert hasWriteLock();
 
-    final INodesInPath inodesInPath = dir.getINodesInPath(src);
     final INodeFileUnderConstruction pendingFile
-        = INodeFileUnderConstruction.valueOf(inodesInPath.getINode(0), src);
+        = INodeFileUnderConstruction.valueOf(dir.getINode(src), src);
     int nrBlocks = pendingFile.numBlocks();
     BlockInfo[] blocks = pendingFile.getBlocks();
 
@@ -3100,8 +3086,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     // If there are no incomplete blocks associated with this file,
     // then reap lease immediately and close the file.
     if(nrCompleteBlocks == nrBlocks) {
-      finalizeINodeFileUnderConstruction(src, pendingFile,
-          inodesInPath.getLatestSnapshot());
+      finalizeINodeFileUnderConstruction(src, pendingFile);
       NameNode.stateChangeLog.warn("BLOCK*"
         + " internalReleaseLease: All existing blocks are COMPLETE,"
         + " lease removed, file closed.");
@@ -3149,8 +3134,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       // Close file if committed blocks are minimally replicated
       if(penultimateBlockMinReplication &&
           blockManager.checkMinReplication(lastBlock)) {
-        finalizeINodeFileUnderConstruction(src, pendingFile,
-            inodesInPath.getLatestSnapshot());
+        finalizeINodeFileUnderConstruction(src, pendingFile);
         NameNode.stateChangeLog.warn("BLOCK*"
           + " internalReleaseLease: Committed blocks are minimally replicated,"
           + " lease removed, file closed.");
@@ -3228,7 +3212,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   }
 
   private void finalizeINodeFileUnderConstruction(String src, 
-      INodeFileUnderConstruction pendingFile, Snapshot latestSnapshot) 
+      INodeFileUnderConstruction pendingFile) 
       throws IOException, UnresolvedLinkException {
     assert hasWriteLock();
     leaseManager.removeLease(pendingFile.getClientName(), src);
@@ -3236,7 +3220,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     // The file is no longer pending.
     // Create permanent INode, update blocks
     INodeFile newFile = pendingFile.convertToInodeFile();
-    dir.replaceNode(src, pendingFile, newFile, latestSnapshot);
+    dir.replaceNode(src, pendingFile, newFile);
 
     // close file and persist block allocations for this file
     dir.closeFile(src, newFile);
@@ -3328,8 +3312,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         commitOrCompleteLastBlock(pendingFile, storedBlock);
 
         //remove lease, close file
-        finalizeINodeFileUnderConstruction(src, pendingFile,
-            INodeDirectorySnapshottable.findLatestSnapshot(pendingFile));
+        finalizeINodeFileUnderConstruction(src, pendingFile);
       } else {
         // If this commit does not want to close the file, persist blocks
         dir.persistBlocks(src, pendingFile);
@@ -3500,9 +3483,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   private NNHAStatusHeartbeat createHaStatusHeartbeat() {
     HAState state = haContext.getState();
     NNHAStatusHeartbeat.State hbState;
-    if (state.getServiceState() == HAServiceState.ACTIVE) {
+    if (state instanceof ActiveState) {
       hbState = NNHAStatusHeartbeat.State.ACTIVE;
-    } else if (state.getServiceState() == HAServiceState.STANDBY) {
+    } else if (state instanceof StandbyState) {
       hbState = NNHAStatusHeartbeat.State.STANDBY;      
     } else {
       throw new AssertionError("Invalid state: " + state.getClass());
@@ -4956,9 +4939,31 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
 
   // rename was successful. If any part of the renamed subtree had
   // files that were being written to, update with new filename.
-  void unprotectedChangeLease(String src, String dst) {
+  void unprotectedChangeLease(String src, String dst, HdfsFileStatus dinfo) {
+    String overwrite;
+    String replaceBy;
     assert hasWriteLock();
-    leaseManager.changeLease(src, dst);
+
+    boolean destinationExisted = true;
+    if (dinfo == null) {
+      destinationExisted = false;
+    }
+
+    if (destinationExisted && dinfo.isDir()) {
+      Path spath = new Path(src);
+      Path parent = spath.getParent();
+      if (parent.isRoot()) {
+        overwrite = parent.toString();
+      } else {
+        overwrite = parent.toString() + Path.SEPARATOR;
+      }
+      replaceBy = dst + Path.SEPARATOR;
+    } else {
+      overwrite = src;
+      replaceBy = dst;
+    }
+
+    leaseManager.changeLease(src, dst, overwrite, replaceBy);
   }
 
   /**
@@ -4969,13 +4974,19 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     // lock on our behalf. If we took the read lock here, we could block
     // for fairness if a writer is waiting on the lock.
     synchronized (leaseManager) {
-      Map<String, INodeFileUnderConstruction> nodes =
-          leaseManager.getINodesUnderConstruction();
-      out.writeInt(nodes.size()); // write the size    
-      for (Map.Entry<String, INodeFileUnderConstruction> entry
-           : nodes.entrySet()) {
-        FSImageSerialization.writeINodeUnderConstruction(
-            out, entry.getValue(), entry.getKey());
+      out.writeInt(leaseManager.countPath()); // write the size
+
+      for (Lease lease : leaseManager.getSortedLeases()) {
+        for(String path : lease.getPaths()) {
+          // verify that path exists in namespace
+          final INodeFileUnderConstruction cons;
+          try {
+            cons = INodeFileUnderConstruction.valueOf(dir.getINode(path), path);
+          } catch (UnresolvedLinkException e) {
+            throw new AssertionError("Lease files should reside on this FS");
+          }
+          FSImageSerialization.writeINodeUnderConstruction(out, cons, path);
+        }
       }
     }
   }

+ 13 - 46
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java

@@ -33,7 +33,6 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 import org.apache.hadoop.util.StringUtils;
 
@@ -47,33 +46,8 @@ import com.google.common.primitives.SignedBytes;
  */
 @InterfaceAudience.Private
 public abstract class INode implements Comparable<byte[]> {
-  /** A dummy INode which can be used as a probe object. */
-  public static final INode DUMMY = new INode() {
-    @Override
-    int collectSubtreeBlocksAndClear(BlocksMapUpdateInfo info) {
-      throw new UnsupportedOperationException();
-    }
-    @Override
-    long[] computeContentSummary(long[] summary) {
-      throw new UnsupportedOperationException();
-    }
-    @Override
-    DirCounts spaceConsumedInTree(DirCounts counts) {
-      throw new UnsupportedOperationException();
-    }
-  };
   static final ReadOnlyList<INode> EMPTY_READ_ONLY_LIST
       = ReadOnlyList.Util.emptyList();
-  /**
-   * Assert that the snapshot parameter must be null since
-   * this class only take care current state. 
-   * Subclasses should override the methods for handling the snapshot states.
-   */
-  static void assertNull(Snapshot snapshot) {
-    if (snapshot != null) {
-      throw new AssertionError("snapshot is not null: " + snapshot);
-    }
-  }
 
 
   /** Wrapper of two counters for namespace consumed and diskspace consumed. */
@@ -146,12 +120,9 @@ public abstract class INode implements Comparable<byte[]> {
    * should not modify it.
    */
   private long permission = 0L;
-  INodeDirectory parent = null;
-  private long modificationTime = 0L;
-  private long accessTime = 0L;
-  
-  /** For creating the a {@link #DUMMY} object. */
-  private INode() {}
+  protected INodeDirectory parent = null;
+  protected long modificationTime = 0L;
+  protected long accessTime = 0L;
 
   private INode(byte[] name, long permission, INodeDirectory parent,
       long modificationTime, long accessTime) {
@@ -178,8 +149,8 @@ public abstract class INode implements Comparable<byte[]> {
   
   /** @param other Other node to be copied */
   INode(INode other) {
-    this(other.name, other.permission, other.parent, 
-        other.modificationTime, other.accessTime);
+    this(other.getLocalNameBytes(), other.permission, other.getParent(), 
+        other.getModificationTime(), other.getAccessTime());
   }
 
   /**
@@ -319,13 +290,13 @@ public abstract class INode implements Comparable<byte[]> {
    * Set local file name
    */
   public void setLocalName(String name) {
-    setLocalName(DFSUtil.string2Bytes(name));
+    this.name = DFSUtil.string2Bytes(name);
   }
 
   /**
    * Set local file name
    */
-  public void setLocalName(byte[] name) {
+  void setLocalName(byte[] name) {
     this.name = name;
   }
 
@@ -345,7 +316,7 @@ public abstract class INode implements Comparable<byte[]> {
    * Get parent directory 
    * @return parent INode
    */
-  public INodeDirectory getParent() {
+  INodeDirectory getParent() {
     return this.parent;
   }
 
@@ -365,21 +336,17 @@ public abstract class INode implements Comparable<byte[]> {
   /**
    * Set last modification time of inode.
    */
-  public void updateModificationTime(long modtime) {
+  public void setModificationTime(long modtime) {
     assert isDirectory();
     if (this.modificationTime <= modtime) {
-      setModificationTime(modtime);
+      this.modificationTime = modtime;
     }
   }
 
-  void cloneModificationTime(INode that) {
-    this.modificationTime = that.modificationTime;
-  }
-
   /**
    * Always set the last modification time of inode.
    */
-  void setModificationTime(long modtime) {
+  void setModificationTimeForce(long modtime) {
     this.modificationTime = modtime;
   }
 
@@ -464,11 +431,11 @@ public abstract class INode implements Comparable<byte[]> {
     return buf.toString();
   }
 
-  public boolean removeNode(Snapshot latestSnapshot) {
+  public boolean removeNode() {
     if (parent == null) {
       return false;
     } else {
-      parent.removeChild(this, latestSnapshot);
+      parent.removeChild(this);
       parent = null;
       return true;
     }

+ 21 - 71
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java

@@ -54,7 +54,7 @@ public class INodeDirectory extends INode {
     }
     return (INodeDirectory)inode; 
   }
-  
+
   protected static final int DEFAULT_FILES_PER_DIRECTORY = 5;
   final static String ROOT_NAME = "";
 
@@ -99,56 +99,32 @@ public class INodeDirectory extends INode {
     }
   }
 
-  public int searchChildren(INode inode) {
+  private int searchChildren(INode inode) {
     return Collections.binarySearch(children, inode.getLocalNameBytes());
   }
 
-  public int searchChildrenForExistingINode(INode inode) {
-    final int i = searchChildren(inode);
-    if (i < 0) {
-      throw new AssertionError("Child not found: inode=" + inode);
-    }
-    return i;
-  }
-
-  public INode removeChild(INode node, Snapshot latestSnapshot) {
+  INode removeChild(INode node) {
     assertChildrenNonNull();
-
-    if (latestSnapshot != null) {
-      final INodeDirectoryWithSnapshot dir
-          = INodeDirectoryWithSnapshot.replaceDir(this, latestSnapshot);
-      return dir.removeChild(node, latestSnapshot);
-    }
-
     final int i = searchChildren(node);
     return i >= 0? children.remove(i): null;
   }
+
   /** Replace a child that has the same name as newChild by newChild.
    * 
    * @param newChild Child node to be added
    */
-  public INode replaceChild(INodeDirectory newChild, Snapshot latestSnapshot) {
+  void replaceChild(INode newChild) {
     assertChildrenNonNull();
 
-    if (latestSnapshot != null) {
-      final INodeDirectoryWithSnapshot dir
-          = INodeDirectoryWithSnapshot.replaceDir(this, latestSnapshot);
-      return dir.replaceChild(newChild, latestSnapshot);
-    }
-
-    // find the old child and replace it
-    final int low = searchChildrenForExistingINode(newChild);
-    final INode oldChild = children.set(low, newChild);
-    // set the parent of the children of the child.
-    for(INode i : newChild.getChildrenList(null)) {
-      i.parent = newChild;
+    final int low = searchChildren(newChild);
+    if (low>=0) { // an old child exists so replace by the newChild
+      children.set(low, newChild);
+    } else {
+      throw new IllegalArgumentException("No child exists to be replaced");
     }
-    return oldChild;
   }
 
-  public INode getChild(byte[] name, Snapshot snapshot) {
-    assertNull(snapshot);
-
+  private INode getChild(byte[] name, Snapshot snapshot) {
     final ReadOnlyList<INode> c = getChildrenList(snapshot);
     final int i = ReadOnlyList.Util.binarySearch(c, name);
     return i < 0? null: c.get(i);
@@ -385,14 +361,7 @@ public class INodeDirectory extends INode {
    * @return false if the child with this name already exists; 
    *         otherwise, return true;
    */
-  public boolean addChild(final INode node, final boolean setModTime,
-      Snapshot latestSnapshot) {
-    if (latestSnapshot != null) {
-      final INodeDirectoryWithSnapshot dir
-          = INodeDirectoryWithSnapshot.replaceDir(this, latestSnapshot);
-      return dir.addChild(node, setModTime, latestSnapshot);
-    }
-
+  public boolean addChild(final INode node, final boolean setModTime) {
     if (children == null) {
       children = new ArrayList<INode>(DEFAULT_FILES_PER_DIRECTORY);
     }
@@ -403,9 +372,8 @@ public class INodeDirectory extends INode {
     node.parent = this;
     children.add(-low - 1, node);
     // update modification time of the parent directory
-    if (setModTime) {
-      updateModificationTime(node.getModificationTime());
-    }
+    if (setModTime)
+      setModificationTime(node.getModificationTime());
     if (node.getGroupName() == null) {
       node.setGroup(getGroupName());
     }
@@ -432,28 +400,20 @@ public class INodeDirectory extends INode {
     }
     newNode.setLocalName(pathComponents[pathComponents.length - 1]);
     // insert into the parent children list
-    INodesInPath inodes =  getExistingPathINodes(pathComponents, 2, false);
-    INodeDirectory parent = INodeDirectory.valueOf(inodes.inodes[0], pathComponents);
-    return parent.addChild(newNode, true, inodes.getLatestSnapshot());
+    INodeDirectory parent = getParent(pathComponents);
+    return parent.addChild(newNode, true);
   }
 
   INodeDirectory getParent(byte[][] pathComponents
       ) throws FileNotFoundException, PathIsNotDirectoryException,
       UnresolvedLinkException {
-    return (INodeDirectory)getParentINodesInPath(pathComponents).getINode(0);
-  }
-
-  INodesInPath getParentINodesInPath(byte[][] pathComponents
-      ) throws FileNotFoundException, PathIsNotDirectoryException,
-      UnresolvedLinkException {
     if (pathComponents.length < 2)  // add root
       return null;
     // Gets the parent INode
     INodesInPath inodes =  getExistingPathINodes(pathComponents, 2, false);
-    INodeDirectory.valueOf(inodes.inodes[0], pathComponents);
-    return inodes;
+    return INodeDirectory.valueOf(inodes.inodes[0], pathComponents);
   }
-  
+
   @Override
   DirCounts spaceConsumedInTree(DirCounts counts) {
     counts.nsCount += 1;
@@ -502,11 +462,10 @@ public class INodeDirectory extends INode {
    *         Note that the returned list is never null.
    */
   public ReadOnlyList<INode> getChildrenList(final Snapshot snapshot) {
-    assertNull(snapshot);
+    //TODO: use snapshot to select children list
     return children == null ? EMPTY_READ_ONLY_LIST
         : ReadOnlyList.Util.asReadOnlyList(children);
   }
-
   /** Set the children list. */
   public void setChildren(List<INode> children) {
     this.children = children;
@@ -531,7 +490,7 @@ public class INodeDirectory extends INode {
    * {@link INodeDirectory#getExistingPathINodes(byte[][], int, boolean)}.
    * Contains INodes information resolved from a given path.
    */
-  public static class INodesInPath {
+  static class INodesInPath {
     /**
      * Array with the specified number of INodes resolved for a given path.
      */
@@ -611,7 +570,7 @@ public class INodeDirectory extends INode {
     }
     
     /** @return the i-th inode. */
-    public INode getINode(int i) {
+    INode getINode(int i) {
       return inodes[i];
     }
     
@@ -717,13 +676,4 @@ public class INodeDirectory extends INode {
       }
     }
   }
-
-  /** 
-   * Get last modification time of inode.
-   * @return access time
-   */
-  public long getModificationTime(Snapshot snapshot) {
-    assertNull(snapshot);
-    return getModificationTime();
-  }
 }

+ 5 - 11
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java

@@ -89,22 +89,16 @@ public class INodeFile extends INode implements BlockCollection {
   INodeFile(PermissionStatus permissions, BlockInfo[] blklist,
                       short replication, long modificationTime,
                       long atime, long preferredBlockSize) {
-    this(null, permissions, modificationTime, atime, blklist, replication,
-        preferredBlockSize);
-  }
-
-  INodeFile(byte[] name, PermissionStatus permissions, long mtime, long atime,
-      BlockInfo[] blklist, short replication, long preferredBlockSize) {
-    super(name, permissions, null, mtime, atime);
+    super(permissions, modificationTime, atime);
     header = HeaderFormat.combineReplication(header, replication);
     header = HeaderFormat.combinePreferredBlockSize(header, preferredBlockSize);
     this.blocks = blklist;
   }
 
-  protected INodeFile(INodeFile that) {
-    super(that);
-    this.header = that.header;
-    this.blocks = that.blocks;
+  protected INodeFile(INodeFile f) {
+    this(f.getPermissionStatus(), f.getBlocks(), f.getFileReplication(),
+        f.getModificationTime(), f.getAccessTime(), f.getPreferredBlockSize());
+    this.setLocalName(f.getLocalNameBytes());
   }
 
   /** @return true unconditionally. */

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java

@@ -72,8 +72,9 @@ public class INodeFileUnderConstruction extends INodeFile implements MutableBloc
                              String clientName,
                              String clientMachine,
                              DatanodeDescriptor clientNode) {
-    super(name, perm, modificationTime, modificationTime,
-        blocks, blockReplication, preferredBlockSize);
+    super(perm, blocks, blockReplication, modificationTime, modificationTime,
+          preferredBlockSize);
+    setLocalName(name);
     this.clientName = clientName;
     this.clientMachine = clientMachine;
     this.clientNode = clientNode;

+ 8 - 25
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java

@@ -331,19 +331,22 @@ public class LeaseManager {
     }
   }
 
-  synchronized void changeLease(String src, String dst) {
+  synchronized void changeLease(String src, String dst,
+      String overwrite, String replaceBy) {
     if (LOG.isDebugEnabled()) {
       LOG.debug(getClass().getSimpleName() + ".changelease: " +
-               " src=" + src + ", dest=" + dst);
+               " src=" + src + ", dest=" + dst + 
+               ", overwrite=" + overwrite +
+               ", replaceBy=" + replaceBy);
     }
 
-    final int len = src.length();
+    final int len = overwrite.length();
     for(Map.Entry<String, Lease> entry
         : findLeaseWithPrefixPath(src, sortedLeasesByPath).entrySet()) {
       final String oldpath = entry.getKey();
       final Lease lease = entry.getValue();
-      // replace stem of src with new destination
-      final String newpath = dst + oldpath.substring(len);
+      //overwrite must be a prefix of oldpath
+      final String newpath = replaceBy + oldpath.substring(len);
       if (LOG.isDebugEnabled()) {
         LOG.debug("changeLease: replacing " + oldpath + " with " + newpath);
       }
@@ -426,26 +429,6 @@ public class LeaseManager {
     }
   }
 
-  /**
-   * Get the list of inodes corresponding to valid leases.
-   * @return list of inodes
-   * @throws UnresolvedLinkException
-   */
-  Map<String, INodeFileUnderConstruction> getINodesUnderConstruction() {
-    Map<String, INodeFileUnderConstruction> inodes =
-        new TreeMap<String, INodeFileUnderConstruction>();
-    for (String p : sortedLeasesByPath.keySet()) {
-      // verify that path exists in namespace
-      try {
-        INode node = fsnamesystem.dir.getINode(p);
-        inodes.put(p, INodeFileUnderConstruction.valueOf(node, p));
-      } catch (IOException ioe) {
-        LOG.error(ioe);
-      }
-    }
-    return inodes;
-  }
-  
   /** Check the leases beginning from the oldest.
    *  @return true is sync is needed.
    */

+ 6 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java

@@ -598,7 +598,11 @@ public class NameNode {
     String nsId = getNameServiceId(conf);
     String namenodeId = HAUtil.getNameNodeId(conf, nsId);
     this.haEnabled = HAUtil.isHAEnabled(conf, nsId);
-    state = createHAState();
+    if (!haEnabled) {
+      state = ACTIVE_STATE;
+    } else {
+      state = STANDBY_STATE;
+    }
     this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(conf);
     this.haContext = createHAContext();
     try {
@@ -615,10 +619,6 @@ public class NameNode {
     }
   }
 
-  protected HAState createHAState() {
-    return !haEnabled ? ACTIVE_STATE : STANDBY_STATE;
-  }
-
   protected HAContext createHAContext() {
     return new NameNodeHAContext();
   }
@@ -1298,7 +1298,7 @@ public class NameNode {
    *          before exit.
    * @throws ExitException thrown only for testing.
    */
-  protected synchronized void doImmediateShutdown(Throwable t)
+  private synchronized void doImmediateShutdown(Throwable t)
       throws ExitException {
     String message = "Error encountered requiring NN shutdown. " +
         "Shutting down immediately.";

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java

@@ -102,7 +102,7 @@ class NamenodeJspHelper {
     long usedNonHeap = (totalNonHeap * 100) / commitedNonHeap;
 
     String str = "<div>" + inodes + " files and directories, " + blocks + " blocks = "
-        + (inodes + blocks) + " total filesystem objects";
+        + (inodes + blocks) + " total";
     if (maxobjects != 0) {
       long pct = ((inodes + blocks) * 100) / maxobjects;
       str += " / " + maxobjects + " (" + pct + "%)";

+ 2 - 15
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java

@@ -65,19 +65,6 @@ public class INodeDirectorySnapshottable extends INodeDirectoryWithQuota {
     }
     return (INodeDirectorySnapshottable)dir;
   }
-  
-  public static Snapshot findLatestSnapshot(INode inode) {
-    Snapshot latest = null;
-    for(; inode != null; inode = inode.getParent()) {
-      if (inode instanceof INodeDirectorySnapshottable) {
-        final Snapshot s = ((INodeDirectorySnapshottable)inode).getLastSnapshot();
-        if (Snapshot.ID_COMPARATOR.compare(latest, s) < 0) {
-          latest = s;
-        }
-      }
-    }
-    return latest;
-  }
 
   /** Snapshots of this directory in ascending order of snapshot id. */
   private final List<Snapshot> snapshots = new ArrayList<Snapshot>();
@@ -209,8 +196,8 @@ public class INodeDirectorySnapshottable extends INodeDirectoryWithQuota {
 
     //set modification time
     final long timestamp = Time.now();
-    s.getRoot().updateModificationTime(timestamp);
-    updateModificationTime(timestamp);
+    s.getRoot().setModificationTime(timestamp);
+    setModificationTime(timestamp);
     return s;
   }
   

+ 3 - 222
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java

@@ -19,15 +19,10 @@ package org.apache.hadoop.hdfs.server.namenode.snapshot;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 
-import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
-import org.apache.hadoop.hdfs.util.ReadOnlyList;
-
-import com.google.common.base.Preconditions;
 
 /** The directory with snapshots. */
 public class INodeDirectoryWithSnapshot extends INodeDirectory {
@@ -255,223 +250,9 @@ public class INodeDirectoryWithSnapshot extends INodeDirectory {
           + "\n  deleted=" + toString(deleted);
     }
   }
-  
-  private class SnapshotDiff implements Comparable<Snapshot> {
-    /** The snapshot will obtain after applied this diff. */
-    final Snapshot snapshot;
-    /** The size of the children list which is never changed. */
-    final int size;
-    /**
-     * Posterior diff is the diff happened after this diff.
-     * The posterior diff should be first applied to obtain the posterior
-     * snapshot and then apply this diff in order to obtain this snapshot.
-     * If the posterior diff is null, the posterior state is the current state. 
-     */
-    private SnapshotDiff posteriorDiff;
-    /** The data of this diff. */
-    private final Diff diff = new Diff();
-    /** The snapshot version of the inode. */
-    private INode snapshotINode;
-
-    SnapshotDiff(Snapshot snapshot, int size) {
-      if (size < 0) {
-        throw new HadoopIllegalArgumentException("size = " + size + " < 0");
-      }
-      this.snapshot = snapshot;
-      this.size = size;
-    }
-
-    @Override
-    public int compareTo(final Snapshot that_snapshot) {
-      return Snapshot.ID_COMPARATOR.compare(this.snapshot, that_snapshot);
-    }
-
-    /**
-     * @return The children list of a directory in a snapshot.
-     *         Since the snapshot is read-only, the logical view of the list is
-     *         never changed although the internal data structure may mutate.
-     */
-    ReadOnlyList<INode> getChildrenList() {
-      return new ReadOnlyList<INode>() {
-        private List<INode> children = null;
-
-        private List<INode> initChildren() {
-          if (children == null) {
-            final ReadOnlyList<INode> posterior = posteriorDiff != null?
-                posteriorDiff.getChildrenList()
-                : INodeDirectoryWithSnapshot.this.getChildrenList(null);
-            children = diff.apply2Current(ReadOnlyList.Util.asList(posterior));
-          }
-          return children;
-        }
-
-        @Override
-        public Iterator<INode> iterator() {
-          return initChildren().iterator();
-        }
-    
-        @Override
-        public boolean isEmpty() {
-          return size == 0;
-        }
-    
-        @Override
-        public int size() {
-          return size;
-        }
-    
-        @Override
-        public INode get(int i) {
-          return initChildren().get(i);
-        }
-      };
-    }
-    
-    INode getChild(byte[] name) {
-      final INode i = diff.accessPrevious(name, INode.DUMMY);
-      if (i != INode.DUMMY) {
-        // this diff is able to find it
-        return i; 
-      } else {
-        // should return the posterior INode.
-        return posteriorDiff != null? posteriorDiff.getChild(name)
-            : INodeDirectoryWithSnapshot.this.getChild(name, null);
-      }
-    }
-  }
-  
-  /** Replace the given directory to an {@link INodeDirectoryWithSnapshot}. */
-  public static INodeDirectoryWithSnapshot replaceDir(INodeDirectory oldDir,
-      Snapshot latestSnapshot) {
-    Preconditions.checkArgument(!(oldDir instanceof INodeDirectoryWithSnapshot),
-        "oldDir is already an INodeDirectoryWithSnapshot, oldDir=%s", oldDir);
-
-    final INodeDirectory parent = oldDir.getParent();
-    Preconditions.checkArgument(parent != null,
-        "parent is null, oldDir=%s", oldDir);
-
-    final INodeDirectoryWithSnapshot newDir = new INodeDirectoryWithSnapshot(
-        oldDir, latestSnapshot);
-    parent.replaceChild(newDir, null);
-    return newDir;
-  }
-  
-  /** Diff list sorted by snapshot IDs, i.e. in chronological order. */
-  private final List<SnapshotDiff> diffs = new ArrayList<SnapshotDiff>();
-
-  INodeDirectoryWithSnapshot(INodeDirectory that, Snapshot s) {
-    super(that);
-
-    // add a diff for the snapshot
-    addSnapshotDiff(s, that.getChildrenList(null).size());
-  }
-
-  INodeDirectoryWithSnapshot(String name, INodeDirectory dir, Snapshot s) {
-    this(dir, s);
-    setLocalName(name);
-    setParent(dir);
-  }
-  
-  SnapshotDiff addSnapshotDiff(Snapshot snapshot, int childrenSize) {
-    final SnapshotDiff d = new SnapshotDiff(snapshot, childrenSize); 
-    diffs.add(d);
-    return d;
-  }
-
-  /**
-   * Check if the latest snapshot diff exist.  If not, add it.
-   * @return the latest snapshot diff, which is never null.
-   */
-  private SnapshotDiff checkAndAddLatestSnapshotDiff(Snapshot latest) {
-    final SnapshotDiff last = getLastSnapshotDiff();
-    if (last != null && last.snapshot.equals(latest)) {
-      return last;
-    }
-
-    final int size = getChildrenList(null).size();
-    final SnapshotDiff d = addSnapshotDiff(latest, size);
-    if (last != null) {
-      last.posteriorDiff = d;
-    }
-    return d;
-  }
-  
-  Diff getLatestDiff(Snapshot latest) {
-    return checkAndAddLatestSnapshotDiff(latest).diff;
-  }
-
-  /**
-   * @return the diff corresponding to the snapshot.
-   *         When the diff is not found, it means that the current state and
-   *         the snapshot state are the same. 
-   */
-  SnapshotDiff getSnapshotDiff(Snapshot snapshot) {
-    if (snapshot == null) {
-      return null;
-    }
-    final int i = Collections.binarySearch(diffs, snapshot);
-    if (i >= 0) {
-      // exact match
-      return diffs.get(i);
-    } else {
-      // Exact match not found means that there were no changes between
-      // given snapshot and the next state so that the diff for the given
-      // snapshot is not recorded.  Thus, use the next state.
-      final int j = -i - 1;
-      return j < diffs.size()? diffs.get(j): null;
-    }
-  }
-  
-  SnapshotDiff getLastSnapshotDiff() {
-    return diffs.get(diffs.size() - 1);
-  }
-
-  @Override
-  public ReadOnlyList<INode> getChildrenList(Snapshot snapshot) {
-    final SnapshotDiff diff = getSnapshotDiff(snapshot);
-    if (diff != null) {
-      return diff.getChildrenList();
-    }
-    return super.getChildrenList(null);
-  }
 
-  @Override
-  public INode getChild(byte[] name, Snapshot snapshot) {
-    final SnapshotDiff diff = getSnapshotDiff(snapshot);
-    if (diff != null) {
-      return diff.getChild(name);
-    }
-    return super.getChild(name, null);
-  }
-  
-  @Override
-  public boolean addChild(INode inode, boolean setModTime,
-      Snapshot latestSnapshot) {
-    getLatestDiff(latestSnapshot).create(inode);
-    return super.addChild(inode, setModTime, null);
-  }
-
-  @Override
-  public INode removeChild(INode inode, Snapshot latestSnapshot) {
-    getLatestDiff(latestSnapshot).delete(inode);
-    return super.removeChild(inode, null);
-  }
-
-  @Override
-  public INode replaceChild(INodeDirectory newChild, Snapshot latestSnapshot) {
-    final INode oldChild = super.replaceChild(newChild, null);
-    final Diff diff = getLatestDiff(latestSnapshot);
-    diff.delete(oldChild);
-    diff.create(newChild);
-    return oldChild;
-  }
-
-  @Override
-  public long getModificationTime(Snapshot snapshot) {
-    final SnapshotDiff diff = getSnapshotDiff(snapshot);
-    if (diff != null) {
-      return diff.snapshotINode.getModificationTime();
-    }
-    return getModificationTime();
+  INodeDirectoryWithSnapshot(String name, INodeDirectory dir) {
+    super(name, dir.getPermissionStatus());
+    parent = dir;
   }
 }

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileWithLink.java

@@ -109,7 +109,7 @@ public class INodeFileWithLink extends INodeFile {
       this.setFileReplication(maxReplication);
       this.next = null;
       // clear parent
-      setParent(null);
+      parent = null;
     }
     return 1;
   }

+ 1 - 16
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java

@@ -44,7 +44,7 @@ public class Snapshot implements Comparable<byte[]> {
 
   Snapshot(int id, String name, INodeDirectorySnapshottable dir) {
     this.id = id;
-    this.root = new INodeDirectoryWithSnapshot(name, dir, this);
+    this.root = new INodeDirectoryWithSnapshot(name, dir);
   }
 
   /** @return the root directory of the snapshot. */
@@ -57,21 +57,6 @@ public class Snapshot implements Comparable<byte[]> {
     return root.compareTo(bytes);
   }
   
-  @Override
-  public boolean equals(Object that) {
-    if (this == that) {
-      return true;
-    } else if (that == null || !(that instanceof Snapshot)) {
-      return false;
-    }
-    return this.id == ((Snapshot)that).id;
-  }
-  
-  @Override
-  public int hashCode() {
-    return id;
-  }
-  
   @Override
   public String toString() {
     return getClass().getSimpleName() + ":" + root.getLocalName();

+ 92 - 11
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java

@@ -24,8 +24,12 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
-import org.apache.hadoop.hdfs.server.namenode.INodeDirectory.INodesInPath;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
+import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
+import org.apache.hadoop.hdfs.server.namenode.INodeSymlink;
+import org.apache.hadoop.hdfs.util.ReadOnlyList;
 
 /**
  * Manage snapshottable directories and their snapshots.
@@ -64,9 +68,7 @@ public class SnapshotManager implements SnapshotStats {
    */
   public void setSnapshottable(final String path, final int snapshotQuota
       ) throws IOException {
-    final INodesInPath inodesInPath = fsdir.getINodesInPath(path);
-    final INodeDirectory d = INodeDirectory.valueOf(
-        inodesInPath.getINode(0), path);
+    final INodeDirectory d = INodeDirectory.valueOf(fsdir.getINode(path), path);
     if (d.isSnapshottable()) {
       //The directory is already a snapshottable directory.
       ((INodeDirectorySnapshottable)d).setSnapshotQuota(snapshotQuota);
@@ -75,7 +77,7 @@ public class SnapshotManager implements SnapshotStats {
 
     final INodeDirectorySnapshottable s
         = INodeDirectorySnapshottable.newInstance(d, snapshotQuota);
-    fsdir.replaceINodeDirectory(path, d, s, inodesInPath.getLatestSnapshot());
+    fsdir.replaceINodeDirectory(path, d, s);
     snapshottables.add(s);
 
     numSnapshottableDirs.getAndIncrement();
@@ -88,16 +90,15 @@ public class SnapshotManager implements SnapshotStats {
    */
   public void resetSnapshottable(final String path
       ) throws IOException {
-    final INodesInPath inodesInPath = fsdir.getINodesInPath(path);
     final INodeDirectorySnapshottable s = INodeDirectorySnapshottable.valueOf(
-        inodesInPath.getINode(0), path);
+        fsdir.getINode(path), path);
     if (s.getNumSnapshots() > 0) {
       throw new SnapshotException("The directory " + path + " has snapshot(s). "
           + "Please redo the operation after removing all the snapshots.");
     }
 
     final INodeDirectory d = new INodeDirectory(s);
-    fsdir.replaceINodeDirectory(path, s, d, inodesInPath.getLatestSnapshot());
+    fsdir.replaceINodeDirectory(path, s, d);
     snapshottables.remove(s);
 
     numSnapshottableDirs.getAndDecrement();
@@ -120,8 +121,9 @@ public class SnapshotManager implements SnapshotStats {
     // Find the source root directory path where the snapshot is taken.
     final INodeDirectorySnapshottable srcRoot
         = INodeDirectorySnapshottable.valueOf(fsdir.getINode(path), path);
-    srcRoot.addSnapshot(snapshotID, snapshotName);
-
+    final Snapshot s = srcRoot.addSnapshot(snapshotID, snapshotName);
+    new SnapshotCreation().processRecursively(srcRoot, s.getRoot());
+      
     //create success, update id
     snapshotID++;
     numSnapshots.getAndIncrement();
@@ -152,6 +154,85 @@ public class SnapshotManager implements SnapshotStats {
     srcRoot.renameSnapshot(path, oldSnapshotName, newSnapshotName);
   }
   
+  /**
+   * Create a snapshot of subtrees by recursively coping the directory
+   * structure from the source directory to the snapshot destination directory.
+   * This creation algorithm requires O(N) running time and O(N) memory,
+   * where N = # files + # directories + # symlinks. 
+   */
+  class SnapshotCreation {
+    /** Process snapshot creation recursively. */
+    private void processRecursively(final INodeDirectory srcDir,
+        final INodeDirectory dstDir) throws IOException {
+      final ReadOnlyList<INode> children = srcDir.getChildrenList(null);
+      if (!children.isEmpty()) {
+        final List<INode> inodes = new ArrayList<INode>(children.size());
+        for(final INode c : new ArrayList<INode>(ReadOnlyList.Util.asList(children))) {
+          final INode i;
+          if (c == null) {
+            i = null;
+          } else if (c instanceof INodeDirectory) {
+            //also handle INodeDirectoryWithQuota
+            i = processINodeDirectory((INodeDirectory)c);
+          } else if (c instanceof INodeFileUnderConstruction) {
+            //TODO: support INodeFileUnderConstruction
+            throw new IOException("Not yet supported.");
+          } else if (c instanceof INodeFile) {
+            i = processINodeFile(srcDir, (INodeFile)c);
+          } else if (c instanceof INodeSymlink) {
+            i = new INodeSymlink((INodeSymlink)c);
+          } else {
+            throw new AssertionError("Unknow INode type: " + c.getClass()
+                + ", inode = " + c);
+          }
+          i.setParent(dstDir);
+          inodes.add(i);
+        }
+        dstDir.setChildren(inodes);
+      }
+    }
+    
+    /**
+     * Create destination INodeDirectory and make the recursive call. 
+     * @return destination INodeDirectory.
+     */
+    private INodeDirectory processINodeDirectory(final INodeDirectory srcChild
+        ) throws IOException {
+      final INodeDirectory dstChild = new INodeDirectory(srcChild);
+      dstChild.setChildren(null);
+      processRecursively(srcChild, dstChild);
+      return dstChild;
+    }
+
+    /**
+     * Create destination INodeFileSnapshot and update source INode type.
+     * @return destination INodeFileSnapshot.
+     */
+    private INodeFileSnapshot processINodeFile(final INodeDirectory parent,
+        final INodeFile file) {
+      final INodeFileSnapshot snapshot = new INodeFileSnapshot(
+          file, file.computeFileSize(true)); 
+
+      final INodeFileWithLink srcWithLink;
+      //check source INode type
+      if (file instanceof INodeFileWithLink) {
+        srcWithLink = (INodeFileWithLink)file;
+      } else {
+        //source is an INodeFile, replace the source.
+        srcWithLink = new INodeFileWithLink(file);
+        file.removeNode();
+        parent.addChild(srcWithLink, false);
+
+        //update block map
+        namesystem.getBlockManager().addBlockCollection(srcWithLink);
+      }
+      
+      //insert the snapshot to src's linked list.
+      srcWithLink.insert(snapshot);
+      return snapshot;
+    }
+  }
+
   @Override
   public long getNumSnapshottableDirs() {
     return numSnapshottableDirs.get();
@@ -162,4 +243,4 @@ public class SnapshotManager implements SnapshotStats {
     return numSnapshots.get();
   }
   
-}
+}

+ 0 - 96
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java

@@ -30,9 +30,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
@@ -51,10 +49,6 @@ public class TestLease {
         ).getLeaseByPath(src.toString()) != null;
   }
 
-  static int leaseCount(MiniDFSCluster cluster) {
-    return NameNodeAdapter.getLeaseManager(cluster.getNamesystem()).countLease();
-  }
-  
   static final String dirString = "/test/lease";
   final Path dir = new Path(dirString);
   static final Log LOG = LogFactory.getLog(TestLease.class);
@@ -132,96 +126,6 @@ public class TestLease {
     }
   }
 
-  @Test
-  public void testLeaseAfterRename() throws Exception {
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
-    try {
-      Path p = new Path("/test-file");
-      Path d = new Path("/test-d");
-      Path d2 = new Path("/test-d-other");
-
-      // open a file to get a lease
-      FileSystem fs = cluster.getFileSystem();
-      FSDataOutputStream out = fs.create(p);
-      out.writeBytes("something");
-      //out.hsync();
-      Assert.assertTrue(hasLease(cluster, p));
-      Assert.assertEquals(1, leaseCount(cluster));
-      
-      // just to ensure first fs doesn't have any logic to twiddle leases
-      DistributedFileSystem fs2 = (DistributedFileSystem) FileSystem.newInstance(fs.getUri(), fs.getConf());
-
-      // rename the file into an existing dir
-      LOG.info("DMS: rename file into dir");
-      Path pRenamed = new Path(d, p.getName());
-      fs2.mkdirs(d);
-      fs2.rename(p, pRenamed);
-      Assert.assertFalse(p+" exists", fs2.exists(p));
-      Assert.assertTrue(pRenamed+" not found", fs2.exists(pRenamed));
-      Assert.assertFalse("has lease for "+p, hasLease(cluster, p));
-      Assert.assertTrue("no lease for "+pRenamed, hasLease(cluster, pRenamed));
-      Assert.assertEquals(1, leaseCount(cluster));
-    
-      // rename the parent dir to a new non-existent dir
-      LOG.info("DMS: rename parent dir");
-      Path pRenamedAgain = new Path(d2, pRenamed.getName());
-      fs2.rename(d, d2);
-      // src gone
-      Assert.assertFalse(d+" exists", fs2.exists(d));
-      Assert.assertFalse("has lease for "+pRenamed, hasLease(cluster, pRenamed));
-      // dst checks
-      Assert.assertTrue(d2+" not found", fs2.exists(d2));
-      Assert.assertTrue(pRenamedAgain+" not found", fs2.exists(pRenamedAgain));
-      Assert.assertTrue("no lease for "+pRenamedAgain, hasLease(cluster, pRenamedAgain));
-      Assert.assertEquals(1, leaseCount(cluster));
-
-      // rename the parent dir to existing dir
-      // NOTE: rename w/o options moves paths into existing dir
-      LOG.info("DMS: rename parent again");
-      pRenamed = pRenamedAgain;
-      pRenamedAgain = new Path(new Path(d, d2.getName()), p.getName());      
-      fs2.mkdirs(d);
-      fs2.rename(d2, d);
-      // src gone
-      Assert.assertFalse(d2+" exists", fs2.exists(d2));
-      Assert.assertFalse("no lease for "+pRenamed, hasLease(cluster, pRenamed));
-      // dst checks
-      Assert.assertTrue(d+" not found", fs2.exists(d));
-      Assert.assertTrue(pRenamedAgain +" not found", fs2.exists(pRenamedAgain));
-      Assert.assertTrue("no lease for "+pRenamedAgain, hasLease(cluster, pRenamedAgain));
-      Assert.assertEquals(1, leaseCount(cluster));
-      
-      // rename with opts to non-existent dir
-      pRenamed = pRenamedAgain;
-      pRenamedAgain = new Path(d2, p.getName());
-      fs2.rename(pRenamed.getParent(), d2, Options.Rename.OVERWRITE);
-      // src gone
-      Assert.assertFalse(pRenamed.getParent() +" not found", fs2.exists(pRenamed.getParent()));
-      Assert.assertFalse("has lease for "+pRenamed, hasLease(cluster, pRenamed));
-      // dst checks
-      Assert.assertTrue(d2+" not found", fs2.exists(d2));
-      Assert.assertTrue(pRenamedAgain+" not found", fs2.exists(pRenamedAgain));
-      Assert.assertTrue("no lease for "+pRenamedAgain, hasLease(cluster, pRenamedAgain));
-      Assert.assertEquals(1, leaseCount(cluster));
-
-      // rename with opts to existing dir
-      // NOTE: rename with options will not move paths into the existing dir
-      pRenamed = pRenamedAgain;
-      pRenamedAgain = new Path(d, p.getName());
-      fs2.rename(pRenamed.getParent(), d, Options.Rename.OVERWRITE);
-      // src gone
-      Assert.assertFalse(pRenamed.getParent() +" not found", fs2.exists(pRenamed.getParent()));
-      Assert.assertFalse("has lease for "+pRenamed, hasLease(cluster, pRenamed));
-      // dst checks
-      Assert.assertTrue(d+" not found", fs2.exists(d));
-      Assert.assertTrue(pRenamedAgain+" not found", fs2.exists(pRenamedAgain));
-      Assert.assertTrue("no lease for "+pRenamedAgain, hasLease(cluster, pRenamedAgain));
-      Assert.assertEquals(1, leaseCount(cluster));
-    } finally {
-      cluster.shutdown();
-    }
-  }
-  
   @Test
   public void testLease() throws Exception {
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();

+ 0 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java

@@ -35,7 +35,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HAUtil;
@@ -104,9 +103,6 @@ public class TestBackupNode {
     BackupNode bn = (BackupNode)NameNode.createNameNode(
         new String[]{startupOpt.getName()}, c);
     assertTrue(bn.getRole() + " must be in SafeMode.", bn.isInSafeMode());
-    assertTrue(bn.getRole() + " must be in StandbyState",
-               bn.getNamesystem().getHAState()
-                 .equalsIgnoreCase(HAServiceState.STANDBY.name()));
     return bn;
   }
 

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsLimits.java

@@ -158,7 +158,7 @@ public class TestFsLimits {
     Class<?> generated = null;
     try {
       fs.verifyFsLimits(inodes, 1, child);
-      rootInode.addChild(child, false, null);
+      rootInode.addChild(child, false);
     } catch (QuotaExceededException e) {
       generated = e.getClass();
     }

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java

@@ -144,11 +144,11 @@ public class TestINodeFile {
     assertEquals("f", inf.getFullPathName());
     assertEquals("", inf.getLocalParentDir());
 
-    dir.addChild(inf, false, null);
+    dir.addChild(inf, false);
     assertEquals("d"+Path.SEPARATOR+"f", inf.getFullPathName());
     assertEquals("d", inf.getLocalParentDir());
     
-    root.addChild(dir, false, null);
+    root.addChild(dir, false);
     assertEquals(Path.SEPARATOR+"d"+Path.SEPARATOR+"f", inf.getFullPathName());
     assertEquals(Path.SEPARATOR+"d", dir.getFullPathName());
 

+ 1 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestMetaSave.java

@@ -92,8 +92,7 @@ public class TestMetaSave {
     DataInputStream in = new DataInputStream(fstream);
     BufferedReader reader = new BufferedReader(new InputStreamReader(in));
     String line = reader.readLine();
-    assertTrue(line.equals(
-      "3 files and directories, 2 blocks = 5 total filesystem objects"));
+    assertTrue(line.equals("3 files and directories, 2 blocks = 5 total"));
     line = reader.readLine();
     assertTrue(line.equals("Live Datanodes: 1"));
     line = reader.readLine();

+ 0 - 18
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSaveNamespace.java

@@ -603,24 +603,6 @@ public class TestSaveNamespace {
     }
   }
   
-  @Test
-  public void testSaveNamespaceWithDanglingLease() throws Exception {
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(new Configuration())
-        .numDataNodes(1).build();
-    cluster.waitActive();
-    DistributedFileSystem fs = (DistributedFileSystem) cluster.getFileSystem();
-    try {
-      cluster.getNamesystem().leaseManager.addLease("me", "/non-existent");      
-      fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
-      cluster.getNameNodeRpc().saveNamespace();
-      fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-    }
-  }
-
   private void doAnEdit(FSNamesystem fsn, int id) throws IOException {
     // Make an edit
     fsn.mkdirs(

+ 4 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSnapshotPathINodes.java

@@ -430,10 +430,10 @@ public class TestSnapshotPathINodes {
     // The number of inodes should be equal to components.length
     assertEquals(newInodes.length, components.length);
     // The last INode should be associated with file1
-    final int last = components.length - 1;
-    assertEquals(newInodes[last].getFullPathName(), file1.toString());
+    assertEquals(newInodes[components.length - 1].getFullPathName(),
+        file1.toString());
     // The modification time of the INode for file3 should have been changed
-    Assert.assertFalse(inodes[last].getModificationTime()
-        == newInodes[last].getModificationTime());
+    Assert.assertFalse(inodes[components.length - 1].getModificationTime() ==
+        newInodes[components.length - 1].getModificationTime());
   }
 }

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestINodeDirectoryWithSnapshot.java

@@ -221,7 +221,7 @@ public class TestINodeDirectoryWithSnapshot {
     Assert.assertTrue(i >= 0);
     final INodeDirectory oldinode = (INodeDirectory)current.get(i);
     final INodeDirectory newinode = new INodeDirectory(oldinode);
-    newinode.updateModificationTime(oldinode.getModificationTime() + 1);
+    newinode.setModificationTime(oldinode.getModificationTime() + 1);
 
     current.set(i, newinode);
     if (diff != null) {

+ 0 - 3
hadoop-mapreduce-project/CHANGES.txt

@@ -604,9 +604,6 @@ Release 0.23.6 - UNRELEASED
     MAPREDUCE-4817. Hardcoded task ping timeout kills tasks localizing large 
     amounts of data (tgraves)
 
-    MAPREDUCE-4836. Elapsed time for running tasks on AM web UI tasks page is 0
-    (Ravi Prakash via jeagles)
-
 Release 0.23.5 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 2 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/dao/TaskInfo.java

@@ -59,12 +59,11 @@ public class TaskInfo {
     TaskReport report = task.getReport();
     this.startTime = report.getStartTime();
     this.finishTime = report.getFinishTime();
-    this.state = report.getTaskState();
-    this.elapsedTime = Times.elapsed(this.startTime, this.finishTime,
-      this.state == TaskState.RUNNING);
+    this.elapsedTime = Times.elapsed(this.startTime, this.finishTime, false);
     if (this.elapsedTime == -1) {
       this.elapsedTime = 0;
     }
+    this.state = report.getTaskState();
     this.progress = report.getProgress() * 100;
     this.id = MRApps.toString(task.getID());
     this.taskNum = task.getID().getId();

+ 1 - 5
hadoop-yarn-project/CHANGES.txt

@@ -117,11 +117,7 @@ Release 2.0.3-alpha - Unreleased
 
     YARN-229. Remove old unused RM recovery code. (Bikas Saha via acmurthy) 
 
-    YARN-187. Add hierarchical queues to the fair scheduler.
-    (Sandy Ryza via tomwhite)
-
-    YARN-72. NM should handle cleaning up containers when it shuts down.
-    (Sandy Ryza via tomwhite)
+    YARN-187. Add hierarchical queues to the fair scheduler. (Sandy Ryza via tomwhite)
 
 Release 2.0.2-alpha - 2012-09-07 
 

+ 2 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrCompletedContainersEvent.java

@@ -25,23 +25,13 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 public class CMgrCompletedContainersEvent extends ContainerManagerEvent {
 
   private List<ContainerId> containerToCleanup;
-  private Reason reason;
-  
-  public CMgrCompletedContainersEvent(List<ContainerId> containersToCleanup, Reason reason) {
+
+  public CMgrCompletedContainersEvent(List<ContainerId> containersToCleanup) {
     super(ContainerManagerEventType.FINISH_CONTAINERS);
     this.containerToCleanup = containersToCleanup;
-    this.reason = reason;
   }
 
   public List<ContainerId> getContainersToCleanup() {
     return this.containerToCleanup;
   }
-  
-  public Reason getReason() {
-    return reason;
-  }
-  
-  public static enum Reason {
-    ON_SHUTDOWN, BY_RESOURCEMANAGER
-  }
 }

+ 5 - 65
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java

@@ -19,9 +19,6 @@
 package org.apache.hadoop.yarn.server.nodemanager;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -64,24 +61,14 @@ public class NodeManager extends CompositeService implements
    * Priority of the NodeManager shutdown hook.
    */
   public static final int SHUTDOWN_HOOK_PRIORITY = 30;
-  
-  /**
-   * Extra duration to wait for containers to be killed on shutdown.
-   */
-  private static final int SHUTDOWN_CLEANUP_SLOP_MS = 1000;
 
   private static final Log LOG = LogFactory.getLog(NodeManager.class);
   protected final NodeManagerMetrics metrics = NodeManagerMetrics.create();
   private ApplicationACLsManager aclsManager;
   private NodeHealthCheckerService nodeHealthChecker;
   private LocalDirsHandlerService dirsHandler;
-  private Context context;
-  private AsyncDispatcher dispatcher;
-  private ContainerManagerImpl containerManager;
   private static CompositeServiceShutdownHook nodeManagerShutdownHook; 
   
-  private long waitForContainersOnShutdownMillis;
-  
   public NodeManager() {
     super(NodeManager.class.getName());
   }
@@ -128,7 +115,7 @@ public class NodeManager extends CompositeService implements
       containerTokenSecretManager = new NMContainerTokenSecretManager(conf);
     }
 
-    this.context = new NMContext(containerTokenSecretManager);
+    Context context = new NMContext(containerTokenSecretManager);
 
     this.aclsManager = new ApplicationACLsManager(conf);
 
@@ -144,7 +131,7 @@ public class NodeManager extends CompositeService implements
     addService(del);
 
     // NodeManager level dispatcher
-    this.dispatcher = new AsyncDispatcher();
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
 
     nodeHealthChecker = new NodeHealthCheckerService();
     addService(nodeHealthChecker);
@@ -157,7 +144,7 @@ public class NodeManager extends CompositeService implements
     NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
     addService(nodeResourceMonitor);
 
-    containerManager =
+    ContainerManagerImpl containerManager =
         createContainerManager(context, exec, del, nodeStatusUpdater,
         this.aclsManager, dirsHandler);
     addService(containerManager);
@@ -168,20 +155,13 @@ public class NodeManager extends CompositeService implements
 
     dispatcher.register(ContainerManagerEventType.class, containerManager);
     addService(dispatcher);
-    
+
     DefaultMetricsSystem.initialize("NodeManager");
 
     // StatusUpdater should be added last so that it get started last 
     // so that we make sure everything is up before registering with RM. 
     addService(nodeStatusUpdater);
-    
-    waitForContainersOnShutdownMillis =
-        conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
-            YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS) + 
-        conf.getLong(YarnConfiguration.NM_PROCESS_KILL_WAIT_MS,
-            YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS) +
-        SHUTDOWN_CLEANUP_SLOP_MS;
-    
+
     super.init(conf);
     // TODO add local dirs to del
   }
@@ -198,44 +178,9 @@ public class NodeManager extends CompositeService implements
 
   @Override
   public void stop() {
-    cleanupContainers();
     super.stop();
     DefaultMetricsSystem.shutdown();
   }
-  
-  @SuppressWarnings("unchecked")
-  private void cleanupContainers() {
-    Map<ContainerId, Container> containers = context.getContainers();
-    if (containers.isEmpty()) {
-      return;
-    }
-    LOG.info("Containers still running on shutdown: " + containers.keySet());
-    
-    List<ContainerId> containerIds = new ArrayList<ContainerId>(containers.keySet());
-    dispatcher.getEventHandler().handle(
-        new CMgrCompletedContainersEvent(containerIds, 
-            CMgrCompletedContainersEvent.Reason.ON_SHUTDOWN));
-    
-    LOG.info("Waiting for containers to be killed");
-    
-    long waitStartTime = System.currentTimeMillis();
-    while (!containers.isEmpty() && 
-        System.currentTimeMillis() - waitStartTime < waitForContainersOnShutdownMillis) {
-      try {
-        Thread.sleep(1000);
-      } catch (InterruptedException ex) {
-        LOG.warn("Interrupted while sleeping on container kill", ex);
-      }
-    }
-
-    // All containers killed
-    if (containers.isEmpty()) {
-      LOG.info("All containers in DONE state");
-    } else {
-      LOG.info("Done waiting for containers to be killed. Still alive: " + 
-          containers.keySet());
-    }
-  }
 
   public static class NMContext implements Context {
 
@@ -337,11 +282,6 @@ public class NodeManager extends CompositeService implements
   NodeManager createNewNodeManager() {
     return new NodeManager();
   }
-  
-  // For testing
-  ContainerManagerImpl getContainerManager() {
-    return containerManager;
-  }
 
   public static void main(String[] args) {
     Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());

+ 1 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java

@@ -363,8 +363,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
                 .getContainersToCleanupList();
             if (containersToCleanup.size() != 0) {
               dispatcher.getEventHandler().handle(
-                  new CMgrCompletedContainersEvent(containersToCleanup, 
-                      CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER));
+                  new CMgrCompletedContainersEvent(containersToCleanup));
             }
             List<ApplicationId> appsToCleanup =
                 response.getApplicationsToCleanupList();

+ 2 - 11
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java

@@ -23,8 +23,6 @@ import static org.apache.hadoop.yarn.service.Service.STATE.STARTED;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -595,16 +593,9 @@ public class ContainerManagerImpl extends CompositeService implements
           (CMgrCompletedContainersEvent) event;
       for (ContainerId container : containersFinishedEvent
           .getContainersToCleanup()) {
-        String diagnostic = "";
-        if (containersFinishedEvent.getReason() == 
-            CMgrCompletedContainersEvent.Reason.ON_SHUTDOWN) {
-          diagnostic = "Container Killed on Shutdown";
-        } else if (containersFinishedEvent.getReason() == 
-            CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER) {
-          diagnostic = "Container Killed by ResourceManager";
-        }
         this.dispatcher.getEventHandler().handle(
-            new ContainerKillEvent(container, diagnostic));
+            new ContainerKillEvent(container,
+                "Container Killed by ResourceManager"));
       }
       break;
     default:

+ 0 - 92
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java

@@ -1,92 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.hadoop.yarn.server.nodemanager;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.api.ResourceTracker;
-import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
-import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
-import org.apache.hadoop.yarn.server.api.records.NodeStatus;
-import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
-import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
-
-/**
- * This class allows a node manager to run without without communicating with a
- * real RM.
- */
-public class MockNodeStatusUpdater extends NodeStatusUpdaterImpl {
-  static final Log LOG = LogFactory.getLog(MockNodeStatusUpdater.class);
-  
-  private static final RecordFactory recordFactory = RecordFactoryProvider
-      .getRecordFactory(null);
-
-  private ResourceTracker resourceTracker;
-
-  public MockNodeStatusUpdater(Context context, Dispatcher dispatcher,
-      NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
-    super(context, dispatcher, healthChecker, metrics);
-    resourceTracker = new MockResourceTracker();
-  }
-
-  @Override
-  protected ResourceTracker getRMClient() {
-    return resourceTracker;
-  }
-  
-  private static class MockResourceTracker implements ResourceTracker {
-    private int heartBeatID;
-
-    @Override
-    public RegisterNodeManagerResponse registerNodeManager(
-        RegisterNodeManagerRequest request) throws YarnRemoteException {
-      RegistrationResponse regResponse = recordFactory
-          .newRecordInstance(RegistrationResponse.class);
-
-      RegisterNodeManagerResponse response = recordFactory
-          .newRecordInstance(RegisterNodeManagerResponse.class);
-      response.setRegistrationResponse(regResponse);
-      return response;
-    }
-
-    @Override
-    public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
-        throws YarnRemoteException {
-      NodeStatus nodeStatus = request.getNodeStatus();
-      LOG.info("Got heartbeat number " + heartBeatID);
-      nodeStatus.setResponseId(heartBeatID++);
-
-      HeartbeatResponse response = recordFactory
-          .newRecordInstance(HeartbeatResponse.class);
-      response.setResponseId(heartBeatID);
-
-      NodeHeartbeatResponse nhResponse = recordFactory
-          .newRecordInstance(NodeHeartbeatResponse.class);
-      nhResponse.setHeartbeatResponse(response);
-      return nhResponse;
-    }
-  }
-}

+ 0 - 222
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java

@@ -1,222 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.hadoop.yarn.server.nodemanager;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import junit.framework.Assert;
-
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.UnsupportedFileSystemException;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestNodeManagerShutdown {
-  static final File basedir =
-      new File("target", TestNodeManagerShutdown.class.getName());
-  static final File tmpDir = new File(basedir, "tmpDir");
-  static final File logsDir = new File(basedir, "logs");
-  static final File remoteLogsDir = new File(basedir, "remotelogs");
-  static final File nmLocalDir = new File(basedir, "nm0");
-  static final File processStartFile = new File(tmpDir, "start_file.txt")
-    .getAbsoluteFile();
-
-  static final RecordFactory recordFactory = RecordFactoryProvider
-      .getRecordFactory(null);
-  static final String user = "nobody";
-  private FileContext localFS;
-
-  @Before
-  public void setup() throws UnsupportedFileSystemException {
-    localFS = FileContext.getLocalFSFileContext();
-    tmpDir.mkdirs();
-    logsDir.mkdirs();
-    remoteLogsDir.mkdirs();
-    nmLocalDir.mkdirs();
-  }
-  
-  @After
-  public void tearDown() throws IOException, InterruptedException {
-    localFS.delete(new Path(basedir.getPath()), true);
-  }
-  
-  @Test
-  public void testKillContainersOnShutdown() throws IOException {
-    NodeManager nm = getNodeManager();
-    nm.init(createNMConfig());
-    nm.start();
-    
-    ContainerManagerImpl containerManager = nm.getContainerManager();
-    File scriptFile = createUnhaltingScriptFile();
-    
-    ContainerLaunchContext containerLaunchContext = 
-        recordFactory.newRecordInstance(ContainerLaunchContext.class);
-
-    // Construct the Container-id
-    ContainerId cId = createContainerId();
-    containerLaunchContext.setContainerId(cId);
-
-    containerLaunchContext.setUser(user);
-
-    URL localResourceUri =
-        ConverterUtils.getYarnUrlFromPath(localFS
-            .makeQualified(new Path(scriptFile.getAbsolutePath())));
-    LocalResource localResource =
-        recordFactory.newRecordInstance(LocalResource.class);
-    localResource.setResource(localResourceUri);
-    localResource.setSize(-1);
-    localResource.setVisibility(LocalResourceVisibility.APPLICATION);
-    localResource.setType(LocalResourceType.FILE);
-    localResource.setTimestamp(scriptFile.lastModified());
-    String destinationFile = "dest_file";
-    Map<String, LocalResource> localResources = 
-        new HashMap<String, LocalResource>();
-    localResources.put(destinationFile, localResource);
-    containerLaunchContext.setLocalResources(localResources);
-    containerLaunchContext.setUser(containerLaunchContext.getUser());
-    List<String> commands = new ArrayList<String>();
-    commands.add("/bin/bash");
-    commands.add(scriptFile.getAbsolutePath());
-    containerLaunchContext.setCommands(commands);
-    containerLaunchContext.setResource(recordFactory
-        .newRecordInstance(Resource.class));
-    containerLaunchContext.getResource().setMemory(1024);
-    StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
-    startRequest.setContainerLaunchContext(containerLaunchContext);
-    containerManager.startContainer(startRequest);
-    
-    GetContainerStatusRequest request =
-        recordFactory.newRecordInstance(GetContainerStatusRequest.class);
-        request.setContainerId(cId);
-    ContainerStatus containerStatus =
-        containerManager.getContainerStatus(request).getStatus();
-    Assert.assertEquals(ContainerState.RUNNING, containerStatus.getState());
-    
-    try {Thread.sleep(5000);} catch (InterruptedException ex) {ex.printStackTrace();} 
-    
-    nm.stop();
-    
-    // Now verify the contents of the file
-    // Script generates a message when it receives a sigterm
-    // so we look for that
-    BufferedReader reader =
-        new BufferedReader(new FileReader(processStartFile));
-
-    boolean foundSigTermMessage = false;
-    while (true) {
-      String line = reader.readLine();
-      if (line == null) {
-        break;
-      }
-      if (line.contains("SIGTERM")) {
-        foundSigTermMessage = true;
-        break;
-      }
-    }
-    Assert.assertTrue("Did not find sigterm message", foundSigTermMessage);
-    reader.close();
-  }
-  
-  private ContainerId createContainerId() {
-    ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
-    appId.setClusterTimestamp(0);
-    appId.setId(0);
-    ApplicationAttemptId appAttemptId = 
-        recordFactory.newRecordInstance(ApplicationAttemptId.class);
-    appAttemptId.setApplicationId(appId);
-    appAttemptId.setAttemptId(1);
-    ContainerId containerId = 
-        recordFactory.newRecordInstance(ContainerId.class);
-    containerId.setApplicationAttemptId(appAttemptId);
-    return containerId;
-  }
-  
-  private YarnConfiguration createNMConfig() {
-    YarnConfiguration conf = new YarnConfiguration();
-    conf.setInt(YarnConfiguration.NM_PMEM_MB, 5*1024); // 5GB
-    conf.set(YarnConfiguration.NM_ADDRESS, "127.0.0.1:12345");
-    conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "127.0.0.1:12346");
-    conf.set(YarnConfiguration.NM_LOG_DIRS, logsDir.getAbsolutePath());
-    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogsDir.getAbsolutePath());
-    conf.set(YarnConfiguration.NM_LOCAL_DIRS, nmLocalDir.getAbsolutePath());
-    return conf;
-  }
-  
-  /**
-   * Creates a script to run a container that will run forever unless
-   * stopped by external means.
-   */
-  private File createUnhaltingScriptFile() throws IOException {
-    File scriptFile = new File(tmpDir, "scriptFile.sh");
-    BufferedWriter fileWriter = new BufferedWriter(new FileWriter(scriptFile));
-    fileWriter.write("#!/bin/bash\n\n");
-    fileWriter.write("echo \"Running testscript for delayed kill\"\n");
-    fileWriter.write("hello=\"Got SIGTERM\"\n");
-    fileWriter.write("umask 0\n");
-    fileWriter.write("trap \"echo $hello >> " + processStartFile + "\" SIGTERM\n");
-    fileWriter.write("echo \"Writing pid to start file\"\n");
-    fileWriter.write("echo $$ >> " + processStartFile + "\n");
-    fileWriter.write("while true; do\nsleep 1s;\ndone\n");
-
-    fileWriter.close();
-    return scriptFile;
-  }
-
-  private NodeManager getNodeManager() {
-    return new NodeManager() {
-      @Override
-      protected NodeStatusUpdater createNodeStatusUpdater(Context context,
-          Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
-        MockNodeStatusUpdater myNodeStatusUpdater = new MockNodeStatusUpdater(
-            context, dispatcher, healthChecker, metrics);
-        return myNodeStatusUpdater;
-      }
-    };
-  }
-}