|
@@ -50,7 +50,6 @@ import com.google.common.annotations.VisibleForTesting;
|
|
|
public class RMProxy<T> {
|
|
|
|
|
|
private static final Log LOG = LogFactory.getLog(RMProxy.class);
|
|
|
- protected static RMProxy INSTANCE;
|
|
|
|
|
|
protected RMProxy() {}
|
|
|
|
|
@@ -79,17 +78,17 @@ public class RMProxy<T> {
|
|
|
*/
|
|
|
@Private
|
|
|
protected static <T> T createRMProxy(final Configuration configuration,
|
|
|
- final Class<T> protocol) throws IOException {
|
|
|
+ final Class<T> protocol, RMProxy instance) throws IOException {
|
|
|
YarnConfiguration conf = (configuration instanceof YarnConfiguration)
|
|
|
? (YarnConfiguration) configuration
|
|
|
: new YarnConfiguration(configuration);
|
|
|
RetryPolicy retryPolicy = createRetryPolicy(conf);
|
|
|
if (HAUtil.isHAEnabled(conf)) {
|
|
|
RMFailoverProxyProvider<T> provider =
|
|
|
- INSTANCE.createRMFailoverProxyProvider(conf, protocol);
|
|
|
+ instance.createRMFailoverProxyProvider(conf, protocol);
|
|
|
return (T) RetryProxy.create(protocol, provider, retryPolicy);
|
|
|
} else {
|
|
|
- InetSocketAddress rmAddress = INSTANCE.getRMAddress(conf, protocol);
|
|
|
+ InetSocketAddress rmAddress = instance.getRMAddress(conf, protocol);
|
|
|
LOG.info("Connecting to ResourceManager at " + rmAddress);
|
|
|
T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress);
|
|
|
return (T) RetryProxy.create(protocol, proxy, retryPolicy);
|
|
@@ -159,25 +158,6 @@ public class RMProxy<T> {
|
|
|
return provider;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * A RetryPolicy to allow failing over upto the specified maximum time.
|
|
|
- */
|
|
|
- private static class FailoverUptoMaximumTimePolicy implements RetryPolicy {
|
|
|
- private long maxTime;
|
|
|
-
|
|
|
- FailoverUptoMaximumTimePolicy(long maxTime) {
|
|
|
- this.maxTime = maxTime;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public RetryAction shouldRetry(Exception e, int retries, int failovers,
|
|
|
- boolean isIdempotentOrAtMostOnce) throws Exception {
|
|
|
- return System.currentTimeMillis() < maxTime
|
|
|
- ? RetryAction.FAILOVER_AND_RETRY
|
|
|
- : RetryAction.FAIL;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Fetch retry policy from Configuration
|
|
|
*/
|