|
@@ -20,11 +20,13 @@ package org.apache.hadoop.net;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.io.InterruptedIOException;
|
|
|
+import java.net.SocketAddress;
|
|
|
import java.net.SocketTimeoutException;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.nio.channels.SelectableChannel;
|
|
|
import java.nio.channels.SelectionKey;
|
|
|
import java.nio.channels.Selector;
|
|
|
+import java.nio.channels.SocketChannel;
|
|
|
import java.nio.channels.spi.SelectorProvider;
|
|
|
import java.util.Iterator;
|
|
|
import java.util.LinkedList;
|
|
@@ -159,7 +161,8 @@ abstract class SocketIOWithTimeout {
|
|
|
}
|
|
|
|
|
|
if (count == 0) {
|
|
|
- throw new SocketTimeoutException(timeoutExceptionString(ops));
|
|
|
+ throw new SocketTimeoutException(timeoutExceptionString(channel,
|
|
|
+ timeout, ops));
|
|
|
}
|
|
|
// otherwise the socket should be ready for io.
|
|
|
}
|
|
@@ -167,6 +170,64 @@ abstract class SocketIOWithTimeout {
|
|
|
return 0; // does not reach here.
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * The contract is similar to {@link SocketChannel#connect(SocketAddress)}
|
|
|
+ * with a timeout.
|
|
|
+ *
|
|
|
+ * @see SocketChannel#connect(SocketAddress)
|
|
|
+ *
|
|
|
+ * @param channel - this should be a {@link SelectableChannel}
|
|
|
+ * @param endpoint
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ static void connect(SocketChannel channel,
|
|
|
+ SocketAddress endpoint, int timeout) throws IOException {
|
|
|
+
|
|
|
+ boolean blockingOn = channel.isBlocking();
|
|
|
+ if (blockingOn) {
|
|
|
+ channel.configureBlocking(false);
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ if (channel.connect(endpoint)) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ long timeoutLeft = timeout;
|
|
|
+ long endTime = (timeout > 0) ? (System.currentTimeMillis() + timeout): 0;
|
|
|
+
|
|
|
+ while (true) {
|
|
|
+ // we might have to call finishConnect() more than once
|
|
|
+ // for some channels (with user level protocols)
|
|
|
+
|
|
|
+ int ret = selector.select((SelectableChannel)channel,
|
|
|
+ SelectionKey.OP_CONNECT, timeoutLeft);
|
|
|
+
|
|
|
+ if (ret > 0 && channel.finishConnect()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (ret == 0 ||
|
|
|
+ (timeout > 0 &&
|
|
|
+ (timeoutLeft = (endTime - System.currentTimeMillis())) <= 0)) {
|
|
|
+ throw new SocketTimeoutException(
|
|
|
+ timeoutExceptionString(channel, timeout,
|
|
|
+ SelectionKey.OP_CONNECT));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (IOException e) {
|
|
|
+ // javadoc for SocketChannel.connect() says channel should be closed.
|
|
|
+ try {
|
|
|
+ channel.close();
|
|
|
+ } catch (IOException ignored) {}
|
|
|
+ throw e;
|
|
|
+ } finally {
|
|
|
+ if (blockingOn && channel.isOpen()) {
|
|
|
+ channel.configureBlocking(true);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* This is similar to {@link #doIO(ByteBuffer, int)} except that it
|
|
|
* does not perform any I/O. It just waits for the channel to be ready
|
|
@@ -182,17 +243,28 @@ abstract class SocketIOWithTimeout {
|
|
|
void waitForIO(int ops) throws IOException {
|
|
|
|
|
|
if (selector.select(channel, ops, timeout) == 0) {
|
|
|
- throw new SocketTimeoutException(timeoutExceptionString(ops));
|
|
|
+ throw new SocketTimeoutException(timeoutExceptionString(channel, timeout,
|
|
|
+ ops));
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- private String timeoutExceptionString(int ops) {
|
|
|
|
|
|
- String waitingFor = "" + ops;
|
|
|
- if (ops == SelectionKey.OP_READ) {
|
|
|
- waitingFor = "read";
|
|
|
- } else if (ops == SelectionKey.OP_WRITE) {
|
|
|
- waitingFor = "write";
|
|
|
+ private static String timeoutExceptionString(SelectableChannel channel,
|
|
|
+ long timeout, int ops) {
|
|
|
+
|
|
|
+ String waitingFor;
|
|
|
+ switch(ops) {
|
|
|
+
|
|
|
+ case SelectionKey.OP_READ :
|
|
|
+ waitingFor = "read"; break;
|
|
|
+
|
|
|
+ case SelectionKey.OP_WRITE :
|
|
|
+ waitingFor = "write"; break;
|
|
|
+
|
|
|
+ case SelectionKey.OP_CONNECT :
|
|
|
+ waitingFor = "connect"; break;
|
|
|
+
|
|
|
+ default :
|
|
|
+ waitingFor = "" + ops;
|
|
|
}
|
|
|
|
|
|
return timeout + " millis timeout while " +
|