Ver código fonte

Merge trunk into HA branch.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1296485 13f79535-47bb-0310-9956-ffa450edef68
Aaron Myers 13 anos atrás
pai
commit
ffa527138b

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -270,6 +270,9 @@ Release 0.23.2 - UNRELEASED
 
     HADOOP-8050. Deadlock in metrics. (Kihwal Lee via mattf)
 
+    HADOOP-8131. FsShell put doesn't correctly handle a non-existent dir
+    (Daryn Sharp via bobby)
+
 Release 0.23.1 - 2012-02-17 
 
   INCOMPATIBLE CHANGES

+ 4 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java

@@ -112,10 +112,12 @@ abstract class CommandWithDestination extends FsCommand {
       if (!dst.stat.isDirectory()) {
         throw new PathIsNotDirectoryException(dst.toString());
       }
-    } else {
-      if (dst.exists && !dst.stat.isDirectory() && !overwrite) {
+    } else if (dst.exists) {
+      if (!dst.stat.isDirectory() && !overwrite) {
         throw new PathExistsException(dst.toString());
       }
+    } else if (!dst.parentExists()) {
+      throw new PathNotFoundException(dst.toString());
     }
     super.processArguments(args);
   }

+ 14 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java

@@ -181,6 +181,20 @@ public class PathData {
     return tmpFile;
   }
 
+  /**
+   * Test if the parent directory exists
+   * @return boolean indicating parent exists
+   * @throws IOException upon unexpected error
+   */
+  public boolean parentExists() throws IOException {
+    String uriPath = uri.getPath();
+    String name = uriPath.substring(uriPath.lastIndexOf("/")+1);
+    // Path will munch off the chars that indicate a dir, so there's no way
+    // to perform this test except by examining the raw basename we maintain
+    return (name.isEmpty() || name.equals(".") || name.equals(".."))
+        ? fs.exists(path) : fs.exists(path.getParent());
+  }
+  
   /**
    * Returns a list of PathData objects of the items contained in the given
    * directory.

+ 138 - 2
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFsShellCopy.java

@@ -38,8 +38,10 @@ public class TestFsShellCopy {
     conf = new Configuration();
     shell = new FsShell(conf);
     lfs = FileSystem.getLocal(conf);
-    testRootDir = new Path(
-        System.getProperty("test.build.data","test/build/data"), "testShellCopy");
+    testRootDir = lfs.makeQualified(new Path(
+        System.getProperty("test.build.data","test/build/data"),
+        "testShellCopy"));
+    
     lfs.mkdirs(testRootDir);    
     srcPath = new Path(testRootDir, "srcFile");
     dstPath = new Path(testRootDir, "dstFile");
@@ -94,4 +96,138 @@ public class TestFsShellCopy {
   private void shellRun(int n, String ... args) throws Exception {
     assertEquals(n, shell.run(args));
   }
+  
+  @Test
+  public void testCopyFileFromLocal() throws Exception {
+    Path testRoot = new Path(testRootDir, "testPutFile");
+    lfs.delete(testRoot, true);
+    lfs.mkdirs(testRoot);
+
+    Path targetDir = new Path(testRoot, "target");    
+    Path filePath = new Path(testRoot, new Path("srcFile"));
+    lfs.create(filePath).close();
+    checkPut(filePath, targetDir);
+  }
+
+  @Test
+  public void testCopyDirFromLocal() throws Exception {
+    Path testRoot = new Path(testRootDir, "testPutDir");
+    lfs.delete(testRoot, true);
+    lfs.mkdirs(testRoot);
+    
+    Path targetDir = new Path(testRoot, "target");    
+    Path dirPath = new Path(testRoot, new Path("srcDir"));
+    lfs.mkdirs(dirPath);
+    lfs.create(new Path(dirPath, "srcFile")).close();
+    checkPut(dirPath, targetDir);
+  }
+  
+  private void checkPut(Path srcPath, Path targetDir)
+  throws Exception {
+    lfs.delete(targetDir, true);
+    lfs.mkdirs(targetDir);    
+    lfs.setWorkingDirectory(targetDir);
+
+    final Path dstPath = new Path("path");
+    final Path childPath = new Path(dstPath, "childPath");
+    lfs.setWorkingDirectory(targetDir);
+    
+    // copy to new file, then again
+    prepPut(dstPath, false, false);
+    checkPut(0, srcPath, dstPath);
+    if (lfs.isFile(srcPath)) {
+      checkPut(1, srcPath, dstPath);
+    } else { // directory works because it copies into the dir
+      // clear contents so the check won't think there are extra paths
+      prepPut(dstPath, true, true);
+      checkPut(0, srcPath, dstPath);
+    }
+
+    // copy to non-existent subdir
+    prepPut(childPath, false, false);
+    checkPut(1, srcPath, dstPath);
+
+    // copy into dir, then with another name
+    prepPut(dstPath, true, true);
+    checkPut(0, srcPath, dstPath);
+    prepPut(childPath, true, true);
+    checkPut(0, srcPath, childPath);
+
+    // try to put to pwd with existing dir
+    prepPut(targetDir, true, true);
+    checkPut(0, srcPath, null);
+    prepPut(targetDir, true, true);
+    checkPut(0, srcPath, new Path("."));
+
+    // try to put to pwd with non-existent cwd
+    prepPut(dstPath, false, true);
+    lfs.setWorkingDirectory(dstPath);
+    checkPut(1, srcPath, null);
+    prepPut(dstPath, false, true);
+    checkPut(1, srcPath, new Path("."));
+  }
+
+  private void prepPut(Path dst, boolean create,
+                       boolean isDir) throws IOException {
+    lfs.delete(dst, true);
+    assertFalse(lfs.exists(dst));
+    if (create) {
+      if (isDir) {
+        lfs.mkdirs(dst);
+        assertTrue(lfs.isDirectory(dst));
+      } else {
+        lfs.mkdirs(new Path(dst.getName()));
+        lfs.create(dst).close();
+        assertTrue(lfs.isFile(dst));
+      }
+    }
+  }
+  
+  private void checkPut(int exitCode, Path src, Path dest) throws Exception {
+    String argv[] = null;
+    if (dest != null) {
+      argv = new String[]{ "-put", src.toString(), pathAsString(dest) };
+    } else {
+      argv = new String[]{ "-put", src.toString() };
+      dest = new Path(Path.CUR_DIR);
+    }
+    
+    Path target;
+    if (lfs.exists(dest)) {
+      if (lfs.isDirectory(dest)) {
+        target = new Path(pathAsString(dest), src.getName());
+      } else {
+        target = dest;
+      }
+    } else {
+      target = new Path(lfs.getWorkingDirectory(), dest);
+    }
+    boolean targetExists = lfs.exists(target);
+    Path parent = lfs.makeQualified(target).getParent();
+    
+    System.out.println("COPY src["+src.getName()+"] -> ["+dest+"] as ["+target+"]");
+    String lsArgv[] = new String[]{ "-ls", "-R", pathAsString(parent) };
+    shell.run(lsArgv);
+    
+    int gotExit = shell.run(argv);
+    
+    System.out.println("copy exit:"+gotExit);
+    lsArgv = new String[]{ "-ls", "-R", pathAsString(parent) };
+    shell.run(lsArgv);
+    
+    if (exitCode == 0) {
+      assertTrue(lfs.exists(target));
+      assertTrue(lfs.isFile(src) == lfs.isFile(target));
+      assertEquals(1, lfs.listStatus(lfs.makeQualified(target).getParent()).length);      
+    } else {
+      assertEquals(targetExists, lfs.exists(target));
+    }
+    assertEquals(exitCode, gotExit);
+  }
+  
+  // path handles "." rather oddly
+  private String pathAsString(Path p) {
+    String s = (p == null) ? Path.CUR_DIR : p.toString();
+    return s.isEmpty() ? Path.CUR_DIR : s;
+  }
 }

+ 6 - 2
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFsShellReturnCode.java

@@ -280,11 +280,15 @@ public class TestFsShellReturnCode {
     System.setErr(out);
     final String results;
     try {
+      Path tdir = new Path(TEST_ROOT_DIR, "notNullCopy");
+      fileSys.delete(tdir, true);
+      fileSys.mkdirs(tdir);
       String[] args = new String[3];
       args[0] = "-get";
-      args[1] = "/invalidPath";
-      args[2] = "/test/tmp";
+      args[1] = tdir+"/invalidSrc";
+      args[2] = tdir+"/invalidDst";
       assertTrue("file exists", !fileSys.exists(new Path(args[1])));
+      assertTrue("file exists", !fileSys.exists(new Path(args[2])));
       int run = shell.run(args);
       results = bytes.toString();
       assertEquals("Return code should be 1", 1, run);

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -235,6 +235,9 @@ Release 0.23.2 - UNRELEASED
     MAPREDUCE-3706. Fix circular redirect error in job-attempts page. (bobby
     via acmurthy)
 
+    MAPREDUCE-3896. Add user information to the delegation token issued by the
+    history server. (Vinod Kumar Vavilapalli via sseth)
+
 Release 0.23.1 - 2012-02-17
 
   INCOMPATIBLE CHANGES

+ 0 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/security/client/ClientHSSecurityInfo.java

@@ -20,8 +20,6 @@ package org.apache.hadoop.mapreduce.v2.security.client;
 
 import java.lang.annotation.Annotation;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
 import org.apache.hadoop.security.KerberosInfo;

+ 25 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java

@@ -29,8 +29,10 @@ import java.util.Collection;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.TypeConverter;
@@ -121,7 +123,6 @@ public class HistoryClientService extends AbstractService {
     InetAddress hostNameResolved = null;
     try {
       hostNameResolved = InetAddress.getLocalHost(); 
-      //address.getAddress().getLocalHost();
     } catch (UnknownHostException e) {
       throw new YarnException(e);
     }
@@ -166,6 +167,16 @@ public class HistoryClientService extends AbstractService {
     super.stop();
   }
 
+  @Private
+  public MRClientProtocol getClientHandler() {
+    return this.protocolHandler;
+  }
+
+  @Private
+  public InetSocketAddress getBindAddress() {
+    return this.bindAddress;
+  }
+
   private class MRClientProtocolHandler implements MRClientProtocol {
 
     private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
@@ -294,9 +305,12 @@ public class HistoryClientService extends AbstractService {
         GetDelegationTokenRequest request) throws YarnRemoteException {
 
       try {
+
+      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
       // Verify that the connection is kerberos authenticated
       AuthenticationMethod authMethod = UserGroupInformation
-        .getRealAuthenticationMethod(UserGroupInformation.getCurrentUser());
+        .getRealAuthenticationMethod(ugi);
       if (UserGroupInformation.isSecurityEnabled()
           && (authMethod != AuthenticationMethod.KERBEROS)) {
        throw new IOException(
@@ -305,8 +319,16 @@ public class HistoryClientService extends AbstractService {
 
       GetDelegationTokenResponse response = recordFactory.newRecordInstance(
           GetDelegationTokenResponse.class);
+
+      String user = ugi.getUserName();
+      Text owner = new Text(user);
+      Text realUser = null;
+      if (ugi.getRealUser() != null) {
+        realUser = new Text(ugi.getRealUser().getUserName());
+      }
       MRDelegationTokenIdentifier tokenIdentifier =
-          new MRDelegationTokenIdentifier();
+          new MRDelegationTokenIdentifier(owner, new Text(
+            request.getRenewer()), realUser);
       Token<MRDelegationTokenIdentifier> realJHSToken =
           new Token<MRDelegationTokenIdentifier>(tokenIdentifier,
               jhsDTSecretManager);

+ 7 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java

@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.MRConfig;
@@ -107,7 +108,12 @@ public class JobHistoryServer extends CompositeService {
     jhsDTSecretManager.stopThreads();
     super.stop();
   }
-  
+
+  @Private
+  public HistoryClientService getClientService() {
+    return this.clientService;
+  }
+
   public static void main(String[] args) {
     StringUtils.startupShutdownMessage(JobHistoryServer.class, args, LOG);
     try {

+ 120 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java

@@ -0,0 +1,120 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.security;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
+import org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.DelegationToken;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.junit.Test;
+
+public class TestJHSSecurity {
+
+  @Test
+  public void testDelegationToken() throws IOException, InterruptedException {
+
+    Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.setLevel(Level.DEBUG);
+
+    final YarnConfiguration conf = new YarnConfiguration(new JobConf());
+    // Just a random principle
+    conf.set(JHAdminConfig.MR_HISTORY_PRINCIPAL,
+      "RandomOrc/localhost@apache.org");
+
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+      "kerberos");
+    UserGroupInformation.setConfiguration(conf);
+
+    final JobHistoryServer jobHistoryServer = new JobHistoryServer() {
+      protected void doSecureLogin(Configuration conf) throws IOException {
+        // no keytab based login
+      };
+    };
+    jobHistoryServer.init(conf);
+    jobHistoryServer.start();
+
+    // Fake the authentication-method
+    UserGroupInformation loggedInUser = UserGroupInformation.getCurrentUser();
+    loggedInUser.setAuthenticationMethod(AuthenticationMethod.KERBEROS);
+
+    // Get the delegation token directly as it is a little difficult to setup
+    // the kerberos based rpc.
+    DelegationToken token =
+        loggedInUser.doAs(new PrivilegedExceptionAction<DelegationToken>() {
+          @Override
+          public DelegationToken run() throws YarnRemoteException {
+            GetDelegationTokenRequest request =
+                Records.newRecord(GetDelegationTokenRequest.class);
+            request.setRenewer("OneRenewerToRuleThemAll");
+            return jobHistoryServer.getClientService().getClientHandler()
+              .getDelegationToken(request).getDelegationToken();
+          }
+        });
+
+    // Now try talking to JHS using the delegation token
+    UserGroupInformation ugi =
+        UserGroupInformation.createRemoteUser("TheDarkLord");
+    ugi.addToken(new Token<MRDelegationTokenIdentifier>(token.getIdentifier()
+      .array(), token.getPassword().array(), new Text(token.getKind()),
+      new Text(token.getService())));
+    final YarnRPC rpc = YarnRPC.create(conf);
+    MRClientProtocol userUsingDT =
+        ugi.doAs(new PrivilegedAction<MRClientProtocol>() {
+          @Override
+          public MRClientProtocol run() {
+            return (MRClientProtocol) rpc.getProxy(HSClientProtocol.class,
+              jobHistoryServer.getClientService().getBindAddress(), conf);
+          }
+        });
+    GetJobReportRequest jobReportRequest =
+        Records.newRecord(GetJobReportRequest.class);
+    jobReportRequest.setJobId(MRBuilderUtils.newJobId(123456, 1, 1));
+    try {
+      userUsingDT.getJobReport(jobReportRequest);
+    } catch (YarnRemoteException e) {
+      Assert.assertEquals("Unknown job job_123456_0001", e.getMessage());
+    }
+  }
+
+}

+ 28 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/resources/krb5.conf

@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# 
+[libdefaults]
+	default_realm = APACHE.ORG
+	udp_preference_limit = 1
+	extra_addresses = 127.0.0.1
+[realms]
+	APACHE.ORG = {
+		admin_server = localhost:88
+		kdc = localhost:88
+	}
+[domain_realm]
+	localhost = APACHE.ORG