|
@@ -132,6 +132,9 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
|
|
private static final FastDateFormat DATE_FORMAT =
|
|
private static final FastDateFormat DATE_FORMAT =
|
|
FastDateFormat.getInstance("yyyyMMddHH", TimeZone.getTimeZone("GMT"));
|
|
FastDateFormat.getInstance("yyyyMMddHH", TimeZone.getTimeZone("GMT"));
|
|
private final Object lock = new Object();
|
|
private final Object lock = new Object();
|
|
|
|
+ private boolean initialized = false;
|
|
|
|
+ private SubsetConfiguration properties;
|
|
|
|
+ private Configuration conf;
|
|
private String source;
|
|
private String source;
|
|
private boolean ignoreError;
|
|
private boolean ignoreError;
|
|
private boolean allowAppend;
|
|
private boolean allowAppend;
|
|
@@ -163,63 +166,102 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
|
|
protected static FileSystem suppliedFilesystem = null;
|
|
protected static FileSystem suppliedFilesystem = null;
|
|
|
|
|
|
@Override
|
|
@Override
|
|
- public void init(SubsetConfiguration conf) {
|
|
|
|
- basePath = new Path(conf.getString(BASEPATH_KEY, BASEPATH_DEFAULT));
|
|
|
|
- source = conf.getString(SOURCE_KEY, SOURCE_DEFAULT);
|
|
|
|
- ignoreError = conf.getBoolean(IGNORE_ERROR_KEY, false);
|
|
|
|
- allowAppend = conf.getBoolean(ALLOW_APPEND_KEY, false);
|
|
|
|
|
|
+ public void init(SubsetConfiguration metrics2Properties) {
|
|
|
|
+ properties = metrics2Properties;
|
|
|
|
+ basePath = new Path(properties.getString(BASEPATH_KEY, BASEPATH_DEFAULT));
|
|
|
|
+ source = properties.getString(SOURCE_KEY, SOURCE_DEFAULT);
|
|
|
|
+ ignoreError = properties.getBoolean(IGNORE_ERROR_KEY, false);
|
|
|
|
+ allowAppend = properties.getBoolean(ALLOW_APPEND_KEY, false);
|
|
|
|
|
|
- Configuration configuration = loadConf();
|
|
|
|
-
|
|
|
|
- UserGroupInformation.setConfiguration(configuration);
|
|
|
|
|
|
+ conf = loadConf();
|
|
|
|
+ UserGroupInformation.setConfiguration(conf);
|
|
|
|
|
|
// Don't do secure setup if it's not needed.
|
|
// Don't do secure setup if it's not needed.
|
|
if (UserGroupInformation.isSecurityEnabled()) {
|
|
if (UserGroupInformation.isSecurityEnabled()) {
|
|
// Validate config so that we don't get an NPE
|
|
// Validate config so that we don't get an NPE
|
|
- checkForProperty(conf, KEYTAB_PROPERTY_KEY);
|
|
|
|
- checkForProperty(conf, USERNAME_PROPERTY_KEY);
|
|
|
|
|
|
+ checkForProperty(properties, KEYTAB_PROPERTY_KEY);
|
|
|
|
+ checkForProperty(properties, USERNAME_PROPERTY_KEY);
|
|
|
|
|
|
|
|
|
|
try {
|
|
try {
|
|
// Login as whoever we're supposed to be and let the hostname be pulled
|
|
// Login as whoever we're supposed to be and let the hostname be pulled
|
|
// from localhost. If security isn't enabled, this does nothing.
|
|
// from localhost. If security isn't enabled, this does nothing.
|
|
- SecurityUtil.login(configuration, conf.getString(KEYTAB_PROPERTY_KEY),
|
|
|
|
- conf.getString(USERNAME_PROPERTY_KEY));
|
|
|
|
|
|
+ SecurityUtil.login(conf, properties.getString(KEYTAB_PROPERTY_KEY),
|
|
|
|
+ properties.getString(USERNAME_PROPERTY_KEY));
|
|
} catch (IOException ex) {
|
|
} catch (IOException ex) {
|
|
throw new MetricsException("Error logging in securely: ["
|
|
throw new MetricsException("Error logging in securely: ["
|
|
+ ex.toString() + "]", ex);
|
|
+ ex.toString() + "]", ex);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Initialize the connection to HDFS and create the base directory. Also
|
|
|
|
+ * launch the flush thread.
|
|
|
|
+ */
|
|
|
|
+ private boolean initFs() {
|
|
|
|
+ boolean success = false;
|
|
|
|
|
|
- fileSystem = getFileSystem(configuration);
|
|
|
|
|
|
+ fileSystem = getFileSystem();
|
|
|
|
|
|
// This step isn't strictly necessary, but it makes debugging issues much
|
|
// This step isn't strictly necessary, but it makes debugging issues much
|
|
// easier. We try to create the base directory eagerly and fail with
|
|
// easier. We try to create the base directory eagerly and fail with
|
|
// copious debug info if it fails.
|
|
// copious debug info if it fails.
|
|
try {
|
|
try {
|
|
fileSystem.mkdirs(basePath);
|
|
fileSystem.mkdirs(basePath);
|
|
|
|
+ success = true;
|
|
} catch (Exception ex) {
|
|
} catch (Exception ex) {
|
|
- throw new MetricsException("Failed to create " + basePath + "["
|
|
|
|
- + SOURCE_KEY + "=" + source + ", "
|
|
|
|
- + IGNORE_ERROR_KEY + "=" + ignoreError + ", "
|
|
|
|
- + ALLOW_APPEND_KEY + "=" + allowAppend + ", "
|
|
|
|
- + KEYTAB_PROPERTY_KEY + "="
|
|
|
|
- + conf.getString(KEYTAB_PROPERTY_KEY) + ", "
|
|
|
|
- + conf.getString(KEYTAB_PROPERTY_KEY) + "="
|
|
|
|
- + configuration.get(conf.getString(KEYTAB_PROPERTY_KEY)) + ", "
|
|
|
|
- + USERNAME_PROPERTY_KEY + "="
|
|
|
|
- + conf.getString(USERNAME_PROPERTY_KEY) + ", "
|
|
|
|
- + conf.getString(USERNAME_PROPERTY_KEY) + "="
|
|
|
|
- + configuration.get(conf.getString(USERNAME_PROPERTY_KEY))
|
|
|
|
- + "] -- " + ex.toString(), ex);
|
|
|
|
|
|
+ if (!ignoreError) {
|
|
|
|
+ throw new MetricsException("Failed to create " + basePath + "["
|
|
|
|
+ + SOURCE_KEY + "=" + source + ", "
|
|
|
|
+ + ALLOW_APPEND_KEY + "=" + allowAppend + ", "
|
|
|
|
+ + stringifySecurityProperty(KEYTAB_PROPERTY_KEY) + ", "
|
|
|
|
+ + stringifySecurityProperty(USERNAME_PROPERTY_KEY)
|
|
|
|
+ + "] -- " + ex.toString(), ex);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
- // If we're permitted to append, check if we actually can
|
|
|
|
- if (allowAppend) {
|
|
|
|
- allowAppend = checkAppend(fileSystem);
|
|
|
|
|
|
+ if (success) {
|
|
|
|
+ // If we're permitted to append, check if we actually can
|
|
|
|
+ if (allowAppend) {
|
|
|
|
+ allowAppend = checkAppend(fileSystem);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ flushTimer = new Timer("RollingFileSystemSink Flusher", true);
|
|
}
|
|
}
|
|
|
|
|
|
- flushTimer = new Timer("RollingFileSystemSink Flusher", true);
|
|
|
|
|
|
+ return success;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Turn a security property into a nicely formatted set of <i>name=value</i>
|
|
|
|
+ * strings, allowing for either the property or the configuration not to be
|
|
|
|
+ * set.
|
|
|
|
+ *
|
|
|
|
+ * @param properties the sink properties
|
|
|
|
+ * @param conf the conf
|
|
|
|
+ * @param property the property to stringify
|
|
|
|
+ * @return the stringified property
|
|
|
|
+ */
|
|
|
|
+ private String stringifySecurityProperty(String property) {
|
|
|
|
+ String securityProperty;
|
|
|
|
+
|
|
|
|
+ if (properties.containsKey(property)) {
|
|
|
|
+ String propertyValue = properties.getString(property);
|
|
|
|
+ String confValue = conf.get(properties.getString(property));
|
|
|
|
+
|
|
|
|
+ if (confValue != null) {
|
|
|
|
+ securityProperty = property + "=" + propertyValue
|
|
|
|
+ + ", " + properties.getString(property) + "=" + confValue;
|
|
|
|
+ } else {
|
|
|
|
+ securityProperty = property + "=" + propertyValue
|
|
|
|
+ + ", " + properties.getString(property) + "=<NOT SET>";
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+ securityProperty = property + "=<NOT SET>";
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return securityProperty;
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -242,17 +284,17 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
|
|
* @return the configuration to use
|
|
* @return the configuration to use
|
|
*/
|
|
*/
|
|
private Configuration loadConf() {
|
|
private Configuration loadConf() {
|
|
- Configuration conf;
|
|
|
|
|
|
+ Configuration c;
|
|
|
|
|
|
if (suppliedConf != null) {
|
|
if (suppliedConf != null) {
|
|
- conf = suppliedConf;
|
|
|
|
|
|
+ c = suppliedConf;
|
|
} else {
|
|
} else {
|
|
// The config we're handed in init() isn't the one we want here, so we
|
|
// The config we're handed in init() isn't the one we want here, so we
|
|
// create a new one to pick up the full settings.
|
|
// create a new one to pick up the full settings.
|
|
- conf = new Configuration();
|
|
|
|
|
|
+ c = new Configuration();
|
|
}
|
|
}
|
|
|
|
|
|
- return conf;
|
|
|
|
|
|
+ return c;
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -263,7 +305,7 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
|
|
* @return the file system to use
|
|
* @return the file system to use
|
|
* @throws MetricsException thrown if the file system could not be retrieved
|
|
* @throws MetricsException thrown if the file system could not be retrieved
|
|
*/
|
|
*/
|
|
- private FileSystem getFileSystem(Configuration conf) throws MetricsException {
|
|
|
|
|
|
+ private FileSystem getFileSystem() throws MetricsException {
|
|
FileSystem fs = null;
|
|
FileSystem fs = null;
|
|
|
|
|
|
if (suppliedFilesystem != null) {
|
|
if (suppliedFilesystem != null) {
|
|
@@ -317,22 +359,29 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
|
|
// because if currentDirPath is null, then currentOutStream is null, but
|
|
// because if currentDirPath is null, then currentOutStream is null, but
|
|
// currentOutStream can be null for other reasons.
|
|
// currentOutStream can be null for other reasons.
|
|
if ((currentOutStream == null) || !path.equals(currentDirPath)) {
|
|
if ((currentOutStream == null) || !path.equals(currentDirPath)) {
|
|
- // Close the stream. This step could have been handled already by the
|
|
|
|
- // flusher thread, but if it has, the PrintStream will just swallow the
|
|
|
|
- // exception, which is fine.
|
|
|
|
- if (currentOutStream != null) {
|
|
|
|
- currentOutStream.close();
|
|
|
|
|
|
+ // If we're not yet connected to HDFS, create the connection
|
|
|
|
+ if (!initialized) {
|
|
|
|
+ initialized = initFs();
|
|
}
|
|
}
|
|
|
|
|
|
- currentDirPath = path;
|
|
|
|
|
|
+ if (initialized) {
|
|
|
|
+ // Close the stream. This step could have been handled already by the
|
|
|
|
+ // flusher thread, but if it has, the PrintStream will just swallow the
|
|
|
|
+ // exception, which is fine.
|
|
|
|
+ if (currentOutStream != null) {
|
|
|
|
+ currentOutStream.close();
|
|
|
|
+ }
|
|
|
|
|
|
- try {
|
|
|
|
- rollLogDir();
|
|
|
|
- } catch (IOException ex) {
|
|
|
|
- throwMetricsException("Failed to create new log file", ex);
|
|
|
|
- }
|
|
|
|
|
|
+ currentDirPath = path;
|
|
|
|
|
|
- scheduleFlush(now);
|
|
|
|
|
|
+ try {
|
|
|
|
+ rollLogDir();
|
|
|
|
+ } catch (IOException ex) {
|
|
|
|
+ throwMetricsException("Failed to create new log file", ex);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ scheduleFlush(now);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|