
HADOOP-997. Implement S3 retry mechanism for failed block transfers.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@510258 13f79535-47bb-0310-9956-ffa450edef68
Thomas White 18 years ago
parent
commit
0f7cbc0f85

+ 3 - 0
CHANGES.txt

@@ -95,6 +95,9 @@ Trunk (unreleased changes)
     
 28. HADOOP-1025. Remove some obsolete code in ipc.Server.  (cutting)
 
+29. HADOOP-997. Implement S3 retry mechanism for failed block
+    transfers. This includes a generic retry mechanism for use
+    elsewhere in Hadoop. (tomwhite)
 
 Release 0.11.2 - 2007-02-16
 

+ 25 - 11
conf/hadoop-default.xml

@@ -375,17 +375,31 @@ creations/deletions), or "all".</description>
 
 <property>
   <name>fs.s3.block.size</name>
-  <value>1048576</value>
-  <description>
-        Block size to use writing S3.  Note, the default block size for jets3t,
-        the library at the base of the S3 filesystem, is less.  Its 131072.
-        If S3 is having a bad day requiring retries, if the retry is at some
-        point after the end of jets3t RepeatableInputStream buffer and end of
-        the S3 block, the transfer will fail. To change the jets3t block size,
-        add a jets3t.properties file to $HADOOP_HOME/conf so it can be found
-        on the CLASSPATH and amend the 's3service.stream-retry-buffer-size'
-        property. See http://jets3t.s3.amazonaws.com/toolkit/configuration.html
-        for more on jets3t configurables.
+  <value>67108864</value>
+  <description>Block size to use when writing files to S3.</description>
+</property>
+
+<property>
+  <name>fs.s3.buffer.dir</name>
+  <value>${hadoop.tmp.dir}/s3</value>
+  <description>Determines where on the local filesystem the S3 filesystem
+  should store its blocks before it sends them to S3
+  or after it retrieves them from S3.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3.maxRetries</name>
+  <value>4</value>
+  <description>The maximum number of retries for reading or writing blocks to S3, 
+  before we signal failure to the application.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3.sleepTimeSeconds</name>
+  <value>10</value>
+  <description>The number of seconds to sleep between each S3 retry.
   </description>
 </property>
 

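The two new retry keys can be overridden like any other Hadoop configuration, either in hadoop-site.xml or programmatically. A minimal sketch (the class name and bucket URI are illustrative placeholders; the keys and their defaults are the ones added above):

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class S3RetryConfigExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Keys and defaults come from the hadoop-default.xml entries above.
    conf.setInt("fs.s3.maxRetries", 6);       // default: 4
    conf.set("fs.s3.sleepTimeSeconds", "30"); // default: 10
    // Hypothetical bucket; assumes AWS credentials are configured.
    FileSystem fs = FileSystem.get(URI.create("s3://example-bucket"), conf);
    System.out.println("Using " + fs.getName());
  }
}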
+ 4 - 1
src/java/org/apache/hadoop/fs/s3/Block.java

@@ -1,6 +1,9 @@
 package org.apache.hadoop.fs.s3;
 
-class Block {
+/**
+ * Holds metadata about a block of data being stored in a {@link FileSystemStore}.
+ */
+public class Block {
   private long id;
 
   private long length;

+ 8 - 5
src/java/org/apache/hadoop/fs/s3/FileSystemStore.java

@@ -1,25 +1,28 @@
 package org.apache.hadoop.fs.s3;
 
+import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
 import java.net.URI;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
-interface FileSystemStore {
+/**
+ * A facility for storing and retrieving {@link INode}s and {@link Block}s.
+ */
+public interface FileSystemStore {
   
   void initialize(URI uri, Configuration conf) throws IOException;
 
   void storeINode(Path path, INode inode) throws IOException;
-  void storeBlock(Block block, InputStream in) throws IOException;
+  void storeBlock(Block block, File file) throws IOException;
   
   boolean inodeExists(Path path) throws IOException;
   boolean blockExists(long blockId) throws IOException;
 
-  INode getINode(Path path) throws IOException;
-  InputStream getBlockStream(Block block, long byteRangeStart) throws IOException;
+  INode retrieveINode(Path path) throws IOException;
+  File retrieveBlock(Block block, long byteRangeStart) throws IOException;
 
   void deleteINode(Path path) throws IOException;
   void deleteBlock(Block block) throws IOException;

+ 1 - 1
src/java/org/apache/hadoop/fs/s3/INode.java

@@ -11,7 +11,7 @@ import java.io.InputStream;
  * Holds file metadata including type (regular file, or directory),
  * and the list of blocks that are pointers to the data.
  */
-class INode {
+public class INode {
 	
   enum FileType {
     DIRECTORY, FILE

+ 73 - 7
src/java/org/apache/hadoop/fs/s3/Jets3tFileSystemStore.java

@@ -1,7 +1,14 @@
 package org.apache.hadoop.fs.s3;
 
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.UnsupportedEncodingException;
 import java.net.URI;
 import java.net.URLDecoder;
@@ -24,11 +31,18 @@ class Jets3tFileSystemStore implements FileSystemStore {
   private static final String PATH_DELIMITER = urlEncode(Path.SEPARATOR);
   private static final String BLOCK_PREFIX = "block_";
 
+  private Configuration conf;
+  
   private S3Service s3Service;
 
   private S3Bucket bucket;
-
+  
+  private int bufferSize;
+  
   public void initialize(URI uri, Configuration conf) throws IOException {
+    
+    this.conf = conf;
+    
     try {
       String accessKey = null;
       String secretAccessKey = null;
@@ -78,6 +92,8 @@ class Jets3tFileSystemStore implements FileSystemStore {
     bucket = new S3Bucket(uri.getHost());
 
     createBucket(bucket.getName());
+    
+    this.bufferSize = conf.getInt("io.file.buffer.size", 4096);
   }  
 
   private void createBucket(String bucketName) throws IOException {
@@ -159,13 +175,47 @@ class Jets3tFileSystemStore implements FileSystemStore {
     }
   }
 
-  public INode getINode(Path path) throws IOException {
+  public INode retrieveINode(Path path) throws IOException {
     return INode.deserialize(get(pathToKey(path)));
   }
 
-  public InputStream getBlockStream(Block block, long byteRangeStart)
+  public File retrieveBlock(Block block, long byteRangeStart)
       throws IOException {
-    return get(blockToKey(block), byteRangeStart);
+    File fileBlock = null;
+    InputStream in = null;
+    OutputStream out = null;
+    try {
+      fileBlock = newBackupFile();
+      in = get(blockToKey(block), byteRangeStart);
+      out = new BufferedOutputStream(new FileOutputStream(fileBlock));
+      byte[] buf = new byte[bufferSize];
+      int numRead;
+      while ((numRead = in.read(buf)) >= 0) {
+        out.write(buf, 0, numRead);
+      }
+      return fileBlock;
+    } catch (IOException e) {
+      // close output stream to file then delete file
+      closeQuietly(out);
+      out = null; // to prevent a second close
+      if (fileBlock != null) {
+        fileBlock.delete();
+      }
+      throw e;
+    } finally {
+      closeQuietly(out);
+      closeQuietly(in);
+    }
+  }
+  
+  private File newBackupFile() throws IOException {
+    File dir = new File(conf.get("fs.s3.buffer.dir"));
+    if (!dir.exists() && !dir.mkdirs()) {
+      throw new IOException("Cannot create S3 buffer directory: " + dir);
+    }
+    File result = File.createTempFile("input-", ".tmp", dir);
+    result.deleteOnExit();
+    return result;
   }
 
   public Set<Path> listSubPaths(Path path) throws IOException {
@@ -229,8 +279,24 @@ class Jets3tFileSystemStore implements FileSystemStore {
     put(pathToKey(path), inode.serialize(), inode.getSerializedLength());
   }
 
-  public void storeBlock(Block block, InputStream in) throws IOException {
-    put(blockToKey(block), in, block.getLength());
+  public void storeBlock(Block block, File file) throws IOException {
+    BufferedInputStream in = null;
+    try {
+      in = new BufferedInputStream(new FileInputStream(file));
+      put(blockToKey(block), in, block.getLength());
+    } finally {
+      closeQuietly(in);
+    }    
+  }
+
+  private void closeQuietly(Closeable closeable) {
+    if (closeable != null) {
+      try {
+        closeable.close();
+      } catch (IOException e) {
+        // ignore
+      }
+    }
   }
 
   private String pathToKey(Path path) {
@@ -296,7 +362,7 @@ class Jets3tFileSystemStore implements FileSystemStore {
       for (int i = 0; i < objects.length; i++) {
         Path path = keyToPath(objects[i].getKey());
         sb.append(path).append("\n");
-        INode m = getINode(path);
+        INode m = retrieveINode(path);
         sb.append("\t").append(m.getFileType()).append("\n");
         if (m.getFileType() == FileType.DIRECTORY) {
           continue;

+ 48 - 18
src/java/org/apache/hadoop/fs/s3/S3FileSystem.java

@@ -2,7 +2,10 @@ package org.apache.hadoop.fs.s3;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSInputStream;
@@ -10,6 +13,9 @@ import org.apache.hadoop.fs.FSOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.util.Progressable;
 
 /**
@@ -20,7 +26,7 @@ import org.apache.hadoop.util.Progressable;
  */
 public class S3FileSystem extends FileSystem {
 
-  private static final long DEFAULT_BLOCK_SIZE = 1 * 1024 * 1024;
+  private static final long DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024;
   
   private URI uri;
 
@@ -31,9 +37,9 @@ public class S3FileSystem extends FileSystem {
   private Path workingDir = new Path("/user", System.getProperty("user.name"));
 
   public S3FileSystem() {
-    this(new Jets3tFileSystemStore());
+    // set store in initialize()
   }
-
+  
   public S3FileSystem(FileSystemStore store) {
     this.store = store;
   }
@@ -45,12 +51,36 @@ public class S3FileSystem extends FileSystem {
 
   @Override
   public void initialize(URI uri, Configuration conf) throws IOException {
+    if (store == null) {
+      store = createDefaultStore(conf);
+    }
     store.initialize(uri, conf);
     setConf(conf);
     this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());    
     this.localFs = get(URI.create("file:///"), conf);
   }  
 
+  private static FileSystemStore createDefaultStore(Configuration conf) {
+    FileSystemStore store = new Jets3tFileSystemStore();
+    
+    RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
+        conf.getInt("fs.s3.maxRetries", 4),
+        conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
+    Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
+      new HashMap<Class<? extends Exception>, RetryPolicy>();
+    exceptionToPolicyMap.put(IOException.class, basePolicy);
+    exceptionToPolicyMap.put(S3Exception.class, basePolicy);
+    
+    RetryPolicy methodPolicy = RetryPolicies.retryByException(
+        RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
+    Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
+    methodNameToPolicyMap.put("storeBlock", methodPolicy);
+    methodNameToPolicyMap.put("retrieveBlock", methodPolicy);
+    
+    return (FileSystemStore) RetryProxy.create(FileSystemStore.class,
+        store, methodNameToPolicyMap);
+  }
+  
   @Override
   public String getName() {
     return getUri().toString();
@@ -81,7 +111,7 @@ public class S3FileSystem extends FileSystem {
   @Override
   public boolean mkdirs(Path path) throws IOException {
     Path absolutePath = makeAbsolute(path);
-    INode inode = store.getINode(absolutePath);
+    INode inode = store.retrieveINode(absolutePath);
     if (inode == null) {
       store.storeINode(absolutePath, INode.DIRECTORY_INODE);
     } else if (inode.isFile()) {
@@ -94,7 +124,7 @@ public class S3FileSystem extends FileSystem {
 
   @Override
   public boolean isDirectory(Path path) throws IOException {
-    INode inode = store.getINode(makeAbsolute(path));
+    INode inode = store.retrieveINode(makeAbsolute(path));
     if (inode == null) {
       return false;
     }
@@ -103,7 +133,7 @@ public class S3FileSystem extends FileSystem {
 
   @Override
   public boolean isFile(Path path) throws IOException {
-    INode inode = store.getINode(makeAbsolute(path));
+    INode inode = store.retrieveINode(makeAbsolute(path));
     if (inode == null) {
       return false;
     }
@@ -111,7 +141,7 @@ public class S3FileSystem extends FileSystem {
   }
 
   private INode checkFile(Path path) throws IOException {
-    INode inode = store.getINode(makeAbsolute(path));
+    INode inode = store.retrieveINode(makeAbsolute(path));
     if (inode == null) {
       throw new IOException("No such file.");
     }
@@ -124,7 +154,7 @@ public class S3FileSystem extends FileSystem {
   @Override
   public Path[] listPathsRaw(Path path) throws IOException {
     Path absolutePath = makeAbsolute(path);
-    INode inode = store.getINode(absolutePath);
+    INode inode = store.retrieveINode(absolutePath);
     if (inode == null) {
       return null;
     } else if (inode.isFile()) {
@@ -147,7 +177,7 @@ public class S3FileSystem extends FileSystem {
       short replication, long blockSize, Progressable progress)
       throws IOException {
 
-    INode inode = store.getINode(makeAbsolute(file));
+    INode inode = store.retrieveINode(makeAbsolute(file));
     if (inode != null) {
       if (overwrite) {
         deleteRaw(file);
@@ -175,16 +205,16 @@ public class S3FileSystem extends FileSystem {
   @Override
   public boolean renameRaw(Path src, Path dst) throws IOException {
     Path absoluteSrc = makeAbsolute(src);
-    INode srcINode = store.getINode(absoluteSrc);
+    INode srcINode = store.retrieveINode(absoluteSrc);
     if (srcINode == null) {
       // src path doesn't exist
       return false; 
     }
     Path absoluteDst = makeAbsolute(dst);
-    INode dstINode = store.getINode(absoluteDst);
+    INode dstINode = store.retrieveINode(absoluteDst);
     if (dstINode != null && dstINode.isDirectory()) {
       absoluteDst = new Path(absoluteDst, absoluteSrc.getName());
-      dstINode = store.getINode(absoluteDst);
+      dstINode = store.retrieveINode(absoluteDst);
     }
     if (dstINode != null) {
       // dst path already exists - can't overwrite
@@ -192,7 +222,7 @@ public class S3FileSystem extends FileSystem {
     }
     Path dstParent = absoluteDst.getParent();
     if (dstParent != null) {
-      INode dstParentINode = store.getINode(dstParent);
+      INode dstParentINode = store.retrieveINode(dstParent);
       if (dstParentINode == null || dstParentINode.isFile()) {
         // dst parent doesn't exist or is a file
         return false;
@@ -202,12 +232,12 @@ public class S3FileSystem extends FileSystem {
   }
   
   private boolean renameRawRecursive(Path src, Path dst) throws IOException {
-    INode srcINode = store.getINode(src);
+    INode srcINode = store.retrieveINode(src);
     store.storeINode(dst, srcINode);
     store.deleteINode(src);
     if (srcINode.isDirectory()) {
       for (Path oldSrc : store.listDeepSubPaths(src)) {
-        INode inode = store.getINode(oldSrc);
+        INode inode = store.retrieveINode(oldSrc);
         if (inode == null) {
           return false;
         }
@@ -222,7 +252,7 @@ public class S3FileSystem extends FileSystem {
   @Override
   public boolean deleteRaw(Path path) throws IOException {
     Path absolutePath = makeAbsolute(path);
-    INode inode = store.getINode(absolutePath);
+    INode inode = store.retrieveINode(absolutePath);
     if (inode == null) {
       return false;
     }
@@ -282,9 +312,9 @@ public class S3FileSystem extends FileSystem {
 
   @Override
   public long getBlockSize(Path path) throws IOException {
-    INode inode = store.getINode(makeAbsolute(path));
+    INode inode = store.retrieveINode(makeAbsolute(path));
     if (inode == null) {
-      throw new IOException("No such file or directory.");
+      throw new IOException(path.toString() + ": No such file or directory.");
     }
     Block[] blocks = inode.getBlocks();
     if (blocks == null || blocks.length == 0) {

+ 7 - 20
src/java/org/apache/hadoop/fs/s3/S3InputStream.java

@@ -1,21 +1,15 @@
 package org.apache.hadoop.fs.s3;
 
-import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSInputStream;
 
 class S3InputStream extends FSInputStream {
 
-  private int bufferSize;
-
   private FileSystemStore store;
 
   private Block[] blocks;
@@ -26,6 +20,8 @@ class S3InputStream extends FSInputStream {
 
   private long pos = 0;
 
+  private File blockFile;
+  
   private DataInputStream blockStream;
 
   private long blockEnd = -1;
@@ -38,7 +34,6 @@ class S3InputStream extends FSInputStream {
     for (Block block : blocks) {
       this.fileLength += block.getLength();
     }
-    this.bufferSize = conf.getInt("io.file.buffer.size", 4096);    
   }
 
   @Override
@@ -128,21 +123,11 @@ class S3InputStream extends FSInputStream {
 
     // read block blocks[targetBlock] from position offsetIntoBlock
 
-    File fileBlock = File.createTempFile("s3fs-in", "");
-    fileBlock.deleteOnExit();
-    InputStream in = store.getBlockStream(blocks[targetBlock], offsetIntoBlock);
-    OutputStream out = new BufferedOutputStream(new FileOutputStream(fileBlock));
-    byte[] buf = new byte[bufferSize];
-    int numRead;
-    while ((numRead = in.read(buf)) >= 0) {
-      out.write(buf, 0, numRead);
-    }
-    out.close();
-    in.close();
+    this.blockFile = store.retrieveBlock(blocks[targetBlock], offsetIntoBlock);
 
     this.pos = target;
     this.blockEnd = targetBlockEnd;
-    this.blockStream = new DataInputStream(new FileInputStream(fileBlock));
+    this.blockStream = new DataInputStream(new FileInputStream(blockFile));
 
   }
 
@@ -152,10 +137,12 @@ class S3InputStream extends FSInputStream {
       throw new IOException("Stream closed");
     }
     if (blockStream != null) {
-      blockStream.close();
       blockStream.close();
       blockStream = null;
     }
+    if (blockFile != null) {
+      blockFile.delete();
+    }
     super.close();
     closed = true;
   }

+ 9 - 6
src/java/org/apache/hadoop/fs/s3/S3OutputStream.java

@@ -1,10 +1,8 @@
 package org.apache.hadoop.fs.s3;
 
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.List;
@@ -18,6 +16,8 @@ import org.apache.hadoop.util.Progressable;
 
 class S3OutputStream extends FSOutputStream {
 
+  private Configuration conf;
+  
   private int bufferSize;
 
   private FileSystemStore store;
@@ -49,6 +49,7 @@ class S3OutputStream extends FSOutputStream {
   public S3OutputStream(Configuration conf, FileSystemStore store,
       Path path, long blockSize, Progressable progress) throws IOException {
     
+    this.conf = conf;
     this.store = store;
     this.path = path;
     this.blockSize = blockSize;
@@ -60,7 +61,11 @@ class S3OutputStream extends FSOutputStream {
   }
 
   private File newBackupFile() throws IOException {
-    File result = File.createTempFile("s3fs-out", "");
+    File dir = new File(conf.get("fs.s3.buffer.dir"));
+    if (!dir.exists() && !dir.mkdirs()) {
+      throw new IOException("Cannot create S3 buffer directory: " + dir);
+    }
+    File result = File.createTempFile("output-", ".tmp", dir);
     result.deleteOnExit();
     return result;
   }
@@ -147,9 +152,7 @@ class S3OutputStream extends FSOutputStream {
     //
     // TODO: Use passed in Progressable to report progress.
     nextBlockOutputStream();
-    InputStream in = new FileInputStream(backupFile);
-    store.storeBlock(nextBlock, in);
-    in.close();
+    store.storeBlock(nextBlock, backupFile);
     internalClose();
 
     //

+ 85 - 0
src/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java

@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.retry;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.StringUtils;
+
+class RetryInvocationHandler implements InvocationHandler {
+  public static final Log LOG = LogFactory.getLog("org.apache.hadoop.io.retry.RetryInvocationHandler");  
+  private Object implementation;
+  
+  private RetryPolicy defaultPolicy;
+  private Map<String,RetryPolicy> methodNameToPolicyMap;
+  
+  public RetryInvocationHandler(Object implementation, RetryPolicy retryPolicy) {
+    this.implementation = implementation;
+    this.defaultPolicy = retryPolicy;
+    this.methodNameToPolicyMap = Collections.emptyMap();
+  }
+  
+  public RetryInvocationHandler(Object implementation, Map<String, RetryPolicy> methodNameToPolicyMap) {
+    this.implementation = implementation;
+    this.defaultPolicy = RetryPolicies.TRY_ONCE_THEN_FAIL;
+    this.methodNameToPolicyMap = methodNameToPolicyMap;
+  }
+
+  public Object invoke(Object proxy, Method method, Object[] args)
+      throws Throwable {
+    RetryPolicy policy = methodNameToPolicyMap.get(method.getName());
+    if (policy == null) {
+      policy = defaultPolicy;
+    }
+    
+    int retries = 0;
+    while (true) {
+      try {
+        return invokeMethod(method, args);
+      } catch (Exception e) {
+        if (!policy.shouldRetry(e, retries++)) {
+          LOG.warn("Exception while invoking " + method.getName()
+              + " of " + implementation.getClass() + ". Not retrying."
+              + StringUtils.stringifyException(e));
+          if (!method.getReturnType().equals(Void.TYPE)) {
+            throw e; // non-void methods can't fail without an exception
+          }
+          return null;
+        }
+        LOG.warn("Exception while invoking " + method.getName()
+            + " of " + implementation.getClass() + ". Retrying."
+            + StringUtils.stringifyException(e));
+      }
+    }
+  }
+
+  private Object invokeMethod(Method method, Object[] args) throws Throwable {
+    try {
+      return method.invoke(implementation, args);
+    } catch (InvocationTargetException e) {
+      throw e.getCause();
+    }
+  }
+
+}

+ 188 - 0
src/java/org/apache/hadoop/io/retry/RetryPolicies.java

@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.retry;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * <p>
+ * A collection of useful implementations of {@link RetryPolicy}.
+ * </p>
+ * @author Tom White
+ */
+public class RetryPolicies {
+  
+  /**
+   * <p>
+   * Try once, and fail by re-throwing the exception.
+   * This corresponds to having no retry mechanism in place.
+   * </p>
+   */
+  public static final RetryPolicy TRY_ONCE_THEN_FAIL = new TryOnceThenFail();
+  
+  /**
+   * <p>
+   * Try once, and fail silently for <code>void</code> methods, or by
+   * re-throwing the exception for non-<code>void</code> methods.
+   * </p>
+   */
+  public static final RetryPolicy TRY_ONCE_DONT_FAIL = new TryOnceDontFail();
+  
+  /**
+   * <p>
+   * Keep trying forever.
+   * </p>
+   */
+  public static final RetryPolicy RETRY_FOREVER = new RetryForever();
+  
+  /**
+   * <p>
+   * Keep trying a limited number of times, waiting a fixed time between attempts,
+   * and then fail by re-throwing the exception.
+   * </p>
+   */
+  public static final RetryPolicy retryUpToMaximumCountWithFixedSleep(int maxRetries, long sleepTime, TimeUnit timeUnit) {
+    return new RetryUpToMaximumCountWithFixedSleep(maxRetries, sleepTime, timeUnit);
+  }
+  
+  /**
+   * <p>
+   * Keep trying for a maximum time, waiting a fixed time between attempts,
+   * and then fail by re-throwing the exception.
+   * </p>
+   */
+  public static final RetryPolicy retryUpToMaximumTimeWithFixedSleep(long maxTime, long sleepTime, TimeUnit timeUnit) {
+    return new RetryUpToMaximumTimeWithFixedSleep(maxTime, sleepTime, timeUnit);
+  }
+  
+  /**
+   * <p>
+   * Keep trying a limited number of times, waiting a growing amount of time between attempts,
+   * and then fail by re-throwing the exception.
+   * The time between attempts is <code>sleepTime</code> multiplied by the number of tries so far.
+   * </p>
+   */
+  public static final RetryPolicy retryUpToMaximumCountWithProportionalSleep(int maxRetries, long sleepTime, TimeUnit timeUnit) {
+    return new RetryUpToMaximumCountWithProportionalSleep(maxRetries, sleepTime, timeUnit);
+  }
+  
+  /**
+   * <p>
+   * Set a default policy with some explicit handlers for specific exceptions.
+   * </p>
+   */
+  public static final RetryPolicy retryByException(RetryPolicy defaultPolicy,
+        Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap) {
+    return new ExceptionDependentRetry(defaultPolicy, exceptionToPolicyMap);
+  }
+  
+  static class TryOnceThenFail implements RetryPolicy {
+    public boolean shouldRetry(Exception e, int retries) throws Exception {
+      throw e;
+    }
+  }
+  static class TryOnceDontFail implements RetryPolicy {
+    public boolean shouldRetry(Exception e, int retries) throws Exception {
+      return false;
+    }
+  }
+  
+  static class RetryForever implements RetryPolicy {
+    public boolean shouldRetry(Exception e, int retries) throws Exception {
+      return true;
+    }
+  }
+  
+  static abstract class RetryLimited implements RetryPolicy {
+    int maxRetries;
+    long sleepTime;
+    TimeUnit timeUnit;
+    
+    public RetryLimited(int maxRetries, long sleepTime, TimeUnit timeUnit) {
+      this.maxRetries = maxRetries;
+      this.sleepTime = sleepTime;
+      this.timeUnit = timeUnit;
+    }
+
+    public boolean shouldRetry(Exception e, int retries) throws Exception {
+      if (retries > maxRetries) {
+        throw e;
+      }
+      try {
+        timeUnit.sleep(calculateSleepTime(retries));
+      } catch (InterruptedException ie) {
+        // retry
+      }
+      return true;
+    }
+    
+    protected abstract long calculateSleepTime(int retries);
+  }
+  
+  static class RetryUpToMaximumCountWithFixedSleep extends RetryLimited {
+    public RetryUpToMaximumCountWithFixedSleep(int maxRetries, long sleepTime, TimeUnit timeUnit) {
+      super(maxRetries, sleepTime, timeUnit);
+    }
+    
+    @Override
+    protected long calculateSleepTime(int retries) {
+      return sleepTime;
+    }
+  }
+  
+  static class RetryUpToMaximumTimeWithFixedSleep extends RetryUpToMaximumCountWithFixedSleep {
+    public RetryUpToMaximumTimeWithFixedSleep(long maxTime, long sleepTime, TimeUnit timeUnit) {
+      super((int) (maxTime / sleepTime), sleepTime, timeUnit);
+    }
+  }
+  
+  static class RetryUpToMaximumCountWithProportionalSleep extends RetryLimited {
+    public RetryUpToMaximumCountWithProportionalSleep(int maxRetries, long sleepTime, TimeUnit timeUnit) {
+      super(maxRetries, sleepTime, timeUnit);
+    }
+    
+    @Override
+    protected long calculateSleepTime(int retries) {
+      return sleepTime * (retries + 1);
+    }
+  }
+  
+  static class ExceptionDependentRetry implements RetryPolicy {
+
+    RetryPolicy defaultPolicy;
+    Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap;
+    
+    public ExceptionDependentRetry(RetryPolicy defaultPolicy,
+        Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap) {
+      this.defaultPolicy = defaultPolicy;
+      this.exceptionToPolicyMap = exceptionToPolicyMap;
+    }
+
+    public boolean shouldRetry(Exception e, int retries) throws Exception {
+      RetryPolicy policy = exceptionToPolicyMap.get(e.getClass());
+      if (policy == null) {
+        policy = defaultPolicy;
+      }
+      return policy.shouldRetry(e, retries);
+    }
+    
+  }
+  
+  
+}

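The factory methods above compose. As a sketch (class and method names are illustrative), the pattern S3FileSystem uses later in this commit, a bounded fixed-sleep policy for IOException with TRY_ONCE_THEN_FAIL as the fallback, distills to:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;

public class RetryPolicyComposition {
  /** Retry IOExceptions up to 4 times with a fixed 10s sleep;
      fail immediately on any other exception. */
  public static RetryPolicy ioBoundedRetry() {
    RetryPolicy base = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
        4, 10, TimeUnit.SECONDS);
    Map<Class<? extends Exception>, RetryPolicy> byException =
        new HashMap<Class<? extends Exception>, RetryPolicy>();
    byException.put(IOException.class, base);
    return RetryPolicies.retryByException(
        RetryPolicies.TRY_ONCE_THEN_FAIL, byException);
  }
}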
+ 44 - 0
src/java/org/apache/hadoop/io/retry/RetryPolicy.java

@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.retry;
+
+/**
+ * <p>
+ * Specifies a policy for retrying method failures.
+ * Implementations of this interface should be immutable.
+ * </p>
+ * @author Tom White
+ */
+public interface RetryPolicy {
+  /**
+   * <p>
+   * Determines whether the framework should retry a method,
+   * given the exception that caused it to fail and the number
+   * of retries that have been attempted for that operation so far.
+   * </p>
+   * @param e The exception that caused the method to fail.
+   * @param retries The number of times the method has been retried.
+   * @return <code>true</code> if the method should be retried,
+   *   <code>false</code> if the method should not be retried
+   *   but shouldn't fail with an exception (only for void methods).
+   * @throws Exception The re-thrown exception <code>e</code> indicating
+   *   that the method failed and should not be retried further. 
+   */
+  public boolean shouldRetry(Exception e, int retries) throws Exception;
+}

+ 69 - 0
src/java/org/apache/hadoop/io/retry/RetryProxy.java

@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.retry;
+
+import java.lang.reflect.Proxy;
+import java.util.Map;
+
+/**
+ * <p>
+ * A factory for creating retry proxies.
+ * </p>
+ * @author Tom White
+ */
+public class RetryProxy {
+  /**
+   * <p>
+   * Create a proxy for an interface of an implementation class
+   * using the same retry policy for each method in the interface. 
+   * </p>
+   * @param iface the interface that the retry will implement
+   * @param implementation the instance whose methods should be retried
+   * @param retryPolicy the policy for retrying method call failures
+   * @return the retry proxy
+   */
+  public static Object create(Class<?> iface, Object implementation,
+      RetryPolicy retryPolicy) {
+    return Proxy.newProxyInstance(
+        implementation.getClass().getClassLoader(),
+        new Class<?>[] { iface },
+        new RetryInvocationHandler(implementation, retryPolicy)
+    );
+  }  
+  
+  /**
+   * <p>
+   * Create a proxy for an interface of an implementation class
+   * using a set of retry policies specified by method name.
+   * If no retry policy is defined for a method then a default of
+   * {@link RetryPolicies#TRY_ONCE_THEN_FAIL} is used.
+   * </p>
+   * @param iface the interface that the retry will implement
+   * @param implementation the instance whose methods should be retried
+   * @param methodNameToPolicyMap a map of method names to retry policies
+   * @return the retry proxy
+   */
+  public static Object create(Class<?> iface, Object implementation,
+      Map<String,RetryPolicy> methodNameToPolicyMap) {
+    return Proxy.newProxyInstance(
+        implementation.getClass().getClassLoader(),
+        new Class<?>[] { iface },
+        new RetryInvocationHandler(implementation, methodNameToPolicyMap)
+    );
+  }
+}

+ 30 - 0
src/java/org/apache/hadoop/io/retry/package.html

@@ -0,0 +1,30 @@
+<html>
+<body>
+
+<p>
+A mechanism for selectively retrying methods that throw exceptions under certain circumstances.
+</p>
+
+<p>
+Typical usage is
+</p>
+
+<pre>
+UnreliableImplementation unreliableImpl = new UnreliableImplementation();
+UnreliableInterface unreliable = (UnreliableInterface)
+  RetryProxy.create(UnreliableInterface.class, unreliableImpl,
+    RetryPolicies.retryUpToMaximumCountWithFixedSleep(4, 10, TimeUnit.SECONDS));
+unreliable.call();
+</pre>
+
+<p>
+This will retry any method called on <code>unreliable</code> four times - in this case the <code>call()</code>
+method - sleeping 10 seconds between
+each retry. There are a number of {@link org.apache.hadoop.io.retry.RetryPolicies retry policies}
+available, or you can implement a custom one by implementing {@link org.apache.hadoop.io.retry.RetryPolicy}.
+It is also possible to specify retry policies on a 
+{@link org.apache.hadoop.io.retry.RetryProxy#create(Class, Object, Map) per-method basis}.
+</p>
+
+</body>
+</html>

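The per-method form mentioned at the end of the page above takes a map from method name to policy; any method missing from the map falls back to TRY_ONCE_THEN_FAIL. A sketch against the UnreliableInterface test fixture added later in this commit (the wrapper class name is illustrative):

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.io.retry.UnreliableImplementation;
import org.apache.hadoop.io.retry.UnreliableInterface;

public class PerMethodRetryExample {
  public static void main(String[] args) throws Exception {
    Map<String, RetryPolicy> methodNameToPolicyMap =
        new HashMap<String, RetryPolicy>();
    // Only this method is retried; all others use TRY_ONCE_THEN_FAIL.
    methodNameToPolicyMap.put("failsOnceThenSucceeds",
        RetryPolicies.retryUpToMaximumCountWithFixedSleep(4, 10, TimeUnit.SECONDS));

    UnreliableInterface unreliable = (UnreliableInterface) RetryProxy.create(
        UnreliableInterface.class, new UnreliableImplementation(),
        methodNameToPolicyMap);

    unreliable.failsOnceThenSucceeds(); // first failure is retried, then succeeds
  }
}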
+ 41 - 9
src/test/org/apache/hadoop/fs/s3/InMemoryFileSystemStore.java

@@ -1,9 +1,12 @@
 package org.apache.hadoop.fs.s3;
 
-import java.io.ByteArrayInputStream;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.net.URI;
 import java.util.HashMap;
 import java.util.LinkedHashSet;
@@ -23,11 +26,12 @@ import org.apache.hadoop.fs.s3.INode.FileType;
  */
 class InMemoryFileSystemStore implements FileSystemStore {
   
+  private Configuration conf;
   private SortedMap<Path, INode> inodes = new TreeMap<Path, INode>();
   private Map<Long, byte[]> blocks = new HashMap<Long, byte[]>();
   
   public void initialize(URI uri, Configuration conf) {
-    // Nothing to initialize
+    this.conf = conf;
   }
 
   public void deleteINode(Path path) throws IOException {
@@ -46,13 +50,33 @@ class InMemoryFileSystemStore implements FileSystemStore {
     return blocks.containsKey(blockId);
   }
 
-  public INode getINode(Path path) throws IOException {
+  public INode retrieveINode(Path path) throws IOException {
     return inodes.get(path);
   }
 
-  public InputStream getBlockStream(Block block, long byteRangeStart) throws IOException {
+  public File retrieveBlock(Block block, long byteRangeStart) throws IOException {
     byte[] data = blocks.get(block.getId());
-    return new ByteArrayInputStream(data, (int) byteRangeStart, data.length - (int) byteRangeStart);
+    File file = createTempFile();
+    BufferedOutputStream out = null;
+    try {
+      out = new BufferedOutputStream(new FileOutputStream(file));
+      out.write(data, (int) byteRangeStart, data.length - (int) byteRangeStart);
+    } finally {
+      if (out != null) {
+        out.close();
+      }
+    }
+    return file;
+  }
+  
+  private File createTempFile() throws IOException {
+    File dir = new File(conf.get("fs.s3.buffer.dir"));
+    if (!dir.exists() && !dir.mkdirs()) {
+      throw new IOException("Cannot create S3 buffer directory: " + dir);
+    }
+    File result = File.createTempFile("test-", ".tmp", dir);
+    result.deleteOnExit();
+    return result;
   }
 
   public Set<Path> listSubPaths(Path path) throws IOException {
@@ -85,12 +109,20 @@ class InMemoryFileSystemStore implements FileSystemStore {
     inodes.put(path, inode);
   }
 
-  public void storeBlock(Block block, InputStream in) throws IOException {
+  public void storeBlock(Block block, File file) throws IOException {
     ByteArrayOutputStream out = new ByteArrayOutputStream();
     byte[] buf = new byte[8192];
     int numRead;
-    while ((numRead = in.read(buf)) >= 0) {
-      out.write(buf, 0, numRead);
+    BufferedInputStream in = null;
+    try {
+      in = new BufferedInputStream(new FileInputStream(file));
+      while ((numRead = in.read(buf)) >= 0) {
+        out.write(buf, 0, numRead);
+      }
+    } finally {
+      if (in != null) {
+        in.close();
+      }
     }
     blocks.put(block.getId(), out.toByteArray());
   }

+ 1 - 1
src/test/org/apache/hadoop/fs/s3/Jets3tS3FileSystemTest.java

@@ -6,7 +6,7 @@ public class Jets3tS3FileSystemTest extends S3FileSystemBaseTest {
 
   @Override
   public FileSystemStore getFileSystemStore() throws IOException {
-    return new Jets3tFileSystemStore();
+    return null; // use default store
   }
 
 }

+ 120 - 0
src/test/org/apache/hadoop/io/retry/TestRetryProxy.java

@@ -0,0 +1,120 @@
+package org.apache.hadoop.io.retry;
+
+import static org.apache.hadoop.io.retry.RetryPolicies.RETRY_FOREVER;
+import static org.apache.hadoop.io.retry.RetryPolicies.TRY_ONCE_DONT_FAIL;
+import static org.apache.hadoop.io.retry.RetryPolicies.TRY_ONCE_THEN_FAIL;
+import static org.apache.hadoop.io.retry.RetryPolicies.retryByException;
+import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWithFixedSleep;
+import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWithProportionalSleep;
+import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumTimeWithFixedSleep;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.retry.UnreliableInterface.FatalException;
+import org.apache.hadoop.io.retry.UnreliableInterface.UnreliableException;
+
+public class TestRetryProxy extends TestCase {
+  
+  private UnreliableImplementation unreliableImpl;
+  
+  @Override
+  protected void setUp() throws Exception {
+    unreliableImpl = new UnreliableImplementation();
+  }
+
+  public void testTryOnceThenFail() throws UnreliableException {
+    UnreliableInterface unreliable = (UnreliableInterface)
+      RetryProxy.create(UnreliableInterface.class, unreliableImpl, TRY_ONCE_THEN_FAIL);
+    unreliable.alwaysSucceeds();
+    try {
+      unreliable.failsOnceThenSucceeds();
+      fail("Should fail");
+    } catch (UnreliableException e) {
+      // expected
+    }
+  }
+  
+  public void testTryOnceDontFail() throws UnreliableException {
+    UnreliableInterface unreliable = (UnreliableInterface)
+      RetryProxy.create(UnreliableInterface.class, unreliableImpl, TRY_ONCE_DONT_FAIL);
+    unreliable.alwaysSucceeds();
+    unreliable.failsOnceThenSucceeds();
+    try {
+      unreliable.failsOnceThenSucceedsWithReturnValue();
+      fail("Should fail");
+    } catch (UnreliableException e) {
+      // expected
+    }
+  }
+  
+  public void testRetryForever() throws UnreliableException {
+    UnreliableInterface unreliable = (UnreliableInterface)
+    RetryProxy.create(UnreliableInterface.class, unreliableImpl, RETRY_FOREVER);
+    unreliable.alwaysSucceeds();
+    unreliable.failsOnceThenSucceeds();
+    unreliable.failsTenTimesThenSucceeds();
+  }
+  
+  public void testRetryUpToMaximumCountWithFixedSleep() throws UnreliableException {
+    UnreliableInterface unreliable = (UnreliableInterface)
+    RetryProxy.create(UnreliableInterface.class, unreliableImpl,
+        retryUpToMaximumCountWithFixedSleep(8, 1, TimeUnit.NANOSECONDS));
+    unreliable.alwaysSucceeds();
+    unreliable.failsOnceThenSucceeds();
+    try {
+      unreliable.failsTenTimesThenSucceeds();
+      fail("Should fail");
+    } catch (UnreliableException e) {
+      // expected
+    }
+  }
+  
+  public void testRetryUpToMaximumTimeWithFixedSleep() throws UnreliableException {
+    UnreliableInterface unreliable = (UnreliableInterface)
+    RetryProxy.create(UnreliableInterface.class, unreliableImpl,
+        retryUpToMaximumTimeWithFixedSleep(80, 10, TimeUnit.NANOSECONDS));
+    unreliable.alwaysSucceeds();
+    unreliable.failsOnceThenSucceeds();
+    try {
+      unreliable.failsTenTimesThenSucceeds();
+      fail("Should fail");
+    } catch (UnreliableException e) {
+      // expected
+    }
+  }
+  
+  public void testRetryUpToMaximumCountWithProportionalSleep() throws UnreliableException {
+    UnreliableInterface unreliable = (UnreliableInterface)
+    RetryProxy.create(UnreliableInterface.class, unreliableImpl,
+        retryUpToMaximumCountWithProportionalSleep(8, 1, TimeUnit.NANOSECONDS));
+    unreliable.alwaysSucceeds();
+    unreliable.failsOnceThenSucceeds();
+    try {
+      unreliable.failsTenTimesThenSucceeds();
+      fail("Should fail");
+    } catch (UnreliableException e) {
+      // expected
+    }
+  }
+  
+  public void testRetryByException() throws UnreliableException {
+    Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
+      Collections.<Class<? extends Exception>, RetryPolicy>singletonMap(FatalException.class, TRY_ONCE_THEN_FAIL);
+    
+    UnreliableInterface unreliable = (UnreliableInterface)
+    RetryProxy.create(UnreliableInterface.class, unreliableImpl,
+        retryByException(RETRY_FOREVER, exceptionToPolicyMap));
+    unreliable.failsOnceThenSucceeds();
+    try {
+      unreliable.alwaysfailsWithFatalException();
+      fail("Should fail");
+    } catch (FatalException e) {
+      // expected
+    }
+  }
+  
+}

+ 36 - 0
src/test/org/apache/hadoop/io/retry/UnreliableImplementation.java

@@ -0,0 +1,36 @@
+package org.apache.hadoop.io.retry;
+
+public class UnreliableImplementation implements UnreliableInterface {
+
+  private int failsOnceInvocationCount,
+    failsOnceWithValueInvocationCount,
+    failsTenTimesInvocationCount;
+  
+  public void alwaysSucceeds() {
+    // do nothing
+  }
+  
+  public void alwaysfailsWithFatalException() throws FatalException {
+    throw new FatalException();
+  }
+
+  public void failsOnceThenSucceeds() throws UnreliableException {
+    if (failsOnceInvocationCount++ == 0) {
+      throw new UnreliableException();
+    }
+  }
+
+  public boolean failsOnceThenSucceedsWithReturnValue() throws UnreliableException {
+    if (failsOnceWithValueInvocationCount++ == 0) {
+      throw new UnreliableException();
+    }
+    return true;
+  }
+
+  public void failsTenTimesThenSucceeds() throws UnreliableException {
+    if (failsTenTimesInvocationCount++ < 10) {
+      throw new UnreliableException();
+    }
+  }
+
+}

+ 21 - 0
src/test/org/apache/hadoop/io/retry/UnreliableInterface.java

@@ -0,0 +1,21 @@
+package org.apache.hadoop.io.retry;
+
+public interface UnreliableInterface {
+  
+  public static class UnreliableException extends Exception {
+    // no body
+  }
+  
+  public static class FatalException extends UnreliableException {
+    // no body
+  }
+  
+  void alwaysSucceeds() throws UnreliableException;
+  
+  void alwaysfailsWithFatalException() throws FatalException;
+
+  void failsOnceThenSucceeds() throws UnreliableException;
+  boolean failsOnceThenSucceedsWithReturnValue() throws UnreliableException;
+
+  void failsTenTimesThenSucceeds() throws UnreliableException;
+}