|
@@ -25,7 +25,16 @@ import static org.apache.hadoop.util.Time.monotonicNow;
|
|
|
import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
|
+import java.util.concurrent.Callable;
|
|
|
+import java.util.concurrent.Future;
|
|
|
+import java.util.concurrent.LinkedBlockingQueue;
|
|
|
+import java.util.concurrent.ThreadFactory;
|
|
|
+import java.util.concurrent.ThreadPoolExecutor;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
|
|
+import org.apache.hadoop.classification.VisibleForTesting;
|
|
|
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|
|
import org.apache.curator.framework.CuratorFramework;
|
|
|
import org.apache.curator.framework.imps.CuratorFrameworkState;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
@@ -57,14 +66,9 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
|
|
|
private static final Logger LOG =
|
|
|
LoggerFactory.getLogger(StateStoreZooKeeperImpl.class);
|
|
|
|
|
|
-
|
|
|
- /** Configuration keys. */
|
|
|
- public static final String FEDERATION_STORE_ZK_DRIVER_PREFIX =
|
|
|
- RBFConfigKeys.FEDERATION_STORE_PREFIX + "driver.zk.";
|
|
|
- public static final String FEDERATION_STORE_ZK_PARENT_PATH =
|
|
|
- FEDERATION_STORE_ZK_DRIVER_PREFIX + "parent-path";
|
|
|
- public static final String FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT =
|
|
|
- "/hdfs-federation";
|
|
|
+ /** Service to get/update zk state. */
|
|
|
+ private ThreadPoolExecutor executorService;
|
|
|
+ private boolean enableConcurrent;
|
|
|
|
|
|
|
|
|
/** Directory to store the state store data. */
|
|
@@ -82,8 +86,22 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
|
|
|
|
|
|
Configuration conf = getConf();
|
|
|
baseZNode = conf.get(
|
|
|
- FEDERATION_STORE_ZK_PARENT_PATH,
|
|
|
- FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT);
|
|
|
+ RBFConfigKeys.FEDERATION_STORE_ZK_PARENT_PATH,
|
|
|
+ RBFConfigKeys.FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT);
|
|
|
+ int numThreads = conf.getInt(
|
|
|
+ RBFConfigKeys.FEDERATION_STORE_ZK_ASYNC_MAX_THREADS,
|
|
|
+ RBFConfigKeys.FEDERATION_STORE_ZK_ASYNC_MAX_THREADS_DEFAULT);
|
|
|
+ enableConcurrent = numThreads > 0;
|
|
|
+ if (enableConcurrent) {
|
|
|
+ ThreadFactory threadFactory = new ThreadFactoryBuilder()
|
|
|
+ .setNameFormat("StateStore ZK Client-%d")
|
|
|
+ .build();
|
|
|
+ this.executorService = new ThreadPoolExecutor(numThreads, numThreads,
|
|
|
+ 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), threadFactory);
|
|
|
+ LOG.info("Init StateStoreZookeeperImpl by async mode with {} threads.", numThreads);
|
|
|
+ } else {
|
|
|
+ LOG.info("Init StateStoreZookeeperImpl by sync mode.");
|
|
|
+ }
|
|
|
try {
|
|
|
this.zkManager = new ZKCuratorManager(conf);
|
|
|
this.zkManager.start();
|
|
@@ -109,8 +127,16 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+  /**
+   * Switch between concurrent and sequential access to ZooKeeper.
+   *
+   * Concurrent mode relies on the executor, which is only created when the
+   * configured number of threads is positive. A request to enable concurrent
+   * mode without an executor is ignored (and logged) so that later reads and
+   * writes do not dereference a null executor.
+   *
+   * @param enableConcurrent true to run ZooKeeper operations through the
+   *                         executor, false to run them sequentially.
+   */
+  @VisibleForTesting
+  public void setEnableConcurrent(boolean enableConcurrent) {
+    if (enableConcurrent && executorService == null) {
+      LOG.warn("Cannot enable concurrent mode: no executor has been created.");
+      this.enableConcurrent = false;
+      return;
+    }
+    this.enableConcurrent = enableConcurrent;
+  }
|
|
|
+
|
|
|
@Override
|
|
|
public void close() throws Exception {
|
|
|
+ if (executorService != null) {
|
|
|
+ executorService.shutdown();
|
|
|
+ }
|
|
|
if (zkManager != null) {
|
|
|
zkManager.close();
|
|
|
}
|
|
@@ -136,34 +162,21 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
|
|
|
List<T> ret = new ArrayList<>();
|
|
|
String znode = getZNodeForClass(clazz);
|
|
|
try {
|
|
|
- List<String> children = zkManager.getChildren(znode);
|
|
|
- for (String child : children) {
|
|
|
- try {
|
|
|
- String path = getNodePath(znode, child);
|
|
|
- Stat stat = new Stat();
|
|
|
- String data = zkManager.getStringData(path, stat);
|
|
|
- boolean corrupted = false;
|
|
|
- if (data == null || data.equals("")) {
|
|
|
- // All records should have data, otherwise this is corrupted
|
|
|
- corrupted = true;
|
|
|
- } else {
|
|
|
- try {
|
|
|
- T record = createRecord(data, stat, clazz);
|
|
|
- ret.add(record);
|
|
|
- } catch (IOException e) {
|
|
|
- LOG.error("Cannot create record type \"{}\" from \"{}\": {}",
|
|
|
- clazz.getSimpleName(), data, e.getMessage());
|
|
|
- corrupted = true;
|
|
|
- }
|
|
|
+ List<Callable<T>> callables = new ArrayList<>();
|
|
|
+ zkManager.getChildren(znode).forEach(c -> callables.add(() -> getRecord(clazz, znode, c)));
|
|
|
+ if (enableConcurrent) {
|
|
|
+ List<Future<T>> futures = executorService.invokeAll(callables);
|
|
|
+ for (Future<T> future : futures) {
|
|
|
+ if (future.get() != null) {
|
|
|
+ ret.add(future.get());
|
|
|
}
|
|
|
-
|
|
|
- if (corrupted) {
|
|
|
- LOG.error("Cannot get data for {} at {}, cleaning corrupted data",
|
|
|
- child, path);
|
|
|
- zkManager.delete(path);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ for (Callable<T> callable : callables) {
|
|
|
+ T record = callable.call();
|
|
|
+ if (record != null) {
|
|
|
+ ret.add(record);
|
|
|
}
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.error("Cannot get data for {}: {}", child, e.getMessage());
|
|
|
}
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
@@ -178,6 +191,44 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
|
|
|
return new QueryResult<T>(ret, getTime());
|
|
|
}
|
|
|
|
|
|
+  /**
+   * Get one data record in the StateStore or delete it if it's corrupted.
+   *
+   * A znode is treated as corrupted when it holds no data (null or empty) or
+   * when its data cannot be converted into a record of the requested class;
+   * in both cases the znode is deleted. Exceptions raised while reading or
+   * deleting are logged and not propagated to the caller.
+   *
+   * @param <T> Record type.
+   * @param clazz Record class to evaluate.
+   * @param znode The ZNode for the class.
+   * @param child The child for znode to get.
+   * @return The record, or null if the znode was corrupted or could not be
+   *         read.
+   */
+  private <T extends BaseRecord> T getRecord(Class<T> clazz, String znode, String child) {
+    T record = null;
+    try {
+      String path = getNodePath(znode, child);
+      Stat stat = new Stat();
+      String data = zkManager.getStringData(path, stat);
+      boolean corrupted = false;
+      if (data == null || data.isEmpty()) {
+        // All records should have data, otherwise this is corrupted
+        corrupted = true;
+      } else {
+        try {
+          record = createRecord(data, stat, clazz);
+        } catch (IOException e) {
+          LOG.error("Cannot create record type \"{}\" from \"{}\": {}",
+              clazz.getSimpleName(), data, e.getMessage());
+          corrupted = true;
+        }
+      }
+
+      if (corrupted) {
+        LOG.error("Cannot get data for {} at {}, cleaning corrupted data", child, path);
+        zkManager.delete(path);
+      }
+    } catch (Exception e) {
+      // Pass the throwable as the last argument so the stack trace is kept,
+      // matching how write failures are logged elsewhere in this class.
+      LOG.error("Cannot get data for {}: {}", child, e.getMessage(), e);
+    }
+    return record;
+  }
|
|
|
+
|
|
|
@Override
|
|
|
public <T extends BaseRecord> boolean putAll(
|
|
|
List<T> records, boolean update, boolean error) throws IOException {
|
|
@@ -192,22 +243,40 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
|
|
|
String znode = getZNodeForClass(recordClass);
|
|
|
|
|
|
long start = monotonicNow();
|
|
|
- boolean status = true;
|
|
|
- for (T record : records) {
|
|
|
- String primaryKey = getPrimaryKey(record);
|
|
|
- String recordZNode = getNodePath(znode, primaryKey);
|
|
|
- byte[] data = serialize(record);
|
|
|
- if (!writeNode(recordZNode, data, update, error)){
|
|
|
- status = false;
|
|
|
+ final AtomicBoolean status = new AtomicBoolean(true);
|
|
|
+ List<Callable<Void>> callables = new ArrayList<>();
|
|
|
+ records.forEach(record ->
|
|
|
+ callables.add(
|
|
|
+ () -> {
|
|
|
+ String primaryKey = getPrimaryKey(record);
|
|
|
+ String recordZNode = getNodePath(znode, primaryKey);
|
|
|
+ byte[] data = serialize(record);
|
|
|
+ if (!writeNode(recordZNode, data, update, error)) {
|
|
|
+ status.set(false);
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ )
|
|
|
+ );
|
|
|
+ try {
|
|
|
+ if (enableConcurrent) {
|
|
|
+ executorService.invokeAll(callables);
|
|
|
+ } else {
|
|
|
+ for(Callable<Void> callable : callables) {
|
|
|
+ callable.call();
|
|
|
+ }
|
|
|
}
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error("Write record failed : {}", e.getMessage(), e);
|
|
|
+ throw new IOException(e);
|
|
|
}
|
|
|
long end = monotonicNow();
|
|
|
- if (status) {
|
|
|
+ if (status.get()) {
|
|
|
getMetrics().addWrite(end - start);
|
|
|
} else {
|
|
|
getMetrics().addFailure(end - start);
|
|
|
}
|
|
|
- return status;
|
|
|
+ return status.get();
|
|
|
}
|
|
|
|
|
|
@Override
|