|
@@ -49,9 +49,6 @@ public class ConnectionManager {
|
|
private static final Logger LOG =
|
|
private static final Logger LOG =
|
|
LoggerFactory.getLogger(ConnectionManager.class);
|
|
LoggerFactory.getLogger(ConnectionManager.class);
|
|
|
|
|
|
- /** Number of parallel new connections to create. */
|
|
|
|
- protected static final int MAX_NEW_CONNECTIONS = 100;
|
|
|
|
-
|
|
|
|
/** Minimum amount of active connections: 50%. */
|
|
/** Minimum amount of active connections: 50%. */
|
|
protected static final float MIN_ACTIVE_RATIO = 0.5f;
|
|
protected static final float MIN_ACTIVE_RATIO = 0.5f;
|
|
|
|
|
|
@@ -77,8 +74,10 @@ public class ConnectionManager {
|
|
private final Lock writeLock = readWriteLock.writeLock();
|
|
private final Lock writeLock = readWriteLock.writeLock();
|
|
|
|
|
|
/** Queue for creating new connections. */
|
|
/** Queue for creating new connections. */
|
|
- private final BlockingQueue<ConnectionPool> creatorQueue =
|
|
|
|
- new ArrayBlockingQueue<>(MAX_NEW_CONNECTIONS);
|
|
|
|
|
|
+ private final BlockingQueue<ConnectionPool> creatorQueue;
|
|
|
|
+ /** Max size of queue for creating new connections. */
|
|
|
|
+ private final int creatorQueueMaxSize;
|
|
|
|
+
|
|
/** Create new connections asynchronously. */
|
|
/** Create new connections asynchronously. */
|
|
private final ConnectionCreator creator;
|
|
private final ConnectionCreator creator;
|
|
/** Periodic executor to remove stale connection pools. */
|
|
/** Periodic executor to remove stale connection pools. */
|
|
@@ -106,7 +105,12 @@ public class ConnectionManager {
|
|
this.pools = new HashMap<>();
|
|
this.pools = new HashMap<>();
|
|
|
|
|
|
// Create connections in a thread asynchronously
|
|
// Create connections in a thread asynchronously
|
|
- this.creator = new ConnectionCreator(creatorQueue);
|
|
|
|
|
|
+ this.creatorQueueMaxSize = this.conf.getInt(
|
|
|
|
+ RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CREATOR_QUEUE_SIZE,
|
|
|
|
+ RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CREATOR_QUEUE_SIZE_DEFAULT
|
|
|
|
+ );
|
|
|
|
+ this.creatorQueue = new ArrayBlockingQueue<>(this.creatorQueueMaxSize);
|
|
|
|
+ this.creator = new ConnectionCreator(this.creatorQueue);
|
|
this.creator.setDaemon(true);
|
|
this.creator.setDaemon(true);
|
|
|
|
|
|
// Cleanup periods
|
|
// Cleanup periods
|
|
@@ -213,7 +217,7 @@ public class ConnectionManager {
|
|
if (conn == null || !conn.isUsable()) {
|
|
if (conn == null || !conn.isUsable()) {
|
|
if (!this.creatorQueue.offer(pool)) {
|
|
if (!this.creatorQueue.offer(pool)) {
|
|
LOG.error("Cannot add more than {} connections at the same time",
|
|
LOG.error("Cannot add more than {} connections at the same time",
|
|
- MAX_NEW_CONNECTIONS);
|
|
|
|
|
|
+ this.creatorQueueMaxSize);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|