Browse Source

HDDS-1717. MR Job fails as OMFailoverProxyProvider has dependency hadoop-3.2

Closes #1056
Hanisha Koneru 6 years ago
parent
commit
8861573e8c

+ 1 - 2
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java

@@ -223,8 +223,7 @@ public class RpcClient implements ClientProtocol, KeyProviderTokenIssuer {
     retryInterval = OzoneUtils.getTimeDurationInMS(conf,
         OzoneConfigKeys.OZONE_CLIENT_RETRY_INTERVAL,
         OzoneConfigKeys.OZONE_CLIENT_RETRY_INTERVAL_DEFAULT);
-    dtService =
-        getOMProxyProvider().getProxy().getDelegationTokenService();
+    dtService = getOMProxyProvider().getCurrentProxyDelegationToken();
     boolean isUnsafeByteOperationsEnabled = conf.getBoolean(
         OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED,
         OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED_DEFAULT);

+ 32 - 50
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java

@@ -30,7 +30,6 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
-import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,7 +58,8 @@ public class OMFailoverProxyProvider implements
       LoggerFactory.getLogger(OMFailoverProxyProvider.class);
 
   // Map of OMNodeID to its proxy
-  private Map<String, OMProxyInfo> omProxies;
+  private Map<String, ProxyInfo<OzoneManagerProtocolPB>> omProxies;
+  private Map<String, OMProxyInfo> omProxyInfos;
   private List<String> omNodeIDList;
 
   private String currentProxyOMNodeId;
@@ -80,33 +80,9 @@ public class OMFailoverProxyProvider implements
     currentProxyOMNodeId = omNodeIDList.get(currentProxyIndex);
   }
 
-  /**
-   * Class to store proxy information.
-   */
-  public class OMProxyInfo
-      extends FailoverProxyProvider.ProxyInfo<OzoneManagerProtocolPB> {
-    private InetSocketAddress address;
-    private Text dtService;
-
-    OMProxyInfo(OzoneManagerProtocolPB proxy, String proxyInfoStr,
-        Text dtService,
-        InetSocketAddress addr) {
-      super(proxy, proxyInfoStr);
-      this.address = addr;
-      this.dtService = dtService;
-    }
-
-    public InetSocketAddress getAddress() {
-      return address;
-    }
-
-    public Text getDelegationTokenService() {
-      return dtService;
-    }
-  }
-
   private void loadOMClientConfigs(Configuration config) throws IOException {
     this.omProxies = new HashMap<>();
+    this.omProxyInfos = new HashMap<>();
     this.omNodeIDList = new ArrayList<>();
 
     Collection<String> omServiceIds = config.getTrimmedStringCollection(
@@ -130,25 +106,21 @@ public class OMFailoverProxyProvider implements
           continue;
         }
 
-        InetSocketAddress addr = NetUtils.createSocketAddr(rpcAddrStr);
+        OMProxyInfo omProxyInfo = new OMProxyInfo(nodeId, rpcAddrStr);
+
+        if (omProxyInfo.getAddress() != null) {
 
-        // Add the OM client proxy info to list of proxies
-        if (addr != null) {
-          Text dtService = SecurityUtil.buildTokenService(addr);
-          StringBuilder proxyInfo = new StringBuilder()
-              .append(nodeId).append("(")
-              .append(NetUtils.getHostPortString(addr)).append(")");
-          OMProxyInfo omProxyInfo = new OMProxyInfo(null,
-              proxyInfo.toString(), dtService, addr);
+          ProxyInfo<OzoneManagerProtocolPB> proxyInfo =
+              new ProxyInfo(null, omProxyInfo.toString());
 
           // For a non-HA OM setup, nodeId might be null. If so, we assign it
           // a dummy value
           if (nodeId == null) {
             nodeId = OzoneConsts.OM_NODE_ID_DUMMY;
           }
-          omProxies.put(nodeId, omProxyInfo);
+          omProxies.put(nodeId, proxyInfo);
+          omProxyInfos.put(nodeId, omProxyInfo);
           omNodeIDList.add(nodeId);
-
         } else {
           LOG.error("Failed to create OM proxy for {} at address {}",
               nodeId, rpcAddrStr);
@@ -183,26 +155,31 @@ public class OMFailoverProxyProvider implements
    * @return the OM proxy object to invoke methods upon
    */
   @Override
-  public synchronized OMProxyInfo getProxy() {
-    OMProxyInfo currentOMProxyInfo = omProxies.get(currentProxyOMNodeId);
-    createOMProxyIfNeeded(currentOMProxyInfo);
-    return currentOMProxyInfo;
+  public synchronized ProxyInfo getProxy() {
+    ProxyInfo currentProxyInfo = omProxies.get(currentProxyOMNodeId);
+    createOMProxyIfNeeded(currentProxyInfo, currentProxyOMNodeId);
+    return currentProxyInfo;
   }
 
   /**
-   * Creates OM proxy object if it does not already exist.
+   * Creates proxy object if it does not already exist.
    */
-  private OMProxyInfo createOMProxyIfNeeded(OMProxyInfo proxyInfo) {
+  private void createOMProxyIfNeeded(ProxyInfo proxyInfo,
+      String nodeId) {
     if (proxyInfo.proxy == null) {
+      InetSocketAddress address = omProxyInfos.get(nodeId).getAddress();
       try {
-        proxyInfo.proxy = createOMProxy(proxyInfo.address);
+        proxyInfo.proxy = createOMProxy(address);
       } catch (IOException ioe) {
         LOG.error("{} Failed to create RPC proxy to OM at {}",
-            this.getClass().getSimpleName(), proxyInfo.address, ioe);
+            this.getClass().getSimpleName(), address, ioe);
         throw new RuntimeException(ioe);
       }
     }
-    return proxyInfo;
+  }
+
+  public synchronized Text getCurrentProxyDelegationToken() {
+    return omProxyInfos.get(currentProxyOMNodeId).getDelegationTokenService();
   }
 
   /**
@@ -269,7 +246,7 @@ public class OMFailoverProxyProvider implements
    */
   @Override
   public synchronized void close() throws IOException {
-    for (OMProxyInfo proxy : omProxies.values()) {
+    for (ProxyInfo<OzoneManagerProtocolPB> proxy : omProxies.values()) {
       OzoneManagerProtocolPB omProxy = proxy.proxy;
       if (omProxy != null) {
         RPC.stopProxy(omProxy);
@@ -278,8 +255,13 @@ public class OMFailoverProxyProvider implements
   }
 
   @VisibleForTesting
-  public List<OMProxyInfo> getOMProxies() {
-    return new ArrayList<>(omProxies.values());
+  public List<ProxyInfo> getOMProxies() {
+    return new ArrayList<ProxyInfo>(omProxies.values());
+  }
+
+  @VisibleForTesting
+  public List<OMProxyInfo> getOMProxyInfos() {
+    return new ArrayList<OMProxyInfo>(omProxyInfos.values());
   }
 }
 

+ 59 - 0
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMProxyInfo.java

@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.ha;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Class to store OM proxy information.
+ */
+public class OMProxyInfo {
+  private String nodeId;
+  private String rpcAddrStr;
+  private InetSocketAddress rpcAddr;
+  private Text dtService;
+
+  OMProxyInfo(String nodeID, String rpcAddress) {
+    this.nodeId = nodeID;
+    this.rpcAddrStr = rpcAddress;
+    this.rpcAddr = NetUtils.createSocketAddr(rpcAddrStr);
+    this.dtService = SecurityUtil.buildTokenService(rpcAddr);
+  }
+
+  public String toString() {
+    StringBuilder sb = new StringBuilder()
+        .append("nodeId=")
+        .append(nodeId)
+        .append(",nodeAddress=")
+        .append(rpcAddrStr);
+    return sb.toString();
+  }
+
+  public InetSocketAddress getAddress() {
+    return rpcAddr;
+  }
+
+  public Text getDelegationTokenService() {
+    return dtService;
+  }
+}

+ 2 - 2
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java

@@ -76,6 +76,7 @@ import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
 import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.ha.OMProxyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
@@ -205,8 +206,7 @@ public abstract class TestOzoneRpcClientAbstract {
   public void testOMClientProxyProvider() {
     OMFailoverProxyProvider omFailoverProxyProvider = store.getClientProxy()
         .getOMProxyProvider();
-    List<OMFailoverProxyProvider.OMProxyInfo> omProxies =
-        omFailoverProxyProvider.getOMProxies();
+    List<OMProxyInfo> omProxies = omFailoverProxyProvider.getOMProxyInfos();
 
     // For a non-HA OM service, there should be only one OM proxy.
     Assert.assertEquals(1, omProxies.size());

+ 6 - 4
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java

@@ -52,8 +52,10 @@ import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.ha.OMProxyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
+import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.OzoneVolume;
@@ -604,8 +606,8 @@ public class TestOzoneManagerHA {
     OzoneClient rpcClient = cluster.getRpcClient();
     OMFailoverProxyProvider omFailoverProxyProvider =
         rpcClient.getObjectStore().getClientProxy().getOMProxyProvider();
-    List<OMFailoverProxyProvider.OMProxyInfo> omProxies =
-        omFailoverProxyProvider.getOMProxies();
+    List<OMProxyInfo> omProxies =
+        omFailoverProxyProvider.getOMProxyInfos();
 
     Assert.assertEquals(numOfOMs, omProxies.size());
 
@@ -613,7 +615,7 @@ public class TestOzoneManagerHA {
       InetSocketAddress omRpcServerAddr =
           cluster.getOzoneManager(i).getOmRpcServerAddr();
       boolean omClientProxyExists = false;
-      for (OMFailoverProxyProvider.OMProxyInfo omProxyInfo : omProxies) {
+      for (OMProxyInfo omProxyInfo : omProxies) {
         if (omProxyInfo.getAddress().equals(omRpcServerAddr)) {
           omClientProxyExists = true;
           break;
@@ -674,7 +676,7 @@ public class TestOzoneManagerHA {
     // Perform a manual failover of the proxy provider to move the
     // currentProxyIndex to a node other than the leader OM.
     omFailoverProxyProvider.performFailover(
-        omFailoverProxyProvider.getProxy().proxy);
+        (OzoneManagerProtocolPB) omFailoverProxyProvider.getProxy().proxy);
 
     String newProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
     Assert.assertNotEquals(leaderOMNodeId, newProxyNodeId);