|
@@ -21,6 +21,7 @@ import org.apache.commons.logging.*;
|
|
|
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.io.IOUtils;
|
|
|
+import org.apache.hadoop.io.Text;
|
|
|
import org.apache.hadoop.ipc.*;
|
|
|
import org.apache.hadoop.conf.*;
|
|
|
import org.apache.hadoop.metrics.MetricsUtil;
|
|
@@ -36,8 +37,10 @@ import org.apache.hadoop.dfs.DatanodeProtocol;
|
|
|
import java.io.*;
|
|
|
import java.net.*;
|
|
|
import java.util.*;
|
|
|
+import java.util.concurrent.Semaphore;
|
|
|
import java.security.NoSuchAlgorithmException;
|
|
|
import java.security.SecureRandom;
|
|
|
+
|
|
|
import org.apache.hadoop.metrics.MetricsContext;
|
|
|
import org.apache.hadoop.metrics.MetricsRecord;
|
|
|
import org.apache.hadoop.metrics.Updater;
|
|
@@ -109,7 +112,9 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
DatanodeRegistration dnRegistration = null;
|
|
|
private String networkLoc;
|
|
|
volatile boolean shouldRun = true;
|
|
|
- LinkedList<Block> receivedBlockList = new LinkedList<Block>();
|
|
|
+ private LinkedList<Block> receivedBlockList = new LinkedList<Block>();
|
|
|
+ private LinkedList<String> delHints = new LinkedList<String>();
|
|
|
+ final private static String EMPTY_DEL_HINT = "";
|
|
|
int xmitsInProgress = 0;
|
|
|
Daemon dataXceiveServer = null;
|
|
|
long blockReportInterval;
|
|
@@ -124,6 +129,13 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
private Thread dataNodeThread = null;
|
|
|
String machineName;
|
|
|
int defaultBytesPerChecksum = 512;
|
|
|
+
|
|
|
+ // The following three fields are to support balancing
|
|
|
+ final private static long BALANCE_BANDWIDTH = 1024L*1024; // 1MB/s
|
|
|
+ final private static short MAX_BALANCING_THREADS = 5;
|
|
|
+ private Semaphore balancingSem = new Semaphore(MAX_BALANCING_THREADS);
|
|
|
+ private Throttler balancingThrottler = new Throttler(BALANCE_BANDWIDTH);
|
|
|
+
|
|
|
private static class DataNodeMetrics implements Updater {
|
|
|
private final MetricsRecord metricsRecord;
|
|
|
private int bytesWritten = 0;
|
|
@@ -531,19 +543,28 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
|
|
|
// check if there are newly received blocks
|
|
|
Block [] blockArray=null;
|
|
|
+ String [] delHintArray=null;
|
|
|
synchronized(receivedBlockList) {
|
|
|
- if (receivedBlockList.size() > 0) {
|
|
|
- //
|
|
|
- // Send newly-received blockids to namenode
|
|
|
- //
|
|
|
- blockArray = receivedBlockList.toArray(new Block[receivedBlockList.size()]);
|
|
|
+ synchronized(delHints) {
|
|
|
+ int numBlocks = receivedBlockList.size();
|
|
|
+ if (receivedBlockList.size() > 0) {
|
|
|
+ assert(numBlocks==delHints.size());
|
|
|
+ //
|
|
|
+ // Send newly-received blockids to namenode
|
|
|
+ //
|
|
|
+ blockArray = receivedBlockList.toArray(new Block[numBlocks]);
|
|
|
+ delHintArray = delHints.toArray(new String[numBlocks]);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
if (blockArray != null) {
|
|
|
- namenode.blockReceived(dnRegistration, blockArray);
|
|
|
+ namenode.blockReceived(dnRegistration, blockArray, delHintArray);
|
|
|
synchronized (receivedBlockList) {
|
|
|
- for(Block b: blockArray) {
|
|
|
- receivedBlockList.remove(b);
|
|
|
+ synchronized (delHints) {
|
|
|
+ for(int i=0; i<blockArray.length; i++) {
|
|
|
+ receivedBlockList.remove(blockArray[i]);
|
|
|
+ delHints.remove(delHintArray[i]);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -725,15 +746,16 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
* till namenode is informed before responding with success to the
|
|
|
* client? For now we don't.
|
|
|
*/
|
|
|
- private void notifyNamenodeReceivedBlock(Block block) {
|
|
|
+ private void notifyNamenodeReceivedBlock(Block block, String delHint) {
|
|
|
synchronized (receivedBlockList) {
|
|
|
- receivedBlockList.add(block);
|
|
|
- receivedBlockList.notifyAll();
|
|
|
+ synchronized (delHints) {
|
|
|
+ receivedBlockList.add(block);
|
|
|
+ delHints.add(delHint);
|
|
|
+ receivedBlockList.notifyAll();
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-
|
|
|
-
|
|
|
/**
|
|
|
* Server used for receiving/sending a block of data.
|
|
|
* This is created to listen for requests from clients or
|
|
@@ -752,8 +774,6 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
try {
|
|
|
while (shouldRun) {
|
|
|
Socket s = ss.accept();
|
|
|
- //s.setSoTimeout(READ_TIMEOUT);
|
|
|
- xceiverCount.incr();
|
|
|
new Daemon(new DataXceiver(s)).start();
|
|
|
}
|
|
|
ss.close();
|
|
@@ -806,13 +826,18 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
case OP_READ_METADATA:
|
|
|
readMetadata( in );
|
|
|
break;
|
|
|
+ case OP_REPLACE_BLOCK: // for balancing purpose; send to a destination
|
|
|
+ replaceBlock(in);
|
|
|
+ break;
|
|
|
+ case OP_COPY_BLOCK: // for balancing purpose; send to a proxy source
|
|
|
+ copyBlock(in);
|
|
|
+ break;
|
|
|
default:
|
|
|
throw new IOException("Unknown opcode " + op + "in data stream");
|
|
|
}
|
|
|
} catch (Throwable t) {
|
|
|
LOG.error("DataXceiver: " + StringUtils.stringifyException(t));
|
|
|
} finally {
|
|
|
- xceiverCount.decr();
|
|
|
LOG.debug("Number of active connections is: "+xceiverCount);
|
|
|
IOUtils.closeStream(in);
|
|
|
IOUtils.closeSocket(s);
|
|
@@ -825,6 +850,7 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
private void readBlock(DataInputStream in) throws IOException {
|
|
|
+ xceiverCount.incr();
|
|
|
//
|
|
|
// Read in the header
|
|
|
//
|
|
@@ -864,6 +890,7 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
StringUtils.stringifyException(ioe) );
|
|
|
throw ioe;
|
|
|
} finally {
|
|
|
+ xceiverCount.decr();
|
|
|
IOUtils.closeStream(out);
|
|
|
IOUtils.closeStream(blockSender);
|
|
|
}
|
|
@@ -876,6 +903,7 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
private void writeBlock(DataInputStream in) throws IOException {
|
|
|
+ xceiverCount.incr();
|
|
|
//
|
|
|
// Read in the header
|
|
|
//
|
|
@@ -935,10 +963,11 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
|
|
|
// receive the block and mirror to the next target
|
|
|
String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
|
|
|
+
|
|
|
blockReceiver.receiveBlock(mirrorOut, mirrorAddr, null);
|
|
|
|
|
|
// notify name node
|
|
|
- notifyNamenodeReceivedBlock(block);
|
|
|
+ notifyNamenodeReceivedBlock(block, EMPTY_DEL_HINT);
|
|
|
|
|
|
String msg = "Received block " + block + " from " +
|
|
|
s.getInetAddress();
|
|
@@ -971,6 +1000,8 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
IOUtils.closeStream(mirrorOut);
|
|
|
IOUtils.closeSocket(mirrorSock);
|
|
|
IOUtils.closeStream(blockReceiver);
|
|
|
+ // decrement counter
|
|
|
+ xceiverCount.decr();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -979,7 +1010,8 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
* @param in
|
|
|
*/
|
|
|
void readMetadata(DataInputStream in) throws IOException {
|
|
|
-
|
|
|
+ xceiverCount.incr();
|
|
|
+
|
|
|
Block block = new Block( in.readLong(), 0 );
|
|
|
InputStream checksumIn = null;
|
|
|
DataOutputStream out = null;
|
|
@@ -1007,21 +1039,190 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
//last DATA_CHUNK
|
|
|
out.writeInt(0);
|
|
|
} finally {
|
|
|
+ xceiverCount.decr();
|
|
|
IOUtils.closeStream(checksumIn);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Read a block from the disk and then sends it to a destination
|
|
|
+ *
|
|
|
+ * @param in
|
|
|
+ * The stream to read from
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ private void copyBlock(DataInputStream in) throws IOException {
|
|
|
+ // Read in the header
|
|
|
+ long blockId = in.readLong(); // read block id
|
|
|
+ Block block = new Block(blockId, 0);
|
|
|
+ String source = Text.readString(in); // read del hint
|
|
|
+ DatanodeInfo target = new DatanodeInfo(); // read target
|
|
|
+ target.readFields(in);
|
|
|
+
|
|
|
+ Socket targetSock = null;
|
|
|
+ short opStatus = OP_STATUS_SUCCESS;
|
|
|
+ BlockSender blockSender = null;
|
|
|
+ DataOutputStream targetOut = null;
|
|
|
+ try {
|
|
|
+ balancingSem.acquireUninterruptibly();
|
|
|
+
|
|
|
+ // check if the block exists or not
|
|
|
+ blockSender = new BlockSender(block, 0, -1, false, false);
|
|
|
+
|
|
|
+ // get the output stream to the target
|
|
|
+ InetSocketAddress targetAddr = createSocketAddr(target.getName());
|
|
|
+ targetSock = new Socket();
|
|
|
+ targetSock.connect(targetAddr, READ_TIMEOUT);
|
|
|
+ targetSock.setSoTimeout(READ_TIMEOUT);
|
|
|
+
|
|
|
+ targetOut = new DataOutputStream(new BufferedOutputStream(
|
|
|
+ targetSock.getOutputStream(), BUFFER_SIZE));
|
|
|
+
|
|
|
+ /* send request to the target */
|
|
|
+ // fist write header info
|
|
|
+ targetOut.writeShort(DATA_TRANFER_VERSION); // transfer version
|
|
|
+ targetOut.writeByte(OP_REPLACE_BLOCK); // op code
|
|
|
+ targetOut.writeLong(block.getBlockId()); // block id
|
|
|
+ Text.writeString( targetOut, source); // del hint
|
|
|
+
|
|
|
+ // then send data
|
|
|
+ long read = blockSender.sendBlock(targetOut, balancingThrottler);
|
|
|
+
|
|
|
+ myMetrics.readBytes((int) read);
|
|
|
+ myMetrics.readBlocks(1);
|
|
|
+
|
|
|
+ // check the response from target
|
|
|
+ receiveResponse(targetSock);
|
|
|
+
|
|
|
+ LOG.info("Copied block " + block + " to " + targetAddr);
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ opStatus = OP_STATUS_ERROR;
|
|
|
+ LOG.warn("Got exception while serving " + block + " to "
|
|
|
+ + target.getName() + ": " + StringUtils.stringifyException(ioe));
|
|
|
+ throw ioe;
|
|
|
+ } finally {
|
|
|
+ /* send response to the requester */
|
|
|
+ try {
|
|
|
+ sendResponse(s, opStatus);
|
|
|
+ } catch (IOException replyE) {
|
|
|
+ LOG.warn("Error writing the response back to "+
|
|
|
+ s.getRemoteSocketAddress() + "\n" +
|
|
|
+ StringUtils.stringifyException(replyE) );
|
|
|
+ }
|
|
|
+ IOUtils.closeStream(targetOut);
|
|
|
+ IOUtils.closeStream(blockSender);
|
|
|
+ balancingSem.release();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Receive a block and write it to disk, it then notifies the namenode to
|
|
|
+ * remove the copy from the source
|
|
|
+ *
|
|
|
+ * @param in
|
|
|
+ * The stream to read from
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ private void replaceBlock(DataInputStream in) throws IOException {
|
|
|
+ balancingSem.acquireUninterruptibly();
|
|
|
+
|
|
|
+ /* read header */
|
|
|
+ Block block = new Block(in.readLong(), 0); // block id & len
|
|
|
+ String sourceID = Text.readString(in);
|
|
|
+
|
|
|
+ short opStatus = OP_STATUS_SUCCESS;
|
|
|
+ BlockReceiver blockReceiver = null;
|
|
|
+ try {
|
|
|
+ // open a block receiver and check if the block does not exist
|
|
|
+ blockReceiver = new BlockReceiver(
|
|
|
+ block, in, s.getRemoteSocketAddress().toString());
|
|
|
+
|
|
|
+ // receive a block
|
|
|
+ blockReceiver.receiveBlock(null, null, balancingThrottler);
|
|
|
+
|
|
|
+ // notify name node
|
|
|
+ notifyNamenodeReceivedBlock(block, sourceID);
|
|
|
+
|
|
|
+ LOG.info("Received block " + block +
|
|
|
+ " from " + s.getRemoteSocketAddress());
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ opStatus = OP_STATUS_ERROR;
|
|
|
+ throw ioe;
|
|
|
+ } finally {
|
|
|
+ // send response back
|
|
|
+ try {
|
|
|
+ sendResponse(s, opStatus);
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress());
|
|
|
+ }
|
|
|
+ IOUtils.closeStream(blockReceiver);
|
|
|
+ balancingSem.release();
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- /* An interface to throttle the block transfers */
|
|
|
- private interface Throttler {
|
|
|
+ /** a class to throttle the block transfers
|
|
|
+ * This class is thread safe. It can be shared by multiple threads.
|
|
|
+ * The parameter bandwidthPerSec specifies the total bandwidth shared by threads.
|
|
|
+ */
|
|
|
+ static class Throttler {
|
|
|
+ private long period; // period over which bw is imposed
|
|
|
+ private long periodExtension; // Max period over which bw accumulates.
|
|
|
+ private long bytesPerPeriod; // total number of bytes can be sent in each period
|
|
|
+ private long curPeriodStart; // current period starting time
|
|
|
+ private long curReserve; // remaining bytes can be sent in the period
|
|
|
+ private long bytesAlreadyUsed;
|
|
|
+
|
|
|
+ /** Constructor */
|
|
|
+ Throttler(long bandwidthPerSec) {
|
|
|
+ this(1000, bandwidthPerSec); // by default throttling period is 1s
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Constructor */
|
|
|
+ Throttler(long period, long bandwidthPerSec) {
|
|
|
+ this.curPeriodStart = System.currentTimeMillis();
|
|
|
+ this.period = period;
|
|
|
+ this.curReserve = this.bytesPerPeriod = bandwidthPerSec*period/1000;
|
|
|
+ this.periodExtension = period*3;
|
|
|
+ }
|
|
|
+
|
|
|
/** Given the numOfBytes sent/received since last time throttle was called,
|
|
|
- * make the current thread sleep if I/O rate is too fast
|
|
|
+ * make the current thread sleep if I/O rate is too fast
|
|
|
* compared to the given bandwidth
|
|
|
- *
|
|
|
- * @param numOfBytes
|
|
|
+ *
|
|
|
+ * @param numOfBytes
|
|
|
* number of bytes sent/received since last time throttle was called
|
|
|
- */
|
|
|
- void throttle(int numOfBytes);
|
|
|
+ */
|
|
|
+ public synchronized void throttle(long numOfBytes) {
|
|
|
+ if ( numOfBytes <= 0 ) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ curReserve -= numOfBytes;
|
|
|
+ bytesAlreadyUsed += numOfBytes;
|
|
|
+
|
|
|
+ while (curReserve <= 0) {
|
|
|
+ long now = System.currentTimeMillis();
|
|
|
+ long curPeriodEnd = curPeriodStart + period;
|
|
|
+
|
|
|
+ if ( now < curPeriodEnd ) {
|
|
|
+ // Wait for next period so that curReserve can be increased.
|
|
|
+ try {
|
|
|
+ wait( curPeriodEnd - now );
|
|
|
+ } catch (InterruptedException ignored) {}
|
|
|
+ } else if ( now < (curPeriodStart + periodExtension)) {
|
|
|
+ curPeriodStart = curPeriodEnd;
|
|
|
+ curReserve += bytesPerPeriod;
|
|
|
+ } else {
|
|
|
+ // discard the prev period. Throttler might not have
|
|
|
+ // been used for a long time.
|
|
|
+ curPeriodStart = now;
|
|
|
+ curReserve = bytesPerPeriod - bytesAlreadyUsed;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ bytesAlreadyUsed -= numOfBytes;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private class BlockSender implements java.io.Closeable {
|
|
@@ -1175,7 +1376,7 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
out.write(buf, 0, len + checksumSize);
|
|
|
|
|
|
if (throttler != null) { // rebalancing so throttle
|
|
|
- throttler.throttle(len + checksumSize);
|
|
|
+ throttler.throttle(len + checksumSize + 4);
|
|
|
}
|
|
|
|
|
|
return len;
|
|
@@ -1354,7 +1555,7 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
}
|
|
|
|
|
|
if (throttler != null) { // throttle I/O
|
|
|
- throttler.throttle(len + checksumSize);
|
|
|
+ throttler.throttle(len + checksumSize + 4);
|
|
|
}
|
|
|
}
|
|
|
|