|
@@ -64,6 +64,8 @@ import org.apache.hadoop.util.Time;
|
|
|
import org.apache.htrace.Sampler;
|
|
|
import org.apache.htrace.Trace;
|
|
|
import org.apache.htrace.TraceScope;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
import com.google.common.base.Preconditions;
|
|
@@ -86,6 +88,7 @@ import com.google.common.base.Preconditions;
|
|
|
@InterfaceAudience.Private
|
|
|
public class DFSOutputStream extends FSOutputSummer
|
|
|
implements Syncable, CanSetDropBehind {
|
|
|
+ static final Logger LOG = LoggerFactory.getLogger(DFSOutputStream.class);
|
|
|
/**
|
|
|
* Number of times to retry creating a file when there are transient
|
|
|
* errors (typically related to encryption zones and KeyProvider operations).
|
|
@@ -413,21 +416,30 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
//
|
|
|
if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
|
|
|
getStreamer().getBytesCurBlock() == blockSize) {
|
|
|
- if (DFSClient.LOG.isDebugEnabled()) {
|
|
|
- DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +
|
|
|
- currentPacket.getSeqno() +
|
|
|
- ", src=" + src +
|
|
|
- ", bytesCurBlock=" + getStreamer().getBytesCurBlock() +
|
|
|
- ", blockSize=" + blockSize +
|
|
|
- ", appendChunk=" + getStreamer().getAppendChunk());
|
|
|
- }
|
|
|
- getStreamer().waitAndQueuePacket(currentPacket);
|
|
|
- currentPacket = null;
|
|
|
+ enqueueCurrentPacketFull();
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- adjustChunkBoundary();
|
|
|
+ void enqueueCurrentPacket() throws IOException {
|
|
|
+ getStreamer().waitAndQueuePacket(currentPacket);
|
|
|
+ currentPacket = null;
|
|
|
+ }
|
|
|
|
|
|
- endBlock();
|
|
|
- }
|
|
|
+ void enqueueCurrentPacketFull() throws IOException {
|
|
|
+ LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={},"
|
|
|
+ + " appendChunk={}, {}", currentPacket, src, getStreamer()
|
|
|
+ .getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(),
|
|
|
+ getStreamer());
|
|
|
+ enqueueCurrentPacket();
|
|
|
+ adjustChunkBoundary();
|
|
|
+ endBlock();
|
|
|
+ }
|
|
|
+
|
|
|
+ /** create an empty packet to mark the end of the block. */
|
|
|
+ void setCurrentPacketToEmpty() throws InterruptedIOException {
|
|
|
+ currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
|
|
|
+ getStreamer().getAndIncCurrentSeqno(), true);
|
|
|
+ currentPacket.setSyncBlock(shouldSyncBlock);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -457,11 +469,8 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
*/
|
|
|
protected void endBlock() throws IOException {
|
|
|
if (getStreamer().getBytesCurBlock() == blockSize) {
|
|
|
- currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
|
|
|
- getStreamer().getAndIncCurrentSeqno(), true);
|
|
|
- currentPacket.setSyncBlock(shouldSyncBlock);
|
|
|
- getStreamer().waitAndQueuePacket(currentPacket);
|
|
|
- currentPacket = null;
|
|
|
+ setCurrentPacketToEmpty();
|
|
|
+ enqueueCurrentPacket();
|
|
|
getStreamer().setBytesCurBlock(0);
|
|
|
lastFlushOffset = 0;
|
|
|
}
|
|
@@ -586,8 +595,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
}
|
|
|
if (currentPacket != null) {
|
|
|
currentPacket.setSyncBlock(isSync);
|
|
|
- getStreamer().waitAndQueuePacket(currentPacket);
|
|
|
- currentPacket = null;
|
|
|
+ enqueueCurrentPacket();
|
|
|
}
|
|
|
if (endBlock && getStreamer().getBytesCurBlock() > 0) {
|
|
|
// Need to end the current block, thus send an empty packet to
|
|
@@ -595,8 +603,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
|
|
|
getStreamer().getAndIncCurrentSeqno(), true);
|
|
|
currentPacket.setSyncBlock(shouldSyncBlock || isSync);
|
|
|
- getStreamer().waitAndQueuePacket(currentPacket);
|
|
|
- currentPacket = null;
|
|
|
+ enqueueCurrentPacket();
|
|
|
getStreamer().setBytesCurBlock(0);
|
|
|
lastFlushOffset = 0;
|
|
|
} else {
|
|
@@ -775,15 +782,11 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
flushBuffer(); // flush from all upper layers
|
|
|
|
|
|
if (currentPacket != null) {
|
|
|
- getStreamer().waitAndQueuePacket(currentPacket);
|
|
|
- currentPacket = null;
|
|
|
+ enqueueCurrentPacket();
|
|
|
}
|
|
|
|
|
|
if (getStreamer().getBytesCurBlock() != 0) {
|
|
|
- // send an empty packet to mark the end of the block
|
|
|
- currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
|
|
|
- getStreamer().getAndIncCurrentSeqno(), true);
|
|
|
- currentPacket.setSyncBlock(shouldSyncBlock);
|
|
|
+ setCurrentPacketToEmpty();
|
|
|
}
|
|
|
|
|
|
flushInternal(); // flush all data to Datanodes
|