|
@@ -76,13 +76,6 @@ import org.apache.hadoop.metrics.jvm.JvmMetrics;
|
|
|
public class DataNode implements FSConstants, Runnable {
|
|
|
public static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.DataNode");
|
|
|
|
|
|
- /**
|
|
|
- * A buffer size small enough that read/writes while reading headers
|
|
|
- * don't result in multiple io calls but reading larger amount of data
|
|
|
- * like one checksum size does not result in extra copy.
|
|
|
- */
|
|
|
- public static final int SMALL_HDR_BUFFER_SIZE = 64;
|
|
|
-
|
|
|
/**
|
|
|
* Util method to build socket addr from either:
|
|
|
* <host>:<port>
|
|
@@ -718,7 +711,7 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
public void run() {
|
|
|
try {
|
|
|
DataInputStream in = new DataInputStream(
|
|
|
- new BufferedInputStream(s.getInputStream(), SMALL_HDR_BUFFER_SIZE));
|
|
|
+ new BufferedInputStream(s.getInputStream(), BUFFER_SIZE));
|
|
|
short version = in.readShort();
|
|
|
if ( version != DATA_TRANFER_VERSION ) {
|
|
|
throw new IOException( "Version Mismatch" );
|
|
@@ -827,8 +820,10 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
// Open local disk out
|
|
|
//
|
|
|
FSDataset.BlockWriteStreams streams = data.writeToBlock( block );
|
|
|
- out = new DataOutputStream(streams.dataOut);
|
|
|
- checksumOut = new DataOutputStream(streams.checksumOut);
|
|
|
+ out = new DataOutputStream(
|
|
|
+ new BufferedOutputStream(streams.dataOut, BUFFER_SIZE));
|
|
|
+ checksumOut = new DataOutputStream(
|
|
|
+ new BufferedOutputStream(streams.checksumOut, BUFFER_SIZE));
|
|
|
|
|
|
InetSocketAddress mirrorTarget = null;
|
|
|
String mirrorNode = null;
|
|
@@ -846,7 +841,7 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
mirrorSock.setSoTimeout(READ_TIMEOUT);
|
|
|
mirrorOut = new DataOutputStream(
|
|
|
new BufferedOutputStream(mirrorSock.getOutputStream(),
|
|
|
- SMALL_HDR_BUFFER_SIZE));
|
|
|
+ BUFFER_SIZE));
|
|
|
mirrorIn = new DataInputStream( mirrorSock.getInputStream() );
|
|
|
//Copied from DFSClient.java!
|
|
|
mirrorOut.writeShort( DATA_TRANFER_VERSION );
|
|
@@ -918,6 +913,9 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
try {
|
|
|
mirrorOut.writeInt( len );
|
|
|
mirrorOut.write( buf, 0, len + checksumSize );
|
|
|
+ if (len == 0) {
|
|
|
+ mirrorOut.flush();
|
|
|
+ }
|
|
|
} catch (IOException ioe) {
|
|
|
LOG.info( "Exception writing to mirror " + mirrorNode +
|
|
|
"\n" + StringUtils.stringifyException(ioe) );
|
|
@@ -1092,15 +1090,14 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
long sendBlock(Socket sock, Block block,
|
|
|
long startOffset, long length, DatanodeInfo targets[] )
|
|
|
throws IOException {
|
|
|
- // May be we should just use io.file.buffer.size.
|
|
|
DataOutputStream out = new DataOutputStream(
|
|
|
new BufferedOutputStream(sock.getOutputStream(),
|
|
|
- SMALL_HDR_BUFFER_SIZE));
|
|
|
- DataInputStream in = null;
|
|
|
+ BUFFER_SIZE));
|
|
|
+ RandomAccessFile blockInFile = null;
|
|
|
+ DataInputStream blockIn = null;
|
|
|
DataInputStream checksumIn = null;
|
|
|
long totalRead = 0;
|
|
|
|
|
|
-
|
|
|
/* XXX This will affect inter datanode transfers during
|
|
|
* a CRC upgrade. There should not be any replication
|
|
|
* during crc upgrade since we are in safe mode, right?
|
|
@@ -1109,13 +1106,15 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
|
|
|
try {
|
|
|
File blockFile = data.getBlockFile( block );
|
|
|
- in = new DataInputStream( new FileInputStream( blockFile ) );
|
|
|
+ blockInFile = new RandomAccessFile(blockFile, "r");
|
|
|
|
|
|
File checksumFile = FSDataset.getMetaFile( blockFile );
|
|
|
DataChecksum checksum = null;
|
|
|
|
|
|
if ( !corruptChecksumOk || checksumFile.exists() ) {
|
|
|
- checksumIn = new DataInputStream( new FileInputStream(checksumFile) );
|
|
|
+ checksumIn = new DataInputStream(
|
|
|
+ new BufferedInputStream(new FileInputStream(checksumFile),
|
|
|
+ BUFFER_SIZE));
|
|
|
|
|
|
//read and handle the common header here. For now just a version
|
|
|
short version = checksumIn.readShort();
|
|
@@ -1169,17 +1168,17 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
// seek to the right offsets
|
|
|
if ( offset > 0 ) {
|
|
|
long checksumSkip = ( offset / bytesPerChecksum ) * checksumSize ;
|
|
|
- /* XXX skip() could be very inefficent. Should be seek().
|
|
|
- * at least skipFully
|
|
|
- */
|
|
|
- if ( in.skip( offset ) != offset ||
|
|
|
- ( checksumSkip > 0 &&
|
|
|
- checksumIn.skip( checksumSkip ) != checksumSkip ) ) {
|
|
|
- throw new IOException( "Could not seek to right position while " +
|
|
|
- "reading for " + block );
|
|
|
+ blockInFile.seek(offset);
|
|
|
+ if (checksumSkip > 0) {
|
|
|
+ //Should we use seek() for checksum file as well?
|
|
|
+ FileUtil.skipFully(checksumIn, checksumSkip);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ blockIn = new DataInputStream(new BufferedInputStream(
|
|
|
+ new FileInputStream(blockInFile.getFD()),
|
|
|
+ BUFFER_SIZE));
|
|
|
+
|
|
|
if ( targets != null ) {
|
|
|
//
|
|
|
// Header info
|
|
@@ -1205,7 +1204,7 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
// Write one data chunk per loop.
|
|
|
int len = (int) Math.min( endOffset - offset, bytesPerChecksum );
|
|
|
if ( len > 0 ) {
|
|
|
- in.readFully( buf, 0, len );
|
|
|
+ blockIn.readFully( buf, 0, len );
|
|
|
totalRead += len;
|
|
|
|
|
|
if ( checksumSize > 0 && checksumIn != null ) {
|
|
@@ -1239,8 +1238,9 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
offset += len;
|
|
|
}
|
|
|
} finally {
|
|
|
+ FileUtil.closeStream( blockInFile );
|
|
|
FileUtil.closeStream( checksumIn );
|
|
|
- FileUtil.closeStream( in );
|
|
|
+ FileUtil.closeStream( blockIn );
|
|
|
FileUtil.closeStream( out );
|
|
|
}
|
|
|
|