瀏覽代碼

HDFS-10247: libhdfs++: Datanode protocol version mismatch fix. Contributed by James Clampffer

James 9 年之前
父節點
當前提交
60c3437267

+ 11 - 6
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h

@@ -27,6 +27,7 @@
 #include <asio/read.hpp>
 #include <asio/write.hpp>
 #include <asio/ip/tcp.hpp>
+#include <memory>
 
 namespace hdfs {
 namespace asio_continuation {
@@ -36,7 +37,7 @@ using namespace continuation;
 template <class Stream, class MutableBufferSequence>
 class ReadContinuation : public Continuation {
 public:
-  ReadContinuation(Stream *stream, const MutableBufferSequence &buffer)
+  ReadContinuation(std::shared_ptr<Stream>& stream, const MutableBufferSequence &buffer)
       : stream_(stream), buffer_(buffer) {}
   virtual void Run(const Next &next) override {
     auto handler =
@@ -45,14 +46,16 @@ public:
   }
 
 private:
-  Stream *stream_;
+  // prevent construction from raw ptr
+  ReadContinuation(Stream *stream, MutableBufferSequence &buffer);
+  std::shared_ptr<Stream> stream_;
   MutableBufferSequence buffer_;
 };
 
 template <class Stream, class ConstBufferSequence>
 class WriteContinuation : public Continuation {
 public:
-  WriteContinuation(Stream *stream, const ConstBufferSequence &buffer)
+  WriteContinuation(std::shared_ptr<Stream>& stream, const ConstBufferSequence &buffer)
       : stream_(stream), buffer_(buffer) {}
 
   virtual void Run(const Next &next) override {
@@ -62,7 +65,9 @@ public:
   }
 
 private:
-  Stream *stream_;
+  // prevent construction from raw ptr
+  WriteContinuation(Stream *stream, ConstBufferSequence &buffer);
+  std::shared_ptr<Stream> stream_;
   ConstBufferSequence buffer_;
 };
 
@@ -117,13 +122,13 @@ private:
 };
 
 template <class Stream, class ConstBufferSequence>
-static inline Continuation *Write(Stream *stream,
+static inline Continuation *Write(std::shared_ptr<Stream> stream,
                                   const ConstBufferSequence &buffer) {
   return new WriteContinuation<Stream, ConstBufferSequence>(stream, buffer);
 }
 
 template <class Stream, class MutableBufferSequence>
-static inline Continuation *Read(Stream *stream,
+static inline Continuation *Read(std::shared_ptr<Stream> stream,
                                  const MutableBufferSequence &buffer) {
   return new ReadContinuation<Stream, MutableBufferSequence>(stream, buffer);
 }

+ 8 - 7
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h

@@ -94,13 +94,14 @@ struct WriteDelimitedPBMessageContinuation : Continuation {
       : stream_(stream), msg_(msg) {}
 
   virtual void Run(const Next &next) override {
-    namespace pbio = google::protobuf::io;
-    int size = msg_->ByteSize();
-    buf_.reserve(pbio::CodedOutputStream::VarintSize32(size) + size);
-    pbio::StringOutputStream ss(&buf_);
-    pbio::CodedOutputStream os(&ss);
-    os.WriteVarint32(size);
-    msg_->SerializeToCodedStream(&os);
+    bool success = true;
+    buf_ = SerializeDelimitedProtobufMessage(msg_, &success);
+
+    if(!success) {
+      next(Status::Error("Unable to serialize protobuf message."));
+      return;
+    }
+
     asio::async_write(*stream_, asio::buffer(buf_), [next](const asio::error_code &ec, size_t) { next(ToStatus(ec)); } );
   }
 

+ 35 - 3
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc

@@ -18,12 +18,44 @@
 
 #include "common/util.h"
 
+#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
+
 namespace hdfs {
 
+bool ReadDelimitedPBMessage(::google::protobuf::io::CodedInputStream *in,
+                            ::google::protobuf::MessageLite *msg) {
+  uint32_t size = 0;
+  in->ReadVarint32(&size);
+  auto limit = in->PushLimit(size);
+  bool res = msg->ParseFromCodedStream(in);
+  in->PopLimit(limit);
+
+  return res;
+}
+
+
+std::string SerializeDelimitedProtobufMessage(const ::google::protobuf::MessageLite *msg,
+                                              bool *err) {
+  namespace pbio = ::google::protobuf::io;
+
+  std::string buf;
+
+  int size = msg->ByteSize();
+  buf.reserve(pbio::CodedOutputStream::VarintSize32(size) + size);
+  pbio::StringOutputStream ss(&buf);
+  pbio::CodedOutputStream os(&ss);
+  os.WriteVarint32(size);
+
+  if(err)
+    *err = msg->SerializeToCodedStream(&os);
+
+  return buf;
+}
+
+
 std::string GetRandomClientName() {
-  unsigned char buf[6] = {
-      0,
-  };
+  unsigned char buf[6];
+
   RAND_pseudo_bytes(buf, sizeof(buf));
 
   std::stringstream ss;

+ 16 - 19
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h

@@ -28,6 +28,7 @@
 #include <google/protobuf/message_lite.h>
 #include <google/protobuf/io/coded_stream.h>
 
+
 namespace hdfs {
 
 static inline Status ToStatus(const ::asio::error_code &ec) {
@@ -38,32 +39,30 @@ static inline Status ToStatus(const ::asio::error_code &ec) {
   }
 }
 
-static inline int DelimitedPBMessageSize(
-    const ::google::protobuf::MessageLite *msg) {
+// Determine size of buffer that needs to be allocated in order to serialize msg
+// in delimited format
+static inline int DelimitedPBMessageSize(const ::google::protobuf::MessageLite *msg) {
   size_t size = msg->ByteSize();
   return ::google::protobuf::io::CodedOutputStream::VarintSize32(size) + size;
 }
 
-static inline void ReadDelimitedPBMessage(
-    ::google::protobuf::io::CodedInputStream *in,
-    ::google::protobuf::MessageLite *msg) {
-  uint32_t size = 0;
-  in->ReadVarint32(&size);
-  auto limit = in->PushLimit(size);
-  msg->ParseFromCodedStream(in);
-  in->PopLimit(limit);
-}
+// Construct msg from the input held in the CodedInputStream
+// return false on failure, otherwise return true
+bool ReadDelimitedPBMessage(::google::protobuf::io::CodedInputStream *in,
+                            ::google::protobuf::MessageLite *msg);
+
+// Serialize msg into a delimited form (java protobuf compatible)
+// err, if not null, will be set to false on failure
+std::string SerializeDelimitedProtobufMessage(const ::google::protobuf::MessageLite *msg,
+                                              bool *err);
 
 std::string Base64Encode(const std::string &src);
 
-/*
- * Returns a new high-entropy client name
- */
+// Return a new high-entropy client name
 std::string GetRandomClientName();
 
-/* Returns true if _someone_ is holding the lock (not necessarily this thread,
- * but a std::mutex doesn't track which thread is holding the lock)
- */
+// Returns true if _someone_ is holding the lock (not necessarily this thread,
+// but a std::mutex doesn't track which thread is holding the lock)
 template<class T>
 bool lock_held(T & mutex) {
   bool result = !mutex.try_lock();
@@ -72,8 +71,6 @@ bool lock_held(T & mutex) {
   return result;
 }
 
-
-
 }
 
 #endif

+ 17 - 6
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc

@@ -20,6 +20,7 @@
 #include "common/continuation/continuation.h"
 #include "common/continuation/asio.h"
 #include "common/logging.h"
+#include "common/util.h"
 
 #include <future>
 
@@ -55,6 +56,9 @@ ReadBlockProto(const std::string &client_name, bool verify_checksum,
   return p;
 }
 
+
+static int8_t unsecured_request_block_header[3] = {0, kDataTransferVersion, Operation::kReadBlock};
+
 void BlockReaderImpl::AsyncRequestBlock(
     const std::string &client_name,
     const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
@@ -78,17 +82,24 @@ void BlockReaderImpl::AsyncRequestBlock(
   auto m = continuation::Pipeline<State>::Create(cancel_state_);
   State *s = &m->state();
 
-  s->header.insert(s->header.begin(),
-                   {0, kDataTransferVersion, Operation::kReadBlock});
-  s->request = std::move(ReadBlockProto(client_name, options_.verify_checksum,
-                                        dn_->token_.get(), block, length, offset));
+  s->request = ReadBlockProto(client_name, options_.verify_checksum,
+                              dn_->token_.get(), block, length, offset);
+
+  s->header = std::string((const char*)unsecured_request_block_header, 3);
+
+  bool serialize_success = true;
+  s->header += SerializeDelimitedProtobufMessage(&s->request, &serialize_success);
+
+  if(!serialize_success) {
+    handler(Status::Error("Unable to serialize protobuf message"));
+    return;
+  }
 
   auto read_pb_message =
       new continuation::ReadDelimitedPBMessageContinuation<AsyncStream, 16384>(
           dn_, &s->response);
 
-  m->Push(asio_continuation::Write(dn_.get(), asio::buffer(s->header)))
-      .Push(asio_continuation::WriteDelimitedPBMessage(dn_, &s->request))
+  m->Push(asio_continuation::Write(dn_, asio::buffer(s->header)))
       .Push(read_pb_message);
 
   m->Run([this, handler, offset](const Status &status, const State &s) {    Status stat = status;

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h

@@ -117,7 +117,7 @@ void DataTransferSaslStream<Stream>::Handshake(const Handler &next) {
 
   DataTransferSaslStreamUtil::PrepareInitialHandshake(&s->req0);
 
-  m->Push(Write(stream_.get(), kMagicNumberBuffer))
+  m->Push(Write(stream_, kMagicNumberBuffer))
       .Push(WriteDelimitedPBMessage(stream_, &s->req0))
       .Push(new ReadSaslMessage(stream_, &s->resp0))
       .Push(new Authenticator(&authenticator_, &s->resp0, &s->req1))