
HDDS-726. Ozone Client should update SCM to move the container out of allocation path in case a write transaction fails. Contributed by Shashikant Banerjee.

Shashikant Banerjee, 6 years ago
parent
commit
de1dae64f2
40 changed files with 872 additions and 240 deletions
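
In short, when a write fails the client now remembers the failed datanodes and the affected container in an ExcludeList and passes that list to SCM with the next allocateBlock call, so the new block is placed away from the failing nodes. A minimal client-side sketch using only APIs introduced or changed by this patch (the class and method names of the sketch itself, such as WriteFailureSketch and reallocateAfterFailure, are illustrative and not part of the commit):

import java.io.IOException;
import java.util.Collection;

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;

class WriteFailureSketch {
  private final ExcludeList excludeList = new ExcludeList();

  AllocatedBlock reallocateAfterFailure(ScmBlockLocationProtocol scmClient,
      Collection<DatanodeDetails> failedServers, long failedContainerId,
      long blockSize, String owner) throws IOException {
    // Remember the datanodes that did not acknowledge the write ...
    if (failedServers != null && !failedServers.isEmpty()) {
      excludeList.addDatanodes(failedServers);
    }
    // ... and the container whose write transaction failed.
    // (addConatinerId is the method name as introduced by this patch.)
    excludeList.addConatinerId(ContainerID.valueof(failedContainerId));
    // The exclude list travels with the allocation request, so SCM picks a
    // pipeline and container that avoid the failed nodes.
    return scmClient.allocateBlock(blockSize, ReplicationType.RATIS,
        ReplicationFactor.THREE, owner, excludeList);
  }
}
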
  1. +10 -10   hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
  2. +49 -24   hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
  3. +2 -3     hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
  4. +36 -15   hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
  5. +18 -9    hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientReply.java
  6. +5 -5     hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
  7. +103 -0   hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ExcludeList.java
  8. +5 -1     hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java
  9. +3 -1     hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
  10. +7 -2    hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
  11. +3 -1    hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java
  12. +1 -0    hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto
  13. +6 -0    hadoop-hdds/common/src/main/proto/hdds.proto
  14. +5 -1    hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManager.java
  15. +24 -5   hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
  16. +5 -0    hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
  17. +9 -0    hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
  18. +53 -0   hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
  19. +15 -0   hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
  20. +6 -4    hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
  21. +15 -9   hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
  22. +15 -0   hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
  23. +11 -0   hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
  24. +85 -49  hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
  25. +6 -2    hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
  26. +5 -2    hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
  27. +1 -0    hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
  28. +22 -64  hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
  29. +250 -16 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
  30. +60 -0   hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
  31. +2 -1    hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java
  32. +5 -2    hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
  33. +6 -3    hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
  34. +4 -2    hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
  35. +4 -2    hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
  36. +3 -1    hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestIngClient.java
  37. +3 -1    hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java
  38. +4 -2    hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
  39. +2 -1    hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkBlockManager.java
  40. +4 -2    hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java

+ 10 - 10
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java

@@ -214,14 +214,14 @@ public class XceiverClientGrpc extends XceiverClientSpi {
 
   @Override
   public XceiverClientReply sendCommand(
-      ContainerCommandRequestProto request, List<UUID> excludeDns)
+      ContainerCommandRequestProto request, List<DatanodeDetails> excludeDns)
       throws IOException {
     Preconditions.checkState(HddsUtils.isReadOnly(request));
     return sendCommandWithRetry(request, excludeDns);
   }
 
   private XceiverClientReply sendCommandWithRetry(
-      ContainerCommandRequestProto request, List<UUID> excludeDns)
+      ContainerCommandRequestProto request, List<DatanodeDetails> excludeDns)
       throws IOException {
     ContainerCommandResponseProto responseProto = null;
 
@@ -231,24 +231,24 @@ public class XceiverClientGrpc extends XceiverClientSpi {
     // TODO: cache the correct leader info in here, so that any subsequent calls
     // should first go to leader
     List<DatanodeDetails> dns = pipeline.getNodes();
-    DatanodeDetails datanode = null;
     List<DatanodeDetails> healthyDns =
         excludeDns != null ? dns.stream().filter(dnId -> {
-          for (UUID excludeId : excludeDns) {
-            if (dnId.getUuid().equals(excludeId)) {
+          for (DatanodeDetails excludeId : excludeDns) {
+            if (dnId.equals(excludeId)) {
               return false;
             }
           }
           return true;
         }).collect(Collectors.toList()) : dns;
+    XceiverClientReply reply = new XceiverClientReply(null);
     for (DatanodeDetails dn : healthyDns) {
       try {
         LOG.debug("Executing command " + request + " on datanode " + dn);
         // In case the command gets retried on a 2nd datanode,
         // sendCommandAsyncCall will create a new channel and async stub
         // in case these don't exist for the specific datanode.
+        reply.addDatanode(dn);
         responseProto = sendCommandAsync(request, dn).getResponse().get();
-        datanode = dn;
         if (responseProto.getResult() == ContainerProtos.Result.SUCCESS) {
           break;
         }
@@ -264,8 +264,8 @@ public class XceiverClientGrpc extends XceiverClientSpi {
     }
 
     if (responseProto != null) {
-      return new XceiverClientReply(
-          CompletableFuture.completedFuture(responseProto), datanode.getUuid());
+      reply.setResponse(CompletableFuture.completedFuture(responseProto));
+      return reply;
     } else {
       throw new IOException(
           "Failed to execute command " + request + " on the pipeline "
@@ -382,11 +382,11 @@ public class XceiverClientGrpc extends XceiverClientSpi {
   }
 
   @Override
-  public long watchForCommit(long index, long timeout)
+  public XceiverClientReply watchForCommit(long index, long timeout)
       throws InterruptedException, ExecutionException, TimeoutException,
       IOException {
     // there is no notion of watch for commit index in standalone pipeline
-    return 0;
+    return null;
   };
 
   public long getReplicatedMinCommitIndex() {

+ 49 - 24
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java

@@ -18,8 +18,8 @@
 
 package org.apache.hadoop.hdds.scm;
 
-import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 
@@ -59,6 +59,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
 /**
  * An abstract implementation of {@link XceiverClientSpi} using Ratis.
@@ -91,7 +92,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
   private final GrpcTlsConfig tlsConfig;
 
   // Map to track commit index at every server
-  private final ConcurrentHashMap<String, Long> commitInfoMap;
+  private final ConcurrentHashMap<UUID, Long> commitInfoMap;
 
   // create a separate RaftClient for watchForCommit API
   private RaftClient watchClient;
@@ -118,7 +119,8 @@ public final class XceiverClientRatis extends XceiverClientSpi {
     // of the servers
     if (commitInfoMap.isEmpty()) {
       commitInfoProtos.forEach(proto -> commitInfoMap
-          .put(proto.getServer().getAddress(), proto.getCommitIndex()));
+          .put(RatisHelper.toDatanodeId(proto.getServer()),
+              proto.getCommitIndex()));
       // In case the commit is happening 2 way, just update the commitIndex
       // for the servers which have been successfully updating the commit
       // indexes. This is important because getReplicatedMinCommitIndex()
@@ -126,7 +128,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
       // been replicating data successfully.
     } else {
       commitInfoProtos.forEach(proto -> commitInfoMap
-          .computeIfPresent(proto.getServer().getAddress(),
+          .computeIfPresent(RatisHelper.toDatanodeId(proto.getServer()),
               (address, index) -> {
                 index = proto.getCommitIndex();
                 return index;
@@ -218,15 +220,23 @@ public final class XceiverClientRatis extends XceiverClientSpi {
     return minIndex.isPresent() ? minIndex.getAsLong() : 0;
   }
 
+  private void addDatanodetoReply(UUID address, XceiverClientReply reply) {
+    DatanodeDetails.Builder builder = DatanodeDetails.newBuilder();
+    builder.setUuid(address.toString());
+    reply.addDatanode(builder.build());
+  }
+
   @Override
-  public long watchForCommit(long index, long timeout)
+  public XceiverClientReply watchForCommit(long index, long timeout)
       throws InterruptedException, ExecutionException, TimeoutException,
       IOException {
     long commitIndex = getReplicatedMinCommitIndex();
+    XceiverClientReply clientReply = new XceiverClientReply(null);
     if (commitIndex >= index) {
       // return the min commit index till which the log has been replicated to
       // all servers
-      return commitIndex;
+      clientReply.setLogIndex(commitIndex);
+      return clientReply;
     }
     LOG.debug("commit index : {} watch timeout : {}", index, timeout);
     // create a new RaftClient instance for watch request
@@ -250,26 +260,30 @@ public final class XceiverClientRatis extends XceiverClientSpi {
       // TODO : need to remove the code to create the new RaftClient instance
       // here once the watch request bypassing sliding window in Raft Client
       // gets fixed.
-      watchClient =
-          RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy,
+      watchClient = RatisHelper
+          .newRaftClient(rpcType, getPipeline(), retryPolicy,
               maxOutstandingRequests, tlsConfig);
       reply = watchClient
           .sendWatchAsync(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED)
           .get(timeout, TimeUnit.MILLISECONDS);
-      Optional<RaftProtos.CommitInfoProto>
-          proto = reply.getCommitInfos().stream().min(Comparator.comparing(
-          RaftProtos.CommitInfoProto :: getCommitIndex));
-      Preconditions.checkState(proto.isPresent());
-      String address = proto.get().getServer().getAddress();
-      // since 3 way commit has failed, the updated map from now on  will
-      // only store entries for those datanodes which have had successful
-      // replication.
-      commitInfoMap.remove(address);
-      LOG.info(
-          "Could not commit " + index + " to all the nodes. Server " + address
-              + " has failed." + " Committed by majority.");
+      List<RaftProtos.CommitInfoProto> commitInfoProtoList =
+          reply.getCommitInfos().stream()
+              .filter(i -> i.getCommitIndex() < index)
+              .collect(Collectors.toList());
+      commitInfoProtoList.parallelStream().forEach(proto -> {
+        UUID address = RatisHelper.toDatanodeId(proto.getServer());
+        addDatanodetoReply(address, clientReply);
+        // since 3 way commit has failed, the updated map from now on  will
+        // only store entries for those datanodes which have had successful
+        // replication.
+        commitInfoMap.remove(address);
+        LOG.info(
+            "Could not commit " + index + " to all the nodes. Server " + address
+                + " has failed." + " Committed by majority.");
+      });
     }
-    return index;
+    clientReply.setLogIndex(index);
+    return clientReply;
   }
 
   /**
@@ -296,17 +310,28 @@ public final class XceiverClientRatis extends XceiverClientSpi {
                 RaftRetryFailureException raftRetryFailureException =
                     reply.getRetryFailureException();
                 if (raftRetryFailureException != null) {
+                  // in case of raft retry failure, the raft client is
+                  // not able to connect to the leader hence the pipeline
+                  // can not be used but this instance of RaftClient will close
+                  // and refreshed again. In case the client cannot connect to
+                   // leader, getClient call will fail.
+
+                  // No need to set the failed Server ID here. Ozone client
+                  // will directly exclude this pipeline in next allocate block
+                  // to SCM as in this case, it is the raft client which is not
+                  // able to connect to leader in the pipeline, though the
+                  // pipeline can still be functional.
                   throw new CompletionException(raftRetryFailureException);
                 }
                 ContainerCommandResponseProto response =
                     ContainerCommandResponseProto
                         .parseFrom(reply.getMessage().getContent());
+                UUID serverId = RatisHelper.toDatanodeId(reply.getReplierId());
                 if (response.getResult() == ContainerProtos.Result.SUCCESS) {
                   updateCommitInfosMap(reply.getCommitInfos());
-                  asyncReply.setLogIndex(reply.getLogIndex());
-                  asyncReply.setDatanode(
-                      RatisHelper.toDatanodeId(reply.getReplierId()));
                 }
+                asyncReply.setLogIndex(reply.getLogIndex());
+                addDatanodetoReply(serverId, asyncReply);
                 return response;
               } catch (InvalidProtocolBufferException e) {
                 throw new CompletionException(e);

+ 2 - 3
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java

@@ -42,7 +42,6 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 
 /**
@@ -290,7 +289,7 @@ public class BlockInputStream extends InputStream implements Seekable {
     XceiverClientReply reply;
     ReadChunkResponseProto readChunkResponse = null;
     final ChunkInfo chunkInfo = chunks.get(chunkIndex);
-    List<UUID> excludeDns = null;
+    List<DatanodeDetails> excludeDns = null;
     ByteString byteString;
     List<DatanodeDetails> dnList = xceiverClient.getPipeline().getNodes();
     while (true) {
@@ -334,7 +333,7 @@ public class BlockInputStream extends InputStream implements Seekable {
         if (excludeDns == null) {
           excludeDns = new ArrayList<>();
         }
-        excludeDns.add(reply.getDatanode());
+        excludeDns.addAll(reply.getDatanodes());
         if (excludeDns.size() == dnList.size()) {
           throw ioe;
         }

+ 36 - 15
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdds.scm.storage;
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.XceiverClientReply;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
@@ -41,6 +42,7 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.Buffer;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.UUID;
 import java.util.List;
 import java.util.ArrayList;
@@ -102,14 +104,17 @@ public class BlockOutputStream extends OutputStream {
   // by all servers
   private long totalAckDataLength;
 
-  // list to hold up all putBlock futures
-  private List<CompletableFuture<ContainerProtos.ContainerCommandResponseProto>>
-      futureList;
+  // future Map to hold up all putBlock futures
+  private ConcurrentHashMap<Long,
+      CompletableFuture<ContainerProtos.ContainerCommandResponseProto>>
+      futureMap;
   // map containing mapping for putBlock logIndex to to flushedDataLength Map.
   private ConcurrentHashMap<Long, Long> commitIndex2flushedDataMap;
 
   private int currentBufferIndex;
 
+  private List<DatanodeDetails> failedServers;
+
   /**
    * Creates a new BlockOutputStream.
    *
@@ -157,10 +162,11 @@ public class BlockOutputStream extends OutputStream {
     responseExecutor = Executors.newSingleThreadExecutor();
     commitIndex2flushedDataMap = new ConcurrentHashMap<>();
     totalAckDataLength = 0;
-    futureList = new ArrayList<>();
+    futureMap = new ConcurrentHashMap<>();
     totalDataFlushedLength = 0;
     currentBufferIndex = 0;
     writtenDataLength = 0;
+    failedServers = Collections.emptyList();
   }
 
   public BlockID getBlockID() {
@@ -182,6 +188,9 @@ public class BlockOutputStream extends OutputStream {
     return dataLength;
   }
 
+  public List<DatanodeDetails> getFailedServers() {
+    return failedServers;
+  }
 
   @Override
   public void write(int b) throws IOException {
@@ -299,7 +308,7 @@ public class BlockOutputStream extends OutputStream {
       Preconditions.checkState(commitIndex2flushedDataMap.containsKey(index));
       totalAckDataLength = commitIndex2flushedDataMap.remove(index);
       LOG.debug("Total data successfully replicated: " + totalAckDataLength);
-      futureList.remove(0);
+      futureMap.remove(totalAckDataLength);
       // Flush has been committed to required servers successful.
       // just swap the bufferList head and tail after clearing.
       ByteBuffer currentBuffer = bufferList.remove(0);
@@ -320,7 +329,7 @@ public class BlockOutputStream extends OutputStream {
   private void handleFullBuffer() throws IOException {
     try {
       checkOpen();
-      if (!futureList.isEmpty()) {
+      if (!futureMap.isEmpty()) {
         waitOnFlushFutures();
       }
     } catch (InterruptedException | ExecutionException e) {
@@ -362,9 +371,22 @@ public class BlockOutputStream extends OutputStream {
   private void watchForCommit(long commitIndex) throws IOException {
     checkOpen();
     Preconditions.checkState(!commitIndex2flushedDataMap.isEmpty());
+    long index;
     try {
-      long index =
+      XceiverClientReply reply =
           xceiverClient.watchForCommit(commitIndex, watchTimeout);
+      if (reply == null) {
+        index = 0;
+      } else {
+        List<DatanodeDetails> dnList = reply.getDatanodes();
+        if (!dnList.isEmpty()) {
+          if (failedServers.isEmpty()) {
+            failedServers = new ArrayList<>();
+          }
+          failedServers.addAll(dnList);
+        }
+        index = reply.getLogIndex();
+      }
       adjustBuffers(index);
     } catch (TimeoutException | InterruptedException | ExecutionException e) {
       LOG.warn("watchForCommit failed for index " + commitIndex, e);
@@ -392,8 +414,7 @@ public class BlockOutputStream extends OutputStream {
         try {
           validateResponse(e);
         } catch (IOException sce) {
-          future.completeExceptionally(sce);
-          return e;
+          throw new CompletionException(sce);
         }
         // if the ioException is not set, putBlock is successful
         if (ioException == null) {
@@ -422,7 +443,7 @@ public class BlockOutputStream extends OutputStream {
       throw new IOException(
           "Unexpected Storage Container Exception: " + e.toString(), e);
     }
-    futureList.add(flushFuture);
+    futureMap.put(flushPos, flushFuture);
     return flushFuture;
   }
 
@@ -516,8 +537,8 @@ public class BlockOutputStream extends OutputStream {
 
   private void waitOnFlushFutures()
       throws InterruptedException, ExecutionException {
-    CompletableFuture<Void> combinedFuture = CompletableFuture
-        .allOf(futureList.toArray(new CompletableFuture[futureList.size()]));
+    CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
+        futureMap.values().toArray(new CompletableFuture[futureMap.size()]));
     // wait for all the transactions to complete
     combinedFuture.get();
   }
@@ -553,10 +574,10 @@ public class BlockOutputStream extends OutputStream {
     }
     xceiverClientManager = null;
     xceiverClient = null;
-    if (futureList != null) {
-      futureList.clear();
+    if (futureMap != null) {
+      futureMap.clear();
     }
-    futureList = null;
+    futureMap = null;
     if (commitIndex2flushedDataMap != null) {
       commitIndex2flushedDataMap.clear();
     }

+ 18 - 9
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientReply.java

@@ -19,20 +19,28 @@
 package org.apache.hadoop.hdds.scm;
 
 
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandResponseProto;
 
-import java.util.UUID;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
 /**
- * This class represents the Async reply from XceiverClient.
+ * This class represents the reply from XceiverClient.
  */
 public class XceiverClientReply {
 
   private CompletableFuture<ContainerCommandResponseProto> response;
   private Long logIndex;
-  private UUID dnId;
+
+  /**
+   * List of datanodes where the command got executed and reply is received.
+   * If there is an exception in the reply, these datanodes will inform
+   * about the servers where there is a failure.
+   */
+  private List<DatanodeDetails> datanodes;
 
   public XceiverClientReply(
       CompletableFuture<ContainerCommandResponseProto> response) {
@@ -40,10 +48,11 @@ public class XceiverClientReply {
   }
 
   public XceiverClientReply(
-      CompletableFuture<ContainerCommandResponseProto> response, UUID dnId) {
+      CompletableFuture<ContainerCommandResponseProto> response,
+      List<DatanodeDetails> datanodes) {
     this.logIndex = (long) 0;
     this.response = response;
-    this.dnId = dnId;
+    this.datanodes = datanodes == null ? new ArrayList<>() : datanodes;
   }
 
   public CompletableFuture<ContainerCommandResponseProto> getResponse() {
@@ -58,12 +67,12 @@ public class XceiverClientReply {
     this.logIndex = logIndex;
   }
 
-  public UUID getDatanode() {
-    return dnId;
+  public List<DatanodeDetails> getDatanodes() {
+    return datanodes;
   }
 
-  public void setDatanode(UUID datanodeId) {
-    this.dnId = datanodeId;
+  public void addDatanode(DatanodeDetails dn) {
+    datanodes.add(dn);
   }
 
   public void setResponse(

+ 5 - 5
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java

@@ -21,11 +21,11 @@ package org.apache.hadoop.hdds.scm;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
-import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -123,7 +123,7 @@ public abstract class XceiverClientSpi implements Closeable {
    * @throws IOException
    */
   public XceiverClientReply sendCommand(
-      ContainerCommandRequestProto request, List<UUID> excludeDns)
+      ContainerCommandRequestProto request, List<DatanodeDetails> excludeDns)
       throws IOException {
     try {
       XceiverClientReply reply;
@@ -157,14 +157,14 @@ public abstract class XceiverClientSpi implements Closeable {
    * Check if an specfic commitIndex is replicated to majority/all servers.
    * @param index index to watch for
    * @param timeout timeout provided for the watch ipeartion to complete
-   * @return the min commit index replicated to all or majority servers
-   *         in case of a failure
+   * @return reply containing the min commit index replicated to all or majority
+   *         servers in case of a failure
    * @throws InterruptedException
    * @throws ExecutionException
    * @throws TimeoutException
    * @throws IOException
    */
-  public abstract long watchForCommit(long index, long timeout)
+  public abstract XceiverClientReply watchForCommit(long index, long timeout)
       throws InterruptedException, ExecutionException, TimeoutException,
       IOException;
 

+ 103 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ExcludeList.java

@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.common.helpers;
+
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+
+import java.util.*;
+
+/**
+ * This class contains set of dns and containers which ozone client provides
+ * to be handed over to SCM when block allocation request comes.
+ */
+public class ExcludeList {
+
+  private final List<DatanodeDetails> datanodes;
+  private final List<ContainerID> containerIds;
+  private final List<PipelineID> pipelineIds;
+
+
+  public ExcludeList() {
+    datanodes = new ArrayList<>();
+    containerIds = new ArrayList<>();
+    pipelineIds = new ArrayList<>();
+  }
+
+  public List<ContainerID> getContainerIds() {
+    return containerIds;
+  }
+
+  public List<DatanodeDetails> getDatanodes() {
+    return datanodes;
+  }
+
+  public void addDatanodes(Collection<DatanodeDetails> dns) {
+    datanodes.addAll(dns);
+  }
+
+  public void addDatanode(DatanodeDetails dn) {
+    datanodes.add(dn);
+  }
+
+  public void addConatinerId(ContainerID containerId) {
+    containerIds.add(containerId);
+  }
+
+  public void addPipeline(PipelineID pipelineId) {
+    pipelineIds.add(pipelineId);
+  }
+
+  public List<PipelineID> getPipelineIds() {
+    return pipelineIds;
+  }
+
+  public HddsProtos.ExcludeListProto getProtoBuf() {
+    HddsProtos.ExcludeListProto.Builder builder =
+        HddsProtos.ExcludeListProto.newBuilder();
+    containerIds.parallelStream()
+        .forEach(id -> builder.addContainerIds(id.getId()));
+    datanodes.parallelStream().forEach(dn -> {
+      builder.addDatanodes(dn.getUuidString());
+    });
+    pipelineIds.parallelStream().forEach(pipelineID -> {
+      builder.addPipelineIds(pipelineID.getProtobuf());
+    });
+    return builder.build();
+  }
+
+  public static ExcludeList getFromProtoBuf(
+      HddsProtos.ExcludeListProto excludeListProto) {
+    ExcludeList excludeList = new ExcludeList();
+    excludeListProto.getContainerIdsList().parallelStream().forEach(id -> {
+      excludeList.addConatinerId(ContainerID.valueof(id));
+    });
+    DatanodeDetails.Builder builder = DatanodeDetails.newBuilder();
+    excludeListProto.getDatanodesList().forEach(dn -> {
+      builder.setUuid(dn);
+      excludeList.addDatanode(builder.build());
+    });
+    excludeListProto.getPipelineIdsList().forEach(pipelineID -> {
+      excludeList.addPipeline(PipelineID.getFromProtobuf(pipelineID));
+    });
+    return excludeList;
+  }
+}
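
ExcludeList is the carrier the client fills with failed datanodes, containers and, optionally, pipelines, and ships to SCM inside the allocate-block request. A small self-contained sketch of building one and round-tripping it through its protobuf form, using only the methods defined above (the UUID and container id are made-up sample values):

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;

public class ExcludeListSketch {
  public static void main(String[] args) {
    ExcludeList excludeList = new ExcludeList();

    // Datanode that failed to acknowledge a write (sample UUID).
    DatanodeDetails failedDn = DatanodeDetails.newBuilder()
        .setUuid("11111111-2222-3333-4444-555555555555")
        .build();
    excludeList.addDatanode(failedDn);

    // Container whose write transaction failed (sample id); the method name
    // is spelled addConatinerId in the patch.
    excludeList.addConatinerId(ContainerID.valueof(7L));

    // Pipelines can be excluded the same way via addPipeline(PipelineID).

    // Serialize for AllocateScmBlockRequestProto ...
    HddsProtos.ExcludeListProto proto = excludeList.getProtoBuf();
    // ... and rebuild on the SCM side.
    ExcludeList onScm = ExcludeList.getFromProtoBuf(proto);
    System.out.println("Excluded datanodes: " + onScm.getDatanodes().size());
  }
}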

+ 5 - 1
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdds.scm.protocol;
 
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.security.KerberosInfo;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
@@ -47,11 +48,14 @@ public interface ScmBlockLocationProtocol extends Closeable {
    * Asks SCM where a block should be allocated. SCM responds with the
    * set of datanodes that should be used creating this block.
    * @param size - size of the block.
+   * @param excludeList List of datanodes/containers to exclude during block
+   *                    allocation.
    * @return allocated block accessing info (key, pipeline).
    * @throws IOException
    */
   AllocatedBlock allocateBlock(long size, ReplicationType type,
-      ReplicationFactor factor, String owner) throws IOException;
+      ReplicationFactor factor, String owner, ExcludeList excludeList)
+      throws IOException;
 
   /**
    * Delete blocks for a set of object keys.

+ 3 - 1
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.Dele
 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.KeyBlocks;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
@@ -80,7 +81,7 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB
   @Override
   public AllocatedBlock allocateBlock(long size,
       HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor,
-      String owner) throws IOException {
+      String owner, ExcludeList excludeList) throws IOException {
     Preconditions.checkArgument(size > 0, "block size must be greater than 0");
 
     AllocateScmBlockRequestProto request =
@@ -90,6 +91,7 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB
             .setFactor(factor)
             .setOwner(owner)
             .setTraceID(TracingUtil.exportCurrentSpan())
+            .setExcludeList(excludeList.getProtoBuf())
             .build();
     final AllocateScmBlockResponseProto response;
     try {

+ 7 - 2
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java

@@ -18,9 +18,11 @@
 
 package org.apache.hadoop.hdds.scm.storage;
 
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.XceiverClientReply;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .BlockNotCommittedException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenSelector;
 import org.apache.hadoop.io.Text;
@@ -71,7 +73,6 @@ import org.apache.hadoop.hdds.client.BlockID;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 
 /**
@@ -232,7 +233,8 @@ public final class ContainerProtocolCalls  {
    * @throws IOException if there is an I/O error while performing the call
    */
   public static XceiverClientReply readChunk(XceiverClientSpi xceiverClient,
-      ChunkInfo chunk, BlockID blockID, String traceID, List<UUID> excludeDns)
+      ChunkInfo chunk, BlockID blockID, String traceID,
+      List<DatanodeDetails> excludeDns)
       throws IOException {
     ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto
         .newBuilder()
@@ -563,6 +565,9 @@ public final class ContainerProtocolCalls  {
     } else if (response.getResult()
         == ContainerProtos.Result.BLOCK_NOT_COMMITTED) {
       throw new BlockNotCommittedException(response.getMessage());
+    } else if (response.getResult()
+        == ContainerProtos.Result.CLOSED_CONTAINER_IO) {
+      throw new ContainerNotOpenException(response.getMessage());
     }
     throw new StorageContainerException(
         response.getMessage(), response.getResult());

+ 3 - 1
hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java

@@ -24,6 +24,7 @@ import io.opentracing.Scope;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
@@ -77,7 +78,8 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB
             request.getTraceID())) {
       AllocatedBlock allocatedBlock =
           impl.allocateBlock(request.getSize(), request.getType(),
-              request.getFactor(), request.getOwner());
+              request.getFactor(), request.getOwner(),
+              ExcludeList.getFromProtoBuf(request.getExcludeList()));
       if (allocatedBlock != null) {
         return
             AllocateScmBlockResponseProto.newBuilder()

+ 1 - 0
hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto

@@ -42,6 +42,7 @@ message AllocateScmBlockRequestProto {
   required hadoop.hdds.ReplicationFactor factor = 3;
   required string owner = 4;
   optional string traceID = 5;
+  optional ExcludeListProto excludeList = 6;
 
 }
 

+ 6 - 0
hadoop-hdds/common/src/main/proto/hdds.proto

@@ -199,6 +199,12 @@ enum ScmOps {
     queryNode = 11;
 }
 
+message ExcludeListProto {
+    repeated string datanodes = 1;
+    repeated int64 containerIds = 2;
+    repeated PipelineID pipelineIds = 3;
+}
+
 /**
  * Block ID that uniquely identify a block by SCM.
  */

+ 5 - 1
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManager.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm.block;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -36,11 +37,14 @@ public interface BlockManager extends Closeable {
    * @param size - Block Size
    * @param type Replication Type
    * @param factor - Replication Factor
+   * @param excludeList List of datanodes/containers to exclude during block
+   *                    allocation.
    * @return AllocatedBlock
    * @throws IOException
    */
   AllocatedBlock allocateBlock(long size, HddsProtos.ReplicationType type,
-      HddsProtos.ReplicationFactor factor, String owner) throws IOException;
+      HddsProtos.ReplicationFactor factor, String owner,
+      ExcludeList excludeList) throws IOException;
 
   /**
    * Deletes a list of blocks in an atomic operation. Internally, SCM

+ 24 - 5
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java

@@ -35,9 +35,11 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmOps;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.ScmUtils;
 import org.apache.hadoop.hdds.scm.chillmode.ChillModePrecheck;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
@@ -60,6 +62,8 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys
     .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys
     .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
+import java.util.function.Predicate;
+
 
 /** Block Manager manages the block access for SCM. */
 public class BlockManagerImpl implements EventHandler<Boolean>,
@@ -145,12 +149,14 @@ public class BlockManagerImpl implements EventHandler<Boolean>,
    * @param size - Block Size
    * @param type Replication Type
    * @param factor - Replication Factor
+   * @param excludeList List of datanodes/containers to exclude during block
+   *                    allocation.
    * @return Allocated block
    * @throws IOException on failure.
    */
   @Override
-  public AllocatedBlock allocateBlock(final long size,
-      ReplicationType type, ReplicationFactor factor, String owner)
+  public AllocatedBlock allocateBlock(final long size, ReplicationType type,
+      ReplicationFactor factor, String owner, ExcludeList excludeList)
       throws IOException {
     LOG.trace("Size;{} , type : {}, factor : {} ", size, type, factor);
     ScmUtils.preCheck(ScmOps.allocateBlock, chillModePrecheck);
@@ -177,8 +183,10 @@ public class BlockManagerImpl implements EventHandler<Boolean>,
     ContainerInfo containerInfo;
 
     while (true) {
-      List<Pipeline> availablePipelines = pipelineManager
-          .getPipelines(type, factor, Pipeline.PipelineState.OPEN);
+      List<Pipeline> availablePipelines =
+          pipelineManager
+              .getPipelines(type, factor, Pipeline.PipelineState.OPEN,
+                  excludeList.getDatanodes(), excludeList.getPipelineIds());
       Pipeline pipeline;
       if (availablePipelines.size() == 0) {
         try {
@@ -197,7 +205,13 @@ public class BlockManagerImpl implements EventHandler<Boolean>,
       // look for OPEN containers that match the criteria.
       containerInfo = containerManager
           .getMatchingContainer(size, owner, pipeline);
-      if (containerInfo != null) {
+
+      // TODO: if getMachingContainer results in containers which are in exclude
+      // list, we may end up in this loop forever. This case needs to be
+      // addressed.
+      if (containerInfo != null && (excludeList.getContainerIds() == null
+          || !discardContainer(containerInfo.containerID(),
+          excludeList.getContainerIds()))) {
         return newBlock(containerInfo);
       }
     }
@@ -210,6 +224,11 @@ public class BlockManagerImpl implements EventHandler<Boolean>,
     return null;
   }
 
+  private boolean discardContainer(ContainerID containerId,
+      List<ContainerID> containers) {
+    Predicate<ContainerID> predicate = p -> p.equals(containerId);
+    return containers.parallelStream().anyMatch(predicate);
+  }
   /**
    * newBlock - returns a new block assigned to a container.
    *

+ 5 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java

@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
 import java.util.NavigableSet;
 
@@ -51,6 +52,10 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean {
   List<Pipeline> getPipelines(ReplicationType type,
       ReplicationFactor factor, Pipeline.PipelineState state);
 
+  List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor,
+      Pipeline.PipelineState state, Collection<DatanodeDetails> excludeDns,
+      Collection<PipelineID> excludePipelines);
+
   void addContainerToPipeline(PipelineID pipelineID, ContainerID containerID)
       throws IOException;
 

+ 9 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdds.scm.pipeline;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -27,6 +28,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
 import java.util.NavigableSet;
 
@@ -81,6 +83,13 @@ class PipelineStateManager {
     return pipelineStateMap.getPipelines(type, factor, state);
   }
 
+  List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor,
+      PipelineState state, Collection<DatanodeDetails> excludeDns,
+      Collection<PipelineID> excludePipelines) {
+    return pipelineStateMap
+        .getPipelines(type, factor, state, excludeDns, excludePipelines);
+  }
+
   List<Pipeline> getPipelines(ReplicationType type, PipelineState... states) {
     return pipelineStateMap.getPipelines(type, states);
   }

+ 53 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm.pipeline;
 import com.google.common.base.Preconditions;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -30,6 +31,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 /**
@@ -217,6 +219,57 @@ class PipelineStateMap {
         .collect(Collectors.toList());
   }
 
+  /**
+   * Get list of pipeline corresponding to specified replication type,
+   * replication factor and pipeline state.
+   *
+   * @param type - ReplicationType
+   * @param state - Required PipelineState
+   * @param excludeDns list of dns to exclude
+   * @param excludePipelines pipelines to exclude
+   * @return List of pipelines with specified replication type,
+   * replication factor and pipeline state
+   */
+  List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor,
+      PipelineState state, Collection<DatanodeDetails> excludeDns,
+      Collection<PipelineID> excludePipelines) {
+    Preconditions.checkNotNull(type, "Replication type cannot be null");
+    Preconditions.checkNotNull(factor, "Replication factor cannot be null");
+    Preconditions.checkNotNull(state, "Pipeline state cannot be null");
+    Preconditions
+        .checkNotNull(excludeDns, "Datanode exclude list cannot be null");
+    Preconditions
+        .checkNotNull(excludeDns, "Pipeline exclude list cannot be null");
+    return getPipelines(type, factor, state).stream().filter(
+        pipeline -> !discardPipeline(pipeline, excludePipelines)
+            && !discardDatanode(pipeline, excludeDns))
+        .collect(Collectors.toList());
+  }
+
+  private boolean discardPipeline(Pipeline pipeline,
+      Collection<PipelineID> excludePipelines) {
+    if (excludePipelines.isEmpty()) {
+      return false;
+    }
+    Predicate<PipelineID> predicate = p -> p.equals(pipeline.getId());
+    return excludePipelines.parallelStream().anyMatch(predicate);
+  }
+
+  private boolean discardDatanode(Pipeline pipeline,
+      Collection<DatanodeDetails> excludeDns) {
+    if (excludeDns.isEmpty()) {
+      return false;
+    }
+    boolean discard = false;
+    for (DatanodeDetails dn : pipeline.getNodes()) {
+      Predicate<DatanodeDetails> predicate = p -> p.equals(dn);
+      discard = excludeDns.parallelStream().anyMatch(predicate);
+      if (discard) {
+        break;
+      }
+    }
+    return discard;
+  }
   /**
    * Get set of containerIDs corresponding to a pipeline.
    *

+ 15 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java

@@ -45,6 +45,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Set;
+import java.util.Collection;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -205,6 +206,20 @@ public class SCMPipelineManager implements PipelineManager {
     }
   }
 
+  @Override
+  public List<Pipeline> getPipelines(ReplicationType type,
+      ReplicationFactor factor, Pipeline.PipelineState state,
+      Collection<DatanodeDetails> excludeDns,
+      Collection<PipelineID> excludePipelines) {
+    lock.readLock().lock();
+    try {
+      return stateManager
+          .getPipelines(type, factor, state, excludeDns, excludePipelines);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
   @Override
   public void addContainerToPipeline(PipelineID pipelineID,
       ContainerID containerID) throws IOException {

+ 6 - 4
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.scm.HddsServerUtil;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
 import org.apache.hadoop.hdds.scm.container.common.helpers.DeleteBlockResult;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
@@ -155,9 +156,9 @@ public class SCMBlockProtocolServer implements
   }
 
   @Override
-  public AllocatedBlock allocateBlock(long size, HddsProtos.ReplicationType
-      type, HddsProtos.ReplicationFactor factor, String owner) throws
-      IOException {
+  public AllocatedBlock allocateBlock(long size,
+      HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor,
+      String owner, ExcludeList excludeList) throws IOException {
     Map<String, String> auditMap = Maps.newHashMap();
     auditMap.put("size", String.valueOf(size));
     auditMap.put("type", type.name());
@@ -165,7 +166,8 @@ public class SCMBlockProtocolServer implements
     auditMap.put("owner", owner);
     boolean auditSuccess = true;
     try {
-      return scm.getScmBlockManager().allocateBlock(size, type, factor, owner);
+      return scm.getScmBlockManager()
+          .allocateBlock(size, type, factor, owner, excludeList);
     } catch (Exception ex) {
       auditSuccess = false;
       AUDIT.logWriteFailure(

+ 15 - 9
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 import org.apache.hadoop.hdds.scm.container.SCMContainerManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
@@ -127,7 +128,7 @@ public class TestBlockManager implements EventHandler<Boolean> {
       return !blockManager.isScmInChillMode();
     }, 10, 1000 * 5);
     AllocatedBlock block = blockManager.allocateBlock(DEFAULT_BLOCK_SIZE,
-        type, factor, containerOwner);
+        type, factor, containerOwner, new ExcludeList());
     Assert.assertNotNull(block);
   }
 
@@ -140,7 +141,7 @@ public class TestBlockManager implements EventHandler<Boolean> {
     long size = 6 * GB;
     thrown.expectMessage("Unsupported block size");
     AllocatedBlock block = blockManager.allocateBlock(size,
-        type, factor, containerOwner);
+        type, factor, containerOwner, new ExcludeList());
   }
 
 
@@ -154,7 +155,7 @@ public class TestBlockManager implements EventHandler<Boolean> {
     thrown.expectMessage("ChillModePrecheck failed for "
         + "allocateBlock");
     blockManager.allocateBlock(DEFAULT_BLOCK_SIZE,
-        type, factor, containerOwner);
+        type, factor, containerOwner, new ExcludeList());
   }
 
   @Test
@@ -165,7 +166,7 @@ public class TestBlockManager implements EventHandler<Boolean> {
       return !blockManager.isScmInChillMode();
     }, 10, 1000 * 5);
     Assert.assertNotNull(blockManager.allocateBlock(DEFAULT_BLOCK_SIZE,
-        type, factor, containerOwner));
+        type, factor, containerOwner, new ExcludeList()));
   }
 
   @Test(timeout = 10000)
@@ -179,12 +180,14 @@ public class TestBlockManager implements EventHandler<Boolean> {
     pipelineManager.createPipeline(type, factor);
 
     AllocatedBlock allocatedBlock = blockManager
-        .allocateBlock(DEFAULT_BLOCK_SIZE, type, factor, containerOwner);
+        .allocateBlock(DEFAULT_BLOCK_SIZE, type, factor, containerOwner,
+            new ExcludeList());
     // block should be allocated in different pipelines
     GenericTestUtils.waitFor(() -> {
       try {
         AllocatedBlock block = blockManager
-            .allocateBlock(DEFAULT_BLOCK_SIZE, type, factor, containerOwner);
+            .allocateBlock(DEFAULT_BLOCK_SIZE, type, factor, containerOwner,
+                new ExcludeList());
         return !block.getPipeline().getId()
             .equals(allocatedBlock.getPipeline().getId());
       } catch (IOException e) {
@@ -227,7 +230,8 @@ public class TestBlockManager implements EventHandler<Boolean> {
     GenericTestUtils.waitFor(() -> {
       try {
         blockManager
-            .allocateBlock(DEFAULT_BLOCK_SIZE, type, factor, containerOwner);
+            .allocateBlock(DEFAULT_BLOCK_SIZE, type, factor, containerOwner,
+                new ExcludeList());
       } catch (IOException e) {
       }
       return verifyNumberOfContainersInPipelines(
@@ -250,7 +254,8 @@ public class TestBlockManager implements EventHandler<Boolean> {
     GenericTestUtils.waitFor(() -> {
       try {
         blockManager
-            .allocateBlock(DEFAULT_BLOCK_SIZE, type, factor, containerOwner);
+            .allocateBlock(DEFAULT_BLOCK_SIZE, type, factor, containerOwner,
+                new ExcludeList());
       } catch (IOException e) {
       }
       return verifyNumberOfContainersInPipelines(
@@ -271,7 +276,8 @@ public class TestBlockManager implements EventHandler<Boolean> {
     }
     Assert.assertEquals(0, pipelineManager.getPipelines(type, factor).size());
     Assert.assertNotNull(blockManager
-        .allocateBlock(DEFAULT_BLOCK_SIZE, type, factor, containerOwner));
+        .allocateBlock(DEFAULT_BLOCK_SIZE, type, factor, containerOwner,
+            new ExcludeList()));
     Assert.assertEquals(1, pipelineManager.getPipelines(type, factor).size());
   }
 

+ 15 - 0
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java

@@ -19,17 +19,28 @@ package org.apache.hadoop.ozone.client;
 
 import org.apache.hadoop.hdds.client.OzoneQuota;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.client.rest.response.*;
+import org.apache.ratis.protocol.AlreadyClosedException;
+import org.apache.ratis.protocol.RaftRetryFailureException;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.TimeoutException;
 
 /** A utility class for OzoneClient. */
 public final class OzoneClientUtils {
 
   private OzoneClientUtils() {}
 
+  private static final List<Class<? extends Exception>> EXCEPTION_LIST =
+      new ArrayList<Class<? extends Exception>>() {{
+        add(TimeoutException.class);
+        add(ContainerNotOpenException.class);
+        add(RaftRetryFailureException.class);
+        add(AlreadyClosedException.class);
+      }};
   /**
    * Returns a BucketInfo object constructed using fields of the input
    * OzoneBucket object.
@@ -110,4 +121,8 @@ public final class OzoneClientUtils {
     keyInfo.setFileEncryptionInfo(key.getFileEncryptionInfo());
     return keyInfo;
   }
+
+  public static List<Class<? extends Exception>> getExceptionList() {
+    return EXCEPTION_LIST;
+  }
 }
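
OzoneClientUtils now centralizes the exception types on which the write path abandons the current block and re-allocates with an exclude list (timeouts, closed containers, and Raft client failures). The KeyOutputStream hunk below consults this list through checkForException, whose body is not shown in this diff; a hedged illustration of such a check, walking the cause chain of the thrown IOException, might look like this (illustration only, not the patch's actual implementation):

// Illustration only (not the patch's checkForException): return the first
// cause in the exception chain whose type appears in
// OzoneClientUtils.getExceptionList(), or null if none matches.
private static Throwable findRetriableCause(IOException ioe) {
  Throwable cause = ioe.getCause();
  while (cause != null) {
    for (Class<? extends Exception> clazz : OzoneClientUtils.getExceptionList()) {
      if (clazz.isInstance(cause)) {
        return cause;
      }
    }
    cause = cause.getCause();
  }
  return null;
}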

+ 11 - 0
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java

@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ChecksumType;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
@@ -32,6 +33,8 @@ import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 
+import java.util.Collection;
+
 /**
  * Helper class used inside {@link BlockOutputStream}.
  * */
@@ -159,6 +162,14 @@ public final class BlockOutputStreamEntry extends OutputStream {
     }
   }
 
+  Collection<DatanodeDetails> getFailedServers() throws IOException {
+    if (outputStream != null) {
+      BlockOutputStream out = (BlockOutputStream) this.outputStream;
+      return out.getFailedServers();
+    }
+    return null;
+  }
+
   long getWrittenDataLength() throws IOException {
     if (outputStream != null) {
       BlockOutputStream out = (BlockOutputStream) this.outputStream;

+ 85 - 49
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java

@@ -21,20 +21,22 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ChecksumType;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.ozone.client.OzoneClientUtils;
 import org.apache.hadoop.ozone.om.helpers.*;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
-import org.apache.hadoop.hdds.scm.container.common.helpers
-    .StorageContainerException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ratis.protocol.AlreadyClosedException;
 import org.apache.ratis.protocol.RaftRetryFailureException;
@@ -46,7 +48,7 @@ import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Optional;
+import java.util.Collection;
 import java.util.ListIterator;
 import java.util.concurrent.TimeoutException;
 
@@ -84,7 +86,7 @@ public class KeyOutputStream extends OutputStream {
   private List<ByteBuffer> bufferList;
   private OmMultipartCommitUploadPartInfo commitUploadPartInfo;
   private FileEncryptionInfo feInfo;
-
+  private ExcludeList excludeList;
   /**
    * A constructor for testing purpose only.
    */
@@ -181,6 +183,7 @@ public class KeyOutputStream extends OutputStream {
     Preconditions.checkState(streamBufferMaxSize % streamBufferFlushSize == 0);
     Preconditions.checkState(blockSize % streamBufferMaxSize == 0);
     this.bufferList = new ArrayList<>();
+    this.excludeList = new ExcludeList();
   }
 
   /**
@@ -307,9 +310,8 @@ public class KeyOutputStream extends OutputStream {
           current.write(b, off, writeLen);
         }
       } catch (IOException ioe) {
-        boolean retryFailure = checkForRetryFailure(ioe);
-        if (checkIfContainerIsClosed(ioe) || checkIfTimeoutException(ioe)
-            || retryFailure) {
+        Throwable t = checkForException(ioe);
+        if (t != null) {
           // for the current iteration, totalDataWritten - currentPos gives the
           // amount of data already written to the buffer
 
@@ -321,7 +323,7 @@ public class KeyOutputStream extends OutputStream {
           writeLen = retry ? (int) len :
               (int) (current.getWrittenDataLength() - currentPos);
           LOG.debug("writeLen {}, total len {}", writeLen, len);
-          handleException(current, currentStreamIndex, retryFailure);
+          handleException(current, currentStreamIndex, t);
         } else {
           throw ioe;
         }
@@ -340,8 +342,10 @@ public class KeyOutputStream extends OutputStream {
    * Discards the subsequent pre allocated blocks and removes the streamEntries
    * from the streamEntries list for the container which is closed.
    * @param containerID id of the closed container
+   * @param pipelineId id of the associated pipeline
    */
-  private void discardPreallocatedBlocks(long containerID) {
+  private void discardPreallocatedBlocks(long containerID,
+      PipelineID pipelineId) {
     // currentStreamIndex < streamEntries.size() signifies that, there are still
     // pre allocated blocks available.
     if (currentStreamIndex < streamEntries.size()) {
@@ -349,8 +353,10 @@ public class KeyOutputStream extends OutputStream {
           streamEntries.listIterator(currentStreamIndex);
       while (streamEntryIterator.hasNext()) {
         BlockOutputStreamEntry streamEntry = streamEntryIterator.next();
-        if (streamEntry.getBlockID().getContainerID()
-            == containerID && streamEntry.getCurrentPosition() == 0) {
+        if (((pipelineId != null && streamEntry.getPipeline().getId()
+            .equals(pipelineId)) || (containerID != -1
+            && streamEntry.getBlockID().getContainerID() == containerID))
+            && streamEntry.getCurrentPosition() == 0) {
           streamEntryIterator.remove();
         }
       }
@@ -382,17 +388,39 @@ public class KeyOutputStream extends OutputStream {
    *
    * @param streamEntry StreamEntry
    * @param streamIndex Index of the entry
-   * @param retryFailure if true the xceiverClient needs to be invalidated in
-   *                     the client cache.
+   * @param exception actual exception that occurred
    * @throws IOException Throws IOException if Write fails
    */
   private void handleException(BlockOutputStreamEntry streamEntry,
-      int streamIndex, boolean retryFailure) throws IOException {
+      int streamIndex, Throwable exception) throws IOException {
+    boolean retryFailure = checkForRetryFailure(exception);
+    boolean closedContainerException = false;
+    if (!retryFailure) {
+      closedContainerException = checkIfContainerIsClosed(exception);
+    }
+    PipelineID pipelineId = null;
     long totalSuccessfulFlushedData =
         streamEntry.getTotalSuccessfulFlushedData();
     //set the correct length for the current stream
     streamEntry.setCurrentPosition(totalSuccessfulFlushedData);
     long bufferedDataLen = computeBufferData();
+    LOG.warn("Encountered exception {}", exception);
+    LOG.info(
+        "The last committed block length is {}, uncommitted data length is {}",
+        totalSuccessfulFlushedData, bufferedDataLen);
+    Preconditions.checkArgument(bufferedDataLen <= streamBufferMaxSize);
+    long containerId = streamEntry.getBlockID().getContainerID();
+    Collection<DatanodeDetails> failedServers = streamEntry.getFailedServers();
+    Preconditions.checkNotNull(failedServers);
+    if (!failedServers.isEmpty()) {
+      excludeList.addDatanodes(failedServers);
+    }
+    if (checkIfContainerIsClosed(exception)) {
+      excludeList.addConatinerId(ContainerID.valueof(containerId));
+    } else if (retryFailure || exception instanceof TimeoutException) {
+      pipelineId = streamEntry.getPipeline().getId();
+      excludeList.addPipeline(pipelineId);
+    }
     // just clean up the current stream.
     streamEntry.cleanup(retryFailure);
     if (bufferedDataLen > 0) {
@@ -405,21 +433,21 @@ public class KeyOutputStream extends OutputStream {
       streamEntries.remove(streamIndex);
       currentStreamIndex -= 1;
     }
-    // discard subsequent pre allocated blocks from the streamEntries list
-    // from the closed container
-    discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID());
-  }
 
-  private boolean checkIfContainerIsClosed(IOException ioe) {
-    if (ioe.getCause() != null) {
-      return checkForException(ioe, ContainerNotOpenException.class) || Optional
-          .of(ioe.getCause())
-          .filter(e -> e instanceof StorageContainerException)
-          .map(e -> (StorageContainerException) e)
-          .filter(sce -> sce.getResult() == Result.CLOSED_CONTAINER_IO)
-          .isPresent();
+    if (closedContainerException) {
+      // discard subsequent pre allocated blocks from the streamEntries list
+      // from the closed container
+      discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID(),
+          null);
+    } else {
+      // In case of a timeout exception, a watchForCommit over majority, or a
+      // client connection failure to the leader in the pipeline, discard all
+      // the preallocated blocks on this pipeline. The next block allocation
+      // will exclude this specific pipeline, ensuring that a 2-way commit
+      // cannot span multiple blocks.
+      discardPreallocatedBlocks(-1, pipelineId);
     }
-    return false;
   }
 
   /**
@@ -427,31 +455,27 @@ public class KeyOutputStream extends OutputStream {
    * In case of retry failure, ratis client throws RaftRetryFailureException
    * and all succeeding operations are failed with AlreadyClosedException.
    */
-  private boolean checkForRetryFailure(IOException ioe) {
-    return checkForException(ioe, RaftRetryFailureException.class,
-        AlreadyClosedException.class);
+  private boolean checkForRetryFailure(Throwable t) {
+    return t instanceof RaftRetryFailureException
+        || t instanceof AlreadyClosedException;
+  }
+
+  private boolean checkIfContainerIsClosed(Throwable t) {
+    return t instanceof ContainerNotOpenException;
   }
 
-  private boolean checkForException(IOException ioe, Class... classes) {
+  private Throwable checkForException(IOException ioe) {
     Throwable t = ioe.getCause();
     while (t != null) {
-      for (Class cls : classes) {
+      for (Class<? extends Exception> cls : OzoneClientUtils
+          .getExceptionList()) {
         if (cls.isInstance(t)) {
-          return true;
+          return t;
         }
       }
       t = t.getCause();
     }
-    return false;
-  }
-
-  private boolean checkIfTimeoutException(IOException ioe) {
-    if (ioe.getCause() != null) {
-      return Optional.of(ioe.getCause())
-          .filter(e -> e instanceof TimeoutException).isPresent();
-    } else {
-      return false;
-    }
+    return null;
   }
 
   private long getKeyLength() {
@@ -469,7 +493,8 @@ public class KeyOutputStream extends OutputStream {
    * @throws IOException
    */
   private void allocateNewBlock(int index) throws IOException {
-    OmKeyLocationInfo subKeyInfo = omClient.allocateBlock(keyArgs, openID);
+    OmKeyLocationInfo subKeyInfo =
+        omClient.allocateBlock(keyArgs, openID, excludeList);
     addKeyLocationInfo(subKeyInfo);
   }
 
@@ -495,19 +520,25 @@ public class KeyOutputStream extends OutputStream {
     BlockOutputStreamEntry entry = streamEntries.get(streamIndex);
     if (entry != null) {
       try {
+        Collection<DatanodeDetails> failedServers = entry.getFailedServers();
+
+        // failed servers can be null in case there is no data written in the
+        // stream
+        if (failedServers != null && !failedServers.isEmpty()) {
+          excludeList.addDatanodes(failedServers);
+        }
         if (close) {
           entry.close();
         } else {
           entry.flush();
         }
       } catch (IOException ioe) {
-        boolean retryFailure = checkForRetryFailure(ioe);
-        if (checkIfContainerIsClosed(ioe) || checkIfTimeoutException(ioe)
-            || retryFailure) {
+        Throwable t = checkForException(ioe);
+        if (t != null) {
           // This call will allocate a new streamEntry and write the Data.
           // Close needs to be retried on the newly allocated streamEntry as
           // as well.
-          handleException(entry, streamIndex, retryFailure);
+          handleException(entry, streamIndex, t);
           handleFlushOrClose(close);
         } else {
           throw ioe;
@@ -564,6 +595,11 @@ public class KeyOutputStream extends OutputStream {
     return feInfo;
   }
 
+  @VisibleForTesting
+  public ExcludeList getExcludeList() {
+    return excludeList;
+  }
+
   /**
    * Builder class of KeyOutputStream.
    */
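
Note (not part of the patch): the handleException() change above boils down to three rules: failed datanodes are always excluded, a closed container only excludes that container, and a retry failure or timeout excludes the whole pipeline. A toy, JDK-only model of that decision; ExcludeSets and the stand-in exception types are placeholders, not the Ozone ExcludeList/ContainerID/PipelineID types:

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeoutException;

public final class ExcludeDecisionSketch {

  // Placeholder for the real ExcludeList, which holds ContainerID,
  // PipelineID and DatanodeDetails objects.
  static final class ExcludeSets {
    final Set<String> containers = new HashSet<>();
    final Set<String> pipelines = new HashSet<>();
    final Set<String> datanodes = new HashSet<>();
  }

  static void onWriteFailure(Throwable cause, Set<String> failedServers,
      String containerId, String pipelineId, ExcludeSets exclude) {
    // Datanodes that already failed a chunk write are always excluded.
    exclude.datanodes.addAll(failedServers);
    if (cause instanceof IllegalStateException) {
      // Stand-in for ContainerNotOpenException: only this container is bad,
      // so exclude the container but keep using the pipeline.
      exclude.containers.add(containerId);
    } else if (cause instanceof TimeoutException) {
      // Stand-in for RaftRetryFailureException / AlreadyClosedException /
      // watch-for-commit timeout: the whole pipeline is suspect, so exclude
      // it for the next block allocation.
      exclude.pipelines.add(pipelineId);
    }
  }

  public static void main(String[] args) {
    ExcludeSets exclude = new ExcludeSets();
    onWriteFailure(new TimeoutException(), new HashSet<>(), "c1", "p1", exclude);
    System.out.println(exclude.pipelines); // [p1]
  }
}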

+ 6 - 2
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.ozone.om.protocol;
 import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
 
@@ -38,6 +39,7 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAc
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
+
 import org.apache.hadoop.security.KerberosInfo;
 
 /**
@@ -175,11 +177,13 @@ public interface OzoneManagerProtocol
    *
    * @param args the key to append
    * @param clientID the client identification
+   * @param excludeList List of datanodes/containers to exclude during block
+   *                    allocation
    * @return an allocated block
    * @throws IOException
    */
-  OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID)
-      throws IOException;
+  OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID,
+      ExcludeList excludeList) throws IOException;
 
   /**
    * Look up for the container of an existing key.
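
Note (not part of the patch): a hedged caller-side sketch of the widened allocateBlock() signature above, assuming the Ozone client jars are on the classpath; how the OzoneManagerProtocol proxy and the open-key id are obtained is outside this change:

import java.io.IOException;

import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;

final class AllocateBlockUsageSketch {

  static OmKeyLocationInfo allocateWithExclusions(OzoneManagerProtocol om,
      OmKeyArgs keyArgs, long openKeyId) throws IOException {
    // An empty ExcludeList preserves the old behaviour; callers such as
    // KeyOutputStream populate it with failed datanodes, closed containers
    // and bad pipelines before retrying the allocation.
    ExcludeList excludeList = new ExcludeList();
    return om.allocateBlock(keyArgs, openKeyId, excludeList);
  }
}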

+ 5 - 2
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java

@@ -26,6 +26,7 @@ import java.util.stream.Collectors;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.retry.RetryPolicies;
@@ -674,8 +675,8 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
   }
 
   @Override
-  public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientId)
-      throws IOException {
+  public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientId,
+      ExcludeList excludeList) throws IOException {
     AllocateBlockRequest.Builder req = AllocateBlockRequest.newBuilder();
     KeyArgs keyArgs = KeyArgs.newBuilder()
         .setVolumeName(args.getVolumeName())
@@ -684,6 +685,8 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
         .setDataSize(args.getDataSize()).build();
     req.setKeyArgs(keyArgs);
     req.setClientID(clientId);
+    req.setExcludeList(excludeList.getProtoBuf());
+
 
     OMRequest omRequest = createOMRequest(Type.AllocateBlock)
         .setAllocateBlockRequest(req)

+ 1 - 0
hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto

@@ -611,6 +611,7 @@ message CommitKeyResponse {
 message AllocateBlockRequest {
     required KeyArgs keyArgs = 1;
     required uint64 clientID = 2;
+    optional hadoop.hdds.ExcludeListProto excludeList = 3;
 }
 
 message AllocateBlockResponse {
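
Note (not part of the patch): the new optional field lets the client's ExcludeList travel with the request. The client-side translator above serialises it with ExcludeList#getProtoBuf, and OzoneManagerRequestHandler below rebuilds it with ExcludeList.getFromProtoBuf. A sketch of that round trip, assuming the generated class for the ExcludeListProto message added to hdds.proto lives in HddsProtos:

import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;

final class ExcludeListRoundTripSketch {

  static ExcludeList roundTrip(ExcludeList clientSide) {
    // Client side: what the OM protocol translator puts on the wire.
    HddsProtos.ExcludeListProto proto = clientSide.getProtoBuf();
    // OM side: what OzoneManagerRequestHandler reconstructs before delegating
    // to KeyManagerImpl#allocateBlock.
    return ExcludeList.getFromProtoBuf(proto);
  }
}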

+ 22 - 64
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java

@@ -24,9 +24,6 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
@@ -43,8 +40,6 @@ import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
-import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
-import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -56,7 +51,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
@@ -232,22 +226,32 @@ public class TestCloseContainerHandlingByClient {
   public void testMultiBlockWrites2() throws Exception {
     String keyName = getKeyName();
     OzoneOutputStream key =
-        createKey(keyName, ReplicationType.RATIS, 4 * blockSize);
+        createKey(keyName, ReplicationType.RATIS, 2 * blockSize);
     KeyOutputStream keyOutputStream =
         (KeyOutputStream) key.getOutputStream();
 
     Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
-    // With the initial size provided, it should have pre allocated 4 blocks
-    Assert.assertEquals(4, keyOutputStream.getStreamEntries().size());
+    // With the initial size provided, it should have pre allocated 2 blocks
+    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
     String dataString =
         ContainerTestHelper.getFixedLengthString(keyString, (2 * blockSize));
     byte[] data = dataString.getBytes(UTF_8);
     key.write(data);
-    // 3 block are completely written to the DataNode in 3 blocks.
+    // 2 block are completely written to the DataNode in 3 blocks.
     // Data of length half of chunkSize resides in the chunkOutput stream buffer
     String dataString2 =
-        ContainerTestHelper.getFixedLengthString(keyString, chunkSize * 1 / 2);
+        ContainerTestHelper.getFixedLengthString(keyString, chunkSize);
     key.write(dataString2.getBytes(UTF_8));
+    key.flush();
+
+    String dataString3 =
+        ContainerTestHelper.getFixedLengthString(keyString, chunkSize);
+    key.write(dataString3.getBytes(UTF_8));
+    key.flush();
+
+    String dataString4 =
+        ContainerTestHelper.getFixedLengthString(keyString, chunkSize * 1 / 2);
+    key.write(dataString4.getBytes(UTF_8));
     //get the name of a valid container
     OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
         .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
@@ -260,15 +264,16 @@ public class TestCloseContainerHandlingByClient {
     // read the key from OM again and match the length.The length will still
     // be the equal to the original data size.
     OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
-    List<OmKeyLocationInfo> keyLocationInfos =
-        keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
     // Though we have written only block initially, the close will hit
     // closeContainerException and remaining data in the chunkOutputStream
     // buffer will be copied into a different allocated block and will be
     // committed.
-    Assert.assertEquals(dataString.concat(dataString2).getBytes(UTF_8).length,
+
+    String dataCommitted =
+        dataString.concat(dataString2).concat(dataString3).concat(dataString4);
+    Assert.assertEquals(dataCommitted.getBytes(UTF_8).length,
         keyInfo.getDataSize());
-    validateData(keyName, dataString.concat(dataString2).getBytes(UTF_8));
+    validateData(keyName, dataCommitted.getBytes(UTF_8));
   }
 
   @Test
@@ -337,55 +342,8 @@ public class TestCloseContainerHandlingByClient {
       containerIdList.add(info.getContainerID());
     }
     Assert.assertTrue(!containerIdList.isEmpty());
-    waitForContainerClose(containerIdList.toArray(new Long[0]));
-  }
-
-  private void waitForContainerClose(Long... containerIdList)
-      throws ContainerNotFoundException, PipelineNotFoundException,
-      TimeoutException, InterruptedException {
-    List<Pipeline> pipelineList = new ArrayList<>();
-    for (long containerID : containerIdList) {
-      cluster.getStorageContainerManager().getEventQueue()
-          .fireEvent(SCMEvents.CLOSE_CONTAINER,
-              ContainerID.valueof(containerID));
-      ContainerInfo container =
-          cluster.getStorageContainerManager().getContainerManager()
-              .getContainer(ContainerID.valueof(containerID));
-      Pipeline pipeline =
-          cluster.getStorageContainerManager().getPipelineManager()
-              .getPipeline(container.getPipelineID());
-      pipelineList.add(pipeline);
-      List<DatanodeDetails> datanodes = pipeline.getNodes();
-      for (DatanodeDetails details : datanodes) {
-        Assert.assertFalse(ContainerTestHelper
-            .isContainerClosed(cluster, containerID, details));
-        // send the order to close the container
-        cluster.getStorageContainerManager().getScmNodeManager()
-            .addDatanodeCommand(details.getUuid(),
-                new CloseContainerCommand(containerID, pipeline.getId()));
-      }
-    }
-    int index = 0;
-    for (long containerID : containerIdList) {
-      Pipeline pipeline = pipelineList.get(index);
-      List<DatanodeDetails> datanodes = pipeline.getNodes();
-      // Below condition avoids the case where container has been allocated
-      // but not yet been used by the client. In such a case container is never
-      // created.
-      if (datanodes.stream().anyMatch(dn -> ContainerTestHelper
-          .isContainerPresent(cluster, containerID, dn))) {
-        for (DatanodeDetails datanodeDetails : datanodes) {
-          GenericTestUtils.waitFor(() -> ContainerTestHelper
-                  .isContainerClosed(cluster, containerID, datanodeDetails),
-              500, 15 * 1000);
-          //double check if it's really closed
-          // (waitFor also throws an exception)
-          Assert.assertTrue(ContainerTestHelper
-              .isContainerClosed(cluster, containerID, datanodeDetails));
-        }
-      }
-      index++;
-    }
+    ContainerTestHelper
+        .waitForContainerClose(cluster, containerIdList.toArray(new Long[0]));
   }
 
   @Ignore // test needs to be fixed after close container is handled for

+ 250 - 16
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java

@@ -17,6 +17,7 @@
 
 package org.apache.hadoop.ozone.client.rpc;
 
+import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -36,12 +37,11 @@ import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
@@ -54,16 +54,16 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTER
  */
 public class TestFailureHandlingByClient {
 
-  private static MiniOzoneCluster cluster;
-  private static OzoneConfiguration conf;
-  private static OzoneClient client;
-  private static ObjectStore objectStore;
-  private static int chunkSize;
-  private static int blockSize;
-  private static String volumeName;
-  private static String bucketName;
-  private static String keyString;
-  private static int maxRetries;
+  private MiniOzoneCluster cluster;
+  private OzoneConfiguration conf;
+  private OzoneClient client;
+  private ObjectStore objectStore;
+  private int chunkSize;
+  private int blockSize;
+  private String volumeName;
+  private String bucketName;
+  private String keyString;
+  private int maxRetries;
 
   /**
    * Create a MiniDFSCluster for testing.
@@ -72,8 +72,7 @@ public class TestFailureHandlingByClient {
    *
    * @throws IOException
    */
-  @Before
-  public void init() throws Exception {
+  private void init() throws Exception {
     conf = new OzoneConfiguration();
     maxRetries = 100;
     chunkSize = (int) OzoneConsts.MB;
@@ -101,11 +100,14 @@ public class TestFailureHandlingByClient {
     objectStore.getVolume(volumeName).createBucket(bucketName);
   }
 
+  private void startCluster() throws Exception {
+    init();
+  }
+
   /**
    * Shutdown MiniDFSCluster.
    */
-  @After
-  public void shutdown() {
+  private void shutdown() {
     if (cluster != null) {
       cluster.shutdown();
     }
@@ -113,6 +115,7 @@ public class TestFailureHandlingByClient {
 
   @Test
   public void testBlockWritesWithDnFailures() throws Exception {
+    startCluster();
     String keyName = UUID.randomUUID().toString();
     OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
     byte[] data =
@@ -148,10 +151,12 @@ public class TestFailureHandlingByClient {
     OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
     Assert.assertEquals(data.length, keyInfo.getDataSize());
     validateData(keyName, data);
+    shutdown();
   }
 
   @Test
   public void testMultiBlockWritesWithDnFailures() throws Exception {
+    startCluster();
     String keyName = "ratis3";
     OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
     String data =
@@ -188,11 +193,13 @@ public class TestFailureHandlingByClient {
     OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
     Assert.assertEquals(2 * data.getBytes().length, keyInfo.getDataSize());
     validateData(keyName, data.concat(data).getBytes());
+    shutdown();
   }
 
   @Test
   public void testMultiBlockWritesWithIntermittentDnFailures()
       throws Exception {
+    startCluster();
     String keyName = UUID.randomUUID().toString();
     OzoneOutputStream key =
         createKey(keyName, ReplicationType.RATIS, 6 * blockSize);
@@ -232,8 +239,235 @@ public class TestFailureHandlingByClient {
     OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
     Assert.assertEquals(3 * data.getBytes().length, keyInfo.getDataSize());
     validateData(keyName, data.concat(data).concat(data).getBytes());
+    shutdown();
+  }
+
+  @Test
+  public void testWriteSmallFile() throws Exception {
+    startCluster();
+    String keyName = UUID.randomUUID().toString();
+    OzoneOutputStream key =
+        createKey(keyName, ReplicationType.RATIS, 0);
+    String data = ContainerTestHelper
+        .getFixedLengthString(keyString,  chunkSize/2);
+    key.write(data.getBytes());
+    // get the name of a valid container
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+    KeyOutputStream keyOutputStream =
+        (KeyOutputStream) key.getOutputStream();
+    List<OmKeyLocationInfo> locationInfoList =
+        keyOutputStream.getLocationInfoList();
+    long containerId = locationInfoList.get(0).getContainerID();
+    BlockID blockId = locationInfoList.get(0).getBlockID();
+    ContainerInfo container =
+        cluster.getStorageContainerManager().getContainerManager()
+            .getContainer(ContainerID.valueof(containerId));
+    Pipeline pipeline =
+        cluster.getStorageContainerManager().getPipelineManager()
+            .getPipeline(container.getPipelineID());
+    List<DatanodeDetails> datanodes = pipeline.getNodes();
+
+    cluster.shutdownHddsDatanode(datanodes.get(0));
+    cluster.shutdownHddsDatanode(datanodes.get(1));
+    key.close();
+    // this will throw AlreadyClosedException; the current stream will be
+    // discarded and the remaining data written to a new block
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
+        .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName)
+        .build();
+    OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
+
+    // Make sure a new block is written
+    Assert.assertNotEquals(
+        keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly().get(0)
+            .getBlockID(), blockId);
+    Assert.assertEquals(data.getBytes().length, keyInfo.getDataSize());
+    validateData(keyName, data.getBytes());
+    shutdown();
+  }
+
+
+  @Test
+  public void testContainerExclusionWithClosedContainerException()
+      throws Exception {
+    startCluster();
+    String keyName = UUID.randomUUID().toString();
+    OzoneOutputStream key =
+        createKey(keyName, ReplicationType.RATIS, blockSize);
+    String data = ContainerTestHelper
+        .getFixedLengthString(keyString,  chunkSize);
+
+    // get the name of a valid container
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+    KeyOutputStream keyOutputStream =
+        (KeyOutputStream) key.getOutputStream();
+    List<OmKeyLocationInfo> locationInfoList =
+        keyOutputStream.getLocationInfoList();
+
+    // Assert that 1 block will be preallocated
+    Assert.assertEquals(1, locationInfoList.size());
+    key.write(data.getBytes());
+    key.flush();
+    long containerId = locationInfoList.get(0).getContainerID();
+    BlockID blockId = locationInfoList.get(0).getBlockID();
+    List<Long> containerIdList = new ArrayList<>();
+    containerIdList.add(containerId);
+
+    // below check will assert if the container does not get closed
+    ContainerTestHelper
+        .waitForContainerClose(cluster, containerIdList.toArray(new Long[0]));
+
+    // This write will hit ClosedContainerException and this container
+    // will be added to the exclude list
+    key.write(data.getBytes());
+    key.flush();
+
+    Assert.assertTrue(keyOutputStream.getExcludeList().getContainerIds()
+        .contains(ContainerID.valueof(containerId)));
+    Assert.assertTrue(
+        keyOutputStream.getExcludeList().getDatanodes().isEmpty());
+    Assert.assertTrue(
+        keyOutputStream.getExcludeList().getPipelineIds().isEmpty());
+
+    // The close will just write to the buffer
+    key.close();
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
+        .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName)
+        .build();
+    OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
+
+    // Make sure a new block is written
+    Assert.assertNotEquals(
+        keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly().get(0)
+            .getBlockID(), blockId);
+    Assert.assertEquals(2 * data.getBytes().length, keyInfo.getDataSize());
+    validateData(keyName, data.concat(data).getBytes());
+    shutdown();
   }
 
+  @Test
+  public void testDatanodeExclusionWithMajorityCommit() throws Exception {
+    startCluster();
+    String keyName = UUID.randomUUID().toString();
+    OzoneOutputStream key =
+        createKey(keyName, ReplicationType.RATIS, blockSize);
+    String data = ContainerTestHelper
+        .getFixedLengthString(keyString,  chunkSize);
+
+    // get the name of a valid container
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+    KeyOutputStream keyOutputStream =
+        (KeyOutputStream) key.getOutputStream();
+    List<OmKeyLocationInfo> locationInfoList =
+        keyOutputStream.getLocationInfoList();
+
+    // Assert that 1 block will be preallocated
+    Assert.assertEquals(1, locationInfoList.size());
+    key.write(data.getBytes());
+    key.flush();
+    long containerId = locationInfoList.get(0).getContainerID();
+    BlockID blockId = locationInfoList.get(0).getBlockID();
+    ContainerInfo container =
+        cluster.getStorageContainerManager().getContainerManager()
+            .getContainer(ContainerID.valueof(containerId));
+    Pipeline pipeline =
+        cluster.getStorageContainerManager().getPipelineManager()
+            .getPipeline(container.getPipelineID());
+    List<DatanodeDetails> datanodes = pipeline.getNodes();
+
+    // shut down 1 datanode. This makes sure a 2-way commit happens for the
+    // next write ops.
+    cluster.shutdownHddsDatanode(datanodes.get(0));
+
+    key.write(data.getBytes());
+    key.write(data.getBytes());
+    // The close will just write to the buffer
+    key.close();
+    Assert.assertTrue(keyOutputStream.getExcludeList().getDatanodes()
+        .contains(datanodes.get(0)));
+    Assert.assertTrue(
+        keyOutputStream.getExcludeList().getContainerIds().isEmpty());
+    Assert.assertTrue(
+        keyOutputStream.getExcludeList().getPipelineIds().isEmpty());
+
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
+        .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName)
+        .build();
+    OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
+
+    // Make sure a new block is written
+    Assert.assertNotEquals(
+        keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly().get(0)
+            .getBlockID(), blockId);
+    Assert.assertEquals(3 * data.getBytes().length, keyInfo.getDataSize());
+    validateData(keyName, data.concat(data).concat(data).getBytes());
+    shutdown();
+  }
+
+
+  @Test
+  public void testPipelineExclusionWithPipelineFailure() throws Exception {
+    startCluster();
+    String keyName = UUID.randomUUID().toString();
+    OzoneOutputStream key =
+        createKey(keyName, ReplicationType.RATIS, blockSize);
+    String data = ContainerTestHelper
+        .getFixedLengthString(keyString,  chunkSize);
+
+    // get the name of a valid container
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+    KeyOutputStream keyOutputStream =
+        (KeyOutputStream) key.getOutputStream();
+    List<OmKeyLocationInfo> locationInfoList =
+        keyOutputStream.getLocationInfoList();
+
+    // Assert that 1 block will be preallocated
+    Assert.assertEquals(1, locationInfoList.size());
+    key.write(data.getBytes());
+    key.flush();
+    long containerId = locationInfoList.get(0).getContainerID();
+    BlockID blockId = locationInfoList.get(0).getBlockID();
+    ContainerInfo container =
+        cluster.getStorageContainerManager().getContainerManager()
+            .getContainer(ContainerID.valueof(containerId));
+    Pipeline pipeline =
+        cluster.getStorageContainerManager().getPipelineManager()
+            .getPipeline(container.getPipelineID());
+    List<DatanodeDetails> datanodes = pipeline.getNodes();
+
+    // With two nodes down, the next write will hit AlreadyClosedException and
+    // the pipeline will be added to the exclude list
+    cluster.shutdownHddsDatanode(datanodes.get(0));
+    cluster.shutdownHddsDatanode(datanodes.get(1));
+
+    key.write(data.getBytes());
+    key.write(data.getBytes());
+    // The close will just write to the buffer
+    key.close();
+    Assert.assertTrue(keyOutputStream.getExcludeList().getPipelineIds()
+        .contains(pipeline.getId()));
+    Assert.assertTrue(
+        keyOutputStream.getExcludeList().getContainerIds().isEmpty());
+    Assert.assertTrue(
+        keyOutputStream.getExcludeList().getDatanodes().isEmpty());
+
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
+        .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName)
+        .build();
+    OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
+
+    // Make sure a new block is written
+    Assert.assertNotEquals(
+        keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly().get(0)
+            .getBlockID(), blockId);
+    Assert.assertEquals(3 * data.getBytes().length, keyInfo.getDataSize());
+    validateData(keyName, data.concat(data).concat(data).getBytes());
+    shutdown();
+  }
 
   private OzoneOutputStream createKey(String keyName, ReplicationType type,
       long size) throws Exception {

+ 60 - 0
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java

@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.HddsUtils;
@@ -44,8 +45,13 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerC
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -62,6 +68,7 @@ import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.security.token.Token;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.junit.Assert;
 import org.slf4j.Logger;
@@ -697,4 +704,57 @@ public final class ContainerTestHelper {
   public static String getFixedLengthString(String string, int length) {
     return String.format("%1$" + length + "s", string);
   }
+
+  public static void waitForContainerClose(MiniOzoneCluster cluster,
+      Long... containerIdList)
+      throws ContainerNotFoundException, PipelineNotFoundException,
+      TimeoutException, InterruptedException {
+    List<Pipeline> pipelineList = new ArrayList<>();
+    for (long containerID : containerIdList) {
+      ContainerInfo container =
+          cluster.getStorageContainerManager().getContainerManager()
+              .getContainer(ContainerID.valueof(containerID));
+      Pipeline pipeline =
+          cluster.getStorageContainerManager().getPipelineManager()
+              .getPipeline(container.getPipelineID());
+      pipelineList.add(pipeline);
+      List<DatanodeDetails> datanodes = pipeline.getNodes();
+
+      for (DatanodeDetails details : datanodes) {
+        // Client will issue write chunk and it will create the container on
+        // datanodes.
+        // wait for the container to be created
+        GenericTestUtils
+            .waitFor(() -> isContainerPresent(cluster, containerID, details),
+                500, 100 * 1000);
+        Assert.assertTrue(isContainerPresent(cluster, containerID, details));
+
+        // make sure the container gets created first
+        Assert.assertFalse(ContainerTestHelper
+            .isContainerClosed(cluster, containerID, details));
+        // send the order to close the container
+        cluster.getStorageContainerManager().getEventQueue()
+            .fireEvent(SCMEvents.CLOSE_CONTAINER,
+                ContainerID.valueof(containerID));
+      }
+    }
+    int index = 0;
+    for (long containerID : containerIdList) {
+      Pipeline pipeline = pipelineList.get(index);
+      List<DatanodeDetails> datanodes = pipeline.getNodes();
+      // Below condition avoids the case where container has been allocated
+      // but not yet been used by the client. In such a case container is never
+      // created.
+      for (DatanodeDetails datanodeDetails : datanodes) {
+        GenericTestUtils.waitFor(() -> ContainerTestHelper
+                .isContainerClosed(cluster, containerID, datanodeDetails), 500,
+            15 * 1000);
+        //double check if it's really closed
+        // (waitFor also throws an exception)
+        Assert.assertTrue(ContainerTestHelper
+            .isContainerClosed(cluster, containerID, datanodeDetails));
+      }
+      index++;
+    }
+  }
 }
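
Note (not part of the patch): usage of the hoisted helper is as in TestCloseContainerHandlingByClient above: collect the container ids of the written blocks, then block until SCM has closed them on every datanode. A sketch; the cluster and location list come from the existing test fixtures:

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;

final class WaitForCloseUsageSketch {

  static void closeContainersOf(MiniOzoneCluster cluster,
      List<OmKeyLocationInfo> locationInfoList) throws Exception {
    List<Long> containerIdList = new ArrayList<>();
    for (OmKeyLocationInfo info : locationInfoList) {
      containerIdList.add(info.getContainerID());
    }
    // Fires CLOSE_CONTAINER for each container and waits until every replica
    // reports closed.
    ContainerTestHelper
        .waitForContainerClose(cluster, containerIdList.toArray(new Long[0]));
  }
}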

+ 2 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.om;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.StorageType;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -148,7 +149,7 @@ public class TestOmBlockVersioning {
 
     // this block will be appended to the latest version of version 2.
     OmKeyLocationInfo locationInfo =
-        ozoneManager.allocateBlock(keyArgs, openKey.getId());
+        ozoneManager.allocateBlock(keyArgs, openKey.getId(), new ExcludeList());
     List<OmKeyLocationInfo> locationInfoList =
         openKey.getKeyInfo().getLatestVersionLocations()
             .getBlocksLatestVersionOnly();

+ 5 - 2
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java

@@ -17,6 +17,7 @@
 package org.apache.hadoop.ozone.om;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@@ -67,11 +68,13 @@ public interface KeyManager {
    *
    * @param args the key to append
    * @param clientID the client requesting block.
+   * @param excludeList List of datanodes/containers to exclude during block
+   *                    allocation.
    * @return the reference to the new block.
    * @throws IOException
    */
-  OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID)
-      throws IOException;
+  OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID,
+      ExcludeList excludeList) throws IOException;
   /**
    * Given the args of a key to put, write an open key entry to meta data.
    *

+ 6 - 3
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java

@@ -42,6 +42,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.ozone.OzoneConsts;
@@ -220,7 +221,8 @@ public class KeyManagerImpl implements KeyManager {
   }
 
   @Override
-  public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID)
+  public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID,
+      ExcludeList excludeList)
       throws IOException {
     Preconditions.checkNotNull(args);
     String volumeName = args.getVolumeName();
@@ -242,7 +244,7 @@ public class KeyManagerImpl implements KeyManager {
     try {
       allocatedBlock =
           scmBlockClient.allocateBlock(scmBlockSize, keyInfo.getType(),
-              keyInfo.getFactor(), omId);
+              keyInfo.getFactor(), omId, excludeList);
     } catch (SCMException ex) {
       if (ex.getResult()
           .equals(SCMException.ResultCodes.CHILL_MODE_EXCEPTION)) {
@@ -390,7 +392,8 @@ public class KeyManagerImpl implements KeyManager {
         AllocatedBlock allocatedBlock;
         try {
           allocatedBlock = scmBlockClient
-              .allocateBlock(allocateSize, type, factor, omId);
+              .allocateBlock(allocateSize, type, factor, omId,
+                  new ExcludeList());
         } catch (IOException ex) {
           if (ex instanceof SCMException) {
             if (((SCMException) ex).getResult()

+ 4 - 2
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java

@@ -39,6 +39,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.ScmInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolClientSideTranslatorPB;
@@ -1867,7 +1868,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   }
 
   @Override
-  public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID)
+  public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID,
+      ExcludeList excludeList)
       throws IOException {
     if(isAclEnabled) {
       checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.WRITE,
@@ -1879,7 +1881,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     auditMap.put(OzoneConsts.CLIENT_ID, String.valueOf(clientID));
     try {
       metrics.incNumBlockAllocateCalls();
-      return keyManager.allocateBlock(args, clientID);
+      return keyManager.allocateBlock(args, clientID, excludeList);
     } catch (Exception ex) {
       metrics.incNumBlockAllocateCallFails();
       auditSuccess = false;

+ 4 - 2
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java

@@ -24,6 +24,7 @@ import java.util.TreeMap;
 import java.util.stream.Collectors;
 
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
@@ -626,8 +627,9 @@ public class OzoneManagerRequestHandler {
         .setBucketName(keyArgs.getBucketName())
         .setKeyName(keyArgs.getKeyName())
         .build();
-    OmKeyLocationInfo newLocation = impl.allocateBlock(omKeyArgs,
-        request.getClientID());
+    OmKeyLocationInfo newLocation =
+        impl.allocateBlock(omKeyArgs, request.getClientID(),
+            ExcludeList.getFromProtoBuf(request.getExcludeList()));
     resp.setKeyLocation(newLocation.getProtobuf());
 
     return resp.build();

+ 3 - 1
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestIngClient.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
 import org.apache.hadoop.hdds.scm.container.common.helpers.DeleteBlockResult;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
@@ -108,13 +109,14 @@ public class ScmBlockLocationTestIngClient implements ScmBlockLocationProtocol {
    * @param type Replication Type
    * @param factor - Replication factor
    * @param owner - String owner.
+   * @param excludeList list of dns/pipelines to exclude
    * @return
    * @throws IOException
    */
   @Override
   public AllocatedBlock allocateBlock(long size,
       HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor,
-      String owner) throws IOException {
+      String owner, ExcludeList excludeList) throws IOException {
     DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
     Pipeline pipeline = createPipeline(datanodeDetails);
     long containerID = Time.monotonicNow();

+ 3 - 1
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java

@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.server.ServerUtils;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
@@ -196,7 +197,8 @@ public class TestKeyDeletingService {
       //Open, Commit and Delete the Keys in the Key Manager.
       OpenKeySession session = keyManager.openKey(arg);
       for (int i = 0; i < numBlocks; i++) {
-        arg.addLocationInfo(keyManager.allocateBlock(arg, session.getId()));
+        arg.addLocationInfo(
+            keyManager.allocateBlock(arg, session.getId(), new ExcludeList()));
       }
       keyManager.commitKey(arg, session.getId());
       keyManager.deleteKey(arg);

+ 4 - 2
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java

@@ -25,6 +25,7 @@ import java.util.Set;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
@@ -91,7 +92,8 @@ public class TestKeyManagerImpl {
   private void setupMocks() throws Exception {
     Mockito.when(scmBlockLocationProtocol
         .allocateBlock(Mockito.anyLong(), Mockito.any(ReplicationType.class),
-            Mockito.any(ReplicationFactor.class), Mockito.anyString()))
+            Mockito.any(ReplicationFactor.class), Mockito.anyString(),
+            Mockito.any(ExcludeList.class)))
         .thenThrow(
             new SCMException("ChillModePrecheck failed for allocateBlock",
                 ResultCodes.CHILL_MODE_EXCEPTION));
@@ -180,7 +182,7 @@ public class TestKeyManagerImpl {
         .setVolumeName(VOLUME_NAME).build();
     LambdaTestUtils.intercept(OMException.class,
         "ChillModePrecheck failed for allocateBlock", () -> {
-          keyManager.allocateBlock(keyArgs, 1);
+          keyManager.allocateBlock(keyArgs, 1, new ExcludeList());
         });
   }
 

+ 2 - 1
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkBlockManager.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.block.BlockManager;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
@@ -162,6 +163,6 @@ public class BenchMarkBlockManager {
       Blackhole bh) throws IOException {
     state.blockManager
         .allocateBlock(50, ReplicationType.RATIS, ReplicationFactor.THREE,
-            "Genesis");
+            "Genesis", new ExcludeList());
   }
 }

+ 4 - 2
hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.scm;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.SCMContainerManager;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
@@ -149,7 +150,7 @@ public class TestContainerSQLCli {
     }
     assertEquals(2, nodeManager.getAllNodes().size());
     AllocatedBlock ab1 = blockManager.allocateBlock(DEFAULT_BLOCK_SIZE, type,
-        factor, CONTAINER_OWNER);
+        factor, CONTAINER_OWNER, new ExcludeList());
     blockContainerMap.put(ab1.getBlockID().getLocalID(),
         ab1.getBlockID().getContainerID());
 
@@ -162,7 +163,8 @@ public class TestContainerSQLCli {
     // the size of blockContainerMap will vary each time the test is run.
     while (true) {
       ab2 = blockManager
-          .allocateBlock(DEFAULT_BLOCK_SIZE, type, factor, CONTAINER_OWNER);
+          .allocateBlock(DEFAULT_BLOCK_SIZE, type, factor, CONTAINER_OWNER,
+              new ExcludeList());
       blockContainerMap.put(ab2.getBlockID().getLocalID(),
           ab2.getBlockID().getContainerID());
       if (ab1.getBlockID().getContainerID() !=