Browse Source

HDFS-11830. Ozone: Datanode needs to re-register to SCM if SCM is restarted. Contributed by Weiwei Yang.

Weiwei Yang 8 years ago
parent
commit
b4e5c55436

+ 22 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java

@@ -23,12 +23,12 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ozone.container.common.statemachine
     .EndpointStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .EndpointStateMachine.EndPointStates;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto;
 import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand;
-import org.apache.hadoop.ozone.protocol.proto
-     .StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto
      .StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto;
 import org.apache.hadoop.ozone.protocol.proto
@@ -125,10 +125,28 @@ public class HeartbeatEndpointTask
   private void processResponse(SCMHeartbeatResponseProto response) {
     for (SCMCommandResponseProto commandResponseProto : response
         .getCommandsList()) {
-      if (commandResponseProto.getCmdType() ==
-          StorageContainerDatanodeProtocolProtos.Type.sendContainerReport) {
+      switch (commandResponseProto.getCmdType()) {
+      case sendContainerReport:
         this.context.addCommand(SendContainerCommand.getFromProtobuf(
             commandResponseProto.getSendReport()));
+        break;
+      case reregisterCommand:
+        if (rpcEndpoint.getState() == EndPointStates.HEARTBEAT) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Received SCM notification to register."
+                + " Interrupt HEARTBEAT and transit to REGISTER state.");
+          }
+          rpcEndpoint.setState(EndPointStates.REGISTER);
+        } else {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Illegal state {} found, expecting {}.",
+                rpcEndpoint.getState().name(), EndPointStates.HEARTBEAT);
+          }
+        }
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown response : "
+            + commandResponseProto.getCmdType().name());
       }
     }
   }

+ 59 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java

@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.protocol.commands;
+
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.Type;
+
+import static org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMReregisterCmdResponseProto;
+import static org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.Type.reregisterCommand;
+
+/**
+ * Informs a datanode to register itself with SCM again.
+ */
+public class ReregisterCommand extends
+    SCMCommand<SCMReregisterCmdResponseProto>{
+
+  /**
+   * Returns the type of this command.
+   *
+   * @return Type
+   */
+  @Override
+  public Type getType() {
+    return reregisterCommand;
+  }
+
+  /**
+   * Gets the protobuf message of this object.
+   *
+   * @return A protobuf message.
+   */
+  @Override
+  public byte[] getProtoBufMessage() {
+    return getProto().toByteArray();
+  }
+
+  public SCMReregisterCmdResponseProto getProto() {
+    return SCMReregisterCmdResponseProto
+        .newBuilder()
+        .build();
+  }
+}

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java

@@ -68,6 +68,8 @@ import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.Type;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerLocationProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMReregisterCmdResponseProto;
 import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
 import org.apache.hadoop.ozone.protocolPB
     .StorageContainerDatanodeProtocolServerSideTranslatorPB;
@@ -325,6 +327,11 @@ public class StorageContainerManager
       return builder.setCmdType(Type.sendContainerReport)
           .setSendReport(SendContainerReportProto.getDefaultInstance())
           .build();
+    case reregisterCommand:
+      return builder.setCmdType(Type.reregisterCommand)
+          .setReregisterProto(SCMReregisterCmdResponseProto
+              .getDefaultInstance())
+          .build();
     default:
       throw new IllegalArgumentException("Not implemented");
     }

+ 2 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/CommandQueue.java

@@ -60,19 +60,12 @@ public class CommandQueue {
   @SuppressWarnings("unchecked")
   List<SCMCommand> getCommand(final DatanodeID datanodeID) {
     lock.lock();
-
     try {
-      if (commandMap.containsKey(datanodeID)) {
-        List temp = commandMap.get(datanodeID);
-        if (temp.size() > 0) {
-          commandMap.put(datanodeID, DEFAULT_LIST);
-          return temp;
-        }
-      }
+      List<SCMCommand> cmds = commandMap.remove(datanodeID);
+      return cmds == null ? DEFAULT_LIST : cmds;
     } finally {
       lock.unlock();
     }
-    return DEFAULT_LIST;
   }
 
   /**

+ 4 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java

@@ -29,6 +29,7 @@ import org.apache.hadoop.ozone.OzoneClientUtils;
 import org.apache.hadoop.ozone.OzoneConfiguration;
 import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol;
 import org.apache.hadoop.ozone.protocol.VersionResponse;
+import org.apache.hadoop.ozone.protocol.commands.ReregisterCommand;
 import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.ozone.protocol.proto
@@ -585,7 +586,10 @@ public class SCMNodeManager
       updateNodeStat(datanodeID, nodeReport);
       return;
     }
+
     LOG.warn("SCM receive heartbeat from unregistered datanode {}", datanodeID);
+    this.commandQueue.addCommand(hbItem.getDatanodeID(),
+        new ReregisterCommand());
   }
 
   private void updateNodeStat(String datanodeID, SCMNodeReport nodeReport) {
@@ -704,7 +708,6 @@ public class SCMNodeManager
    * @return SCMCommand
    */
   private SCMCommand verifyDatanodeUUID(DatanodeID datanodeID) {
-
     if (datanodeID.getDatanodeUuid() != null &&
         nodes.containsKey(datanodeID.getDatanodeUuid())) {
       LOG.trace("Datanode is already registered. Datanode: {}",

+ 8 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto

@@ -171,6 +171,12 @@ message SCMRegisteredCmdResponseProto {
   optional SCMNodeAddressList addressList = 5;
 }
 
+/**
+ * SCM informs a datanode to register itself again.
+ * With recieving this command, datanode will transit to REGISTER state.
+ */
+message SCMReregisterCmdResponseProto {}
+
 /**
  * Container ID maintains the container's Identity along with cluster ID
  * after the registration is done.
@@ -195,6 +201,7 @@ enum Type {
   versionCommand = 2;
   registeredCommand = 3;
   sendContainerReport = 4;
+  reregisterCommand = 5;
 }
 
 /*
@@ -205,7 +212,7 @@ message SCMCommandResponseProto {
   optional SCMRegisteredCmdResponseProto registeredProto = 3;
   optional SCMVersionResponseProto versionProto = 4;
   optional SendContainerReportProto sendReport = 5;
-
+  optional SCMReregisterCmdResponseProto reregisterProto = 6;
 }
 
 

+ 47 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java

@@ -17,11 +17,13 @@
  */
 package org.apache.hadoop.ozone.scm.node;
 
+import com.google.common.base.Supplier;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConfiguration;
 import org.apache.hadoop.ozone.container.common.SCMTestUtils;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
 import org.apache.hadoop.ozone.protocol.proto
@@ -46,6 +48,8 @@ import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.TimeoutException;
 
+import static org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.Type;
 import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.DEAD;
 import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.HEALTHY;
 import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.STALE;
@@ -241,6 +245,49 @@ public class TestNodeManager {
         nodeManager.getLastHBProcessedCount());
   }
 
+  /**
+   * Asserts scm informs datanodes to re-register with the nodemanager
+   * on a restart.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testScmHeartbeatAfterRestart() throws Exception {
+    OzoneConfiguration conf = getConf();
+    conf.setInt(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 100);
+    DatanodeID datanodeID = SCMTestUtils.getDatanodeID();
+    try (SCMNodeManager nodemanager = createNodeManager(conf)) {
+      nodemanager.register(datanodeID);
+      List<SCMCommand> command = nodemanager.sendHeartbeat(datanodeID, null);
+      Assert.assertTrue(nodemanager.getAllNodes().contains(datanodeID));
+      Assert.assertTrue("On regular HB calls, SCM responses a "
+          + "datanode with an empty command list", command.isEmpty());
+    }
+
+    // Sends heartbeat without registering to SCM.
+    // This happens when SCM restarts.
+    try (SCMNodeManager nodemanager = createNodeManager(conf)) {
+      Assert.assertFalse(nodemanager
+          .getAllNodes().contains(datanodeID));
+      try {
+        // SCM handles heartbeat asynchronously.
+        // It may need more than one heartbeat processing to
+        // send the notification.
+        GenericTestUtils.waitFor(new Supplier<Boolean>() {
+          @Override public Boolean get() {
+            List<SCMCommand> command =
+                nodemanager.sendHeartbeat(datanodeID, null);
+            return command.size() == 1 && command.get(0).getType()
+                .equals(Type.reregisterCommand);
+          }
+        }, 100, 3 * 1000);
+      } catch (TimeoutException e) {
+        Assert.fail("Times out to verify that scm informs "
+            + "datanode to re-register itself.");
+      }
+    }
+  }
+
   /**
    * Asserts that we detect as many healthy nodes as we have generated heartbeat
    * for.