Browse Source

HADOOP-6421 Adds Symbolic links to FileContext, AbstractFileSystem.
It also adds a limited implementation for the local file system
(RawLocalFs) that allows local symlinks. (Eli Collins via Sanjay Radia)


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@910706 13f79535-47bb-0310-9956-ffa450edef68

Sanjay Radia 15 years ago
parent
commit
ea605b8cd7

+ 4 - 0
CHANGES.txt

@@ -54,6 +54,10 @@ Trunk (unreleased changes)
     HADOOP-6510. Adds a way for superusers to impersonate other users
     HADOOP-6510. Adds a way for superusers to impersonate other users
     in a secure environment. (Jitendra Nath Pandey via ddas)
     in a secure environment. (Jitendra Nath Pandey via ddas)
 
 
+    HADOOP-6421 Adds Symbolic links to FileContext, AbstractFileSystem.
+    It also adds a limited implementation for the local file system
+     (RawLocalFs) that allows local symlinks. (Eli Collins via Sanjay Radia)
+
   IMPROVEMENTS
   IMPROVEMENTS
 
 
     HADOOP-6283. Improve the exception messages thrown by
     HADOOP-6283. Improve the exception messages thrown by

+ 78 - 28
src/java/org/apache/hadoop/fs/AbstractFileSystem.java

@@ -77,7 +77,7 @@ public abstract class AbstractFileSystem {
   }
   }
   
   
   /**
   /**
-   * Prohibits names which contain a ".", "..". ":" or "/" 
+   * Prohibits names which contain a ".", "..", ":" or "/" 
    */
    */
   private static boolean isValidName(String src) {
   private static boolean isValidName(String src) {
     // Check for ".." "." ":" "/"
     // Check for ".." "." ":" "/"
@@ -352,7 +352,7 @@ public abstract class AbstractFileSystem {
    * @return server default configuration values
    * @return server default configuration values
    * @throws IOException
    * @throws IOException
    */
    */
-  protected abstract FsServerDefaults getServerDefaults() throws IOException;
+  protected abstract FsServerDefaults getServerDefaults() throws IOException; 
 
 
   /**
   /**
    * The specification of this method matches that of
    * The specification of this method matches that of
@@ -362,7 +362,7 @@ public abstract class AbstractFileSystem {
    */
    */
   protected final FSDataOutputStream create(final Path f,
   protected final FSDataOutputStream create(final Path f,
     final EnumSet<CreateFlag> createFlag, Options.CreateOpts... opts)
     final EnumSet<CreateFlag> createFlag, Options.CreateOpts... opts)
-    throws IOException {
+    throws IOException, UnresolvedLinkException {
     checkPath(f);
     checkPath(f);
     int bufferSize = -1;
     int bufferSize = -1;
     short replication = -1;
     short replication = -1;
@@ -457,7 +457,8 @@ public abstract class AbstractFileSystem {
   protected abstract FSDataOutputStream createInternal(Path f,
   protected abstract FSDataOutputStream createInternal(Path f,
       EnumSet<CreateFlag> flag, FsPermission absolutePermission, int bufferSize,
       EnumSet<CreateFlag> flag, FsPermission absolutePermission, int bufferSize,
       short replication, long blockSize, Progressable progress,
       short replication, long blockSize, Progressable progress,
-      int bytesPerChecksum, boolean createParent) throws IOException;
+      int bytesPerChecksum, boolean createParent) 
+      throws IOException, UnresolvedLinkException;
 
 
   /**
   /**
    * The specification of this method matches that of
    * The specification of this method matches that of
@@ -467,7 +468,7 @@ public abstract class AbstractFileSystem {
    */
    */
   protected abstract void mkdir(final Path dir,
   protected abstract void mkdir(final Path dir,
       final FsPermission permission, final boolean createParent)
       final FsPermission permission, final boolean createParent)
-    throws IOException;
+    throws IOException, UnresolvedLinkException;
 
 
   /**
   /**
    * The specification of this method matches that of
    * The specification of this method matches that of
@@ -475,14 +476,15 @@ public abstract class AbstractFileSystem {
    * this filesystem.
    * this filesystem.
    */
    */
   protected abstract boolean delete(final Path f, final boolean recursive)
   protected abstract boolean delete(final Path f, final boolean recursive)
-    throws IOException;
+    throws IOException, UnresolvedLinkException;
 
 
   /**
   /**
    * The specification of this method matches that of
    * The specification of this method matches that of
    * {@link FileContext#open(Path)} except that Path f must be for this
    * {@link FileContext#open(Path)} except that Path f must be for this
    * filesystem.
    * filesystem.
    */
    */
-  protected FSDataInputStream open(final Path f) throws IOException {
+  protected FSDataInputStream open(final Path f) 
+    throws IOException, UnresolvedLinkException {
     return open(f, getServerDefaults().getFileBufferSize());
     return open(f, getServerDefaults().getFileBufferSize());
   }
   }
 
 
@@ -490,9 +492,10 @@ public abstract class AbstractFileSystem {
    * The specification of this method matches that of
    * The specification of this method matches that of
    * {@link FileContext#open(Path, int)} except that Path f must be for this
    * {@link FileContext#open(Path, int)} except that Path f must be for this
    * filesystem.
    * filesystem.
+   * @throws UnresolvedLinkException 
    */
    */
   protected abstract FSDataInputStream open(final Path f, int bufferSize)
   protected abstract FSDataInputStream open(final Path f, int bufferSize)
-    throws IOException;
+    throws IOException, UnresolvedLinkException;
 
 
   /**
   /**
    * The specification of this method matches that of
    * The specification of this method matches that of
@@ -500,7 +503,7 @@ public abstract class AbstractFileSystem {
    * for this filesystem.
    * for this filesystem.
    */
    */
   protected abstract boolean setReplication(final Path f,
   protected abstract boolean setReplication(final Path f,
-    final short replication) throws IOException;
+    final short replication) throws IOException, UnresolvedLinkException;
 
 
   /**
   /**
    * The specification of this method matches that of
    * The specification of this method matches that of
@@ -508,7 +511,8 @@ public abstract class AbstractFileSystem {
    * f must be for this filesystem.
    * f must be for this filesystem.
    */
    */
   protected final void rename(final Path src, final Path dst,
   protected final void rename(final Path src, final Path dst,
-    final Options.Rename... options) throws IOException {
+    final Options.Rename... options) 
+    throws IOException, UnresolvedLinkException {
     boolean overwrite = false;
     boolean overwrite = false;
     if (null != options) {
     if (null != options) {
       for (Rename option : options) {
       for (Rename option : options) {
@@ -530,7 +534,7 @@ public abstract class AbstractFileSystem {
    * {@link #renameInternal(Path, Path, boolean)}
    * {@link #renameInternal(Path, Path, boolean)}
    */
    */
   protected abstract void renameInternal(final Path src, final Path dst)
   protected abstract void renameInternal(final Path src, final Path dst)
-    throws IOException;
+    throws IOException, UnresolvedLinkException;
   
   
   /**
   /**
    * The specification of this method matches that of
    * The specification of this method matches that of
@@ -538,16 +542,16 @@ public abstract class AbstractFileSystem {
    * f must be for this filesystem.
    * f must be for this filesystem.
    */
    */
   protected void renameInternal(final Path src, final Path dst,
   protected void renameInternal(final Path src, final Path dst,
-    boolean overwrite) throws IOException {
+    boolean overwrite) throws IOException, UnresolvedLinkException {
     // Default implementation deals with overwrite in a non-atomic way
     // Default implementation deals with overwrite in a non-atomic way
-    final FileStatus srcStatus = getFileStatus(src);
+    final FileStatus srcStatus = getFileLinkStatus(src);
     if (srcStatus == null) {
     if (srcStatus == null) {
       throw new FileNotFoundException("rename source " + src + " not found.");
       throw new FileNotFoundException("rename source " + src + " not found.");
     }
     }
 
 
     FileStatus dstStatus;
     FileStatus dstStatus;
     try {
     try {
-      dstStatus = getFileStatus(dst);
+      dstStatus = getFileLinkStatus(dst);
     } catch (IOException e) {
     } catch (IOException e) {
       dstStatus = null;
       dstStatus = null;
     }
     }
@@ -571,12 +575,12 @@ public abstract class AbstractFileSystem {
       delete(dst, false);
       delete(dst, false);
     } else {
     } else {
       final Path parent = dst.getParent();
       final Path parent = dst.getParent();
-      final FileStatus parentStatus = getFileStatus(parent);
+      final FileStatus parentStatus = getFileLinkStatus(parent);
       if (parentStatus == null) {
       if (parentStatus == null) {
         throw new FileNotFoundException("rename destination parent " + parent
         throw new FileNotFoundException("rename destination parent " + parent
             + " not found.");
             + " not found.");
       }
       }
-      if (!parentStatus.isDir()) {
+      if (!parentStatus.isDir() && !parentStatus.isSymlink()) {
         throw new ParentNotDirectoryException("rename destination parent "
         throw new ParentNotDirectoryException("rename destination parent "
             + parent + " is a file.");
             + parent + " is a file.");
       }
       }
@@ -584,13 +588,42 @@ public abstract class AbstractFileSystem {
     renameInternal(src, dst);
     renameInternal(src, dst);
   }
   }
   
   
+  /**
+   * Returns true if the file system supports symlinks, false otherwise.
+   */
+  protected boolean supportsSymlinks() {
+    return false;
+  }
+  
+  /**
+   * The specification of this method matches that of  
+   * {@link FileContext#createSymlink(Path, Path, boolean)};
+   */
+  protected void createSymlink(final Path target, final Path link,
+      final boolean createParent) throws IOException, UnresolvedLinkException {
+    throw new IOException("File system does not support symlinks");    
+  }
+
+  /**
+   * The specification of this method matches that of  
+   * {@link FileContext#getLinkTarget(Path)};
+   */
+  protected Path getLinkTarget(final Path f) throws IOException {
+    /* We should never get here. Any file system that threw an
+     * UnresolvedLinkException, causing this function to be called,
+     * needs to override this method.
+     */
+    throw new AssertionError();
+  }
+    
   /**
   /**
    * The specification of this method matches that of
    * The specification of this method matches that of
    * {@link FileContext#setPermission(Path, FsPermission)} except that Path f
    * {@link FileContext#setPermission(Path, FsPermission)} except that Path f
    * must be for this filesystem.
    * must be for this filesystem.
    */
    */
   protected abstract void setPermission(final Path f,
   protected abstract void setPermission(final Path f,
-      final FsPermission permission) throws IOException;
+      final FsPermission permission) 
+      throws IOException, UnresolvedLinkException;
 
 
   /**
   /**
    * The specification of this method matches that of
    * The specification of this method matches that of
@@ -598,7 +631,7 @@ public abstract class AbstractFileSystem {
    * be for this filesystem.
    * be for this filesystem.
    */
    */
   protected abstract void setOwner(final Path f, final String username,
   protected abstract void setOwner(final Path f, final String username,
-      final String groupname) throws IOException;
+      final String groupname) throws IOException, UnresolvedLinkException;
 
 
   /**
   /**
    * The specification of this method matches that of
    * The specification of this method matches that of
@@ -606,7 +639,7 @@ public abstract class AbstractFileSystem {
    * for this filesystem.
    * for this filesystem.
    */
    */
   protected abstract void setTimes(final Path f, final long mtime,
   protected abstract void setTimes(final Path f, final long mtime,
-    final long atime) throws IOException;
+    final long atime) throws IOException, UnresolvedLinkException;
 
 
   /**
   /**
    * The specification of this method matches that of
    * The specification of this method matches that of
@@ -614,14 +647,29 @@ public abstract class AbstractFileSystem {
    * this filesystem.
    * this filesystem.
    */
    */
   protected abstract FileChecksum getFileChecksum(final Path f)
   protected abstract FileChecksum getFileChecksum(final Path f)
-    throws IOException;
+    throws IOException, UnresolvedLinkException;
   
   
   /**
   /**
    * The specification of this method matches that of
    * The specification of this method matches that of
-   * {@link FileContext#setVerifyChecksum(boolean, Path)} except that Path f
-   * must be for this filesystem.
+   * {@link FileContext#getFileStatus(Path)} 
+   * except that an UnresolvedLinkException may be thrown if a symlink is 
+   * encountered in the path.
    */
    */
-  protected abstract FileStatus getFileStatus(final Path f) throws IOException;
+  protected abstract FileStatus getFileStatus(final Path f) 
+    throws IOException, UnresolvedLinkException;
+
+  /**
+   * The specification of this method matches that of
+   * {@link FileContext#getFileLinkStatus(Path)}
+   * except that an UnresolvedLinkException may be thrown if a symlink is  
+   * encountered in the path leading up to the final path component.
+   * If the file system does not support symlinks then the behavior is
+   * equivalent to {@link AbstractFileSystem#getFileStatus(Path)}.
+   */
+  protected FileStatus getFileLinkStatus(final Path f)
+    throws IOException, UnresolvedLinkException {
+    return getFileStatus(f);
+  }
 
 
   /**
   /**
    * The specification of this method matches that of
    * The specification of this method matches that of
@@ -629,22 +677,23 @@ public abstract class AbstractFileSystem {
    * Path f must be for this filesystem.
    * Path f must be for this filesystem.
    */
    */
   protected abstract BlockLocation[] getFileBlockLocations(final Path f,
   protected abstract BlockLocation[] getFileBlockLocations(final Path f,
-    final long start, final long len) throws IOException;
+    final long start, final long len) 
+    throws IOException, UnresolvedLinkException;
 
 
   /**
   /**
    * The specification of this method matches that of
    * The specification of this method matches that of
    * {@link FileContext#getFsStatus(Path)} except that Path f must be for this
    * {@link FileContext#getFsStatus(Path)} except that Path f must be for this
    * filesystem.
    * filesystem.
    */
    */
-  protected FsStatus getFsStatus(final Path f) throws IOException {
+  protected FsStatus getFsStatus(final Path f) 
+    throws IOException, UnresolvedLinkException {
     // default impl gets FsStatus of root
     // default impl gets FsStatus of root
     return getFsStatus();
     return getFsStatus();
   }
   }
   
   
   /**
   /**
    * The specification of this method matches that of
    * The specification of this method matches that of
-   * {@link FileContext#getFsStatus(Path)} except that Path f must be for this
-   * filesystem.
+   * {@link FileContext#getFsStatus(Path)}.
    */
    */
   protected abstract FsStatus getFsStatus() throws IOException;
   protected abstract FsStatus getFsStatus() throws IOException;
 
 
@@ -653,7 +702,8 @@ public abstract class AbstractFileSystem {
    * {@link FileContext#listStatus(Path)} except that Path f must be for this
    * {@link FileContext#listStatus(Path)} except that Path f must be for this
    * filesystem.
    * filesystem.
    */
    */
-  protected abstract FileStatus[] listStatus(final Path f) throws IOException;
+  protected abstract FileStatus[] listStatus(final Path f) 
+    throws IOException, UnresolvedLinkException;
 
 
   /**
   /**
    * The specification of this method matches that of
    * The specification of this method matches that of

+ 17 - 12
src/java/org/apache/hadoop/fs/ChecksumFs.java

@@ -115,12 +115,12 @@ public abstract class ChecksumFs extends FilterFs {
     private long fileLen = -1L;
     private long fileLen = -1L;
     
     
     public ChecksumFSInputChecker(ChecksumFs fs, Path file)
     public ChecksumFSInputChecker(ChecksumFs fs, Path file)
-      throws IOException {
+      throws IOException, UnresolvedLinkException {
       this(fs, file, fs.getServerDefaults().getFileBufferSize());
       this(fs, file, fs.getServerDefaults().getFileBufferSize());
     }
     }
     
     
     public ChecksumFSInputChecker(ChecksumFs fs, Path file, int bufferSize)
     public ChecksumFSInputChecker(ChecksumFs fs, Path file, int bufferSize)
-      throws IOException {
+      throws IOException, UnresolvedLinkException {
       super(file, fs.getFileStatus(file).getReplication());
       super(file, fs.getFileStatus(file).getReplication());
       this.datas = fs.getRawFs().open(file, bufferSize);
       this.datas = fs.getRawFs().open(file, bufferSize);
       this.fs = fs;
       this.fs = fs;
@@ -160,7 +160,7 @@ public abstract class ChecksumFs extends FilterFs {
     }
     }
     
     
     public int read(long position, byte[] b, int off, int len)
     public int read(long position, byte[] b, int off, int len)
-      throws IOException {
+      throws IOException, UnresolvedLinkException {
       // parameter check
       // parameter check
       if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
       if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
         throw new IndexOutOfBoundsException();
         throw new IndexOutOfBoundsException();
@@ -236,7 +236,7 @@ public abstract class ChecksumFs extends FilterFs {
     }
     }
     
     
     /* Return the file length */
     /* Return the file length */
-    private long getFileLength() throws IOException {
+    private long getFileLength() throws IOException, UnresolvedLinkException {
       if (fileLen==-1L) {
       if (fileLen==-1L) {
         fileLen = fs.getFileStatus(file).getLen();
         fileLen = fs.getFileStatus(file).getLen();
       }
       }
@@ -257,7 +257,7 @@ public abstract class ChecksumFs extends FilterFs {
      * @exception  IOException  if an I/O error occurs.
      * @exception  IOException  if an I/O error occurs.
      *             ChecksumException if the chunk to skip to is corrupted
      *             ChecksumException if the chunk to skip to is corrupted
      */
      */
-    public synchronized long skip(long n) throws IOException {
+    public synchronized long skip(long n) throws IOException { 
       final long curPos = getPos();
       final long curPos = getPos();
       final long fileLength = getFileLength();
       final long fileLength = getFileLength();
       if (n+curPos > fileLength) {
       if (n+curPos > fileLength) {
@@ -278,7 +278,7 @@ public abstract class ChecksumFs extends FilterFs {
      *             ChecksumException if the chunk to seek to is corrupted
      *             ChecksumException if the chunk to seek to is corrupted
      */
      */
 
 
-    public synchronized void seek(long pos) throws IOException {
+    public synchronized void seek(long pos) throws IOException { 
       if (pos>getFileLength()) {
       if (pos>getFileLength()) {
         throw new IOException("Cannot seek after EOF");
         throw new IOException("Cannot seek after EOF");
       }
       }
@@ -293,7 +293,8 @@ public abstract class ChecksumFs extends FilterFs {
    * @param bufferSize the size of the buffer to be used.
    * @param bufferSize the size of the buffer to be used.
    */
    */
   @Override
   @Override
-  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+  public FSDataInputStream open(Path f, int bufferSize) 
+    throws IOException, UnresolvedLinkException {
     return new FSDataInputStream(
     return new FSDataInputStream(
         new ChecksumFSInputChecker(this, f, bufferSize));
         new ChecksumFSInputChecker(this, f, bufferSize));
   }
   }
@@ -371,7 +372,8 @@ public abstract class ChecksumFs extends FilterFs {
   /** Check if exists.
   /** Check if exists.
    * @param f source file
    * @param f source file
    */
    */
-  private boolean exists(Path f) throws IOException {
+  private boolean exists(Path f) 
+    throws IOException, UnresolvedLinkException {
     try {
     try {
       return getMyFs().getFileStatus(f) != null;
       return getMyFs().getFileStatus(f) != null;
     } catch (FileNotFoundException e) {
     } catch (FileNotFoundException e) {
@@ -383,7 +385,8 @@ public abstract class ChecksumFs extends FilterFs {
    * Note: Avoid using this method. Instead reuse the FileStatus 
    * Note: Avoid using this method. Instead reuse the FileStatus 
    * returned by getFileStatus() or listStatus() methods.
    * returned by getFileStatus() or listStatus() methods.
    */
    */
-  private boolean isDirectory(Path f) throws IOException {
+  private boolean isDirectory(Path f) 
+    throws IOException, UnresolvedLinkException {
     try {
     try {
       return getMyFs().getFileStatus(f).isDir();
       return getMyFs().getFileStatus(f).isDir();
     } catch (FileNotFoundException e) {
     } catch (FileNotFoundException e) {
@@ -401,7 +404,7 @@ public abstract class ChecksumFs extends FilterFs {
    */
    */
   @Override
   @Override
   public boolean setReplication(Path src, short replication)
   public boolean setReplication(Path src, short replication)
-    throws IOException {
+    throws IOException, UnresolvedLinkException {
     boolean value = getMyFs().setReplication(src, replication);
     boolean value = getMyFs().setReplication(src, replication);
     if (!value) {
     if (!value) {
       return false;
       return false;
@@ -417,7 +420,8 @@ public abstract class ChecksumFs extends FilterFs {
    * Rename files/dirs.
    * Rename files/dirs.
    */
    */
   @Override
   @Override
-  public void renameInternal(Path src, Path dst) throws IOException {
+  public void renameInternal(Path src, Path dst) 
+    throws IOException, UnresolvedLinkException {
     if (isDirectory(src)) {
     if (isDirectory(src)) {
       getMyFs().rename(src, dst);
       getMyFs().rename(src, dst);
     } else {
     } else {
@@ -438,7 +442,8 @@ public abstract class ChecksumFs extends FilterFs {
    * Implement the delete(Path, boolean) in checksum
    * Implement the delete(Path, boolean) in checksum
    * file system.
    * file system.
    */
    */
-  public boolean delete(Path f, boolean recursive) throws IOException{
+  public boolean delete(Path f, boolean recursive) 
+    throws IOException, UnresolvedLinkException {
     FileStatus fstatus = null;
     FileStatus fstatus = null;
     try {
     try {
       fstatus = getMyFs().getFileStatus(f);
       fstatus = getMyFs().getFileStatus(f);

+ 27 - 4
src/java/org/apache/hadoop/fs/DelegateToFileSystem.java

@@ -26,6 +26,7 @@ import java.util.EnumSet;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.Progressable;
 
 
@@ -106,6 +107,11 @@ public abstract class DelegateToFileSystem extends AbstractFileSystem {
     return fsImpl.getFileStatus(f);
     return fsImpl.getFileStatus(f);
   }
   }
 
 
+  @Override
+  protected FileStatus getFileLinkStatus(final Path f) throws IOException {
+    return getFileStatus(f);
+  }
+
   @Override
   @Override
   protected FsStatus getFsStatus() throws IOException {
   protected FsStatus getFsStatus() throws IOException {
     return fsImpl.getStatus();
     return fsImpl.getStatus();
@@ -148,7 +154,6 @@ public abstract class DelegateToFileSystem extends AbstractFileSystem {
     checkPath(src);
     checkPath(src);
     checkPath(dst);
     checkPath(dst);
     fsImpl.rename(src, dst, Options.Rename.NONE);
     fsImpl.rename(src, dst, Options.Rename.NONE);
-    
   }
   }
 
 
   @Override
   @Override
@@ -156,7 +161,6 @@ public abstract class DelegateToFileSystem extends AbstractFileSystem {
       throws IOException {
       throws IOException {
     checkPath(f);
     checkPath(f);
     fsImpl.setOwner(f, username, groupname);
     fsImpl.setOwner(f, username, groupname);
-    
   }
   }
 
 
   @Override
   @Override
@@ -177,11 +181,30 @@ public abstract class DelegateToFileSystem extends AbstractFileSystem {
   protected void setTimes(Path f, long mtime, long atime) throws IOException {
   protected void setTimes(Path f, long mtime, long atime) throws IOException {
     checkPath(f);
     checkPath(f);
     fsImpl.setTimes(f, mtime, atime);
     fsImpl.setTimes(f, mtime, atime);
-    
   }
   }
 
 
   @Override
   @Override
   protected void setVerifyChecksum(boolean verifyChecksum) throws IOException {
   protected void setVerifyChecksum(boolean verifyChecksum) throws IOException {
     fsImpl.setVerifyChecksum(verifyChecksum);
     fsImpl.setVerifyChecksum(verifyChecksum);
   }
   }
-}
+
+  @Override
+  protected boolean supportsSymlinks() {
+    return false;
+  }  
+  
+  @Override
+  protected void createSymlink(Path target, Path link, boolean createParent) 
+      throws IOException { 
+    throw new IOException("File system does not support symlinks");
+  } 
+  
+  @Override
+  protected Path getLinkTarget(final Path f) throws IOException {
+    /* We should never get here. Any file system that threw an 
+     * UnresolvedLinkException, causing this function to be called,
+     * should override getLinkTarget. 
+     */
+    throw new AssertionError();
+  }
+}

+ 353 - 45
src/java/org/apache/hadoop/fs/FileContext.java

@@ -40,7 +40,6 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate.*;
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Options.CreateOpts;
 import org.apache.hadoop.fs.Options.CreateOpts;
-import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.IOUtils;
 
 
@@ -142,7 +141,8 @@ import org.apache.hadoop.io.IOUtils;
  *  Generally you should not need use a config unless you are doing
  *  Generally you should not need use a config unless you are doing
  *   <ul> 
  *   <ul> 
  *   <li> configX = someConfigSomeOnePassedToYou.
  *   <li> configX = someConfigSomeOnePassedToYou.
- *   <li> myFContext = getFileContext(configX); //configX not changed but passeddown
+ *   <li> myFContext = getFileContext(configX); // configX is not changed,
+ *                                              // is passed down 
  *   <li> myFContext.create(path, ...);
  *   <li> myFContext.create(path, ...);
  *   <li>...
  *   <li>...
  *  </ul>                                          
  *  </ul>                                          
@@ -213,15 +213,15 @@ public final class FileContext {
    * 
    * 
    * Applications that use FileContext should use #makeQualified() since
    * Applications that use FileContext should use #makeQualified() since
    * they really want a fully qualified URI.
    * they really want a fully qualified URI.
-   * Hence this method os not called makeAbsolute() and 
+   * Hence this method is not called makeAbsolute() and 
    * has been deliberately declared private.
    * has been deliberately declared private.
    */
    */
 
 
-  private Path fixRelativePart(Path f) {
-    if (f.isUriPathAbsolute()) {
-      return f;
+  private Path fixRelativePart(Path p) {
+    if (p.isUriPathAbsolute()) {
+      return p;
     } else {
     } else {
-      return new Path(workingDir, f);
+      return new Path(workingDir, p);
     }
     }
   }
   }
 
 
@@ -429,12 +429,14 @@ public final class FileContext {
    */
    */
   public void setWorkingDirectory(final Path newWDir) throws IOException {
   public void setWorkingDirectory(final Path newWDir) throws IOException {
     checkNotSchemeWithRelative(newWDir);
     checkNotSchemeWithRelative(newWDir);
-    // wd is stored as fully qualified path. 
-
-    final Path newWorkingDir =  new Path(workingDir, newWDir);
+    /* wd is stored as a fully qualified path. We check if the given 
+     * path is not relative first since resolve requires and returns 
+     * an absolute path.
+     */  
+    final Path newWorkingDir = resolve(new Path(workingDir, newWDir));
     FileStatus status = getFileStatus(newWorkingDir);
     FileStatus status = getFileStatus(newWorkingDir);
     if (!status.isDir()) {
     if (!status.isDir()) {
-      throw new FileNotFoundException(" Cannot setWD to a file");
+      throw new FileNotFoundException("Cannot setWD to a file");
     }
     }
     workingDir = newWorkingDir;
     workingDir = newWorkingDir;
   }
   }
@@ -510,7 +512,6 @@ public final class FileContext {
                                    Options.CreateOpts... opts)
                                    Options.CreateOpts... opts)
     throws IOException {
     throws IOException {
     Path absF = fixRelativePart(f);
     Path absF = fixRelativePart(f);
-    AbstractFileSystem fsOfAbsF = getFSofPath(absF);
 
 
     // If one of the options is a permission, extract it & apply umask
     // If one of the options is a permission, extract it & apply umask
     // If not, add a default Perms and apply umask;
     // If not, add a default Perms and apply umask;
@@ -522,9 +523,14 @@ public final class FileContext {
                                       FsPermission.getDefault();
                                       FsPermission.getDefault();
     permission = permission.applyUMask(umask);
     permission = permission.applyUMask(umask);
 
 
-    CreateOpts[] updatedOpts = 
+    final CreateOpts[] updatedOpts = 
                       CreateOpts.setOpt(CreateOpts.perms(permission), opts);
                       CreateOpts.setOpt(CreateOpts.perms(permission), opts);
-    return fsOfAbsF.create(absF, createFlag, updatedOpts);
+    return new FSLinkResolver<FSDataOutputStream>() {
+      public FSDataOutputStream next(final AbstractFileSystem fs, final Path p) 
+        throws IOException, UnresolvedLinkException {
+        return fs.create(p, createFlag, updatedOpts);
+      }
+    }.resolve(this, absF);
   }
   }
   
   
   /**
   /**
@@ -541,10 +547,16 @@ public final class FileContext {
   public void mkdir(final Path dir, final FsPermission permission,
   public void mkdir(final Path dir, final FsPermission permission,
       final boolean createParent)
       final boolean createParent)
     throws IOException {
     throws IOException {
-    Path absDir = fixRelativePart(dir);
-    FsPermission absFerms = (permission == null ? 
+    final Path absDir = fixRelativePart(dir);
+    final FsPermission absFerms = (permission == null ? 
           FsPermission.getDefault() : permission).applyUMask(umask);
           FsPermission.getDefault() : permission).applyUMask(umask);
-    getFSofPath(absDir).mkdir(absDir, absFerms, createParent);
+    new FSLinkResolver<Void>() {
+      public Void next(final AbstractFileSystem fs, final Path p) 
+        throws IOException, UnresolvedLinkException {
+        fs.mkdir(p, absFerms, createParent);
+        return null;
+      }
+    }.resolve(this, absDir);
   }
   }
 
 
   /**
   /**
@@ -559,7 +571,12 @@ public final class FileContext {
   public boolean delete(final Path f, final boolean recursive) 
   public boolean delete(final Path f, final boolean recursive) 
     throws IOException {
     throws IOException {
     Path absF = fixRelativePart(f);
     Path absF = fixRelativePart(f);
-    return getFSofPath(absF).delete(absF, recursive);
+    return new FSLinkResolver<Boolean>() {
+      public Boolean next(final AbstractFileSystem fs, final Path p) 
+        throws IOException, UnresolvedLinkException {
+        return Boolean.valueOf(fs.delete(p, recursive));
+      }
+    }.resolve(this, absF);
   }
   }
  
  
   /**
   /**
@@ -569,7 +586,12 @@ public final class FileContext {
    */
    */
   public FSDataInputStream open(final Path f) throws IOException {
   public FSDataInputStream open(final Path f) throws IOException {
     final Path absF = fixRelativePart(f);
     final Path absF = fixRelativePart(f);
-    return getFSofPath(absF).open(absF);
+    return new FSLinkResolver<FSDataInputStream>() {
+      public FSDataInputStream next(final AbstractFileSystem fs, final Path p) 
+        throws IOException, UnresolvedLinkException {
+        return fs.open(p);
+      }
+    }.resolve(this, absF);
   }
   }
 
 
   /**
   /**
@@ -577,10 +599,15 @@ public final class FileContext {
    * @param f the file name to open
    * @param f the file name to open
    * @param bufferSize the size of the buffer to be used.
    * @param bufferSize the size of the buffer to be used.
    */
    */
-  public FSDataInputStream open(final Path f, int bufferSize)
+  public FSDataInputStream open(final Path f, final int bufferSize)
     throws IOException {
     throws IOException {
     final Path absF = fixRelativePart(f);
     final Path absF = fixRelativePart(f);
-    return getFSofPath(absF).open(absF, bufferSize);
+    return new FSLinkResolver<FSDataInputStream>() {
+      public FSDataInputStream next(final AbstractFileSystem fs, final Path p) 
+        throws IOException, UnresolvedLinkException {
+        return fs.open(p, bufferSize);
+      }
+    }.resolve(this, absF);
   }
   }
 
 
  /**
  /**
@@ -595,7 +622,12 @@ public final class FileContext {
   public boolean setReplication(final Path f, final short replication)
   public boolean setReplication(final Path f, final short replication)
     throws IOException {
     throws IOException {
     final Path absF = fixRelativePart(f);
     final Path absF = fixRelativePart(f);
-    return getFSofPath(absF).setReplication(absF, replication);
+    return new FSLinkResolver<Boolean>() {
+      public Boolean next(final AbstractFileSystem fs, final Path p) 
+        throws IOException, UnresolvedLinkException {
+        return Boolean.valueOf(fs.setReplication(p, replication));
+      }
+    }.resolve(this, absF);
   }
   }
 
 
   /**
   /**
@@ -633,7 +665,22 @@ public final class FileContext {
     if(!srcFS.getUri().equals(dstFS.getUri())) {
     if(!srcFS.getUri().equals(dstFS.getUri())) {
       throw new IOException("Renames across AbstractFileSystems not supported");
       throw new IOException("Renames across AbstractFileSystems not supported");
     }
     }
-    srcFS.rename(absSrc, absDst, options);
+    try {
+      srcFS.rename(absSrc, absDst, options);
+    } catch (UnresolvedLinkException e) {
+      /* We do not know whether the source or the destination path
+       * was unresolved. Resolve the source path completely, then
+       * resolve the destination. 
+       */
+      final Path source = resolve(absSrc);    
+      new FSLinkResolver<Void>() {
+        public Void next(final AbstractFileSystem fs, final Path p) 
+          throws IOException, UnresolvedLinkException {
+          fs.rename(source, p, options);
+          return null;
+        }
+      }.resolve(this, absDst);
+    }
   }
   }
   
   
   /**
   /**
@@ -644,7 +691,13 @@ public final class FileContext {
   public void setPermission(final Path f, final FsPermission permission)
   public void setPermission(final Path f, final FsPermission permission)
     throws IOException {
     throws IOException {
     final Path absF = fixRelativePart(f);
     final Path absF = fixRelativePart(f);
-    getFSofPath(absF).setPermission(absF, permission);
+    new FSLinkResolver<Void>() {
+      public Void next(final AbstractFileSystem fs, final Path p) 
+        throws IOException, UnresolvedLinkException {
+        fs.setPermission(p, permission);
+        return null;
+      }
+    }.resolve(this, absF);
   }
   }
 
 
   /**
   /**
@@ -655,13 +708,19 @@ public final class FileContext {
    * @param groupname If it is null, the original groupname remains unchanged.
    * @param groupname If it is null, the original groupname remains unchanged.
    */
    */
   public void setOwner(final Path f, final String username,
   public void setOwner(final Path f, final String username,
-                        final String groupname) throws IOException {
+                       final String groupname) throws IOException {
     if ((username == null) && (groupname == null)) {
     if ((username == null) && (groupname == null)) {
       throw new IllegalArgumentException(
       throw new IllegalArgumentException(
-          "usernme and groupname cannot both be null");
+          "username and groupname cannot both be null");
     }
     }
     final Path absF = fixRelativePart(f);
     final Path absF = fixRelativePart(f);
-    getFSofPath(absF).setOwner(absF, username, groupname);
+    new FSLinkResolver<Void>() {
+      public Void next(final AbstractFileSystem fs, final Path p) 
+        throws IOException, UnresolvedLinkException {
+        fs.setOwner(p, username, groupname);
+        return null;
+      }
+    }.resolve(this, absF);
   }
   }
 
 
   /**
   /**
@@ -677,7 +736,13 @@ public final class FileContext {
   public void setTimes(final Path f, final long mtime, final long atime)
   public void setTimes(final Path f, final long mtime, final long atime)
     throws IOException {
     throws IOException {
     final Path absF = fixRelativePart(f);
     final Path absF = fixRelativePart(f);
-    getFSofPath(absF).setTimes(absF, mtime, atime);
+    new FSLinkResolver<Void>() {
+      public Void next(final AbstractFileSystem fs, final Path p) 
+        throws IOException, UnresolvedLinkException {
+        fs.setTimes(p, mtime, atime);
+        return null;
+      }
+    }.resolve(this, absF);
   }
   }
 
 
   /**
   /**
@@ -690,7 +755,12 @@ public final class FileContext {
    */
    */
   public FileChecksum getFileChecksum(final Path f) throws IOException {
   public FileChecksum getFileChecksum(final Path f) throws IOException {
     final Path absF = fixRelativePart(f);
     final Path absF = fixRelativePart(f);
-    return getFSofPath(absF).getFileChecksum(absF);
+    return new FSLinkResolver<FileChecksum>() {
+      public FileChecksum next(final AbstractFileSystem fs, final Path p) 
+        throws IOException, UnresolvedLinkException {
+        return fs.getFileChecksum(p);
+      }
+    }.resolve(this, absF);
   }
   }
 
 
   /**
   /**
@@ -704,10 +774,8 @@ public final class FileContext {
 
 
   public void setVerifyChecksum(final boolean verifyChecksum, final Path f)
   public void setVerifyChecksum(final boolean verifyChecksum, final Path f)
     throws IOException {
     throws IOException {
-    final Path absF = fixRelativePart(f);
+    final Path absF = resolve(fixRelativePart(f));
     getFSofPath(absF).setVerifyChecksum(verifyChecksum);
     getFSofPath(absF).setVerifyChecksum(verifyChecksum);
-    
-    //TBD need to be changed when we add symlinks.
   }
   }
 
 
   /**
   /**
@@ -719,7 +787,84 @@ public final class FileContext {
    */
    */
   public FileStatus getFileStatus(final Path f) throws IOException {
   public FileStatus getFileStatus(final Path f) throws IOException {
     final Path absF = fixRelativePart(f);
     final Path absF = fixRelativePart(f);
-    return getFSofPath(absF).getFileStatus(absF);
+    return new FSLinkResolver<FileStatus>() {
+      public FileStatus next(final AbstractFileSystem fs, final Path p) 
+        throws IOException, UnresolvedLinkException {
+        return fs.getFileStatus(p);
+      }
+    }.resolve(this, absF);
+  }
+
+  /**
+   * Return a fully qualified version of the given symlink target if it
+   * has no scheme and authority. Partially and fully qualified paths 
+   * are returned unmodified.
+   * @param linkFS The AbstractFileSystem of link 
+   * @param link   The path of the symlink
+   * @param target The symlink's target
+   * @return Fully qualified version of the target.
+   */
+  private Path qualifySymlinkTarget(final AbstractFileSystem linkFS, 
+      Path link, Path target) {
+    /* NB: makeQualified uses link's scheme/authority, if specified, 
+     * and the scheme/authority of linkFS, if not. If link does have
+     * a scheme and authority they should match those of linkFS since
+     * resolve updates the path and file system of a path that contains
+     * links each time a link is encountered.
+     */
+    final String linkScheme = link.toUri().getScheme();
+    final String linkAuth   = link.toUri().getAuthority();
+    if (linkScheme != null && linkAuth != null) {
+      assert linkScheme.equals(linkFS.getUri().getScheme());
+      assert linkAuth.equals(linkFS.getUri().getAuthority());
+    }
+    final boolean justPath = (target.toUri().getScheme() == null &&
+                              target.toUri().getAuthority() == null);
+    return justPath ? target.makeQualified(linkFS.getUri(), link.getParent()) 
+                    : target;
+  }
+  
+  /**
+   * Return a file status object that represents the path. If the path 
+   * refers to a symlink then the FileStatus of the symlink is returned.
+   * The behavior is equivalent to #getFileStatus() if the underlying
+   * file system does not support symbolic links.
+   * @param  f The path we want information from.
+   * @return A FileStatus object
+   * @throws FileNotFoundException when the path does not exist;
+   *         IOException see specific implementation.
+   */
+  public FileStatus getFileLinkStatus(final Path f) throws IOException {
+    final Path absF = fixRelativePart(f);
+    return new FSLinkResolver<FileStatus>() {
+      public FileStatus next(final AbstractFileSystem fs, final Path p) 
+        throws IOException, UnresolvedLinkException {
+        FileStatus fi = fs.getFileLinkStatus(p);
+        if (fi.isSymlink()) {
+          fi.setSymlink(qualifySymlinkTarget(fs, p, fi.getSymlink()));
+        }
+        return fi;
+      }
+    }.resolve(this, absF);
+  }
+  
+  /**
+   * Returns the un-interpreted target of the given symbolic link.
+   * Transparently resolves all links up to the final path component.
+   * @param f
+   * @return The un-interpreted target of the symbolic link.
+   * @throws FileNotFoundException when the path does not exist;
+   *         IOException if the last path component of f is not a symlink.
+   */
+  public Path getLinkTarget(final Path f) throws IOException {
+    final Path absF = fixRelativePart(f);
+    return new FSLinkResolver<Path>() {
+      public Path next(final AbstractFileSystem fs, final Path p) 
+        throws IOException, UnresolvedLinkException {
+        FileStatus fi = fs.getFileLinkStatus(p);
+        return fi.getSymlink();
+      }
+    }.resolve(this, absF);
   }
   }
   
   
   /**
   /**
@@ -740,12 +885,18 @@ public final class FileContext {
   @InterfaceStability.Evolving
   @InterfaceStability.Evolving
   public BlockLocation[] getFileBlockLocations(final Path p, 
   public BlockLocation[] getFileBlockLocations(final Path p, 
     final long start, final long len) throws IOException {
     final long start, final long len) throws IOException {
-    return getFSofPath(p).getFileBlockLocations(p, start, len);
+    final Path absF = fixRelativePart(p);
+    return new FSLinkResolver<BlockLocation[]>() {
+      public BlockLocation[] next(final AbstractFileSystem fs, final Path p) 
+        throws IOException, UnresolvedLinkException {
+        return fs.getFileBlockLocations(p, start, len);
+      }
+    }.resolve(this, absF);
   }
   }
   
   
   /**
   /**
    * Returns a status object describing the use and capacity of the
    * Returns a status object describing the use and capacity of the
-   * filesystem denoted by the Parh argument p.
+   * filesystem denoted by the Path argument p.
    * If the filesystem has multiple partitions, the
    * If the filesystem has multiple partitions, the
    * use and capacity of the partition pointed to by the specified
    * use and capacity of the partition pointed to by the specified
    * path is reflected.
    * path is reflected.
@@ -758,12 +909,99 @@ public final class FileContext {
    */
    */
   public FsStatus getFsStatus(final Path f) throws IOException {
   public FsStatus getFsStatus(final Path f) throws IOException {
     if (f == null) {
     if (f == null) {
-      return defaultFS.getFsStatus(null);
+      return defaultFS.getFsStatus();
     }
     }
     final Path absF = fixRelativePart(f);
     final Path absF = fixRelativePart(f);
-    return getFSofPath(absF).getFsStatus(absF);
+    return new FSLinkResolver<FsStatus>() {
+      public FsStatus next(final AbstractFileSystem fs, final Path p) 
+        throws IOException, UnresolvedLinkException {
+        return fs.getFsStatus(p);
+      }
+    }.resolve(this, absF);
   }
   }
-  
+
+  /**
+   * Creates a symbolic link to an existing file. An exception is thrown if 
+   * the symlink exits, the user does not have permission to create symlink,
+   * or the underlying file system does not support symlinks.
+   * 
+   * Symlink permissions are ignored, access to a symlink is determined by
+   * the permissions of the symlink target.
+   * 
+   * Symlinks in paths leading up to the final path component are resolved 
+   * transparently. If the final path component refers to a symlink some 
+   * functions operate on the symlink itself, these are:
+   * - delete(f) and deleteOnExit(f) - Deletes the symlink.
+   * - rename(src, dst) - If src refers to a symlink, the symlink is 
+   *   renamed. If dst refers to a symlink, the symlink is over-written.
+   * - getLinkTarget(f) - Returns the target of the symlink. 
+   * - getFileLinkStatus(f) - Returns a FileStatus object describing
+   *   the symlink.
+   * Some functions, create() and mkdir(), expect the final path component
+   * does not exist. If they are given a path that refers to a symlink that 
+   * does exist they behave as if the path referred to an existing file or 
+   * directory. All other functions fully resolve, ie follow, the symlink. 
+   * These are: open, setReplication, setOwner, setTimes, setWorkingDirectory,
+   * setPermission, getFileChecksum, setVerifyChecksum, getFileBlockLocations,
+   * getFsStatus, getFileStatus, isDirectory, isFile, exists, and listStatus.
+   * 
+   * Symlink targets are stored as given to createSymlink, assuming the 
+   * underlying file system is capable of storign a fully qualified URI. 
+   * Dangling symlinks are permitted. FileContext supports four types of 
+   * symlink targets, and resolves them as follows
+   * <pre>
+   * Given a path referring to a symlink of form:
+   * 
+   *   <---X---> 
+   *   fs://host/A/B/link 
+   *   <-----Y----->
+   * 
+   * In this path X is the scheme and authority that identify the file system,
+   * and Y is the path leading up to the final path component "link". If Y is
+   * a symlink  itself then let Y' be the target of Y and X' be the scheme and
+   * authority of Y'. Symlink targets may:
+   * 
+   * 1. Fully qualified URIs
+   * 
+   * fs://hostX/A/B/file  Resolved according to the target file system.
+   * 
+   * 2. Partially qualified URIs (eg scheme but no host)
+   * 
+   * fs:///A/B/file  Resolved according to the target file sytem. Eg resolving
+   *                 a symlink to hdfs:///A results in an exception because
+   *                 HDFS URIs must be fully qualified, while a symlink to 
+   *                 file:///A will not since Hadoop's local file systems 
+   *                 require partially qualified URIs.
+   * 
+   * 3. Relative paths
+   * 
+   * path  Resolves to [Y'][path]. Eg if Y resolves to hdfs://host/A and path 
+   *       is "../B/file" then [Y'][path] is hdfs://host/B/file
+   * 
+   * 4. Absolute paths
+   * 
+   * path  Resolves to [X'][path]. Eg if Y resolves hdfs://host/A/B and path
+   *       is "/file" then [X][path] is hdfs://host/file
+   * </pre>
+   * 
+   * @param target the target of the symbolic link
+   * @param link the path to be created that points to target
+   * @param createParent if true then missing parent dirs are created if 
+   *                     false then parent must exist
+   * @throws IOException
+   */
+  public void createSymlink(final Path target, final Path link, 
+    final boolean createParent) throws IOException { 
+    final Path nonRelLink = fixRelativePart(link);
+    new FSLinkResolver<Void>() {
+      public Void next(final AbstractFileSystem fs, final Path p) 
+        throws IOException, UnresolvedLinkException {
+        fs.createSymlink(target, p, createParent);
+        return null;
+      }
+    }.resolve(this, nonRelLink);
+  }
+
   /**
   /**
    * Does the file exist?
    * Does the file exist?
    * Note: Avoid using this method if you already have FileStatus in hand.
    * Note: Avoid using this method if you already have FileStatus in hand.
@@ -821,7 +1059,12 @@ public final class FileContext {
    */
    */
   public FileStatus[] listStatus(final Path f) throws IOException {
   public FileStatus[] listStatus(final Path f) throws IOException {
     final Path absF = fixRelativePart(f);
     final Path absF = fixRelativePart(f);
-    return getFSofPath(absF).listStatus(absF);
+    return new FSLinkResolver<FileStatus[]>() {
+      public FileStatus[] next(final AbstractFileSystem fs, final Path p) 
+        throws IOException, UnresolvedLinkException {
+        return fs.listStatus(p);
+      }
+    }.resolve(this, absF);
   }
   }
 
 
   /**
   /**
@@ -920,8 +1163,7 @@ public final class FileContext {
      *         applying the filter default Path filter
      *         applying the filter default Path filter
      * @exception IOException
      * @exception IOException
      */
      */
-    public FileStatus[] listStatus(Path[] files)
-      throws IOException {
+    public FileStatus[] listStatus(Path[] files) throws IOException {
       return listStatus(files, DEFAULT_FILTER);
       return listStatus(files, DEFAULT_FILTER);
     }
     }
      
      
@@ -1264,9 +1506,6 @@ public final class FileContext {
     /** Default pattern character: Character set close. */
     /** Default pattern character: Character set close. */
     private static final char  PAT_SET_CLOSE = ']';
     private static final char  PAT_SET_CLOSE = ']';
       
       
-    GlobFilter() {
-    }
-      
     GlobFilter(final String filePattern) throws IOException {
     GlobFilter(final String filePattern) throws IOException {
       setRegex(filePattern);
       setRegex(filePattern);
     }
     }
@@ -1441,4 +1680,73 @@ public final class FileContext {
       processDeleteOnExit();
       processDeleteOnExit();
     }
     }
   }
   }
-}
+
+  /**
+   * Resolves all symbolic links in the specified path.
+   * Returns the new path object.
+   */
+  protected Path resolve(final Path f) throws IOException {
+    return new FSLinkResolver<FileStatus>() {
+      public FileStatus next(final AbstractFileSystem fs, final Path p) 
+        throws IOException, UnresolvedLinkException {
+        return fs.getFileStatus(p);
+      }
+    }.resolve(this, f).getPath();
+  }
+
+  /**
+   * Class used to perform an operation on and resolve symlinks in a
+   * path. The operation may potentially span multiple file systems.  
+   */
+  protected abstract class FSLinkResolver<T> {
+    // The maximum number of symbolic link components in a path
+    private static final int MAX_PATH_LINKS = 32;
+
+    /**
+     * Generic helper function overridden on instantiation to perform a 
+     * specific operation on the given file system using the given path
+     * which may result in an UnresolvedLinkException. 
+     * @param fs AbstractFileSystem to perform the operation on.
+     * @param p Path given the file system.
+     * @return Generic type determined by the specific implementation.
+     * @throws IOException on error.
+     * @throws UnresolvedLinkException when a symlink is encountered.
+     */
+    public abstract T next(final AbstractFileSystem fs, final Path p) 
+      throws IOException, UnresolvedLinkException;  
+        
+    /**
+     * Performs the operation specified by the next function, calling it
+     * repeatedly until all symlinks in the given path are resolved.
+     * @param fc FileContext used to access file systems.
+     * @param p The path to resolve symlinks in.
+     * @return Generic type determined by the implementation of next.
+     * @throws IOException
+     */
+    public T resolve(final FileContext fc, Path p) throws IOException {
+      int count = 0;
+      T in = null;
+      Path first = p;
+      // NB: More than one AbstractFileSystem can match a scheme, eg 
+      // "file" resolves to LocalFs but could have come by RawLocalFs.
+      AbstractFileSystem fs = fc.getFSofPath(p);      
+      
+      // Loop until all symlinks are resolved or the limit is reached
+      for (boolean isLink = true; isLink;) {
+        try {
+          in = next(fs, p);
+          isLink = false;
+        } catch (UnresolvedLinkException e) {
+          if (count++ > MAX_PATH_LINKS) {
+            throw new IOException("Possible cyclic loop while " +
+                                  "following symbolic link " + first);
+          }
+          // Resolve the first unresolved path component
+          p = qualifySymlinkTarget(fs, p, fs.getLinkTarget(p));
+          fs = fc.getFSofPath(p);
+        }
+      }
+      return in;
+    }
+  }
+}

+ 49 - 2
src/java/org/apache/hadoop/fs/FileStatus.java

@@ -39,6 +39,7 @@ public class FileStatus implements Writable, Comparable {
   private FsPermission permission;
   private FsPermission permission;
   private String owner;
   private String owner;
   private String group;
   private String group;
+  private Path symlink;
   
   
   public FileStatus() { this(0, false, 0, 0, 0, 0, null, null, null, null); }
   public FileStatus() { this(0, false, 0, 0, 0, 0, null, null, null, null); }
   
   
@@ -49,10 +50,24 @@ public class FileStatus implements Writable, Comparable {
     this(length, isdir, block_replication, blocksize, modification_time,
     this(length, isdir, block_replication, blocksize, modification_time,
          0, null, null, null, path);
          0, null, null, null, path);
   }
   }
-  
-  public FileStatus(long length, boolean isdir, int block_replication,
+
+  /**
+   * Constructor for file systems on which symbolic links are not supported
+   */
+  public FileStatus(long length, boolean isdir,
+                    int block_replication,
+                    long blocksize, long modification_time, long access_time,
+                    FsPermission permission, String owner, String group, 
+                    Path path) {
+    this(length, isdir, block_replication, blocksize, modification_time,
+         access_time, permission, owner, group, null, path);
+  }
+
+  public FileStatus(long length, boolean isdir,
+                    int block_replication,
                     long blocksize, long modification_time, long access_time,
                     long blocksize, long modification_time, long access_time,
                     FsPermission permission, String owner, String group, 
                     FsPermission permission, String owner, String group, 
+                    Path symlink,
                     Path path) {
                     Path path) {
     this.length = length;
     this.length = length;
     this.isdir = isdir;
     this.isdir = isdir;
@@ -64,6 +79,7 @@ public class FileStatus implements Writable, Comparable {
                       FsPermission.getDefault() : permission;
                       FsPermission.getDefault() : permission;
     this.owner = (owner == null) ? "" : owner;
     this.owner = (owner == null) ? "" : owner;
     this.group = (group == null) ? "" : group;
     this.group = (group == null) ? "" : group;
+    this.symlink = symlink;
     this.path = path;
     this.path = path;
   }
   }
 
 
@@ -182,6 +198,28 @@ public class FileStatus implements Writable, Comparable {
     this.group = (group == null) ? "" :  group;
     this.group = (group == null) ? "" :  group;
   }
   }
 
 
+  /**
+   * Is this a symbolic link?
+   * @return true if this is a symbolic link
+   */
+  public boolean isSymlink() {
+    return symlink != null;
+  }
+
+  /**
+   * @return The contents of the symbolic link.
+   */
+  public Path getSymlink() throws IOException {
+    if (!isSymlink()) {
+      throw new IOException("Path " + path + " is not a symbolic link");
+    }
+    return symlink;
+  }
+
+  public void setSymlink(final Path p) {
+    symlink = p;
+  }
+  
   //////////////////////////////////////////////////
   //////////////////////////////////////////////////
   // Writable
   // Writable
   //////////////////////////////////////////////////
   //////////////////////////////////////////////////
@@ -196,6 +234,10 @@ public class FileStatus implements Writable, Comparable {
     permission.write(out);
     permission.write(out);
     Text.writeString(out, owner);
     Text.writeString(out, owner);
     Text.writeString(out, group);
     Text.writeString(out, group);
+    out.writeBoolean(isSymlink());
+    if (isSymlink()) {
+      Text.writeString(out, symlink.toString());
+    }
   }
   }
 
 
   public void readFields(DataInput in) throws IOException {
   public void readFields(DataInput in) throws IOException {
@@ -210,6 +252,11 @@ public class FileStatus implements Writable, Comparable {
     permission.readFields(in);
     permission.readFields(in);
     owner = Text.readString(in);
     owner = Text.readString(in);
     group = Text.readString(in);
     group = Text.readString(in);
+    if (in.readBoolean()) {
+      this.symlink = new Path(Text.readString(in));
+    } else {
+      this.symlink = null;
+    }
   }
   }
 
 
   /**
   /**

+ 46 - 16
src/java/org/apache/hadoop/fs/FilterFs.java

@@ -60,37 +60,48 @@ public abstract class FilterFs extends AbstractFileSystem {
   protected FSDataOutputStream createInternal(Path f,
   protected FSDataOutputStream createInternal(Path f,
     EnumSet<CreateFlag> flag, FsPermission absolutePermission, int bufferSize,
     EnumSet<CreateFlag> flag, FsPermission absolutePermission, int bufferSize,
     short replication, long blockSize, Progressable progress,
     short replication, long blockSize, Progressable progress,
-    int bytesPerChecksum, boolean createParent) throws IOException {
+    int bytesPerChecksum, boolean createParent) 
+      throws IOException, UnresolvedLinkException {
     checkPath(f);
     checkPath(f);
     return myFs.createInternal(f, flag, absolutePermission, bufferSize,
     return myFs.createInternal(f, flag, absolutePermission, bufferSize,
         replication, blockSize, progress, bytesPerChecksum, createParent);
         replication, blockSize, progress, bytesPerChecksum, createParent);
   }
   }
 
 
   @Override
   @Override
-  protected boolean delete(Path f, boolean recursive) throws IOException {
+  protected boolean delete(Path f, boolean recursive) 
+      throws IOException, UnresolvedLinkException {
     checkPath(f);
     checkPath(f);
     return myFs.delete(f, recursive);
     return myFs.delete(f, recursive);
   }
   }
 
 
   @Override
   @Override
   protected BlockLocation[] getFileBlockLocations(Path f, long start, long len)
   protected BlockLocation[] getFileBlockLocations(Path f, long start, long len)
-    throws IOException {
+      throws IOException, UnresolvedLinkException {
     checkPath(f);
     checkPath(f);
     return myFs.getFileBlockLocations(f, start, len);
     return myFs.getFileBlockLocations(f, start, len);
   }
   }
 
 
   @Override
   @Override
-  protected FileChecksum getFileChecksum(Path f) throws IOException {
+  protected FileChecksum getFileChecksum(Path f) 
+      throws IOException, UnresolvedLinkException {
     checkPath(f);
     checkPath(f);
     return myFs.getFileChecksum(f);
     return myFs.getFileChecksum(f);
   }
   }
 
 
   @Override
   @Override
-  protected FileStatus getFileStatus(Path f) throws IOException {
+  protected FileStatus getFileStatus(Path f) 
+      throws IOException, UnresolvedLinkException {
     checkPath(f);
     checkPath(f);
     return myFs.getFileStatus(f);
     return myFs.getFileStatus(f);
   }
   }
 
 
+  @Override
+  protected FileStatus getFileLinkStatus(final Path f) 
+    throws IOException, UnresolvedLinkException {
+    checkPath(f);
+    return myFs.getFileLinkStatus(f);
+  }
+  
   @Override
   @Override
   protected FsStatus getFsStatus() throws IOException {
   protected FsStatus getFsStatus() throws IOException {
     return myFs.getFsStatus();
     return myFs.getFsStatus();
@@ -107,36 +118,38 @@ public abstract class FilterFs extends AbstractFileSystem {
   }
   }
 
 
   @Override
   @Override
-  protected FileStatus[] listStatus(Path f) throws IOException {
+  protected FileStatus[] listStatus(Path f) 
+      throws IOException, UnresolvedLinkException {
     checkPath(f);
     checkPath(f);
     return myFs.listStatus(f);
     return myFs.listStatus(f);
   }
   }
 
 
   @Override
   @Override
   protected void mkdir(Path dir, FsPermission permission, boolean createParent)
   protected void mkdir(Path dir, FsPermission permission, boolean createParent)
-    throws IOException {
+    throws IOException, UnresolvedLinkException {
     checkPath(dir);
     checkPath(dir);
     myFs.mkdir(dir, permission, createParent);
     myFs.mkdir(dir, permission, createParent);
     
     
   }
   }
 
 
   @Override
   @Override
-  protected FSDataInputStream open(Path f, int bufferSize) throws IOException {
+  protected FSDataInputStream open(Path f, int bufferSize) 
+    throws IOException, UnresolvedLinkException {
     checkPath(f);
     checkPath(f);
     return myFs.open(f, bufferSize);
     return myFs.open(f, bufferSize);
   }
   }
 
 
   @Override
   @Override
-  protected void renameInternal(Path src, Path dst) throws IOException {
+  protected void renameInternal(Path src, Path dst) 
+    throws IOException, UnresolvedLinkException {
     checkPath(src);
     checkPath(src);
     checkPath(dst);
     checkPath(dst);
     myFs.rename(src, dst, Options.Rename.NONE);
     myFs.rename(src, dst, Options.Rename.NONE);
-    
   }
   }
 
 
   @Override
   @Override
   protected void setOwner(Path f, String username, String groupname)
   protected void setOwner(Path f, String username, String groupname)
-    throws IOException {
+    throws IOException, UnresolvedLinkException {
     checkPath(f);
     checkPath(f);
     myFs.setOwner(f, username, groupname);
     myFs.setOwner(f, username, groupname);
     
     
@@ -144,27 +157,44 @@ public abstract class FilterFs extends AbstractFileSystem {
 
 
   @Override
   @Override
   protected void setPermission(Path f, FsPermission permission)
   protected void setPermission(Path f, FsPermission permission)
-    throws IOException {
+    throws IOException, UnresolvedLinkException {
     checkPath(f);
     checkPath(f);
     myFs.setPermission(f, permission);
     myFs.setPermission(f, permission);
   }
   }
 
 
   @Override
   @Override
   protected boolean setReplication(Path f, short replication)
   protected boolean setReplication(Path f, short replication)
-    throws IOException {
+    throws IOException, UnresolvedLinkException {
     checkPath(f);
     checkPath(f);
     return myFs.setReplication(f, replication);
     return myFs.setReplication(f, replication);
   }
   }
 
 
   @Override
   @Override
-  protected void setTimes(Path f, long mtime, long atime) throws IOException {
+  protected void setTimes(Path f, long mtime, long atime) 
+      throws IOException, UnresolvedLinkException {
     checkPath(f);
     checkPath(f);
     myFs.setTimes(f, mtime, atime);
     myFs.setTimes(f, mtime, atime);
-    
   }
   }
 
 
   @Override
   @Override
-  protected void setVerifyChecksum(boolean verifyChecksum) throws IOException {
+  protected void setVerifyChecksum(boolean verifyChecksum) 
+      throws IOException, UnresolvedLinkException {
     myFs.setVerifyChecksum(verifyChecksum);
     myFs.setVerifyChecksum(verifyChecksum);
   }
   }
+
+  @Override
+  protected boolean supportsSymlinks() {
+    return myFs.supportsSymlinks();
+  }
+
+  @Override
+  protected void createSymlink(Path target, Path link, boolean createParent) 
+    throws IOException, UnresolvedLinkException {
+    myFs.createSymlink(target, link, createParent);
+  }
+
+  @Override
+  protected Path getLinkTarget(final Path f) throws IOException {
+    return myFs.getLinkTarget(f);
+  }
 }
 }

+ 1 - 1
src/java/org/apache/hadoop/fs/Path.java

@@ -191,7 +191,7 @@ public class Path implements Comparable {
     return uri.getPath().startsWith(SEPARATOR, start);
     return uri.getPath().startsWith(SEPARATOR, start);
    }
    }
   
   
-  /** True if the directory of this path is absolute. */
+  /** True if the path component of this URI is absolute. */
   /**
   /**
    * There is some ambiguity here. An absolute path is a slash
    * There is some ambiguity here. An absolute path is a slash
    * relative name without a scheme or an authority.
    * relative name without a scheme or an authority.

+ 40 - 0
src/java/org/apache/hadoop/fs/UnresolvedLinkException.java

@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate.*;
+
+/** 
+ * Thrown when a symbolic link is encountered in a path.
+ */
+@InterfaceAudience.LimitedPrivate({Project.HDFS})
+public class UnresolvedLinkException extends IOException {
+  private static final long serialVersionUID = 1L;
+
+  public UnresolvedLinkException() {
+    super();
+  }
+
+  public UnresolvedLinkException(String msg) {
+    super(msg);
+  }
+}

+ 104 - 1
src/java/org/apache/hadoop/fs/local/RawLocalFs.java

@@ -18,17 +18,21 @@
 package org.apache.hadoop.fs.local;
 package org.apache.hadoop.fs.local;
 
 
 import java.io.IOException;
 import java.io.IOException;
+import java.io.FileNotFoundException;
 import java.net.URI;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URISyntaxException;
 
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.DelegateToFileSystem;
 import org.apache.hadoop.fs.DelegateToFileSystem;
 import org.apache.hadoop.fs.FsConstants;
 import org.apache.hadoop.fs.FsConstants;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.fs.RawLocalFileSystem;
-
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Shell;
 
 
 /**
 /**
  * The RawLocalFs implementation of AbstractFileSystem.
  * The RawLocalFs implementation of AbstractFileSystem.
@@ -37,6 +41,7 @@ import org.apache.hadoop.fs.RawLocalFileSystem;
 @InterfaceAudience.Private
 @InterfaceAudience.Private
 @InterfaceStability.Evolving /*Evolving for a release,to be changed to Stable */
 @InterfaceStability.Evolving /*Evolving for a release,to be changed to Stable */
 public class RawLocalFs extends DelegateToFileSystem {
 public class RawLocalFs extends DelegateToFileSystem {
+
   RawLocalFs(final Configuration conf) throws IOException, URISyntaxException {
   RawLocalFs(final Configuration conf) throws IOException, URISyntaxException {
     this(FsConstants.LOCAL_FS_URI, conf);
     this(FsConstants.LOCAL_FS_URI, conf);
   }
   }
@@ -65,4 +70,102 @@ public class RawLocalFs extends DelegateToFileSystem {
   protected FsServerDefaults getServerDefaults() throws IOException {
   protected FsServerDefaults getServerDefaults() throws IOException {
     return LocalConfigKeys.getServerDefaults();
     return LocalConfigKeys.getServerDefaults();
   }
   }
+  
+  @Override
+  protected boolean supportsSymlinks() {
+    return true;
+  }  
+  
+  @Override
+  protected void createSymlink(Path target, Path link, boolean createParent) 
+      throws IOException {
+    final String targetScheme = target.toUri().getScheme();
+    if (targetScheme != null && !"file".equals(targetScheme)) {
+      throw new IOException("Unable to create symlink to non-local file "+
+                            "system: "+target.toString());
+    }
+    if (createParent) {
+      mkdir(link.getParent(), FsPermission.getDefault(), true);
+    }
+    // NB: Use createSymbolicLink in java.nio.file.Path once available
+    try {
+      Shell.execCommand(Shell.LINK_COMMAND, "-s",
+                        new URI(target.toString()).getPath(),
+                        new URI(link.toString()).getPath());
+    } catch (URISyntaxException x) {
+      throw new IOException("Invalid symlink path: "+x.getMessage());
+    } catch (IOException x) {
+      throw new IOException("Unable to create symlink: "+x.getMessage());
+    }
+  }
+
+  /** 
+   * Returns the target of the given symlink. Returns the empty string if  
+   * the given path does not refer to a symlink or there is an error 
+   * acessing the symlink.
+   */
+  private String readLink(Path p) {
+    /* NB: Use readSymbolicLink in java.nio.file.Path once available. Could
+     * use getCanonicalPath in File to get the target of the symlink but that 
+     * does not indicate if the given path refers to a symlink.
+     */
+    try {
+      final String path = p.toUri().getPath();
+      return Shell.execCommand(Shell.READ_LINK_COMMAND, path).trim(); 
+    } catch (IOException x) {
+      return "";
+    }
+  }
+  
+  /**
+   * Return a FileStatus representing the given path. If the path refers 
+   * to a symlink return a FileStatus representing the link rather than
+   * the object the link refers to.
+   */
+  @Override
+  protected FileStatus getFileLinkStatus(final Path f) throws IOException {
+    String target = readLink(f);
+    try {
+      FileStatus fs = getFileStatus(f);
+      // If f refers to a regular file or directory      
+      if ("".equals(target)) {
+        return fs;
+      }
+      // Otherwise f refers to a symlink
+      return new FileStatus(fs.getLen(), 
+          false,
+          fs.getReplication(), 
+          fs.getBlockSize(),
+          fs.getModificationTime(),
+          fs.getAccessTime(),
+          fs.getPermission(),
+          fs.getOwner(),
+          fs.getGroup(),
+          new Path(target),
+          f);
+    } catch (FileNotFoundException e) {
+      /* The exists method in the File class returns false for dangling 
+       * links so we can get a FileNotFoundException for links that exist.
+       * It's also possible that we raced with a delete of the link. Use
+       * the readBasicFileAttributes method in java.nio.file.attributes 
+       * when available.
+       */
+      if (!"".equals(target)) {
+        return new FileStatus(0, false, 0, 0, 0, 0, FsPermission.getDefault(), 
+            "", "", new Path(target), f);        
+      }
+      // f refers to a file or directory that does not exist
+      throw e;
+    }
+  }
+  
+  @Override
+  protected Path getLinkTarget(Path f) throws IOException {
+    /* We should never get here. Valid local links are resolved transparently
+     * by the underlying local file system and accessing a dangling link will 
+     * result in an IOException, not an UnresolvedLinkException, so FileContext
+     * should never call this function.
+     */
+    throw new AssertionError();
+  }
 }
 }

+ 4 - 0
src/java/org/apache/hadoop/util/Shell.java

@@ -57,6 +57,10 @@ abstract public class Shell {
   /** a Unix command to set owner */
   /** a Unix command to set owner */
   public static final String SET_OWNER_COMMAND = "chown";
   public static final String SET_OWNER_COMMAND = "chown";
   public static final String SET_GROUP_COMMAND = "chgrp";
   public static final String SET_GROUP_COMMAND = "chgrp";
+  /** a Unix command to create a link */
+  public static final String LINK_COMMAND = "ln";
+  /** a Unix command to get a link target */
+  public static final String READ_LINK_COMMAND = "readlink";
   /** Return a Unix command to get permission information. */
   /** Return a Unix command to get permission information. */
   public static String[] getGET_PERMISSION_COMMAND() {
   public static String[] getGET_PERMISSION_COMMAND() {
     //force /bin/ls, except on windows.
     //force /bin/ls, except on windows.

+ 25 - 0
src/test/core/org/apache/hadoop/fs/FileContextMainOperationsBaseTest.java

@@ -967,6 +967,31 @@ public abstract class FileContextMainOperationsBaseTest  {
     out.close();
     out.close();
   }
   }
 
 
+  @Test
+  /** Test FileContext APIs when symlinks are not supported */
+  public void testUnsupportedSymlink() throws IOException {
+    Path file = getTestRootPath(fc, "file");
+    Path link = getTestRootPath(fc, "linkToFile");
+    if (!fc.getDefaultFileSystem().supportsSymlinks()) {
+      try {
+        fc.createSymlink(file, link, false);
+        Assert.fail("Created a symlink on a file system that "+
+                    "does not support symlinks.");
+      } catch (IOException e) {
+        // Expected
+      }
+      createFile(file);
+      try {
+        fc.getLinkTarget(file);
+        Assert.fail("Got a link target on a file system that "+
+                    "does not support symlinks.");
+      } catch (IOException e) {
+        // Expected
+      }
+      Assert.assertEquals(fc.getFileStatus(file), fc.getFileLinkStatus(file));
+    }
+  }
+  
   protected void createFile(Path path) throws IOException {
   protected void createFile(Path path) throws IOException {
     FSDataOutputStream out = fc.create(path, EnumSet.of(CreateFlag.CREATE),
     FSDataOutputStream out = fc.create(path, EnumSet.of(CreateFlag.CREATE),
         Options.CreateOpts.createParent());
         Options.CreateOpts.createParent());

+ 818 - 0
src/test/core/org/apache/hadoop/fs/FileContextSymlinkBaseTest.java

@@ -0,0 +1,818 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.*;
+import java.net.URI;
+import java.util.Random;
+import java.util.EnumSet;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Options.CreateOpts;
+import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+import org.junit.Before;
+import org.junit.After;
+
+/**
+ * Test symbolic links using FileContext.
+ */
+public abstract class FileContextSymlinkBaseTest {
+  static final long seed = 0xDEADBEEFL;
+  static final int  blockSize =  8192;
+  static final int  fileSize  = 16384;
+ 
+  protected static FileContext fc;
+  
+  abstract protected String getScheme();
+  abstract protected String testBaseDir1();
+  abstract protected String testBaseDir2();
+  abstract protected URI testURI();
+
+  protected static void createAndWriteFile(FileContext fc, Path p) 
+      throws IOException {
+    FSDataOutputStream out;
+    out = fc.create(p, EnumSet.of(CreateFlag.CREATE),
+                    CreateOpts.createParent(),
+                    CreateOpts.repFac((short) 1),
+                    CreateOpts.blockSize(blockSize));
+    byte[] buf = new byte[fileSize];
+    Random rand = new Random(seed);
+    rand.nextBytes(buf);
+    out.write(buf);
+    out.close();
+  }
+  
+  protected static void createAndWriteFile(Path p) throws IOException {
+    createAndWriteFile(fc, p);
+  }
+
+  protected void readFile(Path p) throws IOException {
+    FSDataInputStream out = fc.open(p);
+    byte[] actual = new byte[fileSize];
+    out.readFully(actual);
+    out.close();
+  }
+
+  protected void readFile(FileContext fc, Path p) throws IOException {
+    FSDataInputStream out = fc.open(p);
+    byte[] actual = new byte[fileSize];
+    out.readFully(actual);
+    out.close();
+  }
+  
+  protected void appendToFile(Path p) throws IOException {
+    FSDataOutputStream out;
+    out = fc.create(p, EnumSet.of(CreateFlag.APPEND));
+    byte[] buf = new byte[fileSize];
+    Random rand = new Random(seed);
+    rand.nextBytes(buf);
+    out.write(buf);
+    out.close();
+  }
+  
+  @Before
+  public void setUp() throws Exception {
+    fc.mkdir(new Path(testBaseDir1()), FileContext.DEFAULT_PERM, true);
+    fc.mkdir(new Path(testBaseDir2()), FileContext.DEFAULT_PERM, true);
+  }
+  
+  @After
+  public void tearDown() throws Exception { 
+    fc.delete(new Path(testBaseDir1()), true);
+    fc.delete(new Path(testBaseDir2()), true);
+  } 
+  
+  @Test
+  /** The root is not a symlink */
+  public void testStatRoot() throws IOException {
+    assertFalse(fc.getFileLinkStatus(new Path("/")).isSymlink());    
+  }
+  
+  @Test
+  /** Test setWorkingDirectory resolves symlinks */
+  public void testSetWDResolvesLinks() throws IOException {
+    Path dir       = new Path(testBaseDir1());
+    Path linkToDir = new Path(testBaseDir1()+"/link");
+    fc.createSymlink(dir, linkToDir, false);
+    fc.setWorkingDirectory(linkToDir);
+    // Local file system does not resolve symlinks, others do.
+    if ("file".equals(getScheme())) {
+      assertEquals(linkToDir.getName(), fc.getWorkingDirectory().getName());
+    } else {
+      assertEquals(dir.getName(), fc.getWorkingDirectory().getName());
+    }
+  }
+  
+  @Test
+  /** Test create a dangling link */
+  public void testCreateDanglingLink() throws IOException {
+    Path file = new Path("/noSuchFile");
+    Path link = new Path(testBaseDir1()+"/link");    
+    try {
+      fc.createSymlink(file, link, false);
+    } catch (IOException x) {
+      fail("failed to create dangling symlink");
+    }
+    try {
+      fc.getFileStatus(link);
+      fail("Got file status of non-existant file");
+    } catch (FileNotFoundException f) {
+      // Expected
+    }
+    fc.delete(link, false);
+  } 
+
+  @Test
+  /** Test create a link to null and empty path */
+  public void testCreateLinkToNullEmpty() throws IOException {
+    Path link = new Path(testBaseDir1()+"/link");
+    try {
+      fc.createSymlink(null, link, false);
+      fail("Can't create symlink to null");
+    } catch (java.lang.NullPointerException e) {
+      // Expected, create* with null yields NPEs
+    }
+    try {
+      fc.createSymlink(new Path(""), link, false);
+      fail("Can't create symlink to empty string");
+    } catch (java.lang.IllegalArgumentException e) {
+      // Expected, Path("") is invalid
+    }
+  } 
+    
+  @Test
+  /** Create a link with createParent set */
+  public void testCreateLinkCanCreateParent() throws IOException {
+    Path file = new Path(testBaseDir1()+"/file");
+    Path link = new Path(testBaseDir2()+"/linkToFile");
+    createAndWriteFile(file);
+    fc.delete(new Path(testBaseDir2()), true);
+    try {
+      fc.createSymlink(file, link, false);
+      fail("Created link without first creating parent dir");
+    } catch (IOException x) {
+      // Expected. Need to create testBaseDir2() first.
+    }
+    assertFalse(fc.exists(new Path(testBaseDir2())));
+    fc.createSymlink(file, link, true);
+    readFile(link);
+  }
+
+  @Test
+  /** Delete a link */
+  public void testDeleteLink() throws IOException {
+    Path file = new Path(testBaseDir1()+"/file");
+    Path link = new Path(testBaseDir1()+"/linkToFile");    
+    createAndWriteFile(file);  
+    fc.createSymlink(file, link, false);
+    readFile(link);
+    fc.delete(link, false);
+    try {
+      readFile(link);
+      fail("Symlink should have been deleted");
+    } catch (IOException x) {
+      // Expected
+    }
+    // If we deleted the link we can put it back
+    fc.createSymlink(file, link, false);    
+  }
+  
+  @Test
+  /** Ensure open resolves symlinks */
+  public void testOpenResolvesLinks() throws IOException {
+    Path file = new Path(testBaseDir1()+"/noSuchFile");
+    Path link = new Path(testBaseDir1()+"/link");
+    fc.createSymlink(file, link, false);
+    try {
+      fc.open(link);
+      fail("link target does not exist");
+    } catch (FileNotFoundException x) {
+      // Expected
+    }
+    fc.delete(link, false);
+  } 
+
+  @Test
+  /** Stat a link to a file */
+  public void testStatLinkToFile() throws IOException {
+    Path file  = new Path(testBaseDir1()+"/file");
+    Path link  = new Path(testBaseDir1()+"/linkToFile");    
+    createAndWriteFile(file);
+    readFile(file);
+    fc.createSymlink(file, link, false);
+    assertFalse(fc.getFileStatus(link).isSymlink());
+    assertFalse(fc.getFileStatus(link).isDir());
+    assertTrue(fc.getFileLinkStatus(link).isSymlink());
+    assertFalse(fc.getFileLinkStatus(link).isDir());
+    assertTrue(fc.isFile(link));
+    assertFalse(fc.isDirectory(link));
+    assertEquals(file.toUri().getPath(), fc.getLinkTarget(link).toString());
+  }
+
+  @Test
+  /** Stat a link to a directory */
+  public void testStatLinkToDir() throws IOException {
+    Path dir  = new Path(testBaseDir1());
+    Path link = new Path(testBaseDir1()+"/linkToDir");
+    fc.createSymlink(dir, link, false);
+    assertFalse(fc.getFileStatus(link).isSymlink());
+    assertTrue(fc.getFileStatus(link).isDir());
+    assertTrue(fc.getFileLinkStatus(link).isSymlink());
+    assertFalse(fc.getFileLinkStatus(link).isDir());
+    assertFalse(fc.isFile(link));
+    assertTrue(fc.isDirectory(link));
+    assertEquals(dir.toUri().getPath(), fc.getLinkTarget(link).toString());
+  }
+
+  @Test
+  /** lstat a non-existant file */
+  public void testStatNonExistantFiles() throws IOException {
+    Path fileAbs = new Path("/doesNotExist");
+    try {
+      fc.getFileLinkStatus(fileAbs);
+      fail("Got FileStatus for non-existant file");
+    } catch (FileNotFoundException f) {
+      // Expected
+    }
+    try {
+      fc.getLinkTarget(fileAbs);
+      fail("Got link target for non-existant file");
+    } catch (FileNotFoundException f) {
+      // Expected
+    }
+  }
+
+  @Test
+  /** Test stat'ing a regular file and directory */
+  public void testStatNonLinks() throws IOException {
+    Path dir   = new Path(testBaseDir1());
+    Path file  = new Path(testBaseDir1()+"/file");
+    createAndWriteFile(file);
+    try {
+      fc.getLinkTarget(dir);
+      fail("Lstat'd a non-symlink");
+    } catch (IOException e) {
+      // Expected.
+    }
+    try {
+      fc.getLinkTarget(file);
+      fail("Lstat'd a non-symlink");
+    } catch (IOException e) {
+      // Expected.
+    }
+  }
+  
+  @Test
+  /** Test links that link to each other */
+  public void testRecursiveLinks() throws IOException {
+    Path link1 = new Path(testBaseDir1()+"/link1");
+    Path link2 = new Path(testBaseDir1()+"/link2");
+    fc.createSymlink(link1, link2, false);
+    fc.createSymlink(link2, link1, false);
+    try {
+      readFile(link1);
+      fail("Read recursive link");
+    } catch (FileNotFoundException f) {
+      // LocalFs throws sub class of IOException, since File.exists 
+      // returns false for a link to link. 
+    } catch (IOException x) {
+      assertEquals("Possible cyclic loop while following symbolic link "+
+                   link1.toString(), x.getMessage());
+    }    
+  }
+
+  private void checkLink(Path linkAbs, Path expectedTarget, Path targetQual) 
+      throws IOException { 
+    Path dir = new Path(testBaseDir1());
+    // isFile/Directory
+    assertTrue(fc.isFile(linkAbs));
+    assertFalse(fc.isDirectory(linkAbs));
+
+    // Check getFileStatus 
+    assertFalse(fc.getFileStatus(linkAbs).isSymlink());
+    assertFalse(fc.getFileStatus(linkAbs).isDir());
+    assertEquals(fileSize, fc.getFileStatus(linkAbs).getLen());
+
+    // Check getFileLinkStatus
+    assertTrue(fc.getFileLinkStatus(linkAbs).isSymlink());
+    assertFalse(fc.getFileLinkStatus(linkAbs).isDir());
+
+    // Check getSymlink always returns a qualified target, except
+    // when partially qualified paths are used (see tests below).
+    assertEquals(targetQual.toString(), 
+        fc.getFileLinkStatus(linkAbs).getSymlink().toString());
+    assertEquals(targetQual, fc.getFileLinkStatus(linkAbs).getSymlink());
+    // Check that the target is qualified using the file system of the 
+    // path used to access the link (if the link target was not specified 
+    // fully qualified, in that case we use the link target verbatim).
+    if (!"file".equals(getScheme())) {
+      FileContext localFc = FileContext.getLocalFSFileContext();
+      Path linkQual = new Path(testURI().toString(), linkAbs);
+      assertEquals(targetQual, 
+                   localFc.getFileLinkStatus(linkQual).getSymlink());
+    }
+    
+    // Check getLinkTarget
+    assertEquals(expectedTarget, fc.getLinkTarget(linkAbs));
+    
+    // Now read using all path types..
+    fc.setWorkingDirectory(dir);    
+    readFile(new Path("linkToFile"));
+    readFile(linkAbs);
+    // And fully qualified.. (NB: for local fs this is partially qualified)
+    readFile(new Path(testURI().toString(), linkAbs));
+    // And partially qualified..
+    boolean failureExpected = "file".equals(getScheme()) ? false : true;
+    try {
+      readFile(new Path(getScheme()+"://"+testBaseDir1()+"/linkToFile"));
+      assertFalse(failureExpected);
+    } catch (Exception e) {
+      assertTrue(failureExpected);
+    }
+    
+    // Now read using a different file context (for HDFS at least)
+    if (!"file".equals(getScheme())) {
+      FileContext localFc = FileContext.getLocalFSFileContext();
+      readFile(localFc, new Path(testURI().toString(), linkAbs));
+    }
+  }
+  
+  @Test
+  /** Test creating a symlink using relative paths */
+  public void testCreateLinkUsingRelPaths() throws IOException {
+    Path fileAbs = new Path(testBaseDir1(), "file");
+    Path linkAbs = new Path(testBaseDir1(), "linkToFile");
+    Path schemeAuth = new Path(testURI().toString()); 
+    Path fileQual = new Path(schemeAuth, testBaseDir1()+"/file");
+    createAndWriteFile(fileAbs);
+    
+    fc.setWorkingDirectory(new Path(testBaseDir1()));
+    fc.createSymlink(new Path("file"), new Path("linkToFile"), false);
+    checkLink(linkAbs, new Path("file"), fileQual);
+    
+    // Now rename the link's parent. Because the target was specified 
+    // with a relative path the link should still resolve.
+    Path dir1        = new Path(testBaseDir1());
+    Path dir2        = new Path(testBaseDir2());
+    Path linkViaDir2 = new Path(testBaseDir2(), "linkToFile");
+    Path fileViaDir2 = new Path(schemeAuth, testBaseDir2()+"/file");
+    fc.rename(dir1, dir2, Rename.OVERWRITE);
+    assertEquals(fileViaDir2, fc.getFileLinkStatus(linkViaDir2).getSymlink());
+    readFile(linkViaDir2);
+  }
+
+  @Test
+  /** Test creating a symlink using absolute paths */
+  public void testCreateLinkUsingAbsPaths() throws IOException {
+    Path fileAbs = new Path(testBaseDir1()+"/file");
+    Path linkAbs = new Path(testBaseDir1()+"/linkToFile");
+    Path schemeAuth = new Path(testURI().toString()); 
+    Path fileQual = new Path(schemeAuth, testBaseDir1()+"/file");
+    createAndWriteFile(fileAbs);
+
+    fc.createSymlink(fileAbs, linkAbs, false);
+    checkLink(linkAbs, fileAbs, fileQual);
+
+    // Now rename the link's parent. The target doesn't change and
+    // now no longer exists so accessing the link should fail.
+    Path dir1        = new Path(testBaseDir1());
+    Path dir2        = new Path(testBaseDir2());
+    Path linkViaDir2 = new Path(testBaseDir2(), "linkToFile");
+    fc.rename(dir1, dir2, Rename.OVERWRITE);
+    assertEquals(fileQual, fc.getFileLinkStatus(linkViaDir2).getSymlink());    
+    try {
+      readFile(linkViaDir2);
+      fail("The target should not exist");
+    } catch (FileNotFoundException x) {
+      // Expected
+    }
+  } 
+  
+  @Test
+  /** 
+   * Test creating a symlink using fully and partially qualified paths.
+   * NB: For local fs this actually tests partially qualified paths,
+   * as they don't support fully qualified paths.
+   */
+  public void testCreateLinkUsingFullyQualPaths() throws IOException {
+    Path fileAbs  = new Path(testBaseDir1(), "file");
+    Path linkAbs  = new Path(testBaseDir1(), "linkToFile");
+    Path fileQual = new Path(testURI().toString(), fileAbs);
+    Path linkQual = new Path(testURI().toString(), linkAbs);
+    createAndWriteFile(fileAbs);
+    
+    fc.createSymlink(fileQual, linkQual, false);
+    checkLink(linkAbs, 
+              "file".equals(getScheme()) ? fileAbs : fileQual, 
+              fileQual);
+    
+    // Now rename the link's parent. The target doesn't change and
+    // now no longer exists so accessing the link should fail.
+    Path dir1        = new Path(testBaseDir1());
+    Path dir2        = new Path(testBaseDir2());
+    Path linkViaDir2 = new Path(testBaseDir2(), "linkToFile");
+    fc.rename(dir1, dir2, Rename.OVERWRITE);    
+    assertEquals(fileQual, fc.getFileLinkStatus(linkViaDir2).getSymlink());    
+    try {
+      readFile(linkViaDir2);
+      fail("The target should not exist");
+    } catch (FileNotFoundException x) {
+      // Expected
+    }
+  } 
+    
+  @Test
+  /** 
+   * Test creating a symlink using partially qualified paths, ie a scheme 
+   * but no authority and vice versa. We just test link targets here since
+   * creating using a partially qualified path is file system specific.
+   */
+  public void testCreateLinkUsingPartQualPath1() throws IOException {
+    Path schemeAuth   = new Path(testURI().toString());
+    Path fileWoHost   = new Path(getScheme()+"://"+testBaseDir1()+"/file");
+    Path link         = new Path(testBaseDir1()+"/linkToFile");
+    Path linkQual     = new Path(schemeAuth, testBaseDir1()+"/linkToFile");
+    
+    // Partially qualified paths are covered for local file systems
+    // in the previous test.
+    if ("file".equals(getScheme())) {
+      return;
+    }
+    FileContext localFc = FileContext.getLocalFSFileContext();
+    
+    fc.createSymlink(fileWoHost, link, false);
+    // Partially qualified path is stored
+    assertEquals(fileWoHost, fc.getLinkTarget(linkQual));    
+    // NB: We do not add an authority
+    assertEquals(fileWoHost.toString(),
+      fc.getFileLinkStatus(link).getSymlink().toString());
+    assertEquals(fileWoHost.toString(),
+      fc.getFileLinkStatus(linkQual).getSymlink().toString());
+    // Ditto even from another file system
+    assertEquals(fileWoHost.toString(),
+      localFc.getFileLinkStatus(linkQual).getSymlink().toString());
+    // Same as if we accessed a partially qualified path directly
+    try { 
+      readFile(link);
+      fail("DFS requires URIs with schemes have an authority");
+    } catch (java.lang.RuntimeException e) {
+      // Expected
+    }
+  }
+
+  @Test
+  /** Same as above but vice versa (authority but no scheme) */
+  public void testCreateLinkUsingPartQualPath2() throws IOException {
+    Path link         = new Path(testBaseDir1(), "linkToFile");
+    Path fileWoScheme = new Path("//"+testURI().getAuthority()+ 
+                                 testBaseDir1()+"/file");
+    if ("file".equals(getScheme())) {
+      return;
+    }
+    fc.createSymlink(fileWoScheme, link, false);
+    assertEquals(fileWoScheme, fc.getLinkTarget(link));
+    assertEquals(fileWoScheme.toString(),
+      fc.getFileLinkStatus(link).getSymlink().toString());
+    try {
+      readFile(link);
+      fail("Accessed a file with w/o scheme");
+    } catch (IOException e) {
+      // Expected      
+      assertEquals("No AbstractFileSystem for scheme: null", e.getMessage());
+    }
+  }
+
+  @Test
+  /** Lstat and readlink on a normal file and directory */
+  public void testLinkStatusAndTargetWithNonLink() throws IOException {
+    Path schemeAuth = new Path(testURI().toString());
+    Path dir        = new Path(testBaseDir1());
+    Path dirQual    = new Path(schemeAuth, dir.toString());
+    Path file       = new Path(testBaseDir1(), "file");
+    Path fileQual   = new Path(schemeAuth, file.toString());
+    createAndWriteFile(file);
+    assertEquals(fc.getFileStatus(file), fc.getFileLinkStatus(file));
+    assertEquals(fc.getFileStatus(dir), fc.getFileLinkStatus(dir));
+    try {
+      fc.getLinkTarget(file);
+      fail("Get link target on non-link should throw an IOException");
+    } catch (IOException x) {
+      assertEquals("Path "+fileQual+" is not a symbolic link", x.getMessage());
+    }
+    try {
+      fc.getLinkTarget(dir);
+      fail("Get link target on non-link should throw an IOException");
+    } catch (IOException x) {
+      assertEquals("Path "+dirQual+" is not a symbolic link", x.getMessage());
+    }    
+  }
+
+  @Test
+  /** Test create symlink to a directory */
+  public void testCreateLinkToDirectory() throws IOException {
+    Path dir1      = new Path(testBaseDir1());
+    Path file      = new Path(testBaseDir1(), "file");
+    Path linkToDir = new Path(testBaseDir2(), "linkToDir");
+    createAndWriteFile(file);
+    fc.createSymlink(dir1, linkToDir, false);
+    assertFalse(fc.isFile(linkToDir));
+    assertTrue(fc.isDirectory(linkToDir)); 
+    assertTrue(fc.getFileStatus(linkToDir).isDir());
+    assertTrue(fc.getFileLinkStatus(linkToDir).isSymlink());
+  }
+  
+  @Test
+  /** Test create and remove a file through a symlink */
+  public void testCreateFileViaSymlink() throws IOException {
+    Path dir         = new Path(testBaseDir1());
+    Path linkToDir   = new Path(testBaseDir2(), "linkToDir");
+    Path fileViaLink = new Path(linkToDir, "file");
+    fc.createSymlink(dir, linkToDir, false);
+    createAndWriteFile(fileViaLink);
+    assertTrue(fc.isFile(fileViaLink));
+    assertFalse(fc.isDirectory(fileViaLink));
+    assertFalse(fc.getFileLinkStatus(fileViaLink).isSymlink());
+    assertFalse(fc.getFileStatus(fileViaLink).isDir());
+    readFile(fileViaLink);
+    fc.delete(fileViaLink, true);
+    assertFalse(fc.exists(fileViaLink));
+  }
+  
+  @Test
+  /** Test make and delete directory through a symlink */
+  public void testCreateDirViaSymlink() throws IOException {
+    Path dir1          = new Path(testBaseDir1());
+    Path subDir        = new Path(testBaseDir1(), "subDir");
+    Path linkToDir     = new Path(testBaseDir2(), "linkToDir");
+    Path subDirViaLink = new Path(linkToDir, "subDir");
+    fc.createSymlink(dir1, linkToDir, false);
+    fc.mkdir(subDirViaLink, FileContext.DEFAULT_PERM, true);
+    assertTrue(fc.getFileStatus(subDirViaLink).isDir());
+    fc.delete(subDirViaLink, false);
+    assertFalse(fc.exists(subDirViaLink));
+    assertFalse(fc.exists(subDir));
+  }
+
+  @Test
+  /** Create symlink through a symlink */
+  public void testCreateLinkViaLink() throws IOException {
+    Path dir1        = new Path(testBaseDir1());
+    Path file        = new Path(testBaseDir1(), "file");
+    Path linkToDir   = new Path(testBaseDir2(), "linkToDir");
+    Path fileViaLink = new Path(linkToDir, "file");
+    Path linkToFile  = new Path(linkToDir, "linkToFile");
+    /*
+     * /b2/linkToDir            -> /b1
+     * /b2/linkToDir/linkToFile -> /b2/linkToDir/file
+     */
+    createAndWriteFile(file);
+    fc.createSymlink(dir1, linkToDir, false);
+    fc.createSymlink(fileViaLink, linkToFile, false);
+    assertTrue(fc.isFile(linkToFile));
+    assertTrue(fc.getFileLinkStatus(linkToFile).isSymlink());
+    readFile(linkToFile);
+    assertEquals(fileSize, fc.getFileStatus(linkToFile).getLen());
+    assertEquals(fileViaLink, fc.getLinkTarget(linkToFile));
+  }
+
+  @Test
+  /** Test create symlink to a directory */
+  public void testListStatusUsingLink() throws IOException {
+    Path file  = new Path(testBaseDir1(), "file");
+    Path link  = new Path(testBaseDir1(), "link");
+    createAndWriteFile(file);
+    fc.createSymlink(new Path(testBaseDir1()), link, false);
+    // The size of the result is file system dependent, Hdfs is 2 (file 
+    // and link) and LocalFs is 3 (file, link, file crc).
+    assertTrue(fc.listStatus(link).length == 2 ||
+               fc.listStatus(link).length == 3);
+  }
+  
+  @Test
+  /** Test create symlink using the same path */
+  public void testCreateLinkTwice() throws IOException {
+    Path file = new Path(testBaseDir1(), "file");
+    Path link = new Path(testBaseDir1(), "linkToFile");
+    createAndWriteFile(file);
+    fc.createSymlink(file, link, false);
+    try {
+      fc.createSymlink(file, link, false);
+      fail("link already exists");
+    } catch (IOException x) {
+      // Expected
+    }
+  } 
+  
+  @Test
+  /** Test access via a symlink to a symlink */
+  public void testCreateLinkToLink() throws IOException {
+    Path dir1        = new Path(testBaseDir1());
+    Path file        = new Path(testBaseDir1(), "file");
+    Path linkToDir   = new Path(testBaseDir2(), "linkToDir");
+    Path linkToLink  = new Path(testBaseDir2(), "linkToLink");
+    Path fileViaLink = new Path(testBaseDir2(), "linkToLink/file");
+    createAndWriteFile(file);
+    fc.createSymlink(dir1, linkToDir, false);
+    fc.createSymlink(linkToDir, linkToLink, false);
+    assertTrue(fc.isFile(fileViaLink));
+    assertFalse(fc.isDirectory(fileViaLink));
+    assertFalse(fc.getFileLinkStatus(fileViaLink).isSymlink());
+    assertFalse(fc.getFileStatus(fileViaLink).isDir());
+    readFile(fileViaLink);
+  }
+
+  @Test
+  /** Can not create a file with path that refers to a symlink */
+  public void testCreateFileDirExistingLink() throws IOException {
+    Path file = new Path(testBaseDir1(), "file");
+    Path link = new Path(testBaseDir1(), "linkToFile");
+    createAndWriteFile(file);
+    fc.createSymlink(file, link, false);
+    try {
+      createAndWriteFile(link);
+      fail("link already exists");
+    } catch (IOException x) {
+      // Expected
+    }
+    try {
+      fc.mkdir(link, FsPermission.getDefault(), false);
+      fail("link already exists");
+    } catch (IOException x) {
+      // Expected
+    }    
+  } 
+
+  @Test
+  /** Test deleting and recreating a symlink */
+  public void testUseLinkAferDeleteLink() throws IOException {
+    Path file = new Path(testBaseDir1(), "file");
+    Path link = new Path(testBaseDir1(), "linkToFile");
+    createAndWriteFile(file);
+    fc.createSymlink(file, link, false);
+    fc.delete(link, false);
+    try {
+      readFile(link);        
+      fail("link was deleted");
+    } catch (IOException x) {
+      // Expected
+    }
+    readFile(file);
+    fc.createSymlink(file, link, false);
+    readFile(link);    
+  } 
+  
+  
+  @Test
+  /** Test create symlink to . */
+  public void testCreateLinkToDot() throws IOException {
+    Path dir  = new Path(testBaseDir1());
+    Path file = new Path(testBaseDir1(), "file");    
+    Path link = new Path(testBaseDir1(), "linkToDot");
+    createAndWriteFile(file);    
+    fc.setWorkingDirectory(dir);
+    try {
+      fc.createSymlink(new Path("."), link, false);
+      fail("Created symlink to dot");
+      readFile(new Path(testBaseDir1(), "linkToDot/file"));
+    } catch (IOException x) {
+      // Expected. Path(".") resolves to "" because URI normalizes
+      // the dot away and AbstractFileSystem considers "" invalid.  
+    }
+  }
+
+  @Test
+  /** Test create symlink to .. */
+  public void testCreateLinkToDotDot() throws IOException {
+    Path file        = new Path(testBaseDir1(), "test/file");
+    Path dotDot      = new Path(testBaseDir1(), "test/..");
+    Path linkToDir   = new Path(testBaseDir2(), "linkToDir");
+    Path fileViaLink = new Path(linkToDir,      "test/file");
+    // Symlink to .. is not a problem since the .. is squashed early
+    assertEquals(testBaseDir1(), dotDot.toString());
+    createAndWriteFile(file);
+    fc.createSymlink(dotDot, linkToDir, false);
+    readFile(fileViaLink);
+    assertEquals(fileSize, fc.getFileStatus(fileViaLink).getLen());    
+  }
+
+  @Test
+  /** Test create symlink to ../foo */
+  public void testCreateLinkToDotDotPrefix() throws IOException {
+    Path file = new Path(testBaseDir1(), "file");
+    Path dir  = new Path(testBaseDir1(), "test");
+    Path link = new Path(testBaseDir1(), "test/link");
+    createAndWriteFile(file);
+    fc.mkdir(dir, FsPermission.getDefault(), false);
+    fc.setWorkingDirectory(dir);
+    fc.createSymlink(new Path("../file"), link, false);
+    readFile(link);
+    assertEquals(new Path("../file"), fc.getLinkTarget(link));
+  }
+  
+  @Test
+  /** Append data to a file specified using a symlink */
+  public void testAppendFileViaSymlink() throws IOException {
+    Path file = new Path(testBaseDir1(), "file");
+    Path link = new Path(testBaseDir1(), "linkToFile");
+    createAndWriteFile(file);
+    fc.createSymlink(file, link, false);
+    assertEquals(fileSize, fc.getFileStatus(link).getLen());
+    appendToFile(link);
+    assertEquals(fileSize*2, fc.getFileStatus(link).getLen());
+  }
+  
+  @Test
+  /** Test rename file through a symlink */
+  public void testRenameFileViaSymlink() throws IOException {
+    Path dir1           = new Path(testBaseDir1());
+    Path file           = new Path(testBaseDir1(), "file");
+    Path linkToDir      = new Path(testBaseDir2(), "linkToDir");
+    Path fileViaLink    = new Path(linkToDir, "file");
+    Path fileNewViaLink = new Path(linkToDir, "fileNew");
+    createAndWriteFile(file);
+    fc.createSymlink(dir1, linkToDir, false);
+    fc.rename(fileViaLink, fileNewViaLink, Rename.OVERWRITE);
+    assertFalse(fc.exists(fileViaLink));
+    assertFalse(fc.exists(file));
+    assertTrue(fc.exists(fileNewViaLink));
+  }
+  
+  @Test
+  /** Rename a symlink */
+  public void testRenameSymlink() throws IOException {
+    Path file  = new Path(testBaseDir1(), "file");
+    Path link1 = new Path(testBaseDir1(), "linkToFile1");
+    Path link2 = new Path(testBaseDir1(), "linkToFile2");    
+    createAndWriteFile(file);
+    fc.createSymlink(file, link1, false);
+    fc.rename(link1, link2);
+    assertTrue(fc.getFileLinkStatus(link2).isSymlink());
+    assertFalse(fc.getFileStatus(link2).isDir());
+    readFile(link2);
+    readFile(file);
+    try {
+      createAndWriteFile(link2);
+      fail("link was not renamed");
+    } catch (IOException x) {
+      // Expected
+    }
+  } 
+    
+  @Test
+  /** Test renaming symlink target */
+  public void testMoveLinkTarget() throws IOException {
+    Path file    = new Path(testBaseDir1(), "file");
+    Path fileNew = new Path(testBaseDir1(), "fileNew");
+    Path link    = new Path(testBaseDir1(), "linkToFile");
+    createAndWriteFile(file);
+    fc.createSymlink(file, link, false);
+    fc.rename(file, fileNew, Rename.OVERWRITE);
+    try {
+      readFile(link);        
+      fail("link target was renamed");
+    } catch (IOException x) {
+      // Expected
+    }
+    fc.rename(fileNew, file, Rename.OVERWRITE);
+    readFile(link);
+  }
+  
+  @Test
+  /** setTimes affects the target not the link */    
+  public void testSetTimes() throws IOException {
+    Path file = new Path(testBaseDir1(), "file");
+    Path link = new Path(testBaseDir1(), "linkToFile");
+    createAndWriteFile(file);
+    fc.createSymlink(file, link, false);
+    long at = fc.getFileLinkStatus(link).getAccessTime(); 
+    fc.setTimes(link, 2L, 3L);
+    // NB: local file systems don't implement setTimes
+    if (!"file".equals(getScheme())) {
+      assertEquals(at, fc.getFileLinkStatus(link).getAccessTime());
+      assertEquals(3, fc.getFileStatus(file).getAccessTime());
+      assertEquals(2, fc.getFileStatus(file).getModificationTime());
+    }
+  }
+}

+ 179 - 0
src/test/core/org/apache/hadoop/fs/TestLocalFSFileContextSymlink.java

@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.*;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileUtil;
+import static org.junit.Assert.*;
+import org.junit.Test;
+import org.junit.Before;
+
+/**
+ * Test symbolic links using FileContext and LocalFs.
+ */
+public class TestLocalFSFileContextSymlink extends FileContextSymlinkBaseTest {
+  
+  protected String getScheme() {
+    return "file";
+  }
+
+  protected String testBaseDir1() {
+    return "/tmp/test1";
+  }
+  
+  protected String testBaseDir2() {
+    return "/tmp/test2";
+  }
+
+  protected URI testURI() {
+    try {
+      return new URI("file:///");
+    } catch (URISyntaxException e) {
+      return null;
+    }
+  }
+  
+  @Before
+  public void setUp() throws Exception {
+    fc = FileContext.getLocalFSFileContext();
+    super.setUp();
+  }
+
+  @Test
+  /** Test access a symlink using FileSystem */
+  public void testAccessLinkFromFileSystem() throws IOException {
+    Path fileAbs = new Path(testBaseDir1()+"/file");
+    Path link    = new Path(testBaseDir1()+"/linkToFile");
+    createAndWriteFile(fileAbs);
+    fc.createSymlink(fileAbs, link, false);
+    readFile(link);
+  } 
+  
+  @Test
+  /** lstat a non-existant file using a partially qualified path */
+  public void testDanglingLinkFilePartQual() throws IOException {
+    Path filePartQual = new Path(getScheme()+":///doesNotExist");
+    try {
+      fc.getFileLinkStatus(filePartQual);
+      fail("Got FileStatus for non-existant file");
+    } catch (FileNotFoundException f) {
+      // Expected
+    }
+    try {
+      fc.getLinkTarget(filePartQual);
+      fail("Got link target for non-existant file");      
+    } catch (FileNotFoundException f) {
+      // Expected
+    }
+  }
+  
+  @Test
+  /** Stat and lstat a dangling link */
+  public void testDanglingLink() throws IOException {
+    Path fileAbs  = new Path(testBaseDir1()+"/file");    
+    Path fileQual = new Path(testURI().toString(), fileAbs);    
+    Path link     = new Path(testBaseDir1()+"/linkToFile");
+    fc.createSymlink(fileAbs, link, false);
+    // Deleting the link using FileContext currently fails because
+    // resolve looks up LocalFs rather than RawLocalFs for the path 
+    // so we call ChecksumFs delete (which doesn't delete dangling 
+    // links) instead of delegating to delete in RawLocalFileSystem 
+    // which deletes via fullyDelete. testDeleteLink above works 
+    // because the link is not dangling.
+    //assertTrue(fc.delete(link, false));
+    FileUtil.fullyDelete(new File(link.toUri().getPath()));
+    fc.createSymlink(fileAbs, link, false);
+    try {
+      fc.getFileStatus(link);
+      fail("Got FileStatus for dangling link");
+    } catch (FileNotFoundException f) {
+      // Expected. File's exists method returns false for dangling links
+    }
+    // We can stat a dangling link
+    FileStatus fsd = fc.getFileLinkStatus(link);
+    assertEquals(fileQual, fsd.getSymlink());
+    assertTrue(fsd.isSymlink());
+    assertFalse(fsd.isDir());
+    assertEquals("", fsd.getOwner());
+    assertEquals("", fsd.getGroup());
+    assertEquals(link, fsd.getPath());
+    assertEquals(0, fsd.getLen());
+    assertEquals(0, fsd.getBlockSize());
+    assertEquals(0, fsd.getReplication());
+    assertEquals(0, fsd.getAccessTime());
+    assertEquals(FsPermission.getDefault(), fsd.getPermission());
+    // Accessing the link 
+    try {
+      readFile(link);
+      fail("Got FileStatus for dangling link");
+    } catch (FileNotFoundException f) {
+      // Ditto.
+    }
+    // Creating the file makes the link work
+    createAndWriteFile(fileAbs);
+    fc.getFileStatus(link);
+  }
+
+  @Test
+  /** 
+   * Test getLinkTarget with a partially qualified target. 
+   * NB: Hadoop does not support fully qualified URIs for the 
+   * file scheme (eg file://host/tmp/test).
+   */  
+  public void testGetLinkStatusPartQualTarget() throws IOException {
+    Path fileAbs  = new Path(testBaseDir1()+"/file");
+    Path fileQual = new Path(testURI().toString(), fileAbs);
+    Path dir      = new Path(testBaseDir1());    
+    Path link     = new Path(testBaseDir1()+"/linkToFile");
+    Path dirNew   = new Path(testBaseDir2());
+    Path linkNew  = new Path(testBaseDir2()+"/linkToFile");
+    fc.delete(dirNew, true);
+    createAndWriteFile(fileQual);
+    fc.setWorkingDirectory(dir);
+    // Link target is partially qualified, we get the same back.
+    fc.createSymlink(fileQual, link, false);
+    assertEquals(fileQual, fc.getFileLinkStatus(link).getSymlink());
+    // Because the target was specified with an absolute path the
+    // link fails to resolve after moving the parent directory. 
+    fc.rename(dir, dirNew);
+    // The target is still the old path
+    assertEquals(fileQual, fc.getFileLinkStatus(linkNew).getSymlink());    
+    try {
+      readFile(linkNew);
+      fail("The link should be dangling now.");
+    } catch (FileNotFoundException x) {
+      // Expected.
+    }
+    // RawLocalFs only maintains the path part, not the URI, and
+    // therefore does not support links to other file systems.
+    Path anotherFs = new Path("hdfs://host:1000/dir/file");
+    FileUtil.fullyDelete(new File("/tmp/test2/linkToFile"));
+    try {
+      fc.createSymlink(anotherFs, linkNew, false);
+      fail("Created a local fs link to a non-local fs");
+    } catch (IOException x) {
+      // Excpected.
+    }
+  }
+}

+ 17 - 0
src/test/core/org/apache/hadoop/fs/TestPath.java

@@ -61,6 +61,10 @@ public class TestPath extends TestCase {
 
 
   public void testNormalize() {
   public void testNormalize() {
     assertEquals("/", new Path("//").toString());
     assertEquals("/", new Path("//").toString());
+    assertEquals("/", new Path("///").toString());
+    assertEquals("//foo/", new Path("//foo/").toString());
+    assertEquals("//foo/", new Path("//foo//").toString());
+    assertEquals("//foo/bar", new Path("//foo//bar").toString());
     assertEquals("/foo", new Path("/foo/").toString());
     assertEquals("/foo", new Path("/foo/").toString());
     assertEquals("/foo", new Path("/foo/").toString());
     assertEquals("/foo", new Path("/foo/").toString());
     assertEquals("foo", new Path("foo/").toString());
     assertEquals("foo", new Path("foo/").toString());
@@ -176,6 +180,19 @@ public class TestPath extends TestCase {
     // if the child uri is absolute path
     // if the child uri is absolute path
     assertEquals("foo://bar/fud#boo", new Path(new Path(new URI(
     assertEquals("foo://bar/fud#boo", new Path(new Path(new URI(
         "foo://bar/baz#bud")), new Path(new URI("/fud#boo"))).toString());
         "foo://bar/baz#bud")), new Path(new URI("/fud#boo"))).toString());
+  }
+  
+  public void testMakeQualified() throws URISyntaxException {
+    URI defaultUri = new URI("hdfs://host1/dir1");
+    URI wd         = new URI("hdfs://host2/dir2");
+
+    // The scheme from defaultUri is used but the path part is not
+    assertEquals(new Path("hdfs://host1/dir/file"),
+        new Path("file").makeQualified(defaultUri, new Path("/dir")));
+
+    // The defaultUri is only used if the path + wd has no scheme    
+    assertEquals(new Path("hdfs://host2/dir2/file"),
+                 new Path("file").makeQualified(defaultUri, new Path(wd)));
  }
  }
 
 
 }
 }