Bladeren bron

HDFS-12799. Ozone: SCM: Close containers: extend SCMCommandResponseProto with SCMCloseContainerCmdResponseProto. Contributed by Elek, Marton.

Chen Liang 7 jaren geleden
bovenliggende
commit
87ee6d06d5

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java

@@ -22,6 +22,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.client.OzoneClientUtils;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
+    .CloseContainerHandler;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CommandDispatcher;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ContainerReportHandler;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteBlocksCommandHandler;
@@ -82,6 +84,7 @@ public class DatanodeStateMachine implements Closeable {
      // trick.
     commandDispatcher = CommandDispatcher.newBuilder()
         .addHandler(new ContainerReportHandler())
+        .addHandler(new CloseContainerHandler())
         .addHandler(new DeleteBlocksCommandHandler(
             container.getContainerManager(), conf))
         .setConnectionManager(connectionManager)

+ 112 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java

@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import org.apache.hadoop.ozone.container.common.statemachine
+    .SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCloseContainerCmdResponseProto;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.Type;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Container Report handler.
+ */
+public class CloseContainerHandler implements CommandHandler {
+  static final Logger LOG =
+      LoggerFactory.getLogger(CloseContainerHandler.class);
+  private int invocationCount;
+  private long totalTime;
+
+  /**
+   * Constructs a ContainerReport handler.
+   */
+  public CloseContainerHandler() {
+  }
+
+  /**
+   * Handles a given SCM command.
+   *
+   * @param command           - SCM Command
+   * @param container         - Ozone Container.
+   * @param context           - Current Context.
+   * @param connectionManager - The SCMs that we are talking to.
+   */
+  @Override
+  public void handle(SCMCommand command, OzoneContainer container,
+      StateContext context, SCMConnectionManager connectionManager) {
+    LOG.debug("Processing Close Container command.");
+    invocationCount++;
+    long startTime = Time.monotonicNow();
+    String containerName = "UNKNOWN";
+    try {
+
+      SCMCloseContainerCmdResponseProto
+          closeContainerProto =
+          SCMCloseContainerCmdResponseProto
+              .parseFrom(command.getProtoBufMessage());
+      containerName = closeContainerProto.getContainerName();
+
+      container.getContainerManager().closeContainer(containerName);
+
+    } catch (Exception e) {
+      LOG.error("Can't close container " + containerName, e);
+    } finally {
+      long endTime = Time.monotonicNow();
+      totalTime += endTime - startTime;
+    }
+  }
+
+  /**
+   * Returns the command type that this command handler handles.
+   *
+   * @return Type
+   */
+  @Override
+  public Type getCommandType() {
+    return Type.closeContainerCommand;
+  }
+
+  /**
+   * Returns number of times this handler has been invoked.
+   *
+   * @return int
+   */
+  @Override
+  public int getInvocationCount() {
+    return invocationCount;
+  }
+
+  /**
+   * Returns the average time this function takes to run.
+   *
+   * @return long
+   */
+  @Override
+  public long getAverageRunTime() {
+    if (invocationCount > 0) {
+      return totalTime / invocationCount;
+    }
+    return 0;
+  }
+}

+ 11 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.ozone.container.common.statemachine
 import org.apache.hadoop.ozone.container.common.statemachine
     .EndpointStateMachine.EndPointStates;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto;
@@ -166,6 +167,16 @@ public class HeartbeatEndpointTask
           this.context.addCommand(db);
         }
         break;
+      case closeContainerCommand:
+        CloseContainerCommand closeContainer =
+            CloseContainerCommand.getFromProtobuf(
+                commandResponseProto.getCloseContainerProto());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Received SCM container close request for container {}",
+              closeContainer.getContainerName());
+        }
+        this.context.addCommand(closeContainer);
+        break;
       default:
         throw new IllegalArgumentException("Unknown response : "
             + commandResponseProto.getCmdType().name());

+ 75 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java

@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.protocol.commands;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCloseContainerCmdResponseProto;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.Type;
+import static org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.Type.closeContainerCommand;
+
+/**
+ * Asks datanode to close a container.
+ */
+public class CloseContainerCommand
+    extends SCMCommand<SCMCloseContainerCmdResponseProto> {
+
+  private String containerName;
+
+  public CloseContainerCommand(String containerName) {
+    this.containerName = containerName;
+  }
+
+  /**
+   * Returns the type of this command.
+   *
+   * @return Type
+   */
+  @Override
+  public Type getType() {
+    return closeContainerCommand;
+  }
+
+  /**
+   * Gets the protobuf message of this object.
+   *
+   * @return A protobuf message.
+   */
+  @Override
+  public byte[] getProtoBufMessage() {
+    return getProto().toByteArray();
+  }
+
+  public SCMCloseContainerCmdResponseProto getProto() {
+    return SCMCloseContainerCmdResponseProto.newBuilder()
+        .setContainerName(containerName).build();
+  }
+
+  public static CloseContainerCommand getFromProtobuf(
+      SCMCloseContainerCmdResponseProto closeContainerProto) {
+    Preconditions.checkNotNull(closeContainerProto);
+    return new CloseContainerCommand(closeContainerProto.getContainerName());
+
+  }
+
+  public String getContainerName() {
+    return containerName;
+  }
+}

+ 11 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java

@@ -26,6 +26,7 @@ import com.google.common.cache.RemovalNotification;
 import com.google.protobuf.BlockingService;
 import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.OzoneConfiguration;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.io.IOUtils;
@@ -34,13 +35,15 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.jmx.ServiceRuntimeInfoImpl;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.util.MBeans;
-import org.apache.hadoop.ozone.client.OzoneClientUtils;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.conf.OzoneConfiguration;
-import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
+import org.apache.hadoop.ozone.client.OzoneClientUtils;
 import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import org.apache.hadoop.ozone.common.Storage.StorageState;
 import org.apache.hadoop.ozone.common.StorageInfo;
 import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
+import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
 import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
@@ -88,7 +91,6 @@ import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolPB;
 import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB;
-import org.apache.hadoop.ozone.common.Storage.StorageState;
 import org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.GenericOptionsParser;
@@ -102,6 +104,7 @@ import java.io.PrintStream;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -112,10 +115,8 @@ import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
-import java.util.Collections;
 import java.util.stream.Collectors;
 
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
 import static org.apache.hadoop.ozone.protocol.proto
     .ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result;
 import static org.apache.hadoop.scm.ScmConfigKeys
@@ -580,6 +581,10 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
       return builder.setCmdType(Type.deleteBlocksCommand)
           .setDeleteBlocksProto(((DeleteBlocksCommand) cmd).getProto())
           .build();
+    case closeContainerCommand:
+      return builder.setCmdType(Type.closeContainerCommand)
+          .setCloseContainerProto(((CloseContainerCommand)cmd).getProto())
+          .build();
     default:
       throw new IllegalArgumentException("Not implemented");
     }

+ 9 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto

@@ -210,6 +210,13 @@ This command tells the data node to send in the container report when possible
 message SendContainerReportProto {
 }
 
+/**
+This command asks the datanode to close a specific container.
+*/
+message SCMCloseContainerCmdResponseProto {
+  required string containerName = 1;
+}
+
 /**
 Type of commands supported by SCM to datanode protocol.
 */
@@ -219,6 +226,7 @@ enum Type {
   sendContainerReport = 4;
   reregisterCommand = 5;
   deleteBlocksCommand = 6;
+  closeContainerCommand = 7;
 }
 
 /*
@@ -232,6 +240,7 @@ message SCMCommandResponseProto {
   optional SCMReregisterCmdResponseProto reregisterProto = 6;
   optional SCMDeleteBlocksCmdResponseProto deleteBlocksProto = 7;
   required string datanodeUUID = 8;
+  optional SCMCloseContainerCmdResponseProto closeContainerProto = 9;
 }
 
 

+ 113 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java

@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import org.apache.hadoop.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.ReplicationFactor;
+import org.apache.hadoop.ozone.client.ReplicationType;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.client.rest.OzoneException;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
+import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
+import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB;
+import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Test to behaviour of the datanode when recieve close container command.
+ */
+public class TestCloseContainerHandler {
+
+  @Test
+  public void test() throws IOException, TimeoutException, InterruptedException,
+      OzoneException {
+
+    //setup a cluster (1G free space is enough for a unit test)
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.set(OZONE_SCM_CONTAINER_SIZE_GB, "1");
+    MiniOzoneClassicCluster cluster =
+        new MiniOzoneClassicCluster.Builder(conf).numDataNodes(1)
+            .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
+    cluster.waitOzoneReady();
+
+    //the easiest way to create an open container is creating a key
+    OzoneClientFactory.setConfiguration(conf);
+    OzoneClient client = OzoneClientFactory.getClient();
+    ObjectStore objectStore = client.getObjectStore();
+    objectStore.createVolume("test");
+    objectStore.getVolume("test").createBucket("test");
+    OzoneOutputStream key = objectStore.getVolume("test").getBucket("test")
+        .createKey("test", 1024, ReplicationType.STAND_ALONE,
+            ReplicationFactor.ONE);
+    key.write("test".getBytes());
+    key.close();
+
+    //get the name of a valid container
+    KsmKeyArgs keyArgs =
+        new KsmKeyArgs.Builder().setVolumeName("test").setBucketName("test")
+            .setType(OzoneProtos.ReplicationType.STAND_ALONE)
+            .setFactor(OzoneProtos.ReplicationFactor.ONE).setDataSize(1024)
+            .setKeyName("test").build();
+
+    KsmKeyLocationInfo ksmKeyLocationInfo =
+        cluster.getKeySpaceManager().lookupKey(keyArgs).getKeyLocationVersions()
+            .get(0).getBlocksLatestVersionOnly().get(0);
+
+    String containerName = ksmKeyLocationInfo.getContainerName();
+
+    Assert.assertFalse(isContainerClosed(cluster, containerName));
+
+    //send the order to close the container
+    cluster.getStorageContainerManager().getScmNodeManager()
+        .addDatanodeCommand(cluster.getDataNodes().get(0).getDatanodeId(),
+            new CloseContainerCommand(containerName));
+
+    GenericTestUtils.waitFor(() -> isContainerClosed(cluster, containerName),
+            500,
+            5 * 1000);
+
+    //double check if it's really closed (waitFor also throws an exception)
+    Assert.assertTrue(isContainerClosed(cluster, containerName));
+  }
+
+  private Boolean isContainerClosed(MiniOzoneClassicCluster cluster,
+      String containerName) {
+    ContainerData containerData;
+    try {
+      containerData = cluster.getDataNodes().get(0).getOzoneContainerManager()
+          .getContainerManager().readContainer(containerName);
+      return !containerData.isOpen();
+    } catch (StorageContainerException e) {
+      throw new AssertionError(e);
+    }
+  }
+
+}