Преглед на файлове

HDDS-601. On restart, SCM throws 'No such datanode' exception.

(cherry picked from commit eb34b5f8aff538e90c7820c0114322d2dd75f6e6)
Hanisha Koneru преди 6 години
родител
ревизия
43ff801391

+ 1 - 1
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java

@@ -42,7 +42,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
 import java.util.concurrent.Future;
 
 
 /**
 /**
- * Register a container with SCM.
+ * Register a datanode with SCM.
  */
  */
 public final class RegisterEndpointTask implements
 public final class RegisterEndpointTask implements
     Callable<EndpointStateMachine.EndPointStates> {
     Callable<EndpointStateMachine.EndPointStates> {

+ 8 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java

@@ -67,4 +67,12 @@ public interface StorageContainerNodeProtocol {
    */
    */
   List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails);
   List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails);
 
 
+  /**
+   * Check if node is registered or not.
+   * Return true if Node is registered and false otherwise.
+   * @param datanodeDetails - Datanode ID.
+   * @return true if Node is registered, false otherwise
+   */
+  Boolean isNodeRegistered(DatanodeDetails datanodeDetails);
+
 }
 }

+ 7 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java

@@ -196,4 +196,11 @@ public interface NodeManager extends StorageContainerNodeProtocol,
    * @param dnUuid datanode uuid.
    * @param dnUuid datanode uuid.
    */
    */
   void processDeadNode(UUID dnUuid);
   void processDeadNode(UUID dnUuid);
+
+  /**
+   * Get list of SCMCommands in the Command Queue for a particular Datanode.
+   * @param dnID - Datanode uuid.
+   * @return list of commands
+   */
+  List<SCMCommand> getCommandQueue(UUID dnID);
 }
 }

+ 19 - 7
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java

@@ -52,7 +52,6 @@ import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol;
 import org.apache.hadoop.ozone.protocol.VersionResponse;
 import org.apache.hadoop.ozone.protocol.VersionResponse;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
 import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
 import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
-import org.apache.hadoop.ozone.protocol.commands.ReregisterCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 
 
 import org.slf4j.Logger;
 import org.slf4j.Logger;
@@ -196,7 +195,7 @@ public class SCMNodeManager
     try {
     try {
       stat = nodeStateManager.getNodeStat(dnId);
       stat = nodeStateManager.getNodeStat(dnId);
     } catch (NodeNotFoundException e) {
     } catch (NodeNotFoundException e) {
-      LOG.debug("SCM updateNodeStat based on heartbeat from previous" +
+      LOG.debug("SCM updateNodeStat based on heartbeat from previous " +
           "dead datanode {}", dnId);
           "dead datanode {}", dnId);
       stat = new SCMNodeStat();
       stat = new SCMNodeStat();
     }
     }
@@ -277,7 +276,7 @@ public class SCMNodeManager
       nodeStateManager.setNodeStat(dnId, new SCMNodeStat());
       nodeStateManager.setNodeStat(dnId, new SCMNodeStat());
       // Updating Node Report, as registration is successful
       // Updating Node Report, as registration is successful
       updateNodeStat(datanodeDetails.getUuid(), nodeReport);
       updateNodeStat(datanodeDetails.getUuid(), nodeReport);
-      LOG.info("Data node with ID: {} Registered.", datanodeDetails.getUuid());
+      LOG.info("Registered Data node : {}", datanodeDetails);
     } catch (NodeAlreadyExistsException e) {
     } catch (NodeAlreadyExistsException e) {
       LOG.trace("Datanode is already registered. Datanode: {}",
       LOG.trace("Datanode is already registered. Datanode: {}",
           datanodeDetails.toString());
           datanodeDetails.toString());
@@ -304,14 +303,22 @@ public class SCMNodeManager
     try {
     try {
       nodeStateManager.updateLastHeartbeatTime(datanodeDetails);
       nodeStateManager.updateLastHeartbeatTime(datanodeDetails);
     } catch (NodeNotFoundException e) {
     } catch (NodeNotFoundException e) {
-      LOG.warn("SCM receive heartbeat from unregistered datanode {}",
-          datanodeDetails);
-      commandQueue.addCommand(datanodeDetails.getUuid(),
-          new ReregisterCommand());
+      LOG.error("SCM trying to process heartbeat from an " +
+          "unregistered node {}. Ignoring the heartbeat.", datanodeDetails);
     }
     }
     return commandQueue.getCommand(datanodeDetails.getUuid());
     return commandQueue.getCommand(datanodeDetails.getUuid());
   }
   }
 
 
+  @Override
+  public Boolean isNodeRegistered(DatanodeDetails datanodeDetails) {
+    try {
+      nodeStateManager.getNode(datanodeDetails);
+      return true;
+    } catch (NodeNotFoundException e) {
+      return false;
+    }
+  }
+
   /**
   /**
    * Process node report.
    * Process node report.
    *
    *
@@ -487,4 +494,9 @@ public class SCMNodeManager
           + " doesn't exist or decommissioned already.", dnUuid);
           + " doesn't exist or decommissioned already.", dnUuid);
     }
     }
   }
   }
+
+  @Override
+  public List<SCMCommand> getCommandQueue(UUID dnID) {
+    return commandQueue.getCommand(dnID);
+  }
 }
 }

+ 68 - 39
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java

@@ -35,13 +35,15 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.ReregisterCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 
 
 import com.google.protobuf.GeneratedMessage;
 import com.google.protobuf.GeneratedMessage;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 
 
 import java.util.List;
 import java.util.List;
+import java.util.UUID;
 
 
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_ACTIONS;
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_ACTIONS;
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_REPORT;
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_REPORT;
@@ -82,51 +84,78 @@ public final class SCMDatanodeHeartbeatDispatcher {
   public List<SCMCommand> dispatch(SCMHeartbeatRequestProto heartbeat) {
   public List<SCMCommand> dispatch(SCMHeartbeatRequestProto heartbeat) {
     DatanodeDetails datanodeDetails =
     DatanodeDetails datanodeDetails =
         DatanodeDetails.getFromProtoBuf(heartbeat.getDatanodeDetails());
         DatanodeDetails.getFromProtoBuf(heartbeat.getDatanodeDetails());
-    // should we dispatch heartbeat through eventPublisher?
-    List<SCMCommand> commands = nodeManager.processHeartbeat(datanodeDetails);
-    if (heartbeat.hasNodeReport()) {
-      LOG.debug("Dispatching Node Report.");
-      eventPublisher.fireEvent(NODE_REPORT,
-          new NodeReportFromDatanode(datanodeDetails,
-              heartbeat.getNodeReport()));
-    }
+    List<SCMCommand> commands;
+
+    // If node is not registered, ask the node to re-register. Do not process
+    // Heartbeat for unregistered nodes.
+    if (!nodeManager.isNodeRegistered(datanodeDetails)) {
+      LOG.info("SCM received heartbeat from an unregistered datanode {}. " +
+          "Asking datanode to re-register.", datanodeDetails);
+      UUID dnID = datanodeDetails.getUuid();
+      nodeManager.addDatanodeCommand(dnID, new ReregisterCommand());
+
+      commands = nodeManager.getCommandQueue(dnID);
+
+    } else {
+
+      // should we dispatch heartbeat through eventPublisher?
+      commands = nodeManager.processHeartbeat(datanodeDetails);
+      if (heartbeat.hasNodeReport()) {
+        LOG.debug("Dispatching Node Report.");
+        eventPublisher.fireEvent(
+            NODE_REPORT,
+            new NodeReportFromDatanode(
+                datanodeDetails,
+                heartbeat.getNodeReport()));
+      }
 
 
-    if (heartbeat.hasContainerReport()) {
-      LOG.debug("Dispatching Container Report.");
-      eventPublisher.fireEvent(CONTAINER_REPORT,
-          new ContainerReportFromDatanode(datanodeDetails,
-              heartbeat.getContainerReport()));
+      if (heartbeat.hasContainerReport()) {
+        LOG.debug("Dispatching Container Report.");
+        eventPublisher.fireEvent(
+            CONTAINER_REPORT,
+            new ContainerReportFromDatanode(
+                datanodeDetails,
+                heartbeat.getContainerReport()));
 
 
-    }
+      }
 
 
-    if (heartbeat.hasContainerActions()) {
-      LOG.debug("Dispatching Container Actions.");
-      eventPublisher.fireEvent(CONTAINER_ACTIONS,
-          new ContainerActionsFromDatanode(datanodeDetails,
-              heartbeat.getContainerActions()));
-    }
+      if (heartbeat.hasContainerActions()) {
+        LOG.debug("Dispatching Container Actions.");
+        eventPublisher.fireEvent(
+            CONTAINER_ACTIONS,
+            new ContainerActionsFromDatanode(
+                datanodeDetails,
+                heartbeat.getContainerActions()));
+      }
 
 
-    if (heartbeat.hasPipelineReports()) {
-      LOG.debug("Dispatching Pipeline Report.");
-      eventPublisher.fireEvent(PIPELINE_REPORT,
-              new PipelineReportFromDatanode(datanodeDetails,
-                      heartbeat.getPipelineReports()));
+      if (heartbeat.hasPipelineReports()) {
+        LOG.debug("Dispatching Pipeline Report.");
+        eventPublisher.fireEvent(
+            PIPELINE_REPORT,
+            new PipelineReportFromDatanode(
+                datanodeDetails,
+                heartbeat.getPipelineReports()));
 
 
-    }
+      }
 
 
-    if (heartbeat.hasPipelineActions()) {
-      LOG.debug("Dispatching Pipeline Actions.");
-      eventPublisher.fireEvent(PIPELINE_ACTIONS,
-          new PipelineActionsFromDatanode(datanodeDetails,
-              heartbeat.getPipelineActions()));
-    }
+      if (heartbeat.hasPipelineActions()) {
+        LOG.debug("Dispatching Pipeline Actions.");
+        eventPublisher.fireEvent(
+            PIPELINE_ACTIONS,
+            new PipelineActionsFromDatanode(
+                datanodeDetails,
+                heartbeat.getPipelineActions()));
+      }
 
 
-    if (heartbeat.getCommandStatusReportsCount() != 0) {
-      for (CommandStatusReportsProto commandStatusReport : heartbeat
-          .getCommandStatusReportsList()) {
-        eventPublisher.fireEvent(CMD_STATUS_REPORT,
-            new CommandStatusReportFromDatanode(datanodeDetails,
-                commandStatusReport));
+      if (heartbeat.getCommandStatusReportsCount() != 0) {
+        for (CommandStatusReportsProto commandStatusReport : heartbeat
+            .getCommandStatusReportsList()) {
+          eventPublisher.fireEvent(
+              CMD_STATUS_REPORT,
+              new CommandStatusReportFromDatanode(
+                  datanodeDetails,
+                  commandStatusReport));
+        }
       }
       }
     }
     }
 
 

+ 11 - 0
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java

@@ -407,6 +407,12 @@ public class MockNodeManager implements NodeManager {
     return null;
     return null;
   }
   }
 
 
+  @Override
+  public Boolean isNodeRegistered(
+      DatanodeDetails datanodeDetails) {
+    return null;
+  }
+
   @Override
   @Override
   public Map<String, Integer> getNodeCount() {
   public Map<String, Integer> getNodeCount() {
     Map<String, Integer> nodeCountMap = new HashMap<String, Integer>();
     Map<String, Integer> nodeCountMap = new HashMap<String, Integer>();
@@ -470,6 +476,11 @@ public class MockNodeManager implements NodeManager {
     }
     }
   }
   }
 
 
+  @Override
+  public List<SCMCommand> getCommandQueue(UUID dnID) {
+    return null;
+  }
+
   /**
   /**
    * A class to declare some values for the nodes so that our tests
    * A class to declare some values for the nodes so that our tests
    * won't fail.
    * won't fail.

+ 0 - 53
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java

@@ -17,11 +17,8 @@
  */
  */
 package org.apache.hadoop.hdds.scm.node;
 package org.apache.hadoop.hdds.scm.node;
 
 
-import com.google.common.base.Supplier;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
@@ -213,56 +210,6 @@ public class TestNodeManager {
     //TODO: add assertion
     //TODO: add assertion
   }
   }
 
 
-  /**
-   * Asserts scm informs datanodes to re-register with the nodemanager
-   * on a restart.
-   *
-   * @throws Exception
-   */
-  @Test
-  public void testScmHeartbeatAfterRestart() throws Exception {
-    OzoneConfiguration conf = getConf();
-    conf.getTimeDuration(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
-        100, TimeUnit.MILLISECONDS);
-    DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
-    UUID dnId = datanodeDetails.getUuid();
-    String storagePath = testDir.getAbsolutePath() + "/" + dnId;
-    StorageReportProto report =
-        TestUtils.createStorageReport(dnId, storagePath, 100, 10, 90, null);
-    try (SCMNodeManager nodemanager = createNodeManager(conf)) {
-      nodemanager.register(datanodeDetails,
-          TestUtils.createNodeReport(report),
-          TestUtils.getRandomPipelineReports());
-      List<SCMCommand> command = nodemanager.processHeartbeat(datanodeDetails);
-      Assert.assertTrue(nodemanager.getAllNodes().contains(datanodeDetails));
-      Assert.assertTrue("On regular HB calls, SCM responses a "
-          + "datanode with an empty command list", command.isEmpty());
-    }
-
-    // Sends heartbeat without registering to SCM.
-    // This happens when SCM restarts.
-    try (SCMNodeManager nodemanager = createNodeManager(conf)) {
-      Assert.assertFalse(nodemanager
-          .getAllNodes().contains(datanodeDetails));
-      try {
-        // SCM handles heartbeat asynchronously.
-        // It may need more than one heartbeat processing to
-        // send the notification.
-        GenericTestUtils.waitFor(new Supplier<Boolean>() {
-          @Override public Boolean get() {
-            List<SCMCommand> command =
-                nodemanager.processHeartbeat(datanodeDetails);
-            return command.size() == 1 && command.get(0).getType()
-                .equals(SCMCommandProto.Type.reregisterCommand);
-          }
-        }, 100, 3 * 1000);
-      } catch (TimeoutException e) {
-        Assert.fail("Times out to verify that scm informs "
-            + "datanode to re-register itself.");
-      }
-    }
-  }
-
   /**
   /**
    * Asserts that we detect as many healthy nodes as we have generated heartbeat
    * Asserts that we detect as many healthy nodes as we have generated heartbeat
    * for.
    * for.

+ 39 - 2
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMDatanodeHeartbeatDispatcher.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.server;
 
 
 import java.io.IOException;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.UUID;
 
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.
 import org.apache.hadoop.hdds.protocol.proto.
@@ -39,6 +40,7 @@ import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
     .NodeReportFromDatanode;
     .NodeReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.Event;
 import org.apache.hadoop.hdds.server.events.Event;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.ReregisterCommand;
 
 
 import org.junit.Assert;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.Test;
@@ -61,8 +63,12 @@ public class TestSCMDatanodeHeartbeatDispatcher {
 
 
     NodeReportProto nodeReport = NodeReportProto.getDefaultInstance();
     NodeReportProto nodeReport = NodeReportProto.getDefaultInstance();
 
 
+    NodeManager mockNodeManager = Mockito.mock(NodeManager.class);
+    Mockito.when(mockNodeManager.isNodeRegistered(Mockito.any()))
+        .thenReturn(true);
+
     SCMDatanodeHeartbeatDispatcher dispatcher =
     SCMDatanodeHeartbeatDispatcher dispatcher =
-        new SCMDatanodeHeartbeatDispatcher(Mockito.mock(NodeManager.class),
+        new SCMDatanodeHeartbeatDispatcher(mockNodeManager,
             new EventPublisher() {
             new EventPublisher() {
           @Override
           @Override
           public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void fireEvent(
           public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void fireEvent(
@@ -99,8 +105,13 @@ public class TestSCMDatanodeHeartbeatDispatcher {
     CommandStatusReportsProto commandStatusReport =
     CommandStatusReportsProto commandStatusReport =
         CommandStatusReportsProto.getDefaultInstance();
         CommandStatusReportsProto.getDefaultInstance();
 
 
+    NodeManager mockNodeManager = Mockito.mock(NodeManager.class);
+    Mockito.when(mockNodeManager.isNodeRegistered(Mockito.any()))
+        .thenReturn(true);
+
     SCMDatanodeHeartbeatDispatcher dispatcher =
     SCMDatanodeHeartbeatDispatcher dispatcher =
-        new SCMDatanodeHeartbeatDispatcher(Mockito.mock(NodeManager.class),
+        new SCMDatanodeHeartbeatDispatcher(
+            mockNodeManager,
             new EventPublisher() {
             new EventPublisher() {
           @Override
           @Override
           public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void fireEvent(
           public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void fireEvent(
@@ -135,4 +146,30 @@ public class TestSCMDatanodeHeartbeatDispatcher {
 
 
   }
   }
 
 
+  /**
+   * Asserts scm informs datanodes to re-register on a restart.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testScmHeartbeatAfterRestart() throws Exception {
+
+    NodeManager mockNodeManager = Mockito.mock(NodeManager.class);
+    SCMDatanodeHeartbeatDispatcher dispatcher =
+        new SCMDatanodeHeartbeatDispatcher(
+            mockNodeManager, Mockito.mock(EventPublisher.class));
+
+    DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
+
+    SCMHeartbeatRequestProto heartbeat =
+        SCMHeartbeatRequestProto.newBuilder()
+            .setDatanodeDetails(datanodeDetails.getProtoBufMessage())
+            .build();
+
+    dispatcher.dispatch(heartbeat);
+    // If SCM receives heartbeat from a node after it restarts and the node
+    // is not registered, it should send a Re-Register command back to the node.
+    Mockito.verify(mockNodeManager, Mockito.times(1)).addDatanodeCommand(
+        Mockito.any(UUID.class), Mockito.any(ReregisterCommand.class));
+  }
 }
 }

+ 11 - 0
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java

@@ -295,6 +295,12 @@ public class ReplicationNodeManagerMock implements NodeManager {
     return null;
     return null;
   }
   }
 
 
+  @Override
+  public Boolean isNodeRegistered(
+      DatanodeDetails datanodeDetails) {
+    return null;
+  }
+
   /**
   /**
    * Clears all nodes from the node Manager.
    * Clears all nodes from the node Manager.
    */
    */
@@ -341,4 +347,9 @@ public class ReplicationNodeManagerMock implements NodeManager {
   public void processDeadNode(UUID dnUuid) {
   public void processDeadNode(UUID dnUuid) {
     // do nothing.
     // do nothing.
   }
   }
+
+  @Override
+  public List<SCMCommand> getCommandQueue(UUID dnID) {
+    return null;
+  }
 }
 }