
HDDS-737. Introduce Incremental Container Report.
Contributed by Nanda kumar.

Nanda kumar, 6 years ago
Parent commit: c80f753b0e
41 changed files with 558 additions and 1157 deletions
  1. +8 -0    hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java
  2. +15 -2   hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
  3. +5 -8    hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
  4. +34 -39  hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
  5. +7 -5    hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
  6. +1 -1    hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
  7. +0 -4    hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
  8. +0 -27   hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
  9. +0 -101  hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java
 10. +0 -11   hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
 11. +27 -3   hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java
 12. +148 -54 hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
 13. +98 -0   hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java
 14. +0 -107  hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
 15. +0 -7    hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
 16. +12 -22  hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
 17. +25 -12  hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
 18. +1 -15   hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java
 19. +13 -37  hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
 20. +1 -1    hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeReportHandler.java
 21. +7 -62   hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
 22. +16 -50  hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
 23. +1 -1    hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
 24. +30 -37  hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java
 25. +16 -1   hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
 26. +4 -2    hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
 27. +10 -11  hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
 28. +0 -14   hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestCommandStatusReportHandler.java
 29. +20 -46  hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
 30. +8 -7    hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
 31. +3 -0    hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
 32. +4 -103  hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
 33. +9 -3    hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
 34. +14 -10  hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
 35. +0 -289  hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/TestCloseContainerWatcher.java
 36. +1 -5    hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
 37. +8 -47   hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
 38. +4 -7    hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
 39. +2 -2    hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
 40. +2 -2    hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
 41. +4 -2    hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java

+ 8 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java

@@ -158,10 +158,18 @@ public class ContainerInfo implements Comparator<ContainerInfo>,
     return usedBytes;
   }
 
+  public void setUsedBytes(long value) {
+    usedBytes = value;
+  }
+
   public long getNumberOfKeys() {
     return numberOfKeys;
   }
 
+  public void setNumberOfKeys(long value) {
+    numberOfKeys = value;
+  }
+
   public long getDeleteTransactionId() {
     return deleteTransactionId;
   }

+ 15 - 2
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java

@@ -168,10 +168,14 @@ public class DatanodeStateMachine implements Closeable {
             TimeUnit.MILLISECONDS);
         now = Time.monotonicNow();
         if (now < nextHB.get()) {
-          Thread.sleep(nextHB.get() - now);
+          if (!Thread.interrupted()) {
+            Thread.sleep(nextHB.get() - now);
+          }
         }
       } catch (InterruptedException e) {
-        // Ignore this exception.
+        // Someone has sent an interrupt signal; this could be because:
+        // 1. A heartbeat was triggered immediately.
+        // 2. Shutdown has been initiated.
       } catch (Exception e) {
         LOG.error("Unable to finish the execution.", e);
       }
@@ -324,6 +328,15 @@ public class DatanodeStateMachine implements Closeable {
     stateMachineThread.start();
   }
 
+  /**
+   * Calling this will immediately trigger a heartbeat to the SCMs.
+   * This heartbeat will also include all the reports which are ready to
+   * be sent by the datanode.
+   */
+  public void triggerHeartbeat() {
+    stateMachineThread.interrupt();
+  }
+
   /**
    * Waits for DatanodeStateMachine to exit.
    *
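The wakeup above relies on interrupting the state-machine thread out of its sleep between heartbeats. A minimal caller-side sketch (variable names are placeholders; it mirrors the pattern CloseContainerCommandHandler adopts later in this commit):

    // Queue a report, then cut the heartbeat sleep short so the report
    // is shipped on the next (immediate) heartbeat instead of waiting
    // for the regular interval.
    context.addReport(incrementalContainerReport);
    context.getParent().triggerHeartbeat();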

+ 5 - 8
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java

@@ -447,6 +447,11 @@ public class StateContext {
    * @param cmd - {@link SCMCommand}.
    */
   public void addCmdStatus(SCMCommand cmd) {
+    if (cmd.getType().equals(Type.closeContainerCommand)) {
+      // We will be removing CommandStatus completely.
+      // As a first step, it is removed for CloseContainerCommand.
+      return;
+    }
     CommandStatusBuilder statusBuilder;
     if (cmd.getType() == Type.deleteBlocksCommand) {
       statusBuilder = new DeleteBlockCommandStatusBuilder();
@@ -468,14 +473,6 @@ public class StateContext {
     return cmdStatusMap;
   }
 
-  /**
-   * Remove object from cache in StateContext#cmdStatusMap.
-   *
-   */
-  public void removeCommandStatus(Long cmdId) {
-    cmdStatusMap.remove(cmdId);
-  }
-
   /**
    * Updates status of a pending status command.
    * @param cmdId       command id

+ 34 - 39
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java

@@ -18,6 +18,8 @@ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -63,62 +65,55 @@ public class CloseContainerCommandHandler implements CommandHandler {
       StateContext context, SCMConnectionManager connectionManager) {
     LOG.debug("Processing Close Container command.");
     invocationCount++;
-    cmdExecuted = false;
     long startTime = Time.monotonicNow();
     // TODO: define this as INVALID_CONTAINER_ID in HddsConsts.java (TBA)
     long containerID = -1;
     try {
-
-      CloseContainerCommandProto
-          closeContainerProto =
-          CloseContainerCommandProto
-              .parseFrom(command.getProtoBufMessage());
+      CloseContainerCommandProto closeContainerProto =
+          CloseContainerCommandProto.parseFrom(command.getProtoBufMessage());
       containerID = closeContainerProto.getContainerID();
-      if (container.getContainerSet().getContainer(containerID)
+      // The CloseContainer operation is idempotent; if the container is
+      // already closed, do nothing.
+      if (!container.getContainerSet().getContainer(containerID)
           .getContainerData().isClosed()) {
-        LOG.debug("Container {} is already closed", containerID);
-        // It might happen that the where the first attempt of closing the
-        // container failed with NOT_LEADER_EXCEPTION. In such cases, SCM will
-        // retry to check the container got really closed via Ratis.
-        // In such cases of the retry attempt, if the container is already
-        // closed via Ratis, we should just return.
-        cmdExecuted = true;
-        return;
-      }
-      HddsProtos.PipelineID pipelineID = closeContainerProto.getPipelineID();
-      HddsProtos.ReplicationType replicationType =
-          closeContainerProto.getReplicationType();
+        LOG.debug("Closing container {}.", containerID);
+        HddsProtos.PipelineID pipelineID = closeContainerProto.getPipelineID();
+        HddsProtos.ReplicationType replicationType =
+            closeContainerProto.getReplicationType();
 
-      ContainerProtos.ContainerCommandRequestProto.Builder request =
-          ContainerProtos.ContainerCommandRequestProto.newBuilder();
-      request.setCmdType(ContainerProtos.Type.CloseContainer);
-      request.setContainerID(containerID);
-      request.setCloseContainer(
-          ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
-      request.setTraceID(UUID.randomUUID().toString());
-      request.setDatanodeUuid(
-          context.getParent().getDatanodeDetails().getUuidString());
-      // submit the close container request for the XceiverServer to handle
-      container.submitContainerRequest(
-          request.build(), replicationType, pipelineID);
+        ContainerProtos.ContainerCommandRequestProto.Builder request =
+            ContainerProtos.ContainerCommandRequestProto.newBuilder();
+        request.setCmdType(ContainerProtos.Type.CloseContainer);
+        request.setContainerID(containerID);
+        request.setCloseContainer(
+            ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
+        request.setTraceID(UUID.randomUUID().toString());
+        request.setDatanodeUuid(
+            context.getParent().getDatanodeDetails().getUuidString());
+        // submit the close container request for the XceiverServer to handle
+        container.submitContainerRequest(
+            request.build(), replicationType, pipelineID);
+        // Since the container is closed, we trigger an ICR
+        IncrementalContainerReportProto icr = IncrementalContainerReportProto
+            .newBuilder()
+            .addReport(container.getContainerSet()
+                .getContainer(containerID).getContainerReport())
+            .build();
+        context.addReport(icr);
+        context.getParent().triggerHeartbeat();
+      }
     } catch (Exception e) {
       if (e instanceof NotLeaderException) {
         // If the particular datanode is not the Ratis leader, the close
         // container command will not be executed by the follower but will be
         // executed by Ratis stateMachine transactions via leader to follower.
         // There can also be case where the datanode is in candidate state.
-        // In these situations, NotLeaderException is thrown. Remove the status
-        // from cmdStatus Map here so that it will be retried only by SCM if the
-        // leader could not not close the container after a certain time.
-        context.removeCommandStatus(containerID);
-        LOG.info(e.getLocalizedMessage());
+        // In these situations, NotLeaderException is thrown.
+        LOG.info("Follower cannot close the container {}.", containerID);
       } else {
         LOG.error("Can't close container " + containerID, e);
-        cmdExecuted = false;
       }
     } finally {
-      updateCommandStatus(context, command,
-          (cmdStatus) -> cmdStatus.setStatus(cmdExecuted), LOG);
       long endTime = Time.monotonicNow();
       totalTime += endTime - startTime;
     }

+ 7 - 5
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java

@@ -141,7 +141,9 @@ public class HeartbeatEndpointTask
       rpcEndpoint.zeroMissedCount();
     } catch (IOException ex) {
       // put back the reports which failed to be sent
-      putBackReports(requestBuilder);
+      if (requestBuilder != null) {
+        putBackReports(requestBuilder);
+      }
       rpcEndpoint.logIfNeeded(ex);
     } finally {
       rpcEndpoint.unlock();
@@ -159,10 +161,10 @@ public class HeartbeatEndpointTask
       reports.add(requestBuilder.getNodeReport());
     }
     if (requestBuilder.getCommandStatusReportsCount() != 0) {
-      for (GeneratedMessage msg : requestBuilder
-          .getCommandStatusReportsList()) {
-        reports.add(msg);
-      }
+      reports.addAll(requestBuilder.getCommandStatusReportsList());
+    }
+    if (requestBuilder.getIncrementalContainerReportCount() != 0) {
+      reports.addAll(requestBuilder.getIncrementalContainerReportList());
     }
     context.putBackReports(reports);
   }

+ 1 - 1
hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto

@@ -80,7 +80,7 @@ message SCMHeartbeatRequestProto {
   required DatanodeDetailsProto datanodeDetails = 1;
   optional NodeReportProto nodeReport = 2;
   optional ContainerReportsProto containerReport = 3;
-  optional IncrementalContainerReportProto incrementalContainerReport = 4;
+  repeated IncrementalContainerReportProto incrementalContainerReport = 4;
   repeated CommandStatusReportsProto commandStatusReports = 5;
   optional ContainerActionsProto containerActions = 6;
   optional PipelineActionsProto pipelineActions = 7;
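Turning the field from optional into repeated lets one heartbeat carry every incremental report queued since the previous heartbeat. A sketch of the resulting protoc-generated builder calls (method names follow the standard protobuf Java convention for a repeated field; pendingIcrs is hypothetical):

    SCMHeartbeatRequestProto.Builder heartbeat = SCMHeartbeatRequestProto
        .newBuilder()
        .setDatanodeDetails(datanodeDetailsProto);
    for (IncrementalContainerReportProto icr : pendingIcrs) {
      heartbeat.addIncrementalContainerReport(icr);
    }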

+ 0 - 4
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java

@@ -58,10 +58,6 @@ public class CommandStatusReportHandler implements
         publisher.fireEvent(SCMEvents.REPLICATION_STATUS, new
             ReplicationStatus(cmdStatus));
         break;
-      case closeContainerCommand:
-        publisher.fireEvent(SCMEvents.CLOSE_CONTAINER_STATUS, new
-            CloseContainerStatus(cmdStatus));
-        break;
       case deleteBlocksCommand:
         if (cmdStatus.getStatus() == CommandStatus.Status.EXECUTED) {
           publisher.fireEvent(SCMEvents.DELETE_BLOCK_STATUS,

+ 0 - 27
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java

@@ -22,14 +22,12 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.DATANODE_COMMAND;
-import static org.apache.hadoop.hdds.scm.events.SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ;
 
 /**
  * In case of a node failure, volume failure, volume out of space, node
@@ -128,32 +126,7 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> {
             new CommandForDatanode<>(node.getUuid(), closeContainerCommand))
         .forEach(command -> publisher.fireEvent(DATANODE_COMMAND, command));
 
-    publisher.fireEvent(CLOSE_CONTAINER_RETRYABLE_REQ,
-        new CloseContainerRetryableReq(containerID));
-
     LOG.trace("Issuing {} on Pipeline {} for container", closeContainerCommand,
         pipeline, containerID);
   }
-
-  /**
-   * Class to create retryable event. Prevents redundant requests for same
-   * container Id.
-   */
-  public static class CloseContainerRetryableReq implements
-      IdentifiableEventPayload {
-
-    private ContainerID containerID;
-    public CloseContainerRetryableReq(ContainerID containerID) {
-      this.containerID = containerID;
-    }
-
-    public ContainerID getContainerID() {
-      return containerID;
-    }
-
-    @Override
-    public long getId() {
-      return containerID.getId();
-    }
-  }
 }

+ 0 - 101
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java

@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * <p>http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * <p>Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hdds.scm.container;
-
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
-import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
-    .CloseContainerStatus;
-
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.server.events.Event;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.hdds.server.events.EventWatcher;
-import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler
-    .CloseContainerRetryableReq;
-import org.apache.hadoop.ozone.lease.LeaseManager;
-import org.apache.hadoop.ozone.lease.LeaseNotFoundException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * This watcher will watch for CLOSE_CONTAINER_STATUS events fired from
- * CommandStatusReport. If required it will re-trigger CloseContainer command
- * for DataNodes to CloseContainerEventHandler.
- */
-public class CloseContainerWatcher extends
-    EventWatcher<CloseContainerRetryableReq, CloseContainerStatus> {
-
-  public static final Logger LOG =
-      LoggerFactory.getLogger(CloseContainerWatcher.class);
-  private final ContainerManager containerManager;
-
-  public CloseContainerWatcher(Event<CloseContainerRetryableReq> startEvent,
-      Event<CloseContainerStatus> completionEvent,
-      LeaseManager<Long> leaseManager, ContainerManager containerManager) {
-    super(startEvent, completionEvent, leaseManager);
-    this.containerManager = containerManager;
-  }
-
-  @Override
-  protected void onTimeout(EventPublisher publisher,
-      CloseContainerRetryableReq payload) {
-    // Let CloseContainerEventHandler handle this message.
-    this.resendEventToHandler(payload.getId(), publisher);
-  }
-
-  @Override
-  protected void onFinished(EventPublisher publisher,
-      CloseContainerRetryableReq payload) {
-    LOG.trace("CloseContainerCommand for containerId: {} executed ", payload
-        .getContainerID().getId());
-  }
-
-  @Override
-  protected synchronized void handleCompletion(CloseContainerStatus status,
-      EventPublisher publisher) throws LeaseNotFoundException {
-    // If status is PENDING then return without doing anything.
-    if(status.getCmdStatus().getStatus().equals(Status.PENDING)){
-      return;
-    }
-
-    CloseContainerRetryableReq closeCont = getTrackedEventbyId(status.getId());
-    super.handleCompletion(status, publisher);
-    // If status is FAILED then send a msg to Handler to resend the command.
-    if (status.getCmdStatus().getStatus().equals(Status.FAILED) && closeCont
-        != null) {
-      this.resendEventToHandler(closeCont.getId(), publisher);
-    }
-  }
-
-  private void resendEventToHandler(long containerID, EventPublisher
-      publisher) {
-    try {
-      // Check if container is still open
-      if (containerManager.getContainer(
-          ContainerID.valueof(containerID)).isOpen()) {
-        publisher.fireEvent(SCMEvents.CLOSE_CONTAINER,
-            ContainerID.valueof(containerID));
-      }
-    } catch (IOException e) {
-      LOG.warn("Error in CloseContainerWatcher while processing event " +
-          "for containerId {} ExceptionMsg: ", containerID, e.getMessage());
-    }
-  }
-}

+ 0 - 11
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java

@@ -16,14 +16,11 @@
  */
 package org.apache.hadoop.hdds.scm.container;
 
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
 
 import java.io.Closeable;
@@ -122,14 +119,6 @@ public interface ContainerManager extends Closeable {
   HddsProtos.LifeCycleState updateContainerState(ContainerID containerID,
       HddsProtos.LifeCycleEvent event) throws IOException;
 
-  /**
-   * Process container report from Datanode.
-   *
-   * @param reports Container report
-   */
-  void processContainerReports(DatanodeDetails datanodeDetails,
-      ContainerReportsProto reports) throws IOException;
-
   /**
    * Returns the latest list of replicas for given containerId.
    *

+ 27 - 3
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java

@@ -22,6 +22,8 @@ import org.apache.commons.lang3.builder.CompareToBuilder;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 
 import java.util.Optional;
 import java.util.UUID;
@@ -32,15 +34,18 @@ import java.util.UUID;
 public final class ContainerReplica implements Comparable<ContainerReplica> {
 
   final private ContainerID containerID;
+  final private ContainerReplicaProto.State state;
   final private DatanodeDetails datanodeDetails;
   final private UUID placeOfBirth;
 
   private Long sequenceId;
 
 
-  private ContainerReplica(ContainerID containerID, DatanodeDetails datanode,
-      UUID originNodeId) {
+  private ContainerReplica(final ContainerID containerID,
+      final ContainerReplicaProto.State state, final DatanodeDetails datanode,
+      final UUID originNodeId) {
     this.containerID = containerID;
+    this.state = state;
     this.datanodeDetails = datanode;
     this.placeOfBirth = originNodeId;
   }
@@ -67,6 +72,15 @@ public final class ContainerReplica implements Comparable<ContainerReplica> {
     return placeOfBirth;
   }
 
+  /**
+   * Returns the state of this replica.
+   *
+   * @return replica state
+   */
+  public ContainerReplicaProto.State getState() {
+    return state;
+  }
+
   /**
    * Returns the Sequence Id of this replica.
    *
@@ -126,6 +140,7 @@ public final class ContainerReplica implements Comparable<ContainerReplica> {
   public static class ContainerReplicaBuilder {
 
     private ContainerID containerID;
+    private ContainerReplicaProto.State state;
     private DatanodeDetails datanode;
     private UUID placeOfBirth;
     private Long sequenceId;
@@ -142,6 +157,12 @@ public final class ContainerReplica implements Comparable<ContainerReplica> {
       return this;
     }
 
+    public ContainerReplicaBuilder setContainerState(
+        final ContainerReplicaProto.State containerState) {
+      state = containerState;
+      return this;
+    }
+
     /**
      * Set DatanodeDetails.
      *
@@ -184,9 +205,12 @@ public final class ContainerReplica implements Comparable<ContainerReplica> {
     public ContainerReplica build() {
       Preconditions.checkNotNull(containerID,
           "Container Id can't be null");
+      Preconditions.checkNotNull(state,
+          "Container state can't be null");
       Preconditions.checkNotNull(datanode,
           "DatanodeDetails can't be null");
-      ContainerReplica replica = new ContainerReplica(containerID, datanode,
+      ContainerReplica replica = new ContainerReplica(
+          containerID, state, datanode,
           Optional.ofNullable(placeOfBirth).orElse(datanode.getUuid()));
       Optional.ofNullable(sequenceId).ifPresent(replica::setSequenceId);
       return replica;
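Since state is now a mandatory builder field, build() fails fast when a call site forgets it. The new construction pattern, as used by the handlers below (values are placeholders):

    ContainerReplica replica = ContainerReplica.newBuilder()
        .setContainerID(containerID)
        .setContainerState(ContainerReplicaProto.State.CLOSED)
        .setDatanodeDetails(datanodeDetails)
        .build();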

+ 148 - 54
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java

@@ -17,19 +17,26 @@
  */
 package org.apache.hadoop.hdds.scm.container;
 
-import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationActivityStatus;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
+import org.apache.hadoop.hdds.scm.container.replication
+    .ReplicationActivityStatus;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.node.states.ReportResult;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.server
+    .SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 
@@ -47,73 +54,160 @@ public class ContainerReportHandler implements
       LoggerFactory.getLogger(ContainerReportHandler.class);
 
   private final NodeManager nodeManager;
+  private final PipelineManager pipelineManager;
   private final ContainerManager containerManager;
-  private ReplicationActivityStatus replicationStatus;
+  private final ReplicationActivityStatus replicationStatus;
 
-  public ContainerReportHandler(ContainerManager containerManager,
-      NodeManager nodeManager,
-      ReplicationActivityStatus replicationActivityStatus) {
-    Preconditions.checkNotNull(containerManager);
+  public ContainerReportHandler(final NodeManager nodeManager,
+      final PipelineManager pipelineManager,
+      final ContainerManager containerManager,
+      final ReplicationActivityStatus replicationActivityStatus) {
     Preconditions.checkNotNull(nodeManager);
+    Preconditions.checkNotNull(pipelineManager);
+    Preconditions.checkNotNull(containerManager);
     Preconditions.checkNotNull(replicationActivityStatus);
     this.nodeManager = nodeManager;
+    this.pipelineManager = pipelineManager;
     this.containerManager = containerManager;
     this.replicationStatus = replicationActivityStatus;
   }
 
   @Override
-  public void onMessage(ContainerReportFromDatanode containerReportFromDatanode,
-      EventPublisher publisher) {
+  public void onMessage(final ContainerReportFromDatanode reportFromDatanode,
+      final EventPublisher publisher) {
+
+    final DatanodeDetails datanodeDetails =
+        reportFromDatanode.getDatanodeDetails();
 
-    DatanodeDetails datanodeOrigin =
-        containerReportFromDatanode.getDatanodeDetails();
+    final ContainerReportsProto containerReport =
+        reportFromDatanode.getReport();
 
-    ContainerReportsProto containerReport =
-        containerReportFromDatanode.getReport();
     try {
 
-      //update state in container db and trigger close container events
-      containerManager
-          .processContainerReports(datanodeOrigin, containerReport);
-
-      Set<ContainerID> containerIds = containerReport.getReportsList().stream()
-          .map(StorageContainerDatanodeProtocolProtos
-              .ContainerReplicaProto::getContainerID)
-          .map(ContainerID::new)
-          .collect(Collectors.toSet());
-
-      ReportResult<ContainerID> reportResult = nodeManager
-          .processContainerReport(datanodeOrigin.getUuid(), containerIds);
-
-      //we have the report, so we can update the states for the next iteration.
-      nodeManager
-          .setContainersForDatanode(datanodeOrigin.getUuid(), containerIds);
-
-      for (ContainerID containerID : reportResult.getMissingEntries()) {
-        final ContainerReplica replica = ContainerReplica.newBuilder()
-            .setContainerID(containerID)
-            .setDatanodeDetails(datanodeOrigin)
-            .build();
-        containerManager
-            .removeContainerReplica(containerID, replica);
-        checkReplicationState(containerID, publisher);
+      final List<ContainerReplicaProto> replicas = containerReport
+          .getReportsList();
+
+      // ContainerIDs which SCM expects this datanode to have.
+      final Set<ContainerID> expectedContainerIDs = nodeManager
+          .getContainers(datanodeDetails);
+
+      // ContainerIDs that this datanode actually has.
+      final Set<ContainerID> actualContainerIDs = replicas.parallelStream()
+          .map(ContainerReplicaProto::getContainerID)
+          .map(ContainerID::valueof).collect(Collectors.toSet());
+
+      // Container replicas which SCM is not aware of.
+      final Set<ContainerID> newReplicas =
+          new HashSet<>(actualContainerIDs);
+      newReplicas.removeAll(expectedContainerIDs);
+
+      // Container replicas which are missing from datanode.
+      final Set<ContainerID> missingReplicas =
+          new HashSet<>(expectedContainerIDs);
+      missingReplicas.removeAll(actualContainerIDs);
+
+      processContainerReplicas(datanodeDetails, replicas, publisher);
+
+      // Remove missing replica from ContainerManager
+      for (ContainerID id : missingReplicas) {
+        try {
+          containerManager.getContainerReplicas(id)
+              .stream()
+              .filter(replica ->
+                  replica.getDatanodeDetails().equals(datanodeDetails))
+              .findFirst()
+              .ifPresent(replica -> {
+                try {
+                  containerManager.removeContainerReplica(id, replica);
+                } catch (ContainerNotFoundException |
+                    ContainerReplicaNotFoundException e) {
+                  // This should not happen, but even if it does, it is
+                  // not an issue.
+                }
+              });
+        } catch (ContainerNotFoundException e) {
+          LOG.warn("Cannot remove container replica, container {} not found",
+              id);
+        }
       }
 
-      for (ContainerID containerID : reportResult.getNewEntries()) {
-        final ContainerReplica replica = ContainerReplica.newBuilder()
-            .setContainerID(containerID)
-            .setDatanodeDetails(datanodeOrigin)
-            .build();
-        containerManager.updateContainerReplica(containerID, replica);
-        checkReplicationState(containerID, publisher);
-      }
+      // Update the latest set of containers for this datanode in NodeManager.
+      nodeManager.setContainers(datanodeDetails, actualContainerIDs);
+
+      // Replicate if needed.
+      newReplicas.forEach(id -> checkReplicationState(id, publisher));
+      missingReplicas.forEach(id -> checkReplicationState(id, publisher));
+
+    } catch (NodeNotFoundException ex) {
+      LOG.error("Received container report from unknown datanode {}",
+          datanodeDetails);
+    }
+
+  }
 
-    } catch (IOException e) {
-      //TODO: stop all the replication?
-      LOG.error("Error on processing container report from datanode {}",
-          datanodeOrigin, e);
+  private void processContainerReplicas(final DatanodeDetails datanodeDetails,
+      final List<ContainerReplicaProto> replicas,
+      final EventPublisher publisher) {
+    final PendingDeleteStatusList pendingDeleteStatusList =
+        new PendingDeleteStatusList(datanodeDetails);
+    for (ContainerReplicaProto replicaProto : replicas) {
+      try {
+        final ContainerID containerID = ContainerID.valueof(
+            replicaProto.getContainerID());
+        final ContainerInfo containerInfo = containerManager
+            .getContainer(containerID);
+        updateContainerState(datanodeDetails, containerInfo,
+            replicaProto, publisher);
+        if (containerInfo.getDeleteTransactionId() >
+            replicaProto.getDeleteTransactionId()) {
+          pendingDeleteStatusList
+              .addPendingDeleteStatus(replicaProto.getDeleteTransactionId(),
+                  containerInfo.getDeleteTransactionId(),
+                  containerInfo.getContainerID());
+        }
+      } catch (ContainerNotFoundException e) {
+        LOG.error("Received container report for an unknown container {}",
+            replicaProto.getContainerID());
+      }
     }
+    if (pendingDeleteStatusList.getNumPendingDeletes() > 0) {
+      publisher.fireEvent(SCMEvents.PENDING_DELETE_STATUS,
+          pendingDeleteStatusList);
+    }
+  }
 
+  private void updateContainerState(final DatanodeDetails datanodeDetails,
+      final ContainerInfo containerInfo,
+      final ContainerReplicaProto replicaProto,
+      final EventPublisher publisher)
+      throws ContainerNotFoundException {
+
+    final ContainerID id = containerInfo.containerID();
+    final ContainerReplica datanodeContainerReplica = ContainerReplica
+        .newBuilder()
+        .setContainerID(id)
+        .setContainerState(replicaProto.getState())
+        .setDatanodeDetails(datanodeDetails)
+        .build();
+    // TODO: Add bcsid and origin datanode to replica.
+
+    final ContainerReplica scmContainerReplica = containerManager
+        .getContainerReplicas(id)
+        .stream()
+        .filter(replica ->
+            replica.getDatanodeDetails().equals(datanodeDetails))
+        .findFirst().orElse(null);
+
+    // This is an in-memory update.
+    containerManager.updateContainerReplica(id, datanodeContainerReplica);
+    containerInfo.setUsedBytes(replicaProto.getUsed());
+    containerInfo.setNumberOfKeys(replicaProto.getKeyCount());
+
+    // Check if there is state change in container replica.
+    if (scmContainerReplica == null ||
+        scmContainerReplica.getState() != datanodeContainerReplica.getState()) {
+      //TODO: Handle replica state change.
+    }
   }
 
   private void checkReplicationState(ContainerID containerID,
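The handler's constructor signature also changed. A hedged sketch of its registration in StorageContainerManager (file 27 in the list; that hunk is not shown here), assuming the usual EventQueue wiring:

    eventQueue.addHandler(SCMEvents.CONTAINER_REPORT,
        new ContainerReportHandler(nodeManager, pipelineManager,
            containerManager, replicationActivityStatus));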

+ 98 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java

@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.server
+    .SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Handles incremental container reports from datanode.
+ */
+public class IncrementalContainerReportHandler implements
+    EventHandler<IncrementalContainerReportFromDatanode> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(IncrementalContainerReportHandler.class);
+
+  private final PipelineManager pipelineManager;
+  private final ContainerManager containerManager;
+
+  public IncrementalContainerReportHandler(
+      final PipelineManager pipelineManager,
+      final ContainerManager containerManager)  {
+    Preconditions.checkNotNull(pipelineManager);
+    Preconditions.checkNotNull(containerManager);
+    this.pipelineManager = pipelineManager;
+    this.containerManager = containerManager;
+  }
+
+  @Override
+  public void onMessage(
+      final IncrementalContainerReportFromDatanode containerReportFromDatanode,
+      final EventPublisher publisher) {
+
+    for (ContainerReplicaProto replicaProto :
+         containerReportFromDatanode.getReport().getReportList()) {
+      try {
+        final DatanodeDetails datanodeDetails = containerReportFromDatanode
+            .getDatanodeDetails();
+        final ContainerID containerID = ContainerID
+            .valueof(replicaProto.getContainerID());
+        final ContainerInfo containerInfo = containerManager
+            .getContainer(containerID);
+
+        ContainerReplica replica = ContainerReplica.newBuilder()
+            .setContainerID(ContainerID.valueof(replicaProto.getContainerID()))
+            .setContainerState(replicaProto.getState())
+            .setDatanodeDetails(datanodeDetails)
+            .build();
+
+        containerManager.updateContainerReplica(containerID, replica);
+
+        // Check if the state of the container is changed.
+        if (replicaProto.getState() == ContainerReplicaProto.State.CLOSED &&
+            containerInfo.getState() == HddsProtos.LifeCycleState.CLOSING) {
+          containerManager.updateContainerState(containerID,
+              HddsProtos.LifeCycleEvent.CLOSE);
+        }
+
+        // TODO: Handle replica state change
+
+      } catch (ContainerNotFoundException e) {
+        LOG.warn("Container {} not found!", replicaProto.getContainerID());
+      } catch (IOException e) {
+        LOG.error("Exception while processing ICR for container {}",
+            replicaProto.getContainerID());
+      }
+    }
+
+  }
+}
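The matching registration for this new handler would look like the following, again a sketch of the wiring expected in StorageContainerManager (hunk not shown here):

    eventQueue.addHandler(SCMEvents.INCREMENTAL_CONTAINER_REPORT,
        new IncrementalContainerReportHandler(
            pipelineManager, containerManager));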

+ 0 - 107
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java

@@ -23,13 +23,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
-import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
@@ -37,10 +34,6 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.server.ServerUtils;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.ozone.OzoneConsts;
@@ -488,106 +481,6 @@ public class SCMContainerManager implements ContainerManager {
     return new ContainerWithPipeline(containerInfo, pipeline);
   }
 
-  /**
-   * Process container report from Datanode.
-   * <p>
-   * Processing follows a very simple logic for time being.
-   * <p>
-   * 1. Datanodes report the current State -- denoted by the datanodeState
-   * <p>
-   * 2. We are the older SCM state from the Database -- denoted by
-   * the knownState.
-   * <p>
-   * 3. We copy the usage etc. from currentState to newState and log that
-   * newState to the DB. This allows us SCM to bootup again and read the
-   * state of the world from the DB, and then reconcile the state from
-   * container reports, when they arrive.
-   *
-   * @param reports Container report
-   */
-  @Override
-  public void processContainerReports(DatanodeDetails datanodeDetails,
-      ContainerReportsProto reports) throws IOException {
-    List<ContainerReplicaProto>
-        containerInfos = reports.getReportsList();
-    PendingDeleteStatusList pendingDeleteStatusList =
-        new PendingDeleteStatusList(datanodeDetails);
-    for (ContainerReplicaProto newInfo :
-        containerInfos) {
-      ContainerID id = ContainerID.valueof(newInfo.getContainerID());
-      ContainerReplica replica = ContainerReplica.newBuilder()
-          .setContainerID(id)
-          .setDatanodeDetails(datanodeDetails)
-          .setOriginNodeId(datanodeDetails.getUuid())
-          .build();
-      lock.lock();
-      try {
-        containerStateManager.updateContainerReplica(id, replica);
-        ContainerInfo currentInfo = containerStateManager.getContainer(id);
-        if (newInfo.getState() == ContainerReplicaProto.State.CLOSED
-            && currentInfo.getState() == LifeCycleState.CLOSING) {
-          currentInfo = updateContainerStateInternal(id, LifeCycleEvent.CLOSE);
-          if (!currentInfo.isOpen()) {
-            pipelineManager.removeContainerFromPipeline(
-                currentInfo.getPipelineID(), id);
-          }
-        }
-
-        ContainerInfoProto newState =
-            reconcileState(newInfo, currentInfo);
-
-        if (currentInfo.getDeleteTransactionId() >
-            newInfo.getDeleteTransactionId()) {
-          pendingDeleteStatusList
-                .addPendingDeleteStatus(newInfo.getDeleteTransactionId(),
-                    currentInfo.getDeleteTransactionId(),
-                    currentInfo.getContainerID());
-        }
-        containerStateManager.updateContainerInfo(
-            ContainerInfo.fromProtobuf(newState));
-        containerStore.put(id.getBytes(), newState.toByteArray());
-      } catch (ContainerNotFoundException e) {
-        LOG.error("Error while processing container report from datanode :" +
-                " {}, for container: {}, reason: container doesn't exist in" +
-                "container database.", datanodeDetails, id);
-      } finally {
-        lock.unlock();
-      }
-    }
-    if (pendingDeleteStatusList.getNumPendingDeletes() > 0) {
-      eventPublisher.fireEvent(SCMEvents.PENDING_DELETE_STATUS,
-          pendingDeleteStatusList);
-    }
-
-  }
-
-  /**
-   * Reconciles the state from Datanode with the state in SCM.
-   *
-   * @param datanodeState - State from the Datanode.
-   * @param knownState - State inside SCM.
-   * @return new SCM State for this container.
-   */
-  private HddsProtos.ContainerInfoProto reconcileState(
-      ContainerReplicaProto datanodeState,
-      ContainerInfo knownState) {
-    HddsProtos.ContainerInfoProto.Builder builder =
-        HddsProtos.ContainerInfoProto.newBuilder();
-    builder.setContainerID(knownState.getContainerID())
-        .setPipelineID(knownState.getPipelineID().getProtobuf())
-        .setState(knownState.getState())
-        .setReplicationType(knownState.getReplicationType())
-        .setReplicationFactor(knownState.getReplicationFactor())
-        .setUsedBytes(datanodeState.getUsed())
-        .setNumberOfKeys(datanodeState.getKeyCount())
-        .setStateEnterTime(knownState.getStateEnterTime())
-        .setDeleteTransactionId(knownState.getDeleteTransactionId());
-    if (knownState.getOwner() != null) {
-      builder.setOwner(knownState.getOwner());
-    }
-    return builder.build();
-  }
-
   /**
    * Returns the latest list of DataNodes where replica for given containerId
    * exist. Throws an SCMException if no entry is found for given containerId.

+ 0 - 7
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java

@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hdds.scm.container.states;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -263,12 +262,6 @@ public class ContainerStateMap {
     }
   }
 
-  @VisibleForTesting
-  // TODO: fix the test case and remove this method!
-  public static Logger getLOG() {
-    return LOG;
-  }
-
   /**
    * Just update the container State.
    * @param info ContainerInfo.

+ 12 - 22
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java

@@ -21,14 +21,12 @@ package org.apache.hadoop.hdds.scm.events;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
-import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
-    .CloseContainerStatus;
 import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
 import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
     .ReplicationStatus;
-import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler
-        .CloseContainerRetryableReq;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
+    .IncrementalContainerReportFromDatanode;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
         .PipelineReportFromDatanode;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
@@ -78,6 +76,16 @@ public final class SCMEvents {
   public static final TypedEvent<ContainerReportFromDatanode> CONTAINER_REPORT =
       new TypedEvent<>(ContainerReportFromDatanode.class, "Container_Report");
 
+  /**
+   * IncrementalContainerReports are sent out by Datanodes.
+   * This report is received by SCMDatanodeHeartbeatDispatcher and an
+   * Incremental_Container_Report event is generated.
+   */
+  public static final TypedEvent<IncrementalContainerReportFromDatanode>
+      INCREMENTAL_CONTAINER_REPORT = new TypedEvent<>(
+      IncrementalContainerReportFromDatanode.class,
+      "Incremental_Container_Report");
+
   /**
    * ContainerActions are sent by Datanode. This event is received by
    * SCMDatanodeHeartbeatDispatcher and CONTAINER_ACTIONS event is generated.
@@ -137,16 +145,6 @@ public final class SCMEvents {
   public static final TypedEvent<ContainerID> CLOSE_CONTAINER =
       new TypedEvent<>(ContainerID.class, "Close_Container");
 
-  /**
-   * A CLOSE_CONTAINER_RETRYABLE_REQ will be triggered by
-   * CloseContainerEventHandler after sending a SCMCommand to DataNode.
-   * CloseContainerWatcher will track this event. Watcher will be responsible
-   * for retrying it in event of failure or timeout.
-   */
-  public static final TypedEvent<CloseContainerRetryableReq>
-      CLOSE_CONTAINER_RETRYABLE_REQ = new TypedEvent<>(
-      CloseContainerRetryableReq.class, "Close_Container_Retryable");
-
   /**
    * This event will be triggered whenever a new datanode is registered with
    * SCM.
@@ -174,14 +172,6 @@ public final class SCMEvents {
    */
   public static final Event<ReplicationStatus> REPLICATION_STATUS = new
       TypedEvent<>(ReplicationStatus.class, "Replicate_Command_Status");
-  /**
-   * This event will be triggered by CommandStatusReportHandler whenever a
-   * status for CloseContainer SCMCommand is received.
-   */
-  public static final Event<CloseContainerStatus>
-      CLOSE_CONTAINER_STATUS =
-      new TypedEvent<>(CloseContainerStatus.class,
-          "Close_Container_Command_Status");
   /**
    * This event will be triggered by CommandStatusReportHandler whenever a
    * status for DeleteBlock SCMCommand is received.
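On the receive side, SCMDatanodeHeartbeatDispatcher (file 25 above) is expected to fan the repeated heartbeat field out into these events. A hedged sketch of that dispatch loop; the wrapper constructor is assumed from the naming pattern of the other *FromDatanode payloads:

    for (IncrementalContainerReportProto icr
        : heartbeat.getIncrementalContainerReportList()) {
      eventPublisher.fireEvent(SCMEvents.INCREMENTAL_CONTAINER_REPORT,
          new IncrementalContainerReportFromDatanode(datanodeDetails, icr));
    }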

+ 25 - 12
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java

@@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 
@@ -61,7 +62,14 @@ public class DeadNodeHandler implements EventHandler<DatanodeDetails> {
     // TODO: check if there are any pipeline on this node and fire close
     // pipeline event
     Set<ContainerID> ids =
-        nodeManager.getContainers(datanodeDetails.getUuid());
+        null;
+    try {
+      ids = nodeManager.getContainers(datanodeDetails);
+    } catch (NodeNotFoundException e) {
+      // This should not happen; we cannot get a dead node event for an
+      // unregistered node!
+      LOG.error("DeadNode event for an unregistered node: {}!", datanodeDetails);
+    }
     if (ids == null) {
       LOG.info("There's no containers in dead datanode {}, no replica will be"
           + " removed from the in-memory state.", datanodeDetails.getUuid());
@@ -72,18 +80,23 @@ public class DeadNodeHandler implements EventHandler<DatanodeDetails> {
     for (ContainerID id : ids) {
       try {
         final ContainerInfo container = containerManager.getContainer(id);
+        // TODO: For open containers, trigger close on other nodes
+        // TODO: Check replica count and call replication manager
+        // on these containers.
         if (!container.isOpen()) {
-          final ContainerReplica replica = ContainerReplica.newBuilder()
-              .setContainerID(id)
-              .setDatanodeDetails(datanodeDetails)
-              .build();
-          try {
-            containerManager.removeContainerReplica(id, replica);
-            replicateIfNeeded(container, publisher);
-          } catch (ContainerException ex) {
-            LOG.warn("Exception while removing container replica #{} for " +
-                "container #{}.", replica, container, ex);
-          }
+          Set<ContainerReplica> replicas = containerManager
+              .getContainerReplicas(id);
+          replicas.stream()
+              .filter(r -> r.getDatanodeDetails().equals(datanodeDetails))
+              .findFirst()
+              .ifPresent(replica -> {
+                try {
+                  containerManager.removeContainerReplica(id, replica);
+                } catch (ContainerException ex) {
+                  LOG.warn("Exception while removing container replica #{} " +
+                      "for container #{}.", replica, container, ex);
+                }
+              });
         }
       } catch (ContainerNotFoundException cnfe) {
         LOG.warn("Container Not found!", cnfe);

+ 1 - 15
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java

@@ -19,31 +19,17 @@
 package org.apache.hadoop.hdds.scm.node;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 
-import java.util.Collections;
-
 /**
  * Handles New Node event.
  */
 public class NewNodeHandler implements EventHandler<DatanodeDetails> {
 
-  private final NodeManager nodeManager;
-
-  public NewNodeHandler(NodeManager nodeManager) {
-    this.nodeManager = nodeManager;
-  }
-
   @Override
   public void onMessage(DatanodeDetails datanodeDetails,
                         EventPublisher publisher) {
-    try {
-      nodeManager.addDatanodeInContainerMap(datanodeDetails.getUuid(),
-          Collections.emptySet());
-    } catch (SCMException e) {
-      // TODO: log exception message.
-    }
+    // We currently have nothing to do when we receive a new node event.
   }
 }

+ 13 - 37
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java

@@ -27,7 +27,6 @@ import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
-import org.apache.hadoop.hdds.scm.node.states.ReportResult;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
@@ -63,13 +62,6 @@ import java.util.UUID;
  */
 public interface NodeManager extends StorageContainerNodeProtocol,
     EventHandler<CommandForDatanode>, NodeManagerMXBean, Closeable {
-  /**
-   * Removes a data node from the management of this Node Manager.
-   *
-   * @param node - DataNode.
-   * @throws NodeNotFoundException
-   */
-  void removeNode(DatanodeDetails node) throws NodeNotFoundException;
 
   /**
    * Gets all Live Datanodes that is currently communicating with SCM.
@@ -102,6 +94,7 @@ public interface NodeManager extends StorageContainerNodeProtocol,
    * Return a map of node stats.
    * @return a map of individual node stats (live/stale but not dead).
    */
+  // TODO: try to change the return type to Map<DatanodeDetails, SCMNodeStat>
   Map<UUID, SCMNodeStat> getNodeStats();
 
   /**
@@ -121,10 +114,10 @@ public interface NodeManager extends StorageContainerNodeProtocol,
 
   /**
    * Get set of pipelines a datanode is part of.
-   * @param dnId - datanodeID
+   * @param datanodeDetails DatanodeDetails
    * @return Set of PipelineID
    */
-  Set<PipelineID> getPipelineByDnID(UUID dnId);
+  Set<PipelineID> getPipelines(DatanodeDetails datanodeDetails);
 
   /**
    * Add pipeline information in the NodeManager.
@@ -139,40 +132,22 @@ public interface NodeManager extends StorageContainerNodeProtocol,
   void removePipeline(Pipeline pipeline);
 
   /**
-   * Update set of containers available on a datanode.
-   * @param uuid - DatanodeID
+   * Replaces the datanode-to-containers mapping with the given set of
+   * containers.
+   * @param datanodeDetails - DatanodeDetails
    * @param containerIds - Set of containerIDs
-   * @throws SCMException - if datanode is not known. For new datanode use
-   *                        addDatanodeInContainerMap call.
+   * @throws NodeNotFoundException - if datanode is not known.
    */
-  void setContainersForDatanode(UUID uuid, Set<ContainerID> containerIds)
-      throws SCMException;
-
-  /**
-   * Process containerReport received from datanode.
-   * @param uuid - DataonodeID
-   * @param containerIds - Set of containerIDs
-   * @return The result after processing containerReport
-   */
-  ReportResult<ContainerID> processContainerReport(UUID uuid,
-      Set<ContainerID> containerIds);
+  void setContainers(DatanodeDetails datanodeDetails,
+      Set<ContainerID> containerIds) throws NodeNotFoundException;
 
   /**
    * Return set of containerIDs available on a datanode.
-   * @param uuid - DatanodeID
-   * @return - set of containerIDs
-   */
-  Set<ContainerID> getContainers(UUID uuid);
-
-  /**
-   * Insert a new datanode with set of containerIDs for containers available
-   * on it.
-   * @param uuid - DatanodeID
-   * @param containerIDs - Set of ContainerIDs
-   * @throws SCMException - if datanode already exists
+   * @param datanodeDetails DatanodeDetails
+   * @return set of containerIDs
    */
-  void addDatanodeInContainerMap(UUID uuid, Set<ContainerID> containerIDs)
-      throws SCMException;
+  Set<ContainerID> getContainers(DatanodeDetails datanodeDetails)
+      throws NodeNotFoundException;
 
   /**
    * Add a {@link SCMCommand} to the command queue, which are
@@ -188,7 +163,7 @@ public interface NodeManager extends StorageContainerNodeProtocol,
    * @param dnUuid
    * @param nodeReport
    */
-  void processNodeReport(UUID dnUuid, NodeReportProto nodeReport);
+  void processNodeReport(DatanodeDetails dnUuid, NodeReportProto nodeReport);
 
   /**
    * Process a dead node event in this Node Manager.
@@ -202,5 +177,6 @@ public interface NodeManager extends StorageContainerNodeProtocol,
    * @param dnID - Datanode uuid.
    * @return list of commands
    */
+  // TODO: We can give a better name to this method!
   List<SCMCommand> getCommandQueue(UUID dnID);
 }
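
The container bookkeeping on NodeManager is now keyed by DatanodeDetails and
fails with NodeNotFoundException rather than SCMException. A hedged usage
sketch (nodeManager and datanodeDetails are assumed to be in scope, and the
datanode must already be registered, since registration seeds an empty
container set):

    Set<ContainerID> containerIds = new HashSet<>();
    containerIds.add(ContainerID.valueof(1L));
    try {
      // Replaces the node's container set wholesale.
      nodeManager.setContainers(datanodeDetails, containerIds);
      Set<ContainerID> reported = nodeManager.getContainers(datanodeDetails);
    } catch (NodeNotFoundException e) {
      // Thrown when the datanode never registered with SCM.
    }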

+ 1 - 1
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeReportHandler.java

@@ -50,6 +50,6 @@ public class NodeReportHandler implements EventHandler<NodeReportFromDatanode> {
         + "missing DatanodeDetails.");
     LOGGER.trace("Processing node report for dn: {}", dn);
     nodeManager
-        .processNodeReport(dn.getUuid(), nodeReportFromDatanode.getReport());
+        .processNodeReport(dn, nodeReportFromDatanode.getReport());
   }
 }

+ 7 - 62
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java

@@ -29,7 +29,6 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.states.*;
 import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap;
 import org.apache.hadoop.hdds.server.events.Event;
@@ -93,11 +92,6 @@ public class NodeStateManager implements Runnable, Closeable {
    * Maintains the mapping from node to pipelines a node is part of.
    */
   private final Node2PipelineMap node2PipelineMap;
-  /**
-   * Maintains the map from node to ContainerIDs for the containers
-   * available on the node.
-   */
-  private final Node2ContainerMap node2ContainerMap;
   /**
    * Used for publishing node state change events.
    */
@@ -131,7 +125,6 @@ public class NodeStateManager implements Runnable, Closeable {
   public NodeStateManager(Configuration conf, EventPublisher eventPublisher) {
     this.nodeStateMap = new NodeStateMap();
     this.node2PipelineMap = new Node2PipelineMap();
-    this.node2ContainerMap = new Node2ContainerMap();
     this.eventPublisher = eventPublisher;
     this.state2EventMap = new HashMap<>();
     initialiseState2EventMap();
@@ -430,18 +423,6 @@ public class NodeStateManager implements Runnable, Closeable {
     return nodeStateMap.getTotalNodeCount();
   }
 
-  /**
-   * Removes a node from NodeStateManager.
-   *
-   * @param datanodeDetails DatanodeDetails
-   *
-   * @throws NodeNotFoundException if the node is not present
-   */
-  public void removeNode(DatanodeDetails datanodeDetails)
-      throws NodeNotFoundException {
-    nodeStateMap.removeNode(datanodeDetails.getUuid());
-  }
-
   /**
    * Returns the current stats of the node.
    *
@@ -474,19 +455,6 @@ public class NodeStateManager implements Runnable, Closeable {
     nodeStateMap.setNodeStat(uuid, newstat);
   }
 
-  /**
-   * Remove the current stats of the specify node.
-   *
-   * @param uuid node id
-   *
-   * @return SCMNodeStat the stat removed from the node.
-   *
-   * @throws NodeNotFoundException if the node is not present.
-   */
-  public SCMNodeStat removeNodeStat(UUID uuid) throws NodeNotFoundException {
-    return nodeStateMap.removeNodeStat(uuid);
-  }
-
   /**
    * Removes a pipeline from the node2PipelineMap.
    * @param pipeline - Pipeline to be removed
@@ -498,23 +466,11 @@ public class NodeStateManager implements Runnable, Closeable {
    * Update set of containers available on a datanode.
    * @param uuid - DatanodeID
    * @param containerIds - Set of containerIDs
-   * @throws SCMException - if datanode is not known. For new datanode use
-   *                        addDatanodeInContainerMap call.
-   */
-  public void setContainersForDatanode(UUID uuid, Set<ContainerID> containerIds)
-      throws SCMException {
-    node2ContainerMap.setContainersForDatanode(uuid, containerIds);
-  }
-
-  /**
-   * Process containerReport received from datanode.
-   * @param uuid - DataonodeID
-   * @param containerIds - Set of containerIDs
-   * @return The result after processing containerReport
+   * @throws NodeNotFoundException - if datanode is not known.
    */
-  public ReportResult<ContainerID> processContainerReport(UUID uuid,
-      Set<ContainerID> containerIds) {
-    return node2ContainerMap.processReport(uuid, containerIds);
+  public void setContainers(UUID uuid, Set<ContainerID> containerIds)
+      throws NodeNotFoundException {
+    nodeStateMap.setContainers(uuid, containerIds);
   }
 
   /**
@@ -522,20 +478,9 @@ public class NodeStateManager implements Runnable, Closeable {
    * @param uuid - DatanodeID
    * @return - set of containerIDs
    */
-  public Set<ContainerID> getContainers(UUID uuid) {
-    return node2ContainerMap.getContainers(uuid);
-  }
-
-  /**
-   * Insert a new datanode with set of containerIDs for containers available
-   * on it.
-   * @param uuid - DatanodeID
-   * @param containerIDs - Set of ContainerIDs
-   * @throws SCMException - if datanode already exists
-   */
-  public void addDatanodeInContainerMap(UUID uuid,
-      Set<ContainerID> containerIDs) throws SCMException {
-    node2ContainerMap.insertNewDatanode(uuid, containerIDs);
+  public Set<ContainerID> getContainers(UUID uuid)
+      throws NodeNotFoundException {
+    return nodeStateMap.getContainers(uuid);
   }
 
   /**

+ 16 - 50
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java

@@ -27,7 +27,6 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
-import org.apache.hadoop.hdds.scm.node.states.ReportResult;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.scm.VersionInfo;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
@@ -129,16 +128,6 @@ public class SCMNodeManager
     }
   }
 
-  /**
-   * Removes a data node from the management of this Node Manager.
-   *
-   * @param node - DataNode.
-   * @throws NodeNotFoundException
-   */
-  @Override
-  public void removeNode(DatanodeDetails node) throws NodeNotFoundException {
-    nodeStateManager.removeNode(node);
-  }
 
   /**
    * Gets all datanodes that are in a certain state. This function works by
@@ -270,10 +259,8 @@ public class SCMNodeManager
       datanodeDetails.setHostName(dnAddress.getHostName());
       datanodeDetails.setIpAddress(dnAddress.getHostAddress());
     }
-    UUID dnId = datanodeDetails.getUuid();
     try {
       nodeStateManager.addNode(datanodeDetails);
-      nodeStateManager.setNodeStat(dnId, new SCMNodeStat());
       // Updating Node Report, as registration is successful
       updateNodeStat(datanodeDetails.getUuid(), nodeReport);
       LOG.info("Registered Data node : {}", datanodeDetails);
@@ -326,8 +313,9 @@ public class SCMNodeManager
    * @param nodeReport
    */
   @Override
-  public void processNodeReport(UUID dnUuid, NodeReportProto nodeReport) {
-    this.updateNodeStat(dnUuid, nodeReport);
+  public void processNodeReport(DatanodeDetails dnUuid,
+                                NodeReportProto nodeReport) {
+    this.updateNodeStat(dnUuid.getUuid(), nodeReport);
   }
 
   /**
@@ -377,12 +365,12 @@ public class SCMNodeManager
 
   /**
    * Get set of pipelines a datanode is part of.
-   * @param dnId - datanodeID
+   * @param datanodeDetails - DatanodeDetails
    * @return Set of PipelineID
    */
   @Override
-  public Set<PipelineID> getPipelineByDnID(UUID dnId) {
-    return nodeStateManager.getPipelineByDnID(dnId);
+  public Set<PipelineID> getPipelines(DatanodeDetails datanodeDetails) {
+    return nodeStateManager.getPipelineByDnID(datanodeDetails.getUuid());
   }
 
 
@@ -406,50 +394,27 @@ public class SCMNodeManager
 
   /**
    * Update set of containers available on a datanode.
-   * @param uuid - DatanodeID
+   * @param datanodeDetails - DatanodeDetails
    * @param containerIds - Set of containerIDs
-   * @throws SCMException - if datanode is not known. For new datanode use
-   *                        addDatanodeInContainerMap call.
+   * @throws NodeNotFoundException - if datanode is not known.
    */
   @Override
-  public void setContainersForDatanode(UUID uuid,
-      Set<ContainerID> containerIds) throws SCMException {
-    nodeStateManager.setContainersForDatanode(uuid, containerIds);
-  }
-
-  /**
-   * Process containerReport received from datanode.
-   * @param uuid - DataonodeID
-   * @param containerIds - Set of containerIDs
-   * @return The result after processing containerReport
-   */
-  @Override
-  public ReportResult<ContainerID> processContainerReport(UUID uuid,
-      Set<ContainerID> containerIds) {
-    return nodeStateManager.processContainerReport(uuid, containerIds);
+  public void setContainers(DatanodeDetails datanodeDetails,
+      Set<ContainerID> containerIds) throws NodeNotFoundException {
+    nodeStateManager.setContainers(datanodeDetails.getUuid(),
+        containerIds);
   }
 
   /**
    * Return set of containerIDs available on a datanode.
-   * @param uuid - DatanodeID
+   * @param datanodeDetails - DatanodeDetails
    * @return - set of containerIDs
    */
   @Override
-  public Set<ContainerID> getContainers(UUID uuid) {
-    return nodeStateManager.getContainers(uuid);
-  }
-
-  /**
-   * Insert a new datanode with set of containerIDs for containers available
-   * on it.
-   * @param uuid - DatanodeID
-   * @param containerIDs - Set of ContainerIDs
-   * @throws SCMException - if datanode already exists
-   */
-  @Override
-  public void addDatanodeInContainerMap(UUID uuid,
-      Set<ContainerID> containerIDs) throws SCMException {
-    nodeStateManager.addDatanodeInContainerMap(uuid, containerIDs);
+  public Set<ContainerID> getContainers(DatanodeDetails datanodeDetails)
+      throws NodeNotFoundException {
+    return nodeStateManager.getContainers(datanodeDetails.getUuid());
   }
 
   // TODO:
@@ -481,6 +446,7 @@ public class SCMNodeManager
    * @param dnUuid datanode uuid.
    */
   @Override
+  // TODO: This should be removed.
   public void processDeadNode(UUID dnUuid) {
     try {
       SCMNodeStat stat = nodeStateManager.getNodeStat(dnUuid);

+ 1 - 1
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java

@@ -49,7 +49,7 @@ public class StaleNodeHandler implements EventHandler<DatanodeDetails> {
   public void onMessage(DatanodeDetails datanodeDetails,
                         EventPublisher publisher) {
     Set<PipelineID> pipelineIds =
-        nodeManager.getPipelineByDnID(datanodeDetails.getUuid());
+        nodeManager.getPipelines(datanodeDetails);
     for (PipelineID pipelineID : pipelineIds) {
       try {
         pipelineManager.finalizePipeline(pipelineID);

+ 30 - 37
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm.node.states;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
 import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
 
@@ -48,6 +49,10 @@ public class NodeStateMap {
    * Represents the current stats of node.
    */
   private final ConcurrentHashMap<UUID, SCMNodeStat> nodeStats;
+  /**
+   * Node to set of containers on the node.
+   */
+  private final ConcurrentHashMap<UUID, Set<ContainerID>> nodeToContainer;
 
   private final ReadWriteLock lock;
 
@@ -59,6 +64,7 @@ public class NodeStateMap {
     nodeMap = new ConcurrentHashMap<>();
     stateMap = new ConcurrentHashMap<>();
     nodeStats = new ConcurrentHashMap<>();
+    nodeToContainer = new ConcurrentHashMap<>();
     initStateMap();
   }
 
@@ -88,6 +94,8 @@ public class NodeStateMap {
         throw new NodeAlreadyExistsException("Node UUID: " + id);
       }
       nodeMap.put(id, new DatanodeInfo(datanodeDetails));
+      nodeStats.put(id, new SCMNodeStat());
+      nodeToContainer.put(id, Collections.emptySet());
       stateMap.get(nodeState).add(id);
     } finally {
       lock.writeLock().unlock();
@@ -237,30 +245,6 @@ public class NodeStateMap {
     }
   }
 
-  /**
-   * Removes the node from NodeStateMap.
-   *
-   * @param uuid node id
-   *
-   * @throws NodeNotFoundException if the node is not found
-   */
-  public void removeNode(UUID uuid) throws NodeNotFoundException {
-    lock.writeLock().lock();
-    try {
-      if (nodeMap.containsKey(uuid)) {
-        for (Map.Entry<NodeState, Set<UUID>> entry : stateMap.entrySet()) {
-          if(entry.getValue().remove(uuid)) {
-            break;
-          }
-          nodeMap.remove(uuid);
-        }
-        throw new NodeNotFoundException("Node UUID: " + uuid);
-      }
-    } finally {
-      lock.writeLock().unlock();
-    }
-  }
-
   /**
    * Returns the current stats of the node.
    *
@@ -298,21 +282,30 @@ public class NodeStateMap {
     nodeStats.put(uuid, newstat);
   }
 
-  /**
-   * Remove the current stats of the specify node.
-   *
-   * @param uuid node id
-   *
-   * @return SCMNodeStat the stat removed from the node.
-   *
-   * @throws NodeNotFoundException if the node is not found
-   */
-  public SCMNodeStat removeNodeStat(UUID uuid) throws NodeNotFoundException {
-    SCMNodeStat stat = nodeStats.remove(uuid);
-    if (stat == null) {
+  public void setContainers(UUID uuid, Set<ContainerID> containers)
+      throws NodeNotFoundException {
+    if (!nodeToContainer.containsKey(uuid)) {
       throw new NodeNotFoundException("Node UUID: " + uuid);
     }
-    return stat;
+    nodeToContainer.put(uuid, containers);
+  }
+
+  public Set<ContainerID> getContainers(UUID uuid)
+      throws NodeNotFoundException {
+    Set<ContainerID> containers = nodeToContainer.get(uuid);
+    if (containers == null) {
+      throw new NodeNotFoundException("Node UUID: " + uuid);
+    }
+    return Collections.unmodifiableSet(containers);
+  }
+
+  public void removeContainer(UUID uuid, ContainerID containerID)
+      throws NodeNotFoundException {
+    Set<ContainerID> containers = nodeToContainer.get(uuid);
+    if (containers == null) {
+      throw new NodeNotFoundException("Node UUID: " + uuid);
+    }
+    containers.remove(containerID);
   }
 
   /**
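
Note that getContainers wraps the live value in Collections.unmodifiableSet,
so mutations made later through removeContainer remain visible to callers
holding the returned set. A defensive-copy variant, shown purely as a
hypothetical alternative (getContainersSnapshot is not part of this commit):

    public Set<ContainerID> getContainersSnapshot(UUID uuid)
        throws NodeNotFoundException {
      Set<ContainerID> containers = nodeToContainer.get(uuid);
      if (containers == null) {
        throw new NodeNotFoundException("Node UUID: " + uuid);
      }
      // Copy instead of wrapping: callers get a stable snapshot.
      return new HashSet<>(containers);
    }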

+ 16 - 1
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java

@@ -20,7 +20,9 @@ package org.apache.hadoop.hdds.scm.server;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto
-        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+    .StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.PipelineActionsProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -209,6 +211,19 @@ public final class SCMDatanodeHeartbeatDispatcher {
     }
   }
 
+  /**
+   * Incremental Container report event payload with origin.
+   */
+  public static class IncrementalContainerReportFromDatanode
+      extends ReportFromDatanode<IncrementalContainerReportProto> {
+
+    public IncrementalContainerReportFromDatanode(
+        DatanodeDetails datanodeDetails,
+        IncrementalContainerReportProto report) {
+      super(datanodeDetails, report);
+    }
+  }
+
   /**
    * Container action event payload with origin.
    */
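
A hedged sketch of how the dispatcher is expected to route the new payload,
mirroring the pattern used for the other heartbeat reports. The proto
accessor names (hasIncrementalContainerReport / getIncrementalContainerReport)
are assumptions, since the .proto change is not part of this excerpt, and
heartbeat, eventPublisher and datanodeDetails are assumed to be in scope:

    if (heartbeat.hasIncrementalContainerReport()) {
      eventPublisher.fireEvent(
          SCMEvents.INCREMENTAL_CONTAINER_REPORT,
          new IncrementalContainerReportFromDatanode(
              datanodeDetails,
              heartbeat.getIncrementalContainerReport()));
    }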

+ 4 - 2
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java

@@ -97,6 +97,7 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRES
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_DEFAULT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_KEY;
 
+import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_REPORT;
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.PIPELINE_REPORT;
 import static org.apache.hadoop.hdds.scm.server.StorageContainerManager.startRpcServer;
 import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
@@ -196,8 +197,9 @@ public class SCMDatanodeProtocolServer implements
         .register(datanodeDetails, nodeReport, pipelineReportsProto);
     if (registeredCommand.getError()
         == SCMRegisteredResponseProto.ErrorCode.success) {
-      scm.getContainerManager().processContainerReports(datanodeDetails,
-          containerReportsProto);
+      eventPublisher.fireEvent(CONTAINER_REPORT,
+          new SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode(
+              datanodeDetails, containerReportsProto));
       eventPublisher.fireEvent(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
           new NodeRegistrationContainerReport(datanodeDetails,
               containerReportsProto));

+ 10 - 11
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java

@@ -39,10 +39,10 @@ import org.apache.hadoop.hdds.scm.block.PendingDeleteHandler;
 import org.apache.hadoop.hdds.scm.chillmode.SCMChillModeManager;
 import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
 import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
-import org.apache.hadoop.hdds.scm.container.CloseContainerWatcher;
 import org.apache.hadoop.hdds.scm.container.ContainerActionsHandler;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler;
 import org.apache.hadoop.hdds.scm.container.SCMContainerManager;
 import org.apache.hadoop.hdds.scm.container.ContainerReportHandler;
 import org.apache.hadoop.hdds.scm.container.replication
@@ -221,7 +221,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     CommandStatusReportHandler cmdStatusReportHandler =
         new CommandStatusReportHandler();
 
-    NewNodeHandler newNodeHandler = new NewNodeHandler(scmNodeManager);
+    NewNodeHandler newNodeHandler = new NewNodeHandler();
     StaleNodeHandler staleNodeHandler =
         new StaleNodeHandler(scmNodeManager, pipelineManager);
     DeadNodeHandler deadNodeHandler = new DeadNodeHandler(scmNodeManager,
@@ -231,8 +231,12 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
         new PendingDeleteHandler(scmBlockManager.getSCMBlockDeletingService());
 
     ContainerReportHandler containerReportHandler =
-        new ContainerReportHandler(containerManager, scmNodeManager,
-            replicationStatus);
+        new ContainerReportHandler(scmNodeManager, pipelineManager,
+            containerManager, replicationStatus);
+
+    IncrementalContainerReportHandler incrementalContainerReportHandler =
+        new IncrementalContainerReportHandler(
+            pipelineManager, containerManager);
 
     PipelineActionHandler pipelineActionHandler =
         new PipelineActionHandler(pipelineManager);
@@ -258,13 +262,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     replicationManager = new ReplicationManager(containerPlacementPolicy,
         containerManager, eventQueue, commandWatcherLeaseManager);
 
-    // setup CloseContainer watcher
-    CloseContainerWatcher closeContainerWatcher =
-        new CloseContainerWatcher(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
-            SCMEvents.CLOSE_CONTAINER_STATUS, commandWatcherLeaseManager,
-            containerManager);
-    closeContainerWatcher.start(eventQueue);
-
     scmAdminUsernames = conf.getTrimmedStringCollection(OzoneConfigKeys
         .OZONE_ADMINISTRATORS);
     scmUsername = UserGroupInformation.getCurrentUser().getUserName();
@@ -282,6 +279,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     eventQueue.addHandler(SCMEvents.RETRIABLE_DATANODE_COMMAND, scmNodeManager);
     eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler);
     eventQueue.addHandler(SCMEvents.CONTAINER_REPORT, containerReportHandler);
+    eventQueue.addHandler(SCMEvents.INCREMENTAL_CONTAINER_REPORT,
+        incrementalContainerReportHandler);
     eventQueue.addHandler(SCMEvents.CONTAINER_ACTIONS, actionsHandler);
     eventQueue.addHandler(SCMEvents.CLOSE_CONTAINER, closeContainerHandler);
     eventQueue.addHandler(SCMEvents.NEW_NODE, newNodeHandler);
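
The IncrementalContainerReportHandler class itself is not shown in this
excerpt; a minimal skeleton consistent with the constructor and event wiring
above might look like the following (sketch only, not the committed code):

    public class IncrementalContainerReportHandler implements
        EventHandler<IncrementalContainerReportFromDatanode> {

      private final PipelineManager pipelineManager;
      private final ContainerManager containerManager;

      public IncrementalContainerReportHandler(
          PipelineManager pipelineManager,
          ContainerManager containerManager) {
        this.pipelineManager = pipelineManager;
        this.containerManager = containerManager;
      }

      @Override
      public void onMessage(IncrementalContainerReportFromDatanode report,
          EventPublisher publisher) {
        // Sketch only: each replica entry in the report would update the
        // replica map for the originating datanode.
      }
    }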

+ 0 - 14
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestCommandStatusReportHandler.java

@@ -41,7 +41,6 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.UUID;
 
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
@@ -51,8 +50,6 @@ public class TestCommandStatusReportHandler implements EventPublisher {
   private static final Logger LOG = LoggerFactory
       .getLogger(TestCommandStatusReportHandler.class);
   private CommandStatusReportHandler cmdStatusReportHandler;
-  private String storagePath = GenericTestUtils.getRandomizedTempPath()
-      .concat("/" + UUID.randomUUID().toString());
 
   @Before
   public void setup() {
@@ -68,8 +65,6 @@ public class TestCommandStatusReportHandler implements EventPublisher {
         .emptyList());
     cmdStatusReportHandler.onMessage(report, this);
     assertFalse(logCapturer.getOutput().contains("Delete_Block_Status"));
-    assertFalse(logCapturer.getOutput().contains(
-        "Close_Container_Command_Status"));
     assertFalse(logCapturer.getOutput().contains("Replicate_Command_Status"));
 
 
@@ -77,13 +72,9 @@ public class TestCommandStatusReportHandler implements EventPublisher {
     cmdStatusReportHandler.onMessage(report, this);
     assertTrue(logCapturer.getOutput().contains("firing event of type " +
         "Delete_Block_Status"));
-    assertTrue(logCapturer.getOutput().contains("firing event of type " +
-        "Close_Container_Command_Status"));
     assertTrue(logCapturer.getOutput().contains("firing event of type " +
         "Replicate_Command_Status"));
 
-    assertTrue(logCapturer.getOutput().contains("type: " +
-        "closeContainerCommand"));
     assertTrue(logCapturer.getOutput().contains("type: " +
         "deleteBlocksCommand"));
     assertTrue(logCapturer.getOutput().contains("type: " +
@@ -119,11 +110,6 @@ public class TestCommandStatusReportHandler implements EventPublisher {
         .setType(Type.deleteBlocksCommand);
     reports.add(builder.build());
 
-    builder.setCmdId(HddsIdFactory.getLongId())
-        .setStatus(CommandStatus.Status.EXECUTED)
-        .setType(Type.closeContainerCommand);
-    reports.add(builder.build());
-
     builder.setMsg("Not enough space")
         .setCmdId(HddsIdFactory.getLongId())
         .setStatus(CommandStatus.Status.FAILED)

+ 20 - 46
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java

@@ -34,7 +34,6 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
-import org.apache.hadoop.hdds.scm.node.states.ReportResult;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.protocol.VersionResponse;
@@ -44,6 +43,7 @@ import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.assertj.core.util.Preconditions;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -138,18 +138,6 @@ public class MockNodeManager implements NodeManager {
     this.chillmode = chillmode;
   }
 
-  /**
-   * Removes a data node from the management of this Node Manager.
-   *
-   * @param node - DataNode.
-   * @throws NodeNotFoundException
-   */
-  @Override
-  public void removeNode(DatanodeDetails node)
-      throws NodeNotFoundException {
-
-  }
-
   /**
    * Gets all Live Datanodes that is currently communicating with SCM.
    *
@@ -248,8 +236,8 @@ public class MockNodeManager implements NodeManager {
    * @return Set of PipelineID
    */
   @Override
-  public Set<PipelineID> getPipelineByDnID(UUID dnId) {
-    return node2PipelineMap.getPipelines(dnId);
+  public Set<PipelineID> getPipelines(DatanodeDetails dnId) {
+    return node2PipelineMap.getPipelines(dnId.getUuid());
   }
 
   /**
@@ -290,7 +278,8 @@ public class MockNodeManager implements NodeManager {
    * @param nodeReport
    */
   @Override
-  public void processNodeReport(UUID dnUuid, NodeReportProto nodeReport) {
+  public void processNodeReport(DatanodeDetails dnUuid,
+      NodeReportProto nodeReport) {
     // do nothing
   }
 
@@ -302,21 +291,13 @@ public class MockNodeManager implements NodeManager {
    *                        addDatanodeInContainerMap call.
    */
   @Override
-  public void setContainersForDatanode(UUID uuid, Set<ContainerID> containerIds)
-      throws SCMException {
-    node2ContainerMap.setContainersForDatanode(uuid, containerIds);
-  }
-
-  /**
-   * Process containerReport received from datanode.
-   * @param uuid - DataonodeID
-   * @param containerIds - Set of containerIDs
-   * @return The result after processing containerReport
-   */
-  @Override
-  public ReportResult<ContainerID> processContainerReport(UUID uuid,
-      Set<ContainerID> containerIds) {
-    return node2ContainerMap.processReport(uuid, containerIds);
+  public void setContainers(DatanodeDetails uuid, Set<ContainerID> containerIds)
+      throws NodeNotFoundException {
+    try {
+      node2ContainerMap.setContainersForDatanode(uuid.getUuid(), containerIds);
+    } catch (SCMException e) {
+      throw new NodeNotFoundException(e.getMessage());
+    }
   }
 
   /**
@@ -325,21 +306,8 @@ public class MockNodeManager implements NodeManager {
    * @return - set of containerIDs
    */
   @Override
-  public Set<ContainerID> getContainers(UUID uuid) {
-    return node2ContainerMap.getContainers(uuid);
-  }
-
-  /**
-   * Insert a new datanode with set of containerIDs for containers available
-   * on it.
-   * @param uuid - DatanodeID
-   * @param containerIDs - Set of ContainerIDs
-   * @throws SCMException - if datanode already exists
-   */
-  @Override
-  public void addDatanodeInContainerMap(UUID uuid,
-      Set<ContainerID> containerIDs) throws SCMException {
-    node2ContainerMap.insertNewDatanode(uuid, containerIDs);
+  public Set<ContainerID> getContainers(DatanodeDetails uuid) {
+    return node2ContainerMap.getContainers(uuid.getUuid());
   }
 
   // Returns the number of commands that is queued to this node manager.
@@ -393,6 +361,12 @@ public class MockNodeManager implements NodeManager {
   @Override
   public RegisteredCommand register(DatanodeDetails datanodeDetails,
       NodeReportProto nodeReport, PipelineReportsProto pipelineReportsProto) {
+    try {
+      node2ContainerMap.insertNewDatanode(datanodeDetails.getUuid(),
+          Collections.emptySet());
+    } catch (SCMException e) {
+      e.printStackTrace();
+    }
     return null;
   }
 

+ 8 - 7
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java

@@ -33,6 +33,7 @@ import org.apache.hadoop.hdds.scm.container.replication
     .ReplicationActivityStatus;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
@@ -69,7 +70,7 @@ public class TestContainerReportHandler implements EventPublisher {
   //TODO: Rewrite it
   @Ignore
   @Test
-  public void test() throws IOException {
+  public void test() throws IOException, NodeNotFoundException {
     String testDir = GenericTestUtils.getTempPath(
         this.getClass().getSimpleName());
     //GIVEN
@@ -85,17 +86,17 @@ public class TestContainerReportHandler implements EventPublisher {
         new ReplicationActivityStatus();
 
     ContainerReportHandler reportHandler =
-        new ContainerReportHandler(containerManager, nodeManager,
-            replicationActivityStatus);
+        new ContainerReportHandler(nodeManager, pipelineManager,
+            containerManager, replicationActivityStatus);
 
     DatanodeDetails dn1 = TestUtils.randomDatanodeDetails();
     DatanodeDetails dn2 = TestUtils.randomDatanodeDetails();
     DatanodeDetails dn3 = TestUtils.randomDatanodeDetails();
     DatanodeDetails dn4 = TestUtils.randomDatanodeDetails();
-    nodeManager.addDatanodeInContainerMap(dn1.getUuid(), new HashSet<>());
-    nodeManager.addDatanodeInContainerMap(dn2.getUuid(), new HashSet<>());
-    nodeManager.addDatanodeInContainerMap(dn3.getUuid(), new HashSet<>());
-    nodeManager.addDatanodeInContainerMap(dn4.getUuid(), new HashSet<>());
+    nodeManager.setContainers(dn1, new HashSet<>());
+    nodeManager.setContainers(dn2, new HashSet<>());
+    nodeManager.setContainers(dn3, new HashSet<>());
+    nodeManager.setContainers(dn4, new HashSet<>());
 
     ContainerInfo cont1 = containerManager
         .allocateContainer(ReplicationType.STAND_ALONE,

+ 3 - 0
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java

@@ -24,6 +24,8 @@ import java.util.Set;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import org.apache.hadoop.hdds.scm.TestUtils;
 
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -96,6 +98,7 @@ public class TestContainerStateManager {
       throws ContainerNotFoundException {
     ContainerReplica replica = ContainerReplica.newBuilder()
         .setContainerID(cont.containerID())
+        .setContainerState(ContainerReplicaProto.State.CLOSED)
         .setDatanodeDetails(node)
         .build();
     containerStateManager
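
The test fixtures in this commit now set the replica state on every
ContainerReplica they build, which suggests the builder expects it. A
standalone builder sketch (CLOSED is just an example value; datanodeDetails
is assumed to be in scope):

    ContainerReplica replica = ContainerReplica.newBuilder()
        .setContainerID(ContainerID.valueof(42L))
        .setContainerState(ContainerReplicaProto.State.CLOSED)
        .setDatanodeDetails(datanodeDetails)
        .build();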

+ 4 - 103
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java

@@ -22,7 +22,6 @@ import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -30,8 +29,6 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
 import org.apache.hadoop.hdds.server.events.EventQueue;
@@ -47,8 +44,6 @@ import org.junit.rules.ExpectedException;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
@@ -187,10 +182,12 @@ public class TestSCMContainerManager {
 
     containerManager.updateContainerReplica(contInfo.containerID(),
         ContainerReplica.newBuilder().setContainerID(contInfo.containerID())
-        .setDatanodeDetails(dn1).build());
+            .setContainerState(ContainerReplicaProto.State.CLOSED)
+            .setDatanodeDetails(dn1).build());
     containerManager.updateContainerReplica(contInfo.containerID(),
         ContainerReplica.newBuilder().setContainerID(contInfo.containerID())
-        .setDatanodeDetails(dn2).build());
+            .setContainerState(ContainerReplicaProto.State.CLOSED)
+            .setDatanodeDetails(dn2).build());
 
     Assert.assertEquals(2,
         containerManager.getContainerReplicas(
@@ -240,102 +237,6 @@ public class TestSCMContainerManager {
             HddsProtos.LifeCycleEvent.CREATED);
   }
 
-  @Test
-  public void testFullContainerReport() throws Exception {
-    ContainerInfo info = createContainer();
-    DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
-    List<ContainerReplicaProto> reports =
-        new ArrayList<>();
-    ContainerReplicaProto.Builder ciBuilder =
-        ContainerReplicaProto.newBuilder();
-    ciBuilder.setFinalhash("e16cc9d6024365750ed8dbd194ea46d2")
-        .setSize(5368709120L)
-        .setUsed(2000000000L)
-        .setKeyCount(100000000L)
-        .setReadCount(100000000L)
-        .setWriteCount(100000000L)
-        .setReadBytes(2000000000L)
-        .setWriteBytes(2000000000L)
-        .setContainerID(info.getContainerID())
-        .setState(ContainerReplicaProto.State.CLOSED)
-        .setDeleteTransactionId(0);
-
-    reports.add(ciBuilder.build());
-
-    ContainerReportsProto.Builder crBuilder = ContainerReportsProto
-        .newBuilder();
-    crBuilder.addAllReports(reports);
-
-    containerManager.processContainerReports(
-        datanodeDetails, crBuilder.build());
-
-    ContainerInfo updatedContainer =
-        containerManager.getContainer(info.containerID());
-    Assert.assertEquals(100000000L,
-        updatedContainer.getNumberOfKeys());
-    Assert.assertEquals(2000000000L, updatedContainer.getUsedBytes());
-
-    for (ContainerReplicaProto c : reports) {
-     Assert.assertEquals(containerManager.getContainerReplicas(
-         ContainerID.valueof(c.getContainerID())).size(), 1);
-    }
-
-    containerManager.processContainerReports(TestUtils.randomDatanodeDetails(),
-        crBuilder.build());
-    for (ContainerReplicaProto c : reports) {
-      Assert.assertEquals(containerManager.getContainerReplicas(
-              ContainerID.valueof(c.getContainerID())).size(), 2);
-    }
-  }
-
-  @Test
-  public void testListContainerAfterReport() throws Exception {
-    ContainerInfo info1 = createContainer();
-    ContainerInfo info2 = createContainer();
-    DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
-    List<ContainerReplicaProto> reports =
-        new ArrayList<>();
-    ContainerReplicaProto.Builder ciBuilder =
-        ContainerReplicaProto.newBuilder();
-    long cID1 = info1.getContainerID();
-    long cID2 = info2.getContainerID();
-    ciBuilder.setFinalhash("e16cc9d6024365750ed8dbd194ea46d2")
-        .setSize(1000000000L)
-        .setUsed(987654321L)
-        .setKeyCount(100000000L)
-        .setReadBytes(1000000000L)
-        .setWriteBytes(1000000000L)
-        .setContainerID(cID1)
-        .setState(ContainerReplicaProto.State.CLOSED);
-    reports.add(ciBuilder.build());
-
-    ciBuilder.setFinalhash("e16cc9d6024365750ed8dbd194ea54a9")
-        .setSize(1000000000L)
-        .setUsed(123456789L)
-        .setKeyCount(200000000L)
-        .setReadBytes(3000000000L)
-        .setWriteBytes(4000000000L)
-        .setContainerID(cID2);
-    reports.add(ciBuilder.build());
-
-    ContainerReportsProto.Builder crBuilder = ContainerReportsProto
-        .newBuilder();
-    crBuilder.addAllReports(reports);
-
-    containerManager.processContainerReports(
-        datanodeDetails, crBuilder.build());
-
-    List<ContainerInfo> list = containerManager.listContainer(
-        ContainerID.valueof(1), 50);
-    Assert.assertEquals(2, list.stream().filter(
-        x -> x.getContainerID() == cID1 || x.getContainerID() == cID2).count());
-    Assert.assertEquals(300000000L, list.stream().filter(
-        x -> x.getContainerID() == cID1 || x.getContainerID() == cID2)
-        .mapToLong(x -> x.getNumberOfKeys()).sum());
-    Assert.assertEquals(1111111110L, list.stream().filter(
-        x -> x.getContainerID() == cID1 || x.getContainerID() == cID2)
-        .mapToLong(x -> x.getUsedBytes()).sum());
-  }
 
   @Test
   public void testCloseContainer() throws IOException {

+ 9 - 3
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java

@@ -28,7 +28,10 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicateContainerCommandProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ReplicateContainerCommandProto;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
@@ -36,8 +39,10 @@ import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationRequestToRepeat;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+    .ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.replication
+    .ReplicationManager.ReplicationRequestToRepeat;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.ozone.lease.LeaseManager;
@@ -81,6 +86,7 @@ public class TestReplicationManager {
       listOfDatanodeDetails.add(dd);
       listOfContainerReplica.add(ContainerReplica.newBuilder()
           .setContainerID(ContainerID.valueof(i))
+          .setContainerState(ContainerReplicaProto.State.CLOSED)
           .setDatanodeDetails(dd).build());
     });
 

+ 14 - 10
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java

@@ -30,8 +30,12 @@ import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.StorageReportProto;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
@@ -42,10 +46,11 @@ import org.apache.hadoop.hdds.scm.container.SCMContainerManager;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.NodeReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
+    .NodeReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 
 import org.apache.hadoop.hdds.server.events.EventQueue;
@@ -93,7 +98,7 @@ public class TestDeadNodeHandler {
   }
 
   @Test
-  public void testOnMessage() throws IOException {
+  public void testOnMessage() throws IOException, NodeNotFoundException {
     //GIVEN
     DatanodeDetails datanode1 = TestUtils.randomDatanodeDetails();
     DatanodeDetails datanode2 = TestUtils.randomDatanodeDetails();
@@ -269,11 +274,9 @@ public class TestDeadNodeHandler {
         container1.containerID(), HddsProtos.LifeCycleEvent.CREATED);
     TestUtils.closeContainer(containerManager, container1.containerID());
 
-    registerReplicas(dn1, container1);
-
     deadNodeHandler.onMessage(dn1, eventQueue);
     Assert.assertTrue(logCapturer.getOutput().contains(
-        "Exception while removing container replica "));
+        "DeadNode event for a unregistered node"));
   }
 
   private void registerReplicas(ContainerManager containerManager,
@@ -283,6 +286,7 @@ public class TestDeadNodeHandler {
       containerManager.updateContainerReplica(
           new ContainerID(container.getContainerID()),
           ContainerReplica.newBuilder()
+              .setContainerState(ContainerReplicaProto.State.OPEN)
               .setContainerID(container.containerID())
               .setDatanodeDetails(datanode).build());
     }
@@ -290,9 +294,9 @@ public class TestDeadNodeHandler {
 
   private void registerReplicas(DatanodeDetails datanode,
       ContainerInfo... containers)
-      throws SCMException {
+      throws NodeNotFoundException {
     nodeManager
-        .addDatanodeInContainerMap(datanode.getUuid(),
+        .setContainers(datanode,
             Arrays.stream(containers)
                 .map(container -> new ContainerID(container.getContainerID()))
                 .collect(Collectors.toSet()));
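
The updated assertion expects the dead-node path to log and skip datanodes
that never registered, instead of surfacing a replica-removal failure. A
hypothetical guard consistent with the asserted log text (the real handler
change appears earlier in this commit):

    Set<ContainerID> containers;
    try {
      containers = nodeManager.getContainers(datanodeDetails);
    } catch (NodeNotFoundException ex) {
      // Wording must track the handler implementation.
      LOG.warn("DeadNode event for a unregistered node: {}!", datanodeDetails);
      return;
    }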

+ 0 - 289
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/TestCloseContainerWatcher.java

@@ -1,289 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.container;
-
-import org.apache.hadoop.hdds.HddsIdFactory;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.CommandStatus;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
-import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
-    .CloseContainerStatus;
-import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler
-    .CloseContainerRetryableReq;
-import org.apache.hadoop.hdds.scm.container.CloseContainerWatcher;
-import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.container.SCMContainerManager;
-import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.server.events.EventHandler;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.hdds.server.events.EventQueue;
-import org.apache.hadoop.hdds.server.events.EventWatcher;
-import org.apache.hadoop.ozone.lease.LeaseManager;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.junit.After;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.Timeout;
-import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.event.Level;
-
-import java.io.IOException;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.when;
-
-/**
- * Test class for {@link CloseContainerWatcher}.
- * */
-public class TestCloseContainerWatcher implements EventHandler<ContainerID> {
-
-  private static final Logger LOG = LoggerFactory
-      .getLogger(TestCloseContainerWatcher.class);
-  private static EventWatcher<CloseContainerRetryableReq, CloseContainerStatus>
-      watcher;
-  private static LeaseManager<Long> leaseManager;
-  private static SCMContainerManager containerManager = Mockito
-      .mock(SCMContainerManager.class);
-  private static EventQueue queue;
-  @Rule
-  public Timeout timeout = new Timeout(1000*15);
-
-  @After
-  public void stop() {
-    leaseManager.shutdown();
-    queue.close();
-  }
-
-  /*
-   * This test will test watcher for Failure status event.
-   * */
-  @Test
-  public void testWatcherForFailureStatusEvent() throws
-      InterruptedException, IOException {
-    setupWatcher(90000L);
-    long id1 = HddsIdFactory.getLongId();
-    long id2 = HddsIdFactory.getLongId();
-    queue.addHandler(SCMEvents.CLOSE_CONTAINER, this);
-    setupMock(id1, id2, true);
-    GenericTestUtils.LogCapturer testLogger = GenericTestUtils.LogCapturer
-        .captureLogs(LOG);
-    GenericTestUtils.LogCapturer watcherLogger = GenericTestUtils.LogCapturer
-        .captureLogs(CloseContainerWatcher.LOG);
-    GenericTestUtils.setLogLevel(CloseContainerWatcher.LOG, Level.TRACE);
-    testLogger.clearOutput();
-    watcherLogger.clearOutput();
-
-    CommandStatus cmdStatus1 = CommandStatus.newBuilder()
-        .setCmdId(id1)
-        .setStatus(CommandStatus.Status.FAILED)
-        .setType(Type.closeContainerCommand).build();
-    CommandStatus cmdStatus2 = CommandStatus.newBuilder()
-        .setCmdId(id2)
-        .setStatus(CommandStatus.Status.FAILED)
-        .setType(Type.closeContainerCommand).build();
-
-    // File events to watcher
-    queue.fireEvent(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
-        new CloseContainerRetryableReq(ContainerID.valueof(id1)));
-    queue.fireEvent(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
-        new CloseContainerRetryableReq(ContainerID.valueof(id2)));
-    Thread.sleep(10L);
-    queue.fireEvent(SCMEvents.CLOSE_CONTAINER_STATUS, new
-        CloseContainerStatus(cmdStatus1));
-    queue.fireEvent(SCMEvents.CLOSE_CONTAINER_STATUS, new
-        CloseContainerStatus(cmdStatus2));
-
-    Thread.sleep(1000*4L);
-    // validation
-    assertTrue(watcherLogger.getOutput().contains("CloseContainerCommand for " +
-        "containerId: " + id1 + " executed"));
-    assertTrue(watcherLogger.getOutput().contains("CloseContainerCommand for " +
-        "containerId: " + id2 + " executed"));
-    assertTrue(
-        testLogger.getOutput().contains("Handling closeContainerEvent " +
-            "for containerId: id=" + id1));
-    assertTrue(testLogger.getOutput().contains("Handling closeContainerEvent " +
-        "for containerId: id=" + id2));
-
-  }
-
-  @Test
-  public void testWatcherForPendingStatusEvent() throws
-      InterruptedException, IOException {
-    setupWatcher(90000L);
-    long id1 = HddsIdFactory.getLongId();
-    long id2 = HddsIdFactory.getLongId();
-    queue.addHandler(SCMEvents.CLOSE_CONTAINER, this);
-    setupMock(id1, id2, true);
-    GenericTestUtils.LogCapturer testLogger = GenericTestUtils.LogCapturer
-        .captureLogs(LOG);
-    GenericTestUtils.LogCapturer watcherLogger = GenericTestUtils.LogCapturer
-        .captureLogs(CloseContainerWatcher.LOG);
-    GenericTestUtils.setLogLevel(CloseContainerWatcher.LOG, Level.TRACE);
-    testLogger.clearOutput();
-    watcherLogger.clearOutput();
-
-    CommandStatus cmdStatus1 = CommandStatus.newBuilder()
-        .setCmdId(id1)
-        .setStatus(CommandStatus.Status.PENDING)
-        .setType(Type.closeContainerCommand).build();
-    CommandStatus cmdStatus2 = CommandStatus.newBuilder()
-        .setCmdId(id2)
-        .setStatus(CommandStatus.Status.PENDING)
-        .setType(Type.closeContainerCommand).build();
-
-    // File events to watcher
-    queue.fireEvent(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
-        new CloseContainerRetryableReq(ContainerID.valueof(id1)));
-    queue.fireEvent(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
-        new CloseContainerRetryableReq(ContainerID.valueof(id2)));
-    Thread.sleep(10L);
-    queue.fireEvent(SCMEvents.CLOSE_CONTAINER_STATUS, new
-        CloseContainerStatus(cmdStatus1));
-    queue.fireEvent(SCMEvents.CLOSE_CONTAINER_STATUS, new
-        CloseContainerStatus(cmdStatus2));
-
-    Thread.sleep(1000*2L);
-    // validation
-    assertFalse(watcherLogger.getOutput().contains("CloseContainerCommand "
-        + "for containerId: " + id1 + " executed"));
-    assertFalse(watcherLogger.getOutput().contains("CloseContainerCommand "
-        + "for containerId: " + id2 + " executed"));
-    assertFalse(testLogger.getOutput().contains("Handling "
-        + "closeContainerEvent for containerId: id=" + id1));
-    assertFalse(testLogger.getOutput().contains("Handling "
-        + "closeContainerEvent for containerId: id=" + id2));
-
-  }
-
-  @Test
-  public void testWatcherForExecutedStatusEvent()
-      throws IOException, InterruptedException {
-    setupWatcher(90000L);
-    long id1 = HddsIdFactory.getLongId();
-    long id2 = HddsIdFactory.getLongId();
-    queue.addHandler(SCMEvents.CLOSE_CONTAINER, this);
-    setupMock(id1, id2, true);
-    GenericTestUtils.LogCapturer testLogger = GenericTestUtils.LogCapturer
-        .captureLogs(LOG);
-    GenericTestUtils.LogCapturer watcherLogger = GenericTestUtils.LogCapturer
-        .captureLogs(CloseContainerWatcher.LOG);
-    GenericTestUtils.setLogLevel(CloseContainerWatcher.LOG, Level.TRACE);
-    testLogger.clearOutput();
-    watcherLogger.clearOutput();
-
-    // When both of the pending event are executed successfully by DataNode
-    CommandStatus cmdStatus1 = CommandStatus.newBuilder()
-        .setCmdId(id1)
-        .setStatus(CommandStatus.Status.EXECUTED)
-        .setType(Type.closeContainerCommand).build();
-    CommandStatus cmdStatus2 = CommandStatus.newBuilder()
-        .setCmdId(id2)
-        .setStatus(CommandStatus.Status.EXECUTED)
-        .setType(Type.closeContainerCommand).build();
-    // File events to watcher
-    testLogger.clearOutput();
-    watcherLogger.clearOutput();
-    queue.fireEvent(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
-        new CloseContainerRetryableReq(ContainerID.valueof(id1)));
-    queue.fireEvent(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
-        new CloseContainerRetryableReq(ContainerID.valueof(id2)));
-    Thread.sleep(10L);
-    queue.fireEvent(SCMEvents.CLOSE_CONTAINER_STATUS,
-        new CloseContainerStatus(cmdStatus1));
-    queue.fireEvent(SCMEvents.CLOSE_CONTAINER_STATUS,
-        new CloseContainerStatus(cmdStatus2));
-
-    Thread.sleep(1000*3L);
-    // validation
-    assertTrue(watcherLogger.getOutput().contains("CloseContainerCommand "
-        + "for containerId: " + id1 + " executed"));
-    assertTrue(watcherLogger.getOutput().contains("CloseContainerCommand "
-        + "for containerId: " + id2 + " executed"));
-    assertFalse(testLogger.getOutput().contains("Handling "
-        + "closeContainerEvent for containerId: id=" + id1));
-    assertFalse(testLogger.getOutput().contains("Handling "
-        + "closeContainerEvent for containerId: id=" + id2));
-  }
-
-  private void setupWatcher(long time) {
-    leaseManager = new LeaseManager<>("TestCloseContainerWatcher#LeaseManager",
-        time);
-    leaseManager.start();
-    watcher = new CloseContainerWatcher(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
-        SCMEvents.CLOSE_CONTAINER_STATUS, leaseManager, containerManager);
-    queue = new EventQueue();
-    watcher.start(queue);
-  }
-
-  /*
-   * This test will fire two retryable closeContainer events. Both will timeout.
-   * First event container will be open at time of handling so it should be
-   * sent back to appropriate handler. Second event container will be closed,
-   * so it should not be retried.
-   * */
-  @Test
-  public void testWatcherRetryableTimeoutHandling() throws InterruptedException,
-      IOException {
-
-    long id1 = HddsIdFactory.getLongId();
-    long id2 = HddsIdFactory.getLongId();
-    setupWatcher(1000L);
-    queue.addHandler(SCMEvents.CLOSE_CONTAINER, this);
-    setupMock(id1, id2, false);
-    GenericTestUtils.LogCapturer testLogger = GenericTestUtils.LogCapturer
-        .captureLogs(LOG);
-    testLogger.clearOutput();
-
-    // File events to watcher
-    queue.fireEvent(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
-        new CloseContainerRetryableReq(ContainerID.valueof(id1)));
-    queue.fireEvent(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
-        new CloseContainerRetryableReq(ContainerID.valueof(id2)));
-
-    Thread.sleep(1000L + 10);
-
-    // validation
-    assertTrue(testLogger.getOutput().contains("Handling "
-        + "closeContainerEvent for containerId: id=" + id1));
-    assertFalse(testLogger.getOutput().contains("Handling "
-        + "closeContainerEvent for containerId: id=" + id2));
-  }
-
-
-  private void setupMock(long id1, long id2, boolean isOpen)
-      throws IOException {
-    ContainerInfo containerInfo = Mockito.mock(ContainerInfo.class);
-    ContainerInfo containerInfo2 = Mockito.mock(ContainerInfo.class);
-    when(containerManager.getContainer(ContainerID.valueof(id1)))
-        .thenReturn(containerInfo);
-    when(containerManager.getContainer(ContainerID.valueof(id2)))
-        .thenReturn(containerInfo2);
-    when(containerInfo.isOpen()).thenReturn(true);
-    when(containerInfo2.isOpen()).thenReturn(isOpen);
-  }
-
-  @Override
-  public void onMessage(ContainerID containerID, EventPublisher publisher) {
-    LOG.info("Handling closeContainerEvent for containerId: {}", containerID);
-  }
-}

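Note on the hunk above: the whole of TestCloseContainerWatcher is deleted along with the CloseContainerWatcher it exercised; once close status flows back through container reports there is no lease-tracked CLOSE_CONTAINER_STATUS round trip left to test. For readers unfamiliar with the pattern the deleted test used, a minimal sketch of driving an SCM event handler through the EventQueue (handler is any EventHandler&lt;ContainerID&gt;; this wiring is illustrative only, not part of the commit):

    // Register a handler and fire a typed event at it, as the deleted test did.
    EventQueue queue = new EventQueue();
    queue.addHandler(SCMEvents.CLOSE_CONTAINER, handler);
    queue.fireEvent(SCMEvents.CLOSE_CONTAINER,
        ContainerID.valueof(HddsIdFactory.getLongId()));
    // EventQueue dispatches asynchronously; the test above slept briefly
    // before asserting on captured logs.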
+ 1 - 5
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java

@@ -413,17 +413,13 @@ public class TestEndPoint {
           serverAddress, 3000);
       Map<Long, CommandStatus> map = stateContext.getCommandStatusMap();
       assertNotNull(map);
-      assertEquals("Should have 3 objects", 3, map.size());
-      assertTrue(map.containsKey(Long.valueOf(1)));
+      assertEquals("Should have 2 objects", 2, map.size());
       assertTrue(map.containsKey(Long.valueOf(2)));
       assertTrue(map.containsKey(Long.valueOf(3)));
-      assertTrue(map.get(Long.valueOf(1)).getType()
-          .equals(Type.closeContainerCommand));
       assertTrue(map.get(Long.valueOf(2)).getType()
           .equals(Type.replicateContainerCommand));
       assertTrue(
           map.get(Long.valueOf(3)).getType().equals(Type.deleteBlocksCommand));
-      assertTrue(map.get(Long.valueOf(1)).getStatus().equals(Status.PENDING));
       assertTrue(map.get(Long.valueOf(2)).getStatus().equals(Status.PENDING));
       assertTrue(map.get(Long.valueOf(3)).getStatus().equals(Status.PENDING));
 

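Note on the hunk above: the datanode-side command status map no longer carries an entry for the closeContainerCommand (id 1), so the test now expects two tracked commands instead of three; presumably the close path now reports its outcome through container reports rather than through CommandStatus. A hypothetical helper condensing the remaining assertions (assumes the same JUnit asserts and proto getters used in the test):

    // Hypothetical: assert a command is still tracked and PENDING.
    private static void assertPending(Map<Long, CommandStatus> map,
        long cmdId, Type expectedType) {
      assertTrue(map.containsKey(cmdId));
      assertEquals(expectedType, map.get(cmdId).getType());
      assertEquals(Status.PENDING, map.get(cmdId).getStatus());
    }

    // Usage mirroring the updated expectations:
    //   assertPending(map, 2L, Type.replicateContainerCommand);
    //   assertPending(map, 3L, Type.deleteBlocksCommand);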
+ 8 - 47
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java

@@ -24,7 +24,6 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.CommandQueue;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
@@ -34,7 +33,6 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
-import org.apache.hadoop.hdds.scm.node.states.ReportResult;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.ozone.protocol.VersionResponse;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
@@ -75,19 +73,6 @@ public class ReplicationNodeManagerMock implements NodeManager {
     return null;
   }
 
-  /**
-   * Removes a data node from the management of this Node Manager.
-   *
-   * @param node - DataNode.
-   * @throws NodeNotFoundException
-   */
-  @Override
-  public void removeNode(DatanodeDetails node)
-      throws NodeNotFoundException {
-    nodeStateMap.remove(node);
-
-  }
-
   /**
    * Gets all Live Datanodes that is currently communicating with SCM.
    *
@@ -170,7 +155,7 @@ public class ReplicationNodeManagerMock implements NodeManager {
    * @return Set of PipelineID
    */
   @Override
-  public Set<PipelineID> getPipelineByDnID(UUID dnId) {
+  public Set<PipelineID> getPipelines(DatanodeDetails dnId) {
     throw new UnsupportedOperationException("Not yet implemented");
   }
 
@@ -196,24 +181,12 @@ public class ReplicationNodeManagerMock implements NodeManager {
    * Update set of containers available on a datanode.
    * @param uuid - DatanodeID
    * @param containerIds - Set of containerIDs
-   * @throws SCMException - if datanode is not known. For new datanode use
-   *                        addDatanodeInContainerMap call.
+   * @throws NodeNotFoundException - if datanode is not known. For new datanode
+   *                                 use addDatanodeInContainerMap call.
    */
   @Override
-  public void setContainersForDatanode(UUID uuid, Set<ContainerID> containerIds)
-      throws SCMException {
-    throw new UnsupportedOperationException("Not yet implemented");
-  }
-
-  /**
-   * Process containerReport received from datanode.
-   * @param uuid - DataonodeID
-   * @param containerIds - Set of containerIDs
-   * @return The result after processing containerReport
-   */
-  @Override
-  public ReportResult<ContainerID> processContainerReport(UUID uuid,
-      Set<ContainerID> containerIds) {
+  public void setContainers(DatanodeDetails uuid, Set<ContainerID> containerIds)
+      throws NodeNotFoundException {
     throw new UnsupportedOperationException("Not yet implemented");
   }
 
@@ -223,20 +196,7 @@ public class ReplicationNodeManagerMock implements NodeManager {
    * @return - set of containerIDs
    */
   @Override
-  public Set<ContainerID> getContainers(UUID uuid) {
-    throw new UnsupportedOperationException("Not yet implemented");
-  }
-
-  /**
-   * Insert a new datanode with set of containerIDs for containers available
-   * on it.
-   * @param uuid - DatanodeID
-   * @param containerIDs - Set of ContainerIDs
-   * @throws SCMException - if datanode already exists
-   */
-  @Override
-  public void addDatanodeInContainerMap(UUID uuid,
-      Set<ContainerID> containerIDs) throws SCMException {
+  public Set<ContainerID> getContainers(DatanodeDetails uuid) {
     throw new UnsupportedOperationException("Not yet implemented");
   }
 
@@ -329,7 +289,8 @@ public class ReplicationNodeManagerMock implements NodeManager {
    * @param nodeReport
    */
   @Override
-  public void processNodeReport(UUID dnUuid, NodeReportProto nodeReport) {
+  public void processNodeReport(DatanodeDetails dnUuid,
+                                NodeReportProto nodeReport) {
     // do nothing.
   }
 

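Note on the hunk above: ReplicationNodeManagerMock follows the reworked NodeManager contract. Datanode-keyed methods now take DatanodeDetails instead of a raw UUID; removeNode, addDatanodeInContainerMap and processContainerReport are dropped; and setContainersForDatanode becomes setContainers, throwing NodeNotFoundException instead of SCMException. (One wrinkle worth noting: the new setContainers javadoc still points callers at addDatanodeInContainerMap, which this same commit removes.) A minimal call-site migration sketch, where dn is a DatanodeDetails and error handling is elided:

    // Old, keyed by UUID:
    //   Set<PipelineID> pipelines = nodeManager.getPipelineByDnID(dn.getUuid());
    // New, keyed by the DatanodeDetails itself:
    Set<PipelineID> pipelines = nodeManager.getPipelines(dn);
    nodeManager.setContainers(dn, containerIds); // throws NodeNotFoundException
    Set<ContainerID> containers = nodeManager.getContainers(dn);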
+ 4 - 7
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java

@@ -21,14 +21,14 @@ import java.util.UUID;
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
-import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -39,8 +39,6 @@ import java.util.List;
 import java.util.NavigableSet;
 import java.util.concurrent.TimeoutException;
 
-import org.slf4j.event.Level;
-
 /**
  * Tests for ContainerStateManager.
  */
@@ -317,9 +315,6 @@ public class TestContainerStateManagerIntegration {
 
   @Test
   public void testReplicaMap() throws Exception {
-    GenericTestUtils.setLogLevel(ContainerStateMap.getLOG(), Level.DEBUG);
-    GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
-        .captureLogs(ContainerStateMap.getLOG());
     DatanodeDetails dn1 = DatanodeDetails.newBuilder().setHostName("host1")
         .setIpAddress("1.1.1.1")
         .setUuid(UUID.randomUUID().toString()).build();
@@ -347,10 +342,12 @@ public class TestContainerStateManagerIntegration {
     // Test 2: Add replica nodes and then test
     ContainerReplica replicaOne = ContainerReplica.newBuilder()
         .setContainerID(id)
+        .setContainerState(ContainerReplicaProto.State.OPEN)
         .setDatanodeDetails(dn1)
         .build();
     ContainerReplica replicaTwo = ContainerReplica.newBuilder()
         .setContainerID(id)
+        .setContainerState(ContainerReplicaProto.State.OPEN)
         .setDatanodeDetails(dn2)
         .build();
     containerStateManager.updateContainerReplica(id, replicaOne);

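Note on the hunk above: ContainerReplica now records the replica's reported state, so its builder takes a ContainerReplicaProto.State alongside the container ID and datanode details. A minimal sketch of constructing and registering a replica under the new builder, with values as in the test:

    // Build a replica that carries its reported state (OPEN here).
    ContainerReplica replica = ContainerReplica.newBuilder()
        .setContainerID(id)
        .setContainerState(ContainerReplicaProto.State.OPEN)
        .setDatanodeDetails(dn1)
        .build();
    containerStateManager.updateContainerReplica(id, replica);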
+ 2 - 2
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java

@@ -93,7 +93,7 @@ public class TestNode2PipelineMap {
 
     // get pipeline details by dnid
     Set<PipelineID> pipelines = scm.getScmNodeManager()
-        .getPipelineByDnID(dns.get(0).getUuid());
+        .getPipelines(dns.get(0));
     Assert.assertEquals(1, pipelines.size());
     pipelines.forEach(p -> Assert.assertEquals(p,
         ratisContainer.getPipeline().getId()));
@@ -116,7 +116,7 @@ public class TestNode2PipelineMap {
     pipelineManager.finalizePipeline(ratisContainer.getPipeline().getId());
     pipelineManager.removePipeline(ratisContainer.getPipeline().getId());
     pipelines = scm.getScmNodeManager()
-        .getPipelineByDnID(dns.get(0).getUuid());
+        .getPipelines(dns.get(0));
     Assert.assertEquals(0, pipelines.size());
   }
 }

+ 2 - 2
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java

@@ -115,8 +115,8 @@ public class TestPipelineClose {
     pipelineManager.removePipeline(pipeline1.getId());
     for (DatanodeDetails dn : ratisContainer1.getPipeline().getNodes()) {
       // Assert that the pipeline has been removed from Node2PipelineMap as well
-      Assert.assertEquals(scm.getScmNodeManager().getPipelineByDnID(
-          dn.getUuid()).size(), 0);
+      Assert.assertEquals(scm.getScmNodeManager().getPipelines(
+          dn).size(), 0);
     }
   }
 

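Note on the hunk below: TestBlockDeletion stops injecting reports into SCM directly (the call to ContainerManager#processContainerReports is gone) and instead queues the report on the datanode's StateContext and forces a heartbeat, so the report reaches SCM over the same path a live datanode would use. A minimal sketch of that pattern, assuming a MiniOzoneCluster as in the test and a protobuf reports message such as ContainerReportsProto:

    // Queue a report on the datanode and deliver it to SCM via heartbeat.
    DatanodeStateMachine dsm = cluster.getHddsDatanodes().get(0)
        .getDatanodeStateMachine();
    dsm.getContext().addReport(report);
    dsm.triggerHeartbeat();
    Thread.sleep(1000); // allow the SCM event handlers to process it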
+ 4 - 2
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java

@@ -243,8 +243,10 @@ public class TestBlockDeletion {
     ContainerReportsProto dummyReport = dummyReportsBuilder.build();
 
     logCapturer.clearOutput();
-    scm.getContainerManager().processContainerReports(
-        cluster.getHddsDatanodes().get(0).getDatanodeDetails(), dummyReport);
+    cluster.getHddsDatanodes().get(0)
+        .getDatanodeStateMachine().getContext().addReport(dummyReport);
+    cluster.getHddsDatanodes().get(0)
+        .getDatanodeStateMachine().triggerHeartbeat();
     // wait for event to be handled by event handler
     Thread.sleep(1000);
     String output = logCapturer.getOutput();