Merge trunk to branch-trunk-win

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-trunk-win@1452666 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas 12 years ago
parent
commit
d1ad6ec8cb
13 changed files with 120 additions and 78 deletions
  1. +6 -0     hadoop-common-project/hadoop-common/CHANGES.txt
  2. +7 -23    hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DF.java
  3. +40 -31   hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
  4. +5 -8     hadoop-common-project/hadoop-common/src/main/proto/ProtobufRpcEngine.proto
  5. +18 -8    hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestDFVariations.java
  6. +4 -0     hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  7. +2 -0     hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
  8. +6 -0     hadoop-yarn-project/CHANGES.txt
  9. +0 -1     hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
  10. +2 -1    hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java
  11. +23 -0   hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
  12. +3 -2    hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
  13. +4 -4    hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java

+ 6 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -153,6 +153,9 @@ Trunk (Unreleased)
     HADOOP-9112. test-patch should -1 for @Tests without a timeout 
     (Surenkumar Nihalani via bobby)
 
+    HADOOP-9163 The rpc msg in ProtobufRpcEngine.proto should be moved out to
+    avoid an extra copy (Sanjay Radia)
+
   BUG FIXES
 
     HADOOP-8419. Fixed GzipCode NPE reset for IBM JDK. (Yu Li via eyang)
@@ -403,6 +406,9 @@ Release 2.0.4-beta - UNRELEASED
     HADOOP-9349. Confusing output when running hadoop version from one hadoop 
     installation when HADOOP_HOME points to another. (sandyr via tucu)
 
+    HADOOP-9337. org.apache.hadoop.fs.DF.getMount() does not work on Mac OS.
+    (Ivan A. Veselovsky via atm)
+
 Release 2.0.3-alpha - 2013-02-06 
 
   INCOMPATIBLE CHANGES

+ 7 - 23
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DF.java

@@ -179,7 +179,8 @@ public class DF extends Shell {
   protected String[] getExecString() {
     // ignoring the error since the exit code it enough
     return (WINDOWS)? new String[]{"cmd", "/c", "df -k " + dirPath + " 2>nul"}:
-        new String[] {"bash","-c","exec 'df' '-k' '" + dirPath + "' 2>/dev/null"};
+        new String[] {"bash","-c","exec 'df' '-k' '-P' '" + dirPath 
+                      + "' 2>/dev/null"};
   }
 
   @Override
@@ -222,28 +223,11 @@ public class DF extends Shell {
     }
 
     try {
-      switch(getOSType()) {
-        case OS_TYPE_AIX:
-          Long.parseLong(tokens.nextToken()); // capacity
-          Long.parseLong(tokens.nextToken()); // available
-          Integer.parseInt(tokens.nextToken()); // pct used
-          tokens.nextToken();
-          tokens.nextToken();
-          this.mount = tokens.nextToken();
-          break;
-
-        case OS_TYPE_WIN:
-        case OS_TYPE_SOLARIS:
-        case OS_TYPE_MAC:
-        case OS_TYPE_UNIX:
-        default:
-          Long.parseLong(tokens.nextToken()); // capacity
-          Long.parseLong(tokens.nextToken()); // used
-          Long.parseLong(tokens.nextToken()); // available
-          Integer.parseInt(tokens.nextToken()); // pct used
-          this.mount = tokens.nextToken();
-          break;
-     }
+      Long.parseLong(tokens.nextToken()); // capacity
+      Long.parseLong(tokens.nextToken()); // used
+      Long.parseLong(tokens.nextToken()); // available
+      Integer.parseInt(tokens.nextToken()); // pct used
+      this.mount = tokens.nextToken();
     } catch (NoSuchElementException e) {
       throw new IOException("Could not parse line: " + line);
     } catch (NumberFormatException e) {

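Note: with the POSIX -P flag, df prints one fixed six-column layout (Filesystem, 1024-blocks, Used, Available, Capacity, Mounted on) on every supported platform, which is why the per-OS switch above can collapse into a single token sequence. The following is a minimal, self-contained sketch of that parse; it is not the Hadoop DF class itself, and the sample line and class name are made up for illustration.

    import java.util.StringTokenizer;

    // Sketch: parse one data line of `df -k -P` output. The filesystem name is
    // consumed first; the remaining tokens are capacity, used, available,
    // percent used, and the mount point, in that order on every platform.
    public class DfPosixParseSketch {
      public static void main(String[] args) {
        String line = "/dev/sda3 453115160 53037920 400077240 12% /foo/bar";
        StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%");
        tokens.nextToken();                                     // filesystem name
        long capacity  = Long.parseLong(tokens.nextToken());    // 1024-blocks
        long used      = Long.parseLong(tokens.nextToken());    // used
        long available = Long.parseLong(tokens.nextToken());    // available
        int pctUsed    = Integer.parseInt(tokens.nextToken());  // capacity (%)
        String mount   = tokens.nextToken();                    // mount point
        System.out.println(mount + ": " + used + "K used of " + capacity
            + "K (" + pctUsed + "%), " + available + "K available");
      }
    }
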
+ 40 - 31
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.ipc;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.net.InetSocketAddress;
@@ -39,7 +40,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.ipc.Client.ConnectionId;
 import org.apache.hadoop.ipc.RPC.RpcInvoker;
-import org.apache.hadoop.ipc.protobuf.ProtobufRpcEngineProtos.RequestProto;
+import org.apache.hadoop.ipc.protobuf.ProtobufRpcEngineProtos.RequestHeaderProto;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -128,25 +129,12 @@ public class ProtobufRpcEngine implements RpcEngine {
           .getProtocolVersion(protocol);
     }
 
-    private RequestProto constructRpcRequest(Method method,
-        Object[] params) throws ServiceException {
-      RequestProto rpcRequest;
-      RequestProto.Builder builder = RequestProto
+    private RequestHeaderProto constructRpcRequestHeader(Method method) {
+      RequestHeaderProto.Builder builder = RequestHeaderProto
           .newBuilder();
       builder.setMethodName(method.getName());
+     
 
-      if (params.length != 2) { // RpcController + Message
-        throw new ServiceException("Too many parameters for request. Method: ["
-            + method.getName() + "]" + ", Expected: 2, Actual: "
-            + params.length);
-      }
-      if (params[1] == null) {
-        throw new ServiceException("null param while calling Method: ["
-            + method.getName() + "]");
-      }
-
-      Message param = (Message) params[1];
-      builder.setRequest(param.toByteString());
       // For protobuf, {@code protocol} used when creating client side proxy is
       // the interface extending BlockingInterface, which has the annotations 
       // such as ProtocolName etc.
@@ -160,8 +148,7 @@ public class ProtobufRpcEngine implements RpcEngine {
       // For PB this may limit the use of mixins on client side.
       builder.setDeclaringClassProtocolName(protocolName);
       builder.setClientProtocolVersion(clientProtocolVersion);
-      rpcRequest = builder.build();
-      return rpcRequest;
+      return builder.build();
     }
 
     /**
@@ -189,8 +176,18 @@ public class ProtobufRpcEngine implements RpcEngine {
       if (LOG.isDebugEnabled()) {
         startTime = Time.now();
       }
+      
+      if (args.length != 2) { // RpcController + Message
+        throw new ServiceException("Too many parameters for request. Method: ["
+            + method.getName() + "]" + ", Expected: 2, Actual: "
+            + args.length);
+      }
+      if (args[1] == null) {
+        throw new ServiceException("null param while calling Method: ["
+            + method.getName() + "]");
+      }
 
-      RequestProto rpcRequest = constructRpcRequest(method, args);
+      RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
       RpcResponseWrapper val = null;
       
       if (LOG.isTraceEnabled()) {
@@ -198,9 +195,12 @@ public class ProtobufRpcEngine implements RpcEngine {
             remoteId + ": " + method.getName() +
             " {" + TextFormat.shortDebugString((Message) args[1]) + "}");
       }
+
+
+      Message theRequest = (Message) args[1];
       try {
         val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
-            new RpcRequestWrapper(rpcRequest), remoteId);
+            new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId);
 
       } catch (Throwable e) {
         if (LOG.isTraceEnabled()) {
@@ -275,20 +275,25 @@ public class ProtobufRpcEngine implements RpcEngine {
    * use type Writable as a wrapper to work across multiple RpcEngine kinds.
    */
   private static class RpcRequestWrapper implements Writable {
-    RequestProto message;
+    RequestHeaderProto requestHeader;
+    Message theRequest; // for clientSide, the request is here
+    byte[] theRequestRead; // for server side, the request is here
 
     @SuppressWarnings("unused")
     public RpcRequestWrapper() {
     }
 
-    RpcRequestWrapper(RequestProto message) {
-      this.message = message;
+    RpcRequestWrapper(RequestHeaderProto requestHeader, Message theRequest) {
+      this.requestHeader = requestHeader;
+      this.theRequest = theRequest;
     }
 
     @Override
     public void write(DataOutput out) throws IOException {
-      ((Message)message).writeDelimitedTo(
-          DataOutputOutputStream.constructOutputStream(out));
+      OutputStream os = DataOutputOutputStream.constructOutputStream(out);
+      
+      ((Message)requestHeader).writeDelimitedTo(os);
+      theRequest.writeDelimitedTo(os);
     }
 
     @Override
@@ -296,13 +301,16 @@ public class ProtobufRpcEngine implements RpcEngine {
       int length = ProtoUtil.readRawVarint32(in);
       byte[] bytes = new byte[length];
       in.readFully(bytes);
-      message = RequestProto.parseFrom(bytes);
+      requestHeader = RequestHeaderProto.parseFrom(bytes);
+      length = ProtoUtil.readRawVarint32(in);
+      theRequestRead = new byte[length];
+      in.readFully(theRequestRead);
     }
     
     @Override
     public String toString() {
-      return message.getDeclaringClassProtocolName() + "." +
-          message.getMethodName();
+      return requestHeader.getDeclaringClassProtocolName() + "." +
+          requestHeader.getMethodName();
     }
   }
 
@@ -434,7 +442,7 @@ public class ProtobufRpcEngine implements RpcEngine {
       public Writable call(RPC.Server server, String connectionProtocolName,
           Writable writableRequest, long receiveTime) throws Exception {
         RpcRequestWrapper request = (RpcRequestWrapper) writableRequest;
-        RequestProto rpcRequest = request.message;
+        RequestHeaderProto rpcRequest = request.requestHeader;
         String methodName = rpcRequest.getMethodName();
         
         
@@ -474,7 +482,8 @@ public class ProtobufRpcEngine implements RpcEngine {
         }
         Message prototype = service.getRequestPrototype(methodDescriptor);
         Message param = prototype.newBuilderForType()
-            .mergeFrom(rpcRequest.getRequest()).build();
+            .mergeFrom(request.theRequestRead).build();
+        
         Message result;
         try {
           long startTime = Time.now();

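Note: after this change, RpcRequestWrapper frames a call as two length-delimited protobuf messages, the small RequestHeaderProto first and the caller's request message second, so the request is no longer copied into a bytes field of a single wrapper message. The sketch below shows the same two-frame layout using protobuf's standard writeDelimitedTo/parseDelimitedFrom; it assumes protobuf-java 3.x on the classpath and uses the well-known StringValue type purely as a stand-in for both the header and the request.

    import com.google.protobuf.StringValue;
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    // Sketch of the header-then-request wire layout: each frame is written with
    // a varint length prefix (writeDelimitedTo) and read back in the same order.
    public class DelimitedFramingSketch {
      public static void main(String[] args) throws IOException {
        StringValue header = StringValue.newBuilder()
            .setValue("methodName=echo").build();            // stand-in for RequestHeaderProto
        StringValue request = StringValue.newBuilder()
            .setValue("serialized request body").build();    // stand-in for the real request

        // Client side: two delimited writes, no intermediate ByteString copy.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        header.writeDelimitedTo(out);
        request.writeDelimitedTo(out);

        // Server side: read the header frame, then the request frame.
        ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
        StringValue gotHeader  = StringValue.parseDelimitedFrom(in);
        StringValue gotRequest = StringValue.parseDelimitedFrom(in);
        System.out.println(gotHeader.getValue() + " / " + gotRequest.getValue());
      }
    }
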
+ 5 - 8
hadoop-common-project/hadoop-common/src/main/proto/ProtobufRpcEngine.proto

@@ -1,4 +1,4 @@
-/**
+/**DER
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -28,20 +28,17 @@ option java_generate_equals_and_hash = true;
 package hadoop.common;
 
 /**
- * This message is used for Protobuf Rpc Engine.
- * The message is used to marshal a Rpc-request
- * from RPC client to the RPC server.
+ * This message is the header for the Protobuf Rpc Engine
+ * when sending a RPC request from  RPC client to the RPC server.
+ * The actual request (serialized as protobuf) follows this request.
  *
  * No special header is needed for the Rpc Response for Protobuf Rpc Engine.
  * The normal RPC response header (see RpcHeader.proto) are sufficient. 
  */
-message RequestProto {
+message RequestHeaderProto {
   /** Name of the RPC method */
   required string methodName = 1;
 
-  /** Bytes corresponding to the client protobuf request */
-  optional bytes request = 2;
-  
   /** 
    * RPCs for a particular interface (ie protocol) are done using a
    * IPC connection that is setup using rpcProxy.

+ 18 - 8
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestDFVariations.java

@@ -31,6 +31,7 @@ import java.util.Random;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Shell;
 import org.junit.Test;
+import static org.junit.Assert.*;
 
 public class TestDFVariations {
 
@@ -46,14 +47,8 @@ public class TestDFVariations {
     }
     @Override
     protected String[] getExecString() {
-      switch(getOSType()) {
-        case OS_TYPE_AIX:
-          return new String[] { "echo", "IGNORE\n", "/dev/sda3",
-            "453115160", "400077240", "11%", "18", "skip%", "/foo/bar", "\n" };
-        default:
-          return new String[] { "echo", "IGNORE\n", "/dev/sda3",
-            "453115160", "53037920", "400077240", "11%", "/foo/bar", "\n" };
-      }
+      return new String[] { "echo", "IGNORE\n", 
+        "/dev/sda3", "453115160", "53037920", "400077240", "11%", "/foo/bar\n"};
     }
   }
 
@@ -135,5 +130,20 @@ public class TestDFVariations {
       System.out.println(e.toString());
     }
   }
+
+  @Test(timeout=5000)
+  public void testGetMountCurrentDirectory() throws Exception {
+    File currentDirectory = new File(".");
+    String workingDir = currentDirectory.getAbsoluteFile().getCanonicalPath();
+    DF df = new DF(new File(workingDir), 0L);
+    String mountPath = df.getMount();
+    File mountDir = new File(mountPath);
+    assertTrue("Mount dir ["+mountDir.getAbsolutePath()+"] should exist.", 
+        mountDir.exists());
+    assertTrue("Mount dir ["+mountDir.getAbsolutePath()+"] should be directory.", 
+        mountDir.isDirectory());
+    assertTrue("Working dir ["+workingDir+"] should start with ["+mountPath+"].",
+        workingDir.startsWith(mountPath));
+  }
 }
 

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -339,6 +339,10 @@ Release 2.0.4-beta - UNRELEASED
     HDFS-4235. When outputting XML, OfflineEditsViewer can't handle some edits
     containing non-ASCII strings. (Colin Patrick McCabe via atm)
 
+    HDFS-4541. Set hadoop.log.dir and hadoop.id.str when starting secure
+    datanode to write the logs to right dir by default. (Arpit Gupta via
+    suresh)
+
 Release 2.0.3-alpha - 2013-02-06
 
   INCOMPATIBLE CHANGES

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs

@@ -73,9 +73,11 @@ if [ "$COMMAND" == "datanode" ] && [ "$EUID" -eq 0 ] && [ -n "$HADOOP_SECURE_DN_
   
     if [ -n "$HADOOP_SECURE_DN_LOG_DIR" ]; then
       HADOOP_LOG_DIR=$HADOOP_SECURE_DN_LOG_DIR
+      HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.log.dir=$HADOOP_LOG_DIR"
     fi
    
     HADOOP_IDENT_STRING=$HADOOP_SECURE_DN_USER
+    HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.id.str=$HADOOP_IDENT_STRING"
     starting_secure_dn="true"
   else
     echo "It looks like you're trying to start a secure DN, but \$JSVC_HOME"\

+ 6 - 0
hadoop-yarn-project/CHANGES.txt

@@ -362,6 +362,12 @@ Release 0.23.7 - UNRELEASED
     YARN-426. Failure to download a public resource prevents further downloads
     (Jason Lowe via bobby)
 
+    YARN-448. Remove unnecessary hflush from log aggregation (Kihwal Lee via
+    bobby)
+
+    YARN-345. Many InvalidStateTransitonException errors for ApplicationImpl
+    in Node Manager (Robert Parker via jlowe)
+
 Release 0.23.6 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 0 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java

@@ -231,7 +231,6 @@ public class AggregatedLogFormat {
       out = this.writer.prepareAppendValue(-1);
       out.writeInt(VERSION);
       out.close();
-      this.fsDataOStream.hflush();
     }
 
     public void writeApplicationOwner(String user) throws IOException {

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java

@@ -34,5 +34,6 @@ public enum ApplicationEventType {
 
   // Source: Log Handler
   APPLICATION_LOG_HANDLING_INITED,
-  APPLICATION_LOG_HANDLING_FINISHED
+  APPLICATION_LOG_HANDLING_FINISHED,
+  APPLICATION_LOG_HANDLING_FAILED
 }

+ 23 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java

@@ -149,6 +149,9 @@ public class ApplicationImpl implements Application {
            .addTransition(ApplicationState.INITING, ApplicationState.INITING,
                ApplicationEventType.APPLICATION_LOG_HANDLING_INITED,
                new AppLogInitDoneTransition())
+           .addTransition(ApplicationState.INITING, ApplicationState.INITING,
+               ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED,
+               new AppLogInitFailTransition())
            .addTransition(ApplicationState.INITING, ApplicationState.RUNNING,
                ApplicationEventType.APPLICATION_INITED,
                new AppInitDoneTransition())
@@ -237,6 +240,26 @@ public class ApplicationImpl implements Application {
     }
   }
 
+  /**
+   * Handles the APPLICATION_LOG_HANDLING_FAILED event that occurs after
+   * {@link LogAggregationService} has failed to initialize the log 
+   * aggregation service
+   * 
+   * In particular, this requests that the {@link ResourceLocalizationService}
+   * localize the application-scoped resources.
+   */
+  @SuppressWarnings("unchecked")
+  static class AppLogInitFailTransition implements
+      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
+    @Override
+    public void transition(ApplicationImpl app, ApplicationEvent event) {
+      LOG.warn("Log Aggregation service failed to initialize, there will " + 
+               "be no logs for this application");
+      app.dispatcher.getEventHandler().handle(
+          new ApplicationLocalizationEvent(
+              LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
+    }
+  }
   /**
    * Handles INIT_CONTAINER events which request that we launch a new
    * container. When we're still in the INITTING state, we simply

+ 3 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java

@@ -300,8 +300,9 @@ public class LogAggregationService extends AbstractService implements
       eventResponse = new ApplicationEvent(appId,
           ApplicationEventType.APPLICATION_LOG_HANDLING_INITED);
     } catch (YarnException e) {
-      eventResponse = new ApplicationFinishEvent(appId,
-          "Application failed to init aggregation: " + e.getMessage());
+      LOG.warn("Application failed to init aggregation: " + e.getMessage());
+      eventResponse = new ApplicationEvent(appId,
+          ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED);
     }
     this.dispatcher.getEventHandler().handle(eventResponse);
   }

+ 4 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java

@@ -421,8 +421,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
 
     dispatcher.await();
     ApplicationEvent expectedEvents[] = new ApplicationEvent[]{
-        new ApplicationFinishEvent(appId,
-            "Application failed to init aggregation: KABOOM!")
+        new ApplicationEvent(appId,
+            ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED)
     };
     checkEvents(appEventHandler, expectedEvents, false,
         "getType", "getApplicationID", "getDiagnostic");
@@ -471,8 +471,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     
     dispatcher.await();
     ApplicationEvent expectedEvents[] = new ApplicationEvent[]{
-        new ApplicationFinishEvent(appId,
-            "Application failed to init aggregation: "+e)
+        new ApplicationEvent(appId, 
+        		ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED)
     };
     checkEvents(appEventHandler, expectedEvents, false,
         "getType", "getApplicationID", "getDiagnostic");