|
@@ -65,7 +65,6 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
private ByteBuffer buf; // contains one full packet.
|
|
|
private int bufRead; //amount of valid data in the buf
|
|
|
private int maxPacketReadLen;
|
|
|
- protected long offsetInBlock;
|
|
|
protected final String inAddr;
|
|
|
protected final String myAddr;
|
|
|
private String mirrorAddr;
|
|
@@ -78,6 +77,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
DatanodeInfo srcDataNode = null;
|
|
|
private Checksum partialCrc = null;
|
|
|
private final DataNode datanode;
|
|
|
+ final private ReplicaInPipelineInterface replicaInfo;
|
|
|
|
|
|
BlockReceiver(Block block, DataInputStream in, String inAddr,
|
|
|
String myAddr, boolean isRecovery, String clientName,
|
|
@@ -89,7 +89,6 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
this.myAddr = myAddr;
|
|
|
this.isRecovery = isRecovery;
|
|
|
this.clientName = clientName;
|
|
|
- this.offsetInBlock = 0;
|
|
|
this.srcDataNode = srcDataNode;
|
|
|
this.datanode = datanode;
|
|
|
this.checksum = DataChecksum.newDataChecksum(in);
|
|
@@ -100,13 +99,14 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
// Open local disk out
|
|
|
//
|
|
|
if (clientName.length() == 0) { //replication or move
|
|
|
- streams = datanode.data.writeToTemporary(block);
|
|
|
+ replicaInfo = datanode.data.writeToTemporary(block);
|
|
|
} else if (finalized && isRecovery) { // client append
|
|
|
- streams = datanode.data.append(block);
|
|
|
+ replicaInfo = datanode.data.append(block);
|
|
|
this.finalized = false;
|
|
|
} else { // client write
|
|
|
- streams = datanode.data.writeToRbw(block, isRecovery);
|
|
|
+ replicaInfo = datanode.data.writeToRbw(block, isRecovery);
|
|
|
}
|
|
|
+ streams = replicaInfo.createStreams();
|
|
|
if (streams != null) {
|
|
|
this.out = streams.dataOut;
|
|
|
this.checksumOut = new DataOutputStream(new BufferedOutputStream(
|
|
@@ -397,10 +397,22 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
buf.mark();
|
|
|
//read the header
|
|
|
buf.getInt(); // packet length
|
|
|
- offsetInBlock = buf.getLong(); // get offset of packet in block
|
|
|
+ long offsetInBlock = buf.getLong(); // get offset of packet in block
|
|
|
+
|
|
|
+ if (offsetInBlock > replicaInfo.getNumBytes()) {
|
|
|
+ throw new IOException("Received an out-of-sequence packet for " + block +
|
|
|
+ "from " + inAddr + " at offset " + offsetInBlock +
|
|
|
+ ". Expecting packet starting at " + replicaInfo.getNumBytes());
|
|
|
+ }
|
|
|
long seqno = buf.getLong(); // get seqno
|
|
|
boolean lastPacketInBlock = (buf.get() != 0);
|
|
|
|
|
|
+ int len = buf.getInt();
|
|
|
+ if (len < 0) {
|
|
|
+ throw new IOException("Got wrong length during writeBlock(" + block +
|
|
|
+ ") from " + inAddr + " at offset " +
|
|
|
+ offsetInBlock + ": " + len);
|
|
|
+ }
|
|
|
int endOfHeader = buf.position();
|
|
|
buf.reset();
|
|
|
|
|
@@ -412,8 +424,12 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
" lastPacketInBlock " + lastPacketInBlock);
|
|
|
}
|
|
|
|
|
|
- setBlockPosition(offsetInBlock);
|
|
|
-
|
|
|
+ // update received bytes
|
|
|
+ offsetInBlock += len;
|
|
|
+ if (replicaInfo.getNumBytes() < offsetInBlock) {
|
|
|
+ replicaInfo.setNumBytes(offsetInBlock);
|
|
|
+ }
|
|
|
+
|
|
|
//First write the packet to the mirror:
|
|
|
if (mirrorOut != null) {
|
|
|
try {
|
|
@@ -425,19 +441,10 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
}
|
|
|
|
|
|
buf.position(endOfHeader);
|
|
|
- int len = buf.getInt();
|
|
|
|
|
|
- if (len < 0) {
|
|
|
- throw new IOException("Got wrong length during writeBlock(" + block +
|
|
|
- ") from " + inAddr + " at offset " +
|
|
|
- offsetInBlock + ": " + len);
|
|
|
- }
|
|
|
-
|
|
|
if (len == 0) {
|
|
|
LOG.debug("Receiving empty packet for block " + block);
|
|
|
} else {
|
|
|
- offsetInBlock += len;
|
|
|
-
|
|
|
int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
|
|
|
checksumSize;
|
|
|
|
|
@@ -463,8 +470,10 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
}
|
|
|
|
|
|
try {
|
|
|
- if (!finalized) {
|
|
|
+ if (!finalized && replicaInfo.getBytesOnDisk()<offsetInBlock) {
|
|
|
//finally write to the disk :
|
|
|
+ setBlockPosition(offsetInBlock-len);
|
|
|
+
|
|
|
out.write(pktBuf, dataOff, len);
|
|
|
|
|
|
// If this is a partial chunk, then verify that this is the only
|
|
@@ -485,6 +494,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
} else {
|
|
|
checksumOut.write(pktBuf, checksumOff, checksumLen);
|
|
|
}
|
|
|
+ replicaInfo.setBytesOnDisk(offsetInBlock);
|
|
|
datanode.myMetrics.bytesWritten.inc(len);
|
|
|
}
|
|
|
} catch (IOException iex) {
|
|
@@ -499,7 +509,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
// put in queue for pending acks
|
|
|
if (responder != null) {
|
|
|
((PacketResponder)responder.getRunnable()).enqueue(seqno,
|
|
|
- lastPacketInBlock);
|
|
|
+ lastPacketInBlock, offsetInBlock);
|
|
|
}
|
|
|
|
|
|
if (throttler != null) { // throttle I/O
|
|
@@ -569,7 +579,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
close();
|
|
|
|
|
|
// Finalize the block. Does this fsync()?
|
|
|
- block.setNumBytes(offsetInBlock);
|
|
|
+ block.setNumBytes(replicaInfo.getNumBytes());
|
|
|
datanode.data.finalizeBlock(block);
|
|
|
datanode.myMetrics.blocksWritten.inc();
|
|
|
}
|
|
@@ -741,12 +751,13 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
* enqueue the seqno that is still be to acked by the downstream datanode.
|
|
|
* @param seqno
|
|
|
* @param lastPacketInBlock
|
|
|
+ * @param lastByteInPacket
|
|
|
*/
|
|
|
- synchronized void enqueue(long seqno, boolean lastPacketInBlock) {
|
|
|
+ synchronized void enqueue(long seqno, boolean lastPacketInBlock, long lastByteInPacket) {
|
|
|
if (running) {
|
|
|
LOG.debug("PacketResponder " + numTargets + " adding seqno " + seqno +
|
|
|
" to ack queue.");
|
|
|
- ackQueue.addLast(new Packet(seqno, lastPacketInBlock));
|
|
|
+ ackQueue.addLast(new Packet(seqno, lastPacketInBlock, lastByteInPacket));
|
|
|
notifyAll();
|
|
|
}
|
|
|
}
|
|
@@ -820,7 +831,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
if (!receiver.finalized) {
|
|
|
receiver.close();
|
|
|
final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
|
|
|
- block.setNumBytes(receiver.offsetInBlock);
|
|
|
+ block.setNumBytes(replicaInfo.getNumBytes());
|
|
|
datanode.data.finalizeBlock(block);
|
|
|
datanode.myMetrics.blocksWritten.inc();
|
|
|
datanode.notifyNamenodeReceivedBlock(block,
|
|
@@ -910,6 +921,9 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
}
|
|
|
pkt = ackQueue.removeFirst();
|
|
|
expected = pkt.seqno;
|
|
|
+ if (pkt.lastByteInBlock > replicaInfo.getBytesAcked()) {
|
|
|
+ replicaInfo.setBytesAcked(pkt.lastByteInBlock);
|
|
|
+ }
|
|
|
notifyAll();
|
|
|
LOG.debug("PacketResponder " + numTargets + " seqno = " + seqno);
|
|
|
if (seqno != expected) {
|
|
@@ -953,7 +967,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
if (lastPacketInBlock && !receiver.finalized) {
|
|
|
receiver.close();
|
|
|
final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
|
|
|
- block.setNumBytes(receiver.offsetInBlock);
|
|
|
+ block.setNumBytes(replicaInfo.getNumBytes());
|
|
|
datanode.data.finalizeBlock(block);
|
|
|
datanode.myMetrics.blocksWritten.inc();
|
|
|
datanode.notifyNamenodeReceivedBlock(block,
|
|
@@ -1042,10 +1056,12 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
static private class Packet {
|
|
|
long seqno;
|
|
|
boolean lastPacketInBlock;
|
|
|
+ long lastByteInBlock;
|
|
|
|
|
|
- Packet(long seqno, boolean lastPacketInBlock) {
|
|
|
+ Packet(long seqno, boolean lastPacketInBlock, long lastByteInPacket) {
|
|
|
this.seqno = seqno;
|
|
|
this.lastPacketInBlock = lastPacketInBlock;
|
|
|
+ this.lastByteInBlock = lastByteInPacket;
|
|
|
}
|
|
|
}
|
|
|
}
|