
Merge trunk into HA branch.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1245690 13f79535-47bb-0310-9956-ffa450edef68
Aaron Myers 13 years ago
parent
commit
53b7d6c6bc

+ 5 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -174,6 +174,7 @@ Release 0.23.2 - UNRELEASED
     (sharad, todd via todd)
 
   BUG FIXES
+    HADOOP-8054 NPE with FilterFileSystem (Daryn Sharp via bobby)
 
     HADOOP-8042  When copying a file out of HDFS, modifying it, and uploading
     it back into HDFS, the put fails due to a CRC mismatch
@@ -196,6 +197,10 @@ Release 0.23.2 - UNRELEASED
     HADOOP-8082 add hadoop-client and hadoop-minicluster to the 
     dependency-management section. (tucu)
 
+    HADOOP-8066 The full docs build intermittently fails (abayer via tucu)
+
+    HADOOP-8083 javadoc generation for some modules is not done under target/ (tucu)
+
 Release 0.23.1 - 2012-02-08 
 
   INCOMPATIBLE CHANGES

+ 5 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java

@@ -80,6 +80,11 @@ public class FilterFileSystem extends FileSystem {
    */
   public void initialize(URI name, Configuration conf) throws IOException {
     super.initialize(name, conf);
+    // this is less than ideal, but existing filesystems sometimes neglect
+    // to initialize the embedded filesystem
+    if (fs.getConf() == null) {
+      fs.initialize(name, conf);
+    }
     String scheme = name.getScheme();
     if (!scheme.equals(fs.getUri().getScheme())) {
       swapScheme = scheme;
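
Review note: the guard above is the whole HADOOP-8054 fix. A FileSystem built directly via its constructor (rather than through FileSystem.get) has a null Configuration until initialize() runs, and FilterFileSystem used to call straight into such an embedded fs and hit the reported NPE. Below is a minimal sketch of the now-working path; it is illustrative and not part of the patch, and the class name, URI, and use of RawLocalFileSystem are my choices for the example:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.RawLocalFileSystem;

public class FilterInitSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Constructed directly, so getConf() is null until initialize() runs.
    RawLocalFileSystem raw = new RawLocalFileSystem();
    FilterFileSystem filtered = new FilterFileSystem(raw);
    // With the guard above, initialize() now cascades into the embedded fs
    // because its conf is null, so later calls no longer NPE.
    filtered.initialize(URI.create("file:///"), conf);
    System.out.println("embedded conf set: " + (raw.getConf() != null));
  }
}

This also explains the LocalFileSystem change below: its initialize() override existed only to work around the same problem, and becomes redundant once the base class does the check.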

+ 0 - 7
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystem.java

@@ -48,13 +48,6 @@ public class LocalFileSystem extends ChecksumFileSystem {
     super(rawLocalFileSystem);
   }
     
-  @Override
-  public void initialize(URI uri, Configuration conf) throws IOException {
-    super.initialize(uri, conf);
-    // ctor didn't initialize the filtered fs
-    getRawFileSystem().initialize(uri, conf);
-  }
-  
   /** Convert a path to a File. */
   public File pathToFile(Path path) {
     return ((RawLocalFileSystem)fs).pathToFile(path);

+ 123 - 2
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java

@@ -18,24 +18,39 @@
 
 package org.apache.hadoop.fs;
 
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
+
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
+import java.net.URI;
 import java.util.EnumSet;
 import java.util.Iterator;
 
-import junit.framework.TestCase;
 import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.Options.CreateOpts;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Progressable;
+import org.junit.BeforeClass;
+import org.junit.Test;
 
-public class TestFilterFileSystem extends TestCase {
+public class TestFilterFileSystem {
 
   private static final Log LOG = FileSystem.LOG;
+  private static final Configuration conf = new Configuration();
 
+  @BeforeClass
+  public static void setup() {
+    conf.set("fs.flfs.impl", FilterLocalFileSystem.class.getName());
+    conf.setBoolean("fs.flfs.impl.disable.cache", true);
+    conf.setBoolean("fs.file.impl.disable.cache", true);
+  }
+  
   public static class DontCheck {
     public BlockLocation[] getFileBlockLocations(Path p, 
         long start, long len) { return null; }
@@ -153,6 +168,7 @@ public class TestFilterFileSystem extends TestCase {
     
   }
   
+  @Test
   public void testFilterFileSystem() throws Exception {
     for (Method m : FileSystem.class.getDeclaredMethods()) {
       if (Modifier.isStatic(m.getModifiers()))
@@ -176,4 +192,109 @@ public class TestFilterFileSystem extends TestCase {
     }
   }
   
+  @Test
+  public void testFilterEmbedInit() throws Exception {
+    FileSystem mockFs = createMockFs(false); // no conf = need init
+    checkInit(new FilterFileSystem(mockFs), true);
+  }
+
+  @Test
+  public void testFilterEmbedNoInit() throws Exception {
+    FileSystem mockFs = createMockFs(true); // has conf = skip init
+    checkInit(new FilterFileSystem(mockFs), false);
+  }
+
+  @Test
+  public void testLocalEmbedInit() throws Exception {
+    FileSystem mockFs = createMockFs(false); // no conf = need init
+    checkInit(new LocalFileSystem(mockFs), true);
+  }  
+  
+  @Test
+  public void testLocalEmbedNoInit() throws Exception {
+    FileSystem mockFs = createMockFs(true); // has conf = skip init
+    checkInit(new LocalFileSystem(mockFs), false);
+  }
+  
+  private FileSystem createMockFs(boolean useConf) {
+    FileSystem mockFs = mock(FileSystem.class);
+    when(mockFs.getUri()).thenReturn(URI.create("mock:/"));
+    when(mockFs.getConf()).thenReturn(useConf ? conf : null);
+    return mockFs;
+  }
+
+  @Test
+  public void testGetLocalFsSetsConfs() throws Exception {
+    LocalFileSystem lfs = FileSystem.getLocal(conf);
+    checkFsConf(lfs, conf, 2);
+  }
+
+  @Test
+  public void testGetFilterLocalFsSetsConfs() throws Exception {
+    FilterFileSystem flfs =
+        (FilterFileSystem) FileSystem.get(URI.create("flfs:/"), conf);
+    checkFsConf(flfs, conf, 3);
+  }
+
+  @Test
+  public void testInitLocalFsSetsConfs() throws Exception {
+    LocalFileSystem lfs = new LocalFileSystem();
+    checkFsConf(lfs, null, 2);
+    lfs.initialize(lfs.getUri(), conf);
+    checkFsConf(lfs, conf, 2);
+  }
+
+  @Test
+  public void testInitFilterFsSetsEmbedConf() throws Exception {
+    LocalFileSystem lfs = new LocalFileSystem();
+    checkFsConf(lfs, null, 2);
+    FilterFileSystem ffs = new FilterFileSystem(lfs);
+    assertEquals(lfs, ffs.getRawFileSystem());
+    checkFsConf(ffs, null, 3);
+    ffs.initialize(URI.create("filter:/"), conf);
+    checkFsConf(ffs, conf, 3);
+  }
+
+  @Test
+  public void testInitFilterLocalFsSetsEmbedConf() throws Exception {
+    FilterFileSystem flfs = new FilterLocalFileSystem();
+    assertEquals(LocalFileSystem.class, flfs.getRawFileSystem().getClass());
+    checkFsConf(flfs, null, 3);
+    flfs.initialize(URI.create("flfs:/"), conf);
+    checkFsConf(flfs, conf, 3);
+  }
+
+  private void checkInit(FilterFileSystem fs, boolean expectInit)
+      throws Exception {
+    URI uri = URI.create("filter:/");
+    fs.initialize(uri, conf);
+    
+    FileSystem embedFs = fs.getRawFileSystem();
+    if (expectInit) {
+      verify(embedFs, times(1)).initialize(eq(uri), eq(conf));
+    } else {
+      verify(embedFs, times(0)).initialize(any(URI.class), any(Configuration.class));
+    }
+  }
+
+  // check the given fs's conf, and all its filtered filesystems
+  private void checkFsConf(FileSystem fs, Configuration conf, int expectDepth) {
+    int depth = 0;
+    while (true) {
+      depth++; 
+      assertFalse("depth "+depth+">"+expectDepth, depth > expectDepth);
+      assertEquals(conf, fs.getConf());
+      if (!(fs instanceof FilterFileSystem)) {
+        break;
+      }
+      fs = ((FilterFileSystem) fs).getRawFileSystem();
+    }
+    assertEquals(expectDepth, depth);
+  }
+  
+  private static class FilterLocalFileSystem extends FilterFileSystem {
+    FilterLocalFileSystem() {
+      super(new LocalFileSystem());
+    }
+  }
 }

+ 6 - 3
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -132,6 +132,9 @@ Trunk (unreleased changes)
     HDFS-2878. Fix TestBlockRecovery and move it back into main test directory.
     (todd)
 
+    HDFS-2655. BlockReaderLocal#skip performs unnecessary IO. (Brandon Li
+    via jitendra) 
+
   OPTIMIZATIONS
     HDFS-2477. Optimize computing the diff between a block report and the
     namenode state. (Tomasz Nykiel via hairong)
@@ -210,9 +213,6 @@ Trunk (unreleased changes)
     dfs.client.block.write.replace-datanode-on-failure.enable to be mistakenly
     disabled. (atm)
 
-    HDFS-2525. Race between BlockPoolSliceScanner and append. (Brandon Li
-    via jitendra)
-
 Release 0.23.2 - UNRELEASED 
 
   INCOMPATIBLE CHANGES
@@ -251,6 +251,9 @@ Release 0.23.2 - UNRELEASED
     HDFS-2950. Secondary NN HTTPS address should be listed as a
     NAMESERVICE_SPECIFIC_KEY. (todd)
 
+    HDFS-2525. Race between BlockPoolSliceScanner and append. (Brandon Li
+    via jitendra)
+
     HDFS-2938. Recursive delete of a large directory make namenode
     unresponsive. (Hari Mankude via suresh)
 

+ 55 - 13
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java

@@ -369,26 +369,68 @@ class BlockReaderLocal implements BlockReader {
     if (LOG.isDebugEnabled()) {
       LOG.debug("skip " + n);
     }
+    if (n <= 0) {
+      return 0;
+    }
     if (!verifyChecksum) {
       return dataIn.skip(n);
     }
-    // Skip by reading the data so we stay in sync with checksums.
-    // This could be implemented more efficiently in the future to
-    // skip to the beginning of the appropriate checksum chunk
-    // and then only read to the middle of that chunk.
+  
+    // caller made sure newPosition is not beyond EOF.
+    int remaining = dataBuff.remaining();
+    int position = dataBuff.position();
+    int newPosition = position + (int)n;
+  
+    // if the new offset is already read into dataBuff, just reposition
+    if (n <= remaining) {
+      assert offsetFromChunkBoundary == 0;
+      dataBuff.position(newPosition);
+      return n;
+    }
+  
+    // for small gap, read through to keep the data/checksum in sync
+    if (n - remaining <= bytesPerChecksum) {
+      dataBuff.position(position + remaining);
+      if (skipBuf == null) {
+        skipBuf = new byte[bytesPerChecksum];
+      }
+      int ret = read(skipBuf, 0, (int)(n - remaining));
+      return ret;
+    }
+  
+    // optimize for big gap: discard the current buffer, skip to
+    // the beginning of the appropriate checksum chunk and then
+    // read to the middle of that chunk to be in sync with checksums.
+    this.offsetFromChunkBoundary = newPosition % bytesPerChecksum;
+    long toskip = n - remaining - this.offsetFromChunkBoundary;
+  
+    dataBuff.clear();
+    checksumBuff.clear();
+  
+    long dataSkipped = dataIn.skip(toskip);
+    if (dataSkipped != toskip) {
+      throw new IOException("skip error in data input stream");
+    }
+    long checkSumOffset = (dataSkipped / bytesPerChecksum) * checksumSize;
+    if (checkSumOffset > 0) {
+      long skipped = checksumIn.skip(checkSumOffset);
+      if (skipped != checkSumOffset) {
+        throw new IOException("skip error in checksum input stream");
+      }
+    }
+
+    // read into the middle of the chunk
     if (skipBuf == null) {
       skipBuf = new byte[bytesPerChecksum];
     }
-    long nSkipped = 0;
-    while ( nSkipped < n ) {
-      int toSkip = (int)Math.min(n-nSkipped, skipBuf.length);
-      int ret = read(skipBuf, 0, toSkip);
-      if ( ret <= 0 ) {
-        return nSkipped;
-      }
-      nSkipped += ret;
+    assert skipBuf.length == bytesPerChecksum;
+    assert this.offsetFromChunkBoundary < bytesPerChecksum;
+    int ret = read(skipBuf, 0, this.offsetFromChunkBoundary);
+    if (ret == -1) {  // EOS
+      return toskip;
+    } else {
+      return (toskip + ret);
     }
-    return nSkipped;
   }
 
   @Override
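
The rewritten skip() distinguishes three cases: the target offset is already buffered (reposition only), the gap is under one checksum chunk (read through to stay in sync), or the gap is large (skip whole chunks in both streams, then read up to the mid-chunk offset). The chunk arithmetic of the large-gap path works out as in this standalone sketch; the values are illustrative, since the real reader takes bytesPerChecksum and checksumSize from the block's checksum header:

public class SkipArithmeticSketch {
  public static void main(String[] args) {
    int bytesPerChecksum = 512; // data bytes covered by one checksum
    int checksumSize = 4;       // a CRC-32 checksum is 4 bytes
    int position = 0;           // dataBuff position, chunk-aligned
    int remaining = 512;        // bytes still buffered in dataBuff
    long n = 5000;              // bytes the caller asked to skip

    int newPosition = position + (int) n;                         // 5000
    int offsetFromChunkBoundary = newPosition % bytesPerChecksum; // 392
    long toskip = n - remaining - offsetFromChunkBoundary;        // 4096

    // 4096 bytes = 8 whole chunks, so skip the matching 8 checksums too,
    // then read the final 392 bytes so data and checksums stay aligned.
    long checkSumOffset = (toskip / bytesPerChecksum) * checksumSize; // 32
    System.out.printf("skip %d data bytes and %d checksum bytes, then read %d%n",
        toskip, checkSumOffset, offsetFromChunkBoundary);
  }
}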

+ 46 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java

@@ -238,7 +238,53 @@ public class TestShortCircuitLocalRead {
       cluster.shutdown();
     }
   }
+
+  @Test
+  public void testSkipWithVerifyChecksum() throws IOException {
+    int size = blockSize;
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
+    conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
+        UserGroupInformation.getCurrentUser().getShortUserName());
+    if (simulatedStorage) {
+      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+    }
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
+        .format(true).build();
+    FileSystem fs = cluster.getFileSystem();
+    try {
+      // check that / exists
+      Path path = new Path("/");
+      assertTrue("/ should be a directory", fs.getFileStatus(path)
+          .isDirectory() == true);
+      
+      byte[] fileData = AppendTestUtil.randomBytes(seed, size*3);
+      // create a new file in home directory. Do not close it.
+      Path file1 = new Path("filelocal.dat");
+      FSDataOutputStream stm = createFile(fs, file1, 1);
   
+      // write to file
+      stm.write(fileData);
+      stm.close();
+      
+      // now test the skip function
+      FSDataInputStream instm = fs.open(file1);
+      byte[] actual = new byte[fileData.length];
+      // read something from the block first, otherwise BlockReaderLocal.skip()
+      // will not be invoked
+      int nread = instm.read(actual, 0, 3);
+      long skipped = 2*size+3;
+      instm.seek(skipped);
+      nread = instm.read(actual, (int)(skipped + nread), 3);
+      instm.close();
+        
+    } finally {
+      fs.close();
+      cluster.shutdown();
+    }
+  }
+     
   /**
    * Test to run benchmarks between shortcircuit read vs regular read with
    * specified number of threads simultaneously reading.

+ 6 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -55,6 +55,8 @@ Trunk (unreleased changes)
 
   BUG FIXES
 
+    MAPREDUCE-3757. [Rumen] Fixed Rumen Folder to adjust shuffleFinished and
+                    sortFinished times when needed.
     MAPREDUCE-3194. "mapred mradmin" command is broken in mrv2
                      (Jason Lowe via bobby)
 
@@ -115,6 +117,10 @@ Release 0.23.2 - UNRELEASED
 
     MAPREDUCE-3864. Fix cluster setup docs for correct SecondaryNameNode
     HTTPS parameters. (todd)
+
+    MAPREDUCE-3856. Instances of RunningJob class give incorrect job tracking
+    URLs when multiple jobs are submitted from the same client JVM. (Eric Payne
+    via sseth)
  
 Release 0.23.1 - 2012-02-08 
 

+ 2 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java

@@ -81,8 +81,7 @@ public class ClientServiceDelegate {
   private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class);
 
   // Caches for per-user NotRunningJobs
-  private static HashMap<JobState, HashMap<String, NotRunningJob>> notRunningJobs =
-      new HashMap<JobState, HashMap<String, NotRunningJob>>();
+  private HashMap<JobState, HashMap<String, NotRunningJob>> notRunningJobs;
 
   private final Configuration conf;
   private final JobID jobId;
@@ -108,6 +107,7 @@ public class ClientServiceDelegate {
     this.jobId = jobId;
     this.historyServerProxy = historyServerProxy;
     this.appId = TypeConverter.toYarn(jobId).getAppId();
+    notRunningJobs = new HashMap<JobState, HashMap<String, NotRunningJob>>();
   }
 
   // Get the instance of the NotRunningJob corresponding to the specified
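
That small-looking change is the core of the MAPREDUCE-3856 fix: the NotRunningJob cache was a static field, so every ClientServiceDelegate in the JVM shared it, and a second job submitted from the same client could be handed the first job's cached (and therefore wrong) tracking URL. Making the map an instance field scopes the cache to one delegate, i.e. one job. A self-contained sketch of the failure mode, with deliberately simplified keys and values (this is not Hadoop code):

import java.util.HashMap;
import java.util.Map;

public class StaticCacheSketch {
  static class Delegate {
    static final Map<String, String> SHARED = new HashMap<>(); // old: static
    final Map<String, String> own = new HashMap<>();           // new: per instance

    String lookup(Map<String, String> cache, String state, String fresh) {
      // Return the cached value if present, otherwise cache the fresh one.
      return cache.computeIfAbsent(state, k -> fresh);
    }
  }

  public static void main(String[] args) {
    Delegate jobA = new Delegate();
    Delegate jobB = new Delegate();
    // Shared static cache: job B is served job A's stale URL.
    System.out.println(jobA.lookup(Delegate.SHARED, "NEW", "http://rm/job_A")); // job_A
    System.out.println(jobB.lookup(Delegate.SHARED, "NEW", "http://rm/job_B")); // job_A (!)
    // Per-instance cache: each delegate resolves its own URL.
    System.out.println(jobB.lookup(jobB.own, "NEW", "http://rm/job_B"));        // job_B
  }
}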

+ 29 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java

@@ -148,10 +148,15 @@ public class TestMRJobs {
     job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
     job.setJarByClass(SleepJob.class);
     job.setMaxMapAttempts(1); // speed up failures
-    job.waitForCompletion(true);
+    job.submit();
+    String trackingUrl = job.getTrackingURL();
+    String jobId = job.getJobID().toString();
     boolean succeeded = job.waitForCompletion(true);
     Assert.assertTrue(succeeded);
     Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+    Assert.assertTrue("Tracking URL was " + trackingUrl +
+                      " but didn't match job ID " + jobId,
+          trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
     verifySleepJobCounters(job);
     verifyTaskProgress(job);
     
@@ -209,9 +214,15 @@ public class TestMRJobs {
     job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
     job.setJarByClass(RandomTextWriterJob.class);
     job.setMaxMapAttempts(1); // speed up failures
+    job.submit();
+    String trackingUrl = job.getTrackingURL();
+    String jobId = job.getJobID().toString();
     boolean succeeded = job.waitForCompletion(true);
     Assert.assertTrue(succeeded);
     Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+    Assert.assertTrue("Tracking URL was " + trackingUrl +
+                      " but didn't match job ID " + jobId,
+          trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
     
     // Make sure there are three files in the output-dir
     
@@ -316,9 +327,14 @@ public class TestMRJobs {
         new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
         "failmapper-output"));
     job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
+    job.submit();
+    String trackingUrl = job.getTrackingURL();
+    String jobId = job.getJobID().toString();
     boolean succeeded = job.waitForCompletion(true);
     Assert.assertFalse(succeeded);
-
+    Assert.assertTrue("Tracking URL was " + trackingUrl +
+                      " but didn't match job ID " + jobId,
+          trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
     return job;
   }
 
@@ -360,8 +376,14 @@ public class TestMRJobs {
         // //Job with reduces
         // Job job = sleepJob.createJob(3, 2, 10000, 1, 10000, 1);
         job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
+        job.submit();
+        String trackingUrl = job.getTrackingURL();
+        String jobId = job.getJobID().toString();
         job.waitForCompletion(true);
         Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+        Assert.assertTrue("Tracking URL was " + trackingUrl +
+                          " but didn't match job ID " + jobId,
+          trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
         return null;
       }
     });
@@ -444,7 +466,12 @@ public class TestMRJobs {
     job.setMaxMapAttempts(1); // speed up failures
 
     job.submit();
+    String trackingUrl = job.getTrackingURL();
+    String jobId = job.getJobID().toString();
     Assert.assertTrue(job.waitForCompletion(false));
+    Assert.assertTrue("Tracking URL was " + trackingUrl +
+                      " but didn't match job ID " + jobId,
+          trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
   }
 
   private Path createTempFile(String filename, String contents)
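
Each of the new assertions boils down to the same string check: the tracking URL must end with the submitting job's sequence suffix followed by a slash. With an illustrative job ID and URL (the real values come from the MiniMRYarnCluster at runtime), the pieces decompose like this:

public class TrackingUrlSketch {
  public static void main(String[] args) {
    String jobId = "job_1329348432655_0001";  // format: job_<cluster ts>_<seq>
    // lastIndexOf("_") keeps the underscore, so suffix is "_0001".
    String suffix = jobId.substring(jobId.lastIndexOf("_"));
    // A matching URL, e.g. an RM proxy address for the job's application:
    String trackingUrl = "http://rm:8088/proxy/application_1329348432655_0001/";
    System.out.println(trackingUrl.endsWith(suffix + "/")); // true
  }
}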

+ 1 - 1
hadoop-mapreduce-project/src/docs/src/documentation/content/xdocs/rumen.xml

@@ -293,7 +293,7 @@
         <tr>
           <td><code>-output-duration</code></td>
           <td>This parameter defines the final runtime of the trace. 
-              Default value if <strong>1 hour</strong>.
+              Default value is <strong>1 hour</strong>.
           </td>
           <td>'<code>-output-duration 30m</code>' 
               implies that the resulting trace will have a max runtime of 

BIN
hadoop-mapreduce-project/src/test/tools/data/rumen/small-trace-test/goldFoldedTrace.json.gz


+ 8 - 0
hadoop-project/pom.xml

@@ -749,6 +749,11 @@
     </pluginManagement>
 
     <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>findbugs-maven-plugin</artifactId>
+      </plugin>
+      
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-antrun-plugin</artifactId>
@@ -932,6 +937,9 @@
                 <goals>
                   <goal>jar</goal>
                 </goals>
+                <configuration>
+                  <destDir>${project.build.directory}</destDir>
+                </configuration>
               </execution>
             </executions>
           </plugin>

+ 6 - 0
hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java

@@ -275,6 +275,12 @@ public class LoggedTaskAttempt implements DeepCompare {
   void adjustTimes(long adjustment) {
     startTime += adjustment;
     finishTime += adjustment;
+
+    // For reduce attempts, adjust the different phases' finish times also 
+    if (sortFinished >= 0) {
+      shuffleFinished += adjustment;
+      sortFinished += adjustment;
+    }
   }
 
   public long getShuffleFinished() {
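
Context for the Rumen fix (MAPREDUCE-3757): Folder shifts whole task attempts in time, and before this change only startTime and finishTime moved, so a folded reduce attempt could report shuffleFinished or sortFinished outside its own [startTime, finishTime] window. A worked example with illustrative timestamps:

public class AdjustTimesSketch {
  public static void main(String[] args) {
    long adjustment = -3_600_000L;      // fold the attempt back one hour
    long startTime = 10_000_000L;
    long shuffleFinished = 10_200_000L; // reduce-phase boundary
    long sortFinished = 10_300_000L;    // reduce-phase boundary
    long finishTime = 10_500_000L;

    startTime += adjustment;
    finishTime += adjustment;
    if (sortFinished >= 0) {            // the patch's guard: reduce attempts only
      shuffleFinished += adjustment;
      sortFinished += adjustment;
    }
    // Phase boundaries still fall inside the shifted attempt window.
    System.out.println(startTime <= shuffleFinished && sortFinished <= finishTime);
  }
}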