|
@@ -33,7 +33,7 @@ struct InputStreamImpl::RemoteBlockReaderTrait {
|
|
typedef RemoteBlockReader<asio::ip::tcp::socket> Reader;
|
|
typedef RemoteBlockReader<asio::ip::tcp::socket> Reader;
|
|
struct State {
|
|
struct State {
|
|
std::unique_ptr<asio::ip::tcp::socket> conn_;
|
|
std::unique_ptr<asio::ip::tcp::socket> conn_;
|
|
- std::unique_ptr<Reader> reader_;
|
|
|
|
|
|
+ std::shared_ptr<Reader> reader_;
|
|
std::array<asio::ip::tcp::endpoint, 1> endpoints_;
|
|
std::array<asio::ip::tcp::endpoint, 1> endpoints_;
|
|
size_t transferred_;
|
|
size_t transferred_;
|
|
Reader *reader() { return reader_.get(); }
|
|
Reader *reader() { return reader_.get(); }
|
|
@@ -47,7 +47,7 @@ struct InputStreamImpl::RemoteBlockReaderTrait {
|
|
auto m = continuation::Pipeline<State>::Create();
|
|
auto m = continuation::Pipeline<State>::Create();
|
|
auto &s = m->state();
|
|
auto &s = m->state();
|
|
s.conn_.reset(new tcp::socket(*io_service));
|
|
s.conn_.reset(new tcp::socket(*io_service));
|
|
- s.reader_.reset(new Reader(BlockReaderOptions(), s.conn_.get()));
|
|
|
|
|
|
+ s.reader_ = std::make_shared<Reader>(BlockReaderOptions(), s.conn_.get());
|
|
auto datanode = dn.id();
|
|
auto datanode = dn.id();
|
|
s.endpoints_[0] = tcp::endpoint(address::from_string(datanode.ipaddr()),
|
|
s.endpoints_[0] = tcp::endpoint(address::from_string(datanode.ipaddr()),
|
|
datanode.xferport());
|
|
datanode.xferport());
|