
commit 8500543657d0b0d7ed162fb8f4b4dd43c22343f8
Author: Vinay Kumar Thota <vinayt@yahoo-inc.com>
Date: Fri Jul 23 06:38:08 2010 +0000

MAPREDUCE:1919 from https://issues.apache.org/jira/secure/attachment/12450269/1919-ydist-security.patch


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1077586 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 14 years ago
parent revision 8a61a3d701

+ 283 - 0
src/test/system/java/org/apache/hadoop/mapred/TestCacheFileReferenceCount.java

@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.mapreduce.test.system.JTClient;
+import org.apache.hadoop.mapreduce.test.system.TTClient;
+import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import java.net.URI;
+import java.io.DataOutputStream;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Assert;
+import org.junit.Test;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Verify localization of the job's distributed cache files.
+ */
+public class TestCacheFileReferenceCount {
+  private static final Log LOG = LogFactory
+          .getLog(TestCacheFileReferenceCount.class);
+  private static Configuration conf = new Configuration();
+  private static MRCluster cluster;
+  private static Path tmpFolderPath = null;
+  private static String cacheFile1 = "cache1.txt";
+  private static String cacheFile2 = "cache2.txt";
+  private static String cacheFile3 = "cache3.txt";
+  private static String cacheFile4 = "cache4.txt";
+  private static JTProtocol wovenClient = null;
+  private static JobClient jobClient = null;
+  private static JTClient jtClient = null;
+  private static URI cacheFileURI1;
+  private static URI cacheFileURI2;
+  private static URI cacheFileURI3;
+  private static URI cacheFileURI4;
+  
+  @BeforeClass
+  public static void before() throws Exception {
+    cluster = MRCluster.createCluster(conf);
+    cluster.setUp();
+    tmpFolderPath = new Path("hdfs:///tmp");
+    jtClient = cluster.getJTClient();
+    jobClient = jtClient.getClient();
+    wovenClient = cluster.getJTClient().getProxy();
+    cacheFileURI1 = createCacheFile(tmpFolderPath, cacheFile1);
+    cacheFileURI2 = createCacheFile(tmpFolderPath, cacheFile2);
+  }
+  
+  @AfterClass
+  public static void after() throws Exception {
+    deleteCacheFile(new Path(tmpFolderPath, cacheFile1));
+    deleteCacheFile(new Path(tmpFolderPath, cacheFile2));
+    deleteCacheFile(new Path(tmpFolderPath, cacheFile4));
+    cluster.tearDown();
+  }
+  
+  /**
+   * Run a job with two distributed cache files and verify
+   * that the job succeeds.
+   * @throws Exception
+   */
+  @Test
+  public void testCacheFilesLocalization() throws Exception {
+    conf = wovenClient.getDaemonConf();
+    SleepJob job = new SleepJob();
+    job.setConf(conf);
+    JobConf jobConf = job.setupJobConf(4, 1, 4000, 4000, 1000, 1000);
+    DistributedCache.createSymlink(jobConf);
+    DistributedCache.addCacheFile(cacheFileURI1, jobConf);
+    DistributedCache.addCacheFile(cacheFileURI2, jobConf);
+    RunningJob runJob = jobClient.submitJob(jobConf);
+    JobID jobId = runJob.getID();
+
+    Assert.assertTrue("Job has not been started for 1 min.", 
+        jtClient.isJobStarted(jobId));
+    TaskInfo[] taskInfos = wovenClient.getTaskInfo(jobId);
+    Assert.assertTrue("Cache File1 has not been localize",
+        checkLocalization(taskInfos,cacheFile1));
+    Assert.assertTrue("Cache File2 has not been localize",
+            checkLocalization(taskInfos,cacheFile2));
+    JobInfo jInfo = wovenClient.getJobInfo(jobId);
+    LOG.info("Waiting till the job is completed...");
+    while (!jInfo.getStatus().isJobComplete()) {
+      UtilsForTests.waitFor(100);
+      jInfo = wovenClient.getJobInfo(jobId);
+    }
+    Assert.assertEquals("Job has not been succeeded", 
+        jInfo.getStatus().getRunState(), JobStatus.SUCCEEDED);
+  }
+  
+  /**
+   * Run a job with a distributed cache file and remove the cache
+   * file from DFS after it has been localized. Verify that the
+   * job fails.
+   * @throws Exception
+   */
+  @Test
+  public void testDeleteCacheFileInDFSAfterLocalized() throws Exception {
+    conf = wovenClient.getDaemonConf();
+    SleepJob job = new SleepJob();
+    job.setConf(conf);
+    JobConf jobConf = job.setupJobConf(4, 1, 4000, 4000, 1000, 1000);
+    cacheFileURI3 = createCacheFile(tmpFolderPath, cacheFile3);
+    DistributedCache.createSymlink(jobConf);
+    DistributedCache.addCacheFile(cacheFileURI3, jobConf);
+    RunningJob runJob = jobClient.submitJob(jobConf);
+    JobID jobId = runJob.getID();
+    Assert.assertTrue("Job has not been started for 1 min.", 
+        jtClient.isJobStarted(jobId));
+    TaskInfo[] taskInfos = wovenClient.getTaskInfo(jobId);
+    boolean isCacheFileLocalized = checkLocalization(taskInfos, cacheFile3);
+    Assert.assertTrue("CacheFile has not been localized",
+        isCacheFileLocalized);
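+    // Remove the cache file from DFS while the job is still running;
+    // the job is expected to fail as a result.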
+    deleteCacheFile(new Path(tmpFolderPath, cacheFile3));
+    JobInfo jInfo = wovenClient.getJobInfo(jobId);
+    LOG.info("Waiting till the job is completed...");
+    while (!jInfo.getStatus().isJobComplete()) {
+      UtilsForTests.waitFor(100);
+      jInfo = wovenClient.getJobInfo(jobId);
+    }
+    Assert.assertEquals("Job has not been failed", 
+        jInfo.getStatus().getRunState(), JobStatus.FAILED);
+  }
+  
+  /**
+   * Run a job with a distributed cache file whose size exceeds
+   * local.cache.size and verify that the job still succeeds.
+   * @throws Exception
+   */
+  @Test
+  public void testCacheSizeExceeds() throws Exception {
+    conf = wovenClient.getDaemonConf();
+    SleepJob job = new SleepJob();
+    String[] jobArgs = {"-D", "local.cache.size=1024",
+                        "-m", "4",
+                        "-r", "2",
+                        "-mt", "2000",
+                        "-rt", "2000",
+                        "-recordt", "100"};
+    JobConf jobConf = new JobConf(conf);
+    cacheFileURI4 = createCacheFile(tmpFolderPath, cacheFile4);
+    DistributedCache.createSymlink(jobConf);
+    DistributedCache.addCacheFile(cacheFileURI4, jobConf);
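+    // Record the job count and the most recent job id before submission
+    // so the sleep job launched below via ToolRunner can be identified.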
+    int countBeforeJS = jtClient.getClient().getAllJobs().length;
+    JobID prvJobId = jtClient.getClient().getAllJobs()[0].getJobID();
+    int exitCode = ToolRunner.run(jobConf,job,jobArgs);
+    Assert.assertEquals("Exit Code:", 0, exitCode);
+    int countAfterJS = jtClient.getClient().getAllJobs().length;
+    int counter = 0;
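+    // Wait up to ~30 seconds for the job list to grow, i.e. for the
+    // newly submitted job to appear.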
+    while (counter++ < 30) {
+      if (countBeforeJS == countAfterJS) {
+        UtilsForTests.waitFor(1000);
+        countAfterJS = jtClient.getClient().getAllJobs().length;
+      } else {
+        break;
+      } 
+    }
+    JobID jobId = jtClient.getClient().getAllJobs()[0].getJobID();
+    counter = 0;
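+    // Wait up to ~30 seconds until the latest job id differs from the
+    // one recorded before submission.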
+    while (counter++ < 30) {
+      if (jobId.toString().equals(prvJobId.toString())) {
+        UtilsForTests.waitFor(1000); 
+        jobId = jtClient.getClient().getAllJobs()[0].getJobID();
+      } else { 
+        break;
+      }
+    }
+    JobInfo jInfo = wovenClient.getJobInfo(jobId);
+    Assert.assertEquals("Job has not been succeeded", 
+          jInfo.getStatus().getRunState(), JobStatus.SUCCEEDED);
+  }
+  
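+  /**
+   * Checks whether the given cache file has been localized on any
+   * tracker that ran a non-setup/cleanup task of the job.
+   */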
+  private boolean checkLocalization(TaskInfo[] taskInfos, String cacheFile) 
+      throws Exception {
+    boolean isCacheFileLocalized = false;
+    for (TaskInfo taskinfo : taskInfos) {
+      if (!taskinfo.isSetupOrCleanup()) {
+        String[] taskTrackers = taskinfo.getTaskTrackers();
+        List<TTClient> ttList = getTTClients(taskTrackers);
+        for (TTClient ttClient : ttList) {
+          isCacheFileLocalized = checkCacheFile(ttClient, cacheFile);
+          if (isCacheFileLocalized) {
+            return true;
+          }
+        } 
+      }
+    }
+    return false;
+  }
+  
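+  /**
+   * Resolves the given task tracker names to their TTClient handles
+   * in the cluster, skipping trackers that cannot be found.
+   */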
+  private List<TTClient> getTTClients(String[] taskTrackers) 
+      throws Exception {
+    List<TTClient> ttClientList = new ArrayList<TTClient>();
+    for (String taskTracker : taskTrackers) {
+      taskTracker = UtilsForTests.getFQDNofTT(taskTracker);
+      TTClient ttClient = cluster.getTTClient(taskTracker);
+      if (ttClient != null) {
+        ttClientList.add(ttClient);
+      }
+    }
+    return ttClientList;
+  }
+  
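+  /**
+   * Scans the tracker's mapred local directories for a public
+   * distributed cache entry matching the given file name.
+   */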
+  private boolean checkCacheFile(TTClient ttClient, String cacheFile) 
+      throws IOException {
+    String[] localDirs = ttClient.getMapredLocalDirs();
+    for (String localDir : localDirs) {
+      localDir = localDir + Path.SEPARATOR + 
+          TaskTracker.getPublicDistributedCacheDir();
+      FileStatus[] fileStatuses = ttClient.listStatus(localDir, 
+          true, true);
+      for (FileStatus fileStatus : fileStatuses) {
+        Path path = fileStatus.getPath();
+        if ((path.toString()).endsWith(cacheFile)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+  
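+  /**
+   * Deletes the given cache file from DFS.
+   */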
+  private static void deleteCacheFile(Path cacheFilePath) 
+      throws IOException {
+    FileSystem dfs = jobClient.getFs();
+    dfs.delete(cacheFilePath, true);
+  }
+
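+  /**
+   * Creates a cache file with dummy content under the given temp
+   * folder on DFS, opens up the folder permissions, and returns
+   * the file's URI.
+   */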
+  private static URI createCacheFile(Path tmpFolderPath, String cacheFile) 
+      throws IOException {
+    String input = "distribute cache content...";
+    FileSystem dfs = jobClient.getFs();
+    conf = wovenClient.getDaemonConf();
+    FileSystem fs = tmpFolderPath.getFileSystem(conf);
+    if (!fs.mkdirs(tmpFolderPath)) {
+      throw new IOException("Failed to create the temp directory:"
+          + tmpFolderPath.toString());
+    }
+    deleteCacheFile(new Path(tmpFolderPath, cacheFile));
+    DataOutputStream file = fs.create(new Path(tmpFolderPath, cacheFile));
+    int i = 0;
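+    // Write a few KB of content so the file comfortably exceeds the
+    // 1 KB local.cache.size used in testCacheSizeExceeds.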
+    while (i++ < 100) {
+      file.writeBytes(input);
+    }
+    file.close();
+    dfs.setPermission(tmpFolderPath, new FsPermission(FsAction.ALL, 
+        FsAction.ALL, FsAction.ALL));
+    URI uri = URI.create(new Path(tmpFolderPath, cacheFile).toString());
+    return uri;
+  }
+}