
HDFS-6808. Add command line option to ask DataNode reload configuration. (Lei Xu via Colin McCabe)
(cherry picked from commit 1861b32eb551a07d748afc6205ea9573f9503eda)

Colin Patrick Mccabe, 10 years ago
parent
commit
cb9283549f

+ 140 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurableBase.java

@@ -18,9 +18,18 @@
 
 package org.apache.hadoop.conf;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 import org.apache.commons.logging.*;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange;
 
+import java.io.IOException;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
 
 /**
  * Utility base class for implementing the Reconfigurable interface.
@@ -34,6 +43,30 @@ public abstract class ReconfigurableBase
   
   private static final Log LOG =
     LogFactory.getLog(ReconfigurableBase.class);
+  // Used for testing purposes.
+  private ReconfigurationUtil reconfigurationUtil = new ReconfigurationUtil();
+
+  /** Background thread to reload configuration. */
+  private Thread reconfigThread = null;
+  private volatile boolean shouldRun = true;
+  private Object reconfigLock = new Object();
+
+  /**
+   * The timestamp when the <code>reconfigThread</code> starts.
+   */
+  private long startTime = 0;
+
+  /**
+   * The timestamp when the <code>reconfigThread</code> finishes.
+   */
+  private long endTime = 0;
+
+  /**
+   * A map of <changed property, error message>. If an error message is present,
+   * it contains the message about the error that occurred while applying the
+   * particular change. Otherwise, the change has been applied successfully.
+   */
+  private Map<PropertyChange, Optional<String>> status = null;
 
   /**
    * Construct a ReconfigurableBase.
@@ -50,6 +83,113 @@ public abstract class ReconfigurableBase
     super((conf == null) ? new Configuration() : conf);
   }
 
+  @VisibleForTesting
+  public void setReconfigurationUtil(ReconfigurationUtil ru) {
+    reconfigurationUtil = Preconditions.checkNotNull(ru);
+  }
+
+  @VisibleForTesting
+  public Collection<PropertyChange> getChangedProperties(
+      Configuration newConf, Configuration oldConf) {
+    return reconfigurationUtil.parseChangedProperties(newConf, oldConf);
+  }
+
+  /**
+   * A background thread to apply configuration changes.
+   */
+  private static class ReconfigurationThread extends Thread {
+    private ReconfigurableBase parent;
+
+    ReconfigurationThread(ReconfigurableBase base) {
+      this.parent = base;
+    }
+
+    // See {@link ReconfigurationServlet#applyChanges}
+    public void run() {
+      LOG.info("Starting reconfiguration task.");
+      Configuration oldConf = this.parent.getConf();
+      Configuration newConf = new Configuration();
+      Collection<PropertyChange> changes =
+          this.parent.getChangedProperties(newConf, oldConf);
+      Map<PropertyChange, Optional<String>> results = Maps.newHashMap();
+      for (PropertyChange change : changes) {
+        String errorMessage = null;
+        if (!this.parent.isPropertyReconfigurable(change.prop)) {
+          errorMessage = "Property " + change.prop +
+              " is not reconfigurable";
+          LOG.info(errorMessage);
+          results.put(change, Optional.of(errorMessage));
+          continue;
+        }
+        LOG.info("Change property: " + change.prop + " from \""
+            + ((change.oldVal == null) ? "<default>" : change.oldVal)
+            + "\" to \"" + ((change.newVal == null) ? "<default>" : change.newVal)
+            + "\".");
+        try {
+          this.parent.reconfigurePropertyImpl(change.prop, change.newVal);
+        } catch (ReconfigurationException e) {
+          errorMessage = e.toString();
+        }
+        results.put(change, Optional.fromNullable(errorMessage));
+      }
+
+      synchronized (this.parent.reconfigLock) {
+        this.parent.endTime = Time.monotonicNow();
+        this.parent.status = Collections.unmodifiableMap(results);
+        this.parent.reconfigThread = null;
+      }
+    }
+  }
+
+  /**
+   * Start a reconfiguration task to reload the configuration in the background.
+   */
+  public void startReconfigurationTask() throws IOException {
+    synchronized (reconfigLock) {
+      if (!shouldRun) {
+        String errorMessage = "The server is stopped.";
+        LOG.warn(errorMessage);
+        throw new IOException(errorMessage);
+      }
+      if (reconfigThread != null) {
+        String errorMessage = "Another reconfiguration task is running.";
+        LOG.warn(errorMessage);
+        throw new IOException(errorMessage);
+      }
+      reconfigThread = new ReconfigurationThread(this);
+      reconfigThread.setDaemon(true);
+      reconfigThread.setName("Reconfiguration Task");
+      reconfigThread.start();
+      startTime = Time.monotonicNow();
+    }
+  }
+
+  public ReconfigurationTaskStatus getReconfigurationTaskStatus() {
+    synchronized (reconfigLock) {
+      if (reconfigThread != null) {
+        return new ReconfigurationTaskStatus(startTime, 0, null);
+      }
+      return new ReconfigurationTaskStatus(startTime, endTime, status);
+    }
+  }
+
+  public void shutdownReconfigurationTask() {
+    Thread tempThread;
+    synchronized (reconfigLock) {
+      shouldRun = false;
+      if (reconfigThread == null) {
+        return;
+      }
+      tempThread = reconfigThread;
+      reconfigThread = null;
+    }
+
+    try {
+      tempThread.join();
+    } catch (InterruptedException e) {
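+      // Ignore the interrupt; shutdown proceeds regardless.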
+    }
+  }
+
   /**
    * {@inheritDoc}
    *

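For context, a minimal sketch of how a subclass of ReconfigurableBase drives the new background-task API; the DummyReconfigurableServer class and the property key are hypothetical, not part of this change:

import java.util.Arrays;
import java.util.Collection;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurableBase;
import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.conf.ReconfigurationTaskStatus;

public class DummyReconfigurableServer extends ReconfigurableBase {
  public DummyReconfigurableServer(Configuration conf) {
    super(conf);
  }

  @Override
  public Collection<String> getReconfigurableProperties() {
    // Only the properties listed here may be changed at runtime.
    return Arrays.asList("dummy.server.handler.count");  // hypothetical key
  }

  @Override
  public void reconfigurePropertyImpl(String property, String newVal)
      throws ReconfigurationException {
    // Apply the new value. Throwing here records a per-property error in
    // the task status instead of aborting the whole task.
  }

  public static void main(String[] args) throws Exception {
    DummyReconfigurableServer server =
        new DummyReconfigurableServer(new Configuration());
    server.startReconfigurationTask();    // spawns the daemon thread
    ReconfigurationTaskStatus status = server.getReconfigurationTaskStatus();
    System.out.println("task submitted: " + status.hasTask());
    server.shutdownReconfigurationTask(); // stops new tasks, joins the thread
  }
}
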
+ 70 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurationTaskStatus.java

@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.conf;
+
+import com.google.common.base.Optional;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange;
+
+import java.util.Map;
+
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class ReconfigurationTaskStatus {
+  long startTime;
+  long endTime;
+  final Map<ReconfigurationUtil.PropertyChange, Optional<String>> status;
+
+  public ReconfigurationTaskStatus(long startTime, long endTime,
+      Map<ReconfigurationUtil.PropertyChange, Optional<String>> status) {
+    this.startTime = startTime;
+    this.endTime = endTime;
+    this.status = status;
+  }
+
+  /**
+   * Return true if either
+   *   - a reconfiguration task has finished, or
+   *   - an active reconfiguration task is running.
+   */
+  public boolean hasTask() {
+    return startTime > 0;
+  }
+
+  /**
+   * Return true if the latest reconfiguration task has finished and there is
+   * no other active task running.
+   */
+  public boolean stopped() {
+    return endTime > 0;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public long getEndTime() {
+    return endTime;
+  }
+
+  public final Map<PropertyChange, Optional<String>> getStatus() {
+    return status;
+  }
+}

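hasTask() and stopped() are meant to be polled as a pair: hasTask() reports whether any task has ever been started, and stopped() whether the latest one has recorded an end time. A hedged helper, echoing the waitAsyncReconfigureTaskFinish utility in the tests below:

import org.apache.hadoop.conf.ReconfigurableBase;
import org.apache.hadoop.conf.ReconfigurationTaskStatus;

// Hypothetical helper: block until the current reconfiguration task ends.
class ReconfigurationWaiter {
  static ReconfigurationTaskStatus await(ReconfigurableBase server)
      throws InterruptedException {
    ReconfigurationTaskStatus status = server.getReconfigurationTaskStatus();
    while (status.hasTask() && !status.stopped()) {
      Thread.sleep(500);  // arbitrary poll interval for this sketch
      status = server.getReconfigurationTaskStatus();
    }
    return status;
  }
}
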
+ 5 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/ReconfigurationUtil.java

@@ -63,4 +63,9 @@ public class ReconfigurationUtil {
 
     return changes.values();
   }
+
+  public Collection<PropertyChange> parseChangedProperties(
+      Configuration newConf, Configuration oldConf) {
+    return getChangedProperties(newConf, oldConf);
+  }
 }

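The new instance-level parseChangedProperties simply delegates to the static getChangedProperties; it exists so tests can substitute a mock. A minimal sketch of the injection pattern (TestDFSAdmin below does the same thing), assuming Mockito on the classpath:

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Collection;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurableBase;
import org.apache.hadoop.conf.ReconfigurationUtil;
import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange;

// Hypothetical test helper: make the base see a canned set of changes.
class MockChangeInjector {
  static void inject(ReconfigurableBase base,
      Collection<PropertyChange> changes) {
    ReconfigurationUtil ru = mock(ReconfigurationUtil.class);
    when(ru.parseChangedProperties(any(Configuration.class),
        any(Configuration.class))).thenReturn(changes);
    base.setReconfigurationUtil(ru);  // @VisibleForTesting hook
  }
}
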
+ 154 - 3
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestReconfiguration.java

@@ -18,13 +18,32 @@
 
 package org.apache.hadoop.conf;
 
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
+import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange;
 import org.junit.Test;
 import org.junit.Before;
-import static org.junit.Assert.*;
 
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 
 public class TestReconfiguration {
   private Configuration conf1;
@@ -105,8 +124,8 @@ public class TestReconfiguration {
     }
 
     @Override
-    public synchronized void reconfigurePropertyImpl(String property, 
-                                                     String newVal) {
+    public synchronized void reconfigurePropertyImpl(
+        String property, String newVal) throws ReconfigurationException {
       // do nothing
     }
     
@@ -312,4 +331,136 @@ public class TestReconfiguration {
     
   }
 
+  private static class AsyncReconfigurableDummy extends ReconfigurableBase {
+    AsyncReconfigurableDummy(Configuration conf) {
+      super(conf);
+    }
+
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    @Override
+    public Collection<String> getReconfigurableProperties() {
+      return Arrays.asList(PROP1, PROP2, PROP4);
+    }
+
+    @Override
+    public synchronized void reconfigurePropertyImpl(String property,
+        String newVal) throws ReconfigurationException {
+      try {
+        latch.await();
+      } catch (InterruptedException e) {
+        // Ignore
+      }
+    }
+  }
+
+  private static void waitAsyncReconfigureTaskFinish(ReconfigurableBase rb)
+      throws InterruptedException {
+    ReconfigurationTaskStatus status = null;
+    int count = 20;
+    while (count > 0) {
+      status = rb.getReconfigurationTaskStatus();
+      if (status.stopped()) {
+        break;
+      }
+      count--;
+      Thread.sleep(500);
+    }
+    assert(status.stopped());
+  }
+
+  @Test
+  public void testAsyncReconfigure()
+      throws ReconfigurationException, IOException, InterruptedException {
+    AsyncReconfigurableDummy dummy = spy(new AsyncReconfigurableDummy(conf1));
+
+    List<PropertyChange> changes = Lists.newArrayList();
+    changes.add(new PropertyChange("name1", "new1", "old1"));
+    changes.add(new PropertyChange("name2", "new2", "old2"));
+    changes.add(new PropertyChange("name3", "new3", "old3"));
+    doReturn(changes).when(dummy).getChangedProperties(
+        any(Configuration.class), any(Configuration.class));
+
+    doReturn(true).when(dummy).isPropertyReconfigurable(eq("name1"));
+    doReturn(false).when(dummy).isPropertyReconfigurable(eq("name2"));
+    doReturn(true).when(dummy).isPropertyReconfigurable(eq("name3"));
+
+    doNothing().when(dummy)
+        .reconfigurePropertyImpl(eq("name1"), anyString());
+    doNothing().when(dummy)
+        .reconfigurePropertyImpl(eq("name2"), anyString());
+    doThrow(new ReconfigurationException("NAME3", "NEW3", "OLD3"))
+        .when(dummy).reconfigurePropertyImpl(eq("name3"), anyString());
+
+    dummy.startReconfigurationTask();
+
+    waitAsyncReconfigureTaskFinish(dummy);
+    ReconfigurationTaskStatus status = dummy.getReconfigurationTaskStatus();
+    assertEquals(3, status.getStatus().size());
+    for (Map.Entry<PropertyChange, Optional<String>> result :
+        status.getStatus().entrySet()) {
+      PropertyChange change = result.getKey();
+      if (change.prop.equals("name1")) {
+        assertFalse(result.getValue().isPresent());
+      } else if (change.prop.equals("name2")) {
+        assertThat(result.getValue().get(),
+            containsString("Property name2 is not reconfigurable"));
+      } else if (change.prop.equals("name3")) {
+        assertThat(result.getValue().get(), containsString("NAME3"));
+      } else {
+        fail("Unknown property: " + change.prop);
+      }
+    }
+  }
+
+  @Test(timeout=30000)
+  public void testStartReconfigurationFailureDueToExistingRunningTask()
+      throws InterruptedException, IOException {
+    AsyncReconfigurableDummy dummy = spy(new AsyncReconfigurableDummy(conf1));
+    List<PropertyChange> changes = Lists.newArrayList(
+        new PropertyChange(PROP1, "new1", "old1")
+    );
+    doReturn(changes).when(dummy).getChangedProperties(
+        any(Configuration.class), any(Configuration.class));
+
+    ReconfigurationTaskStatus status = dummy.getReconfigurationTaskStatus();
+    assertFalse(status.hasTask());
+
+    dummy.startReconfigurationTask();
+    status = dummy.getReconfigurationTaskStatus();
+    assertTrue(status.hasTask());
+    assertFalse(status.stopped());
+
+    // An active reconfiguration task is running.
+    try {
+      dummy.startReconfigurationTask();
+      fail("Expect to throw IOException.");
+    } catch (IOException e) {
+      GenericTestUtils.assertExceptionContains(
+          "Another reconfiguration task is running", e);
+    }
+    status = dummy.getReconfigurationTaskStatus();
+    assertTrue(status.hasTask());
+    assertFalse(status.stopped());
+
+    dummy.latch.countDown();
+    waitAsyncReconfigureTaskFinish(dummy);
+    status = dummy.getReconfigurationTaskStatus();
+    assertTrue(status.hasTask());
+    assertTrue(status.stopped());
+
+    // The first task has finished.
+    dummy.startReconfigurationTask();
+    waitAsyncReconfigureTaskFinish(dummy);
+    ReconfigurationTaskStatus status2 = dummy.getReconfigurationTaskStatus();
+    assertTrue(status2.getStartTime() >= status.getEndTime());
+
+    dummy.shutdownReconfigurationTask();
+    try {
+      dummy.startReconfigurationTask();
+      fail("Expect to throw IOException");
+    } catch (IOException e) {
+      GenericTestUtils.assertExceptionContains("The server is stopped", e);
+    }
+  }
 }

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -240,6 +240,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-7118. Improve diagnostics on storage directory rename operations by
     using NativeIO#renameTo in Storage#rename. (cnauroth)
 
+    HDFS-6808. Add command line option to ask DataNode reload configuration.
+    (Lei Xu via Colin Patrick McCabe)
+
   OPTIMIZATIONS
 
     HDFS-6690. Deduplicate xattr names in memory. (wang)

+ 12 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java

@@ -24,6 +24,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.ReconfigurationTaskStatus;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector;
@@ -146,4 +147,15 @@ public interface ClientDatanodeProtocol {
    * @return software/config version and uptime of the datanode
    */
   DatanodeLocalInfo getDatanodeInfo() throws IOException;
+
+  /**
+   * Asynchronously reload the configuration from disk and apply any changes.
+   */
+  void startReconfiguration() throws IOException;
+
+  /**
+   * Get the status of the previously issued reconfiguration task.
+   * @see org.apache.hadoop.conf.ReconfigurationTaskStatus
+   */
+  ReconfigurationTaskStatus getReconfigurationStatus() throws IOException;
 }

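The two new protocol methods form a fire-and-poll pair: startReconfiguration() returns immediately, and getReconfigurationStatus() is polled until the task reports an end time. A sketch, assuming the caller already holds a proxy (e.g. via DFSAdmin's getDataNodeProxy):

import java.io.IOException;

import org.apache.hadoop.conf.ReconfigurationTaskStatus;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;

// Hypothetical client-side helper pairing the two new RPCs.
class DatanodeReconfigClient {
  static ReconfigurationTaskStatus reloadAndWait(ClientDatanodeProtocol dn)
      throws IOException, InterruptedException {
    dn.startReconfiguration();  // asynchronous; returns immediately
    ReconfigurationTaskStatus status = dn.getReconfigurationStatus();
    while (status.hasTask() && !status.stopped()) {
      Thread.sleep(1000);  // arbitrary poll interval for this sketch
      status = dn.getReconfigurationStatus();
    }
    return status;
  }
}
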
+ 58 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java

@@ -20,11 +20,14 @@ package org.apache.hadoop.hdfs.protocolPB;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
+import com.google.common.base.Optional;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.ReconfigurationTaskStatus;
+import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolResponseProto;
@@ -32,6 +35,9 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlo
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetDatanodeInfoRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetDatanodeInfoResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusConfigChangeProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto.Builder;
@@ -41,11 +47,11 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.Refres
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.StartReconfigurationRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.StartReconfigurationResponseProto;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.VersionInfo;
 
 import com.google.common.primitives.Longs;
 import com.google.protobuf.ByteString;
@@ -66,6 +72,8 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
       DeleteBlockPoolResponseProto.newBuilder().build();
   private final static ShutdownDatanodeResponseProto SHUTDOWN_DATANODE_RESP =
       ShutdownDatanodeResponseProto.newBuilder().build();
+  private final static StartReconfigurationResponseProto START_RECONFIG_RESP =
+      StartReconfigurationResponseProto.newBuilder().build();
   
   private final ClientDatanodeProtocol impl;
 
@@ -182,4 +190,51 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
     }
     return res;
   }
+
+  @Override
+  public StartReconfigurationResponseProto startReconfiguration(
+      RpcController unused, StartReconfigurationRequestProto request)
+    throws ServiceException {
+    try {
+      impl.startReconfiguration();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return START_RECONFIG_RESP;
+  }
+
+  @Override
+  public GetReconfigurationStatusResponseProto getReconfigurationStatus(
+      RpcController unused, GetReconfigurationStatusRequestProto request)
+      throws ServiceException {
+    GetReconfigurationStatusResponseProto.Builder builder =
+        GetReconfigurationStatusResponseProto.newBuilder();
+    try {
+      ReconfigurationTaskStatus status = impl.getReconfigurationStatus();
+      builder.setStartTime(status.getStartTime());
+      if (status.stopped()) {
+        builder.setEndTime(status.getEndTime());
+        assert status.getStatus() != null;
+        for (Map.Entry<PropertyChange, Optional<String>> result :
+            status.getStatus().entrySet()) {
+          GetReconfigurationStatusConfigChangeProto.Builder changeBuilder =
+              GetReconfigurationStatusConfigChangeProto.newBuilder();
+          PropertyChange change = result.getKey();
+          changeBuilder.setName(change.prop);
+          changeBuilder.setOldValue(change.oldVal != null ? change.oldVal : "");
+          if (change.newVal != null) {
+            changeBuilder.setNewValue(change.newVal);
+          }
+          if (result.getValue().isPresent()) {
+            // Propagate the error message recorded for this change.
+            changeBuilder.setErrorMessage(result.getValue().get());
+          }
+          builder.addChanges(changeBuilder);
+        }
+      }
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return builder.build();
+  }
 }

+ 53 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java

@@ -21,16 +21,20 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 
 import javax.net.SocketFactory;
 
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.ReconfigurationTaskStatus;
+import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
@@ -48,8 +52,11 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdf
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusConfigChangeProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.StartReconfigurationRequestProto;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -87,6 +94,10 @@ public class ClientDatanodeProtocolTranslatorPB implements
       RefreshNamenodesRequestProto.newBuilder().build();
   private final static GetDatanodeInfoRequestProto VOID_GET_DATANODE_INFO =
       GetDatanodeInfoRequestProto.newBuilder().build();
+  private final static GetReconfigurationStatusRequestProto VOID_GET_RECONFIG_STATUS =
+      GetReconfigurationStatusRequestProto.newBuilder().build();
+  private final static StartReconfigurationRequestProto VOID_START_RECONFIG =
+      StartReconfigurationRequestProto.newBuilder().build();
 
   public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid,
       Configuration conf, int socketTimeout, boolean connectToDnViaHostname,
@@ -282,4 +293,44 @@ public class ClientDatanodeProtocolTranslatorPB implements
     }
   }
 
+  @Override
+  public void startReconfiguration() throws IOException {
+    try {
+      rpcProxy.startReconfiguration(NULL_CONTROLLER, VOID_START_RECONFIG);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public ReconfigurationTaskStatus getReconfigurationStatus() throws IOException {
+    GetReconfigurationStatusResponseProto response;
+    Map<PropertyChange, Optional<String>> statusMap = null;
+    long startTime;
+    long endTime = 0;
+    try {
+      response = rpcProxy.getReconfigurationStatus(NULL_CONTROLLER,
+          VOID_GET_RECONFIG_STATUS);
+      startTime = response.getStartTime();
+      if (response.hasEndTime()) {
+        endTime = response.getEndTime();
+      }
+      if (response.getChangesCount() > 0) {
+        statusMap = Maps.newHashMap();
+        for (GetReconfigurationStatusConfigChangeProto change :
+            response.getChangesList()) {
+          PropertyChange pc = new PropertyChange(
+              change.getName(), change.getNewValue(), change.getOldValue());
+          String errorMessage = null;
+          if (change.hasErrorMessage()) {
+            errorMessage = change.getErrorMessage();
+          }
+          statusMap.put(pc, Optional.fromNullable(errorMessage));
+        }
+      }
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+    return new ReconfigurationTaskStatus(startTime, endTime, statusMap);
+  }
 }

+ 42 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -90,6 +90,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.ReconfigurableBase;
 import org.apache.hadoop.conf.ReconfigurationException;
+import org.apache.hadoop.conf.ReconfigurationTaskStatus;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
@@ -339,6 +340,21 @@ public class DataNode extends ReconfigurableBase
 
   private SpanReceiverHost spanReceiverHost;
 
+  /**
+   * Creates a dummy DataNode for testing purposes.
+   */
+  @VisibleForTesting
+  @InterfaceAudience.LimitedPrivate("HDFS")
+  DataNode(final Configuration conf) {
+    super(conf);
+    this.fileDescriptorPassingDisabledReason = null;
+    this.maxNumberOfBlocksToLog = 0;
+    this.confVersion = null;
+    this.usersWithLocalPathAccess = null;
+    this.connectToDnViaHostname = false;
+    this.getHdfsBlockLocationsEnabled = false;
+  }
+
   /**
    * Create the DataNode given a configuration, an array of dataDirs,
    * and a namenode proxy
@@ -481,7 +497,6 @@ public class DataNode extends ReconfigurableBase
    */
   private synchronized void refreshVolumes(String newVolumes) throws Exception {
     Configuration conf = getConf();
-    String oldVolumes = conf.get(DFS_DATANODE_DATA_DIR_KEY);
     conf.set(DFS_DATANODE_DATA_DIR_KEY, newVolumes);
     List<StorageLocation> locations = getStorageLocations(conf);
 
@@ -489,6 +504,7 @@ public class DataNode extends ReconfigurableBase
     dataDirs = locations;
     ChangedVolumes changedVolumes = parseChangedVolumes();
 
+    StringBuilder errorMessageBuilder = new StringBuilder();
     try {
       if (numOldDataDirs + changedVolumes.newLocations.size() -
           changedVolumes.deactivateLocations.size() <= 0) {
@@ -517,8 +533,13 @@ public class DataNode extends ReconfigurableBase
           // Clean all failed volumes.
           for (StorageLocation location : changedVolumes.newLocations) {
             if (!succeedVolumes.contains(location)) {
+              errorMessageBuilder.append("FAILED TO ADD:");
               failedVolumes.add(location);
+            } else {
+              errorMessageBuilder.append("ADDED:");
             }
+            errorMessageBuilder.append(location);
+            errorMessageBuilder.append("\n");
           }
           storage.removeVolumes(failedVolumes);
           data.removeVolumes(failedVolumes);
@@ -532,10 +553,12 @@ public class DataNode extends ReconfigurableBase
         data.removeVolumes(changedVolumes.deactivateLocations);
         storage.removeVolumes(changedVolumes.deactivateLocations);
       }
+
+      if (errorMessageBuilder.length() > 0) {
+        throw new IOException(errorMessageBuilder.toString());
+      }
     } catch (IOException e) {
-      LOG.warn("There is IOException when refreshing volumes! "
-          + "Recover configurations: " + DFS_DATANODE_DATA_DIR_KEY
-          + " = " + oldVolumes, e);
+      LOG.warn("There was an IOException when refreshing volumes!", e);
       throw e;
     }
   }
@@ -1601,6 +1624,9 @@ public class DataNode extends ReconfigurableBase
     // before the restart prep is done.
     this.shouldRun = false;
     
+    // wait for the reconfiguration thread, if any, to exit
+    shutdownReconfigurationTask();
+
     // wait for all data receiver threads to exit
     if (this.threadGroup != null) {
       int sleepMs = 2;
@@ -2852,6 +2878,18 @@ public class DataNode extends ReconfigurableBase
         confVersion, uptime);
   }
 
+  @Override // ClientDatanodeProtocol
+  public void startReconfiguration() throws IOException {
+    checkSuperuserPrivilege();
+    startReconfigurationTask();
+  }
+
+  @Override // ClientDatanodeProtocol
+  public ReconfigurationTaskStatus getReconfigurationStatus() throws IOException {
+    checkSuperuserPrivilege();
+    return getReconfigurationTaskStatus();
+  }
+
   /**
    * @param addr rpc address of the namenode
    * @return true if the datanode is connected to a NameNode at the

+ 94 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java

@@ -28,14 +28,19 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.TreeSet;
 
+import com.google.common.base.Optional;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.ReconfigurationTaskStatus;
+import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsShell;
@@ -374,6 +379,7 @@ public class DFSAdmin extends FsShell {
     "\t[-refreshSuperUserGroupsConfiguration]\n" +
     "\t[-refreshCallQueue]\n" +
     "\t[-refresh <host:ipc_port> <key> [arg1..argn]\n" +
+    "\t[-reconfig <datanode|...> <host:ipc_port> <start|status>]\n" +
     "\t[-printTopology]\n" +
     "\t[-refreshNamenodes datanode_host:ipc_port]\n"+
     "\t[-deleteBlockPool datanode_host:ipc_port blockpoolId [force]]\n"+
@@ -880,9 +886,14 @@ public class DFSAdmin extends FsShell {
 
     String refreshCallQueue = "-refreshCallQueue: Reload the call queue from config\n";
 
+    String reconfig = "-reconfig <datanode|...> <host:ipc_port> <start|status>:\n" +
+        "\tStarts reconfiguration or gets the status of an ongoing reconfiguration.\n" +
+        "\tThe second parameter specifies the node type.\n" +
+        "\tCurrently, only reloading DataNode's configuration is supported.\n";
+
     String genericRefresh = "-refresh: Arguments are <hostname:port> <resource_identifier> [arg1..argn]\n" +
       "\tTriggers a runtime-refresh of the resource specified by <resource_identifier>\n" +
-      "\ton <hostname:port>. All other args after are sent to the host.";
+      "\ton <hostname:port>. All other args after are sent to the host.\n";
 
     String printTopology = "-printTopology: Print a tree of the racks and their\n" +
                            "\t\tnodes as reported by the Namenode\n";
@@ -970,6 +981,8 @@ public class DFSAdmin extends FsShell {
       System.out.println(refreshCallQueue);
     } else if ("refresh".equals(cmd)) {
       System.out.println(genericRefresh);
+    } else if ("reconfig".equals(cmd)) {
+      System.out.println(reconfig);
     } else if ("printTopology".equals(cmd)) {
       System.out.println(printTopology);
     } else if ("refreshNamenodes".equals(cmd)) {
@@ -1010,6 +1023,7 @@ public class DFSAdmin extends FsShell {
       System.out.println(refreshSuperUserGroupsConfiguration);
       System.out.println(refreshCallQueue);
       System.out.println(genericRefresh);
+      System.out.println(reconfig);
       System.out.println(printTopology);
       System.out.println(refreshNamenodes);
       System.out.println(deleteBlockPool);
@@ -1317,6 +1331,75 @@ public class DFSAdmin extends FsShell {
     return 0;
   }
 
+  public int reconfig(String[] argv, int i) throws IOException {
+    String nodeType = argv[i];
+    String address = argv[i + 1];
+    String op = argv[i + 2];
+    if ("start".equals(op)) {
+      return startReconfiguration(nodeType, address);
+    } else if ("status".equals(op)) {
+      return getReconfigurationStatus(nodeType, address, System.out, System.err);
+    }
+    System.err.println("Unknown operation: " + op);
+    return -1;
+  }
+
+  int startReconfiguration(String nodeType, String address) throws IOException {
+    if ("datanode".equals(nodeType)) {
+      ClientDatanodeProtocol dnProxy = getDataNodeProxy(address);
+      dnProxy.startReconfiguration();
+      System.out.println("Started reconfiguration task on DataNode " + address);
+      return 0;
+    }
+    System.err.println("Node type " + nodeType +
+        " does not support reconfiguration.");
+    return -1;
+  }
+
+  int getReconfigurationStatus(String nodeType, String address,
+      PrintStream out, PrintStream err) throws IOException {
+    if ("datanode".equals(nodeType)) {
+      ClientDatanodeProtocol dnProxy = getDataNodeProxy(address);
+      try {
+        ReconfigurationTaskStatus status = dnProxy.getReconfigurationStatus();
+        out.print("Reconfiguration status for DataNode[" + address + "]: ");
+        if (!status.hasTask()) {
+          out.println("no task was found.");
+          return 0;
+        }
+        out.print("started at " + new Date(status.getStartTime()));
+        if (!status.stopped()) {
+          out.println(" and is still running.");
+          return 0;
+        }
+
+        out.println(" and finished at " +
+            new Date(status.getEndTime()).toString() + ".");
+        for (Map.Entry<PropertyChange, Optional<String>> result :
+            status.getStatus().entrySet()) {
+          if (!result.getValue().isPresent()) {
+            out.print("SUCCESS: ");
+          } else {
+            out.print("FAILED: ");
+          }
+          out.printf("Change property %s\n\tFrom: \"%s\"\n\tTo: \"%s\"\n",
+              result.getKey().prop, result.getKey().oldVal,
+              result.getKey().newVal);
+          if (result.getValue().isPresent()) {
+            out.println("\tError: " + result.getValue().get() + ".");
+          }
+        }
+      } catch (IOException e) {
+        err.println("DataNode reconfiguration status query failed: " + e + ".");
+        return -1;
+      }
+    } else {
+      err.println("Node type " + nodeType + " does not support reconfiguration.");
+      return -1;
+    }
+    return 0;
+  }
+
   public int genericRefresh(String[] argv, int i) throws IOException {
     String hostport = argv[i++];
     String identifier = argv[i++];
@@ -1429,6 +1512,9 @@ public class DFSAdmin extends FsShell {
     } else if ("-refreshCallQueue".equals(cmd)) {
       System.err.println("Usage: hdfs dfsadmin"
                          + " [-refreshCallQueue]");
+    } else if ("-reconfig".equals(cmd)) {
+      System.err.println("Usage: hdfs dfsadmin"
+                         + " [-reconfig <datanode|...> <host:port> <start|status>]");
     } else if ("-refresh".equals(cmd)) {
       System.err.println("Usage: hdfs dfsadmin"
                          + " [-refresh <hostname:port> <resource_identifier> [arg1..argn]");
@@ -1561,6 +1647,11 @@ public class DFSAdmin extends FsShell {
         printUsage(cmd);
         return exitCode;
       }
+    } else if ("-reconfig".equals(cmd)) {
+      if (argv.length != 4) {
+        printUsage(cmd);
+        return exitCode;
+      }
     } else if ("-deleteBlockPool".equals(cmd)) {
       if ((argv.length != 3) && (argv.length != 4)) {
         printUsage(cmd);
@@ -1657,6 +1748,8 @@ public class DFSAdmin extends FsShell {
         exitCode = shutdownDatanode(argv, i);
       } else if ("-getDatanodeInfo".equals(cmd)) {
         exitCode = getDatanodeInfo(argv, i);
+      } else if ("-reconfig".equals(cmd)) {
+        exitCode = reconfig(argv, i);
       } else if ("-help".equals(cmd)) {
         if (i < argv.length) {
           printHelp(argv[i]);

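Putting the DFSAdmin pieces together, a session would look roughly like this; the hostname and IPC port are illustrative, and the status line reflects the output format added above:

$ hdfs dfsadmin -reconfig datanode dn1.example.com:50020 start
Started reconfiguration task on DataNode dn1.example.com:50020
$ hdfs dfsadmin -reconfig datanode dn1.example.com:50020 status
Reconfiguration status for DataNode[dn1.example.com:50020]: started at <time> and is still running.
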
+ 30 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto

@@ -149,6 +149,30 @@ message GetDatanodeInfoResponseProto {
   required DatanodeLocalInfoProto localInfo = 1;
 }
 
+/** Asks the DataNode to reload its configuration file. */
+message StartReconfigurationRequestProto {
+}
+
+message StartReconfigurationResponseProto {
+}
+
+/** Query the running status of the reconfiguration process. */
+message GetReconfigurationStatusRequestProto {
+}
+
+message GetReconfigurationStatusConfigChangeProto {
+  required string name = 1;
+  required string oldValue = 2;
+  optional string newValue = 3;
+  optional string errorMessage = 4;  // Unset if the change was applied successfully.
+}
+
+message GetReconfigurationStatusResponseProto {
+  required int64 startTime = 1;
+  optional int64 endTime = 2;
+  repeated GetReconfigurationStatusConfigChangeProto changes = 3;
+}
+
 /**
  * Protocol used from client to the Datanode.
  * See the request and response for details of rpc call.
@@ -192,4 +216,10 @@ service ClientDatanodeProtocolService {
 
   rpc getDatanodeInfo(GetDatanodeInfoRequestProto)
       returns(GetDatanodeInfoResponseProto);
+
+  rpc getReconfigurationStatus(GetReconfigurationStatusRequestProto)
+      returns(GetReconfigurationStatusResponseProto);
+
+  rpc startReconfiguration(StartReconfigurationRequestProto)
+      returns(StartReconfigurationResponseProto);
 }

+ 152 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java

@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.tools;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.ReconfigurationUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Scanner;
+
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.CoreMatchers.anyOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestDFSAdmin {
+  private MiniDFSCluster cluster;
+  private DFSAdmin admin;
+  private DataNode datanode;
+
+  @Before
+  public void setUp() throws Exception {
+    Configuration conf = new Configuration();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+
+    admin = new DFSAdmin();
+    datanode = cluster.getDataNodes().get(0);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  private List<String> getReconfigureStatus(String nodeType, String address)
+      throws IOException {
+    ByteArrayOutputStream bufOut = new ByteArrayOutputStream();
+    PrintStream out = new PrintStream(bufOut);
+    ByteArrayOutputStream bufErr = new ByteArrayOutputStream();
+    PrintStream err = new PrintStream(bufErr);
+    admin.getReconfigurationStatus(nodeType, address, out, err);
+    Scanner scanner = new Scanner(bufOut.toString());
+    List<String> outputs = Lists.newArrayList();
+    while (scanner.hasNextLine()) {
+      outputs.add(scanner.nextLine());
+    }
+    return outputs;
+  }
+
+  @Test(timeout = 30000)
+  public void testGetReconfigureStatus()
+      throws IOException, InterruptedException {
+    ReconfigurationUtil ru = mock(ReconfigurationUtil.class);
+    datanode.setReconfigurationUtil(ru);
+
+    List<ReconfigurationUtil.PropertyChange> changes =
+        new ArrayList<ReconfigurationUtil.PropertyChange>();
+    File newDir = new File(cluster.getDataDirectory(), "data_new");
+    newDir.mkdirs();
+    changes.add(new ReconfigurationUtil.PropertyChange(
+        DFS_DATANODE_DATA_DIR_KEY, newDir.toString(),
+        datanode.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
+    changes.add(new ReconfigurationUtil.PropertyChange(
+        "randomKey", "new123", "old456"));
+    when(ru.parseChangedProperties(any(Configuration.class),
+        any(Configuration.class))).thenReturn(changes);
+
+    final int port = datanode.getIpcPort();
+    final String address = "localhost:" + port;
+
+    admin.startReconfiguration("datanode", address);
+
+    List<String> outputs = null;
+    int count = 100;
+    while (count > 0) {
+      outputs = getReconfigureStatus("datanode", address);
+      if (!outputs.isEmpty() && outputs.get(0).contains("finished")) {
+        break;
+      }
+      count--;
+      Thread.sleep(100);
+    }
+    assertTrue(count > 0);
+    assertThat(outputs.size(), is(8));  // 1 status line + 3 (SUCCESS) + 4 (FAILED)
+
+    List<StorageLocation> locations = DataNode.getStorageLocations(
+        datanode.getConf());
+    assertThat(locations.size(), is(1));
+    assertThat(locations.get(0).getFile(), is(newDir));
+    // Verify the directory is appropriately formatted.
+    assertTrue(new File(newDir, Storage.STORAGE_DIR_CURRENT).isDirectory());
+
+    int successOffset = outputs.get(1).startsWith("SUCCESS:") ? 1 : 5;
+    int failedOffset = outputs.get(1).startsWith("FAILED:") ? 1 : 4;
+    assertThat(outputs.get(successOffset),
+        containsString("Change property " + DFS_DATANODE_DATA_DIR_KEY));
+    assertThat(outputs.get(successOffset + 1),
+        is(allOf(containsString("From:"), containsString("data1"),
+            containsString("data2"))));
+    assertThat(outputs.get(successOffset + 2),
+        is(not(anyOf(containsString("data1"), containsString("data2")))));
+    assertThat(outputs.get(successOffset + 2),
+        is(allOf(containsString("To"), containsString("data_new"))));
+    assertThat(outputs.get(failedOffset),
+        containsString("Change property randomKey"));
+    assertThat(outputs.get(failedOffset + 1),
+        containsString("From: \"old456\""));
+    assertThat(outputs.get(failedOffset + 2),
+        containsString("To: \"new123\""));
+  }
+}