|
@@ -22,6 +22,9 @@ import java.io.File;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.io.InputStream;
|
|
import java.io.InputStream;
|
|
import java.io.PrintStream;
|
|
import java.io.PrintStream;
|
|
|
|
+import java.net.ConnectException;
|
|
|
|
+import java.net.SocketException;
|
|
|
|
+import java.net.SocketTimeoutException;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
import java.util.Arrays;
|
|
import java.util.Arrays;
|
|
import java.util.Collection;
|
|
import java.util.Collection;
|
|
@@ -74,9 +77,11 @@ import org.codehaus.jettison.json.JSONObject;
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
import com.sun.jersey.api.client.Client;
|
|
import com.sun.jersey.api.client.Client;
|
|
import com.sun.jersey.api.client.ClientHandlerException;
|
|
import com.sun.jersey.api.client.ClientHandlerException;
|
|
|
|
+import com.sun.jersey.api.client.ClientRequest;
|
|
import com.sun.jersey.api.client.ClientResponse;
|
|
import com.sun.jersey.api.client.ClientResponse;
|
|
import com.sun.jersey.api.client.UniformInterfaceException;
|
|
import com.sun.jersey.api.client.UniformInterfaceException;
|
|
import com.sun.jersey.api.client.WebResource;
|
|
import com.sun.jersey.api.client.WebResource;
|
|
|
|
+import com.sun.jersey.api.client.filter.ClientFilter;
|
|
|
|
|
|
@Public
|
|
@Public
|
|
@Evolving
|
|
@Evolving
|
|
@@ -97,14 +102,27 @@ public class LogsCLI extends Configured implements Tool {
|
|
= "show_container_log_info";
|
|
= "show_container_log_info";
|
|
private static final String OUT_OPTION = "out";
|
|
private static final String OUT_OPTION = "out";
|
|
private static final String SIZE_OPTION = "size";
|
|
private static final String SIZE_OPTION = "size";
|
|
|
|
+ private static final String CLIENT_MAX_RETRY_OPTION = "client_max_retries";
|
|
|
|
+ private static final String CLIENT_RETRY_INTERVAL_OPTION
|
|
|
|
+ = "client_retry_interval_ms";
|
|
public static final String HELP_CMD = "help";
|
|
public static final String HELP_CMD = "help";
|
|
|
|
+
|
|
private PrintStream outStream = System.out;
|
|
private PrintStream outStream = System.out;
|
|
private YarnClient yarnClient = null;
|
|
private YarnClient yarnClient = null;
|
|
|
|
+ private Client webServiceClient = null;
|
|
|
|
+
|
|
|
|
+ private static final int DEFAULT_MAX_RETRIES = 30;
|
|
|
|
+ private static final long DEFAULT_RETRY_INTERVAL = 1000;
|
|
|
|
+
|
|
|
|
+ @Private
|
|
|
|
+ @VisibleForTesting
|
|
|
|
+ ClientConnectionRetry connectionRetry;
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public int run(String[] args) throws Exception {
|
|
public int run(String[] args) throws Exception {
|
|
try {
|
|
try {
|
|
yarnClient = createYarnClient();
|
|
yarnClient = createYarnClient();
|
|
|
|
+ webServiceClient = Client.create();
|
|
return runCommand(args);
|
|
return runCommand(args);
|
|
} finally {
|
|
} finally {
|
|
if (yarnClient != null) {
|
|
if (yarnClient != null) {
|
|
@@ -139,6 +157,8 @@ public class LogsCLI extends Configured implements Tool {
|
|
List<String> amContainersList = new ArrayList<String>();
|
|
List<String> amContainersList = new ArrayList<String>();
|
|
String localDir = null;
|
|
String localDir = null;
|
|
long bytes = Long.MAX_VALUE;
|
|
long bytes = Long.MAX_VALUE;
|
|
|
|
+ int maxRetries = DEFAULT_MAX_RETRIES;
|
|
|
|
+ long retryInterval = DEFAULT_RETRY_INTERVAL;
|
|
try {
|
|
try {
|
|
CommandLine commandLine = parser.parse(opts, args, false);
|
|
CommandLine commandLine = parser.parse(opts, args, false);
|
|
appIdStr = commandLine.getOptionValue(APPLICATION_ID_OPTION);
|
|
appIdStr = commandLine.getOptionValue(APPLICATION_ID_OPTION);
|
|
@@ -170,6 +190,14 @@ public class LogsCLI extends Configured implements Tool {
|
|
if (commandLine.hasOption(SIZE_OPTION)) {
|
|
if (commandLine.hasOption(SIZE_OPTION)) {
|
|
bytes = Long.parseLong(commandLine.getOptionValue(SIZE_OPTION));
|
|
bytes = Long.parseLong(commandLine.getOptionValue(SIZE_OPTION));
|
|
}
|
|
}
|
|
|
|
+ if (commandLine.hasOption(CLIENT_MAX_RETRY_OPTION)) {
|
|
|
|
+ maxRetries = Integer.parseInt(commandLine.getOptionValue(
|
|
|
|
+ CLIENT_MAX_RETRY_OPTION));
|
|
|
|
+ }
|
|
|
|
+ if (commandLine.hasOption(CLIENT_RETRY_INTERVAL_OPTION)) {
|
|
|
|
+ retryInterval = Long.parseLong(commandLine.getOptionValue(
|
|
|
|
+ CLIENT_RETRY_INTERVAL_OPTION));
|
|
|
|
+ }
|
|
} catch (ParseException e) {
|
|
} catch (ParseException e) {
|
|
System.err.println("options parsing failed: " + e.getMessage());
|
|
System.err.println("options parsing failed: " + e.getMessage());
|
|
printHelpMessage(printOpts);
|
|
printHelpMessage(printOpts);
|
|
@@ -231,6 +259,11 @@ public class LogsCLI extends Configured implements Tool {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ // Set up Retry WebService Client
|
|
|
|
+ connectionRetry = new ClientConnectionRetry(maxRetries, retryInterval);
|
|
|
|
+ ClientJerseyRetryFilter retryFilter = new ClientJerseyRetryFilter();
|
|
|
|
+ webServiceClient.addFilter(retryFilter);
|
|
|
|
+
|
|
LogCLIHelpers logCliHelper = new LogCLIHelpers();
|
|
LogCLIHelpers logCliHelper = new LogCLIHelpers();
|
|
logCliHelper.setConf(getConf());
|
|
logCliHelper.setConf(getConf());
|
|
|
|
|
|
@@ -341,7 +374,6 @@ public class LogsCLI extends Configured implements Tool {
|
|
protected List<JSONObject> getAMContainerInfoForRMWebService(
|
|
protected List<JSONObject> getAMContainerInfoForRMWebService(
|
|
Configuration conf, String appId) throws ClientHandlerException,
|
|
Configuration conf, String appId) throws ClientHandlerException,
|
|
UniformInterfaceException, JSONException {
|
|
UniformInterfaceException, JSONException {
|
|
- Client webServiceClient = Client.create();
|
|
|
|
String webAppAddress = WebAppUtils.getRMWebAppURLWithScheme(conf);
|
|
String webAppAddress = WebAppUtils.getRMWebAppURLWithScheme(conf);
|
|
|
|
|
|
WebResource webResource = webServiceClient.resource(webAppAddress);
|
|
WebResource webResource = webServiceClient.resource(webAppAddress);
|
|
@@ -363,7 +395,6 @@ public class LogsCLI extends Configured implements Tool {
|
|
private List<JSONObject> getAMContainerInfoForAHSWebService(
|
|
private List<JSONObject> getAMContainerInfoForAHSWebService(
|
|
Configuration conf, String appId) throws ClientHandlerException,
|
|
Configuration conf, String appId) throws ClientHandlerException,
|
|
UniformInterfaceException, JSONException {
|
|
UniformInterfaceException, JSONException {
|
|
- Client webServiceClient = Client.create();
|
|
|
|
String webAppAddress =
|
|
String webAppAddress =
|
|
WebAppUtils.getHttpSchemePrefix(conf)
|
|
WebAppUtils.getHttpSchemePrefix(conf)
|
|
+ WebAppUtils.getAHSWebAppURLWithoutScheme(conf);
|
|
+ WebAppUtils.getAHSWebAppURLWithoutScheme(conf);
|
|
@@ -416,7 +447,6 @@ public class LogsCLI extends Configured implements Tool {
|
|
throws IOException {
|
|
throws IOException {
|
|
List<Pair<PerContainerLogFileInfo, String>> logFileInfos
|
|
List<Pair<PerContainerLogFileInfo, String>> logFileInfos
|
|
= new ArrayList<>();
|
|
= new ArrayList<>();
|
|
- Client webServiceClient = Client.create();
|
|
|
|
try {
|
|
try {
|
|
WebResource webResource = webServiceClient
|
|
WebResource webResource = webServiceClient
|
|
.resource(WebAppUtils.getHttpSchemePrefix(conf) + nodeHttpAddress);
|
|
.resource(WebAppUtils.getHttpSchemePrefix(conf) + nodeHttpAddress);
|
|
@@ -489,7 +519,6 @@ public class LogsCLI extends Configured implements Tool {
|
|
lastModificationTime);
|
|
lastModificationTime);
|
|
}
|
|
}
|
|
|
|
|
|
-
|
|
|
|
@Private
|
|
@Private
|
|
@VisibleForTesting
|
|
@VisibleForTesting
|
|
public int printContainerLogsFromRunningApplication(Configuration conf,
|
|
public int printContainerLogsFromRunningApplication(Configuration conf,
|
|
@@ -520,7 +549,6 @@ public class LogsCLI extends Configured implements Tool {
|
|
ContainerLogsRequest newOptions = new ContainerLogsRequest(request);
|
|
ContainerLogsRequest newOptions = new ContainerLogsRequest(request);
|
|
newOptions.setLogTypes(matchedFiles);
|
|
newOptions.setLogTypes(matchedFiles);
|
|
|
|
|
|
- Client webServiceClient = Client.create();
|
|
|
|
boolean foundAnyLogs = false;
|
|
boolean foundAnyLogs = false;
|
|
byte[] buffer = new byte[65536];
|
|
byte[] buffer = new byte[65536];
|
|
for (String logFile : newOptions.getLogTypes()) {
|
|
for (String logFile : newOptions.getLogTypes()) {
|
|
@@ -796,6 +824,10 @@ public class LogsCLI extends Configured implements Tool {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Create Command Options.
|
|
|
|
+ * @return the command options
|
|
|
|
+ */
|
|
private Options createCommandOpts() {
|
|
private Options createCommandOpts() {
|
|
Options opts = new Options();
|
|
Options opts = new Options();
|
|
opts.addOption(HELP_CMD, false, "Displays help for all commands.");
|
|
opts.addOption(HELP_CMD, false, "Displays help for all commands.");
|
|
@@ -858,6 +890,13 @@ public class LogsCLI extends Configured implements Tool {
|
|
opts.addOption(SIZE_OPTION, true, "Prints the log file's first 'n' bytes "
|
|
opts.addOption(SIZE_OPTION, true, "Prints the log file's first 'n' bytes "
|
|
+ "or the last 'n' bytes. Use negative values as bytes to read from "
|
|
+ "or the last 'n' bytes. Use negative values as bytes to read from "
|
|
+ "the end and positive values as bytes to read from the beginning.");
|
|
+ "the end and positive values as bytes to read from the beginning.");
|
|
|
|
+ opts.addOption(CLIENT_MAX_RETRY_OPTION, true, "Set max retry number for a"
|
|
|
|
+ + " retry client to get the container logs for the running "
|
|
|
|
+ + "applications. Use a negative value to make retry forever. "
|
|
|
|
+ + "The default value is 30.");
|
|
|
|
+ opts.addOption(CLIENT_RETRY_INTERVAL_OPTION, true,
|
|
|
|
+ "Work with --client_max_retries to create a retry client. "
|
|
|
|
+ + "The default value is 1000.");
|
|
opts.getOption(APPLICATION_ID_OPTION).setArgName("Application ID");
|
|
opts.getOption(APPLICATION_ID_OPTION).setArgName("Application ID");
|
|
opts.getOption(CONTAINER_ID_OPTION).setArgName("Container ID");
|
|
opts.getOption(CONTAINER_ID_OPTION).setArgName("Container ID");
|
|
opts.getOption(NODE_ADDRESS_OPTION).setArgName("Node Address");
|
|
opts.getOption(NODE_ADDRESS_OPTION).setArgName("Node Address");
|
|
@@ -865,9 +904,17 @@ public class LogsCLI extends Configured implements Tool {
|
|
opts.getOption(AM_CONTAINER_OPTION).setArgName("AM Containers");
|
|
opts.getOption(AM_CONTAINER_OPTION).setArgName("AM Containers");
|
|
opts.getOption(OUT_OPTION).setArgName("Local Directory");
|
|
opts.getOption(OUT_OPTION).setArgName("Local Directory");
|
|
opts.getOption(SIZE_OPTION).setArgName("size");
|
|
opts.getOption(SIZE_OPTION).setArgName("size");
|
|
|
|
+ opts.getOption(CLIENT_MAX_RETRY_OPTION).setArgName("Max Retries");
|
|
|
|
+ opts.getOption(CLIENT_RETRY_INTERVAL_OPTION)
|
|
|
|
+ .setArgName("Retry Interval");
|
|
return opts;
|
|
return opts;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Create Print options for helper message.
|
|
|
|
+ * @param commandOpts the options
|
|
|
|
+ * @return the print options
|
|
|
|
+ */
|
|
private Options createPrintOpts(Options commandOpts) {
|
|
private Options createPrintOpts(Options commandOpts) {
|
|
Options printOpts = new Options();
|
|
Options printOpts = new Options();
|
|
printOpts.addOption(commandOpts.getOption(HELP_CMD));
|
|
printOpts.addOption(commandOpts.getOption(HELP_CMD));
|
|
@@ -883,6 +930,8 @@ public class LogsCLI extends Configured implements Tool {
|
|
printOpts.addOption(commandOpts.getOption(SIZE_OPTION));
|
|
printOpts.addOption(commandOpts.getOption(SIZE_OPTION));
|
|
printOpts.addOption(commandOpts.getOption(
|
|
printOpts.addOption(commandOpts.getOption(
|
|
PER_CONTAINER_LOG_FILES_REGEX_OPTION));
|
|
PER_CONTAINER_LOG_FILES_REGEX_OPTION));
|
|
|
|
+ printOpts.addOption(commandOpts.getOption(CLIENT_MAX_RETRY_OPTION));
|
|
|
|
+ printOpts.addOption(commandOpts.getOption(CLIENT_RETRY_INTERVAL_OPTION));
|
|
return printOpts;
|
|
return printOpts;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1285,4 +1334,120 @@ public class LogsCLI extends Configured implements Tool {
|
|
return nodeInfo.has("nodeHTTPAddress") ?
|
|
return nodeInfo.has("nodeHTTPAddress") ?
|
|
nodeInfo.getString("nodeHTTPAddress") : null;
|
|
nodeInfo.getString("nodeHTTPAddress") : null;
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ // Class to handle retry
|
|
|
|
+ static class ClientConnectionRetry {
|
|
|
|
+
|
|
|
|
+ // maxRetries < 0 means keep trying
|
|
|
|
+ @Private
|
|
|
|
+ @VisibleForTesting
|
|
|
|
+ public int maxRetries;
|
|
|
|
+
|
|
|
|
+ @Private
|
|
|
|
+ @VisibleForTesting
|
|
|
|
+ public long retryInterval;
|
|
|
|
+
|
|
|
|
+ // Indicates if retries happened last time. Only tests should read it.
|
|
|
|
+ // In unit tests, retryOn() calls should _not_ be concurrent.
|
|
|
|
+ private boolean retried = false;
|
|
|
|
+
|
|
|
|
+ @Private
|
|
|
|
+ @VisibleForTesting
|
|
|
|
+ boolean getRetired() {
|
|
|
|
+ return retried;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Constructor with default retry settings
|
|
|
|
+ public ClientConnectionRetry(int inputMaxRetries,
|
|
|
|
+ long inputRetryInterval) {
|
|
|
|
+ this.maxRetries = inputMaxRetries;
|
|
|
|
+ this.retryInterval = inputRetryInterval;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public Object retryOn(ClientRetryOp op)
|
|
|
|
+ throws RuntimeException, IOException {
|
|
|
|
+ int leftRetries = maxRetries;
|
|
|
|
+ retried = false;
|
|
|
|
+
|
|
|
|
+ // keep trying
|
|
|
|
+ while (true) {
|
|
|
|
+ try {
|
|
|
|
+ // try perform the op, if fail, keep retrying
|
|
|
|
+ return op.run();
|
|
|
|
+ } catch (IOException | RuntimeException e) {
|
|
|
|
+ // break if there's no retries left
|
|
|
|
+ if (leftRetries == 0) {
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
+ if (op.shouldRetryOn(e)) {
|
|
|
|
+ logException(e, leftRetries);
|
|
|
|
+ } else {
|
|
|
|
+ throw e;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ if (leftRetries > 0) {
|
|
|
|
+ leftRetries--;
|
|
|
|
+ }
|
|
|
|
+ retried = true;
|
|
|
|
+ try {
|
|
|
|
+ // sleep for the given time interval
|
|
|
|
+ Thread.sleep(retryInterval);
|
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
|
+ System.out.println("Client retry sleep interrupted! ");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ throw new RuntimeException("Connection retries limit exceeded.");
|
|
|
|
+ };
|
|
|
|
+
|
|
|
|
+ private void logException(Exception e, int leftRetries) {
|
|
|
|
+ if (leftRetries > 0) {
|
|
|
|
+ System.out.println("Exception caught by ClientConnectionRetry,"
|
|
|
|
+ + " will try " + leftRetries + " more time(s).\nMessage: "
|
|
|
|
+ + e.getMessage());
|
|
|
|
+ } else {
|
|
|
|
+ // note that maxRetries may be -1 at the very beginning
|
|
|
|
+ System.out.println("ConnectionException caught by ClientConnectionRetry,"
|
|
|
|
+ + " will keep retrying.\nMessage: "
|
|
|
|
+ + e.getMessage());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private class ClientJerseyRetryFilter extends ClientFilter {
|
|
|
|
+ @Override
|
|
|
|
+ public ClientResponse handle(final ClientRequest cr)
|
|
|
|
+ throws ClientHandlerException {
|
|
|
|
+ // Set up the retry operation
|
|
|
|
+ ClientRetryOp jerseyRetryOp = new ClientRetryOp() {
|
|
|
|
+ @Override
|
|
|
|
+ public Object run() {
|
|
|
|
+ // Try pass the request, if fail, keep retrying
|
|
|
|
+ return getNext().handle(cr);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public boolean shouldRetryOn(Exception e) {
|
|
|
|
+ // Only retry on connection exceptions
|
|
|
|
+ return (e instanceof ClientHandlerException)
|
|
|
|
+ && (e.getCause() instanceof ConnectException ||
|
|
|
|
+ e.getCause() instanceof SocketTimeoutException ||
|
|
|
|
+ e.getCause() instanceof SocketException);
|
|
|
|
+ }
|
|
|
|
+ };
|
|
|
|
+ try {
|
|
|
|
+ return (ClientResponse) connectionRetry.retryOn(jerseyRetryOp);
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ throw new ClientHandlerException("Jersey retry failed!\nMessage: "
|
|
|
|
+ + e.getMessage());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Abstract class for an operation that should be retried by client
|
|
|
|
+ private static abstract class ClientRetryOp {
|
|
|
|
+ // The operation that should be retried
|
|
|
|
+ public abstract Object run() throws IOException;
|
|
|
|
+ // The method to indicate if we should retry given the incoming exception
|
|
|
|
+ public abstract boolean shouldRetryOn(Exception e);
|
|
|
|
+ }
|
|
}
|
|
}
|