
HDFS-10675. Datanode support to read from external stores.

Virajith Jalaparti, 8 years ago
Commit 02a28b93ff
45 files changed, with 2873 insertions and 85 deletions
  1. +2 -1    hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageType.java
  2. +2 -1    hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCount.java
  3. +4 -0    hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
  4. +4 -0    hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
  5. +1 -0    hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
  6. +15 -0   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
  7. +29 -0   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/BlockAlias.java
  8. +82 -0   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/BlockFormat.java
  9. +121 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/FileRegion.java
  10. +37 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/FileRegionProvider.java
  11. +66 -5  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
  12. +6 -0   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
  13. +442 -0 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionFormat.java
  14. +88 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionProvider.java
  15. +20 -1  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
  16. +31 -13 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
  17. +12 -7  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
  18. +91 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedProvidedReplica.java
  19. +248 -0 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProvidedReplica.java
  20. +95 -5  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java
  21. +19 -1  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
  22. +22 -4  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
  23. +2 -2   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
  24. +28 -4  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
  25. +58 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/DefaultProvidedVolumeDF.java
  26. +35 -5  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
  27. +23 -2  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
  28. +13 -6  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
  29. +6 -0   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java
  30. +34 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeDF.java
  31. +526 -0 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java
  32. +1 -1   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
  33. +1 -1   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java
  34. +9 -1   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
  35. +78 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
  36. +3 -3   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java
  37. +1 -1   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStartupVersions.java
  38. +2 -2   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgrade.java
  39. +10 -6  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java
  40. +160 -0 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestTextBlockFormat.java
  41. +3 -3   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
  42. +3 -2   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
  43. +10 -7  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
  44. +426 -0 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java
  45. +4 -1   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestClusterId.java

+ 2 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageType.java

@@ -37,7 +37,8 @@ public enum StorageType {
   RAM_DISK(true),
   SSD(false),
   DISK(false),
-  ARCHIVE(false);
+  ARCHIVE(false),
+  PROVIDED(false);
 
   private final boolean isTransient;
 

+ 2 - 1
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCount.java

@@ -285,7 +285,7 @@ public class TestCount {
         // <----13---> <-------17------> <----13-----> <------17------->
         "    SSD_QUOTA     REM_SSD_QUOTA    DISK_QUOTA    REM_DISK_QUOTA " +
         // <----13---> <-------17------>
-        "ARCHIVE_QUOTA REM_ARCHIVE_QUOTA " +
+        "ARCHIVE_QUOTA REM_ARCHIVE_QUOTA PROVIDED_QUOTA REM_PROVIDED_QUOTA " +
         "PATHNAME";
     verify(out).println(withStorageTypeHeader);
     verifyNoMoreInteractions(out);
@@ -340,6 +340,7 @@ public class TestCount {
         "    SSD_QUOTA     REM_SSD_QUOTA " +
         "   DISK_QUOTA    REM_DISK_QUOTA " +
         "ARCHIVE_QUOTA REM_ARCHIVE_QUOTA " +
+        "PROVIDED_QUOTA REM_PROVIDED_QUOTA " +
         "PATHNAME";
     verify(out).println(withStorageTypeHeader);
     verifyNoMoreInteractions(out);

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java

@@ -47,6 +47,10 @@ public final class HdfsConstants {
   public static final String WARM_STORAGE_POLICY_NAME = "WARM";
   public static final byte COLD_STORAGE_POLICY_ID = 2;
   public static final String COLD_STORAGE_POLICY_NAME = "COLD";
+  // branch HDFS-9806 XXX temporary until HDFS-7076
+  public static final byte PROVIDED_STORAGE_POLICY_ID = 1;
+  public static final String PROVIDED_STORAGE_POLICY_NAME = "PROVIDED";
+
 
   public static final int DEFAULT_DATA_SOCKET_SIZE = 0;
 

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java

@@ -403,6 +403,8 @@ public class PBHelperClient {
       return StorageTypeProto.ARCHIVE;
     case RAM_DISK:
       return StorageTypeProto.RAM_DISK;
+    case PROVIDED:
+      return StorageTypeProto.PROVIDED;
     default:
       throw new IllegalStateException(
           "BUG: StorageType not found, type=" + type);
@@ -419,6 +421,8 @@ public class PBHelperClient {
       return StorageType.ARCHIVE;
     case RAM_DISK:
       return StorageType.RAM_DISK;
+    case PROVIDED:
+      return StorageType.PROVIDED;
     default:
       throw new IllegalStateException(
           "BUG: StorageTypeProto not found, type=" + type);

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto

@@ -205,6 +205,7 @@ enum StorageTypeProto {
   SSD = 2;
   ARCHIVE = 3;
   RAM_DISK = 4;
+  PROVIDED = 5;
 }
 
 /**

+ 15 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -328,6 +328,21 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.namenode.edits.asynclogging";
   public static final boolean DFS_NAMENODE_EDITS_ASYNC_LOGGING_DEFAULT = true;
 
+  public static final String DFS_PROVIDER_CLASS = "dfs.provider.class";
+  public static final String DFS_PROVIDER_DF_CLASS = "dfs.provided.df.class";
+  public static final String DFS_PROVIDER_STORAGEUUID = "dfs.provided.storage.id";
+  public static final String DFS_PROVIDER_STORAGEUUID_DEFAULT =  "DS-PROVIDED";
+  public static final String DFS_PROVIDER_BLK_FORMAT_CLASS = "dfs.provided.blockformat.class";
+
+  public static final String DFS_PROVIDED_BLOCK_MAP_DELIMITER = "dfs.provided.textprovider.delimiter";
+  public static final String DFS_PROVIDED_BLOCK_MAP_DELIMITER_DEFAULT = ",";
+
+  public static final String DFS_PROVIDED_BLOCK_MAP_READ_PATH = "dfs.provided.textprovider.read.path";
+  public static final String DFS_PROVIDED_BLOCK_MAP_PATH_DEFAULT = "file:///tmp/blocks.csv";
+
+  public static final String DFS_PROVIDED_BLOCK_MAP_CODEC = "dfs.provided.textprovider.read.codec";
+  public static final String DFS_PROVIDED_BLOCK_MAP_WRITE_PATH  = "dfs.provided.textprovider.write.path";
+
   public static final String  DFS_LIST_LIMIT = "dfs.ls.limit";
   public static final int     DFS_LIST_LIMIT_DEFAULT = 1000;
   public static final String  DFS_CONTENT_SUMMARY_LIMIT_KEY = "dfs.content-summary.limit";
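
The keys added above are consumed elsewhere in this patch: the block-map format class and text-provider paths by TextFileRegionProvider/TextFileRegionFormat, and the storage UUID by DataStorage.createStorageID. A minimal sketch of wiring them up programmatically; the key names come from this file, but the chosen path and the decision to use TextFileRegionFormat here are illustrative assumptions, not values the patch mandates.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.server.common.BlockFormat;
    import org.apache.hadoop.hdfs.server.common.TextFileRegionFormat;

    public class ProvidedConfSketch {
      public static Configuration providedConf() {
        Configuration conf = new Configuration();
        // Block-map format class read by TextFileRegionProvider#setConf in this patch.
        conf.setClass(DFSConfigKeys.DFS_PROVIDER_BLK_FORMAT_CLASS,
            TextFileRegionFormat.class, BlockFormat.class);
        // Location and delimiter of the text block map, read by
        // TextFileRegionFormat.ReaderOptions#setConf; the path is a placeholder.
        conf.set(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_READ_PATH, "file:///tmp/blocks.csv");
        conf.set(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER, ",");
        // Storage ID reported for the single PROVIDED storage per datanode
        // (see DataStorage.createStorageID later in this patch).
        conf.set(DFSConfigKeys.DFS_PROVIDER_STORAGEUUID,
            DFSConfigKeys.DFS_PROVIDER_STORAGEUUID_DEFAULT);
        return conf;
      }
    }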

+ 29 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/BlockAlias.java

@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.common;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+
+/**
+ * Interface used to load provided blocks.
+ */
+public interface BlockAlias {
+
+  Block getBlock();
+
+}

+ 82 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/BlockFormat.java

@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.common;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+
+/**
+ * An abstract class used to read and write block maps for provided blocks.
+ */
+public abstract class BlockFormat<T extends BlockAlias>  {
+
+  /**
+   * An abstract class that is used to read {@link BlockAlias}es
+   * for provided blocks.
+   */
+  public static abstract class Reader<U extends BlockAlias>
+      implements Iterable<U>, Closeable {
+
+    /**
+     * reader options.
+     */
+    public interface Options { }
+
+    public abstract U resolve(Block ident) throws IOException;
+
+  }
+
+  /**
+   * Returns the reader for the provided block map.
+   * @param opts reader options
+   * @return {@link Reader} to the block map.
+   * @throws IOException
+   */
+  public abstract Reader<T> getReader(Reader.Options opts) throws IOException;
+
+  /**
+   * An abstract class used as a writer for the provided block map.
+   */
+  public static abstract class Writer<U extends BlockAlias>
+      implements Closeable {
+    /**
+     * writer options.
+     */
+    public interface Options { }
+
+    public abstract void store(U token) throws IOException;
+
+  }
+
+  /**
+   * Returns the writer for the provided block map.
+   * @param opts writer options.
+   * @return {@link Writer} to the block map.
+   * @throws IOException
+   */
+  public abstract Writer<T> getWriter(Writer.Options opts) throws IOException;
+
+  /**
+   * Refresh based on the underlying block map.
+   * @throws IOException
+   */
+  public abstract void refresh() throws IOException;
+
+}

+ 121 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/FileRegion.java

@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.common;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+
+/**
+ * This class is used to represent provided blocks that are file regions,
+ * i.e., can be described using (path, offset, length).
+ */
+public class FileRegion implements BlockAlias {
+
+  private final Path path;
+  private final long offset;
+  private final long length;
+  private final long blockId;
+  private final String bpid;
+  private final long genStamp;
+
+  public FileRegion(long blockId, Path path, long offset,
+      long length, String bpid, long genStamp) {
+    this.path = path;
+    this.offset = offset;
+    this.length = length;
+    this.blockId = blockId;
+    this.bpid = bpid;
+    this.genStamp = genStamp;
+  }
+
+  public FileRegion(long blockId, Path path, long offset,
+      long length, String bpid) {
+    this(blockId, path, offset, length, bpid,
+        HdfsConstants.GRANDFATHER_GENERATION_STAMP);
+
+  }
+
+  public FileRegion(long blockId, Path path, long offset,
+      long length, long genStamp) {
+    this(blockId, path, offset, length, null, genStamp);
+
+  }
+
+  public FileRegion(long blockId, Path path, long offset, long length) {
+    this(blockId, path, offset, length, null);
+  }
+
+  @Override
+  public Block getBlock() {
+    return new Block(blockId, length, genStamp);
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof FileRegion)) {
+      return false;
+    }
+    FileRegion o = (FileRegion) other;
+    return blockId == o.blockId
+      && offset == o.offset
+      && length == o.length
+      && genStamp == o.genStamp
+      && path.equals(o.path);
+  }
+
+  @Override
+  public int hashCode() {
+    return (int)(blockId & Integer.MIN_VALUE);
+  }
+
+  public Path getPath() {
+    return path;
+  }
+
+  public long getOffset() {
+    return offset;
+  }
+
+  public long getLength() {
+    return length;
+  }
+
+  public long getGenerationStamp() {
+    return genStamp;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("{ block=\"").append(getBlock()).append("\"");
+    sb.append(", path=\"").append(getPath()).append("\"");
+    sb.append(", off=\"").append(getOffset()).append("\"");
+    sb.append(", len=\"").append(getBlock().getNumBytes()).append("\"");
+    sb.append(", genStamp=\"").append(getBlock()
+        .getGenerationStamp()).append("\"");
+    sb.append(", bpid=\"").append(bpid).append("\"");
+    sb.append(" }");
+    return sb.toString();
+  }
+
+  public String getBlockPoolId() {
+    return this.bpid;
+  }
+
+}
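
To make the (path, offset, length) mapping concrete, here is a small sketch constructing a FileRegion and reading back the Block it describes. The class and its constructors are from this file; the path, block id, pool id, and size are made-up values for illustration.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.server.common.FileRegion;

    public class FileRegionSketch {
      public static void main(String[] args) {
        // Bytes [0, 128MB) of an external file back block 1001 in pool BP-1
        // (illustrative values); genStamp defaults to GRANDFATHER_GENERATION_STAMP.
        FileRegion region = new FileRegion(1001L, new Path("file:///data/part-00000"),
            0L, 128L * 1024 * 1024, "BP-1");
        Block block = region.getBlock();
        System.out.println(block.getBlockId() + " spans " + block.getNumBytes() + " bytes");
      }
    }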

+ 37 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/FileRegionProvider.java

@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.common;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+
+/**
+ * This class is a stub for reading file regions from the block map.
+ */
+public class FileRegionProvider implements Iterable<FileRegion> {
+  @Override
+  public Iterator<FileRegion> iterator() {
+    return Collections.emptyListIterator();
+  }
+
+  public void refresh() throws IOException {
+    return;
+  }
+}

+ 66 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java

@@ -40,6 +40,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
@@ -196,7 +197,10 @@ public abstract class Storage extends StorageInfo {
     Iterator<StorageDirectory> it =
       (dirType == null) ? dirIterator() : dirIterator(dirType);
     for ( ;it.hasNext(); ) {
-      list.add(new File(it.next().getCurrentDir(), fileName));
+      File currentDir = it.next().getCurrentDir();
+      if (currentDir != null) {
+        list.add(new File(currentDir, fileName));
+      }
     }
     return list;
   }
@@ -328,10 +332,20 @@ public abstract class Storage extends StorageInfo {
      */
     public StorageDirectory(String bpid, StorageDirType dirType,
         boolean isShared, StorageLocation location) {
-      this(new File(location.getBpURI(bpid, STORAGE_DIR_CURRENT)), dirType,
+      this(getBlockPoolCurrentDir(bpid, location), dirType,
           isShared, location);
     }
 
+    private static File getBlockPoolCurrentDir(String bpid,
+        StorageLocation location) {
+      if (location == null ||
+          location.getStorageType() == StorageType.PROVIDED) {
+        return null;
+      } else {
+        return new File(location.getBpURI(bpid, STORAGE_DIR_CURRENT));
+      }
+    }
+
     private StorageDirectory(File dir, StorageDirType dirType,
         boolean isShared, StorageLocation location) {
       this.root = dir;
@@ -347,7 +361,8 @@ public abstract class Storage extends StorageInfo {
     }
 
     private static File getStorageLocationFile(StorageLocation location) {
-      if (location == null) {
+      if (location == null ||
+          location.getStorageType() == StorageType.PROVIDED) {
         return null;
       }
       try {
@@ -406,6 +421,10 @@ public abstract class Storage extends StorageInfo {
      */
     public void clearDirectory() throws IOException {
       File curDir = this.getCurrentDir();
+      if (curDir == null) {
+        //if the directory is null, there is nothing to do.
+        return;
+      }
       if (curDir.exists()) {
         File[] files = FileUtil.listFiles(curDir);
         LOG.info("Will remove files: " + Arrays.toString(files));
@@ -423,6 +442,9 @@ public abstract class Storage extends StorageInfo {
      * @return the directory path
      */
     public File getCurrentDir() {
+      if (root == null) {
+        return null;
+      }
       return new File(root, STORAGE_DIR_CURRENT);
     }
 
@@ -443,6 +465,9 @@ public abstract class Storage extends StorageInfo {
      * @return the version file path
      */
     public File getVersionFile() {
+      if (root == null) {
+        return null;
+      }
       return new File(new File(root, STORAGE_DIR_CURRENT), STORAGE_FILE_VERSION);
     }
 
@@ -452,6 +477,9 @@ public abstract class Storage extends StorageInfo {
      * @return the previous version file path
      */
     public File getPreviousVersionFile() {
+      if (root == null) {
+        return null;
+      }
       return new File(new File(root, STORAGE_DIR_PREVIOUS), STORAGE_FILE_VERSION);
     }
 
@@ -462,6 +490,9 @@ public abstract class Storage extends StorageInfo {
      * @return the directory path
      */
     public File getPreviousDir() {
+      if (root == null) {
+        return null;
+      }
       return new File(root, STORAGE_DIR_PREVIOUS);
     }
 
@@ -476,6 +507,9 @@ public abstract class Storage extends StorageInfo {
      * @return the directory path
      */
     public File getPreviousTmp() {
+      if (root == null) {
+        return null;
+      }
       return new File(root, STORAGE_TMP_PREVIOUS);
     }
 
@@ -490,6 +524,9 @@ public abstract class Storage extends StorageInfo {
      * @return the directory path
      */
     public File getRemovedTmp() {
+      if (root == null) {
+        return null;
+      }
       return new File(root, STORAGE_TMP_REMOVED);
     }
 
@@ -503,6 +540,9 @@ public abstract class Storage extends StorageInfo {
      * @return the directory path
      */
     public File getFinalizedTmp() {
+      if (root == null) {
+        return null;
+      }
       return new File(root, STORAGE_TMP_FINALIZED);
     }
 
@@ -517,6 +557,9 @@ public abstract class Storage extends StorageInfo {
      * @return the directory path
      */
     public File getLastCheckpointTmp() {
+      if (root == null) {
+        return null;
+      }
       return new File(root, STORAGE_TMP_LAST_CKPT);
     }
 
@@ -530,6 +573,9 @@ public abstract class Storage extends StorageInfo {
      * @return the directory path
      */
     public File getPreviousCheckpoint() {
+      if (root == null) {
+        return null;
+      }
       return new File(root, STORAGE_PREVIOUS_CKPT);
     }
 
@@ -543,7 +589,7 @@ public abstract class Storage extends StorageInfo {
     private void checkEmptyCurrent() throws InconsistentFSStateException,
         IOException {
       File currentDir = getCurrentDir();
-      if(!currentDir.exists()) {
+      if(currentDir == null || !currentDir.exists()) {
         // if current/ does not exist, it's safe to format it.
         return;
       }
@@ -589,6 +635,13 @@ public abstract class Storage extends StorageInfo {
     public StorageState analyzeStorage(StartupOption startOpt, Storage storage,
         boolean checkCurrentIsEmpty)
         throws IOException {
+
+      if (location != null &&
+          location.getStorageType() == StorageType.PROVIDED) {
+        //currently we assume that PROVIDED storages are always NORMAL
+        return StorageState.NORMAL;
+      }
+
       assert root != null : "root is null";
       boolean hadMkdirs = false;
       String rootPath = root.getCanonicalPath();
@@ -710,6 +763,10 @@ public abstract class Storage extends StorageInfo {
      */
     public void doRecover(StorageState curState) throws IOException {
       File curDir = getCurrentDir();
+      if (curDir == null || root == null) {
+        //at this point, we do not support recovery on PROVIDED storages
+        return;
+      }
       String rootPath = root.getCanonicalPath();
       switch(curState) {
       case COMPLETE_UPGRADE:  // mv previous.tmp -> previous
@@ -883,7 +940,8 @@ public abstract class Storage extends StorageInfo {
     
     @Override
     public String toString() {
-      return "Storage Directory " + this.root;
+      return "Storage Directory root= " + this.root +
+          "; location= " + this.location;
     }
 
     /**
@@ -1153,6 +1211,9 @@ public abstract class Storage extends StorageInfo {
   }
   
   public void writeProperties(File to, StorageDirectory sd) throws IOException {
+    if (to == null) {
+      return;
+    }
     Properties props = new Properties();
     setPropertiesFromFields(props, sd);
     writeProperties(to, props);

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java

@@ -152,6 +152,9 @@ public class StorageInfo {
    */
   protected void setFieldsFromProperties(
       Properties props, StorageDirectory sd) throws IOException {
+    if (props == null) {
+      return;
+    }
     setLayoutVersion(props, sd);
     setNamespaceID(props, sd);
     setcTime(props, sd);
@@ -241,6 +244,9 @@ public class StorageInfo {
   }
 
   public static Properties readPropertiesFile(File from) throws IOException {
+    if (from == null) {
+      return null;
+    }
     RandomAccessFile file = new RandomAccessFile(from, "rws");
     FileInputStream in = null;
     Properties props = new Properties();

+ 442 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionFormat.java

@@ -0,0 +1,442 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.common;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.io.MultipleIOException;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This class is used for block maps stored as text files,
+ * with a specified delimiter.
+ */
+public class TextFileRegionFormat
+    extends BlockFormat<FileRegion> implements Configurable {
+
+  private Configuration conf;
+  private ReaderOptions readerOpts = TextReader.defaults();
+  private WriterOptions writerOpts = TextWriter.defaults();
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TextFileRegionFormat.class);
+  @Override
+  public void setConf(Configuration conf) {
+    readerOpts.setConf(conf);
+    writerOpts.setConf(conf);
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public Reader<FileRegion> getReader(Reader.Options opts)
+      throws IOException {
+    if (null == opts) {
+      opts = readerOpts;
+    }
+    if (!(opts instanceof ReaderOptions)) {
+      throw new IllegalArgumentException("Invalid options " + opts.getClass());
+    }
+    ReaderOptions o = (ReaderOptions) opts;
+    Configuration readerConf = (null == o.getConf())
+        ? new Configuration()
+            : o.getConf();
+    return createReader(o.file, o.delim, readerConf);
+  }
+
+  @VisibleForTesting
+  TextReader createReader(Path file, String delim, Configuration cfg)
+      throws IOException {
+    FileSystem fs = file.getFileSystem(cfg);
+    if (fs instanceof LocalFileSystem) {
+      fs = ((LocalFileSystem)fs).getRaw();
+    }
+    CompressionCodecFactory factory = new CompressionCodecFactory(cfg);
+    CompressionCodec codec = factory.getCodec(file);
+    return new TextReader(fs, file, codec, delim);
+  }
+
+  @Override
+  public Writer<FileRegion> getWriter(Writer.Options opts) throws IOException {
+    if (null == opts) {
+      opts = writerOpts;
+    }
+    if (!(opts instanceof WriterOptions)) {
+      throw new IllegalArgumentException("Invalid options " + opts.getClass());
+    }
+    WriterOptions o = (WriterOptions) opts;
+    Configuration cfg = (null == o.getConf())
+        ? new Configuration()
+            : o.getConf();
+    if (o.codec != null) {
+      CompressionCodecFactory factory = new CompressionCodecFactory(cfg);
+      CompressionCodec codec = factory.getCodecByName(o.codec);
+      String name = o.file.getName() + codec.getDefaultExtension();
+      o.filename(new Path(o.file.getParent(), name));
+      return createWriter(o.file, codec, o.delim, cfg);
+    }
+    return createWriter(o.file, null, o.delim, conf);
+  }
+
+  @VisibleForTesting
+  TextWriter createWriter(Path file, CompressionCodec codec, String delim,
+      Configuration cfg) throws IOException {
+    FileSystem fs = file.getFileSystem(cfg);
+    if (fs instanceof LocalFileSystem) {
+      fs = ((LocalFileSystem)fs).getRaw();
+    }
+    OutputStream tmp = fs.create(file);
+    java.io.Writer out = new BufferedWriter(new OutputStreamWriter(
+          (null == codec) ? tmp : codec.createOutputStream(tmp), "UTF-8"));
+    return new TextWriter(out, delim);
+  }
+
+  /**
+   * Class specifying reader options for the {@link TextFileRegionFormat}.
+   */
+  public static class ReaderOptions
+      implements TextReader.Options, Configurable {
+
+    private Configuration conf;
+    private String delim =
+        DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER_DEFAULT;
+    private Path file = new Path(
+        new File(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_PATH_DEFAULT)
+        .toURI().toString());
+
+    @Override
+    public void setConf(Configuration conf) {
+      this.conf = conf;
+      String tmpfile = conf.get(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_READ_PATH,
+          DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_PATH_DEFAULT);
+      file = new Path(tmpfile);
+      delim = conf.get(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER,
+          DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER_DEFAULT);
+      LOG.info("TextFileRegionFormat: read path " + tmpfile.toString());
+    }
+
+    @Override
+    public Configuration getConf() {
+      return conf;
+    }
+
+    @Override
+    public ReaderOptions filename(Path file) {
+      this.file = file;
+      return this;
+    }
+
+    @Override
+    public ReaderOptions delimiter(String delim) {
+      this.delim = delim;
+      return this;
+    }
+  }
+
+  /**
+   * Class specifying writer options for the {@link TextFileRegionFormat}.
+   */
+  public static class WriterOptions
+      implements TextWriter.Options, Configurable {
+
+    private Configuration conf;
+    private String codec = null;
+    private Path file =
+        new Path(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_PATH_DEFAULT);
+    private String delim =
+        DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER_DEFAULT;
+
+    @Override
+    public void setConf(Configuration conf) {
+      this.conf = conf;
+      String tmpfile = conf.get(
+          DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_WRITE_PATH, file.toString());
+      file = new Path(tmpfile);
+      codec = conf.get(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_CODEC);
+      delim = conf.get(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER,
+          DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER_DEFAULT);
+    }
+
+    @Override
+    public Configuration getConf() {
+      return conf;
+    }
+
+    @Override
+    public WriterOptions filename(Path file) {
+      this.file = file;
+      return this;
+    }
+
+    public String getCodec() {
+      return codec;
+    }
+
+    public Path getFile() {
+      return file;
+    }
+
+    @Override
+    public WriterOptions codec(String codec) {
+      this.codec = codec;
+      return this;
+    }
+
+    @Override
+    public WriterOptions delimiter(String delim) {
+      this.delim = delim;
+      return this;
+    }
+
+  }
+
+  /**
+   * This class is used as a reader for block maps which
+   * are stored as delimited text files.
+   */
+  public static class TextReader extends Reader<FileRegion> {
+
+    /**
+     * Options for {@link TextReader}.
+     */
+    public interface Options extends Reader.Options {
+      Options filename(Path file);
+      Options delimiter(String delim);
+    }
+
+    static ReaderOptions defaults() {
+      return new ReaderOptions();
+    }
+
+    private final Path file;
+    private final String delim;
+    private final FileSystem fs;
+    private final CompressionCodec codec;
+    private final Map<FRIterator, BufferedReader> iterators;
+
+    protected TextReader(FileSystem fs, Path file, CompressionCodec codec,
+        String delim) {
+      this(fs, file, codec, delim,
+          new IdentityHashMap<FRIterator, BufferedReader>());
+    }
+
+    TextReader(FileSystem fs, Path file, CompressionCodec codec, String delim,
+        Map<FRIterator, BufferedReader> iterators) {
+      this.fs = fs;
+      this.file = file;
+      this.codec = codec;
+      this.delim = delim;
+      this.iterators = Collections.synchronizedMap(iterators);
+    }
+
+    @Override
+    public FileRegion resolve(Block ident) throws IOException {
+      // consider layering index w/ composable format
+      Iterator<FileRegion> i = iterator();
+      try {
+        while (i.hasNext()) {
+          FileRegion f = i.next();
+          if (f.getBlock().equals(ident)) {
+            return f;
+          }
+        }
+      } finally {
+        BufferedReader r = iterators.remove(i);
+        if (r != null) {
+          // null on last element
+          r.close();
+        }
+      }
+      return null;
+    }
+
+    class FRIterator implements Iterator<FileRegion> {
+
+      private FileRegion pending;
+
+      @Override
+      public boolean hasNext() {
+        return pending != null;
+      }
+
+      @Override
+      public FileRegion next() {
+        if (null == pending) {
+          throw new NoSuchElementException();
+        }
+        FileRegion ret = pending;
+        try {
+          pending = nextInternal(this);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+        return ret;
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+    }
+
+    private FileRegion nextInternal(Iterator<FileRegion> i) throws IOException {
+      BufferedReader r = iterators.get(i);
+      if (null == r) {
+        throw new IllegalStateException();
+      }
+      String line = r.readLine();
+      if (null == line) {
+        iterators.remove(i);
+        return null;
+      }
+      String[] f = line.split(delim);
+      if (f.length != 6) {
+        throw new IOException("Invalid line: " + line);
+      }
+      return new FileRegion(Long.parseLong(f[0]), new Path(f[1]),
+          Long.parseLong(f[2]), Long.parseLong(f[3]), f[5],
+          Long.parseLong(f[4]));
+    }
+
+    public InputStream createStream() throws IOException {
+      InputStream i = fs.open(file);
+      if (codec != null) {
+        i = codec.createInputStream(i);
+      }
+      return i;
+    }
+
+    @Override
+    public Iterator<FileRegion> iterator() {
+      FRIterator i = new FRIterator();
+      try {
+        BufferedReader r =
+            new BufferedReader(new InputStreamReader(createStream(), "UTF-8"));
+        iterators.put(i, r);
+        i.pending = nextInternal(i);
+      } catch (IOException e) {
+        iterators.remove(i);
+        throw new RuntimeException(e);
+      }
+      return i;
+    }
+
+    @Override
+    public void close() throws IOException {
+      ArrayList<IOException> ex = new ArrayList<>();
+      synchronized (iterators) {
+        for (Iterator<BufferedReader> i = iterators.values().iterator();
+             i.hasNext();) {
+          try {
+            BufferedReader r = i.next();
+            r.close();
+          } catch (IOException e) {
+            ex.add(e);
+          } finally {
+            i.remove();
+          }
+        }
+        iterators.clear();
+      }
+      if (!ex.isEmpty()) {
+        throw MultipleIOException.createIOException(ex);
+      }
+    }
+
+  }
+
+  /**
+   * This class is used as a writer for block maps which
+   * are stored as delimited text files.
+   */
+  public static class TextWriter extends Writer<FileRegion> {
+
+    /**
+     * Interface for Writer options.
+     */
+    public interface Options extends Writer.Options {
+      Options codec(String codec);
+      Options filename(Path file);
+      Options delimiter(String delim);
+    }
+
+    public static WriterOptions defaults() {
+      return new WriterOptions();
+    }
+
+    private final String delim;
+    private final java.io.Writer out;
+
+    public TextWriter(java.io.Writer out, String delim) {
+      this.out = out;
+      this.delim = delim;
+    }
+
+    @Override
+    public void store(FileRegion token) throws IOException {
+      out.append(String.valueOf(token.getBlock().getBlockId())).append(delim);
+      out.append(token.getPath().toString()).append(delim);
+      out.append(Long.toString(token.getOffset())).append(delim);
+      out.append(Long.toString(token.getLength())).append(delim);
+      out.append(Long.toString(token.getGenerationStamp())).append(delim);
+      out.append(token.getBlockPoolId()).append("\n");
+    }
+
+    @Override
+    public void close() throws IOException {
+      out.close();
+    }
+
+  }
+
+  @Override
+  public void refresh() throws IOException {
+    //nothing to do;
+  }
+
+}
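
Putting the pieces together: TextWriter#store and TextReader#nextInternal above define the on-disk layout as one region per line, blockId, path, offset, length, genStamp, bpid, separated by the configured delimiter. A hedged round-trip sketch using only the public entry points of this class; the file path and region values are placeholders, and passing null options simply falls back to the configured defaults as getReader/getWriter show.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.server.common.BlockFormat;
    import org.apache.hadoop.hdfs.server.common.FileRegion;
    import org.apache.hadoop.hdfs.server.common.TextFileRegionFormat;

    public class TextBlockMapSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Write and read the same (illustrative) block-map file.
        conf.set(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_WRITE_PATH, "file:///tmp/blocks.csv");
        conf.set(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_READ_PATH, "file:///tmp/blocks.csv");

        TextFileRegionFormat fmt = new TextFileRegionFormat();
        fmt.setConf(conf);

        try (BlockFormat.Writer<FileRegion> w = fmt.getWriter(null)) {
          // One line per region: blockId,path,offset,length,genStamp,bpid
          w.store(new FileRegion(1001L, new Path("file:///data/part-00000"),
              0L, 128L * 1024 * 1024, "BP-1"));
        }
        try (BlockFormat.Reader<FileRegion> r = fmt.getReader(null)) {
          for (FileRegion region : r) {
            System.out.println(region);
          }
        }
      }
    }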

+ 88 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionProvider.java

@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.common;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * This class is used to read file regions from block maps
+ * specified using delimited text.
+ */
+public class TextFileRegionProvider
+    extends FileRegionProvider implements Configurable {
+
+  private Configuration conf;
+  private BlockFormat<FileRegion> fmt;
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void setConf(Configuration conf) {
+    fmt = ReflectionUtils.newInstance(
+        conf.getClass(DFSConfigKeys.DFS_PROVIDER_BLK_FORMAT_CLASS,
+            TextFileRegionFormat.class,
+            BlockFormat.class),
+        conf);
+    ((Configurable)fmt).setConf(conf); //redundant?
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public Iterator<FileRegion> iterator() {
+    try {
+      final BlockFormat.Reader<FileRegion> r = fmt.getReader(null);
+      return new Iterator<FileRegion>() {
+
+        private final Iterator<FileRegion> inner = r.iterator();
+
+        @Override
+        public boolean hasNext() {
+          return inner.hasNext();
+        }
+
+        @Override
+        public FileRegion next() {
+          return inner.next();
+        }
+
+        @Override
+        public void remove() {
+          throw new UnsupportedOperationException();
+        }
+      };
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to read provided blocks", e);
+    }
+  }
+
+  @Override
+  public void refresh() throws IOException {
+    fmt.refresh();
+  }
+}
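
The provider is the datanode-facing view of the block map: a Configurable, Iterable<FileRegion> that defers to whichever BlockFormat dfs.provided.blockformat.class names (TextFileRegionFormat by default). A minimal sketch of iterating it directly, assuming a Configuration populated with the provided-storage keys as in the earlier sketch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.server.common.FileRegion;
    import org.apache.hadoop.hdfs.server.common.TextFileRegionProvider;

    public class ProviderIterationSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(); // assume provided-storage keys are set as above
        TextFileRegionProvider provider = new TextFileRegionProvider();
        provider.setConf(conf);   // instantiates the configured BlockFormat
        provider.refresh();       // no-op for the text format, but part of the contract
        for (FileRegion region : provider) {
          // Each region maps one provided block to (path, offset, length) in the external store.
          System.out.println(region.getBlock() + " -> " + region.getPath());
        }
      }
    }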

+ 20 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java

@@ -36,6 +36,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.HardLink;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
@@ -360,6 +361,9 @@ public class BlockPoolSliceStorage extends Storage {
   private boolean doTransition(StorageDirectory sd, NamespaceInfo nsInfo,
       StartupOption startOpt, List<Callable<StorageDirectory>> callables,
       Configuration conf) throws IOException {
+    if (sd.getStorageLocation().getStorageType() == StorageType.PROVIDED) {
+      return false; // regular startup for PROVIDED storage directories
+    }
     if (startOpt == StartupOption.ROLLBACK && sd.getPreviousDir().exists()) {
       Preconditions.checkState(!getTrashRootDir(sd).exists(),
           sd.getPreviousDir() + " and " + getTrashRootDir(sd) + " should not " +
@@ -439,6 +443,10 @@ public class BlockPoolSliceStorage extends Storage {
         LayoutVersion.Feature.FEDERATION, layoutVersion)) {
       return;
     }
+    //no upgrades for storage directories that are PROVIDED
+    if (bpSd.getRoot() == null) {
+      return;
+    }
     final int oldLV = getLayoutVersion();
     LOG.info("Upgrading block pool storage directory " + bpSd.getRoot()
         + ".\n   old LV = " + oldLV
@@ -589,8 +597,9 @@ public class BlockPoolSliceStorage extends Storage {
       throws IOException {
     File prevDir = bpSd.getPreviousDir();
     // regular startup if previous dir does not exist
-    if (!prevDir.exists())
+    if (prevDir == null || !prevDir.exists()) {
       return;
+    }
     // read attributes out of the VERSION file of previous directory
     BlockPoolSliceStorage prevInfo = new BlockPoolSliceStorage();
     prevInfo.readPreviousVersionProperties(bpSd);
@@ -631,6 +640,10 @@ public class BlockPoolSliceStorage extends Storage {
    * that holds the snapshot.
    */
   void doFinalize(File dnCurDir) throws IOException {
+    LOG.info("doFinalize: " + dnCurDir);
+    if (dnCurDir == null) {
+      return; //we do nothing if the directory is null
+    }
     File bpRoot = getBpRoot(blockpoolID, dnCurDir);
     StorageDirectory bpSd = new StorageDirectory(bpRoot);
     // block pool level previous directory
@@ -841,6 +854,9 @@ public class BlockPoolSliceStorage extends Storage {
   public void setRollingUpgradeMarkers(List<StorageDirectory> dnStorageDirs)
       throws IOException {
     for (StorageDirectory sd : dnStorageDirs) {
+      if (sd.getCurrentDir() == null) {
+        return;
+      }
       File bpRoot = getBpRoot(blockpoolID, sd.getCurrentDir());
       File markerFile = new File(bpRoot, ROLLING_UPGRADE_MARKER_FILE);
       if (!storagesWithRollingUpgradeMarker.contains(bpRoot.toString())) {
@@ -863,6 +879,9 @@ public class BlockPoolSliceStorage extends Storage {
   public void clearRollingUpgradeMarkers(List<StorageDirectory> dnStorageDirs)
       throws IOException {
     for (StorageDirectory sd : dnStorageDirs) {
+      if (sd.getCurrentDir() == null) {
+        continue;
+      }
       File bpRoot = getBpRoot(blockpoolID, sd.getCurrentDir());
       File markerFile = new File(bpRoot, ROLLING_UPGRADE_MARKER_FILE);
       if (!storagesWithoutRollingUpgradeMarker.contains(bpRoot.toString())) {

+ 31 - 13
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java

@@ -48,6 +48,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.HardLink;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -129,22 +130,31 @@ public class DataStorage extends Storage {
     this.datanodeUuid = newDatanodeUuid;
   }
 
-  private static boolean createStorageID(StorageDirectory sd, int lv) {
+  private static boolean createStorageID(StorageDirectory sd, int lv,
+      Configuration conf) {
     // Clusters previously upgraded from layout versions earlier than
     // ADD_DATANODE_AND_STORAGE_UUIDS failed to correctly generate a
     // new storage ID. We check for that and fix it now.
     final boolean haveValidStorageId = DataNodeLayoutVersion.supports(
         LayoutVersion.Feature.ADD_DATANODE_AND_STORAGE_UUIDS, lv)
         && DatanodeStorage.isValidStorageId(sd.getStorageUuid());
-    return createStorageID(sd, !haveValidStorageId);
+    return createStorageID(sd, !haveValidStorageId, conf);
   }
 
   /** Create an ID for this storage.
    * @return true if a new storage ID was generated.
    * */
   public static boolean createStorageID(
-      StorageDirectory sd, boolean regenerateStorageIds) {
+      StorageDirectory sd, boolean regenerateStorageIds, Configuration conf) {
     final String oldStorageID = sd.getStorageUuid();
+    if (sd.getStorageLocation() != null &&
+        sd.getStorageLocation().getStorageType() == StorageType.PROVIDED) {
+      // We only support one provided storage per datanode for now.
+      // TODO support multiple provided storage ids per datanode.
+      sd.setStorageUuid(conf.get(DFSConfigKeys.DFS_PROVIDER_STORAGEUUID,
+          DFSConfigKeys.DFS_PROVIDER_STORAGEUUID_DEFAULT));
+      return false;
+    }
     if (oldStorageID == null || regenerateStorageIds) {
       sd.setStorageUuid(DatanodeStorage.generateUuid());
       LOG.info("Generated new storageID " + sd.getStorageUuid() +
@@ -273,7 +283,7 @@ public class DataStorage extends Storage {
         LOG.info("Storage directory with location " + location
             + " is not formatted for namespace " + nsInfo.getNamespaceID()
             + ". Formatting...");
-        format(sd, nsInfo, datanode.getDatanodeUuid());
+        format(sd, nsInfo, datanode.getDatanodeUuid(), datanode.getConf());
         break;
       default:  // recovery part is common
         sd.doRecover(curState);
@@ -547,15 +557,15 @@ public class DataStorage extends Storage {
   }
 
   void format(StorageDirectory sd, NamespaceInfo nsInfo,
-              String datanodeUuid) throws IOException {
+              String newDatanodeUuid, Configuration conf) throws IOException {
     sd.clearDirectory(); // create directory
     this.layoutVersion = HdfsServerConstants.DATANODE_LAYOUT_VERSION;
     this.clusterID = nsInfo.getClusterID();
     this.namespaceID = nsInfo.getNamespaceID();
     this.cTime = 0;
-    setDatanodeUuid(datanodeUuid);
+    setDatanodeUuid(newDatanodeUuid);
 
-    createStorageID(sd, false);
+    createStorageID(sd, false, conf);
     writeProperties(sd);
   }
 
@@ -600,6 +610,9 @@ public class DataStorage extends Storage {
 
   private void setFieldsFromProperties(Properties props, StorageDirectory sd,
       boolean overrideLayoutVersion, int toLayoutVersion) throws IOException {
+    if (props == null) {
+      return;
+    }
     if (overrideLayoutVersion) {
       this.layoutVersion = toLayoutVersion;
     } else {
@@ -694,6 +707,10 @@ public class DataStorage extends Storage {
   private boolean doTransition(StorageDirectory sd, NamespaceInfo nsInfo,
       StartupOption startOpt, List<Callable<StorageDirectory>> callables,
       Configuration conf) throws IOException {
+    if (sd.getStorageLocation().getStorageType() == StorageType.PROVIDED) {
+      createStorageID(sd, layoutVersion, conf);
+      return false; // regular start up for PROVIDED storage directories
+    }
     if (startOpt == StartupOption.ROLLBACK) {
       doRollback(sd, nsInfo); // rollback if applicable
     }
@@ -724,7 +741,7 @@ public class DataStorage extends Storage {
 
     // regular start up.
     if (this.layoutVersion == HdfsServerConstants.DATANODE_LAYOUT_VERSION) {
-      createStorageID(sd, layoutVersion);
+      createStorageID(sd, layoutVersion, conf);
       return false; // need to write properties
     }
 
@@ -733,7 +750,7 @@ public class DataStorage extends Storage {
       if (federationSupported) {
         // If the existing on-disk layout version supports federation,
         // simply update the properties.
-        upgradeProperties(sd);
+        upgradeProperties(sd, conf);
       } else {
         doUpgradePreFederation(sd, nsInfo, callables, conf);
       }
@@ -829,15 +846,16 @@ public class DataStorage extends Storage {
 
     // 4. Write version file under <SD>/current
     clusterID = nsInfo.getClusterID();
-    upgradeProperties(sd);
+    upgradeProperties(sd, conf);
     
     // 5. Rename <SD>/previous.tmp to <SD>/previous
     rename(tmpDir, prevDir);
     LOG.info("Upgrade of " + sd.getRoot()+ " is complete");
   }
 
-  void upgradeProperties(StorageDirectory sd) throws IOException {
-    createStorageID(sd, layoutVersion);
+  void upgradeProperties(StorageDirectory sd, Configuration conf)
+      throws IOException {
+    createStorageID(sd, layoutVersion, conf);
     LOG.info("Updating layout version from " + layoutVersion
         + " to " + HdfsServerConstants.DATANODE_LAYOUT_VERSION
         + " for storage " + sd.getRoot());
@@ -989,7 +1007,7 @@ public class DataStorage extends Storage {
     // then finalize it. Else finalize the corresponding BP.
     for (StorageDirectory sd : getStorageDirs()) {
       File prevDir = sd.getPreviousDir();
-      if (prevDir.exists()) {
+      if (prevDir != null && prevDir.exists()) {
         // data node level storage finalize
         doFinalize(sd);
       } else {

+ 12 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java

@@ -44,6 +44,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.AutoCloseableLock;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -105,7 +106,7 @@ public class DirectoryScanner implements Runnable {
    * @param b whether to retain diffs
    */
   @VisibleForTesting
-  void setRetainDiffs(boolean b) {
+  public void setRetainDiffs(boolean b) {
     retainDiffs = b;
   }
 
@@ -215,7 +216,8 @@ public class DirectoryScanner implements Runnable {
    * @param dataset the dataset to scan
    * @param conf the Configuration object
    */
-  DirectoryScanner(DataNode datanode, FsDatasetSpi<?> dataset, Configuration conf) {
+  public DirectoryScanner(DataNode datanode, FsDatasetSpi<?> dataset,
+      Configuration conf) {
     this.datanode = datanode;
     this.dataset = dataset;
     int interval = (int) conf.getTimeDuration(
@@ -369,15 +371,14 @@ public class DirectoryScanner implements Runnable {
    * Reconcile differences between disk and in-memory blocks
    */
   @VisibleForTesting
-  void reconcile() throws IOException {
+  public void reconcile() throws IOException {
     scan();
     for (Entry<String, LinkedList<ScanInfo>> entry : diffs.entrySet()) {
       String bpid = entry.getKey();
       LinkedList<ScanInfo> diff = entry.getValue();
       
       for (ScanInfo info : diff) {
-        dataset.checkAndUpdate(bpid, info.getBlockId(), info.getBlockFile(),
-            info.getMetaFile(), info.getVolume());
+        dataset.checkAndUpdate(bpid, info);
       }
     }
     if (!retainDiffs) clear();
@@ -429,11 +430,12 @@ public class DirectoryScanner implements Runnable {
           }
           // Block file and/or metadata file exists on the disk
           // Block exists in memory
-          if (info.getBlockFile() == null) {
+          if (info.getVolume().getStorageType() != StorageType.PROVIDED &&
+              info.getBlockFile() == null) {
             // Block metadata file exits and block file is missing
             addDifference(diffRecord, statsRecord, info);
           } else if (info.getGenStamp() != memBlock.getGenerationStamp()
-              || info.getBlockFileLength() != memBlock.getNumBytes()) {
+              || info.getBlockLength() != memBlock.getNumBytes()) {
             // Block metadata file is missing or has wrong generation stamp,
             // or block file length is different than expected
             statsRecord.mismatchBlocks++;
@@ -611,6 +613,9 @@ public class DirectoryScanner implements Runnable {
       for (String bpid : bpList) {
         LinkedList<ScanInfo> report = new LinkedList<>();
 
+        perfTimer.reset().start();
+        throttleTimer.reset().start();
+
         try {
           result.put(bpid, volume.compileReport(bpid, report, this));
         } catch (InterruptedException ex) {

+ 91 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedProvidedReplica.java

@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+
+/**
+ * This class is used for provided replicas that are finalized.
+ */
+public class FinalizedProvidedReplica extends ProvidedReplica {
+
+  public FinalizedProvidedReplica(long blockId, URI fileURI,
+      long fileOffset, long blockLen, long genStamp,
+      FsVolumeSpi volume, Configuration conf) {
+    super(blockId, fileURI, fileOffset, blockLen, genStamp, volume, conf);
+  }
+
+  @Override
+  public ReplicaState getState() {
+    return ReplicaState.FINALIZED;
+  }
+
+  @Override
+  public long getBytesOnDisk() {
+    return getNumBytes();
+  }
+
+  @Override
+  public long getVisibleLength() {
+    return getNumBytes(); //all bytes are visible
+  }
+
+  @Override  // Object
+  public boolean equals(Object o) {
+    return super.equals(o);
+  }
+
+  @Override  // Object
+  public int hashCode() {
+    return super.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return super.toString();
+  }
+
+  @Override
+  public ReplicaInfo getOriginalReplica() {
+    throw new UnsupportedOperationException("Replica of type " + getState() +
+        " does not support getOriginalReplica");
+  }
+
+  @Override
+  public long getRecoveryID() {
+    throw new UnsupportedOperationException("Replica of type " + getState() +
+        " does not support getRecoveryID");
+  }
+
+  @Override
+  public void setRecoveryID(long recoveryId) {
+    throw new UnsupportedOperationException("Replica of type " + getState() +
+        " does not support setRecoveryID");
+  }
+
+  @Override
+  public ReplicaRecoveryInfo createInfo() {
+    throw new UnsupportedOperationException("Replica of type " + getState() +
+        " does not support createInfo");
+  }
+}
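A minimal sketch of constructing such a replica directly, assuming a PROVIDED-backed FsVolumeSpi vol and a Configuration conf are in scope; the URI and sizes are invented, only the constructor signature comes from the class above:

    URI remote = URI.create("hdfs://archive/warehouse/part-00000"); // hypothetical
    FinalizedProvidedReplica replica = new FinalizedProvidedReplica(
        1024L,               // block id
        remote,              // remote file holding the block bytes
        0L,                  // offset of the block within that file
        128L * 1024 * 1024,  // block length
        1001L,               // generation stamp
        vol,                 // PROVIDED volume (assumed)
        conf);
    assert replica.getState() == ReplicaState.FINALIZED;
    assert replica.getVisibleLength() == replica.getNumBytes();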

+ 248 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProvidedReplica.java

@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.common.FileRegion;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This abstract class is used as a base class for provided replicas.
+ */
+public abstract class ProvidedReplica extends ReplicaInfo {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(ProvidedReplica.class);
+
+  // Null checksum information for provided replicas.
+  // Shared across all replicas.
+  static final byte[] NULL_CHECKSUM_ARRAY =
+      FsDatasetUtil.createNullChecksumByteArray();
+  private URI fileURI;
+  private long fileOffset;
+  private Configuration conf;
+  private FileSystem remoteFS;
+
+  /**
+   * Constructor.
+   * @param blockId block id
+   * @param fileURI remote URI this block is to be read from
+   * @param fileOffset the offset in the remote URI
+   * @param blockLen the length of the block
+   * @param genStamp the generation stamp of the block
+   * @param volume the volume this block belongs to
+   */
+  public ProvidedReplica(long blockId, URI fileURI, long fileOffset,
+      long blockLen, long genStamp, FsVolumeSpi volume, Configuration conf) {
+    super(volume, blockId, blockLen, genStamp);
+    this.fileURI = fileURI;
+    this.fileOffset = fileOffset;
+    this.conf = conf;
+    try {
+      this.remoteFS = FileSystem.get(fileURI, this.conf);
+    } catch (IOException e) {
+      LOG.warn("Failed to obtain filesystem for " + fileURI);
+      this.remoteFS = null;
+    }
+  }
+
+  public ProvidedReplica(ProvidedReplica r) {
+    super(r);
+    this.fileURI = r.fileURI;
+    this.fileOffset = r.fileOffset;
+    this.conf = r.conf;
+    try {
+      this.remoteFS = FileSystem.newInstance(fileURI, this.conf);
+    } catch (IOException e) {
+      this.remoteFS = null;
+    }
+  }
+
+  @Override
+  public URI getBlockURI() {
+    return this.fileURI;
+  }
+
+  @Override
+  public InputStream getDataInputStream(long seekOffset) throws IOException {
+    if (remoteFS != null) {
+      FSDataInputStream ins = remoteFS.open(new Path(fileURI));
+      ins.seek(fileOffset + seekOffset);
+      return new FSDataInputStream(ins);
+    } else {
+      throw new IOException("Remote filesystem for provided replica " + this +
+          " does not exist");
+    }
+  }
+
+  @Override
+  public OutputStream getDataOutputStream(boolean append) throws IOException {
+    throw new UnsupportedOperationException(
+        "OutputDataStream is not implemented for ProvidedReplica");
+  }
+
+  @Override
+  public URI getMetadataURI() {
+    return null;
+  }
+
+  @Override
+  public OutputStream getMetadataOutputStream(boolean append)
+      throws IOException {
+    return null;
+  }
+
+  @Override
+  public boolean blockDataExists() {
+    if(remoteFS != null) {
+      try {
+        return remoteFS.exists(new Path(fileURI));
+      } catch (IOException e) {
+        return false;
+      }
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public boolean deleteBlockData() {
+    throw new UnsupportedOperationException(
+        "ProvidedReplica does not support deleting block data");
+  }
+
+  @Override
+  public long getBlockDataLength() {
+    return this.getNumBytes();
+  }
+
+  @Override
+  public LengthInputStream getMetadataInputStream(long offset)
+      throws IOException {
+    return new LengthInputStream(new ByteArrayInputStream(NULL_CHECKSUM_ARRAY),
+        NULL_CHECKSUM_ARRAY.length);
+  }
+
+  @Override
+  public boolean metadataExists() {
+    return NULL_CHECKSUM_ARRAY != null;
+  }
+
+  @Override
+  public boolean deleteMetadata() {
+    throw new UnsupportedOperationException(
+        "ProvidedReplica does not support deleting metadata");
+  }
+
+  @Override
+  public long getMetadataLength() {
+    return NULL_CHECKSUM_ARRAY == null ? 0 : NULL_CHECKSUM_ARRAY.length;
+  }
+
+  @Override
+  public boolean renameMeta(URI destURI) throws IOException {
+    throw new UnsupportedOperationException(
+        "ProvidedReplica does not support renaming metadata");
+  }
+
+  @Override
+  public boolean renameData(URI destURI) throws IOException {
+    throw new UnsupportedOperationException(
+        "ProvidedReplica does not support renaming data");
+  }
+
+  @Override
+  public boolean getPinning(LocalFileSystem localFS) throws IOException {
+    return false;
+  }
+
+  @Override
+  public void setPinning(LocalFileSystem localFS) throws IOException {
+    throw new UnsupportedOperationException(
+        "ProvidedReplica does not support pinning");
+  }
+
+  @Override
+  public void bumpReplicaGS(long newGS) throws IOException {
+    throw new UnsupportedOperationException(
+        "ProvidedReplica does not yet support writes");
+  }
+
+  @Override
+  public boolean breakHardLinksIfNeeded() throws IOException {
+    return false;
+  }
+
+  @Override
+  public ReplicaRecoveryInfo createInfo()
+      throws UnsupportedOperationException {
+    throw new UnsupportedOperationException(
+        "ProvidedReplica does not yet support writes");
+  }
+
+  @Override
+  public int compareWith(ScanInfo info) {
+    //local scanning cannot find any provided blocks.
+    if (info.getFileRegion().equals(
+        new FileRegion(this.getBlockId(), new Path(fileURI),
+            fileOffset, this.getNumBytes(), this.getGenerationStamp()))) {
+      return 0;
+    } else {
+      return (int) (info.getBlockLength() - getNumBytes());
+    }
+  }
+
+  @Override
+  public void truncateBlock(long newLength) throws IOException {
+    throw new UnsupportedOperationException(
+        "ProvidedReplica does not yet support truncate");
+  }
+
+  @Override
+  public void updateWithReplica(StorageLocation replicaLocation) {
+    throw new UnsupportedOperationException(
+        "ProvidedReplica does not yet support update");
+  }
+
+  @Override
+  public void copyMetadata(URI destination) throws IOException {
+    throw new UnsupportedOperationException(
+        "ProvidedReplica does not yet support copy metadata");
+  }
+
+  @Override
+  public void copyBlockdata(URI destination) throws IOException {
+    throw new UnsupportedOperationException(
+        "ProvidedReplica does not yet support copy data");
+  }
+}
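Reads are served by the remote FileSystem resolved in the constructor; getDataInputStream() opens fileURI and seeks to fileOffset + seekOffset, so offset 0 is the start of the block, not of the remote file. A sketch, assuming the replica built in the previous example:

    try (InputStream in = replica.getDataInputStream(0)) {
      byte[] buf = new byte[8192];
      int n = in.read(buf);        // first bytes of the block, read remotely
      // hand buf[0..n) to the block sender / verifier
    }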

+ 95 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java

@@ -18,9 +18,13 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.File;
+import java.net.URI;
 
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.FileRegion;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 
 /**
@@ -42,11 +46,20 @@ public class ReplicaBuilder {
 
   private ReplicaInfo fromReplica;
 
+  private URI uri;
+  private long offset;
+  private Configuration conf;
+  private FileRegion fileRegion;
+
   public ReplicaBuilder(ReplicaState state) {
     volume = null;
     writer = null;
     block = null;
     length = -1;
+    fileRegion = null;
+    conf = null;
+    fromReplica = null;
+    uri = null;
     this.state = state;
   }
 
@@ -105,6 +118,26 @@ public class ReplicaBuilder {
     return this;
   }
 
+  public ReplicaBuilder setURI(URI uri) {
+    this.uri = uri;
+    return this;
+  }
+
+  public ReplicaBuilder setConf(Configuration conf) {
+    this.conf = conf;
+    return this;
+  }
+
+  public ReplicaBuilder setOffset(long offset) {
+    this.offset = offset;
+    return this;
+  }
+
+  public ReplicaBuilder setFileRegion(FileRegion fileRegion) {
+    this.fileRegion = fileRegion;
+    return this;
+  }
+
   public LocalReplicaInPipeline buildLocalReplicaInPipeline()
       throws IllegalArgumentException {
     LocalReplicaInPipeline info = null;
@@ -176,7 +209,7 @@ public class ReplicaBuilder {
     }
   }
 
-  private ReplicaInfo buildFinalizedReplica() throws IllegalArgumentException {
+  private LocalReplica buildFinalizedReplica() throws IllegalArgumentException {
     if (null != fromReplica &&
         fromReplica.getState() == ReplicaState.FINALIZED) {
       return new FinalizedReplica((FinalizedReplica)fromReplica);
@@ -193,7 +226,7 @@ public class ReplicaBuilder {
     }
   }
 
-  private ReplicaInfo buildRWR() throws IllegalArgumentException {
+  private LocalReplica buildRWR() throws IllegalArgumentException {
 
     if (null != fromReplica && fromReplica.getState() == ReplicaState.RWR) {
       return new ReplicaWaitingToBeRecovered(
@@ -211,7 +244,7 @@ public class ReplicaBuilder {
     }
   }
 
-  private ReplicaInfo buildRUR() throws IllegalArgumentException {
+  private LocalReplica buildRUR() throws IllegalArgumentException {
     if (null == fromReplica) {
       throw new IllegalArgumentException(
           "Missing a valid replica to recover from");
@@ -228,8 +261,53 @@ public class ReplicaBuilder {
     }
   }
 
-  public ReplicaInfo build() throws IllegalArgumentException {
-    ReplicaInfo info = null;
+  private ProvidedReplica buildProvidedFinalizedReplica()
+      throws IllegalArgumentException {
+    ProvidedReplica info = null;
+    if (fromReplica != null) {
+      throw new IllegalArgumentException("Finalized PROVIDED replica " +
+          "cannot be constructed from another replica");
+    }
+    if (fileRegion == null && uri == null) {
+      throw new IllegalArgumentException(
+          "Trying to construct a provided replica on " + volume +
+          " without enough information");
+    }
+    if (fileRegion == null) {
+      info = new FinalizedProvidedReplica(blockId, uri, offset,
+          length, genStamp, volume, conf);
+    } else {
+      info = new FinalizedProvidedReplica(fileRegion.getBlock().getBlockId(),
+          fileRegion.getPath().toUri(),
+          fileRegion.getOffset(),
+          fileRegion.getBlock().getNumBytes(),
+          fileRegion.getBlock().getGenerationStamp(),
+          volume, conf);
+    }
+    return info;
+  }
+
+  private ProvidedReplica buildProvidedReplica()
+      throws IllegalArgumentException {
+    ProvidedReplica info = null;
+    switch(this.state) {
+    case FINALIZED:
+      info = buildProvidedFinalizedReplica();
+      break;
+    case RWR:
+    case RUR:
+    case RBW:
+    case TEMPORARY:
+    default:
+      throw new IllegalArgumentException("Unknown replica state " +
+          state + " for PROVIDED replica");
+    }
+    return info;
+  }
+
+  private LocalReplica buildLocalReplica()
+      throws IllegalArgumentException {
+    LocalReplica info = null;
     switch(this.state) {
     case FINALIZED:
       info = buildFinalizedReplica();
@@ -249,4 +327,16 @@ public class ReplicaBuilder {
     }
     return info;
   }
+
+  public ReplicaInfo build() throws IllegalArgumentException {
+
+    ReplicaInfo info = null;
+    if(volume != null && volume.getStorageType() == StorageType.PROVIDED) {
+      info = buildProvidedReplica();
+    } else {
+      info = buildLocalReplica();
+    }
+
+    return info;
+  }
 }
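With these setters, build() no longer needs callers to pick a replica subtype: it returns a FinalizedProvidedReplica whenever the volume's storage type is PROVIDED. A sketch mirroring how ProvidedVolumeImpl uses the builder, assuming a FileRegion region, a PROVIDED FsVolumeSpi vol and a Configuration conf:

    ReplicaInfo replica = new ReplicaBuilder(ReplicaState.FINALIZED)
        .setBlockId(region.getBlock().getBlockId())
        .setURI(region.getPath().toUri())
        .setOffset(region.getOffset())
        .setLength(region.getBlock().getNumBytes())
        .setGenerationStamp(region.getBlock().getGenerationStamp())
        .setFsVolume(vol)          // PROVIDED storage type selects the provided path
        .setConf(conf)
        .build();
    // Alternatively, setFileRegion(region) carries the same information in one call.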

+ 19 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java

@@ -49,6 +49,17 @@ abstract public class ReplicaInfo extends Block
   private static final FileIoProvider DEFAULT_FILE_IO_PROVIDER =
       new FileIoProvider(null, null);
 
+  /**
+   * Constructor.
+   * @param block a block
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   */
+  ReplicaInfo(Block block, FsVolumeSpi vol) {
+    this(vol, block.getBlockId(), block.getNumBytes(),
+        block.getGenerationStamp());
+  }
+
   /**
   * Constructor
   * @param vol volume where replica is located
@@ -62,7 +73,14 @@ abstract public class ReplicaInfo extends Block
   }
   
   /**
-   * Get the volume where this replica is located on disk.
+   * Copy constructor.
+   * @param from where to copy from
+   */
+  ReplicaInfo(ReplicaInfo from) {
+    this(from, from.getVolume());
+  }
+
+  /**
    * @return the volume where this replica is located on disk
    */
   public FsVolumeSpi getVolume() {

+ 22 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java

@@ -98,6 +98,16 @@ public class StorageLocation
 
   public boolean matchesStorageDirectory(StorageDirectory sd,
       String bpid) throws IOException {
+    if (sd.getStorageLocation().getStorageType() == StorageType.PROVIDED &&
+        storageType == StorageType.PROVIDED) {
+      return matchesStorageDirectory(sd);
+    }
+    if (sd.getStorageLocation().getStorageType() == StorageType.PROVIDED ||
+        storageType == StorageType.PROVIDED) {
+      //only one of these is PROVIDED; so it cannot be a match!
+      return false;
+    }
+    //both storage directories are local
     return this.getBpURI(bpid, Storage.STORAGE_DIR_CURRENT).normalize()
         .equals(sd.getRoot().toURI().normalize());
   }
@@ -197,6 +207,10 @@ public class StorageLocation
     if (conf == null) {
       conf = new HdfsConfiguration();
     }
+    if (storageType == StorageType.PROVIDED) {
+      //skip creation if the storage type is PROVIDED
+      return;
+    }
 
     LocalFileSystem localFS = FileSystem.getLocal(conf);
     FsPermission permission = new FsPermission(conf.get(
@@ -213,10 +227,14 @@ public class StorageLocation
 
   @Override  // Checkable
   public VolumeCheckResult check(CheckContext context) throws IOException {
-    DiskChecker.checkDir(
-        context.localFileSystem,
-        new Path(baseURI),
-        context.expectedPermission);
+    //we assume provided storage locations are always healthy,
+    //and check only for local storages.
+    if (storageType != StorageType.PROVIDED) {
+      DiskChecker.checkDir(
+          context.localFileSystem,
+          new Path(baseURI),
+          context.expectedPermission);
+    }
     return VolumeCheckResult.HEALTHY;
   }
 

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java

@@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.UnexpectedReplicaStateException;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
@@ -252,8 +253,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    * and, in case that they are not matched, update the record or mark it
    * as corrupted.
    */
-  void checkAndUpdate(String bpid, long blockId, File diskFile,
-      File diskMetaFile, FsVolumeSpi vol) throws IOException;
+  void checkAndUpdate(String bpid, ScanInfo info) throws IOException;
 
   /**
    * @param b - the block

+ 28 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java

@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
+import org.apache.hadoop.hdfs.server.common.FileRegion;
 import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.checker.Checkable;
@@ -241,10 +242,11 @@ public interface FsVolumeSpi
 
     private final FsVolumeSpi volume;
 
+    private final FileRegion fileRegion;
     /**
      * Get the file's length in async block scan
      */
-    private final long blockFileLength;
+    private final long blockLength;
 
     private final static Pattern CONDENSED_PATH_REGEX =
         Pattern.compile("(?<!^)(\\\\|/){2,}");
@@ -294,13 +296,30 @@ public interface FsVolumeSpi
      */
     public ScanInfo(long blockId, File blockFile, File metaFile,
         FsVolumeSpi vol) {
+      this(blockId, blockFile, metaFile, vol, null,
+          (blockFile != null) ? blockFile.length() : 0);
+    }
+
+    /**
+     * Create a ScanInfo object for a block. This constructor will examine
+     * the block data and meta-data files.
+     *
+     * @param blockId the block ID
+     * @param blockFile the path to the block data file
+     * @param metaFile the path to the block meta-data file
+     * @param vol the volume that contains the block
+     * @param fileRegion the file region (for provided blocks)
+     * @param length the length of the block data
+     */
+    public ScanInfo(long blockId, File blockFile, File metaFile,
+        FsVolumeSpi vol, FileRegion fileRegion, long length) {
       this.blockId = blockId;
       String condensedVolPath =
           (vol == null || vol.getBaseURI() == null) ? null :
             getCondensedPath(new File(vol.getBaseURI()).getAbsolutePath());
       this.blockSuffix = blockFile == null ? null :
         getSuffix(blockFile, condensedVolPath);
-      this.blockFileLength = (blockFile != null) ? blockFile.length() : 0;
+      this.blockLength = length;
       if (metaFile == null) {
         this.metaSuffix = null;
       } else if (blockFile == null) {
@@ -310,6 +329,7 @@ public interface FsVolumeSpi
             condensedVolPath + blockSuffix);
       }
       this.volume = vol;
+      this.fileRegion = fileRegion;
     }
 
     /**
@@ -328,8 +348,8 @@ public interface FsVolumeSpi
      *
      * @return the length of the data block
      */
-    public long getBlockFileLength() {
-      return blockFileLength;
+    public long getBlockLength() {
+      return blockLength;
     }
 
     /**
@@ -399,6 +419,10 @@ public interface FsVolumeSpi
           getMetaFile().getName()) :
             HdfsConstants.GRANDFATHER_GENERATION_STAMP;
     }
+
+    public FileRegion getFileRegion() {
+      return fileRegion;
+    }
   }
 
   /**

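Provided blocks have no local block or meta file, so the scanner records them through the new constructor with null files, the FileRegion, and the region's length. A sketch, assuming a FileRegion region and a PROVIDED FsVolumeSpi vol:

    ScanInfo info = new ScanInfo(
        region.getBlock().getBlockId(),  // block id
        null,                            // no local block file
        null,                            // no local meta file
        vol,                             // the PROVIDED volume
        region,                          // where the bytes actually live
        region.getLength());             // reported block length
    assert info.getFileRegion() == region;
    assert info.getBlockLength() == region.getLength();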
+ 58 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/DefaultProvidedVolumeDF.java

@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * The default usage statistics for a provided volume.
+ */
+public class DefaultProvidedVolumeDF
+    implements ProvidedVolumeDF, Configurable {
+
+  @Override
+  public void setConf(Configuration conf) {
+  }
+
+  @Override
+  public Configuration getConf() {
+    return null;
+  }
+
+  @Override
+  public long getCapacity() {
+    return Long.MAX_VALUE;
+  }
+
+  @Override
+  public long getSpaceUsed() {
+    return 0;
+  }
+
+  @Override
+  public long getBlockPoolUsed(String bpid) {
+    return 0;
+  }
+
+  @Override
+  public long getAvailable() {
+    return Long.MAX_VALUE;
+  }
+}
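The DF implementation is pluggable through dfs.provided.df.class and is resolved with the usual ReflectionUtils pattern, the same lookup ProvidedVolumeImpl performs later in this patch. A sketch, assuming a Configuration conf:

    Class<? extends ProvidedVolumeDF> dfClass =
        conf.getClass(DFSConfigKeys.DFS_PROVIDER_DF_CLASS,
            DefaultProvidedVolumeDF.class, ProvidedVolumeDF.class);
    ProvidedVolumeDF df = ReflectionUtils.newInstance(dfClass, conf);
    long capacity = df.getCapacity();   // Long.MAX_VALUE for the default above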

+ 35 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -86,6 +86,7 @@ import org.apache.hadoop.hdfs.server.datanode.UnexpectedReplicaStateException;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
@@ -1744,6 +1745,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
       Set<String> missingVolumesReported = new HashSet<>();
       for (ReplicaInfo b : volumeMap.replicas(bpid)) {
+        //skip blocks in PROVIDED storage
+        if (b.getVolume().getStorageType() == StorageType.PROVIDED) {
+          continue;
+        }
         String volStorageID = b.getVolume().getStorageID();
         if (!builders.containsKey(volStorageID)) {
           if (!missingVolumesReported.contains(volStorageID)) {
@@ -1879,7 +1884,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     try (AutoCloseableLock lock = datasetLock.acquire()) {
       r = volumeMap.get(bpid, blockId);
     }
-
     if (r != null) {
       if (r.blockDataExists()) {
         return r;
@@ -2232,13 +2236,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * @param vol Volume of the block file
    */
   @Override
-  public void checkAndUpdate(String bpid, long blockId, File diskFile,
-      File diskMetaFile, FsVolumeSpi vol) throws IOException {
+  public void checkAndUpdate(String bpid, ScanInfo scanInfo)
+      throws IOException {
+
+    long blockId = scanInfo.getBlockId();
+    File diskFile = scanInfo.getBlockFile();
+    File diskMetaFile = scanInfo.getMetaFile();
+    FsVolumeSpi vol = scanInfo.getVolume();
+
     Block corruptBlock = null;
     ReplicaInfo memBlockInfo;
     try (AutoCloseableLock lock = datasetLock.acquire()) {
       memBlockInfo = volumeMap.get(bpid, blockId);
-      if (memBlockInfo != null && memBlockInfo.getState() != ReplicaState.FINALIZED) {
+      if (memBlockInfo != null &&
+          memBlockInfo.getState() != ReplicaState.FINALIZED) {
         // Block is not finalized - ignore the difference
         return;
       }
@@ -2253,6 +2264,26 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           Block.getGenerationStamp(diskMetaFile.getName()) :
           HdfsConstants.GRANDFATHER_GENERATION_STAMP;
 
+      if (vol.getStorageType() == StorageType.PROVIDED) {
+        if (memBlockInfo == null) {
+          //replica exists on provided store but not in memory
+          ReplicaInfo diskBlockInfo =
+              new ReplicaBuilder(ReplicaState.FINALIZED)
+              .setFileRegion(scanInfo.getFileRegion())
+              .setFsVolume(vol)
+              .setConf(conf)
+              .build();
+
+          volumeMap.add(bpid, diskBlockInfo);
+          LOG.warn("Added missing block to memory " + diskBlockInfo);
+        } else {
+          //replica exists in memory but not in the provided store
+          volumeMap.remove(bpid, blockId);
+          LOG.warn("Deleting missing provided block " + memBlockInfo);
+        }
+        return;
+      }
+
       if (!diskFileExists) {
         if (memBlockInfo == null) {
           // Block file does not exist and block does not exist in memory
@@ -3028,7 +3059,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           newReplicaInfo =
               replicaState.getLazyPersistVolume().activateSavedReplica(bpid,
                   replicaInfo, replicaState);
-
           // Update the volumeMap entry.
           volumeMap.add(bpid, newReplicaInfo);
 

+ 23 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java

@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileDescriptor;
 import java.io.FileInputStream;
@@ -32,10 +34,12 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.DataChecksum;
 
 /** Utility methods. */
 @InterfaceAudience.Private
@@ -44,6 +48,22 @@ public class FsDatasetUtil {
     return f.getName().endsWith(DatanodeUtil.UNLINK_BLOCK_SUFFIX);
   }
 
+  public static byte[] createNullChecksumByteArray() {
+    DataChecksum csum =
+        DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 512);
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    DataOutputStream dataOut = new DataOutputStream(out);
+    try {
+      BlockMetadataHeader.writeHeader(dataOut, csum);
+      dataOut.close();
+    } catch (IOException e) {
+      FsVolumeImpl.LOG.error(
+          "Exception in creating null checksum stream: " + e);
+      return null;
+    }
+    return out.toByteArray();
+  }
+
   static File getOrigFile(File unlinkTmpFile) {
     final String name = unlinkTmpFile.getName();
     if (!name.endsWith(DatanodeUtil.UNLINK_BLOCK_SUFFIX)) {
@@ -135,8 +155,9 @@ public class FsDatasetUtil {
    * Compute the checksum for a block file that does not already have
    * its checksum computed, and save it to dstMeta file.
    */
-  public static void computeChecksum(File srcMeta, File dstMeta, File blockFile,
-      int smallBufferSize, Configuration conf) throws IOException {
+  public static void computeChecksum(File srcMeta, File dstMeta,
+      File blockFile, int smallBufferSize, Configuration conf)
+          throws IOException {
     Preconditions.checkNotNull(srcMeta);
     Preconditions.checkNotNull(dstMeta);
     Preconditions.checkNotNull(blockFile);

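The array built by createNullChecksumByteArray() is exactly what ProvidedReplica serves from getMetadataInputStream(): a BlockMetadataHeader for a NULL checksum, meaning no per-chunk checksums are stored and readers trust the remote store. A sketch of how it is consumed:

    byte[] header = FsDatasetUtil.createNullChecksumByteArray();
    LengthInputStream meta =
        new LengthInputStream(new ByteArrayInputStream(header), header.length);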
+ 13 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java

@@ -154,18 +154,24 @@ public class FsVolumeImpl implements FsVolumeSpi {
     this.reservedForReplicas = new AtomicLong(0L);
     this.storageLocation = sd.getStorageLocation();
     this.currentDir = sd.getCurrentDir();
-    File parent = currentDir.getParentFile();
-    this.usage = new DF(parent, conf);
     this.storageType = storageLocation.getStorageType();
     this.reserved = conf.getLong(DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY
         + "." + StringUtils.toLowerCase(storageType.toString()), conf.getLong(
         DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY,
         DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT));
     this.configuredCapacity = -1;
+    if (currentDir != null) {
+      File parent = currentDir.getParentFile();
+      this.usage = new DF(parent, conf);
+      cacheExecutor = initializeCacheExecutor(parent);
+      this.metrics = DataNodeVolumeMetrics.create(conf, parent.getPath());
+    } else {
+      this.usage = null;
+      cacheExecutor = null;
+      this.metrics = null;
+    }
     this.conf = conf;
     this.fileIoProvider = fileIoProvider;
-    cacheExecutor = initializeCacheExecutor(parent);
-    this.metrics = DataNodeVolumeMetrics.create(conf, getBaseURI().getPath());
   }
 
   protected ThreadPoolExecutor initializeCacheExecutor(File parent) {
@@ -440,7 +446,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
   /**
    * Unplanned Non-DFS usage, i.e. Extra usage beyond reserved.
    *
-   * @return
+   * @return Disk usage excluding space used by HDFS and excluding space
+   * reserved for blocks open for write.
    * @throws IOException
    */
   public long getNonDfsUsed() throws IOException {
@@ -518,7 +525,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
   public String[] getBlockPoolList() {
     return bpSlices.keySet().toArray(new String[bpSlices.keySet().size()]);   
   }
-    
+
   /**
    * Temporary files. They get moved to the finalized block directory when
    * the block is finalized.

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
 
@@ -67,6 +68,11 @@ public class FsVolumeImplBuilder {
   }
 
   FsVolumeImpl build() throws IOException {
+    if (sd.getStorageLocation().getStorageType() == StorageType.PROVIDED) {
+      return new ProvidedVolumeImpl(dataset, storageID, sd,
+          fileIoProvider != null ? fileIoProvider :
+            new FileIoProvider(null, null), conf);
+    }
     return new FsVolumeImpl(
         dataset, storageID, sd,
         fileIoProvider != null ? fileIoProvider :

+ 34 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeDF.java

@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+/**
+ * This interface is used to define the usage statistics
+ * of the provided storage.
+ */
+public interface ProvidedVolumeDF {
+
+  long getCapacity();
+
+  long getSpaceUsed();
+
+  long getBlockPoolUsed(String bpid);
+
+  long getAvailable();
+}
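A deployment can plug in its own usage statistics for the external store; a hypothetical implementation that advertises a fixed 10 TB capacity (class name, package and numbers invented, only the interface comes from this patch):

    package org.example.hdfs;  // hypothetical package

    import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.ProvidedVolumeDF;

    public class FixedCapacityProvidedVolumeDF implements ProvidedVolumeDF {
      private static final long CAPACITY = 10L * 1024 * 1024 * 1024 * 1024;

      @Override
      public long getCapacity() { return CAPACITY; }

      @Override
      public long getSpaceUsed() { return 0; }              // usage not tracked

      @Override
      public long getBlockPoolUsed(String bpid) { return 0; }

      @Override
      public long getAvailable() { return CAPACITY; }
    }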

+ 526 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java

@@ -0,0 +1,526 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.common.FileRegion;
+import org.apache.hadoop.hdfs.server.common.FileRegionProvider;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.common.TextFileRegionProvider;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
+import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
+import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
+import org.apache.hadoop.util.Timer;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.AutoCloseableLock;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectReader;
+import org.codehaus.jackson.map.ObjectWriter;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Time;
+
+/**
+ * This class is used to create provided volumes.
+ */
+public class ProvidedVolumeImpl extends FsVolumeImpl {
+
+  static class ProvidedBlockPoolSlice {
+    private FsVolumeImpl providedVolume;
+
+    private FileRegionProvider provider;
+    private Configuration conf;
+    private String bpid;
+    private ReplicaMap bpVolumeMap;
+
+    ProvidedBlockPoolSlice(String bpid, ProvidedVolumeImpl volume,
+        Configuration conf) {
+      this.providedVolume = volume;
+      bpVolumeMap = new ReplicaMap(new AutoCloseableLock());
+      Class<? extends FileRegionProvider> fmt =
+          conf.getClass(DFSConfigKeys.DFS_PROVIDER_CLASS,
+              TextFileRegionProvider.class, FileRegionProvider.class);
+      provider = ReflectionUtils.newInstance(fmt, conf);
+      this.conf = conf;
+      this.bpid = bpid;
+      bpVolumeMap.initBlockPool(bpid);
+      LOG.info("Created provider: " + provider.getClass());
+    }
+
+    FileRegionProvider getFileRegionProvider() {
+      return provider;
+    }
+
+    public void getVolumeMap(ReplicaMap volumeMap,
+        RamDiskReplicaTracker ramDiskReplicaMap) throws IOException {
+      Iterator<FileRegion> iter = provider.iterator();
+      while(iter.hasNext()) {
+        FileRegion region = iter.next();
+        if (region.getBlockPoolId() != null &&
+            region.getBlockPoolId().equals(bpid)) {
+          ReplicaInfo newReplica = new ReplicaBuilder(ReplicaState.FINALIZED)
+              .setBlockId(region.getBlock().getBlockId())
+              .setURI(region.getPath().toUri())
+              .setOffset(region.getOffset())
+              .setLength(region.getBlock().getNumBytes())
+              .setGenerationStamp(region.getBlock().getGenerationStamp())
+              .setFsVolume(providedVolume)
+              .setConf(conf).build();
+
+          ReplicaInfo oldReplica =
+              volumeMap.get(bpid, newReplica.getBlockId());
+          if (oldReplica == null) {
+            volumeMap.add(bpid, newReplica);
+            bpVolumeMap.add(bpid, newReplica);
+          } else {
+            throw new IOException(
+                "A block with id " + newReplica.getBlockId() +
+                " already exists in the volumeMap");
+          }
+        }
+      }
+    }
+
+    public boolean isEmpty() {
+      return bpVolumeMap.replicas(bpid).size() == 0;
+    }
+
+    public void shutdown(BlockListAsLongs blocksListsAsLongs) {
+      //nothing to do!
+    }
+
+    public void compileReport(LinkedList<ScanInfo> report,
+        ReportCompiler reportCompiler)
+            throws IOException, InterruptedException {
+      /* refresh the provider and return the list of blocks found.
+       * the assumption here is that the block ids in the external
+       * block map, after the refresh, are consistent with those
+       * from before the refresh, i.e., for blocks which did not change,
+       * the ids remain the same.
+       */
+      provider.refresh();
+      Iterator<FileRegion> iter = provider.iterator();
+      while(iter.hasNext()) {
+        reportCompiler.throttle();
+        FileRegion region = iter.next();
+        if (region.getBlockPoolId().equals(bpid)) {
+          LOG.info("Adding ScanInfo for blkid " +
+              region.getBlock().getBlockId());
+          report.add(new ScanInfo(region.getBlock().getBlockId(), null, null,
+              providedVolume, region, region.getLength()));
+        }
+      }
+    }
+  }
+
+  private URI baseURI;
+  private final Map<String, ProvidedBlockPoolSlice> bpSlices =
+      new ConcurrentHashMap<String, ProvidedBlockPoolSlice>();
+
+  private ProvidedVolumeDF df;
+
+  ProvidedVolumeImpl(FsDatasetImpl dataset, String storageID,
+      StorageDirectory sd, FileIoProvider fileIoProvider,
+      Configuration conf) throws IOException {
+    super(dataset, storageID, sd, fileIoProvider, conf);
+    assert getStorageLocation().getStorageType() == StorageType.PROVIDED:
+      "ProvidedVolume must be used only with PROVIDED storage";
+
+    baseURI = getStorageLocation().getUri();
+    Class<? extends ProvidedVolumeDF> dfClass =
+        conf.getClass(DFSConfigKeys.DFS_PROVIDER_DF_CLASS,
+            DefaultProvidedVolumeDF.class, ProvidedVolumeDF.class);
+    df = ReflectionUtils.newInstance(dfClass, conf);
+  }
+
+  @Override
+  public String[] getBlockPoolList() {
+    return bpSlices.keySet().toArray(new String[bpSlices.keySet().size()]);
+  }
+
+  @Override
+  public long getCapacity() {
+    if (configuredCapacity < 0) {
+      return df.getCapacity();
+    }
+    return configuredCapacity;
+  }
+
+  @Override
+  public long getDfsUsed() throws IOException {
+    return df.getSpaceUsed();
+  }
+
+  @Override
+  long getBlockPoolUsed(String bpid) throws IOException {
+    return df.getBlockPoolUsed(bpid);
+  }
+
+  @Override
+  public long getAvailable() throws IOException {
+    return df.getAvailable();
+  }
+
+  @Override
+  long getActualNonDfsUsed() throws IOException {
+    return df.getSpaceUsed();
+  }
+
+  @Override
+  public long getNonDfsUsed() throws IOException {
+    return 0L;
+  }
+
+  @Override
+  public URI getBaseURI() {
+    return baseURI;
+  }
+
+  @Override
+  public File getFinalizedDir(String bpid) throws IOException {
+    return null;
+  }
+
+  @Override
+  public void reserveSpaceForReplica(long bytesToReserve) {
+    throw new UnsupportedOperationException(
+        "ProvidedVolume does not yet support writes");
+  }
+
+  @Override
+  public void releaseReservedSpace(long bytesToRelease) {
+    throw new UnsupportedOperationException(
+        "ProvidedVolume does not yet support writes");
+  }
+
+  private static final ObjectWriter WRITER =
+      new ObjectMapper().writerWithDefaultPrettyPrinter();
+  private static final ObjectReader READER =
+      new ObjectMapper().reader(ProvidedBlockIteratorState.class);
+
+  private static class ProvidedBlockIteratorState {
+    ProvidedBlockIteratorState() {
+      iterStartMs = Time.now();
+      lastSavedMs = iterStartMs;
+      atEnd = false;
+      lastBlockId = -1;
+    }
+
+    // The wall-clock ms since the epoch at which this iterator was last saved.
+    @JsonProperty
+    private long lastSavedMs;
+
+    // The wall-clock ms since the epoch at which this iterator was created.
+    @JsonProperty
+    private long iterStartMs;
+
+    @JsonProperty
+    private boolean atEnd;
+
+    //The id of the last block read when the state of the iterator is saved.
+    //This implementation assumes that provided blocks are returned
+    //in sorted order of the block ids.
+    @JsonProperty
+    private long lastBlockId;
+  }
+
+  private class ProviderBlockIteratorImpl
+      implements FsVolumeSpi.BlockIterator {
+
+    private String bpid;
+    private String name;
+    private FileRegionProvider provider;
+    private Iterator<FileRegion> blockIterator;
+    private ProvidedBlockIteratorState state;
+
+    ProviderBlockIteratorImpl(String bpid, String name,
+        FileRegionProvider provider) {
+      this.bpid = bpid;
+      this.name = name;
+      this.provider = provider;
+      rewind();
+    }
+
+    @Override
+    public void close() throws IOException {
+      //No action needed
+    }
+
+    @Override
+    public ExtendedBlock nextBlock() throws IOException {
+      if (null == blockIterator || !blockIterator.hasNext()) {
+        return null;
+      }
+      FileRegion nextRegion = null;
+      while (null == nextRegion && blockIterator.hasNext()) {
+        FileRegion temp = blockIterator.next();
+        if (temp.getBlock().getBlockId() < state.lastBlockId) {
+          continue;
+        }
+        if (temp.getBlockPoolId().equals(bpid)) {
+          nextRegion = temp;
+        }
+      }
+      if (null == nextRegion) {
+        return null;
+      }
+      state.lastBlockId = nextRegion.getBlock().getBlockId();
+      return new ExtendedBlock(bpid, nextRegion.getBlock());
+    }
+
+    @Override
+    public boolean atEnd() {
+      return blockIterator == null || !blockIterator.hasNext();
+    }
+
+    @Override
+    public void rewind() {
+      blockIterator = provider.iterator();
+      state = new ProvidedBlockIteratorState();
+    }
+
+    @Override
+    public void save() throws IOException {
+      //We do not persist the state of this iterator anywhere, locally.
+      //We just re-scan provided volumes as necessary.
+      state.lastSavedMs = Time.now();
+    }
+
+    @Override
+    public void setMaxStalenessMs(long maxStalenessMs) {
+      //do not use max staleness
+    }
+
+    @Override
+    public long getIterStartMs() {
+      return state.iterStartMs;
+    }
+
+    @Override
+    public long getLastSavedMs() {
+      return state.lastSavedMs;
+    }
+
+    @Override
+    public String getBlockPoolId() {
+      return bpid;
+    }
+
+    public void load() throws IOException {
+      //on load, we just rewind the iterator for provided volumes.
+      rewind();
+      LOG.trace("load({}, {}): loaded iterator {}: {}", getStorageID(),
+          bpid, name, WRITER.writeValueAsString(state));
+    }
+  }
+
+  @Override
+  public BlockIterator newBlockIterator(String bpid, String name) {
+    return new ProviderBlockIteratorImpl(bpid, name,
+        bpSlices.get(bpid).getFileRegionProvider());
+  }
+
+  @Override
+  public BlockIterator loadBlockIterator(String bpid, String name)
+      throws IOException {
+    ProviderBlockIteratorImpl iter = new ProviderBlockIteratorImpl(bpid, name,
+        bpSlices.get(bpid).getFileRegionProvider());
+    iter.load();
+    return iter;
+  }
+
+  @Override
+  ReplicaInfo addFinalizedBlock(String bpid, Block b,
+      ReplicaInfo replicaInfo, long bytesReserved) throws IOException {
+    throw new UnsupportedOperationException(
+        "ProvidedVolume does not yet support writes");
+  }
+
+  @Override
+  public VolumeCheckResult check(VolumeCheckContext ignored)
+      throws DiskErrorException {
+    return VolumeCheckResult.HEALTHY;
+  }
+
+  @Override
+  void getVolumeMap(ReplicaMap volumeMap,
+      final RamDiskReplicaTracker ramDiskReplicaMap)
+          throws IOException {
+    LOG.info("Creating volumemap for provided volume " + this);
+    for(ProvidedBlockPoolSlice s : bpSlices.values()) {
+      s.getVolumeMap(volumeMap, ramDiskReplicaMap);
+    }
+  }
+
+  private ProvidedBlockPoolSlice getProvidedBlockPoolSlice(String bpid)
+      throws IOException {
+    ProvidedBlockPoolSlice bp = bpSlices.get(bpid);
+    if (bp == null) {
+      throw new IOException("block pool " + bpid + " is not found");
+    }
+    return bp;
+  }
+
+  @Override
+  void getVolumeMap(String bpid, ReplicaMap volumeMap,
+      final RamDiskReplicaTracker ramDiskReplicaMap)
+          throws IOException {
+    getProvidedBlockPoolSlice(bpid).getVolumeMap(volumeMap, ramDiskReplicaMap);
+  }
+
+  @VisibleForTesting
+  FileRegionProvider getFileRegionProvider(String bpid) throws IOException {
+    return getProvidedBlockPoolSlice(bpid).getFileRegionProvider();
+  }
+
+  @Override
+  public String toString() {
+    return this.baseURI.toString();
+  }
+
+  @Override
+  void addBlockPool(String bpid, Configuration conf) throws IOException {
+    addBlockPool(bpid, conf, null);
+  }
+
+  @Override
+  void addBlockPool(String bpid, Configuration conf, Timer timer)
+      throws IOException {
+    LOG.info("Adding block pool " + bpid +
+        " to volume with id " + getStorageID());
+    ProvidedBlockPoolSlice bp;
+    bp = new ProvidedBlockPoolSlice(bpid, this, conf);
+    bpSlices.put(bpid, bp);
+  }
+
+  void shutdown() {
+    if (cacheExecutor != null) {
+      cacheExecutor.shutdown();
+    }
+    Set<Entry<String, ProvidedBlockPoolSlice>> set = bpSlices.entrySet();
+    for (Entry<String, ProvidedBlockPoolSlice> entry : set) {
+      entry.getValue().shutdown(null);
+    }
+  }
+
+  @Override
+  void shutdownBlockPool(String bpid, BlockListAsLongs blocksListsAsLongs) {
+    ProvidedBlockPoolSlice bp = bpSlices.get(bpid);
+    if (bp != null) {
+      bp.shutdown(blocksListsAsLongs);
+    }
+    bpSlices.remove(bpid);
+  }
+
+  @Override
+  boolean isBPDirEmpty(String bpid) throws IOException {
+    return getProvidedBlockPoolSlice(bpid).isEmpty();
+  }
+
+  @Override
+  void deleteBPDirectories(String bpid, boolean force) throws IOException {
+    throw new UnsupportedOperationException(
+        "ProvidedVolume does not yet support writes");
+  }
+
+  @Override
+  public LinkedList<ScanInfo> compileReport(String bpid,
+      LinkedList<ScanInfo> report, ReportCompiler reportCompiler)
+          throws InterruptedException, IOException {
+    LOG.info("Compiling report for volume: " + this + " bpid " + bpid);
+    //get the report from the appropriate block pool.
+    if(bpSlices.containsKey(bpid)) {
+      bpSlices.get(bpid).compileReport(report, reportCompiler);
+    }
+    return report;
+  }
+
+  @Override
+  public ReplicaInPipeline append(String bpid, ReplicaInfo replicaInfo,
+      long newGS, long estimateBlockLen) throws IOException {
+    throw new UnsupportedOperationException(
+        "ProvidedVolume does not yet support writes");
+  }
+
+  @Override
+  public ReplicaInPipeline createRbw(ExtendedBlock b) throws IOException {
+    throw new UnsupportedOperationException(
+        "ProvidedVolume does not yet support writes");
+  }
+
+  @Override
+  public ReplicaInPipeline convertTemporaryToRbw(ExtendedBlock b,
+      ReplicaInfo temp) throws IOException {
+    throw new UnsupportedOperationException(
+        "ProvidedVolume does not yet support writes");
+  }
+
+  @Override
+  public ReplicaInPipeline createTemporary(ExtendedBlock b)
+      throws IOException {
+    throw new UnsupportedOperationException(
+        "ProvidedVolume does not yet support writes");
+  }
+
+  @Override
+  public ReplicaInPipeline updateRURCopyOnTruncate(ReplicaInfo rur,
+      String bpid, long newBlockId, long recoveryId, long newlength)
+          throws IOException {
+    throw new UnsupportedOperationException(
+        "ProvidedVolume does not yet support writes");
+  }
+
+  @Override
+  public ReplicaInfo moveBlockToTmpLocation(ExtendedBlock block,
+      ReplicaInfo replicaInfo, int smallBufferSize,
+      Configuration conf) throws IOException {
+    throw new UnsupportedOperationException(
+        "ProvidedVolume does not yet support writes");
+  }
+
+  @Override
+  public File[] copyBlockToLazyPersistLocation(String bpId, long blockId,
+      long genStamp, ReplicaInfo replicaInfo, int smallBufferSize,
+      Configuration conf) throws IOException {
+    throw new UnsupportedOperationException(
+        "ProvidedVolume does not yet support writes");
+  }
+}
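A sketch of walking the blocks exposed by a provided volume through the block iterator defined above, assuming a ProvidedVolumeImpl vol whose block pool bpid has been registered via addBlockPool():

    // The iterator is backed by the FileRegionProvider; no local directories are read.
    try (FsVolumeSpi.BlockIterator it = vol.newBlockIterator(bpid, "provided-scan")) {
      ExtendedBlock b;
      while ((b = it.nextBlock()) != null) {
        System.out.println("provided block " + b.getBlockId() +
            " len=" + b.getNumBytes());
      }
    }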

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java

@@ -686,7 +686,7 @@ public class Mover {
     }
   }
 
-  static class Cli extends Configured implements Tool {
+  public static class Cli extends Configured implements Tool {
     private static final String USAGE = "Usage: hdfs mover "
         + "[-p <files/dirs> | -f <local file>]"
         + "\n\t-p <files/dirs>\ta space separated list of HDFS files/dirs to migrate."

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java

@@ -39,7 +39,7 @@ import org.apache.hadoop.io.compress.CompressionCodecFactory;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-class FSImageCompression {
+public class FSImageCompression {
 
   /** Codec to use to save or load image, or null if the image is not compressed */
   private CompressionCodec imageCodec;

+ 9 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java

@@ -658,6 +658,10 @@ public class NNStorage extends Storage implements Closeable,
   void readProperties(StorageDirectory sd, StartupOption startupOption)
       throws IOException {
     Properties props = readPropertiesFile(sd.getVersionFile());
+    if (props == null) {
+      throw new IOException(
+          "Properties not found for storage directory " + sd);
+    }
     if (HdfsServerConstants.RollingUpgradeStartupOption.ROLLBACK
         .matches(startupOption)) {
       int lv = Integer.parseInt(getProperty(props, sd, "layoutVersion"));
@@ -975,7 +979,11 @@ public class NNStorage extends Storage implements Closeable,
       StorageDirectory sd = sdit.next();
       try {
         Properties props = readPropertiesFile(sd.getVersionFile());
-        cid = props.getProperty("clusterID");
+        if (props == null) {
+          cid = null;
+        } else {
+          cid = props.getProperty("clusterID");
+        }
         LOG.info("current cluster id for sd="+sd.getCurrentDir() + 
             ";lv=" + layoutVersion + ";cid=" + cid);
         

+ 78 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -4621,6 +4621,84 @@
     </description>
   </property>
 
+  <property>
+    <name>dfs.provider.class</name>
+    <value>org.apache.hadoop.hdfs.server.common.TextFileRegionProvider</value>
+    <description>
+        The class that is used to load information about blocks stored in
+        provided storages.
+        org.apache.hadoop.hdfs.server.common.TextFileRegionProvider
+        is used as the default, which expects the blocks to be specified
+        using a delimited text file.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.provided.df.class</name>
+    <value>org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.DefaultProvidedVolumeDF</value>
+    <description>
+        The class that is used to measure usage statistics of provided stores.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.provided.storage.id</name>
+    <value>DS-PROVIDED</value>
+    <description>
+        The storage ID used for provided stores.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.provided.blockformat.class</name>
+    <value>org.apache.hadoop.hdfs.server.common.TextFileRegionFormat</value>
+    <description>
+      The class that is used to specify the input format of the blocks on
+      provided storages. The default is
+      org.apache.hadoop.hdfs.server.common.TextFileRegionFormat which uses
+      file regions to describe blocks. The file regions are specified as a
+      delimited text file. Each file region is a 6-tuple containing the
+      block id, remote file path, offset into file, length of block, the
+      block pool id containing the block, and the generation stamp of the
+      block.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.provided.textprovider.delimiter</name>
+    <value>,</value>
+    <description>
+        The delimiter used when the provided block map is specified as
+        a text file.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.provided.textprovider.read.path</name>
+    <value></value>
+    <description>
+        The path specifying the provided block map as a text file, specified as
+        a URI.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.provided.textprovider.read.codec</name>
+    <value></value>
+    <description>
+        The codec used to de-compress the provided block map.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.provided.textprovider.write.path</name>
+    <value></value>
+    <description>
+        The path to which the provided block map should be written as a text
+        file, specified as a URI.
+    </description>
+  </property>
+
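
Putting the properties above together, a hedged example of pointing a DataNode at an existing comma-delimited block map could look roughly like the sketch below; the file location is hypothetical, and the keys are the ones defined above.

    import org.apache.hadoop.conf.Configuration;

    // Minimal sketch: configure the text-based provider to read an
    // existing block map. The path below is hypothetical.
    Configuration conf = new Configuration();
    conf.set("dfs.provided.textprovider.read.path",
        "hdfs://nn:8020/blockmaps/blocks.csv");
    conf.set("dfs.provided.textprovider.delimiter", ",");
    // dfs.provided.textprovider.read.codec can stay unset for an
    // uncompressed block map.
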
   <property>
     <name>dfs.lock.suppress.warning.interval</name>
     <value>10s</value>

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java

@@ -208,7 +208,7 @@ public class TestDFSRollback {
       UpgradeUtilities.createDataNodeVersionFile(
           dataCurrentDirs,
           storageInfo,
-          UpgradeUtilities.getCurrentBlockPoolID(cluster));
+          UpgradeUtilities.getCurrentBlockPoolID(cluster), conf);
 
       cluster.startDataNodes(conf, 1, false, StartupOption.ROLLBACK, null);
       assertTrue(cluster.isDataNodeUp());
@@ -256,7 +256,7 @@ public class TestDFSRollback {
           NodeType.DATA_NODE);
       
       UpgradeUtilities.createDataNodeVersionFile(baseDirs, storageInfo,
-          UpgradeUtilities.getCurrentBlockPoolID(cluster));
+          UpgradeUtilities.getCurrentBlockPoolID(cluster), conf);
       
       startBlockPoolShouldFail(StartupOption.ROLLBACK, 
           cluster.getNamesystem().getBlockPoolId());
@@ -283,7 +283,7 @@ public class TestDFSRollback {
           NodeType.DATA_NODE);
      
       UpgradeUtilities.createDataNodeVersionFile(baseDirs, storageInfo,
-          UpgradeUtilities.getCurrentBlockPoolID(cluster));
+          UpgradeUtilities.getCurrentBlockPoolID(cluster), conf);
       
       startBlockPoolShouldFail(StartupOption.ROLLBACK, 
           cluster.getNamesystem().getBlockPoolId());

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStartupVersions.java

@@ -265,7 +265,7 @@ public class TestDFSStartupVersions {
           conf.getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY), "current");
       log("DataNode version info", DATA_NODE, i, versions[i]);
       UpgradeUtilities.createDataNodeVersionFile(storage,
-          versions[i].storageInfo, bpid, versions[i].blockPoolId);
+          versions[i].storageInfo, bpid, versions[i].blockPoolId, conf);
       try {
         cluster.startDataNodes(conf, 1, false, StartupOption.REGULAR, null);
       } catch (Exception ignore) {

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgrade.java

@@ -290,7 +290,7 @@ public class TestDFSUpgrade {
           UpgradeUtilities.getCurrentFsscTime(cluster), NodeType.DATA_NODE);
       
       UpgradeUtilities.createDataNodeVersionFile(baseDirs, storageInfo,
-          UpgradeUtilities.getCurrentBlockPoolID(cluster));
+          UpgradeUtilities.getCurrentBlockPoolID(cluster), conf);
       
       startBlockPoolShouldFail(StartupOption.REGULAR, UpgradeUtilities
           .getCurrentBlockPoolID(null));
@@ -308,7 +308,7 @@ public class TestDFSUpgrade {
           NodeType.DATA_NODE);
           
       UpgradeUtilities.createDataNodeVersionFile(baseDirs, storageInfo, 
-          UpgradeUtilities.getCurrentBlockPoolID(cluster));
+          UpgradeUtilities.getCurrentBlockPoolID(cluster), conf);
      // Ensure corresponding block pool failed to initialize
       startBlockPoolShouldFail(StartupOption.REGULAR, UpgradeUtilities
           .getCurrentBlockPoolID(null));

+ 10 - 6
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java

@@ -384,8 +384,10 @@ public class UpgradeUtilities {
           new File(datanodeStorage.toString()));
       sd.setStorageUuid(DatanodeStorage.generateUuid());
       Properties properties = Storage.readPropertiesFile(sd.getVersionFile());
-      properties.setProperty("storageID", sd.getStorageUuid());
-      Storage.writeProperties(sd.getVersionFile(), properties);
+      if (properties != null) {
+        properties.setProperty("storageID", sd.getStorageUuid());
+        Storage.writeProperties(sd.getVersionFile(), properties);
+      }
 
       retVal[i] = newDir;
     }
@@ -461,8 +463,9 @@ public class UpgradeUtilities {
    * @param bpid Block pool Id
    */
   public static void createDataNodeVersionFile(File[] parent,
-      StorageInfo version, String bpid) throws IOException {
-    createDataNodeVersionFile(parent, version, bpid, bpid);
+      StorageInfo version, String bpid, Configuration conf)
+          throws IOException {
+    createDataNodeVersionFile(parent, version, bpid, bpid, conf);
   }
   
   /**
@@ -477,7 +480,8 @@ public class UpgradeUtilities {
    * @param bpidToWrite Block pool Id to write into the version file
    */
   public static void createDataNodeVersionFile(File[] parent,
-      StorageInfo version, String bpid, String bpidToWrite) throws IOException {
+      StorageInfo version, String bpid, String bpidToWrite, Configuration conf)
+          throws IOException {
     DataStorage storage = new DataStorage(version);
     storage.setDatanodeUuid("FixedDatanodeUuid");
 
@@ -485,7 +489,7 @@ public class UpgradeUtilities {
     for (int i = 0; i < parent.length; i++) {
       File versionFile = new File(parent[i], "VERSION");
       StorageDirectory sd = new StorageDirectory(parent[i].getParentFile());
-      DataStorage.createStorageID(sd, false);
+      DataStorage.createStorageID(sd, false, conf);
       storage.writeProperties(versionFile, sd);
       versionFiles[i] = versionFile;
       File bpDir = BlockPoolSliceStorage.getBpRoot(bpid, parent[i]);

+ 160 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestTextBlockFormat.java

@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.common;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.common.TextFileRegionFormat.*;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.compress.CompressionCodec;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ * Test for the text based block format for provided block maps.
+ */
+public class TestTextBlockFormat {
+
+  static final Path OUTFILE = new Path("hdfs://dummyServer:0000/dummyFile.txt");
+
+  void check(TextWriter.Options opts, final Path vp,
+      final Class<? extends CompressionCodec> vc) throws IOException {
+    TextFileRegionFormat mFmt = new TextFileRegionFormat() {
+      @Override
+      public TextWriter createWriter(Path file, CompressionCodec codec,
+          String delim, Configuration conf) throws IOException {
+        assertEquals(vp, file);
+        if (null == vc) {
+          assertNull(codec);
+        } else {
+          assertEquals(vc, codec.getClass());
+        }
+        return null; // ignored
+      }
+    };
+    mFmt.getWriter(opts);
+  }
+
+  @Test
+  public void testWriterOptions() throws Exception {
+    TextWriter.Options opts = TextWriter.defaults();
+    assertTrue(opts instanceof WriterOptions);
+    WriterOptions wopts = (WriterOptions) opts;
+    Path def = new Path(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_PATH_DEFAULT);
+    assertEquals(def, wopts.getFile());
+    assertNull(wopts.getCodec());
+
+    opts.filename(OUTFILE);
+    check(opts, OUTFILE, null);
+
+    opts.filename(OUTFILE);
+    opts.codec("gzip");
+    Path cp = new Path(OUTFILE.getParent(), OUTFILE.getName() + ".gz");
+    check(opts, cp, org.apache.hadoop.io.compress.GzipCodec.class);
+
+  }
+
+  @Test
+  public void testCSVReadWrite() throws Exception {
+    final DataOutputBuffer out = new DataOutputBuffer();
+    FileRegion r1 = new FileRegion(4344L, OUTFILE, 0, 1024);
+    FileRegion r2 = new FileRegion(4345L, OUTFILE, 1024, 1024);
+    FileRegion r3 = new FileRegion(4346L, OUTFILE, 2048, 512);
+    try (TextWriter csv = new TextWriter(new OutputStreamWriter(out), ",")) {
+      csv.store(r1);
+      csv.store(r2);
+      csv.store(r3);
+    }
+    Iterator<FileRegion> i3;
+    try (TextReader csv = new TextReader(null, null, null, ",") {
+      @Override
+      public InputStream createStream() {
+        DataInputBuffer in = new DataInputBuffer();
+        in.reset(out.getData(), 0, out.getLength());
+        return in;
+        }}) {
+      Iterator<FileRegion> i1 = csv.iterator();
+      assertEquals(r1, i1.next());
+      Iterator<FileRegion> i2 = csv.iterator();
+      assertEquals(r1, i2.next());
+      assertEquals(r2, i2.next());
+      assertEquals(r3, i2.next());
+      assertEquals(r2, i1.next());
+      assertEquals(r3, i1.next());
+
+      assertFalse(i1.hasNext());
+      assertFalse(i2.hasNext());
+      i3 = csv.iterator();
+    }
+    try {
+      i3.next();
+    } catch (IllegalStateException e) {
+      return;
+    }
+    fail("Invalid iterator");
+  }
+
+  @Test
+  public void testCSVReadWriteTsv() throws Exception {
+    final DataOutputBuffer out = new DataOutputBuffer();
+    FileRegion r1 = new FileRegion(4344L, OUTFILE, 0, 1024);
+    FileRegion r2 = new FileRegion(4345L, OUTFILE, 1024, 1024);
+    FileRegion r3 = new FileRegion(4346L, OUTFILE, 2048, 512);
+    try (TextWriter csv = new TextWriter(new OutputStreamWriter(out), "\t")) {
+      csv.store(r1);
+      csv.store(r2);
+      csv.store(r3);
+    }
+    Iterator<FileRegion> i3;
+    try (TextReader csv = new TextReader(null, null, null, "\t") {
+      @Override
+      public InputStream createStream() {
+        DataInputBuffer in = new DataInputBuffer();
+        in.reset(out.getData(), 0, out.getLength());
+        return in;
+      }}) {
+      Iterator<FileRegion> i1 = csv.iterator();
+      assertEquals(r1, i1.next());
+      Iterator<FileRegion> i2 = csv.iterator();
+      assertEquals(r1, i2.next());
+      assertEquals(r2, i2.next());
+      assertEquals(r3, i2.next());
+      assertEquals(r2, i1.next());
+      assertEquals(r3, i1.next());
+
+      assertFalse(i1.hasNext());
+      assertFalse(i2.hasNext());
+      i3 = csv.iterator();
+    }
+    try {
+      i3.next();
+    } catch (IllegalStateException e) {
+      return;
+    }
+    fail("Invalid iterator");
+  }
+
+}

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

@@ -54,6 +54,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
@@ -616,7 +617,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     this.datanode = datanode;
     if (storage != null) {
       for (int i = 0; i < storage.getNumStorageDirs(); ++i) {
-        DataStorage.createStorageID(storage.getStorageDir(i), false);
+        DataStorage.createStorageID(storage.getStorageDir(i), false, conf);
       }
       this.datanodeUuid = storage.getDatanodeUuid();
     } else {
@@ -1352,8 +1353,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   }
 
   @Override
-  public void checkAndUpdate(String bpid, long blockId, File diskFile,
-      File diskMetaFile, FsVolumeSpi vol) throws IOException {
+  public void checkAndUpdate(String bpid, ScanInfo info) throws IOException {
     throw new UnsupportedOperationException();
   }
 

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.*;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
@@ -94,8 +95,8 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
   }
 
   @Override
-  public void checkAndUpdate(String bpid, long blockId, File diskFile,
-      File diskMetaFile, FsVolumeSpi vol) {
+  public void checkAndUpdate(String bpid, ScanInfo info) {
+    return;
   }
 
   @Override

+ 10 - 7
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java

@@ -119,11 +119,12 @@ public class TestFsDatasetImpl {
   
   private final static String BLOCKPOOL = "BP-TEST";
 
-  private static Storage.StorageDirectory createStorageDirectory(File root)
+  private static Storage.StorageDirectory createStorageDirectory(File root,
+      Configuration conf)
       throws SecurityException, IOException {
     Storage.StorageDirectory sd = new Storage.StorageDirectory(
         StorageLocation.parse(root.toURI().toString()));
-    DataStorage.createStorageID(sd, false);
+    DataStorage.createStorageID(sd, false, conf);
     return sd;
   }
 
@@ -137,7 +138,7 @@ public class TestFsDatasetImpl {
       File loc = new File(BASE_DIR + "/data" + i);
       dirStrings.add(new Path(loc.toString()).toUri().toString());
       loc.mkdirs();
-      dirs.add(createStorageDirectory(loc));
+      dirs.add(createStorageDirectory(loc, conf));
       when(storage.getStorageDir(i)).thenReturn(dirs.get(i));
     }
 
@@ -197,7 +198,8 @@ public class TestFsDatasetImpl {
       String pathUri = new Path(path).toUri().toString();
       expectedVolumes.add(new File(pathUri).getAbsolutePath());
       StorageLocation loc = StorageLocation.parse(pathUri);
-      Storage.StorageDirectory sd = createStorageDirectory(new File(path));
+      Storage.StorageDirectory sd = createStorageDirectory(
+          new File(path), conf);
       DataStorage.VolumeBuilder builder =
           new DataStorage.VolumeBuilder(storage, sd);
       when(storage.prepareVolume(eq(datanode), eq(loc),
@@ -315,7 +317,8 @@ public class TestFsDatasetImpl {
     String newVolumePath = BASE_DIR + "/newVolumeToRemoveLater";
     StorageLocation loc = StorageLocation.parse(newVolumePath);
 
-    Storage.StorageDirectory sd = createStorageDirectory(new File(newVolumePath));
+    Storage.StorageDirectory sd = createStorageDirectory(
+        new File(newVolumePath), conf);
     DataStorage.VolumeBuilder builder =
         new DataStorage.VolumeBuilder(storage, sd);
     when(storage.prepareVolume(eq(datanode), eq(loc),
@@ -348,7 +351,7 @@ public class TestFsDatasetImpl {
         any(ReplicaMap.class),
         any(RamDiskReplicaLruTracker.class));
 
-    Storage.StorageDirectory sd = createStorageDirectory(badDir);
+    Storage.StorageDirectory sd = createStorageDirectory(badDir, conf);
     sd.lock();
     DataStorage.VolumeBuilder builder = new DataStorage.VolumeBuilder(storage, sd);
     when(storage.prepareVolume(eq(datanode),
@@ -492,7 +495,7 @@ public class TestFsDatasetImpl {
     String path = BASE_DIR + "/newData0";
     String pathUri = new Path(path).toUri().toString();
     StorageLocation loc = StorageLocation.parse(pathUri);
-    Storage.StorageDirectory sd = createStorageDirectory(new File(path));
+    Storage.StorageDirectory sd = createStorageDirectory(new File(path), conf);
     DataStorage.VolumeBuilder builder =
         new DataStorage.VolumeBuilder(storage, sd);
     when(

+ 426 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java

@@ -0,0 +1,426 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.common.FileRegion;
+import org.apache.hadoop.hdfs.server.common.FileRegionProvider;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
+import org.apache.hadoop.hdfs.server.datanode.DNConf;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
+import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.BlockIterator;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
+import org.apache.hadoop.util.AutoCloseableLock;
+import org.apache.hadoop.util.StringUtils;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Basic test cases for provided implementation.
+ */
+public class TestProvidedImpl {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestProvidedImpl.class);
+  private static final String BASE_DIR =
+      new FileSystemTestHelper().getTestRootDir();
+  private static final int NUM_LOCAL_INIT_VOLUMES = 1;
+  private static final int NUM_PROVIDED_INIT_VOLUMES = 1;
+  private static final String[] BLOCK_POOL_IDS = {"bpid-0", "bpid-1"};
+  private static final int NUM_PROVIDED_BLKS = 10;
+  private static final long BLK_LEN = 128 * 1024;
+  private static final int MIN_BLK_ID = 0;
+  private static final int CHOSEN_BP_ID = 0;
+
+  private static String providedBasePath = BASE_DIR;
+
+  private Configuration conf;
+  private DataNode datanode;
+  private DataStorage storage;
+  private FsDatasetImpl dataset;
+  private static Map<Long, String> blkToPathMap;
+  private static List<FsVolumeImpl> providedVolumes;
+
+  /**
+   * A simple FileRegion iterator for tests.
+   */
+  public static class TestFileRegionIterator implements Iterator<FileRegion> {
+
+    private int numBlocks;
+    private int currentCount;
+    private String basePath;
+
+    public TestFileRegionIterator(String basePath, int minID, int numBlocks) {
+      this.currentCount = minID;
+      this.numBlocks = numBlocks;
+      this.basePath = basePath;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return currentCount < numBlocks;
+    }
+
+    @Override
+    public FileRegion next() {
+      FileRegion region = null;
+      if (hasNext()) {
+        File newFile = new File(basePath, "file" + currentCount);
+        if (!newFile.exists()) {
+          try {
+            LOG.info("Creating file for blkid " + currentCount);
+            blkToPathMap.put((long) currentCount, newFile.getAbsolutePath());
+            LOG.info("Block id " + currentCount + " corresponds to file " +
+                newFile.getAbsolutePath());
+            newFile.createNewFile();
+            Writer writer = new OutputStreamWriter(
+                new FileOutputStream(newFile.getAbsolutePath()), "utf-8");
+            for (int i = 0; i < BLK_LEN / (Integer.SIZE / 8); i++) {
+              writer.write(currentCount);
+            }
+            writer.flush();
+            writer.close();
+          } catch (IOException e) {
+            e.printStackTrace();
+          }
+        }
+        region = new FileRegion(currentCount, new Path(newFile.toString()),
+            0, BLK_LEN, BLOCK_POOL_IDS[CHOSEN_BP_ID]);
+        currentCount++;
+      }
+      return region;
+    }
+
+    @Override
+    public void remove() {
+      //do nothing.
+    }
+
+    public void resetMinBlockId(int minId) {
+      currentCount = minId;
+    }
+
+    public void resetBlockCount(int numBlocks) {
+      this.numBlocks = numBlocks;
+    }
+
+  }
+
+  /**
+   * A simple FileRegion provider for tests.
+   */
+  public static class TestFileRegionProvider
+      extends FileRegionProvider implements Configurable {
+
+    private Configuration conf;
+    private int minId;
+    private int numBlocks;
+
+    TestFileRegionProvider() {
+      minId = MIN_BLK_ID;
+      numBlocks = NUM_PROVIDED_BLKS;
+    }
+
+    @Override
+    public Iterator<FileRegion> iterator() {
+      return new TestFileRegionIterator(providedBasePath, minId, numBlocks);
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+      this.conf = conf;
+    }
+
+    @Override
+    public Configuration getConf() {
+      return conf;
+    }
+
+    @Override
+    public void refresh() {
+      //do nothing!
+    }
+
+    public void setMinBlkId(int minId) {
+      this.minId = minId;
+    }
+
+    public void setBlockCount(int numBlocks) {
+      this.numBlocks = numBlocks;
+    }
+  }
+
+  private static Storage.StorageDirectory createLocalStorageDirectory(
+      File root, Configuration conf)
+      throws SecurityException, IOException {
+    Storage.StorageDirectory sd =
+        new Storage.StorageDirectory(
+            StorageLocation.parse(root.toURI().toString()));
+    DataStorage.createStorageID(sd, false, conf);
+    return sd;
+  }
+
+  private static Storage.StorageDirectory createProvidedStorageDirectory(
+      String confString, Configuration conf)
+      throws SecurityException, IOException {
+    Storage.StorageDirectory sd =
+        new Storage.StorageDirectory(StorageLocation.parse(confString));
+    DataStorage.createStorageID(sd, false, conf);
+    return sd;
+  }
+
+  private static void createStorageDirs(DataStorage storage,
+      Configuration conf, int numDirs, int numProvidedDirs)
+          throws IOException {
+    List<Storage.StorageDirectory> dirs =
+        new ArrayList<Storage.StorageDirectory>();
+    List<String> dirStrings = new ArrayList<String>();
+    FileUtils.deleteDirectory(new File(BASE_DIR));
+    for (int i = 0; i < numDirs; i++) {
+      File loc = new File(BASE_DIR, "data" + i);
+      dirStrings.add(new Path(loc.toString()).toUri().toString());
+      loc.mkdirs();
+      dirs.add(createLocalStorageDirectory(loc, conf));
+      when(storage.getStorageDir(i)).thenReturn(dirs.get(i));
+    }
+
+    for (int i = numDirs; i < numDirs + numProvidedDirs; i++) {
+      File loc = new File(BASE_DIR, "data" + i);
+      providedBasePath = loc.getAbsolutePath();
+      loc.mkdirs();
+      String dirString = "[PROVIDED]" +
+          new Path(loc.toString()).toUri().toString();
+      dirStrings.add(dirString);
+      dirs.add(createProvidedStorageDirectory(dirString, conf));
+      when(storage.getStorageDir(i)).thenReturn(dirs.get(i));
+    }
+
+    String dataDir = StringUtils.join(",", dirStrings);
+    conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dataDir);
+    when(storage.dirIterator()).thenReturn(dirs.iterator());
+    when(storage.getNumStorageDirs()).thenReturn(numDirs + numProvidedDirs);
+  }
+
+  private int getNumVolumes() {
+    try (FsDatasetSpi.FsVolumeReferences volumes =
+        dataset.getFsVolumeReferences()) {
+      return volumes.size();
+    } catch (IOException e) {
+      return 0;
+    }
+  }
+
+  private void compareBlkFile(InputStream ins, String filepath)
+      throws FileNotFoundException, IOException {
+    try (ReadableByteChannel i = Channels.newChannel(
+        new FileInputStream(new File(filepath)))) {
+      try (ReadableByteChannel j = Channels.newChannel(ins)) {
+        ByteBuffer ib = ByteBuffer.allocate(4096);
+        ByteBuffer jb = ByteBuffer.allocate(4096);
+        while (true) {
+          int il = i.read(ib);
+          int jl = j.read(jb);
+          if (il < 0 || jl < 0) {
+            assertEquals(il, jl);
+            break;
+          }
+          ib.flip();
+          jb.flip();
+          int cmp = Math.min(ib.remaining(), jb.remaining());
+          for (int k = 0; k < cmp; ++k) {
+            assertEquals(ib.get(), jb.get());
+          }
+          ib.compact();
+          jb.compact();
+        }
+      }
+    }
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    datanode = mock(DataNode.class);
+    storage = mock(DataStorage.class);
+    this.conf = new Configuration();
+    this.conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 0);
+
+    when(datanode.getConf()).thenReturn(conf);
+    final DNConf dnConf = new DNConf(datanode);
+    when(datanode.getDnConf()).thenReturn(dnConf);
+
+    final BlockScanner disabledBlockScanner = new BlockScanner(datanode, conf);
+    when(datanode.getBlockScanner()).thenReturn(disabledBlockScanner);
+    final ShortCircuitRegistry shortCircuitRegistry =
+        new ShortCircuitRegistry(conf);
+    when(datanode.getShortCircuitRegistry()).thenReturn(shortCircuitRegistry);
+
+    this.conf.setClass(DFSConfigKeys.DFS_PROVIDER_CLASS,
+        TestFileRegionProvider.class, FileRegionProvider.class);
+
+    blkToPathMap = new HashMap<Long, String>();
+    providedVolumes = new LinkedList<FsVolumeImpl>();
+
+    createStorageDirs(
+        storage, conf, NUM_LOCAL_INIT_VOLUMES, NUM_PROVIDED_INIT_VOLUMES);
+
+    dataset = new FsDatasetImpl(datanode, storage, conf);
+    FsVolumeReferences volumes = dataset.getFsVolumeReferences();
+    for (int i = 0; i < volumes.size(); i++) {
+      FsVolumeSpi vol = volumes.get(i);
+      if (vol.getStorageType() == StorageType.PROVIDED) {
+        providedVolumes.add((FsVolumeImpl) vol);
+      }
+    }
+
+    for (String bpid : BLOCK_POOL_IDS) {
+      dataset.addBlockPool(bpid, conf);
+    }
+
+    assertEquals(NUM_LOCAL_INIT_VOLUMES + NUM_PROVIDED_INIT_VOLUMES,
+        getNumVolumes());
+    assertEquals(0, dataset.getNumFailedVolumes());
+  }
+
+  @Test
+  public void testProvidedStorageID() throws IOException {
+    for (int i = 0; i < providedVolumes.size(); i++) {
+      assertEquals(DFSConfigKeys.DFS_PROVIDER_STORAGEUUID_DEFAULT,
+          providedVolumes.get(i).getStorageID());
+    }
+  }
+
+  @Test
+  public void testBlockLoad() throws IOException {
+    for (int i = 0; i < providedVolumes.size(); i++) {
+      FsVolumeImpl vol = providedVolumes.get(i);
+      ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock());
+      vol.getVolumeMap(volumeMap, null);
+
+      assertEquals(vol.getBlockPoolList().length, BLOCK_POOL_IDS.length);
+      for (int j = 0; j < BLOCK_POOL_IDS.length; j++) {
+        if (j != CHOSEN_BP_ID) {
+          //this block pool should not have any blocks
+          assertEquals(null, volumeMap.replicas(BLOCK_POOL_IDS[j]));
+        }
+      }
+      assertEquals(NUM_PROVIDED_BLKS,
+          volumeMap.replicas(BLOCK_POOL_IDS[CHOSEN_BP_ID]).size());
+    }
+  }
+
+  @Test
+  public void testProvidedBlockRead() throws IOException {
+    for (int id = 0; id < NUM_PROVIDED_BLKS; id++) {
+      ExtendedBlock eb = new ExtendedBlock(
+          BLOCK_POOL_IDS[CHOSEN_BP_ID], id, BLK_LEN,
+          HdfsConstants.GRANDFATHER_GENERATION_STAMP);
+      InputStream ins = dataset.getBlockInputStream(eb, 0);
+      String filepath = blkToPathMap.get((long) id);
+      compareBlkFile(ins, filepath);
+    }
+  }
+
+  @Test
+  public void testProvidedBlockIterator() throws IOException {
+    for (int i = 0; i < providedVolumes.size(); i++) {
+      FsVolumeImpl vol = providedVolumes.get(i);
+      BlockIterator iter =
+          vol.newBlockIterator(BLOCK_POOL_IDS[CHOSEN_BP_ID], "temp");
+      Set<Long> blockIdsUsed = new HashSet<Long>();
+      while (!iter.atEnd()) {
+        ExtendedBlock eb = iter.nextBlock();
+        long blkId = eb.getBlockId();
+        assertTrue(blkId >= MIN_BLK_ID && blkId < NUM_PROVIDED_BLKS);
+        //all block ids must be unique!
+        assertTrue(!blockIdsUsed.contains(blkId));
+        blockIdsUsed.add(blkId);
+      }
+      assertEquals(NUM_PROVIDED_BLKS, blockIdsUsed.size());
+    }
+  }
+
+
+  @Test
+  public void testRefresh() throws IOException {
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 1);
+    for (int i = 0; i < providedVolumes.size(); i++) {
+      ProvidedVolumeImpl vol = (ProvidedVolumeImpl) providedVolumes.get(i);
+      TestFileRegionProvider provider = (TestFileRegionProvider)
+          vol.getFileRegionProvider(BLOCK_POOL_IDS[CHOSEN_BP_ID]);
+      //equivalent to two new blocks appearing
+      provider.setBlockCount(NUM_PROVIDED_BLKS + 2);
+      //equivalent to deleting the first block
+      provider.setMinBlkId(MIN_BLK_ID + 1);
+
+      DirectoryScanner scanner = new DirectoryScanner(datanode, dataset, conf);
+      scanner.reconcile();
+      ReplicaInfo info = dataset.getBlockReplica(
+          BLOCK_POOL_IDS[CHOSEN_BP_ID], NUM_PROVIDED_BLKS + 1);
+      //new replica should be added to the dataset
+      assertTrue(info != null);
+      try {
+        info = dataset.getBlockReplica(BLOCK_POOL_IDS[CHOSEN_BP_ID], 0);
+      } catch(Exception ex) {
+        LOG.info("Exception expected: " + ex);
+      }
+    }
+  }
+}

+ 4 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestClusterId.java

@@ -68,7 +68,10 @@ public class TestClusterId {
       fsImage.getStorage().dirIterator(NNStorage.NameNodeDirType.IMAGE);
     StorageDirectory sd = sdit.next();
     Properties props = Storage.readPropertiesFile(sd.getVersionFile());
-    String cid = props.getProperty("clusterID");
+    String cid = null;
+    if (props != null) {
+      cid = props.getProperty("clusterID");
+    }
     LOG.info("successfully formated : sd="+sd.getCurrentDir() + ";cid="+cid);
     return cid;
   }