Explorar o código

Merge remote-tracking branch 'apache/trunk' into HDDS-48

Arpit Agarwal %!s(int64=7) %!d(string=hai) anos
pai
achega
9bd5bef297
Modificáronse 20 ficheiros con 515 adicións e 251 borrados
  1. 3 3
      hadoop-common-project/hadoop-common/src/main/conf/hadoop-env.sh
  2. 1 1
      hadoop-common-project/hadoop-common/src/main/conf/hadoop-metrics2.properties
  3. 2 2
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java
  4. 3 1
      hadoop-common-project/hadoop-kms/src/main/conf/kms-log4j.properties
  5. 3 1
      hadoop-common-project/hadoop-kms/src/test/resources/log4j.properties
  6. 5 0
      hadoop-hdds/framework/pom.xml
  7. 62 46
      hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
  8. 42 1
      hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java
  9. 79 0
      hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcherMetrics.java
  10. 25 10
      hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java
  11. 4 31
      hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java
  12. 94 13
      hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java
  13. 0 91
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java
  14. 8 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java
  15. 16 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
  16. 23 17
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
  17. 3 10
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
  18. 123 23
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
  19. 2 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
  20. 17 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java

+ 3 - 3
hadoop-common-project/hadoop-common/src/main/conf/hadoop-env.sh

@@ -88,7 +88,7 @@
 # Extra Java runtime options for all Hadoop commands. We don't support
 # IPv6 yet/still, so by default the preference is set to IPv4.
 # export HADOOP_OPTS="-Djava.net.preferIPv4Stack=true"
-# For Kerberos debugging, an extended option set logs more invormation
+# For Kerberos debugging, an extended option set logs more information
 # export HADOOP_OPTS="-Djava.net.preferIPv4Stack=true -Dsun.security.krb5.debug=true -Dsun.security.spnego.debug"
 
 # Some parts of the shell code may do special things dependent upon
@@ -120,9 +120,9 @@ esac
 #
 # By default, Apache Hadoop overrides Java's CLASSPATH
 # environment variable.  It is configured such
-# that it sarts out blank with new entries added after passing
+# that it starts out blank with new entries added after passing
 # a series of checks (file/dir exists, not already listed aka
-# de-deduplication).  During de-depulication, wildcards and/or
+# de-deduplication).  During de-deduplication, wildcards and/or
 # directories are *NOT* expanded to keep it simple. Therefore,
 # if the computed classpath has two specific mentions of
 # awesome-methods-1.0.jar, only the first one added will be seen.

+ 1 - 1
hadoop-common-project/hadoop-common/src/main/conf/hadoop-metrics2.properties

@@ -47,7 +47,7 @@
 #*.sink.ganglia.dmax=jvm.metrics.threadsBlocked=70,jvm.metrics.memHeapUsedM=40
 
 # Tag values to use for the ganglia prefix. If not defined no tags are used.
-# If '*' all tags are used. If specifiying multiple tags separate them with 
+# If '*' all tags are used. If specifying multiple tags separate them with
 # commas. Note that the last segment of the property name is the context name.
 #
 # A typical use of tags is separating the metrics by the HDFS rpc port

+ 2 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java

@@ -1036,13 +1036,13 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
           public Token<?> run() throws Exception {
             // Not using the cached token here.. Creating a new token here
             // everytime.
-            LOG.debug("Getting new token from {}, renewer:{}", url, renewer);
+            LOG.info("Getting new token from {}, renewer:{}", url, renewer);
             return authUrl.getDelegationToken(url,
                 new DelegationTokenAuthenticatedURL.Token(), renewer, doAsUser);
           }
         });
         if (token != null) {
-          LOG.debug("New token received: ({})", token);
+          LOG.info("New token received: ({})", token);
           credentials.addToken(token.getService(), token);
           tokens = new Token<?>[] { token };
         } else {

+ 3 - 1
hadoop-common-project/hadoop-kms/src/main/conf/kms-log4j.properties

@@ -37,4 +37,6 @@ log4j.logger.org.apache.hadoop=INFO
 log4j.logger.com.sun.jersey.server.wadl.generators.WadlGeneratorJAXBGrammarGenerator=OFF
 # make zookeeper log level an explicit config, and not changing with rootLogger.
 log4j.logger.org.apache.zookeeper=INFO
-log4j.logger.org.apache.curator=INFO
+log4j.logger.org.apache.curator=INFO
+# make jetty log level an explicit config, and not changing with rootLogger.
+log4j.logger.org.eclipse.jetty=INFO

+ 3 - 1
hadoop-common-project/hadoop-kms/src/test/resources/log4j.properties

@@ -31,4 +31,6 @@ log4j.logger.org.apache.directory.server.core=OFF
 log4j.logger.org.apache.hadoop.util.NativeCodeLoader=OFF
 # make zookeeper log level an explicit config, and not changing with rootLogger.
 log4j.logger.org.apache.zookeeper=INFO
-log4j.logger.org.apache.curator=INFO
+log4j.logger.org.apache.curator=INFO
+# make jetty log level an explicit config, and not changing with rootLogger.
+log4j.logger.org.eclipse.jetty=INFO

+ 5 - 0
hadoop-hdds/framework/pom.xml

@@ -39,6 +39,11 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
       <artifactId>hadoop-hdds-common</artifactId>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

+ 62 - 46
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java

@@ -18,7 +18,11 @@
 package org.apache.hadoop.hdds.server.events;
 
 import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
+
+import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,6 +46,8 @@ public class EventQueue implements EventPublisher, AutoCloseable {
   private static final Logger LOG =
       LoggerFactory.getLogger(EventQueue.class);
 
+  private static final String EXECUTOR_NAME_SEPARATOR = "For";
+
   private final Map<Event, Map<EventExecutor, List<EventHandler>>> executors =
       new HashMap<>();
 
@@ -51,38 +57,74 @@ public class EventQueue implements EventPublisher, AutoCloseable {
 
   public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
       EVENT_TYPE event, EventHandler<PAYLOAD> handler) {
-
-    this.addHandler(event, new SingleThreadExecutor<>(
-        event.getName()), handler);
+    this.addHandler(event, handler, generateHandlerName(handler));
   }
 
+  /**
+   * Add new handler to the event queue.
+   * <p>
+   * By default a separated single thread executor will be dedicated to
+   * deliver the events to the registered event handler.
+   *
+   * @param event        Triggering event.
+   * @param handler      Handler of event (will be called from a separated
+   *                     thread)
+   * @param handlerName  The name of handler (should be unique together with
+   *                     the event name)
+   * @param <PAYLOAD>    The type of the event payload.
+   * @param <EVENT_TYPE> The type of the event identifier.
+   */
   public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
-      EVENT_TYPE event,
-      EventExecutor<PAYLOAD> executor,
-      EventHandler<PAYLOAD> handler) {
+      EVENT_TYPE event, EventHandler<PAYLOAD> handler, String handlerName) {
+    validateEvent(event);
+    Preconditions.checkNotNull(handler, "Handler name should not be null.");
+    String executorName =
+        StringUtils.camelize(event.getName()) + EXECUTOR_NAME_SEPARATOR
+            + handlerName;
+    this.addHandler(event, new SingleThreadExecutor<>(executorName), handler);
+  }
 
-    executors.putIfAbsent(event, new HashMap<>());
-    executors.get(event).putIfAbsent(executor, new ArrayList<>());
+  private <EVENT_TYPE extends Event<?>> void validateEvent(EVENT_TYPE event) {
+    Preconditions
+        .checkArgument(!event.getName().contains(EXECUTOR_NAME_SEPARATOR),
+            "Event name should not contain " + EXECUTOR_NAME_SEPARATOR
+                + " string.");
 
-    executors.get(event)
-        .get(executor)
-        .add(handler);
+  }
+
+  private <PAYLOAD> String generateHandlerName(EventHandler<PAYLOAD> handler) {
+    if (!"".equals(handler.getClass().getSimpleName())) {
+      return handler.getClass().getSimpleName();
+    } else {
+      return handler.getClass().getName();
+    }
   }
 
   /**
-   * Creates one executor with multiple event handlers.
+   * Add event handler with custom executor.
+   *
+   * @param event        Triggering event.
+   * @param executor     The executor imlementation to deliver events from a
+   *                     separated threads. Please keep in your mind that
+   *                     registering metrics is the responsibility of the
+   *                     caller.
+   * @param handler      Handler of event (will be called from a separated
+   *                     thread)
+   * @param <PAYLOAD>    The type of the event payload.
+   * @param <EVENT_TYPE> The type of the event identifier.
    */
-  public void addHandlerGroup(String name, HandlerForEvent<?>...
-      eventsAndHandlers) {
-    SingleThreadExecutor sharedExecutor =
-        new SingleThreadExecutor(name);
-    for (HandlerForEvent handlerForEvent : eventsAndHandlers) {
-      addHandler(handlerForEvent.event, sharedExecutor,
-          handlerForEvent.handler);
-    }
+  public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
+      EVENT_TYPE event, EventExecutor<PAYLOAD> executor,
+      EventHandler<PAYLOAD> handler) {
+    validateEvent(event);
+    executors.putIfAbsent(event, new HashMap<>());
+    executors.get(event).putIfAbsent(executor, new ArrayList<>());
 
+    executors.get(event).get(executor).add(handler);
   }
 
+
+
   /**
    * Route an event with payload to the right listener(s).
    *
@@ -183,31 +225,5 @@ public class EventQueue implements EventPublisher, AutoCloseable {
     });
   }
 
-  /**
-   * Event identifier together with the handler.
-   *
-   * @param <PAYLOAD>
-   */
-  public static class HandlerForEvent<PAYLOAD> {
-
-    private final Event<PAYLOAD> event;
-
-    private final EventHandler<PAYLOAD> handler;
-
-    public HandlerForEvent(
-        Event<PAYLOAD> event,
-        EventHandler<PAYLOAD> handler) {
-      this.event = event;
-      this.handler = handler;
-    }
-
-    public Event<PAYLOAD> getEvent() {
-      return event;
-    }
-
-    public EventHandler<PAYLOAD> getHandler() {
-      return handler;
-    }
-  }
 
 }

+ 42 - 1
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java

@@ -26,12 +26,17 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.ozone.lease.Lease;
 import org.apache.hadoop.ozone.lease.LeaseAlreadyExistException;
 import org.apache.hadoop.ozone.lease.LeaseExpiredException;
 import org.apache.hadoop.ozone.lease.LeaseManager;
 import org.apache.hadoop.ozone.lease.LeaseNotFoundException;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.commons.collections.map.HashedMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,18 +63,39 @@ public abstract class EventWatcher<TIMEOUT_PAYLOAD extends
 
   private final LeaseManager<UUID> leaseManager;
 
+  private final EventWatcherMetrics metrics;
+
+  private final String name;
+
   protected final Map<UUID, TIMEOUT_PAYLOAD> trackedEventsByUUID =
       new ConcurrentHashMap<>();
 
   protected final Set<TIMEOUT_PAYLOAD> trackedEvents = new HashSet<>();
 
-  public EventWatcher(Event<TIMEOUT_PAYLOAD> startEvent,
+  private final Map<UUID, Long> startTrackingTimes = new HashedMap();
+
+  public EventWatcher(String name, Event<TIMEOUT_PAYLOAD> startEvent,
       Event<COMPLETION_PAYLOAD> completionEvent,
       LeaseManager<UUID> leaseManager) {
     this.startEvent = startEvent;
     this.completionEvent = completionEvent;
     this.leaseManager = leaseManager;
+    this.metrics = new EventWatcherMetrics();
+    Preconditions.checkNotNull(name);
+    if (name.equals("")) {
+      name = getClass().getSimpleName();
+    }
+    if (name.equals("")) {
+      //for anonymous inner classes
+      name = getClass().getName();
+    }
+    this.name = name;
+  }
 
+  public EventWatcher(Event<TIMEOUT_PAYLOAD> startEvent,
+      Event<COMPLETION_PAYLOAD> completionEvent,
+      LeaseManager<UUID> leaseManager) {
+    this("", startEvent, completionEvent, leaseManager);
   }
 
   public void start(EventQueue queue) {
@@ -87,11 +113,16 @@ public abstract class EventWatcher<TIMEOUT_PAYLOAD extends
       }
     });
 
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    ms.register(name, "EventWatcher metrics", metrics);
   }
 
   private synchronized void handleStartEvent(TIMEOUT_PAYLOAD payload,
       EventPublisher publisher) {
+    metrics.incrementTrackedEvents();
     UUID identifier = payload.getUUID();
+    startTrackingTimes.put(identifier, System.currentTimeMillis());
+
     trackedEventsByUUID.put(identifier, payload);
     trackedEvents.add(payload);
     try {
@@ -112,16 +143,21 @@ public abstract class EventWatcher<TIMEOUT_PAYLOAD extends
 
   private synchronized void handleCompletion(UUID uuid,
       EventPublisher publisher) throws LeaseNotFoundException {
+    metrics.incrementCompletedEvents();
     leaseManager.release(uuid);
     TIMEOUT_PAYLOAD payload = trackedEventsByUUID.remove(uuid);
     trackedEvents.remove(payload);
+    long originalTime = startTrackingTimes.remove(uuid);
+    metrics.updateFinishingTime(System.currentTimeMillis() - originalTime);
     onFinished(publisher, payload);
   }
 
   private synchronized void handleTimeout(EventPublisher publisher,
       UUID identifier) {
+    metrics.incrementTimedOutEvents();
     TIMEOUT_PAYLOAD payload = trackedEventsByUUID.remove(identifier);
     trackedEvents.remove(payload);
+    startTrackingTimes.remove(payload.getUUID());
     onTimeout(publisher, payload);
   }
 
@@ -154,4 +190,9 @@ public abstract class EventWatcher<TIMEOUT_PAYLOAD extends
     return trackedEventsByUUID.values().stream().filter(predicate)
         .collect(Collectors.toList());
   }
+
+  @VisibleForTesting
+  protected EventWatcherMetrics getMetrics() {
+    return metrics;
+  }
 }

+ 79 - 0
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcherMetrics.java

@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.server.events;
+
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableRate;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Metrics for any event watcher.
+ */
+public class EventWatcherMetrics {
+
+  @Metric()
+  private MutableCounterLong trackedEvents;
+
+  @Metric()
+  private MutableCounterLong timedOutEvents;
+
+  @Metric()
+  private MutableCounterLong completedEvents;
+
+  @Metric()
+  private MutableRate completionTime;
+
+  public void incrementTrackedEvents() {
+    trackedEvents.incr();
+  }
+
+  public void incrementTimedOutEvents() {
+    timedOutEvents.incr();
+  }
+
+  public void incrementCompletedEvents() {
+    completedEvents.incr();
+  }
+
+  @VisibleForTesting
+  public void updateFinishingTime(long duration) {
+    completionTime.add(duration);
+  }
+
+  @VisibleForTesting
+  public MutableCounterLong getTrackedEvents() {
+    return trackedEvents;
+  }
+
+  @VisibleForTesting
+  public MutableCounterLong getTimedOutEvents() {
+    return timedOutEvents;
+  }
+
+  @VisibleForTesting
+  public MutableCounterLong getCompletedEvents() {
+    return completedEvents;
+  }
+
+  @VisibleForTesting
+  public MutableRate getCompletionTime() {
+    return completionTime;
+  }
+}

+ 25 - 10
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java

@@ -23,13 +23,18 @@ import org.slf4j.LoggerFactory;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 
 /**
  * Simple EventExecutor to call all the event handler one-by-one.
  *
  * @param <T>
  */
+@Metrics(context = "EventQueue")
 public class SingleThreadExecutor<T> implements EventExecutor<T> {
 
   public static final String THREAD_NAME_PREFIX = "EventQueue";
@@ -41,14 +46,24 @@ public class SingleThreadExecutor<T> implements EventExecutor<T> {
 
   private final ThreadPoolExecutor executor;
 
-  private final AtomicLong queuedCount = new AtomicLong(0);
+  @Metric
+  private MutableCounterLong queued;
 
-  private final AtomicLong successfulCount = new AtomicLong(0);
+  @Metric
+  private MutableCounterLong done;
 
-  private final AtomicLong failedCount = new AtomicLong(0);
+  @Metric
+  private MutableCounterLong failed;
 
+  /**
+   * Create SingleThreadExecutor.
+   *
+   * @param name Unique name used in monitoring and metrics.
+   */
   public SingleThreadExecutor(String name) {
     this.name = name;
+    DefaultMetricsSystem.instance()
+        .register("EventQueue" + name, "Event Executor metrics ", this);
 
     LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
     executor =
@@ -64,31 +79,31 @@ public class SingleThreadExecutor<T> implements EventExecutor<T> {
   @Override
   public void onMessage(EventHandler<T> handler, T message, EventPublisher
       publisher) {
-    queuedCount.incrementAndGet();
+    queued.incr();
     executor.execute(() -> {
       try {
         handler.onMessage(message, publisher);
-        successfulCount.incrementAndGet();
+        done.incr();
       } catch (Exception ex) {
         LOG.error("Error on execution message {}", message, ex);
-        failedCount.incrementAndGet();
+        failed.incr();
       }
     });
   }
 
   @Override
   public long failedEvents() {
-    return failedCount.get();
+    return failed.value();
   }
 
   @Override
   public long successfulEvents() {
-    return successfulCount.get();
+    return done.value();
   }
 
   @Override
   public long queuedEvents() {
-    return queuedCount.get();
+    return queued.value();
   }
 
   @Override

+ 4 - 31
hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java

@@ -25,6 +25,8 @@ import org.junit.Test;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+
 /**
  * Testing the basic functionality of the event queue.
  */
@@ -44,11 +46,13 @@ public class TestEventQueue {
 
   @Before
   public void startEventQueue() {
+    DefaultMetricsSystem.initialize(getClass().getSimpleName());
     queue = new EventQueue();
   }
 
   @After
   public void stopEventQueue() {
+    DefaultMetricsSystem.shutdown();
     queue.close();
   }
 
@@ -79,35 +83,4 @@ public class TestEventQueue {
 
   }
 
-  @Test
-  public void handlerGroup() {
-    final long[] result = new long[2];
-    queue.addHandlerGroup(
-        "group",
-        new EventQueue.HandlerForEvent<>(EVENT3, (payload, publisher) ->
-            result[0] = payload),
-        new EventQueue.HandlerForEvent<>(EVENT4, (payload, publisher) ->
-            result[1] = payload)
-    );
-
-    queue.fireEvent(EVENT3, 23L);
-    queue.fireEvent(EVENT4, 42L);
-
-    queue.processAll(1000);
-
-    Assert.assertEquals(23, result[0]);
-    Assert.assertEquals(42, result[1]);
-
-    Set<String> eventQueueThreadNames =
-        Thread.getAllStackTraces().keySet()
-            .stream()
-            .filter(t -> t.getName().startsWith(SingleThreadExecutor
-                .THREAD_NAME_PREFIX))
-            .map(Thread::getName)
-            .collect(Collectors.toSet());
-    System.out.println(eventQueueThreadNames);
-    Assert.assertEquals(1, eventQueueThreadNames.size());
-
-  }
-
 }

+ 94 - 13
hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java

@@ -21,8 +21,13 @@ import java.util.List;
 import java.util.Objects;
 import java.util.UUID;
 
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.ozone.lease.LeaseManager;
+import org.apache.hadoop.test.MetricsAsserts;
 
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -46,6 +51,7 @@ public class TestEventWatcher {
 
   @Before
   public void startLeaseManager() {
+    DefaultMetricsSystem.instance();
     leaseManager = new LeaseManager<>(2000l);
     leaseManager.start();
   }
@@ -53,12 +59,12 @@ public class TestEventWatcher {
   @After
   public void stopLeaseManager() {
     leaseManager.shutdown();
+    DefaultMetricsSystem.shutdown();
   }
 
 
   @Test
   public void testEventHandling() throws InterruptedException {
-
     EventQueue queue = new EventQueue();
 
     EventWatcher<UnderreplicatedEvent, ReplicationCompletedEvent>
@@ -139,26 +145,101 @@ public class TestEventWatcher {
     Assert.assertEquals(0, c1todo.size());
     Assert.assertFalse(replicationWatcher.contains(event1));
 
+  }
+
+  @Test
+  public void testMetrics() throws InterruptedException {
+
+    DefaultMetricsSystem.initialize("test");
+
+    EventQueue queue = new EventQueue();
+
+    EventWatcher<UnderreplicatedEvent, ReplicationCompletedEvent>
+        replicationWatcher = createEventWatcher();
+
+    EventHandlerStub<UnderreplicatedEvent> underReplicatedEvents =
+        new EventHandlerStub<>();
+
+    queue.addHandler(UNDER_REPLICATED, underReplicatedEvents);
+
+    replicationWatcher.start(queue);
+
+    //send 3 event to track 3 in-progress activity
+    UnderreplicatedEvent event1 =
+        new UnderreplicatedEvent(UUID.randomUUID(), "C1");
+
+    UnderreplicatedEvent event2 =
+        new UnderreplicatedEvent(UUID.randomUUID(), "C2");
+
+    UnderreplicatedEvent event3 =
+        new UnderreplicatedEvent(UUID.randomUUID(), "C1");
+
+    queue.fireEvent(WATCH_UNDER_REPLICATED, event1);
+
+    queue.fireEvent(WATCH_UNDER_REPLICATED, event2);
+
+    queue.fireEvent(WATCH_UNDER_REPLICATED, event3);
+
+    //1st event is completed, don't need to track any more
+    ReplicationCompletedEvent event1Completed =
+        new ReplicationCompletedEvent(event1.UUID, "C1", "D1");
+
+    queue.fireEvent(REPLICATION_COMPLETED, event1Completed);
+
+
+    Thread.sleep(2200l);
+
+    //until now: 3 in-progress activities are tracked with three
+    // UnderreplicatedEvents. The first one is completed, the remaining two
+    // are timed out (as the timeout -- defined in the leasmanager -- is 2000ms.
 
+    EventWatcherMetrics metrics = replicationWatcher.getMetrics();
+
+    //3 events are received
+    Assert.assertEquals(3, metrics.getTrackedEvents().value());
+
+    //one is finished. doesn't need to be resent
+    Assert.assertEquals(1, metrics.getCompletedEvents().value());
+
+    //Other two are timed out and resent
+    Assert.assertEquals(2, metrics.getTimedOutEvents().value());
+
+    DefaultMetricsSystem.shutdown();
   }
 
   private EventWatcher<UnderreplicatedEvent, ReplicationCompletedEvent>
   createEventWatcher() {
-    return new EventWatcher<UnderreplicatedEvent, ReplicationCompletedEvent>(
-        WATCH_UNDER_REPLICATED, REPLICATION_COMPLETED, leaseManager) {
+    return new CommandWatcherExample(WATCH_UNDER_REPLICATED,
+        REPLICATION_COMPLETED, leaseManager);
+  }
 
-      @Override
-      void onTimeout(EventPublisher publisher, UnderreplicatedEvent payload) {
-        publisher.fireEvent(UNDER_REPLICATED, payload);
-      }
+  private class CommandWatcherExample
+      extends EventWatcher<UnderreplicatedEvent, ReplicationCompletedEvent> {
 
-      @Override
-      void onFinished(EventPublisher publisher, UnderreplicatedEvent payload) {
-        //Good job. We did it.
-      }
-    };
+    public CommandWatcherExample(Event<UnderreplicatedEvent> startEvent,
+        Event<ReplicationCompletedEvent> completionEvent,
+        LeaseManager<UUID> leaseManager) {
+      super("TestCommandWatcher", startEvent, completionEvent, leaseManager);
+    }
+
+    @Override
+    void onTimeout(EventPublisher publisher, UnderreplicatedEvent payload) {
+      publisher.fireEvent(UNDER_REPLICATED, payload);
+    }
+
+    @Override
+    void onFinished(EventPublisher publisher, UnderreplicatedEvent payload) {
+      //Good job. We did it.
+    }
+
+    @Override
+    public EventWatcherMetrics getMetrics() {
+      return super.getMetrics();
+    }
   }
 
+  ;
+
   private static class ReplicationCompletedEvent
       implements IdentifiableEventPayload {
 
@@ -217,4 +298,4 @@ public class TestEventWatcher {
     }
   }
 
-}
+}

+ 0 - 91
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java

@@ -36,19 +36,9 @@ import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
-import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
-import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,87 +57,6 @@ public final class AMRMClientUtils {
   private AMRMClientUtils() {
   }
 
-  /**
-   * Handle ApplicationNotRegistered exception and re-register.
-   *
-   * @param appId application Id
-   * @param rmProxy RM proxy instance
-   * @param registerRequest the AM re-register request
-   * @throws YarnException if re-register fails
-   */
-  public static void handleNotRegisteredExceptionAndReRegister(
-      ApplicationId appId, ApplicationMasterProtocol rmProxy,
-      RegisterApplicationMasterRequest registerRequest) throws YarnException {
-    LOG.info("App attempt {} not registered, most likely due to RM failover. "
-        + " Trying to re-register.", appId);
-    try {
-      rmProxy.registerApplicationMaster(registerRequest);
-    } catch (Exception e) {
-      if (e instanceof InvalidApplicationMasterRequestException
-          && e.getMessage().contains(APP_ALREADY_REGISTERED_MESSAGE)) {
-        LOG.info("Concurrent thread successfully registered, moving on.");
-      } else {
-        LOG.error("Error trying to re-register AM", e);
-        throw new YarnException(e);
-      }
-    }
-  }
-
-  /**
-   * Helper method for client calling ApplicationMasterProtocol.allocate that
-   * handles re-register if RM fails over.
-   *
-   * @param request allocate request
-   * @param rmProxy RM proxy
-   * @param registerRequest the register request for re-register
-   * @param appId application id
-   * @return allocate response
-   * @throws YarnException if RM call fails
-   * @throws IOException if RM call fails
-   */
-  public static AllocateResponse allocateWithReRegister(AllocateRequest request,
-      ApplicationMasterProtocol rmProxy,
-      RegisterApplicationMasterRequest registerRequest, ApplicationId appId)
-      throws YarnException, IOException {
-    try {
-      return rmProxy.allocate(request);
-    } catch (ApplicationMasterNotRegisteredException e) {
-      handleNotRegisteredExceptionAndReRegister(appId, rmProxy,
-          registerRequest);
-      // reset responseId after re-register
-      request.setResponseId(0);
-      // retry allocate
-      return allocateWithReRegister(request, rmProxy, registerRequest, appId);
-    }
-  }
-
-  /**
-   * Helper method for client calling
-   * ApplicationMasterProtocol.finishApplicationMaster that handles re-register
-   * if RM fails over.
-   *
-   * @param request finishApplicationMaster request
-   * @param rmProxy RM proxy
-   * @param registerRequest the register request for re-register
-   * @param appId application id
-   * @return finishApplicationMaster response
-   * @throws YarnException if RM call fails
-   * @throws IOException if RM call fails
-   */
-  public static FinishApplicationMasterResponse finishAMWithReRegister(
-      FinishApplicationMasterRequest request, ApplicationMasterProtocol rmProxy,
-      RegisterApplicationMasterRequest registerRequest, ApplicationId appId)
-      throws YarnException, IOException {
-    try {
-      return rmProxy.finishApplicationMaster(request);
-    } catch (ApplicationMasterNotRegisteredException ex) {
-      handleNotRegisteredExceptionAndReRegister(appId, rmProxy,
-          registerRequest);
-      // retry finishAM after re-register
-      return finishAMWithReRegister(request, rmProxy, registerRequest, appId);
-    }
-  }
-
   /**
    * Create a proxy for the specified protocol.
    *

+ 8 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java

@@ -147,6 +147,11 @@ public class AMRMClientRelayer extends AbstractService
     super.serviceStop();
   }
 
+  public void setAMRegistrationRequest(
+      RegisterApplicationMasterRequest registerRequest) {
+    this.amRegistrationRequest = registerRequest;
+  }
+
   @Override
   public RegisterApplicationMasterResponse registerApplicationMaster(
       RegisterApplicationMasterRequest request)
@@ -259,8 +264,10 @@ public class AMRMClientRelayer extends AbstractService
           }
         }
 
-        // re register with RM, then retry allocate recursively
+        // re-register with RM, then retry allocate recursively
         registerApplicationMaster(this.amRegistrationRequest);
+        // Reset responseId after re-register
+        allocateRequest.setResponseId(0);
         return allocate(allocateRequest);
       }
 

+ 16 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java

@@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.client.AMRMClientUtils;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.AMRMClientRelayer;
 import org.apache.hadoop.yarn.util.AsyncCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -385,4 +386,19 @@ public class UnmanagedAMPoolManager extends AbstractService {
     return this.unmanagedAppMasterMap.containsKey(uamId);
   }
 
+  /**
+   * Return the rmProxy relayer of an UAM.
+   *
+   * @param uamId uam Id
+   * @return the rmProxy relayer
+   * @throws YarnException if fails
+   */
+  public AMRMClientRelayer getAMRMClientRelayer(String uamId)
+      throws YarnException {
+    if (!this.unmanagedAppMasterMap.containsKey(uamId)) {
+      throw new YarnException("UAM " + uamId + " does not exist");
+    }
+    return this.unmanagedAppMasterMap.get(uamId).getAMRMClientRelayer();
+  }
+
 }

+ 23 - 17
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java

@@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.AMRMClientRelayer;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
 import org.apache.hadoop.yarn.util.AsyncCallback;
@@ -90,7 +91,7 @@ public class UnmanagedApplicationManager {
 
   private BlockingQueue<AsyncAllocateRequestInfo> requestQueue;
   private AMRequestHandlerThread handlerThread;
-  private ApplicationMasterProtocol rmProxy;
+  private AMRMClientRelayer rmProxyRelayer;
   private ApplicationId applicationId;
   private String submitter;
   private String appNameSuffix;
@@ -138,7 +139,7 @@ public class UnmanagedApplicationManager {
     this.appNameSuffix = appNameSuffix;
     this.handlerThread = new AMRequestHandlerThread();
     this.requestQueue = new LinkedBlockingQueue<>();
-    this.rmProxy = null;
+    this.rmProxyRelayer = null;
     this.connectionInitiated = false;
     this.registerRequest = null;
     this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
@@ -190,8 +191,9 @@ public class UnmanagedApplicationManager {
       throws IOException {
     this.userUgi = UserGroupInformation.createProxyUser(
         this.applicationId.toString(), UserGroupInformation.getCurrentUser());
-    this.rmProxy = createRMProxy(ApplicationMasterProtocol.class, this.conf,
-        this.userUgi, amrmToken);
+    this.rmProxyRelayer =
+        new AMRMClientRelayer(createRMProxy(ApplicationMasterProtocol.class,
+            this.conf, this.userUgi, amrmToken));
   }
 
   /**
@@ -209,19 +211,18 @@ public class UnmanagedApplicationManager {
     // Save the register request for re-register later
     this.registerRequest = request;
 
-    // Since we have setKeepContainersAcrossApplicationAttempts = true for UAM.
-    // We do not expect application already registered exception here
     LOG.info("Registering the Unmanaged application master {}",
         this.applicationId);
     RegisterApplicationMasterResponse response =
-        this.rmProxy.registerApplicationMaster(this.registerRequest);
+        this.rmProxyRelayer.registerApplicationMaster(this.registerRequest);
+    this.lastResponseId = 0;
 
     for (Container container : response.getContainersFromPreviousAttempts()) {
-      LOG.info("RegisterUAM returned existing running container "
+      LOG.debug("RegisterUAM returned existing running container "
           + container.getId());
     }
     for (NMToken nmToken : response.getNMTokensFromPreviousAttempts()) {
-      LOG.info("RegisterUAM returned existing NM token for node "
+      LOG.debug("RegisterUAM returned existing NM token for node "
           + nmToken.getNodeId());
     }
 
@@ -249,7 +250,7 @@ public class UnmanagedApplicationManager {
 
     this.handlerThread.shutdown();
 
-    if (this.rmProxy == null) {
+    if (this.rmProxyRelayer == null) {
       if (this.connectionInitiated) {
         // This is possible if the async launchUAM is still
         // blocked and retrying. Return a dummy response in this case.
@@ -261,8 +262,7 @@ public class UnmanagedApplicationManager {
             + "be called before createAndRegister");
       }
     }
-    return AMRMClientUtils.finishAMWithReRegister(request, this.rmProxy,
-        this.registerRequest, this.applicationId);
+    return this.rmProxyRelayer.finishApplicationMaster(request);
   }
 
   /**
@@ -308,7 +308,7 @@ public class UnmanagedApplicationManager {
     //
     // In case 2, we have already save the allocate request above, so if the
     // registration succeed later, no request is lost.
-    if (this.rmProxy == null) {
+    if (this.rmProxyRelayer == null) {
       if (this.connectionInitiated) {
         LOG.info("Unmanaged AM still not successfully launched/registered yet."
             + " Saving the allocate request and send later.");
@@ -328,6 +328,15 @@ public class UnmanagedApplicationManager {
     return this.applicationId;
   }
 
+  /**
+   * Returns the rmProxy relayer of this UAM.
+   *
+   * @return rmProxy relayer of the UAM
+   */
+  public AMRMClientRelayer getAMRMClientRelayer() {
+    return this.rmProxyRelayer;
+  }
+
   /**
    * Returns RM proxy for the specified protocol type. Unit test cases can
    * override this method and return mock proxy instances.
@@ -592,10 +601,7 @@ public class UnmanagedApplicationManager {
           }
 
           request.setResponseId(lastResponseId);
-
-          AllocateResponse response = AMRMClientUtils.allocateWithReRegister(
-              request, rmProxy, registerRequest, applicationId);
-
+          AllocateResponse response = rmProxyRelayer.allocate(request);
           if (response == null) {
             throw new YarnException("Null allocateResponse from allocate");
           }

+ 3 - 10
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java

@@ -251,8 +251,6 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
     ApplicationAttemptId attemptId = getAppIdentifier();
     LOG.info("Registering application attempt: " + attemptId);
 
-    shouldReRegisterNext = false;
-
     List<Container> containersFromPreviousAttempt = null;
 
     synchronized (applicationContainerIdMap) {
@@ -266,7 +264,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
             containersFromPreviousAttempt.add(Container.newInstance(containerId,
                 null, null, null, null, null));
           }
-        } else {
+        } else if (!shouldReRegisterNext) {
           throw new InvalidApplicationMasterRequestException(
               AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE);
         }
@@ -276,6 +274,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
       }
     }
 
+    shouldReRegisterNext = false;
+
     // Make sure we wait for certain test cases last in the method
     synchronized (syncObj) {
       syncObj.notifyAll();
@@ -339,13 +339,6 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
 
     validateRunning();
 
-    if (request.getAskList() != null && request.getAskList().size() > 0
-        && request.getReleaseList() != null
-        && request.getReleaseList().size() > 0) {
-      Assert.fail("The mock RM implementation does not support receiving "
-          + "askList and releaseList in the same heartbeat");
-    }
-
     ApplicationAttemptId attemptId = getAppIdentifier();
     LOG.info("Allocate from application attempt: " + attemptId);
 

+ 123 - 23
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java

@@ -62,14 +62,15 @@ import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
 import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
-import org.apache.hadoop.yarn.client.AMRMClientUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
 import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.AMRMClientRelayer;
 import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
 import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
 import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
@@ -106,9 +107,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
   public static final String NMSS_REG_RESPONSE_KEY =
       NMSS_CLASS_PREFIX + "registerResponse";
 
-  /*
+  /**
    * When AMRMProxy HA is enabled, secondary AMRMTokens will be stored in Yarn
-   * Registry. Otherwise if NM recovery is enabled, the UAM token are store in
+   * Registry. Otherwise if NM recovery is enabled, the UAM token are stored in
    * local NMSS instead under this directory name.
    */
   public static final String NMSS_SECONDARY_SC_PREFIX =
@@ -119,8 +120,23 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
    * The home sub-cluster is the sub-cluster where the AM container is running
    * in.
    */
-  private ApplicationMasterProtocol homeRM;
+  private AMRMClientRelayer homeRMRelayer;
   private SubClusterId homeSubClusterId;
+  private volatile int lastHomeResponseId;
+
+  /**
+   * A flag for work preserving NM restart. If we just recovered, we need to
+   * generate an {@link ApplicationMasterNotRegisteredException} exception back
+   * to AM (similar to what RM will do after its restart/fail-over) in its next
+   * allocate to trigger AM re-register (which we will shield from RM and just
+   * return our saved register response) and a full pending requests re-send, so
+   * that all the {@link AMRMClientRelayer} will be re-populated with all
+   * pending requests.
+   *
+   * TODO: When split-merge is not idempotent, this can lead to some
+   * over-allocation without a full cancel to RM.
+   */
+  private volatile boolean justRecovered;
 
   /**
    * UAM pool for secondary sub-clusters (ones other than home sub-cluster),
@@ -134,6 +150,12 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
    */
   private UnmanagedAMPoolManager uamPool;
 
+  /**
+   * The rmProxy relayers for secondary sub-clusters that keep track of all
+   * pending requests.
+   */
+  private Map<String, AMRMClientRelayer> secondaryRelayers;
+
   /** Thread pool used for asynchronous operations. */
   private ExecutorService threadpool;
 
@@ -186,8 +208,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
     this.asyncResponseSink = new ConcurrentHashMap<>();
     this.threadpool = Executors.newCachedThreadPool();
     this.uamPool = createUnmanagedAMPoolManager(this.threadpool);
+    this.secondaryRelayers = new ConcurrentHashMap<>();
     this.amRegistrationRequest = null;
     this.amRegistrationResponse = null;
+    this.lastHomeResponseId = Integer.MAX_VALUE;
+    this.justRecovered = false;
   }
 
   /**
@@ -224,8 +249,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
 
     this.homeSubClusterId =
         SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));
-    this.homeRM = createHomeRMProxy(appContext, ApplicationMasterProtocol.class,
-        this.appOwner);
+    this.homeRMRelayer = new AMRMClientRelayer(createHomeRMProxy(appContext,
+        ApplicationMasterProtocol.class, this.appOwner));
 
     this.federationFacade = FederationStateStoreFacade.getInstance();
     this.subClusterResolver = this.federationFacade.getSubClusterResolver();
@@ -240,13 +265,12 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
   @Override
   public void recover(Map<String, byte[]> recoveredDataMap) {
     super.recover(recoveredDataMap);
-    LOG.info("Recovering data for FederationInterceptor");
+    ApplicationAttemptId attemptId =
+        getApplicationContext().getApplicationAttemptId();
+    LOG.info("Recovering data for FederationInterceptor for {}", attemptId);
     if (recoveredDataMap == null) {
       return;
     }
-
-    ApplicationAttemptId attemptId =
-        getApplicationContext().getApplicationAttemptId();
     try {
       if (recoveredDataMap.containsKey(NMSS_REG_REQUEST_KEY)) {
         RegisterApplicationMasterRequestProto pb =
@@ -255,6 +279,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
         this.amRegistrationRequest =
             new RegisterApplicationMasterRequestPBImpl(pb);
         LOG.info("amRegistrationRequest recovered for {}", attemptId);
+
+        // Give the register request to homeRMRelayer for future re-registration
+        this.homeRMRelayer.setAMRegistrationRequest(this.amRegistrationRequest);
       }
       if (recoveredDataMap.containsKey(NMSS_REG_RESPONSE_KEY)) {
         RegisterApplicationMasterResponseProto pb =
@@ -263,6 +290,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
         this.amRegistrationResponse =
             new RegisterApplicationMasterResponsePBImpl(pb);
         LOG.info("amRegistrationResponse recovered for {}", attemptId);
+        // Trigger re-register and full pending re-send only if we have a
+        // saved register response. This should always be true though.
+        this.justRecovered = true;
       }
 
       // Recover UAM amrmTokens from registry or NMSS
@@ -309,6 +339,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
               getApplicationContext().getUser(), this.homeSubClusterId.getId(),
               entry.getValue());
 
+          this.secondaryRelayers.put(subClusterId.getId(),
+              this.uamPool.getAMRMClientRelayer(subClusterId.getId()));
+
           RegisterApplicationMasterResponse response =
               this.uamPool.registerApplicationMaster(subClusterId.getId(),
                   this.amRegistrationRequest);
@@ -436,7 +469,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
      * the other sub-cluster RM will be done lazily as needed later.
      */
     this.amRegistrationResponse =
-        this.homeRM.registerApplicationMaster(request);
+        this.homeRMRelayer.registerApplicationMaster(request);
     if (this.amRegistrationResponse
         .getContainersFromPreviousAttempts() != null) {
       cacheAllocatedContainers(
@@ -495,6 +528,34 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
     Preconditions.checkArgument(this.policyInterpreter != null,
         "Allocate should be called after registerApplicationMaster");
 
+    if (this.justRecovered && this.lastHomeResponseId == Integer.MAX_VALUE) {
+      // Save the responseId home RM is expecting
+      this.lastHomeResponseId = request.getResponseId();
+
+      throw new ApplicationMasterNotRegisteredException(
+          "AMRMProxy just restarted and recovered for "
+              + getApplicationContext().getApplicationAttemptId()
+              + ". AM should re-register and full re-send pending requests.");
+    }
+
+    // Override responseId in the request in two cases:
+    //
+    // 1. After we just recovered after an NM restart and AM's responseId is
+    // reset due to the exception we generate. We need to override the
+    // responseId to the one homeRM expects.
+    //
+    // 2. After homeRM fail-over, the allocate response with reseted responseId
+    // might not be returned successfully back to AM because of RPC connection
+    // timeout between AM and AMRMProxy. In this case, we remember and reset the
+    // responseId for AM.
+    if (this.justRecovered
+        || request.getResponseId() > this.lastHomeResponseId) {
+      LOG.warn("Setting allocate responseId for {} from {} to {}",
+          getApplicationContext().getApplicationAttemptId(),
+          request.getResponseId(), this.lastHomeResponseId);
+      request.setResponseId(this.lastHomeResponseId);
+    }
+
     try {
       // Split the heart beat request into multiple requests, one for each
       // sub-cluster RM that is used by this application.
@@ -509,10 +570,18 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
           sendRequestsToSecondaryResourceManagers(requests);
 
       // Send the request to the home RM and get the response
-      AllocateResponse homeResponse = AMRMClientUtils.allocateWithReRegister(
-          requests.get(this.homeSubClusterId), this.homeRM,
-          this.amRegistrationRequest,
-          getApplicationContext().getApplicationAttemptId().getApplicationId());
+      AllocateRequest homeRequest = requests.get(this.homeSubClusterId);
+      LOG.info("{} heartbeating to home RM with responseId {}",
+          getApplicationContext().getApplicationAttemptId(),
+          homeRequest.getResponseId());
+
+      AllocateResponse homeResponse = this.homeRMRelayer.allocate(homeRequest);
+
+      // Reset the flag after the first successful homeRM allocate response,
+      // otherwise keep overriding the responseId of new allocate request
+      if (this.justRecovered) {
+        this.justRecovered = false;
+      }
 
       // Notify policy of home response
       try {
@@ -540,6 +609,22 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
             newRegistrations.getSuccessfulRegistrations());
       }
 
+      LOG.info("{} heartbeat response from home RM with responseId {}",
+          getApplicationContext().getApplicationAttemptId(),
+          homeResponse.getResponseId());
+
+      // Update lastHomeResponseId in three cases:
+      // 1. The normal responseId increments
+      // 2. homeResponse.getResponseId() == 1. This happens when homeRM fails
+      // over, AMRMClientRelayer auto re-register and full re-send for homeRM.
+      // 3. lastHomeResponseId == MAX_INT. This is the initial case or
+      // responseId about to overflow and wrap around
+      if (homeResponse.getResponseId() == this.lastHomeResponseId + 1
+          || homeResponse.getResponseId() == 1
+          || this.lastHomeResponseId == Integer.MAX_VALUE) {
+        this.lastHomeResponseId = homeResponse.getResponseId();
+      }
+
       // return the final response to the application master.
       return homeResponse;
     } catch (IOException ex) {
@@ -584,6 +669,16 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
             try {
               uamResponse =
                   uamPool.finishApplicationMaster(subClusterId, finishRequest);
+
+              if (uamResponse.getIsUnregistered()) {
+                secondaryRelayers.remove(subClusterId);
+
+                if (getNMStateStore() != null) {
+                  getNMStateStore().removeAMRMProxyAppContextEntry(
+                      getApplicationContext().getApplicationAttemptId(),
+                      NMSS_SECONDARY_SC_PREFIX + subClusterId);
+                }
+              }
             } catch (Throwable e) {
               LOG.warn("Failed to finish unmanaged application master: "
                   + "RM address: " + subClusterId + " ApplicationId: "
@@ -600,9 +695,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
     // asynchronously by other sub-cluster resource managers, send the same
     // request to the home resource manager on this thread.
     FinishApplicationMasterResponse homeResponse =
-        AMRMClientUtils.finishAMWithReRegister(request, this.homeRM,
-            this.amRegistrationRequest, getApplicationContext()
-                .getApplicationAttemptId().getApplicationId());
+        this.homeRMRelayer.finishApplicationMaster(request);
 
     if (subClusterIds.size() > 0) {
       // Wait for other sub-cluster resource managers to return the
@@ -621,10 +714,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
           if (uamResponse.getResponse() == null
               || !uamResponse.getResponse().getIsUnregistered()) {
             failedToUnRegister = true;
-          } else if (getNMStateStore() != null) {
-            getNMStateStore().removeAMRMProxyAppContextEntry(
-                getApplicationContext().getApplicationAttemptId(),
-                NMSS_SECONDARY_SC_PREFIX + uamResponse.getSubClusterId());
           }
         } catch (Throwable e) {
           failedToUnRegister = true;
@@ -689,6 +778,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
     return this.registryClient;
   }
 
+  @VisibleForTesting
+  protected int getLastHomeResponseId() {
+    return this.lastHomeResponseId;
+  }
+
   /**
    * Create the UAM pool manager for secondary sub-clsuters. For unit test to
    * override.
@@ -800,6 +894,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
                     getApplicationContext().getUser(), homeSubClusterId.getId(),
                     amrmToken);
 
+                secondaryRelayers.put(subClusterId.getId(),
+                    uamPool.getAMRMClientRelayer(subClusterId.getId()));
+
                 response = uamPool.registerApplicationMaster(
                     subClusterId.getId(), amRegistrationRequest);
 
@@ -1098,7 +1195,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
                   token = uamPool.launchUAM(subClusterId, config,
                       appContext.getApplicationAttemptId().getApplicationId(),
                       amRegistrationResponse.getQueue(), appContext.getUser(),
-                      homeSubClusterId.toString(), registryClient != null);
+                      homeSubClusterId.toString(), true);
+
+                  secondaryRelayers.put(subClusterId,
+                      uamPool.getAMRMClientRelayer(subClusterId));
 
                   uamResponse = uamPool.registerApplicationMaster(subClusterId,
                       registerRequest);

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -536,6 +537,7 @@ public abstract class BaseAMRMProxyTest {
     capability.setMemorySize(memory);
     capability.setVirtualCores(vCores);
     req.setCapability(capability);
+    req.setExecutionTypeRequest(ExecutionTypeRequest.newInstance());
     if (labelExpression != null) {
       req.setNodeLabelExpression(labelExpression);
     }

+ 17 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java

@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.UpdateContainerError;
 import org.apache.hadoop.yarn.api.records.UpdatedContainer;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
 import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
@@ -516,6 +517,22 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
         interceptor.recover(recoveredDataMap);
 
         Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
+        Assert.assertEquals(Integer.MAX_VALUE,
+            interceptor.getLastHomeResponseId());
+
+        // The first allocate call expects a fail-over exception and re-register
+        int responseId = 10;
+        AllocateRequest allocateRequest =
+            Records.newRecord(AllocateRequest.class);
+        allocateRequest.setResponseId(responseId);
+        try {
+          interceptor.allocate(allocateRequest);
+          Assert.fail("Expecting an ApplicationMasterNotRegisteredException  "
+              + " after FederationInterceptor restarts and recovers");
+        } catch (ApplicationMasterNotRegisteredException e) {
+        }
+        interceptor.registerApplicationMaster(registerReq);
+        Assert.assertEquals(responseId, interceptor.getLastHomeResponseId());
 
         // Release all containers
         releaseContainersAndAssert(containers);