Ver Fonte

HDDS-156. Implement HDDSVolume to manage volume state

Hanisha Koneru há 7 anos atrás
pai
commit
9a5552bf76
18 ficheiros alterados com 1517 adições e 429 exclusões
  1. 5 0
      hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
  2. 80 0
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/DataNodeLayoutVersion.java
  3. 95 0
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DatanodeVersionFile.java
  4. 0 251
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/VolumeSet.java
  5. 2 2
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/VolumeChoosingPolicy.java
  6. 163 0
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/HddsVolumeUtil.java
  7. 330 0
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
  8. 5 4
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/RoundRobinVolumeChoosingPolicy.java
  9. 24 54
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeInfo.java
  10. 309 0
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java
  11. 10 1
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeUsage.java
  12. 21 0
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/package-info.java
  13. 38 0
      hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeLayOutVersion.java
  14. 134 0
      hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/helpers/TestDatanodeVersionFile.java
  15. 0 100
      hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestRoundRobinVolumeChoosingPolicy.java
  16. 145 0
      hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestHddsVolume.java
  17. 131 0
      hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestRoundRobinVolumeChoosingPolicy.java
  18. 25 17
      hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSet.java

+ 5 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java

@@ -33,6 +33,11 @@ public final class OzoneConsts {
   public static final String OZONE_SIMPLE_ROOT_USER = "root";
   public static final String OZONE_SIMPLE_HDFS_USER = "hdfs";
 
+  public static final String STORAGE_ID = "storageID";
+  public static final String DATANODE_UUID = "datanodeUuid";
+  public static final String CLUSTER_ID = "clusterID";
+  public static final String LAYOUTVERSION = "layOutVersion";
+  public static final String CTIME = "ctime";
   /*
    * BucketName length is used for both buckets and volume lengths
    */

+ 80 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/DataNodeLayoutVersion.java

@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common;
+
+/**
+ * Datanode layout version which describes information about the layout version
+ * on the datanode.
+ */
+public final class DataNodeLayoutVersion {
+
+  // We will just be normal and use positive counting numbers for versions.
+  private final static DataNodeLayoutVersion[] VERSION_INFOS =
+      {new DataNodeLayoutVersion(1, "HDDS Datanode LayOut Version 1")};
+
+  private final String description;
+  private final int version;
+
+  /**
+   * Never created outside this class.
+   *
+   * @param description -- description
+   * @param version     -- version number
+   */
+  private DataNodeLayoutVersion(int version, String description) {
+    this.description = description;
+    this.version = version;
+  }
+
+  /**
+   * Returns all versions.
+   *
+   * @return Version info array.
+   */
+  public static DataNodeLayoutVersion[] getAllVersions() {
+    return VERSION_INFOS.clone();
+  }
+
+  /**
+   * Returns the latest version.
+   *
+   * @return versionInfo
+   */
+  public static DataNodeLayoutVersion getLatestVersion() {
+    return VERSION_INFOS[VERSION_INFOS.length - 1];
+  }
+
+  /**
+   * Return description.
+   *
+   * @return String
+   */
+  public String getDescription() {
+    return description;
+  }
+
+  /**
+   * Return the version.
+   *
+   * @return int.
+   */
+  public int getVersion() {
+    return version;
+  }
+
+}

+ 95 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DatanodeVersionFile.java

@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.helpers;
+
+import org.apache.hadoop.ozone.OzoneConsts;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Properties;
+
+/**
+ * This is a utility class which helps to create the version file on datanode
+ * and also validate the content of the version file.
+ */
+public class DatanodeVersionFile {
+
+  private final String storageId;
+  private final String clusterId;
+  private final String datanodeUuid;
+  private final long cTime;
+  private final int layOutVersion;
+
+  public DatanodeVersionFile(String storageId, String clusterId,
+      String datanodeUuid, long cTime, int layOutVersion) {
+    this.storageId = storageId;
+    this.clusterId = clusterId;
+    this.datanodeUuid = datanodeUuid;
+    this.cTime = cTime;
+    this.layOutVersion = layOutVersion;
+  }
+
+  private Properties createProperties() {
+    Properties properties = new Properties();
+    properties.setProperty(OzoneConsts.STORAGE_ID, storageId);
+    properties.setProperty(OzoneConsts.CLUSTER_ID, clusterId);
+    properties.setProperty(OzoneConsts.DATANODE_UUID, datanodeUuid);
+    properties.setProperty(OzoneConsts.CTIME, String.valueOf(cTime));
+    properties.setProperty(OzoneConsts.LAYOUTVERSION, String.valueOf(
+        layOutVersion));
+    return properties;
+  }
+
+  /**
+   * Creates a version File in specified path.
+   * @param path
+   * @throws IOException
+   */
+  public void createVersionFile(File path) throws
+      IOException {
+    try (RandomAccessFile file = new RandomAccessFile(path, "rws");
+         FileOutputStream out = new FileOutputStream(file.getFD())) {
+      file.getChannel().truncate(0);
+      Properties properties = createProperties();
+      /*
+       * If server is interrupted before this line,
+       * the version file will remain unchanged.
+       */
+      properties.store(out, null);
+    }
+  }
+
+
+  /**
+   * Creates a property object from the specified file content.
+   * @param  versionFile
+   * @return Properties
+   * @throws IOException
+   */
+  public static Properties readFrom(File versionFile) throws IOException {
+    try (RandomAccessFile file = new RandomAccessFile(versionFile, "rws");
+         FileInputStream in = new FileInputStream(file.getFD())) {
+      Properties props = new Properties();
+      props.load(in);
+      return props;
+    }
+  }
+}

+ 0 - 251
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/VolumeSet.java

@@ -1,251 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.impl;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.StorageType;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
-import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
-import org.apache.hadoop.ozone.container.common.impl.VolumeInfo.VolumeState;
-import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
-import org.apache.hadoop.util.AutoCloseableLock;
-import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
-import org.apache.hadoop.util.InstrumentedLock;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.EnumMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * VolumeSet to manage volumes in a DataNode.
- */
-public class VolumeSet {
-
-  private static final Logger LOG = LoggerFactory.getLogger(VolumeSet.class);
-
-  private Configuration conf;
-
-  /**
-   * {@link VolumeSet#volumeMap} maintains a map of all active volumes in the
-   * DataNode. Each volume has one-to-one mapping with a volumeInfo object.
-   */
-  private Map<Path, VolumeInfo> volumeMap;
-  /**
-   * {@link VolumeSet#failedVolumeMap} maintains a map of volumes which have
-   * failed. The keys in this map and {@link VolumeSet#volumeMap} are
-   * mutually exclusive.
-   */
-  private Map<Path, VolumeInfo> failedVolumeMap;
-  /**
-   * {@link VolumeSet#volumeStateMap} maintains a list of active volumes per
-   * StorageType.
-   */
-  private EnumMap<StorageType, List<VolumeInfo>> volumeStateMap;
-
-  /**
-   * Lock to synchronize changes to the VolumeSet. Any update to
-   * {@link VolumeSet#volumeMap}, {@link VolumeSet#failedVolumeMap}, or
-   * {@link VolumeSet#volumeStateMap} should be done after acquiring this lock.
-   */
-  private final AutoCloseableLock volumeSetLock;
-
-  public VolumeSet(Configuration conf) throws DiskOutOfSpaceException {
-    this.conf = conf;
-    this.volumeSetLock = new AutoCloseableLock(
-        new InstrumentedLock(getClass().getName(), LOG,
-            new ReentrantLock(true),
-            conf.getTimeDuration(
-                DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
-                DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,
-                TimeUnit.MILLISECONDS),
-            300));
-
-    initializeVolumeSet();
-  }
-
-  // Add DN volumes configured through ConfigKeys to volumeMap.
-  private void initializeVolumeSet() throws DiskOutOfSpaceException {
-    volumeMap = new ConcurrentHashMap<>();
-    failedVolumeMap = new ConcurrentHashMap<>();
-    volumeStateMap = new EnumMap<>(StorageType.class);
-
-    Collection<String> datanodeDirs = conf.getTrimmedStringCollection(
-        HDDS_DATANODE_DIR_KEY);
-    if (datanodeDirs.isEmpty()) {
-      datanodeDirs = conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY);
-    }
-    if (datanodeDirs.isEmpty()) {
-      throw new IllegalArgumentException("No location configured in either "
-          + HDDS_DATANODE_DIR_KEY + " or " + DFS_DATANODE_DATA_DIR_KEY);
-    }
-
-    for (StorageType storageType : StorageType.values()) {
-      volumeStateMap.put(storageType, new ArrayList<VolumeInfo>());
-    }
-
-    for (String dir : datanodeDirs) {
-      try {
-        VolumeInfo volumeInfo = getVolumeInfo(dir);
-
-        volumeMap.put(volumeInfo.getRootDir(), volumeInfo);
-        volumeStateMap.get(volumeInfo.getStorageType()).add(volumeInfo);
-      } catch (IOException e) {
-        LOG.error("Failed to parse the storage location: " + dir, e);
-      }
-    }
-
-    if (volumeMap.size() == 0) {
-      throw new DiskOutOfSpaceException("No storage location configured");
-    }
-  }
-
-  public void acquireLock() {
-    volumeSetLock.acquire();
-  }
-
-  public void releaseLock() {
-    volumeSetLock.release();
-  }
-
-  private VolumeInfo getVolumeInfo(String rootDir) throws IOException {
-    StorageLocation location = StorageLocation.parse(rootDir);
-    StorageType storageType = location.getStorageType();
-
-    VolumeInfo.Builder volumeBuilder = new VolumeInfo.Builder(rootDir, conf);
-    volumeBuilder.storageType(storageType);
-    return volumeBuilder.build();
-  }
-
-  // Add a volume to VolumeSet
-  public void addVolume(String dataDir) throws IOException {
-    Path dirPath = new Path(dataDir);
-
-    try (AutoCloseableLock lock = volumeSetLock.acquire()) {
-      if (volumeMap.containsKey(dirPath)) {
-        LOG.warn("Volume : {} already exists in VolumeMap", dataDir);
-      } else {
-        if (failedVolumeMap.containsKey(dirPath)) {
-          failedVolumeMap.remove(dirPath);
-        }
-
-        VolumeInfo volumeInfo = getVolumeInfo(dirPath.toString());
-        volumeMap.put(dirPath, volumeInfo);
-        volumeStateMap.get(volumeInfo.getStorageType()).add(volumeInfo);
-
-        LOG.debug("Added Volume : {} to VolumeSet", dataDir);
-      }
-    }
-  }
-
-  // Mark a volume as failed
-  public void failVolume(String dataDir) {
-    Path dirPath = new Path(dataDir);
-
-    try (AutoCloseableLock lock = volumeSetLock.acquire()) {
-      if (volumeMap.containsKey(dirPath)) {
-        VolumeInfo volumeInfo = volumeMap.get(dirPath);
-        volumeInfo.failVolume();
-
-        volumeMap.remove(dirPath);
-        volumeStateMap.get(volumeInfo.getStorageType()).remove(volumeInfo);
-        failedVolumeMap.put(dirPath, volumeInfo);
-
-        LOG.debug("Moving Volume : {} to failed Volumes", dataDir);
-      } else if (failedVolumeMap.containsKey(dirPath)) {
-        LOG.debug("Volume : {} is not active", dataDir);
-      } else {
-        LOG.warn("Volume : {} does not exist in VolumeSet", dataDir);
-      }
-    }
-  }
-
-  // Remove a volume from the VolumeSet completely.
-  public void removeVolume(String dataDir) throws IOException {
-    Path dirPath = new Path(dataDir);
-
-    try (AutoCloseableLock lock = volumeSetLock.acquire()) {
-      if (volumeMap.containsKey(dirPath)) {
-        VolumeInfo volumeInfo = volumeMap.get(dirPath);
-        volumeInfo.shutdown();
-
-        volumeMap.remove(dirPath);
-        volumeStateMap.get(volumeInfo.getStorageType()).remove(volumeInfo);
-
-        LOG.debug("Removed Volume : {} from VolumeSet", dataDir);
-      } else if (failedVolumeMap.containsKey(dirPath)) {
-        VolumeInfo volumeInfo = failedVolumeMap.get(dirPath);
-        volumeInfo.setState(VolumeState.NON_EXISTENT);
-
-        failedVolumeMap.remove(dirPath);
-        LOG.debug("Removed Volume : {} from failed VolumeSet", dataDir);
-      } else {
-        LOG.warn("Volume : {} does not exist in VolumeSet", dataDir);
-      }
-    }
-  }
-
-  public VolumeInfo chooseVolume(long containerSize,
-      VolumeChoosingPolicy choosingPolicy) throws IOException {
-    return choosingPolicy.chooseVolume(getVolumesList(), containerSize);
-  }
-
-  public void shutdown() {
-    for (VolumeInfo volumeInfo : volumeMap.values()) {
-      try {
-        volumeInfo.shutdown();
-      } catch (Exception e) {
-        LOG.error("Failed to shutdown volume : " + volumeInfo.getRootDir(), e);
-      }
-    }
-  }
-
-  @VisibleForTesting
-  public List<VolumeInfo> getVolumesList() {
-    return ImmutableList.copyOf(volumeMap.values());
-  }
-
-  @VisibleForTesting
-  public List<VolumeInfo> getFailedVolumesList() {
-    return ImmutableList.copyOf(failedVolumeMap.values());
-  }
-
-  @VisibleForTesting
-  public Map<Path, VolumeInfo> getVolumeMap() {
-    return ImmutableMap.copyOf(volumeMap);
-  }
-
-  @VisibleForTesting
-  public Map<StorageType, List<VolumeInfo>> getVolumeStateMap() {
-    return ImmutableMap.copyOf(volumeStateMap);
-  }
-}

+ 2 - 2
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/VolumeChoosingPolicy.java

@@ -18,7 +18,7 @@
 package org.apache.hadoop.ozone.container.common.interfaces;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.ozone.container.common.impl.VolumeInfo;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 
 import java.io.IOException;
 import java.util.List;
@@ -41,6 +41,6 @@ public interface VolumeChoosingPolicy {
    * @return the chosen volume.
    * @throws IOException when disks are unavailable or are full.
    */
-  VolumeInfo chooseVolume(List<VolumeInfo> volumes, long maxContainerSize)
+  HddsVolume chooseVolume(List<HddsVolume> volumes, long maxContainerSize)
       throws IOException;
 }

+ 163 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/HddsVolumeUtil.java

@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.common.InconsistentStorageStateException;
+import org.apache.hadoop.ozone.container.common.DataNodeLayoutVersion;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.util.Time;
+
+import java.io.File;
+import java.util.Properties;
+import java.util.UUID;
+
+/**
+ * A util class for {@link HddsVolume}.
+ */
+public final class HddsVolumeUtil {
+
+  // Private constructor for Utility class. Unused.
+  private HddsVolumeUtil() {
+  }
+
+  private static final String VERSION_FILE   = "VERSION";
+  private static final String STORAGE_ID_PREFIX = "DS-";
+
+  public static File getVersionFile(File rootDir) {
+    return new File(rootDir, VERSION_FILE);
+  }
+
+  public static String generateUuid() {
+    return STORAGE_ID_PREFIX + UUID.randomUUID();
+  }
+
+  /**
+   * Get hddsRoot from volume root. If volumeRoot points to hddsRoot, it is
+   * returned as is.
+   * For a volumeRoot /data/disk1, the hddsRoot is /data/disk1/hdds.
+   * @param volumeRoot root of the volume.
+   * @return hddsRoot of the volume.
+   */
+  public static String getHddsRoot(String volumeRoot) {
+    if (volumeRoot.endsWith(HddsVolume.HDDS_VOLUME_DIR)) {
+      return volumeRoot;
+    } else {
+      File hddsRoot = new File(volumeRoot, HddsVolume.HDDS_VOLUME_DIR);
+      return hddsRoot.getPath();
+    }
+  }
+
+  /**
+   * Returns storageID if it is valid. Throws an exception otherwise.
+   */
+  @VisibleForTesting
+  public static String getStorageID(Properties props, File versionFile)
+      throws InconsistentStorageStateException {
+    return getProperty(props, OzoneConsts.STORAGE_ID, versionFile);
+  }
+
+  /**
+   * Returns clusterID if it is valid. It should match the clusterID from the
+   * Datanode. Throws an exception otherwise.
+   */
+  @VisibleForTesting
+  public static String getClusterID(Properties props, File versionFile,
+      String clusterID) throws InconsistentStorageStateException {
+    String cid = getProperty(props, OzoneConsts.CLUSTER_ID, versionFile);
+
+    if (clusterID == null) {
+      return cid;
+    }
+    if (!clusterID.equals(cid)) {
+      throw new InconsistentStorageStateException("Mismatched " +
+          "ClusterIDs. Version File : " + versionFile + " has clusterID: " +
+          cid + " and Datanode has clusterID: " + clusterID);
+    }
+    return cid;
+  }
+
+  /**
+   * Returns datanodeUuid if it is valid. It should match the UUID of the
+   * Datanode. Throws an exception otherwise.
+   */
+  @VisibleForTesting
+  public static String getDatanodeUUID(Properties props, File versionFile,
+      String datanodeUuid)
+      throws InconsistentStorageStateException {
+    String datanodeID = getProperty(props, OzoneConsts.DATANODE_UUID,
+        versionFile);
+
+    if (datanodeUuid != null && !datanodeUuid.equals(datanodeID)) {
+      throw new InconsistentStorageStateException("Mismatched " +
+          "DatanodeUUIDs. Version File : " + versionFile + " has datanodeUuid: "
+          + datanodeID + " and Datanode has datanodeUuid: " + datanodeUuid);
+    }
+    return datanodeID;
+  }
+
+  /**
+   * Returns creationTime if it is valid. Throws an exception otherwise.
+   */
+  @VisibleForTesting
+  public static long getCreationTime(Properties props, File versionFile)
+      throws InconsistentStorageStateException {
+    String cTimeStr = getProperty(props, OzoneConsts.CTIME, versionFile);
+
+    long cTime = Long.parseLong(cTimeStr);
+    long currentTime = Time.now();
+    if (cTime > currentTime || cTime < 0) {
+      throw new InconsistentStorageStateException("Invalid Creation time in " +
+          "Version File : " + versionFile + " - " + cTime + ". Current system" +
+          " time is " + currentTime);
+    }
+    return cTime;
+  }
+
+  /**
+   * Returns layOutVersion if it is valid. Throws an exception otherwise.
+   */
+  @VisibleForTesting
+  public static int getLayOutVersion(Properties props, File versionFile) throws
+      InconsistentStorageStateException {
+    String lvStr = getProperty(props, OzoneConsts.LAYOUTVERSION, versionFile);
+
+    int lv = Integer.parseInt(lvStr);
+    if(DataNodeLayoutVersion.getLatestVersion().getVersion() != lv) {
+      throw new InconsistentStorageStateException("Invalid layOutVersion. " +
+          "Version file has layOutVersion as " + lv + " and latest Datanode " +
+          "layOutVersion is " +
+          DataNodeLayoutVersion.getLatestVersion().getVersion());
+    }
+    return lv;
+  }
+
+  private static String getProperty(Properties props, String propName, File
+      versionFile)
+      throws InconsistentStorageStateException {
+    String value = props.getProperty(propName);
+    if (StringUtils.isBlank(value)) {
+      throw new InconsistentStorageStateException("Invalid " + propName +
+          ". Version File : " + versionFile + " has null or empty " + propName);
+    }
+    return value;
+  }
+}

+ 330 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java

@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.volume;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.GetSpaceUsed;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.ozone.common.InconsistentStorageStateException;
+import org.apache.hadoop.ozone.container.common.DataNodeLayoutVersion;
+import org.apache.hadoop.ozone.container.common.helpers.DatanodeVersionFile;
+import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
+import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
+
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * HddsVolume represents volume in a datanode. {@link VolumeSet} maitains a
+ * list of HddsVolumes, one for each volume in the Datanode.
+ * {@link VolumeInfo} in encompassed by this class.
+ */
+public final class HddsVolume {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HddsVolume.class);
+
+  public static final String HDDS_VOLUME_DIR = "hdds";
+
+  private final File hddsRootDir;
+  private final VolumeInfo volumeInfo;
+  private VolumeState state;
+
+  // VERSION file properties
+  private String storageID;       // id of the file system
+  private String clusterID;       // id of the cluster
+  private String datanodeUuid;    // id of the DataNode
+  private long cTime;             // creation time of the file system state
+  private int layoutVersion;      // layout version of the storage data
+
+  /**
+   * Builder for HddsVolume.
+   */
+  public static class Builder {
+    private final String volumeRootStr;
+    private Configuration conf;
+    private StorageType storageType;
+    private long configuredCapacity;
+
+    private String datanodeUuid;
+    private String clusterID;
+
+    public Builder(String rootDirStr) {
+      this.volumeRootStr = rootDirStr;
+    }
+
+    public Builder conf(Configuration config) {
+      this.conf = config;
+      return this;
+    }
+
+    public Builder storageType(StorageType st) {
+      this.storageType = st;
+      return this;
+    }
+
+    public Builder configuredCapacity(long capacity) {
+      this.configuredCapacity = capacity;
+      return this;
+    }
+
+    public Builder datanodeUuid(String datanodeUUID) {
+      this.datanodeUuid = datanodeUUID;
+      return this;
+    }
+
+    public Builder clusterID(String cid) {
+      this.clusterID = cid;
+      return this;
+    }
+
+    public HddsVolume build() throws IOException {
+      return new HddsVolume(this);
+    }
+  }
+
+  private HddsVolume(Builder b) throws IOException {
+    Preconditions.checkNotNull(b.volumeRootStr,
+        "Volume root dir cannot be null");
+    Preconditions.checkNotNull(b.datanodeUuid, "DatanodeUUID cannot be null");
+    Preconditions.checkNotNull(b.conf, "Configuration cannot be null");
+
+    StorageLocation location = StorageLocation.parse(b.volumeRootStr);
+    hddsRootDir = new File(location.getUri().getPath(), HDDS_VOLUME_DIR);
+    this.state = VolumeState.NOT_INITIALIZED;
+    this.clusterID = b.clusterID;
+    this.datanodeUuid = b.datanodeUuid;
+
+    VolumeInfo.Builder volumeBuilder =
+        new VolumeInfo.Builder(b.volumeRootStr, b.conf)
+        .storageType(b.storageType)
+        .configuredCapacity(b.configuredCapacity);
+    this.volumeInfo = volumeBuilder.build();
+
+    LOG.info("Creating Volume: " + this.hddsRootDir + " of  storage type : " +
+        b.storageType + " and capacity : " + volumeInfo.getCapacity());
+
+    initialize();
+  }
+
+  /**
+   * Initializes the volume.
+   * Creates the Version file if not present,
+   * otherwise returns with IOException.
+   * @throws IOException
+   */
+  private void initialize() throws IOException {
+    VolumeState intialVolumeState = analyzeVolumeState();
+    switch (intialVolumeState) {
+    case NON_EXISTENT:
+      // Root directory does not exist. Create it.
+      if (!hddsRootDir.mkdir()) {
+        throw new IOException("Cannot create directory " + hddsRootDir);
+      }
+      setState(VolumeState.NOT_FORMATTED);
+      createVersionFile();
+      break;
+    case NOT_FORMATTED:
+      // Version File does not exist. Create it.
+      createVersionFile();
+      break;
+    case NOT_INITIALIZED:
+      // Version File exists. Verify its correctness and update property fields.
+      readVersionFile();
+      setState(VolumeState.NORMAL);
+      break;
+    default:
+      throw new IOException("Unrecognized initial state : " +
+          intialVolumeState + "of volume : " + hddsRootDir);
+    }
+  }
+
+  private VolumeState analyzeVolumeState() {
+    if (!hddsRootDir.exists()) {
+      return VolumeState.NON_EXISTENT;
+    }
+    if (!getVersionFile().exists()) {
+      return VolumeState.NOT_FORMATTED;
+    }
+    return VolumeState.NOT_INITIALIZED;
+  }
+
+  public void format(String cid) throws IOException {
+    Preconditions.checkNotNull(cid, "clusterID cannot be null while " +
+        "formatting Volume");
+    this.clusterID = cid;
+    initialize();
+  }
+
+  /**
+   * Create Version File and write property fields into it.
+   * @throws IOException
+   */
+  private void createVersionFile() throws IOException {
+    this.storageID = HddsVolumeUtil.generateUuid();
+    this.cTime = Time.now();
+    this.layoutVersion = ChunkLayOutVersion.getLatestVersion().getVersion();
+
+    if (this.clusterID == null || datanodeUuid == null) {
+      // HddsDatanodeService does not have the cluster information yet. Wait
+      // for registration with SCM.
+      LOG.debug("ClusterID not available. Cannot format the volume {}",
+          this.hddsRootDir.getPath());
+      setState(VolumeState.NOT_FORMATTED);
+    } else {
+      // Write the version file to disk.
+      writeVersionFile();
+      setState(VolumeState.NORMAL);
+    }
+  }
+
+  private void writeVersionFile() throws IOException {
+    Preconditions.checkNotNull(this.storageID,
+        "StorageID cannot be null in Version File");
+    Preconditions.checkNotNull(this.clusterID,
+        "ClusterID cannot be null in Version File");
+    Preconditions.checkNotNull(this.datanodeUuid,
+        "DatanodeUUID cannot be null in Version File");
+    Preconditions.checkArgument(this.cTime > 0,
+        "Creation Time should be positive");
+    Preconditions.checkArgument(this.layoutVersion ==
+            DataNodeLayoutVersion.getLatestVersion().getVersion(),
+        "Version File should have the latest LayOutVersion");
+
+    File versionFile = getVersionFile();
+    LOG.debug("Writing Version file to disk, {}", versionFile);
+
+    DatanodeVersionFile dnVersionFile = new DatanodeVersionFile(this.storageID,
+        this.clusterID, this.datanodeUuid, this.cTime, this.layoutVersion);
+    dnVersionFile.createVersionFile(versionFile);
+  }
+
+  /**
+   * Read Version File and update property fields.
+   * Get common storage fields.
+   * Should be overloaded if additional fields need to be read.
+   *
+   * @throws IOException on error
+   */
+  private void readVersionFile() throws IOException {
+    File versionFile = getVersionFile();
+    Properties props = DatanodeVersionFile.readFrom(versionFile);
+    if (props.isEmpty()) {
+      throw new InconsistentStorageStateException(
+          "Version file " + versionFile + " is missing");
+    }
+
+    LOG.debug("Reading Version file from disk, {}", versionFile);
+    this.storageID = HddsVolumeUtil.getStorageID(props, versionFile);
+    this.clusterID = HddsVolumeUtil.getClusterID(props, versionFile,
+        this.clusterID);
+    this.datanodeUuid = HddsVolumeUtil.getDatanodeUUID(props, versionFile,
+        this.datanodeUuid);
+    this.cTime = HddsVolumeUtil.getCreationTime(props, versionFile);
+    this.layoutVersion = HddsVolumeUtil.getLayOutVersion(props, versionFile);
+  }
+
+  private File getVersionFile() {
+    return HddsVolumeUtil.getVersionFile(hddsRootDir);
+  }
+
+  public File getHddsRootDir() {
+    return hddsRootDir;
+  }
+
+  public StorageType getStorageType() {
+    return volumeInfo.getStorageType();
+  }
+
+  public String getStorageID() {
+    return storageID;
+  }
+
+  public String getClusterID() {
+    return clusterID;
+  }
+
+  public String getDatanodeUuid() {
+    return datanodeUuid;
+  }
+
+  public long getCTime() {
+    return cTime;
+  }
+
+  public int getLayoutVersion() {
+    return layoutVersion;
+  }
+
+  public VolumeState getStorageState() {
+    return state;
+  }
+
+  public long getCapacity() throws IOException {
+    return volumeInfo.getCapacity();
+  }
+
+  public long getAvailable() throws IOException {
+    return volumeInfo.getAvailable();
+  }
+
+  public void setState(VolumeState state) {
+    this.state = state;
+  }
+
+  public boolean isFailed() {
+    return (state == VolumeState.FAILED);
+  }
+
+  public void failVolume() {
+    setState(VolumeState.FAILED);
+    volumeInfo.shutdownUsageThread();
+  }
+
+  public void shutdown() {
+    this.state = VolumeState.NON_EXISTENT;
+    volumeInfo.shutdownUsageThread();
+  }
+
+  /**
+   * VolumeState represents the different states a HddsVolume can be in.
+   */
+  public enum VolumeState {
+    NORMAL,
+    FAILED,
+    NON_EXISTENT,
+    NOT_FORMATTED,
+    NOT_INITIALIZED
+  }
+
+  /**
+   * Only for testing. Do not use otherwise.
+   */
+  @VisibleForTesting
+  public void setScmUsageForTesting(GetSpaceUsed scmUsageForTest) {
+    volumeInfo.setScmUsageForTesting(scmUsageForTest);
+  }
+}

+ 5 - 4
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/RoundRobinVolumeChoosingPolicy.java → hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/RoundRobinVolumeChoosingPolicy.java

@@ -15,7 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.ozone.container.common.impl;
+
+package org.apache.hadoop.ozone.container.common.volume;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,13 +34,13 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class RoundRobinVolumeChoosingPolicy implements VolumeChoosingPolicy {
 
   public static final Log LOG = LogFactory.getLog(
-		RoundRobinVolumeChoosingPolicy.class);
+      RoundRobinVolumeChoosingPolicy.class);
 
   // Stores the index of the next volume to be returned.
   private AtomicInteger nextVolumeIndex = new AtomicInteger(0);
 
   @Override
-  public VolumeInfo chooseVolume(List<VolumeInfo> volumes,
+  public HddsVolume chooseVolume(List<HddsVolume> volumes,
       long maxContainerSize) throws IOException {
 
     // No volumes available to choose from
@@ -56,7 +57,7 @@ public class RoundRobinVolumeChoosingPolicy implements VolumeChoosingPolicy {
     long maxAvailable = 0;
 
     while (true) {
-      final VolumeInfo volume = volumes.get(currentVolumeIndex);
+      final HddsVolume volume = volumes.get(currentVolumeIndex);
       long availableVolumeSize = volume.getAvailable();
 
       currentVolumeIndex = (currentVolumeIndex + 1) % volumes.size();

+ 24 - 54
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/VolumeInfo.java → hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeInfo.java

@@ -16,10 +16,11 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.ozone.container.common.impl;
+package org.apache.hadoop.ozone.container.common.volume;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.GetSpaceUsed;
 import org.apache.hadoop.fs.StorageType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,9 +35,8 @@ public class VolumeInfo {
 
   private static final Logger LOG = LoggerFactory.getLogger(VolumeInfo.class);
 
-  private final Path rootDir;
+  private final String rootDir;
   private final StorageType storageType;
-  private VolumeState state;
 
   // Space usage calculator
   private VolumeUsage usage;
@@ -45,35 +45,27 @@ public class VolumeInfo {
   // query from the filesystem.
   private long configuredCapacity;
 
+  /**
+   * Builder for VolumeInfo.
+   */
   public static class Builder {
     private final Configuration conf;
-    private final Path rootDir;
+    private final String rootDir;
     private StorageType storageType;
-    private VolumeState state;
     private long configuredCapacity;
 
-    public Builder(Path rootDir, Configuration conf) {
-      this.rootDir = rootDir;
-      this.conf = conf;
+    public Builder(String root, Configuration config) {
+      this.rootDir = root;
+      this.conf = config;
     }
 
-    public Builder(String rootDirStr, Configuration conf) {
-      this.rootDir = new Path(rootDirStr);
-      this.conf = conf;
-    }
-
-    public Builder storageType(StorageType storageType) {
-      this.storageType = storageType;
-      return this;
-    }
-
-    public Builder volumeState(VolumeState state) {
-      this.state = state;
+    public Builder storageType(StorageType st) {
+      this.storageType = st;
       return this;
     }
 
-    public Builder configuredCapacity(long configuredCapacity) {
-      this.configuredCapacity = configuredCapacity;
+    public Builder configuredCapacity(long capacity) {
+      this.configuredCapacity = capacity;
       return this;
     }
 
@@ -85,7 +77,7 @@ public class VolumeInfo {
   private VolumeInfo(Builder b) throws IOException {
 
     this.rootDir = b.rootDir;
-    File root = new File(rootDir.toString());
+    File root = new File(this.rootDir);
 
     Boolean succeeded = root.isDirectory() || root.mkdirs();
 
@@ -100,12 +92,7 @@ public class VolumeInfo {
     this.configuredCapacity = (b.configuredCapacity != 0 ?
         b.configuredCapacity : -1);
 
-    this.state = (b.state != null ? b.state : VolumeState.NOT_FORMATTED);
-
     this.usage = new VolumeUsage(root, b.conf);
-
-    LOG.info("Creating Volume : " + rootDir + " of storage type : " +
-        storageType + " and capacity : " + configuredCapacity);
   }
 
   public long getCapacity() {
@@ -120,32 +107,14 @@ public class VolumeInfo {
     return usage.getScmUsed();
   }
 
-  void shutdown() {
-    this.state = VolumeState.NON_EXISTENT;
-    shutdownUsageThread();
-  }
-
-  void failVolume() {
-    setState(VolumeState.FAILED);
-    shutdownUsageThread();
-  }
-
-  private void shutdownUsageThread() {
+  protected void shutdownUsageThread() {
     if (usage != null) {
       usage.shutdown();
     }
     usage = null;
   }
 
-  void setState(VolumeState state) {
-    this.state = state;
-  }
-
-  public boolean isFailed() {
-    return (state == VolumeState.FAILED);
-  }
-
-  public Path getRootDir() {
+  public String getRootDir() {
     return this.rootDir;
   }
 
@@ -153,10 +122,11 @@ public class VolumeInfo {
     return this.storageType;
   }
 
-  public enum VolumeState {
-    NORMAL,
-    FAILED,
-    NON_EXISTENT,
-    NOT_FORMATTED,
+  /**
+   * Only for testing. Do not use otherwise.
+   */
+  @VisibleForTesting
+  public void setScmUsageForTesting(GetSpaceUsed scmUsageForTest) {
+    usage.setScmUsageForTesting(scmUsageForTest);
   }
 }

+ 309 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java

@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.volume;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.ozone.common.InconsistentStorageStateException;
+import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume.VolumeState;
+import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
+import org.apache.hadoop.util.AutoCloseableLock;
+import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
+import org.apache.hadoop.util.InstrumentedLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * VolumeSet to manage volumes in a DataNode.
+ */
+public class VolumeSet {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VolumeSet.class);
+
+  private Configuration conf;
+
+  /**
+   * {@link VolumeSet#volumeMap} maintains a map of all active volumes in the
+   * DataNode. Each volume has one-to-one mapping with a volumeInfo object.
+   */
+  private Map<String, HddsVolume> volumeMap;
+  /**
+   * {@link VolumeSet#failedVolumeMap} maintains a map of volumes which have
+   * failed. The keys in this map and {@link VolumeSet#volumeMap} are
+   * mutually exclusive.
+   */
+  private Map<String, HddsVolume> failedVolumeMap;
+  /**
+   * {@link VolumeSet#volumeStateMap} maintains a list of active volumes per
+   * StorageType.
+   */
+  private EnumMap<StorageType, List<HddsVolume>> volumeStateMap;
+
+  /**
+   * Lock to synchronize changes to the VolumeSet. Any update to
+   * {@link VolumeSet#volumeMap}, {@link VolumeSet#failedVolumeMap}, or
+   * {@link VolumeSet#volumeStateMap} should be done after acquiring this lock.
+   */
+  private final AutoCloseableLock volumeSetLock;
+
+  private final DatanodeDetails dnDetails;
+  private String datanodeUuid;
+  private String clusterID;
+
+  public VolumeSet(DatanodeDetails datanodeDetails, Configuration conf)
+      throws DiskOutOfSpaceException {
+    this(datanodeDetails, null, conf);
+  }
+
+  public VolumeSet(DatanodeDetails datanodeDetails, String clusterID,
+      Configuration conf)
+      throws DiskOutOfSpaceException {
+    this.dnDetails = datanodeDetails;
+    this.datanodeUuid = datanodeDetails.getUuidString();
+    this.clusterID = clusterID;
+    this.conf = conf;
+    this.volumeSetLock = new AutoCloseableLock(
+        new InstrumentedLock(getClass().getName(), LOG,
+            new ReentrantLock(true),
+            conf.getTimeDuration(
+                DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
+                DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,
+                TimeUnit.MILLISECONDS),
+            300));
+
+    initializeVolumeSet();
+  }
+
+  // Add DN volumes configured through ConfigKeys to volumeMap.
+  private void initializeVolumeSet() throws DiskOutOfSpaceException {
+    volumeMap = new ConcurrentHashMap<>();
+    failedVolumeMap = new ConcurrentHashMap<>();
+    volumeStateMap = new EnumMap<>(StorageType.class);
+
+    Collection<String> rawLocations = conf.getTrimmedStringCollection(
+        HDDS_DATANODE_DIR_KEY);
+    if (rawLocations.isEmpty()) {
+      rawLocations = conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY);
+    }
+    if (rawLocations.isEmpty()) {
+      throw new IllegalArgumentException("No location configured in either "
+          + HDDS_DATANODE_DIR_KEY + " or " + DFS_DATANODE_DATA_DIR_KEY);
+    }
+
+    for (StorageType storageType : StorageType.values()) {
+      volumeStateMap.put(storageType, new ArrayList<HddsVolume>());
+    }
+
+    for (String locationString : rawLocations) {
+      try {
+        StorageLocation location = StorageLocation.parse(locationString);
+
+        HddsVolume hddsVolume = createVolume(location.getUri().getPath(),
+            location.getStorageType());
+
+        checkAndSetClusterID(hddsVolume.getClusterID());
+
+        volumeMap.put(hddsVolume.getHddsRootDir().getPath(), hddsVolume);
+        volumeStateMap.get(hddsVolume.getStorageType()).add(hddsVolume);
+        LOG.info("Added Volume : {} to VolumeSet",
+            hddsVolume.getHddsRootDir().getPath());
+      } catch (IOException e) {
+        LOG.error("Failed to parse the storage location: " + locationString, e);
+      }
+    }
+
+    if (volumeMap.size() == 0) {
+      throw new DiskOutOfSpaceException("No storage location configured");
+    }
+  }
+
+  /**
+   * If Version file exists and the {@link VolumeSet#clusterID} is not set yet,
+   * assign it the value from Version file. Otherwise, check that the given
+   * id matches with the id from version file.
+   * @param idFromVersionFile value of the property from Version file
+   * @throws InconsistentStorageStateException
+   */
+  private void checkAndSetClusterID(String idFromVersionFile)
+      throws InconsistentStorageStateException {
+    // If the clusterID is null (not set), assign it the value
+    // from version file.
+    if (this.clusterID == null) {
+      this.clusterID = idFromVersionFile;
+      return;
+    }
+
+    // If the clusterID is already set, it should match with the value from the
+    // version file.
+    if (!idFromVersionFile.equals(this.clusterID)) {
+      throw new InconsistentStorageStateException(
+          "Mismatched ClusterIDs. VolumeSet has: " + this.clusterID +
+              ", and version file has: " + idFromVersionFile);
+    }
+  }
+
+  public void acquireLock() {
+    volumeSetLock.acquire();
+  }
+
+  public void releaseLock() {
+    volumeSetLock.release();
+  }
+
+  private HddsVolume createVolume(String locationString,
+      StorageType storageType) throws IOException {
+    HddsVolume.Builder volumeBuilder = new HddsVolume.Builder(locationString)
+        .conf(conf)
+        .datanodeUuid(datanodeUuid)
+        .clusterID(clusterID)
+        .storageType(storageType);
+    return volumeBuilder.build();
+  }
+
+
+  // Add a volume to VolumeSet
+  public void addVolume(String dataDir) throws IOException {
+    addVolume(dataDir, StorageType.DEFAULT);
+  }
+
+  // Add a volume to VolumeSet
+  public void addVolume(String volumeRoot, StorageType storageType)
+      throws IOException {
+    String hddsRoot = HddsVolumeUtil.getHddsRoot(volumeRoot);
+
+    try (AutoCloseableLock lock = volumeSetLock.acquire()) {
+      if (volumeMap.containsKey(hddsRoot)) {
+        LOG.warn("Volume : {} already exists in VolumeMap", hddsRoot);
+      } else {
+        if (failedVolumeMap.containsKey(hddsRoot)) {
+          failedVolumeMap.remove(hddsRoot);
+        }
+
+        HddsVolume hddsVolume = createVolume(volumeRoot, storageType);
+        volumeMap.put(hddsVolume.getHddsRootDir().getPath(), hddsVolume);
+        volumeStateMap.get(hddsVolume.getStorageType()).add(hddsVolume);
+
+        LOG.info("Added Volume : {} to VolumeSet",
+            hddsVolume.getHddsRootDir().getPath());
+      }
+    }
+  }
+
+  // Mark a volume as failed
+  public void failVolume(String dataDir) {
+    String hddsRoot = HddsVolumeUtil.getHddsRoot(dataDir);
+
+    try (AutoCloseableLock lock = volumeSetLock.acquire()) {
+      if (volumeMap.containsKey(hddsRoot)) {
+        HddsVolume hddsVolume = volumeMap.get(hddsRoot);
+        hddsVolume.failVolume();
+
+        volumeMap.remove(hddsRoot);
+        volumeStateMap.get(hddsVolume.getStorageType()).remove(hddsVolume);
+        failedVolumeMap.put(hddsRoot, hddsVolume);
+
+        LOG.info("Moving Volume : {} to failed Volumes", hddsRoot);
+      } else if (failedVolumeMap.containsKey(hddsRoot)) {
+        LOG.info("Volume : {} is not active", hddsRoot);
+      } else {
+        LOG.warn("Volume : {} does not exist in VolumeSet", hddsRoot);
+      }
+    }
+  }
+
+  // Remove a volume from the VolumeSet completely.
+  public void removeVolume(String dataDir) throws IOException {
+    String hddsRoot = HddsVolumeUtil.getHddsRoot(dataDir);
+
+    try (AutoCloseableLock lock = volumeSetLock.acquire()) {
+      if (volumeMap.containsKey(hddsRoot)) {
+        HddsVolume hddsVolume = volumeMap.get(hddsRoot);
+        hddsVolume.shutdown();
+
+        volumeMap.remove(hddsRoot);
+        volumeStateMap.get(hddsVolume.getStorageType()).remove(hddsVolume);
+
+        LOG.info("Removed Volume : {} from VolumeSet", hddsRoot);
+      } else if (failedVolumeMap.containsKey(hddsRoot)) {
+        HddsVolume hddsVolume = failedVolumeMap.get(hddsRoot);
+        hddsVolume.setState(VolumeState.NON_EXISTENT);
+
+        failedVolumeMap.remove(hddsRoot);
+        LOG.info("Removed Volume : {} from failed VolumeSet", hddsRoot);
+      } else {
+        LOG.warn("Volume : {} does not exist in VolumeSet", hddsRoot);
+      }
+    }
+  }
+
+  public HddsVolume chooseVolume(long containerSize,
+      VolumeChoosingPolicy choosingPolicy) throws IOException {
+    return choosingPolicy.chooseVolume(getVolumesList(), containerSize);
+  }
+
+  public void shutdown() {
+    for (HddsVolume hddsVolume : volumeMap.values()) {
+      try {
+        hddsVolume.shutdown();
+      } catch (Exception ex) {
+        LOG.error("Failed to shutdown volume : " + hddsVolume.getHddsRootDir(),
+            ex);
+      }
+    }
+  }
+
+  @VisibleForTesting
+  public List<HddsVolume> getVolumesList() {
+    return ImmutableList.copyOf(volumeMap.values());
+  }
+
+  @VisibleForTesting
+  public List<HddsVolume> getFailedVolumesList() {
+    return ImmutableList.copyOf(failedVolumeMap.values());
+  }
+
+  @VisibleForTesting
+  public Map<String, HddsVolume> getVolumeMap() {
+    return ImmutableMap.copyOf(volumeMap);
+  }
+
+  @VisibleForTesting
+  public Map<StorageType, List<HddsVolume>> getVolumeStateMap() {
+    return ImmutableMap.copyOf(volumeStateMap);
+  }
+}

+ 10 - 1
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/VolumeUsage.java → hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeUsage.java

@@ -16,8 +16,9 @@
  *  limitations under the License.
  */
 
-package org.apache.hadoop.ozone.container.common.impl;
+package org.apache.hadoop.ozone.container.common.volume;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CachingGetSpaceUsed;
 import org.apache.hadoop.fs.DF;
@@ -186,4 +187,12 @@ public class VolumeUsage {
       IOUtils.cleanupWithLogger(null, out);
     }
   }
+
+  /**
+   * Only for testing. Do not use otherwise.
+   */
+  @VisibleForTesting
+  public void setScmUsageForTesting(GetSpaceUsed scmUsageForTest) {
+    this.scmUsage = scmUsageForTest;
+  }
 }

+ 21 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/package-info.java

@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.volume;
+/**
+ This package contains volume/ disk related classes.
+ */

+ 38 - 0
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeLayOutVersion.java

@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * This class tests DatanodeLayOutVersion.
+ */
+public class TestDatanodeLayOutVersion {
+
+  @Test
+  public void testDatanodeLayOutVersion() {
+    // Check Latest Version and description
+    Assert.assertEquals(1, DataNodeLayoutVersion.getLatestVersion()
+        .getVersion());
+    Assert.assertEquals("HDDS Datanode LayOut Version 1", DataNodeLayoutVersion
+        .getLatestVersion().getDescription());
+    Assert.assertEquals(DataNodeLayoutVersion.getAllVersions().length,
+        DataNodeLayoutVersion.getAllVersions().length);
+  }
+}

+ 134 - 0
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/helpers/TestDatanodeVersionFile.java

@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.helpers;
+
+import org.apache.hadoop.ozone.common.InconsistentStorageStateException;
+import org.apache.hadoop.ozone.container.common.DataNodeLayoutVersion;
+import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.junit.Assert.*;
+
+/**
+ * This class tests {@link DatanodeVersionFile}.
+ */
+public class TestDatanodeVersionFile {
+
+  private File versionFile;
+  private DatanodeVersionFile dnVersionFile;
+  private Properties properties;
+
+  private String storageID;
+  private String clusterID;
+  private String datanodeUUID;
+  private long cTime;
+  private int lv;
+
+  @Rule
+  public TemporaryFolder folder= new TemporaryFolder();
+
+  @Before
+  public void setup() throws IOException {
+    versionFile = folder.newFile("Version");
+    storageID = UUID.randomUUID().toString();
+    clusterID = UUID.randomUUID().toString();
+    datanodeUUID = UUID.randomUUID().toString();
+    cTime = Time.now();
+    lv = DataNodeLayoutVersion.getLatestVersion().getVersion();
+
+    dnVersionFile = new DatanodeVersionFile(
+        storageID, clusterID, datanodeUUID, cTime, lv);
+
+    dnVersionFile.createVersionFile(versionFile);
+
+    properties = dnVersionFile.readFrom(versionFile);
+  }
+
+  @Test
+  public void testCreateAndReadVersionFile() throws IOException{
+
+    //Check VersionFile exists
+    assertTrue(versionFile.exists());
+
+    assertEquals(storageID, HddsVolumeUtil.getStorageID(
+        properties, versionFile));
+    assertEquals(clusterID, HddsVolumeUtil.getClusterID(
+        properties, versionFile, clusterID));
+    assertEquals(datanodeUUID, HddsVolumeUtil.getDatanodeUUID(
+        properties, versionFile, datanodeUUID));
+    assertEquals(cTime, HddsVolumeUtil.getCreationTime(
+        properties, versionFile));
+    assertEquals(lv, HddsVolumeUtil.getLayOutVersion(
+        properties, versionFile));
+  }
+
+  @Test
+  public void testIncorrectClusterId() throws IOException{
+    try {
+      String randomClusterID = UUID.randomUUID().toString();
+      HddsVolumeUtil.getClusterID(properties, versionFile,
+          randomClusterID);
+      fail("Test failure in testIncorrectClusterId");
+    } catch (InconsistentStorageStateException ex) {
+      GenericTestUtils.assertExceptionContains("Mismatched ClusterIDs", ex);
+    }
+  }
+
+  @Test
+  public void testVerifyCTime() throws IOException{
+    long invalidCTime = -10;
+    dnVersionFile = new DatanodeVersionFile(
+        storageID, clusterID, datanodeUUID, invalidCTime, lv);
+    dnVersionFile.createVersionFile(versionFile);
+    properties = dnVersionFile.readFrom(versionFile);
+
+    try {
+      HddsVolumeUtil.getCreationTime(properties, versionFile);
+      fail("Test failure in testVerifyCTime");
+    } catch (InconsistentStorageStateException ex) {
+      GenericTestUtils.assertExceptionContains("Invalid Creation time in " +
+          "Version File : " + versionFile, ex);
+    }
+  }
+
+  @Test
+  public void testVerifyLayOut() throws IOException{
+    int invalidLayOutVersion = 100;
+    dnVersionFile = new DatanodeVersionFile(
+        storageID, clusterID, datanodeUUID, cTime, invalidLayOutVersion);
+    dnVersionFile.createVersionFile(versionFile);
+    Properties props = dnVersionFile.readFrom(versionFile);
+
+    try {
+      HddsVolumeUtil.getLayOutVersion(props, versionFile);
+      fail("Test failure in testVerifyLayOut");
+    } catch (InconsistentStorageStateException ex) {
+      GenericTestUtils.assertExceptionContains("Invalid layOutVersion.", ex);
+    }
+  }
+}

+ 0 - 100
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestRoundRobinVolumeChoosingPolicy.java

@@ -1,100 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.impl;
-
-import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Tests {@link RoundRobinVolumeChoosingPolicy}.
- */
-public class TestRoundRobinVolumeChoosingPolicy {
-
-  private RoundRobinVolumeChoosingPolicy policy;
-
-  @Before
-  public void setup() {
-   policy = ReflectionUtils.newInstance(
-       RoundRobinVolumeChoosingPolicy.class, null);
-  }
-
-  @Test
-  public void testRRVolumeChoosingPolicy() throws Exception {
-    final List<VolumeInfo> volumes = new ArrayList<>();
-
-    // First volume, with 100 bytes of space.
-    volumes.add(Mockito.mock(VolumeInfo.class));
-    Mockito.when(volumes.get(0).getAvailable()).thenReturn(100L);
-
-    // Second volume, with 200 bytes of space.
-    volumes.add(Mockito.mock(VolumeInfo.class));
-    Mockito.when(volumes.get(1).getAvailable()).thenReturn(200L);
-
-    // Test two rounds of round-robin choosing
-    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0));
-    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0));
-    Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0));
-    Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0));
-
-    // The first volume has only 100L space, so the policy should
-    // choose the second one in case we ask for more.
-    Assert.assertEquals(volumes.get(1),
-        policy.chooseVolume(volumes, 150));
-
-    // Fail if no volume has enough space available
-    try {
-      policy.chooseVolume(volumes, Long.MAX_VALUE);
-      Assert.fail();
-    } catch (IOException e) {
-      // Passed.
-    }
-  }
-
-  @Test
-  public void testRRPolicyExceptionMessage() throws Exception {
-    final List<VolumeInfo> volumes = new ArrayList<>();
-
-    // First volume, with 100 bytes of space.
-    volumes.add(Mockito.mock(VolumeInfo.class));
-    Mockito.when(volumes.get(0).getAvailable()).thenReturn(100L);
-
-    // Second volume, with 200 bytes of space.
-    volumes.add(Mockito.mock(VolumeInfo.class));
-    Mockito.when(volumes.get(1).getAvailable()).thenReturn(200L);
-
-    int blockSize = 300;
-    try {
-      policy.chooseVolume(volumes, blockSize);
-      Assert.fail("expected to throw DiskOutOfSpaceException");
-    } catch(DiskOutOfSpaceException e) {
-      Assert.assertEquals("Not returnig the expected message",
-          "Out of space: The volume with the most available space (=" + 200
-              + " B) is less than the container size (=" + blockSize + " B).",
-          e.getMessage());
-    }
-  }
-}

+ 145 - 0
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestHddsVolume.java

@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.volume;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.GetSpaceUsed;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.ozone.container.common.helpers.DatanodeVersionFile;
+import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
+import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.util.Properties;
+import java.util.UUID;
+
+/**
+ * Unit tests for {@link HddsVolume}.
+ */
+public class TestHddsVolume {
+
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+  private static final String DATANODE_UUID = UUID.randomUUID().toString();
+  private static final String CLUSTER_ID = UUID.randomUUID().toString();
+  private static final Configuration CONF = new Configuration();
+  private static final String DU_CACHE_FILE = "scmUsed";
+
+  private File rootDir;
+  private HddsVolume volume;
+  private File versionFile;
+
+  @Before
+  public void setup() throws Exception {
+    rootDir = new File(folder.getRoot(), HddsVolume.HDDS_VOLUME_DIR);
+    volume = new HddsVolume.Builder(folder.getRoot().getPath())
+        .datanodeUuid(DATANODE_UUID)
+        .conf(CONF)
+        .build();
+    versionFile = HddsVolumeUtil.getVersionFile(rootDir);
+  }
+
+  @Test
+  public void testHddsVolumeInitialization() throws Exception {
+
+    // The initial state of HddsVolume should be "NOT_FORMATTED" when
+    // clusterID is not specified and the version file should not be written
+    // to disk.
+    assertTrue(volume.getClusterID() == null);
+    assertEquals(volume.getStorageType(), StorageType.DEFAULT);
+    assertEquals(volume.getStorageState(),
+        HddsVolume.VolumeState.NOT_FORMATTED);
+    assertFalse("Version file should not be created when clusterID is not " +
+        "known.", versionFile.exists());
+
+
+    // Format the volume with clusterID.
+    volume.format(CLUSTER_ID);
+
+    // The state of HddsVolume after formatting with clusterID should be
+    // NORMAL and the version file should exist.
+    assertTrue("Volume format should create Version file",
+        versionFile.exists());
+    assertEquals(volume.getClusterID(), CLUSTER_ID);
+    assertEquals(volume.getStorageState(), HddsVolume.VolumeState.NORMAL);
+  }
+
+  @Test
+  public void testReadPropertiesFromVersionFile() throws Exception {
+    volume.format(CLUSTER_ID);
+
+    Properties properties = DatanodeVersionFile.readFrom(versionFile);
+
+    String storageID = HddsVolumeUtil.getStorageID(properties, versionFile);
+    String clusterID = HddsVolumeUtil.getClusterID(
+        properties, versionFile, CLUSTER_ID);
+    String datanodeUuid = HddsVolumeUtil.getDatanodeUUID(
+        properties, versionFile, DATANODE_UUID);
+    long cTime = HddsVolumeUtil.getCreationTime(
+        properties, versionFile);
+    int layoutVersion = HddsVolumeUtil.getLayOutVersion(
+        properties, versionFile);
+
+    assertEquals(volume.getStorageID(), storageID);
+    assertEquals(volume.getClusterID(), clusterID);
+    assertEquals(volume.getDatanodeUuid(), datanodeUuid);
+    assertEquals(volume.getCTime(), cTime);
+    assertEquals(volume.getLayoutVersion(), layoutVersion);
+  }
+
+  @Test
+  public void testShutdown() throws Exception{
+    // Return dummy value > 0 for scmUsage so that scm cache file is written
+    // during shutdown.
+    GetSpaceUsed scmUsageMock = Mockito.mock(GetSpaceUsed.class);
+    volume.setScmUsageForTesting(scmUsageMock);
+    Mockito.when(scmUsageMock.getUsed()).thenReturn(Long.valueOf(100));
+
+    assertTrue("Available volume should be positive",
+        volume.getAvailable() > 0);
+
+    // Shutdown the volume.
+    volume.shutdown();
+
+    // Volume state should be "NON_EXISTENT" when volume is shutdown.
+    assertEquals(volume.getStorageState(),
+        HddsVolume.VolumeState.NON_EXISTENT);
+
+    // Volume should save scmUsed cache file once volume is shutdown
+    File scmUsedFile = new File(folder.getRoot(), DU_CACHE_FILE);
+    System.out.println("scmUsedFile: " + scmUsedFile);
+    assertTrue("scmUsed cache file should be saved on shutdown",
+        scmUsedFile.exists());
+
+    try {
+      // Volume.getAvailable() should fail with NullPointerException as usage
+      // is shutdown.
+      volume.getAvailable();
+      fail("HddsVolume#shutdown test failed");
+    } catch (Exception ex){
+      assertTrue(ex instanceof NullPointerException);
+    }
+  }
+}

+ 131 - 0
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestRoundRobinVolumeChoosingPolicy.java

@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.volume;
+
+import org.apache.hadoop.fs.GetSpaceUsed;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Tests {@link RoundRobinVolumeChoosingPolicy}.
+ */
+public class TestRoundRobinVolumeChoosingPolicy {
+
+  private RoundRobinVolumeChoosingPolicy policy;
+  private List<HddsVolume> volumes;
+
+  private final String baseDir = MiniDFSCluster.getBaseDirectory();
+	private final String volume1 = baseDir + "disk1";
+	private final String volume2 = baseDir + "disk2";
+  private static final String DUMMY_IP_ADDR = "0.0.0.0";
+
+  @Before
+  public void setup() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    String dataDirKey = volume1 + "," + volume2;
+    conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dataDirKey);
+    policy = ReflectionUtils.newInstance(
+        RoundRobinVolumeChoosingPolicy.class, null);
+    DatanodeDetails datanodeDetails = DatanodeDetails.newBuilder()
+        .setUuid(UUID.randomUUID().toString())
+        .setIpAddress(DUMMY_IP_ADDR)
+        .build();
+    VolumeSet volumeSet = new VolumeSet(datanodeDetails, conf);
+    volumes = volumeSet.getVolumesList();
+  }
+
+  @Test
+  public void testRRVolumeChoosingPolicy() throws Exception {
+    HddsVolume hddsVolume1 = volumes.get(0);
+    HddsVolume hddsVolume2 = volumes.get(1);
+
+    // Set available space in volume1 to 100L
+    setAvailableSpace(hddsVolume1, 100L);
+
+    // Set available space in volume1 to 200L
+    setAvailableSpace(hddsVolume2, 200L);
+
+    Assert.assertEquals(100L, hddsVolume1.getAvailable());
+    Assert.assertEquals(200L, hddsVolume2.getAvailable());
+
+    // Test two rounds of round-robin choosing
+    Assert.assertEquals(hddsVolume1, policy.chooseVolume(volumes, 0));
+    Assert.assertEquals(hddsVolume2, policy.chooseVolume(volumes, 0));
+    Assert.assertEquals(hddsVolume1, policy.chooseVolume(volumes, 0));
+    Assert.assertEquals(hddsVolume2, policy.chooseVolume(volumes, 0));
+
+    // The first volume has only 100L space, so the policy should
+    // choose the second one in case we ask for more.
+    Assert.assertEquals(hddsVolume2,
+        policy.chooseVolume(volumes, 150));
+
+    // Fail if no volume has enough space available
+    try {
+      policy.chooseVolume(volumes, Long.MAX_VALUE);
+      Assert.fail();
+    } catch (IOException e) {
+      // Passed.
+    }
+  }
+
+  @Test
+  public void testRRPolicyExceptionMessage() throws Exception {
+    HddsVolume hddsVolume1 = volumes.get(0);
+    HddsVolume hddsVolume2 = volumes.get(1);
+
+    // Set available space in volume1 to 100L
+    setAvailableSpace(hddsVolume1, 100L);
+
+    // Set available space in volume1 to 200L
+    setAvailableSpace(hddsVolume2, 200L);
+
+    int blockSize = 300;
+    try {
+      policy.chooseVolume(volumes, blockSize);
+      Assert.fail("expected to throw DiskOutOfSpaceException");
+    } catch(DiskOutOfSpaceException e) {
+      Assert.assertEquals("Not returnig the expected message",
+          "Out of space: The volume with the most available space (=" + 200
+              + " B) is less than the container size (=" + blockSize + " B).",
+          e.getMessage());
+    }
+  }
+
+  private void setAvailableSpace(HddsVolume hddsVolume, long availableSpace)
+      throws IOException {
+    GetSpaceUsed scmUsageMock = Mockito.mock(GetSpaceUsed.class);
+    hddsVolume.setScmUsageForTesting(scmUsageMock);
+    // Set used space to capacity -requiredAvailableSpace so that
+    // getAvailable() returns us the specified availableSpace.
+    Mockito.when(scmUsageMock.getUsed()).thenReturn(
+        (hddsVolume.getCapacity() - availableSpace));
+  }
+}

+ 25 - 17
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestVolumeSet.java → hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSet.java

@@ -16,15 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.ozone.container.common.interfaces;
+package org.apache.hadoop.ozone.container.common.volume;
 
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.ozone.container.common.impl.VolumeInfo;
-import org.apache.hadoop.ozone.container.common.impl.VolumeSet;
+import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
 import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -36,6 +36,7 @@ import org.junit.rules.Timeout;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.UUID;
 
 /**
  * Tests {@link VolumeSet} operations.
@@ -43,14 +44,20 @@ import java.util.List;
 public class TestVolumeSet {
 
   private OzoneConfiguration conf;
-  protected VolumeSet volumeSet;
-  protected final String baseDir = MiniDFSCluster.getBaseDirectory();
-  protected final String volume1 = baseDir + "disk1";
-  protected final String volume2 = baseDir + "disk2";
+  private VolumeSet volumeSet;
+  private final String baseDir = MiniDFSCluster.getBaseDirectory();
+  private final String volume1 = baseDir + "disk1";
+  private final String volume2 = baseDir + "disk2";
   private final List<String> volumes = new ArrayList<>();
 
+  private static final String DUMMY_IP_ADDR = "0.0.0.0";
+
   private void initializeVolumeSet() throws Exception {
-    volumeSet = new VolumeSet(conf);
+    DatanodeDetails datanodeDetails = DatanodeDetails.newBuilder()
+        .setUuid(UUID.randomUUID().toString())
+        .setIpAddress(DUMMY_IP_ADDR)
+        .build();
+    volumeSet = new VolumeSet(datanodeDetails, conf);
   }
 
   @Rule
@@ -69,7 +76,7 @@ public class TestVolumeSet {
   @Test
   public void testVolumeSetInitialization() throws Exception {
 
-    List<VolumeInfo> volumesList = volumeSet.getVolumesList();
+    List<HddsVolume> volumesList = volumeSet.getVolumesList();
 
     // VolumeSet initialization should add volume1 and volume2 to VolumeSet
     assertEquals("VolumeSet intialization is incorrect",
@@ -83,7 +90,6 @@ public class TestVolumeSet {
   @Test
   public void testAddVolume() throws Exception {
 
-    List<VolumeInfo> volumesList = volumeSet.getVolumesList();
     assertEquals(2, volumeSet.getVolumesList().size());
 
     // Add a volume to VolumeSet
@@ -107,8 +113,9 @@ public class TestVolumeSet {
     // Failed volume should be added to FailedVolumeList
     assertEquals("Failed volume not present in FailedVolumeMap",
         1, volumeSet.getFailedVolumesList().size());
-    assertEquals("Failed Volume list did not match", volume1,
-        volumeSet.getFailedVolumesList().get(0).getRootDir().toString());
+    assertEquals("Failed Volume list did not match",
+        HddsVolumeUtil.getHddsRoot(volume1),
+        volumeSet.getFailedVolumesList().get(0).getHddsRootDir().getPath());
     assertTrue(volumeSet.getFailedVolumesList().get(0).isFailed());
 
     // Failed volume should not exist in VolumeMap
@@ -119,7 +126,7 @@ public class TestVolumeSet {
   @Test
   public void testRemoveVolume() throws Exception {
 
-    List<VolumeInfo> volumesList = volumeSet.getVolumesList();
+    List<HddsVolume> volumesList = volumeSet.getVolumesList();
     assertEquals(2, volumeSet.getVolumesList().size());
 
     // Remove a volume from VolumeSet
@@ -132,15 +139,16 @@ public class TestVolumeSet {
         LogFactory.getLog(VolumeSet.class));
     volumeSet.removeVolume(volume1);
     assertEquals(1, volumeSet.getVolumesList().size());
-    String expectedLogMessage = "Volume : " + volume1 + " does not exist in "
-        + "VolumeSet";
+    String expectedLogMessage = "Volume : " +
+        HddsVolumeUtil.getHddsRoot(volume1) + " does not exist in VolumeSet";
     assertTrue("Log output does not contain expected log message: "
         + expectedLogMessage, logs.getOutput().contains(expectedLogMessage));
   }
 
   private boolean checkVolumeExistsInVolumeSet(String volume) {
-    for (VolumeInfo volumeInfo : volumeSet.getVolumesList()) {
-      if (volumeInfo.getRootDir().toString().equals(volume)) {
+    for (HddsVolume hddsVolume : volumeSet.getVolumesList()) {
+      if (hddsVolume.getHddsRootDir().getPath().equals(
+          HddsVolumeUtil.getHddsRoot(volume))) {
         return true;
       }
     }