
HDDS-476. Add Pipeline reports to make pipeline active on SCM restart.
Contributed by Mukul Kumar Singh.

Nanda kumar 6 years ago
parent
commit
c0956ee2a8
51 files changed, 809 lines added and 476 lines deleted
  1. hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java (+1 -1)
  2. hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java (+5 -0)
  3. hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java (+17 -5)
  4. hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineID.java (+11 -2)
  5. hadoop-hdds/common/src/main/resources/ozone-default.xml (+8 -0)
  6. hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/HddsServerUtil.java (+21 -0)
  7. hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java (+73 -0)
  8. hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java (+4 -0)
  9. hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java (+6 -2)
  10. hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java (+16 -0)
  11. hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java (+9 -0)
  12. hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java (+29 -34)
  13. hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java (+12 -0)
  14. hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java (+7 -3)
  15. hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java (+5 -1)
  16. hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java (+5 -1)
  17. hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java (+4 -1)
  18. hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto (+10 -0)
  19. hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java (+6 -2)
  20. hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java (+0 -19)
  21. hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java (+3 -3)
  22. hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java (+2 -13)
  23. hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java (+12 -3)
  24. hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java (+4 -1)
  25. hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java (+5 -14)
  26. hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java (+10 -113)
  27. hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java (+162 -0)
  28. hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java (+57 -48)
  29. hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java (+6 -39)
  30. hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java (+19 -5)
  31. hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java (+20 -32)
  32. hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineReportHandler.java (+59 -0)
  33. hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java (+58 -45)
  34. hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java (+13 -28)
  35. hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java (+16 -28)
  36. hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java (+23 -0)
  37. hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java (+13 -3)
  38. hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java (+8 -3)
  39. hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java (+9 -1)
  40. hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java (+3 -1)
  41. hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java (+4 -2)
  42. hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNode2ContainerMap.java (+14 -14)
  43. hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java (+4 -1)
  44. hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java (+4 -1)
  45. hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java (+1 -1)
  46. hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java (+1 -1)
  47. hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java (+19 -1)
  48. hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java (+9 -0)
  49. hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java (+1 -1)
  50. hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeysRatis.java (+0 -2)
  51. hadoop-project/pom.xml (+1 -1)

+ 1 - 1
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java

@@ -110,7 +110,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
     final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
     LOG.debug("destroying pipeline:{} with {}", pipeline.getId(), group);
     callRatisRpc(pipeline.getMachines(), (raftClient, peer) -> raftClient
-        .groupRemove(group.getGroupId(), peer.getId()));
+        .groupRemove(group.getGroupId(), true, peer.getId()));
   }
 
   /**

+ 5 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java

@@ -46,6 +46,11 @@ public final class HddsConfigKeys {
   public static final String HDDS_CONTAINER_REPORT_INTERVAL_DEFAULT =
       "60s";
 
+  public static final String HDDS_PIPELINE_REPORT_INTERVAL =
+          "hdds.pipeline.report.interval";
+  public static final String HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT =
+          "60s";
+
   public static final String HDDS_COMMAND_STATUS_REPORT_INTERVAL =
       "hdds.command.status.report.interval";
   public static final String HDDS_COMMAND_STATUS_REPORT_INTERVAL_DEFAULT =

+ 17 - 5
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java

@@ -34,7 +34,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Map;
-import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.List;
 
 /**
@@ -83,7 +83,7 @@ public class Pipeline {
     this.type = replicationType;
     this.factor = replicationFactor;
     this.id = id;
-    datanodes = new TreeMap<>();
+    datanodes = new ConcurrentHashMap<>();
   }
 
   @Override
@@ -151,9 +151,21 @@ public class Pipeline {
     return getDatanodes().get(leaderID);
   }
 
-  public void addMember(DatanodeDetails datanodeDetails) {
-    datanodes.put(datanodeDetails.getUuid().toString(),
-        datanodeDetails);
+  /**
+   * Adds a datanode to the pipeline.
+   * @param datanodeDetails datanode to be added.
+   * @return true if the datanode was not already present, false otherwise
+   */
+  public boolean addMember(DatanodeDetails datanodeDetails) {
+    return datanodes.put(datanodeDetails.getUuid().toString(),
+        datanodeDetails) == null;
+
+  }
+
+  public void resetPipeline() {
+    // reset datanodes in pipeline and learn about them through
+    // pipeline reports on SCM restart
+    datanodes.clear();
   }
 
   public Map<String, DatanodeDetails> getDatanodes() {

+ 11 - 2
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineID.java

@@ -28,7 +28,7 @@ import java.util.UUID;
  * in Ratis as RaftGroupId, GroupID is used by the datanodes to initialize
  * the ratis group they are part of.
  */
-public class PipelineID {
+public final class PipelineID implements Comparable<PipelineID> {
 
   private UUID id;
   private RaftGroupId groupId;
@@ -42,8 +42,12 @@ public class PipelineID {
     return new PipelineID(UUID.randomUUID());
   }
 
+  public static PipelineID valueOf(UUID id) {
+    return new PipelineID(id);
+  }
+
   public static PipelineID valueOf(RaftGroupId groupId) {
-    return new PipelineID(groupId.getUuid());
+    return valueOf(groupId.getUuid());
   }
 
   public RaftGroupId getRaftGroupID() {
@@ -67,6 +71,11 @@ public class PipelineID {
     return "pipelineId=" + id;
   }
 
+  @Override
+  public int compareTo(PipelineID o) {
+    return this.id.compareTo(o.id);
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {

+ 8 - 0
hadoop-hdds/common/src/main/resources/ozone-default.xml

@@ -224,6 +224,14 @@
       received from SCM to SCM. Unit could be defined with postfix
       (ns,ms,s,m,h,d)</description>
   </property>
+  <property>
+    <name>hdds.pipeline.report.interval</name>
+    <value>60000ms</value>
+    <tag>OZONE, PIPELINE, MANAGEMENT</tag>
+    <description>Time interval for the datanode to send its pipeline report.
+      Each datanode periodically sends a pipeline report to SCM. Unit could be
+      defined with postfix (ns,ms,s,m,h,d)</description>
+  </property>
   <!--Ozone Settings-->
   <property>
     <name>ozone.administrators</name>

+ 21 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/HddsServerUtil.java

@@ -18,12 +18,15 @@
 package org.apache.hadoop.hdds.scm;
 
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.Map;
@@ -312,4 +315,22 @@ public final class HddsServerUtil {
     services.put(OZONE_SCM_SERVICE_ID, serviceInstances);
     return services;
   }
+
+  public static String getOzoneDatanodeRatisDirectory(Configuration conf) {
+    final String ratisDir = File.separator + "ratis";
+    String storageDir = conf.get(
+            OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR);
+
+    if (Strings.isNullOrEmpty(storageDir)) {
+      storageDir = conf.get(OzoneConfigKeys
+              .OZONE_METADATA_DIRS);
+      Preconditions.checkNotNull(storageDir, "ozone.metadata.dirs " +
+              "cannot be null, Please check your configs.");
+      storageDir = storageDir.concat(ratisDir);
+      LOG.warn("Storage directory for Ratis is not configured." +
+               "Mapping Ratis storage under {}. It is a good idea " +
+               "to map this to an SSD disk.", storageDir);
+    }
+    return storageDir;
+  }
 }
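
A minimal usage sketch of the newly extracted helper, assuming only ozone.metadata.dirs is configured (the directory value and the sketch class are illustrative, not part of the commit):

    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.scm.HddsServerUtil;
    import org.apache.hadoop.ozone.OzoneConfigKeys;

    public class RatisDirSketch {
      public static void main(String[] args) {
        OzoneConfiguration conf = new OzoneConfiguration();
        // With dfs.container.ratis.datanode.storage.dir unset, the helper
        // falls back to ozone.metadata.dirs + "/ratis" and logs a warning.
        conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, "/data/ozone/meta");
        System.out.println(
            HddsServerUtil.getOzoneDatanodeRatisDirectory(conf));
        // Prints: /data/ozone/meta/ratis
      }
    }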

+ 73 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java

@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.report;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.hdds.protocol.proto
+        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.scm.HddsServerUtil;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT;
+
+
+/**
+ * Publishes the PipelineReport which will be sent to SCM as part of heartbeat.
+ * A PipelineReport consists of the following information for each pipeline:
+ *   - pipelineID
+ *
+ */
+public class PipelineReportPublisher extends
+    ReportPublisher<PipelineReportsProto> {
+
+  private Long pipelineReportInterval = null;
+
+  @Override
+  protected long getReportFrequency() {
+    if (pipelineReportInterval == null) {
+      pipelineReportInterval = getConf().getTimeDuration(
+          HDDS_PIPELINE_REPORT_INTERVAL,
+          HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT,
+          TimeUnit.MILLISECONDS);
+
+      long heartbeatFrequency = HddsServerUtil.getScmHeartbeatInterval(
+          getConf());
+
+      Preconditions.checkState(
+          heartbeatFrequency <= pipelineReportInterval,
+              HDDS_PIPELINE_REPORT_INTERVAL +
+              " cannot be configured lower than heartbeat frequency.");
+    }
+    // Add a random delay (up to one report interval) on top of the pipeline
+    // report interval (60s by default) so that the SCM is not overwhelmed by
+    // pipeline reports arriving in sync.
+    return pipelineReportInterval + getRandomReportDelay();
+  }
+
+  private long getRandomReportDelay() {
+    return RandomUtils.nextLong(0, pipelineReportInterval);
+  }
+
+  @Override
+  protected PipelineReportsProto getReport() {
+    return getContext().getParent().getContainer().getPipelineReport();
+  }
+}
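
A hedged sketch of the resulting schedule under the 60s default: the next report fires uniformly between one and two intervals after the previous one, so reports from a fleet of datanodes drift out of phase instead of hitting SCM in lockstep (the sketch class is illustrative):

    import org.apache.commons.lang3.RandomUtils;

    public class ReportJitterSketch {
      public static void main(String[] args) {
        // Same arithmetic as getReportFrequency() plus getRandomReportDelay().
        long interval = 60_000L; // hdds.pipeline.report.interval default, in ms
        long jitter = RandomUtils.nextLong(0, interval);
        System.out.println("next report in " + (interval + jitter) + " ms");
        // Uniform in [60s, 120s).
      }
    }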

+ 4 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java

@@ -19,6 +19,8 @@ package org.apache.hadoop.ozone.container.common.report;
 
 import com.google.protobuf.GeneratedMessage;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.proto.
+        StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.protocol.proto.
     StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -53,6 +55,8 @@ public class ReportPublisherFactory {
         ContainerReportPublisher.class);
     report2publisher.put(CommandStatusReportsProto.class,
         CommandStatusReportPublisher.class);
+    report2publisher.put(PipelineReportsProto.class,
+            PipelineReportPublisher.class);
   }
 
   /**

+ 6 - 2
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java

@@ -21,6 +21,8 @@ import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.ozone.container.common.statemachine
     .EndpointStateMachine;
 import org.apache.hadoop.hdds.protocol.proto
@@ -108,13 +110,15 @@ public final class RegisterEndpointTask implements
     rpcEndPoint.lock();
     try {
 
-      ContainerReportsProto contianerReport = datanodeContainerManager
+      ContainerReportsProto containerReport = datanodeContainerManager
           .getContainerReport();
       NodeReportProto nodeReport = datanodeContainerManager.getNodeReport();
+      PipelineReportsProto pipelineReportsProto =
+              datanodeContainerManager.getPipelineReport();
       // TODO : Add responses to the command Queue.
       SCMRegisteredResponseProto response = rpcEndPoint.getEndPoint()
           .register(datanodeDetails.getProtoBufMessage(), nodeReport,
-              contianerReport);
+                  containerReport, pipelineReportsProto);
       Preconditions.checkState(UUID.fromString(response.getDatanodeUUID())
               .equals(datanodeDetails.getUuid()),
           "Unexpected datanode ID in the response.");

+ 16 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java

@@ -24,6 +24,9 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto
+        .StorageContainerDatanodeProtocolProtos.PipelineReport;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 
@@ -38,6 +41,9 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.SocketAddress;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
 
 /**
  * Creates a Grpc server endpoint that acts as the communication layer for
@@ -47,6 +53,7 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
   private static final Logger
       LOG = LoggerFactory.getLogger(XceiverServerGrpc.class);
   private int port;
+  private UUID id;
   private Server server;
   private final ContainerDispatcher storageContainer;
 
@@ -59,6 +66,7 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
       ContainerDispatcher dispatcher, BindableService... additionalServices) {
     Preconditions.checkNotNull(conf);
 
+    this.id = datanodeDetails.getUuid();
     this.port = conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
         OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
     // Get an available port on current node and
@@ -123,4 +131,12 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
       HddsProtos.PipelineID pipelineID) {
     storageContainer.dispatch(request);
   }
+
+  @Override
+  public List<PipelineReport> getPipelineReport() {
+    return Collections.singletonList(
+            PipelineReport.newBuilder()
+                    .setPipelineID(PipelineID.valueOf(id).getProtobuf())
+                    .build());
+  }
 }
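
Note that the standalone (gRPC) transport reports exactly one pipeline, whose ID is derived from the datanode's own UUID through the new PipelineID.valueOf(UUID) factory, so the reported ID stays stable across datanode restarts.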

+ 9 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java

@@ -21,8 +21,11 @@ package org.apache.hadoop.ozone.container.common.transport.server;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto
+        .StorageContainerDatanodeProtocolProtos.PipelineReport;
 
 import java.io.IOException;
+import java.util.List;
 
 /** A server endpoint that acts as the communication layer for Ozone
  * containers. */
@@ -49,4 +52,10 @@ public interface XceiverServerSpi {
   void submitRequest(ContainerCommandRequestProto request,
       HddsProtos.PipelineID pipelineID)
       throws IOException;
+
+  /**
+   * Get pipeline report for the XceiverServer instance.
+   * @return list of report for each pipeline.
+   */
+  List<PipelineReport> getPipelineReport();
 }

+ 29 - 34
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java

@@ -19,17 +19,18 @@
 package org.apache.hadoop.ozone.container.common.transport.server.ratis;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto
+        .StorageContainerDatanodeProtocolProtos.PipelineReport;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.PipelineAction;
+import org.apache.hadoop.hdds.scm.HddsServerUtil;
 import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
@@ -68,6 +69,8 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -96,12 +99,12 @@ public final class XceiverServerRatis implements XceiverServerSpi {
   private final ReplicationLevel replicationLevel;
   private long nodeFailureTimeoutMs;
 
-  private XceiverServerRatis(DatanodeDetails dd, int port, String storageDir,
+  private XceiverServerRatis(DatanodeDetails dd, int port,
       ContainerDispatcher dispatcher, Configuration conf, StateContext context)
       throws IOException {
     Objects.requireNonNull(dd, "id == null");
     this.port = port;
-    RaftProperties serverProperties = newRaftProperties(conf, storageDir);
+    RaftProperties serverProperties = newRaftProperties(conf);
     final int numWriteChunkThreads = conf.getInt(
         OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY,
         OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT);
@@ -118,15 +121,13 @@ public final class XceiverServerRatis implements XceiverServerSpi {
         new ContainerStateMachine(dispatcher, chunkExecutor, this);
     this.server = RaftServer.newBuilder()
         .setServerId(RatisHelper.toRaftPeerId(dd))
-        .setGroup(RatisHelper.emptyRaftGroup())
         .setProperties(serverProperties)
         .setStateMachine(stateMachine)
         .build();
   }
 
 
-  private RaftProperties newRaftProperties(Configuration conf,
-      String storageDir) {
+  private RaftProperties newRaftProperties(Configuration conf) {
     final RaftProperties properties = new RaftProperties();
 
     // Set rpc type
@@ -235,6 +236,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
     nodeFailureTimeoutMs = nodeFailureTimeout.toLong(TimeUnit.MILLISECONDS);
 
     // Set the ratis storage directory
+    String storageDir = HddsServerUtil.getOzoneDatanodeRatisDirectory(conf);
     RaftServerConfigKeys.setStorageDir(properties, new File(storageDir));
 
     // For grpc set the maximum message size
@@ -253,23 +255,9 @@ public final class XceiverServerRatis implements XceiverServerSpi {
   public static XceiverServerRatis newXceiverServerRatis(
       DatanodeDetails datanodeDetails, Configuration ozoneConf,
       ContainerDispatcher dispatcher, StateContext context) throws IOException {
-    final String ratisDir = File.separator + "ratis";
     int localPort = ozoneConf.getInt(
         OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT,
         OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT_DEFAULT);
-    String storageDir = ozoneConf.get(
-        OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR);
-
-    if (Strings.isNullOrEmpty(storageDir)) {
-      storageDir = ozoneConf.get(OzoneConfigKeys
-          .OZONE_METADATA_DIRS);
-      Preconditions.checkNotNull(storageDir, "ozone.metadata.dirs " +
-          "cannot be null, Please check your configs.");
-      storageDir = storageDir.concat(ratisDir);
-      LOG.warn("Storage directory for Ratis is not configured. Mapping Ratis " +
-              "storage under {}. It is a good idea to map this to an SSD disk.",
-          storageDir);
-    }
 
     // Get an available port on current node and
     // use that as the container port
@@ -282,13 +270,6 @@ public final class XceiverServerRatis implements XceiverServerSpi {
         socket.bind(address);
         localPort = socket.getLocalPort();
         LOG.info("Found a free port for the server : {}", localPort);
-        // If we have random local ports configured this means that it
-        // probably running under MiniOzoneCluster. Ratis locks the storage
-        // directories, so we need to pass different local directory for each
-        // local instance. So we map ratis directories under datanode ID.
-        storageDir =
-            storageDir.concat(File.separator +
-                datanodeDetails.getUuidString());
       } catch (IOException e) {
         LOG.error("Unable find a random free port for the server, "
             + "fallback to use default port {}", localPort, e);
@@ -296,7 +277,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
     }
     datanodeDetails.setPort(
         DatanodeDetails.newPort(DatanodeDetails.Port.Name.RATIS, localPort));
-    return new XceiverServerRatis(datanodeDetails, localPort, storageDir,
+    return new XceiverServerRatis(datanodeDetails, localPort,
         dispatcher, ozoneConf, context);
   }
 
@@ -363,7 +344,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
   public void submitRequest(
       ContainerCommandRequestProto request, HddsProtos.PipelineID pipelineID)
       throws IOException {
-    // ReplicationLevel.ALL ensures the transactions corresponding to
+    // ReplicationLevel.MAJORITY ensures the transactions corresponding to
    // the request here are committed by a majority of the raft servers.
     RaftClientRequest raftClientRequest =
         createRaftClientRequest(request, pipelineID,
@@ -427,13 +408,27 @@ public final class XceiverServerRatis implements XceiverServerSpi {
             + ".Reason : " + action.getClosePipeline().getDetailedReason());
   }
 
-  void handleNodeSlowness(
-      RaftGroup group, RoleInfoProto roleInfoProto) {
+  @Override
+  public List<PipelineReport> getPipelineReport() {
+    try {
+      Iterable<RaftGroupId> gids = server.getGroupIds();
+      List<PipelineReport> reports = new ArrayList<>();
+      for (RaftGroupId groupId : gids) {
+        reports.add(PipelineReport.newBuilder()
+                .setPipelineID(PipelineID.valueOf(groupId).getProtobuf())
+                .build());
+      }
+      return reports;
+    } catch (Exception e) {
+      return null;
+    }
+  }
+
+  void handleNodeSlowness(RaftGroup group, RoleInfoProto roleInfoProto) {
     handlePipelineFailure(group.getGroupId(), roleInfoProto);
   }
 
-  void handleNoLeader(
-      RaftGroup group, RoleInfoProto roleInfoProto) {
+  void handleNoLeader(RaftGroup group, RoleInfoProto roleInfoProto) {
     handlePipelineFailure(group.getGroupId(), roleInfoProto);
   }
 }

+ 12 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java

@@ -25,6 +25,8 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto
+        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
@@ -164,6 +166,16 @@ public class OzoneContainer {
     return this.containerSet.getContainerReport();
   }
 
+  public PipelineReportsProto getPipelineReport() {
+    PipelineReportsProto.Builder pipelineReportsProto =
+            PipelineReportsProto.newBuilder();
+    for (XceiverServerSpi serverInstance : server) {
+      pipelineReportsProto
+              .addAllPipelineReport(serverInstance.getPipelineReport());
+    }
+    return pipelineReportsProto.build();
+  }
+
   /**
    * Submit ContainerRequest.
    * @param request

+ 7 - 3
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java

@@ -18,6 +18,8 @@ package org.apache.hadoop.ozone.protocol;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
+import org.apache.hadoop.hdds.protocol.proto
+        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -69,9 +71,11 @@ public interface StorageContainerDatanodeProtocol {
    * @param containerReportsRequestProto - Container Reports.
    * @return SCM Command.
    */
-  SCMRegisteredResponseProto register(DatanodeDetailsProto datanodeDetails,
-      NodeReportProto nodeReport, ContainerReportsProto
-      containerReportsRequestProto) throws IOException;
+  SCMRegisteredResponseProto register(
+          DatanodeDetailsProto datanodeDetails,
+          NodeReportProto nodeReport,
+          ContainerReportsProto containerReportsRequestProto,
+          PipelineReportsProto pipelineReports) throws IOException;
 
   /**
    * Used by datanode to send block deletion ACK to SCM.

+ 5 - 1
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java

@@ -19,6 +19,8 @@ package org.apache.hadoop.ozone.protocol;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -51,10 +53,12 @@ public interface StorageContainerNodeProtocol {
    * Register the node if the node finds that it is not registered with any SCM.
    * @param datanodeDetails DatanodeDetails
    * @param nodeReport NodeReportProto
+   * @param pipelineReport PipelineReportsProto
    * @return  SCMHeartbeatResponseProto
    */
   RegisteredCommand register(DatanodeDetails datanodeDetails,
-                             NodeReportProto nodeReport);
+                             NodeReportProto nodeReport,
+                             PipelineReportsProto pipelineReport);
 
   /**
    * Send heartbeat to indicate the datanode is alive and doing well.

+ 5 - 1
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java

@@ -19,6 +19,8 @@ package org.apache.hadoop.ozone.protocolPB;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
+import org.apache.hadoop.hdds.protocol.proto
+        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -149,12 +151,14 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB
   @Override
   public SCMRegisteredResponseProto register(
       DatanodeDetailsProto datanodeDetailsProto, NodeReportProto nodeReport,
-      ContainerReportsProto containerReportsRequestProto)
+      ContainerReportsProto containerReportsRequestProto,
+      PipelineReportsProto pipelineReportsProto)
       throws IOException {
     SCMRegisterRequestProto.Builder req =
         SCMRegisterRequestProto.newBuilder();
     req.setDatanodeDetails(datanodeDetailsProto);
     req.setContainerReport(containerReportsRequestProto);
+    req.setPipelineReports(pipelineReportsProto);
     req.setNodeReport(nodeReport);
     final SCMRegisteredResponseProto response;
     try {

+ 4 - 1
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java

@@ -18,6 +18,8 @@ package org.apache.hadoop.ozone.protocolPB;
 
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -76,8 +78,9 @@ public class StorageContainerDatanodeProtocolServerSideTranslatorPB
       ContainerReportsProto containerRequestProto = request
           .getContainerReport();
       NodeReportProto dnNodeReport = request.getNodeReport();
+      PipelineReportsProto pipelineReport = request.getPipelineReports();
       return impl.register(request.getDatanodeDetails(), dnNodeReport,
-          containerRequestProto);
+          containerRequestProto, pipelineReport);
     } catch (IOException e) {
       throw new ServiceException(e);
     }

+ 10 - 0
hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto

@@ -52,6 +52,7 @@ message SCMRegisterRequestProto {
   required DatanodeDetailsProto datanodeDetails = 1;
   required NodeReportProto nodeReport = 2;
   required ContainerReportsProto containerReport = 3;
+  required PipelineReportsProto pipelineReports = 4;
 }
 
 /**
@@ -82,6 +83,7 @@ message SCMHeartbeatRequestProto {
   optional CommandStatusReportsProto commandStatusReport = 4;
   optional ContainerActionsProto containerActions = 5;
   optional PipelineActionsProto pipelineActions = 6;
+  optional PipelineReportsProto pipelineReports = 7;
 }
 
 /*
@@ -163,6 +165,14 @@ message ContainerAction {
   optional Reason reason = 3;
 }
 
+message PipelineReport {
+  required PipelineID pipelineID = 1;
+}
+
+message PipelineReportsProto {
+  repeated PipelineReport pipelineReport = 1;
+}
+
 message PipelineActionsProto {
   repeated PipelineAction pipelineActions = 1;
 }
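
For reference, a hedged sketch of how a datanode-side caller could assemble these new messages with the generated Java bindings, mirroring the setPipelineID/addAllPipelineReport calls in the diffs above (the sketch class is illustrative):

    import java.util.UUID;
    import org.apache.hadoop.hdds.protocol.proto
        .StorageContainerDatanodeProtocolProtos.PipelineReport;
    import org.apache.hadoop.hdds.protocol.proto
        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
    import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;

    public class PipelineReportSketch {
      public static void main(String[] args) {
        // Build a report for a single pipeline and wrap it for the
        // register/heartbeat request.
        PipelineReport report = PipelineReport.newBuilder()
            .setPipelineID(PipelineID.valueOf(UUID.randomUUID()).getProtobuf())
            .build();
        PipelineReportsProto reports = PipelineReportsProto.newBuilder()
            .addPipelineReport(report)
            .build();
        System.out.println(reports.getPipelineReportCount()); // 1
      }
    }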

+ 6 - 2
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java

@@ -18,6 +18,10 @@ package org.apache.hadoop.ozone.container.common;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto
+        .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
+        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.protocol.proto.
     StorageContainerDatanodeProtocolProtos.CommandStatus;
 import org.apache.hadoop.hdds.scm.VersionInfo;
@@ -214,8 +218,8 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
   public StorageContainerDatanodeProtocolProtos
       .SCMRegisteredResponseProto register(
           DatanodeDetailsProto datanodeDetailsProto, NodeReportProto nodeReport,
-          StorageContainerDatanodeProtocolProtos.ContainerReportsProto
-              containerReportsRequestProto)
+          ContainerReportsProto containerReportsRequestProto,
+          PipelineReportsProto pipelineReportsProto)
       throws IOException {
     rpcCount.incrementAndGet();
     updateNodeReport(datanodeDetailsProto, nodeReport);

+ 0 - 19
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java

@@ -466,24 +466,6 @@ public class ContainerMapping implements Mapping {
     return new ContainerWithPipeline(containerInfo, pipeline);
   }
 
-  public void handlePipelineClose(PipelineID pipelineID) {
-    try {
-      Pipeline pipeline = pipelineSelector.getPipeline(pipelineID);
-      if (pipeline != null) {
-        pipelineSelector.finalizePipeline(pipeline);
-      } else {
-        LOG.debug("pipeline:{} not found", pipelineID);
-      }
-    } catch (Exception e) {
-      LOG.info("failed to close pipeline:{}", pipelineID, e);
-    }
-  }
-
-  public Set<PipelineID> getPipelineOnDatanode(
-      DatanodeDetails datanodeDetails) {
-    return pipelineSelector.getPipelineId(datanodeDetails.getUuid());
-  }
-
   /**
    * Process container report from Datanode.
    * <p>
@@ -710,7 +692,6 @@ public class ContainerMapping implements Mapping {
     return containerStore;
   }
 
-  @VisibleForTesting
   public PipelineSelector getPipelineSelector() {
     return pipelineSelector;
   }

+ 3 - 3
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java

@@ -89,20 +89,20 @@ public class ContainerReportHandler implements
           .map(ContainerID::new)
           .collect(Collectors.toSet());
 
-      ReportResult reportResult = node2ContainerMap
+      ReportResult<ContainerID> reportResult = node2ContainerMap
           .processReport(datanodeOrigin.getUuid(), containerIds);
 
       //we have the report, so we can update the states for the next iteration.
       node2ContainerMap
           .setContainersForDatanode(datanodeOrigin.getUuid(), containerIds);
 
-      for (ContainerID containerID : reportResult.getMissingContainers()) {
+      for (ContainerID containerID : reportResult.getMissingEntries()) {
         containerStateManager
             .removeContainerReplica(containerID, datanodeOrigin);
         checkReplicationState(containerID, publisher);
       }
 
-      for (ContainerID containerID : reportResult.getNewContainers()) {
+      for (ContainerID containerID : reportResult.getNewEntries()) {
         containerStateManager.addContainerReplica(containerID, datanodeOrigin);
         checkReplicationState(containerID, publisher);
       }

+ 2 - 13
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java

@@ -25,13 +25,12 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
 
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 /**
  * Mapping class contains the mapping from a name to a pipeline mapping. This is
@@ -138,15 +137,5 @@ public interface Mapping extends Closeable {
       String owner, ReplicationType type, ReplicationFactor factor,
       LifeCycleState state) throws IOException;
 
-  /**
-   * Handle a pipeline close event.
-   * @param pipelineID pipeline id
-   */
-  void handlePipelineClose(PipelineID pipelineID);
-
-  /**
-   * Get set of pipeline for a specific datanode.
-   * @param datanodeDetails datanode for which pipelines needs to be fetched.
-   */
-  Set<PipelineID> getPipelineOnDatanode(DatanodeDetails datanodeDetails);
+  PipelineSelector getPipelineSelector();
 }

+ 12 - 3
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java

@@ -27,9 +27,12 @@ import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
     .DeleteBlockCommandStatus;
 import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
     .ReplicationStatus;
-import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler.CloseContainerRetryableReq;
+import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler
+        .CloseContainerRetryableReq;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
+        .PipelineReportFromDatanode;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
     .PipelineActionsFromDatanode;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
@@ -72,8 +75,7 @@ public final class SCMEvents {
 
   /**
   * ContainerReports are sent out by Datanodes. This report is received by
-   * SCMDatanodeHeartbeatDispatcher and Container_Report Event
-   * isTestSCMDatanodeHeartbeatDispatcher generated.
+   * SCMDatanodeHeartbeatDispatcher and Container_Report Event is generated.
    */
   public static final TypedEvent<ContainerReportFromDatanode> CONTAINER_REPORT =
       new TypedEvent<>(ContainerReportFromDatanode.class, "Container_Report");
@@ -86,6 +88,13 @@ public final class SCMEvents {
       CONTAINER_ACTIONS = new TypedEvent<>(ContainerActionsFromDatanode.class,
       "Container_Actions");
 
+  /**
+   * PipelineReports are sent out by Datanodes. This report is received by
+   * SCMDatanodeHeartbeatDispatcher and Pipeline_Report Event is generated.
+   */
+  public static final TypedEvent<PipelineReportFromDatanode> PIPELINE_REPORT =
+          new TypedEvent<>(PipelineReportFromDatanode.class, "Pipeline_Report");
+
   /**
    * PipelineActions are sent by Datanode. This event is received by
    * SCMDatanodeHeartbeatDispatcher and PIPELINE_ACTIONS event is generated.
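
A hedged wiring sketch: the new event only takes effect once a handler is registered on the SCM event queue, which is how StorageContainerManager wires the new PipelineReportHandler. The lambda handler below is illustrative and assumes the getDatanodeDetails() accessor that the other ReportFromDatanode payloads expose:

    import org.apache.hadoop.hdds.scm.events.SCMEvents;
    import org.apache.hadoop.hdds.server.events.EventQueue;

    public class PipelineReportWiringSketch {
      public static void main(String[] args) {
        EventQueue eventQueue = new EventQueue();
        // Registers a handler for PipelineReportFromDatanode payloads.
        eventQueue.addHandler(SCMEvents.PIPELINE_REPORT,
            (report, publisher) -> System.out.println(
                "pipeline report from " + report.getDatanodeDetails()));
      }
    }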

+ 4 - 1
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java

@@ -19,6 +19,8 @@ package org.apache.hadoop.hdds.scm.node;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.proto
+        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
@@ -363,7 +365,8 @@ public class SCMNodeManager
    */
   @Override
   public RegisteredCommand register(
-      DatanodeDetails datanodeDetails, NodeReportProto nodeReport) {
+      DatanodeDetails datanodeDetails, NodeReportProto nodeReport,
+      PipelineReportsProto pipelineReportsProto) {
 
     InetAddress dnAddress = Server.getRemoteIp();
     if (dnAddress != null) {

+ 5 - 14
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java

@@ -19,17 +19,13 @@
 package org.apache.hadoop.hdds.scm.node;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.container.Mapping;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Set;
-
 /**
  * Handles Stale node event.
  */
@@ -37,22 +33,17 @@ public class StaleNodeHandler implements EventHandler<DatanodeDetails> {
   static final Logger LOG = LoggerFactory.getLogger(StaleNodeHandler.class);
 
   private final Node2ContainerMap node2ContainerMap;
-  private final Mapping containerManager;
+  private final PipelineSelector pipelineSelector;
 
   public StaleNodeHandler(Node2ContainerMap node2ContainerMap,
-      Mapping containerManager) {
+      PipelineSelector pipelineSelector) {
     this.node2ContainerMap = node2ContainerMap;
-    this.containerManager = containerManager;
+    this.pipelineSelector = pipelineSelector;
   }
 
   @Override
   public void onMessage(DatanodeDetails datanodeDetails,
                         EventPublisher publisher) {
-    Set<PipelineID> pipelineIDs =
-        containerManager.getPipelineOnDatanode(datanodeDetails);
-    for (PipelineID id : pipelineIDs) {
-      LOG.info("closing pipeline {}.", id);
-      publisher.fireEvent(SCMEvents.PIPELINE_CLOSE, id);
-    }
+    pipelineSelector.handleStaleNode(datanodeDetails);
   }
 }

+ 10 - 113
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java

@@ -18,21 +18,15 @@
 
 package org.apache.hadoop.hdds.scm.node.states;
 
-import java.util.Collections;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
-import java.util.TreeSet;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
-    .DUPLICATE_DATANODE;
 import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
     .NO_SUCH_DATANODE;
 
@@ -40,26 +34,23 @@ import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
  * This data structure maintains the list of containers that is on a datanode.
  * This information is built from the DN container reports.
  */
-public class Node2ContainerMap {
-  private final Map<UUID, Set<ContainerID>> dn2ContainerMap;
+public class Node2ContainerMap extends Node2ObjectsMap<ContainerID> {
 
   /**
    * Constructs a Node2ContainerMap Object.
    */
   public Node2ContainerMap() {
-    dn2ContainerMap = new ConcurrentHashMap<>();
+    super();
   }
 
   /**
-   * Returns true if this a datanode that is already tracked by
-   * Node2ContainerMap.
+   * Returns the containers associated with this datanode ID; empty if none.
    *
-   * @param datanodeID - UUID of the Datanode.
-   * @return True if this is tracked, false if this map does not know about it.
+   * @param datanode - UUID
+   * @return Set of containers, possibly empty.
    */
-  public boolean isKnownDatanode(UUID datanodeID) {
-    Preconditions.checkNotNull(datanodeID);
-    return dn2ContainerMap.containsKey(datanodeID);
+  public Set<ContainerID> getContainers(UUID datanode) {
+    return getObjects(datanode);
   }
 
   /**
@@ -70,13 +61,7 @@ public class Node2ContainerMap {
    */
   public void insertNewDatanode(UUID datanodeID, Set<ContainerID> containerIDs)
       throws SCMException {
-    Preconditions.checkNotNull(containerIDs);
-    Preconditions.checkNotNull(datanodeID);
-    if (dn2ContainerMap.putIfAbsent(datanodeID, new HashSet<>(containerIDs))
-        != null) {
-      throw new SCMException("Node already exists in the map",
-          DUPLICATE_DATANODE);
-    }
+    super.insertNewDatanode(datanodeID, containerIDs);
   }
 
   /**
@@ -91,103 +76,15 @@ public class Node2ContainerMap {
       Set<ContainerID> containers) throws SCMException {
     Preconditions.checkNotNull(datanodeID);
     Preconditions.checkNotNull(containers);
-    if (dn2ContainerMap
+    if (dn2ObjectMap
         .computeIfPresent(datanodeID, (k, v) -> new HashSet<>(containers))
         == null) {
       throw new SCMException("No such datanode", NO_SUCH_DATANODE);
     }
   }
 
-  /**
-   * Removes datanode Entry from the map.
-   *
-   * @param datanodeID - Datanode ID.
-   */
-  public void removeDatanode(UUID datanodeID) {
-    Preconditions.checkNotNull(datanodeID);
-    dn2ContainerMap.computeIfPresent(datanodeID, (k, v) -> null);
-  }
-
-  /**
-   * Returns null if there no containers associated with this datanode ID.
-   *
-   * @param datanode - UUID
-   * @return Set of containers or Null.
-   */
-  public Set<ContainerID> getContainers(UUID datanode) {
-    Preconditions.checkNotNull(datanode);
-    return dn2ContainerMap.computeIfPresent(datanode, (k, v) ->
-        Collections.unmodifiableSet(v));
-  }
-
-  public ReportResult processReport(UUID datanodeID, Set<ContainerID>
-      containers) {
-    Preconditions.checkNotNull(datanodeID);
-    Preconditions.checkNotNull(containers);
-
-    if (!isKnownDatanode(datanodeID)) {
-      return ReportResult.ReportResultBuilder.newBuilder()
-          .setStatus(ReportStatus.NEW_DATANODE_FOUND)
-          .setNewContainers(containers)
-          .build();
-    }
-
-    // Conditions like Zero length containers should be handled by removeAll.
-    Set<ContainerID> currentSet = dn2ContainerMap.get(datanodeID);
-    TreeSet<ContainerID> newContainers = new TreeSet<>(containers);
-    newContainers.removeAll(currentSet);
-
-    TreeSet<ContainerID> missingContainers = new TreeSet<>(currentSet);
-    missingContainers.removeAll(containers);
-
-    if (newContainers.isEmpty() && missingContainers.isEmpty()) {
-      return ReportResult.ReportResultBuilder.newBuilder()
-          .setStatus(ReportStatus.ALL_IS_WELL)
-          .build();
-    }
-
-    if (newContainers.isEmpty() && !missingContainers.isEmpty()) {
-      return ReportResult.ReportResultBuilder.newBuilder()
-          .setStatus(ReportStatus.MISSING_CONTAINERS)
-          .setMissingContainers(missingContainers)
-          .build();
-    }
-
-    if (!newContainers.isEmpty() && missingContainers.isEmpty()) {
-      return ReportResult.ReportResultBuilder.newBuilder()
-          .setStatus(ReportStatus.NEW_CONTAINERS_FOUND)
-          .setNewContainers(newContainers)
-          .build();
-    }
-
-    if (!newContainers.isEmpty() && !missingContainers.isEmpty()) {
-      return ReportResult.ReportResultBuilder.newBuilder()
-          .setStatus(ReportStatus.MISSING_AND_NEW_CONTAINERS_FOUND)
-          .setNewContainers(newContainers)
-          .setMissingContainers(missingContainers)
-          .build();
-    }
-
-    // default status & Make compiler happy
-    return ReportResult.ReportResultBuilder.newBuilder()
-        .setStatus(ReportStatus.ALL_IS_WELL)
-        .build();
-  }
-
-  /**
-   * Results possible from processing a container report by
-   * Node2ContainerMapper.
-   */
-  public enum ReportStatus {
-    ALL_IS_WELL,
-    MISSING_CONTAINERS,
-    NEW_CONTAINERS_FOUND,
-    MISSING_AND_NEW_CONTAINERS_FOUND,
-    NEW_DATANODE_FOUND
-  }
-
   @VisibleForTesting
   public int size() {
-    return dn2ContainerMap.size();
+    return dn2ObjectMap.size();
   }
 }

+ 162 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java

@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.node.states;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+
+import java.util.UUID;
+import java.util.Set;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.HashSet;
+import java.util.Collections;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.DUPLICATE_DATANODE;
+
+/**
+ * This data structure maintains the set of objects (such as containers or
+ * pipelines) present on a datanode. This information is built from the
+ * datanode reports.
+ */
+public class Node2ObjectsMap<T> {
+  protected final Map<UUID, Set<T>> dn2ObjectMap;
+
+  /**
+   * Constructs a Node2ObjectsMap Object.
+   */
+  public Node2ObjectsMap() {
+    dn2ObjectMap = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Returns true if this is a datanode that is already tracked by
+   * this map.
+   *
+   * @param datanodeID - UUID of the Datanode.
+   * @return True if this is tracked, false if this map does not know about it.
+   */
+  public boolean isKnownDatanode(UUID datanodeID) {
+    Preconditions.checkNotNull(datanodeID);
+    return dn2ObjectMap.containsKey(datanodeID);
+  }
+
+  /**
+   * Inserts a new datanode into the map.
+   *
+   * @param datanodeID   -- Datanode UUID
+   * @param containerIDs - List of ContainerIDs.
+   */
+  public void insertNewDatanode(UUID datanodeID, Set<T> containerIDs)
+      throws SCMException {
+    Preconditions.checkNotNull(containerIDs);
+    Preconditions.checkNotNull(datanodeID);
+    if (dn2ObjectMap.putIfAbsent(datanodeID, new HashSet<>(containerIDs))
+        != null) {
+      throw new SCMException("Node already exists in the map",
+          DUPLICATE_DATANODE);
+    }
+  }
+
+  /**
+   * Removes datanode Entry from the map.
+   *
+   * @param datanodeID - Datanode ID.
+   */
+  void removeDatanode(UUID datanodeID) {
+    Preconditions.checkNotNull(datanodeID);
+    dn2ObjectMap.computeIfPresent(datanodeID, (k, v) -> null);
+  }
+
+  /**
+   * Returns the objects associated with this datanode ID, or an empty set
+   * if there are none.
+   *
+   * @param datanode - UUID
+   * @return Set of objects, possibly empty.
+   */
+  Set<T> getObjects(UUID datanode) {
+    Preconditions.checkNotNull(datanode);
+    final Set<T> s = dn2ObjectMap.get(datanode);
+    return s != null? Collections.unmodifiableSet(s): Collections.emptySet();
+  }
+
+  public ReportResult.ReportResultBuilder<T> newBuilder() {
+    return new ReportResult.ReportResultBuilder<>();
+  }
+
+  public ReportResult<T> processReport(UUID datanodeID, Set<T> objects) {
+    Preconditions.checkNotNull(datanodeID);
+    Preconditions.checkNotNull(objects);
+
+    if (!isKnownDatanode(datanodeID)) {
+      return newBuilder()
+          .setStatus(ReportResult.ReportStatus.NEW_DATANODE_FOUND)
+          .setNewEntries(objects)
+          .build();
+    }
+
+    // Conditions like Zero length containers should be handled by removeAll.
+    Set<T> currentSet = dn2ObjectMap.get(datanodeID);
+    TreeSet<T> newObjects = new TreeSet<>(objects);
+    newObjects.removeAll(currentSet);
+
+    TreeSet<T> missingObjects = new TreeSet<>(currentSet);
+    missingObjects.removeAll(objects);
+
+    if (newObjects.isEmpty() && missingObjects.isEmpty()) {
+      return newBuilder()
+          .setStatus(ReportResult.ReportStatus.ALL_IS_WELL)
+          .build();
+    }
+
+    if (newObjects.isEmpty() && !missingObjects.isEmpty()) {
+      return newBuilder()
+          .setStatus(ReportResult.ReportStatus.MISSING_ENTRIES)
+          .setMissingEntries(missingObjects)
+          .build();
+    }
+
+    if (!newObjects.isEmpty() && missingObjects.isEmpty()) {
+      return newBuilder()
+          .setStatus(ReportResult.ReportStatus.NEW_ENTRIES_FOUND)
+          .setNewEntries(newObjects)
+          .build();
+    }
+
+    if (!newObjects.isEmpty() && !missingObjects.isEmpty()) {
+      return newBuilder()
+          .setStatus(ReportResult.ReportStatus.MISSING_AND_NEW_ENTRIES_FOUND)
+          .setNewEntries(newObjects)
+          .setMissingEntries(missingObjects)
+          .build();
+    }
+
+    // default status & Make compiler happy
+    return newBuilder()
+        .setStatus(ReportResult.ReportStatus.ALL_IS_WELL)
+        .build();
+  }
+
+  @VisibleForTesting
+  public int size() {
+    return dn2ObjectMap.size();
+  }
+}
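
A minimal sketch of the generic report flow, assuming the classes above; note that processReport copies entries into TreeSets, so T must be Comparable, which is also why PipelineID gains Comparable in this change (the sketch class is illustrative):

    import java.util.Collections;
    import java.util.UUID;
    import org.apache.hadoop.hdds.scm.container.ContainerID;
    import org.apache.hadoop.hdds.scm.exceptions.SCMException;
    import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
    import org.apache.hadoop.hdds.scm.node.states.ReportResult;

    public class ReportFlowSketch {
      public static void main(String[] args) throws SCMException {
        Node2ContainerMap map = new Node2ContainerMap();
        UUID dn = UUID.randomUUID();
        // First contact: start tracking the datanode with container #1.
        map.insertNewDatanode(dn, Collections.singleton(new ContainerID(1L)));
        // A later report lists only container #2: #2 is new, #1 went missing.
        ReportResult<ContainerID> result =
            map.processReport(dn, Collections.singleton(new ContainerID(2L)));
        System.out.println(result.getStatus());
        // Prints: MISSING_AND_NEW_ENTRIES_FOUND
      }
    }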

+ 57 - 48
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java

@@ -19,83 +19,92 @@
 
 package org.apache.hadoop.hdds.scm.node.states;
 
-import org.apache.hadoop.hdds.scm.container.ContainerID;
-
 import java.util.Collections;
 import java.util.Set;
 
 import com.google.common.base.Preconditions;
 
 /**
- * A Container Report gets processsed by the Node2Container and returns
- * Report Result class.
+ * The result of processing a Container/Pipeline report through the
+ * Node2Container/Node2Pipeline map.
  */
-public class ReportResult {
-  private Node2ContainerMap.ReportStatus status;
-  private Set<ContainerID> missingContainers;
-  private Set<ContainerID> newContainers;
-
-  ReportResult(Node2ContainerMap.ReportStatus status,
-      Set<ContainerID> missingContainers,
-      Set<ContainerID> newContainers) {
+public final class ReportResult<T> {
+  private ReportStatus status;
+  private Set<T> missingEntries;
+  private Set<T> newEntries;
+
+  private ReportResult(ReportStatus status,
+      Set<T> missingEntries,
+      Set<T> newEntries) {
     this.status = status;
-    Preconditions.checkNotNull(missingContainers);
-    Preconditions.checkNotNull(newContainers);
-    this.missingContainers = missingContainers;
-    this.newContainers = newContainers;
+    Preconditions.checkNotNull(missingEntries);
+    Preconditions.checkNotNull(newEntries);
+    this.missingEntries = missingEntries;
+    this.newEntries = newEntries;
   }
 
-  public Node2ContainerMap.ReportStatus getStatus() {
+  public ReportStatus getStatus() {
     return status;
   }
 
-  public Set<ContainerID> getMissingContainers() {
-    return missingContainers;
+  public Set<T> getMissingEntries() {
+    return missingEntries;
   }
 
-  public Set<ContainerID> getNewContainers() {
-    return newContainers;
+  public Set<T> getNewEntries() {
+    return newEntries;
   }
 
-  static class ReportResultBuilder {
-    private Node2ContainerMap.ReportStatus status;
-    private Set<ContainerID> missingContainers;
-    private Set<ContainerID> newContainers;
-
-    static ReportResultBuilder newBuilder() {
-      return new ReportResultBuilder();
-    }
-
-    public ReportResultBuilder setStatus(
-        Node2ContainerMap.ReportStatus newstatus) {
-      this.status = newstatus;
+  /**
+   * Builder for the result of processing a report against a
+   * node2Object map.
+   * @param <T> type of the entries tracked in the report.
+   */
+  public static class ReportResultBuilder<T> {
+    private ReportStatus status;
+    private Set<T> missingEntries;
+    private Set<T> newEntries;
+
+    public ReportResultBuilder<T> setStatus(
+        ReportStatus newStatus) {
+      this.status = newStatus;
       return this;
     }
 
-    public ReportResultBuilder setMissingContainers(
-        Set<ContainerID> missingContainersLit) {
-      this.missingContainers = missingContainersLit;
+    public ReportResultBuilder<T> setMissingEntries(
+        Set<T> missingEntriesList) {
+      this.missingEntries = missingEntriesList;
       return this;
     }
 
-    public ReportResultBuilder setNewContainers(
-        Set<ContainerID> newContainersList) {
-      this.newContainers = newContainersList;
+    public ReportResultBuilder<T> setNewEntries(
+        Set<T> newEntriesList) {
+      this.newEntries = newEntriesList;
       return this;
     }
 
-    ReportResult build() {
+    public ReportResult<T> build() {
 
-      Set<ContainerID> nullSafeMissingContainers = this.missingContainers;
-      Set<ContainerID> nullSafeNewContainers = this.newContainers;
-      if (nullSafeNewContainers == null) {
-        nullSafeNewContainers = Collections.emptySet();
+      Set<T> nullSafeMissingEntries = this.missingEntries;
+      Set<T> nullSafeNewEntries = this.newEntries;
+      if (nullSafeNewEntries == null) {
+        nullSafeNewEntries = Collections.emptySet();
       }
-      if (nullSafeMissingContainers == null) {
-        nullSafeMissingContainers = Collections.emptySet();
+      if (nullSafeMissingEntries == null) {
+        nullSafeMissingEntries = Collections.emptySet();
       }
-      return new ReportResult(status, nullSafeMissingContainers,
-          nullSafeNewContainers);
+      return new ReportResult<T>(status, nullSafeMissingEntries,
+              nullSafeNewEntries);
     }
   }
+
+  /**
+   * Results possible from processing a report.
+   */
+  public enum ReportStatus {
+    ALL_IS_WELL,
+    MISSING_ENTRIES,
+    NEW_ENTRIES_FOUND,
+    MISSING_AND_NEW_ENTRIES_FOUND,
+    NEW_DATANODE_FOUND,
+  }
 }
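
A small builder sketch (illustrative, not from the patch): only the status is mandatory, since build() substitutes empty sets for any entry set that was never set.

import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.node.states.ReportResult;

public class ReportResultBuilderSketch {
  public static void main(String[] args) {
    ReportResult<ContainerID> result =
        new ReportResult.ReportResultBuilder<ContainerID>()
            .setStatus(ReportResult.ReportStatus.ALL_IS_WELL)
            .build();
    // Unset entry sets default to Collections.emptySet() in build().
    assert result.getNewEntries().isEmpty();
    assert result.getMissingEntries().isEmpty();
  }
}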

+ 6 - 39
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java

@@ -16,19 +16,15 @@
  *
  */
 
-package org.apache.hadoop.hdds.scm.pipelines;
+package org.apache.hadoop.hdds.scm.node.states;
 
-import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 
-import java.util.Collections;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * This data structure maintains the list of pipelines which the given datanode is a part of. This
@@ -36,33 +32,11 @@ import java.util.concurrent.ConcurrentHashMap;
  *
  * <p>TODO: this information needs to be regenerated from pipeline reports on SCM restart
  */
-public class Node2PipelineMap {
-  private final Map<UUID, Set<PipelineID>> dn2PipelineMap;
+public class Node2PipelineMap extends Node2ObjectsMap<PipelineID> {
 
   /** Constructs a Node2PipelineMap Object. */
   public Node2PipelineMap() {
-    dn2PipelineMap = new ConcurrentHashMap<>();
-  }
-
-  /**
-   * Returns true if this a datanode that is already tracked by Node2PipelineMap.
-   *
-   * @param datanodeID - UUID of the Datanode.
-   * @return True if this is tracked, false if this map does not know about it.
-   */
-  private boolean isKnownDatanode(UUID datanodeID) {
-    Preconditions.checkNotNull(datanodeID);
-    return dn2PipelineMap.containsKey(datanodeID);
-  }
-
-  /**
-   * Removes datanode Entry from the map.
-   *
-   * @param datanodeID - Datanode ID.
-   */
-  public synchronized void removeDatanode(UUID datanodeID) {
-    Preconditions.checkNotNull(datanodeID);
-    dn2PipelineMap.computeIfPresent(datanodeID, (k, v) -> null);
+    super();
   }
 
   /**
@@ -72,9 +46,7 @@ public class Node2PipelineMap {
    * @return Set of pipelines or Null.
    */
   public Set<PipelineID> getPipelines(UUID datanode) {
-    Preconditions.checkNotNull(datanode);
-    final Set<PipelineID> s = dn2PipelineMap.get(datanode);
-    return s != null? Collections.unmodifiableSet(s): Collections.emptySet();
+    return getObjects(datanode);
   }
 
   /**
@@ -85,7 +57,7 @@ public class Node2PipelineMap {
   public synchronized void addPipeline(Pipeline pipeline) {
     for (DatanodeDetails details : pipeline.getDatanodes().values()) {
       UUID dnId = details.getUuid();
-      dn2PipelineMap.computeIfAbsent(dnId, k -> new HashSet<>())
+      dn2ObjectMap.computeIfAbsent(dnId, k -> new HashSet<>())
           .add(pipeline.getId());
     }
   }
@@ -93,16 +65,11 @@ public class Node2PipelineMap {
   public synchronized void removePipeline(Pipeline pipeline) {
     for (DatanodeDetails details : pipeline.getDatanodes().values()) {
       UUID dnId = details.getUuid();
-      dn2PipelineMap.computeIfPresent(
-          dnId,
+      dn2ObjectMap.computeIfPresent(dnId,
           (k, v) -> {
             v.remove(pipeline.getId());
             return v;
           });
     }
   }
-
-  public Map<UUID, Set<PipelineID>> getDn2PipelineMap() {
-    return Collections.unmodifiableMap(dn2PipelineMap);
-  }
 }
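
A hedged usage sketch of the slimmed-down map, assuming a Pipeline instance built elsewhere (for example through PipelineSelector.newPipelineFromNodes):

import java.util.Set;

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap;

public class Node2PipelineMapSketch {
  // Registers a pipeline and verifies that each member datanode now
  // maps back to the PipelineID, then removes it again.
  static void track(Node2PipelineMap map, Pipeline pipeline) {
    map.addPipeline(pipeline);
    for (DatanodeDetails dn : pipeline.getDatanodes().values()) {
      Set<PipelineID> ids = map.getPipelines(dn.getUuid());
      assert ids.contains(pipeline.getId());
    }
    map.removePipeline(pipeline);
  }
}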

+ 19 - 5
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java

@@ -17,22 +17,36 @@
 
 package org.apache.hadoop.hdds.scm.pipelines;
 
-import org.apache.hadoop.hdds.scm.container.Mapping;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Handles pipeline close event.
  */
 public class PipelineCloseHandler implements EventHandler<PipelineID> {
-  private final Mapping mapping;
-  public PipelineCloseHandler(Mapping mapping) {
-    this.mapping = mapping;
+  private static final Logger LOG = LoggerFactory
+          .getLogger(PipelineCloseHandler.class);
+
+  private final PipelineSelector pipelineSelector;
+  public PipelineCloseHandler(PipelineSelector pipelineSelector) {
+    this.pipelineSelector = pipelineSelector;
   }
 
   @Override
   public void onMessage(PipelineID pipelineID, EventPublisher publisher) {
-    mapping.handlePipelineClose(pipelineID);
+    Pipeline pipeline = pipelineSelector.getPipeline(pipelineID);
+    try {
+      if (pipeline != null) {
+        pipelineSelector.finalizePipeline(pipeline);
+      } else {
+        LOG.debug("pipeline:{} not found", pipelineID);
+      }
+    } catch (Exception e) {
+      LOG.info("failed to close pipeline:{}", pipelineID, e);
+    }
   }
 }

+ 20 - 32
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java

@@ -18,6 +18,8 @@ package org.apache.hadoop.hdds.scm.pipelines;
 
 import java.util.ArrayList;
 import java.util.LinkedList;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
@@ -36,7 +38,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 public abstract class PipelineManager {
   private static final Logger LOG =
       LoggerFactory.getLogger(PipelineManager.class);
-  private final ArrayList<ActivePipelines> activePipelines;
+  protected final ArrayList<ActivePipelines> activePipelines;
 
   public PipelineManager() {
     activePipelines = new ArrayList<>();
@@ -45,7 +47,10 @@ public abstract class PipelineManager {
     }
   }
 
-  private static class ActivePipelines {
+  /**
+   * List of active pipelines.
+   */
+  public static class ActivePipelines {
     private final List<PipelineID> activePipelines;
     private final AtomicInteger pipelineIndex;
 
@@ -55,10 +60,12 @@ public abstract class PipelineManager {
     }
 
     void addPipeline(PipelineID pipelineID) {
-      activePipelines.add(pipelineID);
+      if (!activePipelines.contains(pipelineID)) {
+        activePipelines.add(pipelineID);
+      }
     }
 
-    void removePipeline(PipelineID pipelineID) {
+    public void removePipeline(PipelineID pipelineID) {
       activePipelines.remove(pipelineID);
     }
 
@@ -117,17 +124,6 @@ public abstract class PipelineManager {
             .addPipeline(pipeline.getId());
   }
 
-  protected static int getReplicationCount(ReplicationFactor factor) {
-    switch (factor) {
-    case ONE:
-      return 1;
-    case THREE:
-      return 3;
-    default:
-      throw new IllegalArgumentException("Unexpected replication count");
-    }
-  }
-
   public abstract Pipeline allocatePipeline(
       ReplicationFactor replicationFactor);
 
@@ -137,6 +133,14 @@ public abstract class PipelineManager {
    */
   public abstract void initializePipeline(Pipeline pipeline) throws IOException;
 
+  public void processPipelineReport(Pipeline pipeline, DatanodeDetails dn) {
+    if (pipeline.addMember(dn)
+        &&(pipeline.getDatanodes().size() == pipeline.getFactor().getNumber())
+        && pipeline.getLifeCycleState() == HddsProtos.LifeCycleState.OPEN) {
+      addOpenPipeline(pipeline);
+    }
+  }
+
   /**
    * Creates a pipeline with a specified replication factor and type.
    * @param replicationFactor - Replication Factor.
@@ -157,27 +161,11 @@ public abstract class PipelineManager {
    * Remove the pipeline from active allocation.
    * @param pipeline pipeline to be finalized
    */
-  public synchronized void finalizePipeline(Pipeline pipeline) {
-    activePipelines.get(pipeline.getFactor().ordinal())
-            .removePipeline(pipeline.getId());
-  }
+  public abstract boolean finalizePipeline(Pipeline pipeline);
 
   /**
    *
    * @param pipeline
    */
   public abstract void closePipeline(Pipeline pipeline) throws IOException;
-
-  /**
-   * list members in the pipeline.
-   * @return the datanode
-   */
-  public abstract List<DatanodeDetails> getMembers(PipelineID pipelineID)
-      throws IOException;
-
-  /**
-   * Update the datanode list of the pipeline.
-   */
-  public abstract void updatePipeline(PipelineID pipelineID,
-      List<DatanodeDetails> newDatanodes) throws IOException;
 }
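
To spell out the check inside processPipelineReport above, a hedged restatement (helper and class names here are illustrative): a pipeline goes back on the active list only when the reporting datanode completes the expected membership and the pipeline is still OPEN.

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;

final class PipelineActivationSketch {
  // Assumes addMember returns true when dn was newly added; the size
  // and state checks then ensure every replica has reported and the
  // pipeline is still OPEN on the SCM side.
  static boolean readyToActivate(Pipeline pipeline, DatanodeDetails dn) {
    return pipeline.addMember(dn)
        && pipeline.getDatanodes().size() == pipeline.getFactor().getNumber()
        && pipeline.getLifeCycleState() == HddsProtos.LifeCycleState.OPEN;
  }
}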

+ 59 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineReportHandler.java

@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipelines;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.scm.server
+        .SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles Pipeline Reports from datanode.
+ */
+public class PipelineReportHandler implements
+        EventHandler<PipelineReportFromDatanode> {
+
+  private static final Logger LOGGER = LoggerFactory
+      .getLogger(PipelineReportHandler.class);
+  private final PipelineSelector pipelineSelector;
+
+  public PipelineReportHandler(PipelineSelector pipelineSelector) {
+    Preconditions.checkNotNull(pipelineSelector);
+    this.pipelineSelector = pipelineSelector;
+  }
+
+  @Override
+  public void onMessage(PipelineReportFromDatanode pipelineReportFromDatanode,
+      EventPublisher publisher) {
+    Preconditions.checkNotNull(pipelineReportFromDatanode);
+    DatanodeDetails dn = pipelineReportFromDatanode.getDatanodeDetails();
+    PipelineReportsProto pipelineReport =
+            pipelineReportFromDatanode.getReport();
+    Preconditions.checkNotNull(dn, "Pipeline Report is "
+        + "missing DatanodeDetails.");
+    LOGGER.trace("Processing pipeline report for dn: {}", dn);
+    pipelineSelector.processPipelineReport(dn, pipelineReport);
+  }
+}
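
A minimal wiring sketch (selector and datanode construction elided): the handler is registered for SCMEvents.PIPELINE_REPORT, mirroring the registration in StorageContainerManager further down, and can be driven by firing an event directly.

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.pipelines.PipelineReportHandler;
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
        .PipelineReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventQueue;

final class PipelineReportWiringSketch {
  static void wireAndFire(PipelineSelector selector, DatanodeDetails dn) {
    EventQueue queue = new EventQueue();
    queue.addHandler(SCMEvents.PIPELINE_REPORT,
        new PipelineReportHandler(selector));
    // An empty report is enough to exercise the handler end to end.
    queue.fireEvent(SCMEvents.PIPELINE_REPORT,
        new PipelineReportFromDatanode(dn,
            PipelineReportsProto.newBuilder().build()));
  }
}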

+ 58 - 45
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java

@@ -16,9 +16,12 @@
  */
 package org.apache.hadoop.hdds.scm.pipelines;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.protocol.proto
+        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
@@ -30,6 +33,7 @@ import org.apache.hadoop.hdds.scm.container.placement.algorithms
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap;
 import org.apache.hadoop.hdds.scm.pipelines.ratis.RatisManagerImpl;
 import org.apache.hadoop.hdds.scm.pipelines.standalone.StandaloneManagerImpl;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -75,11 +79,9 @@ public class PipelineSelector {
   private static final Logger LOG =
       LoggerFactory.getLogger(PipelineSelector.class);
   private final ContainerPlacementPolicy placementPolicy;
-  private final NodeManager nodeManager;
+  private final Map<ReplicationType, PipelineManager> pipelineManagerMap;
   private final Configuration conf;
   private final EventPublisher eventPublisher;
-  private final RatisManagerImpl ratisManager;
-  private final StandaloneManagerImpl standaloneManager;
   private final long containerSize;
   private final MetadataStore pipelineStore;
   private final PipelineStateManager stateManager;
@@ -96,7 +98,6 @@ public class PipelineSelector {
    */
   public PipelineSelector(NodeManager nodeManager, Configuration conf,
       EventPublisher eventPublisher, int cacheSizeMB) throws IOException {
-    this.nodeManager = nodeManager;
     this.conf = conf;
     this.eventPublisher = eventPublisher;
     this.placementPolicy = createContainerPlacementPolicy(nodeManager, conf);
@@ -106,12 +107,14 @@ public class PipelineSelector {
         StorageUnit.BYTES);
     node2PipelineMap = new Node2PipelineMap();
     pipelineMap = new ConcurrentHashMap<>();
-    this.standaloneManager =
-        new StandaloneManagerImpl(this.nodeManager, placementPolicy,
-            containerSize);
-    this.ratisManager =
-        new RatisManagerImpl(this.nodeManager, placementPolicy, containerSize,
-            conf);
+    pipelineManagerMap = new HashMap<>();
+
+    pipelineManagerMap.put(ReplicationType.STAND_ALONE,
+            new StandaloneManagerImpl(nodeManager, placementPolicy,
+            containerSize));
+    pipelineManagerMap.put(ReplicationType.RATIS,
+            new RatisManagerImpl(nodeManager, placementPolicy,
+                    containerSize, conf));
     long pipelineCreationLeaseTimeout = conf.getTimeDuration(
         ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT,
         ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT,
@@ -154,6 +157,7 @@ public class PipelineSelector {
     }
   }
 
+  @VisibleForTesting
   public Set<ContainerID> getOpenContainerIDsByPipeline(PipelineID pipelineID) {
     return pipeline2ContainerMap.get(pipelineID);
   }
@@ -226,30 +230,6 @@ public class PipelineSelector {
     }
   }
 
-  /**
-   * Return the pipeline manager from the replication type.
-   *
-   * @param replicationType - Replication Type Enum.
-   * @return pipeline Manager.
-   * @throws IllegalArgumentException If an pipeline type gets added
-   * and this function is not modified we will throw.
-   */
-  private PipelineManager getPipelineManager(ReplicationType replicationType)
-      throws IllegalArgumentException {
-    switch (replicationType) {
-    case RATIS:
-      return this.ratisManager;
-    case STAND_ALONE:
-      return this.standaloneManager;
-    case CHAINED:
-      throw new IllegalArgumentException("Not implemented yet");
-    default:
-      throw new IllegalArgumentException("Unexpected enum found. Does not" +
-          " know how to handle " + replicationType.toString());
-    }
-
-  }
-
   /**
    * This function is called by the Container Manager while allocating a new
    * container. The client specifies what kind of replication pipeline is needed
@@ -260,7 +240,7 @@ public class PipelineSelector {
   public Pipeline getReplicationPipeline(ReplicationType replicationType,
       HddsProtos.ReplicationFactor replicationFactor)
       throws IOException {
-    PipelineManager manager = getPipelineManager(replicationType);
+    PipelineManager manager = pipelineManagerMap.get(replicationType);
     Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
     LOG.debug("Getting replication pipeline forReplicationType {} :" +
             " ReplicationFactor {}", replicationType.toString(),
@@ -316,7 +296,7 @@ public class PipelineSelector {
    * Finalize a given pipeline.
    */
   public void finalizePipeline(Pipeline pipeline) throws IOException {
-    PipelineManager manager = getPipelineManager(pipeline.getType());
+    PipelineManager manager = pipelineManagerMap.get(pipeline.getType());
     Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
     if (pipeline.getLifeCycleState() == LifeCycleState.CLOSING ||
         pipeline.getLifeCycleState() == LifeCycleState.CLOSED) {
@@ -327,17 +307,17 @@ public class PipelineSelector {
     }
 
     // Remove the pipeline from active allocation
-    manager.finalizePipeline(pipeline);
-
-    LOG.info("Finalizing pipeline. pipelineID: {}", pipeline.getId());
-    updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.FINALIZE);
-    closePipelineIfNoOpenContainers(pipeline);
+    if (manager.finalizePipeline(pipeline)) {
+      LOG.info("Finalizing pipeline. pipelineID: {}", pipeline.getId());
+      updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.FINALIZE);
+      closePipelineIfNoOpenContainers(pipeline);
+    }
   }
 
   /**
    * Close a given pipeline.
    */
-  public void closePipelineIfNoOpenContainers(Pipeline pipeline)
+  private void closePipelineIfNoOpenContainers(Pipeline pipeline)
       throws IOException {
     if (pipeline.getLifeCycleState() != LifeCycleState.CLOSING) {
       return;
@@ -354,7 +334,7 @@ public class PipelineSelector {
    * Close a given pipeline.
    */
   private void closePipeline(Pipeline pipeline) throws IOException {
-    PipelineManager manager = getPipelineManager(pipeline.getType());
+    PipelineManager manager = pipelineManagerMap.get(pipeline.getType());
     Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
     LOG.debug("Closing pipeline. pipelineID: {}", pipeline.getId());
     HashSet<ContainerID> containers =
@@ -367,7 +347,7 @@ public class PipelineSelector {
    * Add to a given pipeline.
    */
   private void addOpenPipeline(Pipeline pipeline) {
-    PipelineManager manager = getPipelineManager(pipeline.getType());
+    PipelineManager manager = pipelineManagerMap.get(pipeline.getType());
     Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
     LOG.debug("Adding Open pipeline. pipelineID: {}", pipeline.getId());
     manager.addOpenPipeline(pipeline);
@@ -381,7 +361,7 @@ public class PipelineSelector {
     }
   }
 
-  public Set<PipelineID> getPipelineId(UUID dnId) {
+  public Set<PipelineID> getPipelineByDnID(UUID dnId) {
     return node2PipelineMap.getPipelines(dnId);
   }
 
@@ -400,6 +380,9 @@ public class PipelineSelector {
       pipelineMap.put(pipeline.getId(), pipeline);
       pipeline2ContainerMap.put(pipeline.getId(), new HashSet<>());
       node2PipelineMap.addPipeline(pipeline);
+      // Reset the datanodes in the pipeline; they will be
+      // re-registered as pipeline reports arrive from the datanodes.
+      pipeline.resetPipeline();
       break;
     case CLOSED:
       // if the pipeline is in closed state, nothing to do.
@@ -409,6 +392,36 @@ public class PipelineSelector {
     }
   }
 
+  public void handleStaleNode(DatanodeDetails dn) {
+    Set<PipelineID> pipelineIDs = getPipelineByDnID(dn.getUuid());
+    for (PipelineID id : pipelineIDs) {
+      LOG.info("closing pipeline {}.", id);
+      eventPublisher.fireEvent(SCMEvents.PIPELINE_CLOSE, id);
+    }
+  }
+
+  void processPipelineReport(DatanodeDetails dn,
+                                    PipelineReportsProto pipelineReport) {
+    Set<PipelineID> reportedPipelines = new HashSet<>();
+    pipelineReport.getPipelineReportList().forEach(p ->
+        reportedPipelines.add(processPipelineReport(p.getPipelineID(), dn)));
+
+    //TODO: handle missing pipelines and new pipelines later
+  }
+
+  private PipelineID processPipelineReport(
+          HddsProtos.PipelineID id, DatanodeDetails dn) {
+    PipelineID pipelineID = PipelineID.getFromProtobuf(id);
+    Pipeline pipeline = pipelineMap.get(pipelineID);
+    if (pipeline != null) {
+      pipelineManagerMap.get(pipeline.getType())
+              .processPipelineReport(pipeline, dn);
+    }
+    return pipelineID;
+  }
+
   /**
    * Update the Pipeline State to the next state.
    *
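
For the stale-node path added above, an expanded sketch of what handleStaleNode does (class and method names here are illustrative): look up every pipeline the datanode participates in via the renamed getPipelineByDnID, then request a close for each.

import java.util.Set;
import java.util.UUID;

import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.hdds.server.events.EventPublisher;

final class StaleNodeCloseSketch {
  // Mirrors handleStaleNode: each PIPELINE_CLOSE event is picked up by
  // PipelineCloseHandler, which finalizes the pipeline on the SCM.
  static void closePipelinesOf(PipelineSelector selector,
      EventPublisher publisher, UUID dnId) {
    Set<PipelineID> ids = selector.getPipelineByDnID(dnId);
    for (PipelineID id : ids) {
      publisher.fireEvent(SCMEvents.PIPELINE_CLOSE, id);
    }
  }
}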

+ 13 - 28
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java

@@ -73,20 +73,19 @@ public class RatisManagerImpl extends PipelineManager {
   public Pipeline allocatePipeline(ReplicationFactor factor) {
     List<DatanodeDetails> newNodesList = new LinkedList<>();
     List<DatanodeDetails> datanodes = nodeManager.getNodes(NodeState.HEALTHY);
-    int count = getReplicationCount(factor);
     //TODO: Add Raft State to the Nodes, so we can query and skip nodes from
     // data from datanode instead of maintaining a set.
     for (DatanodeDetails datanode : datanodes) {
       Preconditions.checkNotNull(datanode);
       if (!ratisMembers.contains(datanode)) {
         newNodesList.add(datanode);
-        if (newNodesList.size() == count) {
+        if (newNodesList.size() == factor.getNumber()) {
           // once a datanode has been added to a pipeline, exclude it from
           // further allocations
           ratisMembers.addAll(newNodesList);
           PipelineID pipelineID = PipelineID.randomId();
           LOG.info("Allocating a new ratis pipeline of size: {} id: {}",
-              count, pipelineID);
+                  factor.getNumber(), pipelineID);
           return PipelineSelector.newPipelineFromNodes(newNodesList,
               ReplicationType.RATIS, factor, pipelineID);
         }
@@ -103,6 +102,17 @@ public class RatisManagerImpl extends PipelineManager {
     }
   }
 
+  public void processPipelineReport(Pipeline pipeline, DatanodeDetails dn) {
+    super.processPipelineReport(pipeline, dn);
+    ratisMembers.add(dn);
+  }
+
+  public synchronized boolean finalizePipeline(Pipeline pipeline) {
+    activePipelines.get(pipeline.getFactor().ordinal())
+            .removePipeline(pipeline.getId());
+    return true;
+  }
+
   /**
    * Close the pipeline.
    */
@@ -116,29 +126,4 @@ public class RatisManagerImpl extends PipelineManager {
       Preconditions.checkArgument(ratisMembers.remove(node));
     }
   }
-
-  /**
-   * list members in the pipeline .
-   *
-   * @param pipelineID
-   * @return the datanode
-   */
-  @Override
-  public List<DatanodeDetails> getMembers(PipelineID pipelineID)
-      throws IOException {
-    return null;
-  }
-
-  /**
-   * Update the datanode list of the pipeline.
-   *
-   * @param pipelineID
-   * @param newDatanodes
-   */
-  @Override
-  public void updatePipeline(PipelineID pipelineID,
-                             List<DatanodeDetails> newDatanodes)
-      throws IOException {
-
-  }
 }

+ 16 - 28
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java

@@ -74,18 +74,19 @@ public class StandaloneManagerImpl extends PipelineManager {
   public Pipeline allocatePipeline(ReplicationFactor factor) {
     List<DatanodeDetails> newNodesList = new LinkedList<>();
     List<DatanodeDetails> datanodes = nodeManager.getNodes(NodeState.HEALTHY);
-    int count = getReplicationCount(factor);
     for (DatanodeDetails datanode : datanodes) {
       Preconditions.checkNotNull(datanode);
       if (!standAloneMembers.contains(datanode)) {
         newNodesList.add(datanode);
-        if (newNodesList.size() == count) {
+        if (newNodesList.size() == factor.getNumber()) {
           // once a datanode has been added to a pipeline, exclude it from
           // further allocations
           standAloneMembers.addAll(newNodesList);
-          PipelineID pipelineID = PipelineID.randomId();
+          // A standalone pipeline uses the datanode id as its pipeline id.
+          PipelineID pipelineID =
+                  PipelineID.valueOf(newNodesList.get(0).getUuid());
           LOG.info("Allocating a new standalone pipeline of size: {} id: {}",
-              count, pipelineID);
+              factor.getNumber(), pipelineID);
           return PipelineSelector.newPipelineFromNodes(newNodesList,
               ReplicationType.STAND_ALONE, ReplicationFactor.ONE, pipelineID);
         }
@@ -98,6 +99,17 @@ public class StandaloneManagerImpl extends PipelineManager {
     // Nothing to be done for standalone pipeline
   }
 
+  public void processPipelineReport(Pipeline pipeline, DatanodeDetails dn) {
+    super.processPipelineReport(pipeline, dn);
+    standAloneMembers.add(dn);
+  }
+
+  public synchronized boolean finalizePipeline(Pipeline pipeline) {
+    activePipelines.get(pipeline.getFactor().ordinal())
+            .removePipeline(pipeline.getId());
+    return false;
+  }
+
   /**
    * Close the pipeline.
    */
@@ -107,28 +119,4 @@ public class StandaloneManagerImpl extends PipelineManager {
       Preconditions.checkArgument(standAloneMembers.remove(node));
     }
   }
-
-  /**
-   * list members in the pipeline .
-   *
-   * @param pipelineID
-   * @return the datanode
-   */
-  @Override
-  public List<DatanodeDetails> getMembers(PipelineID pipelineID)
-      throws IOException {
-    return null;
-  }
-
-  /**
-   * Update the datanode list of the pipeline.
-   *
-   * @param pipelineID
-   * @param newDatanodes
-   */
-  @Override
-  public void updatePipeline(PipelineID pipelineID, List<DatanodeDetails>
-      newDatanodes) throws IOException {
-
-  }
 }

+ 23 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java

@@ -19,6 +19,8 @@ package org.apache.hadoop.hdds.scm.server;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.PipelineActionsProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -46,6 +48,7 @@ import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_REPORT;
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.NODE_REPORT;
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.CMD_STATUS_REPORT;
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.PIPELINE_ACTIONS;
+import static org.apache.hadoop.hdds.scm.events.SCMEvents.PIPELINE_REPORT;
 
 /**
  * This class is responsible for dispatching heartbeat from datanode to
@@ -103,6 +106,14 @@ public final class SCMDatanodeHeartbeatDispatcher {
               heartbeat.getContainerActions()));
     }
 
+    if (heartbeat.hasPipelineReports()) {
+      LOG.debug("Dispatching Pipeline Report.");
+      eventPublisher.fireEvent(PIPELINE_REPORT,
+              new PipelineReportFromDatanode(datanodeDetails,
+                      heartbeat.getPipelineReports()));
+
+    }
+
     if (heartbeat.hasPipelineActions()) {
       LOG.debug("Dispatching Pipeline Actions.");
       eventPublisher.fireEvent(PIPELINE_ACTIONS,
@@ -178,6 +189,18 @@ public final class SCMDatanodeHeartbeatDispatcher {
     }
   }
 
+  /**
+   * Pipeline report event payload with origin.
+   */
+  public static class PipelineReportFromDatanode
+          extends ReportFromDatanode<PipelineReportsProto> {
+
+    public PipelineReportFromDatanode(DatanodeDetails datanodeDetails,
+                                      PipelineReportsProto report) {
+      super(datanodeDetails, report);
+    }
+  }
+
   /**
    * Pipeline action event payload with origin.
    */

+ 13 - 3
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java

@@ -33,6 +33,8 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
+        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -74,7 +76,10 @@ import static org.apache.hadoop.hdds.protocol.proto
 
 import org.apache.hadoop.hdds.scm.HddsServerUtil;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
+    .ReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
+        .PipelineReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -102,6 +107,7 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRES
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_DEFAULT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_KEY;
 
+import static org.apache.hadoop.hdds.scm.events.SCMEvents.PIPELINE_REPORT;
 import static org.apache.hadoop.hdds.scm.server.StorageContainerManager.startRpcServer;
 import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
 
@@ -190,13 +196,14 @@ public class SCMDatanodeProtocolServer implements
   public SCMRegisteredResponseProto register(
       HddsProtos.DatanodeDetailsProto datanodeDetailsProto,
       NodeReportProto nodeReport,
-      ContainerReportsProto containerReportsProto)
+      ContainerReportsProto containerReportsProto,
+          PipelineReportsProto pipelineReportsProto)
       throws IOException {
     DatanodeDetails datanodeDetails = DatanodeDetails
         .getFromProtoBuf(datanodeDetailsProto);
     // TODO : Return the list of Nodes that forms the SCM HA.
     RegisteredCommand registeredCommand = scm.getScmNodeManager()
-        .register(datanodeDetails, nodeReport);
+        .register(datanodeDetails, nodeReport, pipelineReportsProto);
     if (registeredCommand.getError()
         == SCMRegisteredResponseProto.ErrorCode.success) {
       scm.getScmContainerManager().processContainerReports(datanodeDetails,
@@ -204,6 +211,9 @@ public class SCMDatanodeProtocolServer implements
       eventPublisher.fireEvent(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
           new NodeRegistrationContainerReport(datanodeDetails,
               containerReportsProto));
+      eventPublisher.fireEvent(PIPELINE_REPORT,
+              new PipelineReportFromDatanode(datanodeDetails,
+                      pipelineReportsProto));
     }
     return getRegisteredResponse(registeredCommand);
   }

+ 8 - 3
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java

@@ -64,6 +64,7 @@ import org.apache.hadoop.hdds.scm.node.StaleNodeHandler;
 import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineCloseHandler;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineActionEventHandler;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineReportHandler;
 import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.server.events.EventQueue;
@@ -217,13 +218,16 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
         new CloseContainerEventHandler(scmContainerManager);
     NodeReportHandler nodeReportHandler =
         new NodeReportHandler(scmNodeManager);
-
+    PipelineReportHandler pipelineReportHandler =
+            new PipelineReportHandler(
+                    scmContainerManager.getPipelineSelector());
     CommandStatusReportHandler cmdStatusReportHandler =
         new CommandStatusReportHandler();
 
     NewNodeHandler newNodeHandler = new NewNodeHandler(node2ContainerMap);
     StaleNodeHandler staleNodeHandler =
-        new StaleNodeHandler(node2ContainerMap, scmContainerManager);
+        new StaleNodeHandler(node2ContainerMap,
+                scmContainerManager.getPipelineSelector());
     DeadNodeHandler deadNodeHandler = new DeadNodeHandler(node2ContainerMap,
         getScmContainerManager().getStateManager());
     ContainerActionsHandler actionsHandler = new ContainerActionsHandler();
@@ -240,7 +244,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
         new PipelineActionEventHandler();
 
     PipelineCloseHandler pipelineCloseHandler =
-        new PipelineCloseHandler(scmContainerManager);
+        new PipelineCloseHandler(scmContainerManager.getPipelineSelector());
 
     long watcherTimeout =
         conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT,
@@ -300,6 +304,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS,
         (BlockManagerImpl) scmBlockManager);
     eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, clientProtocolServer);
+    eventQueue.addHandler(SCMEvents.PIPELINE_REPORT, pipelineReportHandler);
 
     registerMXBean();
   }

+ 9 - 1
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java

@@ -17,6 +17,8 @@
 package org.apache.hadoop.hdds.scm;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.proto
+        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.mockito.Mockito;
 import static org.mockito.Mockito.when;
 
@@ -139,7 +141,8 @@ public final class TestUtils {
   public static DatanodeDetails createRandomDatanodeAndRegister(
       SCMNodeManager nodeManager) {
     return getDatanodeDetails(
-        nodeManager.register(randomDatanodeDetails(), null));
+        nodeManager.register(randomDatanodeDetails(), null,
+                getRandomPipelineReports()));
   }
 
   /**
@@ -299,6 +302,11 @@ public final class TestUtils {
     return getContainerReports(containerInfos);
   }
 
+
+  public static PipelineReportsProto getRandomPipelineReports() {
+    return PipelineReportsProto.newBuilder().build();
+  }
+
   /**
    * Creates container report with the given ContainerInfo(s).
    *

+ 3 - 1
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java

@@ -16,6 +16,8 @@
  */
 package org.apache.hadoop.hdds.scm.container;
 
+import org.apache.hadoop.hdds.protocol.proto
+        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
@@ -356,7 +358,7 @@ public class MockNodeManager implements NodeManager {
    */
   @Override
   public RegisteredCommand register(DatanodeDetails datanodeDetails,
-      NodeReportProto nodeReport) {
+      NodeReportProto nodeReport, PipelineReportsProto pipelineReportsProto) {
     return null;
   }
 

+ 4 - 2
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java

@@ -286,7 +286,8 @@ public class TestNodeManager {
         TestUtils.createStorageReport(dnId, storagePath, 100, 10, 90, null);
     try (SCMNodeManager nodemanager = createNodeManager(conf)) {
       nodemanager.register(datanodeDetails,
-          TestUtils.createNodeReport(report));
+          TestUtils.createNodeReport(report),
+          TestUtils.getRandomPipelineReports());
       List<SCMCommand> command = nodemanager.processHeartbeat(datanodeDetails);
       Assert.assertTrue(nodemanager.getAllNodes().contains(datanodeDetails));
       Assert.assertTrue("On regular HB calls, SCM responses a "
@@ -1122,7 +1123,8 @@ public class TestNodeManager {
       eq.addHandler(DATANODE_COMMAND, nodemanager);
 
       nodemanager
-          .register(datanodeDetails, TestUtils.createNodeReport(report));
+          .register(datanodeDetails, TestUtils.createNodeReport(report),
+                  TestUtils.getRandomPipelineReports());
       eq.fireEvent(DATANODE_COMMAND,
           new CommandForDatanode<>(datanodeDetails.getUuid(),
               new CloseContainerCommand(1L, ReplicationType.STAND_ALONE,

+ 14 - 14
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNode2ContainerMap.java

@@ -116,7 +116,7 @@ public class TestNode2ContainerMap {
     Assert.assertTrue(map.isKnownDatanode(key));
     ReportResult result = map.processReport(key, values);
     Assert.assertEquals(result.getStatus(),
-        Node2ContainerMap.ReportStatus.ALL_IS_WELL);
+        ReportResult.ReportStatus.ALL_IS_WELL);
   }
 
   @Test
@@ -181,9 +181,9 @@ public class TestNode2ContainerMap {
     UUID key = getFirstKey();
     TreeSet<ContainerID> values = testData.get(key);
     ReportResult result = map.processReport(key, values);
-    Assert.assertEquals(Node2ContainerMap.ReportStatus.NEW_DATANODE_FOUND,
+    Assert.assertEquals(ReportResult.ReportStatus.NEW_DATANODE_FOUND,
         result.getStatus());
-    Assert.assertEquals(result.getNewContainers().size(), values.size());
+    Assert.assertEquals(result.getNewEntries().size(), values.size());
   }
 
   /**
@@ -216,15 +216,15 @@ public class TestNode2ContainerMap {
     ReportResult result = map.processReport(key, newContainersSet);
 
     //Assert that expected size of missing container is same as addedContainers
-    Assert.assertEquals(Node2ContainerMap.ReportStatus.NEW_CONTAINERS_FOUND,
+    Assert.assertEquals(ReportResult.ReportStatus.NEW_ENTRIES_FOUND,
         result.getStatus());
 
     Assert.assertEquals(addedContainers.size(),
-        result.getNewContainers().size());
+        result.getNewEntries().size());
 
     // Assert that the Container IDs are the same as we added new.
     Assert.assertTrue("All objects are not removed.",
-        result.getNewContainers().removeAll(addedContainers));
+        result.getNewEntries().removeAll(addedContainers));
   }
 
   /**
@@ -261,14 +261,14 @@ public class TestNode2ContainerMap {
 
 
     //Assert that expected size of missing container is same as addedContainers
-    Assert.assertEquals(Node2ContainerMap.ReportStatus.MISSING_CONTAINERS,
+    Assert.assertEquals(ReportResult.ReportStatus.MISSING_ENTRIES,
         result.getStatus());
     Assert.assertEquals(removedContainers.size(),
-        result.getMissingContainers().size());
+        result.getMissingEntries().size());
 
     // Assert that the Container IDs are the same as we added new.
     Assert.assertTrue("All missing containers not found.",
-        result.getMissingContainers().removeAll(removedContainers));
+        result.getMissingEntries().removeAll(removedContainers));
   }
 
   @Test
@@ -307,21 +307,21 @@ public class TestNode2ContainerMap {
 
 
     Assert.assertEquals(
-        Node2ContainerMap.ReportStatus.MISSING_AND_NEW_CONTAINERS_FOUND,
+            ReportResult.ReportStatus.MISSING_AND_NEW_ENTRIES_FOUND,
         result.getStatus());
     Assert.assertEquals(removedContainers.size(),
-        result.getMissingContainers().size());
+        result.getMissingEntries().size());
 
 
     // Assert that the Container IDs are the same as we added new.
     Assert.assertTrue("All missing containers not found.",
-        result.getMissingContainers().removeAll(removedContainers));
+        result.getMissingEntries().removeAll(removedContainers));
 
     Assert.assertEquals(insertedSet.size(),
-        result.getNewContainers().size());
+        result.getNewEntries().size());
 
     // Assert that the Container IDs are the same as we added new.
     Assert.assertTrue("All inserted containers are not found.",
-        result.getNewContainers().removeAll(insertedSet));
+        result.getNewEntries().removeAll(insertedSet));
   }
 }

+ 4 - 1
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java

@@ -280,7 +280,8 @@ public class TestEndPoint {
           .register(nodeToRegister.getProtoBufMessage(), TestUtils
                   .createNodeReport(
                       getStorageReports(nodeToRegister.getUuid())),
-              TestUtils.getRandomContainerReports(10));
+              TestUtils.getRandomContainerReports(10),
+                  TestUtils.getRandomPipelineReports());
       Assert.assertNotNull(responseProto);
       Assert.assertEquals(nodeToRegister.getUuidString(),
           responseProto.getDatanodeUUID());
@@ -308,6 +309,8 @@ public class TestEndPoint {
         .createNodeReport(getStorageReports(UUID.randomUUID())));
     when(ozoneContainer.getContainerReport()).thenReturn(
         TestUtils.getRandomContainerReports(10));
+    when(ozoneContainer.getPipelineReport()).thenReturn(
+            TestUtils.getRandomPipelineReports());
     RegisterEndpointTask endpointTask =
         new RegisterEndpointTask(rpcEndPoint, conf, ozoneContainer,
             mock(StateContext.class));

+ 4 - 1
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java

@@ -17,6 +17,8 @@
 package org.apache.hadoop.ozone.container.testutils;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.proto
+        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
 import org.apache.hadoop.hdds.scm.node.CommandQueue;
@@ -252,7 +254,8 @@ public class ReplicationNodeManagerMock implements NodeManager {
    */
   @Override
   public RegisteredCommand register(DatanodeDetails dd,
-                                    NodeReportProto nodeReport) {
+                                    NodeReportProto nodeReport,
+                                    PipelineReportsProto pipelineReportsProto) {
     return null;
   }
 

+ 1 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java

@@ -98,7 +98,7 @@ public class TestNode2PipelineMap {
 
     // get pipeline details by dnid
     Set<PipelineID> pipelines = mapping.getPipelineSelector()
-        .getPipelineId(dns.get(0).getUuid());
+        .getPipelineByDnID(dns.get(0).getUuid());
     Assert.assertEquals(1, pipelines.size());
     pipelines.forEach(p -> Assert.assertEquals(p,
         ratisContainer.getPipeline().getId()));

+ 1 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java

@@ -119,7 +119,7 @@ public class TestPipelineClose {
         HddsProtos.LifeCycleState.CLOSED);
     for (DatanodeDetails dn : ratisContainer1.getPipeline().getMachines()) {
       // Assert that the pipeline has been removed from Node2PipelineMap as well
-      Assert.assertEquals(pipelineSelector.getPipelineId(
+      Assert.assertEquals(pipelineSelector.getPipelineByDnID(
           dn.getUuid()).size(), 0);
     }
   }

+ 19 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdds.scm.pipeline;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.container.ContainerMapping;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
@@ -87,7 +88,7 @@ public class TestSCMRestart {
   }
 
   @Test
-  public void testPipelineWithScmRestart() {
+  public void testPipelineWithScmRestart() throws IOException {
     // After restart make sure that the pipeline are still present
     Pipeline ratisPipeline1AfterRestart = newMapping.getPipelineSelector()
             .getPipeline(ratisPipeline1.getId());
@@ -97,5 +98,22 @@ public class TestSCMRestart {
     Assert.assertNotSame(ratisPipeline2AfterRestart, ratisPipeline2);
     Assert.assertEquals(ratisPipeline1AfterRestart, ratisPipeline1);
     Assert.assertEquals(ratisPipeline2AfterRestart, ratisPipeline2);
+
+    for (DatanodeDetails dn : ratisPipeline1.getMachines()) {
+      Assert.assertEquals(dn, ratisPipeline1AfterRestart.getDatanodes()
+              .get(dn.getUuidString()));
+    }
+
+    for (DatanodeDetails dn : ratisPipeline2.getMachines()) {
+      Assert.assertEquals(dn, ratisPipeline2AfterRestart.getDatanodes()
+              .get(dn.getUuidString()));
+    }
+
+    // Try creating a new ratis pipeline; it should come from the same
+    // pipeline that existed before the restart.
+    Pipeline newRatisPipeline =
+            newMapping.allocateContainer(RATIS, THREE, "Owner1")
+                    .getPipeline();
+    Assert.assertEquals(newRatisPipeline.getId(), ratisPipeline1.getId());
   }
 }

+ 9 - 0
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java

@@ -36,8 +36,12 @@ import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.URISyntaxException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+
 /**
  * Helpers for Ratis tests.
  */
@@ -60,6 +64,7 @@ public interface RatisTestHelper {
     public RatisTestSuite()
         throws IOException, TimeoutException, InterruptedException {
       conf = newOzoneConfiguration(RPC);
+
       cluster = newMiniOzoneCluster(NUM_DATANODES, conf);
     }
 
@@ -96,6 +101,8 @@ public interface RatisTestHelper {
   static void initRatisConf(RpcType rpc, Configuration conf) {
     conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY, true);
     conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, rpc.name());
+    conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 1, TimeUnit.SECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS);
     LOG.info(OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY
         + " = " + rpc.name());
   }
@@ -104,6 +111,8 @@ public interface RatisTestHelper {
       int numDatanodes, OzoneConfiguration conf)
       throws IOException, TimeoutException, InterruptedException {
     final MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf)
+        .setHbInterval(1000)
+        .setHbProcessorInterval(1000)
         .setNumDatanodes(numDatanodes).build();
     cluster.waitForClusterToBeReady();
     return cluster;

+ 1 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java

@@ -136,6 +136,7 @@ public class TestKeys {
     ozoneCluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(1)
         .setHbInterval(1000)
+        .setHbProcessorInterval(1000)
         .build();
     ozoneCluster.waitForClusterToBeReady();
     client = new RpcClient(conf);
@@ -328,7 +329,6 @@ public class TestKeys {
     cluster.restartHddsDatanode(datanodeIdx);
   }
 
-  @Ignore("Causes a JVm exit")
   @Test
   public void testPutAndGetKeyWithDnRestart() throws Exception {
     runTestPutAndGetKeyWithDnRestart(

+ 0 - 2
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeysRatis.java

@@ -26,7 +26,6 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.Ignore;
 import org.junit.rules.Timeout;
 
 import static org.apache.hadoop.ozone.web.client
@@ -83,7 +82,6 @@ public class TestKeysRatis {
         getMultiPartKey(delimiter)));
   }
 
-  @Ignore("disabling for now, datanodes restart with ratis is buggy")
   @Test
   public void testPutAndGetKeyWithDnRestart() throws Exception {
     runTestPutAndGetKeyWithDnRestart(

+ 1 - 1
hadoop-project/pom.xml

@@ -101,7 +101,7 @@
     <ldap-api.version>1.0.0-M33</ldap-api.version>
 
     <!-- Apache Ratis version -->
-    <ratis.version>0.3.0-50588bd-SNAPSHOT</ratis.version>
+    <ratis.version>0.3.0-eca3531-SNAPSHOT</ratis.version>
     <jcache.version>1.0-alpha-1</jcache.version>
     <ehcache.version>3.3.1</ehcache.version>
     <hikari.version>2.4.12</hikari.version>