소스 검색

HADOOP-252. Add versioning to RPC protocols. Contributed by Milind.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@421841 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 19 년 전
부모
커밋
a7550d9585

+ 3 - 0
CHANGES.txt

@@ -37,6 +37,9 @@ Trunk (unreleased changes)
 10. HADOOP-354.  Make public methods to stop DFS daemons.
    (Barry Kaplan via cutting)
 
+11. HADOOP-252.  Add versioning to RPC protocols.
+    (Milind Bhandarkar via cutting)
+
 
 Release 0.4.0 - 2006-06-28
 

+ 4 - 1
src/java/org/apache/hadoop/dfs/ClientProtocol.java

@@ -16,6 +16,7 @@
 package org.apache.hadoop.dfs;
 
 import java.io.*;
+import org.apache.hadoop.ipc.VersionedProtocol;
 
 /**********************************************************************
  * ClientProtocol is used by a piece of DFS user code to communicate 
@@ -24,8 +25,10 @@ import java.io.*;
  *
  * @author Mike Cafarella
  **********************************************************************/
-interface ClientProtocol {
+interface ClientProtocol extends VersionedProtocol {
 
+  public static final long versionID = 1L;
+  
     ///////////////////////////////////////
     // File contents
     ///////////////////////////////////////

+ 4 - 2
src/java/org/apache/hadoop/dfs/DFSClient.java

@@ -97,9 +97,11 @@ class DFSClient implements FSConstants {
     /** 
      * Create a new DFSClient connected to the given namenode server.
      */
-    public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf) {
+    public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf)
+    throws IOException {
         this.conf = conf;
-        this.namenode = (ClientProtocol) RPC.getProxy(ClientProtocol.class, nameNodeAddr, conf);
+        this.namenode = (ClientProtocol) RPC.getProxy(ClientProtocol.class,
+            ClientProtocol.versionID, nameNodeAddr, conf);
         try {
             this.localName = InetAddress.getLocalHost().getHostName();
         } catch (UnknownHostException uhe) {

+ 2 - 1
src/java/org/apache/hadoop/dfs/DataNode.java

@@ -118,7 +118,8 @@ public class DataNode implements FSConstants, Runnable {
       // get storage info and lock the data dir
       storage = new DataStorage( datadir );
       // connect to name node
-      this.namenode = (DatanodeProtocol) RPC.getProxy(DatanodeProtocol.class, 
+      this.namenode = (DatanodeProtocol) RPC.getProxy(DatanodeProtocol.class,
+                                                      DatanodeProtocol.versionID,
                                                       nameNodeAddr, 
                                                       conf);
       // find free port

+ 3 - 1
src/java/org/apache/hadoop/dfs/DatanodeProtocol.java

@@ -17,6 +17,7 @@
 package org.apache.hadoop.dfs;
 
 import java.io.*;
+import org.apache.hadoop.ipc.VersionedProtocol;
 
 /**********************************************************************
  * Protocol that a DFS datanode uses to communicate with the NameNode.
@@ -27,7 +28,8 @@ import java.io.*;
  *
  * @author Michael Cafarella
  **********************************************************************/
-interface DatanodeProtocol {
+interface DatanodeProtocol extends VersionedProtocol {
+  public static final long versionID = 1L;
   // error code
   final static int DISK_ERROR = 1;
   final static int INVALID_BLOCK = 2;

+ 7 - 1
src/java/org/apache/hadoop/dfs/NameNode.java

@@ -20,7 +20,6 @@ import org.apache.commons.logging.*;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.ipc.*;
 import org.apache.hadoop.conf.*;
-import org.apache.hadoop.util.*;
 
 import java.io.*;
 
@@ -57,6 +56,13 @@ import java.io.*;
  * @author Mike Cafarella
  **********************************************************/
 public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
+    public long getProtocolVersion(String protocol, long clientVersion) { 
+      if (protocol.equals(ClientProtocol.class.getName())) {
+        return ClientProtocol.versionID; 
+      } else {
+        return DatanodeProtocol.versionID;
+      }
+    }
     public static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.NameNode");
     public static final Log stateChangeLog = LogFactory.getLog( "org.apache.hadoop.dfs.StateChange");
 

+ 13 - 2
src/java/org/apache/hadoop/ipc/RPC.java

@@ -165,10 +165,21 @@ public class RPC {
 
   /** Construct a client-side proxy object that implements the named protocol,
    * talking to a server at the named address. */
-  public static Object getProxy(Class protocol, InetSocketAddress addr, Configuration conf) {
-    return Proxy.newProxyInstance(protocol.getClassLoader(),
+  public static VersionedProtocol getProxy(Class protocol, long clientVersion,
+      InetSocketAddress addr, Configuration conf)
+  throws RemoteException {
+    VersionedProtocol proxy = (VersionedProtocol) Proxy.newProxyInstance(
+                                  protocol.getClassLoader(),
                                   new Class[] { protocol },
                                   new Invoker(addr, conf));
+    long serverVersion = proxy.getProtocolVersion(protocol.getName(), clientVersion);
+    if (serverVersion == clientVersion) {
+      return proxy;
+    } else {
+      throw new RemoteException(protocol.getName(),
+          "RPC Server and Client Versions Mismatched. SID:"+serverVersion+
+          " CID:"+clientVersion);
+    }
   }
 
   /** Expert: Make multiple, parallel calls to a set of servers. */

+ 32 - 0
src/java/org/apache/hadoop/ipc/VersionedProtocol.java

@@ -0,0 +1,32 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import org.apache.hadoop.io.UTF8;
+
+/**
+ * Superclass of all protocols that use Hadoop RPC.
+ * Subclasses of this interface are also supposed to have
+ * a static final long versionID field.
+ * @author milindb
+ */
+public interface VersionedProtocol {
+  /**
+   * Return protocol version corresponding to protocol interface
+   */
+  public long getProtocolVersion(String protocol, long clientVersion);
+}

+ 5 - 1
src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java

@@ -18,11 +18,15 @@ package org.apache.hadoop.mapred;
 
 import java.io.*;
 
+import org.apache.hadoop.ipc.VersionedProtocol;
+
 /** 
  * Protocol that a TaskTracker and the central JobTracker use to communicate.
  * The JobTracker is the Server, which implements this protocol.
  */ 
-interface InterTrackerProtocol {
+interface InterTrackerProtocol extends VersionedProtocol {
+  public static final long versionID = 1L;
+  
   public final static int TRACKERS_OK = 0;
   public final static int UNKNOWN_TASKTRACKER = 1;
 

+ 4 - 0
src/java/org/apache/hadoop/mapred/IsolationRunner.java

@@ -32,6 +32,10 @@ public class IsolationRunner {
 
   private static class FakeUmbilical implements TaskUmbilicalProtocol {
 
+    public long getProtocolVersion(String protocol, long clientVersion) {
+      return TaskUmbilicalProtocol.versionID;
+    }
+    
     public void done(String taskid) throws IOException {
       LOG.info("Task " + taskid + " reporting done.");
     }

+ 3 - 1
src/java/org/apache/hadoop/mapred/JobClient.java

@@ -187,6 +187,7 @@ public class JobClient extends ToolBase implements MRConstants  {
         } else {
           this.jobSubmitClient = (JobSubmissionProtocol) 
             RPC.getProxy(JobSubmissionProtocol.class,
+                         JobSubmissionProtocol.versionID,
                          JobTracker.getAddress(conf), conf);
         }        
     }
@@ -196,7 +197,8 @@ public class JobClient extends ToolBase implements MRConstants  {
      */
     public JobClient(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException {
         this.jobSubmitClient = (JobSubmissionProtocol) 
-            RPC.getProxy(JobSubmissionProtocol.class, jobTrackAddr, conf);
+            RPC.getProxy(JobSubmissionProtocol.class,
+                         JobSubmissionProtocol.versionID, jobTrackAddr, conf);
     }
 
 

+ 4 - 2
src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java

@@ -17,14 +17,16 @@
 package org.apache.hadoop.mapred;
 
 import java.io.*;
-import java.util.*;
+
+import org.apache.hadoop.ipc.VersionedProtocol;
 
 /** 
  * Protocol that a JobClient and the central JobTracker use to communicate.  The
  * JobClient can use these methods to submit a Job for execution, and learn about
  * the current system status.
  */ 
-interface JobSubmissionProtocol {
+interface JobSubmissionProtocol extends VersionedProtocol {
+    public static final long versionID = 1L;
     /**
      * Submit a Job for execution.  Returns the latest profile for
      * that job.

+ 7 - 0
src/java/org/apache/hadoop/mapred/JobTracker.java

@@ -87,6 +87,13 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
       tracker = null;
     }
     
+    public long getProtocolVersion(String protocol, long clientVersion) {
+      if (protocol.equals(InterTrackerProtocol.class.getName())) {
+        return InterTrackerProtocol.versionID;
+      } else {
+        return JobSubmissionProtocol.versionID;
+      }
+    }
     /**
      * A thread to timeout tasks that have been assigned to task trackers,
      * but that haven't reported back yet.

+ 8 - 0
src/java/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -35,6 +35,10 @@ class LocalJobRunner implements JobSubmissionProtocol {
   private int map_tasks = 0;
   private int reduce_tasks = 0;
 
+  public long getProtocolVersion(String protocol, long clientVersion) {
+    return JobSubmissionProtocol.versionID;
+  }
+  
   private class Job extends Thread
     implements TaskUmbilicalProtocol {
     private String file;
@@ -48,6 +52,10 @@ class LocalJobRunner implements JobSubmissionProtocol {
     private Path localFile;
     private FileSystem localFs;
 
+    public long getProtocolVersion(String protocol, long clientVersion) {
+      return TaskUmbilicalProtocol.versionID;
+    }
+    
     public Job(String file, Configuration conf) throws IOException {
       this.file = file;
       this.id = "job_" + newId();

+ 6 - 1
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -114,6 +114,9 @@ public class TaskTracker
       taskCleanupThread.start();
     }
     
+    public long getProtocolVersion(String protocol, long clientVersion) {
+      return TaskUmbilicalProtocol.versionID;
+    }
     /**
      * Do the real constructor work here.  It's in a separate method
      * so we can call it again and "recycle" the object after calling
@@ -160,7 +163,8 @@ public class TaskTracker
         this.mapOutputFile.cleanupStorage();
         this.justStarted = true;
 
-        this.jobClient = (InterTrackerProtocol) RPC.getProxy(InterTrackerProtocol.class, jobTrackAddr, this.fConf);
+        this.jobClient = (InterTrackerProtocol) RPC.getProxy(InterTrackerProtocol.class,
+            InterTrackerProtocol.versionID, jobTrackAddr, this.fConf);
         
         this.running = true;
     }
@@ -1002,6 +1006,7 @@ public class TaskTracker
           String taskid = args[1];
           TaskUmbilicalProtocol umbilical =
             (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
+                                                TaskUmbilicalProtocol.versionID,
                                                 new InetSocketAddress(port), 
                                                 defaultConf);
             

+ 5 - 1
src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java

@@ -18,12 +18,16 @@ package org.apache.hadoop.mapred;
 
 import java.io.IOException;
 
+import org.apache.hadoop.ipc.VersionedProtocol;
+
 /** Protocol that task child process uses to contact its parent process.  The
  * parent is a daemon which which polls the central master for a new map or
  * reduce task and runs it as a child process.  All communication between child
  * and parent is via this protocol. */ 
-interface TaskUmbilicalProtocol {
+interface TaskUmbilicalProtocol extends VersionedProtocol {
 
+  public static final long versionID = 1L;
+  
   /** Called when a child task process starts, to get its task.*/
   Task getTask(String taskid) throws IOException;
 

+ 10 - 2
src/test/org/apache/hadoop/ipc/TestRPC.java

@@ -30,6 +30,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.io.Writable;
 
+import org.apache.hadoop.ipc.VersionedProtocol;
+
 /** Unit tests for RPC. */
 public class TestRPC extends TestCase {
   private static final int PORT = 1234;
@@ -46,7 +48,9 @@ public class TestRPC extends TestCase {
 
   public TestRPC(String name) { super(name); }
 	
-  public interface TestProtocol {
+  public interface TestProtocol extends VersionedProtocol {
+    public static final long versionID = 1L;
+    
     void ping() throws IOException;
     String echo(String value) throws IOException;
     String[] echo(String[] value) throws IOException;
@@ -59,6 +63,10 @@ public class TestRPC extends TestCase {
 
   public class TestImpl implements TestProtocol {
 
+    public long getProtocolVersion(String protocol, long clientVersion) {
+      return TestProtocol.versionID;
+    }
+    
     public void ping() {}
 
     public String echo(String value) throws IOException { return value; }
@@ -98,7 +106,7 @@ public class TestRPC extends TestCase {
 
     InetSocketAddress addr = new InetSocketAddress(PORT);
     TestProtocol proxy =
-      (TestProtocol)RPC.getProxy(TestProtocol.class, addr, conf);
+      (TestProtocol)RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf);
     
     proxy.ping();