Browse Source

HADOOP-3633. Merge -r 674593:674594 from branch 0.18 to branch 0.17.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.17@674611 13f79535-47bb-0310-9956-ffa450edef68
Konstantin Shvachko 17 years ago
parent
commit
0e395eb077
2 changed files with 44 additions and 33 deletions
  1. 3 0
      CHANGES.txt
  2. 41 33
      src/java/org/apache/hadoop/dfs/DataNode.java

+ 3 - 0
CHANGES.txt

@@ -41,6 +41,9 @@ Release 0.17.1 - 2008-06-23
     HADOOP-1979. Speed up fsck by adding a buffered stream. (Lohit
     Vijaya Renu via omalley)
 
+    HADOOP-3633. Correct exception handling in DataXceiveServer, and throttle
+    the number of xceiver threads in a data-node. (shv)
+
 Release 0.17.0 - 2008-05-18
 
   INCOMPATIBLE CHANGES

+ 41 - 33
src/java/org/apache/hadoop/dfs/DataNode.java

@@ -154,10 +154,6 @@ public class DataNode implements FSConstants, Runnable {
     return System.currentTimeMillis();
   }
 
-
-
-
-    
   /**
    * Create the DataNode given a configuration and an array of dataDirs.
    * 'dataDirs' is where the blocks are stored.
@@ -502,8 +498,7 @@ public class DataNode implements FSConstants, Runnable {
           this.threadGroup.interrupt();
           LOG.info("Waiting for threadgroup to exit, active threads is " +
                    this.threadGroup.activeCount());
-          if (this.threadGroup.isDestroyed() ||
-              this.threadGroup.activeCount() == 0) {
+          if (this.threadGroup.activeCount() == 0) {
             break;
           }
           try {
@@ -573,18 +568,18 @@ public class DataNode implements FSConstants, Runnable {
     shutdown();
   }
     
-  private static class Count {
-    int value = 0;
-    Count(int init) { value = init; }
-    synchronized void incr() { value++; }
-    synchronized void decr() { value--; }
-    @Override
-    public String toString() { return Integer.toString(value); }
-    public int getValue() { return value; }
+  /**
+   * Maximal number of concurrent xceivers per node.
+   * Enforcing the limit is required in order to avoid data-node
+   * running out of memory.
+   */
+  private final static int MAX_XCEIVER_COUNT = 256;
+
+  /** Number of concurrent xceivers per node. */
+  int getXceiverCount() {
+    return threadGroup == null ? 0 : threadGroup.activeCount();
   }
     
-  Count xceiverCount = new Count(0);
-    
   /**
    * Main loop for the DataNode.  Runs until shutdown,
    * forever calling remote NameNode functions.
@@ -619,7 +614,7 @@ public class DataNode implements FSConstants, Runnable {
                                                        data.getDfsUsed(),
                                                        data.getRemaining(),
                                                        xmitsInProgress,
-                                                       xceiverCount.getValue());
+                                                       getXceiverCount());
           myMetrics.heartbeats.inc(now() - startTime);
           //LOG.info("Just sent heartbeat, with name " + localName);
           lastHeartbeat = startTime;
@@ -899,15 +894,25 @@ public class DataNode implements FSConstants, Runnable {
     /**
      */
     public void run() {
-      try {
-        while (shouldRun) {
+      while (shouldRun) {
+        try {
           Socket s = ss.accept();
           s.setTcpNoDelay(true);
           new Daemon(threadGroup, new DataXceiver(s)).start();
+        } catch (IOException ie) {
+          LOG.warn(dnRegistration + ":DataXceiveServer: " 
+                                  + StringUtils.stringifyException(ie));
+        } catch (Throwable te) {
+          LOG.error(dnRegistration + ":DataXceiveServer: Exiting due to:" 
+                                   + StringUtils.stringifyException(te));
+          shouldRun = false;
         }
+      }
+      try {
         ss.close();
       } catch (IOException ie) {
-        LOG.info(dnRegistration + ":Exiting DataXceiveServer due to " + ie.toString());
+        LOG.warn(dnRegistration + ":DataXceiveServer: " 
+                                + StringUtils.stringifyException(ie));
       }
     }
     public void kill() {
@@ -915,14 +920,16 @@ public class DataNode implements FSConstants, Runnable {
         "shoudRun should be set to false before killing";
       try {
         this.ss.close();
-      } catch (IOException iex) {
+      } catch (IOException ie) {
+        LOG.warn(dnRegistration + ":DataXceiveServer.kill(): " 
+                                + StringUtils.stringifyException(ie));
       }
 
       // close all the sockets that were accepted earlier
       synchronized (childSockets) {
-        for (Iterator it = childSockets.values().iterator();
+        for (Iterator<Socket> it = childSockets.values().iterator();
              it.hasNext();) {
-          Socket thissock = (Socket) it.next();
+          Socket thissock = it.next();
           try {
             thissock.close();
           } catch (IOException e) {
@@ -945,7 +952,7 @@ public class DataNode implements FSConstants, Runnable {
       InetSocketAddress isock = (InetSocketAddress)s.getRemoteSocketAddress();
       remoteAddress = isock.toString();
       localAddress = s.getInetAddress() + ":" + s.getLocalPort();
-      LOG.debug("Number of active connections is: "+xceiverCount);
+      LOG.debug("Number of active connections is: " + getXceiverCount());
     }
 
     /**
@@ -962,6 +969,13 @@ public class DataNode implements FSConstants, Runnable {
         }
         boolean local = s.getInetAddress().equals(s.getLocalAddress());
         byte op = in.readByte();
+        // Make sure the xciver count is not exceeded
+        int curXceiverCount = getXceiverCount();
+        if(curXceiverCount > MAX_XCEIVER_COUNT) {
+          throw new IOException("xceiverCount " + curXceiverCount
+                                + " exceeds the limit of concurrent xcievers "
+                                + MAX_XCEIVER_COUNT);
+        }
         long startTime = now();
         switch ( op ) {
         case OP_READ_BLOCK:
@@ -998,7 +1012,8 @@ public class DataNode implements FSConstants, Runnable {
       } catch (Throwable t) {
         LOG.error(dnRegistration + ":DataXceiver: " + StringUtils.stringifyException(t));
       } finally {
-        LOG.debug(dnRegistration + ":Number of active connections is: "+xceiverCount);
+        LOG.debug(dnRegistration + ":Number of active connections is: "
+                                 + getXceiverCount());
         IOUtils.closeStream(in);
         IOUtils.closeSocket(s);
         childSockets.remove(s);
@@ -1011,7 +1026,6 @@ public class DataNode implements FSConstants, Runnable {
      * @throws IOException
      */
     private void readBlock(DataInputStream in) throws IOException {
-      xceiverCount.incr();
       //
       // Read in the header
       //
@@ -1064,7 +1078,6 @@ public class DataNode implements FSConstants, Runnable {
                   StringUtils.stringifyException(ioe) );
         throw ioe;
       } finally {
-        xceiverCount.decr();
         IOUtils.closeStream(out);
         IOUtils.closeStream(blockSender);
       }
@@ -1077,7 +1090,6 @@ public class DataNode implements FSConstants, Runnable {
      * @throws IOException
      */
     private void writeBlock(DataInputStream in) throws IOException {
-      xceiverCount.incr();
       LOG.debug("writeBlock receive buf size " + s.getReceiveBufferSize() +
                 " tcp no delay " + s.getTcpNoDelay());
       //
@@ -1229,8 +1241,6 @@ public class DataNode implements FSConstants, Runnable {
         IOUtils.closeStream(replyOut);
         IOUtils.closeSocket(mirrorSock);
         IOUtils.closeStream(blockReceiver);
-        // decrement counter
-        xceiverCount.decr();
       }
     }
 
@@ -1239,8 +1249,6 @@ public class DataNode implements FSConstants, Runnable {
      * @param in
      */
     void readMetadata(DataInputStream in) throws IOException {
-      xceiverCount.incr();
-
       Block block = new Block( in.readLong(), 0 );
       MetaDataInputStream checksumIn = null;
       DataOutputStream out = null;
@@ -1269,7 +1277,6 @@ public class DataNode implements FSConstants, Runnable {
         //last DATA_CHUNK
         out.writeInt(0);
       } finally {
-        xceiverCount.decr();
         IOUtils.closeStream(out);
         IOUtils.closeStream(checksumIn);
       }
@@ -2683,6 +2690,7 @@ public class DataNode implements FSConstants, Runnable {
     }
         
     LOG.info(dnRegistration + ":Finishing DataNode in: "+data);
+    shutdown();
   }
     
   /** Start a single datanode daemon and wait for it to finish.