Procházet zdrojové kódy

MAPREDUCE-7033: Map outputs implicitly rely on permissive umask for shuffle. Contributed by Jason Lowe

(cherry picked from commit 5a725bb886eb0b9182840c4a91466c361ecc11e0)
Eric Payne před 7 roky
rodič
revize
1d53c9d3a9

+ 12 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java

@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile;
@@ -84,6 +85,10 @@ public class MapTask extends Task {
    */
    */
   public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
   public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
 
 
+  // The minimum permissions needed for a shuffle output file.
+  private static final FsPermission SHUFFLE_OUTPUT_PERM =
+      new FsPermission((short)0640);
+
   private TaskSplitIndex splitMetaInfo = new TaskSplitIndex();
   private TaskSplitIndex splitMetaInfo = new TaskSplitIndex();
   private final static int APPROX_HEADER_LENGTH = 150;
   private final static int APPROX_HEADER_LENGTH = 150;
 
 
@@ -1522,6 +1527,13 @@ public class MapTask extends Task {
       mergeParts();
       mergeParts();
       Path outputPath = mapOutputFile.getOutputFile();
       Path outputPath = mapOutputFile.getOutputFile();
       fileOutputByteCounter.increment(rfs.getFileStatus(outputPath).getLen());
       fileOutputByteCounter.increment(rfs.getFileStatus(outputPath).getLen());
+      // If necessary, make outputs permissive enough for shuffling.
+      if (!SHUFFLE_OUTPUT_PERM.equals(
+          SHUFFLE_OUTPUT_PERM.applyUMask(FsPermission.getUMask(job)))) {
+        Path indexPath = mapOutputFile.getOutputIndexFile();
+        rfs.setPermission(outputPath, SHUFFLE_OUTPUT_PERM);
+        rfs.setPermission(indexPath, SHUFFLE_OUTPUT_PERM);
+      }
     }
     }
 
 
     public void close() { }
     public void close() { }

+ 87 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestMapTask.java

@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.MapTask.MapOutputBuffer;
+import org.apache.hadoop.mapred.Task.TaskReporter;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.util.Progress;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+public class TestMapTask {
+  private static File TEST_ROOT_DIR = new File(
+      System.getProperty("test.build.data",
+          System.getProperty("java.io.tmpdir", "/tmp")),
+      TestMapTask.class.getName());
+
+  @After
+  public void cleanup() throws Exception {
+    FileUtil.fullyDelete(TEST_ROOT_DIR);
+  }
+
+  // Verify output files for shuffle have group read permission even when
+  // the configured umask normally would prevent it.
+  @Test
+  public void testShufflePermissions() throws Exception {
+    JobConf conf = new JobConf();
+    conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
+    conf.set(MRConfig.LOCAL_DIR, TEST_ROOT_DIR.getAbsolutePath());
+    MapOutputFile mof = new MROutputFiles();
+    mof.setConf(conf);
+    TaskAttemptID attemptId = new TaskAttemptID("12345", 1, TaskType.MAP, 1, 1);
+    MapTask mockTask = mock(MapTask.class);
+    doReturn(mof).when(mockTask).getMapOutputFile();
+    doReturn(attemptId).when(mockTask).getTaskID();
+    doReturn(new Progress()).when(mockTask).getSortPhase();
+    TaskReporter mockReporter = mock(TaskReporter.class);
+    doReturn(new Counter()).when(mockReporter).getCounter(
+        any(TaskCounter.class));
+    MapOutputCollector.Context ctx = new MapOutputCollector.Context(mockTask,
+        conf, mockReporter);
+    MapOutputBuffer<Object, Object> mob = new MapOutputBuffer<>();
+    mob.init(ctx);
+    mob.flush();
+    mob.close();
+    Path outputFile = mof.getOutputFile();
+    FileSystem lfs = FileSystem.getLocal(conf);
+    FsPermission perms = lfs.getFileStatus(outputFile).getPermission();
+    Assert.assertEquals("Incorrect output file perms",
+        (short)0640, perms.toShort());
+    Path indexFile = mof.getOutputIndexFile();
+    perms = lfs.getFileStatus(indexFile).getPermission();
+    Assert.assertEquals("Incorrect index file perms",
+        (short)0640, perms.toShort());
+  }
+}