|
@@ -638,8 +638,8 @@ public class ResourceLocalizationService extends CompositeService
|
|
|
super("Public Localizer");
|
|
|
this.lfs = getLocalFileContext(conf);
|
|
|
this.conf = conf;
|
|
|
- this.pending =
|
|
|
- new ConcurrentHashMap<Future<Path>, LocalizerResourceRequestEvent>();
|
|
|
+ this.pending = Collections.synchronizedMap(
|
|
|
+ new HashMap<Future<Path>, LocalizerResourceRequestEvent>());
|
|
|
this.threadPool = createLocalizerExecutor(conf);
|
|
|
this.queue = new ExecutorCompletionService<Path>(threadPool);
|
|
|
}
|
|
@@ -675,8 +675,12 @@ public class ResourceLocalizationService extends CompositeService
|
|
|
publicDirDestPath =
|
|
|
new Path(publicDirDestPath, Long.toString(publicRsrc
|
|
|
.nextUniqueNumber()));
|
|
|
- pending.put(queue.submit(new FSDownload(lfs, null, conf,
|
|
|
- publicDirDestPath, resource)), request);
|
|
|
+ // explicitly synchronize pending here to avoid future task
|
|
|
+ // completing and being dequeued before pending updated
|
|
|
+ synchronized (pending) {
|
|
|
+ pending.put(queue.submit(new FSDownload(lfs, null, conf,
|
|
|
+ publicDirDestPath, resource)), request);
|
|
|
+ }
|
|
|
} catch (IOException e) {
|
|
|
rsrc.unlock();
|
|
|
// TODO Need to Fix IO Exceptions - Notifying resource
|