
HDFS-8797. WebHdfsFileSystem creates too many connections for pread. Contributed by Jing Zhao.

(cherry picked from commit e91ccfad07ec5b5674a84009772dd31a82b4e4de)
Jing Zhao 9 years ago
parent
commit
70df729a1e
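
Background (an editor's sketch, not part of the commit): ByteRangeInputStream previously inherited positional reads from FSInputStream, whose default pread is, roughly, seek to the offset, read, then seek back:

    // Approximate shape of the inherited FSInputStream pread that this
    // patch overrides; paraphrased, not copied from the commit.
    public int read(long position, byte[] buffer, int offset, int length)
        throws IOException {
      synchronized (this) {
        long oldPos = getPos();
        int nread = -1;
        try {
          seek(position);   // flips ByteRangeInputStream into SEEK status
          nread = read(buffer, offset, length);
        } finally {
          seek(oldPos);     // flips it into SEEK status again
        }
        return nread;
      }
    }

Since every seek() marks the stream for reopening, a single pread could cost two extra HTTP connections. The overrides added below instead open one short-lived connection for the requested range and leave the main stream's position untouched.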

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -10,6 +10,8 @@ Release 2.7.5 - UNRELEASED
 
     HDFS-9153. Pretty-format the output for DFSIO. (Kai Zheng via wheat9)
 
+    HDFS-8797. WebHdfsFileSystem creates too many connections for pread. (jing9)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 51 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdfs.web;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.HttpURLConnection;
@@ -66,6 +67,16 @@ public abstract class ByteRangeInputStream extends FSInputStream {
         final boolean resolved) throws IOException;
   }
 
+  static class InputStreamAndFileLength {
+    final Long length;
+    final InputStream in;
+
+    InputStreamAndFileLength(Long length, InputStream in) {
+      this.length = length;
+      this.in = in;
+    }
+  }
+
   enum StreamStatus {
     NORMAL, SEEK, CLOSED
   }
@@ -102,7 +113,9 @@ public abstract class ByteRangeInputStream extends FSInputStream {
         if (in != null) {
           in.close();
         }
-        in = openInputStream();
+        InputStreamAndFileLength fin = openInputStream(startPos);
+        in = fin.in;
+        fileLength = fin.length;
         status = StreamStatus.NORMAL;
         break;
       case CLOSED:
@@ -112,31 +125,33 @@ public abstract class ByteRangeInputStream extends FSInputStream {
   }
 
   @VisibleForTesting
-  protected InputStream openInputStream() throws IOException {
+  protected InputStreamAndFileLength openInputStream(long startOffset)
+      throws IOException {
     // Use the original url if no resolved url exists, eg. if
     // it's the first time a request is made.
     final boolean resolved = resolvedURL.getURL() != null;
     final URLOpener opener = resolved? resolvedURL: originalURL;
 
-    final HttpURLConnection connection = opener.connect(startPos, resolved);
+    final HttpURLConnection connection = opener.connect(startOffset, resolved);
     resolvedURL.setURL(getResolvedUrl(connection));
 
     InputStream in = connection.getInputStream();
+    final Long length;
     final Map<String, List<String>> headers = connection.getHeaderFields();
     if (isChunkedTransferEncoding(headers)) {
       // file length is not known
-      fileLength = null;
+      length = null;
     } else {
       // for non-chunked transfer-encoding, get content-length
       long streamlength = getStreamLength(connection, headers);
-      fileLength = startPos + streamlength;
+      length = startOffset + streamlength;
 
       // Java has a bug with >2GB request streams.  It won't bounds check
       // the reads so the transfer blocks until the server times out
       in = new BoundedInputStream(in, streamlength);
     }
 
-    return in;
+    return new InputStreamAndFileLength(length, in);
   }
 
   private static long getStreamLength(HttpURLConnection connection,
@@ -230,6 +245,36 @@ public abstract class ByteRangeInputStream extends FSInputStream {
     }
   }
 
+  @Override
+  public int read(long position, byte[] buffer, int offset, int length)
+      throws IOException {
+    try (InputStream in = openInputStream(position).in) {
+      return in.read(buffer, offset, length);
+    }
+  }
+
+  @Override
+  public void readFully(long position, byte[] buffer, int offset, int length)
+      throws IOException {
+    final InputStreamAndFileLength fin = openInputStream(position);
+    if (fin.length != null && length + position > fin.length) {
+      throw new EOFException("The length to read " + length
+          + " exceeds the file length " + fin.length);
+    }
+    try {
+      int nread = 0;
+      while (nread < length) {
+        int nbytes = fin.in.read(buffer, offset + nread, length - nread);
+        if (nbytes < 0) {
+          throw new EOFException("End of file reached before reading fully.");
+        }
+        nread += nbytes;
+      }
+    } finally {
+      fin.in.close();
+    }
+  }
+
   /**
    * Return the current offset from the start of the file
    */
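
Caller-visible effect of the change above (a usage sketch with illustrative paths and offsets, not from the commit):

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    static void preadSketch(FileSystem fs) throws IOException {
      try (FSDataInputStream in = fs.open(new Path("/foo"))) {
        byte[] buf = new byte[128];
        // Each positional read now opens exactly one HTTP connection for
        // its byte range and closes it when done; the stream's own
        // position is unaffected, so stateful reads can be interleaved.
        in.readFully(4096, buf, 0, buf.length);
        int n = in.read(8192, buf, 0, buf.length); // may return < 128 bytes
      }
    }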

+ 19 - 16
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestByteRangeInputStream.java

@@ -35,7 +35,9 @@ import java.net.HttpURLConnection;
 import java.net.URL;
 
 import com.google.common.net.HttpHeaders;
+import org.apache.hadoop.hdfs.web.ByteRangeInputStream.InputStreamAndFileLength;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.mockito.internal.util.reflection.Whitebox;
 
 public class TestByteRangeInputStream {
@@ -140,8 +142,9 @@ public class TestByteRangeInputStream {
   public void testPropagatedClose() throws IOException {
     ByteRangeInputStream bris =
         mock(ByteRangeInputStream.class, CALLS_REAL_METHODS);
-    InputStream mockStream = mock(InputStream.class);
-    doReturn(mockStream).when(bris).openInputStream();
+    InputStreamAndFileLength mockStream = new InputStreamAndFileLength(1L,
+        mock(InputStream.class));
+    doReturn(mockStream).when(bris).openInputStream(Mockito.anyLong());
     Whitebox.setInternalState(bris, "status",
                               ByteRangeInputStream.StreamStatus.SEEK);
 
@@ -151,46 +154,46 @@ public class TestByteRangeInputStream {
 
     // first open, shouldn't close underlying stream
     bris.getInputStream();
-    verify(bris, times(++brisOpens)).openInputStream();
+    verify(bris, times(++brisOpens)).openInputStream(Mockito.anyLong());
     verify(bris, times(brisCloses)).close();
-    verify(mockStream, times(isCloses)).close();
+    verify(mockStream.in, times(isCloses)).close();
 
     // stream is open, shouldn't close underlying stream
     bris.getInputStream();
-    verify(bris, times(brisOpens)).openInputStream();
+    verify(bris, times(brisOpens)).openInputStream(Mockito.anyLong());
     verify(bris, times(brisCloses)).close();
-    verify(mockStream, times(isCloses)).close();
+    verify(mockStream.in, times(isCloses)).close();
 
     // seek forces a reopen, should close underlying stream
     bris.seek(1);
     bris.getInputStream();
-    verify(bris, times(++brisOpens)).openInputStream();
+    verify(bris, times(++brisOpens)).openInputStream(Mockito.anyLong());
     verify(bris, times(brisCloses)).close();
-    verify(mockStream, times(++isCloses)).close();
+    verify(mockStream.in, times(++isCloses)).close();
 
     // verify that the underlying stream isn't closed after a seek
     // ie. the state was correctly updated
     bris.getInputStream();
-    verify(bris, times(brisOpens)).openInputStream();
+    verify(bris, times(brisOpens)).openInputStream(Mockito.anyLong());
     verify(bris, times(brisCloses)).close();
-    verify(mockStream, times(isCloses)).close();
+    verify(mockStream.in, times(isCloses)).close();
 
     // seeking to same location should be a no-op
     bris.seek(1);
     bris.getInputStream();
-    verify(bris, times(brisOpens)).openInputStream();
+    verify(bris, times(brisOpens)).openInputStream(Mockito.anyLong());
     verify(bris, times(brisCloses)).close();
-    verify(mockStream, times(isCloses)).close();
+    verify(mockStream.in, times(isCloses)).close();
 
     // close should of course close
     bris.close();
     verify(bris, times(++brisCloses)).close();
-    verify(mockStream, times(++isCloses)).close();
+    verify(mockStream.in, times(++isCloses)).close();
 
     // it's already closed, underlying stream should not close
     bris.close();
     verify(bris, times(++brisCloses)).close();
-    verify(mockStream, times(isCloses)).close();
+    verify(mockStream.in, times(isCloses)).close();
 
     // it's closed, don't reopen it
     boolean errored = false;
@@ -202,9 +205,9 @@ public class TestByteRangeInputStream {
     } finally {
       assertTrue("Read a closed steam", errored);
     }
-    verify(bris, times(brisOpens)).openInputStream();
+    verify(bris, times(brisOpens)).openInputStream(Mockito.anyLong());
     verify(bris, times(brisCloses)).close();
 
-    verify(mockStream, times(isCloses)).close();
+    verify(mockStream.in, times(isCloses)).close();
   }
 }
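
A note on the mock changes (standalone sketch, not from the commit): once openInputStream() takes a long offset, both the stub and every verification must use an argument matcher such as anyLong() so they apply regardless of which offset the production code passes:

    import static org.mockito.Mockito.*;

    class MatcherSketch {
      interface Opener { Object open(long offset); }

      static void demo() {
        Opener opener = mock(Opener.class);
        // Stub with a matcher: any offset returns the canned value.
        doReturn(new Object()).when(opener).open(anyLong());
        opener.open(0L);
        opener.open(42L);
        // Verification mirrors the stub's matcher, as in the test above.
        verify(opener, times(2)).open(anyLong());
      }
    }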

+ 39 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java

@@ -591,6 +591,45 @@ public class TestWebHDFS {
     }
   }
 
+  @Test
+  public void testWebHdfsPread() throws Exception {
+    final Configuration conf = WebHdfsTestUtil.createConf();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
+        .build();
+    byte[] content = new byte[1024];
+    RANDOM.nextBytes(content);
+    final Path foo = new Path("/foo");
+    FSDataInputStream in = null;
+    try {
+      final WebHdfsFileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
+          WebHdfsFileSystem.SCHEME);
+      try (OutputStream os = fs.create(foo)) {
+        os.write(content);
+      }
+
+      // pread
+      in = fs.open(foo, 1024);
+      byte[] buf = new byte[1024];
+      try {
+        in.readFully(1020, buf, 0, 5);
+        Assert.fail("EOF expected");
+      } catch (EOFException ignored) {}
+
+      // mix pread with stateful read
+      int length = in.read(buf, 0, 512);
+      in.readFully(100, new byte[1024], 0, 100);
+      int preadLen = in.read(200, new byte[1024], 0, 200);
+      Assert.assertTrue(preadLen > 0);
+      IOUtils.readFully(in, buf, length, 1024 - length);
+      Assert.assertArrayEquals(content, buf);
+    } finally {
+      if (in != null) {
+        in.close();
+      }
+      cluster.shutdown();
+    }
+  }
+
   @Test(timeout=90000)
   public void testWebHdfsReadRetries() throws Exception {
     // ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
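
The new test also pins down the readFully(long, ...) contract (the sketch below is illustrative; the file is assumed to be 1024 bytes, as in the test):

    import java.io.EOFException;
    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    static void eofSketch(FileSystem fs) throws IOException {
      byte[] buf = new byte[5];
      try (FSDataInputStream in = fs.open(new Path("/foo"))) {
        in.readFully(1020, buf, 0, 5); // 1020 + 5 > 1024: fails fast
      } catch (EOFException expected) {
        // readFully(long, ...) throws before reading when the range runs
        // past the known file length; read(long, ...) would instead
        // return just the bytes that are available.
      }
    }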