فهرست منبع

Merge -r 699055:699056 from trunk to branch 19 to move the change log of HADOOP-4116.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.19@699058 13f79535-47bb-0310-9956-ffa450edef68
Hairong Kuang 16 سال پیش
والد
کامیت
d08dff83dd

+ 2 - 0
CHANGES.txt

@@ -724,6 +724,8 @@ Release 0.19.0 - Unreleased
     HADOOP-4249. Fix eclipse path to include the hsqldb.jar. (szetszwo via
     omalley)
 
+    HADOOP-4116. Balancer should provide better resource management. (hairong)
+
 Release 0.18.1 - 2008-09-17
 
   IMPROVEMENTS

+ 8 - 4
src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java

@@ -31,11 +31,15 @@ public interface DataTransferProtocol {
    * when protocol changes. It is not very obvious. 
    */
   /*
-   * Version 13:
-   *    Added a new operation, OP_BLOCK_CHECKSUM, for obtaining
-   *    the checksum of a block from a datanode.
+   * Version 14:
+   *    OP_REPLACE_BLOCK is sent from the Balancer server to the destination,
+   *    including the block id, source, and proxy.
+   *    OP_COPY_BLOCK is sent from the destination to the proxy, which contains
+   *    only the block id.
+   *    A reply to OP_COPY_BLOCK sends the block content.
+   *    A reply to OP_REPLACE_BLOCK includes an operation status.
    */
-  public static final int DATA_TRANSFER_VERSION = 13;
+  public static final int DATA_TRANSFER_VERSION = 14;
 
   // Processed at datanode stream-handler
   public static final byte OP_WRITE_BLOCK = (byte) 80;

+ 34 - 36
src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java

@@ -28,7 +28,6 @@ import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
-import java.net.SocketTimeoutException;
 import java.text.DateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -180,6 +179,11 @@ public class Balancer implements Tool {
     LogFactory.getLog(Balancer.class.getName());
   final private static long MAX_BLOCKS_SIZE_TO_FETCH = 2*1024*1024*1024L; //2GB
 
+  /** The maximum number of concurrent block moves for
+   * balancing purposes at a datanode.
+   */
+  public static final int MAX_NUM_CONCURRENT_MOVES = 5;
+  
   private Configuration conf;
 
   private double threshold = 10D;
@@ -214,10 +218,10 @@ public class Balancer implements Tool {
   
   private double avgUtilization = 0.0D;
   
-  final private int MOVER_THREAD_POOL_SIZE = 1000;
+  final static private int MOVER_THREAD_POOL_SIZE = 1000;
   final private ExecutorService moverExecutor = 
     Executors.newFixedThreadPool(MOVER_THREAD_POOL_SIZE);
-  final private int DISPATCHER_THREAD_POOL_SIZE = 200;
+  final static private int DISPATCHER_THREAD_POOL_SIZE = 200;
   final private ExecutorService dispatcherExecutor =
     Executors.newFixedThreadPool(DISPATCHER_THREAD_POOL_SIZE);
   
@@ -262,11 +266,13 @@ public class Balancer implements Tool {
             this.block = block;
             if ( chooseProxySource() ) {
               addToMoved(block);
-              LOG.info("Decided to move block "+ block.getBlockId()
-                  +" with a length of "+FsShell.byteDesc(block.getNumBytes())
-                  + " bytes from " + source.getName() 
-                  + " to " + target.getName()
-                  + " using proxy source " + proxySource.getName() );
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Decided to move block "+ block.getBlockId()
+                    +" with a length of "+FsShell.byteDesc(block.getNumBytes())
+                    + " bytes from " + source.getName() 
+                    + " to " + target.getName()
+                    + " using proxy source " + proxySource.getName() );
+              }
               return true;
             }
           }
@@ -306,11 +312,9 @@ public class Balancer implements Tool {
       DataOutputStream out = null;
       DataInputStream in = null;
       try {
-        sock.connect(DataNode.createSocketAddr(
-            proxySource.datanode.getName()), HdfsConstants.READ_TIMEOUT);
-        long bandwidth = conf.getLong("dfs.balance.bandwidthPerSec", 1024L*1024);
-        sock.setSoTimeout(2*HdfsConstants.READ_TIMEOUT+
-            (int)(block.getNumBytes()*1500/bandwidth));
+        sock.connect(NetUtils.createSocketAddr(
+            target.datanode.getName()), HdfsConstants.READ_TIMEOUT);
+        sock.setKeepAlive(true);
         out = new DataOutputStream( new BufferedOutputStream(
             sock.getOutputStream(), FSConstants.BUFFER_SIZE));
         sendRequest(out);
@@ -318,25 +322,17 @@ public class Balancer implements Tool {
             sock.getInputStream(), FSConstants.BUFFER_SIZE));
         receiveResponse(in);
         bytesMoved.inc(block.getNumBytes());
-        if (LOG.isDebugEnabled()) {
-          LOG.debug( "Moving block " + block.getBlock().getBlockId() +
+        LOG.info( "Moving block " + block.getBlock().getBlockId() +
               " from "+ source.getName() + " to " +
               target.getName() + " through " +
               proxySource.getName() +
-              " succeeded." );
-        }
-      } catch (SocketTimeoutException te) { 
-        LOG.warn("Timeout moving block "+block.getBlockId()+
-            " from " + source.getName() + " to " +
-            target.getName() + " through " +
-            proxySource.getName());
+              " is succeeded." );
       } catch (IOException e) {
         LOG.warn("Error moving block "+block.getBlockId()+
             " from " + source.getName() + " to " +
             target.getName() + " through " +
             proxySource.getName() +
-            ": "+e.getMessage()+ "\n" +
-            StringUtils.stringifyException(e) );
+            ": "+e.getMessage());
       } finally {
         IOUtils.closeStream(out);
         IOUtils.closeStream(in);
@@ -356,14 +352,14 @@ public class Balancer implements Tool {
       }
     }
     
-    /* Send a block copy request to the outputstream*/
+    /* Send a block replace request to the output stream*/
     private void sendRequest(DataOutputStream out) throws IOException {
       out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
-      out.writeByte(DataTransferProtocol.OP_COPY_BLOCK);
+      out.writeByte(DataTransferProtocol.OP_REPLACE_BLOCK);
       out.writeLong(block.getBlock().getBlockId());
       out.writeLong(block.getBlock().getGenerationStamp());
       Text.writeString(out, source.getStorageID());
-      target.write(out);
+      proxySource.write(out);
       out.flush();
     }
     
@@ -371,11 +367,7 @@ public class Balancer implements Tool {
     private void receiveResponse(DataInputStream in) throws IOException {
       short status = in.readShort();
       if (status != DataTransferProtocol.OP_STATUS_SUCCESS) {
-        throw new IOException("Moving block "+block.getBlockId()+
-            " from "+source.getName() + " to " +
-            target.getName() + " through " +
-            proxySource.getName() +
-        "failed");
+        throw new IOException("block move is failed");
       }
     }
 
@@ -391,8 +383,10 @@ public class Balancer implements Tool {
     private void scheduleBlockMove() {
       moverExecutor.execute(new Runnable() {
         public void run() {
-          LOG.info("Starting moving "+ block.getBlockId() +
-              " from " + proxySource.getName() + " to " + target.getName());
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Starting moving "+ block.getBlockId() +
+                " from " + proxySource.getName() + " to " + target.getName());
+          }
           dispatch();
         }
       });
@@ -482,8 +476,6 @@ public class Balancer implements Tool {
   /* A class that keeps track of a datanode in Balancer */
   private static class BalancerDatanode implements Writable {
     final private static long MAX_SIZE_TO_MOVE = 10*1024*1024*1024L; //10GB
-    final protected static short MAX_NUM_CONCURRENT_MOVES =
-      DataNode.MAX_BALANCING_THREADS;
     protected DatanodeInfo datanode;
     private double utilization;
     protected long maxSizeToMove;
@@ -920,6 +912,9 @@ public class Balancer implements Tool {
     // compute average utilization
     long totalCapacity=0L, totalUsedSpace=0L;
     for (DatanodeInfo datanode : datanodes) {
+      if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
+        continue; // ignore decommissioning or decommissioned nodes
+      }
       totalCapacity += datanode.getCapacity();
       totalUsedSpace += datanode.getDfsUsed();
     }
@@ -933,6 +928,9 @@ public class Balancer implements Tool {
     long overLoadedBytes = 0L, underLoadedBytes = 0L;
     shuffleArray(datanodes);
     for (DatanodeInfo datanode : datanodes) {
+      if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
+        continue; // ignore decommissioning or decommissioned nodes
+      }
       cluster.add(datanode);
       BalancerDatanode datanodeS;
       if (getUtilization(datanode) > avgUtilization) {

+ 0 - 12
src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -39,7 +39,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
-import java.util.concurrent.Semaphore;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -182,12 +181,6 @@ public class DataNode extends Configured
   
   private static final Random R = new Random();
   
-    // The following three fields are to support balancing
-  public final static short MAX_BALANCING_THREADS = 5;
-  protected Semaphore balancingSem = new Semaphore(MAX_BALANCING_THREADS);
-  long balanceBandwidth;
-  protected BlockTransferThrottler balancingThrottler;
-  
   // For InterDataNodeProtocol
   public Server ipcServer;
 
@@ -328,11 +321,6 @@ public class DataNode extends Configured
     this.heartBeatInterval = conf.getLong("dfs.heartbeat.interval", HEARTBEAT_INTERVAL) * 1000L;
     DataNode.nameNodeAddr = nameNodeAddr;
 
-    //set up parameter for cluster balancing
-    this.balanceBandwidth = conf.getLong("dfs.balance.bandwidthPerSec", 1024L*1024);
-    LOG.info("Balancing bandwith is "+balanceBandwidth + " bytes/s");
-    this.balancingThrottler = new BlockTransferThrottler(balanceBandwidth);   
-
     //initialize periodic block scanner
     String reason = null;
     if (conf.getInt("dfs.datanode.scan.period.hours", 0) < 0) {

+ 86 - 78
src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -475,69 +475,50 @@ class DataXceiver implements Runnable, FSConstants {
     // Read in the header
     long blockId = in.readLong(); // read block id
     Block block = new Block(blockId, 0, in.readLong());
-    String source = Text.readString(in); // read del hint
-    DatanodeInfo target = new DatanodeInfo(); // read target
-    target.readFields(in);
 
-    Socket targetSock = null;
-    short opStatus = DataTransferProtocol.OP_STATUS_SUCCESS;
+    if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
+      LOG.info("Not able to copy block " + blockId + " to " 
+          + s.getRemoteSocketAddress() + " because threads quota is exceeded.");
+      return;
+    }
+
     BlockSender blockSender = null;
-    DataOutputStream targetOut = null;
+    DataOutputStream reply = null;
+    boolean isOpSuccess = true;
+
     try {
-      datanode.balancingSem.acquireUninterruptibly();
-      
       // check if the block exists or not
       blockSender = new BlockSender(block, 0, -1, false, false, false, 
           datanode);
 
-      // get the output stream to the target
-      InetSocketAddress targetAddr = NetUtils.createSocketAddr(
-          target.getName());
-      targetSock = datanode.newSocket();
-      targetSock.connect(targetAddr, datanode.socketTimeout);
-      targetSock.setSoTimeout(datanode.socketTimeout);
+      // set up response stream
+      OutputStream baseStream = NetUtils.getOutputStream(
+          s, datanode.socketWriteTimeout);
+      reply = new DataOutputStream(new BufferedOutputStream(
+          baseStream, SMALL_BUFFER_SIZE));
 
-      OutputStream baseStream = NetUtils.getOutputStream(targetSock, 
-          datanode.socketWriteTimeout);
-      targetOut = new DataOutputStream(
-                     new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
-
-      /* send request to the target */
-      // fist write header info
-      targetOut.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); // transfer version
-      targetOut.writeByte(DataTransferProtocol.OP_REPLACE_BLOCK); // op code
-      targetOut.writeLong(block.getBlockId()); // block id
-      targetOut.writeLong(block.getGenerationStamp()); // block id
-      Text.writeString( targetOut, source); // del hint
-
-      // then send data
-      long read = blockSender.sendBlock(targetOut, baseStream, 
-                                        datanode.balancingThrottler);
+      // send block content to the target
+      long read = blockSender.sendBlock(reply, baseStream, 
+                                        dataXceiverServer.balanceThrottler);
 
       datanode.myMetrics.bytesRead.inc((int) read);
       datanode.myMetrics.blocksRead.inc();
       
-      // check the response from target
-      receiveResponse(targetSock, 1);
-
-      LOG.info("Copied block " + block + " to " + targetAddr);
+      LOG.info("Copied block " + block + " to " + s.getRemoteSocketAddress());
     } catch (IOException ioe) {
-      opStatus = DataTransferProtocol.OP_STATUS_ERROR;
-      LOG.warn("Got exception while serving " + block + " to "
-          + target.getName() + ": " + StringUtils.stringifyException(ioe));
+      isOpSuccess = false;
       throw ioe;
     } finally {
-      /* send response to the requester */
-      try {
-        sendResponse(s, opStatus, datanode.socketWriteTimeout);
-      } catch (IOException replyE) {
-        LOG.warn("Error writing the response back to "+
-            s.getRemoteSocketAddress() + "\n" +
-            StringUtils.stringifyException(replyE) );
+      dataXceiverServer.balanceThrottler.release();
+      if (isOpSuccess) {
+        try {
+          // send a trailing char to indicate that the thread resource has been released.
+          reply.writeChar('d');
+        } catch (IOException ignored) {
+        }
       }
-      IOUtils.closeStream(targetOut);
+      IOUtils.closeStream(reply);
       IOUtils.closeStream(blockSender);
-      datanode.balancingSem.release();
     }
   }
 
@@ -549,67 +530,94 @@ class DataXceiver implements Runnable, FSConstants {
    * @throws IOException
    */
   private void replaceBlock(DataInputStream in) throws IOException {
-    datanode.balancingSem.acquireUninterruptibly();
-
     /* read header */
-    Block block = new Block(in.readLong(), dataXceiverServer.estimateBlockSize,
-        in.readLong()); // block id & len
-    String sourceID = Text.readString(in);
+    long blockId = in.readLong();
+    Block block = new Block(blockId, dataXceiverServer.estimateBlockSize,
+        in.readLong()); // block id & generation stamp
+    String sourceID = Text.readString(in); // read del hint
+    DatanodeInfo proxySource = new DatanodeInfo(); // read proxy source
+    proxySource.readFields(in);
+
+    if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
+      LOG.warn("Not able to receive block " + blockId + " from " 
+          + s.getRemoteSocketAddress() + " because threads quota is exceeded.");
+      sendResponse(s, (short)DataTransferProtocol.OP_STATUS_ERROR, 
+          datanode.socketWriteTimeout);
+      return;
+    }
 
+    Socket proxySock = null;
+    DataOutputStream proxyOut = null;
     short opStatus = DataTransferProtocol.OP_STATUS_SUCCESS;
     BlockReceiver blockReceiver = null;
+    DataInputStream proxyReply = null;
+    
     try {
+      // get the output stream to the proxy
+      InetSocketAddress proxyAddr = NetUtils.createSocketAddr(
+          proxySource.getName());
+      proxySock = datanode.newSocket();
+      proxySock.connect(proxyAddr, datanode.socketTimeout);
+      proxySock.setSoTimeout(datanode.socketTimeout);
+
+      OutputStream baseStream = NetUtils.getOutputStream(proxySock, 
+          datanode.socketWriteTimeout);
+      proxyOut = new DataOutputStream(
+                     new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
+
+      /* send request to the proxy */
+      proxyOut.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); // transfer version
+      proxyOut.writeByte(DataTransferProtocol.OP_COPY_BLOCK); // op code
+      proxyOut.writeLong(block.getBlockId()); // block id
+      proxyOut.writeLong(block.getGenerationStamp()); // generation stamp
+      proxyOut.flush();
+
+      // receive the response from the proxy
+      proxyReply = new DataInputStream(new BufferedInputStream(
+          NetUtils.getInputStream(proxySock), BUFFER_SIZE));
       // open a block receiver and check if the block does not exist
-       blockReceiver = new BlockReceiver(
-          block, in, s.getRemoteSocketAddress().toString(),
-          s.getLocalSocketAddress().toString(), false, "", null, datanode);
+      blockReceiver = new BlockReceiver(
+          block, proxyReply, proxySock.getRemoteSocketAddress().toString(),
+          proxySock.getLocalSocketAddress().toString(),
+          false, "", null, datanode);
 
       // receive a block
       blockReceiver.receiveBlock(null, null, null, null, 
-          datanode.balancingThrottler, -1);
+          dataXceiverServer.balanceThrottler, -1);
                     
       // notify name node
       datanode.notifyNamenodeReceivedBlock(block, sourceID);
 
       LOG.info("Moved block " + block + 
           " from " + s.getRemoteSocketAddress());
+      
     } catch (IOException ioe) {
       opStatus = DataTransferProtocol.OP_STATUS_ERROR;
       throw ioe;
     } finally {
+      // receive the last byte that indicates the proxy released its thread resource
+      if (opStatus == DataTransferProtocol.OP_STATUS_SUCCESS) {
+        try {
+          proxyReply.readChar();
+        } catch (IOException ignored) {
+        }
+      }
+      
+      // now release the thread resource
+      dataXceiverServer.balanceThrottler.release();
+      
       // send response back
       try {
         sendResponse(s, opStatus, datanode.socketWriteTimeout);
       } catch (IOException ioe) {
         LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress());
       }
+      IOUtils.closeStream(proxyOut);
       IOUtils.closeStream(blockReceiver);
-      datanode.balancingSem.release();
+      IOUtils.closeStream(proxyReply);
     }
   }
   
-  /**
-   *  Utility function for receiving a response.
-   *  @param s socket to read from
-   *  @param numTargets number of responses to read
-   **/
-  private void receiveResponse(Socket s, int numTargets) throws IOException {
-    // check the response
-    DataInputStream reply = new DataInputStream(new BufferedInputStream(
-                                NetUtils.getInputStream(s), BUFFER_SIZE));
-    try {
-      for (int i = 0; i < numTargets; i++) {
-        short opStatus = reply.readShort();
-        if(opStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
-          throw new IOException("operation failed at "+
-              s.getInetAddress());
-        } 
-      }
-    } finally {
-      IOUtils.closeStream(reply);
-    }
-  }
-
   /**
    * Utility function for sending a response.
    * @param s socket to write to

+ 44 - 0
src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java

@@ -28,6 +28,7 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.server.balancer.Balancer;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.StringUtils;
 
@@ -54,6 +55,45 @@ class DataXceiverServer implements Runnable, FSConstants {
   static final int MAX_XCEIVER_COUNT = 256;
   int maxXceiverCount = MAX_XCEIVER_COUNT;
 
+  /** A manager to make sure that cluster balancing does not
+   * take too many resources.
+   * 
+   * It limits the number of block moves for balancing and
+   * the total amount of bandwidth they can use.
+   */
+  static class BlockBalanceThrottler extends BlockTransferThrottler {
+   private int numThreads;
+   
+   /**Constructor
+    * 
+    * @param bandwidth Total amount of bandwidth that can be used for balancing
+    */
+   private BlockBalanceThrottler(long bandwidth) {
+     super(bandwidth);
+     LOG.info("Balancing bandwith is "+ bandwidth + " bytes/s");
+   }
+   
+   /** Check if the block move can start. 
+    * 
+    * Return true if the thread quota is not exceeded and 
+    * the counter is incremented; False otherwise.
+    */
+   synchronized boolean acquire() {
+     if (numThreads >= Balancer.MAX_NUM_CONCURRENT_MOVES) {
+       return false;
+     }
+     numThreads++;
+     return true;
+   }
+   
+   /** Mark that the move is completed. The thread counter is decremented. */
+   synchronized void release() {
+     numThreads--;
+   }
+  }
+
+  BlockBalanceThrottler balanceThrottler;
+  
   /**
    * We need an estimate for block size to check if the disk partition has
    * enough space. For now we set it to be the default block size set
@@ -75,6 +115,10 @@ class DataXceiverServer implements Runnable, FSConstants {
         MAX_XCEIVER_COUNT);
     
     this.estimateBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+    
+    //set up parameter for cluster balancing
+    this.balanceThrottler = new BlockBalanceThrottler(
+      conf.getLong("dfs.balance.bandwidthPerSec", 1024L*1024));
   }
 
   /**

+ 5 - 5
src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java

@@ -213,7 +213,7 @@ public class TestBlockReplacement extends TestCase {
   }
 
   /* Copy a block from sourceProxy to destination. If the block becomes
-   * overreplicated, preferrably remove it from source.
+   * over-replicated, preferably remove it from source.
    * 
    * Return true if a block is successfully copied; otherwise false.
    */
@@ -221,16 +221,16 @@ public class TestBlockReplacement extends TestCase {
       DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException {
     Socket sock = new Socket();
     sock.connect(NetUtils.createSocketAddr(
-        sourceProxy.getName()), HdfsConstants.READ_TIMEOUT);
-    sock.setSoTimeout(HdfsConstants.READ_TIMEOUT);
+        destination.getName()), HdfsConstants.READ_TIMEOUT);
+    sock.setKeepAlive(true);
     // sendRequest
     DataOutputStream out = new DataOutputStream(sock.getOutputStream());
     out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
-    out.writeByte(DataTransferProtocol.OP_COPY_BLOCK);
+    out.writeByte(DataTransferProtocol.OP_REPLACE_BLOCK);
     out.writeLong(block.getBlockId());
     out.writeLong(block.getGenerationStamp());
     Text.writeString(out, source.getStorageID());
-    destination.write(out);
+    sourceProxy.write(out);
     out.flush();
     // receiveResponse
     DataInputStream reply = new DataInputStream(sock.getInputStream());