Browse files

HADOOP-3935. Split out inner classes from DataNode.java. (johan)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@685979 13f79535-47bb-0310-9956-ffa450edef68
Johan Oskarsson 17 years ago
parent
commit
113e442c46

+ 2 - 0
CHANGES.txt

@@ -192,6 +192,8 @@ Trunk (unreleased changes)
     HADOOP-3844. Include message of local exception in RPC client failures.
     (Steve Loughran via omalley)
 
+    HADOOP-3935. Split out inner classes from DataNode.java. (johan)
+
   OPTIMIZATIONS
 
     HADOOP-3556. Removed lock contention in MD5Hash by changing the 

+ 969 - 0
src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -0,0 +1,969 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.fs.FSInputChecker;
+import org.apache.hadoop.fs.FSOutputSummer;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.StringUtils;
+
+/** A class that receives a block and writes it to its own disk; meanwhile
+ * it may also copy the block to another site. If a throttler is provided,
+ * streaming throttling is also supported.
+ **/
+class BlockReceiver implements java.io.Closeable, FSConstants {
+  public static final Log LOG = DataNode.LOG;
+  
+  private Block block; // the block to receive
+  protected boolean finalized;
+  private DataInputStream in = null; // from where data are read
+  private DataChecksum checksum; // from where chunks of a block can be read
+  private OutputStream out = null; // to block file at local disk
+  private DataOutputStream checksumOut = null; // to crc file at local disk
+  private int bytesPerChecksum;
+  private int checksumSize;
+  private ByteBuffer buf; // contains one full packet.
+  private int bufRead; //amount of valid data in the buf
+  private int maxPacketReadLen;
+  protected long offsetInBlock;
+  protected final String inAddr;
+  private String mirrorAddr;
+  private DataOutputStream mirrorOut;
+  private Daemon responder = null;
+  private BlockTransferThrottler throttler;
+  private FSDataset.BlockWriteStreams streams;
+  private boolean isRecovery = false;
+  private String clientName;
+  DatanodeInfo srcDataNode = null;
+  private Checksum partialCrc = null;
+  private DataNode datanode = null;
+
+  BlockReceiver(Block block, DataInputStream in, String inAddr,
+                boolean isRecovery, String clientName, 
+                DatanodeInfo srcDataNode, DataNode datanode) throws IOException {
+    try{
+      this.block = block;
+      this.in = in;
+      this.inAddr = inAddr;
+      this.isRecovery = isRecovery;
+      this.clientName = clientName;
+      this.offsetInBlock = 0;
+      this.checksum = DataChecksum.newDataChecksum(in);
+      this.bytesPerChecksum = checksum.getBytesPerChecksum();
+      this.checksumSize = checksum.getChecksumSize();
+      this.srcDataNode = srcDataNode;
+      this.datanode = datanode;
+      //
+      // Open local disk out
+      //
+      streams = datanode.data.writeToBlock(block, isRecovery);
+      this.finalized = datanode.data.isValidBlock(block);
+      if (streams != null) {
+        this.out = streams.dataOut;
+        this.checksumOut = new DataOutputStream(new BufferedOutputStream(
+                                                  streams.checksumOut, 
+                                                  SMALL_BUFFER_SIZE));
+        // If this block is for appends, then remove it from periodic
+        // validation.
+        if (datanode.blockScanner != null && isRecovery) {
+          datanode.blockScanner.deleteBlock(block);
+        }
+      }
+    } catch(IOException ioe) {
+      IOUtils.closeStream(this);
+      throw ioe;
+    }
+  }
+
+  /**
+   * close files.
+   */
+  public void close() throws IOException {
+
+    IOException ioe = null;
+    // close checksum file
+    try {
+      if (checksumOut != null) {
+        checksumOut.flush();
+        checksumOut.close();
+        checksumOut = null;
+      }
+    } catch(IOException e) {
+      ioe = e;
+    }
+    // close block file
+    try {
+      if (out != null) {
+        out.flush();
+        out.close();
+        out = null;
+      }
+    } catch (IOException e) {
+      ioe = e;
+    }
+    // disk check
+    if(ioe != null) {
+      datanode.checkDiskError(ioe);
+      throw ioe;
+    }
+  }
+
+  /**
+   * Flush block data and metadata files to disk.
+   * @throws IOException
+   */
+  void flush() throws IOException {
+    if (checksumOut != null) {
+      checksumOut.flush();
+    }
+    if (out != null) {
+      out.flush();
+    }
+  }
+
+  /**
+   * While writing to mirrorOut, failure to write to mirror should not
+   * affect this datanode unless a client is writing the block.
+   */
+  private void handleMirrorOutError(IOException ioe) throws IOException {
+    LOG.info(datanode.dnRegistration + ":Exception writing block " +
+             block + " to mirror " + mirrorAddr + "\n" +
+             StringUtils.stringifyException(ioe));
+    mirrorOut = null;
+    //
+    // If stream-copy fails, continue
+    // writing to disk for replication requests. For client
+    // writes, return error so that the client can do error
+    // recovery.
+    //
+    if (clientName.length() > 0) {
+      throw ioe;
+    }
+  }
+  
+  /**
+   * Verify multiple CRC chunks. 
+   */
+  private void verifyChunks( byte[] dataBuf, int dataOff, int len, 
+                             byte[] checksumBuf, int checksumOff ) 
+                             throws IOException {
+    while (len > 0) {
+      int chunkLen = Math.min(len, bytesPerChecksum);
+      
+      checksum.update(dataBuf, dataOff, chunkLen);
+
+      if (!checksum.compare(checksumBuf, checksumOff)) {
+        if (srcDataNode != null) {
+          try {
+            LOG.info("report corrupt block " + block + " from datanode " +
+                      srcDataNode + " to namenode");
+            LocatedBlock lb = new LocatedBlock(block, 
+                                            new DatanodeInfo[] {srcDataNode});
+            datanode.namenode.reportBadBlocks(new LocatedBlock[] {lb});
+          } catch (IOException e) {
+            LOG.warn("Failed to report bad block " + block + 
+                      " from datanode " + srcDataNode + " to namenode");
+          }
+        }
+        throw new IOException("Unexpected checksum mismatch " + 
+                              "while writing " + block + " from " + inAddr);
+      }
+
+      checksum.reset();
+      dataOff += chunkLen;
+      checksumOff += checksumSize;
+      len -= chunkLen;
+    }
+  }
+
+  /**
+   * Makes sure buf.position() is zero without modifying buf.remaining().
+   * It moves the data if position needs to be changed.
+   */
+  private void shiftBufData() {
+    if (bufRead != buf.limit()) {
+      throw new IllegalStateException("bufRead should be same as " +
+                                      "buf.limit()");
+    }
+    
+    //shift the remaining data on buf to the front
+    if (buf.position() > 0) {
+      int dataLeft = buf.remaining();
+      if (dataLeft > 0) {
+        byte[] b = buf.array();
+        System.arraycopy(b, buf.position(), b, 0, dataLeft);
+      }
+      buf.position(0);
+      bufRead = dataLeft;
+      buf.limit(bufRead);
+    }
+  }
+  
+  /**
+   * Reads up to toRead bytes into buf at buf.limit() and increments the limit.
+   * Throws an IOException if the read does not succeed.
+   */
+  private int readToBuf(int toRead) throws IOException {
+    if (toRead < 0) {
+      toRead = (maxPacketReadLen > 0 ? maxPacketReadLen : buf.capacity())
+               - buf.limit();
+    }
+    
+    int nRead = in.read(buf.array(), buf.limit(), toRead);
+    
+    if (nRead < 0) {
+      throw new EOFException("while trying to read " + toRead + " bytes");
+    }
+    bufRead = buf.limit() + nRead;
+    buf.limit(bufRead);
+    return nRead;
+  }
+  
+  
+  /**
+   * Reads (at least) one packet and returns the packet length.
+   * buf.position() points to the start of the packet and 
+   * buf.limit() points to the end of the packet. There could 
+   * be more data from the next packet in buf.<br><br>
+   * 
+   * It tries to read a full packet with a single read call.
+   * Consecutive packets are usually of the same length.
+   */
+  private int readNextPacket() throws IOException {
+    /* This dances around buf a little bit, mainly to read 
+     * a full packet with a single read and to accept an arbitrary size  
+     * for the next packet at the same time.
+     */
+    if (buf == null) {
+      /* initialize buffer to the best guess size:
+       * 'chunksPerPacket' calculation here should match the same 
+       * calculation in DFSClient to make the guess accurate.
+       */
+      int chunkSize = bytesPerChecksum + checksumSize;
+      int chunksPerPacket = (datanode.writePacketSize - DataNode.PKT_HEADER_LEN - 
+                             SIZE_OF_INTEGER + chunkSize - 1)/chunkSize;
+      buf = ByteBuffer.allocate(DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER +
+                                Math.max(chunksPerPacket, 1) * chunkSize);
+      buf.limit(0);
+    }
+    
+    // See if there is data left in the buffer :
+    if (bufRead > buf.limit()) {
+      buf.limit(bufRead);
+    }
+    
+    while (buf.remaining() < SIZE_OF_INTEGER) {
+      if (buf.position() > 0) {
+        shiftBufData();
+      }
+      readToBuf(-1);
+    }
+    
+    /* We mostly have the full packet or at least enough for an int
+     */
+    buf.mark();
+    int payloadLen = buf.getInt();
+    buf.reset();
+    
+    if (payloadLen == 0) {
+      //end of stream!
+      buf.limit(buf.position() + SIZE_OF_INTEGER);
+      return 0;
+    }
+    
+    // check corrupt values for pktLen, 100MB upper limit should be ok?
+    if (payloadLen < 0 || payloadLen > (100*1024*1024)) {
+      throw new IOException("Incorrect value for packet payload : " +
+                            payloadLen);
+    }
+    
+    int pktSize = payloadLen + DataNode.PKT_HEADER_LEN;
+    
+    if (buf.remaining() < pktSize) {
+      //we need to read more data
+      int toRead = pktSize - buf.remaining();
+      
+      // first make sure buf has enough space.        
+      int spaceLeft = buf.capacity() - buf.limit();
+      if (toRead > spaceLeft && buf.position() > 0) {
+        shiftBufData();
+        spaceLeft = buf.capacity() - buf.limit();
+      }
+      if (toRead > spaceLeft) {
+        byte oldBuf[] = buf.array();
+        int toCopy = buf.limit();
+        buf = ByteBuffer.allocate(toCopy + toRead);
+        System.arraycopy(oldBuf, 0, buf.array(), 0, toCopy);
+        buf.limit(toCopy);
+      }
+      
+      //now read:
+      while (toRead > 0) {
+        toRead -= readToBuf(toRead);
+      }
+    }
+    
+    if (buf.remaining() > pktSize) {
+      buf.limit(buf.position() + pktSize);
+    }
+    
+    if (pktSize > maxPacketReadLen) {
+      maxPacketReadLen = pktSize;
+    }
+    
+    return payloadLen;
+  }
+  
+  /** 
+   * Receives and processes a packet. It can contain many chunks.
+   * Returns the size of the packet.
+   */
+  private int receivePacket() throws IOException {
+    
+    int payloadLen = readNextPacket();
+    
+    if (payloadLen <= 0) {
+      return payloadLen;
+    }
+    
+    buf.mark();
+    //read the header
+    buf.getInt(); // packet length
+    offsetInBlock = buf.getLong(); // get offset of packet in block
+    long seqno = buf.getLong();    // get seqno
+    boolean lastPacketInBlock = (buf.get() != 0);
+    
+    int endOfHeader = buf.position();
+    buf.reset();
+    
+    if (LOG.isDebugEnabled()){
+      LOG.debug("Receiving one packet for block " + block +
+                " of length " + payloadLen +
+                " seqno " + seqno +
+                " offsetInBlock " + offsetInBlock +
+                " lastPacketInBlock " + lastPacketInBlock);
+    }
+    
+    setBlockPosition(offsetInBlock);
+    
+    //First write the packet to the mirror:
+    if (mirrorOut != null) {
+      try {
+        mirrorOut.write(buf.array(), buf.position(), buf.remaining());
+        mirrorOut.flush();
+      } catch (IOException e) {
+        handleMirrorOutError(e);
+      }
+    }
+
+    buf.position(endOfHeader);        
+    int len = buf.getInt();
+    
+    if (len < 0) {
+      throw new IOException("Got wrong length during writeBlock(" + block + 
+                            ") from " + inAddr + " at offset " + 
+                            offsetInBlock + ": " + len); 
+    } 
+
+    if (len == 0) {
+      LOG.debug("Receiving empty packet for block " + block);
+    } else {
+      offsetInBlock += len;
+
+      int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
+                                                            checksumSize;
+
+      if ( buf.remaining() != (checksumLen + len)) {
+        throw new IOException("Data remaining in packet does not match " +
+                              "sum of checksumLen and dataLen");
+      }
+      int checksumOff = buf.position();
+      int dataOff = checksumOff + checksumLen;
+      byte pktBuf[] = buf.array();
+
+      buf.position(buf.limit()); // move to the end of the data.
+
+      /* skip verifying checksum iff this is not the last one in the 
+       * pipeline and clientName is non-null. i.e. Checksum is verified
+       * on all the datanodes when the data is being written by a 
+       * datanode rather than a client. When the client is writing the data, 
+       * the protocol includes acks and only the last datanode needs to verify 
+       * checksum.
+       */
+      if (mirrorOut == null || clientName.length() == 0) {
+        verifyChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
+      }
+
+      try {
+        if (!finalized) {
+          //finally write to the disk :
+          out.write(pktBuf, dataOff, len);
+
+          // If this is a partial chunk, then verify that this is the only
+          // chunk in the packet. Calculate new crc for this chunk.
+          if (partialCrc != null) {
+            if (len > bytesPerChecksum) {
+              throw new IOException("Got wrong length during writeBlock(" + 
+                                    block + ") from " + inAddr + " " +
+                                    "A packet can have only one partial chunk."+
+                                    " len = " + len + 
+                                    " bytesPerChecksum " + bytesPerChecksum);
+            }
+            partialCrc.update(pktBuf, dataOff, len);
+            byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
+            checksumOut.write(buf);
+            LOG.debug("Writing out partial crc for data len " + len);
+            partialCrc = null;
+          } else {
+            checksumOut.write(pktBuf, checksumOff, checksumLen);
+          }
+          datanode.myMetrics.bytesWritten.inc(len);
+        }
+      } catch (IOException iex) {
+        datanode.checkDiskError(iex);
+        throw iex;
+      }
+    }
+
+    /// flush entire packet before sending ack
+    flush();
+
+    // put in queue for pending acks
+    if (responder != null) {
+      ((PacketResponder)responder.getRunnable()).enqueue(seqno,
+                                      lastPacketInBlock); 
+    }
+    
+    if (throttler != null) { // throttle I/O
+      throttler.throttle(payloadLen);
+    }
+    
+    return payloadLen;
+  }
+
+  void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
+    checksum.writeHeader(mirrorOut);
+  }
+ 
+
+  void receiveBlock(
+      DataOutputStream mirrOut, // output to next datanode
+      DataInputStream mirrIn,   // input from next datanode
+      DataOutputStream replyOut,  // output to previous datanode
+      String mirrAddr, BlockTransferThrottler throttlerArg,
+      int numTargets) throws IOException {
+
+      mirrorOut = mirrOut;
+      mirrorAddr = mirrAddr;
+      throttler = throttlerArg;
+
+    try {
+      // write data chunk header
+      if (!finalized) {
+        BlockMetadataHeader.writeHeader(checksumOut, checksum);
+      }
+      if (clientName.length() > 0) {
+        responder = new Daemon(datanode.threadGroup, 
+                               new PacketResponder(this, block, mirrIn, 
+                                                   replyOut, numTargets,
+                                                   clientName));
+        responder.start(); // start thread to process responses
+      }
+
+      /* 
+       * Receive until packet length is zero.
+       */
+      while (receivePacket() > 0) {}
+
+      // flush the mirror out
+      if (mirrorOut != null) {
+        try {
+          mirrorOut.writeInt(0); // mark the end of the block
+          mirrorOut.flush();
+        } catch (IOException e) {
+          handleMirrorOutError(e);
+        }
+      }
+
+      // wait for all outstanding packet responses, and then
+      // tell the responder to gracefully shut down.
+      if (responder != null) {
+        ((PacketResponder)responder.getRunnable()).close();
+      }
+
+      // if this write is for a replication request (and not
+      // from a client), then finalize block. For client-writes, 
+      // the block is finalized in the PacketResponder.
+      if (clientName.length() == 0) {
+        // close the block/crc files
+        close();
+
+        // Finalize the block. Does this fsync()?
+        block.setNumBytes(offsetInBlock);
+        datanode.data.finalizeBlock(block);
+        datanode.myMetrics.blocksWritten.inc();
+      }
+
+    } catch (IOException ioe) {
+      LOG.info("Exception in receiveBlock for block " + block + 
+               " " + ioe);
+      IOUtils.closeStream(this);
+      if (responder != null) {
+        responder.interrupt();
+      }
+      throw ioe;
+    } finally {
+      if (responder != null) {
+        try {
+          responder.join();
+        } catch (InterruptedException e) {
+          throw new IOException("Interrupted receiveBlock");
+        }
+        responder = null;
+      }
+    }
+  }
+
+  /**
+   * Sets the file pointer in the local block file to the specified value.
+   */
+  private void setBlockPosition(long offsetInBlock) throws IOException {
+    if (finalized) {
+      if (!isRecovery) {
+        throw new IOException("Write to offset " + offsetInBlock +
+                              " of block " + block +
+                              " that is already finalized.");
+      }
+      if (offsetInBlock > datanode.data.getLength(block)) {
+        throw new IOException("Write to offset " + offsetInBlock +
+                              " of block " + block +
+                              " that is already finalized and is of size " +
+                              datanode.data.getLength(block));
+      }
+      return;
+    }
+
+    if (datanode.data.getChannelPosition(block, streams) == offsetInBlock) {
+      return;                   // nothing to do 
+    }
+    long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
+                            offsetInBlock / bytesPerChecksum * checksumSize;
+    if (out != null) {
+     out.flush();
+    }
+    if (checksumOut != null) {
+      checksumOut.flush();
+    }
+
+    // If this is a partial chunk, then read in pre-existing checksum
+    if (offsetInBlock % bytesPerChecksum != 0) {
+      LOG.info("setBlockPosition trying to set position to " +
+               offsetInBlock +
+               " for block " + block +
+               " which is not a multiple of bytesPerChecksum " +
+               bytesPerChecksum);
+      computePartialChunkCrc(offsetInBlock, offsetInChecksum, bytesPerChecksum);
+    }
+
+    LOG.info("Changing block file offset of block " + block + " from " + 
+        datanode.data.getChannelPosition(block, streams) +
+             " to " + offsetInBlock +
+             " meta file offset to " + offsetInChecksum);
+
+    // set the position of the block file
+    datanode.data.setChannelPosition(block, streams, offsetInBlock, offsetInChecksum);
+  }
+
+  /**
+   * reads in the partial crc chunk and computes checksum
+   * of pre-existing data in partial chunk.
+   */
+  private void computePartialChunkCrc(long blkoff, long ckoff, 
+                                      int bytesPerChecksum) throws IOException {
+
+    // find offset of the beginning of partial chunk.
+    //
+    int sizePartialChunk = (int) (blkoff % bytesPerChecksum);
+    int checksumSize = checksum.getChecksumSize();
+    blkoff = blkoff - sizePartialChunk;
+    LOG.info("computePartialChunkCrc sizePartialChunk " + 
+              sizePartialChunk +
+              " block " + block +
+              " offset in block " + blkoff +
+              " offset in metafile " + ckoff);
+
+    // create an input stream from the block file
+    // and read in partial crc chunk into temporary buffer
+    //
+    byte[] buf = new byte[sizePartialChunk];
+    byte[] crcbuf = new byte[checksumSize];
+    FSDataset.BlockInputStreams instr = null;
+    try { 
+      instr = datanode.data.getTmpInputStreams(block, blkoff, ckoff);
+      IOUtils.readFully(instr.dataIn, buf, 0, sizePartialChunk);
+
+      // open meta file and read in the crc value computed earlier
+      IOUtils.readFully(instr.checksumIn, crcbuf, 0, crcbuf.length);
+    } finally {
+      IOUtils.closeStream(instr);
+    }
+
+    // compute crc of partial chunk from data read in the block file.
+    partialCrc = new CRC32();
+    partialCrc.update(buf, 0, sizePartialChunk);
+    LOG.info("Read in partial CRC chunk from disk for block " + block);
+
+    // paranoia! verify that the pre-computed crc matches what we
+    // recalculated just now
+    if (partialCrc.getValue() != FSInputChecker.checksum2long(crcbuf)) {
+      String msg = "Partial CRC " + partialCrc.getValue() +
+                   " does not match value computed the " +
+                   " last time file was closed " +
+                   FSInputChecker.checksum2long(crcbuf);
+      throw new IOException(msg);
+    }
+    //LOG.debug("Partial CRC matches 0x" + 
+    //            Long.toHexString(partialCrc.getValue()));
+  }
+  
+  
+  /**
+   * Processes responses from downstream datanodes in the pipeline
+   * and sends back replies to the originator.
+   */
+  class PacketResponder implements Runnable, FSConstants {   
+
+    //packet waiting for ack
+    private LinkedList<Packet> ackQueue = new LinkedList<Packet>(); 
+    private volatile boolean running = true;
+    private Block block;
+    DataInputStream mirrorIn;   // input from downstream datanode
+    DataOutputStream replyOut;  // output to upstream datanode
+    private int numTargets;     // number of downstream datanodes including myself
+    private String clientName;  // The name of the client (if any)
+    private BlockReceiver receiver; // The owner of this responder.
+
+    public String toString() {
+      return "PacketResponder " + numTargets + " for Block " + this.block;
+    }
+
+    PacketResponder(BlockReceiver receiver, Block b, DataInputStream in, 
+                    DataOutputStream out, int numTargets, String clientName) {
+      this.receiver = receiver;
+      this.block = b;
+      mirrorIn = in;
+      replyOut = out;
+      this.numTargets = numTargets;
+      this.clientName = clientName;
+    }
+
+    /**
+     * Enqueue the seqno that is still to be acked by the downstream datanode.
+     * @param seqno
+     * @param lastPacketInBlock
+     */
+    synchronized void enqueue(long seqno, boolean lastPacketInBlock) {
+      if (running) {
+        LOG.debug("PacketResponder " + numTargets + " adding seqno " + seqno +
+                  " to ack queue.");
+        ackQueue.addLast(new Packet(seqno, lastPacketInBlock));
+        notifyAll();
+      }
+    }
+
+    /**
+     * wait for all pending packets to be acked. Then shutdown thread.
+     */
+    synchronized void close() {
+      while (running && ackQueue.size() != 0 && datanode.shouldRun) {
+        try {
+          wait();
+        } catch (InterruptedException e) {
+          running = false;
+        }
+      }
+      LOG.debug("PacketResponder " + numTargets +
+               " for block " + block + " Closing down.");
+      running = false;
+      notifyAll();
+    }
+
+    private synchronized void lastDataNodeRun() {
+      long lastHeartbeat = System.currentTimeMillis();
+      boolean lastPacket = false;
+
+      while (running && datanode.shouldRun && !lastPacket) {
+        long now = System.currentTimeMillis();
+        try {
+
+            // wait for a packet to be sent to downstream datanode
+            while (running && datanode.shouldRun && ackQueue.size() == 0) {
+              long idle = now - lastHeartbeat;
+              long timeout = (datanode.socketTimeout/2) - idle;
+              if (timeout <= 0) {
+                timeout = 1000;
+              }
+              try {
+                wait(timeout);
+              } catch (InterruptedException e) {
+                if (running) {
+                  LOG.info("PacketResponder " + numTargets +
+                           " for block " + block + " Interrupted.");
+                  running = false;
+                }
+                break;
+              }
+          
+              // send a heartbeat if it is time.
+              now = System.currentTimeMillis();
+              if (now - lastHeartbeat > datanode.socketTimeout/2) {
+                replyOut.writeLong(-1); // send heartbeat
+                replyOut.flush();
+                lastHeartbeat = now;
+              }
+            }
+
+            if (!running || !datanode.shouldRun) {
+              break;
+            }
+            Packet pkt = ackQueue.removeFirst();
+            long expected = pkt.seqno;
+            notifyAll();
+            LOG.debug("PacketResponder " + numTargets +
+                      " for block " + block + 
+                      " acking for packet " + expected);
+
+            // If this is the last packet in block, then close block
+            // file and finalize the block before responding success
+            if (pkt.lastPacketInBlock) {
+              if (!receiver.finalized) {
+                receiver.close();
+                block.setNumBytes(receiver.offsetInBlock);
+                datanode.data.finalizeBlock(block);
+                datanode.myMetrics.blocksWritten.inc();
+                datanode.notifyNamenodeReceivedBlock(block, 
+                    DataNode.EMPTY_DEL_HINT);
+                LOG.info("Received block " + block + 
+                         " of size " + block.getNumBytes() + 
+                         " from " + receiver.inAddr);
+              }
+              lastPacket = true;
+            }
+
+            replyOut.writeLong(expected);
+            replyOut.writeShort(OP_STATUS_SUCCESS);
+            replyOut.flush();
+        } catch (Exception e) {
+          if (running) {
+            LOG.info("PacketResponder " + block + " " + numTargets + 
+                     " Exception " + StringUtils.stringifyException(e));
+            running = false;
+          }
+        }
+      }
+      LOG.info("PacketResponder " + numTargets + 
+               " for block " + block + " terminating");
+    }
+
+    /**
+     * Thread to process incoming acks.
+     * @see java.lang.Runnable#run()
+     */
+    public void run() {
+
+      // If this is the last datanode in pipeline, then handle differently
+      if (numTargets == 0) {
+        lastDataNodeRun();
+        return;
+      }
+
+      boolean lastPacketInBlock = false;
+      while (running && datanode.shouldRun && !lastPacketInBlock) {
+
+        try {
+            short op = OP_STATUS_SUCCESS;
+            boolean didRead = false;
+            long expected = -2;
+            try { 
+              // read seqno from downstream datanode
+              long seqno = mirrorIn.readLong();
+              didRead = true;
+              if (seqno == -1) {
+                replyOut.writeLong(-1); // send keepalive
+                replyOut.flush();
+                LOG.debug("PacketResponder " + numTargets + " got -1");
+                continue;
+              } else if (seqno == -2) {
+                LOG.debug("PacketResponder " + numTargets + " got -2");
+              } else {
+                LOG.debug("PacketResponder " + numTargets + " got seqno = " + 
+                    seqno);
+                Packet pkt = null;
+                synchronized (this) {
+                  while (running && datanode.shouldRun && ackQueue.size() == 0) {
+                    if (LOG.isDebugEnabled()) {
+                      LOG.debug("PacketResponder " + numTargets + 
+                                " seqno = " + seqno +
+                                " for block " + block +
+                                " waiting for local datanode to finish write.");
+                    }
+                    wait();
+                  }
+                  pkt = ackQueue.removeFirst();
+                  expected = pkt.seqno;
+                  notifyAll();
+                  LOG.debug("PacketResponder " + numTargets + " seqno = " + seqno);
+                  if (seqno != expected) {
+                    throw new IOException("PacketResponder " + numTargets +
+                                          " for block " + block +
+                                          " expected seqno:" + expected +
+                                          " received:" + seqno);
+                  }
+                  lastPacketInBlock = pkt.lastPacketInBlock;
+                }
+              }
+            } catch (Throwable e) {
+              if (running) {
+                LOG.info("PacketResponder " + block + " " + numTargets + 
+                         " Exception " + StringUtils.stringifyException(e));
+                running = false;
+              }
+            }
+
+            if (Thread.interrupted()) {
+              /* The receiver thread cancelled this thread. 
+               * We could also check any other status updates from the 
+               * receiver thread (e.g. if it is ok to write to replyOut). 
+               * It is prudent to not send any more status back to the client
+               * because this datanode has a problem. The upstream datanode
+               * will detect a timeout on heartbeats and will declare that
+               * this datanode is bad, and rightly so.
+               */
+              LOG.info("PacketResponder " + block +  " " + numTargets +
+                       " : Thread is interrupted.");
+              running = false;
+              continue;
+            }
+            
+            if (!didRead) {
+              op = OP_STATUS_ERROR;
+            }
+            
+            // If this is the last packet in block, then close block
+            // file and finalize the block before responding success
+            if (lastPacketInBlock && !receiver.finalized) {
+              receiver.close();
+              block.setNumBytes(receiver.offsetInBlock);
+              datanode.data.finalizeBlock(block);
+              datanode.myMetrics.blocksWritten.inc();
+              datanode.notifyNamenodeReceivedBlock(block, 
+                  DataNode.EMPTY_DEL_HINT);
+              LOG.info("Received block " + block + 
+                       " of size " + block.getNumBytes() + 
+                       " from " + receiver.inAddr);
+            }
+
+            // send my status back to upstream datanode
+            replyOut.writeLong(expected); // send seqno upstream
+            replyOut.writeShort(OP_STATUS_SUCCESS);
+
+            LOG.debug("PacketResponder " + numTargets + 
+                      " for block " + block +
+                      " responded my status " +
+                      " for seqno " + expected);
+
+            // forward responses from downstream datanodes.
+            for (int i = 0; i < numTargets && datanode.shouldRun; i++) {
+              try {
+                if (op == OP_STATUS_SUCCESS) {
+                  op = mirrorIn.readShort();
+                  if (op != OP_STATUS_SUCCESS) {
+                    LOG.debug("PacketResponder for block " + block +
+                              ": error code received from downstream " +
+                              " datanode[" + i + "] " + op);
+                  }
+                }
+              } catch (Throwable e) {
+                op = OP_STATUS_ERROR;
+              }
+              replyOut.writeShort(op);
+            }
+            replyOut.flush();
+            LOG.debug("PacketResponder " + block + " " + numTargets + 
+                      " responded other status " + " for seqno " + expected);
+
+            // If we were unable to read the seqno from downstream, then stop.
+            if (expected == -2) {
+              running = false;
+            }
+            // If we forwarded an error response from a downstream datanode
+            // and we are acting on behalf of a client, then we quit. The 
+            // client will drive the recovery mechanism.
+            if (op == OP_STATUS_ERROR && clientName.length() > 0) {
+              running = false;
+            }
+        } catch (IOException e) {
+          if (running) {
+            LOG.info("PacketResponder " + block + " " + numTargets + 
+                     " Exception " + StringUtils.stringifyException(e));
+            running = false;
+          }
+        } catch (RuntimeException e) {
+          if (running) {
+            LOG.info("PacketResponder " + block + " " + numTargets + 
+                     " Exception " + StringUtils.stringifyException(e));
+            running = false;
+          }
+        }
+      }
+      LOG.info("PacketResponder " + numTargets + 
+               " for block " + block + " terminating");
+    }
+  }
+  
+  /**
+   * This information is cached by the Datanode in the ackQueue.
+   */
+  static private class Packet {
+    long seqno;
+    boolean lastPacketInBlock;
+
+    Packet(long seqno, boolean lastPacketInBlock) {
+      this.seqno = seqno;
+      this.lastPacketInBlock = lastPacketInBlock;
+    }
+  }
+}
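
For orientation, the fixed packet header that readNextPacket()/receivePacket() above parse mirrors what BlockSender.sendChunks() writes on the sending side. The following is a minimal illustrative sketch of that layout, not code from this patch; the field widths are inferred from the get/put calls in the two files, and the class and method names are hypothetical.

import java.nio.ByteBuffer;

// Hypothetical helper showing the wire layout assumed by BlockReceiver/BlockSender.
public class PacketHeaderSketch {
  static ByteBuffer buildHeader(int packetLen, long offsetInBlock, long seqno,
                                boolean lastPacketInBlock, int dataLen) {
    // 4-byte packet length, 8-byte offset, 8-byte seqno, 1-byte flag, 4-byte data length
    ByteBuffer buf = ByteBuffer.allocate(4 + 8 + 8 + 1 + 4);
    buf.putInt(packetLen);                        // payload length: data + checksums + 4
    buf.putLong(offsetInBlock);                   // where this packet starts in the block
    buf.putLong(seqno);                           // sequence number acked by PacketResponder
    buf.put((byte) (lastPacketInBlock ? 1 : 0));  // last-packet-in-block flag
    buf.putInt(dataLen);                          // number of data bytes after the checksums
    buf.flip();
    return buf;
  }
}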

+ 396 - 0
src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java

@@ -0,0 +1,396 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.SocketOutputStream;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Reads a block from the disk and sends it to a recipient.
+ */
+class BlockSender implements java.io.Closeable, FSConstants {
+  public static final Log LOG = DataNode.LOG;
+  
+  private Block block; // the block to read from
+  private InputStream blockIn; // data stream
+  private long blockInPosition = -1; // updated while using transferTo().
+  private DataInputStream checksumIn; // checksum datastream
+  private DataChecksum checksum; // checksum stream
+  private long offset; // starting position to read
+  private long endOffset; // ending position
+  private long blockLength;
+  private int bytesPerChecksum; // chunk size
+  private int checksumSize; // checksum size
+  private boolean corruptChecksumOk; // if need to verify checksum
+  private boolean chunkOffsetOK; // if need to send chunk offset
+  private long seqno; // sequence number of packet
+
+  private boolean transferToAllowed = true;
+  private boolean blockReadFully; //set when the whole block is read
+  private boolean verifyChecksum; //if true, check is verified while reading
+  private BlockTransferThrottler throttler;
+  
+  /**
+   * Minimum buffer used while sending data to clients. Used only if
+   * transferTo() is enabled. 64KB is not that large. It could be larger, but
+   * not sure if there will be much more improvement.
+   */
+  private static final int MIN_BUFFER_WITH_TRANSFERTO = 64*1024;
+
+  
+  BlockSender(Block block, long startOffset, long length,
+              boolean corruptChecksumOk, boolean chunkOffsetOK,
+              boolean verifyChecksum, DataNode datanode) throws IOException {
+
+    try {
+      this.block = block;
+      this.chunkOffsetOK = chunkOffsetOK;
+      this.corruptChecksumOk = corruptChecksumOk;
+      this.verifyChecksum = verifyChecksum;
+      this.blockLength = datanode.data.getLength(block);
+      this.transferToAllowed = datanode.transferToAllowed;
+
+      if ( !corruptChecksumOk || datanode.data.metaFileExists(block) ) {
+        checksumIn = new DataInputStream(
+                new BufferedInputStream(datanode.data.getMetaDataInputStream(block),
+                                        BUFFER_SIZE));
+
+        // read and handle the common header here. For now just a version
+       BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
+       short version = header.getVersion();
+
+        if (version != FSDataset.METADATA_VERSION) {
+          LOG.warn("Wrong version (" + version + ") for metadata file for "
+              + block + " ignoring ...");
+        }
+        checksum = header.getChecksum();
+      } else {
+        LOG.warn("Could not find metadata file for " + block);
+        // This only decides the buffer size. Use BUFFER_SIZE?
+        checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_NULL,
+            16 * 1024);
+      }
+
+      /* If bytesPerChecksum is very large, then the metadata file
+       * is most likely corrupted. For now just truncate bytesPerChecksum to
+       * blockLength.
+       */        
+      bytesPerChecksum = checksum.getBytesPerChecksum();
+      if (bytesPerChecksum > 10*1024*1024 && bytesPerChecksum > blockLength){
+        checksum = DataChecksum.newDataChecksum(checksum.getChecksumType(),
+                                   Math.max((int)blockLength, 10*1024*1024));
+        bytesPerChecksum = checksum.getBytesPerChecksum();        
+      }
+      checksumSize = checksum.getChecksumSize();
+
+      if (length < 0) {
+        length = blockLength;
+      }
+
+      endOffset = blockLength;
+      if (startOffset < 0 || startOffset > endOffset
+          || (length + startOffset) > endOffset) {
+        String msg = " Offset " + startOffset + " and length " + length
+        + " don't match block " + block + " ( blockLen " + endOffset + " )";
+        LOG.warn(datanode.dnRegistration + ":sendBlock() : " + msg);
+        throw new IOException(msg);
+      }
+
+      
+      offset = (startOffset - (startOffset % bytesPerChecksum));
+      if (length >= 0) {
+        // Make sure endOffset points to end of a checksumed chunk.
+        long tmpLen = startOffset + length + (startOffset - offset);
+        if (tmpLen % bytesPerChecksum != 0) {
+          tmpLen += (bytesPerChecksum - tmpLen % bytesPerChecksum);
+        }
+        if (tmpLen < endOffset) {
+          endOffset = tmpLen;
+        }
+      }
+
+      // seek to the right offsets
+      if (offset > 0) {
+        long checksumSkip = (offset / bytesPerChecksum) * checksumSize;
+        // note blockInStream is  seeked when created below
+        if (checksumSkip > 0) {
+          // Should we use seek() for checksum file as well?
+          IOUtils.skipFully(checksumIn, checksumSkip);
+        }
+      }
+      seqno = 0;
+
+      blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
+    } catch (IOException ioe) {
+      IOUtils.closeStream(this);
+      IOUtils.closeStream(blockIn);
+      throw ioe;
+    }
+  }
+
+  /**
+   * close opened files.
+   */
+  public void close() throws IOException {
+    IOException ioe = null;
+    // close checksum file
+    if(checksumIn!=null) {
+      try {
+        checksumIn.close();
+      } catch (IOException e) {
+        ioe = e;
+      }
+      checksumIn = null;
+    }
+    // close data file
+    if(blockIn!=null) {
+      try {
+        blockIn.close();
+      } catch (IOException e) {
+        ioe = e;
+      }
+      blockIn = null;
+    }
+    // throw IOException if there is any
+    if(ioe!= null) {
+      throw ioe;
+    }
+  }
+
+  /**
+   * Sends up to maxChunks chunks of data.
+   * 
+   * When blockInPosition is >= 0, assumes 'out' is a 
+   * {@link SocketOutputStream} and tries 
+   * {@link SocketOutputStream#transferToFully(FileChannel, long, int)} to
+   * send data (and updates blockInPosition).
+   */
+  private int sendChunks(ByteBuffer pkt, int maxChunks, OutputStream out) 
+                         throws IOException {
+    // Sends multiple chunks in one packet with a single write().
+
+    int len = Math.min((int) (endOffset - offset),
+                       bytesPerChecksum*maxChunks);
+    if (len == 0) {
+      return 0;
+    }
+
+    int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum;
+    int packetLen = len + numChunks*checksumSize + 4;
+    pkt.clear();
+    
+    // write packet header
+    pkt.putInt(packetLen);
+    pkt.putLong(offset);
+    pkt.putLong(seqno);
+    pkt.put((byte)((offset + len >= endOffset) ? 1 : 0));
+               //why no ByteBuf.putBoolean()?
+    pkt.putInt(len);
+    
+    int checksumOff = pkt.position();
+    int checksumLen = numChunks * checksumSize;
+    byte[] buf = pkt.array();
+    
+    if (checksumSize > 0 && checksumIn != null) {
+      try {
+        checksumIn.readFully(buf, checksumOff, checksumLen);
+      } catch (IOException e) {
+        LOG.warn(" Could not read or failed to verify checksum for data" +
+                 " at offset " + offset + " for block " + block + " got : "
+                 + StringUtils.stringifyException(e));
+        IOUtils.closeStream(checksumIn);
+        checksumIn = null;
+        if (corruptChecksumOk) {
+          // Just fill the array with zeros.
+          Arrays.fill(buf, checksumOff, checksumLen, (byte) 0);
+        } else {
+          throw e;
+        }
+      }
+    }
+    
+    int dataOff = checksumOff + checksumLen;
+    
+    if (blockInPosition < 0) {
+      //normal transfer
+      IOUtils.readFully(blockIn, buf, dataOff, len);
+
+      if (verifyChecksum) {
+        int dOff = dataOff;
+        int cOff = checksumOff;
+        int dLeft = len;
+
+        for (int i=0; i<numChunks; i++) {
+          checksum.reset();
+          int dLen = Math.min(dLeft, bytesPerChecksum);
+          checksum.update(buf, dOff, dLen);
+          if (!checksum.compare(buf, cOff)) {
+            throw new ChecksumException("Checksum failed at " + 
+                                        (offset + len - dLeft), len);
+          }
+          dLeft -= dLen;
+          dOff += dLen;
+          cOff += checksumSize;
+        }
+      }
+      //writing is done below (mainly to handle IOException)
+    }
+    
+    try {
+      if (blockInPosition >= 0) {
+        //use transferTo(). Checks on out and blockIn are already done. 
+
+        SocketOutputStream sockOut = (SocketOutputStream)out;
+        //first write the packet
+        sockOut.write(buf, 0, dataOff);
+        // no need to flush. since we know out is not a buffered stream. 
+
+        sockOut.transferToFully(((FileInputStream)blockIn).getChannel(), 
+                                blockInPosition, len);
+
+        blockInPosition += len;
+      } else {
+        // normal transfer
+        out.write(buf, 0, dataOff + len);
+      }
+      
+    } catch (IOException e) {
+      /* exception while writing to the client (well, with transferTo(),
+       * it could also be while reading from the local file). Many times 
+       * this error can be ignored. We will let the callers distinguish this 
+       * from other exceptions if this is not a subclass of IOException. 
+       */
+      if (e.getClass().equals(IOException.class)) {
+        // "se" could be a new class instead of SocketException.
+        IOException se = new SocketException("Original Exception : " + e);
+        se.initCause(e);
+        /* Change the stack trace so that the original trace is not truncated
+         * when printed.*/ 
+        se.setStackTrace(e.getStackTrace());
+        throw se;
+      }
+      throw e;
+    }
+
+    if (throttler != null) { // rebalancing so throttle
+      throttler.throttle(packetLen);
+    }
+
+    return len;
+  }
+
+  /**
+   * sendBlock() is used to read block and its metadata and stream the data to
+   * either a client or to another datanode. 
+   * 
+   * @param out  stream to which the block is written
+   * @param baseStream optional. if non-null, <code>out</code> is assumed to 
+   *        be a wrapper over this stream. This enables optimizations for
+   *        sending the data, e.g. 
+   *        {@link SocketOutputStream#transferToFully(FileChannel, 
+   *        long, int)}.
+   * @param throttler for sending data.
+   * @return total bytes read, including crc.
+   */
+  long sendBlock(DataOutputStream out, OutputStream baseStream, 
+                 BlockTransferThrottler throttler) throws IOException {
+    if( out == null ) {
+      throw new IOException( "out stream is null" );
+    }
+    this.throttler = throttler;
+
+    long initialOffset = offset;
+    long totalRead = 0;
+    OutputStream streamForSendChunks = out;
+    
+    try {
+      checksum.writeHeader(out);
+      if ( chunkOffsetOK ) {
+        out.writeLong( offset );
+      }
+      out.flush();
+      
+      int maxChunksPerPacket;
+      int pktSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
+      
+      if (transferToAllowed && !verifyChecksum && 
+          baseStream instanceof SocketOutputStream && 
+          blockIn instanceof FileInputStream) {
+        
+        FileChannel fileChannel = ((FileInputStream)blockIn).getChannel();
+        
+        // blockInPosition also indicates sendChunks() uses transferTo.
+        blockInPosition = fileChannel.position();
+        streamForSendChunks = baseStream;
+        
+        // ensure a minimum buffer size.
+        maxChunksPerPacket = (Math.max(BUFFER_SIZE, 
+                                       MIN_BUFFER_WITH_TRANSFERTO)
+                              + bytesPerChecksum - 1)/bytesPerChecksum;
+        
+        // allocate smaller buffer while using transferTo(). 
+        pktSize += checksumSize * maxChunksPerPacket;
+      } else {
+        maxChunksPerPacket = Math.max(1,
+                 (BUFFER_SIZE + bytesPerChecksum - 1)/bytesPerChecksum);
+        pktSize += (bytesPerChecksum + checksumSize) * maxChunksPerPacket;
+      }
+
+      ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);
+
+      while (endOffset > offset) {
+        long len = sendChunks(pktBuf, maxChunksPerPacket, 
+                              streamForSendChunks);
+        offset += len;
+        totalRead += len + ((len + bytesPerChecksum - 1)/bytesPerChecksum*
+                            checksumSize);
+        seqno++;
+      }
+      out.writeInt(0); // mark the end of block        
+      out.flush();
+    } finally {
+      close();
+    }
+
+    blockReadFully = (initialOffset == 0 && offset >= blockLength);
+
+    return totalRead;
+  }
+  
+  boolean isBlockReadFully() {
+    return blockReadFully;
+  }
+}
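
Both receivePacket() in BlockReceiver and sendChunks() above size the checksum region of a packet with the same ceiling arithmetic. The following is a small illustrative sketch of that calculation, not part of the patch; the class and method names are hypothetical.

// Hypothetical sketch of the per-packet checksum arithmetic used above.
public class ChecksumMathSketch {
  /** Bytes of checksum that accompany dataLen bytes of block data. */
  static int checksumBytes(int dataLen, int bytesPerChecksum, int checksumSize) {
    int numChunks = (dataLen + bytesPerChecksum - 1) / bytesPerChecksum; // ceil(dataLen / bytesPerChecksum)
    return numChunks * checksumSize;
  }

  public static void main(String[] args) {
    // e.g. 64KB of data, 512-byte chunks, 4-byte CRC32 per chunk -> 128 chunks, 512 checksum bytes
    System.out.println(checksumBytes(64 * 1024, 512, 4));
  }
}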

+ 111 - 0
src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockTransferThrottler.java

@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+/** 
+ * A class to throttle block transfers.
+ * This class is thread safe. It can be shared by multiple threads.
+ * The parameter bandwidthPerSec specifies the total bandwidth shared by
+ * threads.
+ */
+class BlockTransferThrottler {
+  private long period;          // period over which bw is imposed
+  private long periodExtension; // Max period over which bw accumulates.
+  private long bytesPerPeriod; // total number of bytes that can be sent in each period
+  private long curPeriodStart; // current period starting time
+  private long curReserve;     // remaining bytes that can be sent in this period
+  private long bytesAlreadyUsed;
+
+  /** Constructor 
+   * @param bandwidthPerSec bandwidth allowed in bytes per second. 
+   */
+  BlockTransferThrottler(long bandwidthPerSec) {
+    this(500, bandwidthPerSec);  // by default throttling period is 500ms 
+  }
+
+  /**
+   * Constructor
+   * @param period in milliseconds. Bandwidth is enforced over this
+   *        period.
+   * @param bandwidthPerSec bandwidth allowed in bytes per second. 
+   */
+  BlockTransferThrottler(long period, long bandwidthPerSec) {
+    this.curPeriodStart = System.currentTimeMillis();
+    this.period = period;
+    this.curReserve = this.bytesPerPeriod = bandwidthPerSec*period/1000;
+    this.periodExtension = period*3;
+  }
+
+  /**
+   * @return current throttle bandwidth in bytes per second.
+   */
+  synchronized long getBandwidth() {
+    return bytesPerPeriod*1000/period;
+  }
+  
+  /**
+   * Sets throttle bandwidth. This takes effect at the latest by the end of
+   * the current period.
+   * 
+   * @param bytesPerSecond 
+   */
+  synchronized void setBandwidth(long bytesPerSecond) {
+    if ( bytesPerSecond <= 0 ) {
+      throw new IllegalArgumentException("" + bytesPerSecond);
+    }
+    bytesPerPeriod = bytesPerSecond*period/1000;
+  }
+  
+  /** Given the numOfBytes sent/received since last time throttle was called,
+   * make the current thread sleep if I/O rate is too fast
+   * compared to the given bandwidth.
+   *
+   * @param numOfBytes
+   *     number of bytes sent/received since last time throttle was called
+   */
+  synchronized void throttle(long numOfBytes) {
+    if ( numOfBytes <= 0 ) {
+      return;
+    }
+
+    curReserve -= numOfBytes;
+    bytesAlreadyUsed += numOfBytes;
+
+    while (curReserve <= 0) {
+      long now = System.currentTimeMillis();
+      long curPeriodEnd = curPeriodStart + period;
+
+      if ( now < curPeriodEnd ) {
+        // Wait for next period so that curReserve can be increased.
+        try {
+          wait( curPeriodEnd - now );
+        } catch (InterruptedException ignored) {}
+      } else if ( now <  (curPeriodStart + periodExtension)) {
+        curPeriodStart = curPeriodEnd;
+        curReserve += bytesPerPeriod;
+      } else {
+        // discard the prev period. Throttler might not have
+        // been used for a long time.
+        curPeriodStart = now;
+        curReserve = bytesPerPeriod - bytesAlreadyUsed;
+      }
+    }
+
+    bytesAlreadyUsed -= numOfBytes;
+  }
+}
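
A usage sketch, assuming caller code lives in the same package (the class is package-private). It mirrors how DataBlockScanner constructs a throttler in the hunk below; the loop body and all names introduced here are illustrative only, not part of the patch.

package org.apache.hadoop.hdfs.server.datanode;

// Hypothetical caller, for illustration only.
public class ThrottlerUsageSketch {
  public static void main(String[] args) {
    // Allow ~1 MB/s, enforced over 200 ms periods (the same constructor
    // DataBlockScanner uses below with MAX_SCAN_RATE).
    BlockTransferThrottler throttler = new BlockTransferThrottler(200, 1024 * 1024);
    byte[] packet = new byte[64 * 1024];
    for (int i = 0; i < 100; i++) {
      // ... transfer or verify 'packet' ...
      throttler.throttle(packet.length); // blocks if we are ahead of the bandwidth budget
    }
  }
}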

+ 4 - 5
src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java

@@ -49,7 +49,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.server.datanode.DataNode.BlockSender;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.StringUtils;
 
@@ -95,7 +94,7 @@ public class DataBlockScanner implements Runnable {
   
   
   Random random = new Random();
   
-  DataNode.Throttler throttler = null;
+  BlockTransferThrottler throttler = null;
   
   private static enum ScanType {
     REMOTE_READ,           // Verified when a block read by a client etc
@@ -239,7 +238,7 @@ public class DataBlockScanner implements Runnable {
     }
     
     synchronized (this) {
-      throttler = new DataNode.Throttler(200, MAX_SCAN_RATE);
+      throttler = new BlockTransferThrottler(200, MAX_SCAN_RATE);
     }
   }
 
@@ -424,8 +423,8 @@ public class DataBlockScanner implements Runnable {
       try {
         adjustThrottler();
         
-        blockSender = datanode.new BlockSender(block, 0, -1, false, 
-                                               false, true);
+        blockSender = new BlockSender(block, 0, -1, false, 
+                                               false, true, datanode);
 
         DataOutputStream out = 
                 new DataOutputStream(new IOUtils.NullOutputStream());
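
Taken together, the scanner-side call sequence after this change is roughly the following. This is an illustrative sketch, not code from the patch; it assumes only the BlockSender constructor and sendBlock() signature shown earlier, and the ScanSketch/verifyBlock names are hypothetical.

package org.apache.hadoop.hdfs.server.datanode;

import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.io.IOUtils;

// Hypothetical helper showing how the scan drives BlockSender after this patch.
class ScanSketch {
  static void verifyBlock(DataNode datanode, Block block,
                          BlockTransferThrottler throttler) throws IOException {
    // Read the whole block with checksum verification enabled (verifyChecksum == true).
    BlockSender blockSender = new BlockSender(block, 0, -1, false, false, true, datanode);
    // Discard the bytes; the scan only cares whether verification succeeds.
    DataOutputStream out = new DataOutputStream(new IOUtils.NullOutputStream());
    // sendBlock() throttles via the given throttler and closes its streams in a finally block.
    blockSender.sendBlock(out, null, throttler);
  }
}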

File diffs are limited because too many files have changed
+ 66 - 1984
src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java


+ 571 - 0
src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -0,0 +1,571 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.MetaDataInputStream;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Thread for processing incoming/outgoing data stream.
+ */
+class DataXceiver implements Runnable, FSConstants {
+  public static final Log LOG = DataNode.LOG;
+  
+  Socket s;
+  String remoteAddress; // address of remote side
+  String localAddress;  // local address of this daemon
+  DataNode datanode;
+  DataXceiverServer dataXceiverServer;
+  
+  public DataXceiver(Socket s, DataNode datanode, 
+      DataXceiverServer dataXceiverServer) {
+    
+    this.s = s;
+    this.datanode = datanode;
+    this.dataXceiverServer = dataXceiverServer;
+    InetSocketAddress isock = (InetSocketAddress)s.getRemoteSocketAddress();
+    remoteAddress = isock.toString();
+    localAddress = s.getInetAddress() + ":" + s.getLocalPort();
+    LOG.debug("Number of active connections is: " + datanode.getXceiverCount());
+  }
+
+  /**
+   * Read/write data from/to the DataXceiverServer.
+   */
+  public void run() {
+    DataInputStream in=null; 
+    try {
+      in = new DataInputStream(
+          new BufferedInputStream(NetUtils.getInputStream(s), 
+                                  SMALL_BUFFER_SIZE));
+      short version = in.readShort();
+      if ( version != DATA_TRANSFER_VERSION ) {
+        throw new IOException( "Version Mismatch" );
+      }
+      boolean local = s.getInetAddress().equals(s.getLocalAddress());
+      byte op = in.readByte();
+      // Make sure the xceiver count is not exceeded
+      int curXceiverCount = datanode.getXceiverCount();
+      if (curXceiverCount > dataXceiverServer.maxXceiverCount) {
+        throw new IOException("xceiverCount " + curXceiverCount
+                              + " exceeds the limit of concurrent xcievers "
+                              + dataXceiverServer.maxXceiverCount);
+      }
+      long startTime = DataNode.now();
+      switch ( op ) {
+      case OP_READ_BLOCK:
+        readBlock( in );
+        datanode.myMetrics.readBlockOp.inc(DataNode.now() - startTime);
+        if (local)
+          datanode.myMetrics.readsFromLocalClient.inc();
+        else
+          datanode.myMetrics.readsFromRemoteClient.inc();
+        break;
+      case OP_WRITE_BLOCK:
+        writeBlock( in );
+        datanode.myMetrics.writeBlockOp.inc(DataNode.now() - startTime);
+        if (local)
+          datanode.myMetrics.writesFromLocalClient.inc();
+        else
+          datanode.myMetrics.writesFromRemoteClient.inc();
+        break;
+      case OP_READ_METADATA:
+        readMetadata( in );
+        datanode.myMetrics.readMetadataOp.inc(DataNode.now() - startTime);
+        break;
+      case OP_REPLACE_BLOCK: // for balancing purpose; send to a destination
+        replaceBlock(in);
+        datanode.myMetrics.replaceBlockOp.inc(DataNode.now() - startTime);
+        break;
+      case OP_COPY_BLOCK: // for balancing purpose; send to a proxy source
+        copyBlock(in);
+        datanode.myMetrics.copyBlockOp.inc(DataNode.now() - startTime);
+        break;
+      default:
+        throw new IOException("Unknown opcode " + op + " in data stream");
+      }
+    } catch (Throwable t) {
+      LOG.error(datanode.dnRegistration + ":DataXceiver",t);
+    } finally {
+      LOG.debug(datanode.dnRegistration + ":Number of active connections is: "
+                               + datanode.getXceiverCount());
+      IOUtils.closeStream(in);
+      IOUtils.closeSocket(s);
+      dataXceiverServer.childSockets.remove(s);
+    }
+  }
+
+  /**
+   * Read a block from the disk.
+   * @param in The stream to read from
+   * @throws IOException
+   */
+  private void readBlock(DataInputStream in) throws IOException {
+    //
+    // Read in the header
+    //
+    long blockId = in.readLong();          
+    Block block = new Block( blockId, 0 , in.readLong());
+
+    long startOffset = in.readLong();
+    long length = in.readLong();
+
+    // send the block
+    OutputStream baseStream = NetUtils.getOutputStream(s, 
+        datanode.socketWriteTimeout);
+    DataOutputStream out = new DataOutputStream(
+                 new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
+    
+    BlockSender blockSender = null;
+    try {
+      try {
+        blockSender = new BlockSender(block, startOffset, length, 
+                                      true, true, false, datanode);
+      } catch(IOException e) {
+        out.writeShort(OP_STATUS_ERROR);
+        throw e;
+      }
+
+      out.writeShort(DataNode.OP_STATUS_SUCCESS); // send op status
+      long read = blockSender.sendBlock(out, baseStream, null); // send data
+
+      if (blockSender.isBlockReadFully()) {
+        // See if client verification succeeded. 
+        // This is an optional response from client.
+        try {
+          if (in.readShort() == OP_STATUS_CHECKSUM_OK  && 
+              datanode.blockScanner != null) {
+            datanode.blockScanner.verifiedByClient(block);
+          }
+        } catch (IOException ignored) {}
+      }
+      
+      datanode.myMetrics.bytesRead.inc((int) read);
+      datanode.myMetrics.blocksRead.inc();
+      LOG.info(datanode.dnRegistration + " Served block " + block + " to " + 
+          s.getInetAddress());
+    } catch ( SocketException ignored ) {
+      // Its ok for remote side to close the connection anytime.
+      datanode.myMetrics.blocksRead.inc();
+    } catch ( IOException ioe ) {
+      /* What exactly should we do here?
+       * Earlier version shutdown() datanode if there is disk error.
+       */
+      LOG.warn(datanode.dnRegistration +  ":Got exception while serving " + 
+          block + " to " +
+                s.getInetAddress() + ":\n" + 
+                StringUtils.stringifyException(ioe) );
+      throw ioe;
+    } finally {
+      IOUtils.closeStream(out);
+      IOUtils.closeStream(blockSender);
+    }
+  }
+
+  /**
+   * Write a block to disk.
+   * 
+   * @param in The stream to read from
+   * @throws IOException
+   */
+  private void writeBlock(DataInputStream in) throws IOException {
+    DatanodeInfo srcDataNode = null;
+    LOG.debug("writeBlock receive buf size " + s.getReceiveBufferSize() +
+              " tcp no delay " + s.getTcpNoDelay());
+    //
+    // Read in the header
+    //
+    Block block = new Block(in.readLong(), 
+        dataXceiverServer.estimateBlockSize, in.readLong());
+    LOG.info("Receiving block " + block + 
+             " src: " + remoteAddress +
+             " dest: " + localAddress);
+    int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
+    boolean isRecovery = in.readBoolean(); // is this part of recovery?
+    String client = Text.readString(in); // working on behalf of this client
+    boolean hasSrcDataNode = in.readBoolean(); // is src node info present
+    if (hasSrcDataNode) {
+      srcDataNode = new DatanodeInfo();
+      srcDataNode.readFields(in);
+    }
+    int numTargets = in.readInt();
+    if (numTargets < 0) {
+      throw new IOException("Mislabelled incoming datastream.");
+    }
+    DatanodeInfo targets[] = new DatanodeInfo[numTargets];
+    for (int i = 0; i < targets.length; i++) {
+      DatanodeInfo tmp = new DatanodeInfo();
+      tmp.readFields(in);
+      targets[i] = tmp;
+    }
+
+    DataOutputStream mirrorOut = null;  // stream to next target
+    DataInputStream mirrorIn = null;    // reply from next target
+    DataOutputStream replyOut = null;   // stream to prev target
+    Socket mirrorSock = null;           // socket to next target
+    BlockReceiver blockReceiver = null; // responsible for data handling
+    String mirrorNode = null;           // the name:port of next target
+    String firstBadLink = "";           // first datanode that failed in connection setup
+    try {
+      // open a block receiver and check if the block does not exist
+      blockReceiver = new BlockReceiver(block, in, 
+          s.getInetAddress().toString(), isRecovery, client, srcDataNode,
+          datanode);
+
+      // get a connection back to the previous target
+      replyOut = new DataOutputStream(
+                     NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
+
+      //
+      // Open network conn to backup machine, if 
+      // appropriate
+      //
+      if (targets.length > 0) {
+        InetSocketAddress mirrorTarget = null;
+        // Connect to backup machine
+        mirrorNode = targets[0].getName();
+        mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
+        mirrorSock = datanode.newSocket();
+        try {
+          int timeoutValue = numTargets * datanode.socketTimeout;
+          int writeTimeout = datanode.socketWriteTimeout + 
+                             (WRITE_TIMEOUT_EXTENSION * numTargets);
+          mirrorSock.connect(mirrorTarget, timeoutValue);
+          mirrorSock.setSoTimeout(timeoutValue);
+          mirrorSock.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
+          mirrorOut = new DataOutputStream(
+             new BufferedOutputStream(
+                         NetUtils.getOutputStream(mirrorSock, writeTimeout),
+                         SMALL_BUFFER_SIZE));
+          mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));
+
+          // Write header: Copied from DFSClient.java!
+          mirrorOut.writeShort( DATA_TRANSFER_VERSION );
+          mirrorOut.write( OP_WRITE_BLOCK );
+          mirrorOut.writeLong( block.getBlockId() );
+          mirrorOut.writeLong( block.getGenerationStamp() );
+          mirrorOut.writeInt( pipelineSize );
+          mirrorOut.writeBoolean( isRecovery );
+          Text.writeString( mirrorOut, client );
+          mirrorOut.writeBoolean(hasSrcDataNode);
+          if (hasSrcDataNode) { // pass src node information
+            srcDataNode.write(mirrorOut);
+          }
+          mirrorOut.writeInt( targets.length - 1 );
+          for ( int i = 1; i < targets.length; i++ ) {
+            targets[i].write( mirrorOut );
+          }
+
+          blockReceiver.writeChecksumHeader(mirrorOut);
+          mirrorOut.flush();
+
+          // read connect ack (only for clients, not for replication req)
+          if (client.length() != 0) {
+            firstBadLink = Text.readString(mirrorIn);
+            if (LOG.isDebugEnabled() || firstBadLink.length() > 0) {
+              LOG.info("Datanode " + targets.length +
+                       " got response for connect ack " +
+                       " from downstream datanode with firstbadlink as " +
+                       firstBadLink);
+            }
+          }
+
+        } catch (IOException e) {
+          if (client.length() != 0) {
+            Text.writeString(replyOut, mirrorNode);
+            replyOut.flush();
+          }
+          IOUtils.closeStream(mirrorOut);
+          mirrorOut = null;
+          IOUtils.closeStream(mirrorIn);
+          mirrorIn = null;
+          IOUtils.closeSocket(mirrorSock);
+          mirrorSock = null;
+          if (client.length() > 0) {
+            throw e;
+          } else {
+            LOG.info(datanode.dnRegistration + ":Exception transferring block " +
+                     block + " to mirror " + mirrorNode +
+                     ". continuing without the mirror.\n" +
+                     StringUtils.stringifyException(e));
+          }
+        }
+      }
+
+      // send connect ack back to source (only for clients)
+      if (client.length() != 0) {
+        if (LOG.isDebugEnabled() || firstBadLink.length() > 0) {
+          LOG.info("Datanode " + targets.length +
+                   " forwarding connect ack to upstream firstbadlink is " +
+                   firstBadLink);
+        }
+        Text.writeString(replyOut, firstBadLink);
+        replyOut.flush();
+      }
+
+      // receive the block and mirror to the next target
+      String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
+      blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
+                                 mirrorAddr, null, targets.length);
+
+      // if this write is for a replication request (and not
+      // from a client), then confirm block. For client-writes,
+      // the block is finalized in the PacketResponder.
+      if (client.length() == 0) {
+        datanode.notifyNamenodeReceivedBlock(block, DataNode.EMPTY_DEL_HINT);
+        LOG.info("Received block " + block + 
+                 " src: " + remoteAddress +
+                 " dest: " + localAddress +
+                 " of size " + block.getNumBytes());
+      }
+
+      if (datanode.blockScanner != null) {
+        datanode.blockScanner.addBlock(block);
+      }
+      
+    } catch (IOException ioe) {
+      LOG.info("writeBlock " + block + " received exception " + ioe);
+      throw ioe;
+    } finally {
+      // close all opened streams
+      IOUtils.closeStream(mirrorOut);
+      IOUtils.closeStream(mirrorIn);
+      IOUtils.closeStream(replyOut);
+      IOUtils.closeSocket(mirrorSock);
+      IOUtils.closeStream(blockReceiver);
+    }
+  }
+
+  /**
+   * Reads the metadata and sends the data in one 'DATA_CHUNK'.
+   * @param in
+   */
+  void readMetadata(DataInputStream in) throws IOException {
+    Block block = new Block( in.readLong(), 0 , in.readLong());
+    MetaDataInputStream checksumIn = null;
+    DataOutputStream out = null;
+    
+    try {
+
+      checksumIn = datanode.data.getMetaDataInputStream(block);
+      
+      long fileSize = checksumIn.getLength();
+
+      if (fileSize >= 1L<<31 || fileSize <= 0) {
+          throw new IOException("Unexpected size for checksumFile of block" +
+                  block);
+      }
+
+      byte [] buf = new byte[(int)fileSize];
+      IOUtils.readFully(checksumIn, buf, 0, buf.length);
+      
+      out = new DataOutputStream(
+                NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
+      
+      out.writeByte(OP_STATUS_SUCCESS);
+      out.writeInt(buf.length);
+      out.write(buf);
+      
+      //last DATA_CHUNK
+      out.writeInt(0);
+    } finally {
+      IOUtils.closeStream(out);
+      IOUtils.closeStream(checksumIn);
+    }
+  }
+  
+  /**
+   * Read a block from the disk and then sends it to a destination.
+   * 
+   * @param in The stream to read from
+   * @throws IOException
+   */
+  private void copyBlock(DataInputStream in) throws IOException {
+    // Read in the header
+    long blockId = in.readLong(); // read block id
+    Block block = new Block(blockId, 0, in.readLong());
+    String source = Text.readString(in); // read del hint
+    DatanodeInfo target = new DatanodeInfo(); // read target
+    target.readFields(in);
+
+    Socket targetSock = null;
+    short opStatus = OP_STATUS_SUCCESS;
+    BlockSender blockSender = null;
+    DataOutputStream targetOut = null;
+    try {
+      datanode.balancingSem.acquireUninterruptibly();
+      
+      // check if the block exists or not
+      blockSender = new BlockSender(block, 0, -1, false, false, false, 
+          datanode);
+
+      // get the output stream to the target
+      InetSocketAddress targetAddr = NetUtils.createSocketAddr(
+          target.getName());
+      targetSock = datanode.newSocket();
+      targetSock.connect(targetAddr, datanode.socketTimeout);
+      targetSock.setSoTimeout(datanode.socketTimeout);
+
+      OutputStream baseStream = NetUtils.getOutputStream(targetSock, 
+          datanode.socketWriteTimeout);
+      targetOut = new DataOutputStream(
+                     new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
+
+      /* send request to the target */
+      // first write header info
+      targetOut.writeShort(DATA_TRANSFER_VERSION); // transfer version
+      targetOut.writeByte(OP_REPLACE_BLOCK); // op code
+      targetOut.writeLong(block.getBlockId()); // block id
+      targetOut.writeLong(block.getGenerationStamp()); // generation stamp
+      Text.writeString( targetOut, source); // del hint
+
+      // then send data
+      long read = blockSender.sendBlock(targetOut, baseStream, 
+                                        datanode.balancingThrottler);
+
+      datanode.myMetrics.bytesRead.inc((int) read);
+      datanode.myMetrics.blocksRead.inc();
+      
+      // check the response from target
+      receiveResponse(targetSock, 1);
+
+      LOG.info("Copied block " + block + " to " + targetAddr);
+    } catch (IOException ioe) {
+      opStatus = OP_STATUS_ERROR;
+      LOG.warn("Got exception while serving " + block + " to "
+          + target.getName() + ": " + StringUtils.stringifyException(ioe));
+      throw ioe;
+    } finally {
+      /* send response to the requester */
+      try {
+        sendResponse(s, opStatus, datanode.socketWriteTimeout);
+      } catch (IOException replyE) {
+        LOG.warn("Error writing the response back to "+
+            s.getRemoteSocketAddress() + "\n" +
+            StringUtils.stringifyException(replyE) );
+      }
+      IOUtils.closeStream(targetOut);
+      IOUtils.closeStream(blockSender);
+      datanode.balancingSem.release();
+    }
+  }
+
+  /**
+   * Receive a block and write it to disk, it then notifies the namenode to
+   * remove the copy from the source.
+   * 
+   * @param in The stream to read from
+   * @throws IOException
+   */
+  private void replaceBlock(DataInputStream in) throws IOException {
+    datanode.balancingSem.acquireUninterruptibly();
+
+    /* read header */
+    Block block = new Block(in.readLong(), dataXceiverServer.estimateBlockSize,
+        in.readLong()); // block id & len
+    String sourceID = Text.readString(in);
+
+    short opStatus = OP_STATUS_SUCCESS;
+    BlockReceiver blockReceiver = null;
+    try {
+      // open a block receiver and check if the block does not exist
+       blockReceiver = new BlockReceiver(
+          block, in, s.getRemoteSocketAddress().toString(), false, "", null,
+          datanode);
+
+      // receive a block
+      blockReceiver.receiveBlock(null, null, null, null, 
+          datanode.balancingThrottler, -1);
+                    
+      // notify name node
+      datanode.notifyNamenodeReceivedBlock(block, sourceID);
+
+      LOG.info("Moved block " + block + 
+          " from " + s.getRemoteSocketAddress());
+    } catch (IOException ioe) {
+      opStatus = OP_STATUS_ERROR;
+      throw ioe;
+    } finally {
+      // send response back
+      try {
+        sendResponse(s, opStatus, datanode.socketWriteTimeout);
+      } catch (IOException ioe) {
+        LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress());
+      }
+      IOUtils.closeStream(blockReceiver);
+      datanode.balancingSem.release();
+    }
+  }
+  
+  /**
+   *  Utility function for receiving a response.
+   *  @param s socket to read from
+   *  @param numTargets number of responses to read
+   **/
+  private void receiveResponse(Socket s, int numTargets) throws IOException {
+    // check the response
+    DataInputStream reply = new DataInputStream(new BufferedInputStream(
+                                NetUtils.getInputStream(s), BUFFER_SIZE));
+    try {
+      for (int i = 0; i < numTargets; i++) {
+        short opStatus = reply.readShort();
+        if(opStatus != OP_STATUS_SUCCESS) {
+          throw new IOException("operation failed at "+
+              s.getInetAddress());
+        } 
+      }
+    } finally {
+      IOUtils.closeStream(reply);
+    }
+  }
+
+  /**
+   * Utility function for sending a response.
+   * @param s socket to write to
+   * @param opStatus status message to write
+   * @param timeout send timeout
+   **/
+  private void sendResponse(Socket s, short opStatus, long timeout) 
+                                                       throws IOException {
+    DataOutputStream reply = 
+      new DataOutputStream(NetUtils.getOutputStream(s, timeout));
+    try {
+      reply.writeShort(opStatus);
+      reply.flush();
+    } finally {
+      IOUtils.closeStream(reply);
+    }
+  }
+}
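For reference, the request that readBlock() above parses is just a short header on a raw TCP stream: the transfer version and opcode checked in run(), followed by the block id, generation stamp, start offset and length. The sketch below writes such a header by hand; the host, port and block numbers are made-up values, the -1 length mirrors how BlockSender is invoked elsewhere in this patch to mean "the whole block", and a real client (DFSClient) does considerably more work around this exchange.

    import java.io.BufferedOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.Socket;

    import org.apache.hadoop.hdfs.protocol.FSConstants;

    // Illustrative sketch only: a hand-rolled OP_READ_BLOCK request header.
    public class ReadBlockRequestSketch {
      public static void main(String[] args) throws IOException {
        Socket s = new Socket();
        s.connect(new InetSocketAddress("datanode.example.com", 50010), 10000);
        DataOutputStream out = new DataOutputStream(
            new BufferedOutputStream(s.getOutputStream()));
        out.writeShort(FSConstants.DATA_TRANSFER_VERSION); // version checked in run()
        out.writeByte(FSConstants.OP_READ_BLOCK);          // opcode dispatched in run()
        out.writeLong(1234L);                              // block id (made up)
        out.writeLong(1L);                                 // generation stamp (made up)
        out.writeLong(0L);                                 // start offset within the block
        out.writeLong(-1L);                                // length; whole block in this sketch
        out.flush();
        // The datanode answers with an OP_STATUS_* short followed by the block data.
        s.close();
      }
    }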

+ 128 - 0
src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java

@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Server used for receiving/sending a block of data.
+ * This is created to listen for requests from clients or 
+ * other DataNodes.  This small server does not use the 
+ * Hadoop IPC mechanism.
+ */
+class DataXceiverServer implements Runnable, FSConstants {
+  public static final Log LOG = DataNode.LOG;
+  
+  ServerSocket ss;
+  DataNode datanode;
+  // Record all sockets opened for data transfer
+  Map<Socket, Socket> childSockets = Collections.synchronizedMap(
+                                       new HashMap<Socket, Socket>());
+  
+  /**
+   * Maximal number of concurrent xceivers per node.
+   * Enforcing the limit is required in order to avoid data-node
+   * running out of memory.
+   */
+  static final int MAX_XCEIVER_COUNT = 256;
+  int maxXceiverCount = MAX_XCEIVER_COUNT;
+
+  /**
+   * We need an estimate for block size to check if the disk partition has
+   * enough space. For now we set it to be the default block size set
+   * in the server side configuration, which is not ideal because the
+   * default block size should be a client-side configuration. 
+   * A better solution is to include in the header the estimated block size,
+   * i.e. either the actual block size or the default block size.
+   */
+  long estimateBlockSize;
+  
+  
+  DataXceiverServer(ServerSocket ss, Configuration conf, 
+      DataNode datanode) {
+    
+    this.ss = ss;
+    this.datanode = datanode;
+    
+    this.maxXceiverCount = conf.getInt("dfs.datanode.max.xcievers",
+        MAX_XCEIVER_COUNT);
+    
+    this.estimateBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+  }
+
+  /**
+   */
+  public void run() {
+    while (datanode.shouldRun) {
+      try {
+        Socket s = ss.accept();
+        s.setTcpNoDelay(true);
+        new Daemon(datanode.threadGroup, 
+            new DataXceiver(s, datanode, this)).start();
+      } catch (IOException ie) {
+        LOG.warn(datanode.dnRegistration + ":DataXceiveServer: " 
+                                + StringUtils.stringifyException(ie));
+      } catch (Throwable te) {
+        LOG.error(datanode.dnRegistration + ":DataXceiveServer: Exiting due to:" 
+                                 + StringUtils.stringifyException(te));
+        datanode.shouldRun = false;
+      }
+    }
+    try {
+      ss.close();
+    } catch (IOException ie) {
+      LOG.warn(datanode.dnRegistration + ":DataXceiveServer: " 
+                              + StringUtils.stringifyException(ie));
+    }
+  }
+  
+  void kill() {
+    assert datanode.shouldRun == false :
+      "shoudRun should be set to false before killing";
+    try {
+      this.ss.close();
+    } catch (IOException ie) {
+      LOG.warn(datanode.dnRegistration + ":DataXceiveServer.kill(): " 
+                              + StringUtils.stringifyException(ie));
+    }
+
+    // close all the sockets that were accepted earlier
+    synchronized (childSockets) {
+      for (Iterator<Socket> it = childSockets.values().iterator();
+           it.hasNext();) {
+        Socket thissock = it.next();
+        try {
+          thissock.close();
+        } catch (IOException e) {
+        }
+      }
+    }
+  }
+}
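How DataNode itself constructs and starts this server is not visible here, since the DataNode.java diff is truncated above. The following is therefore only a hedged sketch of wiring the class up on its own terms: bind a ServerSocket, hand it to the constructor shown above, and run the server on a Daemon in the datanode's thread group, the same Daemon/threadGroup pattern run() uses for individual DataXceivers. The helper class name and the explicit port parameter are inventions for illustration.

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.ServerSocket;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.Daemon;

    // Illustrative sketch only; assumes it lives in
    // org.apache.hadoop.hdfs.server.datanode so the package-private
    // DataXceiverServer class and DataNode.threadGroup field are accessible.
    class DataXceiverServerWiringSketch {
      static Daemon start(DataNode datanode, Configuration conf, int port)
          throws IOException {
        ServerSocket ss = new ServerSocket();
        ss.bind(new InetSocketAddress(port));
        DataXceiverServer server = new DataXceiverServer(ss, conf, datanode);
        Daemon d = new Daemon(datanode.threadGroup, server);
        d.start();
        return d;
      }
    }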

+ 10 - 9
src/test/org/apache/hadoop/hdfs/TestBlockReplacement.java → src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs;
+package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -27,24 +27,25 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
 
+import junit.framework.TestCase;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.common.Util;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.datanode.BlockTransferThrottler;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
-
-
-import junit.framework.TestCase;
 /**
  * This class tests if block replacement request to data nodes work correctly.
  */
@@ -60,7 +61,7 @@ public class TestBlockReplacement extends TestCase {
     final long TOTAL_BYTES =6*bandwidthPerSec; 
     long bytesToSend = TOTAL_BYTES; 
     long start = Util.now();
-    DataNode.Throttler throttler = new DataNode.Throttler(bandwidthPerSec);
+    BlockTransferThrottler throttler = new BlockTransferThrottler(bandwidthPerSec);
    long totalBytes = 0L;
    long bytesSent = 1024*512L; // 0.5MB
    throttler.throttle(bytesSent);

Some files were not shown because too many files have changed