|
@@ -36,6 +36,7 @@ import java.io.OutputStream;
|
|
|
import java.security.PrivilegedExceptionAction;
|
|
|
import java.util.Hashtable;
|
|
|
import java.util.Iterator;
|
|
|
+import java.util.Random;
|
|
|
import java.util.Map.Entry;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
@@ -367,53 +368,109 @@ public class Client {
|
|
|
if (saslRpcClient != null) {
|
|
|
try {
|
|
|
saslRpcClient.dispose();
|
|
|
+ saslRpcClient = null;
|
|
|
} catch (IOException ignored) {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
|
|
|
+ UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
|
|
|
+ UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
|
|
|
+ UserGroupInformation realUser = currentUser.getRealUser();
|
|
|
+ if (authMethod == AuthMethod.KERBEROS && loginUser != null &&
|
|
|
+ // Make sure user logged in using Kerberos either keytab or TGT
|
|
|
+ loginUser.hasKerberosCredentials() &&
|
|
|
+ // relogin only in case it is the login user (e.g. JT)
|
|
|
+ // or superuser (like oozie).
|
|
|
+ (loginUser.equals(currentUser) || loginUser.equals(realUser))) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
private synchronized boolean setupSaslConnection(final InputStream in2,
|
|
|
final OutputStream out2)
|
|
|
throws IOException {
|
|
|
- try {
|
|
|
- saslRpcClient = new SaslRpcClient(authMethod, token,
|
|
|
- serverPrincipal);
|
|
|
- return saslRpcClient.saslConnect(in2, out2);
|
|
|
- } catch (javax.security.sasl.SaslException je) {
|
|
|
- UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
|
|
|
- UserGroupInformation currentUser =
|
|
|
- UserGroupInformation.getCurrentUser();
|
|
|
- UserGroupInformation realUser = currentUser.getRealUser();
|
|
|
- if (authMethod == AuthMethod.KERBEROS &&
|
|
|
- //try setting up the connection again
|
|
|
- // relogin only in case it is the login user (e.g. JT)
|
|
|
- // or superuser (like oozie).
|
|
|
- ((currentUser != null && currentUser.equals(loginUser)) ||
|
|
|
- (realUser != null && realUser.equals(loginUser)))) {
|
|
|
- try {
|
|
|
- //try re-login
|
|
|
- if (UserGroupInformation.isLoginKeytabBased()) {
|
|
|
- loginUser.reloginFromKeytab();
|
|
|
+ saslRpcClient = new SaslRpcClient(authMethod, token, serverPrincipal);
|
|
|
+ return saslRpcClient.saslConnect(in2, out2);
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized void setupConnection() throws IOException {
|
|
|
+ short ioFailures = 0;
|
|
|
+ short timeoutFailures = 0;
|
|
|
+ while (true) {
|
|
|
+ try {
|
|
|
+ this.socket = socketFactory.createSocket();
|
|
|
+ this.socket.setTcpNoDelay(tcpNoDelay);
|
|
|
+ // connection time out is 20s
|
|
|
+ NetUtils.connect(this.socket, remoteId.getAddress(), 20000);
|
|
|
+ this.socket.setSoTimeout(pingInterval);
|
|
|
+ return;
|
|
|
+ } catch (SocketTimeoutException toe) {
|
|
|
+ /*
|
|
|
+ * The max number of retries is 45, which amounts to 20s*45 = 15
|
|
|
+ * minutes retries.
|
|
|
+ */
|
|
|
+ handleConnectionFailure(timeoutFailures++, 45, toe);
|
|
|
+ } catch (IOException ie) {
|
|
|
+ handleConnectionFailure(ioFailures++, maxRetries, ie);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * If multiple clients with the same principal try to connect to the same
|
|
|
+ * server at the same time, the server assumes a replay attack is in
|
|
|
+ * progress. This is a feature of kerberos. In order to work around this,
|
|
|
+ * what is done is that the client backs off randomly and tries to initiate
|
|
|
+ * the connection again. The other problem is to do with ticket expiry. To
|
|
|
+ * handle that, a relogin is attempted.
|
|
|
+ */
|
|
|
+ private synchronized void handleSaslConnectionFailure(
|
|
|
+ final int currRetries, final int maxRetries, final Exception ex,
|
|
|
+ final Random rand, final UserGroupInformation ugi) throws IOException,
|
|
|
+ InterruptedException {
|
|
|
+ ugi.doAs(new PrivilegedExceptionAction<Object>() {
|
|
|
+ public Object run() throws IOException, InterruptedException {
|
|
|
+ final short MAX_BACKOFF = 5000;
|
|
|
+ closeConnection();
|
|
|
+ disposeSasl();
|
|
|
+ if (shouldAuthenticateOverKrb()) {
|
|
|
+ if (currRetries < maxRetries) {
|
|
|
+ LOG.debug("Exception encountered while connecting to "
|
|
|
+ + "the server : " + ex);
|
|
|
+ // try re-login
|
|
|
+ if (UserGroupInformation.isLoginKeytabBased()) {
|
|
|
+ UserGroupInformation.getLoginUser().reloginFromKeytab();
|
|
|
+ } else {
|
|
|
+ UserGroupInformation.getLoginUser().reloginFromTicketCache();
|
|
|
+ }
|
|
|
+ // have granularity of milliseconds
|
|
|
+ //we are sleeping with the Connection lock held but since this
|
|
|
+ //connection instance is being used for connecting to the server
|
|
|
+ //in question, it is okay
|
|
|
+ Thread.sleep((rand.nextInt(MAX_BACKOFF) + 1));
|
|
|
+ return null;
|
|
|
} else {
|
|
|
- loginUser.reloginFromTicketCache();
|
|
|
+ String msg = "Couldn't setup connection for "
|
|
|
+ + UserGroupInformation.getLoginUser().getUserName() + " to "
|
|
|
+ + serverPrincipal;
|
|
|
+ LOG.warn(msg);
|
|
|
+ throw (IOException) new IOException(msg).initCause(ex);
|
|
|
}
|
|
|
- disposeSasl();
|
|
|
- saslRpcClient = new SaslRpcClient(authMethod, token,
|
|
|
- serverPrincipal);
|
|
|
- return saslRpcClient.saslConnect(in2, out2);
|
|
|
- } catch (javax.security.sasl.SaslException jee) {
|
|
|
- LOG.warn("Couldn't setup connection for " +
|
|
|
- loginUser.getUserName() +
|
|
|
- " to " + serverPrincipal + " even after relogin.");
|
|
|
- throw jee;
|
|
|
- } catch (IOException ie) {
|
|
|
- ie.initCause(je);
|
|
|
- throw ie;
|
|
|
+ } else {
|
|
|
+ LOG.warn("Exception encountered while connecting to "
|
|
|
+ + "the server : " + ex);
|
|
|
}
|
|
|
- }
|
|
|
- throw je;
|
|
|
- }
|
|
|
+ if (ex instanceof RemoteException)
|
|
|
+ throw (RemoteException) ex;
|
|
|
+ throw new IOException(ex);
|
|
|
+ }
|
|
|
+ });
|
|
|
}
|
|
|
+
|
|
|
+
|
|
|
/** Connect to the server and set up the I/O streams. It then sends
|
|
|
* a header to the server and starts
|
|
|
* the connection thread that waits for responses.
|
|
@@ -421,81 +478,95 @@ public class Client {
|
|
|
private synchronized void setupIOstreams() throws InterruptedException {
|
|
|
if (socket != null || shouldCloseConnection.get()) {
|
|
|
return;
|
|
|
- }
|
|
|
-
|
|
|
- short ioFailures = 0;
|
|
|
- short timeoutFailures = 0;
|
|
|
+ }
|
|
|
try {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Connecting to "+server);
|
|
|
}
|
|
|
+ short numRetries = 0;
|
|
|
+ final short MAX_RETRIES = 5;
|
|
|
+ Random rand = null;
|
|
|
while (true) {
|
|
|
- try {
|
|
|
- this.socket = socketFactory.createSocket();
|
|
|
- this.socket.setTcpNoDelay(tcpNoDelay);
|
|
|
- // connection time out is 20s
|
|
|
- NetUtils.connect(this.socket, remoteId.getAddress(), 20000);
|
|
|
- this.socket.setSoTimeout(pingInterval);
|
|
|
- break;
|
|
|
- } catch (SocketTimeoutException toe) {
|
|
|
- /* The max number of retries is 45,
|
|
|
- * which amounts to 20s*45 = 15 minutes retries.
|
|
|
- */
|
|
|
- handleConnectionFailure(timeoutFailures++, 45, toe);
|
|
|
- } catch (IOException ie) {
|
|
|
- handleConnectionFailure(ioFailures++, maxRetries, ie);
|
|
|
- }
|
|
|
- }
|
|
|
- InputStream inStream = NetUtils.getInputStream(socket);
|
|
|
- OutputStream outStream = NetUtils.getOutputStream(socket);
|
|
|
- writeRpcHeader(outStream);
|
|
|
- if (useSasl) {
|
|
|
- final InputStream in2 = inStream;
|
|
|
- final OutputStream out2 = outStream;
|
|
|
- UserGroupInformation ticket = remoteId.getTicket();
|
|
|
- if (authMethod == AuthMethod.KERBEROS) {
|
|
|
- if (ticket.getRealUser() != null) {
|
|
|
- ticket = ticket.getRealUser();
|
|
|
+ setupConnection();
|
|
|
+ InputStream inStream = NetUtils.getInputStream(socket);
|
|
|
+ OutputStream outStream = NetUtils.getOutputStream(socket);
|
|
|
+ writeRpcHeader(outStream);
|
|
|
+ if (useSasl) {
|
|
|
+ final InputStream in2 = inStream;
|
|
|
+ final OutputStream out2 = outStream;
|
|
|
+ UserGroupInformation ticket = remoteId.getTicket();
|
|
|
+ if (authMethod == AuthMethod.KERBEROS) {
|
|
|
+ if (ticket.getRealUser() != null) {
|
|
|
+ ticket = ticket.getRealUser();
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
- if (ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
|
|
|
- @Override
|
|
|
- public Boolean run() throws IOException {
|
|
|
- return setupSaslConnection(in2, out2);
|
|
|
+ boolean continueSasl = false;
|
|
|
+ try {
|
|
|
+ continueSasl = ticket
|
|
|
+ .doAs(new PrivilegedExceptionAction<Boolean>() {
|
|
|
+ @Override
|
|
|
+ public Boolean run() throws IOException {
|
|
|
+ return setupSaslConnection(in2, out2);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ } catch (Exception ex) {
|
|
|
+ if (rand == null) {
|
|
|
+ rand = new Random();
|
|
|
+ }
|
|
|
+ handleSaslConnectionFailure(numRetries++, MAX_RETRIES, ex, rand,
|
|
|
+ ticket);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ if (continueSasl) {
|
|
|
+ // Sasl connect is successful. Let's set up Sasl i/o streams.
|
|
|
+ inStream = saslRpcClient.getInputStream(inStream);
|
|
|
+ outStream = saslRpcClient.getOutputStream(outStream);
|
|
|
+ } else {
|
|
|
+ // fall back to simple auth because server told us so.
|
|
|
+ authMethod = AuthMethod.SIMPLE;
|
|
|
+ header = new ConnectionHeader(header.getProtocol(), header
|
|
|
+ .getUgi(), authMethod);
|
|
|
+ useSasl = false;
|
|
|
}
|
|
|
- })) {
|
|
|
- // Sasl connect is successful. Let's set up Sasl i/o streams.
|
|
|
- inStream = saslRpcClient.getInputStream(inStream);
|
|
|
- outStream = saslRpcClient.getOutputStream(outStream);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (doPing) {
|
|
|
+ this.in = new DataInputStream(new BufferedInputStream(
|
|
|
+ new PingInputStream(inStream)));
|
|
|
} else {
|
|
|
- // fall back to simple auth because server told us so.
|
|
|
- authMethod = AuthMethod.SIMPLE;
|
|
|
- header = new ConnectionHeader(header.getProtocol(),
|
|
|
- header.getUgi(), authMethod);
|
|
|
- useSasl = false;
|
|
|
+ this.in = new DataInputStream(new BufferedInputStream(inStream));
|
|
|
}
|
|
|
- }
|
|
|
- if (doPing) {
|
|
|
- this.in = new DataInputStream(new BufferedInputStream
|
|
|
- (new PingInputStream(inStream)));
|
|
|
- } else {
|
|
|
- this.in = new DataInputStream(new BufferedInputStream
|
|
|
- (inStream));
|
|
|
- }
|
|
|
- this.out = new DataOutputStream
|
|
|
- (new BufferedOutputStream(outStream));
|
|
|
- writeHeader();
|
|
|
+ this.out = new DataOutputStream(new BufferedOutputStream(outStream));
|
|
|
+ writeHeader();
|
|
|
|
|
|
- // update last activity time
|
|
|
- touch();
|
|
|
+ // update last activity time
|
|
|
+ touch();
|
|
|
|
|
|
- // start the receiver thread after the socket connection has been set up
|
|
|
- start();
|
|
|
+ // start the receiver thread after the socket connection has been set
|
|
|
+ // up
|
|
|
+ start();
|
|
|
+ return;
|
|
|
+ }
|
|
|
} catch (IOException e) {
|
|
|
markClosed(e);
|
|
|
close();
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ private void closeConnection() {
|
|
|
+ if (socket == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ // close the current connection
|
|
|
+ try {
|
|
|
+ socket.close();
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.warn("Not able to close a socket", e);
|
|
|
+ }
|
|
|
+ // set socket to null so that the next call to setupIOstreams
|
|
|
+ // can start the process of connect all over again.
|
|
|
+ socket = null;
|
|
|
+ }
|
|
|
|
|
|
/* Handle connection failures
|
|
|
*
|
|
@@ -513,17 +584,8 @@ public class Client {
|
|
|
*/
|
|
|
private void handleConnectionFailure(
|
|
|
int curRetries, int maxRetries, IOException ioe) throws IOException {
|
|
|
- // close the current connection
|
|
|
- if (socket != null) {
|
|
|
- try {
|
|
|
- socket.close();
|
|
|
- } catch (IOException e) {
|
|
|
- LOG.warn("Not able to close a socket", e);
|
|
|
- }
|
|
|
- }
|
|
|
- // set socket to null so that the next call to setupIOstreams
|
|
|
- // can start the process of connect all over again.
|
|
|
- socket = null;
|
|
|
+
|
|
|
+ closeConnection();
|
|
|
|
|
|
// throw the exception if the maximum number of retries is reached
|
|
|
if (curRetries >= maxRetries) {
|