
HDFS-7704. DN heartbeat to Active NN may be blocked and expire if connection to Standby NN continues to time out. Contributed by Rushabh Shah.
(cherry picked from commit 38262779bbf38a427bad6d044e91165567f1d206)

Kihwal Lee, 10 years ago
Current commit: 39594301e1
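
With this change the DataNode no longer makes synchronous reportBadBlocks and errorReport RPCs from BPOfferService callers. Each request is instead wrapped in an action object and queued on the owning BPServiceActor, which drains the queue from its own offerService loop; a standby NameNode whose connection keeps timing out can then only delay its own actor, not the heartbeat to the active NameNode. The BPServiceActorAction interface and its exception type are new files in this commit and are not shown in this view; the sketch below is inferred from how the new BPServiceActor code uses them, not copied from the committed source.

import java.io.IOException;

import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;

// Sketch (assumed shape): reportTo() performs the actual RPC from the
// BPServiceActor thread; implementations are expected to define
// equals()/hashCode() so bpThreadQueue.contains() can drop duplicates.
interface BPServiceActorAction {
  void reportTo(DatanodeProtocolClientSideTranslatorPB bpNamenode,
      DatanodeRegistration bpRegistration) throws BPServiceActorActionException;
}

// Assumed to be a checked exception so processQueueMessages() can catch it
// and re-enqueue the action for the next heartbeat cycle.
class BPServiceActorActionException extends IOException {
  BPServiceActorActionException(String message) {
    super(message);
  }
}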

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -626,6 +626,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7771. fuse_dfs should permit FILE: on the front of KRB5CCNAME
     (cmccabe)
 
+    HDFS-7704. DN heartbeat to Active NN may be blocked and expire if
+    connection to Standby NN continues to time out (Rushabh Shah via kihwal)
+
 Release 2.6.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 6 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java

@@ -218,7 +218,9 @@ class BPOfferService {
                        String storageUuid, StorageType storageType) {
     checkBlock(block);
     for (BPServiceActor actor : bpServices) {
-      actor.reportBadBlocks(block, storageUuid, storageType);
+      ReportBadBlockAction rbbAction = new ReportBadBlockAction
+          (block, storageUuid, storageType);
+      actor.bpThreadEnqueue(rbbAction);
     }
   }
 
@@ -415,7 +417,9 @@ class BPOfferService {
    */
   void trySendErrorReport(int errCode, String errMsg) {
     for (BPServiceActor actor : bpServices) {
-      actor.trySendErrorReport(errCode, errMsg);
+      ErrorReportAction errorReportAction = new ErrorReportAction 
+          (errCode, errMsg);
+      actor.bpThreadEnqueue(errorReportAction);
     }
   }
 

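The ReportBadBlockAction and ErrorReportAction classes enqueued above are likewise new files that are not included in this view. A plausible sketch of ErrorReportAction, assuming it replays the arguments of the removed BPServiceActor#trySendErrorReport (shown further down) and converts the IOException into a BPServiceActorActionException so the actor re-enqueues the report, building on the interface sketched earlier:

import java.io.IOException;

import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;

// Sketch only: carries the errorReport arguments until the BPServiceActor
// thread is ready to send them.
class ErrorReportAction implements BPServiceActorAction {
  private final int errorCode;
  private final String errorMessage;

  ErrorReportAction(int errorCode, String errorMessage) {
    this.errorCode = errorCode;
    this.errorMessage = errorMessage;
  }

  @Override
  public void reportTo(DatanodeProtocolClientSideTranslatorPB bpNamenode,
      DatanodeRegistration bpRegistration) throws BPServiceActorActionException {
    try {
      bpNamenode.errorReport(bpRegistration, errorCode, errorMessage);
    } catch (IOException e) {
      // Let processQueueMessages() re-enqueue this action and retry on the
      // next heartbeat cycle instead of blocking the caller.
      throw new BPServiceActorActionException(
          "Error reporting an error to namenode ");
    }
  }

  // equals()/hashCode() omitted here; they are needed so that
  // BPServiceActor#bpThreadEnqueue can suppress duplicate reports.
}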
+ 30 - 29
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java

@@ -25,6 +25,7 @@ import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -34,7 +35,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.client.BlockReportOptions;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -120,6 +120,8 @@ class BPServiceActor implements Runnable {
   private final DNConf dnConf;
 
   private DatanodeRegistration bpRegistration;
+  final LinkedList<BPServiceActorAction> bpThreadQueue 
+      = new LinkedList<BPServiceActorAction>();
 
   BPServiceActor(InetSocketAddress nnAddr, BPOfferService bpos) {
     this.bpos = bpos;
@@ -254,26 +256,6 @@ class BPServiceActor implements Runnable {
     resetBlockReportTime = true; // reset future BRs for randomness
   }
 
-  void reportBadBlocks(ExtendedBlock block,
-      String storageUuid, StorageType storageType) {
-    if (bpRegistration == null) {
-      return;
-    }
-    DatanodeInfo[] dnArr = { new DatanodeInfo(bpRegistration) };
-    String[] uuids = { storageUuid };
-    StorageType[] types = { storageType };
-    LocatedBlock[] blocks = { new LocatedBlock(block, dnArr, uuids, types) };
-    
-    try {
-      bpNamenode.reportBadBlocks(blocks);  
-    } catch (IOException e){
-      /* One common reason is that NameNode could be in safe mode.
-       * Should we keep on retrying in that case?
-       */
-      LOG.warn("Failed to report bad block " + block + " to namenode : "
-          + " Exception", e);
-    }
-  }
   
   
   /**
   /**
    * Report received blocks and delete hints to the Namenode for each
    * Report received blocks and delete hints to the Namenode for each
@@ -777,6 +759,7 @@ class BPServiceActor implements Runnable {
       } catch (IOException e) {
         LOG.warn("IOException in offerService", e);
       }
+      processQueueMessages();
     } // while (shouldRun())
   } // offerService
 
@@ -915,14 +898,6 @@ class BPServiceActor implements Runnable {
     return true;
   }
 
-  void trySendErrorReport(int errCode, String errMsg) {
-    try {
-      bpNamenode.errorReport(bpRegistration, errCode, errMsg);
-    } catch(IOException e) {
-      LOG.warn("Error reporting an error to NameNode " + nnAddr,
-          e);
-    }
-  }
 
   /**
    * Report a bad block from another DN in this cluster.
@@ -1024,4 +999,30 @@ class BPServiceActor implements Runnable {
       }
     }
   }
+  
+  public void bpThreadEnqueue(BPServiceActorAction action) {
+    synchronized (bpThreadQueue) {
+      if (!bpThreadQueue.contains(action)) {
+        bpThreadQueue.add(action);
+      }
+    }
+  }
+
+  private void processQueueMessages() {
+    LinkedList<BPServiceActorAction> duplicateQueue;
+    synchronized (bpThreadQueue) {
+      duplicateQueue = new LinkedList<BPServiceActorAction>(bpThreadQueue);
+      bpThreadQueue.clear();
+    }
+    while (!duplicateQueue.isEmpty()) {
+      BPServiceActorAction actionItem = duplicateQueue.remove();
+      try {
+        actionItem.reportTo(bpNamenode, bpRegistration);
+      } catch (BPServiceActorActionException baae) {
+        LOG.warn(baae.getMessage() + nnAddr , baae);
+        // Adding it back to the queue if not present
+        bpThreadEnqueue(actionItem);
+      }
+    }
+  }
 }

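Correspondingly, ReportBadBlockAction would carry the arguments of the removed BPServiceActor#reportBadBlocks shown above and rebuild the LocatedBlock array on the actor thread. The sketch below is inferred from that removed body, not copied from the new file; note that processQueueMessages() runs once per offerService iteration, so queued reports go out, and failed ones are retried, at roughly heartbeat cadence.

import java.io.IOException;

import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;

// Sketch only: mirrors the body of the removed BPServiceActor#reportBadBlocks,
// but executed from the BPServiceActor thread via the action queue.
class ReportBadBlockAction implements BPServiceActorAction {
  private final ExtendedBlock block;
  private final String storageUuid;
  private final StorageType storageType;

  ReportBadBlockAction(ExtendedBlock block, String storageUuid,
      StorageType storageType) {
    this.block = block;
    this.storageUuid = storageUuid;
    this.storageType = storageType;
  }

  @Override
  public void reportTo(DatanodeProtocolClientSideTranslatorPB bpNamenode,
      DatanodeRegistration bpRegistration) throws BPServiceActorActionException {
    if (bpRegistration == null) {
      return;
    }
    DatanodeInfo[] dnArr = { new DatanodeInfo(bpRegistration) };
    String[] uuids = { storageUuid };
    StorageType[] types = { storageType };
    LocatedBlock[] blocks = { new LocatedBlock(block, dnArr, uuids, types) };
    try {
      bpNamenode.reportBadBlocks(blocks);
    } catch (IOException e) {
      // The NameNode may be in safe mode or temporarily unreachable; throwing
      // lets processQueueMessages() re-enqueue this action for a later retry.
      throw new BPServiceActorActionException("Failed to report bad block "
          + block + " to namenode");
    }
  }

  // equals()/hashCode() omitted for brevity; they are needed for the
  // de-duplication check in bpThreadEnqueue.
}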
+ 157 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java

@@ -21,6 +21,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
@@ -37,6 +38,7 @@ import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
@@ -53,6 +55,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.PathUtils;
+import org.apache.hadoop.util.Time;
 import org.apache.log4j.Level;
 import org.junit.Before;
 import org.junit.Test;
@@ -74,6 +77,8 @@ public class TestBPOfferService {
   private static final ExtendedBlock FAKE_BLOCK =
     new ExtendedBlock(FAKE_BPID, 12345L);
   private static final File TEST_BUILD_DATA = PathUtils.getTestDir(TestBPOfferService.class);
+  private long firstCallTime = 0; 
+  private long secondCallTime = 0;
 
   static {
     ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
@@ -458,4 +463,156 @@ public class TestBPOfferService {
     return captor.getValue()[0].getBlocks();
   }
 
+  private void setTimeForSynchronousBPOSCalls() {
+    if (firstCallTime == 0) {
+      firstCallTime = Time.now();
+    } else {
+      secondCallTime = Time.now();
+    }
+  }
+  
+  private class BPOfferServiceSynchronousCallAnswer implements Answer<Void> {
+    private final int nnIdx;
+
+    public BPOfferServiceSynchronousCallAnswer(int nnIdx) {
+      this.nnIdx = nnIdx;
+    }
+
+    // For the active namenode we record the call time; for the standby
+    // namenode we sleep for 5 seconds to simulate the situation where the
+    // standby namenode is down.
+    @Override
+    public Void answer(InvocationOnMock invocation) throws Throwable {
+      if (nnIdx == 0) {
+        setTimeForSynchronousBPOSCalls();
+      } else {
+        Thread.sleep(5000);
+      }
+      return null;
+    }
+   }
+
+  /**
+   * This test case tests the {@link BPOfferService#reportBadBlocks} method:
+   * if the call to the standby namenode times out, it should not affect the
+   * active namenode heartbeat processing, since this method is called under
+   * the writeLock.
+   * @throws Exception
+   */
+  @Test
+  public void testReportBadBlockWhenStandbyNNTimesOut() throws Exception {
+    BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
+    bpos.start();
+    try {
+      waitForInitialization(bpos);
+      // Should start with neither NN as active.
+      assertNull(bpos.getActiveNN());
+      // Have NN1 claim active at txid 1
+      mockHaStatuses[0] = new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1);
+      bpos.triggerHeartbeatForTests();
+      // Now mockNN1 is acting like active namenode and mockNN2 as Standby
+      assertSame(mockNN1, bpos.getActiveNN());
+      Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(0))
+         .when(mockNN1).reportBadBlocks(Mockito.any(LocatedBlock[].class));
+      Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(1))
+         .when(mockNN2).reportBadBlocks(Mockito.any(LocatedBlock[].class));
+      bpos.reportBadBlocks(FAKE_BLOCK, mockFSDataset.getVolume(FAKE_BLOCK)
+          .getStorageID(), mockFSDataset.getVolume(FAKE_BLOCK)
+          .getStorageType());
+      bpos.reportBadBlocks(FAKE_BLOCK, mockFSDataset.getVolume(FAKE_BLOCK)
+          .getStorageID(), mockFSDataset.getVolume(FAKE_BLOCK)
+          .getStorageType());
+      Thread.sleep(10000);
+      long difference = secondCallTime - firstCallTime;
+      assertTrue("Active namenode reportBadBlock processing should be "
+          + "independent of standby namenode reportBadBlock processing ",
+          difference < 5000);
+    } finally {
+      bpos.stop();
+    }
+  }
+
+  /**
+   * This test case tests the {@link BPOfferService#trySendErrorReport} method:
+   * if the call to the standby namenode times out, it should not affect the
+   * active namenode heartbeat processing, since this method is called under
+   * the writeLock.
+   * @throws Exception
+   */
+  @Test
+  public void testTrySendErrorReportWhenStandbyNNTimesOut() throws Exception {
+    BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
+    bpos.start();
+    try {
+      waitForInitialization(bpos);
+      // Should start with neither NN as active.
+      assertNull(bpos.getActiveNN());
+      // Have NN1 claim active at txid 1
+      mockHaStatuses[0] = new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1);
+      bpos.triggerHeartbeatForTests();
+      // Now mockNN1 is acting like active namenode and mockNN2 as Standby
+      assertSame(mockNN1, bpos.getActiveNN());
+      Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(0))
+          .when(mockNN1).errorReport(Mockito.any(DatanodeRegistration.class),
+          Mockito.anyInt(), Mockito.anyString());
+      Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(1))
+          .when(mockNN2).errorReport(Mockito.any(DatanodeRegistration.class),
+          Mockito.anyInt(), Mockito.anyString());
+      String errorString = "Can't send invalid block " + FAKE_BLOCK;
+      bpos.trySendErrorReport(DatanodeProtocol.INVALID_BLOCK, errorString);
+      bpos.trySendErrorReport(DatanodeProtocol.INVALID_BLOCK, errorString);
+      Thread.sleep(10000);
+      long difference = secondCallTime - firstCallTime;
+      assertTrue("Active namenode trySendErrorReport processing "
+          + "should be independent of standby namenode trySendErrorReport"
+          + " processing ", difference < 5000);
+    } finally {
+      bpos.stop();
+    }
+  }
+  /**
+   * This test case tests whether {@link BPServiceActor#processQueueMessages}
+   * adds the error report back to the queue when
+   * {@link BPServiceActorAction#reportTo} throws an IOException.
+   * @throws Exception
+   */
+  @Test
+  public void testTrySendErrorReportWhenNNThrowsIOException() 
+      throws Exception {
+    BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
+    bpos.start();
+    try {
+      waitForInitialization(bpos);
+      // Should start with neither NN as active.
+      assertNull(bpos.getActiveNN());
+      // Have NN1 claim active at txid 1
+      mockHaStatuses[0] = new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1);
+      bpos.triggerHeartbeatForTests();
+      // Now mockNN1 is acting like active namenode and mockNN2 as Standby
+      assertSame(mockNN1, bpos.getActiveNN());
+      Mockito.doAnswer(new Answer<Void>() {
+        // Throw an IOException when this function is first called which will
+        // in turn add that errorReport back to the bpThreadQueue and let it 
+        // process the next time. 
+        @Override
+        public Void answer(InvocationOnMock invocation) throws Throwable {
+          if (firstCallTime == 0) {
+            firstCallTime = Time.now();
+            throw new IOException();
+          } else {
+            secondCallTime = Time.now();
+            return null;
+          }
+        }
+      }).when(mockNN1).errorReport(Mockito.any(DatanodeRegistration.class),
+          Mockito.anyInt(), Mockito.anyString());
+      String errorString = "Can't send invalid block " + FAKE_BLOCK;
+      bpos.trySendErrorReport(DatanodeProtocol.INVALID_BLOCK, errorString);
+      Thread.sleep(10000);
+      assertTrue("Active namenode didn't add the report back to the queue "
+          + "when errorReport threw IOException", secondCallTime != 0);
+    } finally {
+      bpos.stop();
+    }
+  } 
 }