浏览代码

HDFS-7990. IBR delete ack should not be delayed. Contributed by Daryn Sharp. Backport HDFS-11838 by Vinitha Gankidi.

(cherry picked from commit 60882ab26d49f05cbf0686944af6559f86b3417d)
Konstantin V Shvachko 8 年之前
父节点
当前提交
2c3f6aedf1

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -88,6 +88,9 @@ Release 2.7.4 - UNRELEASED
     HDFS-11785. Backport HDFS-9902 to branch-2.7: Support different values of
     HDFS-11785. Backport HDFS-9902 to branch-2.7: Support different values of
     dfs.datanode.du.reserved per storage type. (Brahma Reddy Battula)
     dfs.datanode.du.reserved per storage type. (Brahma Reddy Battula)
 
 
+    HDFS-7990. IBR delete ack should not be delayed. (Daryn Sharp.
+    Backport HDFS-11838 by Vinitha Gankidi)
+
   OPTIMIZATIONS
   OPTIMIZATIONS
 
 
     HDFS-10896. Move lock logging logic from FSNamesystem into FSNamesystemLock.
     HDFS-10896. Move lock logging logic from FSNamesystem into FSNamesystemLock.

+ 3 - 10
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java

@@ -82,8 +82,6 @@ class BPServiceActor implements Runnable {
   HAServiceState state;
   HAServiceState state;
 
 
   final BPOfferService bpos;
   final BPOfferService bpos;
-  
-  volatile long lastDeletedReport = 0;
 
 
   volatile long lastCacheReport = 0;
   volatile long lastCacheReport = 0;
   private final Scheduler scheduler;
   private final Scheduler scheduler;
@@ -385,10 +383,10 @@ class BPServiceActor implements Runnable {
   @VisibleForTesting
   @VisibleForTesting
   void triggerDeletionReportForTests() {
   void triggerDeletionReportForTests() {
     synchronized (pendingIncrementalBRperStorage) {
     synchronized (pendingIncrementalBRperStorage) {
-      lastDeletedReport = 0;
+      sendImmediateIBR = true;
       pendingIncrementalBRperStorage.notifyAll();
       pendingIncrementalBRperStorage.notifyAll();
 
 
-      while (lastDeletedReport == 0) {
+      while (sendImmediateIBR) {
         try {
         try {
           pendingIncrementalBRperStorage.wait(100);
           pendingIncrementalBRperStorage.wait(100);
         } catch (InterruptedException e) {
         } catch (InterruptedException e) {
@@ -421,7 +419,6 @@ class BPServiceActor implements Runnable {
    */
    */
   List<DatanodeCommand> blockReport() throws IOException {
   List<DatanodeCommand> blockReport() throws IOException {
     // send block report if timer has expired.
     // send block report if timer has expired.
-    final long startTime = scheduler.monotonicNow(); 
     if (!scheduler.isBlockReportDue()) {
     if (!scheduler.isBlockReportDue()) {
       return null;
       return null;
     }
     }
@@ -433,7 +430,6 @@ class BPServiceActor implements Runnable {
     // or we will report an RBW replica after the BlockReport already reports
     // or we will report an RBW replica after the BlockReport already reports
     // a FINALIZED one.
     // a FINALIZED one.
     reportReceivedDeletedBlocks();
     reportReceivedDeletedBlocks();
-    lastDeletedReport = startTime;
 
 
     long brCreateStartTime = monotonicNow();
     long brCreateStartTime = monotonicNow();
     Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
     Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
@@ -624,7 +620,6 @@ class BPServiceActor implements Runnable {
    */
    */
   private void offerService() throws Exception {
   private void offerService() throws Exception {
     LOG.info("For namenode " + nnAddr + " using"
     LOG.info("For namenode " + nnAddr + " using"
-        + " DELETEREPORT_INTERVAL of " + dnConf.deleteReportInterval + " msec "
         + " BLOCKREPORT_INTERVAL of " + dnConf.blockReportInterval + "msec"
         + " BLOCKREPORT_INTERVAL of " + dnConf.blockReportInterval + "msec"
         + " CACHEREPORT_INTERVAL of " + dnConf.cacheReportInterval + "msec"
         + " CACHEREPORT_INTERVAL of " + dnConf.cacheReportInterval + "msec"
         + " Initial delay: " + dnConf.initialBlockReportDelay + "msec"
         + " Initial delay: " + dnConf.initialBlockReportDelay + "msec"
@@ -679,10 +674,8 @@ class BPServiceActor implements Runnable {
             }
             }
           }
           }
         }
         }
-        if (sendImmediateIBR ||
-            (startTime - lastDeletedReport > dnConf.deleteReportInterval)) {
+        if (sendImmediateIBR || sendHeartbeat) {
           reportReceivedDeletedBlocks();
           reportReceivedDeletedBlocks();
-          lastDeletedReport = startTime;
         }
         }
 
 
         List<DatanodeCommand> cmds = blockReport();
         List<DatanodeCommand> cmds = blockReport();

+ 0 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java

@@ -84,7 +84,6 @@ public class DNConf {
   final long heartBeatInterval;
   final long heartBeatInterval;
   final long blockReportInterval;
   final long blockReportInterval;
   final long blockReportSplitThreshold;
   final long blockReportSplitThreshold;
-  final long deleteReportInterval;
   final long initialBlockReportDelay;
   final long initialBlockReportDelay;
   final long cacheReportInterval;
   final long cacheReportInterval;
   final long dfsclientSlowIoWarningThresholdMs;
   final long dfsclientSlowIoWarningThresholdMs;
@@ -168,7 +167,6 @@ public class DNConf {
     heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY,
     heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY,
         DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000L;
         DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000L;
     
     
-    this.deleteReportInterval = 100 * heartBeatInterval;
     // do we need to sync block file contents to disk when blockfile is closed?
     // do we need to sync block file contents to disk when blockfile is closed?
     this.syncOnClose = conf.getBoolean(DFS_DATANODE_SYNCONCLOSE_KEY, 
     this.syncOnClose = conf.getBoolean(DFS_DATANODE_SYNCONCLOSE_KEY, 
         DFS_DATANODE_SYNCONCLOSE_DEFAULT);
         DFS_DATANODE_SYNCONCLOSE_DEFAULT);

+ 12 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

@@ -84,7 +84,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     @Override
     @Override
     public SimulatedFSDataset newInstance(DataNode datanode,
     public SimulatedFSDataset newInstance(DataNode datanode,
         DataStorage storage, Configuration conf) throws IOException {
         DataStorage storage, Configuration conf) throws IOException {
-      return new SimulatedFSDataset(storage, conf);
+      return new SimulatedFSDataset(datanode, storage, conf);
     }
     }
 
 
     @Override
     @Override
@@ -519,8 +519,15 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   private final SimulatedStorage storage;
   private final SimulatedStorage storage;
   private final SimulatedVolume volume;
   private final SimulatedVolume volume;
   private final String datanodeUuid;
   private final String datanodeUuid;
+  private final DataNode datanode;
   
   
+
   public SimulatedFSDataset(DataStorage storage, Configuration conf) {
   public SimulatedFSDataset(DataStorage storage, Configuration conf) {
+    this(null, storage, conf);
+  }
+
+  public SimulatedFSDataset(DataNode datanode, DataStorage storage, Configuration conf) {
+    this.datanode = datanode;
     if (storage != null) {
     if (storage != null) {
       for (int i = 0; i < storage.getNumStorageDirs(); ++i) {
       for (int i = 0; i < storage.getNumStorageDirs(); ++i) {
         DataStorage.createStorageID(storage.getStorageDir(i), false);
         DataStorage.createStorageID(storage.getStorageDir(i), false);
@@ -747,6 +754,10 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
       }
       }
       storage.free(bpid, binfo.getNumBytes());
       storage.free(bpid, binfo.getNumBytes());
       map.remove(b);
       map.remove(b);
+      if (datanode != null) {
+        datanode.notifyNamenodeDeletedBlock(new ExtendedBlock(bpid, b),
+            binfo.getStorageUuid());
+      }
     }
     }
     if (error) {
     if (error) {
       throw new IOException("Invalidate: Missing blocks.");
       throw new IOException("Invalidate: Missing blocks.");

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java

@@ -159,8 +159,8 @@ public class TestIncrementalBlockReports {
           anyString(),
           anyString(),
           any(StorageReceivedDeletedBlocks[].class));
           any(StorageReceivedDeletedBlocks[].class));
 
 
-      // Trigger a block report, this also triggers an IBR.
-      DataNodeTestUtils.triggerBlockReport(singletonDn);
+      // Trigger a heartbeat, this also triggers an IBR.
+      DataNodeTestUtils.triggerHeartbeat(singletonDn);
       Thread.sleep(2000);
       Thread.sleep(2000);
 
 
       // Ensure that the deleted block is reported.
       // Ensure that the deleted block is reported.