|
@@ -25,7 +25,11 @@ import static org.apache.hadoop.util.Time.monotonicNow;
|
|
|
import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collections;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.HashSet;
|
|
|
import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.Set;
|
|
|
import java.util.concurrent.Callable;
|
|
|
import java.util.concurrent.Future;
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
@@ -284,38 +288,47 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public <T extends BaseRecord> int remove(
|
|
|
- Class<T> clazz, Query<T> query) throws IOException {
|
|
|
+ public <T extends BaseRecord> Map<Query<T>, Integer> remove(Class<T> clazz,
|
|
|
+ List<Query<T>> queries) throws IOException {
|
|
|
verifyDriverReady();
|
|
|
- if (query == null) {
|
|
|
- return 0;
|
|
|
+ // Track how many entries are deleted by each query
|
|
|
+ Map<Query<T>, Integer> ret = new HashMap<>();
|
|
|
+ final List<T> trueRemoved = Collections.synchronizedList(new ArrayList<>());
|
|
|
+ if (queries.isEmpty()) {
|
|
|
+ return ret;
|
|
|
}
|
|
|
|
|
|
// Read the current data
|
|
|
long start = monotonicNow();
|
|
|
- List<T> records = null;
|
|
|
+ List<T> records;
|
|
|
try {
|
|
|
QueryResult<T> result = get(clazz);
|
|
|
records = result.getRecords();
|
|
|
} catch (IOException ex) {
|
|
|
LOG.error("Cannot get existing records", ex);
|
|
|
getMetrics().addFailure(monotonicNow() - start);
|
|
|
- return 0;
|
|
|
+ return ret;
|
|
|
}
|
|
|
|
|
|
// Check the records to remove
|
|
|
String znode = getZNodeForClass(clazz);
|
|
|
- List<T> recordsToRemove = filterMultiple(query, records);
|
|
|
+ Set<T> recordsToRemove = new HashSet<>();
|
|
|
+ Map<Query<T>, List<T>> queryToRecords = new HashMap<>();
|
|
|
+ for (Query<T> query : queries) {
|
|
|
+ List<T> filtered = filterMultiple(query, records);
|
|
|
+ queryToRecords.put(query, filtered);
|
|
|
+ recordsToRemove.addAll(filtered);
|
|
|
+ }
|
|
|
|
|
|
// Remove the records
|
|
|
- int removed = 0;
|
|
|
- for (T existingRecord : recordsToRemove) {
|
|
|
+ List<Callable<Void>> callables = new ArrayList<>();
|
|
|
+ recordsToRemove.forEach(existingRecord -> callables.add(() -> {
|
|
|
LOG.info("Removing \"{}\"", existingRecord);
|
|
|
try {
|
|
|
String primaryKey = getPrimaryKey(existingRecord);
|
|
|
String path = getNodePath(znode, primaryKey);
|
|
|
if (zkManager.delete(path)) {
|
|
|
- removed++;
|
|
|
+ trueRemoved.add(existingRecord);
|
|
|
} else {
|
|
|
LOG.error("Did not remove \"{}\"", existingRecord);
|
|
|
}
|
|
@@ -323,12 +336,38 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
|
|
|
LOG.error("Cannot remove \"{}\"", existingRecord, e);
|
|
|
getMetrics().addFailure(monotonicNow() - start);
|
|
|
}
|
|
|
+ return null;
|
|
|
+ }));
|
|
|
+ try {
|
|
|
+ if (enableConcurrent) {
|
|
|
+ executorService.invokeAll(callables);
|
|
|
+ } else {
|
|
|
+ for (Callable<Void> callable : callables) {
|
|
|
+ callable.call();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error("Record removal failed : {}", e.getMessage(), e);
|
|
|
}
|
|
|
long end = monotonicNow();
|
|
|
- if (removed > 0) {
|
|
|
+ if (!trueRemoved.isEmpty()) {
|
|
|
getMetrics().addRemove(end - start);
|
|
|
}
|
|
|
- return removed;
|
|
|
+ // Generate return map
|
|
|
+ for (Map.Entry<Query<T>, List<T>> entry : queryToRecords.entrySet()) {
|
|
|
+ for (T record : entry.getValue()) {
|
|
|
+ if (trueRemoved.contains(record)) {
|
|
|
+ ret.compute(entry.getKey(), (k, v) -> (v == null) ? 1 : v + 1);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return ret;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public <T extends BaseRecord> int remove(Class<T> clazz, Query<T> query)
|
|
|
+ throws IOException {
|
|
|
+ return remove(clazz, Collections.singletonList(query)).get(query);
|
|
|
}
|
|
|
|
|
|
@Override
|