Explorar o código

HDFS-16400. Reconfig DataXceiver parameters for datanode (#3843)

Reviewed-by: Viraj Jasani <vjasani@apache.org>
Signed-off-by: Takanobu Asanuma <tasanuma@apache.org>
litao %!s(int64=3) %!d(string=hai) anos
pai
achega
f02374df92

+ 23 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -38,6 +38,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_DEFAULT;
@@ -303,7 +305,8 @@ public class DataNode extends ReconfigurableBase
           Arrays.asList(
               DFS_DATANODE_DATA_DIR_KEY,
               DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
-              DFS_BLOCKREPORT_INTERVAL_MSEC_KEY));
+              DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
+              DFS_DATANODE_MAX_RECEIVER_THREADS_KEY));
 
   public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog");
 
@@ -647,6 +650,8 @@ public class DataNode extends ReconfigurableBase
       }
       break;
     }
+    case DFS_DATANODE_MAX_RECEIVER_THREADS_KEY:
+      return reconfDataXceiverParameters(property, newVal);
     default:
       break;
     }
@@ -654,6 +659,23 @@ public class DataNode extends ReconfigurableBase
         property, newVal, getConf().get(property));
   }
 
+  private String reconfDataXceiverParameters(String property, String newVal)
+      throws ReconfigurationException {
+    String result;
+    try {
+      LOG.info("Reconfiguring {} to {}", property, newVal);
+      Preconditions.checkNotNull(getXferServer(), "DataXceiverServer has not been initialized.");
+      int threads = (newVal == null ? DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT :
+          Integer.parseInt(newVal));
+      result = Integer.toString(threads);
+      getXferServer().setMaxXceiverCount(threads);
+      LOG.info("RECONFIGURE* changed {} to {}", property, newVal);
+      return result;
+    } catch (IllegalArgumentException e) {
+      throw new ReconfigurationException(property, newVal, getConf().get(property), e);
+    }
+  }
+
   /**
    * Get a list of the keys of the re-configurable properties in configuration.
    */

+ 12 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java

@@ -68,8 +68,7 @@ class DataXceiverServer implements Runnable {
    * Enforcing the limit is required in order to avoid data-node
    * running out of memory.
    */
-  int maxXceiverCount =
-    DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT;
+  volatile int maxXceiverCount;
 
   /**
    * A manager to make sure that cluster balancing does not take too much
@@ -514,4 +513,15 @@ class DataXceiverServer implements Runnable {
   void setMaxReconfigureWaitTime(int max) {
     this.maxReconfigureWaitTime = max;
   }
+
+  public void setMaxXceiverCount(int xceiverCount) {
+    Preconditions.checkArgument(xceiverCount > 0,
+        "dfs.datanode.max.transfer.threads should be larger than 0");
+    maxXceiverCount = xceiverCount;
+  }
+
+  @VisibleForTesting
+  public int getMaxXceiverCount() {
+    return maxXceiverCount;
+  }
 }

+ 41 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java

@@ -22,7 +22,10 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -365,4 +368,42 @@ public class TestDataNodeReconfiguration {
           .getConf().get(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY));
     }
   }
+
+  @Test
+  public void testDataXceiverReconfiguration()
+      throws ReconfigurationException {
+    for (int i = 0; i < NUM_DATA_NODE; i++) {
+      DataNode dn = cluster.getDataNodes().get(i);
+
+      // Try invalid values.
+      try {
+        dn.reconfigureProperty(DFS_DATANODE_MAX_RECEIVER_THREADS_KEY, "text");
+        fail("ReconfigurationException expected");
+      } catch (ReconfigurationException expected) {
+        assertTrue("expecting NumberFormatException",
+            expected.getCause() instanceof NumberFormatException);
+      }
+      try {
+        dn.reconfigureProperty(DFS_DATANODE_MAX_RECEIVER_THREADS_KEY, String.valueOf(-1));
+        fail("ReconfigurationException expected");
+      } catch (ReconfigurationException expected) {
+        assertTrue("expecting IllegalArgumentException",
+            expected.getCause() instanceof IllegalArgumentException);
+      }
+
+      // Change properties and verify change.
+      dn.reconfigureProperty(DFS_DATANODE_MAX_RECEIVER_THREADS_KEY, String.valueOf(123));
+      assertEquals(String.format("%s has wrong value", DFS_DATANODE_MAX_RECEIVER_THREADS_KEY),
+          123, dn.getXferServer().getMaxXceiverCount());
+
+      // Revert to default.
+      dn.reconfigureProperty(DFS_DATANODE_MAX_RECEIVER_THREADS_KEY, null);
+      assertEquals(String.format("%s has wrong value", DFS_DATANODE_MAX_RECEIVER_THREADS_KEY),
+          DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT, dn.getXferServer().getMaxXceiverCount());
+
+      assertNull(String.format("expect %s is not configured",
+          DFS_DATANODE_MAX_RECEIVER_THREADS_KEY),
+          dn.getConf().get(DFS_DATANODE_MAX_RECEIVER_THREADS_KEY));
+    }
+  }
 }

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java

@@ -338,7 +338,7 @@ public class TestDFSAdmin {
     final List<String> outs = Lists.newArrayList();
     final List<String> errs = Lists.newArrayList();
     getReconfigurableProperties("datanode", address, outs, errs);
-    assertEquals(4, outs.size());
+    assertEquals(5, outs.size());
     assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1));
   }