瀏覽代碼

HDFS-12523. Thread pools in ErasureCodingWorker do not shutdown. (Huafeng Wang via Lei)

(cherry picked from commit 8680131e4b38f0b876173cbee838fcdb91cb52b6)
Lei Xu 7 年之前
父節點
當前提交
98612bb2d0

+ 5 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -1115,7 +1115,7 @@ public class DataNode extends ReconfigurableBase
   /**
    * Shutdown disk balancer.
    */
-  private  void shutdownDiskBalancer() {
+  private void shutdownDiskBalancer() {
     if (this.diskBalancer != null) {
       this.diskBalancer.shutdown();
       this.diskBalancer = null;
@@ -2077,6 +2077,10 @@ public class DataNode extends ReconfigurableBase
       ipcServer.stop();
     }
 
+    if (ecWorker != null) {
+      ecWorker.shutDown();
+    }
+
     if(blockPoolManager != null) {
       try {
         this.blockPoolManager.shutDownAll(bposArray);

+ 9 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java

@@ -27,6 +27,8 @@ import org.apache.hadoop.util.Daemon;
 import org.slf4j.Logger;
 
 import java.util.Collection;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -149,7 +151,12 @@ public final class ErasureCodingWorker {
     return conf;
   }
 
-  ThreadPoolExecutor getStripedReadPool() {
-    return stripedReadPool;
+  CompletionService<Void> createReadService() {
+    return new ExecutorCompletionService<>(stripedReadPool);
+  }
+
+  public void shutDown() {
+    stripedReconstructionPool.shutdown();
+    stripedReadPool.shutdown();
   }
 }

+ 3 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java

@@ -39,8 +39,6 @@ import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.BitSet;
 import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -110,7 +108,7 @@ abstract class StripedReconstructor {
   // position in striped internal block
   private long positionInBlock;
   private StripedReader stripedReader;
-  private ThreadPoolExecutor stripedReadPool;
+  private ErasureCodingWorker erasureCodingWorker;
   private final CachingStrategy cachingStrategy;
   private long maxTargetLength = 0L;
   private final BitSet liveBitSet;
@@ -122,7 +120,7 @@ abstract class StripedReconstructor {
 
   StripedReconstructor(ErasureCodingWorker worker,
       StripedReconstructionInfo stripedReconInfo) {
-    this.stripedReadPool = worker.getStripedReadPool();
+    this.erasureCodingWorker = worker;
     this.datanode = worker.getDatanode();
     this.conf = worker.getConf();
     this.ecPolicy = stripedReconInfo.getEcPolicy();
@@ -225,7 +223,7 @@ abstract class StripedReconstructor {
   }
 
   CompletionService<Void> createReadService() {
-    return new ExecutorCompletionService<>(stripedReadPool);
+    return erasureCodingWorker.createReadService();
   }
 
   ExtendedBlock getBlockGroup() {