瀏覽代碼

HDFS-11492. Ozone: Add the ability to handle sendContainerReport Command. Contributed by Anu Engineer.

Anu Engineer 8 年之前
父節點
當前提交
85c2312e7d
共有 16 個文件被更改,包括 804 次插入24 次删除
  1. 12 4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java
  2. 141 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java
  3. 8 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
  4. 82 6
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
  5. 177 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
  6. 59 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
  7. 122 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ContainerReportHandler.java
  8. 18 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java
  9. 10 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
  10. 9 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
  11. 22 4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
  12. 13 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
  13. 31 6
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
  14. 9 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
  15. 42 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
  16. 49 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java

+ 12 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java

@@ -18,7 +18,9 @@
 
 package org.apache.hadoop.ozone.container.common.helpers;
 
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.util.Time;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -49,6 +51,7 @@ public class ContainerData {
   public ContainerData(String containerName) {
     this.metadata = new TreeMap<>();
     this.containerName = containerName;
+    this.open = true;
   }
 
   /**
@@ -213,15 +216,20 @@ public class ContainerData {
    * checks if the container is open.
    * @return - boolean
    */
-  public boolean isOpen() {
+  public synchronized  boolean isOpen() {
     return open;
   }
 
   /**
    * Marks this container as closed.
    */
-  public void closeContainer() {
-    this.open = false;
+  public synchronized void closeContainer() {
+    setOpen(false);
+
+    // Some thing brain dead for now. name + Time stamp of when we get the close
+    // container message.
+    setHash(DigestUtils.sha256Hex(this.getContainerName() +
+        Long.toString(Time.monotonicNow())));
   }
 
   /**
@@ -242,7 +250,7 @@ public class ContainerData {
    * Sets the open or closed values.
    * @param open
    */
-  public void setOpen(boolean open) {
+  public synchronized void setOpen(boolean open) {
     this.open = open;
   }
 

+ 141 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java

@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.helpers;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerInfo;
+
+/**
+ * Container Report iterates the closed containers and sends a container report
+ * to SCM.
+ * <p>
+ * The protobuf counter part of this class looks like this.
+ * message ContainerInfo {
+ * required string containerName = 1;
+ * repeated bytes finalhash = 2;
+ * optional int64 size = 3;
+ * optional int64 keycount = 4;
+ * }
+ */
+public class ContainerReport {
+  private static final int UNKNOWN = -1;
+  private final String containerName;
+  private final String finalhash;
+  private long size;
+  private long keyCount;
+
+  /**
+   * Constructs the ContainerReport.
+   *
+   * @param containerName - Container Name.
+   * @param finalhash - Final Hash.
+   */
+  public ContainerReport(String containerName, String finalhash) {
+    this.containerName = containerName;
+    this.finalhash = finalhash;
+    this.size = UNKNOWN;
+    this.keyCount = UNKNOWN;
+  }
+
+  /**
+   * Gets a containerReport from protobuf class.
+   *
+   * @param info - ContainerInfo.
+   * @return - ContainerReport.
+   */
+  public static ContainerReport getFromProtoBuf(ContainerInfo info) {
+    Preconditions.checkNotNull(info);
+    ContainerReport report = new ContainerReport(info.getContainerName(),
+        info.getFinalhash());
+    if (info.hasSize()) {
+      report.setSize(info.getSize());
+    }
+    if (info.hasKeycount()) {
+      report.setKeyCount(info.getKeycount());
+    }
+    return report;
+  }
+
+  /**
+   * Gets the container name.
+   *
+   * @return - Name
+   */
+  public String getContainerName() {
+    return containerName;
+  }
+
+  /**
+   * Returns the final signature for this container.
+   *
+   * @return - hash
+   */
+  public String getFinalhash() {
+    return finalhash;
+  }
+
+  /**
+   * Returns a positive number it is a valid number, -1 if not known.
+   *
+   * @return size or -1
+   */
+  public long getSize() {
+    return size;
+  }
+
+  /**
+   * Sets the size of the container on disk.
+   *
+   * @param size - int
+   */
+  public void setSize(long size) {
+    this.size = size;
+  }
+
+  /**
+   * Gets number of keys in the container if known.
+   *
+   * @return - Number of keys or -1 for not known.
+   */
+  public long getKeyCount() {
+    return keyCount;
+  }
+
+  /**
+   * Sets the key count.
+   *
+   * @param keyCount - Key Count
+   */
+  public void setKeyCount(long keyCount) {
+    this.keyCount = keyCount;
+  }
+
+  /**
+   * Gets a containerInfo protobuf message from ContainerReports.
+   *
+   * @return ContainerInfo
+   */
+  public ContainerInfo getProtoBufMessage() {
+    return ContainerInfo.newBuilder()
+        .setContainerName(this.getContainerName())
+        .setKeycount(this.getKeyCount())
+        .setSize(this.getSize())
+        .setFinalhash(this.getFinalhash())
+        .build();
+  }
+}

+ 8 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java

@@ -25,8 +25,7 @@ import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.util.RwLock;
 import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 
 import java.io.IOException;
@@ -151,4 +150,11 @@ public interface ContainerManager extends RwLock {
    * @return node report.
    */
   SCMNodeReport getNodeReport() throws IOException;
+
+  /**
+   * Gets container reports.
+   * @return List of all closed containers.
+   * @throws IOException
+   */
+  List<ContainerData> getContainerReports() throws IOException;
 }

+ 82 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java

@@ -21,7 +21,10 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ozone.OzoneClientUtils;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CommandDispatcher;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ContainerReportHandler;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.slf4j.Logger;
@@ -31,6 +34,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * State Machine Class.
@@ -46,12 +50,15 @@ public class DatanodeStateMachine implements Closeable {
   private StateContext context;
   private final OzoneContainer container;
   private DatanodeID datanodeID = null;
+  private final CommandDispatcher commandDispatcher;
+  private long commandsHandled;
+  private AtomicLong nextHB;
 
   /**
    * Constructs a a datanode state machine.
    *
    * @param datanodeID - DatanodeID used to identify a datanode
-   * @param conf - Configration.
+   * @param conf - Configuration.
    */
   public DatanodeStateMachine(DatanodeID datanodeID,
       Configuration conf) throws IOException {
@@ -65,6 +72,17 @@ public class DatanodeStateMachine implements Closeable {
         OzoneClientUtils.getScmHeartbeatInterval(conf));
     container = new OzoneContainer(conf);
     this.datanodeID = datanodeID;
+    nextHB = new AtomicLong(Time.monotonicNow());
+
+
+     // When we add new handlers just adding a new handler here should do the
+     // trick.
+    commandDispatcher = CommandDispatcher.newBuilder()
+      .addHandler(new ContainerReportHandler())
+      .setConnectionManager(connectionManager)
+      .setContainer(container)
+      .setContext(context)
+      .build();
   }
 
   public DatanodeStateMachine(Configuration conf)
@@ -104,18 +122,19 @@ public class DatanodeStateMachine implements Closeable {
    */
   private void start() throws IOException {
     long now = 0;
-    long nextHB = 0;
+
     container.start();
+    initCommandHandlerThread(conf);
     while (context.getState() != DatanodeStates.SHUTDOWN) {
       try {
         LOG.debug("Executing cycle Number : {}", context.getExecutionCount());
-        nextHB = Time.monotonicNow() + heartbeatFrequency;
+        nextHB.set(Time.monotonicNow() + heartbeatFrequency);
         context.setReportState(container.getNodeReport());
         context.execute(executorService, heartbeatFrequency,
             TimeUnit.MILLISECONDS);
         now = Time.monotonicNow();
-        if (now < nextHB) {
-          Thread.sleep(nextHB - now);
+        if (now < nextHB.get()) {
+          Thread.sleep(nextHB.get() - now);
         }
       } catch (Exception e) {
         LOG.error("Unable to finish the execution.", e);
@@ -162,7 +181,7 @@ public class DatanodeStateMachine implements Closeable {
       }
 
       if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
-        LOG.error("Unable to shutdown statemachine properly.");
+        LOG.error("Unable to shutdown state machine properly.");
       }
     } catch (InterruptedException e) {
       LOG.error("Error attempting to shutdown.", e);
@@ -289,4 +308,61 @@ public class DatanodeStateMachine implements Closeable {
         && this.getContext().getExecutionCount() == 0
         && this.getContext().getState() == DatanodeStates.SHUTDOWN;
   }
+
+  /**
+   * Create a command handler thread.
+   *
+   * @param conf
+   */
+  private void initCommandHandlerThread(Configuration conf) {
+
+    /**
+     * Task that periodically checks if we have any outstanding commands.
+     * It is assumed that commands can be processed slowly and in order.
+     * This assumption might change in future. Right now due to this assumption
+     * we have single command  queue process thread.
+     */
+    Runnable processCommandQueue = () -> {
+      long now;
+      while (getContext().getState() != DatanodeStates.SHUTDOWN) {
+        SCMCommand command = getContext().getNextCommand();
+        if (command != null) {
+          commandDispatcher.handle(command);
+          commandsHandled++;
+        } else {
+          try {
+            // Sleep till the next HB + 1 second.
+            now = Time.monotonicNow();
+            if (nextHB.get() > now) {
+              Thread.sleep((nextHB.get() - now) + 1000L);
+            }
+          } catch (InterruptedException e) {
+            // Ignore this exception.
+          }
+        }
+      }
+    };
+
+    // We will have only one thread for command processing in a datanode.
+    Thread cmdProcessThread = new Thread(processCommandQueue);
+    cmdProcessThread.setDaemon(true);
+    cmdProcessThread.setName("Command processor thread");
+    cmdProcessThread.setUncaughtExceptionHandler((Thread t, Throwable e) -> {
+      // Let us just restart this thread after logging a critical error.
+      // if this thread is not running we cannot handle commands from SCM.
+      LOG.error("Critical Error : Command processor thread encountered an " +
+          "error. Thread: {}", t.toString(), e);
+      cmdProcessThread.start();
+    });
+    cmdProcessThread.start();
+  }
+
+  /**
+   * Returns the number of commands handled  by the datanode.
+   * @return  count
+   */
+  @VisibleForTesting
+  public long getCommandHandled() {
+    return commandsHandled;
+  }
 }

+ 177 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java

@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;;
+import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Dispatches command to the correct handler.
+ */
+public final class CommandDispatcher {
+  static final Logger LOG =
+      LoggerFactory.getLogger(CommandDispatcher.class);
+  private final StateContext context;
+  private final Map<Type, CommandHandler> handlerMap;
+  private final OzoneContainer container;
+  private final SCMConnectionManager connectionManager;
+
+  /**
+   * Constructs a command Dispatcher.
+   * @param context - Context.
+   */
+  /**
+   * Constructs a command dispatcher.
+   *
+   * @param container - Ozone Container
+   * @param context - Context
+   * @param handlers - Set of handlers.
+   */
+  private CommandDispatcher(OzoneContainer container, SCMConnectionManager
+      connectionManager, StateContext context,
+      CommandHandler... handlers) {
+    Preconditions.checkNotNull(context);
+    Preconditions.checkNotNull(handlers);
+    Preconditions.checkArgument(handlers.length > 0);
+    Preconditions.checkNotNull(container);
+    Preconditions.checkNotNull(connectionManager);
+    this.context = context;
+    this.container = container;
+    this.connectionManager = connectionManager;
+    handlerMap = new HashMap<>();
+    for (CommandHandler h : handlers) {
+      if(handlerMap.containsKey(h.getCommandType())){
+        LOG.error("Duplicate handler for the same command. Exiting. Handle " +
+            "key : { }", h.getCommandType().getDescriptorForType().getName());
+        throw new IllegalArgumentException("Duplicate handler for the same " +
+            "command.");
+      }
+      handlerMap.put(h.getCommandType(), h);
+    }
+  }
+
+  /**
+   * Dispatch the command to the correct handler.
+   *
+   * @param command - SCM Command.
+   */
+  public void handle(SCMCommand command) {
+    Preconditions.checkNotNull(command);
+    CommandHandler handler = handlerMap.get(command.getType());
+    if (handler != null) {
+      handler.handle(command, container, context, connectionManager);
+    } else {
+      LOG.error("Unknown SCM Command queued. There is no handler for this " +
+          "command. Command: {}", command.getType().getDescriptorForType()
+          .getName());
+    }
+  }
+
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  /**
+   * Helper class to construct command dispatcher.
+   */
+  public static class Builder {
+    private final List<CommandHandler> handlerList;
+    private OzoneContainer container;
+    private StateContext context;
+    private SCMConnectionManager connectionManager;
+
+    public Builder() {
+      handlerList = new LinkedList<>();
+    }
+
+    /**
+     * Adds a handler.
+     *
+     * @param handler - handler
+     * @return Builder
+     */
+    public Builder addHandler(CommandHandler handler) {
+      Preconditions.checkNotNull(handler);
+      handlerList.add(handler);
+      return this;
+    }
+
+    /**
+     * Add the OzoneContainer.
+     *
+     * @param container - ozone container.
+     * @return Builder
+     */
+    public Builder setContainer(OzoneContainer container) {
+      Preconditions.checkNotNull(container);
+      this.container = container;
+      return this;
+    }
+
+    /**
+     * Set the Connection Manager.
+     *
+     * @param connectionManager
+     * @return this
+     */
+    public Builder setConnectionManager(SCMConnectionManager
+        connectionManager) {
+      Preconditions.checkNotNull(connectionManager);
+      this.connectionManager = connectionManager;
+      return this;
+    }
+
+    /**
+     * Sets the Context.
+     *
+     * @param context - StateContext
+     * @return this
+     */
+    public Builder setContext(StateContext context) {
+      Preconditions.checkNotNull(context);
+      this.context = context;
+      return this;
+    }
+
+    /**
+     * Builds a command Dispatcher.
+     * @return Command Dispatcher.
+     */
+    public CommandDispatcher build() {
+      Preconditions.checkNotNull(this.connectionManager, "Missing connection" +
+          " manager.");
+      Preconditions.checkNotNull(this.container, "Missing container.");
+      Preconditions.checkNotNull(this.context, "Missing context.");
+      Preconditions.checkArgument(this.handlerList.size() > 0);
+      return new CommandDispatcher(this.container, this.connectionManager,
+          this.context, handlerList.toArray(
+              new CommandHandler[handlerList.size()]));
+    }
+  }
+}

+ 59 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java

@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
+import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+
+/**
+ * Generic interface for handlers.
+ */
+public interface CommandHandler {
+
+  /**
+   * Handles a given SCM command.
+   * @param command - SCM Command
+   * @param container - Ozone Container.
+   * @param context - Current Context.
+   * @param connectionManager - The SCMs that we are talking to.
+   */
+  void handle(SCMCommand command, OzoneContainer container,
+      StateContext context, SCMConnectionManager connectionManager);
+
+  /**
+   * Returns the command type that this command handler handles.
+   * @return Type
+   */
+  Type getCommandType();
+
+  /**
+   * Returns number of times this handler has been invoked.
+   * @return int
+   */
+  int getInvocationCount();
+
+  /**
+   * Returns the average time this function takes to run.
+   * @return  long
+   */
+  long getAverageRunTime();
+
+}

+ 122 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ContainerReportHandler.java

@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerReport;
+import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Container Report handler.
+ */
+public class ContainerReportHandler implements CommandHandler {
+  static final Logger LOG =
+      LoggerFactory.getLogger(ContainerReportHandler.class);
+  private int invocationCount;
+  private long totalTime;
+
+  /**
+   * Constructs a ContainerReport handler.
+   */
+  public ContainerReportHandler() {
+  }
+
+  /**
+   * Handles a given SCM command.
+   *
+   * @param command - SCM Command
+   * @param container - Ozone Container.
+   * @param context - Current Context.
+   * @param connectionManager - The SCMs that we are talking to.
+   */
+  @Override
+  public void handle(SCMCommand command, OzoneContainer container,
+      StateContext context, SCMConnectionManager connectionManager) {
+    LOG.debug("Processing Container Report.");
+    invocationCount++;
+    long startTime = Time.monotonicNow();
+    try {
+      ContainerReportsProto.Builder contianerReportsBuilder =
+          ContainerReportsProto.newBuilder();
+      List<ContainerData> closedContainerList = container.getContainerReports();
+      for (ContainerData cd : closedContainerList) {
+        ContainerReport report =
+            new ContainerReport(cd.getContainerName(), cd.getHash());
+        contianerReportsBuilder.addReports(report.getProtoBufMessage());
+      }
+      contianerReportsBuilder.setType(ContainerReportsProto.reportType
+          .fullReport);
+
+      // TODO : We send this report to all SCMs.Check if it is enough only to
+      // send to the leader once we have RAFT enabled SCMs.
+      for (EndpointStateMachine endPoint : connectionManager.getValues()) {
+        endPoint.getEndPoint().sendContainerReport(
+            contianerReportsBuilder.build());
+      }
+    } catch (IOException ex) {
+      LOG.error("Unable to process the Container Report command.", ex);
+    } finally {
+      long endTime = Time.monotonicNow();
+      totalTime += endTime - startTime;
+    }
+  }
+
+  /**
+   * Returns the command type that this command handler handles.
+   *
+   * @return Type
+   */
+  @Override
+  public Type getCommandType() {
+    return Type.sendContainerReport;
+  }
+
+  /**
+   * Returns number of times this handler has been invoked.
+   *
+   * @return int
+   */
+  @Override
+  public int getInvocationCount() {
+    return invocationCount;
+  }
+
+  /**
+   * Returns the average time this function takes to run.
+   *
+   * @return long
+   */
+  @Override
+  public long getAverageRunTime() {
+    if (invocationCount > 0) {
+      return totalTime / invocationCount;
+    }
+    return 0;
+  }
+}

+ 18 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java

@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.container.ozoneimpl;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
 import org.apache.hadoop.ozone.container.common.impl.ChunkManagerImpl;
 import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl;
 import org.apache.hadoop.ozone.container.common.impl.Dispatcher;
@@ -176,4 +177,13 @@ public class OzoneContainer {
   public int getContainerServerPort() {
     return server.getIPCPort();
   }
+
+  /**
+   * Returns the list of closed containers.
+   * @return - List of closed containers.
+   * @throws IOException
+   */
+  public List<ContainerData> getContainerReports() throws IOException {
+    return this.manager.getContainerReports();
+  }
 }

+ 9 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java

@@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.protocol;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
@@ -61,4 +62,12 @@ public interface StorageContainerDatanodeProtocol {
   SCMRegisteredCmdResponseProto register(DatanodeID datanodeID,
       String[] scmAddresses) throws IOException;
 
+  /**
+   * Send a container report.
+   * @param reports -- Container report
+   * @return HeartbeatRespose.nullcommand.
+   * @throws IOException
+   */
+  SCMHeartbeatResponseProto sendContainerReport(ContainerReportsProto reports)
+      throws IOException;
 }

+ 22 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java

@@ -23,6 +23,7 @@ import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolTranslator;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ReportState;
 import org.apache.hadoop.ozone.protocol.proto
@@ -31,14 +32,12 @@ import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMRegisterRequestProto;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -159,4 +158,23 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB
     return response;
   }
 
+  /**
+   * Send a container report.
+   *
+   * @param reports -- Container report
+   * @return HeartbeatRespose.nullcommand.
+   * @throws IOException
+   */
+  @Override
+  public SCMHeartbeatResponseProto sendContainerReport(
+      ContainerReportsProto reports) throws IOException {
+    final SCMHeartbeatResponseProto resp;
+    try {
+      resp = rpcProxy.sendContainerReport(NULL_RPC_CONTROLLER, reports);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+    return resp;
+  }
+
 }

+ 13 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java

@@ -23,6 +23,7 @@ import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 
 import java.io.IOException;
 
@@ -84,4 +85,16 @@ public class StorageContainerDatanodeProtocolServerSideTranslatorPB
       throw new ServiceException(e);
     }
   }
+
+  @Override
+  public SCMHeartbeatResponseProto
+      sendContainerReport(RpcController controller,
+      ContainerReportsProto request)
+      throws ServiceException {
+    try {
+      return impl.sendContainerReport(request);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
 }

+ 31 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java

@@ -34,11 +34,12 @@ import org.apache.hadoop.scm.client.ScmClient;
 import org.apache.hadoop.scm.protocol.LocatedContainer;
 import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
 import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
-import org.apache.hadoop.ozone.protocol.commands.NullCommand;
 import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ReportState;
 import org.apache.hadoop.ozone.protocol.proto
@@ -297,16 +298,24 @@ public class StorageContainerManager
     Type type = cmd.getType();
     switch (type) {
     case nullCmd:
-      Preconditions.checkState(cmd.getClass() == NullCommand.class);
-      return SCMCommandResponseProto.newBuilder().setCmdType(cmd.getType())
-          .setNullCommand(
-              NullCmdResponseProto.parseFrom(cmd.getProtoBufMessage()))
-          .build();
+      return getNullCmdResponse();
     default:
       throw new IllegalArgumentException("Not implemented");
     }
   }
 
+  /**
+   * Returns a null command response.
+   * @return
+   * @throws InvalidProtocolBufferException
+   */
+  private static SCMCommandResponseProto getNullCmdResponse()  {
+    return SCMCommandResponseProto.newBuilder()
+        .setCmdType(Type.nullCmd)
+        .setNullCommand(NullCmdResponseProto.getDefaultInstance())
+        .build();
+  }
+
   @VisibleForTesting
   public static SCMRegisteredCmdResponseProto getRegisteredResponse(
       SCMCommand cmd, SCMNodeAddressList addressList) {
@@ -480,6 +489,22 @@ public class StorageContainerManager
     return getRegisteredResponse(scmNodeManager.register(datanodeID), null);
   }
 
+  /**
+   * Send a container report.
+   *
+   * @param reports -- Container report
+   * @return HeartbeatRespose.nullcommand.
+   * @throws IOException
+   */
+  @Override
+  public SCMHeartbeatResponseProto
+      sendContainerReport(ContainerReportsProto reports) throws IOException {
+    // TODO : fix this in the server side code changes for handling this request
+    // correctly.
+    return SCMHeartbeatResponseProto.newBuilder()
+        .addCommands(getNullCmdResponse()).build();
+  }
+
   /**
    * Returns the Number of Datanodes that are communicating with SCM.
    *

+ 9 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto

@@ -96,7 +96,7 @@ A container report contains the following information.
 */
 message ContainerInfo {
   required string containerName = 1;
-  repeated bytes finalhash = 2;
+  required string finalhash = 2;
   optional int64 size = 3;
   optional int64 keycount = 4;
 }
@@ -105,7 +105,7 @@ message ContainerInfo {
 A set of container reports, max count is generally set to
 8192 since that keeps the size of the reports under 1 MB.
 */
-message ContainerReports {
+message ContainerReportsProto {
   enum reportType {
     fullReport = 0;
     deltaReport = 1;
@@ -306,4 +306,11 @@ service StorageContainerDatanodeProtocolService {
    * extremely light weight and contains no data payload.
    */
   rpc sendHeartbeat (SCMHeartbeatRequestProto) returns (SCMHeartbeatResponseProto);
+
+  /**
+    send container reports sends the container report to SCM. This will
+    return a null command as response.
+  */
+  rpc sendContainerReport(ContainerReportsProto)  returns (SCMHeartbeatResponseProto);
+
 }

+ 42 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java

@@ -16,6 +16,7 @@
  */
 package org.apache.hadoop.ozone.container.common;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
 import org.apache.hadoop.ozone.protocol.VersionResponse;
@@ -37,6 +38,8 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
   private AtomicInteger heartbeatCount = new AtomicInteger(0);
   private AtomicInteger rpcCount = new AtomicInteger(0);
   private ReportState reportState;
+  private AtomicInteger containerReportsCount = new AtomicInteger(0);
+  private AtomicInteger closedContainerCount = new AtomicInteger(0);
 
   /**
    * Returns the number of heartbeats made to this class.
@@ -74,6 +77,22 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
     this.rpcResponseDelay = rpcResponseDelay;
   }
 
+  /**
+   * Returns the number of container reports server has seen.
+   * @return int
+   */
+  public int getContainerReportsCount() {
+    return containerReportsCount.get();
+  }
+
+  /**
+   * Returns the number of closed containers that have been reported so far.
+   * @return - count of closed containers.
+   */
+  public int getClosedContainerCount() {
+    return closedContainerCount.get();
+  }
+
   /**
    * Returns SCM version.
    *
@@ -118,6 +137,12 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
     heartbeatCount.incrementAndGet();
     this.reportState = reportState;
     sleepIfNeeded();
+    return getNullRespose();
+  }
+
+  private StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto
+      getNullRespose() throws
+      com.google.protobuf.InvalidProtocolBufferException {
     StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto
         cmdResponse = StorageContainerDatanodeProtocolProtos
         .SCMCommandResponseProto
@@ -155,6 +180,23 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
                 .SCMRegisteredCmdResponseProto.ErrorCode.success).build();
   }
 
+  /**
+   * Send a container report.
+   *
+   * @param reports -- Container report
+   * @return HeartbeatResponse.nullcommand.
+   * @throws IOException
+   */
+  @Override
+  public StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto
+      sendContainerReport(StorageContainerDatanodeProtocolProtos
+      .ContainerReportsProto reports) throws IOException {
+    Preconditions.checkNotNull(reports);
+    containerReportsCount.incrementAndGet();
+    closedContainerCount.addAndGet(reports.getReportsCount());
+    return getNullRespose();
+  }
+
   public ReportState getReportState() {
     return this.reportState;
   }

+ 49 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java

@@ -16,10 +16,12 @@
  */
 package org.apache.hadoop.ozone.container.common;
 
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerReport;
 import org.apache.hadoop.ozone.container.common.statemachine
     .DatanodeStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine
@@ -344,4 +346,51 @@ public class TestEndPoint {
     scmServerImpl.setRpcResponseDelay(0);
     Assert.assertThat(end - start, new LessOrEqual<>(rpcTimeout + tolerance));
   }
+
+  /**
+   * Returns a new container report.
+   * @return
+   */
+  ContainerReport getRandomContainerReport() {
+    return new ContainerReport(UUID.randomUUID().toString()
+        ,DigestUtils.sha256Hex("Random"));
+  }
+
+  /**
+   * Creates dummy container reports.
+   * @param count - The number of closed containers to create.
+   * @return ContainerReportsProto
+   */
+  StorageContainerDatanodeProtocolProtos.ContainerReportsProto
+      createDummyContainerReports(int count) {
+    StorageContainerDatanodeProtocolProtos.ContainerReportsProto.Builder
+        reportsBuilder = StorageContainerDatanodeProtocolProtos
+        .ContainerReportsProto.newBuilder();
+    for (int x = 0; x < count; x++) {
+      reportsBuilder.addReports(getRandomContainerReport()
+          .getProtoBufMessage());
+    }
+    reportsBuilder.setType(StorageContainerDatanodeProtocolProtos
+        .ContainerReportsProto.reportType.fullReport);
+    return reportsBuilder.build();
+  }
+
+  /**
+   * Tests that rpcEndpoint sendContainerReport works as expected.
+   * @throws Exception
+   */
+  @Test
+  public void testContainerReportSend() throws Exception {
+    final int count = 1000;
+    try (EndpointStateMachine rpcEndPoint =
+             SCMTestUtils.createEndpoint(SCMTestUtils.getConf(),
+                 serverAddress, 1000)) {
+      SCMHeartbeatResponseProto responseProto = rpcEndPoint
+          .getEndPoint().sendContainerReport(createDummyContainerReports(
+              count));
+      Assert.assertNotNull(responseProto);
+    }
+    Assert.assertEquals(1, scmServerImpl.getContainerReportsCount());
+    Assert.assertEquals(count, scmServerImpl.getClosedContainerCount());
+  }
 }