
HDDS-191. Queue SCMCommands via EventQueue in SCM.
Contributed by Elek, Marton.

Anu Engineer, 7 years ago
parent
commit a55d6bba71
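
The change introduces an event-driven path for sending SCMCommands to
datanodes: a CommandForDatanode payload is fired on an EventQueue under the
SCMNodeManager.DATANODE_COMMAND event, SCMNodeManager (now also an
EventHandler) picks it up in onMessage and queues the wrapped command for the
target datanode, which receives it with its next heartbeat response. A minimal
sketch of the flow, mirroring the test added below (nodeManager and
datanodeUuid are placeholders for illustration):

    // Wiring, as done in StorageContainerManager in this change.
    EventQueue eventQueue = new EventQueue();
    eventQueue.addHandler(SCMNodeManager.DATANODE_COMMAND, nodeManager);

    // Any SCM component can now address a command to a single datanode.
    eventQueue.fireEvent(SCMNodeManager.DATANODE_COMMAND,
        new CommandForDatanode(datanodeUuid,
            new CloseContainerCommand(1L, ReplicationType.STAND_ALONE)));

    // As in the test below, processAll waits for queued events to be handled;
    // the command is then returned with the datanode's next heartbeat.
    eventQueue.processAll(1000L);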

+ 45 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandForDatanode.java

@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.protocol.commands;
+
+import java.util.UUID;
+
+import com.google.protobuf.GeneratedMessage;
+
+/**
+ * Command for the datanode with the destination address.
+ */
+public class CommandForDatanode<T extends GeneratedMessage> {
+
+  private final UUID datanodeId;
+
+  private final SCMCommand<T> command;
+
+  public CommandForDatanode(UUID datanodeId, SCMCommand<T> command) {
+    this.datanodeId = datanodeId;
+    this.command = command;
+  }
+
+  public UUID getDatanodeId() {
+    return datanodeId;
+  }
+
+  public SCMCommand<T> getCommand() {
+    return command;
+  }
+}
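
CommandForDatanode is a plain immutable envelope that pairs a command with
the UUID of the datanode meant to execute it. A brief illustrative sketch
(raw type, as in the test below; the UUID here is a placeholder):

    UUID datanodeUuid = UUID.randomUUID();
    CommandForDatanode wrapped = new CommandForDatanode(datanodeUuid,
        new CloseContainerCommand(1L, ReplicationType.STAND_ALONE));

    // The handler later unpacks both parts.
    UUID target = wrapped.getDatanodeId();
    SCMCommand command = wrapped.getCommand();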

+ 19 - 1
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java

@@ -25,6 +25,10 @@ import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.scm.VersionInfo;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
+import org.apache.hadoop.hdds.server.events.Event;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.server.events.TypedEvent;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -42,11 +46,14 @@ import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol;
 import org.apache.hadoop.ozone.protocol.VersionResponse;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
 import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
 import org.apache.hadoop.ozone.protocol.commands.ReregisterCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
+
+import com.google.protobuf.GeneratedMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -100,7 +107,8 @@ import static org.apache.hadoop.util.Time.monotonicNow;
  * as soon as you read it.
  */
 public class SCMNodeManager
-    implements NodeManager, StorageContainerNodeProtocol {
+    implements NodeManager, StorageContainerNodeProtocol,
+    EventHandler<CommandForDatanode> {
 
   @VisibleForTesting
   static final Logger LOG =
@@ -154,6 +162,9 @@ public class SCMNodeManager
   private final SCMNodePoolManager nodePoolManager;
   private final StorageContainerManager scmManager;
 
+  public static final Event<CommandForDatanode> DATANODE_COMMAND =
+      new TypedEvent<>(CommandForDatanode.class, "DATANODE_COMMAND");
+
   /**
    * Constructs SCM machine Manager.
    */
@@ -871,4 +882,11 @@ public class SCMNodeManager
   public void setStaleNodeIntervalMs(long interval) {
     this.staleNodeIntervalMs = interval;
   }
+
+  @Override
+  public void onMessage(CommandForDatanode commandForDatanode,
+      EventPublisher publisher) {
+    addDatanodeCommand(commandForDatanode.getDatanodeId(),
+        commandForDatanode.getCommand());
+  }
 }
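
The pattern is general: a component declares a TypedEvent constant for its
payload type and implements EventHandler for that payload, and the queue
routes every fired event of that type to the registered handlers. A
hypothetical handler following the same shape (ReportPayload and
NodeReportHandler are illustrative names, not part of this change):

    public class NodeReportHandler implements EventHandler<ReportPayload> {

      // Event key that other components use to address this handler.
      public static final Event<ReportPayload> NODE_REPORT =
          new TypedEvent<>(ReportPayload.class, "NODE_REPORT");

      @Override
      public void onMessage(ReportPayload payload, EventPublisher publisher) {
        // Handle the payload; the publisher can be used to fire follow-up events.
      }
    }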

+ 7 - 1
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java

@@ -43,6 +43,7 @@ import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
 import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
+import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RPC;
@@ -51,6 +52,7 @@ import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.common.Storage.StorageState;
 import org.apache.hadoop.ozone.common.StorageInfo;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.StringUtils;
@@ -161,8 +163,12 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
       throw new SCMException("SCM not initialized.", ResultCodes
           .SCM_NOT_INITIALIZED);
     }
+    EventQueue eventQueue = new EventQueue();
+
+    SCMNodeManager nm =
+        new SCMNodeManager(conf, scmStorage.getClusterID(), this);
+    scmNodeManager = nm;
+    eventQueue.addHandler(SCMNodeManager.DATANODE_COMMAND, nm);
 
-    scmNodeManager = new SCMNodeManager(conf, scmStorage.getClusterID(), this);
     scmContainerManager = new ContainerMapping(conf, getScmNodeManager(),
         cacheSize);
 

+ 39 - 0
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.node;
 
 import com.google.common.base.Supplier;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
@@ -29,7 +30,10 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.StorageReportProto;
+import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.PathUtils;
@@ -1165,4 +1169,39 @@ public class TestNodeManager {
       assertEquals(expectedRemaining, foundRemaining);
     }
   }
+
+  @Test
+  public void testHandlingSCMCommandEvent() {
+    OzoneConfiguration conf = getConf();
+    conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
+        100, TimeUnit.MILLISECONDS);
+
+    DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails();
+    String dnId = datanodeDetails.getUuidString();
+    String storagePath = testDir.getAbsolutePath() + "/" + dnId;
+    List<StorageReportProto> reports =
+        TestUtils.createStorageReport(100, 10, 90,
+            storagePath, null, dnId, 1);
+
+    EventQueue eq = new EventQueue();
+    try (SCMNodeManager nodemanager = createNodeManager(conf)) {
+      eq.addHandler(SCMNodeManager.DATANODE_COMMAND, nodemanager);
+
+      nodemanager
+          .register(datanodeDetails, TestUtils.createNodeReport(reports));
+      eq.fireEvent(SCMNodeManager.DATANODE_COMMAND,
+          new CommandForDatanode(datanodeDetails.getUuid(),
+              new CloseContainerCommand(1L, ReplicationType.STAND_ALONE)));
+
+      eq.processAll(1000L);
+      List<SCMCommand> command =
+          nodemanager.sendHeartbeat(datanodeDetails, null);
+      Assert.assertEquals(1, command.size());
+      Assert
+          .assertEquals(command.get(0).getClass(), CloseContainerCommand.class);
+    } catch (IOException e) {
+      Assert.fail("Unexpected IOException: " + e.getMessage());
+    }
+  }
+
 }