浏览代码

YARN-3090. DeletionService can silently ignore deletion task failures. Contributed by Varun Saxena

Jason Lowe 10 年之前
父节点
当前提交
4eb5f7fa32

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -527,6 +527,9 @@ Release 2.7.0 - UNRELEASED
     YARN-2971. RM uses conf instead of token service address to renew timeline
     delegation tokens (jeagles)
 
+    YARN-3090. DeletionService can silently ignore deletion task failures
+    (Varun Saxena via jlowe)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

+ 35 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java

@@ -29,6 +29,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
@@ -113,13 +115,13 @@ public class DeletionService extends AbstractService {
       .setNameFormat("DeletionService #%d")
       .build();
     if (conf != null) {
-      sched = new ScheduledThreadPoolExecutor(
-          conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT, YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT),
-          tf);
+      sched = new DelServiceSchedThreadPoolExecutor(
+          conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT,
+          YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT), tf);
       debugDelay = conf.getInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0);
     } else {
-      sched = new ScheduledThreadPoolExecutor(YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT,
-          tf);
+      sched = new DelServiceSchedThreadPoolExecutor(
+          YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT, tf);
     }
     sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
     sched.setKeepAliveTime(60L, SECONDS);
@@ -155,6 +157,34 @@ public class DeletionService extends AbstractService {
     return getServiceState() == STATE.STOPPED && sched.isTerminated();
   }
 
+  private static class DelServiceSchedThreadPoolExecutor extends
+      ScheduledThreadPoolExecutor {
+    public DelServiceSchedThreadPoolExecutor(int corePoolSize,
+        ThreadFactory threadFactory) {
+      super(corePoolSize, threadFactory);
+    }
+
+    @Override
+    protected void afterExecute(Runnable task, Throwable exception) {
+      if (task instanceof FutureTask<?>) {
+        FutureTask<?> futureTask = (FutureTask<?>) task;
+        if (!futureTask.isCancelled()) {
+          try {
+            futureTask.get();
+          } catch (ExecutionException ee) {
+            exception = ee.getCause();
+          } catch (InterruptedException ie) {
+            exception = ie;
+          }
+        }
+      }
+      if (exception != null) {
+        LOG.error("Exception during execution of task in DeletionService",
+          exception);
+      }
+    }
+  }
+
   public static class FileDeletionTask implements Runnable {
     public static final int INVALID_TASK_ID = -1;
     private int taskId;