|
@@ -149,16 +149,13 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
|
ContainerId containerId;
|
|
ContainerId containerId;
|
|
|
|
|
|
while (!this.appFinishing.get()) {
|
|
while (!this.appFinishing.get()) {
|
|
- try {
|
|
|
|
- containerId = this.pendingContainers.poll();
|
|
|
|
- if (containerId == null) {
|
|
|
|
- Thread.sleep(THREAD_SLEEP_TIME);
|
|
|
|
- } else {
|
|
|
|
- uploadLogsForContainer(containerId);
|
|
|
|
|
|
+ synchronized(this) {
|
|
|
|
+ try {
|
|
|
|
+ wait(THREAD_SLEEP_TIME);
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ LOG.warn("PendingContainers queue is interrupted");
|
|
|
|
+ this.appFinishing.set(true);
|
|
}
|
|
}
|
|
- } catch (InterruptedException e) {
|
|
|
|
- LOG.warn("PendingContainers queue is interrupted");
|
|
|
|
- this.appFinishing.set(true);
|
|
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -251,8 +248,9 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
- public void finishLogAggregation() {
|
|
|
|
|
|
+ public synchronized void finishLogAggregation() {
|
|
LOG.info("Application just finished : " + this.applicationId);
|
|
LOG.info("Application just finished : " + this.applicationId);
|
|
this.appFinishing.set(true);
|
|
this.appFinishing.set(true);
|
|
|
|
+ this.notifyAll();
|
|
}
|
|
}
|
|
}
|
|
}
|