|
@@ -17,20 +17,6 @@
|
|
|
*/
|
|
|
package org.apache.hadoop.fs;
|
|
|
|
|
|
-import java.io.FileNotFoundException;
|
|
|
-import java.io.IOException;
|
|
|
-import java.io.UnsupportedEncodingException;
|
|
|
-import java.net.URI;
|
|
|
-import java.net.URISyntaxException;
|
|
|
-import java.net.URLDecoder;
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.Collections;
|
|
|
-import java.util.List;
|
|
|
-import java.util.LinkedHashMap;
|
|
|
-import java.util.Map;
|
|
|
-import java.util.TreeMap;
|
|
|
-import java.util.HashMap;
|
|
|
-
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
@@ -40,6 +26,14 @@ import org.apache.hadoop.io.Text;
|
|
|
import org.apache.hadoop.util.LineReader;
|
|
|
import org.apache.hadoop.util.Progressable;
|
|
|
|
|
|
+import java.io.FileNotFoundException;
|
|
|
+import java.io.IOException;
|
|
|
+import java.io.UnsupportedEncodingException;
|
|
|
+import java.net.URI;
|
|
|
+import java.net.URISyntaxException;
|
|
|
+import java.net.URLDecoder;
|
|
|
+import java.util.*;
|
|
|
+
|
|
|
/**
|
|
|
* This is an implementation of the Hadoop Archive
|
|
|
* Filesystem. This archive Filesystem has index files
|
|
@@ -53,7 +47,7 @@ import org.apache.hadoop.util.Progressable;
|
|
|
* index for ranges of hashcodes.
|
|
|
*/
|
|
|
|
|
|
-public class HarFileSystem extends FilterFileSystem {
|
|
|
+public class HarFileSystem extends FileSystem {
|
|
|
|
|
|
private static final Log LOG = LogFactory.getLog(HarFileSystem.class);
|
|
|
|
|
@@ -75,11 +69,13 @@ public class HarFileSystem extends FilterFileSystem {
|
|
|
// pointer into the static metadata cache
|
|
|
private HarMetaData metadata;
|
|
|
|
|
|
+ private FileSystem fs;
|
|
|
+
|
|
|
/**
|
|
|
* public construction of harfilesystem
|
|
|
- *
|
|
|
*/
|
|
|
public HarFileSystem() {
|
|
|
+ // Must call #initialize() method to set the underlying file system
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -96,10 +92,11 @@ public class HarFileSystem extends FilterFileSystem {
|
|
|
/**
|
|
|
* Constructor to create a HarFileSystem with an
|
|
|
* underlying filesystem.
|
|
|
- * @param fs
|
|
|
+ * @param fs underlying file system
|
|
|
*/
|
|
|
public HarFileSystem(FileSystem fs) {
|
|
|
- super(fs);
|
|
|
+ this.fs = fs;
|
|
|
+ this.statistics = fs.statistics;
|
|
|
}
|
|
|
|
|
|
private synchronized void initializeMetadataCache(Configuration conf) {
|
|
@@ -171,6 +168,11 @@ public class HarFileSystem extends FilterFileSystem {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public Configuration getConf() {
|
|
|
+ return fs.getConf();
|
|
|
+ }
|
|
|
+
|
|
|
// get the version of the filesystem from the masterindex file
|
|
|
// the version is currently not useful since its the first version
|
|
|
// of archives
|
|
@@ -236,8 +238,7 @@ public class HarFileSystem extends FilterFileSystem {
|
|
|
throw new IOException("query component in Path not supported " + rawURI);
|
|
|
}
|
|
|
|
|
|
- URI tmp = null;
|
|
|
-
|
|
|
+ URI tmp;
|
|
|
try {
|
|
|
// convert <scheme>-<host> to <scheme>://<host>
|
|
|
URI baseUri = new URI(authority.replaceFirst("-", "://"));
|
|
@@ -256,7 +257,7 @@ public class HarFileSystem extends FilterFileSystem {
|
|
|
return URLDecoder.decode(str, "UTF-8");
|
|
|
}
|
|
|
|
|
|
- private String decodeFileName(String fname)
|
|
|
+ private String decodeFileName(String fname)
|
|
|
throws UnsupportedEncodingException {
|
|
|
int version = metadata.getVersion();
|
|
|
if (version == 2 || version == 3){
|
|
@@ -276,7 +277,7 @@ public class HarFileSystem extends FilterFileSystem {
|
|
|
/**
|
|
|
* Create a har specific auth
|
|
|
* har-underlyingfs:port
|
|
|
- * @param underLyingURI the uri of underlying
|
|
|
+ * @param underLyingUri the uri of underlying
|
|
|
* filesystem
|
|
|
* @return har specific auth
|
|
|
*/
|
|
@@ -294,7 +295,12 @@ public class HarFileSystem extends FilterFileSystem {
|
|
|
}
|
|
|
return auth;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected URI getCanonicalUri() {
|
|
|
+ return fs.canonicalizeUri(getUri());
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Returns the uri of this filesystem.
|
|
|
* The uri is of the form
|
|
@@ -419,7 +425,7 @@ public class HarFileSystem extends FilterFileSystem {
|
|
|
/**
|
|
|
* Get block locations from the underlying fs and fix their
|
|
|
* offsets and lengths.
|
|
|
- * @param file the input filestatus to get block locations
|
|
|
+ * @param file the input file status to get block locations
|
|
|
* @param start the start of the desired range in the contained file
|
|
|
* @param len the length of the desired range
|
|
|
* @return block locations for this segment of file
|
|
@@ -441,8 +447,7 @@ public class HarFileSystem extends FilterFileSystem {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * the hash of the path p inside iniside
|
|
|
- * the filesystem
|
|
|
+ * the hash of the path p inside the filesystem
|
|
|
* @param p the path in the harfilesystem
|
|
|
* @return the hash code of the path.
|
|
|
*/
|
|
@@ -475,13 +480,9 @@ public class HarFileSystem extends FilterFileSystem {
|
|
|
* the parent path directory
|
|
|
* @param statuses
|
|
|
* the list to add the children filestatuses to
|
|
|
- * @param children
|
|
|
- * the string list of children for this parent
|
|
|
- * @param archiveIndexStat
|
|
|
- * the archive index filestatus
|
|
|
*/
|
|
|
- private void fileStatusesInIndex(HarStatus parent, List<FileStatus> statuses,
|
|
|
- List<String> children) throws IOException {
|
|
|
+ private void fileStatusesInIndex(HarStatus parent, List<FileStatus> statuses)
|
|
|
+ throws IOException {
|
|
|
String parentString = parent.getName();
|
|
|
if (!parentString.endsWith(Path.SEPARATOR)){
|
|
|
parentString += Path.SEPARATOR;
|
|
@@ -547,7 +548,7 @@ public class HarFileSystem extends FilterFileSystem {
|
|
|
// stored in a single line in the index files
|
|
|
// the format is of the form
|
|
|
// filename "dir"/"file" partFileName startIndex length
|
|
|
- // <space seperated children>
|
|
|
+ // <space separated children>
|
|
|
private class HarStatus {
|
|
|
boolean isDir;
|
|
|
String name;
|
|
@@ -666,7 +667,6 @@ public class HarFileSystem extends FilterFileSystem {
|
|
|
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
|
|
|
// get the fs DataInputStream for the underlying file
|
|
|
HarStatus hstatus = getFileHarStatus(f);
|
|
|
- // we got it.. woo hooo!!!
|
|
|
if (hstatus.isDir()) {
|
|
|
throw new FileNotFoundException(f + " : not a file in " +
|
|
|
archivePath);
|
|
@@ -686,7 +686,12 @@ public class HarFileSystem extends FilterFileSystem {
|
|
|
Progressable progress) throws IOException {
|
|
|
throw new IOException("Har: create not allowed.");
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
|
|
|
+ throw new IOException("Har: append not allowed.");
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public void close() throws IOException {
|
|
|
if (fs != null) {
|
|
@@ -704,9 +709,19 @@ public class HarFileSystem extends FilterFileSystem {
|
|
|
*/
|
|
|
@Override
|
|
|
public boolean setReplication(Path src, short replication) throws IOException{
|
|
|
- throw new IOException("Har: setreplication not allowed");
|
|
|
+ throw new IOException("Har: setReplication not allowed");
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public boolean rename(Path src, Path dst) throws IOException {
|
|
|
+ throw new IOException("Har: rename not allowed");
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public FSDataOutputStream append(Path f) throws IOException {
|
|
|
+ throw new IOException("Har: append not allowed");
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Not implemented.
|
|
|
*/
|
|
@@ -714,7 +729,7 @@ public class HarFileSystem extends FilterFileSystem {
|
|
|
public boolean delete(Path f, boolean recursive) throws IOException {
|
|
|
throw new IOException("Har: delete not allowed");
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* liststatus returns the children of a directory
|
|
|
* after looking up the index files.
|
|
@@ -733,7 +748,7 @@ public class HarFileSystem extends FilterFileSystem {
|
|
|
throw new FileNotFoundException("File " + f + " not found in " + archivePath);
|
|
|
}
|
|
|
if (hstatus.isDir()) {
|
|
|
- fileStatusesInIndex(hstatus, statuses, hstatus.children);
|
|
|
+ fileStatusesInIndex(hstatus, statuses);
|
|
|
} else {
|
|
|
statuses.add(toFileStatus(hstatus, null));
|
|
|
}
|
|
@@ -748,7 +763,7 @@ public class HarFileSystem extends FilterFileSystem {
|
|
|
public Path getHomeDirectory() {
|
|
|
return new Path(uri.toString());
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Override
|
|
|
public void setWorkingDirectory(Path newDir) {
|
|
|
//does nothing.
|
|
@@ -811,7 +826,7 @@ public class HarFileSystem extends FilterFileSystem {
|
|
|
* Not implemented.
|
|
|
*/
|
|
|
@Override
|
|
|
- public void setPermission(Path p, FsPermission permisssion)
|
|
|
+ public void setPermission(Path p, FsPermission permission)
|
|
|
throws IOException {
|
|
|
throw new IOException("Har: setPermission not allowed");
|
|
|
}
|
|
@@ -900,7 +915,7 @@ public class HarFileSystem extends FilterFileSystem {
|
|
|
newlen = (int) (end - position);
|
|
|
}
|
|
|
// end case
|
|
|
- if (newlen == 0)
|
|
|
+ if (newlen == 0)
|
|
|
return ret;
|
|
|
ret = underLyingStream.read(b, offset, newlen);
|
|
|
position += ret;
|
|
@@ -937,8 +952,8 @@ public class HarFileSystem extends FilterFileSystem {
|
|
|
|
|
|
@Override
|
|
|
public boolean seekToNewSource(long targetPos) throws IOException {
|
|
|
- //do not need to implement this
|
|
|
- // hdfs in itself does seektonewsource
|
|
|
+ // do not need to implement this
|
|
|
+ // hdfs in itself does seektonewsource
|
|
|
// while reading.
|
|
|
return false;
|
|
|
}
|
|
@@ -974,14 +989,12 @@ public class HarFileSystem extends FilterFileSystem {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void setReadahead(Long readahead)
|
|
|
- throws IOException, UnsupportedEncodingException {
|
|
|
+ public void setReadahead(Long readahead) throws IOException {
|
|
|
underLyingStream.setReadahead(readahead);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void setDropBehind(Boolean dropBehind)
|
|
|
- throws IOException, UnsupportedEncodingException {
|
|
|
+ public void setDropBehind(Boolean dropBehind) throws IOException {
|
|
|
underLyingStream.setDropBehind(dropBehind);
|
|
|
}
|
|
|
}
|
|
@@ -999,19 +1012,6 @@ public class HarFileSystem extends FilterFileSystem {
|
|
|
long length, int bufsize) throws IOException {
|
|
|
super(new HarFsInputStream(fs, p, start, length, bufsize));
|
|
|
}
|
|
|
-
|
|
|
- /**
|
|
|
- * constructor for har input stream.
|
|
|
- * @param fs the underlying filesystem
|
|
|
- * @param p the path in the underlying file system
|
|
|
- * @param start the start position in the part file
|
|
|
- * @param length the length of valid data in the part file.
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- public HarFSDataInputStream(FileSystem fs, Path p, long start, long length)
|
|
|
- throws IOException {
|
|
|
- super(new HarFsInputStream(fs, p, start, length, 0));
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
private class HarMetaData {
|
|
@@ -1058,7 +1058,7 @@ public class HarFileSystem extends FilterFileSystem {
|
|
|
}
|
|
|
|
|
|
private void parseMetaData() throws IOException {
|
|
|
- Text line;
|
|
|
+ Text line = new Text();
|
|
|
long read;
|
|
|
FSDataInputStream in = null;
|
|
|
LineReader lin = null;
|
|
@@ -1068,7 +1068,6 @@ public class HarFileSystem extends FilterFileSystem {
|
|
|
FileStatus masterStat = fs.getFileStatus(masterIndexPath);
|
|
|
masterIndexTimestamp = masterStat.getModificationTime();
|
|
|
lin = new LineReader(in, getConf());
|
|
|
- line = new Text();
|
|
|
read = lin.readLine(line);
|
|
|
|
|
|
// the first line contains the version of the index file
|
|
@@ -1082,7 +1081,7 @@ public class HarFileSystem extends FilterFileSystem {
|
|
|
}
|
|
|
|
|
|
// each line contains a hashcode range and the index file name
|
|
|
- String[] readStr = null;
|
|
|
+ String[] readStr;
|
|
|
while(read < masterStat.getLen()) {
|
|
|
int b = lin.readLine(line);
|
|
|
read += b;
|
|
@@ -1094,6 +1093,9 @@ public class HarFileSystem extends FilterFileSystem {
|
|
|
endHash));
|
|
|
line.clear();
|
|
|
}
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ LOG.warn("Encountered exception ", ioe);
|
|
|
+ throw ioe;
|
|
|
} finally {
|
|
|
IOUtils.cleanup(LOG, lin, in);
|
|
|
}
|