|
@@ -18,28 +18,39 @@
|
|
|
package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
|
|
|
|
|
|
import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple;
|
|
|
-import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.getRecordClass;
|
|
|
+import static org.apache.hadoop.util.Time.monotonicNow;
|
|
|
+import static org.apache.hadoop.util.Time.now;
|
|
|
|
|
|
import java.io.BufferedReader;
|
|
|
import java.io.BufferedWriter;
|
|
|
import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
|
-import java.util.Collection;
|
|
|
import java.util.HashMap;
|
|
|
-import java.util.LinkedList;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.Map.Entry;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.regex.Matcher;
|
|
|
+import java.util.regex.Pattern;
|
|
|
|
|
|
+import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
|
|
|
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
|
|
|
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
|
|
|
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
|
|
|
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
|
|
|
import org.apache.hadoop.hdfs.server.federation.store.records.Query;
|
|
|
import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
|
|
|
+import org.apache.hadoop.util.Time;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
+
|
|
|
/**
|
|
|
- * {@link StateStoreDriver} implementation based on a local file.
|
|
|
+ * {@link StateStoreDriver} implementation based on files. In this approach, we
|
|
|
+ * use temporary files for the writes and renaming "atomically" to the final
|
|
|
+ * value. Instead of writing to the final location, it will go to a temporary
|
|
|
+ * one and then rename to the final destination.
|
|
|
*/
|
|
|
public abstract class StateStoreFileBaseImpl
|
|
|
extends StateStoreSerializableImpl {
|
|
@@ -47,75 +58,76 @@ public abstract class StateStoreFileBaseImpl
|
|
|
private static final Logger LOG =
|
|
|
LoggerFactory.getLogger(StateStoreFileBaseImpl.class);
|
|
|
|
|
|
+ /** File extension for temporary files. */
|
|
|
+ private static final String TMP_MARK = ".tmp";
|
|
|
+ /** We remove temporary files older than 10 seconds. */
|
|
|
+ private static final long OLD_TMP_RECORD_MS = TimeUnit.SECONDS.toMillis(10);
|
|
|
+ /** File pattern for temporary records: file.XYZ.tmp. */
|
|
|
+ private static final Pattern OLD_TMP_RECORD_PATTERN =
|
|
|
+ Pattern.compile(".+\\.(\\d+)\\.tmp");
|
|
|
+
|
|
|
/** If it is initialized. */
|
|
|
private boolean initialized = false;
|
|
|
|
|
|
- /** Name of the file containing the data. */
|
|
|
- private static final String DATA_FILE_NAME = "records.data";
|
|
|
-
|
|
|
|
|
|
/**
|
|
|
- * Lock reading records.
|
|
|
+ * Get the reader of a record for the file system.
|
|
|
*
|
|
|
- * @param clazz Class of the record.
|
|
|
+ * @param path Path of the record to read.
|
|
|
+ * @return Reader for the record.
|
|
|
*/
|
|
|
- protected abstract <T extends BaseRecord> void lockRecordRead(Class<T> clazz);
|
|
|
+ protected abstract <T extends BaseRecord> BufferedReader getReader(
|
|
|
+ String path);
|
|
|
|
|
|
/**
|
|
|
- * Unlock reading records.
|
|
|
+ * Get the writer of a record for the file system.
|
|
|
*
|
|
|
- * @param clazz Class of the record.
|
|
|
+ * @param path Path of the record to write.
|
|
|
+ * @return Writer for the record.
|
|
|
*/
|
|
|
- protected abstract <T extends BaseRecord> void unlockRecordRead(
|
|
|
- Class<T> clazz);
|
|
|
+ protected abstract <T extends BaseRecord> BufferedWriter getWriter(
|
|
|
+ String path);
|
|
|
|
|
|
/**
|
|
|
- * Lock writing records.
|
|
|
+ * Check if a path exists.
|
|
|
*
|
|
|
- * @param clazz Class of the record.
|
|
|
+ * @param path Path to check.
|
|
|
+ * @return If the path exists.
|
|
|
*/
|
|
|
- protected abstract <T extends BaseRecord> void lockRecordWrite(
|
|
|
- Class<T> clazz);
|
|
|
+ protected abstract boolean exists(String path);
|
|
|
|
|
|
/**
|
|
|
- * Unlock writing records.
|
|
|
+ * Make a directory.
|
|
|
*
|
|
|
- * @param clazz Class of the record.
|
|
|
+ * @param path Path of the directory to create.
|
|
|
+ * @return If the directory was created.
|
|
|
*/
|
|
|
- protected abstract <T extends BaseRecord> void unlockRecordWrite(
|
|
|
- Class<T> clazz);
|
|
|
+ protected abstract boolean mkdir(String path);
|
|
|
|
|
|
/**
|
|
|
- * Get the reader for the file system.
|
|
|
+ * Rename a file. This should be atomic.
|
|
|
*
|
|
|
- * @param clazz Class of the record.
|
|
|
+ * @param src Source name.
|
|
|
+ * @param dst Destination name.
|
|
|
+ * @return If the rename was successful.
|
|
|
*/
|
|
|
- protected abstract <T extends BaseRecord> BufferedReader getReader(
|
|
|
- Class<T> clazz, String sub);
|
|
|
+ protected abstract boolean rename(String src, String dst);
|
|
|
|
|
|
/**
|
|
|
- * Get the writer for the file system.
|
|
|
+ * Remove a file.
|
|
|
*
|
|
|
- * @param clazz Class of the record.
|
|
|
+ * @param path Path for the file to remove
|
|
|
+ * @return If the file was removed.
|
|
|
*/
|
|
|
- protected abstract <T extends BaseRecord> BufferedWriter getWriter(
|
|
|
- Class<T> clazz, String sub);
|
|
|
+ protected abstract boolean remove(String path);
|
|
|
|
|
|
/**
|
|
|
- * Check if a path exists.
|
|
|
+ * Get the children for a path.
|
|
|
*
|
|
|
* @param path Path to check.
|
|
|
- * @return If the path exists.
|
|
|
- */
|
|
|
- protected abstract boolean exists(String path);
|
|
|
-
|
|
|
- /**
|
|
|
- * Make a directory.
|
|
|
- *
|
|
|
- * @param path Path of the directory to create.
|
|
|
- * @return If the directory was created.
|
|
|
+ * @return List of children.
|
|
|
*/
|
|
|
- protected abstract boolean mkdir(String path);
|
|
|
+ protected abstract List<String> getChildren(String path);
|
|
|
|
|
|
/**
|
|
|
* Get root directory.
|
|
@@ -171,15 +183,6 @@ public abstract class StateStoreFileBaseImpl
|
|
|
LOG.error("Cannot create data directory {}", dataDirPath);
|
|
|
return false;
|
|
|
}
|
|
|
- String dataFilePath = dataDirPath + "/" + DATA_FILE_NAME;
|
|
|
- if (!exists(dataFilePath)) {
|
|
|
- // Create empty file
|
|
|
- List<T> emtpyList = new ArrayList<>();
|
|
|
- if(!writeAll(emtpyList, recordClass)) {
|
|
|
- LOG.error("Cannot create data file {}", dataFilePath);
|
|
|
- return false;
|
|
|
- }
|
|
|
- }
|
|
|
}
|
|
|
} catch (Exception ex) {
|
|
|
LOG.error("Cannot create data directory {}", dataDirPath, ex);
|
|
@@ -188,138 +191,110 @@ public abstract class StateStoreFileBaseImpl
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Read all lines from a file and deserialize into the desired record type.
|
|
|
- *
|
|
|
- * @param reader Open handle for the file.
|
|
|
- * @param clazz Record class to create.
|
|
|
- * @param includeDates True if dateModified/dateCreated are serialized.
|
|
|
- * @return List of records.
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- private <T extends BaseRecord> List<T> getAllFile(
|
|
|
- BufferedReader reader, Class<T> clazz, boolean includeDates)
|
|
|
- throws IOException {
|
|
|
-
|
|
|
- List<T> ret = new ArrayList<T>();
|
|
|
- String line;
|
|
|
- while ((line = reader.readLine()) != null) {
|
|
|
- if (!line.startsWith("#") && line.length() > 0) {
|
|
|
- try {
|
|
|
- T record = newRecord(line, clazz, includeDates);
|
|
|
- ret.add(record);
|
|
|
- } catch (Exception ex) {
|
|
|
- LOG.error("Cannot parse line in data source file: {}", line, ex);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- return ret;
|
|
|
- }
|
|
|
-
|
|
|
@Override
|
|
|
public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz)
|
|
|
throws IOException {
|
|
|
- return get(clazz, (String)null);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz, String sub)
|
|
|
- throws IOException {
|
|
|
verifyDriverReady();
|
|
|
- BufferedReader reader = null;
|
|
|
- lockRecordRead(clazz);
|
|
|
+ long start = monotonicNow();
|
|
|
+ StateStoreMetrics metrics = getMetrics();
|
|
|
+ List<T> ret = new ArrayList<>();
|
|
|
try {
|
|
|
- reader = getReader(clazz, sub);
|
|
|
- List<T> data = getAllFile(reader, clazz, true);
|
|
|
- return new QueryResult<T>(data, getTime());
|
|
|
- } catch (Exception ex) {
|
|
|
- LOG.error("Cannot fetch records {}", clazz.getSimpleName());
|
|
|
- throw new IOException("Cannot read from data store " + ex.getMessage());
|
|
|
- } finally {
|
|
|
- if (reader != null) {
|
|
|
- try {
|
|
|
- reader.close();
|
|
|
- } catch (IOException e) {
|
|
|
- LOG.error("Failed closing file", e);
|
|
|
+ String path = getPathForClass(clazz);
|
|
|
+ List<String> children = getChildren(path);
|
|
|
+ for (String child : children) {
|
|
|
+ String pathRecord = path + "/" + child;
|
|
|
+ if (child.endsWith(TMP_MARK)) {
|
|
|
+ LOG.debug("There is a temporary file {} in {}", child, path);
|
|
|
+ if (isOldTempRecord(child)) {
|
|
|
+ LOG.warn("Removing {} as it's an old temporary record", child);
|
|
|
+ remove(pathRecord);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ T record = getRecord(pathRecord, clazz);
|
|
|
+ ret.add(record);
|
|
|
}
|
|
|
}
|
|
|
- unlockRecordRead(clazz);
|
|
|
+ } catch (Exception e) {
|
|
|
+ if (metrics != null) {
|
|
|
+ metrics.addFailure(monotonicNow() - start);
|
|
|
+ }
|
|
|
+ String msg = "Cannot fetch records for " + clazz.getSimpleName();
|
|
|
+ LOG.error(msg, e);
|
|
|
+ throw new IOException(msg, e);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (metrics != null) {
|
|
|
+ metrics.addRead(monotonicNow() - start);
|
|
|
}
|
|
|
+ return new QueryResult<T>(ret, getTime());
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Overwrite the existing data with a new data set.
|
|
|
+ * Check if a record is temporary and old.
|
|
|
*
|
|
|
- * @param records List of records to write.
|
|
|
- * @param writer BufferedWriter stream to write to.
|
|
|
- * @return If the records were succesfully written.
|
|
|
+ * @param pathRecord Path for the record to check.
|
|
|
+ * @return If the record is temporary and old.
|
|
|
*/
|
|
|
- private <T extends BaseRecord> boolean writeAllFile(
|
|
|
- Collection<T> records, BufferedWriter writer) {
|
|
|
-
|
|
|
- try {
|
|
|
- for (BaseRecord record : records) {
|
|
|
- try {
|
|
|
- String data = serializeString(record);
|
|
|
- writer.write(data);
|
|
|
- writer.newLine();
|
|
|
- } catch (IllegalArgumentException ex) {
|
|
|
- LOG.error("Cannot write record {} to file", record, ex);
|
|
|
- }
|
|
|
- }
|
|
|
- writer.flush();
|
|
|
- return true;
|
|
|
- } catch (IOException e) {
|
|
|
- LOG.error("Cannot commit records to file", e);
|
|
|
+ @VisibleForTesting
|
|
|
+ public static boolean isOldTempRecord(final String pathRecord) {
|
|
|
+ if (!pathRecord.endsWith(TMP_MARK)) {
|
|
|
return false;
|
|
|
}
|
|
|
+ // Extract temporary record creation time
|
|
|
+ Matcher m = OLD_TMP_RECORD_PATTERN.matcher(pathRecord);
|
|
|
+ if (m.find()) {
|
|
|
+ long time = Long.parseLong(m.group(1));
|
|
|
+ return now() - time > OLD_TMP_RECORD_MS;
|
|
|
+ }
|
|
|
+ return false;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Overwrite the existing data with a new data set. Replaces all records in
|
|
|
- * the data store for this record class. If all records in the data store are
|
|
|
- * not successfully committed, this function must return false and leave the
|
|
|
- * data store unchanged.
|
|
|
+ * Read a record from a file.
|
|
|
*
|
|
|
- * @param records List of records to write. All records must be of type
|
|
|
- * recordClass.
|
|
|
- * @param recordClass Class of record to replace.
|
|
|
- * @return true if all operations were successful, false otherwise.
|
|
|
- * @throws StateStoreUnavailableException
|
|
|
+ * @param path Path to the file containing the record.
|
|
|
+ * @param clazz Class of the record.
|
|
|
+ * @return Record read from the file.
|
|
|
+ * @throws IOException If the file cannot be read.
|
|
|
*/
|
|
|
- public <T extends BaseRecord> boolean writeAll(
|
|
|
- Collection<T> records, Class<T> recordClass)
|
|
|
- throws StateStoreUnavailableException {
|
|
|
- verifyDriverReady();
|
|
|
- lockRecordWrite(recordClass);
|
|
|
- BufferedWriter writer = null;
|
|
|
+ private <T extends BaseRecord> T getRecord(
|
|
|
+ final String path, final Class<T> clazz) throws IOException {
|
|
|
+ BufferedReader reader = getReader(path);
|
|
|
try {
|
|
|
- writer = getWriter(recordClass, null);
|
|
|
- return writeAllFile(records, writer);
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.error(
|
|
|
- "Cannot add records to file for {}", recordClass.getSimpleName(), e);
|
|
|
- return false;
|
|
|
- } finally {
|
|
|
- if (writer != null) {
|
|
|
- try {
|
|
|
- writer.close();
|
|
|
- } catch (IOException e) {
|
|
|
- LOG.error(
|
|
|
- "Cannot close writer for {}", recordClass.getSimpleName(), e);
|
|
|
+ String line;
|
|
|
+ while ((line = reader.readLine()) != null) {
|
|
|
+ if (!line.startsWith("#") && line.length() > 0) {
|
|
|
+ try {
|
|
|
+ T record = newRecord(line, clazz, false);
|
|
|
+ return record;
|
|
|
+ } catch (Exception ex) {
|
|
|
+ LOG.error("Cannot parse line {} in file {}", line, path, ex);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
- unlockRecordWrite(recordClass);
|
|
|
+ } finally {
|
|
|
+ if (reader != null) {
|
|
|
+ reader.close();
|
|
|
+ }
|
|
|
}
|
|
|
+ throw new IOException("Cannot read " + path + " for record " +
|
|
|
+ clazz.getSimpleName());
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Get the data file name.
|
|
|
- *
|
|
|
- * @return Data file name.
|
|
|
+ * Get the path for a record class.
|
|
|
+ * @param clazz Class of the record.
|
|
|
+ * @return Path for this record class.
|
|
|
*/
|
|
|
- protected String getDataFileName() {
|
|
|
- return DATA_FILE_NAME;
|
|
|
+ private <T extends BaseRecord> String getPathForClass(final Class<T> clazz) {
|
|
|
+ String className = StateStoreUtils.getRecordName(clazz);
|
|
|
+ StringBuilder sb = new StringBuilder();
|
|
|
+ sb.append(getRootDir());
|
|
|
+ if (sb.charAt(sb.length() - 1) != '/') {
|
|
|
+ sb.append("/");
|
|
|
+ }
|
|
|
+ sb.append(className);
|
|
|
+ return sb.toString();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -332,56 +307,80 @@ public abstract class StateStoreFileBaseImpl
|
|
|
List<T> records, boolean allowUpdate, boolean errorIfExists)
|
|
|
throws StateStoreUnavailableException {
|
|
|
verifyDriverReady();
|
|
|
-
|
|
|
if (records.isEmpty()) {
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
- Class<T> clazz = (Class<T>) getRecordClass(records.get(0).getClass());
|
|
|
- QueryResult<T> result;
|
|
|
- try {
|
|
|
- result = get(clazz);
|
|
|
- } catch (IOException e) {
|
|
|
- return false;
|
|
|
- }
|
|
|
- Map<Object, T> writeList = new HashMap<>();
|
|
|
+ long start = monotonicNow();
|
|
|
+ StateStoreMetrics metrics = getMetrics();
|
|
|
|
|
|
- // Write all of the existing records
|
|
|
- for (T existingRecord : result.getRecords()) {
|
|
|
- String key = existingRecord.getPrimaryKey();
|
|
|
- writeList.put(key, existingRecord);
|
|
|
- }
|
|
|
+ // Check if any record exists
|
|
|
+ Map<String, T> toWrite = new HashMap<>();
|
|
|
+ for (T record : records) {
|
|
|
+ Class<? extends BaseRecord> recordClass = record.getClass();
|
|
|
+ String path = getPathForClass(recordClass);
|
|
|
+ String primaryKey = getPrimaryKey(record);
|
|
|
+ String recordPath = path + "/" + primaryKey;
|
|
|
|
|
|
- // Add inserts and updates, overwrite any existing values
|
|
|
- for (T updatedRecord : records) {
|
|
|
- try {
|
|
|
- updatedRecord.validate();
|
|
|
- String key = updatedRecord.getPrimaryKey();
|
|
|
- if (writeList.containsKey(key) && allowUpdate) {
|
|
|
- // Update
|
|
|
- writeList.put(key, updatedRecord);
|
|
|
+ if (exists(recordPath)) {
|
|
|
+ if (allowUpdate) {
|
|
|
// Update the mod time stamp. Many backends will use their
|
|
|
// own timestamp for the mod time.
|
|
|
- updatedRecord.setDateModified(this.getTime());
|
|
|
- } else if (!writeList.containsKey(key)) {
|
|
|
- // Insert
|
|
|
- // Create/Mod timestamps are already initialized
|
|
|
- writeList.put(key, updatedRecord);
|
|
|
+ record.setDateModified(this.getTime());
|
|
|
+ toWrite.put(recordPath, record);
|
|
|
} else if (errorIfExists) {
|
|
|
LOG.error("Attempt to insert record {} that already exists",
|
|
|
- updatedRecord);
|
|
|
+ recordPath);
|
|
|
+ if (metrics != null) {
|
|
|
+ metrics.addFailure(monotonicNow() - start);
|
|
|
+ }
|
|
|
return false;
|
|
|
+ } else {
|
|
|
+ LOG.debug("Not updating {}", record);
|
|
|
}
|
|
|
- } catch (IllegalArgumentException ex) {
|
|
|
- LOG.error("Cannot write invalid record to State Store", ex);
|
|
|
- return false;
|
|
|
+ } else {
|
|
|
+ toWrite.put(recordPath, record);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // Write all
|
|
|
- boolean status = writeAll(writeList.values(), clazz);
|
|
|
- return status;
|
|
|
+ // Write the records
|
|
|
+ boolean success = true;
|
|
|
+ for (Entry<String, T> entry : toWrite.entrySet()) {
|
|
|
+ String recordPath = entry.getKey();
|
|
|
+ String recordPathTemp = recordPath + "." + now() + TMP_MARK;
|
|
|
+ BufferedWriter writer = getWriter(recordPathTemp);
|
|
|
+ try {
|
|
|
+ T record = entry.getValue();
|
|
|
+ String line = serializeString(record);
|
|
|
+ writer.write(line);
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.error("Cannot write {}", recordPathTemp, e);
|
|
|
+ success = false;
|
|
|
+ } finally {
|
|
|
+ if (writer != null) {
|
|
|
+ try {
|
|
|
+ writer.close();
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.error("Cannot close the writer for {}", recordPathTemp);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // Commit
|
|
|
+ if (!rename(recordPathTemp, recordPath)) {
|
|
|
+ LOG.error("Failed committing record into {}", recordPath);
|
|
|
+ success = false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ long end = monotonicNow();
|
|
|
+ if (metrics != null) {
|
|
|
+ if (success) {
|
|
|
+ metrics.addWrite(end - start);
|
|
|
+ } else {
|
|
|
+ metrics.addFailure(end - start);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return success;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -393,6 +392,8 @@ public abstract class StateStoreFileBaseImpl
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
+ long start = Time.monotonicNow();
|
|
|
+ StateStoreMetrics metrics = getMetrics();
|
|
|
int removed = 0;
|
|
|
// Get the current records
|
|
|
try {
|
|
@@ -400,21 +401,34 @@ public abstract class StateStoreFileBaseImpl
|
|
|
final List<T> existingRecords = result.getRecords();
|
|
|
// Write all of the existing records except those to be removed
|
|
|
final List<T> recordsToRemove = filterMultiple(query, existingRecords);
|
|
|
- removed = recordsToRemove.size();
|
|
|
- final List<T> newRecords = new LinkedList<>();
|
|
|
- for (T record : existingRecords) {
|
|
|
- if (!recordsToRemove.contains(record)) {
|
|
|
- newRecords.add(record);
|
|
|
+ boolean success = true;
|
|
|
+ for (T recordToRemove : recordsToRemove) {
|
|
|
+ String path = getPathForClass(clazz);
|
|
|
+ String primaryKey = getPrimaryKey(recordToRemove);
|
|
|
+ String recordToRemovePath = path + "/" + primaryKey;
|
|
|
+ if (remove(recordToRemovePath)) {
|
|
|
+ removed++;
|
|
|
+ } else {
|
|
|
+ LOG.error("Cannot remove record {}", recordToRemovePath);
|
|
|
+ success = false;
|
|
|
}
|
|
|
}
|
|
|
- if (!writeAll(newRecords, clazz)) {
|
|
|
- throw new IOException(
|
|
|
- "Cannot remove record " + clazz + " query " + query);
|
|
|
+ if (!success) {
|
|
|
+ LOG.error("Cannot remove records {} query {}", clazz, query);
|
|
|
+ if (metrics != null) {
|
|
|
+ metrics.addFailure(monotonicNow() - start);
|
|
|
+ }
|
|
|
}
|
|
|
} catch (IOException e) {
|
|
|
LOG.error("Cannot remove records {} query {}", clazz, query, e);
|
|
|
+ if (metrics != null) {
|
|
|
+ metrics.addFailure(monotonicNow() - start);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
+ if (removed > 0 && metrics != null) {
|
|
|
+ metrics.addRemove(monotonicNow() - start);
|
|
|
+ }
|
|
|
return removed;
|
|
|
}
|
|
|
|
|
@@ -422,8 +436,27 @@ public abstract class StateStoreFileBaseImpl
|
|
|
public <T extends BaseRecord> boolean removeAll(Class<T> clazz)
|
|
|
throws StateStoreUnavailableException {
|
|
|
verifyDriverReady();
|
|
|
- List<T> emptyList = new ArrayList<>();
|
|
|
- boolean status = writeAll(emptyList, clazz);
|
|
|
- return status;
|
|
|
+ long start = Time.monotonicNow();
|
|
|
+ StateStoreMetrics metrics = getMetrics();
|
|
|
+
|
|
|
+ boolean success = true;
|
|
|
+ String path = getPathForClass(clazz);
|
|
|
+ List<String> children = getChildren(path);
|
|
|
+ for (String child : children) {
|
|
|
+ String pathRecord = path + "/" + child;
|
|
|
+ if (!remove(pathRecord)) {
|
|
|
+ success = false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (metrics != null) {
|
|
|
+ long time = Time.monotonicNow() - start;
|
|
|
+ if (success) {
|
|
|
+ metrics.addRemove(time);
|
|
|
+ } else {
|
|
|
+ metrics.addFailure(time);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return success;
|
|
|
}
|
|
|
}
|