
HDFS-9710. DN can be configured to send block receipt IBRs in batches. Contributed by Tsz-Wo Nicholas Sze.
Backport HDFS-11837 by Vinitha Reddy Gankidi

(cherry picked from commit 5b95971f8a6dee09d1143c6cf121afa22fa6c16e)

Konstantin V Shvachko 8 years ago
parent commit ac1b8ff78f
17 changed files with 123 additions and 73 deletions
  1. +3 -0   hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  2. +4 -0   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
  3. +10 -7  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
  4. +4 -15  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
  5. +4 -4   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
  6. +4 -0   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
  7. +12 -14 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
  8. +9 -5   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
  9. +44 -7  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/IncrementalBlockReportManager.java
  10. +2 -2  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
  11. +6 -6  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
  12. +7 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageReceivedDeletedBlocks.java
  13. +5 -6  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
  14. +2 -2  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
  15. +1 -1  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java
  16. +2 -2  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
  17. +4 -2  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -111,6 +111,9 @@ Release 2.7.4 - UNRELEASED
     HDFS-9412. getBlocks occupies FSLock and takes too long to complete.
     Contributed by He Tianyi. Backport HDFS-11855 by Vinitha Reddy Gankidi.
 
+    HDFS-9710. DN can be configured to send block receipt IBRs in batches.
+    (Tsz-Wo Nicholas Sze. Backport HDFS-11837 by Vinitha Reddy Gankidi)
+
   BUG FIXES
 
     HDFS-8307. Spurious DNS Queries from hdfs shell. (Andres Perez via aengineer)

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -554,6 +554,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
 
   public static final String  DFS_DF_INTERVAL_KEY = "dfs.df.interval";
   public static final int     DFS_DF_INTERVAL_DEFAULT = 60000;
+  public static final String  DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_KEY
+      = "dfs.blockreport.incremental.intervalMsec";
+  public static final long    DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_DEFAULT
+      = 0;
   public static final String  DFS_BLOCKREPORT_INTERVAL_MSEC_KEY = "dfs.blockreport.intervalMsec";
   public static final long    DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT = 6 * 60 * 60 * 1000;
   public static final String  DFS_BLOCKREPORT_INITIAL_DELAY_KEY = "dfs.blockreport.initialDelay";
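The new key defaults to 0, which keeps the old behavior of sending a block-receipt IBR immediately for every block; a positive value batches receipts into windows of that many milliseconds. A minimal sketch of enabling it programmatically (the class name and the 500 ms value are illustrative only, not part of this patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class IbrBatchingConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new HdfsConfiguration();
        // Batch block-receipt IBRs into 500 ms windows (illustrative value).
        conf.setLong(
            DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_KEY, 500L);
        // Equivalent hdfs-site.xml property:
        //   dfs.blockreport.incremental.intervalMsec = 500
        System.out.println(conf.get(
            DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_KEY));
      }
    }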

+ 10 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java

@@ -230,29 +230,32 @@ class BPOfferService {
    * till namenode is informed before responding with success to the
    * client? For now we don't.
    */
-  void notifyNamenodeReceivedBlock(
-      ExtendedBlock block, String delHint, String storageUuid) {
+  void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint,
+      String storageUuid, boolean isOnTransientStorage) {
     notifyNamenodeBlock(block, BlockStatus.RECEIVED_BLOCK, delHint,
-        storageUuid);
+        storageUuid, isOnTransientStorage);
   }
 
   void notifyNamenodeReceivingBlock(ExtendedBlock block, String storageUuid) {
-    notifyNamenodeBlock(block, BlockStatus.RECEIVING_BLOCK, null, storageUuid);
+    notifyNamenodeBlock(block, BlockStatus.RECEIVING_BLOCK, null, storageUuid,
+        false);
   }
 
   void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) {
-    notifyNamenodeBlock(block, BlockStatus.DELETED_BLOCK, null, storageUuid);
+    notifyNamenodeBlock(block, BlockStatus.DELETED_BLOCK, null, storageUuid,
+        false);
   }
 
   private void notifyNamenodeBlock(ExtendedBlock block, BlockStatus status,
-      String delHint, String storageUuid) {
+      String delHint, String storageUuid, boolean isOnTransientStorage) {
     checkBlock(block);
     final ReceivedDeletedBlockInfo info = new ReceivedDeletedBlockInfo(
         block.getLocalBlock(), status, delHint);
     final DatanodeStorage storage = dn.getFSDataset().getStorage(storageUuid);
 
     for (BPServiceActor actor : bpServices) {
-      actor.getIbrManager().notifyNamenodeBlock(info, storage);
+      actor.getIbrManager().notifyNamenodeBlock(info, storage,
+          isOnTransientStorage);
     }
   }
 

+ 4 - 15
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java

@@ -96,8 +96,7 @@ class BPServiceActor implements Runnable {
   private final DNConf dnConf;
   private long prevBlockReportId = 0;
 
-  private final IncrementalBlockReportManager ibrManager
-      = new IncrementalBlockReportManager();
+  private final IncrementalBlockReportManager ibrManager;
 
   private DatanodeRegistration bpRegistration;
   final LinkedList<BPServiceActorAction> bpThreadQueue 
@@ -108,6 +107,7 @@ class BPServiceActor implements Runnable {
     this.dn = bpos.getDataNode();
     this.nnAddr = nnAddr;
     this.dnConf = dn.getDnConf();
+    this.ibrManager = new IncrementalBlockReportManager(dnConf.ibrInterval);
     scheduler = new Scheduler(dnConf.heartBeatInterval, dnConf.blockReportInterval);
   }
 
@@ -537,20 +537,9 @@ class BPServiceActor implements Runnable {
         DatanodeCommand cmd = cacheReport();
         processCommand(new DatanodeCommand[]{ cmd });
 
-        //
         // There is no work to do;  sleep until hearbeat timer elapses, 
         // or work arrives, and then iterate again.
-        //
-        long waitTime = scheduler.getHeartbeatWaitTime();
-        synchronized(ibrManager) {
-          if (waitTime > 0 && !ibrManager.sendImmediately()) {
-            try {
-              ibrManager.wait(waitTime);
-            } catch (InterruptedException ie) {
-              LOG.warn("BPOfferService for " + this + " interrupted");
-            }
-          }
-        } // synchronized
+        ibrManager.waitTillNextIBR(scheduler.getHeartbeatWaitTime());
       } catch(RemoteException re) {
         String reClass = re.getClassName();
         if (UnregisteredNodeException.class.getName().equals(reClass) ||
@@ -742,7 +731,7 @@ class BPServiceActor implements Runnable {
   void triggerBlockReport(BlockReportOptions options) {
     if (options.isIncremental()) {
       LOG.info(bpos.toString() + ": scheduling an incremental block report.");
-      ibrManager.triggerIBR();
+      ibrManager.triggerIBR(true);
     } else {
       LOG.info(bpos.toString() + ": scheduling a full block report.");
       synchronized(ibrManager) {
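Worth noting: an operator-requested incremental report (e.g. hdfs dfsadmin -triggerBlockReport -incremental <datanode>) reaches this path and now calls triggerIBR(true); as the IncrementalBlockReportManager diff below shows, the forced trigger backdates the batching timer, so admin-triggered reports are sent immediately instead of waiting out dfs.blockreport.incremental.intervalMsec.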

+ 4 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -286,8 +286,8 @@ class BlockReceiver implements Closeable {
   /** Return the datanode object. */
   DataNode getDataNode() {return datanode;}
 
-  String getStorageUuid() {
-    return replicaInfo.getStorageUuid();
+  Replica getReplica() {
+    return replicaInfo;
   }
 
   /**
@@ -1425,8 +1425,8 @@ class BlockReceiver implements Closeable {
         datanode.data.setPinning(block);
       }
       
-      datanode.closeBlock(
-          block, DataNode.EMPTY_DEL_HINT, replicaInfo.getStorageUuid());
+      datanode.closeBlock(block, null, replicaInfo.getStorageUuid(),
+          replicaInfo.isOnTransientStorage());
       if (ClientTraceLog.isInfoEnabled() && isClient) {
         long offset = 0;
         DatanodeRegistration dnR = datanode.getDNRegistrationForBP(block

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java

@@ -85,6 +85,7 @@ public class DNConf {
   final long blockReportInterval;
   final long blockReportSplitThreshold;
   final long initialBlockReportDelay;
+  final long ibrInterval;
   final long cacheReportInterval;
   final long dfsclientSlowIoWarningThresholdMs;
   final long datanodeSlowIoWarningThresholdMs;
@@ -142,6 +143,9 @@ public class DNConf {
         DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
     this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
         DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
+    this.ibrInterval = conf.getLong(
+        DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_KEY,
+        DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_DEFAULT);
     this.blockReportSplitThreshold = conf.getLong(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY,
                                             DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT);
     this.cacheReportInterval = conf.getLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY,

+ 12 - 14
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -289,7 +289,6 @@ public class DataNode extends ReconfigurableBase
   volatile FsDatasetSpi<? extends FsVolumeSpi> data = null;
   private String clusterId = null;
 
-  public final static String EMPTY_DEL_HINT = "";
   final AtomicInteger xmitsInProgress = new AtomicInteger();
   Daemon dataXceiverServer = null;
   DataXceiverServer xserver = null;
@@ -960,11 +959,12 @@ public class DataNode extends ReconfigurableBase
   }
   
   // calls specific to BP
-  public void notifyNamenodeReceivedBlock(
-      ExtendedBlock block, String delHint, String storageUuid) {
+  public void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint,
+      String storageUuid, boolean isOnTransientStorage) {
     BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
     if(bpos != null) {
-      bpos.notifyNamenodeReceivedBlock(block, delHint, storageUuid);
+      bpos.notifyNamenodeReceivedBlock(block, delHint, storageUuid,
+          isOnTransientStorage);
     } else {
       LOG.error("Cannot find BPOfferService for reporting block received for bpid="
           + block.getBlockPoolId());
@@ -2227,15 +2227,11 @@ public class DataNode extends ReconfigurableBase
    * @param delHint hint on which excess block to delete
    * @param storageUuid UUID of the storage where block is stored
    */
-  void closeBlock(ExtendedBlock block, String delHint, String storageUuid) {
+  void closeBlock(ExtendedBlock block, String delHint, String storageUuid,
+      boolean isTransientStorage) {
     metrics.incrBlocksWritten();
-    BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
-    if(bpos != null) {
-      bpos.notifyNamenodeReceivedBlock(block, delHint, storageUuid);
-    } else {
-      LOG.warn("Cannot find BPOfferService for reporting block received for bpid="
-          + block.getBlockPoolId());
-    }
+    notifyNamenodeReceivedBlock(block, delHint, storageUuid,
+        isTransientStorage);
   }
 
   /** Start a single datanode daemon and wait for it to finish.
@@ -2602,7 +2598,7 @@ public class DataNode extends ReconfigurableBase
   public String updateReplicaUnderRecovery(final ExtendedBlock oldBlock,
       final long recoveryId, final long newBlockId, final long newLength)
       throws IOException {
-    final String storageID = data.updateReplicaUnderRecovery(oldBlock,
+    final Replica r = data.updateReplicaUnderRecovery(oldBlock,
         recoveryId, newBlockId, newLength);
     // Notify the namenode of the updated block info. This is important
     // for HA, since otherwise the standby node may lose track of the
@@ -2611,7 +2607,9 @@ public class DataNode extends ReconfigurableBase
     newBlock.setGenerationStamp(recoveryId);
     newBlock.setBlockId(newBlockId);
     newBlock.setNumBytes(newLength);
-    notifyNamenodeReceivedBlock(newBlock, "", storageID);
+    final String storageID = r.getStorageUuid();
+    notifyNamenodeReceivedBlock(newBlock, null, storageID,
+        r.isOnTransientStorage());
     return storageID;
   }
 

+ 9 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -667,7 +667,9 @@ class DataXceiver extends Receiver implements Runnable {
     String firstBadLink = "";           // first datanode that failed in connection setup
     Status mirrorInStatus = SUCCESS;
     final String storageUuid;
+    final boolean isOnTransientStorage;
     try {
+      final Replica replica;
       if (isDatanode || 
           stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
         // open a block receiver
@@ -677,12 +679,13 @@ class DataXceiver extends Receiver implements Runnable {
             stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
             clientname, srcDataNode, datanode, requestedChecksum,
             cachingStrategy, allowLazyPersist, pinning);
-
-        storageUuid = blockReceiver.getStorageUuid();
+        replica = blockReceiver.getReplica();
       } else {
-        storageUuid = datanode.data.recoverClose(
+        replica = datanode.data.recoverClose(
             block, latestGenerationStamp, minBytesRcvd);
       }
+      storageUuid = replica.getStorageUuid();
+      isOnTransientStorage = replica.isOnTransientStorage();
 
       //
       // Connect to downstream machine, if appropriate
@@ -823,7 +826,7 @@ class DataXceiver extends Receiver implements Runnable {
       // the block is finalized in the PacketResponder.
       if (isDatanode ||
           stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
-        datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT, storageUuid);
+        datanode.closeBlock(block, null, storageUuid, isOnTransientStorage);
         LOG.info("Received " + block + " src: " + remoteAddress + " dest: "
             + localAddress + " of size " + block.getNumBytes());
       }
@@ -1137,8 +1140,9 @@ class DataXceiver extends Receiver implements Runnable {
             dataXceiverServer.balanceThrottler, null, true);
         
         // notify name node
+        final Replica r = blockReceiver.getReplica();
         datanode.notifyNamenodeReceivedBlock(
-            block, delHint, blockReceiver.getStorageUuid());
+            block, delHint, r.getStorageUuid(), r.isOnTransientStorage());
         
         LOG.info("Moved " + block + " from " + peer.getRemoteAddressString()
             + ", delHint=" + delHint);

+ 44 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/IncrementalBlockReportManager.java

@@ -21,6 +21,7 @@ import static org.apache.hadoop.util.Time.monotonicNow;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
@@ -33,6 +34,8 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
@@ -42,6 +45,9 @@ import com.google.common.collect.Maps;
  */
 @InterfaceAudience.Private
 class IncrementalBlockReportManager {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      IncrementalBlockReportManager.class);
+
   private static class PerStorageIBR {
     /** The blocks in this IBR. */
     final Map<Block, ReceivedDeletedBlockInfo> blocks = Maps.newHashMap();
@@ -103,8 +109,29 @@ class IncrementalBlockReportManager {
    */
   private volatile boolean readyToSend = false;
 
+  /** The time interval between two IBRs. */
+  private final long ibrInterval;
+
+  /** The timestamp of the last IBR. */
+  private volatile long lastIBR;
+
+  IncrementalBlockReportManager(final long ibrInterval) {
+    this.ibrInterval = ibrInterval;
+    this.lastIBR = monotonicNow() - ibrInterval;
+  }
+
   boolean sendImmediately() {
-    return readyToSend;
+    return readyToSend && monotonicNow() - ibrInterval >= lastIBR;
+  }
+
+  synchronized void waitTillNextIBR(long waitTime) {
+    if (waitTime > 0 && !sendImmediately()) {
+      try {
+        wait(ibrInterval > 0 && ibrInterval < waitTime? ibrInterval: waitTime);
+      } catch (InterruptedException ie) {
+        LOG.warn(getClass().getSimpleName() + " interrupted");
+      }
+    }
   }
 
   private synchronized StorageReceivedDeletedBlocks[] generateIBRs() {
@@ -144,6 +171,9 @@ class IncrementalBlockReportManager {
     }
 
     // Send incremental block reports to the Namenode outside the lock
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("call blockReceivedAndDeleted: " + Arrays.toString(reports));
+    }
     boolean success = false;
     final long startTime = monotonicNow();
     try {
@@ -151,7 +181,9 @@ class IncrementalBlockReportManager {
       success = true;
     } finally {
       metrics.addIncrementalBlockReport(monotonicNow() - startTime);
-      if (!success) {
+      if (success) {
+        lastIBR = startTime;
+      } else {
         // If we didn't succeed in sending the report, put all of the
         // blocks back onto our queue, but only in the case where we
         // didn't put something newer in the meantime.
@@ -191,7 +223,7 @@ class IncrementalBlockReportManager {
   }
 
   synchronized void notifyNamenodeBlock(ReceivedDeletedBlockInfo rdbi,
-      DatanodeStorage storage) {
+      DatanodeStorage storage, boolean isOnTransientStorage) {
     addRDBI(rdbi, storage);
 
     final BlockStatus status = rdbi.getStatus();
@@ -200,18 +232,23 @@ class IncrementalBlockReportManager {
       readyToSend = true;
     } else if (status == BlockStatus.RECEIVED_BLOCK) {
       // the report is sent right away.
-      triggerIBR();
+      triggerIBR(isOnTransientStorage);
     }
   }
 
-  synchronized void triggerIBR() {
+  synchronized void triggerIBR(boolean force) {
     readyToSend = true;
-    notifyAll();
+    if (force) {
+      lastIBR = monotonicNow() - ibrInterval;
+    }
+    if (sendImmediately()) {
+      notifyAll();
+    }
   }
 
   @VisibleForTesting
   synchronized void triggerDeletionReportForTests() {
-    triggerIBR();
+    triggerIBR(true);
 
     while (sendImmediately()) {
       try {
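The gate above has two parts: sendImmediately() requires both queued work (readyToSend) and that at least ibrInterval ms have passed since the last successful report, while triggerIBR(true) backdates lastIBR so a forced report passes the time check. A minimal standalone sketch of the same batching logic, under hypothetical names (BatchGate is illustrative, not part of this patch or of Hadoop):

    import java.util.concurrent.TimeUnit;

    /** Illustrative analogue of the IBR batching gate; not Hadoop code. */
    public class BatchGate {
      private final long intervalMs;     // plays the role of ibrInterval
      private volatile long lastSendMs;  // plays the role of lastIBR
      private volatile boolean pending;  // plays the role of readyToSend

      public BatchGate(long intervalMs) {
        this.intervalMs = intervalMs;
        // Backdate so the very first send is not delayed.
        this.lastSendMs = nowMs() - intervalMs;
      }

      private static long nowMs() {
        // Monotonic clock, like Hadoop's Time.monotonicNow().
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
      }

      /** Like sendImmediately(): queued work and an expired window. */
      public boolean shouldSend() {
        return pending && nowMs() - intervalMs >= lastSendMs;
      }

      /** Like triggerIBR(force): queue work, optionally defeat the window. */
      public synchronized void offer(boolean force) {
        pending = true;
        if (force) {
          lastSendMs = nowMs() - intervalMs;
        }
        if (shouldSend()) {
          notifyAll(); // wake the sender thread blocked in awaitNextSend()
        }
      }

      /** Like waitTillNextIBR(): sleep until a send is allowed or wanted. */
      public synchronized void awaitNextSend(long maxWaitMs)
          throws InterruptedException {
        if (maxWaitMs > 0 && !shouldSend()) {
          wait(intervalMs > 0 && intervalMs < maxWaitMs ? intervalMs
              : maxWaitMs);
        }
      }

      /** Like the success branch of the sender: start a new window. */
      public synchronized void markSent(long sendStartMs) {
        pending = false;
        lastSendMs = sendStartMs;
      }
    }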

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java

@@ -285,7 +285,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    * @return the storage uuid of the replica.
    * @throws IOException
    */
-  public String recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen
+  Replica recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen
       ) throws IOException;
   
   /**
@@ -435,7 +435,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    * Update replica's generation stamp and length and finalize it.
    * @return the ID of storage that stores the block
    */
-  public String updateReplicaUnderRecovery(ExtendedBlock oldBlock,
+  Replica updateReplicaUnderRecovery(ExtendedBlock oldBlock,
       long recoveryId, long newBlockId, long newLength) throws IOException;
 
   /**

+ 6 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -1233,7 +1233,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   @Override // FsDatasetSpi
-  public synchronized String recoverClose(ExtendedBlock b, long newGS,
+  public synchronized Replica recoverClose(ExtendedBlock b, long newGS,
       long expectedBlockLen) throws IOException {
     LOG.info("Recover failed close " + b);
     // check replica's state
@@ -1244,7 +1244,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     if (replicaInfo.getState() == ReplicaState.RBW) {
       finalizeReplica(b.getBlockPoolId(), replicaInfo);
     }
-    return replicaInfo.getStorageUuid();
+    return replicaInfo;
   }
   
   /**
@@ -2368,7 +2368,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   @Override // FsDatasetSpi
-  public synchronized String updateReplicaUnderRecovery(
+  public synchronized Replica updateReplicaUnderRecovery(
                                     final ExtendedBlock oldBlock,
                                     final long recoveryId,
                                     final long newBlockId,
@@ -2428,8 +2428,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     //check replica files after update
     checkReplicaFiles(finalized);
 
-    //return storage ID
-    return getVolume(new ExtendedBlock(bpid, finalized)).getStorageID();
+    return finalized;
   }
 
   private FinalizedReplica updateReplicaUnderRecovery(
@@ -2804,7 +2803,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     datanode.getShortCircuitRegistry().processBlockInvalidation(
         ExtendedBlockId.fromExtendedBlock(extendedBlock));
     datanode.notifyNamenodeReceivedBlock(
-        extendedBlock, null, newReplicaInfo.getStorageUuid());
+        extendedBlock, null, newReplicaInfo.getStorageUuid(),
+        newReplicaInfo.isOnTransientStorage());
 
     // Remove the old replicas
     if (blockFile.delete() || !blockFile.exists()) {

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageReceivedDeletedBlocks.java

@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hdfs.server.protocol;
 
+import java.util.Arrays;
+
 /**
  * Report of block received and deleted per Datanode
  * storage.
@@ -51,4 +53,9 @@ public class StorageReceivedDeletedBlocks {
     this.storage = storage;
     this.blocks = blocks;
   }
+
+  @Override
+  public String toString() {
+    return storage + Arrays.toString(blocks);
+  }
 }
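This toString() override backs the debug line added in IncrementalBlockReportManager above, which logs the outgoing reports with Arrays.toString(reports) before calling blockReceivedAndDeleted.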

+ 5 - 6
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

@@ -876,7 +876,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   }
 
   @Override // FsDatasetSpi
-  public String recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen)
+  public Replica recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen)
       throws IOException {
     final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
     BInfo binfo = map.get(b.getLocalBlock());
@@ -890,7 +890,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     map.remove(b.getLocalBlock());
     binfo.theBlock.setGenerationStamp(newGS);
     map.put(binfo.theBlock, binfo);
-    return binfo.getStorageUuid();
+    return binfo;
   }
   
   @Override // FsDatasetSpi
@@ -1171,12 +1171,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   }
 
   @Override // FsDatasetSpi
-  public String updateReplicaUnderRecovery(ExtendedBlock oldBlock,
+  public Replica updateReplicaUnderRecovery(ExtendedBlock oldBlock,
                                         long recoveryId,
                                         long newBlockId,
-                                        long newlength) {
-    // Caller does not care about the exact Storage UUID returned.
-    return datanodeUuid;
+                                        long newlength) throws IOException {
+    return getMap(oldBlock.getBlockPoolId()).get(oldBlock.getLocalBlock());
   }
 
   @Override // FsDatasetSpi

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java

@@ -202,7 +202,7 @@ public class TestBPOfferService {
       waitForBlockReport(mockNN2);
 
       // When we receive a block, it should report it to both NNs
-      bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, "", "");
+      bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, "", false);
 
       ReceivedDeletedBlockInfo[] ret = waitForBlockReceived(FAKE_BLOCK, mockNN1);
       assertEquals(1, ret.length);
@@ -726,7 +726,7 @@ public class TestBPOfferService {
       DatanodeStorage storage = Mockito.mock(DatanodeStorage.class);
       Mockito.doReturn(storage).when(mockFSDataset).getStorage("storage0");
       // Add IBRs
-      bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, "storage0");
+      bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, "storage0", false);
       // Send heartbeat so that the BpServiceActor can send IBR to
       // namenode
       bpos.triggerHeartbeatForTests();

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java

@@ -84,7 +84,7 @@ public class TestIncrementalBlockReports {
     ReceivedDeletedBlockInfo rdbi = new ReceivedDeletedBlockInfo(
         getDummyBlock(), BlockStatus.RECEIVED_BLOCK, null);
     DatanodeStorage s = singletonDn.getFSDataset().getStorage(storageUuid);
-    actor.getIbrManager().notifyNamenodeBlock(rdbi, s);
+    actor.getIbrManager().notifyNamenodeBlock(rdbi, s, false);
   }
 
   /**

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java

@@ -177,7 +177,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
   }
 
   @Override
-  public String recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen)
+  public Replica recoverClose(ExtendedBlock b, long newGS, long expectedBlkLen)
       throws IOException {
     return null;
   }
@@ -272,7 +272,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
   }
 
   @Override
-  public String updateReplicaUnderRecovery(ExtendedBlock oldBlock,
+  public Replica updateReplicaUnderRecovery(ExtendedBlock oldBlock,
       long recoveryId, long newBlockId, long newLength) throws IOException {
     return null;
   }

+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java

@@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
+import org.apache.hadoop.hdfs.server.datanode.Replica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -378,10 +379,11 @@ public class TestInterDatanodeProtocol {
       }
 
       //update
-      final String storageID = fsdataset.updateReplicaUnderRecovery(
+      final Replica r = fsdataset.updateReplicaUnderRecovery(
           new ExtendedBlock(b.getBlockPoolId(), rri), recoveryid,
           rri.getBlockId(), newlength);
-      assertTrue(storageID != null);
+      assertTrue(r != null);
+      assertTrue(r.getStorageUuid() != null);
 
     } finally {
       if (cluster != null) cluster.shutdown();