|
@@ -121,6 +121,7 @@ import com.google.common.annotations.VisibleForTesting;
|
|
|
import com.google.protobuf.ByteString;
|
|
|
import com.google.protobuf.CodedOutputStream;
|
|
|
import com.google.protobuf.Message;
|
|
|
+import org.codehaus.jackson.map.ObjectMapper;
|
|
|
|
|
|
/** An abstract IPC service. IPC calls take a single {@link Writable} as a
|
|
|
* parameter, and return a {@link Writable} as their value. A service runs on
|
|
@@ -2082,6 +2083,9 @@ public abstract class Server {
|
|
|
authorizeConnection();
|
|
|
// don't set until after authz because connection isn't established
|
|
|
connectionContextRead = true;
|
|
|
+ if (user != null) {
|
|
|
+ connectionManager.incrUserConnections(user.getShortUserName());
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -2935,7 +2939,20 @@ public abstract class Server {
|
|
|
public int getNumOpenConnections() {
|
|
|
return connectionManager.size();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get the NumOpenConnections/User.
|
|
|
+ */
|
|
|
+ public String getNumOpenConnectionsPerUser() {
|
|
|
+ ObjectMapper mapper = new ObjectMapper();
|
|
|
+ try {
|
|
|
+ return mapper
|
|
|
+ .writeValueAsString(connectionManager.getUserToConnectionsMap());
|
|
|
+ } catch (IOException ignored) {
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* The number of rpc calls in the queue.
|
|
|
* @return The number of rpc calls in the queue.
|
|
@@ -3047,6 +3064,9 @@ public abstract class Server {
|
|
|
private class ConnectionManager {
|
|
|
final private AtomicInteger count = new AtomicInteger();
|
|
|
final private Set<Connection> connections;
|
|
|
+ /* Map to maintain the statistics per User */
|
|
|
+ final private Map<String, Integer> userToConnectionsMap;
|
|
|
+ final private Object userToConnectionsMapLock = new Object();
|
|
|
|
|
|
final private Timer idleScanTimer;
|
|
|
final private int idleScanThreshold;
|
|
@@ -3078,6 +3098,7 @@ public abstract class Server {
|
|
|
this.connections = Collections.newSetFromMap(
|
|
|
new ConcurrentHashMap<Connection,Boolean>(
|
|
|
maxQueueSize, 0.75f, readThreads+2));
|
|
|
+ this.userToConnectionsMap = new ConcurrentHashMap<>();
|
|
|
}
|
|
|
|
|
|
private boolean add(Connection connection) {
|
|
@@ -3095,7 +3116,39 @@ public abstract class Server {
|
|
|
}
|
|
|
return removed;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ void incrUserConnections(String user) {
|
|
|
+ synchronized (userToConnectionsMapLock) {
|
|
|
+ Integer count = userToConnectionsMap.get(user);
|
|
|
+ if (count == null) {
|
|
|
+ count = 1;
|
|
|
+ } else {
|
|
|
+ count++;
|
|
|
+ }
|
|
|
+ userToConnectionsMap.put(user, count);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ void decrUserConnections(String user) {
|
|
|
+ synchronized (userToConnectionsMapLock) {
|
|
|
+ Integer count = userToConnectionsMap.get(user);
|
|
|
+ if (count == null) {
|
|
|
+ return;
|
|
|
+ } else {
|
|
|
+ count--;
|
|
|
+ }
|
|
|
+ if (count == 0) {
|
|
|
+ userToConnectionsMap.remove(user);
|
|
|
+ } else {
|
|
|
+ userToConnectionsMap.put(user, count);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ Map<String, Integer> getUserToConnectionsMap() {
|
|
|
+ return userToConnectionsMap;
|
|
|
+ }
|
|
|
+
|
|
|
int size() {
|
|
|
return count.get();
|
|
|
}
|
|
@@ -3134,6 +3187,10 @@ public abstract class Server {
|
|
|
// only close if actually removed to avoid double-closing due
|
|
|
// to possible races
|
|
|
connection.close();
|
|
|
+ // Remove authorized users only
|
|
|
+ if (connection.user != null && connection.connectionContextRead) {
|
|
|
+ decrUserConnections(connection.user.getShortUserName());
|
|
|
+ }
|
|
|
}
|
|
|
return exists;
|
|
|
}
|