|
@@ -319,9 +319,6 @@ class BlockReceiver implements Closeable {
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
void flushOrSync(boolean isSync) throws IOException {
|
|
void flushOrSync(boolean isSync) throws IOException {
|
|
- if (isSync && (out != null || checksumOut != null)) {
|
|
|
|
- datanode.metrics.incrFsyncCount();
|
|
|
|
- }
|
|
|
|
long flushTotalNanos = 0;
|
|
long flushTotalNanos = 0;
|
|
if (checksumOut != null) {
|
|
if (checksumOut != null) {
|
|
long flushStartNanos = System.nanoTime();
|
|
long flushStartNanos = System.nanoTime();
|
|
@@ -347,6 +344,9 @@ class BlockReceiver implements Closeable {
|
|
}
|
|
}
|
|
if (checksumOut != null || out != null) {
|
|
if (checksumOut != null || out != null) {
|
|
datanode.metrics.addFlushNanos(flushTotalNanos);
|
|
datanode.metrics.addFlushNanos(flushTotalNanos);
|
|
|
|
+ if (isSync) {
|
|
|
|
+ datanode.metrics.incrFsyncCount();
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -438,8 +438,10 @@ class BlockReceiver implements Closeable {
|
|
int len = header.getDataLen();
|
|
int len = header.getDataLen();
|
|
boolean syncBlock = header.getSyncBlock();
|
|
boolean syncBlock = header.getSyncBlock();
|
|
|
|
|
|
- // make sure the block gets sync'ed upon close
|
|
|
|
- this.syncOnClose |= syncBlock && lastPacketInBlock;
|
|
|
|
|
|
+ // avoid double sync'ing on close
|
|
|
|
+ if (syncBlock && lastPacketInBlock) {
|
|
|
|
+ this.syncOnClose = false;
|
|
|
|
+ }
|
|
|
|
|
|
// update received bytes
|
|
// update received bytes
|
|
long firstByteInBlock = offsetInBlock;
|
|
long firstByteInBlock = offsetInBlock;
|
|
@@ -448,11 +450,11 @@ class BlockReceiver implements Closeable {
|
|
replicaInfo.setNumBytes(offsetInBlock);
|
|
replicaInfo.setNumBytes(offsetInBlock);
|
|
}
|
|
}
|
|
|
|
|
|
- // put in queue for pending acks
|
|
|
|
- if (responder != null) {
|
|
|
|
- ((PacketResponder)responder.getRunnable()).enqueue(seqno,
|
|
|
|
- lastPacketInBlock, offsetInBlock);
|
|
|
|
- }
|
|
|
|
|
|
+ // put in queue for pending acks, unless sync was requested
|
|
|
|
+ if (responder != null && !syncBlock) {
|
|
|
|
+ ((PacketResponder) responder.getRunnable()).enqueue(seqno,
|
|
|
|
+ lastPacketInBlock, offsetInBlock);
|
|
|
|
+ }
|
|
|
|
|
|
//First write the packet to the mirror:
|
|
//First write the packet to the mirror:
|
|
if (mirrorOut != null && !mirrorError) {
|
|
if (mirrorOut != null && !mirrorError) {
|
|
@@ -471,8 +473,8 @@ class BlockReceiver implements Closeable {
|
|
if(LOG.isDebugEnabled()) {
|
|
if(LOG.isDebugEnabled()) {
|
|
LOG.debug("Receiving an empty packet or the end of the block " + block);
|
|
LOG.debug("Receiving an empty packet or the end of the block " + block);
|
|
}
|
|
}
|
|
- // flush unless close() would flush anyway
|
|
|
|
- if (syncBlock && !lastPacketInBlock) {
|
|
|
|
|
|
+ // sync block if requested
|
|
|
|
+ if (syncBlock) {
|
|
flushOrSync(true);
|
|
flushOrSync(true);
|
|
}
|
|
}
|
|
} else {
|
|
} else {
|
|
@@ -563,8 +565,8 @@ class BlockReceiver implements Closeable {
|
|
checksumBuf.arrayOffset() + checksumBuf.position(),
|
|
checksumBuf.arrayOffset() + checksumBuf.position(),
|
|
checksumLen);
|
|
checksumLen);
|
|
}
|
|
}
|
|
- /// flush entire packet, sync unless close() will sync
|
|
|
|
- flushOrSync(syncBlock && !lastPacketInBlock);
|
|
|
|
|
|
+ /// flush entire packet, sync if requested
|
|
|
|
+ flushOrSync(syncBlock);
|
|
|
|
|
|
replicaInfo.setLastChecksumAndDataLen(
|
|
replicaInfo.setLastChecksumAndDataLen(
|
|
offsetInBlock, lastChunkChecksum
|
|
offsetInBlock, lastChunkChecksum
|
|
@@ -580,6 +582,13 @@ class BlockReceiver implements Closeable {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ // if sync was requested, put in queue for pending acks here
|
|
|
|
+ // (after the fsync finished)
|
|
|
|
+ if (responder != null && syncBlock) {
|
|
|
|
+ ((PacketResponder) responder.getRunnable()).enqueue(seqno,
|
|
|
|
+ lastPacketInBlock, offsetInBlock);
|
|
|
|
+ }
|
|
|
|
+
|
|
if (throttler != null) { // throttle I/O
|
|
if (throttler != null) { // throttle I/O
|
|
throttler.throttle(len);
|
|
throttler.throttle(len);
|
|
}
|
|
}
|