|
@@ -32,7 +32,6 @@ import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
|
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
|
|
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
|
|
import org.apache.hadoop.dfs.IncorrectVersionException;
|
|
import org.apache.hadoop.dfs.IncorrectVersionException;
|
|
import org.apache.hadoop.mapred.StatusHttpServer;
|
|
import org.apache.hadoop.mapred.StatusHttpServer;
|
|
-import org.apache.hadoop.dfs.Balancer;
|
|
|
|
import org.apache.hadoop.dfs.BlockCommand;
|
|
import org.apache.hadoop.dfs.BlockCommand;
|
|
import org.apache.hadoop.dfs.DatanodeProtocol;
|
|
import org.apache.hadoop.dfs.DatanodeProtocol;
|
|
import org.apache.hadoop.dfs.FSDatasetInterface.MetaDataInputStream;
|
|
import org.apache.hadoop.dfs.FSDatasetInterface.MetaDataInputStream;
|
|
@@ -46,6 +45,7 @@ import java.nio.channels.FileChannel;
|
|
import java.nio.channels.ServerSocketChannel;
|
|
import java.nio.channels.ServerSocketChannel;
|
|
import java.nio.channels.SocketChannel;
|
|
import java.nio.channels.SocketChannel;
|
|
import java.util.*;
|
|
import java.util.*;
|
|
|
|
+import java.util.concurrent.Semaphore;
|
|
import java.security.NoSuchAlgorithmException;
|
|
import java.security.NoSuchAlgorithmException;
|
|
import java.security.SecureRandom;
|
|
import java.security.SecureRandom;
|
|
|
|
|
|
@@ -146,45 +146,6 @@ public class DataNode extends Configured
|
|
private static final int MAX_XCEIVER_COUNT = 256;
|
|
private static final int MAX_XCEIVER_COUNT = 256;
|
|
private int maxXceiverCount = MAX_XCEIVER_COUNT;
|
|
private int maxXceiverCount = MAX_XCEIVER_COUNT;
|
|
|
|
|
|
- /** A manager to make sure that cluster balancing does not
|
|
|
|
- * take too much resources.
|
|
|
|
- *
|
|
|
|
- * It limits the number of block moves for balancing and
|
|
|
|
- * the total amount of bandwidth they can use.
|
|
|
|
- */
|
|
|
|
- private static class BlockBalanceThrottler extends Throttler {
|
|
|
|
- private int numThreads;
|
|
|
|
-
|
|
|
|
- /**Constructor
|
|
|
|
- *
|
|
|
|
- * @param bandwidth Total amount of bandwidth can be used for balancing
|
|
|
|
- */
|
|
|
|
- private BlockBalanceThrottler(long bandwidth) {
|
|
|
|
- super(bandwidth);
|
|
|
|
- LOG.info("Balancing bandwith is "+ bandwidth + " bytes/s");
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /** Check if the block move can start.
|
|
|
|
- *
|
|
|
|
- * Return true if the thread quota is not exceeded and
|
|
|
|
- * the counter is incremented; False otherwise.
|
|
|
|
- */
|
|
|
|
- private synchronized boolean acquire() {
|
|
|
|
- if (numThreads >= Balancer.MAX_NUM_CONCURRENT_MOVES) {
|
|
|
|
- return false;
|
|
|
|
- }
|
|
|
|
- numThreads++;
|
|
|
|
- return true;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /** Mark that the move is completed. The thread counter is decremented. */
|
|
|
|
- private synchronized void release() {
|
|
|
|
- numThreads--;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private BlockBalanceThrottler balanceThrottler;
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* We need an estimate for block size to check if the disk partition has
|
|
* We need an estimate for block size to check if the disk partition has
|
|
* enough space. For now we set it to be the default block size set
|
|
* enough space. For now we set it to be the default block size set
|
|
@@ -195,6 +156,12 @@ public class DataNode extends Configured
|
|
*/
|
|
*/
|
|
private long estimateBlockSize;
|
|
private long estimateBlockSize;
|
|
|
|
|
|
|
|
+ // The following three fields are to support balancing
|
|
|
|
+ final static short MAX_BALANCING_THREADS = 5;
|
|
|
|
+ private Semaphore balancingSem = new Semaphore(MAX_BALANCING_THREADS);
|
|
|
|
+ long balanceBandwidth;
|
|
|
|
+ private Throttler balancingThrottler;
|
|
|
|
+
|
|
// For InterDataNodeProtocol
|
|
// For InterDataNodeProtocol
|
|
Server ipcServer;
|
|
Server ipcServer;
|
|
|
|
|
|
@@ -341,8 +308,9 @@ public class DataNode extends Configured
|
|
DataNode.nameNodeAddr = nameNodeAddr;
|
|
DataNode.nameNodeAddr = nameNodeAddr;
|
|
|
|
|
|
//set up parameter for cluster balancing
|
|
//set up parameter for cluster balancing
|
|
- this.balanceThrottler = new BlockBalanceThrottler(
|
|
|
|
- conf.getLong("dfs.balance.bandwidthPerSec", 1024L*1024));
|
|
|
|
|
|
+ this.balanceBandwidth = conf.getLong("dfs.balance.bandwidthPerSec", 1024L*1024);
|
|
|
|
+ LOG.info("Balancing bandwith is "+balanceBandwidth + " bytes/s");
|
|
|
|
+ this.balancingThrottler = new Throttler(balanceBandwidth);
|
|
|
|
|
|
//initialize periodic block scanner
|
|
//initialize periodic block scanner
|
|
String reason = null;
|
|
String reason = null;
|
|
@@ -916,6 +884,24 @@ public class DataNode extends Configured
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /* Utility function for receiving a response: reads numTargets status shorts from the input stream of socket s and throws an IOException at the first one that is not OP_STATUS_SUCCESS. The reply stream is closed before returning, whether the check succeeds or fails. */
|
|
|
|
+ private static void receiveResponse(Socket s, int numTargets) throws IOException {
|
|
|
|
+ // check the response, read through a BUFFER_SIZE-buffered stream over the socket's input
|
|
|
|
+ DataInputStream reply = new DataInputStream(new BufferedInputStream(
|
|
|
|
+ NetUtils.getInputStream(s), BUFFER_SIZE));
|
|
|
|
+ try {
|
|
|
|
+ for (int i = 0; i < numTargets; i++) {
|
|
|
|
+ short opStatus = reply.readShort(); // one status short is consumed per iteration
|
|
|
|
+ if(opStatus != OP_STATUS_SUCCESS) {
|
|
|
|
+ throw new IOException("operation failed at "+
|
|
|
|
+ s.getInetAddress());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ } finally {
|
|
|
|
+ IOUtils.closeStream(reply); // NOTE(review): presumably this also closes the socket's input side - confirm callers do not reuse s for reading afterwards
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
/* utility function for sending a respose */
|
|
/* utility function for sending a respose */
|
|
private static void sendResponse(Socket s, short opStatus, long timeout)
|
|
private static void sendResponse(Socket s, short opStatus, long timeout)
|
|
throws IOException {
|
|
throws IOException {
|
|
@@ -959,7 +945,6 @@ public class DataNode extends Configured
|
|
this.ss = ss;
|
|
this.ss = ss;
|
|
}
|
|
}
|
|
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
*/
|
|
*/
|
|
public void run() {
|
|
public void run() {
|
|
@@ -1375,50 +1360,67 @@ public class DataNode extends Configured
|
|
// Read in the header
|
|
// Read in the header
|
|
long blockId = in.readLong(); // read block id
|
|
long blockId = in.readLong(); // read block id
|
|
Block block = new Block(blockId, 0, in.readLong());
|
|
Block block = new Block(blockId, 0, in.readLong());
|
|
|
|
+ String source = Text.readString(in); // read del hint
|
|
|
|
+ DatanodeInfo target = new DatanodeInfo(); // read target
|
|
|
|
+ target.readFields(in);
|
|
|
|
|
|
- if (!balanceThrottler.acquire()) { // not able to start
|
|
|
|
- LOG.info("Not able to copy block " + blockId + " to "
|
|
|
|
- + s.getRemoteSocketAddress() + " because threads quota is exceeded.");
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
|
|
+ Socket targetSock = null;
|
|
|
|
+ short opStatus = OP_STATUS_SUCCESS;
|
|
BlockSender blockSender = null;
|
|
BlockSender blockSender = null;
|
|
- DataOutputStream reply = null;
|
|
|
|
- boolean isOpSuccess = true;
|
|
|
|
-
|
|
|
|
|
|
+ DataOutputStream targetOut = null;
|
|
try {
|
|
try {
|
|
|
|
+ balancingSem.acquireUninterruptibly();
|
|
|
|
+
|
|
// check if the block exists or not
|
|
// check if the block exists or not
|
|
blockSender = new BlockSender(block, 0, -1, false, false, false);
|
|
blockSender = new BlockSender(block, 0, -1, false, false, false);
|
|
|
|
|
|
- // set up response stream
|
|
|
|
- OutputStream baseStream = NetUtils.getOutputStream(
|
|
|
|
- s, socketWriteTimeout);
|
|
|
|
- reply = new DataOutputStream(new BufferedOutputStream(
|
|
|
|
- baseStream, SMALL_BUFFER_SIZE));
|
|
|
|
|
|
+ // get the output stream to the target
|
|
|
|
+ InetSocketAddress targetAddr = NetUtils.createSocketAddr(target.getName());
|
|
|
|
+ targetSock = newSocket();
|
|
|
|
+ targetSock.connect(targetAddr, socketTimeout);
|
|
|
|
+ targetSock.setSoTimeout(socketTimeout);
|
|
|
|
|
|
|
|
+ OutputStream baseStream = NetUtils.getOutputStream(targetSock,
|
|
|
|
+ socketWriteTimeout);
|
|
|
|
+ targetOut = new DataOutputStream(
|
|
|
|
+ new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
|
|
|
|
+
|
|
|
|
+ /* send request to the target */
|
|
|
|
+ // first write header info
|
|
|
|
+ targetOut.writeShort(DATA_TRANSFER_VERSION); // transfer version
|
|
|
|
+ targetOut.writeByte(OP_REPLACE_BLOCK); // op code
|
|
|
|
+ targetOut.writeLong(block.getBlockId()); // block id
|
|
|
|
+ targetOut.writeLong(block.getGenerationStamp()); // generation stamp
|
|
|
|
+ Text.writeString( targetOut, source); // del hint
|
|
|
|
|
|
- // send block content to the target
|
|
|
|
- long read = blockSender.sendBlock(reply, baseStream,
|
|
|
|
- balanceThrottler);
|
|
|
|
|
|
+ // then send data
|
|
|
|
+ long read = blockSender.sendBlock(targetOut, baseStream,
|
|
|
|
+ balancingThrottler);
|
|
|
|
|
|
myMetrics.bytesRead.inc((int) read);
|
|
myMetrics.bytesRead.inc((int) read);
|
|
myMetrics.blocksRead.inc();
|
|
myMetrics.blocksRead.inc();
|
|
|
|
|
|
- LOG.info("Copied block " + block + " to " + s.getRemoteSocketAddress());
|
|
|
|
|
|
+ // check the response from target
|
|
|
|
+ receiveResponse(targetSock, 1);
|
|
|
|
+
|
|
|
|
+ LOG.info("Copied block " + block + " to " + targetAddr);
|
|
} catch (IOException ioe) {
|
|
} catch (IOException ioe) {
|
|
- isOpSuccess = false;
|
|
|
|
|
|
+ opStatus = OP_STATUS_ERROR;
|
|
|
|
+ LOG.warn("Got exception while serving " + block + " to "
|
|
|
|
+ + target.getName() + ": " + StringUtils.stringifyException(ioe));
|
|
throw ioe;
|
|
throw ioe;
|
|
} finally {
|
|
} finally {
|
|
- balanceThrottler.release();
|
|
|
|
- if (isOpSuccess) {
|
|
|
|
- try {
|
|
|
|
- // send one last byte to indicate that the resource is cleaned.
|
|
|
|
- reply.writeChar('d');
|
|
|
|
- } catch (IOException ignored) {
|
|
|
|
- }
|
|
|
|
|
|
+ /* send response to the requester */
|
|
|
|
+ try {
|
|
|
|
+ sendResponse(s, opStatus, socketWriteTimeout);
|
|
|
|
+ } catch (IOException replyE) {
|
|
|
|
+ LOG.warn("Error writing the response back to "+
|
|
|
|
+ s.getRemoteSocketAddress() + "\n" +
|
|
|
|
+ StringUtils.stringifyException(replyE) );
|
|
}
|
|
}
|
|
- IOUtils.closeStream(reply);
|
|
|
|
|
|
+ IOUtils.closeStream(targetOut);
|
|
IOUtils.closeStream(blockSender);
|
|
IOUtils.closeStream(blockSender);
|
|
|
|
+ balancingSem.release();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1431,59 +1433,21 @@ public class DataNode extends Configured
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
private void replaceBlock(DataInputStream in) throws IOException {
|
|
private void replaceBlock(DataInputStream in) throws IOException {
|
|
- /* read header */
|
|
|
|
- long blockId = in.readLong();
|
|
|
|
- Block block = new Block(blockId, estimateBlockSize,
|
|
|
|
- in.readLong()); // block id & generation stamp
|
|
|
|
- String sourceID = Text.readString(in); // read del hint
|
|
|
|
- DatanodeInfo proxySource = new DatanodeInfo(); // read proxy source
|
|
|
|
- proxySource.readFields(in);
|
|
|
|
-
|
|
|
|
- if (!balanceThrottler.acquire()) { // not able to start
|
|
|
|
- LOG.warn("Not able to receive block " + blockId + " from "
|
|
|
|
- + s.getRemoteSocketAddress() + " because threads quota is exceeded.");
|
|
|
|
- sendResponse(s, (short)OP_STATUS_ERROR,
|
|
|
|
- socketWriteTimeout);
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- Socket proxySock = null;
|
|
|
|
- DataOutputStream proxyOut = null;
|
|
|
|
-
|
|
|
|
- short opStatus = OP_STATUS_SUCCESS;
|
|
|
|
- BlockReceiver blockReceiver = null;
|
|
|
|
- DataInputStream proxyReply = null;
|
|
|
|
-
|
|
|
|
- try {
|
|
|
|
- // get the output stream to the proxy
|
|
|
|
- InetSocketAddress proxyAddr = NetUtils.createSocketAddr(
|
|
|
|
- proxySource.getName());
|
|
|
|
- proxySock = newSocket();
|
|
|
|
- proxySock.connect(proxyAddr, socketTimeout);
|
|
|
|
- proxySock.setSoTimeout(socketTimeout);
|
|
|
|
-
|
|
|
|
- OutputStream baseStream = NetUtils.getOutputStream(proxySock,
|
|
|
|
- socketWriteTimeout);
|
|
|
|
- proxyOut = new DataOutputStream(
|
|
|
|
- new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
|
|
|
|
-
|
|
|
|
- /* send request to the proxy */
|
|
|
|
- proxyOut.writeShort(DATA_TRANSFER_VERSION); // transfer version
|
|
|
|
- proxyOut.writeByte(OP_COPY_BLOCK); // op code
|
|
|
|
- proxyOut.writeLong(block.getBlockId()); // block id
|
|
|
|
- proxyOut.writeLong(block.getGenerationStamp()); // block id
|
|
|
|
- proxyOut.flush();
|
|
|
|
-
|
|
|
|
- // receive the response from the proxy
|
|
|
|
- proxyReply = new DataInputStream(new BufferedInputStream(
|
|
|
|
- NetUtils.getInputStream(proxySock), BUFFER_SIZE));
|
|
|
|
|
|
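+ balancingSem.acquireUninterruptibly(); // NOTE(review): the permit is taken before the try block; an IOException from the header reads below would presumably skip the release() in finally and leak a permit - confirm and consider moving the acquire inside the try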
|
|
|
|
+
|
|
|
|
+ /* read header */
|
|
|
|
+ Block block = new Block(in.readLong(), estimateBlockSize, in.readLong()); // block id & generation stamp
|
|
|
|
+ String sourceID = Text.readString(in);
|
|
|
|
+
|
|
|
|
+ short opStatus = OP_STATUS_SUCCESS;
|
|
|
|
+ BlockReceiver blockReceiver = null;
|
|
|
|
+ try {
|
|
// open a block receiver and check if the block does not exist
|
|
// open a block receiver and check if the block does not exist
|
|
- blockReceiver = new BlockReceiver(
|
|
|
|
- block, proxyReply, proxySock.getRemoteSocketAddress().toString(),
|
|
|
|
- false, "", null);
|
|
|
|
|
|
+ blockReceiver = new BlockReceiver(
|
|
|
|
+ block, in, s.getRemoteSocketAddress().toString(), false, "", null);
|
|
|
|
|
|
// receive a block
|
|
// receive a block
|
|
- blockReceiver.receiveBlock(null, null, null, null, balanceThrottler, -1);
|
|
|
|
|
|
+ blockReceiver.receiveBlock(null, null, null, null, balancingThrottler, -1);
|
|
|
|
|
|
// notify name node
|
|
// notify name node
|
|
notifyNamenodeReceivedBlock(block, sourceID);
|
|
notifyNamenodeReceivedBlock(block, sourceID);
|
|
@@ -1494,26 +1458,14 @@ public class DataNode extends Configured
|
|
opStatus = OP_STATUS_ERROR;
|
|
opStatus = OP_STATUS_ERROR;
|
|
throw ioe;
|
|
throw ioe;
|
|
} finally {
|
|
} finally {
|
|
- // receive the last byte that indicates the proxy released its thread resource
|
|
|
|
- if (opStatus == OP_STATUS_SUCCESS) {
|
|
|
|
- try {
|
|
|
|
- proxyReply.readChar();
|
|
|
|
- } catch (IOException ignored) {
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // now release the thread resource
|
|
|
|
- balanceThrottler.release();
|
|
|
|
-
|
|
|
|
// send response back
|
|
// send response back
|
|
try {
|
|
try {
|
|
sendResponse(s, opStatus, socketWriteTimeout);
|
|
sendResponse(s, opStatus, socketWriteTimeout);
|
|
} catch (IOException ioe) {
|
|
} catch (IOException ioe) {
|
|
LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress());
|
|
LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress());
|
|
}
|
|
}
|
|
- IOUtils.closeStream(proxyOut);
|
|
|
|
IOUtils.closeStream(blockReceiver);
|
|
IOUtils.closeStream(blockReceiver);
|
|
- IOUtils.closeStream(proxyReply);
|
|
|
|
|
|
+ balancingSem.release();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|