Browse Source

YARN-99. Modify private distributed cache to localize files such that no local directory hits unix file count limits and thus prevent job failures. Contributed by Omkar Vinit Joshi.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1465853 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli 12 years ago
parent
commit
b96d18bd23
14 changed files with 523 additions and 171 deletions
  1. 4 0
      hadoop-yarn-project/CHANGES.txt
  2. 37 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/ResourceLocalizationSpec.java
  3. 118 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/ResourceLocalizationSpecPBImpl.java
  4. 5 10
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalizerHeartbeatResponse.java
  5. 40 56
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalizerHeartbeatResponsePBImpl.java
  6. 11 26
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
  7. 0 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java
  8. 93 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
  9. 39 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/NodeManagerBuilderUtils.java
  10. 6 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto
  11. 28 12
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java
  12. 14 12
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/MockLocalizerHeartbeatResponse.java
  13. 58 26
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java
  14. 70 21
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java

+ 4 - 0
hadoop-yarn-project/CHANGES.txt

@@ -201,6 +201,10 @@ Release 2.0.5-beta - UNRELEASED
     to implement closeable so that they can be stopped when needed via
     to implement closeable so that they can be stopped when needed via
     RPC.stopProxy(). (Siddharth Seth via vinodkv)
     RPC.stopProxy(). (Siddharth Seth via vinodkv)
 
 
+    YARN-99. Modify private distributed cache to localize files such that no
+    local directory hits unix file count limits and thus prevent job failures.
+    (Omkar Vinit Joshi via vinodkv)
+
 Release 2.0.4-alpha - UNRELEASED
 Release 2.0.4-alpha - UNRELEASED
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES

+ 37 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/ResourceLocalizationSpec.java

@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.api;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.URL;
+
+import com.google.common.annotations.VisibleForTesting;
+
+@Private
+@VisibleForTesting
+public interface ResourceLocalizationSpec {
+
+  public void setResource(LocalResource rsrc);
+
+  public LocalResource getResource();
+
+  public void setDestinationDirectory(URL destinationDirectory);
+
+  public URL getDestinationDirectory();
+}

+ 118 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/ResourceLocalizationSpecPBImpl.java

@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.api.impl.pb;
+
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.ProtoBase;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.URLPBImpl;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.ResourceLocalizationSpecProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.ResourceLocalizationSpecProtoOrBuilder;
+import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
+
+public class ResourceLocalizationSpecPBImpl extends
+    ProtoBase<ResourceLocalizationSpecProto> implements
+    ResourceLocalizationSpec {
+
+  private ResourceLocalizationSpecProto proto = ResourceLocalizationSpecProto
+    .getDefaultInstance();
+  private ResourceLocalizationSpecProto.Builder builder = null;
+  private boolean viaProto;
+  private LocalResource resource = null;
+  private URL destinationDirectory = null;
+
+  public ResourceLocalizationSpecPBImpl() {
+    builder = ResourceLocalizationSpecProto.newBuilder();
+  }
+
+  public ResourceLocalizationSpecPBImpl(ResourceLocalizationSpecProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  @Override
+  public LocalResource getResource() {
+    ResourceLocalizationSpecProtoOrBuilder p = viaProto ? proto : builder;
+    if (resource != null) {
+      return resource;
+    }
+    if (!p.hasResource()) {
+      return null;
+    }
+    resource = new LocalResourcePBImpl(p.getResource());
+    return resource;
+  }
+
+  @Override
+  public void setResource(LocalResource rsrc) {
+    maybeInitBuilder();
+    resource = rsrc;
+  }
+
+  @Override
+  public URL getDestinationDirectory() {
+    ResourceLocalizationSpecProtoOrBuilder p = viaProto ? proto : builder;
+    if (destinationDirectory != null) {
+      return destinationDirectory;
+    }
+    if (!p.hasDestinationDirectory()) {
+      return null;
+    }
+    destinationDirectory = new URLPBImpl(p.getDestinationDirectory());
+    return destinationDirectory;
+  }
+
+  @Override
+  public void setDestinationDirectory(URL destinationDirectory) {
+    maybeInitBuilder();
+    this.destinationDirectory = destinationDirectory;
+  }
+
+  @Override
+  public ResourceLocalizationSpecProto getProto() {
+    mergeLocalToBuilder();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  private synchronized void maybeInitBuilder() {
+    if (builder == null || viaProto) {
+      builder = ResourceLocalizationSpecProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  private void mergeLocalToBuilder() {
+    ResourceLocalizationSpecProtoOrBuilder l = viaProto ? proto : builder;
+    if (this.resource != null
+        && !(l.getResource()
+          .equals(((LocalResourcePBImpl) resource).getProto()))) {
+      maybeInitBuilder();
+      builder.setResource(((LocalResourcePBImpl) resource).getProto());
+    }
+    if (this.destinationDirectory != null
+        && !(l.getDestinationDirectory()
+          .equals(((URLPBImpl) destinationDirectory).getProto()))) {
+      maybeInitBuilder();
+      builder.setDestinationDirectory(((URLPBImpl) destinationDirectory)
+        .getProto());
+    }
+  }
+}

+ 5 - 10
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalizerHeartbeatResponse.java

@@ -18,18 +18,13 @@
 package org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords;
 package org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords;
 
 
 import java.util.List;
 import java.util.List;
-
-import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.server.nodemanager.api.*;
 
 
 public interface LocalizerHeartbeatResponse {
 public interface LocalizerHeartbeatResponse {
-  public LocalizerAction getLocalizerAction();
-  public List<LocalResource> getAllResources();
-  public LocalResource getLocalResource(int i);
 
 
+  public LocalizerAction getLocalizerAction();
   public void setLocalizerAction(LocalizerAction action);
   public void setLocalizerAction(LocalizerAction action);
 
 
-  public void addAllResources(List<LocalResource> resources);
-  public void addResource(LocalResource resource);
-  public void removeResource(int index);
-  public void clearResources();
-}
+  public List<ResourceLocalizationSpec> getResourceSpecs();
+  public void setResourceSpecs(List<ResourceLocalizationSpec> rsrcs);
+}

+ 40 - 56
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalizerHeartbeatResponsePBImpl.java

@@ -21,13 +21,14 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.List;
 
 
-import org.apache.hadoop.yarn.api.records.LocalResource;
+
 import org.apache.hadoop.yarn.api.records.ProtoBase;
 import org.apache.hadoop.yarn.api.records.ProtoBase;
-import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
-import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerActionProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerActionProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerHeartbeatResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerHeartbeatResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerHeartbeatResponseProtoOrBuilder;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerHeartbeatResponseProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.ResourceLocalizationSpecProto;
+import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
+import org.apache.hadoop.yarn.server.nodemanager.api.impl.pb.ResourceLocalizationSpecPBImpl;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
 
 
@@ -40,13 +41,14 @@ public class LocalizerHeartbeatResponsePBImpl
   LocalizerHeartbeatResponseProto.Builder builder = null;
   LocalizerHeartbeatResponseProto.Builder builder = null;
   boolean viaProto = false;
   boolean viaProto = false;
 
 
-  private List<LocalResource> resources;
+  private List<ResourceLocalizationSpec> resourceSpecs;
 
 
   public LocalizerHeartbeatResponsePBImpl() {
   public LocalizerHeartbeatResponsePBImpl() {
     builder = LocalizerHeartbeatResponseProto.newBuilder();
     builder = LocalizerHeartbeatResponseProto.newBuilder();
   }
   }
 
 
-  public LocalizerHeartbeatResponsePBImpl(LocalizerHeartbeatResponseProto proto) {
+  public LocalizerHeartbeatResponsePBImpl(
+      LocalizerHeartbeatResponseProto proto) {
     this.proto = proto;
     this.proto = proto;
     viaProto = true;
     viaProto = true;
   }
   }
@@ -59,7 +61,7 @@ public class LocalizerHeartbeatResponsePBImpl
   }
   }
 
 
   private void mergeLocalToBuilder() {
   private void mergeLocalToBuilder() {
-    if (resources != null) {
+    if (resourceSpecs != null) {
       addResourcesToProto();
       addResourcesToProto();
     }
     }
   }
   }
@@ -79,6 +81,7 @@ public class LocalizerHeartbeatResponsePBImpl
     viaProto = false;
     viaProto = false;
   }
   }
 
 
+  @Override
   public LocalizerAction getLocalizerAction() {
   public LocalizerAction getLocalizerAction() {
     LocalizerHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
     LocalizerHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
     if (!p.hasAction()) {
     if (!p.hasAction()) {
@@ -87,14 +90,10 @@ public class LocalizerHeartbeatResponsePBImpl
     return convertFromProtoFormat(p.getAction());
     return convertFromProtoFormat(p.getAction());
   }
   }
 
 
-  public List<LocalResource> getAllResources() {
-    initResources();
-    return this.resources;
-  }
-
-  public LocalResource getLocalResource(int i) {
+  @Override
+  public List<ResourceLocalizationSpec> getResourceSpecs() {
     initResources();
     initResources();
-    return this.resources.get(i);
+    return this.resourceSpecs;
   }
   }
 
 
   public void setLocalizerAction(LocalizerAction action) {
   public void setLocalizerAction(LocalizerAction action) {
@@ -106,31 +105,39 @@ public class LocalizerHeartbeatResponsePBImpl
     builder.setAction(convertToProtoFormat(action));
     builder.setAction(convertToProtoFormat(action));
   }
   }
 
 
+  public void setResourceSpecs(List<ResourceLocalizationSpec> rsrcs) {
+    maybeInitBuilder();
+    if (rsrcs == null) {
+      builder.clearResources();
+      return;
+    }
+    this.resourceSpecs = rsrcs;
+  }
+
   private void initResources() {
   private void initResources() {
-    if (this.resources != null) {
+    if (this.resourceSpecs != null) {
       return;
       return;
     }
     }
     LocalizerHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
     LocalizerHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
-    List<LocalResourceProto> list = p.getResourcesList();
-    this.resources = new ArrayList<LocalResource>();
-
-    for (LocalResourceProto c : list) {
-      this.resources.add(convertFromProtoFormat(c));
+    List<ResourceLocalizationSpecProto> list = p.getResourcesList();
+    this.resourceSpecs = new ArrayList<ResourceLocalizationSpec>();
+    for (ResourceLocalizationSpecProto c : list) {
+      this.resourceSpecs.add(convertFromProtoFormat(c));
     }
     }
   }
   }
 
 
   private void addResourcesToProto() {
   private void addResourcesToProto() {
     maybeInitBuilder();
     maybeInitBuilder();
     builder.clearResources();
     builder.clearResources();
-    if (this.resources == null) 
+    if (this.resourceSpecs == null) 
       return;
       return;
-    Iterable<LocalResourceProto> iterable =
-        new Iterable<LocalResourceProto>() {
+    Iterable<ResourceLocalizationSpecProto> iterable =
+        new Iterable<ResourceLocalizationSpecProto>() {
       @Override
       @Override
-      public Iterator<LocalResourceProto> iterator() {
-        return new Iterator<LocalResourceProto>() {
+      public Iterator<ResourceLocalizationSpecProto> iterator() {
+        return new Iterator<ResourceLocalizationSpecProto>() {
 
 
-          Iterator<LocalResource> iter = resources.iterator();
+          Iterator<ResourceLocalizationSpec> iter = resourceSpecs.iterator();
 
 
           @Override
           @Override
           public boolean hasNext() {
           public boolean hasNext() {
@@ -138,8 +145,10 @@ public class LocalizerHeartbeatResponsePBImpl
           }
           }
 
 
           @Override
           @Override
-          public LocalResourceProto next() {
-            return convertToProtoFormat(iter.next());
+          public ResourceLocalizationSpecProto next() {
+            ResourceLocalizationSpec resource = iter.next();
+            
+            return ((ResourceLocalizationSpecPBImpl)resource).getProto();
           }
           }
 
 
           @Override
           @Override
@@ -154,34 +163,10 @@ public class LocalizerHeartbeatResponsePBImpl
     builder.addAllResources(iterable);
     builder.addAllResources(iterable);
   }
   }
 
 
-  public void addAllResources(List<LocalResource> resources) {
-    if (resources == null)
-      return;
-    initResources();
-    this.resources.addAll(resources);
-  }
 
 
-  public void addResource(LocalResource resource) {
-    initResources();
-    this.resources.add(resource);
-  }
-
-  public void removeResource(int index) {
-    initResources();
-    this.resources.remove(index);
-  }
-
-  public void clearResources() {
-    initResources();
-    this.resources.clear();
-  }
-
-  private LocalResource convertFromProtoFormat(LocalResourceProto p) {
-    return new LocalResourcePBImpl(p);
-  }
-
-  private LocalResourceProto convertToProtoFormat(LocalResource s) {
-    return ((LocalResourcePBImpl)s).getProto();
+  private ResourceLocalizationSpec convertFromProtoFormat(
+      ResourceLocalizationSpecProto p) {
+    return new ResourceLocalizationSpecPBImpl(p);
   }
   }
 
 
   private LocalizerActionProto convertToProtoFormat(LocalizerAction a) {
   private LocalizerActionProto convertToProtoFormat(LocalizerAction a) {
@@ -191,5 +176,4 @@ public class LocalizerHeartbeatResponsePBImpl
   private LocalizerAction convertFromProtoFormat(LocalizerActionProto a) {
   private LocalizerAction convertFromProtoFormat(LocalizerActionProto a) {
     return LocalizerAction.valueOf(a.name());
     return LocalizerAction.valueOf(a.name());
   }
   }
-
-}
+}

+ 11 - 26
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java

@@ -51,6 +51,7 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
@@ -59,6 +60,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
 import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
+import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
@@ -89,8 +91,6 @@ public class ContainerLocalizer {
   private final String localizerId;
   private final String localizerId;
   private final FileContext lfs;
   private final FileContext lfs;
   private final Configuration conf;
   private final Configuration conf;
-  private final LocalDirAllocator appDirs;
-  private final LocalDirAllocator userDirs;
   private final RecordFactory recordFactory;
   private final RecordFactory recordFactory;
   private final Map<LocalResource,Future<Path>> pendingResources;
   private final Map<LocalResource,Future<Path>> pendingResources;
   private final String appCacheDirContextName;
   private final String appCacheDirContextName;
@@ -112,8 +112,6 @@ public class ContainerLocalizer {
     this.recordFactory = recordFactory;
     this.recordFactory = recordFactory;
     this.conf = new Configuration();
     this.conf = new Configuration();
     this.appCacheDirContextName = String.format(APPCACHE_CTXT_FMT, appId);
     this.appCacheDirContextName = String.format(APPCACHE_CTXT_FMT, appId);
-    this.appDirs = new LocalDirAllocator(appCacheDirContextName);
-    this.userDirs = new LocalDirAllocator(String.format(USERCACHE_CTXT_FMT, user));
     this.pendingResources = new HashMap<LocalResource,Future<Path>>();
     this.pendingResources = new HashMap<LocalResource,Future<Path>>();
   }
   }
 
 
@@ -197,10 +195,10 @@ public class ContainerLocalizer {
     return new ExecutorCompletionService<Path>(exec);
     return new ExecutorCompletionService<Path>(exec);
   }
   }
 
 
-  Callable<Path> download(LocalDirAllocator lda, LocalResource rsrc,
+  Callable<Path> download(Path path, LocalResource rsrc,
       UserGroupInformation ugi) throws IOException {
       UserGroupInformation ugi) throws IOException {
-    Path destPath = lda.getLocalPathForWrite(".", getEstimatedSize(rsrc), conf);
-    return new FSDownload(lfs, ugi, conf, destPath, rsrc, new Random());
+    DiskChecker.checkDir(new File(path.toUri().getRawPath()));
+    return new FSDownload(lfs, ugi, conf, path, rsrc, new Random());
   }
   }
 
 
   static long getEstimatedSize(LocalResource rsrc) {
   static long getEstimatedSize(LocalResource rsrc) {
@@ -238,25 +236,12 @@ public class ContainerLocalizer {
         LocalizerHeartbeatResponse response = nodemanager.heartbeat(status);
         LocalizerHeartbeatResponse response = nodemanager.heartbeat(status);
         switch (response.getLocalizerAction()) {
         switch (response.getLocalizerAction()) {
         case LIVE:
         case LIVE:
-          List<LocalResource> newResources = response.getAllResources();
-          for (LocalResource r : newResources) {
-            if (!pendingResources.containsKey(r)) {
-              final LocalDirAllocator lda;
-              switch (r.getVisibility()) {
-              default:
-                LOG.warn("Unknown visibility: " + r.getVisibility()
-                        + ", Using userDirs");
-                //Falling back to userDirs for unknown visibility.
-              case PUBLIC:
-              case PRIVATE:
-                lda = userDirs;
-                break;
-              case APPLICATION:
-                lda = appDirs;
-                break;
-              }
-              // TODO: Synchronization??
-              pendingResources.put(r, cs.submit(download(lda, r, ugi)));
+          List<ResourceLocalizationSpec> newRsrcs = response.getResourceSpecs();
+          for (ResourceLocalizationSpec newRsrc : newRsrcs) {
+            if (!pendingResources.containsKey(newRsrc.getResource())) {
+              pendingResources.put(newRsrc.getResource(), cs.submit(download(
+                new Path(newRsrc.getDestinationDirectory().getFile()),
+                newRsrc.getResource(), ugi)));
             }
             }
           }
           }
           break;
           break;

+ 0 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java

@@ -22,8 +22,6 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.LinkedList;
 import java.util.Queue;
 import java.util.Queue;
 
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;

+ 93 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java

@@ -80,10 +80,12 @@ import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
 import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
+import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
@@ -105,6 +107,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
 import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
+import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerBuilderUtils;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -326,7 +329,7 @@ public class ResourceLocalizationService extends CompositeService
     // 0) Create application tracking structs
     // 0) Create application tracking structs
     String userName = app.getUser();
     String userName = app.getUser();
     privateRsrc.putIfAbsent(userName, new LocalResourcesTrackerImpl(userName,
     privateRsrc.putIfAbsent(userName, new LocalResourcesTrackerImpl(userName,
-      dispatcher, false, super.getConfig()));
+      dispatcher, true, super.getConfig()));
     if (null != appRsrc.putIfAbsent(
     if (null != appRsrc.putIfAbsent(
       ConverterUtils.toString(app.getAppId()),
       ConverterUtils.toString(app.getAppId()),
       new LocalResourcesTrackerImpl(app.getUser(), dispatcher, false, super
       new LocalResourcesTrackerImpl(app.getUser(), dispatcher, false, super
@@ -476,6 +479,21 @@ public class ResourceLocalizationService extends CompositeService
     }
     }
   }
   }
 
 
+  private String getUserFileCachePath(String user) {
+    String path =
+        "." + Path.SEPARATOR + ContainerLocalizer.USERCACHE + Path.SEPARATOR
+            + user + Path.SEPARATOR + ContainerLocalizer.FILECACHE;
+    return path;
+  }
+
+  private String getUserAppCachePath(String user, String appId) {
+    String path =
+        "." + Path.SEPARATOR + ContainerLocalizer.USERCACHE + Path.SEPARATOR
+            + user + Path.SEPARATOR + ContainerLocalizer.APPCACHE
+            + Path.SEPARATOR + appId;
+    return path;
+  }
+
   /**
   /**
    * Sub-component handling the spawning of {@link ContainerLocalizer}s
    * Sub-component handling the spawning of {@link ContainerLocalizer}s
    */
    */
@@ -803,7 +821,20 @@ public class ResourceLocalizationService extends CompositeService
         LocalResource next = findNextResource();
         LocalResource next = findNextResource();
         if (next != null) {
         if (next != null) {
           response.setLocalizerAction(LocalizerAction.LIVE);
           response.setLocalizerAction(LocalizerAction.LIVE);
-          response.addResource(next);
+          try {
+            ArrayList<ResourceLocalizationSpec> rsrcs =
+                new ArrayList<ResourceLocalizationSpec>();
+            ResourceLocalizationSpec rsrc =
+                NodeManagerBuilderUtils.newResourceLocalizationSpec(next,
+                  getPathForLocalization(next));
+            rsrcs.add(rsrc);
+            response.setResourceSpecs(rsrcs);
+          } catch (IOException e) {
+            LOG.error("local path for PRIVATE localization could not be found."
+                + "Disks might have failed.", e);
+          } catch (URISyntaxException e) {
+            // TODO fail? Already translated several times...
+          }
         } else if (pending.isEmpty()) {
         } else if (pending.isEmpty()) {
           // TODO: Synchronization
           // TODO: Synchronization
           response.setLocalizerAction(LocalizerAction.DIE);
           response.setLocalizerAction(LocalizerAction.DIE);
@@ -812,7 +843,8 @@ public class ResourceLocalizationService extends CompositeService
         }
         }
         return response;
         return response;
       }
       }
-
+      ArrayList<ResourceLocalizationSpec> rsrcs =
+          new ArrayList<ResourceLocalizationSpec>();
       for (LocalResourceStatus stat : remoteResourceStatuses) {
       for (LocalResourceStatus stat : remoteResourceStatuses) {
         LocalResource rsrc = stat.getResource();
         LocalResource rsrc = stat.getResource();
         LocalResourceRequest req = null;
         LocalResourceRequest req = null;
@@ -835,6 +867,7 @@ public class ResourceLocalizationService extends CompositeService
                   new ResourceLocalizedEvent(req,
                   new ResourceLocalizedEvent(req,
                     ConverterUtils.getPathFromYarnURL(stat.getLocalPath()),
                     ConverterUtils.getPathFromYarnURL(stat.getLocalPath()),
                     stat.getLocalSize()));
                     stat.getLocalSize()));
+              localizationCompleted(stat);
             } catch (URISyntaxException e) { }
             } catch (URISyntaxException e) { }
             if (pending.isEmpty()) {
             if (pending.isEmpty()) {
               // TODO: Synchronization
               // TODO: Synchronization
@@ -844,7 +877,17 @@ public class ResourceLocalizationService extends CompositeService
             response.setLocalizerAction(LocalizerAction.LIVE);
             response.setLocalizerAction(LocalizerAction.LIVE);
             LocalResource next = findNextResource();
             LocalResource next = findNextResource();
             if (next != null) {
             if (next != null) {
-              response.addResource(next);
+              try {
+                ResourceLocalizationSpec resource =
+                    NodeManagerBuilderUtils.newResourceLocalizationSpec(next,
+                      getPathForLocalization(next));
+                rsrcs.add(resource);
+              } catch (IOException e) {
+                LOG.error("local path for PRIVATE localization could not be " +
+                  "found. Disks might have failed.", e);
+              } catch (URISyntaxException e) {
+                  //TODO fail? Already translated several times...
+              }
             }
             }
             break;
             break;
           case FETCH_PENDING:
           case FETCH_PENDING:
@@ -854,6 +897,7 @@ public class ResourceLocalizationService extends CompositeService
             LOG.info("DEBUG: FAILED " + req, stat.getException());
             LOG.info("DEBUG: FAILED " + req, stat.getException());
             assoc.getResource().unlock();
             assoc.getResource().unlock();
             response.setLocalizerAction(LocalizerAction.DIE);
             response.setLocalizerAction(LocalizerAction.DIE);
+            localizationCompleted(stat);
             // TODO: Why is this event going directly to the container. Why not
             // TODO: Why is this event going directly to the container. Why not
             // the resource itself? What happens to the resource? Is it removed?
             // the resource itself? What happens to the resource? Is it removed?
             dispatcher.getEventHandler().handle(
             dispatcher.getEventHandler().handle(
@@ -869,9 +913,53 @@ public class ResourceLocalizationService extends CompositeService
             break;
             break;
         }
         }
       }
       }
+      response.setResourceSpecs(rsrcs);
       return response;
       return response;
     }
     }
 
 
+    private void localizationCompleted(LocalResourceStatus stat) {
+      try {
+        LocalResource rsrc = stat.getResource();
+        LocalResourceRequest key = new LocalResourceRequest(rsrc);
+        String user = context.getUser();
+        ApplicationId appId =
+            context.getContainerId().getApplicationAttemptId()
+              .getApplicationId();
+        LocalResourceVisibility vis = rsrc.getVisibility();
+        LocalResourcesTracker tracker =
+            getLocalResourcesTracker(vis, user, appId);
+        if (stat.getStatus() == ResourceStatusType.FETCH_SUCCESS) {
+          tracker.localizationCompleted(key, true);
+        } else {
+          tracker.localizationCompleted(key, false);
+        }
+      } catch (URISyntaxException e) {
+        LOG.error("Invalid resource URL specified", e);
+      }
+    }
+
+    private Path getPathForLocalization(LocalResource rsrc) throws IOException,
+        URISyntaxException {
+      String user = context.getUser();
+      ApplicationId appId =
+          context.getContainerId().getApplicationAttemptId().getApplicationId();
+      LocalResourceVisibility vis = rsrc.getVisibility();
+      LocalResourcesTracker tracker =
+          getLocalResourcesTracker(vis, user, appId);
+      String cacheDirectory = null;
+      if (vis == LocalResourceVisibility.PRIVATE) {// PRIVATE Only
+        cacheDirectory = getUserFileCachePath(user);
+      } else {// APPLICATION ONLY
+        cacheDirectory = getUserAppCachePath(user, appId.toString());
+      }
+      Path dirPath =
+          dirsHandler.getLocalPathForWrite(cacheDirectory,
+            ContainerLocalizer.getEstimatedSize(rsrc), false);
+      return tracker.getPathForLocalization(new LocalResourceRequest(rsrc),
+        dirPath);
+
+    }
+
     @Override
     @Override
     @SuppressWarnings("unchecked") // dispatcher not typed
     @SuppressWarnings("unchecked") // dispatcher not typed
     public void run() {
     public void run() {
@@ -1033,4 +1121,4 @@ public class ResourceLocalizationService extends CompositeService
     del.delete(null, dirPath, new Path[] {});
     del.delete(null, dirPath, new Path[] {});
   }
   }
 
 
-}
+}

+ 39 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/NodeManagerBuilderUtils.java

@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.util;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+public class NodeManagerBuilderUtils {
+
+  public static ResourceLocalizationSpec newResourceLocalizationSpec(
+      LocalResource rsrc, Path path) {
+    URL local = ConverterUtils.getYarnUrlFromPath(path);
+    ResourceLocalizationSpec resourceLocalizationSpec =
+        Records.newRecord(ResourceLocalizationSpec.class);
+    resourceLocalizationSpec.setDestinationDirectory(local);
+    resourceLocalizationSpec.setResource(rsrc);
+    return resourceLocalizationSpec;
+  }
+
+}

+ 6 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto

@@ -47,7 +47,12 @@ enum LocalizerActionProto {
   DIE = 2;
   DIE = 2;
 }
 }
 
 
+message ResourceLocalizationSpecProto {
+  optional LocalResourceProto resource = 1;
+  optional URLProto destination_directory = 2;
+}
+
 message LocalizerHeartbeatResponseProto {
 message LocalizerHeartbeatResponseProto {
   optional LocalizerActionProto action = 1;
   optional LocalizerActionProto action = 1;
-  repeated LocalResourceProto resources = 2;
+  repeated ResourceLocalizationSpecProto resources = 2;
 }
 }

+ 28 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java

@@ -17,6 +17,13 @@
 */
 */
 package org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.impl.pb;
 package org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.impl.pb;
 
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -31,15 +38,14 @@ import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalResourceStatusProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalResourceStatusProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerHeartbeatResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerHeartbeatResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerStatusProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerStatusProto;
+import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
-
 import org.junit.Test;
 import org.junit.Test;
-import static org.junit.Assert.*;
 
 
 public class TestPBRecordImpl {
 public class TestPBRecordImpl {
 
 
@@ -54,9 +60,8 @@ public class TestPBRecordImpl {
   static LocalResource createResource() {
   static LocalResource createResource() {
     LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
     LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
     assertTrue(ret instanceof LocalResourcePBImpl);
     assertTrue(ret instanceof LocalResourcePBImpl);
-    ret.setResource(
-        ConverterUtils.getYarnUrlFromPath(
-          new Path("hdfs://y.ak:8020/foo/bar")));
+    ret.setResource(ConverterUtils.getYarnUrlFromPath(new Path(
+      "hdfs://y.ak:8020/foo/bar")));
     ret.setSize(4344L);
     ret.setSize(4344L);
     ret.setTimestamp(3141592653589793L);
     ret.setTimestamp(3141592653589793L);
     ret.setVisibility(LocalResourceVisibility.PUBLIC);
     ret.setVisibility(LocalResourceVisibility.PUBLIC);
@@ -90,16 +95,27 @@ public class TestPBRecordImpl {
     return ret;
     return ret;
   }
   }
 
 
-  static LocalizerHeartbeatResponse createLocalizerHeartbeatResponse() {
+  static LocalizerHeartbeatResponse createLocalizerHeartbeatResponse() 
+      throws URISyntaxException {
     LocalizerHeartbeatResponse ret =
     LocalizerHeartbeatResponse ret =
       recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class);
       recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class);
     assertTrue(ret instanceof LocalizerHeartbeatResponsePBImpl);
     assertTrue(ret instanceof LocalizerHeartbeatResponsePBImpl);
     ret.setLocalizerAction(LocalizerAction.LIVE);
     ret.setLocalizerAction(LocalizerAction.LIVE);
-    ret.addResource(createResource());
+    LocalResource rsrc = createResource();
+    ArrayList<ResourceLocalizationSpec> rsrcs =
+      new ArrayList<ResourceLocalizationSpec>();
+    ResourceLocalizationSpec resource =
+      recordFactory.newRecordInstance(ResourceLocalizationSpec.class);
+    resource.setResource(rsrc);
+    resource.setDestinationDirectory(ConverterUtils
+      .getYarnUrlFromPath(new Path("/tmp" + System.currentTimeMillis())));
+    rsrcs.add(resource);
+    ret.setResourceSpecs(rsrcs);
+    System.out.println(resource);
     return ret;
     return ret;
   }
   }
 
 
-  @Test
+  @Test(timeout=10000)
   public void testLocalResourceStatusSerDe() throws Exception {
   public void testLocalResourceStatusSerDe() throws Exception {
     LocalResourceStatus rsrcS = createLocalResourceStatus();
     LocalResourceStatus rsrcS = createLocalResourceStatus();
     assertTrue(rsrcS instanceof LocalResourceStatusPBImpl);
     assertTrue(rsrcS instanceof LocalResourceStatusPBImpl);
@@ -119,7 +135,7 @@ public class TestPBRecordImpl {
     assertEquals(createResource(), rsrcD.getResource());
     assertEquals(createResource(), rsrcD.getResource());
   }
   }
 
 
-  @Test
+  @Test(timeout=10000)
   public void testLocalizerStatusSerDe() throws Exception {
   public void testLocalizerStatusSerDe() throws Exception {
     LocalizerStatus rsrcS = createLocalizerStatus();
     LocalizerStatus rsrcS = createLocalizerStatus();
     assertTrue(rsrcS instanceof LocalizerStatusPBImpl);
     assertTrue(rsrcS instanceof LocalizerStatusPBImpl);
@@ -141,7 +157,7 @@ public class TestPBRecordImpl {
     assertEquals(createLocalResourceStatus(), rsrcD.getResourceStatus(0));
     assertEquals(createLocalResourceStatus(), rsrcD.getResourceStatus(0));
   }
   }
 
 
-  @Test
+  @Test(timeout=10000)
   public void testLocalizerHeartbeatResponseSerDe() throws Exception {
   public void testLocalizerHeartbeatResponseSerDe() throws Exception {
     LocalizerHeartbeatResponse rsrcS = createLocalizerHeartbeatResponse();
     LocalizerHeartbeatResponse rsrcS = createLocalizerHeartbeatResponse();
     assertTrue(rsrcS instanceof LocalizerHeartbeatResponsePBImpl);
     assertTrue(rsrcS instanceof LocalizerHeartbeatResponsePBImpl);
@@ -158,8 +174,8 @@ public class TestPBRecordImpl {
       new LocalizerHeartbeatResponsePBImpl(rsrcPbD);
       new LocalizerHeartbeatResponsePBImpl(rsrcPbD);
 
 
     assertEquals(rsrcS, rsrcD);
     assertEquals(rsrcS, rsrcD);
-    assertEquals(createResource(), rsrcS.getLocalResource(0));
-    assertEquals(createResource(), rsrcD.getLocalResource(0));
+    assertEquals(createResource(), rsrcS.getResourceSpecs().get(0).getResource());
+    assertEquals(createResource(), rsrcD.getResourceSpecs().get(0).getResource());
   }
   }
 
 
 }
 }

+ 14 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/MockLocalizerHeartbeatResponse.java

@@ -20,7 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.List;
 
 
-import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
 
 
@@ -28,28 +28,30 @@ public class MockLocalizerHeartbeatResponse
     implements LocalizerHeartbeatResponse {
     implements LocalizerHeartbeatResponse {
 
 
   LocalizerAction action;
   LocalizerAction action;
-  List<LocalResource> rsrc;
+  List<ResourceLocalizationSpec> resourceSpecs;
 
 
   MockLocalizerHeartbeatResponse() {
   MockLocalizerHeartbeatResponse() {
-    rsrc = new ArrayList<LocalResource>();
+    resourceSpecs = new ArrayList<ResourceLocalizationSpec>();
   }
   }
 
 
   MockLocalizerHeartbeatResponse(
   MockLocalizerHeartbeatResponse(
-      LocalizerAction action, List<LocalResource> rsrc) {
+      LocalizerAction action, List<ResourceLocalizationSpec> resources) {
     this.action = action;
     this.action = action;
-    this.rsrc = rsrc;
+    this.resourceSpecs = resources;
   }
   }
 
 
   public LocalizerAction getLocalizerAction() { return action; }
   public LocalizerAction getLocalizerAction() { return action; }
-  public List<LocalResource> getAllResources() { return rsrc; }
-  public LocalResource getLocalResource(int i) { return rsrc.get(i); }
   public void setLocalizerAction(LocalizerAction action) {
   public void setLocalizerAction(LocalizerAction action) {
     this.action = action;
     this.action = action;
   }
   }
-  public void addAllResources(List<LocalResource> resources) {
-    rsrc.addAll(resources);
+
+  @Override
+  public List<ResourceLocalizationSpec> getResourceSpecs() {
+    return resourceSpecs;
+}
+
+  @Override
+  public void setResourceSpecs(List<ResourceLocalizationSpec> resourceSpecs) {
+    this.resourceSpecs = resourceSpecs;
   }
   }
-  public void addResource(LocalResource resource) { rsrc.add(resource); }
-  public void removeResource(int index) { rsrc.remove(index); }
-  public void clearResources() { rsrc.clear(); }
 }
 }

+ 58 - 26
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java

@@ -50,7 +50,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.AbstractFileSystem;
 import org.apache.hadoop.fs.AbstractFileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -66,9 +65,11 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
 import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
+import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.Test;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
 import org.mockito.ArgumentMatcher;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.invocation.InvocationOnMock;
@@ -95,12 +96,33 @@ public class TestContainerLocalizer {
   public void testContainerLocalizerMain() throws Exception {
   public void testContainerLocalizerMain() throws Exception {
     ContainerLocalizer localizer = setupContainerLocalizerForTest();
     ContainerLocalizer localizer = setupContainerLocalizerForTest();
 
 
+    // verify created cache
+    List<Path> privCacheList = new ArrayList<Path>();
+    List<Path> appCacheList = new ArrayList<Path>();
+    for (Path p : localDirs) {
+      Path base = new Path(new Path(p, ContainerLocalizer.USERCACHE), appUser);
+      Path privcache = new Path(base, ContainerLocalizer.FILECACHE);
+      privCacheList.add(privcache);
+      Path appDir =
+          new Path(base, new Path(ContainerLocalizer.APPCACHE, appId));
+      Path appcache = new Path(appDir, ContainerLocalizer.FILECACHE);
+      appCacheList.add(appcache);
+    }
+
     // mock heartbeat responses from NM
     // mock heartbeat responses from NM
-    LocalResource rsrcA = getMockRsrc(random, LocalResourceVisibility.PRIVATE);
-    LocalResource rsrcB = getMockRsrc(random, LocalResourceVisibility.PRIVATE);
-    LocalResource rsrcC = getMockRsrc(random,
-        LocalResourceVisibility.APPLICATION);
-    LocalResource rsrcD = getMockRsrc(random, LocalResourceVisibility.PRIVATE);
+    ResourceLocalizationSpec rsrcA =
+        getMockRsrc(random, LocalResourceVisibility.PRIVATE,
+          privCacheList.get(0));
+    ResourceLocalizationSpec rsrcB =
+        getMockRsrc(random, LocalResourceVisibility.PRIVATE,
+          privCacheList.get(0));
+    ResourceLocalizationSpec rsrcC =
+        getMockRsrc(random, LocalResourceVisibility.APPLICATION,
+          appCacheList.get(0));
+    ResourceLocalizationSpec rsrcD =
+        getMockRsrc(random, LocalResourceVisibility.PRIVATE,
+          privCacheList.get(0));
+    
     when(nmProxy.heartbeat(isA(LocalizerStatus.class)))
     when(nmProxy.heartbeat(isA(LocalizerStatus.class)))
       .thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
       .thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
             Collections.singletonList(rsrcA)))
             Collections.singletonList(rsrcA)))
@@ -111,27 +133,33 @@ public class TestContainerLocalizer {
       .thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
       .thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
             Collections.singletonList(rsrcD)))
             Collections.singletonList(rsrcD)))
       .thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
       .thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
-            Collections.<LocalResource>emptyList()))
+            Collections.<ResourceLocalizationSpec>emptyList()))
       .thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.DIE,
       .thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.DIE,
             null));
             null));
 
 
-    doReturn(new FakeDownload(rsrcA.getResource().getFile(), true)).when(
-        localizer).download(isA(LocalDirAllocator.class), eq(rsrcA),
+    LocalResource tRsrcA = rsrcA.getResource();
+    LocalResource tRsrcB = rsrcB.getResource();
+    LocalResource tRsrcC = rsrcC.getResource();
+    LocalResource tRsrcD = rsrcD.getResource();
+    doReturn(
+      new FakeDownload(rsrcA.getResource().getResource().getFile(), true))
+      .when(localizer).download(isA(Path.class), eq(tRsrcA),
         isA(UserGroupInformation.class));
         isA(UserGroupInformation.class));
-    doReturn(new FakeDownload(rsrcB.getResource().getFile(), true)).when(
-        localizer).download(isA(LocalDirAllocator.class), eq(rsrcB),
+    doReturn(
+      new FakeDownload(rsrcB.getResource().getResource().getFile(), true))
+      .when(localizer).download(isA(Path.class), eq(tRsrcB),
         isA(UserGroupInformation.class));
         isA(UserGroupInformation.class));
-    doReturn(new FakeDownload(rsrcC.getResource().getFile(), true)).when(
-        localizer).download(isA(LocalDirAllocator.class), eq(rsrcC),
+    doReturn(
+      new FakeDownload(rsrcC.getResource().getResource().getFile(), true))
+      .when(localizer).download(isA(Path.class), eq(tRsrcC),
         isA(UserGroupInformation.class));
         isA(UserGroupInformation.class));
-    doReturn(new FakeDownload(rsrcD.getResource().getFile(), true)).when(
-        localizer).download(isA(LocalDirAllocator.class), eq(rsrcD),
+    doReturn(
+      new FakeDownload(rsrcD.getResource().getResource().getFile(), true))
+      .when(localizer).download(isA(Path.class), eq(tRsrcD),
         isA(UserGroupInformation.class));
         isA(UserGroupInformation.class));
 
 
     // run localization
     // run localization
     assertEquals(0, localizer.runLocalization(nmAddr));
     assertEquals(0, localizer.runLocalization(nmAddr));
-
-    // verify created cache
     for (Path p : localDirs) {
     for (Path p : localDirs) {
       Path base = new Path(new Path(p, ContainerLocalizer.USERCACHE), appUser);
       Path base = new Path(new Path(p, ContainerLocalizer.USERCACHE), appUser);
       Path privcache = new Path(base, ContainerLocalizer.FILECACHE);
       Path privcache = new Path(base, ContainerLocalizer.FILECACHE);
@@ -143,15 +171,14 @@ public class TestContainerLocalizer {
       Path appcache = new Path(appDir, ContainerLocalizer.FILECACHE);
       Path appcache = new Path(appDir, ContainerLocalizer.FILECACHE);
       verify(spylfs).mkdir(eq(appcache), isA(FsPermission.class), eq(false));
       verify(spylfs).mkdir(eq(appcache), isA(FsPermission.class), eq(false));
     }
     }
-
     // verify tokens read at expected location
     // verify tokens read at expected location
     verify(spylfs).open(tokenPath);
     verify(spylfs).open(tokenPath);
 
 
     // verify downloaded resources reported to NM
     // verify downloaded resources reported to NM
-    verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcA)));
-    verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcB)));
-    verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcC)));
-    verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcD)));
+    verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcA.getResource())));
+    verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcB.getResource())));
+    verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcC.getResource())));
+    verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcD.getResource())));
 
 
     // verify all HB use localizerID provided
     // verify all HB use localizerID provided
     verify(nmProxy, never()).heartbeat(argThat(
     verify(nmProxy, never()).heartbeat(argThat(
@@ -306,10 +333,12 @@ public class TestContainerLocalizer {
     return mockRF;
     return mockRF;
   }
   }
 
 
-  static LocalResource getMockRsrc(Random r,
-      LocalResourceVisibility vis) {
-    LocalResource rsrc = mock(LocalResource.class);
+  static ResourceLocalizationSpec getMockRsrc(Random r,
+      LocalResourceVisibility vis, Path p) {
+    ResourceLocalizationSpec resourceLocalizationSpec =
+      mock(ResourceLocalizationSpec.class);
 
 
+    LocalResource rsrc = mock(LocalResource.class);
     String name = Long.toHexString(r.nextLong());
     String name = Long.toHexString(r.nextLong());
     URL uri = mock(org.apache.hadoop.yarn.api.records.URL.class);
     URL uri = mock(org.apache.hadoop.yarn.api.records.URL.class);
     when(uri.getScheme()).thenReturn("file");
     when(uri.getScheme()).thenReturn("file");
@@ -322,7 +351,10 @@ public class TestContainerLocalizer {
     when(rsrc.getType()).thenReturn(LocalResourceType.FILE);
     when(rsrc.getType()).thenReturn(LocalResourceType.FILE);
     when(rsrc.getVisibility()).thenReturn(vis);
     when(rsrc.getVisibility()).thenReturn(vis);
 
 
-    return rsrc;
+    when(resourceLocalizationSpec.getResource()).thenReturn(rsrc);
+    when(resourceLocalizationSpec.getDestinationDirectory()).
+      thenReturn(ConverterUtils.getYarnUrlFromPath(p));
+    return resourceLocalizationSpec;
   }
   }
 
 
   @SuppressWarnings({ "rawtypes", "unchecked" })
   @SuppressWarnings({ "rawtypes", "unchecked" })

+ 70 - 21
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
 
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyLong;
@@ -35,6 +36,7 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
 
 
 import java.io.IOException;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.InetSocketAddress;
@@ -375,7 +377,7 @@ public class TestResourceLocalizationService {
     }
     }
   }
   }
   
   
-  @Test
+  @Test( timeout = 10000)
   @SuppressWarnings("unchecked") // mocked generics
   @SuppressWarnings("unchecked") // mocked generics
   public void testLocalizationHeartbeat() throws Exception {
   public void testLocalizationHeartbeat() throws Exception {
     Configuration conf = new YarnConfiguration();
     Configuration conf = new YarnConfiguration();
@@ -386,12 +388,17 @@ public class TestResourceLocalizationService {
         isA(Path.class), isA(FsPermission.class), anyBoolean());
         isA(Path.class), isA(FsPermission.class), anyBoolean());
 
 
     List<Path> localDirs = new ArrayList<Path>();
     List<Path> localDirs = new ArrayList<Path>();
-    String[] sDirs = new String[4];
-    for (int i = 0; i < 4; ++i) {
-      localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
-      sDirs[i] = localDirs.get(i).toString();
-    }
+    String[] sDirs = new String[1];
+    // Making sure that we have only one local disk so that it will only be
+    // selected for consecutive resource localization calls.  This is required
+    // to test LocalCacheDirectoryManager.
+    localDirs.add(lfs.makeQualified(new Path(basedir, 0 + "")));
+    sDirs[0] = localDirs.get(0).toString();
+
     conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
     conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
+    // Adding configuration to make sure there is only one file per
+    // directory
+    conf.set(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, "37");
     String logDir = lfs.makeQualified(new Path(basedir, "logdir " )).toString();
     String logDir = lfs.makeQualified(new Path(basedir, "logdir " )).toString();
     conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
     conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
     DrainDispatcher dispatcher = new DrainDispatcher();
     DrainDispatcher dispatcher = new DrainDispatcher();
@@ -452,12 +459,23 @@ public class TestResourceLocalizationService {
       doReturn(out).when(spylfs).createInternal(isA(Path.class),
       doReturn(out).when(spylfs).createInternal(isA(Path.class),
           isA(EnumSet.class), isA(FsPermission.class), anyInt(), anyShort(),
           isA(EnumSet.class), isA(FsPermission.class), anyInt(), anyShort(),
           anyLong(), isA(Progressable.class), isA(ChecksumOpt.class), anyBoolean());
           anyLong(), isA(Progressable.class), isA(ChecksumOpt.class), anyBoolean());
-      final LocalResource resource = getPrivateMockedResource(r);
-      final LocalResourceRequest req = new LocalResourceRequest(resource);
+      final LocalResource resource1 = getPrivateMockedResource(r);
+      LocalResource resource2 = null;
+      do {
+        resource2 = getPrivateMockedResource(r);
+      } while (resource2 == null || resource2.equals(resource1));
+      // above call to make sure we don't get identical resources.
+      
+      final LocalResourceRequest req1 = new LocalResourceRequest(resource1);
+      final LocalResourceRequest req2 = new LocalResourceRequest(resource2);
       Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
       Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
         new HashMap<LocalResourceVisibility, 
         new HashMap<LocalResourceVisibility, 
                     Collection<LocalResourceRequest>>();
                     Collection<LocalResourceRequest>>();
-      rsrcs.put(LocalResourceVisibility.PRIVATE, Collections.singletonList(req));
+      List<LocalResourceRequest> privateResourceList =
+          new ArrayList<LocalResourceRequest>();
+      privateResourceList.add(req1);
+      privateResourceList.add(req2);
+      rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList);
       spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs));
       spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs));
       // Sigh. Thread init of private localizer not accessible
       // Sigh. Thread init of private localizer not accessible
       Thread.sleep(1000);
       Thread.sleep(1000);
@@ -471,33 +489,64 @@ public class TestResourceLocalizationService {
       Path localizationTokenPath = tokenPathCaptor.getValue();
       Path localizationTokenPath = tokenPathCaptor.getValue();
 
 
       // heartbeat from localizer
       // heartbeat from localizer
-      LocalResourceStatus rsrcStat = mock(LocalResourceStatus.class);
+      LocalResourceStatus rsrcStat1 = mock(LocalResourceStatus.class);
+      LocalResourceStatus rsrcStat2 = mock(LocalResourceStatus.class);
       LocalizerStatus stat = mock(LocalizerStatus.class);
       LocalizerStatus stat = mock(LocalizerStatus.class);
       when(stat.getLocalizerId()).thenReturn(ctnrStr);
       when(stat.getLocalizerId()).thenReturn(ctnrStr);
-      when(rsrcStat.getResource()).thenReturn(resource);
-      when(rsrcStat.getLocalSize()).thenReturn(4344L);
+      when(rsrcStat1.getResource()).thenReturn(resource1);
+      when(rsrcStat2.getResource()).thenReturn(resource2);
+      when(rsrcStat1.getLocalSize()).thenReturn(4344L);
+      when(rsrcStat2.getLocalSize()).thenReturn(2342L);
       URL locPath = getPath("/cache/private/blah");
       URL locPath = getPath("/cache/private/blah");
-      when(rsrcStat.getLocalPath()).thenReturn(locPath);
-      when(rsrcStat.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS);
+      when(rsrcStat1.getLocalPath()).thenReturn(locPath);
+      when(rsrcStat2.getLocalPath()).thenReturn(locPath);
+      when(rsrcStat1.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS);
+      when(rsrcStat2.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS);
       when(stat.getResources())
       when(stat.getResources())
         .thenReturn(Collections.<LocalResourceStatus>emptyList())
         .thenReturn(Collections.<LocalResourceStatus>emptyList())
-        .thenReturn(Collections.singletonList(rsrcStat))
+        .thenReturn(Collections.singletonList(rsrcStat1))
+        .thenReturn(Collections.singletonList(rsrcStat2))
         .thenReturn(Collections.<LocalResourceStatus>emptyList());
         .thenReturn(Collections.<LocalResourceStatus>emptyList());
 
 
-      // get rsrc
+      String localPath = Path.SEPARATOR + ContainerLocalizer.USERCACHE +
+          Path.SEPARATOR + "user0" + Path.SEPARATOR +
+          ContainerLocalizer.FILECACHE;
+      
+      // get first resource
       LocalizerHeartbeatResponse response = spyService.heartbeat(stat);
       LocalizerHeartbeatResponse response = spyService.heartbeat(stat);
       assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
       assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
-      assertEquals(req, new LocalResourceRequest(response.getLocalResource(0)));
+      assertEquals(1, response.getResourceSpecs().size());
+      assertEquals(req1,
+        new LocalResourceRequest(response.getResourceSpecs().get(0).getResource()));
+      URL localizedPath =
+          response.getResourceSpecs().get(0).getDestinationDirectory();
+      assertTrue(localizedPath.getFile().endsWith(localPath));
+
+      // get second resource
+      response = spyService.heartbeat(stat);
+      assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
+      assertEquals(1, response.getResourceSpecs().size());
+      assertEquals(req2, new LocalResourceRequest(response.getResourceSpecs()
+        .get(0).getResource()));
+      localizedPath =
+          response.getResourceSpecs().get(0).getDestinationDirectory();
+      // Resource's destination path should be now inside sub directory 0 as
+      // LocalCacheDirectoryManager will be used and we have restricted number
+      // of files per directory to 1.
+      assertTrue(localizedPath.getFile().endsWith(
+        localPath + Path.SEPARATOR + "0"));
 
 
       // empty rsrc
       // empty rsrc
       response = spyService.heartbeat(stat);
       response = spyService.heartbeat(stat);
       assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
       assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
-      assertEquals(0, response.getAllResources().size());
+      assertEquals(0, response.getResourceSpecs().size());
 
 
       // get shutdown
       // get shutdown
       response = spyService.heartbeat(stat);
       response = spyService.heartbeat(stat);
       assertEquals(LocalizerAction.DIE, response.getLocalizerAction());
       assertEquals(LocalizerAction.DIE, response.getLocalizerAction());
 
 
+
+      dispatcher.await();
       // verify container notification
       // verify container notification
       ArgumentMatcher<ContainerEvent> matchesContainerLoc =
       ArgumentMatcher<ContainerEvent> matchesContainerLoc =
         new ArgumentMatcher<ContainerEvent>() {
         new ArgumentMatcher<ContainerEvent>() {
@@ -508,9 +557,9 @@ public class TestResourceLocalizationService {
               && c.getContainerID() == evt.getContainerID();
               && c.getContainerID() == evt.getContainerID();
           }
           }
         };
         };
-      dispatcher.await();
-      verify(containerBus).handle(argThat(matchesContainerLoc));
-      
+      // total 2 resource localzation calls. one for each resource.
+      verify(containerBus, times(2)).handle(argThat(matchesContainerLoc));
+        
       // Verify deletion of localization token.
       // Verify deletion of localization token.
       verify(delService).delete((String)isNull(), eq(localizationTokenPath));
       verify(delService).delete((String)isNull(), eq(localizationTokenPath));
     } finally {
     } finally {