@@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto.Builder;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
@@ -524,6 +525,37 @@ public class TestDataTransferProtocol {
     assertFalse(hdr.sanityCheck(100));
   }
 
+  @Test
+  public void TestPipeLineAckCompatibility() throws IOException {
+    DataTransferProtos.PipelineAckProto proto = DataTransferProtos
+        .PipelineAckProto.newBuilder()
+        .setSeqno(0)
+        .addReply(Status.CHECKSUM_OK)
+        .build();
+
+    DataTransferProtos.PipelineAckProto newProto = DataTransferProtos
+        .PipelineAckProto.newBuilder().mergeFrom(proto)
+        .addFlag(PipelineAck.combineHeader(PipelineAck.ECN.SUPPORTED,
+            Status.CHECKSUM_OK))
+        .build();
+
+    ByteArrayOutputStream oldAckBytes = new ByteArrayOutputStream();
+    proto.writeDelimitedTo(oldAckBytes);
+    PipelineAck oldAck = new PipelineAck();
+    oldAck.readFields(new ByteArrayInputStream(oldAckBytes.toByteArray()));
+    assertEquals(
+        PipelineAck.combineHeader(PipelineAck.ECN.DISABLED, Status.CHECKSUM_OK),
+        oldAck.getHeaderFlag(0));
+
+    PipelineAck newAck = new PipelineAck();
+    ByteArrayOutputStream newAckBytes = new ByteArrayOutputStream();
+    newProto.writeDelimitedTo(newAckBytes);
+    newAck.readFields(new ByteArrayInputStream(newAckBytes.toByteArray()));
+    assertEquals(PipelineAck.combineHeader(PipelineAck.ECN.SUPPORTED,
+        Status.CHECKSUM_OK),
+        newAck.getHeaderFlag(0));
+  }
+
   void writeBlock(String poolId, long blockId, DataChecksum checksum) throws IOException {
     writeBlock(new ExtendedBlock(poolId, blockId),
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, checksum);