
HDFS-14997. BPServiceActor processes commands from NameNode asynchronously. Contributed by Xiaoqiao He.

Inigo Goiri, 5 years ago
commit b86895485d

+ 107 - 36
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java

@@ -32,7 +32,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -117,6 +119,7 @@ class BPServiceActor implements Runnable {
   private DatanodeRegistration bpRegistration;
   final LinkedList<BPServiceActorAction> bpThreadQueue 
       = new LinkedList<BPServiceActorAction>();
+  private final CommandProcessingThread commandProcessingThread;
 
   BPServiceActor(String serviceId, String nnId, InetSocketAddress nnAddr,
       InetSocketAddress lifelineNnAddr, BPOfferService bpos) {
@@ -144,6 +147,8 @@ class BPServiceActor implements Runnable {
     if (nnId != null) {
       this.nnId = nnId;
     }
+    commandProcessingThread = new CommandProcessingThread(this);
+    commandProcessingThread.start();
   }
 
   public DatanodeRegistration getBpRegistration() {
@@ -696,8 +701,7 @@ class BPServiceActor implements Runnable {
             }
 
             long startProcessCommands = monotonicNow();
-            if (!processCommand(resp.getCommands()))
-              continue;
+            commandProcessingThread.enqueue(resp.getCommands());
             long endProcessCommands = monotonicNow();
             if (endProcessCommands - startProcessCommands > 2000) {
               LOG.info("Took " + (endProcessCommands - startProcessCommands)
@@ -722,11 +726,11 @@ class BPServiceActor implements Runnable {
           cmds = blockReport(fullBlockReportLeaseId);
           fullBlockReportLeaseId = 0;
         }
-        processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()]));
+        commandProcessingThread.enqueue(cmds);
 
         if (!dn.areCacheReportsDisabledForTests()) {
           DatanodeCommand cmd = cacheReport();
-          processCommand(new DatanodeCommand[]{ cmd });
+          commandProcessingThread.enqueue(cmd);
         }
 
         if (sendHeartbeat) {
@@ -900,37 +904,6 @@ class BPServiceActor implements Runnable {
     return shouldServiceRun && dn.shouldRun();
   }
 
-  /**
-   * Process an array of datanode commands
-   * 
-   * @param cmds an array of datanode commands
-   * @return true if further processing may be required or false otherwise. 
-   */
-  boolean processCommand(DatanodeCommand[] cmds) {
-    if (cmds != null) {
-      for (DatanodeCommand cmd : cmds) {
-        try {
-          if (bpos.processCommandFromActor(cmd, this) == false) {
-            return false;
-          }
-        } catch (RemoteException re) {
-          String reClass = re.getClassName();
-          if (UnregisteredNodeException.class.getName().equals(reClass) ||
-              DisallowedDatanodeException.class.getName().equals(reClass) ||
-              IncorrectVersionException.class.getName().equals(reClass)) {
-            LOG.warn(this + " is shutting down", re);
-            shouldServiceRun = false;
-            return false;
-          }
-        } catch (IOException ioe) {
-          LOG.warn("Error processing datanode Command", ioe);
-        }
-      }
-    }
-    return true;
-  }
-
-
   /**
    * Report a bad block from another DN in this cluster.
    */
@@ -1304,4 +1277,102 @@ class BPServiceActor implements Runnable {
       return Time.monotonicNow();
     }
   }
-}
+
+  /**
+   * CommandProcessingThread that processes commands from the NameNode asynchronously.
+   */
+  class CommandProcessingThread extends Thread {
+    private final BPServiceActor actor;
+    private final BlockingQueue<Runnable> queue;
+
+    CommandProcessingThread(BPServiceActor actor) {
+      super("Command processor");
+      this.actor = actor;
+      this.queue = new LinkedBlockingQueue<>();
+      setDaemon(true);
+    }
+
+    @Override
+    public void run() {
+      try {
+        processQueue();
+      } catch (Throwable t) {
+        LOG.error("{} encountered fatal exception and exit.", getName(), t);
+      }
+    }
+
+    /**
+   * Process queued commands one at a time, waiting whenever the queue is empty.
+     */
+    private void processQueue() {
+      while (shouldRun()) {
+        try {
+          Runnable action = queue.take();
+          action.run();
+          dn.getMetrics().incrActorCmdQueueLength(-1);
+          dn.getMetrics().incrNumProcessedCommands();
+        } catch (InterruptedException e) {
+          LOG.error("{} encountered interrupt and exit.", getName());
+          // ignore unless thread was specifically interrupted.
+          if (Thread.interrupted()) {
+            break;
+          }
+        }
+      }
+      dn.getMetrics().incrActorCmdQueueLength(-1 * queue.size());
+      queue.clear();
+    }
+
+    /**
+     * Process an array of datanode commands.
+     *
+     * @param cmds an array of datanode commands
+     * @return true if further processing may be required or false otherwise.
+     */
+    private boolean processCommand(DatanodeCommand[] cmds) {
+      if (cmds != null) {
+        for (DatanodeCommand cmd : cmds) {
+          try {
+            if (!bpos.processCommandFromActor(cmd, actor)) {
+              return false;
+            }
+          } catch (RemoteException re) {
+            String reClass = re.getClassName();
+            if (UnregisteredNodeException.class.getName().equals(reClass) ||
+                DisallowedDatanodeException.class.getName().equals(reClass) ||
+                IncorrectVersionException.class.getName().equals(reClass)) {
+              LOG.warn("{} is shutting down", this, re);
+              shouldServiceRun = false;
+              return false;
+            }
+          } catch (IOException ioe) {
+            LOG.warn("Error processing datanode Command", ioe);
+          }
+        }
+      }
+      return true;
+    }
+
+    void enqueue(DatanodeCommand cmd) throws InterruptedException {
+      if (cmd == null) {
+        return;
+      }
+      queue.put(() -> processCommand(new DatanodeCommand[]{cmd}));
+      dn.getMetrics().incrActorCmdQueueLength(1);
+    }
+
+    void enqueue(List<DatanodeCommand> cmds) throws InterruptedException {
+      if (cmds == null) {
+        return;
+      }
+      queue.put(() -> processCommand(
+          cmds.toArray(new DatanodeCommand[cmds.size()])));
+      dn.getMetrics().incrActorCmdQueueLength(1);
+    }
+
+    void enqueue(DatanodeCommand[] cmds) throws InterruptedException {
+      queue.put(() -> processCommand(cmds));
+      dn.getMetrics().incrActorCmdQueueLength(1);
+    }
+  }
+}
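
The patch boils down to a single-consumer producer/consumer pattern: the heartbeat loop wraps each batch of NameNode commands in a closure, puts it on a BlockingQueue, and one daemon thread drains the queue in order. A minimal self-contained sketch of that pattern (class and method names here are illustrative, not taken from the patch):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    // Minimal sketch of the queue-and-daemon-thread pattern used by
    // CommandProcessingThread; names below are illustrative.
    public class AsyncProcessorSketch {

      static class Processor extends Thread {
        private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

        Processor() {
          super("Command processor");
          setDaemon(true); // exits with the owning service, like the real thread
        }

        // Producer side: the caller never blocks on command handling.
        void enqueue(Runnable work) throws InterruptedException {
          queue.put(work);
        }

        // Consumer side: one thread processes commands strictly in order.
        @Override
        public void run() {
          while (!isInterrupted()) {
            try {
              queue.take().run();
            } catch (InterruptedException e) {
              break; // clean shutdown on interrupt
            }
          }
        }
      }

      public static void main(String[] args) throws Exception {
        Processor p = new Processor();
        p.start();
        p.enqueue(() -> System.out.println("command handled asynchronously"));
        Thread.sleep(100); // let the daemon thread run before the JVM exits
      }
    }

Ordering is preserved because there is exactly one consumer; the trade-off is that a slow command now delays later queued commands rather than the heartbeat loop itself.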

+ 12 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java

@@ -160,6 +160,10 @@ public class DataNodeMetrics {
   private MutableCounterLong ecReconstructionDecodingTimeMillis;
   @Metric("Milliseconds spent on write by erasure coding worker")
   private MutableCounterLong ecReconstructionWriteTimeMillis;
+  @Metric("Sum of all BPServiceActors command queue length")
+  private MutableCounterLong sumOfActorCommandQueueLength;
+  @Metric("Num of processed commands of all BPServiceActors")
+  private MutableCounterLong numProcessedCommands;
 
   final MetricsRegistry registry = new MetricsRegistry("datanode");
   @Metric("Milliseconds spent on calling NN rpc")
@@ -552,4 +556,12 @@ public class DataNodeMetrics {
             .value(), totalWriteTime.value(), totalReadTime.value(),
         blocksWritten.value(), blocksRead.value(), timeSinceLastReport);
   }
+
+  public void incrActorCmdQueueLength(int delta) {
+    sumOfActorCommandQueueLength.incr(delta);
+  }
+
+  public void incrNumProcessedCommands() {
+    numProcessedCommands.incr();
+  }
 }
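
The two new counters are meant to be read together: sumOfActorCommandQueueLength serves as a gauge built on a counter (incremented by one per enqueued batch, decremented by one per processed batch, and decremented by the remaining queue size on shutdown), so it returns to zero once every actor's queue has drained, while numProcessedCommands only grows. A toy stand-in (plain AtomicLongs, not the Hadoop metrics API) showing that invariant:

    import java.util.concurrent.atomic.AtomicLong;

    // Toy stand-in for the two counters; not the Hadoop metrics API.
    public class QueueCounterSketch {
      static final AtomicLong queueLength = new AtomicLong(); // ~ SumOfActorCommandQueueLength
      static final AtomicLong processed = new AtomicLong();   // ~ NumProcessedCommands

      static void onEnqueue()   { queueLength.incrementAndGet(); }
      static void onProcessed() { queueLength.decrementAndGet(); processed.incrementAndGet(); }

      public static void main(String[] args) {
        for (int i = 0; i < 3; i++) onEnqueue();
        for (int i = 0; i < 3; i++) onProcessed();
        // After a full drain: queueLength == 0, processed == 3.
        System.out.println(queueLength.get() + " " + processed.get());
      }
    }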

+ 35 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java

@@ -18,7 +18,15 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
+
+import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertNotNull;
@@ -36,6 +44,7 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -1045,4 +1054,29 @@ public class TestBPOfferService {
       bpos.stop();
     }
   }
-}
+
+  @Test(timeout = 15000)
+  public void testCommandProcessingThread() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    try {
+      List<DataNode> datanodes = cluster.getDataNodes();
+      assertEquals(1, datanodes.size());
+      DataNode datanode = datanodes.get(0);
+
+      // Try to write file and trigger NN send back command to DataNode.
+      FileSystem fs = cluster.getFileSystem();
+      Path file = new Path("/test");
+      DFSTestUtil.createFile(fs, file, 10240L, (short)1, 0L);
+
+      MetricsRecordBuilder mrb = getMetrics(datanode.getMetrics().name());
+      assertTrue("Expected at least one processed command.",
+          getLongCounter("NumProcessedCommands", mrb) > 0);
+      assertEquals(0, getLongCounter("SumOfActorCommandQueueLength", mrb));
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+}
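
Because command handling is now asynchronous, the assertion on SumOfActorCommandQueueLength can in principle race with the consumer thread still draining the queue. A sketch of a polling helper (hypothetical, not part of the patch or of MetricsAsserts) that would make such a check robust:

    import java.util.function.LongSupplier;

    // Hypothetical helper, not part of the patch: poll a counter until it
    // reaches the expected value or a timeout expires.
    public final class WaitForCounter {
      public static void await(LongSupplier counter, long expected, long timeoutMs)
          throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (counter.getAsLong() != expected) {
          if (System.currentTimeMillis() > deadline) {
            throw new AssertionError("counter stuck at " + counter.getAsLong()
                + ", expected " + expected);
          }
          Thread.sleep(50); // re-check every 50 ms until the queue drains
        }
      }
    }

In the test above one could call, for example, WaitForCounter.await(() -> getLongCounter("SumOfActorCommandQueueLength", getMetrics(datanode.getMetrics().name())), 0, 10000) before the final assertEquals.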