|
@@ -25,6 +25,7 @@
|
|
|
|
|
|
#include <functional>
|
|
|
#include <future>
|
|
|
+#include <type_traits>
|
|
|
|
|
|
namespace hdfs {
|
|
|
|
|
@@ -91,7 +92,10 @@ struct InputStreamImpl::ReadBlockContinuation : continuation::Continuation {
|
|
|
ReadBlockContinuation(Reader *reader, MutableBufferSequence buffer,
|
|
|
size_t *transferred)
|
|
|
: reader_(reader), buffer_(buffer),
|
|
|
- buffer_size_(asio::buffer_size(buffer)), transferred_(transferred) {}
|
|
|
+ buffer_size_(asio::buffer_size(buffer)), transferred_(transferred) {
|
|
|
+ static_assert(!std::is_reference<MutableBufferSequence>::value,
|
|
|
+ "Buffer must not be a reference type");
|
|
|
+ }
|
|
|
|
|
|
virtual void Run(const Next &next) override {
|
|
|
*transferred_ = 0;
|
|
@@ -101,7 +105,7 @@ struct InputStreamImpl::ReadBlockContinuation : continuation::Continuation {
|
|
|
|
|
|
private:
|
|
|
Reader *reader_;
|
|
|
- MutableBufferSequence buffer_;
|
|
|
+ const MutableBufferSequence buffer_;
|
|
|
const size_t buffer_size_;
|
|
|
size_t *transferred_;
|
|
|
std::function<void(const Status &)> next_;
|
|
@@ -176,12 +180,12 @@ void InputStreamImpl::AsyncReadBlock(
|
|
|
size_t size = asio::buffer_size(buffers);
|
|
|
m->Push(new HandshakeContinuation<Reader>(s.reader(), client_name, nullptr,
|
|
|
&block.b(), size, offset))
|
|
|
- .Push(new ReadBlockContinuation<Reader, decltype(buffers)>(
|
|
|
+ .Push(new ReadBlockContinuation<Reader, MutableBufferSequence>(
|
|
|
s.reader(), buffers, s.transferred()));
|
|
|
const std::string &dnid = dn.id().datanodeuuid();
|
|
|
m->Run([handler, dnid](const Status &status,
|
|
|
- const typename BlockReaderTrait::State &state) {
|
|
|
- handler(status, dnid, *state.transferred());
|
|
|
+ const typename BlockReaderTrait::State &state) {
|
|
|
+ handler(status, dnid, *state.transferred());
|
|
|
});
|
|
|
}
|
|
|
}
|