Explorar o código

HDFS-10543: libhdfs++: hdfsRead stops at block boundary. Contributed by Xiaowei Zhu.

James %!s(int64=9) %!d(string=hai) anos
pai
achega
48fd946a80

+ 23 - 15
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc

@@ -78,26 +78,34 @@ Status FileHandleImpl::PositionRead(void *buf, size_t *nbyte, off_t offset) {
   LOG_TRACE(kFileHandle, << "FileHandleImpl::[sync]PositionRead("
                          << FMT_THIS_ADDR << ", buf=" << buf
                          << ", nbyte=" << *nbyte << ") called");
+  size_t totalBytesRead = 0;
+  Status stat = Status::OK();
+  while (*nbyte != 0 && offset < (off_t)(file_info_->file_length_)) {
+    auto callstate = std::make_shared<std::promise<std::tuple<Status, size_t>>>();
+    std::future<std::tuple<Status, size_t>> future(callstate->get_future());
 
-  auto callstate = std::make_shared<std::promise<std::tuple<Status, size_t>>>();
-  std::future<std::tuple<Status, size_t>> future(callstate->get_future());
+    /* wrap async call with promise/future to make it blocking */
+    auto callback = [callstate](const Status &s, size_t bytes) {
+      callstate->set_value(std::make_tuple(s,bytes));
+    };
 
-  /* wrap async call with promise/future to make it blocking */
-  auto callback = [callstate](const Status &s, size_t bytes) {
-    callstate->set_value(std::make_tuple(s,bytes));
-  };
+    PositionRead(buf, *nbyte, offset, callback);
 
-  PositionRead(buf, *nbyte, offset, callback);
+    /* wait for async to finish */
+    auto returnstate = future.get();
+    stat = std::get<0>(returnstate);
 
-  /* wait for async to finish */
-  auto returnstate = future.get();
-  auto stat = std::get<0>(returnstate);
+    if (!stat.ok()) {
+      return stat;
+    }
 
-  if (!stat.ok()) {
-    return stat;
+    size_t bytesRead = std::get<1>(returnstate);
+    *nbyte = *nbyte - bytesRead;
+    totalBytesRead += bytesRead;
+    offset += bytesRead;
   }
-
-  *nbyte = std::get<1>(returnstate);
+  /* Update the bytes read for return */
+  *nbyte = totalBytesRead;
   return stat;
 }
 
@@ -110,8 +118,8 @@ Status FileHandleImpl::Read(void *buf, size_t *nbyte) {
   if(!stat.ok()) {
     return stat;
   }
-
   offset_ += *nbyte;
+
   return Status::OK();
 }