|
@@ -25,8 +25,6 @@ import org.apache.hadoop.conf.*;
|
|
|
import org.apache.hadoop.util.*;
|
|
|
import org.apache.hadoop.fs.*;
|
|
|
|
|
|
-import java.security.MessageDigest;
|
|
|
-import java.security.NoSuchAlgorithmException;
|
|
|
import java.net.URI;
|
|
|
|
|
|
/*******************************************************************************
|
|
@@ -37,12 +35,16 @@ import java.net.URI;
|
|
|
public class DistributedCache {
|
|
|
// cacheID to cacheStatus mapping
|
|
|
private static TreeMap<String, CacheStatus> cachedArchives = new TreeMap<String, CacheStatus>();
|
|
|
- // buffer size for reading checksum files
|
|
|
- private static final int CRC_BUFFER_SIZE = 64 * 1024;
|
|
|
+
|
|
|
+ // default total cache size
|
|
|
+ private static final long DEFAULT_CACHE_SIZE = 1048576L;
|
|
|
+
|
|
|
private static final Log LOG =
|
|
|
LogFactory.getLog(DistributedCache.class);
|
|
|
|
|
|
/**
|
|
|
+ * Get the locally cached file or archive; it could either be
|
|
|
+ * previously cached (and valid) or copy it from the {@link FileSystem} now.
|
|
|
*
|
|
|
* @param cache the cache to be localized, this should be specified as
|
|
|
* new URI(hdfs://hostname:port/absoulte_path_to_file#LINKNAME). If no schema
|
|
@@ -54,9 +56,8 @@ public class DistributedCache {
|
|
|
* with a .zip or .jar extension it will be unzipped/unjarred automatically
|
|
|
* and the directory where the archive is unjarred is returned as the Path.
|
|
|
* In case of a file, the path to the file is returned
|
|
|
- * @param md5 this is a mere checksum to verufy if you are using the right cache.
|
|
|
- * You need to pass the md5 of the crc file in DFS. This is matched against the one
|
|
|
- * calculated in this api and if it does not match, the cache is not localized.
|
|
|
+ * @param confFileStamp this is the hdfs file modification timestamp to verify that the
|
|
|
+ * file to be cached hasn't changed since the job started
|
|
|
* @param currentWorkDir this is the directory where you would want to create symlinks
|
|
|
* for the locally cached files/archives
|
|
|
* @return the path to directory where the archives are unjarred in case of archives,
|
|
@@ -64,7 +65,8 @@ public class DistributedCache {
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
public static Path getLocalCache(URI cache, Configuration conf, Path baseDir,
|
|
|
- boolean isArchive, String md5, Path currentWorkDir) throws IOException {
|
|
|
+ boolean isArchive, long confFileStamp, Path currentWorkDir)
|
|
|
+ throws IOException {
|
|
|
String cacheId = makeRelative(cache, conf);
|
|
|
CacheStatus lcacheStatus;
|
|
|
Path localizedPath;
|
|
@@ -72,16 +74,13 @@ public class DistributedCache {
|
|
|
lcacheStatus = cachedArchives.get(cacheId);
|
|
|
if (lcacheStatus == null) {
|
|
|
// was never localized
|
|
|
- lcacheStatus = new CacheStatus();
|
|
|
- lcacheStatus.currentStatus = false;
|
|
|
- lcacheStatus.refcount = 0;
|
|
|
- lcacheStatus.localLoadPath = new Path(baseDir, new Path(cacheId));
|
|
|
+ lcacheStatus = new CacheStatus(new Path(baseDir, new Path(cacheId)));
|
|
|
cachedArchives.put(cacheId, lcacheStatus);
|
|
|
}
|
|
|
|
|
|
synchronized (lcacheStatus) {
|
|
|
- localizedPath = localizeCache(cache, lcacheStatus, conf, isArchive,
|
|
|
- md5, currentWorkDir);
|
|
|
+ localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus,
|
|
|
+ isArchive, currentWorkDir);
|
|
|
lcacheStatus.refcount++;
|
|
|
}
|
|
|
}
|
|
@@ -89,7 +88,7 @@ public class DistributedCache {
|
|
|
// try deleting stuff if you can
|
|
|
long size = FileUtil.getDU(new File(baseDir.toString()));
|
|
|
// setting the cache size to a default of 1MB
|
|
|
- long allowedSize = conf.getLong("local.cache.size", 1048576L);
|
|
|
+ long allowedSize = conf.getLong("local.cache.size", DEFAULT_CACHE_SIZE);
|
|
|
if (allowedSize < size) {
|
|
|
// try some cache deletions
|
|
|
deleteCache(conf);
|
|
@@ -163,16 +162,18 @@ public class DistributedCache {
|
|
|
return new Path(p, p.getName());
|
|
|
}
|
|
|
|
|
|
- // the methoed which actually copies the caches locally and unjars/unzips them
|
|
|
- private static Path localizeCache(URI cache, CacheStatus cacheStatus,
|
|
|
- Configuration conf, boolean isArchive, String md5, Path currentWorkDir) throws IOException {
|
|
|
- boolean b = true;
|
|
|
+ // the method which actually copies the caches locally and unjars/unzips them
|
|
|
+ private static Path localizeCache(Configuration conf,
|
|
|
+ URI cache, long confFileStamp,
|
|
|
+ CacheStatus cacheStatus,
|
|
|
+ boolean isArchive,
|
|
|
+ Path currentWorkDir)
|
|
|
+ throws IOException {
|
|
|
boolean doSymlink = getSymlink(conf);
|
|
|
- FileSystem dfs = FileSystem.get(cache, conf);
|
|
|
- b = ifExistsAndFresh(cacheStatus, cache, dfs, md5, conf);
|
|
|
+ FileSystem fs = getFileSystem(cache, conf);
|
|
|
String link = currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
|
|
|
File flink = new File(link);
|
|
|
- if (b) {
|
|
|
+ if (ifExistsAndFresh(conf, fs, cache, confFileStamp, cacheStatus)) {
|
|
|
if (isArchive) {
|
|
|
if (doSymlink){
|
|
|
if (!flink.exists())
|
|
@@ -197,7 +198,7 @@ public class DistributedCache {
|
|
|
if (cacheStatus.refcount > 1 && (cacheStatus.currentStatus == true))
|
|
|
throw new IOException("Cache " + cacheStatus.localLoadPath.toString()
|
|
|
+ " is in use and cannot be refreshed");
|
|
|
- byte[] checkSum = createMD5(cache, conf);
|
|
|
+
|
|
|
FileSystem localFs = FileSystem.getLocal(conf);
|
|
|
localFs.delete(cacheStatus.localLoadPath);
|
|
|
Path parchive = new Path(cacheStatus.localLoadPath,
|
|
@@ -206,11 +207,9 @@ public class DistributedCache {
|
|
|
throw new IOException("Mkdirs failed to create directory " +
|
|
|
cacheStatus.localLoadPath.toString());
|
|
|
}
|
|
|
+
|
|
|
String cacheId = cache.getPath();
|
|
|
- dfs.copyToLocalFile(new Path(cacheId), parchive);
|
|
|
- dfs.copyToLocalFile(new Path(cacheId + "_md5"), new Path(parchive
|
|
|
- .toString()
|
|
|
- + "_md5"));
|
|
|
+ fs.copyToLocalFile(new Path(cacheId), parchive);
|
|
|
if (isArchive) {
|
|
|
String tmpArchive = parchive.toString().toLowerCase();
|
|
|
if (tmpArchive.endsWith(".jar")) {
|
|
@@ -224,10 +223,10 @@ public class DistributedCache {
|
|
|
// else will not do anyhting
|
|
|
// and copy the file into the dir as it is
|
|
|
}
|
|
|
- // create a symlink if #NAME is specified as fragment in the
|
|
|
- // symlink
|
|
|
+
|
|
|
+ // update cacheStatus to reflect the newly cached file
|
|
|
cacheStatus.currentStatus = true;
|
|
|
- cacheStatus.md5 = checkSum;
|
|
|
+ cacheStatus.mtime = getTimestamp(conf, cache);
|
|
|
}
|
|
|
|
|
|
if (isArchive){
|
|
@@ -249,92 +248,45 @@ public class DistributedCache {
|
|
|
}
|
|
|
|
|
|
// Checks if the cache has already been localized and is fresh
|
|
|
- private static boolean ifExistsAndFresh(CacheStatus lcacheStatus, URI cache,
|
|
|
- FileSystem dfs, String confMD5, Configuration conf) throws IOException {
|
|
|
- // compute the md5 of the crc
|
|
|
- byte[] digest = null;
|
|
|
- byte[] fsDigest = createMD5(cache, conf);
|
|
|
- byte[] confDigest = StringUtils.hexStringToByte(confMD5);
|
|
|
+ private static boolean ifExistsAndFresh(Configuration conf, FileSystem fs,
|
|
|
+ URI cache, long confFileStamp,
|
|
|
+ CacheStatus lcacheStatus)
|
|
|
+ throws IOException {
|
|
|
// check for existence of the cache
|
|
|
if (lcacheStatus.currentStatus == false) {
|
|
|
return false;
|
|
|
} else {
|
|
|
- digest = lcacheStatus.md5;
|
|
|
- if (!MessageDigest.isEqual(confDigest, fsDigest)) {
|
|
|
- throw new IOException("Inconsistencty in data caching, "
|
|
|
- + "Cache archives have been changed");
|
|
|
- } else {
|
|
|
- if (!MessageDigest.isEqual(confDigest, digest)) {
|
|
|
- // needs refreshing
|
|
|
- return false;
|
|
|
- } else {
|
|
|
- // does not need any refreshing
|
|
|
- return true;
|
|
|
- }
|
|
|
+ long dfsFileStamp = getTimestamp(conf, cache);
|
|
|
+
|
|
|
+ // ensure that the file on hdfs hasn't been modified since the job started
|
|
|
+ if (dfsFileStamp != confFileStamp) {
|
|
|
+ LOG.fatal("File: " + cache + " has changed on HDFS since job started");
|
|
|
+ throw new IOException("File: " + cache +
|
|
|
+ " has changed on HDFS since job started");
|
|
|
+ }
|
|
|
+
|
|
|
+ if (dfsFileStamp != lcacheStatus.mtime) {
|
|
|
+ // needs refreshing
|
|
|
+ return false;
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ return true;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Returns md5 of the checksum file for a given dfs file.
|
|
|
- * This method also creates file filename_md5 existence of which
|
|
|
- * signifies a new cache has been loaded into dfs. So if you want to
|
|
|
- * refresh the cache, you need to delete this md5 file as well.
|
|
|
- * @param cache The cache to get the md5 checksum for
|
|
|
+ * Returns mtime of a given cache file on hdfs.
|
|
|
* @param conf configuration
|
|
|
- * @return md5 of the crc of the cache parameter
|
|
|
+ * @param cache cache file
|
|
|
+ * @return mtime of a given cache file on hdfs
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- public static byte[] createMD5(URI cache, Configuration conf)
|
|
|
+ public static long getTimestamp(Configuration conf, URI cache)
|
|
|
throws IOException {
|
|
|
- byte[] b = new byte[CRC_BUFFER_SIZE];
|
|
|
- byte[] digest = null;
|
|
|
-
|
|
|
FileSystem fileSystem = FileSystem.get(cache, conf);
|
|
|
- if (!(fileSystem instanceof ChecksumFileSystem)) {
|
|
|
- throw new IOException("Not a checksummed file system: "
|
|
|
- +fileSystem.getUri());
|
|
|
- }
|
|
|
- String filename = cache.getPath();
|
|
|
- Path filePath = new Path(filename);
|
|
|
- Path md5File = new Path(filePath.getParent().toString() + Path.SEPARATOR
|
|
|
- + filePath.getName() + "_md5");
|
|
|
- MessageDigest md5 = null;
|
|
|
- try {
|
|
|
- md5 = MessageDigest.getInstance("MD5");
|
|
|
- } catch (NoSuchAlgorithmException na) {
|
|
|
- // do nothing
|
|
|
- }
|
|
|
- if (!fileSystem.exists(md5File)) {
|
|
|
- ChecksumFileSystem checksumFs;
|
|
|
- if (!(fileSystem instanceof ChecksumFileSystem)) {
|
|
|
- throw new IOException(
|
|
|
- "Not a checksumed file system: "+fileSystem.getUri());
|
|
|
- } else {
|
|
|
- checksumFs = (ChecksumFileSystem)fileSystem;
|
|
|
- }
|
|
|
- FSDataInputStream fsStream = checksumFs.getRawFileSystem().open(
|
|
|
- checksumFs.getChecksumFile(filePath));
|
|
|
- int read = fsStream.read(b);
|
|
|
- while (read != -1) {
|
|
|
- md5.update(b, 0, read);
|
|
|
- read = fsStream.read(b);
|
|
|
- }
|
|
|
- fsStream.close();
|
|
|
- digest = md5.digest();
|
|
|
-
|
|
|
- short replication = fileSystem.getReplication(filePath);
|
|
|
- FSDataOutputStream out = fileSystem.create(md5File, replication);
|
|
|
- out.write(digest);
|
|
|
- out.close();
|
|
|
- } else {
|
|
|
- FSDataInputStream fsStream = fileSystem.open(md5File);
|
|
|
- digest = new byte[md5.getDigestLength()];
|
|
|
- fsStream.readFully(digest);
|
|
|
- fsStream.close();
|
|
|
- }
|
|
|
+ Path filePath = new Path(cache.getPath());
|
|
|
|
|
|
- return digest;
|
|
|
+ return fileSystem.getFileStatus(filePath).getModificationTime();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -358,6 +310,26 @@ public class DistributedCache {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ private static String getFileSysName(URI url) {
|
|
|
+ String fsname = url.getScheme();
|
|
|
+ if ("hdfs".equals(fsname)) {
|
|
|
+ String host = url.getHost();
|
|
|
+ int port = url.getPort();
|
|
|
+ return (port == (-1)) ? host : (host + ":" + port);
|
|
|
+ } else {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static FileSystem getFileSystem(URI cache, Configuration conf)
|
|
|
+ throws IOException {
|
|
|
+ String fileSysName = getFileSysName(cache);
|
|
|
+ if (fileSysName != null)
|
|
|
+ return FileSystem.getNamed(fileSysName, conf);
|
|
|
+ else
|
|
|
+ return FileSystem.get(conf);
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
|
* Set the configuration with the given set of archives
|
|
@@ -424,50 +396,50 @@ public class DistributedCache {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Get the md5 checksums of the archives
|
|
|
- * @param conf The configuration which stored the md5's
|
|
|
- * @return a string array of md5 checksums
|
|
|
+ * Get the timestamps of the archives
|
|
|
+ * @param conf The configuration which stored the timestamps
|
|
|
+ * @return a string array of timestamps
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- public static String[] getArchiveMd5(Configuration conf) throws IOException {
|
|
|
- return conf.getStrings("mapred.cache.archivemd5");
|
|
|
+ public static String[] getArchiveTimestamps(Configuration conf) {
|
|
|
+ return conf.getStrings("mapred.cache.archives.timestamps");
|
|
|
}
|
|
|
|
|
|
|
|
|
/**
|
|
|
- * Get the md5 checksums of the files
|
|
|
- * @param conf The configuration which stored the md5's
|
|
|
- * @return a string array of md5 checksums
|
|
|
+ * Get the timestamps of the files
|
|
|
+ * @param conf The configuration which stored the timestamps
|
|
|
+ * @return a string array of timestamps
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- public static String[] getFileMd5(Configuration conf) throws IOException {
|
|
|
- return conf.getStrings("mapred.cache.filemd5");
|
|
|
+ public static String[] getFileTimestamps(Configuration conf) {
|
|
|
+ return conf.getStrings("mapred.cache.files.timestamps");
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * This is to check the md5 of the archives to be localized
|
|
|
- * @param conf Configuration which stores the md5's
|
|
|
- * @param md5 comma seperated list of md5 checksums of the .crc's of archives.
|
|
|
- * The order should be the same as the order in which the archives are added
|
|
|
+ * This is to check the timestamp of the archives to be localized
|
|
|
+ * @param conf Configuration which stores the timestamp's
|
|
|
+ * @param timestamps comma separated list of timestamps of archives.
|
|
|
+ * The order should be the same as the order in which the archives are added.
|
|
|
*/
|
|
|
- public static void setArchiveMd5(Configuration conf, String md5) {
|
|
|
- conf.set("mapred.cache.archivemd5", md5);
|
|
|
+ public static void setArchiveTimestamps(Configuration conf, String timestamps) {
|
|
|
+ conf.set("mapred.cache.archives.timestamps", timestamps);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * This is to check the md5 of the files to be localized
|
|
|
- * @param conf Configuration which stores the md5's
|
|
|
- * @param md5 comma seperated list of md5 checksums of the .crc's of files.
|
|
|
- * The order should be the same as the order in which the files are added
|
|
|
+ * This is to check the timestamp of the files to be localized
|
|
|
+ * @param conf Configuration which stores the timestamp's
|
|
|
+ * @param timestamps comma separated list of timestamps of files.
|
|
|
+ * The order should be the same as the order in which the files are added.
|
|
|
*/
|
|
|
- public static void setFileMd5(Configuration conf, String md5) {
|
|
|
- conf.set("mapred.cache.filemd5", md5);
|
|
|
+ public static void setFileTimestamps(Configuration conf, String timestamps) {
|
|
|
+ conf.set("mapred.cache.files.timestamps", timestamps);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Set the conf to contain the location for localized archives
|
|
|
* @param conf The conf to modify to contain the localized caches
|
|
|
- * @param str a comma seperated list of local archives
|
|
|
+ * @param str a comma separated list of local archives
|
|
|
*/
|
|
|
public static void setLocalArchives(Configuration conf, String str) {
|
|
|
conf.set("mapred.cache.localArchives", str);
|
|
@@ -476,7 +448,7 @@ public class DistributedCache {
|
|
|
/**
|
|
|
* Set the conf to contain the location for localized files
|
|
|
* @param conf The conf to modify to contain the localized caches
|
|
|
- * @param str a comma seperated list of local files
|
|
|
+ * @param str a comma separated list of local files
|
|
|
*/
|
|
|
public static void setLocalFiles(Configuration conf, String str) {
|
|
|
conf.set("mapred.cache.localFiles", str);
|
|
@@ -648,16 +620,24 @@ public class DistributedCache {
|
|
|
|
|
|
private static class CacheStatus {
|
|
|
// false, not loaded yet, true is loaded
|
|
|
- public boolean currentStatus;
|
|
|
+ boolean currentStatus;
|
|
|
|
|
|
// the local load path of this cache
|
|
|
- public Path localLoadPath;
|
|
|
+ Path localLoadPath;
|
|
|
|
|
|
// number of instances using this cache
|
|
|
- public int refcount;
|
|
|
+ int refcount;
|
|
|
+
|
|
|
+ // the cache-file modification time
|
|
|
+ long mtime;
|
|
|
|
|
|
- // The md5 checksum of the crc file of this cache
|
|
|
- public byte[] md5;
|
|
|
+ public CacheStatus(Path localLoadPath) {
|
|
|
+ super();
|
|
|
+ this.currentStatus = false;
|
|
|
+ this.localLoadPath = localLoadPath;
|
|
|
+ this.refcount = 0;
|
|
|
+ this.mtime = -1;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|