Jelajahi Sumber

HDDS-1201. Reporting corrupted containers info to SCM (#1032)

Hrishikesh Gadre 5 tahun lalu
induk
melakukan
acef5e0cec

+ 3 - 1
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java

@@ -154,6 +154,8 @@ public interface Container<CONTAINERDATA extends ContainerData> extends RwLock {
 
   /**
    * check and report the structural integrity of the container.
+   * @return true if the integrity checks pass
+   *         false otherwise
    */
-  void check() throws StorageContainerException;
+  boolean check();
 }

+ 5 - 1
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java

@@ -135,8 +135,12 @@ public class HeartbeatEndpointTask
       addReports(requestBuilder);
       addContainerActions(requestBuilder);
       addPipelineActions(requestBuilder);
+      SCMHeartbeatRequestProto request = requestBuilder.build();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Sending heartbeat message :: {}", request.toString());
+      }
       SCMHeartbeatResponseProto reponse = rpcEndpoint.getEndPoint()
-          .sendHeartbeat(requestBuilder.build());
+          .sendHeartbeat(request);
       processResponse(reponse, datanodeDetailsProto);
       rpcEndpoint.setLastSuccessfulHeartbeat(ZonedDateTime.now());
       rpcEndpoint.zeroMissedCount();

+ 6 - 14
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java

@@ -648,7 +648,7 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
   /**
    * run integrity checks on the Container metadata.
    */
-  public void check() throws StorageContainerException {
+  public boolean check() {
     ContainerCheckLevel level = ContainerCheckLevel.NO_CHECK;
     long containerId = containerData.getContainerID();
 
@@ -671,14 +671,12 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
           containerData.getState());
       break;
     default:
-      throw new StorageContainerException(
-          "Invalid Container state found for Container : " + containerData
-              .getContainerID(), INVALID_CONTAINER_STATE);
+      break;
     }
 
     if (level == ContainerCheckLevel.NO_CHECK) {
       LOG.debug("Skipping integrity checks for Container Id : {}", containerId);
-      return;
+      return true;
     }
 
     KeyValueContainerCheck checker =
@@ -687,17 +685,11 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
 
     switch (level) {
     case FAST_CHECK:
-      checker.fastCheck();
-      break;
+      return checker.fastCheck();
     case FULL_CHECK:
-      checker.fullCheck();
-      break;
-    case NO_CHECK:
-      LOG.debug("Skipping integrity checks for Container Id : {}", containerId);
-      break;
+      return checker.fullCheck();
     default:
-      // we should not be here at all, scuttle the ship!
-      Preconditions.checkNotNull(0, "Invalid Containercheck level");
+      return true;
     }
   }
 

+ 16 - 30
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java

@@ -72,37 +72,22 @@ public class KeyValueContainerCheck {
    * These checks do not look inside the metadata files.
    * Applicable for OPEN containers.
    *
-   * @return true : corruption detected, false : no corruption.
+   * @return true : integrity checks pass, false : otherwise.
    */
   public boolean fastCheck() {
-    boolean corruption = false;
+    LOG.info("Running basic checks for container {};", containerID);
+    boolean valid = false;
     try {
-      basicChecks();
+      loadContainerData();
+      checkLayout();
+      checkContainerFile();
+      valid = true;
 
     } catch (IOException e) {
       handleCorruption(e);
-      corruption = true;
     }
 
-    return corruption;
-  }
-
-  /**
-   * Checks :
-   * 1. check directory layout
-   * 2. check container file
-   *
-   * @return void
-   */
-
-  private void basicChecks() throws IOException {
-
-    LOG.trace("Running basic checks for container {};", containerID);
-
-    loadContainerData();
-
-    checkLayout();
-    checkContainerFile();
+    return valid;
   }
 
   /**
@@ -114,21 +99,22 @@ public class KeyValueContainerCheck {
    * <p>
    * fullCheck is a superset of fastCheck
    *
-   * @return true : corruption detected, false : no corruption.
+   * @return true : integrity checks pass, false : otherwise.
    */
   public boolean fullCheck() {
-    boolean corruption = false;
+    boolean valid = false;
 
     try {
-      basicChecks();
-      checkBlockDB();
-
+      valid = fastCheck();
+      if (valid) {
+        checkBlockDB();
+      }
     } catch (IOException e) {
       handleCorruption(e);
-      corruption = true;
+      valid = false;
     }
 
-    return corruption;
+    return valid;
   }
 
   /**

+ 11 - 2
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java

@@ -900,8 +900,17 @@ public class KeyValueHandler extends Handler {
   public void markContainerUnhealthy(Container container)
       throws IOException {
     if (container.getContainerState() != State.UNHEALTHY) {
-      container.markContainerUnhealthy();
-      sendICR(container);
+      try {
+        container.markContainerUnhealthy();
+      } catch (IOException ex) {
+        // explicitly catch IOException here since the this operation
+        // will fail if the Rocksdb metadata is corrupted.
+        long id = container.getContainerData().getContainerID();
+        LOG.warn("Unexpected error while marking container "
+                +id+ " as unhealthy", ex);
+      } finally {
+        sendICR(container);
+      }
     }
   }
 

+ 12 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java

@@ -73,6 +73,18 @@ public class ContainerController {
     }
   }
 
+  /**
+   * Marks the container as UNHEALTHY.
+   *
+   * @param containerId Id of the container to update
+   * @throws IOException in case of exception
+   */
+  public void markContainerUnhealthy(final long containerId)
+          throws IOException {
+    Container container = containerSet.getContainer(containerId);
+    getHandler(container).markContainerUnhealthy(container);
+  }
+
   /**
    * Returns the container report.
    *

+ 17 - 14
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScrubber.java

@@ -18,13 +18,14 @@
 
 package org.apache.hadoop.ozone.container.ozoneimpl;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.net.ntp.TimeStamp;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.Iterator;
 
 /**
@@ -56,11 +57,7 @@ public class ContainerScrubber implements Runnable {
     LOG.info("Background ContainerScrubber starting up");
     while (true) {
 
-      try {
-        scrub();
-      } catch (StorageContainerException e) {
-        LOG.error("Scrubber encountered StorageContainerException.");
-      }
+      scrub();
 
       if (this.halt) {
         break; // stop and exit if requested
@@ -129,22 +126,20 @@ public class ContainerScrubber implements Runnable {
     }
   }
 
-  private void scrub() throws StorageContainerException {
+  private void scrub() {
     Iterator<Container> containerIt = controller.getContainers();
     long count = 0;
 
     while (containerIt.hasNext() && !halt) {
       TimeStamp startTime = new TimeStamp(System.currentTimeMillis());
       Container container = containerIt.next();
-
       try {
-        container.check();
-      } catch (StorageContainerException e) {
-        LOG.error("Error unexpected exception {} for Container {}", e,
-            container.getContainerData().getContainerID());
-        container.markContainerUnhealthy();
-        // XXX Action required here
+        scrub(container);
+      } catch (IOException e) {
+        LOG.info("Unexpected error while scrubbing container {}",
+                container.getContainerData().getContainerID());
       }
+
       count++;
 
       throttleScrubber(startTime);
@@ -152,4 +147,12 @@ public class ContainerScrubber implements Runnable {
 
     LOG.debug("iterator ran integrity checks on {} containers", count);
   }
+
+  @VisibleForTesting
+  public void scrub(Container container) throws IOException {
+    if (!container.check()) {
+      controller.markContainerUnhealthy(
+              container.getContainerData().getContainerID());
+    }
+  }
 }

+ 5 - 6
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java

@@ -55,7 +55,6 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_IMPL;
 
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_LEVELDB;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_ROCKSDB;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -101,7 +100,7 @@ import static org.junit.Assert.assertTrue;
     int deletedBlocks = 1;
     int normalBlocks = 3;
     int chunksPerBlock = 4;
-    boolean corruption = false;
+    boolean valid = false;
 
     // test Closed Container
     createContainerWithBlocks(containerID, normalBlocks, deletedBlocks, 65536,
@@ -115,14 +114,14 @@ import static org.junit.Assert.assertTrue;
             containerID);
 
     // first run checks on a Open Container
-    corruption = kvCheck.fastCheck();
-    assertFalse(corruption);
+    valid = kvCheck.fastCheck();
+    assertTrue(valid);
 
     container.close();
 
     // next run checks on a Closed Container
-    corruption = kvCheck.fullCheck();
-    assertFalse(corruption);
+    valid = kvCheck.fullCheck();
+    assertTrue(valid);
   }
 
   /**

+ 2 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java

@@ -47,6 +47,8 @@ public class IncrementalContainerReportHandler extends
   @Override
   public void onMessage(final IncrementalContainerReportFromDatanode report,
                         final EventPublisher publisher) {
+    LOG.debug("Processing incremental container report from data node {}",
+            report.getDatanodeDetails().getUuid());
 
     for (ContainerReplicaProto replicaProto :
         report.getReport().getReportList()) {

+ 211 - 0
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scrubber/TestDataScrubber.java

@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.hadoop.ozone.dn.scrubber;
+
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
+import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ozone.HddsDatanodeService;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneKey;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScrubber;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.util.Time;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.UUID;
+import java.io.File;
+
+import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE;
+import static org.apache.hadoop.hdds.client.ReplicationType.STAND_ALONE;
+
+/**
+ * This class tests the data scrubber functionality.
+ */
+public class TestDataScrubber {
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration ozoneConfig;
+  private static OzoneClient ozClient = null;
+  private static ObjectStore store = null;
+  private static OzoneManager ozoneManager;
+  private static StorageContainerLocationProtocolClientSideTranslatorPB
+      storageContainerLocationClient;
+
+  @BeforeClass
+  public static void init() throws Exception {
+    ozoneConfig = new OzoneConfiguration();
+    ozoneConfig.set(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, "1s");
+    ozoneConfig.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
+        SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
+    cluster = MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(1)
+        .build();
+    cluster.waitForClusterToBeReady();
+    ozClient = OzoneClientFactory.getRpcClient(ozoneConfig);
+    store = ozClient.getObjectStore();
+    ozoneManager = cluster.getOzoneManager();
+    storageContainerLocationClient =
+        cluster.getStorageContainerLocationClient();
+  }
+
+  @AfterClass
+  public static void shutdown() throws IOException {
+    if (ozClient != null) {
+      ozClient.close();
+    }
+    if (storageContainerLocationClient != null) {
+      storageContainerLocationClient.close();
+    }
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testOpenContainerIntegrity() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    long currentTime = Time.now();
+
+    String value = "sample value";
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    for (int i = 0; i < 10; i++) {
+      String keyName = UUID.randomUUID().toString();
+
+      OzoneOutputStream out = bucket.createKey(keyName,
+          value.getBytes().length, STAND_ALONE,
+          ONE, new HashMap<>());
+      out.write(value.getBytes());
+      out.close();
+      OzoneKey key = bucket.getKey(keyName);
+      Assert.assertEquals(keyName, key.getName());
+      OzoneInputStream is = bucket.readKey(keyName);
+      byte[] fileContent = new byte[value.getBytes().length];
+      is.read(fileContent);
+      Assert.assertTrue(verifyRatisReplication(volumeName, bucketName,
+          keyName, STAND_ALONE,
+          ONE));
+      Assert.assertEquals(value, new String(fileContent));
+      Assert.assertTrue(key.getCreationTime() >= currentTime);
+      Assert.assertTrue(key.getModificationTime() >= currentTime);
+    }
+
+    // wait for the container report to propagate to SCM
+    Thread.sleep(5000);
+
+
+    Assert.assertEquals(1, cluster.getHddsDatanodes().size());
+
+    HddsDatanodeService dn = cluster.getHddsDatanodes().get(0);
+    OzoneContainer oc = dn.getDatanodeStateMachine().getContainer();
+    ContainerSet cs = oc.getContainerSet();
+    Container c = cs.getContainerIterator().next();
+
+    Assert.assertTrue(cs.containerCount() > 0);
+
+    // delete the chunks directory.
+    File chunksDir = new File(c.getContainerData().getContainerPath(), "chunks");
+    deleteDirectory(chunksDir);
+    Assert.assertFalse(chunksDir.exists());
+
+    ContainerScrubber sb = new ContainerScrubber(ozoneConfig, oc.getController());
+    sb.scrub(c);
+
+    // wait for the incremental container report to propagate to SCM
+    Thread.sleep(5000);
+
+    ContainerManager cm = cluster.getStorageContainerManager().getContainerManager();
+    Set<ContainerReplica> replicas = cm.getContainerReplicas(
+        ContainerID.valueof(c.getContainerData().getContainerID()));
+    Assert.assertEquals(1, replicas.size());
+    ContainerReplica r = replicas.iterator().next();
+    Assert.assertEquals(StorageContainerDatanodeProtocolProtos.
+        ContainerReplicaProto.State.UNHEALTHY, r.getState());
+  }
+
+  boolean deleteDirectory(File directoryToBeDeleted) {
+    File[] allContents = directoryToBeDeleted.listFiles();
+    if (allContents != null) {
+      for (File file : allContents) {
+        deleteDirectory(file);
+      }
+    }
+    return directoryToBeDeleted.delete();
+  }
+
+  private boolean verifyRatisReplication(String volumeName, String bucketName,
+                                         String keyName, ReplicationType type, ReplicationFactor factor)
+      throws IOException {
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(keyName)
+        .setRefreshPipeline(true)
+        .build();
+    HddsProtos.ReplicationType replicationType =
+        HddsProtos.ReplicationType.valueOf(type.toString());
+    HddsProtos.ReplicationFactor replicationFactor =
+        HddsProtos.ReplicationFactor.valueOf(factor.getValue());
+    OmKeyInfo keyInfo = ozoneManager.lookupKey(keyArgs);
+    for (OmKeyLocationInfo info :
+        keyInfo.getLatestVersionLocations().getLocationList()) {
+      ContainerInfo container =
+          storageContainerLocationClient.getContainer(info.getContainerID());
+      if (!container.getReplicationFactor().equals(replicationFactor) || (
+          container.getReplicationType() != replicationType)) {
+        return false;
+      }
+    }
+    return true;
+  }
+}