|
@@ -32,6 +32,7 @@ using namespace hdfs;
|
|
using ::hadoop::common::TokenProto;
|
|
using ::hadoop::common::TokenProto;
|
|
using ::hadoop::hdfs::BlockOpResponseProto;
|
|
using ::hadoop::hdfs::BlockOpResponseProto;
|
|
using ::hadoop::hdfs::ChecksumProto;
|
|
using ::hadoop::hdfs::ChecksumProto;
|
|
|
|
+using ::hadoop::hdfs::DataTransferEncryptorMessageProto;
|
|
using ::hadoop::hdfs::ExtendedBlockProto;
|
|
using ::hadoop::hdfs::ExtendedBlockProto;
|
|
using ::hadoop::hdfs::PacketHeaderProto;
|
|
using ::hadoop::hdfs::PacketHeaderProto;
|
|
using ::hadoop::hdfs::ReadOpChecksumInfoProto;
|
|
using ::hadoop::hdfs::ReadOpChecksumInfoProto;
|
|
@@ -90,13 +91,14 @@ ProducePacket(const std::string &data, const std::string &checksum,
|
|
return std::make_pair(error_code(), std::move(payload));
|
|
return std::make_pair(error_code(), std::move(payload));
|
|
}
|
|
}
|
|
|
|
|
|
-static std::shared_ptr<RemoteBlockReader<MockDNConnection>>
|
|
|
|
-ReadContent(MockDNConnection *conn, TokenProto *token,
|
|
|
|
|
|
+template<class Stream = MockDNConnection>
|
|
|
|
+static std::shared_ptr<RemoteBlockReader<Stream>>
|
|
|
|
+ReadContent(Stream *conn, TokenProto *token,
|
|
const ExtendedBlockProto &block, uint64_t length, uint64_t offset,
|
|
const ExtendedBlockProto &block, uint64_t length, uint64_t offset,
|
|
const mutable_buffers_1 &buf, Status *status, size_t *transferred) {
|
|
const mutable_buffers_1 &buf, Status *status, size_t *transferred) {
|
|
BlockReaderOptions options;
|
|
BlockReaderOptions options;
|
|
auto reader =
|
|
auto reader =
|
|
- std::make_shared<RemoteBlockReader<MockDNConnection>>(options, conn);
|
|
|
|
|
|
+ std::make_shared<RemoteBlockReader<Stream>>(options, conn);
|
|
Status result;
|
|
Status result;
|
|
reader->async_connect(
|
|
reader->async_connect(
|
|
"libhdfs++", token, &block, length, offset,
|
|
"libhdfs++", token, &block, length, offset,
|
|
@@ -121,7 +123,6 @@ TEST(RemoteBlockReaderTest, TestReadWholeBlock) {
|
|
BlockOpResponseProto block_op_resp;
|
|
BlockOpResponseProto block_op_resp;
|
|
|
|
|
|
block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
|
|
block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
|
|
-
|
|
|
|
EXPECT_CALL(conn, Produce())
|
|
EXPECT_CALL(conn, Produce())
|
|
.WillOnce(Return(Produce(ToDelimitedString(&block_op_resp))))
|
|
.WillOnce(Return(Produce(ToDelimitedString(&block_op_resp))))
|
|
.WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, true)));
|
|
.WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, true)));
|
|
@@ -205,6 +206,49 @@ TEST(RemoteBlockReaderTest, TestReadMultiplePacket) {
|
|
});
|
|
});
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+TEST(RemoteBlockReaderTest, TestSaslConnection) {
|
|
|
|
+ static const size_t kChunkSize = 512;
|
|
|
|
+ static const string kChunkData(kChunkSize, 'a');
|
|
|
|
+ static const string kAuthPayload = "realm=\"0\",nonce=\"+GAWc+O6yEAWpew/"
|
|
|
|
+ "qKah8qh4QZLoOLCDcTtEKhlS\",qop=\"auth\","
|
|
|
|
+ "charset=utf-8,algorithm=md5-sess";
|
|
|
|
+ MockDNConnection conn;
|
|
|
|
+ BlockOpResponseProto block_op_resp;
|
|
|
|
+ block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
|
|
|
|
+
|
|
|
|
+ DataTransferEncryptorMessageProto sasl_resp0, sasl_resp1;
|
|
|
|
+ sasl_resp0.set_status(
|
|
|
|
+ ::hadoop::hdfs::
|
|
|
|
+ DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_SUCCESS);
|
|
|
|
+ sasl_resp0.set_payload(kAuthPayload);
|
|
|
|
+ sasl_resp1.set_status(
|
|
|
|
+ ::hadoop::hdfs::
|
|
|
|
+ DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_SUCCESS);
|
|
|
|
+
|
|
|
|
+ EXPECT_CALL(conn, Produce())
|
|
|
|
+ .WillOnce(Return(Produce(ToDelimitedString(&sasl_resp0))))
|
|
|
|
+ .WillOnce(Return(Produce(ToDelimitedString(&sasl_resp1))))
|
|
|
|
+ .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp))))
|
|
|
|
+ .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, true)));
|
|
|
|
+
|
|
|
|
+ DataTransferSaslStream<MockDNConnection> sasl_conn(&conn, "foo", "bar");
|
|
|
|
+ ExtendedBlockProto block;
|
|
|
|
+ std::string data(kChunkSize, 0);
|
|
|
|
+ size_t transferred = 0;
|
|
|
|
+ Status stat;
|
|
|
|
+ sasl_conn.Handshake([&stat](const Status &s) {
|
|
|
|
+ stat = s;
|
|
|
|
+ });
|
|
|
|
+
|
|
|
|
+ ASSERT_TRUE(stat.ok());
|
|
|
|
+ ReadContent(&sasl_conn, nullptr, block, kChunkSize, 0,
|
|
|
|
+ buffer(const_cast<char *>(data.c_str()), data.size()), &stat,
|
|
|
|
+ &transferred);
|
|
|
|
+ ASSERT_TRUE(stat.ok());
|
|
|
|
+ ASSERT_EQ(kChunkSize, transferred);
|
|
|
|
+ ASSERT_EQ(kChunkData, data);
|
|
|
|
+}
|
|
|
|
+
|
|
int main(int argc, char *argv[]) {
|
|
int main(int argc, char *argv[]) {
|
|
// The following line must be executed to initialize Google Mock
|
|
// The following line must be executed to initialize Google Mock
|
|
// (and Google Test) before running the tests.
|
|
// (and Google Test) before running the tests.
|