|
@@ -1092,7 +1092,9 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
mirrorSock.connect(mirrorTarget, timeoutValue);
|
|
|
mirrorSock.setSoTimeout(timeoutValue);
|
|
|
mirrorSock.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
|
|
|
- mirrorOut = new DataOutputStream(mirrorSock.getOutputStream());
|
|
|
+ mirrorOut = new DataOutputStream(
|
|
|
+ new BufferedOutputStream(mirrorSock.getOutputStream(),
|
|
|
+ BUFFER_SIZE));
|
|
|
mirrorIn = new DataInputStream(mirrorSock.getInputStream());
|
|
|
|
|
|
// Write header: Copied from DFSClient.java!
|
|
@@ -1918,6 +1920,18 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ // this class is a bufferoutputstream that exposes the number of
|
|
|
+ // bytes in the buffer.
|
|
|
+ static private class DFSBufferedOutputStream extends BufferedOutputStream {
|
|
|
+ DFSBufferedOutputStream(OutputStream out, int capacity) {
|
|
|
+ super(out, capacity);
|
|
|
+ }
|
|
|
+
|
|
|
+ int count() {
|
|
|
+ return count;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/* A class that receives a block and wites to its own disk, meanwhile
|
|
|
* may copies it to another site. If a throttler is provided,
|
|
|
* streaming throttling is also supported.
|
|
@@ -1929,6 +1943,7 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
private DataChecksum checksum; // from where chunks of a block can be read
|
|
|
private DataOutputStream out = null; // to block file at local disk
|
|
|
private DataOutputStream checksumOut = null; // to crc file at local disk
|
|
|
+ private DFSBufferedOutputStream bufStream = null;
|
|
|
private int bytesPerChecksum;
|
|
|
private int checksumSize;
|
|
|
private byte buf[];
|
|
@@ -1968,8 +1983,11 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
streams = data.writeToBlock(block, isRecovery);
|
|
|
this.finalized = data.isValidBlock(block);
|
|
|
if (streams != null) {
|
|
|
- this.out = new DataOutputStream(streams.dataOut);
|
|
|
- this.checksumOut = new DataOutputStream(streams.checksumOut);
|
|
|
+ this.bufStream = new DFSBufferedOutputStream(
|
|
|
+ streams.dataOut, BUFFER_SIZE);
|
|
|
+ this.out = new DataOutputStream(bufStream);
|
|
|
+ this.checksumOut = new DataOutputStream(new BufferedOutputStream(
|
|
|
+ streams.checksumOut, BUFFER_SIZE));
|
|
|
}
|
|
|
} catch(IOException ioe) {
|
|
|
IOUtils.closeStream(this);
|
|
@@ -2290,7 +2308,8 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
}
|
|
|
return;
|
|
|
}
|
|
|
- if (data.getChannelPosition(block, streams) == offsetInBlock) {
|
|
|
+ if (data.getChannelPosition(block, streams) + bufStream.count() ==
|
|
|
+ offsetInBlock) {
|
|
|
return; // nothing to do
|
|
|
}
|
|
|
if (offsetInBlock % bytesPerChecksum != 0) {
|
|
@@ -2657,5 +2676,4 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
System.exit(-1);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
}
|