Browse Source

HDFS-12720. Ozone: Ratis options are not passed from KSM Client protobuf helper correctly. Contributed by Mukul Kumar Singh.

Mukul Kumar Singh 7 years ago
parent
commit
013c36f3cc
15 changed files with 278 additions and 57 deletions
  1. 19 2
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
  2. 2 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
  3. 4 4
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyArgs.java
  4. 4 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java
  5. 10 17
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java
  6. 1 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java
  7. 10 12
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java
  8. 8 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/BlockContainerInfo.java
  9. 15 2
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java
  10. 9 6
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStateManager.java
  11. 12 9
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/RatisManagerImpl.java
  12. 2 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
  13. 29 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
  14. 123 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
  15. 30 4
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/tools/TestCorona.java

+ 19 - 2
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java

@@ -20,6 +20,8 @@ package org.apache.hadoop.ozone.client.io;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
@@ -100,7 +102,8 @@ public class ChunkGroupOutputStream extends OutputStream {
       OpenKeySession handler, XceiverClientManager xceiverClientManager,
       OpenKeySession handler, XceiverClientManager xceiverClientManager,
       StorageContainerLocationProtocolClientSideTranslatorPB scmClient,
       StorageContainerLocationProtocolClientSideTranslatorPB scmClient,
       KeySpaceManagerProtocolClientSideTranslatorPB ksmClient,
       KeySpaceManagerProtocolClientSideTranslatorPB ksmClient,
-      int chunkSize, String requestId) throws IOException {
+      int chunkSize, String requestId, ReplicationFactor factor,
+      ReplicationType type) throws IOException {
     this.streamEntries = new ArrayList<>();
     this.streamEntries = new ArrayList<>();
     this.currentStreamIndex = 0;
     this.currentStreamIndex = 0;
     this.byteOffset = 0;
     this.byteOffset = 0;
@@ -111,6 +114,8 @@ public class ChunkGroupOutputStream extends OutputStream {
         .setVolumeName(info.getVolumeName())
         .setVolumeName(info.getVolumeName())
         .setBucketName(info.getBucketName())
         .setBucketName(info.getBucketName())
         .setKeyName(info.getKeyName())
         .setKeyName(info.getKeyName())
+        .setType(type)
+        .setFactor(factor)
         .setDataSize(info.getDataSize()).build();
         .setDataSize(info.getDataSize()).build();
     this.openID = handler.getId();
     this.openID = handler.getId();
     this.xceiverClientManager = xceiverClientManager;
     this.xceiverClientManager = xceiverClientManager;
@@ -292,6 +297,8 @@ public class ChunkGroupOutputStream extends OutputStream {
     private KeySpaceManagerProtocolClientSideTranslatorPB ksmClient;
     private KeySpaceManagerProtocolClientSideTranslatorPB ksmClient;
     private int chunkSize;
     private int chunkSize;
     private String requestID;
     private String requestID;
+    private ReplicationType type;
+    private ReplicationFactor factor;
 
 
     public Builder setHandler(OpenKeySession handler) {
     public Builder setHandler(OpenKeySession handler) {
       this.openHandler = handler;
       this.openHandler = handler;
@@ -325,9 +332,19 @@ public class ChunkGroupOutputStream extends OutputStream {
       return this;
       return this;
     }
     }
 
 
+    public Builder setType(ReplicationType type) {
+      this.type = type;
+      return this;
+    }
+
+    public Builder setFactor(ReplicationFactor replicationFactor) {
+      this.factor = replicationFactor;
+      return this;
+    }
+
     public ChunkGroupOutputStream build() throws IOException {
     public ChunkGroupOutputStream build() throws IOException {
       return new ChunkGroupOutputStream(openHandler, xceiverManager, scmClient,
       return new ChunkGroupOutputStream(openHandler, xceiverManager, scmClient,
-          ksmClient, chunkSize, requestID);
+          ksmClient, chunkSize, requestID, factor, type);
     }
     }
   }
   }
 
 

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java

@@ -461,6 +461,8 @@ public class RpcClient implements ClientProtocol {
             .setKsmClient(keySpaceManagerClient)
             .setKsmClient(keySpaceManagerClient)
             .setChunkSize(chunkSize)
             .setChunkSize(chunkSize)
             .setRequestID(requestId)
             .setRequestID(requestId)
+            .setType(OzoneProtos.ReplicationType.valueOf(type.toString()))
+            .setFactor(OzoneProtos.ReplicationFactor.valueOf(factor.getValue()))
             .build();
             .build();
     return new OzoneOutputStream(groupOutputStream);
     return new OzoneOutputStream(groupOutputStream);
   }
   }

+ 4 - 4
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyArgs.java

@@ -101,13 +101,13 @@ public final class KsmKeyArgs {
       return this;
       return this;
     }
     }
 
 
-    public Builder setType(ReplicationType type) {
-      this.type = type;
+    public Builder setType(ReplicationType replicationType) {
+      this.type = replicationType;
       return this;
       return this;
     }
     }
 
 
-    public Builder  setFactor(ReplicationFactor factor) {
-      this.factor = factor;
+    public Builder setFactor(ReplicationFactor replicationFactor) {
+      this.factor = replicationFactor;
       return this;
       return this;
     }
     }
 
 

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java

@@ -520,6 +520,8 @@ public final class KeySpaceManagerProtocolClientSideTranslatorPB
     KeyArgs.Builder keyArgs = KeyArgs.newBuilder()
     KeyArgs.Builder keyArgs = KeyArgs.newBuilder()
         .setVolumeName(args.getVolumeName())
         .setVolumeName(args.getVolumeName())
         .setBucketName(args.getBucketName())
         .setBucketName(args.getBucketName())
+        .setFactor(args.getFactor())
+        .setType(args.getType())
         .setKeyName(args.getKeyName());
         .setKeyName(args.getKeyName());
     if (args.getDataSize() > 0) {
     if (args.getDataSize() > 0) {
       keyArgs.setDataSize(args.getDataSize());
       keyArgs.setDataSize(args.getDataSize());
@@ -547,6 +549,8 @@ public final class KeySpaceManagerProtocolClientSideTranslatorPB
         .setVolumeName(args.getVolumeName())
         .setVolumeName(args.getVolumeName())
         .setBucketName(args.getBucketName())
         .setBucketName(args.getBucketName())
         .setKeyName(args.getKeyName())
         .setKeyName(args.getKeyName())
+        .setFactor(args.getFactor())
+        .setType(args.getType())
         .setDataSize(args.getDataSize()).build();
         .setDataSize(args.getDataSize()).build();
     req.setKeyArgs(keyArgs);
     req.setKeyArgs(keyArgs);
     req.setClientID(clientID);
     req.setClientID(clientID);

+ 10 - 17
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java

@@ -21,7 +21,7 @@ package org.apache.hadoop.scm;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Preconditions;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.ChannelFuture;
+import io.netty.channel.Channel;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
@@ -39,7 +39,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 import java.io.IOException;
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 import java.util.List;
 import java.util.List;
 
 
 /**
 /**
@@ -49,7 +48,7 @@ public class XceiverClient extends XceiverClientSpi {
   static final Logger LOG = LoggerFactory.getLogger(XceiverClient.class);
   static final Logger LOG = LoggerFactory.getLogger(XceiverClient.class);
   private final Pipeline pipeline;
   private final Pipeline pipeline;
   private final Configuration config;
   private final Configuration config;
-  private ChannelFuture channelFuture;
+  private Channel channel;
   private Bootstrap b;
   private Bootstrap b;
   private EventLoopGroup group;
   private EventLoopGroup group;
 
 
@@ -70,9 +69,7 @@ public class XceiverClient extends XceiverClientSpi {
 
 
   @Override
   @Override
   public void connect() throws Exception {
   public void connect() throws Exception {
-    if (channelFuture != null
-        && channelFuture.channel() != null
-        && channelFuture.channel().isActive()) {
+    if (channel != null && channel.isActive()) {
       throw new IOException("This client is already connected to a host.");
       throw new IOException("This client is already connected to a host.");
     }
     }
 
 
@@ -92,7 +89,7 @@ public class XceiverClient extends XceiverClientSpi {
           OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
           OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
     }
     }
     LOG.debug("Connecting to server Port : " + port);
     LOG.debug("Connecting to server Port : " + port);
-    channelFuture = b.connect(leader.getHostName(), port).sync();
+    channel = b.connect(leader.getHostName(), port).sync().channel();
   }
   }
 
 
   /**
   /**
@@ -102,17 +99,13 @@ public class XceiverClient extends XceiverClientSpi {
    */
    */
   @VisibleForTesting
   @VisibleForTesting
   public boolean isConnected() {
   public boolean isConnected() {
-    return channelFuture.channel().isActive();
+    return channel.isActive();
   }
   }
 
 
   @Override
   @Override
   public void close() {
   public void close() {
     if (group != null) {
     if (group != null) {
-      group.shutdownGracefully(0, 0, TimeUnit.SECONDS);
-    }
-
-    if (channelFuture != null) {
-      channelFuture.channel().close();
+      group.shutdownGracefully().awaitUninterruptibly();
     }
     }
   }
   }
 
 
@@ -126,11 +119,11 @@ public class XceiverClient extends XceiverClientSpi {
       ContainerProtos.ContainerCommandRequestProto request)
       ContainerProtos.ContainerCommandRequestProto request)
       throws IOException {
       throws IOException {
     try {
     try {
-      if ((channelFuture == null) || (!channelFuture.channel().isActive())) {
+      if ((channel == null) || (!channel.isActive())) {
         throw new IOException("This channel is not connected.");
         throw new IOException("This channel is not connected.");
       }
       }
       XceiverClientHandler handler =
       XceiverClientHandler handler =
-          channelFuture.channel().pipeline().get(XceiverClientHandler.class);
+          channel.pipeline().get(XceiverClientHandler.class);
 
 
       return handler.sendCommand(request);
       return handler.sendCommand(request);
     } catch (ExecutionException | InterruptedException e) {
     } catch (ExecutionException | InterruptedException e) {
@@ -149,11 +142,11 @@ public class XceiverClient extends XceiverClientSpi {
   public CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
   public CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
       sendCommandAsync(ContainerProtos.ContainerCommandRequestProto request)
       sendCommandAsync(ContainerProtos.ContainerCommandRequestProto request)
       throws IOException, ExecutionException, InterruptedException {
       throws IOException, ExecutionException, InterruptedException {
-    if ((channelFuture == null) || (!channelFuture.channel().isActive())) {
+    if ((channel == null) || (!channel.isActive())) {
       throw new IOException("This channel is not connected.");
       throw new IOException("This channel is not connected.");
     }
     }
     XceiverClientHandler handler =
     XceiverClientHandler handler =
-        channelFuture.channel().pipeline().get(XceiverClientHandler.class);
+        channel.pipeline().get(XceiverClientHandler.class);
     return handler.sendCommandAsync(request);
     return handler.sendCommandAsync(request);
   }
   }
 
 

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java

@@ -199,6 +199,7 @@ public class XceiverClientManager implements Closeable {
    */
    */
   public OzoneProtos.ReplicationType getType() {
   public OzoneProtos.ReplicationType getType() {
     // TODO : Fix me and make Ratis default before release.
     // TODO : Fix me and make Ratis default before release.
+    // TODO: Remove this as replication factor and type are pipeline properties
     if(isUseRatis()) {
     if(isUseRatis()) {
       return OzoneProtos.ReplicationType.RATIS;
       return OzoneProtos.ReplicationType.RATIS;
     }
     }

+ 10 - 12
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.ratis.RatisHelper;
 import org.apache.ratis.RatisHelper;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.rpc.SupportedRpcType;
@@ -37,13 +38,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 
 
 import java.io.IOException;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.List;
 import java.util.List;
 import java.util.Objects;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
 
 
 /**
 /**
  * An abstract implementation of {@link XceiverClientSpi} using Ratis.
  * An abstract implementation of {@link XceiverClientSpi} using Ratis.
@@ -77,11 +76,10 @@ public final class XceiverClientRatis extends XceiverClientSpi {
    */
    */
   public void createPipeline(String clusterId, List<DatanodeID> datanodes)
   public void createPipeline(String clusterId, List<DatanodeID> datanodes)
       throws IOException {
       throws IOException {
-    final List<RaftPeer> newPeers = datanodes.stream()
-        .map(RatisHelper::toRaftPeer)
-        .collect(Collectors.toList());
-    LOG.debug("initializing pipeline:{} with nodes:{}", clusterId, newPeers);
-    reinitialize(datanodes, newPeers);
+    RaftGroup group = RatisHelper.newRaftGroup(datanodes);
+    LOG.debug("initializing pipeline:{} with nodes:{}", clusterId,
+        group.getPeers());
+    reinitialize(datanodes, group);
   }
   }
 
 
   /**
   /**
@@ -94,7 +92,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
   }
   }
 
 
   private void reinitialize(
   private void reinitialize(
-      List<DatanodeID> datanodes, Collection<RaftPeer> newPeers)
+      List<DatanodeID> datanodes, RaftGroup group)
       throws IOException {
       throws IOException {
     if (datanodes.isEmpty()) {
     if (datanodes.isEmpty()) {
       return;
       return;
@@ -103,7 +101,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
     IOException exception = null;
     IOException exception = null;
     for (DatanodeID d : datanodes) {
     for (DatanodeID d : datanodes) {
       try {
       try {
-        reinitialize(d, newPeers);
+        reinitialize(d, group);
       } catch (IOException ioe) {
       } catch (IOException ioe) {
         if (exception == null) {
         if (exception == null) {
           exception = new IOException(
           exception = new IOException(
@@ -121,14 +119,14 @@ public final class XceiverClientRatis extends XceiverClientSpi {
   /**
   /**
    * Adds a new peers to the Ratis Ring.
    * Adds a new peers to the Ratis Ring.
    * @param datanode - new datanode
    * @param datanode - new datanode
-   * @param newPeers - Raft machines
+   * @param group - Raft group
    * @throws IOException - on Failure.
    * @throws IOException - on Failure.
    */
    */
-  private void reinitialize(DatanodeID datanode, Collection<RaftPeer> newPeers)
+  private void reinitialize(DatanodeID datanode, RaftGroup group)
       throws IOException {
       throws IOException {
     final RaftPeer p = RatisHelper.toRaftPeer(datanode);
     final RaftPeer p = RatisHelper.toRaftPeer(datanode);
     try (RaftClient client = RatisHelper.newRaftClient(rpcType, p)) {
     try (RaftClient client = RatisHelper.newRaftClient(rpcType, p)) {
-      client.reinitialize(RatisHelper.newRaftGroup(newPeers), p.getId());
+      client.reinitialize(group, p.getId());
     } catch (IOException ioe) {
     } catch (IOException ioe) {
       LOG.error("Failed to reinitialize RaftPeer:{} datanode: {}  ",
       LOG.error("Failed to reinitialize RaftPeer:{} datanode: {}  ",
           p, datanode, ioe);
           p, datanode, ioe);

+ 8 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/BlockContainerInfo.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.scm.container.common.helpers;
 
 
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Time;
 
 
 import java.io.Serializable;
 import java.io.Serializable;
@@ -136,4 +137,11 @@ public class BlockContainerInfo extends ContainerInfo
   public int compareTo(BlockContainerInfo o) {
   public int compareTo(BlockContainerInfo o) {
     return this.compare(this, o);
     return this.compare(this, o);
   }
   }
+
+  public boolean canAllocate(long size, long containerSize) {
+    //TODO: move container size inside Container Info
+    return ((getState() == OzoneProtos.LifeCycleState.ALLOCATED ||
+        getState() == OzoneProtos.LifeCycleState.OPEN) &&
+        (getAllocated() + size <= containerSize));
+  }
 }
 }

+ 15 - 2
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java

@@ -28,10 +28,15 @@ import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.rpc.RpcType;
+import org.apache.ratis.shaded.com.google.protobuf.ByteString;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 
 
-import java.util.*;
+import java.util.List;
+import java.util.Collections;
+import java.util.Collection;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.stream.Collectors;
 import java.util.stream.Collectors;
 
 
 /**
 /**
@@ -68,7 +73,8 @@ public interface RatisHelper {
   /* TODO: use a dummy id for all groups for the moment.
   /* TODO: use a dummy id for all groups for the moment.
    *       It should be changed to a unique id for each group.
    *       It should be changed to a unique id for each group.
    */
    */
-  RaftGroupId DUMMY_GROUP_ID = RaftGroupId.randomId();
+  RaftGroupId DUMMY_GROUP_ID =
+      RaftGroupId.valueOf(ByteString.copyFromUtf8("AOZONERATISGROUP"));
 
 
   RaftGroup EMPTY_GROUP = new RaftGroup(DUMMY_GROUP_ID,
   RaftGroup EMPTY_GROUP = new RaftGroup(DUMMY_GROUP_ID,
       Collections.emptyList());
       Collections.emptyList());
@@ -77,6 +83,13 @@ public interface RatisHelper {
     return EMPTY_GROUP;
     return EMPTY_GROUP;
   }
   }
 
 
+  static RaftGroup newRaftGroup(List<DatanodeID> datanodes) {
+    final List<RaftPeer> newPeers = datanodes.stream()
+        .map(RatisHelper::toRaftPeer)
+        .collect(Collectors.toList());
+    return RatisHelper.newRaftGroup(newPeers);
+  }
+
   static RaftGroup newRaftGroup(Collection<RaftPeer> peers) {
   static RaftGroup newRaftGroup(Collection<RaftPeer> peers) {
     return peers.isEmpty()? emptyRaftGroup()
     return peers.isEmpty()? emptyRaftGroup()
         : new RaftGroup(DUMMY_GROUP_ID, peers);
         : new RaftGroup(DUMMY_GROUP_ID, peers);

+ 9 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStateManager.java

@@ -410,8 +410,7 @@ public class ContainerStateManager {
 
 
       while (iter.hasNext()) {
       while (iter.hasNext()) {
         BlockContainerInfo info = iter.next();
         BlockContainerInfo info = iter.next();
-        if (info.getAllocated() + size <= this.containerSize) {
-
+        if (info.canAllocate(size, this.containerSize)) {
           queue.remove(info);
           queue.remove(info);
           info.addAllocated(size);
           info.addAllocated(size);
           info.setLastUsed(Time.monotonicNow());
           info.setLastUsed(Time.monotonicNow());
@@ -419,10 +418,14 @@ public class ContainerStateManager {
 
 
           return info;
           return info;
         } else {
         } else {
-          // We should close this container.
-          LOG.info("Moving {} to containerCloseQueue.", info.toString());
-          containerCloseQueue.add(info);
-          //TODO: Next JIRA will handle these containers to close.
+          if (info.getState() != LifeCycleState.CLOSED) {
+            // We should close this container.
+            LOG.info("Moving {} to containerCloseQueue.", info.toString());
+            info.setState(LifeCycleState.CLOSED);
+            containerCloseQueue.add(info);
+            //TODO: Next JIRA will handle these containers to close.
+            //TODO: move container to right queue
+          }
         }
         }
       }
       }
 
 

+ 12 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/RatisManagerImpl.java

@@ -38,8 +38,6 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicInteger;
 
 
-import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos
-    .LifeCycleState.ALLOCATED;
 import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos
 import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos
     .LifeCycleState.OPEN;
     .LifeCycleState.OPEN;
 
 
@@ -121,7 +119,12 @@ public class RatisManagerImpl implements PipelineManager {
             .createPipeline(pipeline.getPipelineName(), pipeline.getMachines());
             .createPipeline(pipeline.getPipelineName(), pipeline.getMachines());
       }
       }
     } else {
     } else {
-      pipeline = findOpenPipeline();
+      Pipeline openPipeline = findOpenPipeline(replicationFactor);
+      if (openPipeline != null) {
+        // if an open pipeline is found use the same machines
+        pipeline = allocateRatisPipeline(openPipeline.getMachines(),
+            containerName, replicationFactor);
+      }
     }
     }
     if (pipeline == null) {
     if (pipeline == null) {
       LOG.error("Get pipeline call failed. We are not able to find free nodes" +
       LOG.error("Get pipeline call failed. We are not able to find free nodes" +
@@ -135,7 +138,7 @@ public class RatisManagerImpl implements PipelineManager {
    *
    *
    * @return - Pipeline or null
    * @return - Pipeline or null
    */
    */
-  Pipeline findOpenPipeline() {
+  Pipeline findOpenPipeline(OzoneProtos.ReplicationFactor factor) {
     Pipeline pipeline = null;
     Pipeline pipeline = null;
     final int sentinal = -1;
     final int sentinal = -1;
     if (activePipelines.size() == 0) {
     if (activePipelines.size() == 0) {
@@ -149,7 +152,7 @@ public class RatisManagerImpl implements PipelineManager {
       Pipeline temp =
       Pipeline temp =
           activePipelines.get(nextIndex != sentinal ? nextIndex : startIndex);
           activePipelines.get(nextIndex != sentinal ? nextIndex : startIndex);
       // if we find an operational pipeline just return that.
       // if we find an operational pipeline just return that.
-      if (temp.getLifeCycleState() == OPEN) {
+      if ((temp.getLifeCycleState() == OPEN) && (temp.getFactor() == factor)) {
         pipeline = temp;
         pipeline = temp;
         break;
         break;
       }
       }
@@ -173,7 +176,7 @@ public class RatisManagerImpl implements PipelineManager {
       String pipelineName = PREFIX +
       String pipelineName = PREFIX +
           UUID.randomUUID().toString().substring(PREFIX.length());
           UUID.randomUUID().toString().substring(PREFIX.length());
       pipeline.setType(OzoneProtos.ReplicationType.RATIS);
       pipeline.setType(OzoneProtos.ReplicationType.RATIS);
-      pipeline.setLifeCycleState(ALLOCATED);
+      pipeline.setLifeCycleState(OPEN);
       pipeline.setFactor(factor);
       pipeline.setFactor(factor);
       pipeline.setPipelineName(pipelineName);
       pipeline.setPipelineName(pipelineName);
       pipeline.setContainerName(containerName);
       pipeline.setContainerName(containerName);
@@ -210,10 +213,10 @@ public class RatisManagerImpl implements PipelineManager {
       Preconditions.checkNotNull(datanode);
       Preconditions.checkNotNull(datanode);
       if (!ratisMembers.contains(datanode)) {
       if (!ratisMembers.contains(datanode)) {
         newNodesList.add(datanode);
         newNodesList.add(datanode);
-        // once a datanode has been added to a pipeline, exclude it from
-        // further allocations
-        ratisMembers.add(datanode);
         if (newNodesList.size() == count) {
         if (newNodesList.size() == count) {
+          // once a datanode has been added to a pipeline, exclude it from
+          // further allocations
+          ratisMembers.addAll(newNodesList);
           LOG.info("Allocating a new pipeline of size: {}", count);
           LOG.info("Allocating a new pipeline of size: {}", count);
           return newNodesList;
           return newNodesList;
         }
         }

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java

@@ -416,6 +416,8 @@ public final class DistributedStorageHandler implements StorageHandler {
             .setKsmClient(keySpaceManagerClient)
             .setKsmClient(keySpaceManagerClient)
             .setChunkSize(chunkSize)
             .setChunkSize(chunkSize)
             .setRequestID(args.getRequestID())
             .setRequestID(args.getRequestID())
+            .setType(xceiverClientManager.getType())
+            .setFactor(xceiverClientManager.getFactor())
             .build();
             .build();
     return new OzoneOutputStream(groupOutputStream);
     return new OzoneOutputStream(groupOutputStream);
   }
   }

+ 29 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java

@@ -26,13 +26,18 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.OzoneConfiguration;
 import org.apache.hadoop.conf.OzoneConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.client.OzoneClientUtils;
 import org.apache.hadoop.ozone.container.common
 import org.apache.hadoop.ozone.container.common
     .statemachine.DatanodeStateMachine;
     .statemachine.DatanodeStateMachine;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
 import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
 import org.apache.hadoop.ozone.ksm.KeySpaceManager;
 import org.apache.hadoop.ozone.ksm.KeySpaceManager;
+import org.apache.hadoop.ozone.ksm.protocolPB
+    .KeySpaceManagerProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ozone.ksm.protocolPB.KeySpaceManagerProtocolPB;
 import org.apache.hadoop.ozone.web.client.OzoneRestClient;
 import org.apache.hadoop.ozone.web.client.OzoneRestClient;
 import org.apache.hadoop.scm.ScmConfigKeys;
 import org.apache.hadoop.scm.ScmConfigKeys;
 import org.apache.hadoop.scm.protocolPB
 import org.apache.hadoop.scm.protocolPB
@@ -252,6 +257,30 @@ public final class MiniOzoneCluster extends MiniDFSCluster
             Client.getRpcTimeout(conf)));
             Client.getRpcTimeout(conf)));
   }
   }
 
 
+  /**
+   * Creates an RPC proxy connected to this cluster's KeySpaceManager
+   * for accessing Key Space Manager information. Callers take ownership of
+   * the proxy and must close it when done.
+   *
+   * @return RPC proxy for accessing Key Space Manager information
+   * @throws IOException if there is an I/O error
+   */
+  public KeySpaceManagerProtocolClientSideTranslatorPB
+      createKeySpaceManagerClient() throws IOException {
+    long ksmVersion = RPC.getProtocolVersion(KeySpaceManagerProtocolPB.class);
+    InetSocketAddress ksmAddress = OzoneClientUtils
+        .getKsmAddressForClients(conf);
+    LOG.info("Creating KeySpaceManager RPC client with address {}",
+        ksmAddress);
+    RPC.setProtocolEngine(conf, KeySpaceManagerProtocolPB.class,
+        ProtobufRpcEngine.class);
+    return new KeySpaceManagerProtocolClientSideTranslatorPB(
+            RPC.getProxy(KeySpaceManagerProtocolPB.class, ksmVersion,
+                ksmAddress, UserGroupInformation.getCurrentUser(), conf,
+                NetUtils.getDefaultSocketFactory(conf),
+                Client.getRpcTimeout(conf)));
+  }
+
   /**
   /**
    * Waits for the Ozone cluster to be ready for processing requests.
    * Waits for the Ozone cluster to be ready for processing requests.
    */
    */

+ 123 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java

@@ -38,7 +38,16 @@ import org.apache.hadoop.ozone.client.ReplicationType;
 import org.apache.hadoop.ozone.client.VolumeArgs;
 import org.apache.hadoop.ozone.client.VolumeArgs;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
+import org.apache.hadoop.ozone.ksm.protocolPB.
+    KeySpaceManagerProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 import org.apache.hadoop.ozone.web.exceptions.OzoneException;
 import org.apache.hadoop.ozone.web.exceptions.OzoneException;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.scm.protocolPB.
+    StorageContainerLocationProtocolClientSideTranslatorPB;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Time;
 import org.junit.AfterClass;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Assert;
@@ -64,6 +73,10 @@ public class TestOzoneRpcClient {
   private static MiniOzoneCluster cluster = null;
   private static MiniOzoneCluster cluster = null;
   private static OzoneClient ozClient = null;
   private static OzoneClient ozClient = null;
   private static ObjectStore store = null;
   private static ObjectStore store = null;
+  private static KeySpaceManagerProtocolClientSideTranslatorPB
+      keySpaceManagerClient;
+  private static StorageContainerLocationProtocolClientSideTranslatorPB
+      storageContainerLocationClient;
 
 
   /**
   /**
    * Create a MiniOzoneCluster for testing.
    * Create a MiniOzoneCluster for testing.
@@ -78,13 +91,16 @@ public class TestOzoneRpcClient {
     OzoneConfiguration conf = new OzoneConfiguration();
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
     conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
         OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
         OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
-    cluster = new MiniOzoneCluster.Builder(conf)
+    cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(5)
         .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
         .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
     conf.set("ozone.client.protocol",
     conf.set("ozone.client.protocol",
         "org.apache.hadoop.ozone.client.rpc.RpcClient");
         "org.apache.hadoop.ozone.client.rpc.RpcClient");
     OzoneClientFactory.setConfiguration(conf);
     OzoneClientFactory.setConfiguration(conf);
     ozClient = OzoneClientFactory.getClient();
     ozClient = OzoneClientFactory.getClient();
     store = ozClient.getObjectStore();
     store = ozClient.getObjectStore();
+    storageContainerLocationClient =
+        cluster.createStorageContainerLocationClient();
+    keySpaceManagerClient = cluster.createKeySpaceManagerClient();
   }
   }
 
 
   @Test
   @Test
@@ -360,6 +376,29 @@ public class TestOzoneRpcClient {
     volume.getBucket(bucketName);
     volume.getBucket(bucketName);
   }
   }
 
 
+  private boolean verifyRatisReplication(String volumeName, String bucketName,
+      String keyName, ReplicationType type, ReplicationFactor factor)
+      throws IOException {
+    KsmKeyArgs keyArgs = new KsmKeyArgs.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(keyName)
+        .build();
+    OzoneProtos.ReplicationType replicationType =
+        OzoneProtos.ReplicationType.valueOf(type.toString());
+    OzoneProtos.ReplicationFactor replicationFactor =
+        OzoneProtos.ReplicationFactor.valueOf(factor.getValue());
+    KsmKeyInfo keyInfo = keySpaceManagerClient.lookupKey(keyArgs);
+    for (KsmKeyLocationInfo info: keyInfo.getKeyLocationList()) {
+      Pipeline pipeline =
+          storageContainerLocationClient.getContainer(info.getContainerName());
+      if ((pipeline.getFactor() != replicationFactor) ||
+          (pipeline.getType() != replicationType)) {
+        return false;
+      }
+    }
+    return true;
+  }
 
 
   @Test
   @Test
   public void testPutKey()
   public void testPutKey()
@@ -387,6 +426,80 @@ public class TestOzoneRpcClient {
       OzoneInputStream is = bucket.readKey(keyName);
       OzoneInputStream is = bucket.readKey(keyName);
       byte[] fileContent = new byte[value.getBytes().length];
       byte[] fileContent = new byte[value.getBytes().length];
       is.read(fileContent);
       is.read(fileContent);
+      Assert.assertTrue(verifyRatisReplication(volumeName, bucketName,
+          keyName, ReplicationType.STAND_ALONE,
+          ReplicationFactor.ONE));
+      Assert.assertEquals(value, new String(fileContent));
+      Assert.assertTrue(key.getCreationTime() >= currentTime);
+      Assert.assertTrue(key.getModificationTime() >= currentTime);
+    }
+  }
+
+  @Test
+  public void testPutKeyRatisOneNode()
+      throws IOException, OzoneException {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    long currentTime = Time.now();
+
+    String value = "sample value";
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    for (int i = 0; i < 10; i++) {
+      String keyName = UUID.randomUUID().toString();
+
+      OzoneOutputStream out = bucket.createKey(keyName,
+          value.getBytes().length, ReplicationType.RATIS,
+          ReplicationFactor.ONE);
+      out.write(value.getBytes());
+      out.close();
+      OzoneKey key = bucket.getKey(keyName);
+      Assert.assertEquals(keyName, key.getName());
+      OzoneInputStream is = bucket.readKey(keyName);
+      byte[] fileContent = new byte[value.getBytes().length];
+      is.read(fileContent);
+      is.close();
+      Assert.assertTrue(verifyRatisReplication(volumeName, bucketName,
+          keyName, ReplicationType.RATIS, ReplicationFactor.ONE));
+      Assert.assertEquals(value, new String(fileContent));
+      Assert.assertTrue(key.getCreationTime() >= currentTime);
+      Assert.assertTrue(key.getModificationTime() >= currentTime);
+    }
+  }
+
+  @Test
+  public void testPutKeyRatisThreeNodes()
+      throws IOException, OzoneException {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    long currentTime = Time.now();
+
+    String value = "sample value";
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    for (int i = 0; i < 10; i++) {
+      String keyName = UUID.randomUUID().toString();
+
+      OzoneOutputStream out = bucket.createKey(keyName,
+          value.getBytes().length, ReplicationType.RATIS,
+          ReplicationFactor.THREE);
+      out.write(value.getBytes());
+      out.close();
+      OzoneKey key = bucket.getKey(keyName);
+      Assert.assertEquals(keyName, key.getName());
+      OzoneInputStream is = bucket.readKey(keyName);
+      byte[] fileContent = new byte[value.getBytes().length];
+      is.read(fileContent);
+      is.close();
+      Assert.assertTrue(verifyRatisReplication(volumeName, bucketName,
+          keyName, ReplicationType.RATIS,
+          ReplicationFactor.THREE));
       Assert.assertEquals(value, new String(fileContent));
       Assert.assertEquals(value, new String(fileContent));
       Assert.assertTrue(key.getCreationTime() >= currentTime);
       Assert.assertTrue(key.getCreationTime() >= currentTime);
       Assert.assertTrue(key.getModificationTime() >= currentTime);
       Assert.assertTrue(key.getModificationTime() >= currentTime);
@@ -691,6 +804,15 @@ public class TestOzoneRpcClient {
     if(ozClient != null) {
     if(ozClient != null) {
       ozClient.close();
       ozClient.close();
     }
     }
+
+    if (storageContainerLocationClient != null) {
+      storageContainerLocationClient.close();
+    }
+
+    if (keySpaceManagerClient != null) {
+      keySpaceManagerClient.close();
+    }
+
     if (cluster != null) {
     if (cluster != null) {
       cluster.shutdown();
       cluster.shutdown();
     }
     }

+ 30 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/tools/TestCorona.java

@@ -117,22 +117,48 @@ public class TestCorona {
   }
   }
 
 
   @Test
   @Test
-  public void ratisTest() throws Exception {
+  public void multiThread() throws Exception {
     List<String> args = new ArrayList<>();
     List<String> args = new ArrayList<>();
     args.add("-numOfVolumes");
     args.add("-numOfVolumes");
+    args.add("10");
+    args.add("-numOfBuckets");
     args.add("1");
     args.add("1");
+    args.add("-numOfKeys");
+    args.add("10");
+    args.add("-numOfThread");
+    args.add("10");
+    args.add("-keySize");
+    args.add("10240");
+    Corona corona = new Corona(conf);
+    int res = ToolRunner.run(conf, corona,
+        args.toArray(new String[0]));
+    Assert.assertEquals(10, corona.getNumberOfVolumesCreated());
+    Assert.assertEquals(10, corona.getNumberOfBucketsCreated());
+    Assert.assertEquals(100, corona.getNumberOfKeysAdded());
+    Assert.assertEquals(0, res);
+  }
+
+  @Test
+  public void ratisTest3() throws Exception {
+    List<String> args = new ArrayList<>();
+    args.add("-numOfVolumes");
+    args.add("10");
     args.add("-numOfBuckets");
     args.add("-numOfBuckets");
     args.add("1");
     args.add("1");
     args.add("-numOfKeys");
     args.add("-numOfKeys");
     args.add("10");
     args.add("10");
     args.add("-ratis");
     args.add("-ratis");
     args.add("3");
     args.add("3");
+    args.add("-numOfThread");
+    args.add("10");
+    args.add("-keySize");
+    args.add("10240");
     Corona corona = new Corona(conf);
     Corona corona = new Corona(conf);
     int res = ToolRunner.run(conf, corona,
     int res = ToolRunner.run(conf, corona,
         args.toArray(new String[0]));
         args.toArray(new String[0]));
-    Assert.assertEquals(1, corona.getNumberOfVolumesCreated());
-    Assert.assertEquals(1, corona.getNumberOfBucketsCreated());
-    Assert.assertEquals(10, corona.getNumberOfKeysAdded());
+    Assert.assertEquals(10, corona.getNumberOfVolumesCreated());
+    Assert.assertEquals(10, corona.getNumberOfBucketsCreated());
+    Assert.assertEquals(100, corona.getNumberOfKeysAdded());
     Assert.assertEquals(0, res);
     Assert.assertEquals(0, res);
   }
   }
 }
 }