|
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
|
import org.apache.hadoop.ipc.Server;
|
|
|
import org.apache.hadoop.util.ExitUtil;
|
|
|
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
|
@@ -45,8 +46,7 @@ class FSEditLogAsync extends FSEditLog implements Runnable {
|
|
|
private static final ThreadLocal<Edit> THREAD_EDIT = new ThreadLocal<Edit>();
|
|
|
|
|
|
// requires concurrent access from caller threads and syncing thread.
|
|
|
- private final BlockingQueue<Edit> editPendingQ =
|
|
|
- new ArrayBlockingQueue<Edit>(4096);
|
|
|
+ private final BlockingQueue<Edit> editPendingQ;
|
|
|
|
|
|
// only accessed by syncing thread so no synchronization required.
|
|
|
// queue is unbounded because it's effectively limited by the size
|
|
@@ -57,6 +57,12 @@ class FSEditLogAsync extends FSEditLog implements Runnable {
|
|
|
super(conf, storage, editsDirs);
|
|
|
// op instances cannot be shared due to queuing for background thread.
|
|
|
cache.disableCache();
|
|
|
+ int editPendingQSize = conf.getInt(
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING_PENDING_QUEUE_SIZE,
|
|
|
+ DFSConfigKeys.
|
|
|
+ DFS_NAMENODE_EDITS_ASYNC_LOGGING_PENDING_QUEUE_SIZE_DEFAULT);
|
|
|
+
|
|
|
+ editPendingQ = new ArrayBlockingQueue<>(editPendingQSize);
|
|
|
}
|
|
|
|
|
|
private boolean isSyncThreadAlive() {
|