|
@@ -1431,8 +1431,8 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
int dataLen = in.readInt();
|
|
int dataLen = in.readInt();
|
|
|
|
|
|
// Sanity check the lengths
|
|
// Sanity check the lengths
|
|
- if ( dataLen < 0 ||
|
|
|
|
- ( (dataLen % bytesPerChecksum) != 0 && !lastPacketInBlock ) ||
|
|
|
|
|
|
+ if ( ( dataLen <= 0 && !lastPacketInBlock ) ||
|
|
|
|
+ ( dataLen != 0 && lastPacketInBlock) ||
|
|
(seqno != (lastSeqNo + 1)) ) {
|
|
(seqno != (lastSeqNo + 1)) ) {
|
|
throw new IOException("BlockReader: error in packet header" +
|
|
throw new IOException("BlockReader: error in packet header" +
|
|
"(chunkOffset : " + chunkOffset +
|
|
"(chunkOffset : " + chunkOffset +
|
|
@@ -2598,7 +2598,16 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
response.start();
|
|
response.start();
|
|
stage = BlockConstructionStage.DATA_STREAMING;
|
|
stage = BlockConstructionStage.DATA_STREAMING;
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ private void endBlock() {
|
|
|
|
+ LOG.debug("Closing old block " + block);
|
|
|
|
+ this.setName("DataStreamer for file " + src);
|
|
|
|
+ closeResponder();
|
|
|
|
+ closeStream();
|
|
|
|
+ nodes = null;
|
|
|
|
+ stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
|
|
|
|
+ }
|
|
|
|
+
|
|
/*
|
|
/*
|
|
* streamer thread is the only thread that opens streams to datanode,
|
|
* streamer thread is the only thread that opens streams to datanode,
|
|
* and closes them. Any error recovery is also done by this thread.
|
|
* and closes them. Any error recovery is also done by this thread.
|
|
@@ -2642,8 +2651,6 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
one = dataQueue.getFirst();
|
|
one = dataQueue.getFirst();
|
|
}
|
|
}
|
|
|
|
|
|
- long offsetInBlock = one.offsetInBlock;
|
|
|
|
-
|
|
|
|
// get new block from namenode.
|
|
// get new block from namenode.
|
|
if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
|
|
if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
|
|
LOG.debug("Allocating new block");
|
|
LOG.debug("Allocating new block");
|
|
@@ -2655,14 +2662,34 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
initDataStreaming();
|
|
initDataStreaming();
|
|
}
|
|
}
|
|
|
|
|
|
- if (offsetInBlock >= blockSize) {
|
|
|
|
|
|
+ long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
|
|
|
|
+ if (lastByteOffsetInBlock > blockSize) {
|
|
throw new IOException("BlockSize " + blockSize +
|
|
throw new IOException("BlockSize " + blockSize +
|
|
" is smaller than data size. " +
|
|
" is smaller than data size. " +
|
|
" Offset of packet in block " +
|
|
" Offset of packet in block " +
|
|
- offsetInBlock +
|
|
|
|
|
|
+ lastByteOffsetInBlock +
|
|
" Aborting file " + src);
|
|
" Aborting file " + src);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ if (one.lastPacketInBlock) {
|
|
|
|
+ // wait for all data packets have been successfully acked
|
|
|
|
+ synchronized (dataQueue) {
|
|
|
|
+ while (!streamerClosed && !hasError &&
|
|
|
|
+ ackQueue.size() != 0 && clientRunning) {
|
|
|
|
+ try {
|
|
|
|
+ // wait for acks to arrive from datanodes
|
|
|
|
+ dataQueue.wait(1000);
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ if (streamerClosed || hasError || !clientRunning) {
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+ stage = BlockConstructionStage.PIPELINE_CLOSE;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // send the packet
|
|
ByteBuffer buf = one.getBuffer();
|
|
ByteBuffer buf = one.getBuffer();
|
|
|
|
|
|
synchronized (dataQueue) {
|
|
synchronized (dataQueue) {
|
|
@@ -2674,11 +2701,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
|
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
LOG.debug("DataStreamer block " + block +
|
|
LOG.debug("DataStreamer block " + block +
|
|
- " sending packet seqno:" + one.seqno +
|
|
|
|
- " size:" + buf.remaining() +
|
|
|
|
- " offsetInBlock:" + one.offsetInBlock +
|
|
|
|
- " lastPacketInBlock:" + one.lastPacketInBlock +
|
|
|
|
- " lastByteOffsetInBlock" + one.getLastByteOffsetBlock());
|
|
|
|
|
|
+ " sending packet " + one);
|
|
}
|
|
}
|
|
|
|
|
|
// write out data to remote datanode
|
|
// write out data to remote datanode
|
|
@@ -2690,22 +2713,31 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
if (bytesSent < tmpBytesSent) {
|
|
if (bytesSent < tmpBytesSent) {
|
|
bytesSent = tmpBytesSent;
|
|
bytesSent = tmpBytesSent;
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ if (streamerClosed || hasError || !clientRunning) {
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Is this block full?
|
|
if (one.lastPacketInBlock) {
|
|
if (one.lastPacketInBlock) {
|
|
|
|
+ // wait for the close packet has been acked
|
|
synchronized (dataQueue) {
|
|
synchronized (dataQueue) {
|
|
- while (!streamerClosed && !hasError && ackQueue.size() != 0 && clientRunning) {
|
|
|
|
- try {
|
|
|
|
- dataQueue.wait(1000); // wait for acks to arrive from datanodes
|
|
|
|
- } catch (InterruptedException e) {
|
|
|
|
- }
|
|
|
|
|
|
+ while (!streamerClosed && !hasError &&
|
|
|
|
+ ackQueue.size() != 0 && clientRunning) {
|
|
|
|
+ dataQueue.wait(1000);// wait for acks to arrive from datanodes
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
- if (ackQueue.isEmpty()) { // done receiving all acks
|
|
|
|
- // indicate end-of-block
|
|
|
|
- blockStream.writeInt(0);
|
|
|
|
- blockStream.flush();
|
|
|
|
|
|
+ if (streamerClosed || hasError || !clientRunning) {
|
|
|
|
+ continue;
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ endBlock();
|
|
|
|
+ }
|
|
|
|
+ if (progress != null) { progress.progress(); }
|
|
|
|
+
|
|
|
|
+ // This is used by unit test to trigger race conditions.
|
|
|
|
+ if (artificialSlowdown != 0 && clientRunning) {
|
|
|
|
+ Thread.sleep(artificialSlowdown);
|
|
}
|
|
}
|
|
} catch (Throwable e) {
|
|
} catch (Throwable e) {
|
|
LOG.warn("DataStreamer Exception: " +
|
|
LOG.warn("DataStreamer Exception: " +
|
|
@@ -2718,29 +2750,6 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
streamerClosed = true;
|
|
streamerClosed = true;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
-
|
|
|
|
- if (streamerClosed || hasError || !clientRunning) {
|
|
|
|
- continue;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // Is this block full?
|
|
|
|
- if (one.lastPacketInBlock) {
|
|
|
|
- LOG.debug("Closing old block " + block);
|
|
|
|
- this.setName("DataStreamer for file " + src);
|
|
|
|
- closeResponder();
|
|
|
|
- closeStream();
|
|
|
|
- nodes = null;
|
|
|
|
- stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
|
|
|
|
- }
|
|
|
|
- if (progress != null) { progress.progress(); }
|
|
|
|
-
|
|
|
|
- // This is used by unit test to trigger race conditions.
|
|
|
|
- if (artificialSlowdown != 0 && clientRunning) {
|
|
|
|
- try {
|
|
|
|
- Thread.sleep(artificialSlowdown);
|
|
|
|
- } catch (InterruptedException e) {}
|
|
|
|
- }
|
|
|
|
}
|
|
}
|
|
closeInternal();
|
|
closeInternal();
|
|
}
|
|
}
|
|
@@ -2928,7 +2937,15 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
boolean doSleep = setupPipelineForAppendOrRecovery();
|
|
boolean doSleep = setupPipelineForAppendOrRecovery();
|
|
|
|
|
|
if (!streamerClosed && clientRunning) {
|
|
if (!streamerClosed && clientRunning) {
|
|
- initDataStreaming();
|
|
|
|
|
|
+ if (stage == BlockConstructionStage.PIPELINE_CLOSE) {
|
|
|
|
+ synchronized (dataQueue) {
|
|
|
|
+ dataQueue.remove(); // remove the end of block packet
|
|
|
|
+ dataQueue.notifyAll();
|
|
|
|
+ }
|
|
|
|
+ endBlock();
|
|
|
|
+ } else {
|
|
|
|
+ initDataStreaming();
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
return doSleep;
|
|
return doSleep;
|
|
@@ -3392,15 +3409,6 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
", blockSize=" + blockSize +
|
|
", blockSize=" + blockSize +
|
|
", appendChunk=" + appendChunk);
|
|
", appendChunk=" + appendChunk);
|
|
}
|
|
}
|
|
- //
|
|
|
|
- // if we allocated a new packet because we encountered a block
|
|
|
|
- // boundary, reset bytesCurBlock.
|
|
|
|
- //
|
|
|
|
- if (bytesCurBlock == blockSize) {
|
|
|
|
- currentPacket.lastPacketInBlock = true;
|
|
|
|
- bytesCurBlock = 0;
|
|
|
|
- lastFlushOffset = -1;
|
|
|
|
- }
|
|
|
|
waitAndQueuePacket(currentPacket);
|
|
waitAndQueuePacket(currentPacket);
|
|
currentPacket = null;
|
|
currentPacket = null;
|
|
|
|
|
|
@@ -3413,6 +3421,20 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
}
|
|
}
|
|
int psize = Math.min((int)(blockSize-bytesCurBlock), writePacketSize);
|
|
int psize = Math.min((int)(blockSize-bytesCurBlock), writePacketSize);
|
|
computePacketChunkSize(psize, bytesPerChecksum);
|
|
computePacketChunkSize(psize, bytesPerChecksum);
|
|
|
|
+
|
|
|
|
+ //
|
|
|
|
+ // if encountering a block boundary, send an empty packet to
|
|
|
|
+ // indicate the end of block and reset bytesCurBlock.
|
|
|
|
+ //
|
|
|
|
+ if (bytesCurBlock == blockSize) {
|
|
|
|
+ currentPacket = new Packet(DataNode.PKT_HEADER_LEN+4, 0,
|
|
|
|
+ bytesCurBlock);
|
|
|
|
+ currentPacket.lastPacketInBlock = true;
|
|
|
|
+ waitAndQueuePacket(currentPacket);
|
|
|
|
+ currentPacket = null;
|
|
|
|
+ bytesCurBlock = 0;
|
|
|
|
+ lastFlushOffset = -1;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -3556,21 +3578,22 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
try {
|
|
try {
|
|
flushBuffer(); // flush from all upper layers
|
|
flushBuffer(); // flush from all upper layers
|
|
|
|
|
|
- // Mark that this packet is the last packet in block.
|
|
|
|
- // If there are no outstanding packets and the last packet
|
|
|
|
- // was not the last one in the current block, then create a
|
|
|
|
- // packet with empty payload.
|
|
|
|
- if (currentPacket == null && bytesCurBlock != 0) {
|
|
|
|
- currentPacket = new Packet(packetSize, chunksPerPacket,
|
|
|
|
- bytesCurBlock);
|
|
|
|
- }
|
|
|
|
if (currentPacket != null) {
|
|
if (currentPacket != null) {
|
|
|
|
+ waitAndQueuePacket(currentPacket);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (bytesCurBlock != 0) {
|
|
|
|
+ // send an empty packet to mark the end of the block
|
|
|
|
+ currentPacket = new Packet(DataNode.PKT_HEADER_LEN+4, 0,
|
|
|
|
+ bytesCurBlock);
|
|
currentPacket.lastPacketInBlock = true;
|
|
currentPacket.lastPacketInBlock = true;
|
|
}
|
|
}
|
|
|
|
|
|
flushInternal(); // flush all data to Datanodes
|
|
flushInternal(); // flush all data to Datanodes
|
|
|
|
+ LOG.info("Done flushing");
|
|
// get last block before destroying the streamer
|
|
// get last block before destroying the streamer
|
|
Block lastBlock = streamer.getBlock();
|
|
Block lastBlock = streamer.getBlock();
|
|
|
|
+ LOG.info("Closing the streams...");
|
|
closeThreads(false);
|
|
closeThreads(false);
|
|
completeFile(lastBlock);
|
|
completeFile(lastBlock);
|
|
leasechecker.remove(src);
|
|
leasechecker.remove(src);
|