
svn merge -c -1365843 for reverting HDFS-3696 since the test cannot be compiled.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1365846 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 13 years ago
parent
commit
f031eb236a

+ 0 - 3
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -81,9 +81,6 @@ Release 0.23.3 - UNRELEASED
     HDFS-3688. Namenode loses datanode hostname if datanode re-registers
     (Jason Lowe via daryn)
 
-    HDFS-3696. Set chunked streaming mode in WebHdfsFileSystem write operations
-    to get around a Java library bug causing OutOfMemoryError.  (szetszwo)
-
 Release 0.23.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES
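For context: the reverted HDFS-3696 entry refers to HttpURLConnection's default behavior of buffering an entire request body in memory so it can compute a Content-Length header, which can exhaust the heap on multi-hundred-megabyte WebHDFS writes. A minimal sketch of the workaround this commit backs out, assuming nothing beyond the standard java.net API; the URL and payload size are illustrative:

```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;

public class ChunkedUploadSketch {
  public static void main(String[] args) throws Exception {
    // Illustrative datanode URL, not a real endpoint.
    URL url = new URL("http://datanode.example.com:50075/webhdfs/v1/tmp/f?op=CREATE");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("PUT");
    conn.setDoOutput(true);

    // Without this call, HttpURLConnection buffers the whole body in memory
    // to compute Content-Length, which can trigger OutOfMemoryError on large
    // uploads. With it, data is sent in 32kB HTTP chunks as it is written.
    conn.setChunkedStreamingMode(32 << 10);

    try (OutputStream out = conn.getOutputStream()) {
      byte[] buf = new byte[1 << 20]; // stream the payload in 1MB writes
      for (int i = 0; i < 200; i++) {
        out.write(buf);
      }
    }
    System.out.println("HTTP " + conn.getResponseCode());
  }
}
```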

+ 14 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java

@@ -412,7 +412,6 @@ public class WebHdfsFileSystem extends FileSystem
     //Step 2) Submit another Http request with the URL from the Location header with data.
     conn = (HttpURLConnection)new URL(redirect).openConnection();
     conn.setRequestMethod(op.getType().toString());
-    conn.setChunkedStreamingMode(32 << 10); //32kB-chunk
     return conn;
   }
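The removed call sits in step 2 of the two-step write that the comment above describes: the first request goes to the namenode with no body, the namenode answers with a 307 redirect whose Location header names a datanode, and the second request carries the data. A hedged sketch of that handshake; the class and method names are illustrative stand-ins, not WebHdfsFileSystem's actual helpers:

```java
import java.net.HttpURLConnection;
import java.net.URL;

// Illustrative two-step WebHDFS write; connection reuse and error
// handling are omitted for brevity.
public class TwoStepWriteSketch {
  static HttpURLConnection openForWrite(URL namenodeUrl) throws Exception {
    // Step 1: ask the namenode where to write; send no data yet.
    HttpURLConnection conn = (HttpURLConnection) namenodeUrl.openConnection();
    conn.setRequestMethod("PUT");
    conn.setInstanceFollowRedirects(false); // handle the redirect ourselves
    conn.connect();
    String redirect = conn.getHeaderField("Location"); // datanode URL
    conn.disconnect();

    // Step 2: submit the same operation, with data, to the datanode.
    conn = (HttpURLConnection) new URL(redirect).openConnection();
    conn.setRequestMethod("PUT");
    conn.setDoOutput(true);
    // This is where HDFS-3696 called conn.setChunkedStreamingMode(32 << 10);
    // the revert restores the default fully-buffered request body.
    return conn;
  }
}
```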
 
@@ -825,7 +824,8 @@ public class WebHdfsFileSystem extends FileSystem
     }
 
     private static WebHdfsFileSystem getWebHdfs(
-        final Token<?> token, final Configuration conf) throws IOException {
+        final Token<?> token, final Configuration conf
+        ) throws IOException, InterruptedException, URISyntaxException {
       
       final InetSocketAddress nnAddr = SecurityUtil.getTokenServiceAddr(token);
       final URI uri = DFSUtil.createUri(WebHdfsFileSystem.SCHEME, nnAddr);
@@ -839,7 +839,12 @@ public class WebHdfsFileSystem extends FileSystem
       // update the kerberos credentials, if they are coming from a keytab
       ugi.reloginFromKeytab();
 
-      return getWebHdfs(token, conf).renewDelegationToken(token);
+      try {
+        WebHdfsFileSystem webhdfs = getWebHdfs(token, conf);
+        return webhdfs.renewDelegationToken(token);
+      } catch (URISyntaxException e) {
+        throw new IOException(e);
+      }
     }
   
     @Override
@@ -849,7 +854,12 @@ public class WebHdfsFileSystem extends FileSystem
       // update the kerberos credentials, if they are coming from a keytab
       ugi.checkTGTAndReloginFromKeytab();
 
-      getWebHdfs(token, conf).cancelDelegationToken(token);
+      try {
+        final WebHdfsFileSystem webhdfs = getWebHdfs(token, conf);
+        webhdfs.cancelDelegationToken(token);
+      } catch (URISyntaxException e) {
+        throw new IOException(e);
+      }
     }
   }
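Both hunks above follow the same pattern: getWebHdfs now declares URISyntaxException (among others), but the renew and cancel methods that call it can only surface IOException and InterruptedException, so the checked URISyntaxException is caught and rethrown wrapped in an IOException that preserves the original cause. A minimal self-contained sketch of that pattern, with illustrative names:

```java
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

// Sketch of the wrapping pattern: a helper with a broad throws clause is
// called from a method whose contract only permits IOException.
public class WrapCheckedSketch {
  // Stands in for getWebHdfs(token, conf).
  static URI parse(String s) throws URISyntaxException {
    return new URI(s);
  }

  // Stands in for the renew/cancel call sites, which may declare only
  // IOException and InterruptedException.
  static URI renewLike(String s) throws IOException, InterruptedException {
    try {
      return parse(s);
    } catch (URISyntaxException e) {
      // Wrap rather than swallow, so the cause survives in stack traces.
      throw new IOException(e);
    }
  }
}
```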
   

+ 0 - 199
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java

@@ -1,199 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.web;
-
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.Assert;
-import org.junit.Test;
-
-/** Test WebHDFS */
-public class TestWebHDFS {
-  static final Log LOG = LogFactory.getLog(TestWebHDFS.class);
-  
-  static final Random RANDOM = new Random();
-  
-  static final long systemStartTime = System.nanoTime();
-
-  /** A timer for measuring performance. */
-  static class Ticker {
-    final String name;
-    final long startTime = System.nanoTime();
-    private long previousTick = startTime;
-
-    Ticker(final String name, String format, Object... args) {
-      this.name = name;
-      LOG.info(String.format("\n\n%s START: %s\n",
-          name, String.format(format, args)));
-    }
-
-    void tick(final long nBytes, String format, Object... args) {
-      final long now = System.nanoTime();
-      if (now - previousTick > 10000000000L) {
-        previousTick = now;
-        final double minutes = (now - systemStartTime)/60000000000.0;
-        LOG.info(String.format("\n\n%s %.2f min) %s %s\n", name, minutes,
-            String.format(format, args), toMpsString(nBytes, now)));
-      }
-    }
-    
-    void end(final long nBytes) {
-      final long now = System.nanoTime();
-      final double seconds = (now - startTime)/1000000000.0;
-      LOG.info(String.format("\n\n%s END: duration=%.2fs %s\n",
-          name, seconds, toMpsString(nBytes, now)));
-    }
-    
-    String toMpsString(final long nBytes, final long now) {
-      final double mb = nBytes/(double)(1<<20);
-      final double mps = mb*1000000000.0/(now - startTime);
-      return String.format("[nBytes=%.2fMB, speed=%.2fMB/s]", mb, mps);
-    }
-  }
-
-  @Test
-  public void testLargeFile() throws Exception {
-    largeFileTest(200L << 20); //200MB file length
-  }
-
-  /** Test read and write large files. */
-  static void largeFileTest(final long fileLength) throws Exception {
-    final Configuration conf = WebHdfsTestUtil.createConf();
-
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-        .numDataNodes(3)
-        .build();
-    try {
-      cluster.waitActive();
-
-      final FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf);
-      final Path dir = new Path("/test/largeFile");
-      Assert.assertTrue(fs.mkdirs(dir));
-
-      final byte[] data = new byte[1 << 20];
-      RANDOM.nextBytes(data);
-
-      final byte[] expected = new byte[2 * data.length];
-      System.arraycopy(data, 0, expected, 0, data.length);
-      System.arraycopy(data, 0, expected, data.length, data.length);
-
-      final Path p = new Path(dir, "file");
-      final Ticker t = new Ticker("WRITE", "fileLength=" + fileLength);
-      final FSDataOutputStream out = fs.create(p);
-      try {
-        long remaining = fileLength;
-        for(; remaining > 0;) {
-          t.tick(fileLength - remaining, "remaining=%d", remaining);
-          
-          final int n = (int)Math.min(remaining, data.length);
-          out.write(data, 0, n);
-          remaining -= n;
-        }
-      } finally {
-        out.close();
-      }
-      t.end(fileLength);
-  
-      Assert.assertEquals(fileLength, fs.getFileStatus(p).getLen());
-
-      final long smallOffset = RANDOM.nextInt(1 << 20) + (1 << 20);
-      final long largeOffset = fileLength - smallOffset;
-      final byte[] buf = new byte[data.length];
-
-      verifySeek(fs, p, largeOffset, fileLength, buf, expected);
-      verifySeek(fs, p, smallOffset, fileLength, buf, expected);
-  
-      verifyPread(fs, p, largeOffset, fileLength, buf, expected);
-    } finally {
-      cluster.shutdown();
-    }
-  }
-
-  static void checkData(long offset, long remaining, int n,
-      byte[] actual, byte[] expected) {
-    if (RANDOM.nextInt(100) == 0) {
-      int j = (int)(offset % actual.length);
-      for(int i = 0; i < n; i++) {
-        if (expected[j] != actual[i]) {
-          Assert.fail("expected[" + j + "]=" + expected[j]
-              + " != actual[" + i + "]=" + actual[i]
-              + ", offset=" + offset + ", remaining=" + remaining + ", n=" + n);
-        }
-        j++;
-      }
-    }
-  }
-
-  /** test seek */
-  static void verifySeek(FileSystem fs, Path p, long offset, long length,
-      byte[] buf, byte[] expected) throws IOException { 
-    long remaining = length - offset;
-    long checked = 0;
-    LOG.info("XXX SEEK: offset=" + offset + ", remaining=" + remaining);
-
-    final Ticker t = new Ticker("SEEK", "offset=%d, remaining=%d",
-        offset, remaining);
-    final FSDataInputStream in = fs.open(p, 64 << 10);
-    in.seek(offset);
-    for(; remaining > 0; ) {
-      t.tick(checked, "offset=%d, remaining=%d", offset, remaining);
-      final int n = (int)Math.min(remaining, buf.length);
-      in.readFully(buf, 0, n);
-      checkData(offset, remaining, n, buf, expected);
-
-      offset += n;
-      remaining -= n;
-      checked += n;
-    }
-    in.close();
-    t.end(checked);
-  }
-
-  static void verifyPread(FileSystem fs, Path p, long offset, long length,
-      byte[] buf, byte[] expected) throws IOException {
-    long remaining = length - offset;
-    long checked = 0;
-    LOG.info("XXX PREAD: offset=" + offset + ", remaining=" + remaining);
-
-    final Ticker t = new Ticker("PREAD", "offset=%d, remaining=%d",
-        offset, remaining);
-    final FSDataInputStream in = fs.open(p, 64 << 10);
-    for(; remaining > 0; ) {
-      t.tick(checked, "offset=%d, remaining=%d", offset, remaining);
-      final int n = (int)Math.min(remaining, buf.length);
-      in.readFully(offset, buf, 0, n);
-      checkData(offset, remaining, n, buf, expected);
-
-      offset += n;
-      remaining -= n;
-      checked += n;
-    }
-    in.close();
-    t.end(checked);
-  }
-}
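The deleted Ticker reports throughput by dividing the byte count by 2^20 to get megabytes and scaling the elapsed nanoseconds by 10^9 to get seconds, exactly as its toMpsString does. A quick standalone check of that arithmetic, with illustrative values:

```java
public class ThroughputCheck {
  public static void main(String[] args) {
    long nBytes = 200L << 20;            // 200 MB, the test's file length
    long elapsedNanos = 25_000_000_000L; // pretend the write took 25 s

    double mb = nBytes / (double) (1 << 20);          // 200.00 MB
    double mps = mb * 1_000_000_000.0 / elapsedNanos; // 8.00 MB/s
    System.out.printf("[nBytes=%.2fMB, speed=%.2fMB/s]%n", mb, mps);
  }
}
```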