Browse Source

HADOOP-10280: Merging rr1581546 from branch-2 to branch-2.4.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.4@1581547 13f79535-47bb-0310-9956-ffa450edef68
Arpit Agarwal 11 years ago
parent
commit
b758f107ab

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -62,6 +62,9 @@ Release 2.4.0 - UNRELEASED
     HADOOP-10423. Clarify compatibility policy document for combination of 
     new client and old server. (Chris Nauroth via kasha)
 
+    HADOOP-10280. Make Schedulables return a configurable identity of user
+    or group. (Chris Li via Arpit Agarwal)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 1 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java

@@ -89,6 +89,7 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
    */
   public static final String IPC_CALLQUEUE_NAMESPACE = "ipc";
   public static final String IPC_CALLQUEUE_IMPL_KEY = "callqueue.impl";
+  public static final String IPC_CALLQUEUE_IDENTITY_PROVIDER_KEY = "identity-provider.impl";
 
   /** Internal buffer size for Lzo compressor/decompressors */
   public static final String  IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY =

+ 38 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/IdentityProvider.java

@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * The IdentityProvider creates identities for each schedulable
+ * by extracting fields and returning an identity string.
+ *
+ * Implementers will be able to change how schedulers treat
+ * Schedulables.
+ */
+@InterfaceAudience.Private
+public interface IdentityProvider {
+  /**
+   * Return the string used for scheduling.
+   * @param obj the schedulable to use.
+   * @return string identity, or null if no identity could be made.
+   */
+  public String makeIdentity(Schedulable obj);
+}

+ 34 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Schedulable.java

@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Interface which allows extracting information necessary to
+ * create schedulable identity strings.
+ */
+@InterfaceAudience.Private
+public interface Schedulable {
+  public UserGroupInformation getUserGroupInformation();
+}

+ 7 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

@@ -493,7 +493,7 @@ public abstract class Server {
   }
 
   /** A call queued for handling. */
-  public static class Call {
+  public static class Call implements Schedulable {
     private final int callId;             // the client's call id
     private final int retryCount;        // the retry count of the call
     private final Writable rpcRequest;    // Serialized Rpc request from client
@@ -531,6 +531,12 @@ public abstract class Server {
     public void setResponse(ByteBuffer response) {
       this.rpcResponse = response;
     }
+
+    // For Schedulable
+    @Override
+    public UserGroupInformation getUserGroupInformation() {
+      return connection.user;
+    }    
   }
 
   /** Listens on the socket. Creates jobs for the handler threads*/

+ 36 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/UserIdentityProvider.java

@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * The UserIdentityProvider creates uses the username as the
+ * identity. All jobs launched by a user will be grouped together.
+ */
+public class UserIdentityProvider implements IdentityProvider {
+  public String makeIdentity(Schedulable obj) {
+    UserGroupInformation ugi = obj.getUserGroupInformation();
+    if (ugi == null) {
+      return null;
+    }
+
+    return ugi.getUserName();
+  }
+}

+ 88 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIdentityProviders.java

@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+import org.junit.Before;
+import org.junit.After;
+
+import java.util.List;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.io.Writable;
+
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.conf.Configuration;
+
+public class TestIdentityProviders {
+  public class FakeSchedulable implements Schedulable {
+    public FakeSchedulable() {
+    }
+
+    public UserGroupInformation getUserGroupInformation() {
+      try {
+        return UserGroupInformation.getCurrentUser();
+      } catch (IOException e) {
+        return null;
+      }
+    }
+
+  }
+
+  @Test
+  public void testPluggableIdentityProvider() {
+    Configuration conf = new Configuration();
+    conf.set(CommonConfigurationKeys.IPC_CALLQUEUE_IDENTITY_PROVIDER_KEY,
+      "org.apache.hadoop.ipc.UserIdentityProvider");
+
+    List<IdentityProvider> providers = conf.getInstances(
+      CommonConfigurationKeys.IPC_CALLQUEUE_IDENTITY_PROVIDER_KEY,
+      IdentityProvider.class);
+
+    assertTrue(providers.size() == 1);
+
+    IdentityProvider ip = providers.get(0);
+    assertNotNull(ip);
+    assertEquals(ip.getClass(), UserIdentityProvider.class);
+  }
+
+  @Test
+  public void testUserIdentityProvider() throws IOException {
+    UserIdentityProvider uip = new UserIdentityProvider();
+    String identity = uip.makeIdentity(new FakeSchedulable());
+
+    // Get our username
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    String username = ugi.getUserName();
+
+    assertEquals(username, identity);
+  }
+}