|
@@ -117,6 +117,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
|
|
private final int retentionSize;
|
|
|
private final long rollingMonitorInterval;
|
|
|
private final NodeId nodeId;
|
|
|
+ // This variable is only for testing
|
|
|
+ private final AtomicBoolean waiting = new AtomicBoolean(false);
|
|
|
|
|
|
private final Map<ContainerId, ContainerLogAggregator> containerLogAggregators =
|
|
|
new HashMap<ContainerId, ContainerLogAggregator>();
|
|
@@ -391,6 +393,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
|
|
while (!this.appFinishing.get() && !this.aborted.get()) {
|
|
|
synchronized(this) {
|
|
|
try {
|
|
|
+ waiting.set(true);
|
|
|
if (this.rollingMonitorInterval > 0) {
|
|
|
wait(this.rollingMonitorInterval * 1000);
|
|
|
if (this.appFinishing.get() || this.aborted.get()) {
|
|
@@ -507,7 +510,19 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
|
|
|
|
|
@Private
|
|
|
@VisibleForTesting
|
|
|
+ // This is only used for testing.
|
|
|
+ // This will wake the log aggregation thread that is waiting for
|
|
|
+ // rollingMonitorInterval.
|
|
|
+ // To use this method, make sure the log aggregation thread is running
|
|
|
+ // and waiting for rollingMonitorInterval.
|
|
|
public synchronized void doLogAggregationOutOfBand() {
|
|
|
+ while(!waiting.get()) {
|
|
|
+ try {
|
|
|
+ wait(200);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ // Do Nothing
|
|
|
+ }
|
|
|
+ }
|
|
|
LOG.info("Do OutOfBand log aggregation");
|
|
|
this.notifyAll();
|
|
|
}
|