Explorar o código

MAPREDUCE-4342. Distributed Cache gives inconsistent result if cache files get deleted from task tracker. Contributed by Mayank Bansal.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.22@1355197 13f79535-47bb-0310-9956-ffa450edef68
Konstantin Shvachko %!s(int64=13) %!d(string=hai) anos
pai
achega
35ae20bedc

+ 5 - 0
mapreduce/CHANGES.txt

@@ -78,6 +78,11 @@ Release 0.22.1 - Unreleased
     MAPREDUCE-4318. TestRecoveryManager should not use raw configuration keys.
     (Benoy Antony via shv)
 
+  END OF HADOOP-8357 SUBTASKS
+
+    MAPREDUCE-4342. Distributed Cache gives inconsistent result if cache files
+    get deleted from task tracker. (Mayank Bansal via shv)
+
 Release 0.22.0 - 2011-11-29
 
   INCOMPATIBLE CHANGES

+ 26 - 2
mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java

@@ -163,7 +163,13 @@ public class TrackerDistributedCacheManager {
     Path localPath = null;
     synchronized (cachedArchives) {
       lcacheStatus = cachedArchives.get(key);
-      if (lcacheStatus == null) {
+      if (lcacheStatus == null
+          || !checkPathExists(lcacheStatus.localizedLoadPath, conf)) {
+        if (lcacheStatus != null) {
+          LOG.warn("Key: " + key + " is not valid removing it from the cache");
+          LOG.warn("Local Cache has been deleted... Downloading the cache again");
+          cachedArchives.remove(key);
+        }
         // was never localized
         String uniqueString = String.valueOf(random.nextLong());
         String cachePath = new Path (subDir, 
@@ -222,7 +228,25 @@ public class TrackerDistributedCacheManager {
     }
     return localizedPath;
   }
-
+  
+  /**
+   * This module checks whether file is present or not.
+   * 
+   * @param cachePath
+   * @param conf
+   * @return
+   * @throws IOException
+   */
+  boolean checkPathExists(Path cachePath, Configuration conf)
+      throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    boolean isPresent = true;
+    if (!localFs.exists(cachePath)) {
+      isPresent = false;
+    }
+    return isPresent;
+  }
+ 
   /**
    * This is the opposite of getlocalcache. When you are done with
    * using the cache, you need to release the cache

+ 74 - 0
mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java

@@ -218,6 +218,80 @@ public class TestTrackerDistributedCacheManager extends TestCase {
     manager.purgeCache();
     assertFalse(pathToFile(cachedFirstFile).exists());
   }
+  
+  /**
+   * This is when somebody deletes the cache
+   * 
+   * @throws IOException
+   * @throws LoginException
+   * @throws InterruptedException
+   */
+  @SuppressWarnings("deprecation")
+  public void testCacheConsistency() throws IOException, LoginException, InterruptedException {
+    if (!canRun()) {
+      return;
+    }
+    // ****** Imitate JobClient code
+    // Configures a task/job with both a regular file and a "classpath" file.
+    Configuration subConf = new Configuration(conf);
+    String userName = getJobOwnerName();
+    subConf.set(MRJobConfig.USER_NAME, userName);
+    JobID jobid = new JobID("jt", 1);
+    DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
+    TrackerDistributedCacheManager.determineTimestamps(subConf);
+    TrackerDistributedCacheManager.determineCacheVisibilities(subConf);
+    // ****** End of imitating JobClient code
+
+    Path jobFile = new Path(TEST_ROOT_DIR, "job.xml");
+    FileOutputStream os = new FileOutputStream(new File(jobFile.toString()));
+    subConf.writeXml(os);
+    os.close();
+
+    // ****** Imitate TaskRunner code.
+    TrackerDistributedCacheManager manager = new TrackerDistributedCacheManager(
+        conf);
+    TaskDistributedCacheManager handle = manager
+        .newTaskDistributedCacheManager(jobid, subConf);
+    assertNull(null, DistributedCache.getLocalCacheFiles(subConf));
+    handle.setupCache(subConf, TaskTracker.getPublicDistributedCacheDir(),
+        TaskTracker.getPrivateDistributedCacheDir(userName));
+    JobLocalizer.downloadPrivateCache(subConf);
+    // ****** End of imitating TaskRunner code
+
+    Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf);
+    assertNotNull(null, localCacheFiles);
+    assertEquals(1, localCacheFiles.length);
+    Path cachedFirstFile = localCacheFiles[0];
+    assertFileLengthEquals(firstCacheFile, cachedFirstFile);
+    assertFalse("Paths should be different.",
+        firstCacheFile.equals(cachedFirstFile));
+
+    File f1 = new File(cachedFirstFile.toString());
+    assertTrue(f1.delete());
+
+    // ****** Imitate JobClient code
+    // Configures a task/job with both a regular file and a "classpath" file.
+    subConf = new Configuration(conf);
+    userName = getJobOwnerName();
+    subConf.set(MRJobConfig.USER_NAME, userName);
+    DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
+    TrackerDistributedCacheManager.determineTimestamps(subConf);
+    TrackerDistributedCacheManager.determineCacheVisibilities(subConf);
+    JobID jobidnew = new JobID("jt", 2);
+    handle = manager.newTaskDistributedCacheManager(jobidnew, subConf);
+    assertNull(null, DistributedCache.getLocalCacheFiles(subConf));
+    handle.setupCache(subConf, TaskTracker.getPublicDistributedCacheDir(),
+        TaskTracker.getPrivateDistributedCacheDir(userName));
+    JobLocalizer.downloadPrivateCache(subConf);
+    // ****** End of imitating TaskRunner code
+    Path[] localCacheFilesagain = DistributedCache.getLocalCacheFiles(subConf);
+    assertNotNull(null, localCacheFilesagain);
+    assertEquals(1, localCacheFilesagain.length);
+    Path cachedFirstFileAgain = localCacheFilesagain[0];
+    assertFileLengthEquals(firstCacheFile, cachedFirstFileAgain);
+    assertFalse("Paths should be different.",
+        firstCacheFile.equals(cachedFirstFileAgain));
+  }
 
   /**
    * This DistributedCacheManager fails in localizing firstCacheFile.