Jelajahi Sumber

HADOOP-9. Use roulette scheduling for temporary space when the size
is not known. (Ari Rabkin via omalley)


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@685320 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 17 tahun lalu
induk
melakukan
3358fb5b09

+ 5 - 2
CHANGES.txt

@@ -157,8 +157,11 @@ Trunk (unreleased changes)
     the end of the heartbeat rpc, rather than the start. This causes better
     the end of the heartbeat rpc, rather than the start. This causes better
     behavior if the JobTracker is overloaded. (acmurthy via omalley)
     behavior if the JobTracker is overloaded. (acmurthy via omalley)
 
 
-		HADOOP-3853. Move multiple input format (HADOOP-372) extension to 
-		library package. (tomwhite via johan)
+    HADOOP-3853. Move multiple input format (HADOOP-372) extension to 
+    library package. (tomwhite via johan)
+
+    HADOOP-9. Use roulette scheduling for temporary space when the size
+    is not known. (Ari Rabkin via omalley)
 
 
   OPTIMIZATIONS
   OPTIMIZATIONS
 
 

+ 37 - 15
src/core/org/apache/hadoop/fs/LocalDirAllocator.java

@@ -267,18 +267,22 @@ public class LocalDirAllocator {
     }
     }
     
     
     /** Get a path from the local FS. This method should be used if the size of 
     /** Get a path from the local FS. This method should be used if the size of 
-     *  the file is not known apriori. We go round-robin over the set of disks
-     *  (via the configured dirs) and return the first complete path where
-     *  we could create the parent directory of the passed path. 
+     *  the file is not known a priori. 
+     *  
+     *  It will use roulette selection, picking directories
+     *  with probability proportional to their available space. 
      */
      */
     public synchronized Path getLocalPathForWrite(String path, 
     public synchronized Path getLocalPathForWrite(String path, 
         Configuration conf) throws IOException {
         Configuration conf) throws IOException {
       return getLocalPathForWrite(path, -1, conf);
       return getLocalPathForWrite(path, -1, conf);
     }
     }
 
 
-    /** Get a path from the local FS. Pass size as -1 if not known apriori. We
+    /** Get a path from the local FS. If size is known, we go
      *  round-robin over the set of disks (via the configured dirs) and return
      *  round-robin over the set of disks (via the configured dirs) and return
-     *  the first complete path which has enough space 
+     *  the first complete path which has enough space.
+     *  
+     *  If size is not known, use roulette selection -- pick directories
+     *  with probability proportional to their available space.
      */
      */
     public synchronized Path getLocalPathForWrite(String pathStr, long size, 
     public synchronized Path getLocalPathForWrite(String pathStr, long size, 
         Configuration conf) throws IOException {
         Configuration conf) throws IOException {
@@ -291,20 +295,38 @@ public class LocalDirAllocator {
         pathStr = pathStr.substring(1);
         pathStr = pathStr.substring(1);
       }
       }
       Path returnPath = null;
       Path returnPath = null;
-      while (numDirsSearched < numDirs && returnPath == null) {
-        if (size >= 0) {
+      
+      if(size == -1) {  //do roulette selection: pick dir with probability 
+                    //proportional to available size
+        long[] availableOnDisk = new long[dirDF.length];
+        long totalAvailable = 0;
+        
+            //build the "roulette wheel"
+        for(int i =0; i < dirDF.length; ++i) {
+          availableOnDisk[i] = dirDF[i].getAvailable();
+          totalAvailable += availableOnDisk[i];
+        }
+            // "roll the ball" -- pick a directory
+        Random r = new java.util.Random();
+        long randomPosition = Math.abs(r.nextLong()) % totalAvailable;
+        int dir=0;
+        while(randomPosition > availableOnDisk[dir]) {
+          randomPosition -= availableOnDisk[dir];
+          dir++;
+        }
+        dirNumLastAccessed = dir;
+        returnPath = createPath(pathStr);
+      } else {
+        while (numDirsSearched < numDirs && returnPath == null) {
           long capacity = dirDF[dirNumLastAccessed].getAvailable();
           long capacity = dirDF[dirNumLastAccessed].getAvailable();
           if (capacity > size) {
           if (capacity > size) {
             returnPath = createPath(pathStr);
             returnPath = createPath(pathStr);
           }
           }
-        } else {
-          returnPath = createPath(pathStr);
-        }
-        dirNumLastAccessed++;
-        dirNumLastAccessed = dirNumLastAccessed % numDirs; 
-        numDirsSearched++;
-      } 
-
+          dirNumLastAccessed++;
+          dirNumLastAccessed = dirNumLastAccessed % numDirs; 
+          numDirsSearched++;
+        } 
+      }
       if (returnPath != null) {
       if (returnPath != null) {
         return returnPath;
         return returnPath;
       }
       }

+ 55 - 7
src/test/org/apache/hadoop/fs/TestLocalDirAllocator.java

@@ -39,10 +39,12 @@ public class TestLocalDirAllocator extends TestCase {
   final static private File BUFFER_ROOT = new File(BUFFER_DIR_ROOT);
   final static private File BUFFER_ROOT = new File(BUFFER_DIR_ROOT);
   final static private String BUFFER_DIR[] = new String[] {
   final static private String BUFFER_DIR[] = new String[] {
     BUFFER_DIR_ROOT+"/tmp0",  BUFFER_DIR_ROOT+"/tmp1", BUFFER_DIR_ROOT+"/tmp2",
     BUFFER_DIR_ROOT+"/tmp0",  BUFFER_DIR_ROOT+"/tmp1", BUFFER_DIR_ROOT+"/tmp2",
-    BUFFER_DIR_ROOT+"/tmp3", BUFFER_DIR_ROOT+"/tmp4"};
+    BUFFER_DIR_ROOT+"/tmp3", BUFFER_DIR_ROOT+"/tmp4", BUFFER_DIR_ROOT+"/tmp5",
+    BUFFER_DIR_ROOT+"/tmp6"};
   final static private Path BUFFER_PATH[] = new Path[] {
   final static private Path BUFFER_PATH[] = new Path[] {
     new Path(BUFFER_DIR[0]), new Path(BUFFER_DIR[1]), new Path(BUFFER_DIR[2]),
     new Path(BUFFER_DIR[0]), new Path(BUFFER_DIR[1]), new Path(BUFFER_DIR[2]),
-    new Path(BUFFER_DIR[3]), new Path(BUFFER_DIR[4])};
+    new Path(BUFFER_DIR[3]), new Path(BUFFER_DIR[4]), new Path(BUFFER_DIR[5]),
+    new Path(BUFFER_DIR[6])};
   final static private String CONTEXT = "dfs.client.buffer.dir";
   final static private String CONTEXT = "dfs.client.buffer.dir";
   final static private String FILENAME = "block";
   final static private String FILENAME = "block";
   final static private LocalDirAllocator dirAllocator = 
   final static private LocalDirAllocator dirAllocator = 
@@ -50,7 +52,7 @@ public class TestLocalDirAllocator extends TestCase {
   static LocalFileSystem localFs;
   static LocalFileSystem localFs;
   final static private boolean isWindows =
   final static private boolean isWindows =
     System.getProperty("os.name").startsWith("Windows");
     System.getProperty("os.name").startsWith("Windows");
-  
+  final static int SMALL_FILE_SIZE = 100;
   static {
   static {
     try {
     try {
       localFs = FileSystem.getLocal(conf);
       localFs = FileSystem.getLocal(conf);
@@ -68,7 +70,7 @@ public class TestLocalDirAllocator extends TestCase {
   }
   }
   
   
   private void validateTempDirCreation(int i) throws IOException {
   private void validateTempDirCreation(int i) throws IOException {
-    File result = createTempFile();
+    File result = createTempFile(SMALL_FILE_SIZE);
     assertTrue("Checking for " + BUFFER_DIR[i] + " in " + result + " - FAILED!", 
     assertTrue("Checking for " + BUFFER_DIR[i] + " in " + result + " - FAILED!", 
         result.getPath().startsWith(new File(BUFFER_DIR[i], FILENAME).getPath()));
         result.getPath().startsWith(new File(BUFFER_DIR[i], FILENAME).getPath()));
   }
   }
@@ -78,6 +80,13 @@ public class TestLocalDirAllocator extends TestCase {
     result.delete();
     result.delete();
     return result;
     return result;
   }
   }
+  
+  private File createTempFile(long size) throws IOException {
+    File result = dirAllocator.createTmpFileForWrite(FILENAME, size, conf);
+    result.delete();
+    return result;
+  }
+  
   /** Two buffer dirs. The first dir does not exist & is on a read-only disk; 
   /** Two buffer dirs. The first dir does not exist & is on a read-only disk; 
    * The second dir exists & is RW
    * The second dir exists & is RW
    * @throws Exception
    * @throws Exception
@@ -122,7 +131,7 @@ public class TestLocalDirAllocator extends TestCase {
       conf.set(CONTEXT, BUFFER_DIR[2]+","+BUFFER_DIR[3]);
       conf.set(CONTEXT, BUFFER_DIR[2]+","+BUFFER_DIR[3]);
 
 
       // create the first file, and then figure the round-robin sequence
       // create the first file, and then figure the round-robin sequence
-      createTempFile();
+      createTempFile(SMALL_FILE_SIZE);
       int firstDirIdx = (dirAllocator.getCurrentDirectoryIndex() == 0) ? 2 : 3;
       int firstDirIdx = (dirAllocator.getCurrentDirectoryIndex() == 0) ? 2 : 3;
       int secondDirIdx = (firstDirIdx == 2) ? 3 : 2;
       int secondDirIdx = (firstDirIdx == 2) ? 3 : 2;
       
       
@@ -146,8 +155,8 @@ public class TestLocalDirAllocator extends TestCase {
       assertTrue(localFs.mkdirs(BUFFER_PATH[3]));
       assertTrue(localFs.mkdirs(BUFFER_PATH[3]));
       assertTrue(localFs.mkdirs(BUFFER_PATH[4]));
       assertTrue(localFs.mkdirs(BUFFER_PATH[4]));
       
       
-      // create the first file, and then figure the round-robin sequence
-      createTempFile();
+      // create the first file with size, and then figure the round-robin sequence
+      createTempFile(SMALL_FILE_SIZE);
 
 
       int nextDirIdx = (dirAllocator.getCurrentDirectoryIndex() == 0) ? 3 : 4;
       int nextDirIdx = (dirAllocator.getCurrentDirectoryIndex() == 0) ? 3 : 4;
       validateTempDirCreation(nextDirIdx);
       validateTempDirCreation(nextDirIdx);
@@ -160,4 +169,43 @@ public class TestLocalDirAllocator extends TestCase {
       rmBufferDirs();
       rmBufferDirs();
     }
     }
   }
   }
+  
+  /**
+   * Two buffer dirs, on read-write disk.
+   * 
+   * Try to create a whole bunch of files.
+   *  Verify that they do indeed all get created where they should.
+   *  
+   *  Would ideally check statistical properties of distribution, but
+   *  we don't have the nerve to risk false-positives here.
+   * 
+   * @throws Exception
+   */
+  static final int TRIALS = 100;
+  public void test4() throws Exception {
+    if (isWindows) return;
+    try {
+
+      conf.set(CONTEXT, BUFFER_DIR[5]+","+BUFFER_DIR[6]);
+      assertTrue(localFs.mkdirs(BUFFER_PATH[5]));
+      assertTrue(localFs.mkdirs(BUFFER_PATH[6]));
+        
+      int inDir5=0, inDir6=0;
+      for(int i = 0; i < TRIALS; ++i) {
+        File result = createTempFile();
+        if(result.getPath().startsWith(new File(BUFFER_DIR[5], FILENAME).getPath())) {
+          inDir5++;
+        } else  if(result.getPath().startsWith(new File(BUFFER_DIR[6], FILENAME).getPath())) {
+          inDir6++;
+        }
+        result.delete();
+      }
+      
+      assertTrue( inDir5 + inDir6 == TRIALS);
+        
+    } finally {
+      rmBufferDirs();
+    }
+  }
+  
 }
 }