|
@@ -36,6 +36,7 @@ import java.io.OutputStream;
|
|
import java.security.PrivilegedExceptionAction;
|
|
import java.security.PrivilegedExceptionAction;
|
|
import java.util.Hashtable;
|
|
import java.util.Hashtable;
|
|
import java.util.Iterator;
|
|
import java.util.Iterator;
|
|
|
|
+import java.util.Random;
|
|
import java.util.Map.Entry;
|
|
import java.util.Map.Entry;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
@@ -351,53 +352,100 @@ public class Client {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+  /**
+   * Checks whether this connection authenticates with Kerberos on behalf of
+   * the login user.
+   *
+   * @return {@code true} only when the auth method is KERBEROS and either the
+   *         current user equals the login user, or the login user equals the
+   *         real user behind the current user; {@code false} otherwise
+   * @throws IOException propagated from the UserGroupInformation lookups
+   */
+  private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
+    final UserGroupInformation login = UserGroupInformation.getLoginUser();
+    final UserGroupInformation current = UserGroupInformation.getCurrentUser();
+    final UserGroupInformation real = current.getRealUser();
+    // relogin only makes sense for the login user itself (e.g. JT)
+    // or for a superuser acting for somebody else (like oozie).
+    return authMethod == AuthMethod.KERBEROS
+        && (current.equals(login) || login.equals(real));
+  }
|
|
|
|
+
|
|
private synchronized boolean setupSaslConnection(final InputStream in2,
|
|
private synchronized boolean setupSaslConnection(final InputStream in2,
|
|
final OutputStream out2)
|
|
final OutputStream out2)
|
|
throws IOException {
|
|
throws IOException {
|
|
- try {
|
|
|
|
- saslRpcClient = new SaslRpcClient(authMethod, token,
|
|
|
|
- serverPrincipal);
|
|
|
|
- return saslRpcClient.saslConnect(in2, out2);
|
|
|
|
- } catch (Exception e) {
|
|
|
|
- LOG.warn("Exception encountered while connecting to the server : " +
|
|
|
|
- e.getMessage() + ". Will attempt a relogin");
|
|
|
|
- /*
|
|
|
|
- * Catch all exceptions here. Most likely we would have hit one of
|
|
|
|
- * the kerberos exceptions. Just attempt to relogin and try to
|
|
|
|
- * connect to the server
|
|
|
|
- */
|
|
|
|
- UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
|
|
|
|
- UserGroupInformation currentUser =
|
|
|
|
- UserGroupInformation.getCurrentUser();
|
|
|
|
- UserGroupInformation realUser = currentUser.getRealUser();
|
|
|
|
- if (authMethod == AuthMethod.KERBEROS &&
|
|
|
|
- // relogin only in case it is the login user (e.g. JT)
|
|
|
|
- // or superuser (like oozie).
|
|
|
|
- (currentUser.equals(loginUser) || loginUser.equals(realUser))) {
|
|
|
|
- //try setting up the connection again
|
|
|
|
- try {
|
|
|
|
- //try re-login
|
|
|
|
- if (UserGroupInformation.isLoginKeytabBased()) {
|
|
|
|
- loginUser.reloginFromKeytab();
|
|
|
|
|
|
+ saslRpcClient = new SaslRpcClient(authMethod, token, serverPrincipal);
|
|
|
|
+ return saslRpcClient.saslConnect(in2, out2);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+  /**
+   * Creates the socket for this connection and connects it to the remote
+   * address, looping until a connect succeeds or
+   * {@code handleConnectionFailure} throws.
+   *
+   * Timeouts and other I/O errors are counted separately: timeouts are
+   * allowed up to 45 attempts, other I/O errors up to {@code maxRetries}.
+   *
+   * @throws IOException rethrown by {@code handleConnectionFailure} once the
+   *         applicable retry limit is reached
+   */
+  private synchronized void setupConnection() throws IOException {
+    short timeoutCount = 0;
+    short ioErrorCount = 0;
+    for (;;) {
+      try {
+        // Assign the field first so a failure below still closes this socket.
+        this.socket = socketFactory.createSocket();
+        this.socket.setTcpNoDelay(tcpNoDelay);
+        // connect with a 20s time out
+        NetUtils.connect(this.socket, remoteId.getAddress(), 20000);
+        this.socket.setSoTimeout(pingInterval);
+        return;
+      } catch (SocketTimeoutException timedOut) {
+        // At most 45 timed-out attempts of 20s each,
+        // i.e. 20s*45 = 15 minutes of retries.
+        handleConnectionFailure(timeoutCount++, 45, timedOut);
+      } catch (IOException ioError) {
+        handleConnectionFailure(ioErrorCount++, maxRetries, ioError);
+      }
+    }
+  }
|
|
|
|
+ /**
|
|
|
|
+ * If multiple clients with the same principal try to connect
|
|
|
|
+ * to the same server at the same time, the server assumes a
|
|
|
|
+ * replay attack is in progress. This is a feature of kerberos.
|
|
|
|
+ * In order to work around this, what is done is that the client
|
|
|
|
+ * backs off randomly and tries to initiate the connection
|
|
|
|
+ * again.
|
|
|
|
+ * The other problem is to do with ticket expiry. To handle that,
|
|
|
|
+ * a relogin is attempted.
|
|
|
|
+ */
|
|
|
|
+ private synchronized void handleSaslConnectionFailure(
|
|
|
|
+ final int currRetries,
|
|
|
|
+ final int maxRetries, final Exception ex, final Random rand,
|
|
|
|
+ final UserGroupInformation ugi)
|
|
|
|
+ throws IOException, InterruptedException{
|
|
|
|
+ ugi.doAs(new PrivilegedExceptionAction<Object>() {
|
|
|
|
+ public Object run() throws IOException, InterruptedException {
|
|
|
|
+ final short MAX_BACKOFF = 5000;
|
|
|
|
+ closeConnection();
|
|
|
|
+ if (shouldAuthenticateOverKrb()) {
|
|
|
|
+ if (currRetries < maxRetries) {
|
|
|
|
+ LOG.debug("Exception encountered while connecting to " +
|
|
|
|
+ "the server : " + ex);
|
|
|
|
+ //try re-login
|
|
|
|
+ if (UserGroupInformation.isLoginKeytabBased()) {
|
|
|
|
+ UserGroupInformation.getLoginUser().reloginFromKeytab();
|
|
|
|
+ } else {
|
|
|
|
+ UserGroupInformation.getLoginUser().reloginFromTicketCache();
|
|
|
|
+ }
|
|
|
|
+ disposeSasl();
|
|
|
|
+ //have granularity of milliseconds
|
|
|
|
+ //we are sleeping with the Connection lock held but since this
|
|
|
|
+ //connection instance is being used for connecting to the server
|
|
|
|
+ //in question, it is okay
|
|
|
|
+ Thread.sleep((rand.nextInt(MAX_BACKOFF) + 1));
|
|
|
|
+ return null;
|
|
} else {
|
|
} else {
|
|
- loginUser.reloginFromTicketCache();
|
|
|
|
|
|
+ String msg = "Couldn't setup connection for " +
|
|
|
|
+ UserGroupInformation.getLoginUser().getUserName() +
|
|
|
|
+ " to " + serverPrincipal;
|
|
|
|
+ LOG.warn(msg);
|
|
|
|
+ throw (IOException) new IOException(msg).initCause(ex);
|
|
}
|
|
}
|
|
- disposeSasl();
|
|
|
|
- saslRpcClient = new SaslRpcClient(authMethod, token,
|
|
|
|
- serverPrincipal);
|
|
|
|
- return saslRpcClient.saslConnect(in2, out2);
|
|
|
|
- } catch (Exception ex) {
|
|
|
|
- String msg = "Couldn't setup connection for " +
|
|
|
|
- loginUser.getUserName() +
|
|
|
|
- " to " + serverPrincipal + " even after relogin.";
|
|
|
|
- LOG.warn(msg);
|
|
|
|
- throw (IOException) new IOException(msg).initCause(ex);
|
|
|
|
|
|
+ } else {
|
|
|
|
+ LOG.warn("Exception encountered while connecting to " +
|
|
|
|
+ "the server : " + ex);
|
|
}
|
|
}
|
|
|
|
+ if (ex instanceof RemoteException)
|
|
|
|
+ throw (RemoteException)ex;
|
|
|
|
+ throw new IOException(ex);
|
|
}
|
|
}
|
|
- if (e instanceof RemoteException)
|
|
|
|
- throw (RemoteException)e;
|
|
|
|
- throw new IOException(e);
|
|
|
|
- }
|
|
|
|
|
|
+ });
|
|
}
|
|
}
|
|
/** Connect to the server and set up the I/O streams. It then sends
|
|
/** Connect to the server and set up the I/O streams. It then sends
|
|
* a header to the server and starts
|
|
* a header to the server and starts
|
|
@@ -407,80 +455,87 @@ public class Client {
|
|
if (socket != null || shouldCloseConnection.get()) {
|
|
if (socket != null || shouldCloseConnection.get()) {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
-
|
|
|
|
- short ioFailures = 0;
|
|
|
|
- short timeoutFailures = 0;
|
|
|
|
|
|
+
|
|
try {
|
|
try {
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
LOG.debug("Connecting to "+server);
|
|
LOG.debug("Connecting to "+server);
|
|
}
|
|
}
|
|
|
|
+ short numRetries = 0;
|
|
|
|
+ final short MAX_RETRIES = 5;
|
|
|
|
+ Random rand = null;
|
|
while (true) {
|
|
while (true) {
|
|
- try {
|
|
|
|
- this.socket = socketFactory.createSocket();
|
|
|
|
- this.socket.setTcpNoDelay(tcpNoDelay);
|
|
|
|
- // connection time out is 20s
|
|
|
|
- NetUtils.connect(this.socket, remoteId.getAddress(), 20000);
|
|
|
|
- this.socket.setSoTimeout(pingInterval);
|
|
|
|
- break;
|
|
|
|
- } catch (SocketTimeoutException toe) {
|
|
|
|
- /* The max number of retries is 45,
|
|
|
|
- * which amounts to 20s*45 = 15 minutes retries.
|
|
|
|
- */
|
|
|
|
- handleConnectionFailure(timeoutFailures++, 45, toe);
|
|
|
|
- } catch (IOException ie) {
|
|
|
|
- handleConnectionFailure(ioFailures++, maxRetries, ie);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- InputStream inStream = NetUtils.getInputStream(socket);
|
|
|
|
- OutputStream outStream = NetUtils.getOutputStream(socket);
|
|
|
|
- writeRpcHeader(outStream);
|
|
|
|
- if (useSasl) {
|
|
|
|
- final InputStream in2 = inStream;
|
|
|
|
- final OutputStream out2 = outStream;
|
|
|
|
- UserGroupInformation ticket = remoteId.getTicket();
|
|
|
|
- if (authMethod == AuthMethod.KERBEROS) {
|
|
|
|
- if (ticket.getRealUser() != null) {
|
|
|
|
- ticket = ticket.getRealUser();
|
|
|
|
|
|
+ setupConnection();
|
|
|
|
+ InputStream inStream = NetUtils.getInputStream(socket);
|
|
|
|
+ OutputStream outStream = NetUtils.getOutputStream(socket);
|
|
|
|
+ writeRpcHeader(outStream);
|
|
|
|
+ if (useSasl) {
|
|
|
|
+ final InputStream in2 = inStream;
|
|
|
|
+ final OutputStream out2 = outStream;
|
|
|
|
+ UserGroupInformation ticket = remoteId.getTicket();
|
|
|
|
+ if (authMethod == AuthMethod.KERBEROS) {
|
|
|
|
+ if (ticket.getRealUser() != null) {
|
|
|
|
+ ticket = ticket.getRealUser();
|
|
|
|
+ }
|
|
}
|
|
}
|
|
- }
|
|
|
|
- if (ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
|
|
|
|
- @Override
|
|
|
|
- public Boolean run() throws IOException {
|
|
|
|
- try {
|
|
|
|
- return setupSaslConnection(in2, out2);
|
|
|
|
- } catch (IOException ie) {
|
|
|
|
- handleConnectionFailure(1, 1, ie);
|
|
|
|
- throw ie;
|
|
|
|
|
|
+ boolean continueSasl = false;
|
|
|
|
+ try {
|
|
|
|
+ continueSasl =
|
|
|
|
+ ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
|
|
|
|
+ @Override
|
|
|
|
+ public Boolean run() throws IOException {
|
|
|
|
+ return setupSaslConnection(in2, out2);
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+ } catch (Exception ex) {
|
|
|
|
+ if (rand == null) {
|
|
|
|
+ rand = new Random();
|
|
}
|
|
}
|
|
|
|
+ handleSaslConnectionFailure(numRetries++, MAX_RETRIES, ex, rand,
|
|
|
|
+ ticket);
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+ if (continueSasl) {
|
|
|
|
+ // Sasl connect is successful. Let's set up Sasl i/o streams.
|
|
|
|
+ inStream = saslRpcClient.getInputStream(inStream);
|
|
|
|
+ outStream = saslRpcClient.getOutputStream(outStream);
|
|
|
|
+ } else {
|
|
|
|
+ // fall back to simple auth because server told us so.
|
|
|
|
+ authMethod = AuthMethod.SIMPLE;
|
|
|
|
+ header = new ConnectionHeader(header.getProtocol(),
|
|
|
|
+ header.getUgi(), authMethod);
|
|
|
|
+ useSasl = false;
|
|
}
|
|
}
|
|
- })) {
|
|
|
|
- // Sasl connect is successful. Let's set up Sasl i/o streams.
|
|
|
|
- inStream = saslRpcClient.getInputStream(inStream);
|
|
|
|
- outStream = saslRpcClient.getOutputStream(outStream);
|
|
|
|
- } else {
|
|
|
|
- // fall back to simple auth because server told us so.
|
|
|
|
- authMethod = AuthMethod.SIMPLE;
|
|
|
|
- header = new ConnectionHeader(header.getProtocol(),
|
|
|
|
- header.getUgi(), authMethod);
|
|
|
|
- useSasl = false;
|
|
|
|
}
|
|
}
|
|
- }
|
|
|
|
- this.in = new DataInputStream(new BufferedInputStream
|
|
|
|
- (new PingInputStream(inStream)));
|
|
|
|
- this.out = new DataOutputStream
|
|
|
|
- (new BufferedOutputStream(outStream));
|
|
|
|
- writeHeader();
|
|
|
|
|
|
+ this.in = new DataInputStream(new BufferedInputStream
|
|
|
|
+ (new PingInputStream(inStream)));
|
|
|
|
+ this.out = new DataOutputStream
|
|
|
|
+ (new BufferedOutputStream(outStream));
|
|
|
|
+ writeHeader();
|
|
|
|
|
|
- // update last activity time
|
|
|
|
- touch();
|
|
|
|
|
|
+ // update last activity time
|
|
|
|
+ touch();
|
|
|
|
|
|
- // start the receiver thread after the socket connection has been set up
|
|
|
|
- start();
|
|
|
|
|
|
+ // start the receiver thread after the socket connection has been set up
|
|
|
|
+ start();
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
markClosed(e);
|
|
markClosed(e);
|
|
close();
|
|
close();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+  /**
+   * Closes the current socket, if any, and clears the reference so that a
+   * later connection attempt starts from scratch.
+   *
+   * Safe to call when no socket exists: the retry paths reach this method
+   * after a previous attempt already reset {@code socket} to null (for
+   * example when creating the replacement socket itself failed), and a
+   * NullPointerException here would hide the original connection error.
+   * A failure to close is logged and otherwise ignored.
+   */
+  private void closeConnection() {
+    if (socket == null) {
+      // nothing was opened (or it was already closed); nothing to do
+      return;
+    }
+    // close the current connection
+    try {
+      socket.close();
+    } catch (IOException e) {
+      LOG.warn("Not able to close a socket", e);
+    }
+    // set socket to null so that the next call to setupIOstreams
+    // can start the process of connect all over again.
+    socket = null;
+  }
|
|
|
|
|
|
/* Handle connection failures
|
|
/* Handle connection failures
|
|
*
|
|
*
|
|
@@ -498,15 +553,8 @@ public class Client {
|
|
*/
|
|
*/
|
|
private void handleConnectionFailure(
|
|
private void handleConnectionFailure(
|
|
int curRetries, int maxRetries, IOException ioe) throws IOException {
|
|
int curRetries, int maxRetries, IOException ioe) throws IOException {
|
|
- // close the current connection
|
|
|
|
- try {
|
|
|
|
- socket.close();
|
|
|
|
- } catch (IOException e) {
|
|
|
|
- LOG.warn("Not able to close a socket", e);
|
|
|
|
- }
|
|
|
|
- // set socket to null so that the next call to setupIOstreams
|
|
|
|
- // can start the process of connect all over again.
|
|
|
|
- socket = null;
|
|
|
|
|
|
+
|
|
|
|
+ closeConnection();
|
|
|
|
|
|
// throw the exception if the maximum number of retries is reached
|
|
// throw the exception if the maximum number of retries is reached
|
|
if (curRetries >= maxRetries) {
|
|
if (curRetries >= maxRetries) {
|