Browse Source

HDFS-7704. DN heartbeat to Active NN may be blocked and expire if connection to Standby NN continues to time out. Contributed by Rushabh Shah.

Kihwal Lee 10 năm trước cách đây
mục cha
commit
38262779bb

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -929,6 +929,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7771. fuse_dfs should permit FILE: on the front of KRB5CCNAME
     (cmccabe)
 
+    HDFS-7704. DN heartbeat to Active NN may be blocked and expire if
+    connection to Standby NN continues to time out (Rushabh Shah via kihwal)
+
 Release 2.6.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 6 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java

@@ -218,7 +218,9 @@ class BPOfferService {
                        String storageUuid, StorageType storageType) {
     checkBlock(block);
     for (BPServiceActor actor : bpServices) {
-      actor.reportBadBlocks(block, storageUuid, storageType);
+      ReportBadBlockAction rbbAction = new ReportBadBlockAction
+          (block, storageUuid, storageType);
+      actor.bpThreadEnqueue(rbbAction);
     }
   }
   
@@ -414,7 +416,9 @@ class BPOfferService {
    */
   void trySendErrorReport(int errCode, String errMsg) {
     for (BPServiceActor actor : bpServices) {
-      actor.trySendErrorReport(errCode, errMsg);
+      ErrorReportAction errorReportAction = new ErrorReportAction 
+          (errCode, errMsg);
+      actor.bpThreadEnqueue(errorReportAction);
     }
   }
 

+ 31 - 30
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java

@@ -25,6 +25,7 @@ import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -34,7 +35,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.client.BlockReportOptions;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -120,6 +120,8 @@ class BPServiceActor implements Runnable {
   private final DNConf dnConf;
 
   private DatanodeRegistration bpRegistration;
+  final LinkedList<BPServiceActorAction> bpThreadQueue 
+      = new LinkedList<BPServiceActorAction>();
 
   BPServiceActor(InetSocketAddress nnAddr, BPOfferService bpos) {
     this.bpos = bpos;
@@ -254,26 +256,6 @@ class BPServiceActor implements Runnable {
     resetBlockReportTime = true; // reset future BRs for randomness
   }
 
-  void reportBadBlocks(ExtendedBlock block,
-      String storageUuid, StorageType storageType) {
-    if (bpRegistration == null) {
-      return;
-    }
-    DatanodeInfo[] dnArr = { new DatanodeInfo(bpRegistration) };
-    String[] uuids = { storageUuid };
-    StorageType[] types = { storageType };
-    LocatedBlock[] blocks = { new LocatedBlock(block, dnArr, uuids, types) };
-    
-    try {
-      bpNamenode.reportBadBlocks(blocks);  
-    } catch (IOException e){
-      /* One common reason is that NameNode could be in safe mode.
-       * Should we keep on retrying in that case?
-       */
-      LOG.warn("Failed to report bad block " + block + " to namenode : "
-          + " Exception", e);
-    }
-  }
   
   /**
    * Report received blocks and delete hints to the Namenode for each
@@ -771,6 +753,7 @@ class BPServiceActor implements Runnable {
       } catch (IOException e) {
         LOG.warn("IOException in offerService", e);
       }
+      processQueueMessages();
     } // while (shouldRun())
   } // offerService
 
@@ -909,14 +892,6 @@ class BPServiceActor implements Runnable {
     return true;
   }
 
-  void trySendErrorReport(int errCode, String errMsg) {
-    try {
-      bpNamenode.errorReport(bpRegistration, errCode, errMsg);
-    } catch(IOException e) {
-      LOG.warn("Error reporting an error to NameNode " + nnAddr,
-          e);
-    }
-  }
 
   /**
    * Report a bad block from another DN in this cluster.
@@ -1018,4 +993,30 @@ class BPServiceActor implements Runnable {
       }
     }
   }
-}
+  
+  public void bpThreadEnqueue(BPServiceActorAction action) {
+    synchronized (bpThreadQueue) {
+      if (!bpThreadQueue.contains(action)) {
+        bpThreadQueue.add(action);
+      }
+    }
+  }
+
+  private void processQueueMessages() {
+    LinkedList<BPServiceActorAction> duplicateQueue;
+    synchronized (bpThreadQueue) {
+      duplicateQueue = new LinkedList<BPServiceActorAction>(bpThreadQueue);
+      bpThreadQueue.clear();
+    }
+    while (!duplicateQueue.isEmpty()) {
+      BPServiceActorAction actionItem = duplicateQueue.remove();
+      try {
+        actionItem.reportTo(bpNamenode, bpRegistration);
+      } catch (BPServiceActorActionException baae) {
+        LOG.warn(baae.getMessage() + nnAddr , baae);
+        // Adding it back to the queue if not present
+        bpThreadEnqueue(actionItem);
+      }
+    }
+  }
+}

+ 157 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java

@@ -21,6 +21,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
@@ -37,6 +38,7 @@ import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
@@ -53,6 +55,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.PathUtils;
+import org.apache.hadoop.util.Time;
 import org.apache.log4j.Level;
 import org.junit.Before;
 import org.junit.Test;
@@ -74,6 +77,8 @@ public class TestBPOfferService {
   private static final ExtendedBlock FAKE_BLOCK =
     new ExtendedBlock(FAKE_BPID, 12345L);
   private static final File TEST_BUILD_DATA = PathUtils.getTestDir(TestBPOfferService.class);
+  private long firstCallTime = 0; 
+  private long secondCallTime = 0;
 
   static {
     ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
@@ -458,4 +463,156 @@ public class TestBPOfferService {
     return captor.getValue()[0].getBlocks();
   }
 
+  private void setTimeForSynchronousBPOSCalls() {
+    if (firstCallTime == 0) {
+      firstCallTime = Time.now();
+    } else {
+      secondCallTime = Time.now();
+    }
+  }
+  
+  private class BPOfferServiceSynchronousCallAnswer implements Answer<Void> {
+    private final int nnIdx;
+
+    public BPOfferServiceSynchronousCallAnswer(int nnIdx) {
+      this.nnIdx = nnIdx;
+    }
+
+    // For active namenode we will record the processTime and for standby
+    // namenode we will sleep for 5 seconds (This will simulate the situation
+    // where the standby namenode is down ) .
+    @Override
+    public Void answer(InvocationOnMock invocation) throws Throwable {
+      if (nnIdx == 0) {
+        setTimeForSynchronousBPOSCalls();
+      } else {
+        Thread.sleep(5000);
+      }
+      return null;
+    }
+   }
+
+  /**
+   * This test case test the {@link BPOfferService#reportBadBlocks} method
+   * such that if call to standby namenode times out then that should not 
+   * affect the active namenode heartbeat processing since this function 
+   * are in writeLock.
+   * @throws Exception
+   */
+  @Test
+  public void testReportBadBlockWhenStandbyNNTimesOut() throws Exception {
+    BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
+    bpos.start();
+    try {
+      waitForInitialization(bpos);
+      // Should start with neither NN as active.
+      assertNull(bpos.getActiveNN());
+      // Have NN1 claim active at txid 1
+      mockHaStatuses[0] = new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1);
+      bpos.triggerHeartbeatForTests();
+      // Now mockNN1 is acting like active namenode and mockNN2 as Standby
+      assertSame(mockNN1, bpos.getActiveNN());
+      Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(0))
+         .when(mockNN1).reportBadBlocks(Mockito.any(LocatedBlock[].class));
+      Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(1))
+         .when(mockNN2).reportBadBlocks(Mockito.any(LocatedBlock[].class));
+      bpos.reportBadBlocks(FAKE_BLOCK, mockFSDataset.getVolume(FAKE_BLOCK)
+          .getStorageID(), mockFSDataset.getVolume(FAKE_BLOCK)
+          .getStorageType());
+      bpos.reportBadBlocks(FAKE_BLOCK, mockFSDataset.getVolume(FAKE_BLOCK)
+          .getStorageID(), mockFSDataset.getVolume(FAKE_BLOCK)
+          .getStorageType());
+      Thread.sleep(10000);
+      long difference = secondCallTime - firstCallTime;
+      assertTrue("Active namenode reportBadBlock processing should be "
+          + "independent of standby namenode reportBadBlock processing ",
+          difference < 5000);
+    } finally {
+      bpos.stop();
+    }
+  }
+
+  /**
+   * This test case test the {@link BPOfferService#trySendErrorReport} method
+   * such that if call to standby namenode times out then that should not 
+   * affect the active namenode heartbeat processing since this function 
+   * are in writeLock.
+   * @throws Exception
+   */
+  @Test
+  public void testTrySendErrorReportWhenStandbyNNTimesOut() throws Exception {
+    BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
+    bpos.start();
+    try {
+      waitForInitialization(bpos);
+      // Should start with neither NN as active.
+      assertNull(bpos.getActiveNN());
+      // Have NN1 claim active at txid 1
+      mockHaStatuses[0] = new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1);
+      bpos.triggerHeartbeatForTests();
+      // Now mockNN1 is acting like active namenode and mockNN2 as Standby
+      assertSame(mockNN1, bpos.getActiveNN());
+      Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(0))
+          .when(mockNN1).errorReport(Mockito.any(DatanodeRegistration.class),
+          Mockito.anyInt(), Mockito.anyString());
+      Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(1))
+          .when(mockNN2).errorReport(Mockito.any(DatanodeRegistration.class),
+          Mockito.anyInt(), Mockito.anyString());
+      String errorString = "Can't send invalid block " + FAKE_BLOCK;
+      bpos.trySendErrorReport(DatanodeProtocol.INVALID_BLOCK, errorString);
+      bpos.trySendErrorReport(DatanodeProtocol.INVALID_BLOCK, errorString);
+      Thread.sleep(10000);
+      long difference = secondCallTime - firstCallTime;
+      assertTrue("Active namenode trySendErrorReport processing "
+          + "should be independent of standby namenode trySendErrorReport"
+          + " processing ", difference < 5000);
+    } finally {
+      bpos.stop();
+    }
+  }
+  /**
+   * This test case tests whether the {@BPServiceActor#processQueueMessages}
+   * adds back the error report back to the queue when 
+   * {BPServiceActorAction#reportTo} throws an IOException
+   * @throws Exception
+   */
+  @Test
+  public void testTrySendErrorReportWhenNNThrowsIOException() 
+      throws Exception {
+    BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
+    bpos.start();
+    try {
+      waitForInitialization(bpos);
+      // Should start with neither NN as active.
+      assertNull(bpos.getActiveNN());
+      // Have NN1 claim active at txid 1
+      mockHaStatuses[0] = new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1);
+      bpos.triggerHeartbeatForTests();
+      // Now mockNN1 is acting like active namenode and mockNN2 as Standby
+      assertSame(mockNN1, bpos.getActiveNN());
+      Mockito.doAnswer(new Answer<Void>() {
+        // Throw an IOException when this function is first called which will
+        // in turn add that errorReport back to the bpThreadQueue and let it 
+        // process the next time. 
+        @Override
+        public Void answer(InvocationOnMock invocation) throws Throwable {
+          if (firstCallTime == 0) {
+            firstCallTime = Time.now();
+            throw new IOException();
+          } else {
+            secondCallTime = Time.now();
+            return null;
+          }
+        }
+      }).when(mockNN1).errorReport(Mockito.any(DatanodeRegistration.class),
+          Mockito.anyInt(), Mockito.anyString());
+      String errorString = "Can't send invalid block " + FAKE_BLOCK;
+      bpos.trySendErrorReport(DatanodeProtocol.INVALID_BLOCK, errorString);
+      Thread.sleep(10000);
+      assertTrue("Active namenode didn't add the report back to the queue "
+          + "when errorReport threw IOException", secondCallTime != 0);
+    } finally {
+      bpos.stop();
+    }
+  } 
 }