瀏覽代碼

HDFS-9710. DN can be configured to send block receipt IBRs in batches.

Tsz-Wo Nicholas Sze 9 年之前
父節點
當前提交
7b79567cf9
共有 19 個文件被更改,包括 388 次插入75 次删除
  1. 3 0
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  2. 4 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
  3. 10 7
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
  4. 4 15
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
  5. 4 4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
  6. 4 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
  7. 12 14
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
  8. 9 5
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
  9. 44 7
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/IncrementalBlockReportManager.java
  10. 2 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
  11. 6 6
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
  12. 7 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageReceivedDeletedBlocks.java
  13. 5 6
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
  14. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
  15. 263 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBatchIbr.java
  16. 4 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
  17. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java
  18. 2 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
  19. 3 3
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -1104,6 +1104,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-9425. Expose number of blocks per volume as a metric
     HDFS-9425. Expose number of blocks per volume as a metric
     (Brahma Reddy Battula via vinayakumarb)
     (Brahma Reddy Battula via vinayakumarb)
 
 
+    HDFS-9710. DN can be configured to send block receipt IBRs in batches.
+    (szetszwo)
+
   BUG FIXES
   BUG FIXES
 
 
     HDFS-8091: ACLStatus and XAttributes should be presented to
     HDFS-8091: ACLStatus and XAttributes should be presented to

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -522,6 +522,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
 
 
   public static final String  DFS_DF_INTERVAL_KEY = "dfs.df.interval";
   public static final String  DFS_DF_INTERVAL_KEY = "dfs.df.interval";
   public static final int     DFS_DF_INTERVAL_DEFAULT = 60000;
   public static final int     DFS_DF_INTERVAL_DEFAULT = 60000;
+  public static final String  DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_KEY
+      = "dfs.blockreport.incremental.intervalMsec";
+  public static final long    DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_DEFAULT
+      = 0;
   public static final String  DFS_BLOCKREPORT_INTERVAL_MSEC_KEY = "dfs.blockreport.intervalMsec";
   public static final String  DFS_BLOCKREPORT_INTERVAL_MSEC_KEY = "dfs.blockreport.intervalMsec";
   public static final long    DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT = 6 * 60 * 60 * 1000;
   public static final long    DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT = 6 * 60 * 60 * 1000;
   public static final String  DFS_BLOCKREPORT_INITIAL_DELAY_KEY = "dfs.blockreport.initialDelay";
   public static final String  DFS_BLOCKREPORT_INITIAL_DELAY_KEY = "dfs.blockreport.initialDelay";

+ 10 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java

@@ -231,29 +231,32 @@ class BPOfferService {
    * till namenode is informed before responding with success to the
    * till namenode is informed before responding with success to the
    * client? For now we don't.
    * client? For now we don't.
    */
    */
-  void notifyNamenodeReceivedBlock(
-      ExtendedBlock block, String delHint, String storageUuid) {
+  void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint,
+      String storageUuid, boolean isOnTransientStorage) {
     notifyNamenodeBlock(block, BlockStatus.RECEIVED_BLOCK, delHint,
     notifyNamenodeBlock(block, BlockStatus.RECEIVED_BLOCK, delHint,
-        storageUuid);
+        storageUuid, isOnTransientStorage);
   }
   }
 
 
   void notifyNamenodeReceivingBlock(ExtendedBlock block, String storageUuid) {
   void notifyNamenodeReceivingBlock(ExtendedBlock block, String storageUuid) {
-    notifyNamenodeBlock(block, BlockStatus.RECEIVING_BLOCK, null, storageUuid);
+    notifyNamenodeBlock(block, BlockStatus.RECEIVING_BLOCK, null, storageUuid,
+        false);
   }
   }
 
 
   void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) {
   void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) {
-    notifyNamenodeBlock(block, BlockStatus.DELETED_BLOCK, null, storageUuid);
+    notifyNamenodeBlock(block, BlockStatus.DELETED_BLOCK, null, storageUuid,
+        false);
   }
   }
 
 
   private void notifyNamenodeBlock(ExtendedBlock block, BlockStatus status,
   private void notifyNamenodeBlock(ExtendedBlock block, BlockStatus status,
-      String delHint, String storageUuid) {
+      String delHint, String storageUuid, boolean isOnTransientStorage) {
     checkBlock(block);
     checkBlock(block);
     final ReceivedDeletedBlockInfo info = new ReceivedDeletedBlockInfo(
     final ReceivedDeletedBlockInfo info = new ReceivedDeletedBlockInfo(
         block.getLocalBlock(), status, delHint);
         block.getLocalBlock(), status, delHint);
     final DatanodeStorage storage = dn.getFSDataset().getStorage(storageUuid);
     final DatanodeStorage storage = dn.getFSDataset().getStorage(storageUuid);
 
 
     for (BPServiceActor actor : bpServices) {
     for (BPServiceActor actor : bpServices) {
-      actor.getIbrManager().notifyNamenodeBlock(info, storage);
+      actor.getIbrManager().notifyNamenodeBlock(info, storage,
+          isOnTransientStorage);
     }
     }
   }
   }
 
 

+ 4 - 15
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java

@@ -97,8 +97,7 @@ class BPServiceActor implements Runnable {
   private final DNConf dnConf;
   private final DNConf dnConf;
   private long prevBlockReportId;
   private long prevBlockReportId;
 
 
-  private final IncrementalBlockReportManager ibrManager
-      = new IncrementalBlockReportManager();
+  private final IncrementalBlockReportManager ibrManager;
 
 
   private DatanodeRegistration bpRegistration;
   private DatanodeRegistration bpRegistration;
   final LinkedList<BPServiceActorAction> bpThreadQueue 
   final LinkedList<BPServiceActorAction> bpThreadQueue 
@@ -109,6 +108,7 @@ class BPServiceActor implements Runnable {
     this.dn = bpos.getDataNode();
     this.dn = bpos.getDataNode();
     this.nnAddr = nnAddr;
     this.nnAddr = nnAddr;
     this.dnConf = dn.getDnConf();
     this.dnConf = dn.getDnConf();
+    this.ibrManager = new IncrementalBlockReportManager(dnConf.ibrInterval);
     prevBlockReportId = ThreadLocalRandom.current().nextLong();
     prevBlockReportId = ThreadLocalRandom.current().nextLong();
     scheduler = new Scheduler(dnConf.heartBeatInterval, dnConf.blockReportInterval);
     scheduler = new Scheduler(dnConf.heartBeatInterval, dnConf.blockReportInterval);
   }
   }
@@ -568,20 +568,9 @@ class BPServiceActor implements Runnable {
           processCommand(new DatanodeCommand[]{ cmd });
           processCommand(new DatanodeCommand[]{ cmd });
         }
         }
 
 
-        //
         // There is no work to do;  sleep until hearbeat timer elapses, 
         // There is no work to do;  sleep until hearbeat timer elapses, 
         // or work arrives, and then iterate again.
         // or work arrives, and then iterate again.
-        //
-        long waitTime = scheduler.getHeartbeatWaitTime();
-        synchronized(ibrManager) {
-          if (waitTime > 0 && !ibrManager.sendImmediately()) {
-            try {
-              ibrManager.wait(waitTime);
-            } catch (InterruptedException ie) {
-              LOG.warn("BPOfferService for " + this + " interrupted");
-            }
-          }
-        } // synchronized
+        ibrManager.waitTillNextIBR(scheduler.getHeartbeatWaitTime());
       } catch(RemoteException re) {
       } catch(RemoteException re) {
         String reClass = re.getClassName();
         String reClass = re.getClassName();
         if (UnregisteredNodeException.class.getName().equals(reClass) ||
         if (UnregisteredNodeException.class.getName().equals(reClass) ||
@@ -768,7 +757,7 @@ class BPServiceActor implements Runnable {
   void triggerBlockReport(BlockReportOptions options) {
   void triggerBlockReport(BlockReportOptions options) {
     if (options.isIncremental()) {
     if (options.isIncremental()) {
       LOG.info(bpos.toString() + ": scheduling an incremental block report.");
       LOG.info(bpos.toString() + ": scheduling an incremental block report.");
-      ibrManager.triggerIBR();
+      ibrManager.triggerIBR(true);
     } else {
     } else {
       LOG.info(bpos.toString() + ": scheduling a full block report.");
       LOG.info(bpos.toString() + ": scheduling a full block report.");
       synchronized(ibrManager) {
       synchronized(ibrManager) {

+ 4 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -302,8 +302,8 @@ class BlockReceiver implements Closeable {
   /** Return the datanode object. */
   /** Return the datanode object. */
   DataNode getDataNode() {return datanode;}
   DataNode getDataNode() {return datanode;}
 
 
-  String getStorageUuid() {
-    return replicaInfo.getStorageUuid();
+  Replica getReplica() {
+    return replicaInfo;
   }
   }
 
 
   /**
   /**
@@ -1439,8 +1439,8 @@ class BlockReceiver implements Closeable {
         datanode.data.setPinning(block);
         datanode.data.setPinning(block);
       }
       }
       
       
-      datanode.closeBlock(
-          block, DataNode.EMPTY_DEL_HINT, replicaInfo.getStorageUuid());
+      datanode.closeBlock(block, null, replicaInfo.getStorageUuid(),
+          replicaInfo.isOnTransientStorage());
       if (ClientTraceLog.isInfoEnabled() && isClient) {
       if (ClientTraceLog.isInfoEnabled() && isClient) {
         long offset = 0;
         long offset = 0;
         DatanodeRegistration dnR = datanode.getDNRegistrationForBP(block
         DatanodeRegistration dnR = datanode.getDNRegistrationForBP(block

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java

@@ -89,6 +89,7 @@ public class DNConf {
   final long heartBeatInterval;
   final long heartBeatInterval;
   final long blockReportInterval;
   final long blockReportInterval;
   final long blockReportSplitThreshold;
   final long blockReportSplitThreshold;
+  final long ibrInterval;
   final long initialBlockReportDelayMs;
   final long initialBlockReportDelayMs;
   final long cacheReportInterval;
   final long cacheReportInterval;
   final long dfsclientSlowIoWarningThresholdMs;
   final long dfsclientSlowIoWarningThresholdMs;
@@ -156,6 +157,9 @@ public class DNConf {
         DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
         DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
     this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
     this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
         DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
         DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
+    this.ibrInterval = conf.getLong(
+        DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_KEY,
+        DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_DEFAULT);
     this.blockReportSplitThreshold = conf.getLong(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY,
     this.blockReportSplitThreshold = conf.getLong(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY,
                                             DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT);
                                             DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT);
     this.cacheReportInterval = conf.getLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY,
     this.cacheReportInterval = conf.getLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY,

+ 12 - 14
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -316,7 +316,6 @@ public class DataNode extends ReconfigurableBase
   volatile FsDatasetSpi<? extends FsVolumeSpi> data = null;
   volatile FsDatasetSpi<? extends FsVolumeSpi> data = null;
   private String clusterId = null;
   private String clusterId = null;
 
 
-  public final static String EMPTY_DEL_HINT = "";
   final AtomicInteger xmitsInProgress = new AtomicInteger();
   final AtomicInteger xmitsInProgress = new AtomicInteger();
   Daemon dataXceiverServer = null;
   Daemon dataXceiverServer = null;
   DataXceiverServer xserver = null;
   DataXceiverServer xserver = null;
@@ -1100,11 +1099,12 @@ public class DataNode extends ReconfigurableBase
   }
   }
   
   
   // calls specific to BP
   // calls specific to BP
-  public void notifyNamenodeReceivedBlock(
-      ExtendedBlock block, String delHint, String storageUuid) {
+  public void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint,
+      String storageUuid, boolean isOnTransientStorage) {
     BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
     BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
     if(bpos != null) {
     if(bpos != null) {
-      bpos.notifyNamenodeReceivedBlock(block, delHint, storageUuid);
+      bpos.notifyNamenodeReceivedBlock(block, delHint, storageUuid,
+          isOnTransientStorage);
     } else {
     } else {
       LOG.error("Cannot find BPOfferService for reporting block received for bpid="
       LOG.error("Cannot find BPOfferService for reporting block received for bpid="
           + block.getBlockPoolId());
           + block.getBlockPoolId());
@@ -2364,15 +2364,11 @@ public class DataNode extends ReconfigurableBase
    * @param delHint hint on which excess block to delete
    * @param delHint hint on which excess block to delete
    * @param storageUuid UUID of the storage where block is stored
    * @param storageUuid UUID of the storage where block is stored
    */
    */
-  void closeBlock(ExtendedBlock block, String delHint, String storageUuid) {
+  void closeBlock(ExtendedBlock block, String delHint, String storageUuid,
+      boolean isTransientStorage) {
     metrics.incrBlocksWritten();
     metrics.incrBlocksWritten();
-    BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
-    if(bpos != null) {
-      bpos.notifyNamenodeReceivedBlock(block, delHint, storageUuid);
-    } else {
-      LOG.warn("Cannot find BPOfferService for reporting block received for bpid="
-          + block.getBlockPoolId());
-    }
+    notifyNamenodeReceivedBlock(block, delHint, storageUuid,
+        isTransientStorage);
   }
   }
 
 
   /** Start a single datanode daemon and wait for it to finish.
   /** Start a single datanode daemon and wait for it to finish.
@@ -2702,7 +2698,7 @@ public class DataNode extends ReconfigurableBase
   public String updateReplicaUnderRecovery(final ExtendedBlock oldBlock,
   public String updateReplicaUnderRecovery(final ExtendedBlock oldBlock,
       final long recoveryId, final long newBlockId, final long newLength)
       final long recoveryId, final long newBlockId, final long newLength)
       throws IOException {
       throws IOException {
-    final String storageID = data.updateReplicaUnderRecovery(oldBlock,
+    final Replica r = data.updateReplicaUnderRecovery(oldBlock,
         recoveryId, newBlockId, newLength);
         recoveryId, newBlockId, newLength);
     // Notify the namenode of the updated block info. This is important
     // Notify the namenode of the updated block info. This is important
     // for HA, since otherwise the standby node may lose track of the
     // for HA, since otherwise the standby node may lose track of the
@@ -2711,7 +2707,9 @@ public class DataNode extends ReconfigurableBase
     newBlock.setGenerationStamp(recoveryId);
     newBlock.setGenerationStamp(recoveryId);
     newBlock.setBlockId(newBlockId);
     newBlock.setBlockId(newBlockId);
     newBlock.setNumBytes(newLength);
     newBlock.setNumBytes(newLength);
-    notifyNamenodeReceivedBlock(newBlock, "", storageID);
+    final String storageID = r.getStorageUuid();
+    notifyNamenodeReceivedBlock(newBlock, null, storageID,
+        r.isOnTransientStorage());
     return storageID;
     return storageID;
   }
   }
 
 

+ 9 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -672,7 +672,9 @@ class DataXceiver extends Receiver implements Runnable {
     String firstBadLink = "";           // first datanode that failed in connection setup
     String firstBadLink = "";           // first datanode that failed in connection setup
     Status mirrorInStatus = SUCCESS;
     Status mirrorInStatus = SUCCESS;
     final String storageUuid;
     final String storageUuid;
+    final boolean isOnTransientStorage;
     try {
     try {
+      final Replica replica;
       if (isDatanode || 
       if (isDatanode || 
           stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
           stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
         // open a block receiver
         // open a block receiver
@@ -682,12 +684,13 @@ class DataXceiver extends Receiver implements Runnable {
             stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
             stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
             clientname, srcDataNode, datanode, requestedChecksum,
             clientname, srcDataNode, datanode, requestedChecksum,
             cachingStrategy, allowLazyPersist, pinning);
             cachingStrategy, allowLazyPersist, pinning);
-
-        storageUuid = blockReceiver.getStorageUuid();
+        replica = blockReceiver.getReplica();
       } else {
       } else {
-        storageUuid = datanode.data.recoverClose(
+        replica = datanode.data.recoverClose(
             block, latestGenerationStamp, minBytesRcvd);
             block, latestGenerationStamp, minBytesRcvd);
       }
       }
+      storageUuid = replica.getStorageUuid();
+      isOnTransientStorage = replica.isOnTransientStorage();
 
 
       //
       //
       // Connect to downstream machine, if appropriate
       // Connect to downstream machine, if appropriate
@@ -830,7 +833,7 @@ class DataXceiver extends Receiver implements Runnable {
       // the block is finalized in the PacketResponder.
       // the block is finalized in the PacketResponder.
       if (isDatanode ||
       if (isDatanode ||
           stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
           stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
-        datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT, storageUuid);
+        datanode.closeBlock(block, null, storageUuid, isOnTransientStorage);
         LOG.info("Received " + block + " src: " + remoteAddress + " dest: "
         LOG.info("Received " + block + " src: " + remoteAddress + " dest: "
             + localAddress + " of size " + block.getNumBytes());
             + localAddress + " of size " + block.getNumBytes());
       }
       }
@@ -1146,8 +1149,9 @@ class DataXceiver extends Receiver implements Runnable {
             dataXceiverServer.balanceThrottler, null, true);
             dataXceiverServer.balanceThrottler, null, true);
         
         
         // notify name node
         // notify name node
+        final Replica r = blockReceiver.getReplica();
         datanode.notifyNamenodeReceivedBlock(
         datanode.notifyNamenodeReceivedBlock(
-            block, delHint, blockReceiver.getStorageUuid());
+            block, delHint, r.getStorageUuid(), r.isOnTransientStorage());
         
         
         LOG.info("Moved " + block + " from " + peer.getRemoteAddressString()
         LOG.info("Moved " + block + " from " + peer.getRemoteAddressString()
             + ", delHint=" + delHint);
             + ", delHint=" + delHint);

+ 44 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/IncrementalBlockReportManager.java

@@ -21,6 +21,7 @@ import static org.apache.hadoop.util.Time.monotonicNow;
 
 
 import java.io.IOException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
 
 
@@ -33,6 +34,8 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Maps;
@@ -42,6 +45,9 @@ import com.google.common.collect.Maps;
  */
  */
 @InterfaceAudience.Private
 @InterfaceAudience.Private
 class IncrementalBlockReportManager {
 class IncrementalBlockReportManager {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      IncrementalBlockReportManager.class);
+
   private static class PerStorageIBR {
   private static class PerStorageIBR {
     /** The blocks in this IBR. */
     /** The blocks in this IBR. */
     final Map<Block, ReceivedDeletedBlockInfo> blocks = Maps.newHashMap();
     final Map<Block, ReceivedDeletedBlockInfo> blocks = Maps.newHashMap();
@@ -103,8 +109,29 @@ class IncrementalBlockReportManager {
    */
    */
   private volatile boolean readyToSend = false;
   private volatile boolean readyToSend = false;
 
 
+  /** The time interval between two IBRs. */
+  private final long ibrInterval;
+
+  /** The timestamp of the last IBR. */
+  private volatile long lastIBR;
+
+  IncrementalBlockReportManager(final long ibrInterval) {
+    this.ibrInterval = ibrInterval;
+    this.lastIBR = monotonicNow() - ibrInterval;
+  }
+
   boolean sendImmediately() {
   boolean sendImmediately() {
-    return readyToSend;
+    return readyToSend && monotonicNow() - ibrInterval >= lastIBR;
+  }
+
+  synchronized void waitTillNextIBR(long waitTime) {
+    if (waitTime > 0 && !sendImmediately()) {
+      try {
+        wait(ibrInterval > 0 && ibrInterval < waitTime? ibrInterval: waitTime);
+      } catch (InterruptedException ie) {
+        LOG.warn(getClass().getSimpleName() + " interrupted");
+      }
+    }
   }
   }
 
 
   private synchronized StorageReceivedDeletedBlocks[] generateIBRs() {
   private synchronized StorageReceivedDeletedBlocks[] generateIBRs() {
@@ -144,6 +171,9 @@ class IncrementalBlockReportManager {
     }
     }
 
 
     // Send incremental block reports to the Namenode outside the lock
     // Send incremental block reports to the Namenode outside the lock
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("call blockReceivedAndDeleted: " + Arrays.toString(reports));
+    }
     boolean success = false;
     boolean success = false;
     final long startTime = monotonicNow();
     final long startTime = monotonicNow();
     try {
     try {
@@ -151,7 +181,9 @@ class IncrementalBlockReportManager {
       success = true;
       success = true;
     } finally {
     } finally {
       metrics.addIncrementalBlockReport(monotonicNow() - startTime);
       metrics.addIncrementalBlockReport(monotonicNow() - startTime);
-      if (!success) {
+      if (success) {
+        lastIBR = startTime;
+      } else {
         // If we didn't succeed in sending the report, put all of the
         // If we didn't succeed in sending the report, put all of the
         // blocks back onto our queue, but only in the case where we
         // blocks back onto our queue, but only in the case where we
         // didn't put something newer in the meantime.
         // didn't put something newer in the meantime.
@@ -191,7 +223,7 @@ class IncrementalBlockReportManager {
   }
   }
 
 
   synchronized void notifyNamenodeBlock(ReceivedDeletedBlockInfo rdbi,
   synchronized void notifyNamenodeBlock(ReceivedDeletedBlockInfo rdbi,
-      DatanodeStorage storage) {
+      DatanodeStorage storage, boolean isOnTransientStorage) {
     addRDBI(rdbi, storage);
     addRDBI(rdbi, storage);
 
 
     final BlockStatus status = rdbi.getStatus();
     final BlockStatus status = rdbi.getStatus();
@@ -200,18 +232,23 @@ class IncrementalBlockReportManager {
       readyToSend = true;
       readyToSend = true;
     } else if (status == BlockStatus.RECEIVED_BLOCK) {
     } else if (status == BlockStatus.RECEIVED_BLOCK) {
       // the report is sent right away.
       // the report is sent right away.
-      triggerIBR();
+      triggerIBR(isOnTransientStorage);
     }
     }
   }
   }
 
 
-  synchronized void triggerIBR() {
+  synchronized void triggerIBR(boolean force) {
     readyToSend = true;
     readyToSend = true;
-    notifyAll();
+    if (force) {
+      lastIBR = monotonicNow() - ibrInterval;
+    }
+    if (sendImmediately()) {
+      notifyAll();
+    }
   }
   }
 
 
   @VisibleForTesting
   @VisibleForTesting
   synchronized void triggerDeletionReportForTests() {
   synchronized void triggerDeletionReportForTests() {
-    triggerIBR();
+    triggerIBR(true);
 
 
     while (sendImmediately()) {
     while (sendImmediately()) {
       try {
       try {

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java

@@ -375,7 +375,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    * @return the storage uuid of the replica.
    * @return the storage uuid of the replica.
    * @throws IOException
    * @throws IOException
    */
    */
-  String recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen
+  Replica recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen
       ) throws IOException;
       ) throws IOException;
   
   
   /**
   /**
@@ -525,7 +525,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    * Update replica's generation stamp and length and finalize it.
    * Update replica's generation stamp and length and finalize it.
    * @return the ID of storage that stores the block
    * @return the ID of storage that stores the block
    */
    */
-  String updateReplicaUnderRecovery(ExtendedBlock oldBlock,
+  Replica updateReplicaUnderRecovery(ExtendedBlock oldBlock,
       long recoveryId, long newBlockId, long newLength) throws IOException;
       long recoveryId, long newBlockId, long newLength) throws IOException;
 
 
   /**
   /**

+ 6 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -1286,7 +1286,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
   }
 
 
   @Override // FsDatasetSpi
   @Override // FsDatasetSpi
-  public synchronized String recoverClose(ExtendedBlock b, long newGS,
+  public synchronized Replica recoverClose(ExtendedBlock b, long newGS,
       long expectedBlockLen) throws IOException {
       long expectedBlockLen) throws IOException {
     LOG.info("Recover failed close " + b);
     LOG.info("Recover failed close " + b);
     // check replica's state
     // check replica's state
@@ -1297,7 +1297,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     if (replicaInfo.getState() == ReplicaState.RBW) {
     if (replicaInfo.getState() == ReplicaState.RBW) {
       finalizeReplica(b.getBlockPoolId(), replicaInfo);
       finalizeReplica(b.getBlockPoolId(), replicaInfo);
     }
     }
-    return replicaInfo.getStorageUuid();
+    return replicaInfo;
   }
   }
   
   
   /**
   /**
@@ -2429,7 +2429,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
   }
 
 
   @Override // FsDatasetSpi
   @Override // FsDatasetSpi
-  public synchronized String updateReplicaUnderRecovery(
+  public synchronized Replica updateReplicaUnderRecovery(
                                     final ExtendedBlock oldBlock,
                                     final ExtendedBlock oldBlock,
                                     final long recoveryId,
                                     final long recoveryId,
                                     final long newBlockId,
                                     final long newBlockId,
@@ -2489,8 +2489,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     //check replica files after update
     //check replica files after update
     checkReplicaFiles(finalized);
     checkReplicaFiles(finalized);
 
 
-    //return storage ID
-    return getVolume(new ExtendedBlock(bpid, finalized)).getStorageID();
+    return finalized;
   }
   }
 
 
   private FinalizedReplica updateReplicaUnderRecovery(
   private FinalizedReplica updateReplicaUnderRecovery(
@@ -2873,7 +2872,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     datanode.getShortCircuitRegistry().processBlockInvalidation(
     datanode.getShortCircuitRegistry().processBlockInvalidation(
         ExtendedBlockId.fromExtendedBlock(extendedBlock));
         ExtendedBlockId.fromExtendedBlock(extendedBlock));
     datanode.notifyNamenodeReceivedBlock(
     datanode.notifyNamenodeReceivedBlock(
-        extendedBlock, null, newReplicaInfo.getStorageUuid());
+        extendedBlock, null, newReplicaInfo.getStorageUuid(),
+        newReplicaInfo.isOnTransientStorage());
 
 
     // Remove the old replicas
     // Remove the old replicas
     if (blockFile.delete() || !blockFile.exists()) {
     if (blockFile.delete() || !blockFile.exists()) {

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageReceivedDeletedBlocks.java

@@ -18,6 +18,8 @@
 
 
 package org.apache.hadoop.hdfs.server.protocol;
 package org.apache.hadoop.hdfs.server.protocol;
 
 
+import java.util.Arrays;
+
 /**
 /**
  * Report of block received and deleted per Datanode
  * Report of block received and deleted per Datanode
  * storage.
  * storage.
@@ -51,4 +53,9 @@ public class StorageReceivedDeletedBlocks {
     this.storage = storage;
     this.storage = storage;
     this.blocks = blocks;
     this.blocks = blocks;
   }
   }
+
+  @Override
+  public String toString() {
+    return storage + Arrays.toString(blocks);
+  }
 }
 }

+ 5 - 6
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

@@ -899,7 +899,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   }
   }
 
 
   @Override // FsDatasetSpi
   @Override // FsDatasetSpi
-  public String recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen)
+  public Replica recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen)
       throws IOException {
       throws IOException {
     final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
     final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
     BInfo binfo = map.get(b.getLocalBlock());
     BInfo binfo = map.get(b.getLocalBlock());
@@ -913,7 +913,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     map.remove(b.getLocalBlock());
     map.remove(b.getLocalBlock());
     binfo.theBlock.setGenerationStamp(newGS);
     binfo.theBlock.setGenerationStamp(newGS);
     map.put(binfo.theBlock, binfo);
     map.put(binfo.theBlock, binfo);
-    return binfo.getStorageUuid();
+    return binfo;
   }
   }
   
   
   @Override // FsDatasetSpi
   @Override // FsDatasetSpi
@@ -1192,12 +1192,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   }
   }
 
 
   @Override // FsDatasetSpi
   @Override // FsDatasetSpi
-  public String updateReplicaUnderRecovery(ExtendedBlock oldBlock,
+  public Replica updateReplicaUnderRecovery(ExtendedBlock oldBlock,
                                         long recoveryId,
                                         long recoveryId,
                                         long newBlockId,
                                         long newBlockId,
-                                        long newlength) {
-    // Caller does not care about the exact Storage UUID returned.
-    return datanodeUuid;
+                                        long newlength) throws IOException {
+    return getMap(oldBlock.getBlockPoolId()).get(oldBlock.getLocalBlock());
   }
   }
 
 
   @Override // FsDatasetSpi
   @Override // FsDatasetSpi

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java

@@ -193,7 +193,7 @@ public class TestBPOfferService {
       waitForBlockReport(mockNN2);
       waitForBlockReport(mockNN2);
 
 
       // When we receive a block, it should report it to both NNs
       // When we receive a block, it should report it to both NNs
-      bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, "", "");
+      bpos.notifyNamenodeReceivedBlock(FAKE_BLOCK, null, "", false);
 
 
       ReceivedDeletedBlockInfo[] ret = waitForBlockReceived(FAKE_BLOCK, mockNN1);
       ReceivedDeletedBlockInfo[] ret = waitForBlockReceived(FAKE_BLOCK, mockNN1);
       assertEquals(1, ret.length);
       assertEquals(1, ret.length);

+ 263 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBatchIbr.java

@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.MetricsAsserts;
+import org.apache.hadoop.util.Time;
+import org.apache.log4j.Level;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * This test verifies that incremental block reports are sent in batch mode
+ * and the namenode allows closing a file with COMMITTED blocks.
+ */
+public class TestBatchIbr {
+  public static final Log LOG = LogFactory.getLog(TestBatchIbr.class);
+
+  private static final short NUM_DATANODES = 4;
+  private static final int BLOCK_SIZE = 1024;
+  private static final int MAX_BLOCK_NUM = 8;
+  private static final int NUM_FILES = 1000;
+  private static final int NUM_THREADS = 128;
+
+  private static final ThreadLocalBuffer IO_BUF = new ThreadLocalBuffer();
+  private static final ThreadLocalBuffer VERIFY_BUF = new ThreadLocalBuffer();
+
+  static {
+    GenericTestUtils.setLogLevel(
+        LogFactory.getLog(IncrementalBlockReportManager.class), Level.ALL);
+  }
+
+  static HdfsConfiguration newConf(long ibrInterval) throws IOException {
+    final HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setLong(DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    conf.setBoolean(ReplaceDatanodeOnFailure.BEST_EFFORT_KEY, true);
+
+    if (ibrInterval > 0) {
+      conf.setLong(DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_KEY, ibrInterval);
+    }
+    return conf;
+  }
+
+  static ExecutorService createExecutor() throws Exception {
+    final ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
+    final ExecutorCompletionService<Path> completion
+        = new ExecutorCompletionService<>(executor);
+
+    // initialize all threads and buffers
+    for(int i = 0; i < NUM_THREADS; i++) {
+      completion.submit(new Callable<Path>() {
+        @Override
+        public Path call() throws Exception {
+          IO_BUF.get();
+          VERIFY_BUF.get();
+          return null;
+        }
+      });
+    }
+    for(int i = 0; i < NUM_THREADS; i++) {
+      completion.take().get();
+    }
+    return executor;
+  }
+
+  static void runIbrTest(final long ibrInterval) throws Exception {
+    final ExecutorService executor = createExecutor();
+    final Random ran = new Random();
+
+    final Configuration conf = newConf(ibrInterval);
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(NUM_DATANODES).build();
+    final DistributedFileSystem dfs = cluster.getFileSystem();
+
+    try {
+      final String dirPathString = "/dir";
+      final Path dir = new Path(dirPathString);
+      dfs.mkdirs(dir);
+
+      // start testing
+      final long testStartTime = Time.monotonicNow();
+      final ExecutorCompletionService<Path> createService
+          = new ExecutorCompletionService<>(executor);
+      final AtomicLong createFileTime = new AtomicLong();
+      final AtomicInteger numBlockCreated = new AtomicInteger();
+
+      // create files
+      for(int i = 0; i < NUM_FILES; i++) {
+        createService.submit(new Callable<Path>() {
+          @Override
+          public Path call() throws Exception {
+            final long start = Time.monotonicNow();
+            try {
+              final long seed = ran.nextLong();
+              final int numBlocks = ran.nextInt(MAX_BLOCK_NUM) + 1;
+              numBlockCreated.addAndGet(numBlocks);
+              return createFile(dir, numBlocks, seed, dfs);
+            } finally {
+              createFileTime.addAndGet(Time.monotonicNow() - start);
+            }
+          }
+        });
+      }
+
+      // verify files
+      final ExecutorCompletionService<Boolean> verifyService
+          = new ExecutorCompletionService<>(executor);
+      final AtomicLong verifyFileTime = new AtomicLong();
+      for(int i = 0; i < NUM_FILES; i++) {
+        final Path file = createService.take().get();
+        verifyService.submit(new Callable<Boolean>() {
+          @Override
+          public Boolean call() throws Exception {
+            final long start = Time.monotonicNow();
+            try {
+              return verifyFile(file, dfs);
+            } finally {
+              verifyFileTime.addAndGet(Time.monotonicNow() - start);
+            }
+          }
+        });
+      }
+      for(int i = 0; i < NUM_FILES; i++) {
+        Assert.assertTrue(verifyService.take().get());
+      }
+      final long testEndTime = Time.monotonicNow();
+
+      LOG.info("ibrInterval=" + ibrInterval + " ("
+          + toConfString(DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_KEY, conf)
+          + "), numBlockCreated=" + numBlockCreated);
+      LOG.info("duration=" + toSecondString(testEndTime - testStartTime)
+          + ", createFileTime=" + toSecondString(createFileTime.get())
+          + ", verifyFileTime=" + toSecondString(verifyFileTime.get()));
+      LOG.info("NUM_FILES=" + NUM_FILES
+          + ", MAX_BLOCK_NUM=" + MAX_BLOCK_NUM
+          + ", BLOCK_SIZE=" + BLOCK_SIZE
+          + ", NUM_THREADS=" + NUM_THREADS
+          + ", NUM_DATANODES=" + NUM_DATANODES);
+      logIbrCounts(cluster.getDataNodes());
+    } finally {
+      executor.shutdown();
+      cluster.shutdown();
+    }
+  }
+
+  static String toConfString(String key, Configuration conf) {
+    return key + "=" + conf.get(key);
+  }
+
+  static String toSecondString(long ms) {
+    return (ms/1000.0) + "s";
+  }
+
+  static void logIbrCounts(List<DataNode> datanodes) {
+    final String name = "IncrementalBlockReportsNumOps";
+    for(DataNode dn : datanodes) {
+      final MetricsRecordBuilder m = MetricsAsserts.getMetrics(
+          dn.getMetrics().name());
+      final long ibr = MetricsAsserts.getLongCounter(name, m);
+      LOG.info(dn.getDisplayName() + ": " + name + "=" + ibr);
+    }
+
+  }
+
+  static class ThreadLocalBuffer extends ThreadLocal<byte[]> {
+    @Override
+    protected byte[] initialValue() {
+      return new byte[BLOCK_SIZE];
+    }
+  }
+
+  static byte[] nextBytes(int blockIndex, long seed, byte[] bytes) {
+    byte b = (byte)(seed ^ (seed >> blockIndex));
+    for(int i = 0; i < bytes.length; i++) {
+      bytes[i] = b++;
+    }
+    return bytes;
+  }
+
+  static Path createFile(Path dir, int numBlocks, long seed,
+      DistributedFileSystem dfs) throws IOException {
+    final Path f = new Path(dir, seed + "_" + numBlocks);
+    final byte[] bytes = IO_BUF.get();
+
+    try(FSDataOutputStream out = dfs.create(f)) {
+      for(int i = 0; i < numBlocks; i++) {
+        out.write(nextBytes(i, seed, bytes));
+      }
+    }
+    return f;
+  }
+
+  static boolean verifyFile(Path f, DistributedFileSystem dfs) {
+    final long seed;
+    final int numBlocks;
+    {
+      final String name = f.getName();
+      final int i = name.indexOf('_');
+      seed = Long.parseLong(name.substring(0, i));
+      numBlocks = Integer.parseInt(name.substring(i + 1));
+    }
+
+    final byte[] computed = IO_BUF.get();
+    final byte[] expected = VERIFY_BUF.get();
+
+    try(FSDataInputStream in = dfs.open(f)) {
+      for(int i = 0; i < numBlocks; i++) {
+        in.read(computed);
+        nextBytes(i, seed, expected);
+        Assert.assertArrayEquals(expected, computed);
+      }
+      return true;
+    } catch(Exception e) {
+      LOG.error("Failed to verify file " + f);
+      return false;
+    }
+  }
+
+  @Test
+  public void testIbr() throws Exception {
+    runIbrTest(0L);
+    runIbrTest(100L);
+  }
+}

+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java

@@ -138,12 +138,14 @@ public class TestDataXceiverLazyPersistHint {
       PeerLocality locality,
       PeerLocality locality,
       NonLocalLazyPersist nonLocalLazyPersist,
       NonLocalLazyPersist nonLocalLazyPersist,
       final ArgumentCaptor<Boolean> captor) throws IOException {
       final ArgumentCaptor<Boolean> captor) throws IOException {
+    final BlockReceiver mockBlockReceiver = mock(BlockReceiver.class);
+    doReturn(mock(Replica.class)).when(mockBlockReceiver).getReplica();
+
     DataXceiver xceiverSpy = spy(DataXceiver.create(
     DataXceiver xceiverSpy = spy(DataXceiver.create(
             getMockPeer(locality),
             getMockPeer(locality),
             getMockDn(nonLocalLazyPersist),
             getMockDn(nonLocalLazyPersist),
             mock(DataXceiverServer.class)));
             mock(DataXceiverServer.class)));
-
-    doReturn(mock(BlockReceiver.class)).when(xceiverSpy).getBlockReceiver(
+    doReturn(mockBlockReceiver).when(xceiverSpy).getBlockReceiver(
         any(ExtendedBlock.class), any(StorageType.class),
         any(ExtendedBlock.class), any(StorageType.class),
         any(DataInputStream.class), anyString(), anyString(),
         any(DataInputStream.class), anyString(), anyString(),
         any(BlockConstructionStage.class), anyLong(), anyLong(), anyLong(),
         any(BlockConstructionStage.class), anyLong(), anyLong(), anyLong(),

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java

@@ -88,7 +88,7 @@ public class TestIncrementalBlockReports {
     ReceivedDeletedBlockInfo rdbi = new ReceivedDeletedBlockInfo(
     ReceivedDeletedBlockInfo rdbi = new ReceivedDeletedBlockInfo(
         getDummyBlock(), BlockStatus.RECEIVED_BLOCK, null);
         getDummyBlock(), BlockStatus.RECEIVED_BLOCK, null);
     DatanodeStorage s = singletonDn.getFSDataset().getStorage(storageUuid);
     DatanodeStorage s = singletonDn.getFSDataset().getStorage(storageUuid);
-    actor.getIbrManager().notifyNamenodeBlock(rdbi, s);
+    actor.getIbrManager().notifyNamenodeBlock(rdbi, s, false);
   }
   }
 
 
   /**
   /**

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java

@@ -177,7 +177,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
   }
   }
 
 
   @Override
   @Override
-  public String recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen)
+  public Replica recoverClose(ExtendedBlock b, long newGS, long expectedBlkLen)
       throws IOException {
       throws IOException {
     return null;
     return null;
   }
   }
@@ -272,7 +272,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
   }
   }
 
 
   @Override
   @Override
-  public String updateReplicaUnderRecovery(ExtendedBlock oldBlock,
+  public Replica updateReplicaUnderRecovery(ExtendedBlock oldBlock,
       long recoveryId, long newBlockId, long newLength) throws IOException {
       long recoveryId, long newBlockId, long newLength) throws IOException {
     return null;
     return null;
   }
   }

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java

@@ -328,7 +328,6 @@ public class TestInterDatanodeProtocol {
     try {
     try {
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
       cluster.waitActive();
       cluster.waitActive();
-      String bpid = cluster.getNamesystem().getBlockPoolId();
 
 
       //create a file
       //create a file
       DistributedFileSystem dfs = cluster.getFileSystem();
       DistributedFileSystem dfs = cluster.getFileSystem();
@@ -379,10 +378,11 @@ public class TestInterDatanodeProtocol {
       }
       }
 
 
       //update
       //update
-      final String storageID = fsdataset.updateReplicaUnderRecovery(
+      final Replica r = fsdataset.updateReplicaUnderRecovery(
           new ExtendedBlock(b.getBlockPoolId(), rri), recoveryid,
           new ExtendedBlock(b.getBlockPoolId(), rri), recoveryid,
           rri.getBlockId(), newlength);
           rri.getBlockId(), newlength);
-      assertTrue(storageID != null);
+      assertTrue(r != null);
+      assertTrue(r.getStorageUuid() != null);
 
 
     } finally {
     } finally {
       if (cluster != null) cluster.shutdown();
       if (cluster != null) cluster.shutdown();