Quellcode durchsuchen

HDFS-5548. Merging change r1545759 from branch-2

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.2@1545761 13f79535-47bb-0310-9956-ffa450edef68
Brandon Li vor 11 Jahren
Ursprung
Commit
7004cb0815

+ 0 - 95
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapInterface.java

@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.portmap;
-
-import org.apache.hadoop.oncrpc.XDR;
-
-/**
- * Methods that need to be implemented to provide Portmap RPC program.
- * See RFC 1833 for details.
- */
-public interface PortmapInterface {
-  public enum Procedure {
-    // the order of the values below are significant.
-    PMAPPROC_NULL,
-    PMAPPROC_SET,
-    PMAPPROC_UNSET,
-    PMAPPROC_GETPORT,
-    PMAPPROC_DUMP,
-    PMAPPROC_CALLIT,
-    PMAPPROC_GETTIME,
-    PMAPPROC_UADDR2TADDR,
-    PMAPPROC_TADDR2UADDR,
-    PMAPPROC_GETVERSADDR,
-    PMAPPROC_INDIRECT,
-    PMAPPROC_GETADDRLIST,
-    PMAPPROC_GETSTAT;
-    
-    public int getValue() {
-      return ordinal();
-    }
-    
-    public static Procedure fromValue(int value) {
-      if (value < 0 || value >= values().length) {
-        return null;
-      }
-      return values()[value];
-    }
-  }
-
-  /**
-   * This procedure does no work. By convention, procedure zero of any protocol
-   * takes no parameters and returns no results.
-   */
-  public XDR nullOp(int xidd, XDR in, XDR out);
-  
-  /**
-   * When a program first becomes available on a machine, it registers itself
-   * with the port mapper program on the same machine. The program passes its
-   * program number "prog", version number "vers", transport protocol number
-   * "prot", and the port "port" on which it awaits service request. The
-   * procedure returns a boolean reply whose value is "TRUE" if the procedure
-   * successfully established the mapping and "FALSE" otherwise. The procedure
-   * refuses to establish a mapping if one already exists for the tuple
-   * "(prog, vers, prot)".
-   */
-  public XDR set(int xid, XDR in, XDR out);
-  
-  /**
-   * When a program becomes unavailable, it should unregister itself with the
-   * port mapper program on the same machine. The parameters and results have
-   * meanings identical to those of "PMAPPROC_SET". The protocol and port number
-   * fields of the argument are ignored.
-   */
-  public XDR unset(int xid, XDR in, XDR out);
-  
-  /**
-   * Given a program number "prog", version number "vers", and transport
-   * protocol number "prot", this procedure returns the port number on which the
-   * program is awaiting call requests. A port value of zeros means the program
-   * has not been registered. The "port" field of the argument is ignored.
-   */
-  public XDR getport(int xid, XDR in, XDR out);
-  
-  /**
-   * This procedure enumerates all entries in the port mapper's database. The
-   * procedure takes no parameters and returns a list of program, version,
-   * protocol, and port values.
-   */
-  public XDR dump(int xid, XDR in, XDR out);
-}

+ 1 - 2
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapRequest.java

@@ -22,7 +22,6 @@ import org.apache.hadoop.oncrpc.RpcUtil;
 import org.apache.hadoop.oncrpc.XDR;
 import org.apache.hadoop.oncrpc.security.CredentialsNone;
 import org.apache.hadoop.oncrpc.security.VerifierNone;
-import org.apache.hadoop.portmap.PortmapInterface.Procedure;
 
 /**
  * Helper utility for building portmap request
@@ -37,7 +36,7 @@ public class PortmapRequest {
     RpcCall call = RpcCall.getInstance(
         RpcUtil.getNewXid(String.valueOf(RpcProgramPortmap.PROGRAM)),
         RpcProgramPortmap.PROGRAM, RpcProgramPortmap.VERSION,
-        Procedure.PMAPPROC_SET.getValue(), new CredentialsNone(),
+        RpcProgramPortmap.PMAPPROC_SET, new CredentialsNone(),
         new VerifierNone());
     call.write(request);
     return mapping.serialize(request);

+ 66 - 42
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java

@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.portmap;
 
-import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -40,20 +40,26 @@ import org.jboss.netty.handler.timeout.IdleState;
 import org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler;
 import org.jboss.netty.handler.timeout.IdleStateEvent;
 
-final class RpcProgramPortmap extends IdleStateAwareChannelUpstreamHandler implements PortmapInterface {
+final class RpcProgramPortmap extends IdleStateAwareChannelUpstreamHandler {
   static final int PROGRAM = 100000;
   static final int VERSION = 2;
+
+  static final int PMAPPROC_NULL = 0;
+  static final int PMAPPROC_SET = 1;
+  static final int PMAPPROC_UNSET = 2;
+  static final int PMAPPROC_GETPORT = 3;
+  static final int PMAPPROC_DUMP = 4;
+  static final int PMAPPROC_GETVERSADDR = 9;
+
   private static final Log LOG = LogFactory.getLog(RpcProgramPortmap.class);
 
-  /** Map synchronized usis monitor lock of this instance */
-  private final HashMap<String, PortmapMapping> map;
+  private final ConcurrentHashMap<String, PortmapMapping> map = new ConcurrentHashMap<String, PortmapMapping>();
 
   /** ChannelGroup that remembers all active channels for gracefully shutdown. */
   private final ChannelGroup allChannels;
 
   RpcProgramPortmap(ChannelGroup allChannels) {
     this.allChannels = allChannels;
-    map = new HashMap<String, PortmapMapping>(256);
     PortmapMapping m = new PortmapMapping(PROGRAM, VERSION,
         PortmapMapping.TRANSPORT_TCP, RpcProgram.RPCB_PORT);
     PortmapMapping m1 = new PortmapMapping(PROGRAM, VERSION,
@@ -61,48 +67,66 @@ final class RpcProgramPortmap extends IdleStateAwareChannelUpstreamHandler imple
     map.put(PortmapMapping.key(m), m);
     map.put(PortmapMapping.key(m1), m1);
   }
-  
-  @Override
-  public XDR nullOp(int xid, XDR in, XDR out) {
+
+  /**
+   * This procedure does no work. By convention, procedure zero of any protocol
+   * takes no parameters and returns no results.
+   */
+  private XDR nullOp(int xid, XDR in, XDR out) {
     return PortmapResponse.voidReply(out, xid);
   }
 
-  @Override
-  public XDR set(int xid, XDR in, XDR out) {
+  /**
+   * When a program first becomes available on a machine, it registers itself
+   * with the port mapper program on the same machine. The program passes its
+   * program number "prog", version number "vers", transport protocol number
+   * "prot", and the port "port" on which it awaits service request. The
+   * procedure returns a boolean reply whose value is "TRUE" if the procedure
+   * successfully established the mapping and "FALSE" otherwise. The procedure
+   * refuses to establish a mapping if one already exists for the tuple
+   * "(prog, vers, prot)".
+   */
+  private XDR set(int xid, XDR in, XDR out) {
     PortmapMapping mapping = PortmapRequest.mapping(in);
     String key = PortmapMapping.key(mapping);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Portmap set key=" + key);
     }
 
-    PortmapMapping value = null;
-    synchronized(this) {
-      map.put(key, mapping);
-      value = map.get(key);
-    }  
-    return PortmapResponse.intReply(out, xid, value.getPort());
+    map.put(key, mapping);
+    return PortmapResponse.intReply(out, xid, mapping.getPort());
   }
 
-  @Override
-  public synchronized XDR unset(int xid, XDR in, XDR out) {
+  /**
+   * When a program becomes unavailable, it should unregister itself with the
+   * port mapper program on the same machine. The parameters and results have
+   * meanings identical to those of "PMAPPROC_SET". The protocol and port number
+   * fields of the argument are ignored.
+   */
+  private XDR unset(int xid, XDR in, XDR out) {
     PortmapMapping mapping = PortmapRequest.mapping(in);
-    synchronized(this) {
-      map.remove(PortmapMapping.key(mapping));
-    }
+    String key = PortmapMapping.key(mapping);
+
+    if (LOG.isDebugEnabled())
+      LOG.debug("Portmap remove key=" + key);
+
+    map.remove(key);
     return PortmapResponse.booleanReply(out, xid, true);
   }
 
-  @Override
-  public synchronized XDR getport(int xid, XDR in, XDR out) {
+  /**
+   * Given a program number "prog", version number "vers", and transport
+   * protocol number "prot", this procedure returns the port number on which the
+   * program is awaiting call requests. A port value of zeros means the program
+   * has not been registered. The "port" field of the argument is ignored.
+   */
+  private XDR getport(int xid, XDR in, XDR out) {
     PortmapMapping mapping = PortmapRequest.mapping(in);
     String key = PortmapMapping.key(mapping);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Portmap GETPORT key=" + key + " " + mapping);
     }
-    PortmapMapping value = null;
-    synchronized(this) {
-      value = map.get(key);
-    }
+    PortmapMapping value = map.get(key);
     int res = 0;
     if (value != null) {
       res = value.getPort();
@@ -115,13 +139,13 @@ final class RpcProgramPortmap extends IdleStateAwareChannelUpstreamHandler imple
     return PortmapResponse.intReply(out, xid, res);
   }
 
-  @Override
-  public synchronized XDR dump(int xid, XDR in, XDR out) {
-    PortmapMapping[] pmapList = null;
-    synchronized(this) {
-      pmapList = new PortmapMapping[map.values().size()];
-      map.values().toArray(pmapList);
-    }
+  /**
+   * This procedure enumerates all entries in the port mapper's database. The
+   * procedure takes no parameters and returns a list of program, version,
+   * protocol, and port values.
+   */
+  private XDR dump(int xid, XDR in, XDR out) {
+    PortmapMapping[] pmapList = map.values().toArray(new PortmapMapping[0]);
     return PortmapResponse.pmapList(out, xid, pmapList);
   }
 
@@ -131,23 +155,23 @@ final class RpcProgramPortmap extends IdleStateAwareChannelUpstreamHandler imple
 
     RpcInfo info = (RpcInfo) e.getMessage();
     RpcCall rpcCall = (RpcCall) info.header();
-    final Procedure portmapProc = Procedure.fromValue(rpcCall.getProcedure());
+    final int portmapProc = rpcCall.getProcedure();
     int xid = rpcCall.getXid();
     XDR in = new XDR(info.data().toByteBuffer().asReadOnlyBuffer(),
         XDR.State.READING);
     XDR out = new XDR();
 
-    if (portmapProc == Procedure.PMAPPROC_NULL) {
+    if (portmapProc == PMAPPROC_NULL) {
       out = nullOp(xid, in, out);
-    } else if (portmapProc == Procedure.PMAPPROC_SET) {
+    } else if (portmapProc == PMAPPROC_SET) {
       out = set(xid, in, out);
-    } else if (portmapProc == Procedure.PMAPPROC_UNSET) {
+    } else if (portmapProc == PMAPPROC_UNSET) {
       out = unset(xid, in, out);
-    } else if (portmapProc == Procedure.PMAPPROC_DUMP) {
+    } else if (portmapProc == PMAPPROC_DUMP) {
       out = dump(xid, in, out);
-    } else if (portmapProc == Procedure.PMAPPROC_GETPORT) {
+    } else if (portmapProc == PMAPPROC_GETPORT) {
       out = getport(xid, in, out);
-    } else if (portmapProc == Procedure.PMAPPROC_GETVERSADDR) {
+    } else if (portmapProc == PMAPPROC_GETVERSADDR) {
       out = getport(xid, in, out);
     } else {
       LOG.info("PortmapHandler unknown rpc procedure=" + portmapProc);
@@ -161,7 +185,7 @@ final class RpcProgramPortmap extends IdleStateAwareChannelUpstreamHandler imple
     RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
     RpcUtil.sendRpcResponse(ctx, rsp);
   }
-  
+
   @Override
   public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
       throws Exception {

+ 3 - 3
hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java

@@ -23,7 +23,7 @@ import java.net.DatagramPacket;
 import java.net.DatagramSocket;
 import java.net.InetSocketAddress;
 import java.net.Socket;
-import java.util.HashMap;
+import java.util.Map;
 
 import junit.framework.Assert;
 
@@ -80,7 +80,7 @@ public class TestPortmap {
     XDR req = new XDR();
     RpcCall.getInstance(++xid, RpcProgramPortmap.PROGRAM,
         RpcProgramPortmap.VERSION,
-        PortmapInterface.Procedure.PMAPPROC_SET.getValue(),
+        RpcProgramPortmap.PMAPPROC_SET,
         new CredentialsNone(), new VerifierNone()).write(req);
 
     PortmapMapping sent = new PortmapMapping(90000, 1,
@@ -101,7 +101,7 @@ public class TestPortmap {
     Thread.sleep(100);
     boolean found = false;
     @SuppressWarnings("unchecked")
-    HashMap<String, PortmapMapping> map = (HashMap<String, PortmapMapping>) Whitebox
+    Map<String, PortmapMapping> map = (Map<String, PortmapMapping>) Whitebox
         .getInternalState(pm.getHandler(), "map");
 
     for (PortmapMapping m : map.values()) {

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -119,6 +119,8 @@ Release 2.2.1 - UNRELEASED
 
     HDFS-5407. Fix typos in DFSClientCache (Haohui Mai via brandonli)
 
+    HDFS-5548. Use ConcurrentHashMap in portmap (Haohui Mai via brandonli)
+
 Release 2.2.0 - 2013-10-13
 
   INCOMPATIBLE CHANGES