HDDS-419. ChunkInputStream bulk read api does not read from all the chunks. Contributed by Lokesh Jain and Mukul Kumar.

(cherry picked from commit 6f037468bce7bbda6b9fc01166f2c61ae40b690b)
Xiaoyu Yao, 6 years ago
commit 435c3eacff

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java (+23 -7)

@@ -121,12 +121,17 @@ public class ChunkInputStream extends InputStream implements Seekable {
       return 0;
     }
     checkOpen();
-    int available = prepareRead(len);
-    if (available == EOF) {
-      return EOF;
+    int total = 0;
+    while (len > 0) {
+      int available = prepareRead(len);
+      if (available == EOF) {
+        return total != 0 ? total : EOF;
+      }
+      buffers.get(bufferIndex).get(b, off + total, available);
+      len -= available;
+      total += available;
     }
-    buffers.get(bufferIndex).get(b, off, available);
-    return available;
+    return total;
   }
 
   @Override
@@ -196,13 +201,20 @@ public class ChunkInputStream extends InputStream implements Seekable {
     // next chunk
     chunkIndex += 1;
     final ReadChunkResponseProto readChunkResponse;
+    final ChunkInfo chunkInfo = chunks.get(chunkIndex);
     try {
-      readChunkResponse = ContainerProtocolCalls.readChunk(xceiverClient,
-          chunks.get(chunkIndex), blockID, traceID);
+      readChunkResponse = ContainerProtocolCalls
+          .readChunk(xceiverClient, chunkInfo, blockID, traceID);
     } catch (IOException e) {
       throw new IOException("Unexpected OzoneException: " + e.toString(), e);
     }
     ByteString byteString = readChunkResponse.getData();
+    if (byteString.size() != chunkInfo.getLen()) {
+      // Bytes read from chunk should be equal to chunk size.
+      throw new IOException(String
+          .format("Inconsistent read for chunk=%s len=%d bytesRead=%d",
+              chunkInfo.getChunkName(), chunkInfo.getLen(), byteString.size()));
+    }
     buffers = byteString.asReadOnlyByteBufferList();
     bufferIndex = 0;
   }
@@ -260,4 +272,8 @@ public class ChunkInputStream extends InputStream implements Seekable {
   public boolean seekToNewSource(long targetPos) throws IOException {
     return false;
   }
+
+  public BlockID getBlockID() {
+    return blockID;
+  }
 }
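
A minimal, self-contained sketch of the read loop the ChunkInputStream hunks above introduce, assuming a hypothetical ChunkedByteStream that stands in for ChunkInputStream: the chunk data is modelled as a plain list of ByteBuffers and advanceToData approximates prepareRead. Before the fix, a single read(b, off, len) call returned only what was left in the current chunk buffer; the loop keeps draining successive buffers until len bytes have been copied or the data runs out.

import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a stream backed by per-chunk buffers. */
class ChunkedByteStream extends InputStream {
  private static final int EOF = -1;
  private final List<ByteBuffer> buffers;  // one buffer per "chunk"
  private int bufferIndex = 0;

  ChunkedByteStream(List<ByteBuffer> buffers) {
    this.buffers = buffers;
  }

  @Override
  public int read() {
    return advanceToData() ? buffers.get(bufferIndex).get() & 0xFF : EOF;
  }

  @Override
  public int read(byte[] b, int off, int len) {
    if (len == 0) {
      return 0;
    }
    int total = 0;
    // Same shape as the fixed ChunkInputStream#read: keep pulling from
    // successive chunk buffers until the caller's request is satisfied
    // or the data is exhausted.
    while (len > 0) {
      if (!advanceToData()) {
        return total != 0 ? total : EOF;
      }
      ByteBuffer current = buffers.get(bufferIndex);
      int available = Math.min(len, current.remaining());
      current.get(b, off + total, available);
      len -= available;
      total += available;
    }
    return total;
  }

  /** Skips exhausted buffers; returns false once every chunk is drained. */
  private boolean advanceToData() {
    while (bufferIndex < buffers.size()
        && !buffers.get(bufferIndex).hasRemaining()) {
      bufferIndex++;
    }
    return bufferIndex < buffers.size();
  }

  public static void main(String[] args) {
    // Two 4-byte "chunks"; one 8-byte bulk read now spans both of them.
    ChunkedByteStream in = new ChunkedByteStream(Arrays.asList(
        ByteBuffer.wrap("abcd".getBytes(StandardCharsets.UTF_8)),
        ByteBuffer.wrap("efgh".getBytes(StandardCharsets.UTF_8))));
    byte[] out = new byte[8];
    int n = in.read(out, 0, out.length);
    System.out.println(n + " " + new String(out, StandardCharsets.UTF_8));
  }
}

Running main prints "8 abcdefgh": the 8-byte bulk read now spans both 4-byte chunks instead of stopping after the first.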

hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java (+13 -12)

@@ -115,19 +115,20 @@ public class ChunkGroupInputStream extends InputStream implements Seekable {
         return totalReadLen == 0 ? EOF : totalReadLen;
       }
       ChunkInputStreamEntry current = streamEntries.get(currentStreamIndex);
-      int readLen = Math.min(len, (int)current.getRemaining());
-      int actualLen = current.read(b, off, readLen);
-      // this means the underlying stream has nothing at all, return
-      if (actualLen == EOF) {
-        return totalReadLen > 0 ? totalReadLen : EOF;
+      int numBytesToRead = Math.min(len, (int)current.getRemaining());
+      int numBytesRead = current.read(b, off, numBytesToRead);
+      if (numBytesRead != numBytesToRead) {
+        // This implies that there is either data loss or corruption in the
+        // chunk entries. Even EOF in the current stream would be covered in
+        // this case.
+        throw new IOException(String.format(
+            "Inconsistent read for blockID=%s length=%d numBytesRead=%d",
+            current.chunkInputStream.getBlockID(), current.length,
+            numBytesRead));
       }
-      totalReadLen += actualLen;
-      // this means there is no more data to read beyond this point, return
-      if (actualLen != readLen) {
-        return totalReadLen;
-      }
-      off += readLen;
-      len -= readLen;
+      totalReadLen += numBytesRead;
+      off += numBytesRead;
+      len -= numBytesRead;
       if (current.getRemaining() <= 0) {
         currentStreamIndex += 1;
       }
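
The companion change above leans on the new guarantee: because ChunkInputStream#read now fills the requested length whenever data remains in the block, a short read (including a premature EOF, reported as -1) can only mean missing or corrupt chunk data, so ChunkGroupInputStream fails loudly instead of silently returning less. A hedged sketch of that check, using a hypothetical readExactly helper rather than the real class:

import java.io.IOException;
import java.io.InputStream;

/** Illustrative helper only; not part of the Ozone client API. */
final class StrictReads {
  private StrictReads() { }

  static void readExactly(InputStream in, byte[] b, int off, int expectedLen,
      String blockId) throws IOException {
    int numBytesRead = in.read(b, off, expectedLen);
    if (numBytesRead != expectedLen) {
      // Covers a short read as well as EOF (numBytesRead == -1): either way
      // the block holds less data than its metadata claims.
      throw new IOException(String.format(
          "Inconsistent read for blockID=%s expected=%d numBytesRead=%d",
          blockId, expectedLen, numBytesRead));
    }
  }
}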

hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java (+2 -2)

@@ -68,7 +68,7 @@ public class TestDataValidate {
     randomKeyGenerator.setNumOfKeys(1);
     randomKeyGenerator.setType(ReplicationType.RATIS);
     randomKeyGenerator.setFactor(ReplicationFactor.THREE);
-    randomKeyGenerator.setKeySize(104857600);
+    randomKeyGenerator.setKeySize(20971520);
     randomKeyGenerator.setValidateWrites(true);
     randomKeyGenerator.call();
     Assert.assertEquals(1, randomKeyGenerator.getNumberOfVolumesCreated());
@@ -84,7 +84,7 @@ public class TestDataValidate {
     randomKeyGenerator.setNumOfVolumes(1);
     randomKeyGenerator.setNumOfBuckets(1);
     randomKeyGenerator.setNumOfKeys(1);
-    randomKeyGenerator.setKeySize(104857600);
+    randomKeyGenerator.setKeySize(20971520);
     randomKeyGenerator.setValidateWrites(true);
     randomKeyGenerator.call();
     Assert.assertEquals(1, randomKeyGenerator.getNumberOfVolumesCreated());
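(For scale: 104857600 bytes is 100 MiB and 20971520 bytes is 20 MiB, so each validated key shrinks from 100 MiB to 20 MiB.)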

hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java (+4 -3)

@@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.hdds.cli.HddsVersionProvider;
 import org.apache.hadoop.hdds.client.OzoneQuota;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
@@ -984,9 +985,9 @@ public final class RandomKeyGenerator implements Callable<Void> {
               writeValidationFailureCount++;
               LOG.warn("Data validation error for key {}/{}/{}",
                   kv.bucket.getVolumeName(), kv.bucket, kv.key);
-              LOG.warn("Expected: {}, Actual: {}",
-                  DFSUtil.bytes2String(kv.value),
-                  DFSUtil.bytes2String(value));
+              LOG.warn("Expected checksum: {}, Actual checksum: {}",
+                  DigestUtils.md5Hex(kv.value),
+                  DigestUtils.md5Hex(value));
             }
           }
         } catch (IOException | InterruptedException ex) {
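
On the Freon side, the validation-failure log switches from dumping the full expected and actual values (now up to 20 MiB each) to comparing MD5 fingerprints via Apache Commons Codec, whose import the hunk adds. A small standalone illustration, assuming commons-codec is on the classpath as it is for Hadoop; the class name and sample values are made up:

import java.nio.charset.StandardCharsets;

import org.apache.commons.codec.digest.DigestUtils;

/** Illustrative only; not part of RandomKeyGenerator. */
public class ChecksumLogDemo {
  public static void main(String[] args) {
    byte[] expected = "key-contents".getBytes(StandardCharsets.UTF_8);
    byte[] actual = "key-c0ntents".getBytes(StandardCharsets.UTF_8);

    // DigestUtils.md5Hex(byte[]) returns a 32-character hex digest, which is
    // far cheaper to log than a raw multi-MiB value.
    System.out.println("Expected checksum: " + DigestUtils.md5Hex(expected));
    System.out.println("Actual checksum:   " + DigestUtils.md5Hex(actual));
  }
}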