|
@@ -27,8 +27,6 @@ import org.apache.hadoop.ipc.*;
|
|
|
import org.apache.hadoop.conf.*;
|
|
|
import org.apache.hadoop.net.DNS;
|
|
|
import org.apache.hadoop.net.NetUtils;
|
|
|
-import org.apache.hadoop.net.SocketInputStream;
|
|
|
-import org.apache.hadoop.net.SocketOutputStream;
|
|
|
import org.apache.hadoop.util.*;
|
|
|
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
|
|
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
|
|
@@ -121,6 +119,7 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
private static String dnThreadName;
|
|
|
int defaultBytesPerChecksum = 512;
|
|
|
private int socketTimeout;
|
|
|
+ private int socketWriteTimeout = 0;
|
|
|
|
|
|
private DataBlockScanner blockScanner;
|
|
|
private Daemon blockScannerThread;
|
|
@@ -207,6 +206,9 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
this.estimateBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
|
|
|
this.socketTimeout = conf.getInt("dfs.socket.timeout",
|
|
|
FSConstants.READ_TIMEOUT);
|
|
|
+ this.socketWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout",
|
|
|
+ FSConstants.WRITE_TIMEOUT);
|
|
|
+
|
|
|
String address =
|
|
|
NetUtils.getServerAddress(conf,
|
|
|
"dfs.datanode.bindAddress",
|
|
@@ -257,7 +259,8 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
|
|
|
|
|
|
// find free port
|
|
|
- ServerSocket ss = ServerSocketChannel.open().socket();
|
|
|
+ ServerSocket ss = (socketWriteTimeout > 0) ?
|
|
|
+ ServerSocketChannel.open().socket() : new ServerSocket();
|
|
|
Server.bind(ss, socAddr, 0);
|
|
|
ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE);
|
|
|
// adjust machine name with the actual port
|
|
@@ -333,6 +336,14 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
myMetrics = new DataNodeMetrics(conf, dnRegistration.getStorageID());
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Creates either NIO or regular depending on socketWriteTimeout.
|
|
|
+ */
|
|
|
+ private Socket newSocket() throws IOException {
|
|
|
+ return (socketWriteTimeout > 0) ?
|
|
|
+ SocketChannel.open().socket() : new Socket();
|
|
|
+ }
|
|
|
+
|
|
|
private NamespaceInfo handshake() throws IOException {
|
|
|
NamespaceInfo nsInfo = new NamespaceInfo();
|
|
|
while (shouldRun) {
|
|
@@ -828,7 +839,7 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
private static void receiveResponse(Socket s, int numTargets) throws IOException {
|
|
|
// check the response
|
|
|
DataInputStream reply = new DataInputStream(new BufferedInputStream(
|
|
|
- new SocketInputStream(s), BUFFER_SIZE));
|
|
|
+ NetUtils.getInputStream(s), BUFFER_SIZE));
|
|
|
try {
|
|
|
for (int i = 0; i < numTargets; i++) {
|
|
|
short opStatus = reply.readShort();
|
|
@@ -843,9 +854,10 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
}
|
|
|
|
|
|
/* utility function for sending a respose */
|
|
|
- private static void sendResponse(Socket s, short opStatus) throws IOException {
|
|
|
+ private static void sendResponse(Socket s, short opStatus, long timeout)
|
|
|
+ throws IOException {
|
|
|
DataOutputStream reply =
|
|
|
- new DataOutputStream(new SocketOutputStream(s, WRITE_TIMEOUT));
|
|
|
+ new DataOutputStream(NetUtils.getOutputStream(s, timeout));
|
|
|
try {
|
|
|
reply.writeShort(opStatus);
|
|
|
reply.flush();
|
|
@@ -943,7 +955,7 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
DataInputStream in=null;
|
|
|
try {
|
|
|
in = new DataInputStream(
|
|
|
- new BufferedInputStream(new SocketInputStream(s), BUFFER_SIZE));
|
|
|
+ new BufferedInputStream(NetUtils.getInputStream(s), BUFFER_SIZE));
|
|
|
short version = in.readShort();
|
|
|
if ( version != DATA_TRANSFER_VERSION ) {
|
|
|
throw new IOException( "Version Mismatch" );
|
|
@@ -1010,9 +1022,9 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
long length = in.readLong();
|
|
|
|
|
|
// send the block
|
|
|
- DataOutputStream out = new DataOutputStream(
|
|
|
- new BufferedOutputStream(new SocketOutputStream(s, WRITE_TIMEOUT),
|
|
|
- SMALL_BUFFER_SIZE));
|
|
|
+ DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
|
|
|
+ NetUtils.getOutputStream(s, socketWriteTimeout), SMALL_BUFFER_SIZE));
|
|
|
+
|
|
|
BlockSender blockSender = null;
|
|
|
try {
|
|
|
try {
|
|
@@ -1103,7 +1115,7 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
|
|
|
// get a connection back to the previous target
|
|
|
replyOut = new DataOutputStream(
|
|
|
- new SocketOutputStream(s, WRITE_TIMEOUT));
|
|
|
+ NetUtils.getOutputStream(s, socketWriteTimeout));
|
|
|
|
|
|
//
|
|
|
// Open network conn to backup machine, if
|
|
@@ -1114,19 +1126,19 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
// Connect to backup machine
|
|
|
mirrorNode = targets[0].getName();
|
|
|
mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
|
|
|
- mirrorSock = SocketChannel.open().socket();
|
|
|
+ mirrorSock = newSocket();
|
|
|
try {
|
|
|
int timeoutValue = numTargets * socketTimeout;
|
|
|
- int writeTimeout = WRITE_TIMEOUT +
|
|
|
+ int writeTimeout = socketWriteTimeout +
|
|
|
(WRITE_TIMEOUT_EXTENSION * numTargets);
|
|
|
mirrorSock.connect(mirrorTarget, timeoutValue);
|
|
|
mirrorSock.setSoTimeout(timeoutValue);
|
|
|
mirrorSock.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
|
|
|
mirrorOut = new DataOutputStream(
|
|
|
new BufferedOutputStream(
|
|
|
- new SocketOutputStream(mirrorSock, writeTimeout),
|
|
|
+ NetUtils.getOutputStream(mirrorSock, writeTimeout),
|
|
|
BUFFER_SIZE));
|
|
|
- mirrorIn = new DataInputStream(new SocketInputStream(mirrorSock));
|
|
|
+ mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));
|
|
|
|
|
|
// Write header: Copied from DFSClient.java!
|
|
|
mirrorOut.writeShort( DATA_TRANSFER_VERSION );
|
|
@@ -1247,7 +1259,8 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
byte [] buf = new byte[(int)fileSize];
|
|
|
IOUtils.readFully(checksumIn, buf, 0, buf.length);
|
|
|
|
|
|
- out = new DataOutputStream(new SocketOutputStream(s, WRITE_TIMEOUT));
|
|
|
+ out = new DataOutputStream(
|
|
|
+ NetUtils.getOutputStream(s, socketWriteTimeout));
|
|
|
|
|
|
out.writeByte(OP_STATUS_SUCCESS);
|
|
|
out.writeInt(buf.length);
|
|
@@ -1289,13 +1302,13 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
|
|
|
// get the output stream to the target
|
|
|
InetSocketAddress targetAddr = NetUtils.createSocketAddr(target.getName());
|
|
|
- targetSock = SocketChannel.open().socket();
|
|
|
+ targetSock = newSocket();
|
|
|
targetSock.connect(targetAddr, socketTimeout);
|
|
|
targetSock.setSoTimeout(socketTimeout);
|
|
|
|
|
|
targetOut = new DataOutputStream(new BufferedOutputStream(
|
|
|
- new SocketOutputStream(targetSock, WRITE_TIMEOUT),
|
|
|
- SMALL_BUFFER_SIZE));
|
|
|
+ NetUtils.getOutputStream(targetSock, socketWriteTimeout),
|
|
|
+ SMALL_BUFFER_SIZE));
|
|
|
|
|
|
/* send request to the target */
|
|
|
// fist write header info
|
|
@@ -1322,7 +1335,7 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
} finally {
|
|
|
/* send response to the requester */
|
|
|
try {
|
|
|
- sendResponse(s, opStatus);
|
|
|
+ sendResponse(s, opStatus, socketWriteTimeout);
|
|
|
} catch (IOException replyE) {
|
|
|
LOG.warn("Error writing the response back to "+
|
|
|
s.getRemoteSocketAddress() + "\n" +
|
|
@@ -1370,7 +1383,7 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
} finally {
|
|
|
// send response back
|
|
|
try {
|
|
|
- sendResponse(s, opStatus);
|
|
|
+ sendResponse(s, opStatus, socketWriteTimeout);
|
|
|
} catch (IOException ioe) {
|
|
|
LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress());
|
|
|
}
|
|
@@ -2587,14 +2600,14 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
try {
|
|
|
InetSocketAddress curTarget =
|
|
|
NetUtils.createSocketAddr(targets[0].getName());
|
|
|
- sock = SocketChannel.open().socket();
|
|
|
+ sock = newSocket();
|
|
|
sock.connect(curTarget, socketTimeout);
|
|
|
sock.setSoTimeout(targets.length * socketTimeout);
|
|
|
|
|
|
- long writeTimeout = WRITE_TIMEOUT +
|
|
|
+ long writeTimeout = socketWriteTimeout +
|
|
|
WRITE_TIMEOUT_EXTENSION * (targets.length-1);
|
|
|
out = new DataOutputStream(new BufferedOutputStream(
|
|
|
- new SocketOutputStream(sock, writeTimeout), SMALL_BUFFER_SIZE));
|
|
|
+ NetUtils.getOutputStream(sock, writeTimeout), SMALL_BUFFER_SIZE));
|
|
|
|
|
|
blockSender = new BlockSender(b, 0, -1, false, false, false);
|
|
|
|