Ver código fonte

HDFS-14553. Make queue size of BlockReportProcessingThread configurable. Contributed by He Xiaoqiao.

Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
(cherry picked from commit bd46bdf9f9244f3f3474d316255ac98717ed5719)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

(cherry picked from commit d7560c866ecbfcd533a43890522ccb8b732a5382)
He Xiaoqiao 6 anos atrás
pai
commit
03ec4c4c7c

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -259,6 +259,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       = "dfs.namenode.storageinfo.defragment.ratio";
   public static final double
       DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_DEFAULT = 0.75;
+  public static final String DFS_NAMENODE_BLOCKREPORT_QUEUE_SIZE_KEY
+      = "dfs.namenode.blockreport.queue.size";
+  public static final int    DFS_NAMENODE_BLOCKREPORT_QUEUE_SIZE_DEFAULT
+      = 1024;
   public static final String  DFS_WEBHDFS_AUTHENTICATION_FILTER_KEY = "dfs.web.authentication.filter";
   /* Phrased as below to avoid javac inlining as a constant, to match the behavior when
      this was AuthFilter.class.getName(). Note that if you change the import for AuthFilter, you

+ 9 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -315,8 +315,7 @@ public class BlockManager implements BlockStatsMXBean {
       new Daemon(new StorageInfoDefragmenter());
   
   /** Block report thread for handling async reports. */
-  private final BlockReportProcessingThread blockReportThread =
-      new BlockReportProcessingThread();
+  private final BlockReportProcessingThread blockReportThread;
 
   /** Store blocks -> datanodedescriptor(s) map of corrupt replicas */
   final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
@@ -562,6 +561,11 @@ public class BlockManager implements BlockStatsMXBean {
 
     bmSafeMode = new BlockManagerSafeMode(this, namesystem, haEnabled, conf);
 
+    int queueSize = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_BLOCKREPORT_QUEUE_SIZE_KEY,
+        DFSConfigKeys.DFS_NAMENODE_BLOCKREPORT_QUEUE_SIZE_DEFAULT);
+    blockReportThread = new BlockReportProcessingThread(queueSize);
+
     LOG.info("defaultReplication         = {}", defaultReplication);
     LOG.info("maxReplication             = {}", maxReplication);
     LOG.info("minReplication             = {}", minReplication);
@@ -4917,11 +4921,11 @@ public class BlockManager implements BlockStatsMXBean {
     private static final long MAX_LOCK_HOLD_MS = 4;
     private long lastFull = 0;
 
-    private final BlockingQueue<Runnable> queue =
-        new ArrayBlockingQueue<Runnable>(1024);
+    private final BlockingQueue<Runnable> queue;
 
-    BlockReportProcessingThread() {
+    BlockReportProcessingThread(int size) {
       super("Block report processor");
+      queue = new ArrayBlockingQueue<>(size);
       setDaemon(true);
     }
 

+ 8 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -5012,4 +5012,12 @@
       ensure that other waiters on the lock can get in.
     </description>
   </property>
+
+  <property>
+    <name>dfs.namenode.blockreport.queue.size</name>
+    <value>1024</value>
+    <description>
+      The queue size of BlockReportProcessingThread in BlockManager.
+    </description>
+  </property>
 </configuration>