Parcourir la source

YARN-3304. Addendum patch. Cleaning up ResourceCalculatorProcessTree APIs for public use and removing inconsistencies in the default values. (Junping Du and Karthik Kambatla via vinodkv)

Vinod Kumar Vavilapalli il y a 10 ans
Parent
commit
7610925e90

+ 52 - 30
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java

@@ -344,15 +344,23 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
   public long getVirtualMemorySize(int olderThanAge) {
     long total = UNAVAILABLE;
     for (ProcessInfo p : processTree.values()) {
-      if ((p != null) && (p.getAge() > olderThanAge)) {
+      if (p != null) {
         if (total == UNAVAILABLE ) {
           total = 0;
         }
-        total += p.getVmem();
+        if (p.getAge() > olderThanAge) {
+          total += p.getVmem();
+        }
       }
     }
     return total;
   }
+  
+  @Override
+  @SuppressWarnings("deprecation")
+  public long getCumulativeVmem(int olderThanAge) {
+    return getVirtualMemorySize(olderThanAge);
+  }
 
   @Override
   public long getRssMemorySize(int olderThanAge) {
@@ -365,13 +373,21 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
     boolean isAvailable = false;
     long totalPages = 0;
     for (ProcessInfo p : processTree.values()) {
-      if ((p != null) && (p.getAge() > olderThanAge)) {
-        totalPages += p.getRssmemPage();
+      if ((p != null) ) {
+        if (p.getAge() > olderThanAge) {
+          totalPages += p.getRssmemPage();
+        }
         isAvailable = true;
       }
     }
     return isAvailable ? totalPages * PAGE_SIZE : UNAVAILABLE; // convert # pages to byte
   }
+  
+  @Override
+  @SuppressWarnings("deprecation")
+  public long getCumulativeRssmem(int olderThanAge) {
+    return getRssMemorySize(olderThanAge);
+  }
 
   /**
    * Get the resident set size (RSS) memory used by all the processes
@@ -388,36 +404,42 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
   private long getSmapBasedRssMemorySize(int olderThanAge) {
     long total = UNAVAILABLE;
     for (ProcessInfo p : processTree.values()) {
-      if ((p != null) && (p.getAge() > olderThanAge)) {
-        ProcessTreeSmapMemInfo procMemInfo = processSMAPTree.get(p.getPid());
-        if (procMemInfo != null) {
-          for (ProcessSmapMemoryInfo info : procMemInfo.getMemoryInfoList()) {
-            // Do not account for r--s or r-xs mappings
-            if (info.getPermission().trim()
-              .equalsIgnoreCase(READ_ONLY_WITH_SHARED_PERMISSION)
-                || info.getPermission().trim()
-                  .equalsIgnoreCase(READ_EXECUTE_WITH_SHARED_PERMISSION)) {
-              continue;
-            }
-            if (total == UNAVAILABLE){
-              total = 0;
-            }
-            total +=
-                Math.min(info.sharedDirty, info.pss) + info.privateDirty
-                    + info.privateClean;
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(" total(" + olderThanAge + "): PID : " + p.getPid()
-                  + ", SharedDirty : " + info.sharedDirty + ", PSS : "
-                  + info.pss + ", Private_Dirty : " + info.privateDirty
-                  + ", Private_Clean : " + info.privateClean + ", total : "
-                  + (total * KB_TO_BYTES));
+      if (p != null) {
+        // set resource to 0 instead of UNAVAILABLE
+        if (total == UNAVAILABLE){
+          total = 0;
+        }
+        if (p.getAge() > olderThanAge) {
+          ProcessTreeSmapMemInfo procMemInfo = processSMAPTree.get(p.getPid());
+          if (procMemInfo != null) {
+            for (ProcessSmapMemoryInfo info : procMemInfo.getMemoryInfoList()) {
+              // Do not account for r--s or r-xs mappings
+              if (info.getPermission().trim()
+                .equalsIgnoreCase(READ_ONLY_WITH_SHARED_PERMISSION)
+                  || info.getPermission().trim()
+                    .equalsIgnoreCase(READ_EXECUTE_WITH_SHARED_PERMISSION)) {
+                continue;
+              }
+
+              total +=
+                  Math.min(info.sharedDirty, info.pss) + info.privateDirty
+                      + info.privateClean;
+              if (LOG.isDebugEnabled()) {
+                LOG.debug(" total(" + olderThanAge + "): PID : " + p.getPid()
+                    + ", SharedDirty : " + info.sharedDirty + ", PSS : "
+                    + info.pss + ", Private_Dirty : " + info.privateDirty
+                    + ", Private_Clean : " + info.privateClean + ", total : "
+                    + (total * KB_TO_BYTES));
+              }
             }
           }
-        }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(procMemInfo.toString());
+        
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(procMemInfo.toString());
+          }
         }
       }
+      
     }
     if (total > 0) {
       total *= KB_TO_BYTES; // convert to bytes

+ 54 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorProcessTree.java

@@ -78,6 +78,18 @@ public abstract class ResourceCalculatorProcessTree extends Configured {
   public long getVirtualMemorySize() {
     return getVirtualMemorySize(0);
   }
+  
+  /**
+   * Get the virtual memory used by all the processes in the
+   * process-tree.
+   *
+   * @return virtual memory used by the process-tree in bytes,
+   * {@link #UNAVAILABLE} if it cannot be calculated.
+   */
+  @Deprecated
+  public long getCumulativeVmem() {
+    return getCumulativeVmem(0);
+  }
 
   /**
    * Get the resident set size (rss) memory used by all the processes
@@ -89,6 +101,18 @@ public abstract class ResourceCalculatorProcessTree extends Configured {
   public long getRssMemorySize() {
     return getRssMemorySize(0);
   }
+  
+  /**
+   * Get the resident set size (rss) memory used by all the processes
+   * in the process-tree.
+   *
+   * @return rss memory used by the process-tree in bytes,
+   * {@link #UNAVAILABLE} if it cannot be calculated.
+   */
+  @Deprecated
+  public long getCumulativeRssmem() {
+    return getCumulativeRssmem(0);
+  }
 
   /**
    * Get the virtual memory used by all the processes in the
@@ -103,6 +127,21 @@ public abstract class ResourceCalculatorProcessTree extends Configured {
   public long getVirtualMemorySize(int olderThanAge) {
     return UNAVAILABLE;
   }
+  
+  /**
+   * Get the virtual memory used by all the processes in the
+   * process-tree that are older than the passed in age.
+   *
+   * @param olderThanAge processes above this age are included in the
+   *                     memory addition
+   * @return virtual memory used by the process-tree in bytes for
+   * processes older than the specified age, {@link #UNAVAILABLE} if it
+   * cannot be calculated.
+   */
+  @Deprecated
+  public long getCumulativeVmem(int olderThanAge) {
+    return UNAVAILABLE;
+  }
 
   /**
    * Get the resident set size (rss) memory used by all the processes
@@ -117,6 +156,21 @@ public abstract class ResourceCalculatorProcessTree extends Configured {
   public long getRssMemorySize(int olderThanAge) {
     return UNAVAILABLE;
   }
+  
+  /**
+   * Get the resident set size (rss) memory used by all the processes
+   * in the process-tree that are older than the passed in age.
+   *
+   * @param olderThanAge processes above this age are included in the
+   *                     memory addition
+   * @return rss memory used by the process-tree in bytes for
+   * processes older than specified age, {@link #UNAVAILABLE} if it cannot be
+   * calculated.
+   */
+  @Deprecated
+  public long getCumulativeRssmem(int olderThanAge) {
+    return UNAVAILABLE;
+  }
 
   /**
    * Get the CPU time in millisecond used by all the processes in the
@@ -158,7 +212,6 @@ public abstract class ResourceCalculatorProcessTree extends Configured {
    * @return ResourceCalculatorProcessTree or null if ResourceCalculatorPluginTree
    *         is not available for this system.
    */
-  @Private
   public static ResourceCalculatorProcessTree getResourceCalculatorProcessTree(
     String pid, Class<? extends ResourceCalculatorProcessTree> clazz, Configuration conf) {
 

+ 20 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/WindowsBasedProcessTree.java

@@ -176,29 +176,45 @@ public class WindowsBasedProcessTree extends ResourceCalculatorProcessTree {
   public long getVirtualMemorySize(int olderThanAge) {
     long total = UNAVAILABLE;
     for (ProcessInfo p : processTree.values()) {
-      if ((p != null) && (p.age > olderThanAge)) {
+      if (p != null) {
         if (total == UNAVAILABLE) {
           total = 0;
         }
-        total += p.vmem;
+        if (p.age > olderThanAge) {
+          total += p.vmem;
+        }
       }
     }
     return total;
   }
+  
+  @Override
+  @SuppressWarnings("deprecation")
+  public long getCumulativeVmem(int olderThanAge) {
+    return getVirtualMemorySize(olderThanAge);
+  }
 
   @Override
   public long getRssMemorySize(int olderThanAge) {
     long total = UNAVAILABLE;
     for (ProcessInfo p : processTree.values()) {
-      if ((p != null) && (p.age > olderThanAge)) {
+      if (p != null) {
         if (total == UNAVAILABLE) {
           total = 0;
         }
-        total += p.workingSet;
+        if (p.age > olderThanAge) {
+          total += p.workingSet;
+        }
       }
     }
     return total;
   }
+  
+  @Override
+  @SuppressWarnings("deprecation")
+  public long getCumulativeRssmem(int olderThanAge) {
+    return getRssMemorySize(olderThanAge);
+  }
 
   @Override
   public long getCumulativeCpuTime() {

+ 86 - 10
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestProcfsBasedProcessTree.java

@@ -117,6 +117,7 @@ public class TestProcfsBasedProcessTree {
   }
 
   @Test(timeout = 30000)
+  @SuppressWarnings("deprecation")
   public void testProcessTree() throws Exception {
     try {
       Assert.assertTrue(ProcfsBasedProcessTree.isAvailable());
@@ -226,9 +227,13 @@ public class TestProcfsBasedProcessTree {
     // ProcessTree is gone now. Any further calls should be sane.
     p.updateProcessTree();
     Assert.assertFalse("ProcessTree must have been gone", isAlive(pid));
+    
     Assert.assertTrue(
       "vmem for the gone-process is " + p.getVirtualMemorySize()
           + " . It should be zero.", p.getVirtualMemorySize() == 0);
+    Assert.assertTrue(
+      "vmem (old API) for the gone-process is " + p.getCumulativeVmem()
+          + " . It should be zero.", p.getCumulativeVmem() == 0);
     Assert.assertTrue(p.toString().equals("[ ]"));
   }
 
@@ -385,6 +390,7 @@ public class TestProcfsBasedProcessTree {
    *           files.
    */
   @Test(timeout = 30000)
+  @SuppressWarnings("deprecation")
   public void testCpuAndMemoryForProcessTree() throws IOException {
 
     // test processes
@@ -437,9 +443,13 @@ public class TestProcfsBasedProcessTree {
       // verify rss memory
       long cumuRssMem =
           ProcfsBasedProcessTree.PAGE_SIZE > 0
-              ? 600L * ProcfsBasedProcessTree.PAGE_SIZE : 0L;
+              ? 600L * ProcfsBasedProcessTree.PAGE_SIZE : 
+                  ResourceCalculatorProcessTree.UNAVAILABLE;
       Assert.assertEquals("rss memory does not match", cumuRssMem,
         processTree.getRssMemorySize());
+      // verify old API
+      Assert.assertEquals("rss memory (old API) does not match", cumuRssMem,
+        processTree.getCumulativeRssmem());
 
       // verify cumulative cpu time
       long cumuCpuTime =
@@ -459,6 +469,9 @@ public class TestProcfsBasedProcessTree {
       // r--s)
       Assert.assertEquals("rss memory does not match",
         (100 * KB_TO_BYTES * 3), processTree.getRssMemorySize());
+      // verify old API
+      Assert.assertEquals("rss memory (old API) does not match",
+        (100 * KB_TO_BYTES * 3), processTree.getCumulativeRssmem());
 
       // test the cpu time again to see if it cumulates
       procInfos[0] =
@@ -524,6 +537,7 @@ public class TestProcfsBasedProcessTree {
     testMemForOlderProcesses(true);
   }
 
+  @SuppressWarnings("deprecation")
   private void testMemForOlderProcesses(boolean smapEnabled) throws IOException {
     // initial list of processes
     String[] pids = { "100", "200", "300", "400" };
@@ -565,8 +579,12 @@ public class TestProcfsBasedProcessTree {
       setSmapsInProceTree(processTree, smapEnabled);
 
       // verify virtual memory
-      Assert.assertEquals("Cumulative memory does not match", 700000L,
+      Assert.assertEquals("Virtual memory does not match", 700000L,
         processTree.getVirtualMemorySize());
+      
+      Assert.assertEquals("Virtual memory (old API) does not match", 700000L,
+        processTree.getCumulativeVmem());
+
       // write one more process as child of 100.
       String[] newPids = { "500" };
       setupPidDirs(procfsRootDir, newPids);
@@ -584,32 +602,58 @@ public class TestProcfsBasedProcessTree {
       processTree.updateProcessTree();
       Assert.assertEquals("vmem does not include new process",
         1200000L, processTree.getVirtualMemorySize());
+      
+      Assert.assertEquals("vmem (old API) does not include new process",
+        1200000L, processTree.getCumulativeVmem());
+      
       if (!smapEnabled) {
         long cumuRssMem =
             ProcfsBasedProcessTree.PAGE_SIZE > 0
-                ? 1200L * ProcfsBasedProcessTree.PAGE_SIZE : 0L;
+                ? 1200L * ProcfsBasedProcessTree.PAGE_SIZE : 
+                    ResourceCalculatorProcessTree.UNAVAILABLE;
         Assert.assertEquals("rssmem does not include new process",
           cumuRssMem, processTree.getRssMemorySize());
+        // verify old API
+        Assert.assertEquals("rssmem (old API) does not include new process",
+          cumuRssMem, processTree.getCumulativeRssmem());
       } else {
         Assert.assertEquals("rssmem does not include new process",
           100 * KB_TO_BYTES * 4, processTree.getRssMemorySize());
+        // verify old API
+        Assert.assertEquals("rssmem (old API) does not include new process",
+          100 * KB_TO_BYTES * 4, processTree.getCumulativeRssmem());
       }
 
       // however processes older than 1 iteration will retain the older value
       Assert.assertEquals(
         "vmem shouldn't have included new process", 700000L,
         processTree.getVirtualMemorySize(1));
+      // verify old API
+      Assert.assertEquals(
+          "vmem (old API) shouldn't have included new process", 700000L,
+          processTree.getCumulativeVmem(1));
+      
       if (!smapEnabled) {
         long cumuRssMem =
             ProcfsBasedProcessTree.PAGE_SIZE > 0
-                ? 700L * ProcfsBasedProcessTree.PAGE_SIZE : 0L;
+                ? 700L * ProcfsBasedProcessTree.PAGE_SIZE : 
+                    ResourceCalculatorProcessTree.UNAVAILABLE;
         Assert.assertEquals(
           "rssmem shouldn't have included new process", cumuRssMem,
           processTree.getRssMemorySize(1));
+        // Verify old API
+        Assert.assertEquals(
+          "rssmem (old API) shouldn't have included new process", cumuRssMem,
+          processTree.getCumulativeRssmem(1));
+        
       } else {
         Assert.assertEquals(
           "rssmem shouldn't have included new process",
           100 * KB_TO_BYTES * 3, processTree.getRssMemorySize(1));
+        // Verify old API
+        Assert.assertEquals(
+          "rssmem (old API) shouldn't have included new process",
+          100 * KB_TO_BYTES * 3, processTree.getCumulativeRssmem(1));
       }
 
       // one more process
@@ -632,17 +676,32 @@ public class TestProcfsBasedProcessTree {
       Assert.assertEquals(
         "vmem shouldn't have included new processes", 700000L,
         processTree.getVirtualMemorySize(2));
+      
+      // verify old API
+      Assert.assertEquals(
+        "vmem (old API) shouldn't have included new processes", 700000L,
+        processTree.getCumulativeVmem(2));
+      
       if (!smapEnabled) {
         long cumuRssMem =
             ProcfsBasedProcessTree.PAGE_SIZE > 0
-                ? 700L * ProcfsBasedProcessTree.PAGE_SIZE : 0L;
+                ? 700L * ProcfsBasedProcessTree.PAGE_SIZE : 
+                    ResourceCalculatorProcessTree.UNAVAILABLE;
         Assert.assertEquals(
           "rssmem shouldn't have included new processes",
           cumuRssMem, processTree.getRssMemorySize(2));
+        // Verify old API
+        Assert.assertEquals(
+          "rssmem (old API) shouldn't have included new processes",
+          cumuRssMem, processTree.getCumulativeRssmem(2));
       } else {
         Assert.assertEquals(
           "rssmem shouldn't have included new processes",
           100 * KB_TO_BYTES * 3, processTree.getRssMemorySize(2));
+        // Verify old API
+        Assert.assertEquals(
+          "rssmem (old API) shouldn't have included new processes",
+          100 * KB_TO_BYTES * 3, processTree.getCumulativeRssmem(2));
       }
 
       // processes older than 1 iteration should not include new process,
@@ -650,29 +709,46 @@ public class TestProcfsBasedProcessTree {
       Assert.assertEquals(
         "vmem shouldn't have included new processes", 1200000L,
         processTree.getVirtualMemorySize(1));
+      // verify old API
+      Assert.assertEquals(
+        "vmem (old API) shouldn't have included new processes", 1200000L,
+        processTree.getCumulativeVmem(1));
       if (!smapEnabled) {
         long cumuRssMem =
             ProcfsBasedProcessTree.PAGE_SIZE > 0
-                ? 1200L * ProcfsBasedProcessTree.PAGE_SIZE : 0L;
+                ? 1200L * ProcfsBasedProcessTree.PAGE_SIZE : 
+                    ResourceCalculatorProcessTree.UNAVAILABLE;
         Assert.assertEquals(
           "rssmem shouldn't have included new processes",
           cumuRssMem, processTree.getRssMemorySize(1));
+        // verify old API
+        Assert.assertEquals(
+          "rssmem (old API) shouldn't have included new processes",
+          cumuRssMem, processTree.getCumulativeRssmem(1));
       } else {
         Assert.assertEquals(
           "rssmem shouldn't have included new processes",
           100 * KB_TO_BYTES * 4, processTree.getRssMemorySize(1));
+        Assert.assertEquals(
+          "rssmem (old API) shouldn't have included new processes",
+          100 * KB_TO_BYTES * 4, processTree.getCumulativeRssmem(1));
       }
 
       // no processes older than 3 iterations
       Assert.assertEquals(
           "Getting non-zero vmem for processes older than 3 iterations",
-          UNAVAILABLE, processTree.getVirtualMemorySize(3));
+          0, processTree.getVirtualMemorySize(3));
+      // verify old API
       Assert.assertEquals(
-          "Getting non-zero rssmem for processes older than 3 iterations",
-          UNAVAILABLE, processTree.getRssMemorySize(3));
+          "Getting non-zero vmem (old API) for processes older than 3 iterations",
+          0, processTree.getCumulativeVmem(3));
       Assert.assertEquals(
           "Getting non-zero rssmem for processes older than 3 iterations",
-          UNAVAILABLE, processTree.getRssMemorySize(3));
+          0, processTree.getRssMemorySize(3));
+      // verify old API
+      Assert.assertEquals(
+          "Getting non-zero rssmem (old API) for processes older than 3 iterations",
+          0, processTree.getCumulativeRssmem(3));
     } finally {
       FileUtil.fullyDelete(procfsRootDir);
     }

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestResourceCalculatorProcessTree.java

@@ -44,10 +44,20 @@ public class TestResourceCalculatorProcessTree {
     public long getRssMemorySize(int age) {
       return 0;
     }
+    
+    @SuppressWarnings("deprecation")
+    public long getCumulativeRssmem(int age) {
+      return 0;
+    }
 
     public long getVirtualMemorySize(int age) {
       return 0;
     }
+    
+    @SuppressWarnings("deprecation")
+    public long getCumulativeVmem(int age) {
+      return 0;
+    }
 
     public long getCumulativeCpuTime() {
       return 0;

+ 14 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestWindowsBasedProcessTree.java

@@ -41,6 +41,7 @@ public class TestWindowsBasedProcessTree {
   }
 
   @Test (timeout = 30000)
+  @SuppressWarnings("deprecation")
   public void tree() {
     if( !Shell.WINDOWS) {
       LOG.info("Platform not Windows. Not testing");
@@ -49,30 +50,42 @@ public class TestWindowsBasedProcessTree {
     assertTrue("WindowsBasedProcessTree should be available on Windows", 
                WindowsBasedProcessTree.isAvailable());
     
-    
     WindowsBasedProcessTreeTester pTree = new WindowsBasedProcessTreeTester("-1");
     pTree.infoStr = "3524,1024,1024,500\r\n2844,1024,1024,500\r\n";
     pTree.updateProcessTree();
     assertTrue(pTree.getVirtualMemorySize() == 2048);
+    assertTrue(pTree.getCumulativeVmem() == 2048);
     assertTrue(pTree.getVirtualMemorySize(0) == 2048);
+    assertTrue(pTree.getCumulativeVmem(0) == 2048);
+    
     assertTrue(pTree.getRssMemorySize() == 2048);
+    assertTrue(pTree.getCumulativeRssmem() == 2048);
     assertTrue(pTree.getRssMemorySize(0) == 2048);
+    assertTrue(pTree.getCumulativeRssmem(0) == 2048);
     assertTrue(pTree.getCumulativeCpuTime() == 1000);
 
     pTree.infoStr = "3524,1024,1024,1000\r\n2844,1024,1024,1000\r\n1234,1024,1024,1000\r\n";
     pTree.updateProcessTree();
     assertTrue(pTree.getVirtualMemorySize() == 3072);
+    assertTrue(pTree.getCumulativeVmem() == 3072);
     assertTrue(pTree.getVirtualMemorySize(1) == 2048);
+    assertTrue(pTree.getCumulativeVmem(1) == 2048);
     assertTrue(pTree.getRssMemorySize() == 3072);
+    assertTrue(pTree.getCumulativeRssmem() == 3072);
     assertTrue(pTree.getRssMemorySize(1) == 2048);
+    assertTrue(pTree.getCumulativeRssmem(1) == 2048);
     assertTrue(pTree.getCumulativeCpuTime() == 3000);
 
     pTree.infoStr = "3524,1024,1024,1500\r\n2844,1024,1024,1500\r\n";
     pTree.updateProcessTree();
     assertTrue(pTree.getVirtualMemorySize() == 2048);
+    assertTrue(pTree.getCumulativeVmem() == 2048);
     assertTrue(pTree.getVirtualMemorySize(2) == 2048);
+    assertTrue(pTree.getCumulativeVmem(2) == 2048);
     assertTrue(pTree.getRssMemorySize() == 2048);
+    assertTrue(pTree.getCumulativeRssmem() == 2048);
     assertTrue(pTree.getRssMemorySize(2) == 2048);
+    assertTrue(pTree.getCumulativeRssmem(2) == 2048);
     assertTrue(pTree.getCumulativeCpuTime() == 4000);
   }
 }