|
@@ -1594,6 +1594,8 @@ class DFSClient implements FSConstants {
|
|
|
private volatile int errorIndex = 0;
|
|
|
private IOException lastException = null; // set by flush() when it fails with an IOException
|
|
|
private long artificialSlowdown = 0;
|
|
|
+ private long lastFlushOffset = -1; // bytesCurBlock as recorded by the most recent flush(); stays -1 until flush() first records one
|
|
|
+ private boolean persistBlocks = false; // true while flush() still has to call namenode.fsync(); set just before connecting to a datanode pipeline, cleared after the fsync
|
|
|
|
|
|
private class Packet {
|
|
|
ByteBuffer buffer;
|
|
@@ -1601,6 +1603,8 @@ class DFSClient implements FSConstants {
|
|
|
long offsetInBlock; // offset in block
|
|
|
boolean lastPacketInBlock; // is this the last packet in block?
|
|
|
int numChunks; // number of chunks currently in packet
|
|
|
+ int flushOffsetBuffer; // last full chunk that was flushed
|
|
|
+ long flushOffsetBlock; // block offset of last full chunk flushed
|
|
|
|
|
|
// create a new packet
|
|
|
Packet(int size, long offsetInBlock) {
|
|
@@ -1610,9 +1614,23 @@ class DFSClient implements FSConstants {
|
|
|
this.numChunks = 0;
|
|
|
this.offsetInBlock = offsetInBlock;
|
|
|
this.seqno = currentSeqno;
|
|
|
+ this.flushOffsetBuffer = 0;
|
|
|
+ this.flushOffsetBlock = 0;
|
|
|
currentSeqno++;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ // Copy constructor. Every field of 'old' is duplicated, including its
+ // sequence number and flush offsets. The ByteBuffer itself is NOT
+ // cloned: the new Packet and 'old' refer to the same buffer object.
+ Packet(Packet old) {
+   // identity and position of the packet within the block
+   seqno = old.seqno;
+   offsetInBlock = old.offsetInBlock;
+   lastPacketInBlock = old.lastPacketInBlock;
+   numChunks = old.numChunks;
+
+   // how much of the packet has already been flushed
+   flushOffsetBuffer = old.flushOffsetBuffer;
+   flushOffsetBlock = old.flushOffsetBlock;
+
+   // shared, not copied
+   buffer = old.buffer;
+ }
|
|
|
+
|
|
|
// writes len bytes from offset off in inarray into
|
|
|
// this packet.
|
|
|
//
|
|
@@ -1625,6 +1643,12 @@ class DFSClient implements FSConstants {
|
|
|
// Appends 'value' as a 4-byte int to this packet's buffer at the
// buffer's current position (ByteBuffer.putInt advances the position).
void writeInt(int value) {
|
|
|
buffer.putInt(value);
|
|
|
}
|
|
|
+
|
|
|
+ // Sets the last flush offset of this packet.
+ //
+ //   bufoff   - offset into this packet's buffer up to which data has
+ //              already been flushed; when non-zero, the sender starts
+ //              writing from this buffer offset instead of 0.
+ //   blockOff - amount added to offsetInBlock by the sender when
+ //              bufoff is non-zero.
+ //
+ // Passing (0, 0) makes the sender transmit the whole packet.
+ void setFlushOffset(int bufoff, long blockOff) {
+   this.flushOffsetBuffer = bufoff;
+   this.flushOffsetBlock = blockOff;
+ }
|
|
|
}
|
|
|
|
|
|
//
|
|
@@ -1674,7 +1698,9 @@ class DFSClient implements FSConstants {
|
|
|
try {
|
|
|
// get packet to be sent.
|
|
|
one = dataQueue.getFirst();
|
|
|
+ int start = 0;
|
|
|
int len = one.buffer.limit();
|
|
|
+ long offsetInBlock = one.offsetInBlock;
|
|
|
|
|
|
// get new block from namenode.
|
|
|
if (blockStream == null) {
|
|
@@ -1686,13 +1712,21 @@ class DFSClient implements FSConstants {
|
|
|
response.start();
|
|
|
}
|
|
|
|
|
|
+ // If we are sending a sub-packet, then determine the offset
|
|
|
+ // in block.
|
|
|
+ if (one.flushOffsetBuffer != 0) {
|
|
|
+ offsetInBlock += one.flushOffsetBlock;
|
|
|
+ len = len - one.flushOffsetBuffer;
|
|
|
+ start += one.flushOffsetBuffer;
|
|
|
+ }
|
|
|
+
|
|
|
// user bytes from 'position' to 'limit'.
|
|
|
byte[] arr = one.buffer.array();
|
|
|
- if (one.offsetInBlock >= blockSize) {
|
|
|
+ if (offsetInBlock >= blockSize) {
|
|
|
throw new IOException("BlockSize " + blockSize +
|
|
|
" is smaller than data size. " +
|
|
|
" Offset of packet in block " +
|
|
|
- one.offsetInBlock +
|
|
|
+ offsetInBlock +
|
|
|
" Aborting file " + src);
|
|
|
}
|
|
|
|
|
@@ -1706,10 +1740,10 @@ class DFSClient implements FSConstants {
|
|
|
|
|
|
// write out data to remote datanode
|
|
|
blockStream.writeInt(len); // size of this packet
|
|
|
- blockStream.writeLong(one.offsetInBlock); // data offset in block
|
|
|
+ blockStream.writeLong(offsetInBlock); // data offset in block
|
|
|
blockStream.writeLong(one.seqno); // sequence num of packet
|
|
|
blockStream.writeBoolean(one.lastPacketInBlock);
|
|
|
- blockStream.write(arr, 0, len);
|
|
|
+ blockStream.write(arr, start, len);
|
|
|
if (one.lastPacketInBlock) {
|
|
|
blockStream.writeInt(0); // indicate end-of-block
|
|
|
}
|
|
@@ -1717,7 +1751,7 @@ class DFSClient implements FSConstants {
|
|
|
LOG.debug("DataStreamer block " + block +
|
|
|
" wrote packet seqno:" + one.seqno +
|
|
|
" size:" + len +
|
|
|
- " offsetInBlock:" + one.offsetInBlock +
|
|
|
+ " offsetInBlock:" + offsetInBlock +
|
|
|
" lastPacketInBlock:" + one.lastPacketInBlock);
|
|
|
} catch (IOException e) {
|
|
|
LOG.warn("DataStreamer Exception: " + e);
|
|
@@ -2085,6 +2119,10 @@ class DFSClient implements FSConstants {
|
|
|
LOG.debug("pipeline = " + nodes[i].getName());
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ // persist blocks on namenode on next flush
|
|
|
+ persistBlocks = true;
|
|
|
+
|
|
|
try {
|
|
|
LOG.debug("Connecting to " + nodes[0].getName());
|
|
|
InetSocketAddress target = NetUtils.createSocketAddr(nodes[0].getName());
|
|
@@ -2182,7 +2220,7 @@ class DFSClient implements FSConstants {
|
|
|
|
|
|
// @see FSOutputSummer#writeChunk()
|
|
|
@Override
|
|
|
- protected void writeChunk(byte[] b, int offset, int len, byte[] checksum)
|
|
|
+ protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] checksum)
|
|
|
throws IOException {
|
|
|
checkOpen();
|
|
|
isClosed();
|
|
@@ -2219,6 +2257,8 @@ class DFSClient implements FSConstants {
|
|
|
|
|
|
if (currentPacket == null) {
|
|
|
currentPacket = new Packet(packetSize, bytesCurBlock);
|
|
|
+ LOG.debug("DFSClient writeChunk allocating new packet " +
|
|
|
+ currentPacket.seqno);
|
|
|
}
|
|
|
|
|
|
currentPacket.writeInt(len);
|
|
@@ -2246,24 +2286,106 @@ class DFSClient implements FSConstants {
|
|
|
currentPacket = null;
|
|
|
}
|
|
|
}
|
|
|
- //LOG.debug("DFSClient writeChunk with length " + len +
|
|
|
+ //LOG.debug("DFSClient writeChunk done length " + len +
|
|
|
// " checksum length " + cklen);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Waits till all existing data is flushed and
|
|
|
- * confirmations received from datanodes.
|
|
|
+ * All data is written out to datanodes. It is not guaranteed
|
|
|
+ * that data has been flushed to persistent store on the
|
|
|
+ * datanode. Block allocations are persisted on namenode.
|
|
|
*/
|
|
|
@Override
public synchronized void flush() throws IOException {
+   // Overview: snapshot the current packet and block offset, push all
+   // buffered data out via flushBuffer(true) and flushInternal(), then
+   // put the snapshot back so that later writes continue in the same
+   // packet. On IOException the stream is marked closed, the helper
+   // threads are shut down and the original exception is rethrown.
+   Packet savePacket = null;
+   int position = 0;
+   long saveOffset = 0;
+
+   try {
+     // Record the state of the current output stream.
+     // This state will be reverted after the flush successfully
+     // finishes. It is necessary to do this so that partial
+     // checksum chunks are reused by writes that follow this
+     // flush.
+     if (currentPacket != null) {
+       savePacket = new Packet(currentPacket);
+       position = savePacket.buffer.position();
+     }
+     saveOffset = bytesCurBlock;
+
+     // flush checksum buffer, but keep checksum buffer intact
+     flushBuffer(true);
+
+     LOG.debug("DFSClient flushInternal save position " +
+               position +
+               " cur position " +
+               ((currentPacket != null) ? currentPacket.buffer.position() : -1) +
+               " limit " +
+               ((currentPacket != null) ? currentPacket.buffer.limit() : -1) +
+               " bytesCurBlock " + bytesCurBlock +
+               " lastFlushOffset " + lastFlushOffset);
+
+     //
+     // Detect the condition that we have already flushed all
+     // outstanding data: same block offset as the previous flush and
+     // still the same packet (by sequence number) as before
+     // flushBuffer(true) ran.
+     //
+     boolean skipFlush = (lastFlushOffset == bytesCurBlock &&
+                          savePacket != null && currentPacket != null &&
+                          savePacket.seqno == currentPacket.seqno);
+
+     // Do the flush.
+     //
+     if (!skipFlush) {
+
+       // record the valid offset of this flush
+       lastFlushOffset = bytesCurBlock;
+
+       // wait for all packets to be sent and acknowledged
+       flushInternal();
+     }
+
+     // Restore state of stream. Record the last flush offset
+     // of the last full chunk that was flushed.
+     //
+     bytesCurBlock = saveOffset;
+     currentPacket = null;
+     if (savePacket != null) {
+       // reopen the shared buffer for writing at the saved position
+       savePacket.buffer.limit(savePacket.buffer.capacity());
+       savePacket.buffer.position(position);
+       savePacket.setFlushOffset(position,
+                                 savePacket.numChunks *
+                                 checksum.getBytesPerChecksum());
+       currentPacket = savePacket;
+     }
+
+     // If any new blocks were allocated since the last flush,
+     // then persist block locations on namenode.
+     //
+     if (persistBlocks) {
+       namenode.fsync(src, clientName);
+       persistBlocks = false;
+     }
+   } catch (IOException e) {
+     // Remember the failure with the original exception attached as
+     // its cause, so the underlying stack trace is not lost. The
+     // message text is unchanged.
+     IOException wrapped = new IOException("IOException flush:" + e);
+     wrapped.initCause(e);
+     lastException = wrapped;
+     closed = true;
+     closeThreads();
+     throw e;
+   }
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Waits till all existing data is flushed and confirmations
|
|
|
+ * received from datanodes.
|
|
|
+ */
|
|
|
+ private synchronized void flushInternal() throws IOException {
|
|
|
checkOpen();
|
|
|
isClosed();
|
|
|
-
|
|
|
+
|
|
|
while (!closed) {
|
|
|
synchronized (dataQueue) {
|
|
|
isClosed();
|
|
|
//
|
|
|
- // if there is data in the current buffer, send it across
|
|
|
+ // If there is data in the current buffer, send it across
|
|
|
//
|
|
|
if (currentPacket != null) {
|
|
|
currentPacket.buffer.flip();
|
|
@@ -2324,6 +2446,23 @@ class DFSClient implements FSConstants {
|
|
|
s = null;
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ // Shuts down the datastreamer and responseprocessor threads.
+ //
+ // The streamer is closed and joined first; only then is the response
+ // thread (if any) closed, joined and cleared.
+ //
+ // Throws IOException if the calling thread is interrupted while
+ // waiting for either thread to exit. In that case the thread's
+ // interrupt status is restored and the InterruptedException is
+ // attached as the cause.
+ private void closeThreads() throws IOException {
+   try {
+     streamer.close();
+     streamer.join();
+
+     // shutdown response after streamer has exited.
+     if (response != null) {
+       response.close();
+       response.join();
+       response = null;
+     }
+   } catch (InterruptedException e) {
+     // join() cleared the interrupt flag when it threw; set it again so
+     // callers further up the stack can still observe the interruption.
+     Thread.currentThread().interrupt();
+     IOException ioe = new IOException("Failed to shutdown response thread");
+     ioe.initCause(e);
+     throw ioe;
+   }
+ }
|
|
|
|
|
|
/**
|
|
|
* Closes this output stream and releases any system
|
|
@@ -2334,36 +2473,27 @@ class DFSClient implements FSConstants {
|
|
|
isClosed();
|
|
|
|
|
|
try {
|
|
|
- flushBuffer(); // flush from all upper layers
|
|
|
+ flushBuffer(); // flush from all upper layers
|
|
|
|
|
|
- // Mark that this packet is the last packet in block.
|
|
|
- // If there are no outstanding packets and the last packet
|
|
|
- // was not the last one in the current block, then create a
|
|
|
- // packet with empty payload.
|
|
|
- synchronized (dataQueue) {
|
|
|
- if (currentPacket == null && bytesCurBlock != 0) {
|
|
|
- currentPacket = new Packet(packetSize, bytesCurBlock);
|
|
|
- currentPacket.writeInt(0); // one chunk with empty contents
|
|
|
- }
|
|
|
- if (currentPacket != null) {
|
|
|
- currentPacket.lastPacketInBlock = true;
|
|
|
+ // Mark that this packet is the last packet in block.
|
|
|
+ // If there are no outstanding packets and the last packet
|
|
|
+ // was not the last one in the current block, then create a
|
|
|
+ // packet with empty payload.
|
|
|
+ synchronized (dataQueue) {
|
|
|
+ if (currentPacket == null && bytesCurBlock != 0) {
|
|
|
+ currentPacket = new Packet(packetSize, bytesCurBlock);
|
|
|
+ currentPacket.writeInt(0); // one chunk with empty contents
|
|
|
+ }
|
|
|
+ if (currentPacket != null) {
|
|
|
+ currentPacket.lastPacketInBlock = true;
|
|
|
+ currentPacket.setFlushOffset(0, 0); // send whole packet
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- flush(); // flush all data to Datanodes
|
|
|
+ flushInternal(); // flush all data to Datanodes
|
|
|
closed = true;
|
|
|
-
|
|
|
- // wait for threads to finish processing
|
|
|
- streamer.close();
|
|
|
- // wait for threads to exit
|
|
|
- streamer.join();
|
|
|
-
|
|
|
- // shutdown response after streamer has exited.
|
|
|
- if (response != null) {
|
|
|
- response.close();
|
|
|
- response.join();
|
|
|
- response = null;
|
|
|
- }
|
|
|
+
|
|
|
+ closeThreads();
|
|
|
|
|
|
synchronized (dataQueue) {
|
|
|
if (blockStream != null) {
|
|
@@ -2395,8 +2525,6 @@ class DFSClient implements FSConstants {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- } catch (InterruptedException e) {
|
|
|
- throw new IOException("Failed to shutdown response thread");
|
|
|
} finally {
|
|
|
closed = true;
|
|
|
}
|
|
@@ -2406,7 +2534,7 @@ class DFSClient implements FSConstants {
|
|
|
artificialSlowdown = period;
|
|
|
}
|
|
|
|
|
|
// Lowers chunksPerPacket to 'value' if 'value' is smaller (Math.min means
// it can never be raised here) and recomputes packetSize from chunkSize.
- void setChunksPerPacket(int value) {
|
|
|
+ synchronized void setChunksPerPacket(int value) {
|
|
|
chunksPerPacket = Math.min(chunksPerPacket, value);
|
|
|
packetSize = chunkSize * chunksPerPacket;
|
|
|
}
|