
HDFS-11444. Ozone: Fix datanode ID handling in MiniOzoneCluster. Contributed by Weiwei Yang.

Anu Engineer, 8 years ago · commit 98d3360e3a
14 changed files with 524 additions and 82 deletions
  1. +10 -0    hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java
  2. +14 -0    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
  3. +85 -0    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
  4. +63 -7    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
  5. +9 -0     hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
  6. +25 -0    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java
  7. +24 -71   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
  8. +25 -1    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
  9. +3 -0     hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
  10. +9 -0    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
  11. +11 -0   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
  12. +223 -0  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
  13. +19 -2   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
  14. +4 -1    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.scm;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.ChannelFuture;
@@ -87,6 +88,15 @@ public class XceiverClient implements XceiverClientSpi {
     channelFuture = b.connect(leader.getHostName(), port).sync();
   }
 
+  /**
+   * Returns true if the Xceiver client is connected to a server.
+   * @return True if the connection is alive, false otherwise.
+   */
+  @VisibleForTesting
+  public boolean isConnected() {
+    return channelFuture.channel().isActive();
+  }
+
   @Override
   public void close() {
     if(group != null) {

+ 14 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java

@@ -30,6 +30,20 @@ public final class OzoneConfigKeys {
   public static final String DFS_CONTAINER_IPC_PORT =
       "dfs.container.ipc";
   public static final int DFS_CONTAINER_IPC_PORT_DEFAULT = 50011;
+
+  /**
+   * When set to true, allocate a random free port for the ozone container,
+   * so that a mini cluster is able to launch multiple containers on a node.
+   *
+   * When set to false (default), the container port is fixed, as specified
+   * by DFS_CONTAINER_IPC_PORT, with DFS_CONTAINER_IPC_PORT_DEFAULT as its
+   * default value.
+   */
+  public static final String DFS_CONTAINER_IPC_RANDOM_PORT =
+      "dfs.container.ipc.random.port";
+  public static final boolean DFS_CONTAINER_IPC_RANDOM_PORT_DEFAULT =
+      false;
+
   public static final String OZONE_LOCALSTORAGE_ROOT =
       "ozone.localstorage.root";
   public static final String OZONE_LOCALSTORAGE_ROOT_DEFAULT = "/tmp/ozone";
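
The new key is an ordinary boolean configuration knob. A minimal sketch of enabling it by hand (the MiniOzoneCluster builder below does this automatically):

    Configuration conf = new OzoneConfiguration();
    // Let each container server bind to an OS-assigned free port.
    conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, true);
    // With the flag off (the default), the server uses the fixed port from
    // DFS_CONTAINER_IPC_PORT, which defaults to 50011.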

+ 85 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java

@@ -22,6 +22,8 @@ import com.google.common.base.Preconditions;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl;
 import org.apache.hadoop.utils.LevelDBStore;
@@ -30,9 +32,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
 
 import static org.apache.commons.io.FilenameUtils.removeExtension;
 import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
@@ -367,4 +373,83 @@ public final class ContainerUtils {
     FileUtils.forceDelete(containerPath.toFile());
     FileUtils.forceDelete(metaPath.toFile());
   }
+
+  /**
+   * Write datanode ID protobuf messages to an ID file.
+   * The old ID file will be overwritten.
+   *
+   * @param ids A list of {@link DatanodeID}
+   * @param path Local ID file path
+   * @throws IOException When read/write error occurs
+   */
+  private static synchronized void writeDatanodeIDs(List<DatanodeID> ids,
+      File path) throws IOException {
+    if (path.exists()) {
+      if (!path.delete() || !path.createNewFile()) {
+        throw new IOException("Unable to overwrite the datanode ID file.");
+      }
+    } else {
+      if(!path.getParentFile().exists() &&
+          !path.getParentFile().mkdirs()) {
+        throw new IOException("Unable to create datanode ID directories.");
+      }
+    }
+    try (FileOutputStream out = new FileOutputStream(path)) {
+      for (DatanodeID id : ids) {
+        HdfsProtos.DatanodeIDProto dnId = id.getProtoBufMessage();
+        dnId.writeDelimitedTo(out);
+      }
+    }
+  }
+
+  /**
+   * Persist a {@link DatanodeID} to a local file.
+   * The existing IDs are read first, and a new entry is appended only if
+   * the ID is new. This keeps the file from growing too big in a dirty
+   * environment.
+   *
+   * @param dnID The {@link DatanodeID} to persist
+   * @param path Local ID file path
+   * @throws IOException when read/write error occurs
+   */
+  public static synchronized void writeDatanodeIDTo(DatanodeID dnID,
+      File path) throws IOException {
+    List<DatanodeID> ids = ContainerUtils.readDatanodeIDsFrom(path);
+    // Only create or overwrite the file
+    // if the ID doesn't exist in the ID file
+    for (DatanodeID id : ids) {
+      if (id.getProtoBufMessage()
+          .equals(dnID.getProtoBufMessage())) {
+        return;
+      }
+    }
+    ids.add(dnID);
+    writeDatanodeIDs(ids, path);
+  }
+
+  /**
+   * Read {@link DatanodeID} entries from a local ID file and return a list
+   * of datanode IDs. If the ID file doesn't exist, an empty list is
+   * returned.
+   *
+   * @param path ID file local path
+   * @return A list of {@link DatanodeID}
+   * @throws IOException If the ID file is malformed or another I/O error occurs
+   */
+  public static synchronized List<DatanodeID> readDatanodeIDsFrom(File path)
+      throws IOException {
+    List<DatanodeID> ids = new ArrayList<DatanodeID>();
+    if (!path.exists()) {
+      return ids;
+    }
+    try (FileInputStream in = new FileInputStream(path)) {
+      while (in.available() > 0) {
+        try {
+          HdfsProtos.DatanodeIDProto id =
+              HdfsProtos.DatanodeIDProto.parseDelimitedFrom(in);
+          ids.add(DatanodeID.getFromProtoBuf(id));
+        } catch (IOException e) {
+          throw new IOException("Failed to parse Datanode ID from "
+              + path.getAbsolutePath(), e);
+        }
+      }
+    }
+    return ids;
+  }
 }
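
The write/read pair above stores the IDs as length-delimited protobuf messages, so a single file can hold entries for several datanodes. A minimal round-trip sketch, assuming a writable test directory (the file path here is hypothetical):

    File idFile = new File("/tmp/ozone-test/datanode.id");
    DatanodeID id = DFSTestUtil.getLocalDatanodeID(1);  // test helper, as used in TestMiniOzoneCluster below
    id.setContainerPort(50011);
    ContainerUtils.writeDatanodeIDTo(id, idFile);       // no-op if this ID is already in the file
    List<DatanodeID> ids = ContainerUtils.readDatanodeIDsFrom(idFile);
    assert ids.size() == 1 && ids.contains(id);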

+ 63 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.container.common.statemachine;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ozone.OzoneClientUtils;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.util.Time;
@@ -44,13 +45,16 @@ public class DatanodeStateMachine implements Closeable {
   private final long heartbeatFrequency;
   private StateContext context;
   private final OzoneContainer container;
+  private DatanodeID datanodeID = null;
 
   /**
    * Constructs a datanode state machine.
    *
+   * @param datanodeID - DatanodeID used to identify a datanode
    * @param conf - Configuration.
    */
-  public DatanodeStateMachine(Configuration conf) throws IOException {
+  public DatanodeStateMachine(DatanodeID datanodeID,
+      Configuration conf) throws IOException {
     this.conf = conf;
     executorService = HadoopExecutors.newCachedThreadPool(
                 new ThreadFactoryBuilder().setDaemon(true)
@@ -60,6 +64,26 @@ public class DatanodeStateMachine implements Closeable {
     heartbeatFrequency = TimeUnit.SECONDS.toMillis(
         OzoneClientUtils.getScmHeartbeatInterval(conf));
     container = new OzoneContainer(conf);
+    this.datanodeID = datanodeID;
+  }
+
+  public DatanodeStateMachine(Configuration conf)
+      throws IOException {
+    this(null, conf);
+  }
+
+  public void setDatanodeID(DatanodeID datanodeID) {
+    this.datanodeID = datanodeID;
+  }
+
+  /**
+   * Returns the DatanodeID if set, null otherwise.
+   *
+   * @return datanodeID
+   */
+  public DatanodeID getDatanodeID() {
+    return this.datanodeID;
   }
 
   /**
@@ -71,10 +95,14 @@ public class DatanodeStateMachine implements Closeable {
     return connectionManager;
   }
 
+  public OzoneContainer getContainer() {
+    return this.container;
+  }
+
   /**
    * Runs the state machine at a fixed frequency.
    */
-  public void start() throws IOException {
+  private void start() throws IOException {
     long now = 0;
     long nextHB = 0;
     container.start();
@@ -216,12 +244,14 @@ public class DatanodeStateMachine implements Closeable {
     }
   }
 
-  public static DatanodeStateMachine initStateMachine(Configuration conf)
-      throws IOException {
-    DatanodeStateMachine stateMachine = new DatanodeStateMachine(conf);
+  /**
+   * Start datanode state machine as a single thread daemon.
+   */
+  public void startDaemon() {
     Runnable startStateMachineTask = () -> {
       try {
-        stateMachine.start();
+        start();
+        LOG.info("Ozone container server started.");
       } catch (Exception ex) {
         LOG.error("Unable to start the DatanodeState Machine", ex);
       }
@@ -231,6 +261,32 @@ public class DatanodeStateMachine implements Closeable {
         .setNameFormat("Datanode State Machine Thread - %d")
         .build().newThread(startStateMachineTask);
     thread.start();
-    return stateMachine;
+  }
+
+  /**
+   * Stop the daemon thread of the datanode state machine.
+   */
+  public synchronized void stopDaemon() {
+    try {
+      context.setState(DatanodeStates.SHUTDOWN);
+      this.close();
+      LOG.info("Ozone container server stopped.");
+    } catch (IOException e) {
+      LOG.error("Stop ozone container server failed.", e);
+    }
+  }
+
+  /**
+   * Check if the datanode state machine daemon is stopped.
+   *
+   * @return True if the datanode state machine daemon is stopped,
+   * false otherwise.
+   */
+  @VisibleForTesting
+  public boolean isDaemonStopped() {
+    return this.executorService.isShutdown()
+        && this.getContext().getExecutionCount() == 0
+        && this.getContext().getState() == DatanodeStates.SHUTDOWN;
   }
 }
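
With initStateMachine removed and start() made private, callers construct the state machine and drive its lifecycle through the daemon methods. A sketch of the intended usage, assuming a Configuration that already carries the SCM addresses:

    Configuration conf = new OzoneConfiguration();           // assumed to carry SCM addresses
    DatanodeID dnId = DFSTestUtil.getLocalDatanodeID();      // test helper, as in TestDatanodeStateMachine below
    dnId.setContainerPort(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
    DatanodeStateMachine sm = new DatanodeStateMachine(dnId, conf);
    sm.startDaemon();  // runs start() on a single daemon thread
    // ... wait for endpoints to register, heartbeats to flow, etc. ...
    sm.stopDaemon();   // sets the SHUTDOWN state and closes resources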

+ 9 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java

@@ -80,6 +80,15 @@ public class StateContext {
     return parent;
   }
 
+  /**
+   * Get the container server port.
+   * @return The container server port if available, -1 otherwise.
+   */
+  public int getContainerPort() {
+    return parent == null ?
+        -1 : parent.getContainer().getContainerServerPort();
+  }
+
   /**
    * Returns true if we are entering a new state.
    *

+ 25 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java

@@ -18,14 +18,19 @@ package org.apache.hadoop.ozone.container.common.states.datanode;
 
 import com.google.common.base.Strings;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ozone.OzoneClientUtils;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.states.DatanodeState;
+import org.apache.hadoop.scm.ScmConfigKeys;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.concurrent.Callable;
@@ -87,9 +92,29 @@ public class InitDatanodeState implements DatanodeState,
         connectionManager.addSCMServer(addr);
       }
     }
+
+    // If datanode ID is set, persist it to the ID file.
+    persistContainerDatanodeID();
+
     return this.context.getState().getNextState();
   }
 
+  /**
+   * Update the Ozone container port in the datanode ID,
+   * and persist the ID to a local file.
+   */
+  private void persistContainerDatanodeID() throws IOException {
+    String dataNodeIDPath = conf.get(ScmConfigKeys.OZONE_SCM_DATANODE_ID);
+    File idPath = new File(dataNodeIDPath);
+    int containerPort = this.context.getContainerPort();
+    DatanodeID datanodeID = this.context.getParent().getDatanodeID();
+    if (datanodeID != null) {
+      datanodeID.setContainerPort(containerPort);
+      ContainerUtils.writeDatanodeIDTo(datanodeID, idPath);
+      LOG.info("Datanode ID is persisted to {}", dataNodeIDPath);
+    }
+  }
+
   /**
    * Called before entering this state.
    */

+ 24 - 71
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java

@@ -19,8 +19,7 @@ package org.apache.hadoop.ozone.container.common.states.datanode;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.ozone.OzoneClientUtils;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
@@ -35,16 +34,11 @@ import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
@@ -83,63 +77,30 @@ public class RunningDatanodeState implements DatanodeState {
   private StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto
       readPersistedDatanodeID(Path idPath) throws IOException {
     Preconditions.checkNotNull(idPath);
-    StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto
-        containerIDProto;
-    try (FileInputStream stream = new FileInputStream(idPath.toFile())) {
-      containerIDProto = StorageContainerDatanodeProtocolProtos
-          .ContainerNodeIDProto.parseFrom(stream);
-      return containerIDProto;
+    DatanodeID datanodeID = null;
+    List<DatanodeID> datanodeIDs =
+        ContainerUtils.readDatanodeIDsFrom(idPath.toFile());
+    int containerPort = this.context.getContainerPort();
+    for (DatanodeID dnId : datanodeIDs) {
+      if (dnId.getContainerPort() == containerPort) {
+        datanodeID = dnId;
+        break;
+      }
     }
-  }
-
-  /**
-   * Create a DatanodeID from the datanode information.
-   *
-   * @return DatanodeID
-   * @throws UnknownHostException
-   */
-  private DatanodeID createDatanodeID() throws UnknownHostException {
-    DatanodeID temp = new DatanodeID(
-        //TODO : Replace this with proper network and kerberos
-        // support code.
-        InetAddress.getLocalHost().getHostAddress(),
-        DataNode.getHostName(conf),
-        UUID.randomUUID().toString(),
-        0, /** XferPort - SCM does not use this port  */
-        0, /** Info port - SCM does not use this port */
-        0, /** Info Secure Port - SCM does not use this port */
-        0); /** IPC port - SCM does not use this port */
-
-    // TODO: make this dynamically discoverable. SCM can hand out this
-    // port number to calling applications. This makes it easy to run multiple
-    // container endpoints on the same machine.
-    temp.setContainerPort(OzoneClientUtils.getContainerPort(conf));
-    return temp;
-  }
 
-  /**
-   * Creates a new ContainerID that persists both DatanodeID and ClusterID.
-   *
-   * @param idPath Path to the id file.
-   * @return ContainerNodeIDProto
-   * @throws UnknownHostException
-   */
-  private StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto
-      createNewContainerID(Path idPath)
-      throws IOException {
-
-    if(!idPath.getParent().toFile().exists() &&
-        !idPath.getParent().toFile().mkdirs()) {
-      LOG.error("Failed to create container ID locations. Path: {}",
-          idPath.getParent());
-      throw new IOException("Unable to create container ID directories.");
-    }
-    StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto
-        containerIDProto = StorageContainerDatanodeProtocolProtos
-        .ContainerNodeIDProto.newBuilder()
-        .setDatanodeID(createDatanodeID().getProtoBufMessage()).build();
-    try (FileOutputStream stream = new FileOutputStream(idPath.toFile())) {
-      stream.write(containerIDProto.toByteArray());
+    if (datanodeID == null) {
+      throw new IOException("No valid datanode ID found from "
+          + idPath.toFile().getAbsolutePath()
+          + " that matches container port "
+          + containerPort);
+    } else {
+      StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto
+          containerIDProto =
+          StorageContainerDatanodeProtocolProtos
+              .ContainerNodeIDProto
+              .newBuilder()
+              .setDatanodeID(datanodeID.getProtoBufMessage())
+              .build();
       return containerIDProto;
     }
   }
@@ -171,15 +132,7 @@ public class RunningDatanodeState implements DatanodeState {
     } catch (IOException ex) {
       LOG.trace("Not able to find container Node ID, creating it.", ex);
     }
-    // Not found, let us create a new datanode ID, persist it and return that
-    // info to SCM.
-    try {
-      nodeID = createNewContainerID(Paths.get(dataNodeIDPath));
-      LOG.trace("Created Node ID :", nodeID.getDatanodeID().getDatanodeUuid());
-      return nodeID;
-    } catch (IOException ex) {
-      LOG.error("Creating new node ID failed.", ex);
-    }
+    this.context.setState(DatanodeStateMachine.DatanodeStates.SHUTDOWN);
     return null;
   }
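
Because every datanode on a host appends its own entry to the shared ID file, the lookup above is keyed by container port. The matching step, condensed into a hypothetical standalone helper for illustration:

    // Mirrors readPersistedDatanodeID's selection loop (illustrative only).
    static DatanodeID findByContainerPort(List<DatanodeID> ids, int port) {
      for (DatanodeID id : ids) {
        if (id.getContainerPort() == port) {
          return id;
        }
      }
      return null;  // treated as "no valid ID"; the state machine shuts down
    }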
 

+ 25 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java

@@ -29,15 +29,20 @@ import io.netty.handler.logging.LoggingHandler;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.net.ServerSocket;
 
 /**
  * Creates a netty server endpoint that acts as the communication layer for
  * Ozone containers.
  */
 public final class XceiverServer implements XceiverServerSpi {
-  private final int port;
+  private static final Logger
+      LOG = LoggerFactory.getLogger(XceiverServer.class);
+  private int port;
   private final ContainerDispatcher storageContainer;
 
   private EventLoopGroup bossGroup;
@@ -52,11 +57,30 @@ public final class XceiverServer implements XceiverServerSpi {
   public XceiverServer(Configuration conf,
                        ContainerDispatcher dispatcher) {
     Preconditions.checkNotNull(conf);
+
     this.port = conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
         OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
+    // Get an available port on the current node and
+    // use that as the container port.
+    if (conf.getBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT,
+        OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT_DEFAULT)) {
+      try (ServerSocket socket = new ServerSocket(0)) {
+        socket.setReuseAddress(true);
+        this.port = socket.getLocalPort();
+        LOG.info("Found a free port for the server : {}", this.port);
+      } catch (IOException e) {
+        LOG.error("Unable find a random free port for the server, "
+            + "fallback to use default port {}", this.port, e);
+      }
+    }
     this.storageContainer = dispatcher;
   }
 
+  @Override
+  public int getIPCPort() {
+    return this.port;
+  }
+
   @Override
   public void start() throws IOException {
     bossGroup = new NioEventLoopGroup();
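
The port probe above is the usual bind-to-port-zero trick: the OS assigns a free ephemeral port, the probe socket is closed, and the number is reused for the real server. In isolation the pattern looks like this; note the inherent race that another process can grab the port between close() and Netty's later bind, which is acceptable for a mini cluster but not for a production server:

    try (ServerSocket socket = new ServerSocket(0)) {
      socket.setReuseAddress(true);
      int freePort = socket.getLocalPort();  // OS-assigned, free at probe time
      // hand freePort to the server; a small race window remains until it binds
    }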

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java

@@ -28,4 +28,7 @@ public interface XceiverServerSpi {
 
   /** Stops a running server. */
   void stop();
+
+  /** Get server IPC port. */
+  int getIPCPort();
 }

+ 9 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java

@@ -167,4 +167,13 @@ public class OzoneContainer {
   public SCMNodeReport getNodeReport() throws IOException {
     return this.manager.getNodeReport();
   }
+
+  /**
+   * Returns the container server IPC port.
+   *
+   * @return Container server IPC port.
+   */
+  public int getContainerServerPort() {
+    return server.getIPCPort();
+  }
 }

+ 11 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java

@@ -173,6 +173,7 @@ public final class MiniOzoneCluster extends MiniDFSCluster
         LOG.info("Waiting for cluster to be ready. Got {} of {} DN Heartbeats.",
             scm.getNodeCount(SCMNodeManager.NODESTATE.HEALTHY),
             numDataNodes);
+
         return false;
       }
     }, 1000, 5 * 60 * 1000); //wait for 5 mins.
@@ -228,6 +229,7 @@ public final class MiniOzoneCluster extends MiniDFSCluster
     private Boolean ozoneEnabled = true;
     private Boolean waitForChillModeFinish = true;
     private int containerWorkerThreadInterval = 1;
+    private Boolean randomContainerPort = true;
 
     /**
      * Creates a new Builder.
@@ -247,6 +249,11 @@ public final class MiniOzoneCluster extends MiniDFSCluster
       runID = UUID.randomUUID();
     }
 
+    public Builder setRandomContainerPort(boolean randomPort) {
+      this.randomContainerPort = randomPort;
+      return this;
+    }
+
     @Override
     public Builder numDataNodes(int val) {
       super.numDataNodes(val);
@@ -319,6 +326,10 @@ public final class MiniOzoneCluster extends MiniDFSCluster
       conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
       conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "127.0.0.1:0");
 
+      // Use random ports for ozone containers in mini cluster,
+      // in order to launch multiple container servers per node.
+      conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT,
+          randomContainerPort);
 
       StorageContainerManager scm = new StorageContainerManager(conf);
       scm.start();
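
From a test's point of view, the builder change is a single extra knob. A sketch of bringing up several datanodes on one host, assuming an OzoneConfiguration conf (mirrors testStartMultipleDatanodes in the new test class below):

    MiniOzoneCluster cluster = new MiniOzoneCluster.Builder(conf)
        .numDataNodes(3)
        .setRandomContainerPort(true)  // the default; shown here for clarity
        .setHandlerType("distributed")
        .build();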

+ 223 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java

@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.ozone.container.common.SCMTestUtils;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
+import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.scm.ScmConfigKeys;
+import org.apache.hadoop.scm.XceiverClient;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.test.PathUtils;
+import org.apache.hadoop.test.TestGenericTestUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import static org.junit.Assert.*;
+
+/**
+ * Test cases for mini ozone cluster.
+ */
+public class TestMiniOzoneCluster {
+
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf;
+
+  private final static File TEST_ROOT = TestGenericTestUtils.getTestDir();
+  private final static File WRITE_TMP = new File(TEST_ROOT, "write");
+  private final static File READ_TMP = new File(TEST_ROOT, "read");
+
+  @BeforeClass
+  public static void setup() {
+    conf = new OzoneConfiguration();
+    WRITE_TMP.mkdirs();
+    READ_TMP.mkdirs();
+    WRITE_TMP.deleteOnExit();
+    READ_TMP.deleteOnExit();
+  }
+
+  @AfterClass
+  public static void cleanup() {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster.close();
+    }
+  }
+
+  @Test(timeout = 30000)
+  public void testStartMultipleDatanodes() throws Exception {
+    final int numberOfNodes = 3;
+    cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(numberOfNodes)
+        .setHandlerType("distributed").build();
+
+    // make sure datanode.id file is correct
+    File idPath = new File(
+        conf.get(ScmConfigKeys.OZONE_SCM_DATANODE_ID));
+    assertTrue(idPath.exists());
+    List<DatanodeID> ids = ContainerUtils.readDatanodeIDsFrom(idPath);
+    assertEquals(numberOfNodes, ids.size());
+
+    List<DataNode> datanodes = cluster.getDataNodes();
+    assertEquals(numberOfNodes, datanodes.size());
+    for(DataNode dn : datanodes) {
+      // Each datanode ID should match an entry in the ID file
+      assertTrue("Datanode ID not found in ID file",
+          ids.contains(dn.getDatanodeId()));
+
+      // Create a single-member pipeline
+      String containerName = OzoneUtils.getRequestID();
+      DatanodeID dnId = dn.getDatanodeId();
+      Pipeline pipeline = new Pipeline(dnId.getDatanodeUuid());
+      pipeline.addMember(dnId);
+      pipeline.setContainerName(containerName);
+
+      // Verify client is able to connect to the container
+      try (XceiverClient client = new XceiverClient(pipeline, conf)) {
+        client.connect();
+        assertTrue(client.isConnected());
+      }
+    }
+  }
+
+  @Test
+  public void testDatanodeIDPersistent() throws Exception {
+    // Generate IDs for testing
+    DatanodeID id1 = DFSTestUtil.getLocalDatanodeID(1);
+    DatanodeID id2 = DFSTestUtil.getLocalDatanodeID(2);
+    DatanodeID id3 = DFSTestUtil.getLocalDatanodeID(3);
+    id1.setContainerPort(1);
+    id2.setContainerPort(2);
+    id3.setContainerPort(3);
+
+    // Write a single ID to the file and read it out
+    File validIdsFile = new File(WRITE_TMP, "valid-values.id");
+    validIdsFile.delete();
+    ContainerUtils.writeDatanodeIDTo(id1, validIdsFile);
+    List<DatanodeID> validIds = ContainerUtils
+        .readDatanodeIDsFrom(validIdsFile);
+    assertEquals(1, validIds.size());
+    DatanodeID id11 = validIds.iterator().next();
+    assertEquals(id11, id1);
+    assertEquals(id11.getProtoBufMessage(), id1.getProtoBufMessage());
+
+    // Write should avoid duplicate entries
+    File noDupIDFile = new File(WRITE_TMP, "no-dup-values.id");
+    noDupIDFile.delete();
+    ContainerUtils.writeDatanodeIDTo(id1, noDupIDFile);
+    ContainerUtils.writeDatanodeIDTo(id1, noDupIDFile);
+    ContainerUtils.writeDatanodeIDTo(id1, noDupIDFile);
+    ContainerUtils.writeDatanodeIDTo(id2, noDupIDFile);
+    ContainerUtils.writeDatanodeIDTo(id3, noDupIDFile);
+
+    List<DatanodeID> noDupIDs =ContainerUtils
+        .readDatanodeIDsFrom(noDupIDFile);
+    assertEquals(3, noDupIDs.size());
+    assertTrue(noDupIDs.contains(id1));
+    assertTrue(noDupIDs.contains(id2));
+    assertTrue(noDupIDs.contains(id3));
+
+    // Write should fail if unable to create the file or directory
+    File invalidPath = new File(WRITE_TMP, "an/invalid/path");
+    try {
+      ContainerUtils.writeDatanodeIDTo(id1, invalidPath);
+    } catch (Exception e) {
+      assertTrue(e instanceof IOException);
+    }
+
+    // Read should return an empty value if file doesn't exist
+    File nonExistFile = new File(READ_TMP, "non_exist.id");
+    nonExistFile.delete();
+    List<DatanodeID> emptyIDs =
+        ContainerUtils.readDatanodeIDsFrom(nonExistFile);
+    assertTrue(emptyIDs.isEmpty());
+
+    // Read should fail if the file is malformed
+    File malformedFile = new File(READ_TMP, "malformed.id");
+    createMalformedIDFile(malformedFile);
+    try {
+      ContainerUtils.readDatanodeIDsFrom(malformedFile);
+      fail("Read a malformed ID file should fail");
+    } catch (Exception e) {
+      assertTrue(e instanceof IOException);
+    }
+  }
+
+  @Test
+  public void testContainerRandomPort() throws IOException {
+    Configuration ozoneConf = SCMTestUtils.getConf();
+    File testDir = PathUtils.getTestDir(TestOzoneContainer.class);
+    ozoneConf.set(DFS_DATANODE_DATA_DIR_KEY, testDir.getAbsolutePath());
+
+    // Each instance of SM will create an ozone container
+    // that binds to a random port.
+    ozoneConf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, true);
+    try (
+        DatanodeStateMachine sm1 = new DatanodeStateMachine(ozoneConf);
+        DatanodeStateMachine sm2 = new DatanodeStateMachine(ozoneConf);
+        DatanodeStateMachine sm3 = new DatanodeStateMachine(ozoneConf);
+    ) {
+      HashSet<Integer> ports = new HashSet<Integer>();
+      assertTrue(ports.add(sm1.getContainer().getContainerServerPort()));
+      assertTrue(ports.add(sm2.getContainer().getContainerServerPort()));
+      assertTrue(ports.add(sm3.getContainer().getContainerServerPort()));
+    }
+
+    // Turn off the random port flag and test again
+    ozoneConf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, false);
+    try (
+        DatanodeStateMachine sm1 = new DatanodeStateMachine(ozoneConf);
+        DatanodeStateMachine sm2 = new DatanodeStateMachine(ozoneConf);
+        DatanodeStateMachine sm3 = new DatanodeStateMachine(ozoneConf);
+    ) {
+      HashSet<Integer> ports = new HashSet<Integer>();
+      assertTrue(ports.add(sm1.getContainer().getContainerServerPort()));
+      assertFalse(ports.add(sm2.getContainer().getContainerServerPort()));
+      assertFalse(ports.add(sm3.getContainer().getContainerServerPort()));
+      assertEquals(ports.iterator().next().intValue(),
+          ozoneConf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
+              OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT));
+    }
+  }
+
+  private void createMalformedIDFile(File malformedFile)
+      throws IOException {
+    malformedFile.delete();
+    DatanodeID id1 = DFSTestUtil.getLocalDatanodeID(1);
+    ContainerUtils.writeDatanodeIDTo(id1, malformedFile);
+
+    try (FileOutputStream out = new FileOutputStream(malformedFile)) {
+      out.write("malformed".getBytes());
+    }
+  }
+}

+ 19 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java

@@ -18,7 +18,10 @@ package org.apache.hadoop.ozone.container.common;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.scm.ScmConfigKeys;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
@@ -51,6 +54,7 @@ import java.util.concurrent.TimeoutException;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
 import static org.apache.hadoop.scm.ScmConfigKeys
     .OZONE_SCM_HEARTBEAT_RPC_TIMEOUT;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests the datanode state machine class and its states.
@@ -134,14 +138,18 @@ public class TestDatanodeStateMachine {
    * @throws InterruptedException
    */
   @Test
-  public void testDatanodeStateMachineStartThread() throws IOException,
+  public void testStartStopDatanodeStateMachine() throws IOException,
       InterruptedException, TimeoutException {
     try (DatanodeStateMachine stateMachine =
-        DatanodeStateMachine.initStateMachine(conf)) {
+        new DatanodeStateMachine(conf)) {
+      stateMachine.startDaemon();
       SCMConnectionManager connectionManager =
           stateMachine.getConnectionManager();
       GenericTestUtils.waitFor(() -> connectionManager.getValues().size() == 3,
           1000, 30000);
+
+      stateMachine.stopDaemon();
+      assertTrue(stateMachine.isDaemonStopped());
     }
   }
 
@@ -178,6 +186,15 @@ public class TestDatanodeStateMachine {
   @Test
   public void testDatanodeStateContext() throws IOException,
       InterruptedException, ExecutionException, TimeoutException {
+    // There is no mini cluster started in this test,
+    // create an ID file so that the state machine can load a fake datanode ID.
+    File idPath = new File(
+        conf.get(ScmConfigKeys.OZONE_SCM_DATANODE_ID));
+    idPath.delete();
+    DatanodeID dnID = DFSTestUtil.getLocalDatanodeID();
+    dnID.setContainerPort(ScmConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
+    ContainerUtils.writeDatanodeIDTo(dnID, idPath);
+
     try (DatanodeStateMachine stateMachine = new DatanodeStateMachine(conf)) {
       DatanodeStateMachine.DatanodeStates currentState =
           stateMachine.getContext().getState();

+ 4 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java

@@ -57,6 +57,7 @@ public class TestOzoneContainer {
     MiniOzoneCluster cluster = null;
     try {
       cluster = new MiniOzoneCluster.Builder(conf)
+          .setRandomContainerPort(false)
           .setHandlerType("distributed").build();
       // We don't start Ozone Container via data node, we will do it
       // independently in our test path.
@@ -108,6 +109,7 @@ public class TestOzoneContainer {
           pipeline.getLeader().getContainerPort());
 
       cluster = new MiniOzoneCluster.Builder(conf)
+          .setRandomContainerPort(false)
           .setHandlerType("distributed").build();
 
       // This client talks to ozone container via datanode.
@@ -208,6 +210,7 @@ public class TestOzoneContainer {
           pipeline.getLeader().getContainerPort());
 
       cluster = new MiniOzoneCluster.Builder(conf)
+          .setRandomContainerPort(false)
           .setHandlerType("distributed").build();
 
       // This client talks to ozone container via datanode.
@@ -273,6 +276,7 @@ public class TestOzoneContainer {
           pipeline.getLeader().getContainerPort());
 
       cluster = new MiniOzoneCluster.Builder(conf)
+          .setRandomContainerPort(false)
           .setHandlerType("distributed").build();
 
       // This client talks to ozone container via datanode.
@@ -364,5 +368,4 @@ public class TestOzoneContainer {
       }
     }
   }
-
 }