@@ -2171,6 +2171,8 @@ public class DFSClient implements FSConstants, java.io.Closeable {
     private DataStreamer streamer = new DataStreamer();;
     private ResponseProcessor response = null;
     private long currentSeqno = 0;
+    private long lastQueuedSeqno = -1;
+    private long lastAckedSeqno = -1;
     private long bytesCurBlock = 0; // bytes writen in current block
     private int packetSize = 0; // write packet size, including the header.
     private int chunksPerPacket = 0;
@@ -2180,7 +2182,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
     private volatile int errorIndex = 0;
     private volatile IOException lastException = null;
     private long artificialSlowdown = 0;
-    private long lastFlushOffset = -1; // offset when flush was invoked
+    private long lastFlushOffset = 0; // offset when flush was invoked
     private boolean persistBlocks = false; // persist blocks on namenode
     private int recoveryErrorCount = 0; // number of times block recovery failed
     private int maxRecoveryErrorCount = 5; // try block recovery 5 times
@@ -2563,6 +2565,8 @@ public class DFSClient implements FSConstants, java.io.Closeable {
             lastPacketInBlock = one.lastPacketInBlock;
 
             synchronized (ackQueue) {
+              assert ack.getSeqno() == lastAckedSeqno + 1;
+              lastAckedSeqno = ack.getSeqno();
               ackQueue.removeFirst();
               ackQueue.notifyAll();
             }
@@ -3178,11 +3182,9 @@ public class DFSClient implements FSConstants, java.io.Closeable {
         if (bytesCurBlock == blockSize) {
           currentPacket.lastPacketInBlock = true;
           bytesCurBlock = 0;
-          lastFlushOffset = -1;
+          lastFlushOffset = 0;
         }
-        dataQueue.addLast(currentPacket);
-        dataQueue.notifyAll();
-        currentPacket = null;
+        enqueueCurrentPacket();
 
         // If this was the first write after reopening a file, then the above
         // write filled up any partial chunk. Tell the summer to generate full
@@ -3198,58 +3200,102 @@ public class DFSClient implements FSConstants, java.io.Closeable {
       //LOG.debug("DFSClient writeChunk done length " + len +
       //          " checksum length " + cklen);
     }
-
+
+    private synchronized void enqueueCurrentPacket() {
+      synchronized (dataQueue) {
+        if (currentPacket == null) return;
+        dataQueue.addLast(currentPacket);
+        dataQueue.notifyAll();
+        lastQueuedSeqno = currentPacket.seqno;
+        currentPacket = null;
+      }
+    }
+
     /**
      * All data is written out to datanodes. It is not guaranteed
      * that data has been flushed to persistent store on the
      * datanode. Block allocations are persisted on namenode.
      */
-    public synchronized void sync() throws IOException {
+    public void sync() throws IOException {
+      checkOpen();
+      if (closed) {
+        throw new IOException("DFSOutputStream is closed");
+      }
       try {
-        /* Record current blockOffset. This might be changed inside
-         * flushBuffer() where a partial checksum chunk might be flushed.
-         * After the flush, reset the bytesCurBlock back to its previous value,
-         * any partial checksum chunk will be sent now and in next packet.
-         */
-        long saveOffset = bytesCurBlock;
-
-        // flush checksum buffer, but keep checksum buffer intact
-        flushBuffer(true);
-
-        LOG.debug("DFSClient flush() : saveOffset " + saveOffset +
-                  " bytesCurBlock " + bytesCurBlock +
-                  " lastFlushOffset " + lastFlushOffset);
-
-        // Flush only if we haven't already flushed till this offset.
-        if (lastFlushOffset != bytesCurBlock) {
-
-          // record the valid offset of this flush
-          lastFlushOffset = bytesCurBlock;
-
-          // wait for all packets to be sent and acknowledged
-          flushInternal();
-        } else {
-          // just discard the current packet since it is already been sent.
-          currentPacket = null;
+        long toWaitFor;
+        synchronized (this) {
+          /* Record current blockOffset. This might be changed inside
+           * flushBuffer() where a partial checksum chunk might be flushed.
+           * After the flush, reset the bytesCurBlock back to its previous value,
+           * any partial checksum chunk will be sent now and in next packet.
+           */
+          long saveOffset = bytesCurBlock;
+          Packet oldCurrentPacket = currentPacket;
+
+          // flush checksum buffer, but keep checksum buffer intact
+          flushBuffer(true);
+          // bytesCurBlock potentially incremented if there was buffered data
+
+          // Flush only if we haven't already flushed till this offset.
+          if (lastFlushOffset != bytesCurBlock) {
+            assert bytesCurBlock > lastFlushOffset;
+            // record the valid offset of this flush
+            lastFlushOffset = bytesCurBlock;
+            enqueueCurrentPacket();
+          } else {
+            // just discard the current packet since it is already been sent.
+            if (oldCurrentPacket == null && currentPacket != null) {
+              // If we didn't previously have a packet queued, and now we do,
+              // but we don't plan on sending it, then we should not
+              // skip a sequence number for it!
+              currentSeqno--;
+            }
+            currentPacket = null;
+          }
+          // Restore state of stream. Record the last flush offset
+          // of the last full chunk that was flushed.
+          //
+          bytesCurBlock = saveOffset;
+          toWaitFor = lastQueuedSeqno;
         }
-
-        // Restore state of stream. Record the last flush offset
-        // of the last full chunk that was flushed.
-        //
-        bytesCurBlock = saveOffset;
+        waitForAckedSeqno(toWaitFor);
 
         // If any new blocks were allocated since the last flush,
         // then persist block locations on namenode.
         //
-        if (persistBlocks) {
-          namenode.fsync(src, clientName);
+        boolean willPersist;
+        synchronized (this) {
+          willPersist = persistBlocks && !closed;
           persistBlocks = false;
         }
+        if (willPersist) {
+          try {
+            namenode.fsync(src, clientName);
+          } catch (IOException ioe) {
+            DFSClient.LOG.warn("Unable to persist blocks in hflush for " + src, ioe);
+            // If we got an error here, it might be because some other thread called
+            // close before our hflush completed. In that case, we should throw an
+            // exception that the stream is closed.
+            isClosed();
+            if (closed) {
+              throw new IOException("DFSOutputStream is closed");
+            }
+
+            // If we aren't closed but failed to sync, we should expose that to the
+            // caller.
+            throw ioe;
+          }
+        }
       } catch (IOException e) {
-        lastException = new IOException("IOException flush:" + e);
-        closed = true;
-        closeThreads();
-        throw e;
+        LOG.warn("Error while syncing", e);
+        synchronized (this) {
+          if (!closed) {
+            lastException = new IOException("IOException flush:" + e);
+            closed = true;
+            closeThreads();
+          }
+        }
+        throw e;
       }
     }
 
@@ -3275,57 +3321,34 @@ public class DFSClient implements FSConstants, java.io.Closeable {
      * Waits till all existing data is flushed and confirmations
      * received from datanodes.
      */
-    private synchronized void flushInternal() throws IOException {
-      checkOpen();
+    private void flushInternal() throws IOException {
       isClosed();
+      checkOpen();
 
-      while (!closed) {
-        synchronized (dataQueue) {
-          isClosed();
-          //
-          // If there is data in the current buffer, send it across
-          //
-          if (currentPacket != null) {
-            dataQueue.addLast(currentPacket);
-            dataQueue.notifyAll();
-            currentPacket = null;
-          }
-
-          // wait for all buffers to be flushed to datanodes
-          if (!closed && dataQueue.size() != 0) {
-            try {
-              dataQueue.wait();
-            } catch (InterruptedException e) {
-            }
-            continue;
-          }
-        }
+      long toWaitFor;
+      synchronized (this) {
+        enqueueCurrentPacket();
+        toWaitFor = lastQueuedSeqno;
+      }
 
-        // wait for all acks to be received back from datanodes
-        synchronized (ackQueue) {
-          if (!closed && ackQueue.size() != 0) {
-            try {
-              ackQueue.wait();
-            } catch (InterruptedException e) {
-            }
-            continue;
-          }
-        }
+      waitForAckedSeqno(toWaitFor);
+    }
 
-        // acquire both the locks and verify that we are
-        // *really done*. In the case of error recovery,
-        // packets might move back from ackQueue to dataQueue.
-        //
-        synchronized (dataQueue) {
-          synchronized (ackQueue) {
-            if (dataQueue.size() + ackQueue.size() == 0) {
-              break; // we are done
-            }
+    private void waitForAckedSeqno(long seqnumToWaitFor) throws IOException {
+      synchronized (ackQueue) {
+        while (!closed) {
+          isClosed();
+          if (lastAckedSeqno >= seqnumToWaitFor) {
+            break;
           }
+          try {
+            ackQueue.wait();
+          } catch (InterruptedException ie) {}
         }
       }
+      isClosed();
    }
-
+
     /**
      * Closes this output stream and releases any system
      * resources associated with this stream.
|