Procházet zdrojové kódy

HDDS-1888. Add containers to node2container map in SCM as part of ICR processing.

Signed-off-by: Nanda kumar <nanda@apache.org>
(cherry picked from commit 397a5633af767eee99083c0ac4a8d4282f651911)
Nanda kumar před 5 roky
rodič
revize
033c175cd6

+ 15 - 1
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java

@@ -20,8 +20,11 @@ package org.apache.hadoop.hdds.scm.container;
 
 import java.io.IOException;
 
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos
     .ContainerReplicaProto;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
     .IncrementalContainerReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventHandler;
@@ -39,9 +42,13 @@ public class IncrementalContainerReportHandler extends
   private static final Logger LOG = LoggerFactory.getLogger(
       IncrementalContainerReportHandler.class);
 
+  private final NodeManager nodeManager;
+
   public IncrementalContainerReportHandler(
+      final NodeManager nodeManager,
       final ContainerManager containerManager)  {
     super(containerManager, LOG);
+    this.nodeManager = nodeManager;
   }
 
   @Override
@@ -53,9 +60,16 @@ public class IncrementalContainerReportHandler extends
     for (ContainerReplicaProto replicaProto :
         report.getReport().getReportList()) {
       try {
-        processContainerReplica(report.getDatanodeDetails(), replicaProto);
+        final DatanodeDetails dd = report.getDatanodeDetails();
+        final ContainerID id = ContainerID.valueof(
+            replicaProto.getContainerID());
+        nodeManager.addContainer(dd, id);
+        processContainerReplica(dd, replicaProto);
       } catch (ContainerNotFoundException e) {
         LOG.warn("Container {} not found!", replicaProto.getContainerID());
+      } catch (NodeNotFoundException ex) {
+        LOG.error("Received ICR from unknown datanode {} {}",
+            report.getDatanodeDetails(), ex);
       } catch (IOException e) {
         LOG.error("Exception while processing ICR for container {}",
             replicaProto.getContainerID());

+ 11 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java

@@ -129,6 +129,17 @@ public interface NodeManager extends StorageContainerNodeProtocol,
    */
   void removePipeline(Pipeline pipeline);
 
+  /**
+   * Adds the given container to the specified datanode.
+   *
+   * @param datanodeDetails - DatanodeDetails
+   * @param containerId - containerID
+   * @throws NodeNotFoundException - if datanode is not known. For new datanode
+   *                        use addDatanodeInContainerMap call.
+   */
+  void addContainer(DatanodeDetails datanodeDetails,
+                    ContainerID containerId) throws NodeNotFoundException;
+
   /**
    * Remaps datanode to containers mapping to the new set of containers.
    * @param datanodeDetails - DatanodeDetails

+ 15 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java

@@ -456,6 +456,21 @@ public class NodeStateManager implements Runnable, Closeable {
   public void removePipeline(Pipeline pipeline) {
     node2PipelineMap.removePipeline(pipeline);
   }
+
+  /**
+   * Adds the given container to the specified datanode.
+   *
+   * @param uuid - datanode uuid
+   * @param containerId - containerID
+   * @throws NodeNotFoundException - if datanode is not known. For new datanode
+   *                        use addDatanodeInContainerMap call.
+   */
+  public void addContainer(final UUID uuid,
+                           final ContainerID containerId)
+      throws NodeNotFoundException {
+    nodeStateMap.addContainer(uuid, containerId);
+  }
+
   /**
    * Update set of containers available on a datanode.
    * @param uuid - DatanodeID

+ 7 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java

@@ -500,6 +500,13 @@ public class SCMNodeManager implements NodeManager {
     nodeStateManager.removePipeline(pipeline);
   }
 
+  @Override
+  public void addContainer(final DatanodeDetails datanodeDetails,
+                           final ContainerID containerId)
+      throws NodeNotFoundException {
+    nodeStateManager.addContainer(datanodeDetails.getUuid(), containerId);
+  }
+
   /**
    * Update set of containers available on a datanode.
    * @param datanodeDetails - DatanodeID

+ 56 - 16
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java

@@ -108,6 +108,7 @@ public class NodeStateMap {
                               NodeState newState)throws NodeNotFoundException {
     lock.writeLock().lock();
     try {
+      checkIfNodeExist(nodeId);
       if (stateMap.get(currentState).remove(nodeId)) {
         stateMap.get(newState).add(nodeId);
       } else {
@@ -131,10 +132,8 @@ public class NodeStateMap {
   public DatanodeInfo getNodeInfo(UUID uuid) throws NodeNotFoundException {
     lock.readLock().lock();
     try {
-      if (nodeMap.containsKey(uuid)) {
-        return nodeMap.get(uuid);
-      }
-      throw new NodeNotFoundException("Node UUID: " + uuid);
+      checkIfNodeExist(uuid);
+      return nodeMap.get(uuid);
     } finally {
       lock.readLock().unlock();
     }
@@ -213,41 +212,70 @@ public class NodeStateMap {
   public NodeState getNodeState(UUID uuid) throws NodeNotFoundException {
     lock.readLock().lock();
     try {
+      checkIfNodeExist(uuid);
       for (Map.Entry<NodeState, Set<UUID>> entry : stateMap.entrySet()) {
         if (entry.getValue().contains(uuid)) {
           return entry.getKey();
         }
       }
-      throw new NodeNotFoundException("Node UUID: " + uuid);
+      throw new NodeNotFoundException("Node not found in node state map." +
+          " UUID: " + uuid);
     } finally {
       lock.readLock().unlock();
     }
   }
 
+  /**
+   * Adds the given container to the specified datanode.
+   *
+   * @param uuid - datanode uuid
+   * @param containerId - containerID
+   * @throws NodeNotFoundException - if datanode is not known. For new datanode
+   *                        use addDatanodeInContainerMap call.
+   */
+  public void addContainer(final UUID uuid,
+                           final ContainerID containerId)
+      throws NodeNotFoundException {
+    lock.writeLock().lock();
+    try {
+      checkIfNodeExist(uuid);
+      nodeToContainer.get(uuid).add(containerId);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
   public void setContainers(UUID uuid, Set<ContainerID> containers)
       throws NodeNotFoundException{
-    if (!nodeToContainer.containsKey(uuid)) {
-      throw new NodeNotFoundException("Node UUID: " + uuid);
+    lock.writeLock().lock();
+    try {
+      checkIfNodeExist(uuid);
+      nodeToContainer.put(uuid, containers);
+    } finally {
+      lock.writeLock().unlock();
     }
-    nodeToContainer.put(uuid, containers);
   }
 
   public Set<ContainerID> getContainers(UUID uuid)
       throws NodeNotFoundException {
-    Set<ContainerID> containers = nodeToContainer.get(uuid);
-    if (containers == null) {
-      throw new NodeNotFoundException("Node UUID: " + uuid);
+    lock.readLock().lock();
+    try {
+      checkIfNodeExist(uuid);
+      return Collections.unmodifiableSet(nodeToContainer.get(uuid));
+    } finally {
+      lock.readLock().unlock();
     }
-    return Collections.unmodifiableSet(containers);
   }
 
   public void removeContainer(UUID uuid, ContainerID containerID) throws
       NodeNotFoundException {
-    Set<ContainerID> containers = nodeToContainer.get(uuid);
-    if (containers == null) {
-      throw new NodeNotFoundException("Node UUID: " + uuid);
+    lock.writeLock().lock();
+    try {
+      checkIfNodeExist(uuid);
+      nodeToContainer.get(uuid).remove(containerID);
+    } finally {
+      lock.writeLock().unlock();
     }
-    containers.remove(containerID);
   }
 
   /**
@@ -269,4 +297,16 @@ public class NodeStateMap {
     }
     return builder.toString();
   }
+
+  /**
+   * Throws NodeNotFoundException if the Node for given id doesn't exist.
+   *
+   * @param uuid Node UUID
+   * @throws NodeNotFoundException If the node is missing.
+   */
+  private void checkIfNodeExist(UUID uuid) throws NodeNotFoundException {
+    if (!nodeToContainer.containsKey(uuid)) {
+      throw new NodeNotFoundException("Node UUID: " + uuid);
+    }
+  }
 }

+ 2 - 1
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java

@@ -299,7 +299,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
         new ContainerReportHandler(scmNodeManager, containerManager);
 
     IncrementalContainerReportHandler incrementalContainerReportHandler =
-        new IncrementalContainerReportHandler(containerManager);
+        new IncrementalContainerReportHandler(
+            scmNodeManager, containerManager);
 
     PipelineActionHandler pipelineActionHandler =
         new PipelineActionHandler(pipelineManager, conf);

+ 13 - 0
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java

@@ -267,6 +267,19 @@ public class MockNodeManager implements NodeManager {
     node2PipelineMap.removePipeline(pipeline);
   }
 
+  @Override
+  public void addContainer(DatanodeDetails dd,
+                           ContainerID containerId)
+      throws NodeNotFoundException {
+    try {
+      Set<ContainerID> set = node2ContainerMap.getContainers(dd.getUuid());
+      set.add(containerId);
+      node2ContainerMap.setContainersForDatanode(dd.getUuid(), set);
+    } catch (SCMException e) {
+      e.printStackTrace();
+    }
+  }
+
   @Override
   public void addDatanodeCommand(UUID dnId, SCMCommand command) {
     if(commandMap.containsKey(dnId)) {

+ 6 - 3
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java

@@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.protocol.proto
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
 import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
     .IncrementalContainerReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -47,6 +48,7 @@ import static org.apache.hadoop.hdds.scm.TestUtils.getReplicas;
  */
 public class TestIncrementalContainerReportHandler {
 
+  private NodeManager nodeManager;
   private ContainerManager containerManager;
   private ContainerStateManager containerStateManager;
   private EventPublisher publisher;
@@ -55,6 +57,7 @@ public class TestIncrementalContainerReportHandler {
   public void setup() throws IOException {
     final Configuration conf = new OzoneConfiguration();
     this.containerManager = Mockito.mock(ContainerManager.class);
+    this.nodeManager = Mockito.mock(NodeManager.class);
     this.containerStateManager = new ContainerStateManager(conf);
     this.publisher = Mockito.mock(EventPublisher.class);
 
@@ -88,7 +91,7 @@ public class TestIncrementalContainerReportHandler {
   @Test
   public void testClosingToClosed() throws IOException {
     final IncrementalContainerReportHandler reportHandler =
-        new IncrementalContainerReportHandler(containerManager);
+        new IncrementalContainerReportHandler(nodeManager, containerManager);
     final ContainerInfo container = getContainer(LifeCycleState.CLOSING);
     final DatanodeDetails datanodeOne = TestUtils.randomDatanodeDetails();
     final DatanodeDetails datanodeTwo = TestUtils.randomDatanodeDetails();
@@ -122,7 +125,7 @@ public class TestIncrementalContainerReportHandler {
   @Test
   public void testClosingToQuasiClosed() throws IOException {
     final IncrementalContainerReportHandler reportHandler =
-        new IncrementalContainerReportHandler(containerManager);
+        new IncrementalContainerReportHandler(nodeManager, containerManager);
     final ContainerInfo container = getContainer(LifeCycleState.CLOSING);
     final DatanodeDetails datanodeOne = TestUtils.randomDatanodeDetails();
     final DatanodeDetails datanodeTwo = TestUtils.randomDatanodeDetails();
@@ -157,7 +160,7 @@ public class TestIncrementalContainerReportHandler {
   @Test
   public void testQuasiClosedToClosed() throws IOException {
     final IncrementalContainerReportHandler reportHandler =
-        new IncrementalContainerReportHandler(containerManager);
+        new IncrementalContainerReportHandler(nodeManager, containerManager);
     final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
     final DatanodeDetails datanodeOne = TestUtils.randomDatanodeDetails();
     final DatanodeDetails datanodeTwo = TestUtils.randomDatanodeDetails();

+ 7 - 0
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java

@@ -182,6 +182,13 @@ public class ReplicationNodeManagerMock implements NodeManager {
     throw new UnsupportedOperationException("Not yet implemented");
   }
 
+  @Override
+  public void addContainer(DatanodeDetails datanodeDetails,
+                           ContainerID containerId)
+      throws NodeNotFoundException {
+    throw new UnsupportedOperationException("Not yet implemented");
+  }
+
   /**
    * Update set of containers available on a datanode.
    * @param uuid - DatanodeID