|
@@ -22,8 +22,6 @@ import com.google.common.annotations.VisibleForTesting;
|
|
import com.google.common.base.Preconditions;
|
|
import com.google.common.base.Preconditions;
|
|
import org.apache.commons.collections.map.LRUMap;
|
|
import org.apache.commons.collections.map.LRUMap;
|
|
import org.apache.commons.io.FileUtils;
|
|
import org.apache.commons.io.FileUtils;
|
|
-import org.apache.commons.logging.Log;
|
|
|
|
-import org.apache.commons.logging.LogFactory;
|
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
@@ -48,6 +46,7 @@ import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyParser;
|
|
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
|
|
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
|
|
import org.fusesource.leveldbjni.JniDBFactory;
|
|
import org.fusesource.leveldbjni.JniDBFactory;
|
|
import org.iq80.leveldb.*;
|
|
import org.iq80.leveldb.*;
|
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import java.io.File;
|
|
import java.io.File;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
@@ -118,8 +117,8 @@ import static org.fusesource.leveldbjni.JniDBFactory.bytes;
|
|
@InterfaceStability.Unstable
|
|
@InterfaceStability.Unstable
|
|
public class LeveldbTimelineStore extends AbstractService
|
|
public class LeveldbTimelineStore extends AbstractService
|
|
implements TimelineStore {
|
|
implements TimelineStore {
|
|
- private static final Log LOG = LogFactory
|
|
|
|
- .getLog(LeveldbTimelineStore.class);
|
|
|
|
|
|
+ private static final org.slf4j.Logger LOG = LoggerFactory
|
|
|
|
+ .getLogger(LeveldbTimelineStore.class);
|
|
|
|
|
|
@Private
|
|
@Private
|
|
@VisibleForTesting
|
|
@VisibleForTesting
|
|
@@ -240,7 +239,7 @@ public class LeveldbTimelineStore extends AbstractService
|
|
localFS.setPermission(dbPath, LEVELDB_DIR_UMASK);
|
|
localFS.setPermission(dbPath, LEVELDB_DIR_UMASK);
|
|
}
|
|
}
|
|
} finally {
|
|
} finally {
|
|
- IOUtils.cleanup(LOG, localFS);
|
|
|
|
|
|
+ IOUtils.cleanupWithLogger(LOG, localFS);
|
|
}
|
|
}
|
|
LOG.info("Using leveldb path " + dbPath);
|
|
LOG.info("Using leveldb path " + dbPath);
|
|
try {
|
|
try {
|
|
@@ -284,7 +283,7 @@ public class LeveldbTimelineStore extends AbstractService
|
|
" closing db now", e);
|
|
" closing db now", e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- IOUtils.cleanup(LOG, db);
|
|
|
|
|
|
+ IOUtils.cleanupWithLogger(LOG, db);
|
|
super.serviceStop();
|
|
super.serviceStop();
|
|
}
|
|
}
|
|
|
|
|
|
@@ -320,7 +319,7 @@ public class LeveldbTimelineStore extends AbstractService
|
|
discardOldEntities(timestamp);
|
|
discardOldEntities(timestamp);
|
|
Thread.sleep(ttlInterval);
|
|
Thread.sleep(ttlInterval);
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
- LOG.error(e);
|
|
|
|
|
|
+ LOG.error(e.toString());
|
|
} catch (InterruptedException e) {
|
|
} catch (InterruptedException e) {
|
|
LOG.info("Deletion thread received interrupt, exiting");
|
|
LOG.info("Deletion thread received interrupt, exiting");
|
|
break;
|
|
break;
|
|
@@ -394,7 +393,7 @@ public class LeveldbTimelineStore extends AbstractService
|
|
} catch(DBException e) {
|
|
} catch(DBException e) {
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
} finally {
|
|
} finally {
|
|
- IOUtils.cleanup(LOG, iterator);
|
|
|
|
|
|
+ IOUtils.cleanupWithLogger(LOG, iterator);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -570,7 +569,7 @@ public class LeveldbTimelineStore extends AbstractService
|
|
} catch(DBException e) {
|
|
} catch(DBException e) {
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
} finally {
|
|
} finally {
|
|
- IOUtils.cleanup(LOG, iterator);
|
|
|
|
|
|
+ IOUtils.cleanupWithLogger(LOG, iterator);
|
|
}
|
|
}
|
|
return events;
|
|
return events;
|
|
}
|
|
}
|
|
@@ -753,7 +752,7 @@ public class LeveldbTimelineStore extends AbstractService
|
|
} catch(DBException e) {
|
|
} catch(DBException e) {
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
} finally {
|
|
} finally {
|
|
- IOUtils.cleanup(LOG, iterator);
|
|
|
|
|
|
+ IOUtils.cleanupWithLogger(LOG, iterator);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -925,7 +924,7 @@ public class LeveldbTimelineStore extends AbstractService
|
|
} finally {
|
|
} finally {
|
|
lock.unlock();
|
|
lock.unlock();
|
|
writeLocks.returnLock(lock);
|
|
writeLocks.returnLock(lock);
|
|
- IOUtils.cleanup(LOG, writeBatch);
|
|
|
|
|
|
+ IOUtils.cleanupWithLogger(LOG, writeBatch);
|
|
}
|
|
}
|
|
|
|
|
|
for (EntityIdentifier relatedEntity : relatedEntitiesWithoutStartTimes) {
|
|
for (EntityIdentifier relatedEntity : relatedEntitiesWithoutStartTimes) {
|
|
@@ -1376,7 +1375,7 @@ public class LeveldbTimelineStore extends AbstractService
|
|
} catch(DBException e) {
|
|
} catch(DBException e) {
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
} finally {
|
|
} finally {
|
|
- IOUtils.cleanup(LOG, iterator);
|
|
|
|
|
|
+ IOUtils.cleanupWithLogger(LOG, iterator);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1506,7 +1505,7 @@ public class LeveldbTimelineStore extends AbstractService
|
|
} catch(DBException e) {
|
|
} catch(DBException e) {
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
} finally {
|
|
} finally {
|
|
- IOUtils.cleanup(LOG, writeBatch);
|
|
|
|
|
|
+ IOUtils.cleanupWithLogger(LOG, writeBatch);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1548,7 +1547,7 @@ public class LeveldbTimelineStore extends AbstractService
|
|
LOG.error("Got IOException while deleting entities for type " +
|
|
LOG.error("Got IOException while deleting entities for type " +
|
|
entityType + ", continuing to next type", e);
|
|
entityType + ", continuing to next type", e);
|
|
} finally {
|
|
} finally {
|
|
- IOUtils.cleanup(LOG, iterator, pfIterator);
|
|
|
|
|
|
+ IOUtils.cleanupWithLogger(LOG, iterator, pfIterator);
|
|
deleteLock.writeLock().unlock();
|
|
deleteLock.writeLock().unlock();
|
|
if (typeCount > 0) {
|
|
if (typeCount > 0) {
|
|
LOG.info("Deleted " + typeCount + " entities of type " +
|
|
LOG.info("Deleted " + typeCount + " entities of type " +
|
|
@@ -1629,7 +1628,7 @@ public class LeveldbTimelineStore extends AbstractService
|
|
String incompatibleMessage =
|
|
String incompatibleMessage =
|
|
"Incompatible version for timeline store: expecting version "
|
|
"Incompatible version for timeline store: expecting version "
|
|
+ getCurrentVersion() + ", but loading version " + loadedVersion;
|
|
+ getCurrentVersion() + ", but loading version " + loadedVersion;
|
|
- LOG.fatal(incompatibleMessage);
|
|
|
|
|
|
+ LOG.error(incompatibleMessage);
|
|
throw new IOException(incompatibleMessage);
|
|
throw new IOException(incompatibleMessage);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -1718,7 +1717,7 @@ public class LeveldbTimelineStore extends AbstractService
|
|
} catch(DBException e) {
|
|
} catch(DBException e) {
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
} finally {
|
|
} finally {
|
|
- IOUtils.cleanup(LOG, writeBatch);
|
|
|
|
|
|
+ IOUtils.cleanupWithLogger(LOG, writeBatch);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1755,7 +1754,7 @@ public class LeveldbTimelineStore extends AbstractService
|
|
} catch(DBException e) {
|
|
} catch(DBException e) {
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
} finally {
|
|
} finally {
|
|
- IOUtils.cleanup(LOG, iterator);
|
|
|
|
|
|
+ IOUtils.cleanupWithLogger(LOG, iterator);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1805,7 +1804,7 @@ public class LeveldbTimelineStore extends AbstractService
|
|
} catch(DBException e) {
|
|
} catch(DBException e) {
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
} finally {
|
|
} finally {
|
|
- IOUtils.cleanup(LOG, iterator);
|
|
|
|
|
|
+ IOUtils.cleanupWithLogger(LOG, iterator);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|