Bläddra i källkod

HADOOP-11780. Prevent IPC reader thread death. Contributed by Daryn Sharp.

(cherry picked from commit e19b37ead23805c7ed45bdcbfa7fdc8898cde7b2)
Kihwal Lee 8 år sedan
förälder
incheckning
baf8aac05c

+ 22 - 3
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

@@ -109,6 +109,7 @@ import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.ProtoUtil;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
@@ -955,10 +956,16 @@ public abstract class Server {
             while (iter.hasNext()) {
               key = iter.next();
               iter.remove();
-              if (key.isValid()) {
+              try {
                 if (key.isReadable()) {
                   doRead(key);
                 }
+              } catch (CancelledKeyException cke) {
+                // something else closed the connection, ex. responder or
+                // the listener doing an idle scan.  ignore it and let them
+                // clean up.
+                LOG.info(Thread.currentThread().getName() +
+                    ": connection aborted from " + key.attachment());
               }
               key = null;
             }
@@ -968,6 +975,9 @@ public abstract class Server {
             }
           } catch (IOException ex) {
             LOG.error("Error in Reader", ex);
+          } catch (Throwable re) {
+            LOG.fatal("Bug in read selector!", re);
+            ExitUtil.terminate(1, "Bug in read selector!");
           }
         }
       }
@@ -1186,8 +1196,17 @@ public abstract class Server {
             SelectionKey key = iter.next();
             iter.remove();
             try {
-              if (key.isValid() && key.isWritable()) {
-                  doAsyncWrite(key);
+              if (key.isWritable()) {
+                doAsyncWrite(key);
+              }
+            } catch (CancelledKeyException cke) {
+              // something else closed the connection, ex. reader or the
+              // listener doing an idle scan.  ignore it and let them clean
+              // up
+              RpcCall call = (RpcCall)key.attachment();
+              if (call != null) {
+                LOG.info(Thread.currentThread().getName() +
+                    ": connection aborted from " + call.connection);
               }
             } catch (IOException e) {
               LOG.info(Thread.currentThread().getName() + ": doAsyncWrite threw exception " + e);