|
@@ -33,6 +33,7 @@ import java.net.Socket;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.util.Random;
|
|
|
|
|
|
+import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
@@ -51,6 +52,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
|
|
|
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
|
|
|
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
|
|
|
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
|
|
|
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
|
|
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
|
|
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto.Builder;
|
|
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
|
|
@@ -518,6 +520,35 @@ public class TestDataTransferProtocol {
|
|
|
assertFalse(hdr.sanityCheck(100));
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void TestPipeLineAckCompatibility() throws IOException {
|
|
|
+ DataTransferProtos.PipelineAckProto proto = DataTransferProtos
|
|
|
+ .PipelineAckProto.newBuilder()
|
|
|
+ .setSeqno(0)
|
|
|
+ .addReply(Status.CHECKSUM_OK)
|
|
|
+ .build();
|
|
|
+
|
|
|
+ DataTransferProtos.PipelineAckProto newProto = DataTransferProtos
|
|
|
+ .PipelineAckProto.newBuilder().mergeFrom(proto)
|
|
|
+ .addFlag(PipelineAck.combineHeader(PipelineAck.ECN.SUPPORTED,
|
|
|
+ Status.CHECKSUM_OK))
|
|
|
+ .build();
|
|
|
+
|
|
|
+ ByteOutputStream oldAckBytes = new ByteOutputStream();
|
|
|
+ proto.writeDelimitedTo(oldAckBytes);
|
|
|
+ PipelineAck oldAck = new PipelineAck();
|
|
|
+ oldAck.readFields(new ByteArrayInputStream(oldAckBytes.getBytes()));
|
|
|
+ assertEquals(PipelineAck.combineHeader(PipelineAck.ECN.DISABLED, Status
|
|
|
+ .CHECKSUM_OK), oldAck.getHeaderFlag(0));
|
|
|
+
|
|
|
+ PipelineAck newAck = new PipelineAck();
|
|
|
+ ByteOutputStream newAckBytes = new ByteOutputStream();
|
|
|
+ newProto.writeDelimitedTo(newAckBytes);
|
|
|
+ newAck.readFields(new ByteArrayInputStream(newAckBytes.getBytes()));
|
|
|
+ assertEquals(PipelineAck.combineHeader(PipelineAck.ECN.SUPPORTED, Status
|
|
|
+ .CHECKSUM_OK), newAck.getHeaderFlag(0));
|
|
|
+ }
|
|
|
+
|
|
|
void writeBlock(String poolId, long blockId, DataChecksum checksum) throws IOException {
|
|
|
writeBlock(new ExtendedBlock(poolId, blockId),
|
|
|
BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, checksum);
|