
HDFS-9917. IBR accumulate more objects when SNN was down for sometime. (Contributed by Brahma Reddy Battula)

Vinayakumar B 9 years ago
parent
current commit
fc0ffc48d4

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -155,6 +155,9 @@ Release 2.7.3 - UNRELEASED
     HDFS-10178. Permanent write failures can happen if pipeline recoveries
     occur for the first packet (kihwal)
 
+    HDFS-9917. IBR accumulate more objects when SNN was down for sometime.
+    (Brahma Reddy Battula via vinayakumarb)
+
 Release 2.7.2 - 2016-01-25
 
   INCOMPATIBLE CHANGES

+ 9 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java

@@ -885,6 +885,11 @@ class BPServiceActor implements Runnable {
       // and re-register
       register(nsInfo);
       scheduler.scheduleHeartbeat();
+      // HDFS-9917: Standby NN IBRs can grow very large if the standby namenode
+      // is down for some time, so clear them on re-register.
+      if (state == HAServiceState.STANDBY) {
+        pendingIncrementalBRperStorage.clear();
+      }
     }
   }
 
@@ -993,6 +998,10 @@ class BPServiceActor implements Runnable {
       }
     }
   }
+  @VisibleForTesting
+  int getPendingIBRSize() {
+    return pendingIncrementalBRperStorage.size();
+  }
 
   Scheduler getScheduler() {
     return scheduler;

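The BPServiceActor change above is the heart of the fix: every block received or deleted on the datanode is queued in pendingIncrementalBRperStorage until it can be reported, so while a standby namenode is unreachable those pending entries keep accumulating. Once the standby comes back it answers the next heartbeat with a RegisterCommand, and re-registration is followed by a full block report that brings the standby back in sync, so the pending incremental report can safely be dropped for an actor in STANDBY state. A simplified, standalone sketch of that pattern follows; the class name, map value type and helper methods are illustrative only, not the actual Hadoop code.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Simplified, standalone sketch of the accumulate / clear-on-re-register
 * pattern added to BPServiceActor. Names and types here are illustrative
 * only, not the actual Hadoop classes.
 */
class PendingIbrSketch {
  enum HAServiceState { ACTIVE, STANDBY }

  // In BPServiceActor the values are per-storage IBR structures; a plain
  // list of strings stands in for them here.
  private final Map<String, List<String>> pendingIncrementalBRperStorage =
      new HashMap<String, List<String>>();
  private HAServiceState state = HAServiceState.STANDBY;

  // Called for every block received or deleted. While the namenode this
  // actor reports to is unreachable, the map only ever grows.
  void notifyBlock(String storageUuid, String blockInfo) {
    List<String> pending = pendingIncrementalBRperStorage.get(storageUuid);
    if (pending == null) {
      pending = new ArrayList<String>();
      pendingIncrementalBRperStorage.put(storageUuid, pending);
    }
    pending.add(blockInfo);
  }

  // Called when the namenode answers a heartbeat with a RegisterCommand.
  void reRegister() {
    // register(nsInfo) and scheduler.scheduleHeartbeat() elided.
    if (state == HAServiceState.STANDBY) {
      // Re-registration is followed by a full block report, which brings
      // the standby back in sync, so the queued IBRs can be dropped.
      pendingIncrementalBRperStorage.clear();
    }
  }

  int getPendingIBRSize() {
    return pendingIncrementalBRperStorage.size();
  }
}
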
+ 94 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java

@@ -28,6 +28,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
@@ -47,10 +48,12 @@ import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -91,7 +94,10 @@ public class TestBPOfferService {
 
   private DatanodeProtocolClientSideTranslatorPB mockNN1;
   private DatanodeProtocolClientSideTranslatorPB mockNN2;
-  private final NNHAStatusHeartbeat[] mockHaStatuses = new NNHAStatusHeartbeat[2];
+  private final NNHAStatusHeartbeat[] mockHaStatuses =
+      new NNHAStatusHeartbeat[2];
+  private final DatanodeCommand[][] datanodeCommands =
+      new DatanodeCommand[2][0];
   private final int[] heartbeatCounts = new int[2];
   private DataNode mockDn;
   private FsDatasetSpi<?> mockFSDataset;
@@ -145,6 +151,7 @@ public class TestBPOfferService {
           Mockito.anyInt(),
           Mockito.any(VolumeFailureSummary.class));
     mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0);
+    datanodeCommands[nnIdx] = new DatanodeCommand[0];
     return mock;
   }
   
@@ -163,8 +170,12 @@ public class TestBPOfferService {
     @Override
     public HeartbeatResponse answer(InvocationOnMock invocation) throws Throwable {
       heartbeatCounts[nnIdx]++;
-      return new HeartbeatResponse(new DatanodeCommand[0],
-          mockHaStatuses[nnIdx], null);
+      HeartbeatResponse heartbeatResponse =
+          new HeartbeatResponse(datanodeCommands[nnIdx], mockHaStatuses[nnIdx],
+              null);
+      // reset the command
+      datanodeCommands[nnIdx] = new DatanodeCommand[0];
+      return heartbeatResponse;
     }
   }
 
@@ -675,4 +686,84 @@ public class TestBPOfferService {
       bpos.stop();
     }
   }
+
+  /*
+   * HDFS-9917: Standby IBR accumulation when the Standby was down.
+   */
+  @Test
+  public void testIBRClearanceForStandbyOnReRegister() throws Exception {
+    final BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
+    bpos.start();
+    try {
+      waitForInitialization(bpos);
+      // Should start with neither NN as active.
+      assertNull(bpos.getActiveNN());
+      // Have NN1 claim active at txid 1
+      mockHaStatuses[0] = new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1);
+      bpos.triggerHeartbeatForTests();
+      // Now mockNN1 is acting as the active namenode and mockNN2 as standby
+      assertSame(mockNN1, bpos.getActiveNN());
+      // Return nothing when the active namenode gets IBRs
+      Mockito.doNothing().when(mockNN1).blockReceivedAndDeleted(
+          Mockito.any(DatanodeRegistration.class), Mockito.anyString(), Mockito
+              .any(StorageReceivedDeletedBlocks[].class));
+
+      final IOException re = new IOException(
+          "Standby NN is currently not able to process IBR");
+
+      final AtomicBoolean ibrReported = new AtomicBoolean(false);
+      // throw an exception from the standby when the first IBR is received
+      Mockito.doAnswer(new Answer<Void>() {
+        @Override
+        public Void answer(InvocationOnMock invocation) throws Throwable {
+          ibrReported.set(true);
+          throw re;
+        }
+      }).when(mockNN2).blockReceivedAndDeleted(
+          Mockito.any(DatanodeRegistration.class), Mockito.anyString(), Mockito
+              .any(StorageReceivedDeletedBlocks[].class));
+
+      DatanodeStorage storage = Mockito.mock(DatanodeStorage.class);
+      Mockito.doReturn(storage).when(mockFSDataset).getStorage("storage0");
+      // Add IBRs
+      bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, "storage0");
+      // Send heartbeat so that the BpServiceActor can send IBR to
+      // namenode
+      bpos.triggerHeartbeatForTests();
+      // Wait till first IBR is received at standbyNN. Just for confirmation.
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          return ibrReported.get();
+        }
+      }, 100, 1000);
+
+      // Send register command back to Datanode to reRegister().
+      // After reRegister IBRs should be cleared.
+      datanodeCommands[1] = new DatanodeCommand[] { new RegisterCommand() };
+      assertEquals(
+          "IBR size before reRegister should be non-0", 1, getStandbyIBRSize(
+              bpos));
+      bpos.triggerHeartbeatForTests();
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          return getStandbyIBRSize(bpos) == 0;
+        }
+      }, 100, 1000);
+    } finally {
+      bpos.stop();
+      bpos.join();
+    }
+  }
+
+  private int getStandbyIBRSize(BPOfferService bpos) {
+    List<BPServiceActor> bpServiceActors = bpos.getBPServiceActors();
+    for (BPServiceActor bpServiceActor : bpServiceActors) {
+      if (bpServiceActor.state == HAServiceState.STANDBY) {
+        return bpServiceActor.getPendingIBRSize();
+      }
+    }
+    return -1;
+  }
 }
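
For reference, the new test should be runnable on its own from hadoop-hdfs-project/hadoop-hdfs in a Hadoop source checkout, assuming a standard Maven/Surefire setup:

mvn test -Dtest=TestBPOfferService#testIBRClearanceForStandbyOnReRegister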