
Merge branch 'trunk' into HDFS-6581

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
arp 10 years ago
parent
commit
032e0eba6b
46 changed files with 1550 additions and 303 deletions
  1. 140 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurableBase.java
  2. 70 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurationTaskStatus.java
  3. 5 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurationUtil.java
  4. 1 6
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoCodec.java
  5. 90 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoProtocolVersion.java
  6. 15 1
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileEncryptionInfo.java
  7. 154 3
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestReconfiguration.java
  8. 11 0
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  9. 59 30
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
  10. 8 6
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
  11. 4 7
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/UnknownCipherSuiteException.java
  12. 38 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/UnknownCryptoProtocolVersionException.java
  13. 12 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
  14. 3 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
  15. 15 5
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/EncryptionZone.java
  16. 58 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
  17. 53 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
  18. 3 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
  19. 3 4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
  20. 61 27
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
  21. 17 6
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
  22. 4 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java
  23. 42 4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
  24. 21 10
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java
  25. 10 5
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
  26. 53 48
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  27. 3 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
  28. 94 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
  29. 11 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/AtomicFileOutputStream.java
  30. 30 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto
  31. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
  32. 3 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/proto/encryption.proto
  33. 18 8
      hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
  34. 2 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
  35. 95 23
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java
  36. 2 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java
  37. 44 6
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDFSUpgradeWithHA.java
  38. 3 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
  39. 152 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
  40. 29 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestAtomicFileOutputStream.java
  41. 6 0
      hadoop-yarn-project/CHANGES.txt
  42. 0 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
  43. 0 13
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
  44. 62 60
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
  45. 22 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
  46. 23 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java

+ 140 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurableBase.java

@@ -18,9 +18,18 @@
 
 package org.apache.hadoop.conf;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 import org.apache.commons.logging.*;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange;
 
+import java.io.IOException;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
 
 /**
  * Utility base class for implementing the Reconfigurable interface.
@@ -34,6 +43,30 @@ public abstract class ReconfigurableBase
   
   private static final Log LOG =
     LogFactory.getLog(ReconfigurableBase.class);
+  // Used for testing purposes.
+  private ReconfigurationUtil reconfigurationUtil = new ReconfigurationUtil();
+
+  /** Background thread to reload configuration. */
+  private Thread reconfigThread = null;
+  private volatile boolean shouldRun = true;
+  private Object reconfigLock = new Object();
+
+  /**
+   * The timestamp when the <code>reconfigThread</code> starts.
+   */
+  private long startTime = 0;
+
+  /**
+   * The timestamp when the <code>reconfigThread</code> finishes.
+   */
+  private long endTime = 0;
+
+  /**
+   * A map of &lt;changed property, error message&gt;. If an error message is
+   * present, it contains the message about the error that occurred while
+   * applying the particular change. Otherwise, it indicates that the change
+   * was applied successfully.
+   */
+  private Map<PropertyChange, Optional<String>> status = null;
 
   /**
    * Construct a ReconfigurableBase.
@@ -50,6 +83,113 @@ public abstract class ReconfigurableBase
     super((conf == null) ? new Configuration() : conf);
   }
 
+  @VisibleForTesting
+  public void setReconfigurationUtil(ReconfigurationUtil ru) {
+    reconfigurationUtil = Preconditions.checkNotNull(ru);
+  }
+
+  @VisibleForTesting
+  public Collection<PropertyChange> getChangedProperties(
+      Configuration newConf, Configuration oldConf) {
+    return reconfigurationUtil.parseChangedProperties(newConf, oldConf);
+  }
+
+  /**
+   * A background thread to apply configuration changes.
+   */
+  private static class ReconfigurationThread extends Thread {
+    private ReconfigurableBase parent;
+
+    ReconfigurationThread(ReconfigurableBase base) {
+      this.parent = base;
+    }
+
+    // See {@link ReconfigurationServlet#applyChanges}
+    public void run() {
+      LOG.info("Starting reconfiguration task.");
+      Configuration oldConf = this.parent.getConf();
+      Configuration newConf = new Configuration();
+      Collection<PropertyChange> changes =
+          this.parent.getChangedProperties(newConf, oldConf);
+      Map<PropertyChange, Optional<String>> results = Maps.newHashMap();
+      for (PropertyChange change : changes) {
+        String errorMessage = null;
+        if (!this.parent.isPropertyReconfigurable(change.prop)) {
+          errorMessage = "Property " + change.prop +
+              " is not reconfigurable";
+          LOG.info(errorMessage);
+          results.put(change, Optional.of(errorMessage));
+          continue;
+        }
+        LOG.info("Change property: " + change.prop + " from \""
+            + ((change.oldVal == null) ? "<default>" : change.oldVal)
+            + "\" to \"" + ((change.newVal == null) ? "<default>" : change.newVal)
+            + "\".");
+        try {
+          this.parent.reconfigurePropertyImpl(change.prop, change.newVal);
+        } catch (ReconfigurationException e) {
+          errorMessage = e.toString();
+        }
+        results.put(change, Optional.fromNullable(errorMessage));
+      }
+
+      synchronized (this.parent.reconfigLock) {
+        this.parent.endTime = Time.monotonicNow();
+        this.parent.status = Collections.unmodifiableMap(results);
+        this.parent.reconfigThread = null;
+      }
+    }
+  }
+
+  /**
+   * Start a reconfiguration task to reload configuration in background.
+   */
+  public void startReconfigurationTask() throws IOException {
+    synchronized (reconfigLock) {
+      if (!shouldRun) {
+        String errorMessage = "The server is stopped.";
+        LOG.warn(errorMessage);
+        throw new IOException(errorMessage);
+      }
+      if (reconfigThread != null) {
+        String errorMessage = "Another reconfiguration task is running.";
+        LOG.warn(errorMessage);
+        throw new IOException(errorMessage);
+      }
+      reconfigThread = new ReconfigurationThread(this);
+      reconfigThread.setDaemon(true);
+      reconfigThread.setName("Reconfiguration Task");
+      reconfigThread.start();
+      startTime = Time.monotonicNow();
+    }
+  }
+
+  public ReconfigurationTaskStatus getReconfigurationTaskStatus() {
+    synchronized (reconfigLock) {
+      if (reconfigThread != null) {
+        return new ReconfigurationTaskStatus(startTime, 0, null);
+      }
+      return new ReconfigurationTaskStatus(startTime, endTime, status);
+    }
+  }
+
+  public void shutdownReconfigurationTask() {
+    Thread tempThread;
+    synchronized (reconfigLock) {
+      shouldRun = false;
+      if (reconfigThread == null) {
+        return;
+      }
+      tempThread = reconfigThread;
+      reconfigThread = null;
+    }
+
+    try {
+      tempThread.join();
+    } catch (InterruptedException e) {
+    }
+  }
+
   /**
    * {@inheritDoc}
    *
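For context, a minimal sketch of how a subclass could drive this new asynchronous API. DummyServer, its property name, and the polling loop are illustrative assumptions, not part of this patch; only the ReconfigurableBase methods shown in the diff above are real.

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurableBase;
import org.apache.hadoop.conf.ReconfigurationException;

public class DummyServer extends ReconfigurableBase {
  public DummyServer(Configuration conf) {
    super(conf);
  }

  @Override
  public Collection<String> getReconfigurableProperties() {
    // Hypothetical reconfigurable property, for illustration only.
    return Arrays.asList("dummy.prop");
  }

  @Override
  public void reconfigurePropertyImpl(String property, String newVal)
      throws ReconfigurationException {
    // Apply the change to the running server here.
  }

  public static void main(String[] args)
      throws IOException, InterruptedException {
    DummyServer server = new DummyServer(new Configuration());
    server.startReconfigurationTask();   // returns immediately
    while (!server.getReconfigurationTaskStatus().stopped()) {
      Thread.sleep(100);                 // poll until the background task ends
    }
    server.shutdownReconfigurationTask();
  }
}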

+ 70 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurationTaskStatus.java

@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.conf;
+
+import com.google.common.base.Optional;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange;
+
+import java.util.Map;
+
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class ReconfigurationTaskStatus {
+  long startTime;
+  long endTime;
+  final Map<ReconfigurationUtil.PropertyChange, Optional<String>> status;
+
+  public ReconfigurationTaskStatus(long startTime, long endTime,
+      Map<ReconfigurationUtil.PropertyChange, Optional<String>> status) {
+    this.startTime = startTime;
+    this.endTime = endTime;
+    this.status = status;
+  }
+
+  /**
+   * Return true if
+   *   - a reconfiguration task has finished, or
+   *   - an active reconfiguration task is running.
+   */
+  public boolean hasTask() {
+    return startTime > 0;
+  }
+
+  /**
+   * Return true if the latest reconfiguration task has finished and there is
+   * no other active task running.
+   */
+  public boolean stopped() {
+    return endTime > 0;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public long getEndTime() {
+    return endTime;
+  }
+
+  public final Map<PropertyChange, Optional<String>> getStatus() {
+    return status;
+  }
+}
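The two flags above encode exactly three observable states. A brief sketch of how a caller distinguishes them; the StatusExample class is an assumption for illustration, not part of the patch.

import org.apache.hadoop.conf.ReconfigurableBase;
import org.apache.hadoop.conf.ReconfigurationTaskStatus;

public class StatusExample {
  // `rb` is any ReconfigurableBase instance; the three branches below are the
  // only states that hasTask()/stopped() can express.
  static String describe(ReconfigurableBase rb) {
    ReconfigurationTaskStatus status = rb.getReconfigurationTaskStatus();
    if (!status.hasTask()) {
      return "never started";   // startTime == 0
    } else if (!status.stopped()) {
      return "still running";   // startTime > 0, endTime == 0
    } else {
      return "finished";        // endTime > 0; getStatus() holds the results
    }
  }
}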

+ 5 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurationUtil.java

@@ -63,4 +63,9 @@ public class ReconfigurationUtil {
 
     return changes.values();
   }
+
+  public Collection<PropertyChange> parseChangedProperties(
+      Configuration newConf, Configuration oldConf) {
+    return getChangedProperties(newConf, oldConf);
+  }
 }

+ 1 - 6
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoCodec.java

@@ -82,12 +82,7 @@ public abstract class CryptoCodec implements Configurable {
       }
     }
     
-    if (codec != null) {
-      return codec;
-    }
-    
-    throw new RuntimeException("No available crypto codec which meets " + 
-        "the cipher suite " + cipherSuite.getName() + ".");
+    return codec;
   }
   
   /**

+ 90 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoProtocolVersion.java

@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Versions of the client/server protocol used for HDFS encryption.
+ */
+@InterfaceAudience.Private
+public enum CryptoProtocolVersion {
+  UNKNOWN("Unknown", 1),
+  ENCRYPTION_ZONES("Encryption zones", 2);
+
+  private final String description;
+  private final int version;
+  private Integer unknownValue = null;
+
+  private static CryptoProtocolVersion[] supported = {ENCRYPTION_ZONES};
+
+  /**
+   * @return Array of supported protocol versions.
+   */
+  public static CryptoProtocolVersion[] supported() {
+    return supported;
+  }
+
+  CryptoProtocolVersion(String description, int version) {
+    this.description = description;
+    this.version = version;
+  }
+
+  /**
+   * Returns whether a given protocol version is supported.
+   *
+   * @param version version number
+   * @return true if the version is supported, else false
+   */
+  public static boolean supports(CryptoProtocolVersion version) {
+    if (version.getVersion() == UNKNOWN.getVersion()) {
+      return false;
+    }
+    for (CryptoProtocolVersion v : CryptoProtocolVersion.values()) {
+      if (v.getVersion() == version.getVersion()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public void setUnknownValue(int unknown) {
+    this.unknownValue = unknown;
+  }
+
+  public int getUnknownValue() {
+    return unknownValue;
+  }
+
+  public String getDescription() {
+    return description;
+  }
+
+  public int getVersion() {
+    return version;
+  }
+
+  @Override
+  public String toString() {
+    return "CryptoProtocolVersion{" +
+        "description='" + description + '\'' +
+        ", version=" + version +
+        ", unknownValue=" + unknownValue +
+        '}';
+  }
+}
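A short illustrative check mirroring how a client would use this enum. The enum values are from the file above; the wrapper class itself is a sketch, not code from the patch.

import org.apache.hadoop.crypto.CryptoProtocolVersion;

public class VersionCheckExample {
  public static void main(String[] args) {
    // ENCRYPTION_ZONES is in supported(), so this prints true.
    System.out.println(
        CryptoProtocolVersion.supports(CryptoProtocolVersion.ENCRYPTION_ZONES));
    // UNKNOWN is explicitly rejected, so this prints false.
    System.out.println(
        CryptoProtocolVersion.supports(CryptoProtocolVersion.UNKNOWN));
  }
}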

+ 15 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileEncryptionInfo.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.fs;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.crypto.CipherSuite;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -32,6 +33,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 public class FileEncryptionInfo {
 
   private final CipherSuite cipherSuite;
+  private final CryptoProtocolVersion version;
   private final byte[] edek;
   private final byte[] iv;
   private final String keyName;
@@ -47,9 +49,11 @@ public class FileEncryptionInfo {
    * @param ezKeyVersionName name of the KeyVersion used to encrypt the
    *                         encrypted data encryption key.
    */
-  public FileEncryptionInfo(final CipherSuite suite, final byte[] edek,
+  public FileEncryptionInfo(final CipherSuite suite,
+      final CryptoProtocolVersion version, final byte[] edek,
       final byte[] iv, final String keyName, final String ezKeyVersionName) {
     checkNotNull(suite);
+    checkNotNull(version);
     checkNotNull(edek);
     checkNotNull(iv);
     checkNotNull(keyName);
@@ -59,6 +63,7 @@ public class FileEncryptionInfo {
     checkArgument(iv.length == suite.getAlgorithmBlockSize(),
         "Unexpected IV length");
     this.cipherSuite = suite;
+    this.version = version;
     this.edek = edek;
     this.iv = iv;
     this.keyName = keyName;
@@ -73,6 +78,14 @@ public class FileEncryptionInfo {
     return cipherSuite;
   }
 
+  /**
+   * @return {@link org.apache.hadoop.crypto.CryptoProtocolVersion} to use
+   * to access the file.
+   */
+  public CryptoProtocolVersion getCryptoProtocolVersion() {
+    return version;
+  }
+
   /**
    * @return encrypted data encryption key (EDEK) for the file
    */
@@ -102,6 +115,7 @@ public class FileEncryptionInfo {
   public String toString() {
     StringBuilder builder = new StringBuilder("{");
     builder.append("cipherSuite: " + cipherSuite);
+    builder.append(", cryptoProtocolVersion: " + version);
     builder.append(", edek: " + Hex.encodeHexString(edek));
     builder.append(", iv: " + Hex.encodeHexString(iv));
     builder.append(", keyName: " + keyName);

+ 154 - 3
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestReconfiguration.java

@@ -18,13 +18,32 @@
 
 package org.apache.hadoop.conf;
 
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
+import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange;
 import org.junit.Test;
 import org.junit.Before;
-import static org.junit.Assert.*;
 
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 
 public class TestReconfiguration {
   private Configuration conf1;
@@ -105,8 +124,8 @@ public class TestReconfiguration {
     }
 
     @Override
-    public synchronized void reconfigurePropertyImpl(String property, 
-                                                     String newVal) {
+    public synchronized void reconfigurePropertyImpl(
+        String property, String newVal) throws ReconfigurationException {
       // do nothing
     }
     
@@ -312,4 +331,136 @@ public class TestReconfiguration {
     
   }
 
+  private static class AsyncReconfigurableDummy extends ReconfigurableBase {
+    AsyncReconfigurableDummy(Configuration conf) {
+      super(conf);
+    }
+
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    @Override
+    public Collection<String> getReconfigurableProperties() {
+      return Arrays.asList(PROP1, PROP2, PROP4);
+    }
+
+    @Override
+    public synchronized void reconfigurePropertyImpl(String property,
+        String newVal) throws ReconfigurationException {
+      try {
+        latch.await();
+      } catch (InterruptedException e) {
+        // Ignore
+      }
+    }
+  }
+
+  private static void waitAsyncReconfigureTaskFinish(ReconfigurableBase rb)
+      throws InterruptedException {
+    ReconfigurationTaskStatus status = null;
+    int count = 20;
+    while (count > 0) {
+      status = rb.getReconfigurationTaskStatus();
+      if (status.stopped()) {
+        break;
+      }
+      count--;
+      Thread.sleep(500);
+    }
+    assert(status.stopped());
+  }
+
+  @Test
+  public void testAsyncReconfigure()
+      throws ReconfigurationException, IOException, InterruptedException {
+    AsyncReconfigurableDummy dummy = spy(new AsyncReconfigurableDummy(conf1));
+
+    List<PropertyChange> changes = Lists.newArrayList();
+    changes.add(new PropertyChange("name1", "new1", "old1"));
+    changes.add(new PropertyChange("name2", "new2", "old2"));
+    changes.add(new PropertyChange("name3", "new3", "old3"));
+    doReturn(changes).when(dummy).getChangedProperties(
+        any(Configuration.class), any(Configuration.class));
+
+    doReturn(true).when(dummy).isPropertyReconfigurable(eq("name1"));
+    doReturn(false).when(dummy).isPropertyReconfigurable(eq("name2"));
+    doReturn(true).when(dummy).isPropertyReconfigurable(eq("name3"));
+
+    doNothing().when(dummy)
+        .reconfigurePropertyImpl(eq("name1"), anyString());
+    doNothing().when(dummy)
+        .reconfigurePropertyImpl(eq("name2"), anyString());
+    doThrow(new ReconfigurationException("NAME3", "NEW3", "OLD3"))
+        .when(dummy).reconfigurePropertyImpl(eq("name3"), anyString());
+
+    dummy.startReconfigurationTask();
+
+    waitAsyncReconfigureTaskFinish(dummy);
+    ReconfigurationTaskStatus status = dummy.getReconfigurationTaskStatus();
+    assertEquals(3, status.getStatus().size());
+    for (Map.Entry<PropertyChange, Optional<String>> result :
+        status.getStatus().entrySet()) {
+      PropertyChange change = result.getKey();
+      if (change.prop.equals("name1")) {
+        assertFalse(result.getValue().isPresent());
+      } else if (change.prop.equals("name2")) {
+        assertThat(result.getValue().get(),
+            containsString("Property name2 is not reconfigurable"));
+      } else if (change.prop.equals("name3")) {
+        assertThat(result.getValue().get(), containsString("NAME3"));
+      } else {
+        fail("Unknown property: " + change.prop);
+      }
+    }
+  }
+
+  @Test(timeout=30000)
+  public void testStartReconfigurationFailureDueToExistingRunningTask()
+      throws InterruptedException, IOException {
+    AsyncReconfigurableDummy dummy = spy(new AsyncReconfigurableDummy(conf1));
+    List<PropertyChange> changes = Lists.newArrayList(
+        new PropertyChange(PROP1, "new1", "old1")
+    );
+    doReturn(changes).when(dummy).getChangedProperties(
+        any(Configuration.class), any(Configuration.class));
+
+    ReconfigurationTaskStatus status = dummy.getReconfigurationTaskStatus();
+    assertFalse(status.hasTask());
+
+    dummy.startReconfigurationTask();
+    status = dummy.getReconfigurationTaskStatus();
+    assertTrue(status.hasTask());
+    assertFalse(status.stopped());
+
+    // An active reconfiguration task is running.
+    try {
+      dummy.startReconfigurationTask();
+      fail("Expect to throw IOException.");
+    } catch (IOException e) {
+      GenericTestUtils.assertExceptionContains(
+          "Another reconfiguration task is running", e);
+    }
+    status = dummy.getReconfigurationTaskStatus();
+    assertTrue(status.hasTask());
+    assertFalse(status.stopped());
+
+    dummy.latch.countDown();
+    waitAsyncReconfigureTaskFinish(dummy);
+    status = dummy.getReconfigurationTaskStatus();
+    assertTrue(status.hasTask());
+    assertTrue(status.stopped());
+
+    // The first task has finished.
+    dummy.startReconfigurationTask();
+    waitAsyncReconfigureTaskFinish(dummy);
+    ReconfigurationTaskStatus status2 = dummy.getReconfigurationTaskStatus();
+    assertTrue(status2.getStartTime() >= status.getEndTime());
+
+    dummy.shutdownReconfigurationTask();
+    try {
+      dummy.startReconfigurationTask();
+      fail("Expect to throw IOException");
+    } catch (IOException e) {
+      GenericTestUtils.assertExceptionContains("The server is stopped", e);
+    }
+  }
 }

+ 11 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -585,6 +585,14 @@ Release 2.6.0 - UNRELEASED
     HDFS-7118. Improve diagnostics on storage directory rename operations by
     using NativeIO#renameTo in Storage#rename. (cnauroth)
 
+    HDFS-6808. Add command line option to ask DataNode reload configuration.
+    (Lei Xu via Colin Patrick McCabe)
+
+    HDFS-7119. Split error checks in AtomicFileOutputStream#close into separate
+    conditions to improve diagnostics. (cnauroth)
+
+    HDFS-7077. Separate CipherSuite from crypto protocol version. (wang)
+
   OPTIMIZATIONS
 
     HDFS-6690. Deduplicate xattr names in memory. (wang)
@@ -955,6 +963,9 @@ Release 2.6.0 - UNRELEASED
 
     HDFS-7127. TestLeaseRecovery leaks MiniDFSCluster instances. (cnauroth)
 
+    HDFS-7131. During HA upgrade, JournalNode should create a new committedTxnId
+    file in the current directory. (jing9)
+
 Release 2.5.1 - 2014-09-05
 
   INCOMPATIBLE CHANGES

+ 59 - 30
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -21,7 +21,6 @@ import static org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
 import static org.apache.hadoop.crypto.key.KeyProviderCryptoExtension
     .EncryptedKeyVersion;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX;
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CIPHER_SUITE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
@@ -104,6 +103,7 @@ import org.apache.hadoop.crypto.CipherSuite;
 import org.apache.hadoop.crypto.CryptoCodec;
 import org.apache.hadoop.crypto.CryptoInputStream;
 import org.apache.hadoop.crypto.CryptoOutputStream;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.BlockStorageLocation;
@@ -263,9 +263,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   private static final DFSHedgedReadMetrics HEDGED_READ_METRIC =
       new DFSHedgedReadMetrics();
   private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;
-  private final CryptoCodec codec;
-  @VisibleForTesting
-  List<CipherSuite> cipherSuites;
   @VisibleForTesting
   KeyProviderCryptoExtension provider;
   /**
@@ -599,11 +596,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     this.authority = nameNodeUri == null? "null": nameNodeUri.getAuthority();
     this.clientName = "DFSClient_" + dfsClientConf.taskId + "_" + 
         DFSUtil.getRandom().nextInt()  + "_" + Thread.currentThread().getId();
-    this.codec = CryptoCodec.getInstance(conf);
-    this.cipherSuites = Lists.newArrayListWithCapacity(1);
-    if (codec != null) {
-      cipherSuites.add(codec.getCipherSuite());
-    }
     provider = DFSUtil.createKeyProviderCryptoExtension(conf);
     if (LOG.isDebugEnabled()) {
       if (provider == null) {
@@ -1329,6 +1321,55 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     }
   }
 
+  /**
+   * Obtain the crypto protocol version from the provided FileEncryptionInfo,
+   * checking to see if this version is supported by the client.
+   *
+   * @param feInfo FileEncryptionInfo
+   * @return CryptoProtocolVersion from the feInfo
+   * @throws IOException if the protocol version is unsupported.
+   */
+  private static CryptoProtocolVersion getCryptoProtocolVersion(
+      FileEncryptionInfo feInfo) throws IOException {
+    final CryptoProtocolVersion version = feInfo.getCryptoProtocolVersion();
+    if (!CryptoProtocolVersion.supports(version)) {
+      throw new IOException("Client does not support specified " +
+          "CryptoProtocolVersion " + version.getDescription() + " version " +
+          "number" + version.getVersion());
+    }
+    return version;
+  }
+
+  /**
+   * Obtain a CryptoCodec based on the CipherSuite set in a FileEncryptionInfo
+   * and the available CryptoCodecs configured in the Configuration.
+   *
+   * @param conf   Configuration
+   * @param feInfo FileEncryptionInfo
+   * @return CryptoCodec
+   * @throws IOException if no suitable CryptoCodec for the CipherSuite is
+   *                     available.
+   */
+  private static CryptoCodec getCryptoCodec(Configuration conf,
+      FileEncryptionInfo feInfo) throws IOException {
+    final CipherSuite suite = feInfo.getCipherSuite();
+    if (suite.equals(CipherSuite.UNKNOWN)) {
+      throw new IOException("NameNode specified unknown CipherSuite with ID "
+          + suite.getUnknownValue() + ", cannot instantiate CryptoCodec.");
+    }
+    final CryptoCodec codec = CryptoCodec.getInstance(conf, suite);
+    if (codec == null) {
+      throw new UnknownCipherSuiteException(
+          "No configuration found for the cipher suite "
+          + suite.getConfigSuffix() + " prefixed with "
+          + HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX
+          + ". Please see the example configuration "
+          + "hadoop.security.crypto.codec.classes.EXAMPLECIPHERSUITE "
+          + "at core-default.xml for details.");
+    }
+    return codec;
+  }
+
   /**
    * Wraps the stream in a CryptoInputStream if the underlying file is
    * encrypted.
@@ -1338,17 +1379,10 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     final FileEncryptionInfo feInfo = dfsis.getFileEncryptionInfo();
     if (feInfo != null) {
       // File is encrypted, wrap the stream in a crypto stream.
-      KeyVersion decrypted = decryptEncryptedDataEncryptionKey(feInfo);
-      CryptoCodec codec = CryptoCodec
-          .getInstance(conf, feInfo.getCipherSuite());
-      if (codec == null) {
-        throw new IOException("No configuration found for the cipher suite "
-            + feInfo.getCipherSuite().getConfigSuffix() + " prefixed with "
-            + HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX
-            + ". Please see the example configuration "
-            + "hadoop.security.crypto.codec.classes.EXAMPLECIPHERSUITE "
-            + "at core-default.xml for details.");
-      }
+      // Currently only one version, so no special logic based on the version #
+      getCryptoProtocolVersion(feInfo);
+      final CryptoCodec codec = getCryptoCodec(conf, feInfo);
+      final KeyVersion decrypted = decryptEncryptedDataEncryptionKey(feInfo);
       final CryptoInputStream cryptoIn =
           new CryptoInputStream(dfsis, codec, decrypted.getMaterial(),
               feInfo.getIV());
@@ -1376,15 +1410,10 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
       FileSystem.Statistics statistics, long startPos) throws IOException {
     final FileEncryptionInfo feInfo = dfsos.getFileEncryptionInfo();
     if (feInfo != null) {
-      if (codec == null) {
-        throw new IOException("No configuration found for the cipher suite "
-            + HADOOP_SECURITY_CRYPTO_CIPHER_SUITE_KEY + " value prefixed with "
-            + HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX
-            + ". Please see the example configuration "
-            + "hadoop.security.crypto.codec.classes.EXAMPLECIPHERSUITE "
-            + "at core-default.xml for details.");
-      }
       // File is encrypted, wrap the stream in a crypto stream.
+      // Currently only one version, so no special logic based on the version #
+      getCryptoProtocolVersion(feInfo);
+      final CryptoCodec codec = getCryptoCodec(conf, feInfo);
       KeyVersion decrypted = decryptEncryptedDataEncryptionKey(feInfo);
       final CryptoOutputStream cryptoOut =
           new CryptoOutputStream(dfsos, codec,
@@ -1599,7 +1628,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
         src, masked, flag, createParent, replication, blockSize, progress,
         buffersize, dfsClientConf.createChecksum(checksumOpt),
-        favoredNodeStrs, cipherSuites);
+        favoredNodeStrs);
     beginFileLease(result.getFileId(), result);
     return result;
   }
@@ -1646,7 +1675,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
       DataChecksum checksum = dfsClientConf.createChecksum(checksumOpt);
       result = DFSOutputStream.newStreamForCreate(this, src, absPermission,
           flag, createParent, replication, blockSize, progress, buffersize,
-          checksum, null, cipherSuites);
+          checksum, null);
     }
     beginFileLease(result.getFileId(), result);
     return result;

+ 8 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java

@@ -43,7 +43,6 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.crypto.CipherSuite;
 import org.apache.hadoop.fs.CanSetDropBehind;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSOutputSummer;
@@ -54,6 +53,7 @@ import org.apache.hadoop.fs.Syncable;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -133,7 +133,10 @@ public class DFSOutputStream extends FSOutputSummer
    * errors (typically related to encryption zones and KeyProvider operations).
    */
   @VisibleForTesting
-  public static final int CREATE_RETRY_COUNT = 10;
+  static final int CREATE_RETRY_COUNT = 10;
+  @VisibleForTesting
+  static CryptoProtocolVersion[] SUPPORTED_CRYPTO_VERSIONS =
+      CryptoProtocolVersion.supported();
 
   private final DFSClient dfsClient;
   private final long dfsclientSlowLogThresholdMs;
@@ -1658,8 +1661,7 @@ public class DFSOutputStream extends FSOutputSummer
   static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
       FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
       short replication, long blockSize, Progressable progress, int buffersize,
-      DataChecksum checksum, String[] favoredNodes,
-      List<CipherSuite> cipherSuites) throws IOException {
+      DataChecksum checksum, String[] favoredNodes) throws IOException {
     HdfsFileStatus stat = null;
 
     // Retry the create if we get a RetryStartFileException up to a maximum
@@ -1671,7 +1673,7 @@ public class DFSOutputStream extends FSOutputSummer
       try {
         stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
             new EnumSetWritable<CreateFlag>(flag), createParent, replication,
-            blockSize, cipherSuites);
+            blockSize, SUPPORTED_CRYPTO_VERSIONS);
         break;
       } catch (RemoteException re) {
         IOException e = re.unwrapRemoteException(
@@ -1685,7 +1687,7 @@ public class DFSOutputStream extends FSOutputSummer
             SafeModeException.class,
             UnresolvedPathException.class,
             SnapshotAccessControlException.class,
-            UnknownCipherSuiteException.class);
+            UnknownCryptoProtocolVersionException.class);
         if (e instanceof RetryStartFileException) {
           if (retryCount > 0) {
             shouldRetry = true;

+ 4 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/UnknownCipherSuiteException.java

@@ -23,15 +23,12 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
+/**
+ * Thrown when an unknown cipher suite is encountered.
+ */
 @InterfaceAudience.Public
-@InterfaceStability.Evolving
+@InterfaceStability.Stable
 public class UnknownCipherSuiteException extends IOException {
-  private static final long serialVersionUID = 8957192l;
-
-  public UnknownCipherSuiteException() {
-    super();
-  }
-
   public UnknownCipherSuiteException(String msg) {
     super(msg);
   }

+ 38 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/UnknownCryptoProtocolVersionException.java

@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class UnknownCryptoProtocolVersionException extends IOException {
+  private static final long serialVersionUID = 8957192l;
+
+  public UnknownCryptoProtocolVersionException() {
+    super();
+  }
+
+  public UnknownCryptoProtocolVersionException(String msg) {
+    super(msg);
+  }
+}

+ 12 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java

@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.ReconfigurationTaskStatus;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector;
@@ -144,4 +145,15 @@ public interface ClientDatanodeProtocol {
    * @return software/config version and uptime of the datanode
    */
   DatanodeLocalInfo getDatanodeInfo() throws IOException;
+
+  /**
+   * Asynchronously reload the configuration from disk and apply the changes.
+   */
+  void startReconfiguration() throws IOException;
+
+  /**
+   * Get the status of the previously issued reconfig task.
+   * @see org.apache.hadoop.conf.ReconfigurationTaskStatus
+   */
+  ReconfigurationTaskStatus getReconfigurationStatus() throws IOException;
 }
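Taken together, a caller drives these two methods as sketched below. The ReconfigExample class is an assumption; `proxy` stands for an already-constructed RPC proxy to a DataNode (as DFSAdmin obtains elsewhere in this patch).

import java.io.IOException;

import org.apache.hadoop.conf.ReconfigurationTaskStatus;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;

public class ReconfigExample {
  // Kick off an async reload on the DataNode, then poll until it finishes.
  static void reloadAndPoll(ClientDatanodeProtocol proxy)
      throws IOException, InterruptedException {
    proxy.startReconfiguration();
    ReconfigurationTaskStatus status = proxy.getReconfigurationStatus();
    while (status.hasTask() && !status.stopped()) {
      Thread.sleep(100);   // task still running; ask the DataNode again
      status = proxy.getReconfigurationStatus();
    }
    // status.getStatus() now maps each PropertyChange to an optional error.
  }
}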

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java

@@ -24,7 +24,7 @@ import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.crypto.CipherSuite;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
 import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.ContentSummary;
@@ -163,6 +163,7 @@ public interface ClientProtocol {
    * @param createParent create missing parent directory if true
    * @param replication block replication factor.
    * @param blockSize maximum block size.
+   * @param supportedVersions CryptoProtocolVersions supported by the client
    * 
    * @return the status of the created file, it could be null if the server
    *           doesn't support returning the file status
@@ -191,7 +192,7 @@ public interface ClientProtocol {
   public HdfsFileStatus create(String src, FsPermission masked,
       String clientName, EnumSetWritable<CreateFlag> flag,
       boolean createParent, short replication, long blockSize, 
-      List<CipherSuite> cipherSuites)
+      CryptoProtocolVersion[] supportedVersions)
       throws AccessControlException, AlreadyBeingCreatedException,
       DSQuotaExceededException, FileAlreadyExistsException,
       FileNotFoundException, NSQuotaExceededException,

+ 15 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/EncryptionZone.java

@@ -22,6 +22,7 @@ import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.crypto.CipherSuite;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
 
 /**
  * A simple class for representing an encryption zone. Presently an encryption
@@ -35,13 +36,15 @@ public class EncryptionZone {
   private final long id;
   private final String path;
   private final CipherSuite suite;
+  private final CryptoProtocolVersion version;
   private final String keyName;
 
-  public EncryptionZone(long id, String path,
-      CipherSuite suite, String keyName) {
+  public EncryptionZone(long id, String path, CipherSuite suite,
+      CryptoProtocolVersion version, String keyName) {
     this.id = id;
     this.path = path;
     this.suite = suite;
+    this.version = version;
     this.keyName = keyName;
   }
 
@@ -57,15 +60,20 @@ public class EncryptionZone {
     return suite;
   }
 
+  public CryptoProtocolVersion getVersion() { return version; }
+
   public String getKeyName() {
     return keyName;
   }
 
   @Override
   public int hashCode() {
-    return new HashCodeBuilder(13, 31).
-      append(id).append(path).
-      append(suite).append(keyName).
+    return new HashCodeBuilder(13, 31)
+        .append(id)
+        .append(path)
+        .append(suite)
+        .append(version)
+        .append(keyName).
       toHashCode();
   }
 
@@ -86,6 +94,7 @@ public class EncryptionZone {
       append(id, rhs.id).
       append(path, rhs.path).
       append(suite, rhs.suite).
+      append(version, rhs.version).
       append(keyName, rhs.keyName).
       isEquals();
   }
@@ -95,6 +104,7 @@ public class EncryptionZone {
     return "EncryptionZone [id=" + id +
         ", path=" + path +
         ", suite=" + suite +
+        ", version=" + version +
         ", keyName=" + keyName + "]";
   }
 }
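A minimal sketch of the widened constructor; the id, path, and key name are hypothetical values, and ZoneExample is not part of the patch.

import org.apache.hadoop.crypto.CipherSuite;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;

public class ZoneExample {
  public static void main(String[] args) {
    EncryptionZone zone = new EncryptionZone(1L, "/zones/secure",
        CipherSuite.AES_CTR_NOPADDING,
        CryptoProtocolVersion.ENCRYPTION_ZONES,  // the argument added here
        "myKey");
    System.out.println(zone);  // toString() now includes version=...
  }
}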

+ 58 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java

@@ -20,11 +20,14 @@ package org.apache.hadoop.hdfs.protocolPB;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
+import com.google.common.base.Optional;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.ReconfigurationTaskStatus;
+import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolResponseProto;
@@ -32,6 +35,9 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlo
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetDatanodeInfoRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetDatanodeInfoResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusConfigChangeProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto.Builder;
@@ -41,11 +47,11 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.Refres
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.StartReconfigurationRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.StartReconfigurationResponseProto;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.VersionInfo;
 
 import com.google.common.primitives.Longs;
 import com.google.protobuf.ByteString;
@@ -66,6 +72,8 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
       DeleteBlockPoolResponseProto.newBuilder().build();
   private final static ShutdownDatanodeResponseProto SHUTDOWN_DATANODE_RESP =
       ShutdownDatanodeResponseProto.newBuilder().build();
+  private final static StartReconfigurationResponseProto START_RECONFIG_RESP =
+      StartReconfigurationResponseProto.newBuilder().build();
   
   private final ClientDatanodeProtocol impl;
 
@@ -182,4 +190,51 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
     }
     return res;
   }
+
+  @Override
+  public StartReconfigurationResponseProto startReconfiguration(
+      RpcController unused, StartReconfigurationRequestProto request)
+    throws ServiceException {
+    try {
+      impl.startReconfiguration();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return START_RECONFIG_RESP;
+  }
+
+  @Override
+  public GetReconfigurationStatusResponseProto getReconfigurationStatus(
+      RpcController unused, GetReconfigurationStatusRequestProto request)
+      throws ServiceException {
+    GetReconfigurationStatusResponseProto.Builder builder =
+        GetReconfigurationStatusResponseProto.newBuilder();
+    try {
+      ReconfigurationTaskStatus status = impl.getReconfigurationStatus();
+      builder.setStartTime(status.getStartTime());
+      if (status.stopped()) {
+        builder.setEndTime(status.getEndTime());
+        assert status.getStatus() != null;
+        for (Map.Entry<PropertyChange, Optional<String>> result :
+            status.getStatus().entrySet()) {
+          GetReconfigurationStatusConfigChangeProto.Builder changeBuilder =
+              GetReconfigurationStatusConfigChangeProto.newBuilder();
+          PropertyChange change = result.getKey();
+          changeBuilder.setName(change.prop);
+          changeBuilder.setOldValue(change.oldVal != null ? change.oldVal : "");
+          if (change.newVal != null) {
+            changeBuilder.setNewValue(change.newVal);
+          }
+          if (result.getValue().isPresent()) {
+            // Get full stack trace.
+            changeBuilder.setErrorMessage(result.getValue().get());
+          }
+          builder.addChanges(changeBuilder);
+        }
+      }
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return builder.build();
+  }
 }

+ 53 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java

@@ -21,16 +21,20 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 
 import javax.net.SocketFactory;
 
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.ReconfigurationTaskStatus;
+import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
@@ -48,8 +52,11 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdf
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusConfigChangeProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.StartReconfigurationRequestProto;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -87,6 +94,10 @@ public class ClientDatanodeProtocolTranslatorPB implements
       RefreshNamenodesRequestProto.newBuilder().build();
   private final static GetDatanodeInfoRequestProto VOID_GET_DATANODE_INFO =
       GetDatanodeInfoRequestProto.newBuilder().build();
+  private final static GetReconfigurationStatusRequestProto VOID_GET_RECONFIG_STATUS =
+      GetReconfigurationStatusRequestProto.newBuilder().build();
+  private final static StartReconfigurationRequestProto VOID_START_RECONFIG =
+      StartReconfigurationRequestProto.newBuilder().build();
 
   public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid,
       Configuration conf, int socketTimeout, boolean connectToDnViaHostname,
@@ -282,4 +293,44 @@ public class ClientDatanodeProtocolTranslatorPB implements
     }
   }
 
+  @Override
+  public void startReconfiguration() throws IOException {
+    try {
+      rpcProxy.startReconfiguration(NULL_CONTROLLER, VOID_START_RECONFIG);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public ReconfigurationTaskStatus getReconfigurationStatus() throws IOException {
+    GetReconfigurationStatusResponseProto response;
+    Map<PropertyChange, Optional<String>> statusMap = null;
+    long startTime;
+    long endTime = 0;
+    try {
+      response = rpcProxy.getReconfigurationStatus(NULL_CONTROLLER,
+          VOID_GET_RECONFIG_STATUS);
+      startTime = response.getStartTime();
+      if (response.hasEndTime()) {
+        endTime = response.getEndTime();
+      }
+      if (response.getChangesCount() > 0) {
+        statusMap = Maps.newHashMap();
+        for (GetReconfigurationStatusConfigChangeProto change :
+            response.getChangesList()) {
+          PropertyChange pc = new PropertyChange(
+              change.getName(), change.getNewValue(), change.getOldValue());
+          String errorMessage = null;
+          if (change.hasErrorMessage()) {
+            errorMessage = change.getErrorMessage();
+          }
+          statusMap.put(pc, Optional.fromNullable(errorMessage));
+        }
+      }
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+    return new ReconfigurationTaskStatus(startTime, endTime, statusMap);
+  }
 }

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java

@@ -394,8 +394,9 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
       HdfsFileStatus result = server.create(req.getSrc(),
           PBHelper.convert(req.getMasked()), req.getClientName(),
           PBHelper.convertCreateFlag(req.getCreateFlag()), req.getCreateParent(),
-          (short) req.getReplication(), req.getBlockSize(), 
-          PBHelper.convertCipherSuiteProtos(req.getCipherSuitesList()));
+          (short) req.getReplication(), req.getBlockSize(),
+          PBHelper.convertCryptoProtocolVersions(
+              req.getCryptoProtocolVersionList()));
 
       if (result != null) {
         return CreateResponseProto.newBuilder().setFs(PBHelper.convert(result))

+ 3 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java

@@ -28,6 +28,7 @@ import com.google.common.collect.Lists;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.crypto.CipherSuite;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
 import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.ContentSummary;
@@ -274,7 +275,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
   public HdfsFileStatus create(String src, FsPermission masked,
       String clientName, EnumSetWritable<CreateFlag> flag,
       boolean createParent, short replication, long blockSize, 
-      List<CipherSuite> cipherSuites)
+      CryptoProtocolVersion[] supportedVersions)
       throws AccessControlException, AlreadyBeingCreatedException,
       DSQuotaExceededException, FileAlreadyExistsException,
       FileNotFoundException, NSQuotaExceededException,
@@ -288,9 +289,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
         .setCreateParent(createParent)
         .setReplication(replication)
         .setBlockSize(blockSize);
-    if (cipherSuites != null) {
-      builder.addAllCipherSuites(PBHelper.convertCipherSuites(cipherSuites));
-    }
+    builder.addAllCryptoProtocolVersion(PBHelper.convert(supportedVersions));
     CreateRequestProto req = builder.build();
     try {
       CreateResponseProto res = rpcProxy.create(null, req);

+ 61 - 27
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java

@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.protocolPB;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos
     .EncryptionZoneProto;
+import static org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherSuiteProto;
+import static org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CryptoProtocolVersionProto;
 
 import java.io.EOFException;
 import java.io.IOException;
@@ -59,6 +61,7 @@ import org.apache.hadoop.hdfs.protocol.CachePoolStats;
 import org.apache.hadoop.crypto.CipherSuite;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
@@ -2404,15 +2407,17 @@ public class PBHelper {
   public static EncryptionZoneProto convert(EncryptionZone zone) {
     return EncryptionZoneProto.newBuilder()
         .setId(zone.getId())
-        .setKeyName(zone.getKeyName())
         .setPath(zone.getPath())
         .setSuite(convert(zone.getSuite()))
+        .setCryptoProtocolVersion(convert(zone.getVersion()))
+        .setKeyName(zone.getKeyName())
         .build();
   }
 
   public static EncryptionZone convert(EncryptionZoneProto proto) {
     return new EncryptionZone(proto.getId(), proto.getPath(),
-        convert(proto.getSuite()), proto.getKeyName());
+        convert(proto.getSuite()), convert(proto.getCryptoProtocolVersion()),
+        proto.getKeyName());
   }
 
   public static ShortCircuitShmSlotProto convert(SlotId slotId) {
@@ -2682,18 +2687,18 @@ public class PBHelper {
         builder.build()).build();
   }
 
-  public static HdfsProtos.CipherSuite convert(CipherSuite suite) {
+  public static CipherSuiteProto convert(CipherSuite suite) {
     switch (suite) {
     case UNKNOWN:
-      return HdfsProtos.CipherSuite.UNKNOWN;
+      return CipherSuiteProto.UNKNOWN;
     case AES_CTR_NOPADDING:
-      return HdfsProtos.CipherSuite.AES_CTR_NOPADDING;
+      return CipherSuiteProto.AES_CTR_NOPADDING;
     default:
       return null;
     }
   }
 
-  public static CipherSuite convert(HdfsProtos.CipherSuite proto) {
+  public static CipherSuite convert(CipherSuiteProto proto) {
     switch (proto) {
     case AES_CTR_NOPADDING:
       return CipherSuite.AES_CTR_NOPADDING;
@@ -2705,26 +2710,49 @@ public class PBHelper {
     }
   }
 
-  public static List<HdfsProtos.CipherSuite> convertCipherSuites
-      (List<CipherSuite> suites) {
-    if (suites == null) {
-      return null;
-    }
-    List<HdfsProtos.CipherSuite> protos =
-        Lists.newArrayListWithCapacity(suites.size());
-    for (CipherSuite suite : suites) {
-      protos.add(convert(suite));
+  public static List<CryptoProtocolVersionProto> convert(
+      CryptoProtocolVersion[] versions) {
+    List<CryptoProtocolVersionProto> protos =
+        Lists.newArrayListWithCapacity(versions.length);
+    for (CryptoProtocolVersion v: versions) {
+      protos.add(convert(v));
     }
     return protos;
   }
 
-  public static List<CipherSuite> convertCipherSuiteProtos(
-      List<HdfsProtos.CipherSuite> protos) {
-    List<CipherSuite> suites = Lists.newArrayListWithCapacity(protos.size());
-    for (HdfsProtos.CipherSuite proto : protos) {
-      suites.add(convert(proto));
+  public static CryptoProtocolVersion[] convertCryptoProtocolVersions(
+      List<CryptoProtocolVersionProto> protos) {
+    List<CryptoProtocolVersion> versions =
+        Lists.newArrayListWithCapacity(protos.size());
+    for (CryptoProtocolVersionProto p: protos) {
+      versions.add(convert(p));
+    }
+    return versions.toArray(new CryptoProtocolVersion[] {});
+  }
+
+  public static CryptoProtocolVersion convert(CryptoProtocolVersionProto
+      proto) {
+    switch(proto) {
+    case ENCRYPTION_ZONES:
+      return CryptoProtocolVersion.ENCRYPTION_ZONES;
+    default:
+      // Set to UNKNOWN and stash the unknown enum value
+      CryptoProtocolVersion version = CryptoProtocolVersion.UNKNOWN;
+      version.setUnknownValue(proto.getNumber());
+      return version;
+    }
+  }
+
+  public static CryptoProtocolVersionProto convert(CryptoProtocolVersion
+      version) {
+    switch(version) {
+    case UNKNOWN:
+      return CryptoProtocolVersionProto.UNKNOWN_PROTOCOL_VERSION;
+    case ENCRYPTION_ZONES:
+      return CryptoProtocolVersionProto.ENCRYPTION_ZONES;
+    default:
+      return null;
     }
-    return suites;
   }
 
   public static HdfsProtos.FileEncryptionInfoProto convert(
@@ -2734,6 +2762,7 @@ public class PBHelper {
     }
     return HdfsProtos.FileEncryptionInfoProto.newBuilder()
         .setSuite(convert(info.getCipherSuite()))
+        .setCryptoProtocolVersion(convert(info.getCryptoProtocolVersion()))
         .setKey(getByteString(info.getEncryptedDataEncryptionKey()))
         .setIv(getByteString(info.getIV()))
         .setEzKeyVersionName(info.getEzKeyVersionName())
@@ -2754,12 +2783,13 @@ public class PBHelper {
   }
 
   public static HdfsProtos.ZoneEncryptionInfoProto convert(
-      CipherSuite suite, String keyName) {
-    if (suite == null || keyName == null) {
+      CipherSuite suite, CryptoProtocolVersion version, String keyName) {
+    if (suite == null || version == null || keyName == null) {
       return null;
     }
     return HdfsProtos.ZoneEncryptionInfoProto.newBuilder()
         .setSuite(convert(suite))
+        .setCryptoProtocolVersion(convert(version))
         .setKeyName(keyName)
         .build();
   }
@@ -2770,23 +2800,27 @@ public class PBHelper {
       return null;
     }
     CipherSuite suite = convert(proto.getSuite());
+    CryptoProtocolVersion version = convert(proto.getCryptoProtocolVersion());
     byte[] key = proto.getKey().toByteArray();
     byte[] iv = proto.getIv().toByteArray();
     String ezKeyVersionName = proto.getEzKeyVersionName();
     String keyName = proto.getKeyName();
-    return new FileEncryptionInfo(suite, key, iv, keyName, ezKeyVersionName);
+    return new FileEncryptionInfo(suite, version, key, iv, keyName,
+        ezKeyVersionName);
   }
 
   public static FileEncryptionInfo convert(
       HdfsProtos.PerFileEncryptionInfoProto fileProto,
-      CipherSuite suite, String keyName) {
-    if (fileProto == null || suite == null || keyName == null) {
+      CipherSuite suite, CryptoProtocolVersion version, String keyName) {
+    if (fileProto == null || suite == null || version == null ||
+        keyName == null) {
       return null;
     }
     byte[] key = fileProto.getKey().toByteArray();
     byte[] iv = fileProto.getIv().toByteArray();
     String ezKeyVersionName = fileProto.getEzKeyVersionName();
-    return new FileEncryptionInfo(suite, key, iv, keyName, ezKeyVersionName);
+    return new FileEncryptionInfo(suite, version, key, iv, keyName,
+        ezKeyVersionName);
   }
 
 }
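
A small sketch of the UNKNOWN handling above: a version enum value the server
does not recognize falls through the default arm, and the raw wire number is
stashed for diagnostics (getUnknownValue() is how FSNamesystem later logs it):

    CryptoProtocolVersionProto p =
        CryptoProtocolVersionProto.UNKNOWN_PROTOCOL_VERSION;
    CryptoProtocolVersion v = PBHelper.convert(p);
    assert v == CryptoProtocolVersion.UNKNOWN;
    assert v.getUnknownValue() == p.getNumber();
    // Note: setUnknownValue() mutates the shared UNKNOWN enum constant,
    // so the stashed number reflects the most recent unknown conversion.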

+ 17 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java

@@ -990,7 +990,7 @@ public class Journal implements Closeable {
   public synchronized void doPreUpgrade() throws IOException {
     // Do not hold file lock on committedTxnId, because the containing
     // directory will be renamed.  It will be reopened lazily on next access.
-    committedTxnId.close();
+    IOUtils.cleanup(LOG, committedTxnId);
     storage.getJournalManager().doPreUpgrade();
   }
 
@@ -1015,14 +1015,25 @@ public class Journal implements Closeable {
         new File(previousDir, LAST_PROMISED_FILENAME), 0);
     PersistentLongFile prevLastWriterEpoch = new PersistentLongFile(
         new File(previousDir, LAST_WRITER_EPOCH), 0);
-    
+    BestEffortLongFile prevCommittedTxnId = new BestEffortLongFile(
+        new File(previousDir, COMMITTED_TXID_FILENAME),
+        HdfsConstants.INVALID_TXID);
+
     lastPromisedEpoch = new PersistentLongFile(
         new File(currentDir, LAST_PROMISED_FILENAME), 0);
     lastWriterEpoch = new PersistentLongFile(
         new File(currentDir, LAST_WRITER_EPOCH), 0);
-    
-    lastPromisedEpoch.set(prevLastPromisedEpoch.get());
-    lastWriterEpoch.set(prevLastWriterEpoch.get());
+    committedTxnId = new BestEffortLongFile(
+        new File(currentDir, COMMITTED_TXID_FILENAME),
+        HdfsConstants.INVALID_TXID);
+
+    try {
+      lastPromisedEpoch.set(prevLastPromisedEpoch.get());
+      lastWriterEpoch.set(prevLastWriterEpoch.get());
+      committedTxnId.set(prevCommittedTxnId.get());
+    } finally {
+      IOUtils.cleanup(LOG, prevCommittedTxnId);
+    }
   }
 
   public synchronized void doFinalize() throws IOException {
@@ -1043,7 +1054,7 @@ public class Journal implements Closeable {
   public synchronized void doRollback() throws IOException {
     // Do not hold file lock on committedTxnId, because the containing
     // directory will be renamed.  It will be reopened lazily on next access.
-    committedTxnId.close();
+    IOUtils.cleanup(LOG, committedTxnId);
     storage.getJournalManager().doRollback();
   }
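
The switch from close() to IOUtils.cleanup(LOG, ...) matters here because
cleanup() suppresses close failures instead of propagating them; roughly, each
call expands to:

    try {
      committedTxnId.close();
    } catch (Throwable t) {
      // logged (at debug level) and suppressed, so a failure to close the
      // committedTxnId file cannot abort doPreUpgrade()/doRollback()
      LOG.debug("Exception in closing " + committedTxnId, t);
    }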
 

+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java

@@ -26,6 +26,7 @@ import java.util.Map;
 
 import javax.management.ObjectName;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -92,8 +93,9 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
     
     return journal;
   }
-  
-  Journal getOrCreateJournal(String jid) throws IOException {
+
+  @VisibleForTesting
+  public Journal getOrCreateJournal(String jid) throws IOException {
     return getOrCreateJournal(jid, StartupOption.REGULAR);
   }
 

+ 42 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -90,6 +90,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.ReconfigurableBase;
 import org.apache.hadoop.conf.ReconfigurationException;
+import org.apache.hadoop.conf.ReconfigurationTaskStatus;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
@@ -336,6 +337,21 @@ public class DataNode extends ReconfigurableBase
 
   private SpanReceiverHost spanReceiverHost;
 
+  /**
+   * Creates a dummy DataNode for testing purposes.
+   */
+  @VisibleForTesting
+  @InterfaceAudience.LimitedPrivate("HDFS")
+  DataNode(final Configuration conf) {
+    super(conf);
+    this.fileDescriptorPassingDisabledReason = null;
+    this.maxNumberOfBlocksToLog = 0;
+    this.confVersion = null;
+    this.usersWithLocalPathAccess = null;
+    this.connectToDnViaHostname = false;
+    this.getHdfsBlockLocationsEnabled = false;
+  }
+
   /**
    * Create the DataNode given a configuration, an array of dataDirs,
    * and a namenode proxy
@@ -478,7 +494,6 @@ public class DataNode extends ReconfigurableBase
    */
   private synchronized void refreshVolumes(String newVolumes) throws Exception {
     Configuration conf = getConf();
-    String oldVolumes = conf.get(DFS_DATANODE_DATA_DIR_KEY);
     conf.set(DFS_DATANODE_DATA_DIR_KEY, newVolumes);
     List<StorageLocation> locations = getStorageLocations(conf);
 
@@ -486,6 +501,7 @@ public class DataNode extends ReconfigurableBase
     dataDirs = locations;
     ChangedVolumes changedVolumes = parseChangedVolumes();
 
+    StringBuilder errorMessageBuilder = new StringBuilder();
     try {
       if (numOldDataDirs + changedVolumes.newLocations.size() -
           changedVolumes.deactivateLocations.size() <= 0) {
@@ -514,8 +530,13 @@ public class DataNode extends ReconfigurableBase
           // Clean all failed volumes.
           for (StorageLocation location : changedVolumes.newLocations) {
             if (!succeedVolumes.contains(location)) {
+              errorMessageBuilder.append("FAILED TO ADD:");
               failedVolumes.add(location);
+            } else {
+              errorMessageBuilder.append("ADDED:");
             }
+            errorMessageBuilder.append(location);
+            errorMessageBuilder.append("\n");
           }
           storage.removeVolumes(failedVolumes);
           data.removeVolumes(failedVolumes);
@@ -529,10 +550,12 @@ public class DataNode extends ReconfigurableBase
         data.removeVolumes(changedVolumes.deactivateLocations);
         storage.removeVolumes(changedVolumes.deactivateLocations);
       }
+
+      if (errorMessageBuilder.length() > 0) {
+        throw new IOException(errorMessageBuilder.toString());
+      }
     } catch (IOException e) {
-      LOG.warn("There is IOException when refreshing volumes! "
-          + "Recover configurations: " + DFS_DATANODE_DATA_DIR_KEY
-          + " = " + oldVolumes, e);
+      LOG.warn("There is IOException when refresh volumes! ", e);
       throw e;
     }
   }
@@ -1594,6 +1617,9 @@ public class DataNode extends ReconfigurableBase
     // before the restart prep is done.
     this.shouldRun = false;
     
+    // wait for the reconfiguration thread, if any, to exit
+    shutdownReconfigurationTask();
+
     // wait for all data receiver threads to exit
     if (this.threadGroup != null) {
       int sleepMs = 2;
@@ -2849,6 +2875,18 @@ public class DataNode extends ReconfigurableBase
         confVersion, uptime);
   }
 
+  @Override // ClientDatanodeProtocol
+  public void startReconfiguration() throws IOException {
+    checkSuperuserPrivilege();
+    startReconfigurationTask();
+  }
+
+  @Override // ClientDatanodeProtocol
+  public ReconfigurationTaskStatus getReconfigurationStatus() throws IOException {
+    checkSuperuserPrivilege();
+    return getReconfigurationTaskStatus();
+  }
+
   /**
    * @param addr rpc address of the namenode
    * @return true if the datanode is connected to a NameNode at the
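
For orientation, a hedged sketch of how a changed key reaches refreshVolumes():
the background task started by startReconfigurationTask() invokes
reconfigurePropertyImpl() once per changed property. The handler below is
illustrative, based on the Reconfigurable API, not a line from this patch:

    @Override
    protected void reconfigurePropertyImpl(String property, String newVal)
        throws ReconfigurationException {
      if (DFS_DATANODE_DATA_DIR_KEY.equals(property)) {
        try {
          refreshVolumes(newVal); // throws if any volume change failed
        } catch (Exception e) {
          // The message (e.g. "FAILED TO ADD:<location>") becomes the
          // per-property errorMessage reported by the status RPC.
          throw new ReconfigurationException(property, newVal,
              getConf().get(property), e);
        }
      }
    }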

+ 21 - 10
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java

@@ -27,6 +27,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.CipherSuite;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.fs.XAttrSetFlag;
@@ -57,7 +58,8 @@ public class EncryptionZoneManager {
       .class);
 
   public static final EncryptionZone NULL_EZ =
-      new EncryptionZone(-1, "", CipherSuite.UNKNOWN, "");
+      new EncryptionZone(-1, "", CipherSuite.UNKNOWN,
+          CryptoProtocolVersion.UNKNOWN, "");
 
   /**
    * EncryptionZoneInt is the internal representation of an encryption zone. The
@@ -67,11 +69,16 @@ public class EncryptionZoneManager {
   private static class EncryptionZoneInt {
     private final long inodeId;
     private final CipherSuite suite;
+    private final CryptoProtocolVersion version;
     private final String keyName;
 
-    EncryptionZoneInt(long inodeId, CipherSuite suite, String keyName) {
+    EncryptionZoneInt(long inodeId, CipherSuite suite,
+        CryptoProtocolVersion version, String keyName) {
+      Preconditions.checkArgument(suite != CipherSuite.UNKNOWN);
+      Preconditions.checkArgument(version != CryptoProtocolVersion.UNKNOWN);
       this.inodeId = inodeId;
       this.suite = suite;
+      this.version = version;
       this.keyName = keyName;
     }
 
@@ -83,6 +90,8 @@ public class EncryptionZoneManager {
       return suite;
     }
 
+    CryptoProtocolVersion getVersion() { return version; }
+
     String getKeyName() {
       return keyName;
     }
@@ -118,9 +127,10 @@ public class EncryptionZoneManager {
    * @param inodeId of the encryption zone
    * @param keyName encryption zone key name
    */
-  void addEncryptionZone(Long inodeId, CipherSuite suite, String keyName) {
+  void addEncryptionZone(Long inodeId, CipherSuite suite,
+      CryptoProtocolVersion version, String keyName) {
     assert dir.hasWriteLock();
-    unprotectedAddEncryptionZone(inodeId, suite, keyName);
+    unprotectedAddEncryptionZone(inodeId, suite, version, keyName);
   }
 
   /**
@@ -132,9 +142,9 @@ public class EncryptionZoneManager {
    * @param keyName encryption zone key name
    */
   void unprotectedAddEncryptionZone(Long inodeId,
-      CipherSuite suite, String keyName) {
+      CipherSuite suite, CryptoProtocolVersion version, String keyName) {
     final EncryptionZoneInt ez = new EncryptionZoneInt(
-        inodeId, suite, keyName);
+        inodeId, suite, version, keyName);
     encryptionZones.put(inodeId, ez);
   }
 
@@ -219,7 +229,7 @@ public class EncryptionZoneManager {
       return NULL_EZ;
     } else {
       return new EncryptionZone(ezi.getINodeId(), getFullPathName(ezi),
-          ezi.getSuite(), ezi.getKeyName());
+          ezi.getSuite(), ezi.getVersion(), ezi.getKeyName());
     }
   }
 
@@ -275,7 +285,8 @@ public class EncryptionZoneManager {
    * <p/>
    * Called while holding the FSDirectory lock.
    */
-  XAttr createEncryptionZone(String src, CipherSuite suite, String keyName)
+  XAttr createEncryptionZone(String src, CipherSuite suite,
+      CryptoProtocolVersion version, String keyName)
       throws IOException {
     assert dir.hasWriteLock();
     if (dir.isNonEmptyDirectory(src)) {
@@ -296,7 +307,7 @@ public class EncryptionZoneManager {
     }
 
     final HdfsProtos.ZoneEncryptionInfoProto proto =
-        PBHelper.convert(suite, keyName);
+        PBHelper.convert(suite, version, keyName);
     final XAttr ezXAttr = XAttrHelper
         .buildXAttr(CRYPTO_XATTR_ENCRYPTION_ZONE, proto.toByteArray());
 
@@ -341,7 +352,7 @@ public class EncryptionZoneManager {
       }
       // Add the EZ to the result list
       zones.add(new EncryptionZone(ezi.getINodeId(), pathName,
-          ezi.getSuite(), ezi.getKeyName()));
+          ezi.getSuite(), ezi.getVersion(), ezi.getKeyName()));
       count++;
       if (count >= numResponses) {
         break;
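
Note the effect of the new Preconditions checks: an EncryptionZoneInt can no
longer be constructed with an UNKNOWN suite or version, so the UNKNOWN-bearing
NULL_EZ stays a pure sentinel that never enters the encryptionZones map.
Illustratively (EncryptionZoneInt is a private inner class, so this is only a
thought experiment):

    // Throws IllegalArgumentException under the new checks:
    new EncryptionZoneInt(42L, CipherSuite.UNKNOWN,
        CryptoProtocolVersion.ENCRYPTION_ZONES, "myKey");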

+ 10 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.CipherSuite;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileEncryptionInfo;
@@ -2180,6 +2181,7 @@ public class FSDirectory implements Closeable {
                         xattr.getValue());
                 ezManager.unprotectedAddEncryptionZone(inode.getId(),
                     PBHelper.convert(ezProto.getSuite()),
+                    PBHelper.convert(ezProto.getCryptoProtocolVersion()),
                     ezProto.getKeyName());
               } catch (InvalidProtocolBufferException e) {
                 NameNode.LOG.warn("Error parsing protocol buffer of " +
@@ -2775,11 +2777,12 @@ public class FSDirectory implements Closeable {
     }
   }
 
-  XAttr createEncryptionZone(String src, CipherSuite suite, String keyName)
+  XAttr createEncryptionZone(String src, CipherSuite suite,
+      CryptoProtocolVersion version, String keyName)
     throws IOException {
     writeLock();
     try {
-      return ezManager.createEncryptionZone(src, suite, keyName);
+      return ezManager.createEncryptionZone(src, suite, version, keyName);
     } finally {
       writeUnlock();
     }
@@ -2863,8 +2866,9 @@ public class FSDirectory implements Closeable {
         }
       }
 
-      CipherSuite suite = encryptionZone.getSuite();
-      String keyName = encryptionZone.getKeyName();
+      final CryptoProtocolVersion version = encryptionZone.getVersion();
+      final CipherSuite suite = encryptionZone.getSuite();
+      final String keyName = encryptionZone.getKeyName();
 
       XAttr fileXAttr = unprotectedGetXAttrByName(inode, snapshotId,
           CRYPTO_XATTR_FILE_ENCRYPTION_INFO);
@@ -2880,7 +2884,7 @@ public class FSDirectory implements Closeable {
         HdfsProtos.PerFileEncryptionInfoProto fileProto =
             HdfsProtos.PerFileEncryptionInfoProto.parseFrom(
                 fileXAttr.getValue());
-        return PBHelper.convert(fileProto, suite, keyName);
+        return PBHelper.convert(fileProto, suite, version, keyName);
       } catch (InvalidProtocolBufferException e) {
         throw new IOException("Could not parse file encryption info for " +
             "inode " + inode, e);
@@ -2923,6 +2927,7 @@ public class FSDirectory implements Closeable {
             HdfsProtos.ZoneEncryptionInfoProto.parseFrom(xattr.getValue());
         ezManager.addEncryptionZone(inode.getId(),
             PBHelper.convert(ezProto.getSuite()),
+            PBHelper.convert(ezProto.getCryptoProtocolVersion()),
             ezProto.getKeyName());
       }
 

+ 53 - 48
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -134,6 +134,7 @@ import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.CipherSuite;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
@@ -166,8 +167,8 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.UnknownCryptoProtocolVersionException;
 import org.apache.hadoop.hdfs.XAttrHelper;
-import org.apache.hadoop.hdfs.UnknownCipherSuiteException;
 import org.apache.hadoop.hdfs.protocol.AclException;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -2387,46 +2388,41 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
 
   /**
    * If the file is within an encryption zone, select the appropriate 
-   * CipherSuite from the list provided by the client. Since the client may 
-   * be newer, need to handle unknown CipherSuites.
+   * CryptoProtocolVersion from the list provided by the client. Since the
+   * client may be newer, we need to handle unknown versions.
    *
-   * @param srcIIP path of the file
-   * @param cipherSuites client-provided list of supported CipherSuites, 
-   *                     in desired order.
-   * @return chosen CipherSuite, or null if file is not in an EncryptionZone
+   * @param zone EncryptionZone of the file
+   * @param supportedVersions List of supported protocol versions
+   * @return chosen protocol version
    * @throws IOException
    */
-  private CipherSuite chooseCipherSuite(INodesInPath srcIIP, List<CipherSuite>
-      cipherSuites)
-      throws UnknownCipherSuiteException, UnresolvedLinkException,
+  private CryptoProtocolVersion chooseProtocolVersion(EncryptionZone zone,
+      CryptoProtocolVersion[] supportedVersions)
+      throws UnknownCryptoProtocolVersionException, UnresolvedLinkException,
         SnapshotAccessControlException {
-    // Not in an EZ
-    if (!dir.isInAnEZ(srcIIP)) {
-      return null;
-    }
-    CipherSuite chosen = null;
-    for (CipherSuite c : cipherSuites) {
-      if (c.equals(CipherSuite.UNKNOWN)) {
+    Preconditions.checkNotNull(zone);
+    Preconditions.checkNotNull(supportedVersions);
+    // Right now, we only support a single protocol version,
+    // so simply look for it in the list of provided options
+    final CryptoProtocolVersion required = zone.getVersion();
+
+    for (CryptoProtocolVersion c : supportedVersions) {
+      if (c.equals(CryptoProtocolVersion.UNKNOWN)) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Ignoring unknown CipherSuite provided by client: "
-              + c.getUnknownValue());
+          LOG.debug("Ignoring unknown CryptoProtocolVersion provided by " +
+              "client: " + c.getUnknownValue());
         }
         continue;
       }
-      for (CipherSuite supported : CipherSuite.values()) {
-        if (supported.equals(c)) {
-          chosen = c;
-          break;
-        }
+      if (c.equals(required)) {
+        return c;
       }
     }
-    if (chosen == null) {
-      throw new UnknownCipherSuiteException(
-          "No cipher suites provided by the client are supported."
-              + " Client provided: " + Arrays.toString(cipherSuites.toArray())
-              + " NameNode supports: " + Arrays.toString(CipherSuite.values()));
-    }
-    return chosen;
+    throw new UnknownCryptoProtocolVersionException(
+        "No crypto protocol versions provided by the client are supported."
+            + " Client provided: " + Arrays.toString(supportedVersions)
+            + " NameNode supports: " + Arrays.toString(CryptoProtocolVersion
+            .values()));
   }
 
   /**
@@ -2462,7 +2458,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   HdfsFileStatus startFile(String src, PermissionStatus permissions,
       String holder, String clientMachine, EnumSet<CreateFlag> flag,
       boolean createParent, short replication, long blockSize, 
-      List<CipherSuite> cipherSuites)
+      CryptoProtocolVersion[] supportedVersions)
       throws AccessControlException, SafeModeException,
       FileAlreadyExistsException, UnresolvedLinkException,
       FileNotFoundException, ParentNotDirectoryException, IOException {
@@ -2475,7 +2471,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     
     try {
       status = startFileInt(src, permissions, holder, clientMachine, flag,
-          createParent, replication, blockSize, cipherSuites,
+          createParent, replication, blockSize, supportedVersions,
           cacheEntry != null);
     } catch (AccessControlException e) {
       logAuditEvent(false, "create", src);
@@ -2489,7 +2485,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   private HdfsFileStatus startFileInt(final String srcArg,
       PermissionStatus permissions, String holder, String clientMachine,
       EnumSet<CreateFlag> flag, boolean createParent, short replication,
-      long blockSize, List<CipherSuite> cipherSuites, boolean logRetryCache)
+      long blockSize, CryptoProtocolVersion[] supportedVersions,
+      boolean logRetryCache)
       throws AccessControlException, SafeModeException,
       FileAlreadyExistsException, UnresolvedLinkException,
       FileNotFoundException, ParentNotDirectoryException, IOException {
@@ -2503,9 +2500,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
               + ", replication=" + replication
               + ", createFlag=" + flag.toString()
               + ", blockSize=" + blockSize);
-      builder.append(", cipherSuites=");
-      if (cipherSuites != null) {
-        builder.append(Arrays.toString(cipherSuites.toArray()));
+      builder.append(", supportedVersions=");
+      if (supportedVersions != null) {
+        builder.append(Arrays.toString(supportedVersions));
       } else {
         builder.append("null");
       }
@@ -2544,6 +2541,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
      * special RetryStartFileException to ask the DFSClient to try the create
      * again later.
      */
+    CryptoProtocolVersion protocolVersion = null;
     CipherSuite suite = null;
     String ezKeyName = null;
     readLock();
@@ -2552,13 +2550,16 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       INodesInPath iip = dir.getINodesInPath4Write(src);
       // Nothing to do if the path is not within an EZ
       if (dir.isInAnEZ(iip)) {
-        suite = chooseCipherSuite(iip, cipherSuites);
-        if (suite != null) {
-          Preconditions.checkArgument(!suite.equals(CipherSuite.UNKNOWN),
-              "Chose an UNKNOWN CipherSuite!");
-        }
+        EncryptionZone zone = dir.getEZForPath(iip);
+        protocolVersion = chooseProtocolVersion(zone, supportedVersions);
+        suite = zone.getSuite();
         ezKeyName = dir.getKeyName(iip);
-        Preconditions.checkState(ezKeyName != null);
+
+        Preconditions.checkNotNull(protocolVersion);
+        Preconditions.checkNotNull(suite);
+        Preconditions.checkArgument(!suite.equals(CipherSuite.UNKNOWN),
+            "Chose an UNKNOWN CipherSuite!");
+        Preconditions.checkNotNull(ezKeyName);
       }
     } finally {
       readUnlock();
@@ -2584,7 +2585,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       src = resolvePath(src, pathComponents);
       toRemoveBlocks = startFileInternal(pc, src, permissions, holder, 
           clientMachine, create, overwrite, createParent, replication, 
-          blockSize, isLazyPersist, suite, edek, logRetryCache);
+          blockSize, isLazyPersist, suite, protocolVersion, edek, logRetryCache);
       stat = dir.getFileInfo(src, false,
           FSDirectory.isReservedRawName(srcArg), false);
     } catch (StandbyException se) {
@@ -2620,8 +2621,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       String src, PermissionStatus permissions, String holder, 
       String clientMachine, boolean create, boolean overwrite, 
       boolean createParent, short replication, long blockSize, 
-       boolean isLazyPersist, CipherSuite suite,
-       EncryptedKeyVersion edek, boolean logRetryEntry)
+      boolean isLazyPersist, CipherSuite suite, CryptoProtocolVersion version,
+      EncryptedKeyVersion edek, boolean logRetryEntry)
       throws FileAlreadyExistsException, AccessControlException,
       UnresolvedLinkException, FileNotFoundException,
       ParentNotDirectoryException, RetryStartFileException, IOException {
@@ -2646,7 +2647,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       if (!ezKeyName.equals(edek.getEncryptionKeyName())) {
         throw new RetryStartFileException();
       }
-      feInfo = new FileEncryptionInfo(suite,
+      feInfo = new FileEncryptionInfo(suite, version,
           edek.getEncryptedKeyVersion().getMaterial(),
           edek.getEncryptedKeyIv(),
           ezKeyName, edek.getEncryptionKeyVersionName());
@@ -8778,7 +8779,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       src = resolvePath(src, pathComponents);
 
       final CipherSuite suite = CipherSuite.convert(cipher);
-      final XAttr ezXAttr = dir.createEncryptionZone(src, suite, keyName);
+      // For now this is hardcoded, as we only support one method.
+      final CryptoProtocolVersion version =
+          CryptoProtocolVersion.ENCRYPTION_ZONES;
+      final XAttr ezXAttr = dir.createEncryptionZone(src, suite,
+          version, keyName);
       List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
       xAttrs.add(ezXAttr);
       getEditLog().logSetXAttrs(src, xAttrs, logRetryCache);
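
Negotiation is thus an exact match against the zone's required version, with
UNKNOWN entries skipped; a sketch of the observable behavior (the arrays are
illustrative):

    // zone.getVersion() == CryptoProtocolVersion.ENCRYPTION_ZONES
    CryptoProtocolVersion[] ok = {
        CryptoProtocolVersion.UNKNOWN,            // e.g. a future version
        CryptoProtocolVersion.ENCRYPTION_ZONES }; // match -> returned
    CryptoProtocolVersion[] bad = {};             // empty, or only UNKNOWNs
    // chooseProtocolVersion(zone, ok)  -> ENCRYPTION_ZONES
    // chooseProtocolVersion(zone, bad) -> UnknownCryptoProtocolVersionException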

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java

@@ -38,7 +38,7 @@ import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.crypto.CipherSuite;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
 import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
@@ -542,7 +542,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
   public HdfsFileStatus create(String src, FsPermission masked,
       String clientName, EnumSetWritable<CreateFlag> flag,
       boolean createParent, short replication, long blockSize, 
-      List<CipherSuite> cipherSuites)
+      CryptoProtocolVersion[] supportedVersions)
       throws IOException {
     String clientMachine = getClientMachine();
     if (stateChangeLog.isDebugEnabled()) {
@@ -556,7 +556,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
     HdfsFileStatus fileStatus = namesystem.startFile(src, new PermissionStatus(
         getRemoteUser().getShortUserName(), null, masked),
         clientName, clientMachine, flag.get(), createParent, replication,
-        blockSize, cipherSuites);
+        blockSize, supportedVersions);
     metrics.incrFilesCreated();
     metrics.incrCreateFileOps();
     return fileStatus;

+ 94 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java

@@ -29,14 +29,19 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.TreeSet;
 
+import com.google.common.base.Optional;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.ReconfigurationTaskStatus;
+import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsShell;
@@ -378,6 +383,7 @@ public class DFSAdmin extends FsShell {
     "\t[-refreshSuperUserGroupsConfiguration]\n" +
     "\t[-refreshCallQueue]\n" +
     "\t[-refresh <host:ipc_port> <key> [arg1..argn]\n" +
+    "\t[-reconfig <datanode|...> <host:ipc_port> <start|status>]\n" +
     "\t[-printTopology]\n" +
     "\t[-refreshNamenodes datanode_host:ipc_port]\n"+
     "\t[-deleteBlockPool datanode_host:ipc_port blockpoolId [force]]\n"+
@@ -915,9 +921,14 @@ public class DFSAdmin extends FsShell {
 
     String refreshCallQueue = "-refreshCallQueue: Reload the call queue from config\n";
 
+    String reconfig = "-reconfig <datanode|...> <host:ipc_port> <start|status>:\n" +
+        "\tStarts reconfiguration or gets the status of an ongoing reconfiguration.\n" +
+        "\tThe second parameter specifies the node type.\n" +
+        "\tCurrently, only reloading DataNode's configuration is supported.\n";
+
     String genericRefresh = "-refresh: Arguments are <hostname:port> <resource_identifier> [arg1..argn]\n" +
       "\tTriggers a runtime-refresh of the resource specified by <resource_identifier>\n" +
-      "\ton <hostname:port>. All other args after are sent to the host.";
+      "\ton <hostname:port>. All other args after are sent to the host.\n";
 
     String printTopology = "-printTopology: Print a tree of the racks and their\n" +
                            "\t\tnodes as reported by the Namenode\n";
@@ -1011,6 +1022,8 @@ public class DFSAdmin extends FsShell {
       System.out.println(refreshCallQueue);
     } else if ("refresh".equals(cmd)) {
       System.out.println(genericRefresh);
+    } else if ("reconfig".equals(cmd)) {
+      System.out.println(reconfig);
     } else if ("printTopology".equals(cmd)) {
       System.out.println(printTopology);
     } else if ("refreshNamenodes".equals(cmd)) {
@@ -1055,6 +1068,7 @@ public class DFSAdmin extends FsShell {
       System.out.println(refreshSuperUserGroupsConfiguration);
       System.out.println(refreshCallQueue);
       System.out.println(genericRefresh);
+      System.out.println(reconfig);
       System.out.println(printTopology);
       System.out.println(refreshNamenodes);
       System.out.println(deleteBlockPool);
@@ -1364,6 +1378,75 @@ public class DFSAdmin extends FsShell {
     return 0;
   }
 
+  public int reconfig(String[] argv, int i) throws IOException {
+    String nodeType = argv[i];
+    String address = argv[i + 1];
+    String op = argv[i + 2];
+    if ("start".equals(op)) {
+      return startReconfiguration(nodeType, address);
+    } else if ("status".equals(op)) {
+      return getReconfigurationStatus(nodeType, address, System.out, System.err);
+    }
+    System.err.println("Unknown operation: " + op);
+    return -1;
+  }
+
+  int startReconfiguration(String nodeType, String address) throws IOException {
+    if ("datanode".equals(nodeType)) {
+      ClientDatanodeProtocol dnProxy = getDataNodeProxy(address);
+      dnProxy.startReconfiguration();
+      System.out.println("Started reconfiguration task on DataNode " + address);
+    } else {
+      System.err.println("Node type " + nodeType +
+          " does not support reconfiguration.");
+    }
+    return -1;
+  }
+
+  int getReconfigurationStatus(String nodeType, String address,
+      PrintStream out, PrintStream err) throws IOException {
+    if ("datanode".equals(nodeType)) {
+      ClientDatanodeProtocol dnProxy = getDataNodeProxy(address);
+      try {
+        ReconfigurationTaskStatus status = dnProxy.getReconfigurationStatus();
+        out.print("Reconfiguring status for DataNode[" + address + "]: ");
+        if (!status.hasTask()) {
+          out.println("no task was found.");
+          return 0;
+        }
+        out.print("started at " + new Date(status.getStartTime()));
+        if (!status.stopped()) {
+          out.println(" and is still running.");
+          return 0;
+        }
+
+        out.println(" and finished at " +
+            new Date(status.getEndTime()).toString() + ".");
+        for (Map.Entry<PropertyChange, Optional<String>> result :
+            status.getStatus().entrySet()) {
+          if (!result.getValue().isPresent()) {
+            out.print("SUCCESS: ");
+          } else {
+            out.print("FAILED: ");
+          }
+          out.printf("Change property %s\n\tFrom: \"%s\"\n\tTo: \"%s\"\n",
+              result.getKey().prop, result.getKey().oldVal,
+              result.getKey().newVal);
+          if (result.getValue().isPresent()) {
+            out.println("\tError: " + result.getValue().get() + ".");
+          }
+        }
+      } catch (IOException e) {
+        err.println("DataNode reloading configuration: " + e + ".");
+        return -1;
+      }
+    } else {
+      err.println("Node type " + nodeType + " does not support reconfiguration.");
+      return -1;
+    }
+    return 0;
+  }
+
   public int genericRefresh(String[] argv, int i) throws IOException {
     String hostport = argv[i++];
     String identifier = argv[i++];
@@ -1482,6 +1565,9 @@ public class DFSAdmin extends FsShell {
     } else if ("-refreshCallQueue".equals(cmd)) {
       System.err.println("Usage: hdfs dfsadmin"
                          + " [-refreshCallQueue]");
+    } else if ("-reconfig".equals(cmd)) {
+      System.err.println("Usage: java DFSAdmin"
+                         + " [-reconfig <datanode|...> <host:port> <start|status>]");
     } else if ("-refresh".equals(cmd)) {
       System.err.println("Usage: hdfs dfsadmin"
                          + " [-refresh <hostname:port> <resource_identifier> [arg1..argn]");
@@ -1614,6 +1700,11 @@ public class DFSAdmin extends FsShell {
         printUsage(cmd);
         return exitCode;
       }
+    } else if ("-reconfig".equals(cmd)) {
+      if (argv.length != 4) {
+        printUsage(cmd);
+        return exitCode;
+      }
     } else if ("-deleteBlockPool".equals(cmd)) {
       if ((argv.length != 3) && (argv.length != 4)) {
         printUsage(cmd);
@@ -1720,6 +1811,8 @@ public class DFSAdmin extends FsShell {
         exitCode = shutdownDatanode(argv, i);
       } else if ("-getDatanodeInfo".equals(cmd)) {
         exitCode = getDatanodeInfo(argv, i);
+      } else if ("-reconfig".equals(cmd)) {
+        exitCode = reconfig(argv, i);
       } else if ("-setStoragePolicy".equals(cmd)) {
         exitCode = setStoragePolicy(argv);
       } else if ("-getStoragePolicy".equals(cmd)) {

+ 11 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/AtomicFileOutputStream.java

@@ -26,6 +26,8 @@ import java.io.IOException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.io.nativeio.NativeIOException;
 
 /**
  * A FileOutputStream that has the property that it will only show
@@ -73,9 +75,15 @@ public class AtomicFileOutputStream extends FilterOutputStream {
         boolean renamed = tmpFile.renameTo(origFile);
         if (!renamed) {
           // On windows, renameTo does not replace.
-          if (!origFile.delete() || !tmpFile.renameTo(origFile)) {
-            throw new IOException("Could not rename temporary file " +
-                tmpFile + " to " + origFile);
+          if (origFile.exists() && !origFile.delete()) {
+            throw new IOException("Could not delete original file " + origFile);
+          }
+          try {
+            NativeIO.renameTo(tmpFile, origFile);
+          } catch (NativeIOException e) {
+            throw new IOException("Could not rename temporary file " + tmpFile
+              + " to " + origFile + " due to failure in native rename. "
+              + e.toString());
           }
         }
       } else {

+ 30 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto

@@ -149,6 +149,30 @@ message GetDatanodeInfoResponseProto {
   required DatanodeLocalInfoProto localInfo = 1;
 }
 
+/** Asks DataNode to reload configuration file. */
+message StartReconfigurationRequestProto {
+}
+
+message StartReconfigurationResponseProto {
+}
+
+/** Query the running status of reconfiguration process */
+message GetReconfigurationStatusRequestProto {
+}
+
+message GetReconfigurationStatusConfigChangeProto {
+  required string name = 1;
+  required string oldValue = 2;
+  optional string newValue = 3;
+  optional string errorMessage = 4;  // It is empty if success.
+}
+
+message GetReconfigurationStatusResponseProto {
+  required int64 startTime = 1;
+  optional int64 endTime = 2;
+  repeated GetReconfigurationStatusConfigChangeProto changes = 3;
+}
+
 /**
  * Protocol used from client to the Datanode.
  * See the request and response for details of rpc call.
@@ -192,4 +216,10 @@ service ClientDatanodeProtocolService {
 
   rpc getDatanodeInfo(GetDatanodeInfoRequestProto)
       returns(GetDatanodeInfoResponseProto);
+
+  rpc getReconfigurationStatus(GetReconfigurationStatusRequestProto)
+      returns(GetReconfigurationStatusResponseProto);
+
+  rpc startReconfiguration(StartReconfigurationRequestProto)
+      returns(StartReconfigurationResponseProto);
 }
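
A hedged sketch of how the DataNode-side translator might populate the status
response from these messages (builder methods follow from the proto field
names; startTimeMillis, endTimeMillis, and taskFinished are stand-in
variables, and the property values are made up):

    GetReconfigurationStatusResponseProto.Builder b =
        GetReconfigurationStatusResponseProto.newBuilder()
            .setStartTime(startTimeMillis);
    if (taskFinished) {
      b.setEndTime(endTimeMillis); // optional: set only once the task stopped
    }
    b.addChanges(GetReconfigurationStatusConfigChangeProto.newBuilder()
        .setName("dfs.datanode.data.dir")
        .setOldValue("/data/1")
        .setNewValue("/data/1,/data/2")); // errorMessage omitted on success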

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto

@@ -76,7 +76,7 @@ message CreateRequestProto {
   required bool createParent = 5;
   required uint32 replication = 6; // Short: Only 16 bits used
   required uint64 blockSize = 7;
-  repeated CipherSuite cipherSuites = 8;
+  repeated CryptoProtocolVersionProto cryptoProtocolVersion = 8;
 }
 
 message CreateResponseProto {

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/encryption.proto

@@ -48,8 +48,9 @@ message ListEncryptionZonesRequestProto {
 message EncryptionZoneProto {
   required int64 id = 1;
   required string path = 2;
-  required CipherSuite suite = 3;
-  required string keyName = 4;
+  required CipherSuiteProto suite = 3;
+  required CryptoProtocolVersionProto cryptoProtocolVersion = 4;
+  required string keyName = 5;
 }
 
 message ListEncryptionZonesResponseProto {

+ 18 - 8
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto

@@ -219,20 +219,29 @@ message DataEncryptionKeyProto {
 /**
  * Cipher suite.
  */
-enum CipherSuite {
+enum CipherSuiteProto {
     UNKNOWN = 1;
     AES_CTR_NOPADDING = 2;
 }
 
+/**
+ * Crypto protocol version used to access encrypted files.
+ */
+enum CryptoProtocolVersionProto {
+    UNKNOWN_PROTOCOL_VERSION = 1;
+    ENCRYPTION_ZONES = 2;
+}
+
 /**
  * Encryption information for a file.
  */
 message FileEncryptionInfoProto {
-  required CipherSuite suite = 1;
-  required bytes key = 2;
-  required bytes iv = 3;
-  required string keyName = 4;
-  required string ezKeyVersionName = 5;
+  required CipherSuiteProto suite = 1;
+  required CryptoProtocolVersionProto cryptoProtocolVersion = 2;
+  required bytes key = 3;
+  required bytes iv = 4;
+  required string keyName = 5;
+  required string ezKeyVersionName = 6;
 }
 
 /**
@@ -250,8 +259,9 @@ message PerFileEncryptionInfoProto {
  * zone
  */
 message ZoneEncryptionInfoProto {
-  required CipherSuite suite = 1;
-  required string keyName = 2;
+  required CipherSuiteProto suite = 1;
+  required CryptoProtocolVersionProto cryptoProtocolVersion = 2;
+  required string keyName = 3;
 }
 
 /**

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java

@@ -53,6 +53,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.CipherSuite;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -264,7 +265,7 @@ public class TestDFSClientRetries {
         .when(mockNN)
         .create(anyString(), (FsPermission) anyObject(), anyString(),
             (EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(),
-            anyShort(), anyLong(), (List<CipherSuite>) anyList());
+            anyShort(), anyLong(), (CryptoProtocolVersion[]) anyObject());
 
     final DFSClient client = new DFSClient(null, mockNN, conf, null);
     OutputStream os = client.create("testfile", true);

+ 95 - 23
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java

@@ -40,10 +40,13 @@ import java.util.concurrent.Future;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.CipherSuite;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
 import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
 import org.apache.hadoop.crypto.key.KeyProviderFactory;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FSTestWrapper;
 import org.apache.hadoop.fs.FileContext;
@@ -57,7 +60,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.server.namenode.EncryptionFaultInjector;
@@ -68,6 +73,7 @@ import org.apache.hadoop.hdfs.tools.DFSck;
 import org.apache.hadoop.hdfs.tools.offlineImageViewer.PBImageXmlWriter;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
+import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -83,6 +89,11 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
+
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyShort;
 import static org.mockito.Mockito.withSettings;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyString;
@@ -628,7 +639,7 @@ public class TestEncryptionZones {
   }
 
   @Test(timeout = 60000)
-  public void testCipherSuiteNegotiation() throws Exception {
+  public void testVersionAndSuiteNegotiation() throws Exception {
     final HdfsAdmin dfsAdmin =
         new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
     final Path zone = new Path("/zone");
@@ -637,43 +648,44 @@ public class TestEncryptionZones {
     // Create a file in an EZ, which should succeed
     DFSTestUtil
         .createFile(fs, new Path(zone, "success1"), 0, (short) 1, 0xFEED);
-    // Pass no cipherSuites, fail
-    fs.getClient().cipherSuites = Lists.newArrayListWithCapacity(0);
+    // Pass no supported versions, fail
+    DFSOutputStream.SUPPORTED_CRYPTO_VERSIONS = new CryptoProtocolVersion[] {};
     try {
       DFSTestUtil.createFile(fs, new Path(zone, "fail"), 0, (short) 1, 0xFEED);
-      fail("Created a file without specifying a CipherSuite!");
-    } catch (UnknownCipherSuiteException e) {
-      assertExceptionContains("No cipher suites", e);
+      fail("Created a file without specifying a crypto protocol version");
+    } catch (UnknownCryptoProtocolVersionException e) {
+      assertExceptionContains("No crypto protocol versions", e);
     }
-    // Pass some unknown cipherSuites, fail
-    fs.getClient().cipherSuites = Lists.newArrayListWithCapacity(3);
-    fs.getClient().cipherSuites.add(CipherSuite.UNKNOWN);
-    fs.getClient().cipherSuites.add(CipherSuite.UNKNOWN);
-    fs.getClient().cipherSuites.add(CipherSuite.UNKNOWN);
+    // Pass some unknown versions, fail
+    DFSOutputStream.SUPPORTED_CRYPTO_VERSIONS = new CryptoProtocolVersion[]
+        { CryptoProtocolVersion.UNKNOWN, CryptoProtocolVersion.UNKNOWN };
     try {
       DFSTestUtil.createFile(fs, new Path(zone, "fail"), 0, (short) 1, 0xFEED);
-      fail("Created a file without specifying a CipherSuite!");
-    } catch (UnknownCipherSuiteException e) {
-      assertExceptionContains("No cipher suites", e);
+      fail("Created a file without specifying a known crypto protocol version");
+    } catch (UnknownCryptoProtocolVersionException e) {
+      assertExceptionContains("No crypto protocol versions", e);
     }
    // Pass some unknown versions and one good version, success
-    fs.getClient().cipherSuites = Lists.newArrayListWithCapacity(3);
-    fs.getClient().cipherSuites.add(CipherSuite.AES_CTR_NOPADDING);
-    fs.getClient().cipherSuites.add(CipherSuite.UNKNOWN);
-    fs.getClient().cipherSuites.add(CipherSuite.UNKNOWN);
+    DFSOutputStream.SUPPORTED_CRYPTO_VERSIONS =
+        new CryptoProtocolVersion[] {
+            CryptoProtocolVersion.UNKNOWN,
+            CryptoProtocolVersion.UNKNOWN,
+            CryptoProtocolVersion.ENCRYPTION_ZONES };
     DFSTestUtil
         .createFile(fs, new Path(zone, "success2"), 0, (short) 1, 0xFEED);
-    fs.getClient().cipherSuites = Lists.newArrayListWithCapacity(3);
-    fs.getClient().cipherSuites.add(CipherSuite.UNKNOWN);
-    fs.getClient().cipherSuites.add(CipherSuite.UNKNOWN);
-    fs.getClient().cipherSuites.add(CipherSuite.AES_CTR_NOPADDING);
+    DFSOutputStream.SUPPORTED_CRYPTO_VERSIONS =
+        new CryptoProtocolVersion[] {
+            CryptoProtocolVersion.ENCRYPTION_ZONES,
+            CryptoProtocolVersion.UNKNOWN,
+            CryptoProtocolVersion.UNKNOWN };
     DFSTestUtil
         .createFile(fs, new Path(zone, "success3"), 4096, (short) 1, 0xFEED);
     // Check KeyProvider state
     // Flushing the KP on the NN, since it caches, and init a test one
     cluster.getNamesystem().getProvider().flush();
     KeyProvider provider = KeyProviderFactory
-        .get(new URI(conf.get(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI)), conf);
+        .get(new URI(conf.get(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI)),
+            conf);
     List<String> keys = provider.getKeys();
     assertEquals("Expected NN to have created one key per zone", 1,
         keys.size());
@@ -691,6 +703,66 @@ public class TestEncryptionZones {
               "/success" + i));
       assertEquals(feInfo.getCipherSuite(), CipherSuite.AES_CTR_NOPADDING);
     }
+
+    DFSClient old = fs.dfs;
+    try {
+      testCipherSuiteNegotiation(fs, conf);
+    } finally {
+      fs.dfs = old;
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private static void mockCreate(ClientProtocol mcp,
+      CipherSuite suite, CryptoProtocolVersion version) throws Exception {
+    Mockito.doReturn(
+        new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
+            (short) 777), "owner", "group", new byte[0], new byte[0],
+            1010, 0, new FileEncryptionInfo(suite,
+            version, new byte[suite.getAlgorithmBlockSize()],
+            new byte[suite.getAlgorithmBlockSize()],
+            "fakeKey", "fakeVersion"),
+            (byte) 0))
+        .when(mcp)
+        .create(anyString(), (FsPermission) anyObject(), anyString(),
+            (EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(),
+            anyShort(), anyLong(), (CryptoProtocolVersion[]) anyObject());
+  }
+
+  // This test only uses mocks. Called from the end of an existing test to
+  // avoid an extra mini cluster.
+  private static void testCipherSuiteNegotiation(DistributedFileSystem fs,
+      Configuration conf) throws Exception {
+    // Set up mock ClientProtocol to test client-side CipherSuite negotiation
+    final ClientProtocol mcp = Mockito.mock(ClientProtocol.class);
+
+    // Try with an empty conf
+    final Configuration noCodecConf = new Configuration(conf);
+    final CipherSuite suite = CipherSuite.AES_CTR_NOPADDING;
+    final String confKey = CommonConfigurationKeysPublic
+        .HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX + suite
+        .getConfigSuffix();
+    noCodecConf.set(confKey, "");
+    fs.dfs = new DFSClient(null, mcp, noCodecConf, null);
+    mockCreate(mcp, suite, CryptoProtocolVersion.ENCRYPTION_ZONES);
+    try {
+      fs.create(new Path("/mock"));
+      fail("Created with no configured codecs!");
+    } catch (UnknownCipherSuiteException e) {
+      assertExceptionContains("No configuration found for the cipher", e);
+    }
+
+    // Try create with an UNKNOWN CipherSuite
+    fs.dfs = new DFSClient(null, mcp, conf, null);
+    CipherSuite unknown = CipherSuite.UNKNOWN;
+    unknown.setUnknownValue(989);
+    mockCreate(mcp, unknown, CryptoProtocolVersion.ENCRYPTION_ZONES);
+    try {
+      fs.create(new Path("/mock"));
+      fail("Created with unknown cipher!");
+    } catch (IOException e) {
+      assertExceptionContains("unknown CipherSuite with ID 989", e);
+    }
   }
 
   @Test(timeout = 120000)

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java

@@ -38,6 +38,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.CipherSuite;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -352,7 +353,7 @@ public class TestLease {
         .when(mcp)
         .create(anyString(), (FsPermission) anyObject(), anyString(),
             (EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(),
-            anyShort(), anyLong(), (List<CipherSuite>) anyList());
+            anyShort(), anyLong(), (CryptoProtocolVersion[]) anyObject());
 
     final Configuration conf = new Configuration();
     final DFSClient c1 = createDFSClientAs(ugi[0], conf);

+ 44 - 6
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDFSUpgradeWithHA.java

@@ -38,19 +38,23 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
 import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster.Builder;
 import org.apache.hadoop.hdfs.qjournal.server.Journal;
+import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
+import org.apache.hadoop.hdfs.util.BestEffortLongFile;
 import org.apache.hadoop.hdfs.util.PersistentLongFile;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
 import com.google.common.base.Joiner;
+import org.mockito.internal.util.reflection.Whitebox;
 
 /**
  * Tests for upgrading with HA enabled.
@@ -294,6 +298,16 @@ public class TestDFSUpgradeWithHA {
     }
   }
 
+  private long getCommittedTxnIdValue(MiniQJMHACluster qjCluster)
+      throws IOException {
+    Journal journal1 = qjCluster.getJournalCluster().getJournalNode(0)
+        .getOrCreateJournal(MiniQJMHACluster.NAMESERVICE);
+    BestEffortLongFile committedTxnId = (BestEffortLongFile) Whitebox
+        .getInternalState(journal1, "committedTxnId");
+    return committedTxnId != null ? committedTxnId.get() :
+        HdfsConstants.INVALID_TXID;
+  }
+
   /**
    * Make sure that an HA NN can successfully upgrade when configured using
    * JournalNodes.
@@ -320,7 +334,10 @@ public class TestDFSUpgradeWithHA {
       cluster.transitionToActive(0);
       fs = HATestUtil.configureFailoverFs(cluster, conf);
       assertTrue(fs.mkdirs(new Path("/foo1")));
-      
+
+      // get the value of the committedTxnId in journal nodes
+      final long cidBeforeUpgrade = getCommittedTxnIdValue(qjCluster);
+
       // Do the upgrade. Shut down NN1 and then restart NN0 with the upgrade
       // flag.
       cluster.shutdownNameNode(1);
@@ -330,6 +347,8 @@ public class TestDFSUpgradeWithHA {
       checkNnPreviousDirExistence(cluster, 0, true);
       checkNnPreviousDirExistence(cluster, 1, false);
       checkJnPreviousDirExistence(qjCluster, true);
+
+      assertTrue(cidBeforeUpgrade <= getCommittedTxnIdValue(qjCluster));
       
       // NN0 should come up in the active state when given the -upgrade option,
       // so no need to transition it to active.
@@ -342,6 +361,8 @@ public class TestDFSUpgradeWithHA {
       // Make sure we can still do FS ops after upgrading.
       cluster.transitionToActive(0);
       assertTrue(fs.mkdirs(new Path("/foo3")));
+
+      assertTrue(getCommittedTxnIdValue(qjCluster) > cidBeforeUpgrade);
       
       // Now bootstrap the standby with the upgraded info.
       int rc = BootstrapStandby.run(
@@ -388,15 +409,18 @@ public class TestDFSUpgradeWithHA {
       cluster.transitionToActive(0);
       fs = HATestUtil.configureFailoverFs(cluster, conf);
       assertTrue(fs.mkdirs(new Path("/foo1")));
+
+      final long cidBeforeUpgrade = getCommittedTxnIdValue(qjCluster);
       
       // Do the upgrade. Shut down NN1 and then restart NN0 with the upgrade
       // flag.
       cluster.shutdownNameNode(1);
       cluster.getNameNodeInfos()[0].setStartOpt(StartupOption.UPGRADE);
       cluster.restartNameNode(0, false);
+      assertTrue(cidBeforeUpgrade <= getCommittedTxnIdValue(qjCluster));
       
       assertTrue(fs.mkdirs(new Path("/foo2")));
-      
+
       checkNnPreviousDirExistence(cluster, 0, true);
       checkNnPreviousDirExistence(cluster, 1, false);
       checkJnPreviousDirExistence(qjCluster, true);
@@ -408,9 +432,13 @@ public class TestDFSUpgradeWithHA {
       assertEquals(0, rc);
       
       cluster.restartNameNode(1);
-      
+
+      final long cidDuringUpgrade = getCommittedTxnIdValue(qjCluster);
+      assertTrue(cidDuringUpgrade > cidBeforeUpgrade);
+
       runFinalizeCommand(cluster);
-      
+
+      assertEquals(cidDuringUpgrade, getCommittedTxnIdValue(qjCluster));
       checkClusterPreviousDirExistence(cluster, false);
       checkJnPreviousDirExistence(qjCluster, false);
       assertCTimesEqual(cluster);
@@ -614,7 +642,9 @@ public class TestDFSUpgradeWithHA {
       cluster.transitionToActive(0);
       fs = HATestUtil.configureFailoverFs(cluster, conf);
       assertTrue(fs.mkdirs(new Path("/foo1")));
-      
+
+      final long cidBeforeUpgrade = getCommittedTxnIdValue(qjCluster);
+
       // Do the upgrade. Shut down NN1 and then restart NN0 with the upgrade
       // flag.
       cluster.shutdownNameNode(1);
@@ -628,7 +658,10 @@ public class TestDFSUpgradeWithHA {
       // NN0 should come up in the active state when given the -upgrade option,
       // so no need to transition it to active.
       assertTrue(fs.mkdirs(new Path("/foo2")));
-      
+
+      final long cidDuringUpgrade = getCommittedTxnIdValue(qjCluster);
+      assertTrue(cidDuringUpgrade > cidBeforeUpgrade);
+
       // Now bootstrap the standby with the upgraded info.
       int rc = BootstrapStandby.run(
           new String[]{"-force"},
@@ -649,6 +682,11 @@ public class TestDFSUpgradeWithHA {
       conf.setStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, Joiner.on(",").join(nn1NameDirs));
       NameNode.doRollback(conf, false);
 
+      final long cidAfterRollback = getCommittedTxnIdValue(qjCluster);
+      assertTrue(cidBeforeUpgrade < cidAfterRollback);
+      // make sure the committedTxnId has been reset correctly after rollback
+      assertTrue(cidDuringUpgrade > cidAfterRollback);
+
       // The rollback operation should have rolled back the first NN's local
       // dirs, and the shared dir, but not the other NN's dirs. Those have to be
       // done by bootstrapping the standby.

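[Editor's note] getCommittedTxnIdValue() reads the Journal's private committedTxnId field through Mockito's Whitebox helper, so the upgrade tests can compare transaction ids across restart, finalize, and rollback without widening the Journal API. Whitebox is internal Mockito API and brittle against refactoring; a minimal sketch with a hypothetical Counter class:

    import org.mockito.internal.util.reflection.Whitebox;

    public class WhiteboxExample {
      static class Counter {
        private long value = 42L;  // private, no getter
      }

      public static void main(String[] args) {
        Counter counter = new Counter();
        // Read the private field by name; the helper returns Object, so
        // cast to the boxed field type.
        Long v = (Long) Whitebox.getInternalState(counter, "value");
        System.out.println(v);  // 42
      }
    }
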
+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java

@@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -395,7 +396,8 @@ public class TestRetryCacheWithHA {
       this.status = client.getNamenode().create(fileName,
           FsPermission.getFileDefault(), client.getClientName(),
           new EnumSetWritable<CreateFlag>(createFlag), false, DataNodes,
-          BlockSize, null);
+          BlockSize,
+          new CryptoProtocolVersion[] {CryptoProtocolVersion.ENCRYPTION_ZONES});
     }
 
     @Override

+ 152 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java

@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.tools;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.ReconfigurationUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Scanner;
+
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.CoreMatchers.anyOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestDFSAdmin {
+  private MiniDFSCluster cluster;
+  private DFSAdmin admin;
+  private DataNode datanode;
+
+  @Before
+  public void setUp() throws Exception {
+    Configuration conf = new Configuration();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+
+    admin = new DFSAdmin();
+    datanode = cluster.getDataNodes().get(0);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  private List<String> getReconfigureStatus(String nodeType, String address)
+      throws IOException {
+    ByteArrayOutputStream bufOut = new ByteArrayOutputStream();
+    PrintStream out = new PrintStream(bufOut);
+    ByteArrayOutputStream bufErr = new ByteArrayOutputStream();
+    PrintStream err = new PrintStream(bufErr);
+    admin.getReconfigurationStatus(nodeType, address, out, err);
+    Scanner scanner = new Scanner(bufOut.toString());
+    List<String> outputs = Lists.newArrayList();
+    while (scanner.hasNextLine()) {
+      outputs.add(scanner.nextLine());
+    }
+    return outputs;
+  }
+
+  @Test(timeout = 30000)
+  public void testGetReconfigureStatus()
+      throws IOException, InterruptedException {
+    ReconfigurationUtil ru = mock(ReconfigurationUtil.class);
+    datanode.setReconfigurationUtil(ru);
+
+    List<ReconfigurationUtil.PropertyChange> changes =
+        new ArrayList<ReconfigurationUtil.PropertyChange>();
+    File newDir = new File(cluster.getDataDirectory(), "data_new");
+    newDir.mkdirs();
+    changes.add(new ReconfigurationUtil.PropertyChange(
+        DFS_DATANODE_DATA_DIR_KEY, newDir.toString(),
+        datanode.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
+    changes.add(new ReconfigurationUtil.PropertyChange(
+        "randomKey", "new123", "old456"));
+    when(ru.parseChangedProperties(any(Configuration.class),
+        any(Configuration.class))).thenReturn(changes);
+
+    final int port = datanode.getIpcPort();
+    final String address = "localhost:" + port;
+
+    admin.startReconfiguration("datanode", address);
+
+    List<String> outputs = null;
+    int count = 100;
+    while (count > 0) {
+      outputs = getReconfigureStatus("datanode", address);
+      if (!outputs.isEmpty() && outputs.get(0).contains("finished")) {
+        break;
+      }
+      count--;
+      Thread.sleep(100);
+    }
+    assertTrue(count > 0);
+    assertThat(outputs.size(), is(8));  // 1 (status line) + 3 (SUCCESS) + 4 (FAILED)
+
+    List<StorageLocation> locations = DataNode.getStorageLocations(
+        datanode.getConf());
+    assertThat(locations.size(), is(1));
+    assertThat(locations.get(0).getFile(), is(newDir));
+    // Verify the directory is appropriately formatted.
+    assertTrue(new File(newDir, Storage.STORAGE_DIR_CURRENT).isDirectory());
+
+    int successOffset = outputs.get(1).startsWith("SUCCESS:") ? 1 : 5;
+    int failedOffset = outputs.get(1).startsWith("FAILED:") ? 1 : 4;
+    assertThat(outputs.get(successOffset),
+        containsString("Change property " + DFS_DATANODE_DATA_DIR_KEY));
+    assertThat(outputs.get(successOffset + 1),
+        is(allOf(containsString("From:"), containsString("data1"),
+            containsString("data2"))));
+    assertThat(outputs.get(successOffset + 2),
+        is(not(anyOf(containsString("data1"), containsString("data2")))));
+    assertThat(outputs.get(successOffset + 2),
+        is(allOf(containsString("To"), containsString("data_new"))));
+    assertThat(outputs.get(failedOffset),
+        containsString("Change property randomKey"));
+    assertThat(outputs.get(failedOffset + 1),
+        containsString("From: \"old456\""));
+    assertThat(outputs.get(failedOffset + 2),
+        containsString("To: \"new123\""));
+  }
+}

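[Editor's note] The new TestDFSAdmin drives the admin command against in-memory streams and polls until the reconfiguration task reports "finished". A sketch of the output-capture half, assuming only some method that prints status to a supplied stream:

    import java.io.ByteArrayOutputStream;
    import java.io.PrintStream;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Scanner;

    public class CaptureOutputExample {
      // Hypothetical stand-in for a command that reports to a stream.
      static void printStatus(PrintStream out) {
        out.println("Reconfiguring status: finished");
        out.println("SUCCESS: Change property dfs.datanode.data.dir");
      }

      public static void main(String[] args) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        PrintStream out = new PrintStream(buf);
        printStatus(out);
        out.flush();

        // Split the captured bytes back into lines for assertions.
        List<String> lines = new ArrayList<String>();
        Scanner scanner = new Scanner(buf.toString());
        while (scanner.hasNextLine()) {
          lines.add(scanner.nextLine());
        }
        scanner.close();
        System.out.println(lines.size());  // 2
      }
    }
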
+ 29 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestAtomicFileOutputStream.java

@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -30,9 +31,13 @@ import java.io.OutputStream;
 
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.test.PathUtils;
+import org.apache.hadoop.util.Shell;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import com.google.common.base.Joiner;
 
@@ -44,6 +49,9 @@ public class TestAtomicFileOutputStream {
   private static final File TEST_DIR = PathUtils.getTestDir(TestAtomicFileOutputStream.class);
   
   private static final File DST_FILE = new File(TEST_DIR, "test.txt");
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
   
   @Before
   public void cleanupTestDir() throws IOException {
@@ -119,6 +127,27 @@ public class TestAtomicFileOutputStream {
         DST_FILE.getName(), Joiner.on(",").join(TEST_DIR.list()));
   }
 
+  @Test
+  public void testFailToRename() throws IOException {
+    assumeTrue(Shell.WINDOWS);
+    OutputStream fos = null;
+    try {
+      fos = new AtomicFileOutputStream(DST_FILE);
+      fos.write(TEST_STRING.getBytes());
+      FileUtil.setWritable(TEST_DIR, false);
+      exception.expect(IOException.class);
+      exception.expectMessage("failure in native rename");
+      try {
+        fos.close();
+      } finally {
+        fos = null;
+      }
+    } finally {
+      IOUtils.cleanup(null, fos);
+      FileUtil.setWritable(TEST_DIR, true);
+    }
+  }
+
   /**
    * Create a stream that fails to flush at close time
    */

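[Editor's note] testFailToRename pairs assumeTrue(Shell.WINDOWS), which skips the test everywhere else, with JUnit's ExpectedException rule: the test passes only if an IOException whose message contains the expected substring escapes the statements after the expect() calls. A minimal sketch of the rule on its own:

    import java.io.IOException;

    import org.junit.Rule;
    import org.junit.Test;
    import org.junit.rules.ExpectedException;

    public class ExpectedExceptionExample {
      @Rule
      public ExpectedException exception = ExpectedException.none();

      @Test
      public void failsWithMatchingMessage() throws IOException {
        exception.expect(IOException.class);
        exception.expectMessage("failure in native rename");
        // Everything after the expect() calls must throw for the test to
        // pass; expectMessage() matches on substring.
        throw new IOException("failure in native rename: access denied");
      }
    }
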
+ 6 - 0
hadoop-yarn-project/CHANGES.txt

@@ -447,6 +447,12 @@ Release 2.6.0 - UNRELEASED
     YARN-2546. Made REST API for application creation/submission use numeric and
     boolean types instead of the string of them. (Varun Vasudev via zjshen)
 
+    YARN-2523. ResourceManager UI showing negative value for "Decommissioned
+    Nodes" field (Rohith via jlowe)
+
+    YARN-2608. FairScheduler: Potential deadlocks in loading alloc files and 
+    clock access. (Wei Yan via kasha)
+
 Release 2.5.1 - 2014-09-05
 
   INCOMPATIBLE CHANGES

+ 0 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java

@@ -121,7 +121,6 @@ public class NodesListManager extends AbstractService implements
                   this.conf, includesFile), excludesFile.isEmpty() ? null
               : this.rmContext.getConfigurationProvider()
                   .getConfigurationInputStream(this.conf, excludesFile));
-      setDecomissionedNMsMetrics();
       printConfiguredHosts();
     }
   }

+ 0 - 13
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

@@ -460,22 +460,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
         break;
     }
 
-    // Decomissioned NMs equals to the nodes missing in include list (if
-    // include list not empty) or the nodes listed in excluded list.
-    // DecomissionedNMs as per exclude list is set upfront when the
-    // exclude list is read so that RM restart can also reflect the
-    // decomissionedNMs. Note that RM is still not able to know decomissionedNMs
-    // as per include list after it restarts as they are known when those nodes
-    // come for registration.
-    // DecomissionedNMs as per include list is incremented in this transition.
     switch (finalState) {
     case DECOMMISSIONED:
-      Set<String> ecludedHosts =
-          context.getNodesListManager().getHostsReader().getExcludedHosts();
-      if (!ecludedHosts.contains(hostName)
-          && !ecludedHosts.contains(NetUtils.normalizeHostName(hostName))) {
         metrics.incrDecommisionedNMs();
-      }
       break;
     case LOST:
       metrics.incrNumLostNMs();

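[Editor's note] Taken together, the NodesListManager and RMNodeImpl hunks put the decommissioned-NM gauge on a single code path: it is no longer seeded from the exclude file at refresh time, and it is incremented exactly when a node reaches DECOMMISSIONED, which is what stops the counter from going negative (YARN-2523). A toy sketch of the increment-on-transition-only rule, with hypothetical names:

    import java.util.concurrent.atomic.AtomicInteger;

    public class TransitionMetricExample {
      enum State { RUNNING, DECOMMISSIONED }

      static final AtomicInteger decommissionedNodes = new AtomicInteger();

      // The only place the gauge changes is the actual state transition;
      // seeding it from configuration elsewhere is what allowed double
      // counting.
      static State transition(State current, State target) {
        if (current != State.DECOMMISSIONED
            && target == State.DECOMMISSIONED) {
          decommissionedNodes.incrementAndGet();
        }
        return target;
      }

      public static void main(String[] args) {
        State s = State.RUNNING;
        s = transition(s, State.DECOMMISSIONED);
        s = transition(s, State.DECOMMISSIONED);  // no second bump
        System.out.println(decommissionedNodes.get());  // 1
      }
    }
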
+ 62 - 60
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java

@@ -117,7 +117,7 @@ public class FairScheduler extends
 
   private Resource incrAllocation;
   private QueueManager queueMgr;
-  private Clock clock;
+  private volatile Clock clock;
   private boolean usePortForNodeName;
 
   private static final Log LOG = LogFactory.getLog(FairScheduler.class);
@@ -555,11 +555,12 @@ public class FairScheduler extends
     return continuousSchedulingSleepMs;
   }
 
-  public synchronized Clock getClock() {
+  public Clock getClock() {
     return clock;
   }
 
-  protected synchronized void setClock(Clock clock) {
+  @VisibleForTesting
+  void setClock(Clock clock) {
     this.clock = clock;
   }
 
@@ -1204,64 +1205,65 @@ public class FairScheduler extends
     this.rmContext = rmContext;
   }
 
-  private synchronized void initScheduler(Configuration conf)
-      throws IOException {
-    this.conf = new FairSchedulerConfiguration(conf);
-    validateConf(this.conf);
-    minimumAllocation = this.conf.getMinimumAllocation();
-    maximumAllocation = this.conf.getMaximumAllocation();
-    incrAllocation = this.conf.getIncrementAllocation();
-    continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
-    continuousSchedulingSleepMs =
-        this.conf.getContinuousSchedulingSleepMs();
-    nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
-    rackLocalityThreshold = this.conf.getLocalityThresholdRack();
-    nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
-    rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
-    preemptionEnabled = this.conf.getPreemptionEnabled();
-    preemptionUtilizationThreshold =
-        this.conf.getPreemptionUtilizationThreshold();
-    assignMultiple = this.conf.getAssignMultiple();
-    maxAssign = this.conf.getMaxAssign();
-    sizeBasedWeight = this.conf.getSizeBasedWeight();
-    preemptionInterval = this.conf.getPreemptionInterval();
-    waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
-    usePortForNodeName = this.conf.getUsePortForNodeName();
-
-    updateInterval = this.conf.getUpdateInterval();
-    if (updateInterval < 0) {
-      updateInterval = FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS;
-      LOG.warn(FairSchedulerConfiguration.UPDATE_INTERVAL_MS
-              + " is invalid, so using default value " +
-              + FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS
-              + " ms instead");
-    }
-
-    rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
-    fsOpDurations = FSOpDurations.getInstance(true);
-
-    // This stores per-application scheduling information
-    this.applications = new ConcurrentHashMap<
-        ApplicationId, SchedulerApplication<FSAppAttempt>>();
-    this.eventLog = new FairSchedulerEventLog();
-    eventLog.init(this.conf);
-
-    allocConf = new AllocationConfiguration(conf);
-    try {
-      queueMgr.initialize(conf);
-    } catch (Exception e) {
-      throw new IOException("Failed to start FairScheduler", e);
-    }
+  private void initScheduler(Configuration conf) throws IOException {
+    synchronized (this) {
+      this.conf = new FairSchedulerConfiguration(conf);
+      validateConf(this.conf);
+      minimumAllocation = this.conf.getMinimumAllocation();
+      maximumAllocation = this.conf.getMaximumAllocation();
+      incrAllocation = this.conf.getIncrementAllocation();
+      continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
+      continuousSchedulingSleepMs =
+          this.conf.getContinuousSchedulingSleepMs();
+      nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
+      rackLocalityThreshold = this.conf.getLocalityThresholdRack();
+      nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
+      rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
+      preemptionEnabled = this.conf.getPreemptionEnabled();
+      preemptionUtilizationThreshold =
+          this.conf.getPreemptionUtilizationThreshold();
+      assignMultiple = this.conf.getAssignMultiple();
+      maxAssign = this.conf.getMaxAssign();
+      sizeBasedWeight = this.conf.getSizeBasedWeight();
+      preemptionInterval = this.conf.getPreemptionInterval();
+      waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
+      usePortForNodeName = this.conf.getUsePortForNodeName();
+
+      updateInterval = this.conf.getUpdateInterval();
+      if (updateInterval < 0) {
+        updateInterval = FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS;
+        LOG.warn(FairSchedulerConfiguration.UPDATE_INTERVAL_MS
+            + " is invalid, so using default value " +
+            +FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS
+            + " ms instead");
+      }
 
-    updateThread = new UpdateThread();
-    updateThread.setName("FairSchedulerUpdateThread");
-    updateThread.setDaemon(true);
+      rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
+      fsOpDurations = FSOpDurations.getInstance(true);
 
-    if (continuousSchedulingEnabled) {
-      // start continuous scheduling thread
-      schedulingThread = new ContinuousSchedulingThread();
-      schedulingThread.setName("FairSchedulerContinuousScheduling");
-      schedulingThread.setDaemon(true);
+      // This stores per-application scheduling information
+      this.applications = new ConcurrentHashMap<
+          ApplicationId, SchedulerApplication<FSAppAttempt>>();
+      this.eventLog = new FairSchedulerEventLog();
+      eventLog.init(this.conf);
+
+      allocConf = new AllocationConfiguration(conf);
+      try {
+        queueMgr.initialize(conf);
+      } catch (Exception e) {
+        throw new IOException("Failed to start FairScheduler", e);
+      }
+
+      updateThread = new UpdateThread();
+      updateThread.setName("FairSchedulerUpdateThread");
+      updateThread.setDaemon(true);
+
+      if (continuousSchedulingEnabled) {
+        // start continuous scheduling thread
+        schedulingThread = new ContinuousSchedulingThread();
+        schedulingThread.setName("FairSchedulerContinuousScheduling");
+        schedulingThread.setDaemon(true);
+      }
     }
 
     allocsLoader.init(conf);
@@ -1321,7 +1323,7 @@ public class FairScheduler extends
   }
 
   @Override
-  public synchronized void reinitialize(Configuration conf, RMContext rmContext)
+  public void reinitialize(Configuration conf, RMContext rmContext)
       throws IOException {
     try {
       allocsLoader.reloadAllocations();

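[Editor's note] The deadlock fix (YARN-2608) replaces synchronized getClock/setClock with a volatile field: readers see the latest reference without taking the scheduler's monitor, which breaks the lock-ordering cycle between the allocation-file reload thread and regular scheduler calls. A sketch of the pattern with hypothetical names; note that volatile covers publishing a single reference, while compound read-modify-write updates would still need synchronization:

    public class VolatileClockExample {
      public interface Clock {
        long getTime();
      }

      static class SystemClock implements Clock {
        public long getTime() {
          return System.currentTimeMillis();
        }
      }

      // volatile guarantees that every reader sees the most recent write
      // without acquiring this object's monitor.
      private volatile Clock clock = new SystemClock();

      public Clock getClock() {    // plain volatile read, never blocks
        return clock;
      }

      void setClock(Clock clock) { // plain volatile write (test-only)
        this.clock = clock;
      }

      public static void main(String[] args) {
        VolatileClockExample s = new VolatileClockExample();
        System.out.println(s.getClock().getTime() > 0);  // true
      }
    }
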
+ 22 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java

@@ -77,6 +77,8 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
@@ -1833,10 +1835,16 @@ public class TestRMRestart {
     conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
       hostFile.getAbsolutePath());
     writeToHostsFile("");
-    MockRM rm1 = new MockRM(conf);
+    final DrainDispatcher dispatcher = new DrainDispatcher();
+    MockRM rm1 = new MockRM(conf) {
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
     rm1.start();
-    rm1.registerNode("localhost:1234", 8000);
-    rm1.registerNode("host2:1234", 8000);
+    MockNM nm1 = rm1.registerNode("localhost:1234", 8000);
+    MockNM nm2 = rm1.registerNode("host2:1234", 8000);
     Assert
       .assertEquals(0, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
     String ip = NetUtils.normalizeHostName("localhost");
@@ -1845,15 +1853,25 @@ public class TestRMRestart {
 
     // refresh nodes
     rm1.getNodesListManager().refreshNodes(conf);
+    NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
+    Assert
+        .assertTrue(NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
+    nodeHeartbeat = nm2.nodeHeartbeat(true);
+    Assert.assertTrue("The decommisioned metrics are not updated",
+        NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
+
+    dispatcher.await();
     Assert
       .assertEquals(2, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
+    rm1.stop();
+    Assert
+        .assertEquals(0, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
 
     // restart RM.
     MockRM rm2 = new MockRM(conf);
     rm2.start();
     Assert
       .assertEquals(2, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
-    rm1.stop();
     rm2.stop();
   }
 

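[Editor's note] The anonymous MockRM subclass overrides the protected createDispatcher() factory so the RM runs on a DrainDispatcher, and dispatcher.await() then drains every queued event before the metric assertions, removing the race on the decommissioned-NM counter. A generic sketch of swapping a deterministic dispatcher in through a factory override; Service, Dispatcher, and InlineDispatcher are hypothetical:

    public class FactoryOverrideExample {
      public interface Dispatcher {
        void dispatch(Runnable event);
      }

      // Synchronous stand-in for DrainDispatcher: events run inline, so
      // nothing is left in flight once dispatch() returns.
      static class InlineDispatcher implements Dispatcher {
        public void dispatch(Runnable event) {
          event.run();
        }
      }

      static class Service {
        protected Dispatcher createDispatcher() {
          throw new UnsupportedOperationException("async dispatcher elided");
        }
        void post(Runnable event) {
          createDispatcher().dispatch(event);
        }
      }

      public static void main(String[] args) {
        // The anonymous subclass swaps in the deterministic dispatcher,
        // mirroring the MockRM override in the hunk above.
        Service svc = new Service() {
          @Override
          protected Dispatcher createDispatcher() {
            return new InlineDispatcher();
          }
        };
        svc.post(new Runnable() {
          public void run() {
            System.out.println("event handled deterministically");
          }
        });
      }
    }
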
+ 23 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java

@@ -157,25 +157,33 @@ public class TestResourceTrackerService {
         .getAbsolutePath());
 
     writeToHostsFile("");
-    rm = new MockRM(conf);
+    final DrainDispatcher dispatcher = new DrainDispatcher();
+    rm = new MockRM(conf) {
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
     rm.start();
 
     MockNM nm1 = rm.registerNode("host1:1234", 5120);
     MockNM nm2 = rm.registerNode("host2:5678", 10240);
     MockNM nm3 = rm.registerNode("localhost:4433", 1024);
 
+    dispatcher.await();
+
     int metricCount = ClusterMetrics.getMetrics().getNumDecommisionedNMs();
     NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
     Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
     nodeHeartbeat = nm2.nodeHeartbeat(true);
     Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
+    dispatcher.await();
 
     // To test that IPs also work
     String ip = NetUtils.normalizeHostName("localhost");
     writeToHostsFile("host2", ip);
 
     rm.getNodesListManager().refreshNodes(conf);
-    checkDecommissionedNMCount(rm, metricCount + 2);
 
     nodeHeartbeat = nm1.nodeHeartbeat(true);
     Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
@@ -186,6 +194,19 @@ public class TestResourceTrackerService {
     nodeHeartbeat = nm3.nodeHeartbeat(true);
     Assert.assertTrue("The decommisioned metrics are not updated",
         NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
+    dispatcher.await();
+    checkDecommissionedNMCount(rm, metricCount + 2);
+    writeToHostsFile("");
+    rm.getNodesListManager().refreshNodes(conf);
+
+    nm3 = rm.registerNode("localhost:4433", 1024);
+    dispatcher.await();
+    nodeHeartbeat = nm3.nodeHeartbeat(true);
+    dispatcher.await();
+    Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
+    // the decommissioned count is 1 since one node rejoined after the
+    // exclude file was updated
+    checkDecommissionedNMCount(rm, metricCount + 1);
   }
 
   /**