|
@@ -35,6 +35,7 @@ import java.net.URI;
|
|
import java.net.URISyntaxException;
|
|
import java.net.URISyntaxException;
|
|
import java.net.URLDecoder;
|
|
import java.net.URLDecoder;
|
|
import java.util.*;
|
|
import java.util.*;
|
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
|
|
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
|
|
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
|
|
|
|
|
|
@@ -513,41 +514,22 @@ public class HarFileSystem extends FileSystem {
|
|
if (!parentString.endsWith(Path.SEPARATOR)){
|
|
if (!parentString.endsWith(Path.SEPARATOR)){
|
|
parentString += Path.SEPARATOR;
|
|
parentString += Path.SEPARATOR;
|
|
}
|
|
}
|
|
- Path harPath = new Path(parentString);
|
|
|
|
- int harlen = harPath.depth();
|
|
|
|
- final Map<String, FileStatus> cache = new TreeMap<String, FileStatus>();
|
|
|
|
-
|
|
|
|
- for (HarStatus hstatus : metadata.archive.values()) {
|
|
|
|
- String child = hstatus.getName();
|
|
|
|
- if ((child.startsWith(parentString))) {
|
|
|
|
- Path thisPath = new Path(child);
|
|
|
|
- if (thisPath.depth() == harlen + 1) {
|
|
|
|
- statuses.add(toFileStatus(hstatus, cache));
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+
|
|
|
|
+ for (String child: parent.children) {
|
|
|
|
+ Path p = new Path(parentString + child);
|
|
|
|
+ statuses.add(toFileStatus(metadata.archive.get(p)));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
* Combine the status stored in the index and the underlying status.
|
|
* Combine the status stored in the index and the underlying status.
|
|
* @param h status stored in the index
|
|
* @param h status stored in the index
|
|
- * @param cache caching the underlying file statuses
|
|
|
|
* @return the combined file status
|
|
* @return the combined file status
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
- private FileStatus toFileStatus(HarStatus h,
|
|
|
|
- Map<String, FileStatus> cache) throws IOException {
|
|
|
|
- FileStatus underlying = null;
|
|
|
|
- if (cache != null) {
|
|
|
|
- underlying = cache.get(h.partName);
|
|
|
|
- }
|
|
|
|
- if (underlying == null) {
|
|
|
|
- final Path p = h.isDir? archivePath: new Path(archivePath, h.partName);
|
|
|
|
- underlying = fs.getFileStatus(p);
|
|
|
|
- if (cache != null) {
|
|
|
|
- cache.put(h.partName, underlying);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ private FileStatus toFileStatus(HarStatus h) throws IOException {
|
|
|
|
+ final Path p = h.isDir ? archivePath : new Path(archivePath, h.partName);
|
|
|
|
+ FileStatus underlying = metadata.getPartFileStatus(p);
|
|
|
|
|
|
long modTime = 0;
|
|
long modTime = 0;
|
|
int version = metadata.getVersion();
|
|
int version = metadata.getVersion();
|
|
@@ -658,7 +640,7 @@ public class HarFileSystem extends FileSystem {
|
|
@Override
|
|
@Override
|
|
public FileStatus getFileStatus(Path f) throws IOException {
|
|
public FileStatus getFileStatus(Path f) throws IOException {
|
|
HarStatus hstatus = getFileHarStatus(f);
|
|
HarStatus hstatus = getFileHarStatus(f);
|
|
- return toFileStatus(hstatus, null);
|
|
|
|
|
|
+ return toFileStatus(hstatus);
|
|
}
|
|
}
|
|
|
|
|
|
private HarStatus getFileHarStatus(Path f) throws IOException {
|
|
private HarStatus getFileHarStatus(Path f) throws IOException {
|
|
@@ -815,7 +797,7 @@ public class HarFileSystem extends FileSystem {
|
|
if (hstatus.isDir()) {
|
|
if (hstatus.isDir()) {
|
|
fileStatusesInIndex(hstatus, statuses);
|
|
fileStatusesInIndex(hstatus, statuses);
|
|
} else {
|
|
} else {
|
|
- statuses.add(toFileStatus(hstatus, null));
|
|
|
|
|
|
+ statuses.add(toFileStatus(hstatus));
|
|
}
|
|
}
|
|
|
|
|
|
return statuses.toArray(new FileStatus[statuses.size()]);
|
|
return statuses.toArray(new FileStatus[statuses.size()]);
|
|
@@ -1143,7 +1125,8 @@ public class HarFileSystem extends FileSystem {
|
|
|
|
|
|
List<Store> stores = new ArrayList<Store>();
|
|
List<Store> stores = new ArrayList<Store>();
|
|
Map<Path, HarStatus> archive = new HashMap<Path, HarStatus>();
|
|
Map<Path, HarStatus> archive = new HashMap<Path, HarStatus>();
|
|
- private Map<Path, FileStatus> partFileStatuses = new HashMap<Path, FileStatus>();
|
|
|
|
|
|
+      // Keys are always the har-internal path of the part file, never the
+      // path in the underlying filesystem.
|
|
|
|
+ private Map<Path, FileStatus> partFileStatuses = new ConcurrentHashMap<>();
|
|
|
|
|
|
public HarMetaData(FileSystem fs, Path masterIndexPath, Path archiveIndexPath) {
|
|
public HarMetaData(FileSystem fs, Path masterIndexPath, Path archiveIndexPath) {
|
|
this.fs = fs;
|
|
this.fs = fs;
|
|
@@ -1151,16 +1134,23 @@ public class HarFileSystem extends FileSystem {
|
|
this.archiveIndexPath = archiveIndexPath;
|
|
this.archiveIndexPath = archiveIndexPath;
|
|
}
|
|
}
|
|
|
|
|
|
- public FileStatus getPartFileStatus(Path partPath) throws IOException {
|
|
|
|
|
|
+ public FileStatus getPartFileStatus(Path path) throws IOException {
|
|
|
|
+ Path partPath = getPathInHar(path);
|
|
FileStatus status;
|
|
FileStatus status;
|
|
status = partFileStatuses.get(partPath);
|
|
status = partFileStatuses.get(partPath);
|
|
if (status == null) {
|
|
if (status == null) {
|
|
- status = fs.getFileStatus(partPath);
|
|
|
|
|
|
+ status = fs.getFileStatus(path);
|
|
partFileStatuses.put(partPath, status);
|
|
partFileStatuses.put(partPath, status);
|
|
}
|
|
}
|
|
return status;
|
|
return status;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private void addPartFileStatuses(Path path) throws IOException {
|
|
|
|
+ for (FileStatus stat : fs.listStatus(path)) {
|
|
|
|
+ partFileStatuses.put(getPathInHar(stat.getPath()), stat);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
public long getMasterIndexTimestamp() {
|
|
public long getMasterIndexTimestamp() {
|
|
return masterIndexTimestamp;
|
|
return masterIndexTimestamp;
|
|
}
|
|
}
|
|
@@ -1217,16 +1207,22 @@ public class HarFileSystem extends FileSystem {
|
|
try {
|
|
try {
|
|
FileStatus archiveStat = fs.getFileStatus(archiveIndexPath);
|
|
FileStatus archiveStat = fs.getFileStatus(archiveIndexPath);
|
|
archiveIndexTimestamp = archiveStat.getModificationTime();
|
|
archiveIndexTimestamp = archiveStat.getModificationTime();
|
|
- LineReader aLin;
|
|
|
|
|
|
+
|
|
|
|
+ // pre-populate part cache.
|
|
|
|
+ addPartFileStatuses(archiveIndexPath.getParent());
|
|
|
|
+ LineReader aLin = null;
|
|
|
|
|
|
// now start reading the real index file
|
|
// now start reading the real index file
|
|
|
|
+ long pos = -1;
|
|
for (Store s: stores) {
|
|
for (Store s: stores) {
|
|
- read = 0;
|
|
|
|
- aIn.seek(s.begin);
|
|
|
|
- aLin = new LineReader(aIn, getConf());
|
|
|
|
- while (read + s.begin < s.end) {
|
|
|
|
- int tmp = aLin.readLine(line);
|
|
|
|
- read += tmp;
|
|
|
|
|
|
+ if (pos != s.begin) {
|
|
|
|
+ pos = s.begin;
|
|
|
|
+ aIn.seek(s.begin);
|
|
|
|
+ aLin = new LineReader(aIn, getConf());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ while (pos < s.end) {
|
|
|
|
+ pos += aLin.readLine(line);
|
|
String lineFeed = line.toString();
|
|
String lineFeed = line.toString();
|
|
String[] parsed = lineFeed.split(" ");
|
|
String[] parsed = lineFeed.split(" ");
|
|
parsed[0] = decodeFileName(parsed[0]);
|
|
parsed[0] = decodeFileName(parsed[0]);
|