Ver Fonte

HDDS-1817. GetKey fails with IllegalArgumentException.

Signed-off-by: Anu Engineer <aengineer@apache.org>
(cherry picked from commit 2546e6ece240924af2188bb39b3954a4896e4a4f)
Nanda kumar há 6 anos atrás
pai
commit
e87135ab20

+ 0 - 7
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java

@@ -468,11 +468,4 @@ public class ContainerInfo implements Comparator<ContainerInfo>,
         || state == HddsProtos.LifeCycleState.CLOSING;
   }
 
-  /**
-   * Check if a container is in Open state, but Close has not been initiated.
-   * @return true if Open, false otherwise.
-   */
-  public boolean isOpenNotClosing() {
-    return state == HddsProtos.LifeCycleState.OPEN;
-  }
 }

+ 25 - 29
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java

@@ -35,6 +35,7 @@ import org.apache.hadoop.hdds.protocol.proto
 import org.apache.hadoop.hdds.scm.HddsServerUtil;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.ScmUtils;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
 import org.apache.hadoop.hdds.scm.safemode.SafeModePrecheck;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
@@ -68,6 +69,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -216,15 +218,14 @@ public class SCMClientProtocolServer implements
   @Override
   public ContainerWithPipeline getContainerWithPipeline(long containerID)
       throws IOException {
-    Map<String, String> auditMap = Maps.newHashMap();
-    auditMap.put("containerID", String.valueOf(containerID));
-    boolean auditSuccess = true;
+    final ContainerID cid = ContainerID.valueof(containerID);
     try {
+      final ContainerInfo container = scm.getContainerManager()
+          .getContainer(cid);
+
       if (safeModePrecheck.isInSafeMode()) {
-        ContainerInfo contInfo = scm.getContainerManager()
-            .getContainer(ContainerID.valueof(containerID));
-        if (contInfo.isOpen()) {
-          if (!hasRequiredReplicas(contInfo)) {
+        if (container.isOpen()) {
+          if (!hasRequiredReplicas(container)) {
             throw new SCMException("Open container " + containerID + " doesn't"
                 + " have enough replicas to service this operation in "
                 + "Safe mode.", ResultCodes.SAFE_MODE_EXCEPTION);
@@ -233,40 +234,35 @@ public class SCMClientProtocolServer implements
       }
       getScm().checkAdminAccess(null);
 
-      final ContainerID id = ContainerID.valueof(containerID);
-      final ContainerInfo container = scm.getContainerManager().
-          getContainer(id);
-      final Pipeline pipeline;
+      Pipeline pipeline;
+      try {
+        pipeline = container.isOpen() ? scm.getPipelineManager()
+            .getPipeline(container.getPipelineID()) : null;
+      } catch (PipelineNotFoundException ex) {
+        // The pipeline is destroyed.
+        pipeline = null;
+      }
 
-      if (container.isOpenNotClosing()) {
-        // Ratis pipeline
-        pipeline = scm.getPipelineManager()
-            .getPipeline(container.getPipelineID());
-      } else {
+      if (pipeline == null) {
         pipeline = scm.getPipelineManager().createPipeline(
             HddsProtos.ReplicationType.STAND_ALONE,
             container.getReplicationFactor(),
             scm.getContainerManager()
-                .getContainerReplicas(id).stream()
+                .getContainerReplicas(cid).stream()
                 .map(ContainerReplica::getDatanodeDetails)
                 .collect(Collectors.toList()));
       }
 
+      AUDIT.logReadSuccess(buildAuditMessageForSuccess(
+          SCMAction.GET_CONTAINER_WITH_PIPELINE,
+          Collections.singletonMap("containerID", cid.toString())));
+
       return new ContainerWithPipeline(container, pipeline);
     } catch (IOException ex) {
-      auditSuccess = false;
-      AUDIT.logReadFailure(
-          buildAuditMessageForFailure(SCMAction.GET_CONTAINER_WITH_PIPELINE,
-              auditMap, ex)
-      );
+      AUDIT.logReadFailure(buildAuditMessageForFailure(
+          SCMAction.GET_CONTAINER_WITH_PIPELINE,
+          Collections.singletonMap("containerID", cid.toString()), ex));
       throw ex;
-    } finally {
-      if(auditSuccess) {
-        AUDIT.logReadSuccess(
-            buildAuditMessageForSuccess(SCMAction.GET_CONTAINER_WITH_PIPELINE,
-                auditMap)
-        );
-      }
     }
   }
 

+ 14 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java

@@ -49,6 +49,8 @@ import java.util.UUID;
 
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_ACTIONS;
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_REPORT;
+import static org.apache.hadoop.hdds.scm.events.SCMEvents
+    .INCREMENTAL_CONTAINER_REPORT;
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.NODE_REPORT;
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.CMD_STATUS_REPORT;
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.PIPELINE_ACTIONS;
@@ -121,6 +123,18 @@ public final class SCMDatanodeHeartbeatDispatcher {
 
       }
 
+      final List<IncrementalContainerReportProto> icrs =
+          heartbeat.getIncrementalContainerReportList();
+
+      if (icrs.size() > 0) {
+        LOG.debug("Dispatching ICRs.");
+        for (IncrementalContainerReportProto icr : icrs) {
+          eventPublisher.fireEvent(INCREMENTAL_CONTAINER_REPORT,
+              new IncrementalContainerReportFromDatanode(
+                  datanodeDetails, icr));
+        }
+      }
+
       if (heartbeat.hasContainerActions()) {
         LOG.debug("Dispatching Container Actions.");
         eventPublisher.fireEvent(