ソースを参照

HDFS-12911. [SPS]: Modularize the SPS code and expose necessary interfaces for external/internal implementations. Contributed by Uma Maheswara Rao G

Rakesh Radhakrishnan 7 年 前
コミット
8d4f74e733
20 ファイル変更938 行追加437 行削除
  1. 52 9
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
  2. 4 12
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java
  3. 4 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
  4. 9 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  5. 44 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockMoveTaskHandler.java
  6. 40 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockMovementListener.java
  7. 16 12
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
  8. 35 172
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
  9. 16 27
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
  10. 43 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileIdCollector.java
  11. 62 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java
  12. 20 42
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
  13. 178 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
  14. 81 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java
  15. 63 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java
  16. 107 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
  17. 73 102
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
  18. 10 9
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
  19. 70 41
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
  20. 11 8
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java

+ 52 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -93,8 +93,8 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
-import org.apache.hadoop.hdfs.server.namenode.sps.Context;
-import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeContext;
+import org.apache.hadoop.hdfs.server.namenode.sps.SPSPathIds;
+import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
 import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
@@ -434,7 +434,8 @@ public class BlockManager implements BlockStatsMXBean {
   private final StoragePolicySatisfier sps;
   private final boolean storagePolicyEnabled;
   private boolean spsEnabled;
-  private Context spsctxt = null;
+  private final SPSPathIds spsPaths;
+
   /** Minimum live replicas needed for the datanode to be transitioned
    * from ENTERING_MAINTENANCE to IN_MAINTENANCE.
    */
@@ -481,8 +482,8 @@ public class BlockManager implements BlockStatsMXBean {
         conf.getBoolean(
             DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
             DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_DEFAULT);
-    spsctxt = new IntraSPSNameNodeContext(namesystem, this, conf);
-    sps = new StoragePolicySatisfier(spsctxt);
+    sps = new StoragePolicySatisfier(conf);
+    spsPaths = new SPSPathIds();
     blockTokenSecretManager = createBlockTokenSecretManager(conf);
 
     providedStorageMap = new ProvidedStorageMap(namesystem, this, conf);
@@ -5041,8 +5042,7 @@ public class BlockManager implements BlockStatsMXBean {
       LOG.info("Storage policy satisfier is already running.");
       return;
     }
-    // TODO: FSDirectory will get removed via HDFS-12911 modularization work
-    sps.start(false, namesystem.getFSDirectory());
+    sps.start(false);
   }
 
   /**
@@ -5078,8 +5078,7 @@ public class BlockManager implements BlockStatsMXBean {
       LOG.info("Storage policy satisfier is already running.");
       return;
     }
-    // TODO: FSDirectory will get removed via HDFS-12911 modularization work
-    sps.start(true, namesystem.getFSDirectory());
+    sps.start(true);
   }
 
   /**
@@ -5119,4 +5118,48 @@ public class BlockManager implements BlockStatsMXBean {
       String path) throws IOException {
     return sps.checkStoragePolicySatisfyPathStatus(path);
   }
+
+  /**
+   * @return SPS service instance.
+   */
+  public SPSService getSPSService() {
+    return this.sps;
+  }
+
+  /**
+   * @return the next SPS path id, on which path users has invoked to satisfy
+   *         storages.
+   */
+  public Long getNextSPSPathId() {
+    return spsPaths.pollNext();
+  }
+
+  /**
+   * Removes the SPS path id from the list of sps paths.
+   */
+  public void removeSPSPathId(long trackId) {
+    spsPaths.remove(trackId);
+  }
+
+  /**
+   * Clean up all sps path ids.
+   */
+  public void removeAllSPSPathIds() {
+    spsPaths.clear();
+  }
+
+  /**
+   * Adds the sps path to SPSPathIds list.
+   */
+  public void addSPSPathId(long id) {
+    spsPaths.add(id);
+  }
+
+  /**
+   * @return true if sps enabled.
+   */
+  public boolean isSPSEnabled() {
+    return spsEnabled;
+  }
+
 }

+ 4 - 12
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java

@@ -30,7 +30,6 @@ import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hdfs.XAttrHelper;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 
 import com.google.common.collect.Lists;
 
@@ -87,21 +86,14 @@ final class FSDirSatisfyStoragePolicyOp {
   }
 
   static boolean unprotectedSatisfyStoragePolicy(INode inode, FSDirectory fsd) {
-    if (inode.isFile() && inode.asFile().numBlocks() != 0) {
-      // Adding directly in the storageMovementNeeded queue, So it can
-      // get more priority compare to directory.
-      fsd.getBlockManager().getStoragePolicySatisfier()
-          .satisfyStoragePolicy(inode.getId());
-      return true;
-    } else if (inode.isDirectory()
-        && inode.asDirectory().getChildrenNum(Snapshot.CURRENT_STATE_ID) > 0) {
+    if (inode.isFile() && inode.asFile().numBlocks() == 0) {
+      return false;
+    } else {
       // Adding directory in the pending queue, so FileInodeIdCollector process
       // directory child in batch and recursively
-      fsd.getBlockManager().getStoragePolicySatisfier()
-          .addInodeToPendingDirQueue(inode.getId());
+      fsd.getBlockManager().addSPSPathId(inode.getId());
       return true;
     }
-    return false;
   }
 
   private static boolean inodeHasSatisfyXAttr(INode inode) {

+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -1401,14 +1401,16 @@ public class FSDirectory implements Closeable {
       if (!inode.isSymlink()) {
         final XAttrFeature xaf = inode.getXAttrFeature();
         addEncryptionZone((INodeWithAdditionalFields) inode, xaf);
-        addStoragePolicySatisfier((INodeWithAdditionalFields) inode, xaf);
+        if (namesystem.getBlockManager().isSPSEnabled()) {
+          addStoragePolicySatisfier((INodeWithAdditionalFields) inode, xaf);
+        }
       }
     }
   }
 
   private void addStoragePolicySatisfier(INodeWithAdditionalFields inode,
       XAttrFeature xaf) {
-    if (xaf == null || inode.isDirectory()) {
+    if (xaf == null) {
       return;
     }
     XAttr xattr = xaf.getXAttr(XATTR_SATISFY_STORAGE_POLICY);

+ 9 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -258,6 +258,9 @@ import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;
+import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeBlockMoveTaskHandler;
+import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeContext;
+import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeFileIdCollector;
 import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
@@ -1291,7 +1294,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         FSDirEncryptionZoneOp.warmUpEdekCache(edekCacheLoader, dir,
             edekCacheLoaderDelay, edekCacheLoaderInterval);
       }
-
+      blockManager.getSPSService().init(
+          new IntraSPSNameNodeContext(this, blockManager,
+              blockManager.getSPSService()),
+          new IntraSPSNameNodeFileIdCollector(getFSDirectory(),
+              blockManager.getSPSService()),
+          new IntraSPSNameNodeBlockMoveTaskHandler(blockManager, this));
       blockManager.startSPS();
     } finally {
       startingActiveService = false;

+ 44 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockMoveTaskHandler.java

@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode.sps;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
+
+/**
+ * Interface for implementing different ways of block moving approaches. One can
+ * connect directly to DN and request block move, and other can talk NN to
+ * schedule via heart-beats.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface BlockMoveTaskHandler {
+
+  /**
+   * This is an interface method to handle the move tasks. BlockMovingInfo must
+   * contain the required info to move the block, that source location,
+   * destination location and storage types.
+   */
+  void submitMoveTask(BlockMovingInfo blkMovingInfo,
+      BlockMovementListener blockMoveCompletionListener) throws IOException;
+
+}

+ 40 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockMovementListener.java

@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode.sps;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.Block;
+
+/**
+ * Interface for notifying about block movement attempt completion.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface BlockMovementListener {
+
+  /**
+   * This method used to notify to the SPS about block movement attempt
+   * finished. Then SPS will re-check whether it needs retry or not.
+   *
+   * @param moveAttemptFinishedBlks
+   *          -list of movement attempt finished blocks
+   */
+  void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks);
+}

+ 16 - 12
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java

@@ -32,7 +32,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_
 
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.AttemptedItemInfo;
-import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.ItemInfo;
 import org.apache.hadoop.util.Daemon;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,7 +46,8 @@ import com.google.common.annotations.VisibleForTesting;
  * finished for a longer time period, then such items will retries automatically
  * after timeout. The default timeout would be 5 minutes.
  */
-public class BlockStorageMovementAttemptedItems {
+public class BlockStorageMovementAttemptedItems
+    implements BlockMovementListener {
   private static final Logger LOG =
       LoggerFactory.getLogger(BlockStorageMovementAttemptedItems.class);
 
@@ -71,19 +71,19 @@ public class BlockStorageMovementAttemptedItems {
   //
   private long minCheckTimeout = 1 * 60 * 1000; // minimum value
   private BlockStorageMovementNeeded blockStorageMovementNeeded;
-  private final Context ctxt;
+  private final SPSService service;
 
-  public BlockStorageMovementAttemptedItems(Context context,
+  public BlockStorageMovementAttemptedItems(SPSService service,
       BlockStorageMovementNeeded unsatisfiedStorageMovementFiles) {
-    this.ctxt = context;
-    long recheckTimeout = ctxt.getConf().getLong(
+    this.service = service;
+    long recheckTimeout = this.service.getConf().getLong(
         DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
         DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT);
     if (recheckTimeout > 0) {
       this.minCheckTimeout = Math.min(minCheckTimeout, recheckTimeout);
     }
 
-    this.selfRetryTimeout = ctxt.getConf().getLong(
+    this.selfRetryTimeout = this.service.getConf().getLong(
         DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY,
         DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT);
     this.blockStorageMovementNeeded = unsatisfiedStorageMovementFiles;
@@ -111,7 +111,7 @@ public class BlockStorageMovementAttemptedItems {
    * @param moveAttemptFinishedBlks
    *          storage movement attempt finished blocks
    */
-  public void addReportedMovedBlocks(Block[] moveAttemptFinishedBlks) {
+  public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) {
     if (moveAttemptFinishedBlks.length == 0) {
       return;
     }
@@ -191,7 +191,7 @@ public class BlockStorageMovementAttemptedItems {
         AttemptedItemInfo itemInfo = iter.next();
         if (now > itemInfo.getLastAttemptedOrReportedTime()
             + selfRetryTimeout) {
-          Long blockCollectionID = itemInfo.getTrackId();
+          Long blockCollectionID = itemInfo.getFileId();
           synchronized (movementFinishedBlocks) {
             ItemInfo candidate = new ItemInfo(itemInfo.getStartId(),
                 blockCollectionID, itemInfo.getRetryCount() + 1);
@@ -223,7 +223,7 @@ public class BlockStorageMovementAttemptedItems {
               // gets the chance first and can be cleaned from queue quickly as
               // all movements already done.
               blockStorageMovementNeeded.add(new ItemInfo(attemptedItemInfo
-                  .getStartId(), attemptedItemInfo.getTrackId(),
+                  .getStartId(), attemptedItemInfo.getFileId(),
                   attemptedItemInfo.getRetryCount() + 1));
               iterator.remove();
             }
@@ -246,7 +246,11 @@ public class BlockStorageMovementAttemptedItems {
   }
 
   public void clearQueues() {
-    movementFinishedBlocks.clear();
-    storageMovementAttemptedItems.clear();
+    synchronized (movementFinishedBlocks) {
+      movementFinishedBlocks.clear();
+    }
+    synchronized (storageMovementAttemptedItems) {
+      storageMovementAttemptedItems.clear();
+    }
   }
 }

+ 35 - 172
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java

@@ -17,11 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.sps;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY;
-
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -33,12 +29,6 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
-import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
-import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
-import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser;
-import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser.TraverseInfo;
-import org.apache.hadoop.hdfs.server.namenode.INode;
-import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.ItemInfo;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
@@ -75,22 +65,21 @@ public class BlockStorageMovementNeeded {
 
   private final Context ctxt;
 
-  // List of pending dir to satisfy the policy
-  private final Queue<Long> spsDirsToBeTraveresed = new LinkedList<Long>();
+  private Daemon pathIdCollector;
 
-  private Daemon inodeIdCollector;
+  private FileIdCollector fileIDCollector;
 
-  private final int maxQueuedItem;
+  private SPSPathIdProcessor pathIDProcessor;
 
   // Amount of time to cache the SUCCESS status of path before turning it to
   // NOT_AVAILABLE.
   private static long statusClearanceElapsedTimeMs = 300000;
 
-  public BlockStorageMovementNeeded(Context context) {
+  public BlockStorageMovementNeeded(Context context,
+      FileIdCollector fileIDCollector) {
     this.ctxt = context;
-    this.maxQueuedItem = ctxt.getConf().getInt(
-                  DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY,
-                  DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT);
+    this.fileIDCollector = fileIDCollector;
+    pathIDProcessor = new SPSPathIdProcessor();
   }
 
   /**
@@ -140,29 +129,6 @@ public class BlockStorageMovementNeeded {
     return storageMovementNeeded.poll();
   }
 
-  public synchronized void addToPendingDirQueue(long id) {
-    spsStatus.put(id, new StoragePolicySatisfyPathStatusInfo(
-        StoragePolicySatisfyPathStatus.PENDING));
-    spsDirsToBeTraveresed.add(id);
-    // Notify waiting FileInodeIdCollector thread about the newly
-    // added SPS path.
-    synchronized (spsDirsToBeTraveresed) {
-      spsDirsToBeTraveresed.notify();
-    }
-  }
-
-  /**
-   * Returns queue remaining capacity.
-   */
-  public synchronized int remainingCapacity() {
-    int size = storageMovementNeeded.size();
-    if (size >= maxQueuedItem) {
-      return 0;
-    } else {
-      return (maxQueuedItem - size);
-    }
-  }
-
   /**
    * Returns queue size.
    */
@@ -171,7 +137,7 @@ public class BlockStorageMovementNeeded {
   }
 
   public synchronized void clearAll() {
-    spsDirsToBeTraveresed.clear();
+    ctxt.removeAllSPSPathIds();
     storageMovementNeeded.clear();
     pendingWorkForDirectory.clear();
   }
@@ -206,13 +172,13 @@ public class BlockStorageMovementNeeded {
     } else {
       // Remove xAttr if trackID doesn't exist in
       // storageMovementAttemptedItems or file policy satisfied.
-      ctxt.removeSPSHint(trackInfo.getTrackId());
+      ctxt.removeSPSHint(trackInfo.getFileId());
       updateStatus(trackInfo.getStartId(), isSuccess);
     }
   }
 
   public synchronized void clearQueue(long trackId) {
-    spsDirsToBeTraveresed.remove(trackId);
+    ctxt.removeSPSPathId(trackId);
     Iterator<ItemInfo> iterator = storageMovementNeeded.iterator();
     while (iterator.hasNext()) {
       ItemInfo next = iterator.next();
@@ -249,7 +215,7 @@ public class BlockStorageMovementNeeded {
   public synchronized void clearQueuesWithNotification() {
     // Remove xAttr from directories
     Long trackId;
-    while ((trackId = spsDirsToBeTraveresed.poll()) != null) {
+    while ((trackId = ctxt.getNextSPSPathId()) != null) {
       try {
         // Remove xAttr for file
         ctxt.removeSPSHint(trackId);
@@ -265,12 +231,12 @@ public class BlockStorageMovementNeeded {
       try {
         // Remove xAttr for file
         if (!itemInfo.isDir()) {
-          ctxt.removeSPSHint(itemInfo.getTrackId());
+          ctxt.removeSPSHint(itemInfo.getFileId());
         }
       } catch (IOException ie) {
         LOG.warn(
             "Failed to remove SPS xattr for track id "
-                + itemInfo.getTrackId(), ie);
+                + itemInfo.getFileId(), ie);
       }
     }
     this.clearAll();
@@ -280,57 +246,33 @@ public class BlockStorageMovementNeeded {
    * Take dir tack ID from the spsDirsToBeTraveresed queue and collect child
    * ID's to process for satisfy the policy.
    */
-  private class StorageMovementPendingInodeIdCollector extends FSTreeTraverser
-      implements Runnable {
-
-    private int remainingCapacity = 0;
-
-    private List<ItemInfo> currentBatch = new ArrayList<>(maxQueuedItem);
-
-    StorageMovementPendingInodeIdCollector(FSDirectory dir) {
-      super(dir);
-    }
+  private class SPSPathIdProcessor implements Runnable {
 
     @Override
     public void run() {
       LOG.info("Starting FileInodeIdCollector!.");
       long lastStatusCleanTime = 0;
       while (ctxt.isRunning()) {
+        LOG.info("Running FileInodeIdCollector!.");
         try {
           if (!ctxt.isInSafeMode()) {
-            Long startINodeId = spsDirsToBeTraveresed.poll();
+            Long startINodeId = ctxt.getNextSPSPathId();
             if (startINodeId == null) {
               // Waiting for SPS path
-              synchronized (spsDirsToBeTraveresed) {
-                spsDirsToBeTraveresed.wait(5000);
-              }
+              Thread.sleep(3000);
             } else {
-              INode startInode = getFSDirectory().getInode(startINodeId);
-              if (startInode != null) {
-                try {
-                  remainingCapacity = remainingCapacity();
-                  spsStatus.put(startINodeId,
-                      new StoragePolicySatisfyPathStatusInfo(
-                          StoragePolicySatisfyPathStatus.IN_PROGRESS));
-                  readLock();
-                  traverseDir(startInode.asDirectory(), startINodeId,
-                      HdfsFileStatus.EMPTY_NAME,
-                      new SPSTraverseInfo(startINodeId));
-                } finally {
-                  readUnlock();
-                }
-                // Mark startInode traverse is done
-                addAll(startInode.getId(), currentBatch, true);
-                currentBatch.clear();
-
-                // check if directory was empty and no child added to queue
-                DirPendingWorkInfo dirPendingWorkInfo =
-                    pendingWorkForDirectory.get(startInode.getId());
-                if (dirPendingWorkInfo.isDirWorkDone()) {
-                  ctxt.removeSPSHint(startInode.getId());
-                  pendingWorkForDirectory.remove(startInode.getId());
-                  updateStatus(startInode.getId(), true);
-                }
+              spsStatus.put(startINodeId,
+                  new StoragePolicySatisfyPathStatusInfo(
+                      StoragePolicySatisfyPathStatus.IN_PROGRESS));
+              fileIDCollector.scanAndCollectFileIds(startINodeId);
+              // check if directory was empty and no child added to queue
+              DirPendingWorkInfo dirPendingWorkInfo =
+                  pendingWorkForDirectory.get(startINodeId);
+              if (dirPendingWorkInfo != null
+                  && dirPendingWorkInfo.isDirWorkDone()) {
+                ctxt.removeSPSHint(startINodeId);
+                pendingWorkForDirectory.remove(startINodeId);
+                updateStatus(startINodeId, true);
               }
             }
             //Clear the SPS status if status is in SUCCESS more than 5 min.
@@ -355,71 +297,6 @@ public class BlockStorageMovementNeeded {
         }
       }
     }
-
-    @Override
-    protected void checkPauseForTesting() throws InterruptedException {
-      // TODO implement if needed
-    }
-
-    @Override
-    protected boolean processFileInode(INode inode, TraverseInfo traverseInfo)
-        throws IOException, InterruptedException {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Processing {} for statisy the policy",
-            inode.getFullPathName());
-      }
-      if (!inode.isFile()) {
-        return false;
-      }
-      if (inode.isFile() && inode.asFile().numBlocks() != 0) {
-        currentBatch.add(new ItemInfo(
-            ((SPSTraverseInfo) traverseInfo).getStartId(), inode.getId()));
-        remainingCapacity--;
-      }
-      return true;
-    }
-
-    @Override
-    protected boolean canSubmitCurrentBatch() {
-      return remainingCapacity <= 0;
-    }
-
-    @Override
-    protected void checkINodeReady(long startId) throws IOException {
-      // SPS work won't be scheduled if NN is in standby. So, skipping NN
-      // standby check.
-      return;
-    }
-
-    @Override
-    protected void submitCurrentBatch(long startId)
-        throws IOException, InterruptedException {
-      // Add current child's to queue
-      addAll(startId, currentBatch, false);
-      currentBatch.clear();
-    }
-
-    @Override
-    protected void throttle() throws InterruptedException {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("StorageMovementNeeded queue remaining capacity is zero,"
-            + " waiting for some free slots.");
-      }
-      remainingCapacity = remainingCapacity();
-      // wait for queue to be free
-      while (remainingCapacity <= 0) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Waiting for storageMovementNeeded queue to be free!");
-        }
-        Thread.sleep(5000);
-        remainingCapacity = remainingCapacity();
-      }
-    }
-
-    @Override
-    protected boolean canTraverseDir(INode inode) throws IOException {
-      return true;
-    }
   }
 
   /**
@@ -476,29 +353,15 @@ public class BlockStorageMovementNeeded {
     }
   }
 
-  // TODO: FSDirectory will get removed via HDFS-12911 modularization work
-  public void init(FSDirectory fsd) {
-    inodeIdCollector = new Daemon(new StorageMovementPendingInodeIdCollector(
-        fsd));
-    inodeIdCollector.setName("FileInodeIdCollector");
-    inodeIdCollector.start();
+  public void activate() {
+    pathIdCollector = new Daemon(pathIDProcessor);
+    pathIdCollector.setName("SPSPathIdProcessor");
+    pathIdCollector.start();
   }
 
   public void close() {
-    if (inodeIdCollector != null) {
-      inodeIdCollector.interrupt();
-    }
-  }
-
-  class SPSTraverseInfo extends TraverseInfo {
-    private long startId;
-
-    SPSTraverseInfo(long startId) {
-      this.startId = startId;
-    }
-
-    public long getStartId() {
-      return startId;
+    if (pathIdCollector != null) {
+      pathIdCollector.interrupt();
     }
   }
 

+ 16 - 27
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java

@@ -19,11 +19,9 @@
 package org.apache.hadoop.hdfs.server.namenode.sps;
 
 import java.io.IOException;
-import java.util.function.Supplier;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.fs.UnresolvedLinkException;
@@ -31,7 +29,6 @@ import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
-import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.security.AccessControlException;
 
@@ -42,24 +39,11 @@ import org.apache.hadoop.security.AccessControlException;
 @InterfaceStability.Evolving
 public interface Context {
 
-  /**
-   * Returns configuration object.
-   */
-  Configuration getConf();
-
   /**
    * Returns true if the SPS is running, false otherwise.
    */
   boolean isRunning();
 
-  /**
-   * Update the SPS running status.
-   *
-   * @param isSpsRunning
-   *          true represents running, false otherwise
-   */
-  void setSPSRunning(Supplier<Boolean> isSpsRunning);
-
   /**
    * Returns true if the Namenode in safe mode, false otherwise.
    */
@@ -152,17 +136,6 @@ public interface Context {
    */
   boolean hasLowRedundancyBlocks(long inodeID);
 
-  /**
-   * Assign the given block movement task to the target node present in
-   * {@link BlockMovingInfo}.
-   *
-   * @param blkMovingInfo
-   *          block to storage info
-   * @throws IOException
-   */
-  void assignBlockMoveTaskToTargetNode(BlockMovingInfo blkMovingInfo)
-      throws IOException;
-
   /**
    * Checks whether the given datanode has sufficient space to occupy the given
    * blockSize data.
@@ -178,4 +151,20 @@ public interface Context {
    */
   boolean verifyTargetDatanodeHasSpaceForScheduling(DatanodeInfo dn,
       StorageType type, long blockSize);
+
+  /**
+   * @return next SPS path id to process.
+   */
+  Long getNextSPSPathId();
+
+  /**
+   * Removes the SPS path id.
+   */
+  void removeSPSPathId(long pathId);
+
+  /**
+   * Removes all SPS path ids.
+   */
+  void removeAllSPSPathIds();
+
 }

+ 43 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileIdCollector.java

@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode.sps;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * An interface for scanning the directory recursively and collect file ids
+ * under the given directory.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface FileIdCollector {
+
+  /**
+   * Scans the given inode directory and collects the file ids under that
+   * directory and adds to the given BlockStorageMovementNeeded.
+   *
+   * @param inodeID
+   *          - The directory ID
+   */
+  void scanAndCollectFileIds(Long inodeId)
+      throws IOException, InterruptedException;
+}

+ 62 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java

@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode.sps;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
+
+/**
+ * This class handles the internal SPS block movements. This will assign block
+ * movement tasks to target datanode descriptors.
+ */
+public class IntraSPSNameNodeBlockMoveTaskHandler
+    implements BlockMoveTaskHandler {
+
+  private BlockManager blockManager;
+  private Namesystem namesystem;
+
+  public IntraSPSNameNodeBlockMoveTaskHandler(BlockManager blockManager,
+      Namesystem namesytem) {
+    this.blockManager = blockManager;
+    this.namesystem = namesytem;
+  }
+
+  @Override
+  public void submitMoveTask(BlockMovingInfo blkMovingInfo,
+      BlockMovementListener blockMoveCompletionListener) throws IOException {
+    namesystem.readLock();
+    try {
+      DatanodeDescriptor dn = blockManager.getDatanodeManager()
+          .getDatanode(blkMovingInfo.getTarget().getDatanodeUuid());
+      if (dn == null) {
+        throw new IOException("Failed to schedule block movement task:"
+            + blkMovingInfo + " as target datanode: "
+            + blkMovingInfo.getTarget() + " doesn't exists");
+      }
+      dn.incrementBlocksScheduled(blkMovingInfo.getTargetStorageType());
+      dn.addBlocksToMoveStorage(blkMovingInfo);
+    } finally {
+      namesystem.readUnlock();
+    }
+  }
+}

+ 20 - 42
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java

@@ -20,10 +20,8 @@ package org.apache.hadoop.hdfs.server.namenode.sps;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
 
 import java.io.IOException;
-import java.util.function.Supplier;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.fs.UnresolvedLinkException;
@@ -38,7 +36,6 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
-import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.security.AccessControlException;
 import org.slf4j.Logger;
@@ -55,15 +52,14 @@ public class IntraSPSNameNodeContext implements Context {
 
   private final Namesystem namesystem;
   private final BlockManager blockManager;
-  private final Configuration conf;
-  private Supplier<Boolean> isSpsRunning;
+
+  private SPSService service;
 
   public IntraSPSNameNodeContext(Namesystem namesystem,
-      BlockManager blockManager, Configuration conf) {
+      BlockManager blockManager, SPSService service) {
     this.namesystem = namesystem;
     this.blockManager = blockManager;
-    this.conf = conf;
-    isSpsRunning = () -> false;
+    this.service = service;
   }
 
   @Override
@@ -110,11 +106,6 @@ public class IntraSPSNameNodeContext implements Context {
     }
   }
 
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-
   @Override
   public boolean isFileExist(long inodeId) {
     return namesystem.getFSDirectory().getInode(inodeId) != null;
@@ -127,16 +118,7 @@ public class IntraSPSNameNodeContext implements Context {
 
   @Override
   public boolean isRunning() {
-    // TODO : 'isSpsRunning' flag has been added to avoid the NN lock inside
-    // SPS. Context interface will be further refined as part of HDFS-12911
-    // modularization task. One idea is to introduce a cleaner interface similar
-    // to Namesystem for better abstraction.
-    return namesystem.isRunning() && isSpsRunning.get();
-  }
-
-  @Override
-  public void setSPSRunning(Supplier<Boolean> spsRunningFlag) {
-    this.isSpsRunning = spsRunningFlag;
+    return namesystem.isRunning() && service.isRunning();
   }
 
   @Override
@@ -182,25 +164,6 @@ public class IntraSPSNameNodeContext implements Context {
     }
   }
 
-  @Override
-  public void assignBlockMoveTaskToTargetNode(BlockMovingInfo blkMovingInfo)
-      throws IOException {
-    namesystem.readLock();
-    try {
-      DatanodeDescriptor dn = blockManager.getDatanodeManager()
-          .getDatanode(blkMovingInfo.getTarget().getDatanodeUuid());
-      if (dn == null) {
-        throw new IOException("Failed to schedule block movement task:"
-            + blkMovingInfo + " as target datanode: "
-            + blkMovingInfo.getTarget() + " doesn't exists");
-      }
-      dn.addBlocksToMoveStorage(blkMovingInfo);
-      dn.incrementBlocksScheduled(blkMovingInfo.getTargetStorageType());
-    } finally {
-      namesystem.readUnlock();
-    }
-  }
-
   @Override
   public boolean verifyTargetDatanodeHasSpaceForScheduling(DatanodeInfo dn,
       StorageType type, long blockSize) {
@@ -217,4 +180,19 @@ public class IntraSPSNameNodeContext implements Context {
       namesystem.readUnlock();
     }
   }
+
+  @Override
+  public Long getNextSPSPathId() {
+    return blockManager.getNextSPSPathId();
+  }
+
+  @Override
+  public void removeSPSPathId(long trackId) {
+    blockManager.removeSPSPathId(trackId);
+  }
+
+  @Override
+  public void removeAllSPSPathIds() {
+    blockManager.removeAllSPSPathIds();
+  }
 }

+ 178 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java

@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode.sps;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser;
+import org.apache.hadoop.hdfs.server.namenode.INode;
+
+/**
+ * A specific implementation for scanning the directory with Namenode internal
+ * Inode structure and collects the file ids under the given directory ID.
+ */
+public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
+    implements FileIdCollector {
+  private int maxQueueLimitToScan;
+  private final SPSService service;
+
+  private int remainingCapacity = 0;
+
+  private List<ItemInfo> currentBatch;
+
+  public IntraSPSNameNodeFileIdCollector(FSDirectory dir, SPSService service) {
+    super(dir);
+    this.service = service;
+    this.maxQueueLimitToScan = service.getConf().getInt(
+        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY,
+        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT);
+    currentBatch = new ArrayList<>(maxQueueLimitToScan);
+  }
+
+  @Override
+  protected boolean processFileInode(INode inode, TraverseInfo traverseInfo)
+      throws IOException, InterruptedException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Processing {} for statisy the policy",
+          inode.getFullPathName());
+    }
+    if (!inode.isFile()) {
+      return false;
+    }
+    if (inode.isFile() && inode.asFile().numBlocks() != 0) {
+      currentBatch.add(new ItemInfo(
+          ((SPSTraverseInfo) traverseInfo).getStartId(), inode.getId()));
+      remainingCapacity--;
+    }
+    return true;
+  }
+
+  @Override
+  protected boolean canSubmitCurrentBatch() {
+    return remainingCapacity <= 0;
+  }
+
+  @Override
+  protected void checkINodeReady(long startId) throws IOException {
+    // SPS work won't be scheduled if NN is in standby. So, skipping NN
+    // standby check.
+    return;
+  }
+
+  @Override
+  protected void submitCurrentBatch(long startId)
+      throws IOException, InterruptedException {
+    // Add current child's to queue
+    service.addAllFileIdsToProcess(startId,
+        currentBatch, false);
+    currentBatch.clear();
+  }
+
+  @Override
+  protected void throttle() throws InterruptedException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("StorageMovementNeeded queue remaining capacity is zero,"
+          + " waiting for some free slots.");
+    }
+    remainingCapacity = remainingCapacity();
+    // wait for queue to be free
+    while (remainingCapacity <= 0) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Waiting for storageMovementNeeded queue to be free!");
+      }
+      Thread.sleep(5000);
+      remainingCapacity = remainingCapacity();
+    }
+  }
+
+  @Override
+  protected boolean canTraverseDir(INode inode) throws IOException {
+    return true;
+  }
+
+  @Override
+  protected void checkPauseForTesting() throws InterruptedException {
+    // Nothing to do
+  }
+
+  @Override
+  public void scanAndCollectFileIds(final Long startINodeId)
+      throws IOException, InterruptedException {
+    FSDirectory fsd = getFSDirectory();
+    INode startInode = fsd.getInode(startINodeId);
+    if (startInode != null) {
+      remainingCapacity = remainingCapacity();
+      if (remainingCapacity == 0) {
+        throttle();
+      }
+      if (startInode.isFile()) {
+        currentBatch.add(new ItemInfo(startInode.getId(), startInode.getId()));
+      } else {
+
+        readLock();
+        // NOTE: this lock will not be held until full directory scanning. It is
+        // basically a sliced locking. Once it collects a batch size( at max the
+        // size of maxQueueLimitToScan (default 1000)) file ids, then it will
+        // unlock and submits the current batch to SPSService. Once
+        // service.processingQueueSize() shows empty slots, then lock will be
+        // resumed and scan also will be resumed. This logic was re-used from
+        // EDEK feature.
+        try {
+          traverseDir(startInode.asDirectory(), startINodeId,
+              HdfsFileStatus.EMPTY_NAME, new SPSTraverseInfo(startINodeId));
+        } finally {
+          readUnlock();
+        }
+      }
+      // Mark startInode traverse is done, this is last-batch
+      service.addAllFileIdsToProcess(startInode.getId(), currentBatch, true);
+      currentBatch.clear();
+    }
+  }
+
+  /**
+   * Returns queue remaining capacity.
+   */
+  public synchronized int remainingCapacity() {
+    int size = service.processingQueueSize();
+    if (size >= maxQueueLimitToScan) {
+      return 0;
+    } else {
+      return (maxQueueLimitToScan - size);
+    }
+  }
+
+  class SPSTraverseInfo extends TraverseInfo {
+    private long startId;
+
+    SPSTraverseInfo(long startId) {
+      this.startId = startId;
+    }
+
+    public long getStartId() {
+      return startId;
+    }
+  }
+
+}

+ 81 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java

@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.sps;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * ItemInfo is a file info object for which need to satisfy the policy.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ItemInfo {
+  private long startId;
+  private long fileId;
+  private int retryCount;
+
+  public ItemInfo(long startId, long fileId) {
+    this.startId = startId;
+    this.fileId = fileId;
+    // set 0 when item is getting added first time in queue.
+    this.retryCount = 0;
+  }
+
+  public ItemInfo(final long startId, final long fileId, final int retryCount) {
+    this.startId = startId;
+    this.fileId = fileId;
+    this.retryCount = retryCount;
+  }
+
+  /**
+   * Return the start inode id of the current track Id. This indicates that SPS
+   * was invoked on this inode id.
+   */
+  public long getStartId() {
+    return startId;
+  }
+
+  /**
+   * Return the File inode Id for which needs to satisfy the policy.
+   */
+  public long getFileId() {
+    return fileId;
+  }
+
+  /**
+   * Returns true if the tracking path is a directory, false otherwise.
+   */
+  public boolean isDir() {
+    return (startId != fileId);
+  }
+
+  /**
+   * Get the attempted retry count of the block for satisfy the policy.
+   */
+  public int getRetryCount() {
+    return retryCount;
+  }
+
+  /**
+   * Increments the retry count.
+   */
+  public void increRetryCount() {
+    this.retryCount++;
+  }
+}

+ 63 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java

@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.sps;
+
+import java.util.LinkedList;
+import java.util.Queue;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A class which holds the SPS invoked path ids.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class SPSPathIds {
+
+  // List of pending dir to satisfy the policy
+  private final Queue<Long> spsDirsToBeTraveresed = new LinkedList<Long>();
+
+  /**
+   * Add the path id to queue.
+   */
+  public synchronized void add(long pathId) {
+    spsDirsToBeTraveresed.add(pathId);
+  }
+
+  /**
+   * Removes the path id.
+   */
+  public synchronized void remove(long pathId) {
+    spsDirsToBeTraveresed.remove(pathId);
+  }
+
+  /**
+   * Clears all path ids.
+   */
+  public synchronized void clear() {
+    spsDirsToBeTraveresed.clear();
+  }
+
+  /**
+   * @return next path id available in queue.
+   */
+  public synchronized Long pollNext() {
+    return spsDirsToBeTraveresed.poll();
+  }
+}

+ 107 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java

@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.sps;
+
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * An interface for SPSService, which exposes life cycle and processing APIs.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface SPSService {
+
+  /**
+   * Initializes the helper services.
+   *
+   * @param ctxt
+   *          - context is an helper service to provide communication channel
+   *          between NN and SPS
+   * @param fileIDCollector
+   *          - a helper service for scanning the files under a given directory
+   *          id
+   * @param handler
+   *          - a helper service for moving the blocks
+   */
+  void init(Context ctxt, FileIdCollector fileIDCollector,
+      BlockMoveTaskHandler handler);
+
+  /**
+   * Starts the SPS service. Make sure to initialize the helper services before
+   * invoking this method.
+   *
+   * @param reconfigStart
+   *          - to indicate whether the SPS startup requested from
+   *          reconfiguration service
+   */
+  void start(boolean reconfigStart);
+
+  /**
+   * Stops the SPS service gracefully. Timed wait to stop storage policy
+   * satisfier daemon threads.
+   */
+  void stopGracefully();
+
+  /**
+   * Disable the SPS service.
+   *
+   * @param forceStop
+   */
+  void disable(boolean forceStop);
+
+  /**
+   * Check whether StoragePolicySatisfier is running.
+   *
+   * @return true if running
+   */
+  boolean isRunning();
+
+  /**
+   * Adds the Item information(file id etc) to processing queue.
+   *
+   * @param itemInfo
+   */
+  void addFileIdToProcess(ItemInfo itemInfo);
+
+  /**
+   * Adds all the Item information(file id etc) to processing queue.
+   *
+   * @param startId
+   *          - directory/file id, on which SPS was called.
+   * @param itemInfoList
+   *          - list of item infos
+   * @param scanCompleted
+   *          - whether the scanning of directory fully done with itemInfoList
+   */
+  void addAllFileIdsToProcess(long startId, List<ItemInfo> itemInfoList,
+      boolean scanCompleted);
+
+  /**
+   * @return current processing queue size.
+   */
+  int processingQueueSize();
+
+  /**
+   * @return the configuration.
+   */
+  Configuration getConf();
+}

+ 73 - 102
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java

@@ -29,6 +29,7 @@ import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -47,7 +48,6 @@ import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.server.balancer.Matcher;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
-import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
@@ -64,28 +64,34 @@ import com.google.common.annotations.VisibleForTesting;
  * storage policy type in Namespace, but physical block storage movement will
  * not happen until user runs "Mover Tool" explicitly for such files. The
  * StoragePolicySatisfier Daemon thread implemented for addressing the case
- * where users may want to physically move the blocks by HDFS itself instead of
- * running mover tool explicitly. Just calling client API to
- * satisfyStoragePolicy on a file/dir will automatically trigger to move its
- * physical storage locations as expected in asynchronous manner. Here Namenode
- * will pick the file blocks which are expecting to change its storages, then it
- * will build the mapping of source block location and expected storage type and
- * location to move. After that this class will also prepare commands to send to
- * Datanode for processing the physical block movements.
+ * where users may want to physically move the blocks by a dedidated daemon (can
+ * run inside Namenode or stand alone) instead of running mover tool explicitly.
+ * Just calling client API to satisfyStoragePolicy on a file/dir will
+ * automatically trigger to move its physical storage locations as expected in
+ * asynchronous manner. Here SPS will pick the file blocks which are expecting
+ * to change its storages, then it will build the mapping of source block
+ * location and expected storage type and location to move. After that this
+ * class will also prepare requests to send to Datanode for processing the
+ * physical block movements.
  */
 @InterfaceAudience.Private
-public class StoragePolicySatisfier implements Runnable {
+public class StoragePolicySatisfier implements SPSService, Runnable {
   public static final Logger LOG =
       LoggerFactory.getLogger(StoragePolicySatisfier.class);
   private Daemon storagePolicySatisfierThread;
-  private final BlockStorageMovementNeeded storageMovementNeeded;
-  private final BlockStorageMovementAttemptedItems storageMovementsMonitor;
+  private BlockStorageMovementNeeded storageMovementNeeded;
+  private BlockStorageMovementAttemptedItems storageMovementsMonitor;
   private volatile boolean isRunning = false;
   private int spsWorkMultiplier;
   private long blockCount = 0L;
   private int blockMovementMaxRetry;
-  private final Context ctxt;
+  private Context ctxt;
+  private BlockMoveTaskHandler blockMoveTaskHandler;
+  private Configuration conf;
 
+  public StoragePolicySatisfier(Configuration conf) {
+    this.conf = conf;
+  }
   /**
    * Represents the collective analysis status for all blocks.
    */
@@ -125,13 +131,17 @@ public class StoragePolicySatisfier implements Runnable {
     }
   }
 
-  public StoragePolicySatisfier(Context ctxt) {
-    this.ctxt = ctxt;
-    this.storageMovementNeeded = new BlockStorageMovementNeeded(ctxt);
-    this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems(ctxt,
+  public void init(final Context context, final FileIdCollector fileIDCollector,
+      final BlockMoveTaskHandler blockMovementTaskHandler) {
+    this.ctxt = context;
+    this.storageMovementNeeded =
+        new BlockStorageMovementNeeded(context, fileIDCollector);
+    this.storageMovementsMonitor =
+        new BlockStorageMovementAttemptedItems(this,
         storageMovementNeeded);
-    this.spsWorkMultiplier = DFSUtil.getSPSWorkMultiplier(ctxt.getConf());
-    this.blockMovementMaxRetry = ctxt.getConf().getInt(
+    this.blockMoveTaskHandler = blockMovementTaskHandler;
+    this.spsWorkMultiplier = DFSUtil.getSPSWorkMultiplier(getConf());
+    this.blockMovementMaxRetry = getConf().getInt(
         DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_KEY,
         DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_DEFAULT);
   }
@@ -139,12 +149,10 @@ public class StoragePolicySatisfier implements Runnable {
   /**
    * Start storage policy satisfier demon thread. Also start block storage
    * movements monitor for retry the attempts if needed.
-   *
-   * // TODO: FSDirectory will get removed via HDFS-12911 modularization work.
    */
-  public synchronized void start(boolean reconfigStart, FSDirectory fsd) {
+  @Override
+  public synchronized void start(boolean reconfigStart) {
     isRunning = true;
-    ctxt.setSPSRunning(this::isRunning);
     if (ctxt.isMoverRunning()) {
       isRunning = false;
       LOG.error(
@@ -163,20 +171,14 @@ public class StoragePolicySatisfier implements Runnable {
     // Ensure that all the previously submitted block movements(if any) have to
     // be stopped in all datanodes.
     addDropSPSWorkCommandsToAllDNs();
-    storageMovementNeeded.init(fsd);
     storagePolicySatisfierThread = new Daemon(this);
     storagePolicySatisfierThread.setName("StoragePolicySatisfier");
     storagePolicySatisfierThread.start();
     this.storageMovementsMonitor.start();
+    this.storageMovementNeeded.activate();
   }
 
-  /**
-   * Disables storage policy satisfier by stopping its services.
-   *
-   * @param forceStop
-   *          true represents that it should stop SPS service by clearing all
-   *          pending SPS work
-   */
+  @Override
   public synchronized void disable(boolean forceStop) {
     isRunning = false;
     if (storagePolicySatisfierThread == null) {
@@ -195,14 +197,15 @@ public class StoragePolicySatisfier implements Runnable {
     }
   }
 
-  /**
-   * Timed wait to stop storage policy satisfier daemon threads.
-   */
+  @Override
   public synchronized void stopGracefully() {
     if (isRunning) {
       disable(true);
     }
-    this.storageMovementsMonitor.stopGracefully();
+
+    if (this.storageMovementsMonitor != null) {
+      this.storageMovementsMonitor.stopGracefully();
+    }
 
     if (storagePolicySatisfierThread == null) {
       return;
@@ -213,10 +216,7 @@ public class StoragePolicySatisfier implements Runnable {
     }
   }
 
-  /**
-   * Check whether StoragePolicySatisfier is running.
-   * @return true if running
-   */
+  @Override
   public boolean isRunning() {
     return isRunning;
   }
@@ -239,11 +239,11 @@ public class StoragePolicySatisfier implements Runnable {
             if(itemInfo.getRetryCount() >= blockMovementMaxRetry){
               LOG.info("Failed to satisfy the policy after "
                   + blockMovementMaxRetry + " retries. Removing inode "
-                  + itemInfo.getTrackId() + " from the queue");
+                  + itemInfo.getFileId() + " from the queue");
               storageMovementNeeded.removeItemTrackInfo(itemInfo, false);
               continue;
             }
-            long trackId = itemInfo.getTrackId();
+            long trackId = itemInfo.getFileId();
             BlocksMovingAnalysis status = null;
             DatanodeStorageReport[] liveDnReports;
             BlockStoragePolicy existingStoragePolicy;
@@ -273,7 +273,7 @@ public class StoragePolicySatisfier implements Runnable {
                 // be removed on storage movement attempt finished report.
               case BLOCKS_TARGETS_PAIRED:
                 this.storageMovementsMonitor.add(new AttemptedItemInfo(itemInfo
-                    .getStartId(), itemInfo.getTrackId(), monotonicNow(),
+                    .getStartId(), itemInfo.getFileId(), monotonicNow(),
                     status.assignedBlocks, itemInfo.getRetryCount()));
                 break;
               case NO_BLOCKS_TARGETS_PAIRED:
@@ -282,7 +282,7 @@ public class StoragePolicySatisfier implements Runnable {
                       + " back to retry queue as none of the blocks"
                       + " found its eligible targets.");
                 }
-                itemInfo.retryCount++;
+                itemInfo.increRetryCount();
                 this.storageMovementNeeded.add(itemInfo);
                 break;
               case FEW_LOW_REDUNDANCY_BLOCKS:
@@ -426,7 +426,8 @@ public class StoragePolicySatisfier implements Runnable {
     for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
       // Check for at least one block storage movement has been chosen
       try {
-        ctxt.assignBlockMoveTaskToTargetNode(blkMovingInfo);
+        blockMoveTaskHandler.submitMoveTask(blkMovingInfo,
+            storageMovementsMonitor);
         LOG.debug("BlockMovingInfo: {}", blkMovingInfo);
         assignedBlockIds.add(blkMovingInfo.getBlock());
         blockCount++;
@@ -611,7 +612,6 @@ public class StoragePolicySatisfier implements Runnable {
 
         expected.remove(chosenTarget.storageType);
         excludeNodes.add(chosenTarget.dn);
-        // TODO: We can increment scheduled block count for this node?
       } else {
         LOG.warn(
             "Failed to choose target datanode for the required"
@@ -830,11 +830,11 @@ public class StoragePolicySatisfier implements Runnable {
       return;
     }
     storageMovementsMonitor
-        .addReportedMovedBlocks(moveAttemptFinishedBlks.getBlocks());
+        .notifyMovementTriedBlocks(moveAttemptFinishedBlks.getBlocks());
   }
 
   @VisibleForTesting
-  BlockStorageMovementAttemptedItems getAttemptedItemsMonitor() {
+  BlockMovementListener getAttemptedItemsMonitor() {
     return storageMovementsMonitor;
   }
 
@@ -863,10 +863,6 @@ public class StoragePolicySatisfier implements Runnable {
     }
   }
 
-  public void addInodeToPendingDirQueue(long id) {
-    storageMovementNeeded.addToPendingDirQueue(id);
-  }
-
   /**
    * Clear queues for given track id.
    */
@@ -874,57 +870,6 @@ public class StoragePolicySatisfier implements Runnable {
     storageMovementNeeded.clearQueue(trackId);
   }
 
-  /**
-   * ItemInfo is a file info object for which need to satisfy the
-   * policy.
-   */
-  public static class ItemInfo {
-    private long startId;
-    private long trackId;
-    private int retryCount;
-
-    public ItemInfo(long startId, long trackId) {
-      this.startId = startId;
-      this.trackId = trackId;
-      //set 0 when item is getting added first time in queue.
-      this.retryCount = 0;
-    }
-
-    public ItemInfo(long startId, long trackId, int retryCount) {
-      this.startId = startId;
-      this.trackId = trackId;
-      this.retryCount = retryCount;
-    }
-
-    /**
-     * Return the start inode id of the current track Id.
-     */
-    public long getStartId() {
-      return startId;
-    }
-
-    /**
-     * Return the File inode Id for which needs to satisfy the policy.
-     */
-    public long getTrackId() {
-      return trackId;
-    }
-
-    /**
-     * Returns true if the tracking path is a directory, false otherwise.
-     */
-    public boolean isDir() {
-      return (startId != trackId);
-    }
-
-    /**
-     * Get the attempted retry count of the block for satisfy the policy.
-     */
-    public int getRetryCount() {
-      return retryCount;
-    }
-  }
-
   /**
    * This class contains information of an attempted blocks and its last
    * attempted or reported time stamp. This is used by
@@ -977,4 +922,30 @@ public class StoragePolicySatisfier implements Runnable {
       String path) throws IOException {
     return storageMovementNeeded.getStatus(ctxt.getFileID(path));
   }
+
+  @Override
+  public void addFileIdToProcess(ItemInfo trackInfo) {
+    storageMovementNeeded.add(trackInfo);
+  }
+
+  @Override
+  public void addAllFileIdsToProcess(long startId, List<ItemInfo> itemInfoList,
+      boolean scanCompleted) {
+    getStorageMovementQueue().addAll(startId, itemInfoList, scanCompleted);
+  }
+
+  @Override
+  public int processingQueueSize() {
+    return storageMovementNeeded.size();
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @VisibleForTesting
+  public BlockStorageMovementNeeded getStorageMovementQueue() {
+    return storageMovementNeeded;
+  }
 }

+ 10 - 9
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java

@@ -29,7 +29,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.AttemptedItemInfo;
-import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.ItemInfo;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -49,12 +48,14 @@ public class TestBlockStorageMovementAttemptedItems {
   public void setup() throws Exception {
     Configuration config = new HdfsConfiguration();
     Context ctxt = Mockito.mock(Context.class);
-    Mockito.when(ctxt.getConf()).thenReturn(config);
+    SPSService sps = Mockito.mock(StoragePolicySatisfier.class);
+    Mockito.when(sps.getConf()).thenReturn(config);
     Mockito.when(ctxt.isRunning()).thenReturn(true);
     Mockito.when(ctxt.isInSafeMode()).thenReturn(false);
     Mockito.when(ctxt.isFileExist(Mockito.anyLong())).thenReturn(true);
-    unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded(ctxt);
-    bsmAttemptedItems = new BlockStorageMovementAttemptedItems(ctxt,
+    unsatisfiedStorageMovementFiles =
+        new BlockStorageMovementNeeded(ctxt, null);
+    bsmAttemptedItems = new BlockStorageMovementAttemptedItems(sps,
         unsatisfiedStorageMovementFiles);
   }
 
@@ -73,7 +74,7 @@ public class TestBlockStorageMovementAttemptedItems {
     while (monotonicNow() < (stopTime)) {
       ItemInfo ele = null;
       while ((ele = unsatisfiedStorageMovementFiles.get()) != null) {
-        if (item == ele.getTrackId()) {
+        if (item == ele.getFileId()) {
           isItemFound = true;
           break;
         }
@@ -99,7 +100,7 @@ public class TestBlockStorageMovementAttemptedItems {
     bsmAttemptedItems.add(new AttemptedItemInfo(0L, 0L, 0L, blocks, 0));
     Block[] blockArray = new Block[blocks.size()];
     blocks.toArray(blockArray);
-    bsmAttemptedItems.addReportedMovedBlocks(blockArray);
+    bsmAttemptedItems.notifyMovementTriedBlocks(blockArray);
     assertEquals("Failed to receive result!", 1,
         bsmAttemptedItems.getMovementFinishedBlocksCount());
   }
@@ -137,7 +138,7 @@ public class TestBlockStorageMovementAttemptedItems {
         .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0));
     Block[] blksMovementReport = new Block[1];
     blksMovementReport[0] = new Block(item);
-    bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport);
+    bsmAttemptedItems.notifyMovementTriedBlocks(blksMovementReport);
 
     // start block movement report monitor thread
     bsmAttemptedItems.start();
@@ -162,7 +163,7 @@ public class TestBlockStorageMovementAttemptedItems {
         .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0));
     Block[] blksMovementReport = new Block[1];
     blksMovementReport[0] = new Block(item);
-    bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport);
+    bsmAttemptedItems.notifyMovementTriedBlocks(blksMovementReport);
 
     Thread.sleep(selfRetryTimeout * 2); // Waiting to get timed out
 
@@ -190,7 +191,7 @@ public class TestBlockStorageMovementAttemptedItems {
         .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0));
     Block[] blksMovementReport = new Block[1];
     blksMovementReport[0] = new Block(item);
-    bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport);
+    bsmAttemptedItems.notifyMovementTriedBlocks(blksMovementReport);
     assertFalse(
         "Should not add in queue again if it is not there in"
             + " storageMovementAttemptedItems",

+ 70 - 41
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java

@@ -72,7 +72,6 @@ import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
-import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.event.Level;
@@ -147,12 +146,11 @@ public class TestStoragePolicySatisfier {
     startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
         storagesPerDatanode, capacity, hdfsCluster);
 
-    dfs.satisfyStoragePolicy(new Path(file));
-
     hdfsCluster.triggerHeartbeats();
+    dfs.satisfyStoragePolicy(new Path(file));
     // Wait till namenode notified about the block location details
-    DFSTestUtil.waitExpectedStorageType(
-        file, StorageType.ARCHIVE, 3, 30000, dfs);
+    DFSTestUtil.waitExpectedStorageType(file, StorageType.ARCHIVE, 3, 35000,
+        dfs);
   }
 
   @Test(timeout = 300000)
@@ -1284,6 +1282,7 @@ public class TestStoragePolicySatisfier {
         {StorageType.ARCHIVE, StorageType.SSD},
         {StorageType.DISK, StorageType.DISK}};
     config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+    config.setInt(DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, 10);
     hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
         storagesPerDatanode, capacity);
     dfs = hdfsCluster.getFileSystem();
@@ -1299,19 +1298,28 @@ public class TestStoragePolicySatisfier {
 
     //Queue limit can control the traverse logic to wait for some free
     //entry in queue. After 10 files, traverse control will be on U.
-    StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class);
-    Mockito.when(sps.isRunning()).thenReturn(true);
-    Context ctxt = Mockito.mock(Context.class);
-    config.setInt(DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, 10);
-    Mockito.when(ctxt.getConf()).thenReturn(config);
-    Mockito.when(ctxt.isRunning()).thenReturn(true);
-    Mockito.when(ctxt.isInSafeMode()).thenReturn(false);
-    Mockito.when(ctxt.isFileExist(Mockito.anyLong())).thenReturn(true);
-    BlockStorageMovementNeeded movmentNeededQueue =
-        new BlockStorageMovementNeeded(ctxt);
+    StoragePolicySatisfier sps = new StoragePolicySatisfier(config);
+    Context ctxt = new IntraSPSNameNodeContext(hdfsCluster.getNamesystem(),
+        hdfsCluster.getNamesystem().getBlockManager(), sps) {
+      @Override
+      public boolean isInSafeMode() {
+        return false;
+      }
+
+      @Override
+      public boolean isRunning() {
+        return true;
+      }
+    };
+
+    FileIdCollector fileIDCollector =
+        new IntraSPSNameNodeFileIdCollector(fsDir, sps);
+    sps.init(ctxt, fileIDCollector, null);
+    sps.getStorageMovementQueue().activate();
+
     INode rootINode = fsDir.getINode("/root");
-    movmentNeededQueue.addToPendingDirQueue(rootINode.getId());
-    movmentNeededQueue.init(fsDir);
+    hdfsCluster.getNamesystem().getBlockManager()
+        .addSPSPathId(rootINode.getId());
 
     //Wait for thread to reach U.
     Thread.sleep(1000);
@@ -1321,7 +1329,7 @@ public class TestStoragePolicySatisfier {
     // Remove 10 element and make queue free, So other traversing will start.
     for (int i = 0; i < 10; i++) {
       String path = expectedTraverseOrder.remove(0);
-      long trackId = movmentNeededQueue.get().getTrackId();
+      long trackId = sps.getStorageMovementQueue().get().getFileId();
       INode inode = fsDir.getInode(trackId);
       assertTrue("Failed to traverse tree, expected " + path + " but got "
           + inode.getFullPathName(), path.equals(inode.getFullPathName()));
@@ -1332,7 +1340,7 @@ public class TestStoragePolicySatisfier {
     // Check other element traversed in order and R,S should not be added in
     // queue which we already removed from expected list
     for (String path : expectedTraverseOrder) {
-      long trackId = movmentNeededQueue.get().getTrackId();
+      long trackId = sps.getStorageMovementQueue().get().getFileId();
       INode inode = fsDir.getInode(trackId);
       assertTrue("Failed to traverse tree, expected " + path + " but got "
           + inode.getFullPathName(), path.equals(inode.getFullPathName()));
@@ -1352,6 +1360,7 @@ public class TestStoragePolicySatisfier {
         {StorageType.ARCHIVE, StorageType.SSD},
         {StorageType.DISK, StorageType.DISK}};
     config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+    config.setInt(DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, 10);
     hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
         storagesPerDatanode, capacity);
     dfs = hdfsCluster.getFileSystem();
@@ -1366,21 +1375,33 @@ public class TestStoragePolicySatisfier {
     expectedTraverseOrder.remove("/root/D/M");
     expectedTraverseOrder.remove("/root/E");
     FSDirectory fsDir = hdfsCluster.getNamesystem().getFSDirectory();
-    StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class);
-    Mockito.when(sps.isRunning()).thenReturn(true);
+
     // Queue limit can control the traverse logic to wait for some free
     // entry in queue. After 10 files, traverse control will be on U.
-    Context ctxt = Mockito.mock(Context.class);
-    config.setInt(DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, 10);
-    Mockito.when(ctxt.getConf()).thenReturn(config);
-    Mockito.when(ctxt.isRunning()).thenReturn(true);
-    Mockito.when(ctxt.isInSafeMode()).thenReturn(false);
-    Mockito.when(ctxt.isFileExist(Mockito.anyLong())).thenReturn(true);
-    BlockStorageMovementNeeded movmentNeededQueue =
-        new BlockStorageMovementNeeded(ctxt);
-    movmentNeededQueue.init(fsDir);
+    // StoragePolicySatisfier sps = new StoragePolicySatisfier(config);
+    StoragePolicySatisfier sps = new StoragePolicySatisfier(config);
+    Context ctxt = new IntraSPSNameNodeContext(hdfsCluster.getNamesystem(),
+        hdfsCluster.getNamesystem().getBlockManager(), sps) {
+      @Override
+      public boolean isInSafeMode() {
+        return false;
+      }
+
+      @Override
+      public boolean isRunning() {
+        return true;
+      }
+    };
+
+    FileIdCollector fileIDCollector =
+        new IntraSPSNameNodeFileIdCollector(fsDir, sps);
+    sps.init(ctxt, fileIDCollector, null);
+    sps.getStorageMovementQueue().activate();
+
     INode rootINode = fsDir.getINode("/root");
-    movmentNeededQueue.addToPendingDirQueue(rootINode.getId());
+    hdfsCluster.getNamesystem().getBlockManager()
+        .addSPSPathId(rootINode.getId());
+
     // Wait for thread to reach U.
     Thread.sleep(1000);
 
@@ -1389,7 +1410,7 @@ public class TestStoragePolicySatisfier {
     // Remove 10 element and make queue free, So other traversing will start.
     for (int i = 0; i < 10; i++) {
       String path = expectedTraverseOrder.remove(0);
-      long trackId = movmentNeededQueue.get().getTrackId();
+      long trackId = sps.getStorageMovementQueue().get().getFileId();
       INode inode = fsDir.getInode(trackId);
       assertTrue("Failed to traverse tree, expected " + path + " but got "
           + inode.getFullPathName(), path.equals(inode.getFullPathName()));
@@ -1400,7 +1421,7 @@ public class TestStoragePolicySatisfier {
     // Check other element traversed in order and E, M, U, R, S should not be
     // added in queue which we already removed from expected list
     for (String path : expectedTraverseOrder) {
-      long trackId = movmentNeededQueue.get().getTrackId();
+      long trackId = sps.getStorageMovementQueue().get().getFileId();
       INode inode = fsDir.getInode(trackId);
       assertTrue("Failed to traverse tree, expected " + path + " but got "
           + inode.getFullPathName(), path.equals(inode.getFullPathName()));
@@ -1502,17 +1523,20 @@ public class TestStoragePolicySatisfier {
       hdfsCluster = new MiniDFSCluster.Builder(config).numDataNodes(2)
           .storageTypes(storagetypes).build();
       hdfsCluster.waitActive();
-      BlockStorageMovementNeeded.setStatusClearanceElapsedTimeMs(20000);
+      // BlockStorageMovementNeeded.setStatusClearanceElapsedTimeMs(200000);
       dfs = hdfsCluster.getFileSystem();
       Path filePath = new Path("/file");
       DFSTestUtil.createFile(dfs, filePath, 1024, (short) 2,
             0);
       dfs.setStoragePolicy(filePath, "COLD");
       dfs.satisfyStoragePolicy(filePath);
+      Thread.sleep(3000);
       StoragePolicySatisfyPathStatus status = dfs.getClient()
           .checkStoragePolicySatisfyPathStatus(filePath.toString());
-      Assert.assertTrue("Status should be IN_PROGRESS",
-          StoragePolicySatisfyPathStatus.IN_PROGRESS.equals(status));
+      Assert.assertTrue(
+          "Status should be IN_PROGRESS/SUCCESS, but status is " + status,
+          StoragePolicySatisfyPathStatus.IN_PROGRESS.equals(status)
+              || StoragePolicySatisfyPathStatus.SUCCESS.equals(status));
       DFSTestUtil.waitExpectedStorageType(filePath.toString(),
           StorageType.ARCHIVE, 2, 30000, dfs);
 
@@ -1530,7 +1554,7 @@ public class TestStoragePolicySatisfier {
           return false;
         }
       }, 100, 60000);
-
+      BlockStorageMovementNeeded.setStatusClearanceElapsedTimeMs(1000);
       // wait till status is NOT_AVAILABLE
       GenericTestUtils.waitFor(new Supplier<Boolean>() {
         @Override
@@ -1719,8 +1743,10 @@ public class TestStoragePolicySatisfier {
       public Boolean get() {
         LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}",
             expectedBlkMovAttemptedCount,
-            sps.getAttemptedItemsMonitor().getAttemptedItemsCount());
-        return sps.getAttemptedItemsMonitor()
+            ((BlockStorageMovementAttemptedItems) (sps
+                .getAttemptedItemsMonitor())).getAttemptedItemsCount());
+        return ((BlockStorageMovementAttemptedItems) (sps
+            .getAttemptedItemsMonitor()))
             .getAttemptedItemsCount() == expectedBlkMovAttemptedCount;
       }
     }, 100, timeout);
@@ -1736,8 +1762,11 @@ public class TestStoragePolicySatisfier {
       public Boolean get() {
         LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
             expectedMovementFinishedBlocksCount,
-            sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount());
-        return sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount()
+            ((BlockStorageMovementAttemptedItems) (sps
+                .getAttemptedItemsMonitor())).getMovementFinishedBlocksCount());
+        return ((BlockStorageMovementAttemptedItems) (sps
+            .getAttemptedItemsMonitor()))
+                .getMovementFinishedBlocksCount()
             >= expectedMovementFinishedBlocksCount;
       }
     }, 100, timeout);

+ 11 - 8
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfierWithStripedFile.java

@@ -500,9 +500,11 @@ public class TestStoragePolicySatisfierWithStripedFile {
       public Boolean get() {
         LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}",
             expectedBlkMovAttemptedCount,
-            sps.getAttemptedItemsMonitor().getAttemptedItemsCount());
-        return sps.getAttemptedItemsMonitor()
-            .getAttemptedItemsCount() == expectedBlkMovAttemptedCount;
+            ((BlockStorageMovementAttemptedItems) sps
+                .getAttemptedItemsMonitor()).getAttemptedItemsCount());
+        return ((BlockStorageMovementAttemptedItems) sps
+            .getAttemptedItemsMonitor())
+                .getAttemptedItemsCount() == expectedBlkMovAttemptedCount;
       }
     }, 100, timeout);
   }
@@ -560,7 +562,7 @@ public class TestStoragePolicySatisfierWithStripedFile {
   // Check whether the block movement attempt report has been arrived at the
   // Namenode(SPS).
   private void waitForBlocksMovementAttemptReport(MiniDFSCluster cluster,
-      long expectedMovementFinishedBlocksCount, int timeout)
+      long expectedMoveFinishedBlks, int timeout)
           throws TimeoutException, InterruptedException {
     BlockManager blockManager = cluster.getNamesystem().getBlockManager();
     final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier();
@@ -570,10 +572,11 @@ public class TestStoragePolicySatisfierWithStripedFile {
       @Override
       public Boolean get() {
         LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
-            expectedMovementFinishedBlocksCount,
-            sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount());
-        return sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount()
-            >= expectedMovementFinishedBlocksCount;
+            expectedMoveFinishedBlks, ((BlockStorageMovementAttemptedItems) sps
+                .getAttemptedItemsMonitor()).getMovementFinishedBlocksCount());
+        return ((BlockStorageMovementAttemptedItems) sps
+            .getAttemptedItemsMonitor())
+                .getMovementFinishedBlocksCount() >= expectedMoveFinishedBlks;
       }
     }, 100, timeout);
   }