Explorar el Código

HDFS-17009. RBF: state store putAll should also return failed records (#5664)

Viraj Jasani hace 2 años
padre
commit
8e17385141
Se han modificado 10 ficheros con 143 adiciones y 32 borrados
  1. 79 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreOperationResult.java
  2. 2 3
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java
  3. 1 1
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java
  4. 17 10
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java
  5. 8 4
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreMySQLImpl.java
  6. 7 3
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
  7. 1 1
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java
  8. 1 3
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java
  9. 21 2
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
  10. 6 5
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/MockStateStoreDriver.java

+ 79 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreOperationResult.java

@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.store.driver;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * State store operation result with list of failed records.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class StateStoreOperationResult {
+
+  private final List<String> failedRecordsKeys;
+  private final boolean isOperationSuccessful;
+
+  private static final StateStoreOperationResult DEFAULT_OPERATION_SUCCESS_RESULT =
+      new StateStoreOperationResult(Collections.emptyList(), true);
+
+  /**
+   * State store operation result constructor with list of failed records keys and boolean
+   * to inform whether the overall operation is successful.
+   *
+   * @param failedRecordsKeys The list of failed records keys.
+   * @param isOperationSuccessful True if the operation was successful, False otherwise.
+   */
+  public StateStoreOperationResult(List<String> failedRecordsKeys,
+      boolean isOperationSuccessful) {
+    this.failedRecordsKeys = failedRecordsKeys;
+    this.isOperationSuccessful = isOperationSuccessful;
+  }
+
+  /**
+   * State store operation result constructor with a single failed record key.
+   *
+   * @param failedRecordKey The failed record key.
+   */
+  public StateStoreOperationResult(String failedRecordKey) {
+    if (failedRecordKey != null && failedRecordKey.length() > 0) {
+      this.isOperationSuccessful = false;
+      this.failedRecordsKeys = Collections.singletonList(failedRecordKey);
+    } else {
+      this.isOperationSuccessful = true;
+      this.failedRecordsKeys = Collections.emptyList();
+    }
+  }
+
+  public List<String> getFailedRecordsKeys() {
+    return failedRecordsKeys;
+  }
+
+  public boolean isOperationSuccessful() {
+    return isOperationSuccessful;
+  }
+
+  public static StateStoreOperationResult getDefaultSuccessResult() {
+    return DEFAULT_OPERATION_SUCCESS_RESULT;
+  }
+}

+ 2 - 3
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java

@@ -107,12 +107,11 @@ public interface StateStoreRecordOperations {
    * @param allowUpdate True if update of exiting record is allowed.
    * @param errorIfExists True if an error should be returned when inserting
    *          an existing record. Only used if allowUpdate = false.
-   * @return true if all operations were successful.
-   *
+   * @return The result of the putAll operation.
    * @throws IOException Throws exception if unable to query the data store.
    */
   @AtMostOnce
-  <T extends BaseRecord> boolean putAll(
+  <T extends BaseRecord> StateStoreOperationResult putAll(
       List<T> records, boolean allowUpdate, boolean errorIfExists)
           throws IOException;
 

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java

@@ -75,7 +75,7 @@ public abstract class StateStoreBaseImpl extends StateStoreDriver {
       T record, boolean allowUpdate, boolean errorIfExists) throws IOException {
     List<T> singletonList = new ArrayList<>();
     singletonList.add(record);
-    return putAll(singletonList, allowUpdate, errorIfExists);
+    return putAll(singletonList, allowUpdate, errorIfExists).isOperationSuccessful();
   }
 
   @Override

+ 17 - 10
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java

@@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
 import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreOperationResult;
 import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
 import org.apache.hadoop.hdfs.server.federation.store.records.Query;
 import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
@@ -372,12 +373,12 @@ public abstract class StateStoreFileBaseImpl
   }
 
   @Override
-  public <T extends BaseRecord> boolean putAll(
+  public <T extends BaseRecord> StateStoreOperationResult putAll(
       List<T> records, boolean allowUpdate, boolean errorIfExists)
           throws StateStoreUnavailableException {
     verifyDriverReady();
     if (records.isEmpty()) {
-      return true;
+      return StateStoreOperationResult.getDefaultSuccessResult();
     }
 
     long start = monotonicNow();
@@ -402,7 +403,7 @@ public abstract class StateStoreFileBaseImpl
           if (metrics != null) {
             metrics.addFailure(monotonicNow() - start);
           }
-          return false;
+          return new StateStoreOperationResult(primaryKey);
         } else {
           LOG.debug("Not updating {}", record);
         }
@@ -414,7 +415,9 @@ public abstract class StateStoreFileBaseImpl
     // Write the records
     final AtomicBoolean success = new AtomicBoolean(true);
     final List<Callable<Void>> callables = new ArrayList<>();
-    toWrite.entrySet().forEach(entry -> callables.add(() -> writeRecordToFile(success, entry)));
+    final List<String> failedRecordsKeys = Collections.synchronizedList(new ArrayList<>());
+    toWrite.entrySet().forEach(
+        entry -> callables.add(() -> writeRecordToFile(success, entry, failedRecordsKeys)));
     if (this.concurrentStoreAccessPool != null) {
       // Write records concurrently
       List<Future<Void>> futures = null;
@@ -454,36 +457,40 @@ public abstract class StateStoreFileBaseImpl
         metrics.addFailure(end - start);
       }
     }
-    return success.get();
+    return new StateStoreOperationResult(failedRecordsKeys, success.get());
   }
 
   /**
    * Writes the state store record to the file. At first, the record is written to a temp location
    * and then later renamed to the final location that is passed with the entry key.
    *
+   * @param <T> Record class of the records.
    * @param success The atomic boolean that gets updated to false if the file write operation fails.
    * @param entry The entry of the record path and the state store record to be written to the file
    * by first writing to a temp location and then renaming it to the record path.
-   * @param <T> Record class of the records.
+   * @param failedRecordsList The list of paths of the failed records.
    * @return Void.
    */
   private <T extends BaseRecord> Void writeRecordToFile(AtomicBoolean success,
-      Entry<String, T> entry) {
-    String recordPath = entry.getKey();
-    String recordPathTemp = recordPath + "." + now() + TMP_MARK;
+      Entry<String, T> entry, List<String> failedRecordsList) {
+    final String recordPath = entry.getKey();
+    final T record = entry.getValue();
+    final String primaryKey = getPrimaryKey(record);
+    final String recordPathTemp = recordPath + "." + now() + TMP_MARK;
     boolean recordWrittenSuccessfully = true;
     try (BufferedWriter writer = getWriter(recordPathTemp)) {
-      T record = entry.getValue();
       String line = serializeString(record);
       writer.write(line);
     } catch (IOException e) {
       LOG.error("Cannot write {}", recordPathTemp, e);
       recordWrittenSuccessfully = false;
+      failedRecordsList.add(primaryKey);
       success.set(false);
     }
     // Commit
     if (recordWrittenSuccessfully && !rename(recordPathTemp, recordPath)) {
       LOG.error("Failed committing record into {}", recordPath);
+      failedRecordsList.add(primaryKey);
       success.set(false);
     }
     return null;

+ 8 - 4
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreMySQLImpl.java

@@ -36,6 +36,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
 import org.apache.hadoop.hdfs.server.federation.router.security.token.SQLConnectionFactory;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreOperationResult;
 import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
 import org.apache.hadoop.hdfs.server.federation.store.records.DisabledNameservice;
 import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
@@ -161,10 +162,10 @@ public class StateStoreMySQLImpl extends StateStoreSerializableImpl {
   }
 
   @Override
-  public <T extends BaseRecord> boolean putAll(
+  public <T extends BaseRecord> StateStoreOperationResult putAll(
       List<T> records, boolean allowUpdate, boolean errorIfExists) throws IOException {
     if (records.isEmpty()) {
-      return true;
+      return StateStoreOperationResult.getDefaultSuccessResult();
     }
 
     verifyDriverReady();
@@ -173,6 +174,7 @@ public class StateStoreMySQLImpl extends StateStoreSerializableImpl {
     long start = Time.monotonicNow();
 
     boolean success = true;
+    final List<String> failedRecordsKeys = new ArrayList<>();
     for (T record : records) {
       String tableName = getAndValidateTableNameForClass(record.getClass());
       String primaryKey = getPrimaryKey(record);
@@ -185,6 +187,7 @@ public class StateStoreMySQLImpl extends StateStoreSerializableImpl {
           record.setDateModified(this.getTime());
           if (!updateRecord(tableName, primaryKey, data)) {
             LOG.error("Cannot write {} into table {}", primaryKey, tableName);
+            failedRecordsKeys.add(primaryKey);
             success = false;
           }
         } else {
@@ -194,7 +197,7 @@ public class StateStoreMySQLImpl extends StateStoreSerializableImpl {
             if (metrics != null) {
               metrics.addFailure(Time.monotonicNow() - start);
             }
-            return false;
+            return new StateStoreOperationResult(primaryKey);
           } else {
             LOG.debug("Not updating {} as updates are not allowed", record);
           }
@@ -202,6 +205,7 @@ public class StateStoreMySQLImpl extends StateStoreSerializableImpl {
       } else {
         if (!insertRecord(tableName, primaryKey, data)) {
           LOG.error("Cannot write {} in table {}", primaryKey, tableName);
+          failedRecordsKeys.add(primaryKey);
           success = false;
         }
       }
@@ -215,7 +219,7 @@ public class StateStoreMySQLImpl extends StateStoreSerializableImpl {
         metrics.addFailure(end - start);
       }
     }
-    return success;
+    return new StateStoreOperationResult(failedRecordsKeys, success);
   }
 
   @Override

+ 7 - 3
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java

@@ -24,6 +24,7 @@ import static org.apache.hadoop.util.Time.monotonicNow;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
@@ -40,6 +41,7 @@ import org.apache.curator.framework.imps.CuratorFrameworkState;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
 import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreOperationResult;
 import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
 import org.apache.hadoop.hdfs.server.federation.store.records.Query;
 import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
@@ -230,11 +232,11 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
   }
 
   @Override
-  public <T extends BaseRecord> boolean putAll(
+  public <T extends BaseRecord> StateStoreOperationResult putAll(
       List<T> records, boolean update, boolean error) throws IOException {
     verifyDriverReady();
     if (records.isEmpty()) {
-      return true;
+      return StateStoreOperationResult.getDefaultSuccessResult();
     }
 
     // All records should be the same
@@ -245,6 +247,7 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
     long start = monotonicNow();
     final AtomicBoolean status = new AtomicBoolean(true);
     List<Callable<Void>> callables = new ArrayList<>();
+    final List<String> failedRecordsKeys = Collections.synchronizedList(new ArrayList<>());
     records.forEach(record ->
         callables.add(
             () -> {
@@ -252,6 +255,7 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
               String recordZNode = getNodePath(znode, primaryKey);
               byte[] data = serialize(record);
               if (!writeNode(recordZNode, data, update, error)) {
+                failedRecordsKeys.add(primaryKey);
                 status.set(false);
               }
               return null;
@@ -276,7 +280,7 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
     } else {
       getMetrics().addFailure(end - start);
     }
-    return status.get();
+    return new StateStoreOperationResult(failedRecordsKeys, status.get());
   }
 
   @Override

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java

@@ -145,7 +145,7 @@ public class MountTableStoreImpl extends MountTableStore {
       final String src = mountTable.getSourcePath();
       checkMountTablePermission(src);
     }
-    boolean status = getDriver().putAll(mountTables, false, true);
+    boolean status = getDriver().putAll(mountTables, false, true).isOperationSuccessful();
     AddMountTableEntriesResponse response = AddMountTableEntriesResponse.newInstance();
     response.setStatus(status);
     if (status) {

+ 1 - 3
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java

@@ -235,9 +235,7 @@ public final class FederationStateStoreTestUtils {
     StateStoreDriver driver = stateStore.getDriver();
     driver.verifyDriverReady();
     if (driver.removeAll(clazz)) {
-      if (driver.putAll(records, true, false)) {
-        return true;
-      }
+      return driver.putAll(records, true, false).isOperationSuccessful();
     }
     return false;
   }

+ 21 - 2
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java

@@ -307,7 +307,15 @@ public class TestStateStoreDriverBase {
     }
 
     // Verify
-    assertTrue(driver.putAll(insertList, false, true));
+    StateStoreOperationResult result1 = driver.putAll(insertList, false, true);
+    assertTrue(result1.isOperationSuccessful());
+    assertEquals(0, result1.getFailedRecordsKeys().size());
+
+    StateStoreOperationResult result2 = driver.putAll(insertList.subList(0, 1), false, true);
+    assertFalse(result2.isOperationSuccessful());
+    assertEquals(1, result2.getFailedRecordsKeys().size());
+    assertEquals(getPrimaryKey(insertList.get(0)), result2.getFailedRecordsKeys().get(0));
+
     records = driver.get(clazz);
     assertEquals(records.getRecords().size(), 10);
 
@@ -384,7 +392,10 @@ public class TestStateStoreDriverBase {
     }
 
     // Verify
-    assertTrue(driver.putAll(insertList, false, true));
+    StateStoreOperationResult result = driver.putAll(insertList, false, true);
+    assertTrue(result.isOperationSuccessful());
+    assertEquals(0, result.getFailedRecordsKeys().size());
+
     records = driver.get(clazz);
     assertEquals(records.getRecords().size(), 10);
 
@@ -689,4 +700,12 @@ public class TestStateStoreDriverBase {
     }
     return null;
   }
+
+  private static String getPrimaryKey(BaseRecord record) {
+    String primaryKey = record.getPrimaryKey();
+    primaryKey = primaryKey.replaceAll("/", "0SLASH0");
+    primaryKey = primaryKey.replaceAll(":", "_");
+    return primaryKey;
+  }
+
 }

+ 6 - 5
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/MockStateStoreDriver.java

@@ -18,11 +18,13 @@
 package org.apache.hadoop.hdfs.server.federation.store.records;
 
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreOperationResult;
 import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreBaseImpl;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -89,10 +91,9 @@ public class MockStateStoreDriver extends StateStoreBaseImpl {
   }
 
   @Override
-  public <T extends BaseRecord> boolean putAll(List<T> records,
-                                               boolean allowUpdate,
-                                               boolean errorIfExists)
-      throws IOException {
+  public <T extends BaseRecord> StateStoreOperationResult putAll(List<T> records,
+      boolean allowUpdate,
+      boolean errorIfExists) throws IOException {
     checkErrors();
     for (T record : records) {
       Map<String, BaseRecord> map =
@@ -107,7 +108,7 @@ public class MockStateStoreDriver extends StateStoreBaseImpl {
             + ": " + key);
       }
     }
-    return true;
+    return new StateStoreOperationResult(Collections.emptyList(), true);
   }
 
   /**