|
@@ -25,28 +25,103 @@ import java.util.Iterator;
|
|
|
import java.util.List;
|
|
|
import java.util.Map.Entry;
|
|
|
|
|
|
+import java.io.IOException;
|
|
|
import com.google.common.base.Preconditions;
|
|
|
import com.google.common.collect.LinkedListMultimap;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
+import org.apache.hadoop.HadoopIllegalArgumentException;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.io.IOUtils;
|
|
|
+import org.apache.hadoop.util.Daemon;
|
|
|
+import org.apache.hadoop.util.StringUtils;
|
|
|
|
|
|
/**
|
|
|
- * A cache of sockets.
|
|
|
+ * A cache of input stream sockets to Data Node.
|
|
|
*/
|
|
|
class SocketCache {
|
|
|
- static final Log LOG = LogFactory.getLog(SocketCache.class);
|
|
|
+ private static final Log LOG = LogFactory.getLog(SocketCache.class);
|
|
|
+ private Daemon daemon;
|
|
|
+ /** A map for per user per datanode. */
|
|
|
+ private static LinkedListMultimap<SocketAddress, SocketProp> multimap =
|
|
|
+ LinkedListMultimap.create();
|
|
|
+ private static int capacity;
|
|
|
+ private static long expiryPeriod;
|
|
|
+ private static SocketCache scInstance = new SocketCache();
|
|
|
+
|
|
|
+ private static class SocketProp {
|
|
|
+ Socket s;
|
|
|
+ long createTime;
|
|
|
+ public SocketProp(Socket s)
|
|
|
+ {
|
|
|
+ this.s=s;
|
|
|
+ this.createTime = System.currentTimeMillis();
|
|
|
+ }
|
|
|
|
|
|
- private final LinkedListMultimap<SocketAddress, Socket> multimap;
|
|
|
- private final int capacity;
|
|
|
+ public long getCreateTime() {
|
|
|
+ return this.createTime;
|
|
|
+ }
|
|
|
|
|
|
- /**
|
|
|
- * Create a SocketCache with the given capacity.
|
|
|
- * @param capacity Max cache size.
|
|
|
- */
|
|
|
- public SocketCache(int capacity) {
|
|
|
- multimap = LinkedListMultimap.create();
|
|
|
- this.capacity = capacity;
|
|
|
+ public Socket getSocket() {
|
|
|
+ return this.s;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // capacity and expiryPeriod are only initialized once.
|
|
|
+ private static boolean isInitedOnce() {
|
|
|
+ if (capacity == 0 || expiryPeriod == 0) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ public static synchronized SocketCache getInstance(int c, long e) {
|
|
|
+
|
|
|
+ if (c == 0 || e == 0) {
|
|
|
+ throw new IllegalStateException("Cannot initialize ZERO capacity " +
|
|
|
+ "or expiryPeriod");
|
|
|
+ }
|
|
|
+
|
|
|
+ // capacity is only initialzied once
|
|
|
+ if (isInitedOnce() == false) {
|
|
|
+ capacity = c;
|
|
|
+ expiryPeriod = e;
|
|
|
+ } else if (capacity != c || expiryPeriod != e) {
|
|
|
+ LOG.info("capacity and expiry periods already set to " + capacity +
|
|
|
+ " and " + expiryPeriod + " respectively. Cannot set it to " + c +
|
|
|
+ " and " + e);
|
|
|
+ }
|
|
|
+
|
|
|
+ return scInstance;
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean isDaemonStarted() {
|
|
|
+ return (daemon == null)? false: true;
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized void startExpiryDaemon() {
|
|
|
+ // start daemon only if not already started
|
|
|
+ if (isDaemonStarted() == true) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ daemon = new Daemon(new Runnable() {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ SocketCache.this.run();
|
|
|
+ } catch(InterruptedException e) {
|
|
|
+ //noop
|
|
|
+ } finally {
|
|
|
+ SocketCache.this.clear();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String toString() {
|
|
|
+ return String.valueOf(SocketCache.this);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ daemon.start();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -55,14 +130,14 @@ class SocketCache {
|
|
|
* @return A socket with unknown state, possibly closed underneath. Or null.
|
|
|
*/
|
|
|
public synchronized Socket get(SocketAddress remote) {
|
|
|
- List<Socket> socklist = multimap.get(remote);
|
|
|
- if (socklist == null) {
|
|
|
+ List<SocketProp> sockPropList = multimap.get(remote);
|
|
|
+ if (sockPropList == null) {
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
- Iterator<Socket> iter = socklist.iterator();
|
|
|
+ Iterator<SocketProp> iter = sockPropList.iterator();
|
|
|
while (iter.hasNext()) {
|
|
|
- Socket candidate = iter.next();
|
|
|
+ Socket candidate = iter.next().getSocket();
|
|
|
iter.remove();
|
|
|
if (!candidate.isClosed()) {
|
|
|
return candidate;
|
|
@@ -76,7 +151,9 @@ class SocketCache {
|
|
|
* @param sock socket not used by anyone.
|
|
|
*/
|
|
|
public synchronized void put(Socket sock) {
|
|
|
+
|
|
|
Preconditions.checkNotNull(sock);
|
|
|
+ startExpiryDaemon();
|
|
|
|
|
|
SocketAddress remoteAddr = sock.getRemoteSocketAddress();
|
|
|
if (remoteAddr == null) {
|
|
@@ -89,40 +166,75 @@ class SocketCache {
|
|
|
if (capacity == multimap.size()) {
|
|
|
evictOldest();
|
|
|
}
|
|
|
- multimap.put(remoteAddr, sock);
|
|
|
+ multimap.put(remoteAddr, new SocketProp (sock));
|
|
|
}
|
|
|
|
|
|
public synchronized int size() {
|
|
|
return multimap.size();
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Evict and close sockets older than expiry period from the cache.
|
|
|
+ */
|
|
|
+ private synchronized void evictExpired(long expiryPeriod) {
|
|
|
+ while (multimap.size() != 0) {
|
|
|
+ Iterator<Entry<SocketAddress, SocketProp>> iter =
|
|
|
+ multimap.entries().iterator();
|
|
|
+ Entry<SocketAddress, SocketProp> entry = iter.next();
|
|
|
+
|
|
|
+ // if oldest socket expired, remove it
|
|
|
+ if (entry == null ||
|
|
|
+ System.currentTimeMillis() - entry.getValue().getCreateTime() <
|
|
|
+ expiryPeriod) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ iter.remove();
|
|
|
+ Socket sock = entry.getValue().getSocket();
|
|
|
+ IOUtils.closeSocket(sock);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Evict the oldest entry in the cache.
|
|
|
*/
|
|
|
private synchronized void evictOldest() {
|
|
|
- Iterator<Entry<SocketAddress, Socket>> iter =
|
|
|
+ Iterator<Entry<SocketAddress, SocketProp>> iter =
|
|
|
multimap.entries().iterator();
|
|
|
if (!iter.hasNext()) {
|
|
|
throw new IllegalStateException("Cannot evict from empty cache!");
|
|
|
}
|
|
|
- Entry<SocketAddress, Socket> entry = iter.next();
|
|
|
+ Entry<SocketAddress, SocketProp> entry = iter.next();
|
|
|
iter.remove();
|
|
|
- Socket sock = entry.getValue();
|
|
|
+ Socket sock = entry.getValue().getSocket();
|
|
|
IOUtils.closeSocket(sock);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Empty the cache, and close all sockets.
|
|
|
+ * Periodically check in the cache and expire the entries
|
|
|
+ * older than expiryPeriod minutes
|
|
|
*/
|
|
|
- public synchronized void clear() {
|
|
|
- for (Socket sock : multimap.values()) {
|
|
|
- IOUtils.closeSocket(sock);
|
|
|
+ private void run() throws InterruptedException {
|
|
|
+ for(long lastExpiryTime = System.currentTimeMillis();
|
|
|
+ !Thread.interrupted();
|
|
|
+ Thread.sleep(expiryPeriod)) {
|
|
|
+ final long elapsed = System.currentTimeMillis() - lastExpiryTime;
|
|
|
+ if (elapsed >= expiryPeriod) {
|
|
|
+ evictExpired(expiryPeriod);
|
|
|
+ lastExpiryTime = System.currentTimeMillis();
|
|
|
+ }
|
|
|
}
|
|
|
- multimap.clear();
|
|
|
+ clear();
|
|
|
+ throw new InterruptedException("Daemon Interrupted");
|
|
|
}
|
|
|
|
|
|
- protected void finalize() {
|
|
|
- clear();
|
|
|
+ /**
|
|
|
+ * Empty the cache, and close all sockets.
|
|
|
+ */
|
|
|
+ private synchronized void clear() {
|
|
|
+ for (SocketProp sockProp : multimap.values()) {
|
|
|
+ IOUtils.closeSocket(sockProp.getSocket());
|
|
|
+ }
|
|
|
+ multimap.clear();
|
|
|
}
|
|
|
|
|
|
}
|