|
@@ -61,6 +61,7 @@ import java.util.Set;
|
|
|
import java.util.concurrent.BrokenBarrierException;
|
|
|
import java.util.concurrent.CyclicBarrier;
|
|
|
import java.util.concurrent.Future;
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
import org.apache.commons.io.FileUtils;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
@@ -1104,16 +1105,23 @@ public class TestResourceLocalizationService {
|
|
|
|
|
|
private static class DummyExecutor extends DefaultContainerExecutor {
|
|
|
private volatile boolean stopLocalization = false;
|
|
|
+ private AtomicInteger numLocalizers = new AtomicInteger(0);
|
|
|
@Override
|
|
|
public void startLocalizer(Path nmPrivateContainerTokensPath,
|
|
|
InetSocketAddress nmAddr, String user, String appId, String locId,
|
|
|
LocalDirsHandlerService dirsHandler)
|
|
|
throws IOException, InterruptedException {
|
|
|
+ numLocalizers.incrementAndGet();
|
|
|
while (!stopLocalization) {
|
|
|
Thread.yield();
|
|
|
}
|
|
|
}
|
|
|
- void setStopLocalization() {
|
|
|
+ private void waitForLocalizers(int num) {
|
|
|
+ while (numLocalizers.intValue() < num) {
|
|
|
+ Thread.yield();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ private void setStopLocalization() {
|
|
|
stopLocalization = true;
|
|
|
}
|
|
|
}
|
|
@@ -1256,6 +1264,10 @@ public class TestResourceLocalizationService {
|
|
|
spyService.handle(new ContainerLocalizationRequestEvent(c2, rsrcs1));
|
|
|
|
|
|
dispatcher.await();
|
|
|
+ // Wait for localizers of both container c1 and c2 to begin.
|
|
|
+ exec.waitForLocalizers(2);
|
|
|
+ LocalizerRunner locC1 =
|
|
|
+ spyService.getLocalizerRunner(c1.getContainerId().toString());
|
|
|
final String containerIdStr = c1.getContainerId().toString();
|
|
|
// Heartbeats from container localizer
|
|
|
LocalResourceStatus rsrc1success = mock(LocalResourceStatus.class);
|
|
@@ -1323,6 +1335,10 @@ public class TestResourceLocalizationService {
|
|
|
Set<Path> paths =
|
|
|
Sets.newHashSet(new Path(locPath1), new Path(locPath1 + "_tmp"),
|
|
|
new Path(locPath2), new Path(locPath2 + "_tmp"));
|
|
|
+ // Wait for localizer runner thread for container c1 to finish.
|
|
|
+ while (locC1.getState() != Thread.State.TERMINATED) {
|
|
|
+ Thread.sleep(50);
|
|
|
+ }
|
|
|
// Verify if downloading resources were submitted for deletion.
|
|
|
verify(delService).delete(eq(user),
|
|
|
(Path) eq(null), argThat(new DownloadingPathsMatcher(paths)));
|