Jelajahi Sumber

HDFS-13165: [SPS]: Collects successfully moved block details via IBR. Contributed by Rakesh R.

Rakesh Radhakrishnan 7 tahun lalu
induk
melakukan
2acc50b826
42 mengubah file dengan 776 tambahan dan 659 penghapusan
  1. 1 10
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
  2. 1 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
  3. 0 25
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
  4. 80 6
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
  5. 22 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockMovementAttemptFinished.java
  6. 8 101
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockStorageMovementTracker.java
  7. 7 63
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlocksMovementsStatusHandler.java
  8. 1 13
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
  9. 1 6
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
  10. 15 33
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
  11. 11 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java
  12. 6 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
  13. 4 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
  14. 10 20
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  15. 18 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
  16. 29 17
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
  17. 112 55
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
  18. 13 6
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
  19. 101 53
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
  20. 1 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
  21. 11 21
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
  22. 1 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
  23. 0 9
      hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
  24. 41 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
  25. 1 3
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
  26. 1 3
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
  27. 88 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimpleBlocksMovementsStatusHandler.java
  28. 5 7
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
  29. 1 3
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
  30. 2 5
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
  31. 1 3
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
  32. 1 3
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
  33. 7 69
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
  34. 1 3
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
  35. 4 5
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
  36. 1 3
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
  37. 2 3
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
  38. 10 7
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java
  39. 53 35
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
  40. 62 11
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
  41. 29 11
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java
  42. 13 31
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java

+ 1 - 10
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java

@@ -48,7 +48,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlock
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionRequestProto;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -139,8 +138,7 @@ public class DatanodeProtocolClientSideTranslatorPB implements
       VolumeFailureSummary volumeFailureSummary,
       boolean requestFullBlockReportLease,
       @Nonnull SlowPeerReports slowPeers,
-      @Nonnull SlowDiskReports slowDisks,
-      BlocksStorageMoveAttemptFinished storageMovementFinishedBlks)
+      @Nonnull SlowDiskReports slowDisks)
           throws IOException {
     HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder()
         .setRegistration(PBHelper.convert(registration))
@@ -165,13 +163,6 @@ public class DatanodeProtocolClientSideTranslatorPB implements
       builder.addAllSlowDisks(PBHelper.convertSlowDiskInfo(slowDisks));
     }
 
-    // Adding blocks movement results to the heart beat request.
-    if (storageMovementFinishedBlks != null
-        && storageMovementFinishedBlks.getBlocks() != null) {
-      builder.setStorageMoveAttemptFinishedBlks(
-          PBHelper.convertBlksMovReport(storageMovementFinishedBlks));
-    }
-
     HeartbeatResponseProto resp;
     try {
       resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, builder.build());

+ 1 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java

@@ -122,9 +122,7 @@ public class DatanodeProtocolServerSideTranslatorPB implements
           request.getXceiverCount(), request.getFailedVolumes(),
           volumeFailureSummary, request.getRequestFullBlockReportLease(),
           PBHelper.convertSlowPeerInfo(request.getSlowPeersList()),
-          PBHelper.convertSlowDiskInfo(request.getSlowDisksList()),
-          PBHelper.convertBlksMovReport(
-              request.getStorageMoveAttemptFinishedBlks()));
+          PBHelper.convertSlowDiskInfo(request.getSlowDisksList()));
     } catch (IOException e) {
       throw new ServiceException(e);
     }

+ 0 - 25
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java

@@ -57,7 +57,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SlowPeerRepo
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockStorageMovementCommandProto;
-import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlocksStorageMoveAttemptFinishedProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECReconstructionInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
@@ -105,7 +104,6 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStr
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
@@ -971,29 +969,6 @@ public class PBHelper {
     return SlowDiskReports.create(slowDisksMap);
   }
 
-  public static BlocksStorageMoveAttemptFinished convertBlksMovReport(
-      BlocksStorageMoveAttemptFinishedProto proto) {
-
-    List<BlockProto> blocksList = proto.getBlocksList();
-    Block[] blocks = new Block[blocksList.size()];
-    for (int i = 0; i < blocksList.size(); i++) {
-      BlockProto blkProto = blocksList.get(i);
-      blocks[i] = PBHelperClient.convert(blkProto);
-    }
-    return new BlocksStorageMoveAttemptFinished(blocks);
-  }
-
-  public static BlocksStorageMoveAttemptFinishedProto convertBlksMovReport(
-      BlocksStorageMoveAttemptFinished blocksMoveAttemptFinished) {
-    BlocksStorageMoveAttemptFinishedProto.Builder builder =
-        BlocksStorageMoveAttemptFinishedProto.newBuilder();
-    Block[] blocks = blocksMoveAttemptFinished.getBlocks();
-    for (Block block : blocks) {
-      builder.addBlocks(PBHelperClient.convert(block));
-    }
-    return builder.build();
-  }
-
   public static JournalInfo convert(JournalInfoProto info) {
     int lv = info.hasLayoutVersion() ? info.getLayoutVersion() : 0;
     int nsID = info.hasNamespaceID() ? info.getNamespaceID() : 0;

+ 80 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -69,6 +69,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
@@ -92,6 +93,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
+import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
 import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
@@ -427,8 +429,11 @@ public class BlockManager implements BlockStatsMXBean {
 
   private final BlockIdManager blockIdManager;
 
-  /** For satisfying block storage policies. */
-  private final StoragePolicySatisfyManager spsManager;
+  /**
+   * For satisfying block storage policies. Instantiates if sps is enabled
+   * internally or externally.
+   */
+  private StoragePolicySatisfyManager spsManager;
 
   /** Minimum live replicas needed for the datanode to be transitioned
    * from ENTERING_MAINTENANCE to IN_MAINTENANCE.
@@ -469,8 +474,7 @@ public class BlockManager implements BlockStatsMXBean {
         DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT)
         * 1000L);
 
-    // sps manager manages the user invoked sps paths and does the movement.
-    spsManager = new StoragePolicySatisfyManager(conf, namesystem, this);
+    createSPSManager(conf);
 
     blockTokenSecretManager = createBlockTokenSecretManager(conf);
 
@@ -699,7 +703,9 @@ public class BlockManager implements BlockStatsMXBean {
   }
 
   public void close() {
-    getSPSManager().stop();
+    if (getSPSManager() != null) {
+      getSPSManager().stop();
+    }
     bmSafeMode.close();
     try {
       redundancyThread.interrupt();
@@ -713,7 +719,9 @@ public class BlockManager implements BlockStatsMXBean {
     datanodeManager.close();
     pendingReconstruction.stop();
     blocksMap.close();
-    getSPSManager().stopGracefully();
+    if (getSPSManager() != null) {
+      getSPSManager().stopGracefully();
+    }
   }
 
   /** @return the datanodeManager */
@@ -3881,6 +3889,21 @@ public class BlockManager implements BlockStatsMXBean {
     }
     processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED,
         delHintNode);
+
+    // notify SPS about the reported block
+    notifyStorageMovementAttemptFinishedBlk(storageInfo, block);
+  }
+
+  private void notifyStorageMovementAttemptFinishedBlk(
+      DatanodeStorageInfo storageInfo, Block block) {
+    if (getSPSManager() != null) {
+      SPSService<Long> sps = getSPSManager().getInternalSPSService();
+      if (sps.isRunning()) {
+        sps.notifyStorageMovementAttemptFinishedBlk(
+            storageInfo.getDatanodeDescriptor(), storageInfo.getStorageType(),
+            block);
+      }
+    }
   }
   
   private void processAndHandleReportedBlock(
@@ -5025,6 +5048,57 @@ public class BlockManager implements BlockStatsMXBean {
     }
   }
 
+  /**
+   * Create SPS manager instance. It manages the user invoked sps paths and does
+   * the movement.
+   *
+   * @param conf
+   *          configuration
+   * @return true if the instance is successfully created, false otherwise.
+   */
+  private boolean createSPSManager(final Configuration conf) {
+    return createSPSManager(conf, null);
+  }
+
+  /**
+   * Create SPS manager instance. It manages the user invoked sps paths and does
+   * the movement.
+   *
+   * @param conf
+   *          configuration
+   * @param spsMode
+   *          satisfier mode
+   * @return true if the instance is successfully created, false otherwise.
+   */
+  public boolean createSPSManager(final Configuration conf,
+      final String spsMode) {
+    // sps manager manages the user invoked sps paths and does the movement.
+    // StoragePolicySatisfier(SPS) configs
+    boolean storagePolicyEnabled = conf.getBoolean(
+        DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY,
+        DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DEFAULT);
+    String modeVal = spsMode;
+    if (org.apache.commons.lang.StringUtils.isBlank(modeVal)) {
+      modeVal = conf.get(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+          DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT);
+    }
+    StoragePolicySatisfierMode mode = StoragePolicySatisfierMode
+        .fromString(modeVal);
+    if (!storagePolicyEnabled || mode == StoragePolicySatisfierMode.NONE) {
+      LOG.info("Storage policy satisfier is disabled");
+      return false;
+    }
+    spsManager = new StoragePolicySatisfyManager(conf, namesystem, this);
+    return true;
+  }
+
+  /**
+   * Nullify SPS manager as this feature is disabled fully.
+   */
+  public void disableSPS() {
+    spsManager = null;
+  }
+
   /**
    * @return sps manager.
    */

+ 22 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockMovementAttemptFinished.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.common.sps;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 
@@ -33,6 +34,7 @@ public class BlockMovementAttemptFinished {
   private final Block block;
   private final DatanodeInfo src;
   private final DatanodeInfo target;
+  private final StorageType targetType;
   private final BlockMovementStatus status;
 
   /**
@@ -44,14 +46,17 @@ public class BlockMovementAttemptFinished {
    *          src datanode
    * @param target
    *          target datanode
+   * @param targetType
+   *          target storage type
    * @param status
    *          movement status
    */
   public BlockMovementAttemptFinished(Block block, DatanodeInfo src,
-      DatanodeInfo target, BlockMovementStatus status) {
+      DatanodeInfo target, StorageType targetType, BlockMovementStatus status) {
     this.block = block;
     this.src = src;
     this.target = target;
+    this.targetType = targetType;
     this.status = status;
   }
 
@@ -63,6 +68,20 @@ public class BlockMovementAttemptFinished {
     return block;
   }
 
+  /**
+   * @return the target datanode where it moved the block.
+   */
+  public DatanodeInfo getTargetDatanode() {
+    return target;
+  }
+
+  /**
+   * @return target storage type.
+   */
+  public StorageType getTargetType() {
+    return targetType;
+  }
+
   /**
    * @return block movement status code.
    */
@@ -74,7 +93,8 @@ public class BlockMovementAttemptFinished {
   public String toString() {
     return new StringBuilder().append("Block movement attempt finished(\n  ")
         .append(" block : ").append(block).append(" src node: ").append(src)
-        .append(" target node: ").append(target).append(" movement status: ")
+        .append(" target node: ").append(target).append(" target type: ")
+        .append(targetType).append(" movement status: ")
         .append(status).append(")").toString();
   }
 }

+ 8 - 101
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockStorageMovementTracker.java

@@ -17,17 +17,12 @@
  */
 package org.apache.hadoop.hdfs.server.common.sps;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,13 +34,10 @@ import org.slf4j.LoggerFactory;
 public class BlockStorageMovementTracker implements Runnable {
   private static final Logger LOG = LoggerFactory
       .getLogger(BlockStorageMovementTracker.class);
-  private final CompletionService<BlockMovementAttemptFinished> moverCompletionService;
+  private final CompletionService<BlockMovementAttemptFinished>
+      moverCompletionService;
   private final BlocksMovementsStatusHandler blksMovementsStatusHandler;
 
-  // Keeps the information - block vs its list of future move tasks
-  private final Map<Block, List<Future<BlockMovementAttemptFinished>>> moverTaskFutures;
-  private final Map<Block, List<BlockMovementAttemptFinished>> movementResults;
-
   private volatile boolean running = true;
 
   /**
@@ -60,53 +52,21 @@ public class BlockStorageMovementTracker implements Runnable {
       CompletionService<BlockMovementAttemptFinished> moverCompletionService,
       BlocksMovementsStatusHandler handler) {
     this.moverCompletionService = moverCompletionService;
-    this.moverTaskFutures = new HashMap<>();
     this.blksMovementsStatusHandler = handler;
-    this.movementResults = new HashMap<>();
   }
 
   @Override
   public void run() {
     while (running) {
-      if (moverTaskFutures.size() <= 0) {
-        try {
-          synchronized (moverTaskFutures) {
-            // Waiting for mover tasks.
-            moverTaskFutures.wait(2000);
-          }
-        } catch (InterruptedException ignore) {
-          // Sets interrupt flag of this thread.
-          Thread.currentThread().interrupt();
-        }
-      }
       try {
-        Future<BlockMovementAttemptFinished> future =
-            moverCompletionService.take();
+        Future<BlockMovementAttemptFinished> future = moverCompletionService
+            .take();
         if (future != null) {
           BlockMovementAttemptFinished result = future.get();
           LOG.debug("Completed block movement. {}", result);
-          Block block = result.getBlock();
-          List<Future<BlockMovementAttemptFinished>> blocksMoving =
-              moverTaskFutures.get(block);
-          if (blocksMoving == null) {
-            LOG.warn("Future task doesn't exist for block : {} ", block);
-            continue;
-          }
-          blocksMoving.remove(future);
-
-          List<BlockMovementAttemptFinished> resultPerTrackIdList =
-              addMovementResultToBlockIdList(result);
-
-          // Completed all the scheduled blocks movement under this 'trackId'.
-          if (blocksMoving.isEmpty() || moverTaskFutures.get(block) == null) {
-            synchronized (moverTaskFutures) {
-              moverTaskFutures.remove(block);
-            }
-            if (running) {
-              // handle completed or inprogress blocks movements per trackId.
-              blksMovementsStatusHandler.handle(resultPerTrackIdList);
-            }
-            movementResults.remove(block);
+          if (running && blksMovementsStatusHandler != null) {
+            // handle completed block movement.
+            blksMovementsStatusHandler.handle(result);
           }
         }
       } catch (InterruptedException e) {
@@ -122,63 +82,10 @@ public class BlockStorageMovementTracker implements Runnable {
     }
   }
 
-  private List<BlockMovementAttemptFinished> addMovementResultToBlockIdList(
-      BlockMovementAttemptFinished result) {
-    Block block = result.getBlock();
-    List<BlockMovementAttemptFinished> perBlockIdList;
-    synchronized (movementResults) {
-      perBlockIdList = movementResults.get(block);
-      if (perBlockIdList == null) {
-        perBlockIdList = new ArrayList<>();
-        movementResults.put(block, perBlockIdList);
-      }
-      perBlockIdList.add(result);
-    }
-    return perBlockIdList;
-  }
-
-  /**
-   * Add future task to the tracking list to check the completion status of the
-   * block movement.
-   *
-   * @param blockID
-   *          block identifier
-   * @param futureTask
-   *          future task used for moving the respective block
-   */
-  public void addBlock(Block block,
-      Future<BlockMovementAttemptFinished> futureTask) {
-    synchronized (moverTaskFutures) {
-      List<Future<BlockMovementAttemptFinished>> futures =
-          moverTaskFutures.get(block);
-      // null for the first task
-      if (futures == null) {
-        futures = new ArrayList<>();
-        moverTaskFutures.put(block, futures);
-      }
-      futures.add(futureTask);
-      // Notify waiting tracker thread about the newly added tasks.
-      moverTaskFutures.notify();
-    }
-  }
-
-  /**
-   * Clear the pending movement and movement result queues.
-   */
-  public void removeAll() {
-    synchronized (moverTaskFutures) {
-      moverTaskFutures.clear();
-    }
-    synchronized (movementResults) {
-      movementResults.clear();
-    }
-  }
-
   /**
-   * Sets running flag to false and clear the pending movement result queues.
+   * Sets running flag to false.
    */
   public void stopTracking() {
     running = false;
-    removeAll();
   }
 }

+ 7 - 63
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlocksMovementsStatusHandler.java

@@ -18,78 +18,22 @@
 
 package org.apache.hadoop.hdfs.server.common.sps;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.protocol.Block;
 
 /**
- * Blocks movements status handler, which is used to collect details of the
- * completed block movements and later these attempted finished(with success or
- * failure) blocks can be accessed to notify respective listeners, if any.
+ * Blocks movements status handler, which can be used to collect details of the
+ * completed block movements.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class BlocksMovementsStatusHandler {
-  private final List<Block> blockIdVsMovementStatus = new ArrayList<>();
-
-  /**
-   * Collect all the storage movement attempt finished blocks. Later this will
-   * be send to namenode via heart beat.
-   *
-   * @param moveAttemptFinishedBlks
-   *          set of storage movement attempt finished blocks
-   */
-  public void handle(
-      List<BlockMovementAttemptFinished> moveAttemptFinishedBlks) {
-    List<Block> blocks = new ArrayList<>();
-
-    for (BlockMovementAttemptFinished item : moveAttemptFinishedBlks) {
-      blocks.add(item.getBlock());
-    }
-    // Adding to the tracking report list. Later this can be accessed to know
-    // the attempted block movements.
-    synchronized (blockIdVsMovementStatus) {
-      blockIdVsMovementStatus.addAll(blocks);
-    }
-  }
+public interface BlocksMovementsStatusHandler {
 
   /**
-   * @return unmodifiable list of storage movement attempt finished blocks.
-   */
-  public List<Block> getMoveAttemptFinishedBlocks() {
-    List<Block> moveAttemptFinishedBlks = new ArrayList<>();
-    // 1. Adding all the completed block ids.
-    synchronized (blockIdVsMovementStatus) {
-      if (blockIdVsMovementStatus.size() > 0) {
-        moveAttemptFinishedBlks = Collections
-            .unmodifiableList(blockIdVsMovementStatus);
-      }
-    }
-    return moveAttemptFinishedBlks;
-  }
-
-  /**
-   * Remove the storage movement attempt finished blocks from the tracking list.
+   * Collect all the storage movement attempt finished blocks.
    *
-   * @param moveAttemptFinishedBlks
-   *          set of storage movement attempt finished blocks
-   */
-  public void remove(List<Block> moveAttemptFinishedBlks) {
-    if (moveAttemptFinishedBlks != null) {
-      blockIdVsMovementStatus.removeAll(moveAttemptFinishedBlks);
-    }
-  }
-
-  /**
-   * Clear the blockID vs movement status tracking map.
+   * @param moveAttemptFinishedBlk
+   *          storage movement attempt finished block
    */
-  public void removeAll() {
-    synchronized (blockIdVsMovementStatus) {
-      blockIdVsMovementStatus.clear();
-    }
-  }
+  void handle(BlockMovementAttemptFinished moveAttemptFinishedBlk);
 }

+ 1 - 13
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java

@@ -514,12 +514,6 @@ class BPServiceActor implements Runnable {
             SlowDiskReports.create(dn.getDiskMetrics().getDiskOutliersStats()) :
             SlowDiskReports.EMPTY_REPORT;
 
-    // Get the blocks storage move attempt finished blocks
-    List<Block> results = dn.getStoragePolicySatisfyWorker()
-        .getBlocksMovementsStatusHandler().getMoveAttemptFinishedBlocks();
-    BlocksStorageMoveAttemptFinished storageMoveAttemptFinishedBlks =
-        getStorageMoveAttemptFinishedBlocks(results);
-
     HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
         reports,
         dn.getFSDataset().getCacheCapacity(),
@@ -530,19 +524,13 @@ class BPServiceActor implements Runnable {
         volumeFailureSummary,
         requestBlockReportLease,
         slowPeers,
-        slowDisks,
-        storageMoveAttemptFinishedBlks);
+        slowDisks);
 
     if (outliersReportDue) {
       // If the report was due and successfully sent, schedule the next one.
       scheduler.scheduleNextOutlierReport();
     }
 
-    // Remove the blocks movement results after successfully transferring
-    // to namenode.
-    dn.getStoragePolicySatisfyWorker().getBlocksMovementsStatusHandler()
-        .remove(results);
-
     return response;
   }
 

+ 1 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -1427,7 +1427,7 @@ public class DataNode extends ReconfigurableBase
     ecWorker = new ErasureCodingWorker(getConf(), this);
     blockRecoveryWorker = new BlockRecoveryWorker(this);
     storagePolicySatisfyWorker =
-        new StoragePolicySatisfyWorker(getConf(), this);
+        new StoragePolicySatisfyWorker(getConf(), this, null);
     storagePolicySatisfyWorker.start();
 
     blockPoolManager = new BlockPoolManager(this);
@@ -2137,11 +2137,6 @@ public class DataNode extends ReconfigurableBase
       notifyAll();
     }
     tracer.close();
-
-    // Waiting to finish SPS worker thread.
-    if (storagePolicySatisfyWorker != null) {
-      storagePolicySatisfyWorker.waitToFinishWorkerThread();
-    }
   }
 
   /**

+ 15 - 33
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java

@@ -24,7 +24,6 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -38,19 +37,17 @@ import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.common.sps.BlockDispatcher;
 import org.apache.hadoop.hdfs.server.common.sps.BlockMovementAttemptFinished;
 import org.apache.hadoop.hdfs.server.common.sps.BlockMovementStatus;
 import org.apache.hadoop.hdfs.server.common.sps.BlockStorageMovementTracker;
 import org.apache.hadoop.hdfs.server.common.sps.BlocksMovementsStatusHandler;
-import org.apache.hadoop.hdfs.server.common.sps.BlockDispatcher;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Daemon;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * StoragePolicySatisfyWorker handles the storage policy satisfier commands.
  * These commands would be issued from NameNode as part of Datanode's heart beat
@@ -67,19 +64,19 @@ public class StoragePolicySatisfyWorker {
 
   private final int moverThreads;
   private final ExecutorService moveExecutor;
-  private final CompletionService<BlockMovementAttemptFinished> moverCompletionService;
-  private final BlocksMovementsStatusHandler handler;
+  private final CompletionService<BlockMovementAttemptFinished>
+      moverCompletionService;
   private final BlockStorageMovementTracker movementTracker;
   private Daemon movementTrackerThread;
   private final BlockDispatcher blkDispatcher;
 
-  public StoragePolicySatisfyWorker(Configuration conf, DataNode datanode) {
+  public StoragePolicySatisfyWorker(Configuration conf, DataNode datanode,
+      BlocksMovementsStatusHandler handler) {
     this.datanode = datanode;
-    // Defaulting to 10. This is to minimise the number of move ops.
+    // Defaulting to 10. This is to minimize the number of move ops.
     moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY, 10);
     moveExecutor = initializeBlockMoverThreadPool(moverThreads);
     moverCompletionService = new ExecutorCompletionService<>(moveExecutor);
-    handler = new BlocksMovementsStatusHandler();
     movementTracker = new BlockStorageMovementTracker(moverCompletionService,
         handler);
     movementTrackerThread = new Daemon(movementTracker);
@@ -88,7 +85,6 @@ public class StoragePolicySatisfyWorker {
     int ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf);
     blkDispatcher = new BlockDispatcher(dnConf.getSocketTimeout(),
         ioFileBufferSize, dnConf.getConnectToDnViaHostname());
-    // TODO: Needs to manage the number of concurrent moves per DataNode.
   }
 
   /**
@@ -100,22 +96,17 @@ public class StoragePolicySatisfyWorker {
   }
 
   /**
-   * Stop StoragePolicySatisfyWorker, which will stop block movement tracker
-   * thread.
+   * Stop StoragePolicySatisfyWorker, which will terminate executor service and
+   * stop block movement tracker thread.
    */
   void stop() {
     movementTracker.stopTracking();
     movementTrackerThread.interrupt();
-  }
-
-  /**
-   * Timed wait to stop BlockStorageMovement tracker daemon thread.
-   */
-  void waitToFinishWorkerThread() {
+    moveExecutor.shutdown();
     try {
-      movementTrackerThread.join(3000);
-    } catch (InterruptedException ignore) {
-      // ignore
+      moveExecutor.awaitTermination(500, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted while waiting for mover thread to terminate", e);
     }
   }
 
@@ -160,10 +151,7 @@ public class StoragePolicySatisfyWorker {
           : "Source and Target storage type shouldn't be same!";
       BlockMovingTask blockMovingTask = new BlockMovingTask(blockPoolID,
           blkMovingInfo);
-      Future<BlockMovementAttemptFinished> moveCallable = moverCompletionService
-          .submit(blockMovingTask);
-      movementTracker.addBlock(blkMovingInfo.getBlock(),
-          moveCallable);
+      moverCompletionService.submit(blockMovingTask);
     }
   }
 
@@ -185,7 +173,8 @@ public class StoragePolicySatisfyWorker {
     public BlockMovementAttemptFinished call() {
       BlockMovementStatus status = moveBlock();
       return new BlockMovementAttemptFinished(blkMovingInfo.getBlock(),
-          blkMovingInfo.getSource(), blkMovingInfo.getTarget(), status);
+          blkMovingInfo.getSource(), blkMovingInfo.getTarget(),
+          blkMovingInfo.getTargetStorageType(), status);
     }
 
     private BlockMovementStatus moveBlock() {
@@ -217,11 +206,6 @@ public class StoragePolicySatisfyWorker {
     }
   }
 
-  @VisibleForTesting
-  BlocksMovementsStatusHandler getBlocksMovementsStatusHandler() {
-    return handler;
-  }
-
   /**
    * Drop the in-progress SPS work queues.
    */
@@ -229,7 +213,5 @@ public class StoragePolicySatisfyWorker {
     LOG.info("Received request to drop StoragePolicySatisfierWorker queues. "
         + "So, none of the SPS Worker queued block movements will"
         + " be scheduled.");
-    movementTracker.removeAll();
-    handler.removeAll();
   }
 }

+ 11 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hdfs.XAttrHelper;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager;
 
 import com.google.common.collect.Lists;
 
@@ -102,7 +103,11 @@ final class FSDirSatisfyStoragePolicyOp {
 
         // Adding directory in the pending queue, so FileInodeIdCollector
         // process directory child in batch and recursively
-        fsd.getBlockManager().getSPSManager().addPathId(inode.getId());
+        StoragePolicySatisfyManager spsManager =
+            fsd.getBlockManager().getSPSManager();
+        if (spsManager != null) {
+          spsManager.addPathId(inode.getId());
+        }
       }
     } finally {
       fsd.writeUnlock();
@@ -116,7 +121,11 @@ final class FSDirSatisfyStoragePolicyOp {
     } else {
       // Adding directory in the pending queue, so FileInodeIdCollector process
       // directory child in batch and recursively
-      fsd.getBlockManager().getSPSManager().addPathId(inode.getId());
+      StoragePolicySatisfyManager spsManager =
+          fsd.getBlockManager().getSPSManager();
+      if (spsManager != null) {
+        spsManager.addPathId(inode.getId());
+      }
       return true;
     }
   }

+ 6 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReencryptionInfoProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager;
 import org.apache.hadoop.security.AccessControlException;
 
 import java.io.FileNotFoundException;
@@ -209,8 +210,11 @@ class FSDirXAttrOp {
       for (XAttr xattr : toRemove) {
         if (XATTR_SATISFY_STORAGE_POLICY
             .equals(XAttrHelper.getPrefixedName(xattr))) {
-          fsd.getBlockManager().getSPSManager().getInternalSPSService()
-              .clearQueue(inode.getId());
+          StoragePolicySatisfyManager spsManager =
+              fsd.getBlockManager().getSPSManager();
+          if (spsManager != null) {
+            spsManager.getInternalSPSService().clearQueue(inode.getId());
+          }
           break;
         }
       }

+ 4 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo.UpdatedReplicationInfo;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager;
 import org.apache.hadoop.hdfs.util.ByteArray;
 import org.apache.hadoop.hdfs.util.EnumCounters;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
@@ -1401,7 +1402,9 @@ public class FSDirectory implements Closeable {
       if (!inode.isSymlink()) {
         final XAttrFeature xaf = inode.getXAttrFeature();
         addEncryptionZone((INodeWithAdditionalFields) inode, xaf);
-        if (namesystem.getBlockManager().getSPSManager().isEnabled()) {
+        StoragePolicySatisfyManager spsManager =
+            namesystem.getBlockManager().getSPSManager();
+        if (spsManager != null && spsManager.isEnabled()) {
           addStoragePolicySatisfier((INodeWithAdditionalFields) inode, xaf);
         }
       }

+ 10 - 20
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -259,7 +259,6 @@ import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;
-import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
@@ -268,7 +267,6 @@ import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
 import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
 import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
 import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -1292,7 +1290,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         FSDirEncryptionZoneOp.warmUpEdekCache(edekCacheLoader, dir,
             edekCacheLoaderDelay, edekCacheLoaderInterval);
       }
-      blockManager.getSPSManager().start();
+      if (blockManager.getSPSManager() != null) {
+        blockManager.getSPSManager().start();
+      }
     } finally {
       startingActiveService = false;
       blockManager.checkSafeMode();
@@ -1322,7 +1322,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     LOG.info("Stopping services started for active state");
     writeLock();
     try {
-      if (blockManager != null) {
+      if (blockManager != null && blockManager.getSPSManager() != null) {
         blockManager.getSPSManager().stop();
       }
       stopSecretManager();
@@ -1363,7 +1363,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         // Don't want to keep replication queues when not in Active.
         blockManager.clearQueues();
         blockManager.setInitializedReplQueues(false);
-        blockManager.getSPSManager().stopGracefully();
+        if (blockManager.getSPSManager() != null) {
+          blockManager.getSPSManager().stopGracefully();
+        }
       }
     } finally {
       writeUnlock("stopActiveServices");
@@ -2272,7 +2274,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
           DFS_STORAGE_POLICY_ENABLED_KEY));
     }
     // checks sps status
-    if (!blockManager.getSPSManager().isEnabled() || (blockManager
+    boolean disabled = (blockManager.getSPSManager() == null);
+    if (disabled || (blockManager
         .getSPSManager().getMode() == StoragePolicySatisfierMode.INTERNAL
         && !blockManager.getSPSManager().isInternalSatisfierRunning())) {
       throw new UnsupportedActionException(
@@ -3970,8 +3973,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       VolumeFailureSummary volumeFailureSummary,
       boolean requestFullBlockReportLease,
       @Nonnull SlowPeerReports slowPeers,
-      @Nonnull SlowDiskReports slowDisks,
-      BlocksStorageMoveAttemptFinished blksMovementsFinished)
+      @Nonnull SlowDiskReports slowDisks)
           throws IOException {
     readLock();
     try {
@@ -3987,18 +3989,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         blockReportLeaseId =  blockManager.requestBlockReportLeaseId(nodeReg);
       }
 
-      // Handle blocks movement results sent by the coordinator datanode.
-      SPSService sps = blockManager.getSPSManager().getInternalSPSService();
-      if (!sps.isRunning()) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(
-              "Storage policy satisfier is not running. So, ignoring storage"
-                  + "  movement attempt finished block info sent by DN");
-        }
-      } else {
-        sps.notifyStorageMovementAttemptFinishedBlks(blksMovementsFinished);
-      }
-
       //create ha status
       final NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat(
           haContext.getState().getServiceState(),

+ 18 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java

@@ -2147,7 +2147,24 @@ public class NameNode extends ReconfigurableBase implements
     }
     StoragePolicySatisfierMode mode = StoragePolicySatisfierMode
         .fromString(newVal);
-    namesystem.getBlockManager().getSPSManager().changeModeEvent(mode);
+    if (mode == StoragePolicySatisfierMode.NONE) {
+      // disabling sps service
+      if (namesystem.getBlockManager().getSPSManager() != null) {
+        namesystem.getBlockManager().getSPSManager().changeModeEvent(mode);
+        namesystem.getBlockManager().disableSPS();
+      }
+    } else {
+      // enabling sps service
+      boolean spsCreated = (namesystem.getBlockManager()
+          .getSPSManager() != null);
+      if (!spsCreated) {
+        spsCreated = namesystem.getBlockManager().createSPSManager(getConf(),
+            newVal);
+      }
+      if (spsCreated) {
+        namesystem.getBlockManager().getSPSManager().changeModeEvent(mode);
+      }
+    }
     return newVal;
   }
 

+ 29 - 17
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java

@@ -156,8 +156,8 @@ import org.apache.hadoop.hdfs.server.common.HttpGetFailedException;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -1517,16 +1517,14 @@ public class NameNodeRpcServer implements NamenodeProtocols {
       int failedVolumes, VolumeFailureSummary volumeFailureSummary,
       boolean requestFullBlockReportLease,
       @Nonnull SlowPeerReports slowPeers,
-      @Nonnull SlowDiskReports slowDisks,
-      BlocksStorageMoveAttemptFinished storageMovementFinishedBlks)
+      @Nonnull SlowDiskReports slowDisks)
           throws IOException {
     checkNNStartup();
     verifyRequest(nodeReg);
     return namesystem.handleHeartbeat(nodeReg, report,
         dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
         failedVolumes, volumeFailureSummary, requestFullBlockReportLease,
-        slowPeers, slowDisks,
-        storageMovementFinishedBlks);
+        slowPeers, slowDisks);
   }
 
   @Override // DatanodeProtocol
@@ -2543,10 +2541,12 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     if (nn.isStandbyState()) {
       throw new StandbyException("Not supported by Standby Namenode.");
     }
-    boolean isSPSRunning = namesystem.getBlockManager().getSPSManager()
-        .isInternalSatisfierRunning();
+    StoragePolicySatisfyManager spsMgr =
+        namesystem.getBlockManager().getSPSManager();
+    boolean isInternalSatisfierRunning = (spsMgr != null
+        ? spsMgr.isInternalSatisfierRunning() : false);
     namesystem.logAuditEvent(true, operationName, null);
-    return isSPSRunning;
+    return isInternalSatisfierRunning;
   }
 
   @Override
@@ -2556,6 +2556,14 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     if (nn.isStandbyState()) {
       throw new StandbyException("Not supported by Standby Namenode.");
     }
+    if (namesystem.getBlockManager().getSPSManager() == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Satisfier is not running inside namenode, so status "
+            + "can't be returned.");
+      }
+      throw new IOException("Satisfier is not running inside namenode, "
+          + "so status can't be returned.");
+    }
     return namesystem.getBlockManager().getSPSManager()
         .checkStoragePolicySatisfyPathStatus(path);
   }
@@ -2568,16 +2576,20 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     if (nn.isStandbyState()) {
       throw new StandbyException("Not supported by Standby Namenode.");
     }
-    // Check that SPS daemon service is running inside namenode
-    if (namesystem.getBlockManager().getSPSManager()
-        .getMode() == StoragePolicySatisfierMode.INTERNAL) {
-      LOG.debug("SPS service is internally enabled and running inside "
-          + "namenode, so external SPS is not allowed to fetch the path Ids");
-      throw new IOException("SPS service is internally enabled and running"
-          + " inside namenode, so external SPS is not allowed to fetch"
-          + " the path Ids");
+    // Check that SPS is enabled externally
+    StoragePolicySatisfyManager spsMgr =
+        namesystem.getBlockManager().getSPSManager();
+    StoragePolicySatisfierMode spsMode = (spsMgr != null ? spsMgr.getMode()
+        : StoragePolicySatisfierMode.NONE);
+    if (spsMode != StoragePolicySatisfierMode.EXTERNAL) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("SPS service mode is {}, so external SPS service is "
+            + "not allowed to fetch the path Ids", spsMode);
+      }
+      throw new IOException("SPS service mode is " + spsMode + ", so "
+          + "external SPS service is not allowed to fetch the path Ids");
     }
-    Long pathId = namesystem.getBlockManager().getSPSManager().getNextPathId();
+    Long pathId = spsMgr.getNextPathId();
     if (pathId == null) {
       return null;
     }

+ 112 - 55
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java

@@ -17,21 +17,28 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.sps;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY;
 import static org.apache.hadoop.util.Time.monotonicNow;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY;
-
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.AttemptedItemInfo;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.StorageTypeNodePair;
 import org.apache.hadoop.util.Daemon;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,10 +67,13 @@ public class BlockStorageMovementAttemptedItems<T> {
    * processing and sent to DNs.
    */
   private final List<AttemptedItemInfo<T>> storageMovementAttemptedItems;
-  private final List<Block> movementFinishedBlocks;
+  private Map<Block, Set<StorageTypeNodePair>> scheduledBlkLocs;
+  // Maintains separate Queue to keep the movement finished blocks. This Q
+  // is used to update the storageMovementAttemptedItems list asynchronously.
+  private final BlockingQueue<Block> movementFinishedBlocks;
   private volatile boolean monitorRunning = true;
   private Daemon timerThread = null;
-  private final BlockMovementListener blkMovementListener;
+  private BlockMovementListener blkMovementListener;
   //
   // It might take anywhere between 5 to 10 minutes before
   // a request is timed out.
@@ -94,7 +104,8 @@ public class BlockStorageMovementAttemptedItems<T> {
         DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT);
     this.blockStorageMovementNeeded = unsatisfiedStorageMovementFiles;
     storageMovementAttemptedItems = new ArrayList<>();
-    movementFinishedBlocks = new ArrayList<>();
+    scheduledBlkLocs = new HashMap<>();
+    movementFinishedBlocks = new LinkedBlockingQueue<>();
     this.blkMovementListener = blockMovementListener;
   }
 
@@ -105,29 +116,67 @@ public class BlockStorageMovementAttemptedItems<T> {
    * @param itemInfo
    *          - tracking info
    */
-  public void add(AttemptedItemInfo<T> itemInfo) {
+  public void add(T startPath, T file, long monotonicNow,
+      Map<Block, Set<StorageTypeNodePair>> assignedBlocks, int retryCount) {
+    AttemptedItemInfo<T> itemInfo = new AttemptedItemInfo<T>(startPath, file,
+        monotonicNow, assignedBlocks.keySet(), retryCount);
     synchronized (storageMovementAttemptedItems) {
       storageMovementAttemptedItems.add(itemInfo);
     }
+    synchronized (scheduledBlkLocs) {
+      scheduledBlkLocs.putAll(assignedBlocks);
+    }
   }
 
   /**
-   * Add the storage movement attempt finished blocks to
-   * storageMovementFinishedBlocks.
+   * Notify the storage movement attempt finished block.
    *
-   * @param moveAttemptFinishedBlks
-   *          storage movement attempt finished blocks
+   * @param reportedDn
+   *          reported datanode
+   * @param type
+   *          storage type
+   * @param reportedBlock
+   *          reported block
    */
-  public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) {
-    if (moveAttemptFinishedBlks.length == 0) {
-      return;
+  public void notifyReportedBlock(DatanodeInfo reportedDn, StorageType type,
+      Block reportedBlock) {
+    synchronized (scheduledBlkLocs) {
+      if (scheduledBlkLocs.size() <= 0) {
+        return;
+      }
+      matchesReportedBlock(reportedDn, type, reportedBlock);
     }
-    synchronized (movementFinishedBlocks) {
-      movementFinishedBlocks.addAll(Arrays.asList(moveAttemptFinishedBlks));
+  }
+
+  private void matchesReportedBlock(DatanodeInfo reportedDn, StorageType type,
+      Block reportedBlock) {
+    Set<StorageTypeNodePair> blkLocs = scheduledBlkLocs.get(reportedBlock);
+    if (blkLocs == null) {
+      return; // unknown block, simply skip.
     }
-    // External listener if it is plugged-in
-    if (blkMovementListener != null) {
-      blkMovementListener.notifyMovementTriedBlocks(moveAttemptFinishedBlks);
+
+    for (StorageTypeNodePair dn : blkLocs) {
+      boolean foundDn = dn.getDatanodeInfo().compareTo(reportedDn) == 0 ? true
+          : false;
+      boolean foundType = dn.getStorageType().equals(type);
+      if (foundDn && foundType) {
+        blkLocs.remove(dn);
+        // listener if it is plugged-in
+        if (blkMovementListener != null) {
+          blkMovementListener
+              .notifyMovementTriedBlocks(new Block[] {reportedBlock});
+        }
+        // All the block locations has reported.
+        if (blkLocs.size() <= 0) {
+          movementFinishedBlocks.add(reportedBlock);
+          scheduledBlkLocs.remove(reportedBlock); // clean-up reported block
+        }
+        return; // found
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Reported block:{} not found in attempted blocks. Datanode:{}"
+          + ", StorageType:{}", reportedBlock, reportedDn, type);
     }
   }
 
@@ -203,14 +252,12 @@ public class BlockStorageMovementAttemptedItems<T> {
         if (now > itemInfo.getLastAttemptedOrReportedTime()
             + selfRetryTimeout) {
           T file = itemInfo.getFile();
-          synchronized (movementFinishedBlocks) {
-            ItemInfo<T> candidate = new ItemInfo<T>(itemInfo.getStartPath(),
-                file, itemInfo.getRetryCount() + 1);
-            blockStorageMovementNeeded.add(candidate);
-            iter.remove();
-            LOG.info("TrackID: {} becomes timed out and moved to needed "
-                + "retries queue for next iteration.", file);
-          }
+          ItemInfo<T> candidate = new ItemInfo<T>(itemInfo.getStartPath(), file,
+              itemInfo.getRetryCount() + 1);
+          blockStorageMovementNeeded.add(candidate);
+          iter.remove();
+          LOG.info("TrackID: {} becomes timed out and moved to needed "
+              + "retries queue for next iteration.", file);
         }
       }
     }
@@ -218,29 +265,25 @@ public class BlockStorageMovementAttemptedItems<T> {
 
   @VisibleForTesting
   void blockStorageMovementReportedItemsCheck() throws IOException {
-    synchronized (movementFinishedBlocks) {
-      Iterator<Block> finishedBlksIter = movementFinishedBlocks.iterator();
-      while (finishedBlksIter.hasNext()) {
-        Block blk = finishedBlksIter.next();
-        synchronized (storageMovementAttemptedItems) {
-          Iterator<AttemptedItemInfo<T>> iterator =
-              storageMovementAttemptedItems.iterator();
-          while (iterator.hasNext()) {
-            AttemptedItemInfo<T> attemptedItemInfo = iterator.next();
-            attemptedItemInfo.getBlocks().remove(blk);
-            if (attemptedItemInfo.getBlocks().isEmpty()) {
-              // TODO: try add this at front of the Queue, so that this element
-              // gets the chance first and can be cleaned from queue quickly as
-              // all movements already done.
-              blockStorageMovementNeeded.add(new ItemInfo<T>(attemptedItemInfo
-                  .getStartPath(), attemptedItemInfo.getFile(),
-                  attemptedItemInfo.getRetryCount() + 1));
-              iterator.remove();
-            }
+    // Removes all available blocks from this queue and process it.
+    Collection<Block> finishedBlks = new ArrayList<>();
+    movementFinishedBlocks.drainTo(finishedBlks);
+
+    // Update attempted items list
+    for (Block blk : finishedBlks) {
+      synchronized (storageMovementAttemptedItems) {
+        Iterator<AttemptedItemInfo<T>> iterator = storageMovementAttemptedItems
+            .iterator();
+        while (iterator.hasNext()) {
+          AttemptedItemInfo<T> attemptedItemInfo = iterator.next();
+          attemptedItemInfo.getBlocks().remove(blk);
+          if (attemptedItemInfo.getBlocks().isEmpty()) {
+            blockStorageMovementNeeded.add(new ItemInfo<T>(
+                attemptedItemInfo.getStartPath(), attemptedItemInfo.getFile(),
+                attemptedItemInfo.getRetryCount() + 1));
+            iterator.remove();
           }
         }
-        // Remove attempted blocks from movementFinishedBlocks list.
-        finishedBlksIter.remove();
       }
     }
   }
@@ -252,15 +295,29 @@ public class BlockStorageMovementAttemptedItems<T> {
 
   @VisibleForTesting
   public int getAttemptedItemsCount() {
-    return storageMovementAttemptedItems.size();
+    synchronized (storageMovementAttemptedItems) {
+      return storageMovementAttemptedItems.size();
+    }
   }
 
   public void clearQueues() {
-    synchronized (movementFinishedBlocks) {
-      movementFinishedBlocks.clear();
-    }
+    movementFinishedBlocks.clear();
     synchronized (storageMovementAttemptedItems) {
       storageMovementAttemptedItems.clear();
     }
+    synchronized (scheduledBlkLocs) {
+      scheduledBlkLocs.clear();
+    }
+  }
+
+  /**
+   * Sets external listener for testing.
+   *
+   * @param blkMoveListener
+   *          block movement listener callback object
+   */
+  @VisibleForTesting
+  void setBlockMovementListener(BlockMovementListener blkMoveListener) {
+    this.blkMovementListener = blkMoveListener;
   }
 }

+ 13 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java

@@ -22,8 +22,10 @@ import java.util.List;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 
 /**
  * An interface for SPSService, which exposes life cycle and processing APIs.
@@ -131,11 +133,16 @@ public interface SPSService<T> {
   void markScanCompletedForPath(T spsPath);
 
   /**
-   * Notify the details of storage movement attempt finished blocks.
+   * Given node is reporting that it received a certain movement attempt
+   * finished block.
    *
-   * @param moveAttemptFinishedBlks
-   *          - array contains all the blocks that are attempted to move
+   * @param dnInfo
+   *          - reported datanode
+   * @param storageType
+   *          - storage type
+   * @param block
+   *          - block that is attempted to move
    */
-  void notifyStorageMovementAttemptFinishedBlks(
-      BlocksStorageMoveAttemptFinished moveAttemptFinishedBlks);
+  void notifyStorageMovementAttemptFinishedBlk(DatanodeInfo dnInfo,
+      StorageType storageType, Block block);
 }

+ 101 - 53
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java

@@ -24,9 +24,12 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -50,7 +53,6 @@ import org.apache.hadoop.hdfs.server.balancer.Matcher;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.StringUtils;
@@ -83,8 +85,6 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
   private BlockStorageMovementNeeded<T> storageMovementNeeded;
   private BlockStorageMovementAttemptedItems<T> storageMovementsMonitor;
   private volatile boolean isRunning = false;
-  private volatile StoragePolicySatisfierMode spsMode =
-      StoragePolicySatisfierMode.NONE;
   private int spsWorkMultiplier;
   private long blockCount = 0L;
   private int blockMovementMaxRetry;
@@ -128,11 +128,12 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
     }
 
     private Status status = null;
-    private List<Block> assignedBlocks = null;
+    private Map<Block, Set<StorageTypeNodePair>> assignedBlocks = null;
 
-    BlocksMovingAnalysis(Status status, List<Block> blockMovingInfo) {
+    BlocksMovingAnalysis(Status status,
+        Map<Block, Set<StorageTypeNodePair>> assignedBlocks) {
       this.status = status;
-      this.assignedBlocks = blockMovingInfo;
+      this.assignedBlocks = assignedBlocks;
     }
   }
 
@@ -164,7 +165,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
           serviceMode);
       return;
     }
-    if (spsMode == StoragePolicySatisfierMode.INTERNAL
+    if (serviceMode == StoragePolicySatisfierMode.INTERNAL
         && ctxt.isMoverRunning()) {
       isRunning = false;
       LOG.error(
@@ -175,14 +176,13 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
     }
     if (reconfigStart) {
       LOG.info("Starting {} StoragePolicySatisfier, as admin requested to "
-          + "start it.", StringUtils.toLowerCase(spsMode.toString()));
+          + "start it.", StringUtils.toLowerCase(serviceMode.toString()));
     } else {
       LOG.info("Starting {} StoragePolicySatisfier.",
-          StringUtils.toLowerCase(spsMode.toString()));
+          StringUtils.toLowerCase(serviceMode.toString()));
     }
 
     isRunning = true;
-    this.spsMode = serviceMode;
     // Ensure that all the previously submitted block movements(if any) have to
     // be stopped in all datanodes.
     addDropSPSWorkCommandsToAllDNs();
@@ -297,36 +297,36 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
                 // be removed on storage movement attempt finished report.
               case BLOCKS_TARGETS_PAIRED:
                 if (LOG.isDebugEnabled()) {
-                  LOG.debug("Block analysis status:{} for the file path:{}."
+                  LOG.debug("Block analysis status:{} for the file id:{}."
                       + " Adding to attempt monitor queue for the storage "
                       + "movement attempt finished report",
-                      status.status, fileStatus.getPath());
+                      status.status, fileStatus.getFileId());
                 }
-                this.storageMovementsMonitor.add(new AttemptedItemInfo<T>(
-                    itemInfo.getStartPath(), itemInfo.getFile(), monotonicNow(),
-                    status.assignedBlocks, itemInfo.getRetryCount()));
+                this.storageMovementsMonitor.add(itemInfo.getStartPath(),
+                    itemInfo.getFile(), monotonicNow(), status.assignedBlocks,
+                    itemInfo.getRetryCount());
                 break;
               case NO_BLOCKS_TARGETS_PAIRED:
                 if (LOG.isDebugEnabled()) {
-                  LOG.debug("Adding trackID:{} for the file path:{} back to"
+                  LOG.debug("Adding trackID:{} for the file id:{} back to"
                       + " retry queue as none of the blocks found its eligible"
-                      + " targets.", trackId, fileStatus.getPath());
+                      + " targets.", trackId, fileStatus.getFileId());
                 }
                 retryItem = true;
                 break;
               case FEW_LOW_REDUNDANCY_BLOCKS:
                 if (LOG.isDebugEnabled()) {
-                  LOG.debug("Adding trackID:{} for the file path:{} back to "
+                  LOG.debug("Adding trackID:{} for the file id:{} back to "
                       + "retry queue as some of the blocks are low redundant.",
-                      trackId, fileStatus.getPath());
+                      trackId, fileStatus.getFileId());
                 }
                 retryItem = true;
                 break;
               case BLOCKS_FAILED_TO_MOVE:
                 if (LOG.isDebugEnabled()) {
-                  LOG.debug("Adding trackID:{} for the file path:{} back to "
+                  LOG.debug("Adding trackID:{} for the file id:{} back to "
                       + "retry queue as some of the blocks movement failed.",
-                      trackId, fileStatus.getPath());
+                      trackId, fileStatus.getFileId());
                 }
                 retryItem = true;
                 break;
@@ -334,9 +334,9 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
               case BLOCKS_TARGET_PAIRING_SKIPPED:
               case BLOCKS_ALREADY_SATISFIED:
               default:
-                LOG.info("Block analysis status:{} for the file path:{}."
+                LOG.info("Block analysis status:{} for the file id:{}."
                     + " So, Cleaning up the Xattrs.", status.status,
-                    fileStatus.getPath());
+                    fileStatus.getFileId());
                 storageMovementNeeded.removeItemTrackInfo(itemInfo, true);
                 break;
               }
@@ -389,19 +389,19 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
     if (!lastBlkComplete) {
       // Postpone, currently file is under construction
       LOG.info("File: {} is under construction. So, postpone"
-          + " this to the next retry iteration", fileInfo.getPath());
+          + " this to the next retry iteration", fileInfo.getFileId());
       return new BlocksMovingAnalysis(
           BlocksMovingAnalysis.Status.ANALYSIS_SKIPPED_FOR_RETRY,
-          new ArrayList<>());
+          new HashMap<>());
     }
 
     List<LocatedBlock> blocks = locatedBlocks.getLocatedBlocks();
     if (blocks.size() == 0) {
       LOG.info("File: {} is not having any blocks."
-          + " So, skipping the analysis.", fileInfo.getPath());
+          + " So, skipping the analysis.", fileInfo.getFileId());
       return new BlocksMovingAnalysis(
           BlocksMovingAnalysis.Status.BLOCKS_TARGET_PAIRING_SKIPPED,
-          new ArrayList<>());
+          new HashMap<>());
     }
     List<BlockMovingInfo> blockMovingInfos = new ArrayList<BlockMovingInfo>();
     boolean hasLowRedundancyBlocks = false;
@@ -432,7 +432,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
               + "So, ignoring to move the blocks");
           return new BlocksMovingAnalysis(
               BlocksMovingAnalysis.Status.BLOCKS_TARGET_PAIRING_SKIPPED,
-              new ArrayList<>());
+              new HashMap<>());
         }
       } else {
         expectedStorageTypes = existingStoragePolicy
@@ -465,13 +465,21 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
         && status != BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED) {
       status = BlocksMovingAnalysis.Status.FEW_LOW_REDUNDANCY_BLOCKS;
     }
-    List<Block> assignedBlockIds = new ArrayList<Block>();
+    Map<Block, Set<StorageTypeNodePair>> assignedBlocks = new HashMap<>();
     for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
       // Check for at least one block storage movement has been chosen
       try {
         blockMoveTaskHandler.submitMoveTask(blkMovingInfo);
         LOG.debug("BlockMovingInfo: {}", blkMovingInfo);
-        assignedBlockIds.add(blkMovingInfo.getBlock());
+        StorageTypeNodePair nodeStorage = new StorageTypeNodePair(
+            blkMovingInfo.getTargetStorageType(), blkMovingInfo.getTarget());
+        Set<StorageTypeNodePair> nodesWithStorage = assignedBlocks
+            .get(blkMovingInfo.getBlock());
+        if (nodesWithStorage == null) {
+          nodesWithStorage = new HashSet<>();
+          assignedBlocks.put(blkMovingInfo.getBlock(), nodesWithStorage);
+        }
+        nodesWithStorage.add(nodeStorage);
         blockCount++;
       } catch (IOException e) {
         LOG.warn("Exception while scheduling movement task", e);
@@ -479,7 +487,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
         status = BlocksMovingAnalysis.Status.BLOCKS_FAILED_TO_MOVE;
       }
     }
-    return new BlocksMovingAnalysis(status, assignedBlockIds);
+    return new BlocksMovingAnalysis(status, assignedBlocks);
   }
 
   /**
@@ -545,6 +553,11 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
           new ArrayList<StorageTypeNodePair>();
       List<DatanodeInfo> existingBlockStorages = new ArrayList<DatanodeInfo>(
           Arrays.asList(storages));
+
+      // Add existing storages into exclude nodes to avoid choosing this as
+      // remote target later.
+      List<DatanodeInfo> excludeNodes = new ArrayList<>(existingBlockStorages);
+
       // if expected type exists in source node already, local movement would be
       // possible, so lets find such sources first.
       Iterator<DatanodeInfo> iterator = existingBlockStorages.iterator();
@@ -582,7 +595,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
       foundMatchingTargetNodesForBlock |= findSourceAndTargetToMove(
           blockMovingInfos, blockInfo, sourceWithStorageMap,
           expectedStorageTypes, targetDns,
-          ecPolicy);
+          ecPolicy, excludeNodes);
     }
     return foundMatchingTargetNodesForBlock;
   }
@@ -601,6 +614,10 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
    *          - Expecting storages to move
    * @param targetDns
    *          - Available DNs for expected storage types
+   * @param ecPolicy
+   *          - erasure coding policy of sps invoked file
+   * @param excludeNodes
+   *          - existing source nodes, which has replica copy
    * @return false if some of the block locations failed to find target node to
    *         satisfy the storage policy
    */
@@ -609,9 +626,8 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
       List<StorageTypeNodePair> sourceWithStorageList,
       List<StorageType> expectedTypes,
       EnumMap<StorageType, List<DatanodeWithStorage.StorageDetails>> targetDns,
-      ErasureCodingPolicy ecPolicy) {
+      ErasureCodingPolicy ecPolicy, List<DatanodeInfo> excludeNodes) {
     boolean foundMatchingTargetNodesForBlock = true;
-    List<DatanodeInfo> excludeNodes = new ArrayList<>();
 
     // Looping over all the source node locations and choose the target
     // storage within same node if possible. This is done separately to
@@ -638,10 +654,12 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
           expectedTypes.remove(chosenTarget.storageType);
         }
       }
-      // To avoid choosing this excludeNodes as targets later
-      excludeNodes.add(existingTypeNodePair.dn);
     }
-
+    // If all the sources and targets are paired within same node, then simply
+    // return.
+    if (expectedTypes.size() <= 0) {
+      return foundMatchingTargetNodesForBlock;
+    }
     // Looping over all the source node locations. Choose a remote target
     // storage node if it was not found out within same node.
     for (int i = 0; i < sourceWithStorageList.size(); i++) {
@@ -824,14 +842,29 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
   /**
    * Keeps datanode with its respective storage type.
    */
-  private static final class StorageTypeNodePair {
+  static final class StorageTypeNodePair {
     private final StorageType storageType;
     private final DatanodeInfo dn;
 
-    private StorageTypeNodePair(StorageType storageType, DatanodeInfo dn) {
+    StorageTypeNodePair(StorageType storageType, DatanodeInfo dn) {
       this.storageType = storageType;
       this.dn = dn;
     }
+
+    public DatanodeInfo getDatanodeInfo() {
+      return dn;
+    }
+
+    public StorageType getStorageType() {
+      return storageType;
+    }
+
+    @Override
+    public String toString() {
+      return new StringBuilder().append("StorageTypeNodePair(\n  ")
+          .append("DatanodeInfo: ").append(dn).append(", StorageType: ")
+          .append(storageType).toString();
+    }
   }
 
   private EnumMap<StorageType, List<DatanodeWithStorage.StorageDetails>>
@@ -1043,18 +1076,19 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
   }
 
   /**
-   * Receives set of storage movement attempt finished blocks report.
+   * Receives storage movement attempt finished block report.
    *
-   * @param moveAttemptFinishedBlks
-   *          set of storage movement attempt finished blocks.
+   * @param dnInfo
+   *          reported datanode
+   * @param storageType
+   *          - storage type
+   * @param block
+   *          movement attempt finished block.
    */
-  public void notifyStorageMovementAttemptFinishedBlks(
-      BlocksStorageMoveAttemptFinished moveAttemptFinishedBlks) {
-    if (moveAttemptFinishedBlks.getBlocks().length <= 0) {
-      return;
-    }
-    storageMovementsMonitor
-        .notifyMovementTriedBlocks(moveAttemptFinishedBlks.getBlocks());
+  @Override
+  public void notifyStorageMovementAttemptFinishedBlk(DatanodeInfo dnInfo,
+      StorageType storageType, Block block) {
+    storageMovementsMonitor.notifyReportedBlock(dnInfo, storageType, block);
   }
 
   @VisibleForTesting
@@ -1086,7 +1120,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
    */
   final static class AttemptedItemInfo<T> extends ItemInfo<T> {
     private long lastAttemptedOrReportedTime;
-    private final List<Block> blocks;
+    private final Set<Block> blocks;
 
     /**
      * AttemptedItemInfo constructor.
@@ -1097,10 +1131,14 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
      *          trackId for file.
      * @param lastAttemptedOrReportedTime
      *          last attempted or reported time
+     * @param blocks
+     *          scheduled blocks
+     * @param retryCount
+     *          file retry count
      */
     AttemptedItemInfo(T rootId, T trackId,
         long lastAttemptedOrReportedTime,
-        List<Block> blocks, int retryCount) {
+        Set<Block> blocks, int retryCount) {
       super(rootId, trackId, retryCount);
       this.lastAttemptedOrReportedTime = lastAttemptedOrReportedTime;
       this.blocks = blocks;
@@ -1121,10 +1159,9 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
       this.lastAttemptedOrReportedTime = monotonicNow();
     }
 
-    List<Block> getBlocks() {
+    Set<Block> getBlocks() {
       return this.blocks;
     }
-
   }
 
   /**
@@ -1241,4 +1278,15 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
         "It should be a positive, non-zero integer value.");
     return spsWorkMultiplier;
   }
+
+  /**
+   * Sets external listener for testing.
+   *
+   * @param blkMovementListener
+   *          block movement listener callback object
+   */
+  @VisibleForTesting
+  void setBlockMovementListener(BlockMovementListener blkMovementListener) {
+    storageMovementsMonitor.setBlockMovementListener(blkMovementListener);
+  }
 }

+ 1 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java

@@ -112,7 +112,6 @@ public interface DatanodeProtocol {
    * @param slowPeers Details of peer DataNodes that were detected as being
    *                  slow to respond to packet writes. Empty report if no
    *                  slow peers were detected by the DataNode.
-   * @param storageMovFinishedBlks array of movement attempt finished blocks
    * @throws IOException on error
    */
   @Idempotent
@@ -126,8 +125,7 @@ public interface DatanodeProtocol {
                                        VolumeFailureSummary volumeFailureSummary,
                                        boolean requestFullBlockReportLease,
                                        @Nonnull SlowPeerReports slowPeers,
-                                       @Nonnull SlowDiskReports slowDisks,
-                                       BlocksStorageMoveAttemptFinished storageMovFinishedBlks)
+                                       @Nonnull SlowDiskReports slowDisks)
       throws IOException;
 
   /**

+ 11 - 21
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java

@@ -20,13 +20,10 @@ package org.apache.hadoop.hdfs.server.sps;
 
 import java.io.IOException;
 import java.net.Socket;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -39,7 +36,6 @@ import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
@@ -48,15 +44,14 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.balancer.KeyManager;
 import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
+import org.apache.hadoop.hdfs.server.common.sps.BlockDispatcher;
 import org.apache.hadoop.hdfs.server.common.sps.BlockMovementAttemptFinished;
 import org.apache.hadoop.hdfs.server.common.sps.BlockMovementStatus;
 import org.apache.hadoop.hdfs.server.common.sps.BlockStorageMovementTracker;
 import org.apache.hadoop.hdfs.server.common.sps.BlocksMovementsStatusHandler;
-import org.apache.hadoop.hdfs.server.common.sps.BlockDispatcher;
 import org.apache.hadoop.hdfs.server.namenode.sps.BlockMoveTaskHandler;
 import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Daemon;
 import org.slf4j.Logger;
@@ -105,12 +100,14 @@ public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler {
     int ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf);
     blkDispatcher = new BlockDispatcher(HdfsConstants.READ_TIMEOUT,
         ioFileBufferSize, connectToDnViaHostname);
+
+    startMovementTracker();
   }
 
   /**
    * Initializes block movement tracker daemon and starts the thread.
    */
-  public void init() {
+  private void startMovementTracker() {
     movementTrackerThread = new Daemon(this.blkMovementTracker);
     movementTrackerThread.setName("BlockStorageMovementTracker");
     movementTrackerThread.start();
@@ -156,24 +153,16 @@ public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler {
     // dn.incrementBlocksScheduled(blkMovingInfo.getTargetStorageType());
     LOG.debug("Received BlockMovingTask {}", blkMovingInfo);
     BlockMovingTask blockMovingTask = new BlockMovingTask(blkMovingInfo);
-    Future<BlockMovementAttemptFinished> moveCallable = mCompletionServ
-        .submit(blockMovingTask);
-    blkMovementTracker.addBlock(blkMovingInfo.getBlock(), moveCallable);
+    mCompletionServ.submit(blockMovingTask);
   }
 
   private class ExternalBlocksMovementsStatusHandler
-      extends BlocksMovementsStatusHandler {
+      implements BlocksMovementsStatusHandler {
     @Override
-    public void handle(
-        List<BlockMovementAttemptFinished> moveAttemptFinishedBlks) {
-      List<Block> blocks = new ArrayList<>();
-      for (BlockMovementAttemptFinished item : moveAttemptFinishedBlks) {
-        blocks.add(item.getBlock());
-      }
-      BlocksStorageMoveAttemptFinished blkAttempted =
-          new BlocksStorageMoveAttemptFinished(
-          blocks.toArray(new Block[blocks.size()]));
-      service.notifyStorageMovementAttemptFinishedBlks(blkAttempted);
+    public void handle(BlockMovementAttemptFinished attemptedMove) {
+      service.notifyStorageMovementAttemptFinishedBlk(
+          attemptedMove.getTargetDatanode(), attemptedMove.getTargetType(),
+          attemptedMove.getBlock());
     }
   }
 
@@ -194,6 +183,7 @@ public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler {
       BlockMovementStatus blkMovementStatus = moveBlock();
       return new BlockMovementAttemptFinished(blkMovingInfo.getBlock(),
           blkMovingInfo.getSource(), blkMovingInfo.getTarget(),
+          blkMovingInfo.getTargetStorageType(),
           blkMovementStatus);
     }
 

+ 1 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java

@@ -86,7 +86,6 @@ public final class ExternalStoragePolicySatisfier {
           new ExternalBlockMovementListener();
       ExternalSPSBlockMoveTaskHandler externalHandler =
           new ExternalSPSBlockMoveTaskHandler(spsConf, nnc, sps);
-      externalHandler.init();
       sps.init(context, new ExternalSPSFilePathCollector(sps), externalHandler,
           blkMoveListener);
       sps.start(true, StoragePolicySatisfierMode.EXTERNAL);
@@ -147,7 +146,7 @@ public final class ExternalStoragePolicySatisfier {
       for (Block block : moveAttemptFinishedBlks) {
         actualBlockMovements.add(block);
       }
-      LOG.info("Movement attempted blocks", actualBlockMovements);
+      LOG.info("Movement attempted blocks:{}", actualBlockMovements);
     }
   }
 }

+ 0 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto

@@ -184,14 +184,6 @@ message BlockMovingInfoProto {
   required StorageTypeProto targetStorageType = 5;
 }
 
-/**
- * Blocks for which storage movements has been attempted and finished
- * with either success or failure.
- */
-message BlocksStorageMoveAttemptFinishedProto {
-  repeated BlockProto blocks = 1;
-}
-
 /**
  * registration - Information of the datanode registering with the namenode
  */
@@ -249,7 +241,6 @@ message HeartbeatRequestProto {
   optional bool requestFullBlockReportLease = 9 [ default = false ];
   repeated SlowPeerReportProto slowPeers = 10;
   repeated SlowDiskReportProto slowDisks = 11;
-  optional BlocksStorageMoveAttemptFinishedProto storageMoveAttemptFinishedBlks = 12;
 }
 
 /**

+ 41 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -4591,6 +4591,47 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.storage.policy.satisfier.max.outstanding.paths</name>
+  <value>10000</value>
+  <description>
+    Defines the maximum number of paths to satisfy that can be queued up in the
+    Satisfier call queue in a period of time. Default value is 10000.
+  </description>
+</property>
+
+<property>
+  <name>dfs.storage.policy.satisfier.address</name>
+  <value>0.0.0.0:0</value>
+  <description>
+    The hostname used for a keytab based Kerberos login. Keytab based login
+    is required when dfs.storage.policy.satisfier.mode is external.
+  </description>
+</property>
+
+<property>
+  <name>dfs.storage.policy.satisfier.keytab.file</name>
+  <value></value>
+  <description>
+    The keytab file used by external StoragePolicySatisfier to login as its
+    service principal. The principal name is configured with
+    dfs.storage.policy.satisfier.kerberos.principal. Keytab based login
+    is required when dfs.storage.policy.satisfier.mode is external.
+  </description>
+</property>
+
+<property>
+  <name>dfs.storage.policy.satisfier.kerberos.principal</name>
+  <value></value>
+  <description>
+    The StoragePolicySatisfier principal. This is typically set to
+    satisfier/_HOST@REALM.TLD. The StoragePolicySatisfier will substitute
+    _HOST with its own fully qualified hostname at startup. The _HOST placeholder
+    allows using the same configuration setting on different servers. Keytab
+    based login is required when dfs.storage.policy.satisfier.mode is external.
+  </description>
+</property>
+
 <property>
   <name>dfs.pipeline.ecn</name>
   <value>false</value>

+ 1 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java

@@ -39,7 +39,6 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
@@ -117,8 +116,7 @@ public class TestNameNodePrunesMissingStorages {
       cluster.stopDataNode(0);
       cluster.getNameNodeRpc().sendHeartbeat(dnReg, prunedReports, 0L, 0L, 0, 0,
           0, null, true, SlowPeerReports.EMPTY_REPORT,
-          SlowDiskReports.EMPTY_REPORT,
-          new BlocksStorageMoveAttemptFinished(null));
+          SlowDiskReports.EMPTY_REPORT);
 
       // Check that the missing storage was pruned.
       assertThat(dnDescriptor.getStorageInfos().length, is(expectedStoragesAfterTest));

+ 1 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java

@@ -37,7 +37,6 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
@@ -168,8 +167,7 @@ public class InternalDataNodeTestUtils {
             Mockito.anyInt(), Mockito.any(VolumeFailureSummary.class),
             Mockito.anyBoolean(),
             Mockito.any(SlowPeerReports.class),
-            Mockito.any(SlowDiskReports.class),
-            Mockito.any(BlocksStorageMoveAttemptFinished.class))).thenReturn(
+            Mockito.any(SlowDiskReports.class))).thenReturn(
         new HeartbeatResponse(new DatanodeCommand[0], new NNHAStatusHeartbeat(
             HAServiceState.ACTIVE, 1), null, ThreadLocalRandom.current()
             .nextLong() | 1L));

+ 88 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimpleBlocksMovementsStatusHandler.java

@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.sps.BlockMovementAttemptFinished;
+import org.apache.hadoop.hdfs.server.common.sps.BlocksMovementsStatusHandler;
+
+/**
+ * Blocks movements status handler, which is used to collect details of the
+ * completed block movements and later these attempted finished(with success or
+ * failure) blocks can be accessed to notify respective listeners, if any.
+ */
+public class SimpleBlocksMovementsStatusHandler
+    implements BlocksMovementsStatusHandler {
+  private final List<Block> blockIdVsMovementStatus = new ArrayList<>();
+
+  /**
+   * Collect all the storage movement attempt finished blocks. Later this will
+   * be send to namenode via heart beat.
+   *
+   * @param moveAttemptFinishedBlk
+   *          storage movement attempt finished block
+   */
+  public void handle(BlockMovementAttemptFinished moveAttemptFinishedBlk) {
+    // Adding to the tracking report list. Later this can be accessed to know
+    // the attempted block movements.
+    synchronized (blockIdVsMovementStatus) {
+      blockIdVsMovementStatus.add(moveAttemptFinishedBlk.getBlock());
+    }
+  }
+
+  /**
+   * @return unmodifiable list of storage movement attempt finished blocks.
+   */
+  public List<Block> getMoveAttemptFinishedBlocks() {
+    List<Block> moveAttemptFinishedBlks = new ArrayList<>();
+    // 1. Adding all the completed block ids.
+    synchronized (blockIdVsMovementStatus) {
+      if (blockIdVsMovementStatus.size() > 0) {
+        moveAttemptFinishedBlks = Collections
+            .unmodifiableList(blockIdVsMovementStatus);
+      }
+    }
+    return moveAttemptFinishedBlks;
+  }
+
+  /**
+   * Remove the storage movement attempt finished blocks from the tracking list.
+   *
+   * @param moveAttemptFinishedBlks
+   *          set of storage movement attempt finished blocks
+   */
+  public void remove(List<Block> moveAttemptFinishedBlks) {
+    if (moveAttemptFinishedBlks != null) {
+      blockIdVsMovementStatus.removeAll(moveAttemptFinishedBlks);
+    }
+  }
+
+  /**
+   * Clear the blockID vs movement status tracking map.
+   */
+  public void removeAll() {
+    synchronized (blockIdVsMovementStatus) {
+      blockIdVsMovementStatus.clear();
+    }
+  }
+}

+ 5 - 7
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java

@@ -49,7 +49,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -124,8 +123,8 @@ public class TestBPOfferService {
     Mockito.doReturn(new DNConf(mockDn)).when(mockDn).getDnConf();
     Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn"))
         .when(mockDn).getMetrics();
-    Mockito.doReturn(new StoragePolicySatisfyWorker(conf, mockDn)).when(mockDn)
-        .getStoragePolicySatisfyWorker();
+    Mockito.doReturn(new StoragePolicySatisfyWorker(conf, mockDn, null))
+        .when(mockDn).getStoragePolicySatisfyWorker();
 
     // Set up a simulated dataset with our fake BP
     mockFSDataset = Mockito.spy(new SimulatedFSDataset(null, conf));
@@ -160,8 +159,7 @@ public class TestBPOfferService {
           Mockito.any(VolumeFailureSummary.class),
           Mockito.anyBoolean(),
           Mockito.any(SlowPeerReports.class),
-          Mockito.any(SlowDiskReports.class),
-          Mockito.any(BlocksStorageMoveAttemptFinished.class));
+          Mockito.any(SlowDiskReports.class));
     mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0);
     datanodeCommands[nnIdx] = new DatanodeCommand[0];
     return mock;
@@ -380,8 +378,8 @@ public class TestBPOfferService {
     Mockito.doReturn(new DNConf(mockDn)).when(mockDn).getDnConf();
     Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn")).
       when(mockDn).getMetrics();
-    Mockito.doReturn(new StoragePolicySatisfyWorker(conf, mockDn)).when(mockDn)
-        .getStoragePolicySatisfyWorker();
+    Mockito.doReturn(new StoragePolicySatisfyWorker(conf, mockDn, null))
+        .when(mockDn).getStoragePolicySatisfyWorker();
     final AtomicInteger count = new AtomicInteger();
     Mockito.doAnswer(new Answer<Void>() {
       @Override

+ 1 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java

@@ -93,7 +93,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -233,8 +232,7 @@ public class TestBlockRecovery {
             Mockito.any(VolumeFailureSummary.class),
             Mockito.anyBoolean(),
             Mockito.any(SlowPeerReports.class),
-            Mockito.any(SlowDiskReports.class),
-            Mockito.any(BlocksStorageMoveAttemptFinished.class)))
+            Mockito.any(SlowDiskReports.class)))
         .thenReturn(new HeartbeatResponse(
             new DatanodeCommand[0],
             new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1),

+ 2 - 5
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java

@@ -50,7 +50,6 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
@@ -173,8 +172,7 @@ public class TestDataNodeLifeline {
             any(VolumeFailureSummary.class),
             anyBoolean(),
             any(SlowPeerReports.class),
-            any(SlowDiskReports.class),
-            any(BlocksStorageMoveAttemptFinished.class));
+            any(SlowDiskReports.class));
 
     // Intercept lifeline to trigger latch count-down on each call.
     doAnswer(new LatchCountingAnswer<Void>(lifelinesSent))
@@ -239,8 +237,7 @@ public class TestDataNodeLifeline {
             any(VolumeFailureSummary.class),
             anyBoolean(),
             any(SlowPeerReports.class),
-            any(SlowDiskReports.class),
-            any(BlocksStorageMoveAttemptFinished.class));
+            any(SlowDiskReports.class));
 
     // While waiting on the latch for the expected number of heartbeat messages,
     // poll DataNode tracking information.  We expect that the DataNode always

+ 1 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java

@@ -44,7 +44,6 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
@@ -223,8 +222,7 @@ public class TestDatanodeProtocolRetryPolicy {
            Mockito.any(VolumeFailureSummary.class),
            Mockito.anyBoolean(),
            Mockito.any(SlowPeerReports.class),
-           Mockito.any(SlowDiskReports.class),
-           Mockito.any(BlocksStorageMoveAttemptFinished.class));
+           Mockito.any(SlowDiskReports.class));
 
     dn = new DataNode(conf, locations, null, null) {
       @Override

+ 1 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java

@@ -66,7 +66,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache.Page
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -209,8 +208,7 @@ public class TestFsDatasetCache {
           (StorageReport[]) any(), anyLong(), anyLong(),
           anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any(),
           anyBoolean(), any(SlowPeerReports.class),
-          any(SlowDiskReports.class),
-          any(BlocksStorageMoveAttemptFinished.class));
+          any(SlowDiskReports.class));
     } finally {
       lock.writeLock().unlock();
     }

+ 7 - 69
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java

@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import static org.junit.Assert.assertTrue;
-
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -35,8 +33,8 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
@@ -173,8 +171,10 @@ public class TestStoragePolicySatisfyWorker {
     DatanodeInfo targetDnInfo = DFSTestUtil
         .getLocalDatanodeInfo(src.getXferPort());
 
+    SimpleBlocksMovementsStatusHandler handler =
+        new SimpleBlocksMovementsStatusHandler();
     StoragePolicySatisfyWorker worker = new StoragePolicySatisfyWorker(conf,
-        src);
+        src, handler);
     try {
       worker.start();
       List<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
@@ -184,81 +184,19 @@ public class TestStoragePolicySatisfyWorker {
       blockMovingInfos.add(blockMovingInfo);
       worker.processBlockMovingTasks(cluster.getNamesystem().getBlockPoolId(),
           blockMovingInfos);
-
-      waitForBlockMovementCompletion(worker, 1, 30000);
+      waitForBlockMovementCompletion(handler, 1, 30000);
     } finally {
       worker.stop();
     }
   }
 
-  /**
-   * Tests that drop SPS work method clears all the queues.
-   *
-   * @throws Exception
-   */
-  @Test(timeout = 120000)
-  public void testDropSPSWork() throws Exception {
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(20).build();
-
-    cluster.waitActive();
-    final DistributedFileSystem dfs = cluster.getFileSystem();
-    final String file = "/testDropSPSWork";
-    DFSTestUtil.createFile(dfs, new Path(file), false, 1024, 50 * 100,
-        DEFAULT_BLOCK_SIZE, (short) 2, 0, false, null);
-
-    // move to ARCHIVE
-    dfs.setStoragePolicy(new Path(file), "COLD");
-
-    DataNode src = cluster.getDataNodes().get(2);
-    DatanodeInfo targetDnInfo =
-        DFSTestUtil.getLocalDatanodeInfo(src.getXferPort());
-
-    StoragePolicySatisfyWorker worker =
-        new StoragePolicySatisfyWorker(conf, src);
-    worker.start();
-    try {
-      List<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
-      List<LocatedBlock> locatedBlocks =
-          dfs.getClient().getLocatedBlocks(file, 0).getLocatedBlocks();
-      for (LocatedBlock locatedBlock : locatedBlocks) {
-        BlockMovingInfo blockMovingInfo =
-            prepareBlockMovingInfo(locatedBlock.getBlock().getLocalBlock(),
-                locatedBlock.getLocations()[0], targetDnInfo,
-                locatedBlock.getStorageTypes()[0], StorageType.ARCHIVE);
-        blockMovingInfos.add(blockMovingInfo);
-      }
-      worker.processBlockMovingTasks(cluster.getNamesystem().getBlockPoolId(),
-          blockMovingInfos);
-      // Wait till results queue build up
-      waitForBlockMovementResult(worker, 30000);
-      worker.dropSPSWork();
-      assertTrue(worker.getBlocksMovementsStatusHandler()
-          .getMoveAttemptFinishedBlocks().size() == 0);
-    } finally {
-      worker.stop();
-    }
-  }
-
-  private void waitForBlockMovementResult(
-      final StoragePolicySatisfyWorker worker, int timeout) throws Exception {
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
-      @Override
-      public Boolean get() {
-        List<Block> completedBlocks = worker.getBlocksMovementsStatusHandler()
-            .getMoveAttemptFinishedBlocks();
-        return completedBlocks.size() > 0;
-      }
-    }, 100, timeout);
-  }
-
   private void waitForBlockMovementCompletion(
-      final StoragePolicySatisfyWorker worker,
+      final SimpleBlocksMovementsStatusHandler handler,
       int expectedFinishedItemsCount, int timeout) throws Exception {
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
-        List<Block> completedBlocks = worker.getBlocksMovementsStatusHandler()
-            .getMoveAttemptFinishedBlocks();
+        List<Block> completedBlocks = handler.getMoveAttemptFinishedBlocks();
         int finishedCount = completedBlocks.size();
         LOG.info("Block movement completed count={}, expected={} and actual={}",
             completedBlocks.size(), expectedFinishedItemsCount, finishedCount);

+ 1 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java

@@ -29,7 +29,6 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
@@ -111,8 +110,7 @@ public class TestStorageReport {
         anyLong(), anyLong(), anyInt(), anyInt(), anyInt(),
         Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean(),
         Mockito.any(SlowPeerReports.class),
-        Mockito.any(SlowDiskReports.class),
-        Mockito.any(BlocksStorageMoveAttemptFinished.class));
+        Mockito.any(SlowDiskReports.class));
 
     StorageReport[] reports = captor.getValue();
 

+ 4 - 5
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java

@@ -56,7 +56,6 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -957,8 +956,8 @@ public class NNThroughputBenchmark implements Tool {
           DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0L) };
       DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep,
           0L, 0L, 0, 0, 0, null, true,
-          SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
-          new BlocksStorageMoveAttemptFinished(null)).getCommands();
+          SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT)
+          .getCommands();
       if(cmds != null) {
         for (DatanodeCommand cmd : cmds ) {
           if(LOG.isDebugEnabled()) {
@@ -1008,8 +1007,8 @@ public class NNThroughputBenchmark implements Tool {
           false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0) };
       DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration,
           rep, 0L, 0L, 0, 0, 0, null, true,
-          SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
-          new BlocksStorageMoveAttemptFinished(null)).getCommands();
+          SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT)
+          .getCommands();
       if (cmds != null) {
         for (DatanodeCommand cmd : cmds) {
           if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {

+ 1 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java

@@ -40,7 +40,6 @@ import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
 import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
@@ -131,8 +130,7 @@ public class NameNodeAdapter {
     return namesystem.handleHeartbeat(nodeReg,
         BlockManagerTestUtil.getStorageReportsForDatanode(dd),
         dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null, true,
-        SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
-        new BlocksStorageMoveAttemptFinished(null));
+        SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT);
   }
 
   public static boolean setReplication(final FSNamesystem ns,

+ 2 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java

@@ -44,7 +44,6 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -140,8 +139,8 @@ public class TestDeadDatanode {
         false, 0, 0, 0, 0, 0) };
     DatanodeCommand[] cmd =
         dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0, null, true,
-            SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
-            new BlocksStorageMoveAttemptFinished(null)).getCommands();
+            SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT)
+        .getCommands();
     assertEquals(1, cmd.length);
     assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER
         .getAction());

+ 10 - 7
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.test.GenericTestUtils;
 
@@ -250,10 +251,9 @@ public class TestNameNodeReconfigure {
         StoragePolicySatisfierMode.INTERNAL.toString());
 
     // Since DFS_STORAGE_POLICY_ENABLED_KEY is disabled, SPS can't be enabled.
-    assertEquals("SPS shouldn't start as "
-        + DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY + " is disabled", false,
-            nameNode.getNamesystem().getBlockManager().getSPSManager()
-            .isInternalSatisfierRunning());
+    assertNull("SPS shouldn't start as "
+        + DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY + " is disabled",
+            nameNode.getNamesystem().getBlockManager().getSPSManager());
     verifySPSEnabled(nameNode, DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
         StoragePolicySatisfierMode.INTERNAL, false);
 
@@ -352,9 +352,12 @@ public class TestNameNodeReconfigure {
 
   void verifySPSEnabled(final NameNode nameNode, String property,
       StoragePolicySatisfierMode expected, boolean isSatisfierRunning) {
-    assertEquals(property + " has wrong value", isSatisfierRunning, nameNode
-        .getNamesystem().getBlockManager().getSPSManager()
-        .isInternalSatisfierRunning());
+    StoragePolicySatisfyManager spsMgr = nameNode
+            .getNamesystem().getBlockManager().getSPSManager();
+    boolean isInternalSatisfierRunning = spsMgr != null
+        ? spsMgr.isInternalSatisfierRunning() : false;
+    assertEquals(property + " has wrong value", isSatisfierRunning,
+        isInternalSatisfierRunning);
     String actual = nameNode.getConf().get(property,
         DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT);
     assertEquals(property + " has wrong value", expected,

+ 53 - 35
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java

@@ -22,13 +22,18 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.AttemptedItemInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.StorageTypeNodePair;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -92,14 +97,16 @@ public class TestBlockStorageMovementAttemptedItems {
    */
   @Test(timeout = 30000)
   public void testAddReportedMoveAttemptFinishedBlocks() throws Exception {
-    bsmAttemptedItems.start(); // start block movement result monitor thread
     Long item = new Long(1234);
-    List<Block> blocks = new ArrayList<Block>();
-    blocks.add(new Block(item));
-    bsmAttemptedItems.add(new AttemptedItemInfo<Long>(0L, 0L, 0L, blocks, 0));
-    Block[] blockArray = new Block[blocks.size()];
-    blocks.toArray(blockArray);
-    bsmAttemptedItems.notifyMovementTriedBlocks(blockArray);
+    Block block = new Block(item);
+    DatanodeInfo dnInfo = DFSTestUtil.getLocalDatanodeInfo(9867);
+    Set<StorageTypeNodePair> locs = new HashSet<>();
+    locs.add(new StorageTypeNodePair(StorageType.ARCHIVE, dnInfo));
+    Map<Block, Set<StorageTypeNodePair>> blocksMap = new HashMap<>();
+    blocksMap.put(block, locs);
+    bsmAttemptedItems.add(0L, 0L, 0L, blocksMap, 0);
+    bsmAttemptedItems.notifyReportedBlock(dnInfo, StorageType.ARCHIVE,
+        block);
     assertEquals("Failed to receive result!", 1,
         bsmAttemptedItems.getMovementFinishedBlocksCount());
   }
@@ -111,9 +118,13 @@ public class TestBlockStorageMovementAttemptedItems {
   public void testNoBlockMovementAttemptFinishedReportAdded() throws Exception {
     bsmAttemptedItems.start(); // start block movement report monitor thread
     Long item = new Long(1234);
-    List<Block> blocks = new ArrayList<>();
-    blocks.add(new Block(item));
-    bsmAttemptedItems.add(new AttemptedItemInfo<Long>(0L, 0L, 0L, blocks, 0));
+    Block block = new Block(item);
+    DatanodeInfo dnInfo = DFSTestUtil.getLocalDatanodeInfo(9867);
+    Set<StorageTypeNodePair> locs = new HashSet<>();
+    locs.add(new StorageTypeNodePair(StorageType.ARCHIVE, dnInfo));
+    Map<Block, Set<StorageTypeNodePair>> blocksMap = new HashMap<>();
+    blocksMap.put(block, locs);
+    bsmAttemptedItems.add(0L, 0L, 0L, blocksMap, 0);
     assertEquals("Shouldn't receive result", 0,
         bsmAttemptedItems.getMovementFinishedBlocksCount());
     assertEquals("Item doesn't exist in the attempted list", 1,
@@ -129,15 +140,18 @@ public class TestBlockStorageMovementAttemptedItems {
   @Test(timeout = 30000)
   public void testPartialBlockMovementShouldBeRetried1() throws Exception {
     Long item = new Long(1234);
-    List<Block> blocks = new ArrayList<>();
-    blocks.add(new Block(item));
-    blocks.add(new Block(5678L));
+    Block block1 = new Block(item);
+    Block block2 = new Block(5678L);
     Long trackID = 0L;
-    bsmAttemptedItems
-        .add(new AttemptedItemInfo<Long>(trackID, trackID, 0L, blocks, 0));
-    Block[] blksMovementReport = new Block[1];
-    blksMovementReport[0] = new Block(item);
-    bsmAttemptedItems.notifyMovementTriedBlocks(blksMovementReport);
+    DatanodeInfo dnInfo = DFSTestUtil.getLocalDatanodeInfo(9867);
+    Set<StorageTypeNodePair> locs = new HashSet<>();
+    locs.add(new StorageTypeNodePair(StorageType.ARCHIVE, dnInfo));
+    Map<Block, Set<StorageTypeNodePair>> blocksMap = new HashMap<>();
+    blocksMap.put(block1, locs);
+    blocksMap.put(block2, locs);
+    bsmAttemptedItems.add(trackID, trackID, 0L, blocksMap, 0);
+    bsmAttemptedItems.notifyReportedBlock(dnInfo, StorageType.ARCHIVE,
+        block1);
 
     // start block movement report monitor thread
     bsmAttemptedItems.start();
@@ -155,14 +169,16 @@ public class TestBlockStorageMovementAttemptedItems {
   @Test(timeout = 30000)
   public void testPartialBlockMovementShouldBeRetried2() throws Exception {
     Long item = new Long(1234);
+    Block block = new Block(item);
     Long trackID = 0L;
-    List<Block> blocks = new ArrayList<>();
-    blocks.add(new Block(item));
-    bsmAttemptedItems
-        .add(new AttemptedItemInfo<Long>(trackID, trackID, 0L, blocks, 0));
-    Block[] blksMovementReport = new Block[1];
-    blksMovementReport[0] = new Block(item);
-    bsmAttemptedItems.notifyMovementTriedBlocks(blksMovementReport);
+    DatanodeInfo dnInfo = DFSTestUtil.getLocalDatanodeInfo(9867);
+    Set<StorageTypeNodePair> locs = new HashSet<>();
+    locs.add(new StorageTypeNodePair(StorageType.ARCHIVE, dnInfo));
+    Map<Block, Set<StorageTypeNodePair>> blocksMap = new HashMap<>();
+    blocksMap.put(block, locs);
+    bsmAttemptedItems.add(trackID, trackID, 0L, blocksMap, 0);
+    bsmAttemptedItems.notifyReportedBlock(dnInfo, StorageType.ARCHIVE,
+        block);
 
     Thread.sleep(selfRetryTimeout * 2); // Waiting to get timed out
 
@@ -183,14 +199,16 @@ public class TestBlockStorageMovementAttemptedItems {
   public void testPartialBlockMovementWithEmptyAttemptedQueue()
       throws Exception {
     Long item = new Long(1234);
+    Block block = new Block(item);
     Long trackID = 0L;
-    List<Block> blocks = new ArrayList<>();
-    blocks.add(new Block(item));
-    bsmAttemptedItems
-        .add(new AttemptedItemInfo<Long>(trackID, trackID, 0L, blocks, 0));
-    Block[] blksMovementReport = new Block[1];
-    blksMovementReport[0] = new Block(item);
-    bsmAttemptedItems.notifyMovementTriedBlocks(blksMovementReport);
+    DatanodeInfo dnInfo = DFSTestUtil.getLocalDatanodeInfo(9867);
+    Set<StorageTypeNodePair> locs = new HashSet<>();
+    locs.add(new StorageTypeNodePair(StorageType.ARCHIVE, dnInfo));
+    Map<Block, Set<StorageTypeNodePair>> blocksMap = new HashMap<>();
+    blocksMap.put(block, locs);
+    bsmAttemptedItems.add(trackID, trackID, 0L, blocksMap, 0);
+    bsmAttemptedItems.notifyReportedBlock(dnInfo, StorageType.ARCHIVE,
+        block);
     assertFalse(
         "Should not add in queue again if it is not there in"
             + " storageMovementAttemptedItems",

+ 62 - 11
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java

@@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.StripedFileTestUtil;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -107,6 +108,8 @@ public class TestStoragePolicySatisfier {
   public static final long CAPACITY = 2 * 256 * 1024 * 1024;
   public static final String FILE = "/testMoveToSatisfyStoragePolicy";
   public static final int DEFAULT_BLOCK_SIZE = 1024;
+  private ExternalBlockMovementListener blkMoveListener =
+      new ExternalBlockMovementListener();
 
   /**
    * Sets hdfs cluster.
@@ -1029,6 +1032,9 @@ public class TestStoragePolicySatisfier {
       config.set(DFSConfigKeys
           .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
           "3000");
+      config.set(DFSConfigKeys
+          .DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY,
+          "5000");
       StorageType[][] newtypes = new StorageType[][] {
           {StorageType.ARCHIVE, StorageType.DISK},
           {StorageType.ARCHIVE, StorageType.DISK},
@@ -1072,6 +1078,9 @@ public class TestStoragePolicySatisfier {
       config.set(DFSConfigKeys
           .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
           "3000");
+      config.set(DFSConfigKeys
+          .DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY,
+          "5000");
       StorageType[][] newtypes = new StorageType[][] {
           {StorageType.ARCHIVE, StorageType.DISK},
           {StorageType.ARCHIVE, StorageType.DISK},
@@ -1089,7 +1098,7 @@ public class TestStoragePolicySatisfier {
       fs.setStoragePolicy(filePath, "COLD");
       fs.satisfyStoragePolicy(filePath);
       DFSTestUtil.waitExpectedStorageType(filePath.toString(),
-          StorageType.ARCHIVE, 3, 30000, hdfsCluster.getFileSystem());
+          StorageType.ARCHIVE, 3, 60000, hdfsCluster.getFileSystem());
       assertFalse("Log output does not contain expected log message: ",
           logs.getOutput().contains("some of the blocks are low redundant"));
     } finally {
@@ -1425,6 +1434,9 @@ public class TestStoragePolicySatisfier {
       config.set(DFSConfigKeys
           .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
           "3000");
+      config.set(DFSConfigKeys
+          .DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY,
+          "5000");
       config.setBoolean(DFSConfigKeys
           .DFS_STORAGE_POLICY_SATISFIER_LOW_MAX_STREAMS_PREFERENCE_KEY,
           false);
@@ -1467,7 +1479,7 @@ public class TestStoragePolicySatisfier {
       for (int i = 1; i <= 10; i++) {
         Path filePath = new Path("/file" + i);
         DFSTestUtil.waitExpectedStorageType(filePath.toString(),
-            StorageType.DISK, 4, 30000, hdfsCluster.getFileSystem());
+            StorageType.DISK, 4, 60000, hdfsCluster.getFileSystem());
       }
       for (int i = 11; i <= 20; i++) {
         Path filePath = new Path("/file" + i);
@@ -1725,20 +1737,16 @@ public class TestStoragePolicySatisfier {
   public void waitForBlocksMovementAttemptReport(
       long expectedMovementFinishedBlocksCount, int timeout)
           throws TimeoutException, InterruptedException {
-    BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager();
-    final StoragePolicySatisfier<Long> sps =
-        (StoragePolicySatisfier<Long>) blockManager
-        .getSPSManager().getInternalSPSService();
+    Assert.assertNotNull("Didn't set external block move listener",
+        blkMoveListener);
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
+        int actualCount = blkMoveListener.getActualBlockMovements().size();
         LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
             expectedMovementFinishedBlocksCount,
-            ((BlockStorageMovementAttemptedItems<Long>) (sps
-                .getAttemptedItemsMonitor())).getMovementFinishedBlocksCount());
-        return ((BlockStorageMovementAttemptedItems<Long>) (sps
-            .getAttemptedItemsMonitor()))
-                .getMovementFinishedBlocksCount()
+            actualCount);
+        return actualCount
             >= expectedMovementFinishedBlocksCount;
       }
     }, 100, timeout);
@@ -1790,11 +1798,54 @@ public class TestStoragePolicySatisfier {
         .numDataNodes(numberOfDatanodes).storagesPerDatanode(storagesPerDn)
         .storageTypes(storageTypes).storageCapacities(capacities).build();
     cluster.waitActive();
+
+    // Sets external listener for assertion.
+    blkMoveListener.clear();
+    BlockManager blockManager = cluster.getNamesystem().getBlockManager();
+    final StoragePolicySatisfier<Long> sps =
+        (StoragePolicySatisfier<Long>) blockManager
+        .getSPSManager().getInternalSPSService();
+    sps.setBlockMovementListener(blkMoveListener);
     return cluster;
   }
 
   public void restartNamenode() throws IOException {
     hdfsCluster.restartNameNodes();
     hdfsCluster.waitActive();
+    BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager();
+    StoragePolicySatisfyManager spsMgr = blockManager.getSPSManager();
+    if (spsMgr != null && spsMgr.isInternalSatisfierRunning()) {
+      // Sets external listener for assertion.
+      blkMoveListener.clear();
+      final StoragePolicySatisfier<Long> sps =
+          (StoragePolicySatisfier<Long>) spsMgr.getInternalSPSService();
+      sps.setBlockMovementListener(blkMoveListener);
+    }
+  }
+
+  /**
+   * Implementation of listener callback, where it collects all the sps move
+   * attempted blocks for assertion.
+   */
+  public static final class ExternalBlockMovementListener
+      implements BlockMovementListener {
+
+    private List<Block> actualBlockMovements = new ArrayList<>();
+
+    @Override
+    public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) {
+      for (Block block : moveAttemptFinishedBlks) {
+        actualBlockMovements.add(block);
+      }
+      LOG.info("Movement attempted blocks:{}", actualBlockMovements);
+    }
+
+    public List<Block> getActualBlockMovements() {
+      return actualBlockMovements;
+    }
+
+    public void clear() {
+      actualBlockMovements.clear();
+    }
   }
 }

+ 29 - 11
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java

@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.namenode.sps.TestStoragePolicySatisfier.ExternalBlockMovementListener;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Before;
@@ -70,6 +71,8 @@ public class TestStoragePolicySatisfierWithStripedFile {
   private int cellSize;
   private int defaultStripeBlockSize;
   private Configuration conf;
+  private ExternalBlockMovementListener blkMoveListener =
+      new ExternalBlockMovementListener();
 
   private ErasureCodingPolicy getEcPolicy() {
     return StripedFileTestUtil.getDefaultECPolicy();
@@ -131,6 +134,15 @@ public class TestStoragePolicySatisfierWithStripedFile {
     HdfsAdmin hdfsAdmin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
     try {
       cluster.waitActive();
+
+      // Sets external listener for assertion.
+      blkMoveListener.clear();
+      BlockManager blockManager = cluster.getNamesystem().getBlockManager();
+      final StoragePolicySatisfier<Long> sps =
+          (StoragePolicySatisfier<Long>) blockManager
+          .getSPSManager().getInternalSPSService();
+      sps.setBlockMovementListener(blkMoveListener);
+
       DistributedFileSystem dfs = cluster.getFileSystem();
       dfs.enableErasureCodingPolicy(
           StripedFileTestUtil.getDefaultECPolicy().getName());
@@ -240,6 +252,15 @@ public class TestStoragePolicySatisfierWithStripedFile {
     HdfsAdmin hdfsAdmin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
     try {
       cluster.waitActive();
+
+      // Sets external listener for assertion.
+      blkMoveListener.clear();
+      BlockManager blockManager = cluster.getNamesystem().getBlockManager();
+      final StoragePolicySatisfier<Long> sps =
+          (StoragePolicySatisfier<Long>) blockManager
+          .getSPSManager().getInternalSPSService();
+      sps.setBlockMovementListener(blkMoveListener);
+
       DistributedFileSystem dfs = cluster.getFileSystem();
       dfs.enableErasureCodingPolicy(
           StripedFileTestUtil.getDefaultECPolicy().getName());
@@ -328,6 +349,9 @@ public class TestStoragePolicySatisfierWithStripedFile {
     conf.set(DFSConfigKeys
         .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
         "3000");
+    conf.set(DFSConfigKeys
+        .DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY,
+        "5000");
     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
         .numDataNodes(numOfDatanodes)
         .storagesPerDatanode(storagesPerDatanode)
@@ -559,22 +583,16 @@ public class TestStoragePolicySatisfierWithStripedFile {
   private void waitForBlocksMovementAttemptReport(MiniDFSCluster cluster,
       long expectedMoveFinishedBlks, int timeout)
           throws TimeoutException, InterruptedException {
-    BlockManager blockManager = cluster.getNamesystem().getBlockManager();
-    final StoragePolicySatisfier<Long> sps =
-        (StoragePolicySatisfier<Long>) blockManager
-        .getSPSManager().getInternalSPSService();
-    Assert.assertNotNull("Failed to get SPS object reference!", sps);
-
+    Assert.assertNotNull("Didn't set external block move listener",
+        blkMoveListener);
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
+        int actualCount = blkMoveListener.getActualBlockMovements().size();
         LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
             expectedMoveFinishedBlks,
-            ((BlockStorageMovementAttemptedItems<Long>) sps
-                .getAttemptedItemsMonitor()).getMovementFinishedBlocksCount());
-        return ((BlockStorageMovementAttemptedItems<Long>) sps
-            .getAttemptedItemsMonitor())
-                .getMovementFinishedBlocksCount() >= expectedMoveFinishedBlks;
+            actualCount);
+        return actualCount >= expectedMoveFinishedBlks;
       }
     }, 100, timeout);
   }

+ 13 - 31
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java

@@ -54,11 +54,9 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
 import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener;
 import org.apache.hadoop.hdfs.server.namenode.sps.BlockStorageMovementAttemptedItems;
 import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
 import org.apache.hadoop.hdfs.server.namenode.sps.TestStoragePolicySatisfier;
@@ -92,6 +90,8 @@ public class TestExternalStoragePolicySatisfier
   private File baseDir;
   private StoragePolicySatisfier<String> externalSps;
   private ExternalSPSContext externalCtxt;
+  private ExternalBlockMovementListener blkMoveListener =
+      new ExternalBlockMovementListener();
 
   @After
   public void destroy() throws Exception {
@@ -144,15 +144,12 @@ public class TestExternalStoragePolicySatisfier
     nnc = getNameNodeConnector(getConf());
 
     externalSps = new StoragePolicySatisfier<String>(getConf());
-    externalCtxt = new ExternalSPSContext(externalSps,
-        getNameNodeConnector(conf));
+    externalCtxt = new ExternalSPSContext(externalSps, nnc);
 
-    ExternalBlockMovementListener blkMoveListener =
-        new ExternalBlockMovementListener();
+    blkMoveListener.clear();
     ExternalSPSBlockMoveTaskHandler externalHandler =
         new ExternalSPSBlockMoveTaskHandler(conf, nnc,
             externalSps);
-    externalHandler.init();
     externalSps.init(externalCtxt,
         new ExternalSPSFilePathCollector(externalSps), externalHandler,
         blkMoveListener);
@@ -169,33 +166,17 @@ public class TestExternalStoragePolicySatisfier
     getCluster().waitActive();
     externalSps = new StoragePolicySatisfier<>(getConf());
 
-    externalCtxt = new ExternalSPSContext(externalSps,
-        getNameNodeConnector(getConf()));
-    ExternalBlockMovementListener blkMoveListener =
-        new ExternalBlockMovementListener();
+    externalCtxt = new ExternalSPSContext(externalSps, nnc);
+    blkMoveListener.clear();
     ExternalSPSBlockMoveTaskHandler externalHandler =
         new ExternalSPSBlockMoveTaskHandler(getConf(), nnc,
             externalSps);
-    externalHandler.init();
     externalSps.init(externalCtxt,
         new ExternalSPSFilePathCollector(externalSps), externalHandler,
         blkMoveListener);
     externalSps.start(true, StoragePolicySatisfierMode.EXTERNAL);
   }
 
-  private class ExternalBlockMovementListener implements BlockMovementListener {
-
-    private List<Block> actualBlockMovements = new ArrayList<>();
-
-    @Override
-    public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) {
-      for (Block block : moveAttemptFinishedBlks) {
-        actualBlockMovements.add(block);
-      }
-      LOG.info("Movement attempted blocks", actualBlockMovements);
-    }
-  }
-
   private NameNodeConnector getNameNodeConnector(Configuration conf)
       throws IOException {
     final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
@@ -237,16 +218,15 @@ public class TestExternalStoragePolicySatisfier
   public void waitForBlocksMovementAttemptReport(
       long expectedMovementFinishedBlocksCount, int timeout)
           throws TimeoutException, InterruptedException {
+    Assert.assertNotNull("Didn't set external block move listener",
+        blkMoveListener);
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
+        int actualCount = blkMoveListener.getActualBlockMovements().size();
         LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
-            expectedMovementFinishedBlocksCount,
-            ((BlockStorageMovementAttemptedItems<String>) (externalSps
-                .getAttemptedItemsMonitor())).getMovementFinishedBlocksCount());
-        return ((BlockStorageMovementAttemptedItems<String>) (externalSps
-            .getAttemptedItemsMonitor()))
-                .getMovementFinishedBlocksCount()
+            expectedMovementFinishedBlocksCount, actualCount);
+        return actualCount
             >= expectedMovementFinishedBlocksCount;
       }
     }, 100, timeout);
@@ -352,6 +332,8 @@ public class TestExternalStoragePolicySatisfier
       files.add(FILE);
       DistributedFileSystem fs = getFS();
 
+      // stops sps to make the SPS Q with many outstanding requests.
+      externalSps.stopGracefully();
       // Creates 4 more files. Send all of them for satisfying the storage
       // policy together.
       for (int i = 0; i < 3; i++) {