|
@@ -124,26 +124,25 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
" while receiving block " + block + " from " + inAddr);
|
|
|
}
|
|
|
}
|
|
|
- streams = replicaInfo.createStreams();
|
|
|
+ // read checksum meta information
|
|
|
+ this.checksum = DataChecksum.newDataChecksum(in);
|
|
|
+ this.bytesPerChecksum = checksum.getBytesPerChecksum();
|
|
|
+ this.checksumSize = checksum.getChecksumSize();
|
|
|
+
|
|
|
+ boolean isCreate = stage == BlockConstructionStage.PIPELINE_SETUP_CREATE
|
|
|
+ || clientName.length() == 0;
|
|
|
+ streams = replicaInfo.createStreams(isCreate,
|
|
|
+ this.bytesPerChecksum, this.checksumSize);
|
|
|
if (streams != null) {
|
|
|
this.out = streams.dataOut;
|
|
|
this.checksumOut = new DataOutputStream(new BufferedOutputStream(
|
|
|
streams.checksumOut,
|
|
|
SMALL_BUFFER_SIZE));
|
|
|
|
|
|
- // read checksum meta information
|
|
|
- this.checksum = DataChecksum.newDataChecksum(in);
|
|
|
- this.bytesPerChecksum = checksum.getBytesPerChecksum();
|
|
|
- this.checksumSize = checksum.getChecksumSize();
|
|
|
-
|
|
|
// write data chunk header if creating a new replica
|
|
|
- if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE
|
|
|
- || clientName.length() == 0) {
|
|
|
+ if (isCreate) {
|
|
|
BlockMetadataHeader.writeHeader(checksumOut, checksum);
|
|
|
- } else {
|
|
|
- datanode.data.setChannelPosition(block, streams, 0,
|
|
|
- BlockMetadataHeader.getHeaderSize());
|
|
|
- }
|
|
|
+ }
|
|
|
}
|
|
|
} catch (ReplicaAlreadyExistsException bae) {
|
|
|
throw bae;
|
|
@@ -449,6 +448,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
}
|
|
|
|
|
|
// update received bytes
|
|
|
+ long firstByteInBlock = offsetInBlock;
|
|
|
offsetInBlock += len;
|
|
|
if (replicaInfo.getNumBytes() < offsetInBlock) {
|
|
|
replicaInfo.setNumBytes(offsetInBlock);
|
|
@@ -479,8 +479,11 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
checksumSize;
|
|
|
|
|
|
if ( buf.remaining() != (checksumLen + len)) {
|
|
|
- throw new IOException("Data remaining in packet does not match " +
|
|
|
- "sum of checksumLen and dataLen");
|
|
|
+ throw new IOException("Data remaining in packet does not match" +
|
|
|
+ "sum of checksumLen and dataLen " +
|
|
|
+ " size remaining: " + buf.remaining() +
|
|
|
+ " data len: " + len +
|
|
|
+ " checksum Len: " + checksumLen);
|
|
|
}
|
|
|
int checksumOff = buf.position();
|
|
|
int dataOff = checksumOff + checksumLen;
|
|
@@ -500,11 +503,29 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
}
|
|
|
|
|
|
try {
|
|
|
- if (replicaInfo.getBytesOnDisk()<offsetInBlock) {
|
|
|
+ long onDiskLen = replicaInfo.getBytesOnDisk();
|
|
|
+ if (onDiskLen<offsetInBlock) {
|
|
|
//finally write to the disk :
|
|
|
- setBlockPosition(offsetInBlock-len);
|
|
|
|
|
|
- out.write(pktBuf, dataOff, len);
|
|
|
+ if (onDiskLen % bytesPerChecksum != 0) {
|
|
|
+ // prepare to overwrite last checksum
|
|
|
+ adjustCrcFilePosition();
|
|
|
+ }
|
|
|
+
|
|
|
+ // If this is a partial chunk, then read in pre-existing checksum
|
|
|
+ if (firstByteInBlock % bytesPerChecksum != 0) {
|
|
|
+ LOG.info("Packet starts at " + firstByteInBlock +
|
|
|
+ " for block " + block +
|
|
|
+ " which is not a multiple of bytesPerChecksum " +
|
|
|
+ bytesPerChecksum);
|
|
|
+ long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
|
|
|
+ onDiskLen / bytesPerChecksum * checksumSize;
|
|
|
+ computePartialChunkCrc(onDiskLen, offsetInChecksum, bytesPerChecksum);
|
|
|
+ }
|
|
|
+
|
|
|
+ int startByteToDisk = dataOff+(int)(onDiskLen-firstByteInBlock);
|
|
|
+ int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
|
|
|
+ out.write(pktBuf, startByteToDisk, numBytesToDisk);
|
|
|
|
|
|
// If this is a partial chunk, then verify that this is the only
|
|
|
// chunk in the packet. Calculate new crc for this chunk.
|
|
@@ -516,7 +537,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
" len = " + len +
|
|
|
" bytesPerChecksum " + bytesPerChecksum);
|
|
|
}
|
|
|
- partialCrc.update(pktBuf, dataOff, len);
|
|
|
+ partialCrc.update(pktBuf, startByteToDisk, numBytesToDisk);
|
|
|
byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
|
|
|
checksumOut.write(buf);
|
|
|
LOG.debug("Writing out partial crc for data len " + len);
|
|
@@ -626,14 +647,10 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Sets the file pointer in the local block file to the specified value.
|
|
|
+ * Adjust the file pointer in the local meta file so that the last checksum
|
|
|
+ * will be overwritten.
|
|
|
*/
|
|
|
- private void setBlockPosition(long offsetInBlock) throws IOException {
|
|
|
- if (datanode.data.getChannelPosition(block, streams) == offsetInBlock) {
|
|
|
- return; // nothing to do
|
|
|
- }
|
|
|
- long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
|
|
|
- offsetInBlock / bytesPerChecksum * checksumSize;
|
|
|
+ private void adjustCrcFilePosition() throws IOException {
|
|
|
if (out != null) {
|
|
|
out.flush();
|
|
|
}
|
|
@@ -641,23 +658,8 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
checksumOut.flush();
|
|
|
}
|
|
|
|
|
|
- // If this is a partial chunk, then read in pre-existing checksum
|
|
|
- if (offsetInBlock % bytesPerChecksum != 0) {
|
|
|
- LOG.info("setBlockPosition trying to set position to " +
|
|
|
- offsetInBlock +
|
|
|
- " for block " + block +
|
|
|
- " which is not a multiple of bytesPerChecksum " +
|
|
|
- bytesPerChecksum);
|
|
|
- computePartialChunkCrc(offsetInBlock, offsetInChecksum, bytesPerChecksum);
|
|
|
- }
|
|
|
-
|
|
|
- LOG.info("Changing block file offset of block " + block + " from " +
|
|
|
- datanode.data.getChannelPosition(block, streams) +
|
|
|
- " to " + offsetInBlock +
|
|
|
- " meta file offset to " + offsetInChecksum);
|
|
|
-
|
|
|
- // set the position of the block file
|
|
|
- datanode.data.setChannelPosition(block, streams, offsetInBlock, offsetInChecksum);
|
|
|
+ // rollback the position of the meta file
|
|
|
+ datanode.data.adjustCrcChannelPosition(block, streams, checksumSize);
|
|
|
}
|
|
|
|
|
|
/**
|