|
@@ -80,13 +80,12 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
public static final int MAX_BLOCK_ACQUIRE_FAILURES = 3;
|
|
public static final int MAX_BLOCK_ACQUIRE_FAILURES = 3;
|
|
private static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
|
|
private static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
|
|
public final ClientProtocol namenode;
|
|
public final ClientProtocol namenode;
|
|
- private final ClientProtocol rpcNamenode;
|
|
|
|
|
|
+ final ClientProtocol rpcNamenode;
|
|
private final InetSocketAddress nnAddress;
|
|
private final InetSocketAddress nnAddress;
|
|
final UserGroupInformation ugi;
|
|
final UserGroupInformation ugi;
|
|
volatile boolean clientRunning = true;
|
|
volatile boolean clientRunning = true;
|
|
- Random r = new Random();
|
|
|
|
|
|
+ static Random r = new Random();
|
|
final String clientName;
|
|
final String clientName;
|
|
- final LeaseChecker leasechecker = new LeaseChecker();
|
|
|
|
private Configuration conf;
|
|
private Configuration conf;
|
|
private long defaultBlockSize;
|
|
private long defaultBlockSize;
|
|
private short defaultReplication;
|
|
private short defaultReplication;
|
|
@@ -108,7 +107,18 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
*/
|
|
*/
|
|
private volatile boolean serverSupportsHdfs630 = true;
|
|
private volatile boolean serverSupportsHdfs630 = true;
|
|
private volatile boolean serverSupportsHdfs200 = true;
|
|
private volatile boolean serverSupportsHdfs200 = true;
|
|
-
|
|
|
|
|
|
+ final int hdfsTimeout; // timeout value for a DFS operation.
|
|
|
|
+ private final String authority;
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * A map from file names to {@link DFSOutputStream} objects
|
|
|
|
+ * that are currently being written by this client.
|
|
|
|
+ * Note that a file can only be written by a single client.
|
|
|
|
+ */
|
|
|
|
+ private final Map<String, DFSOutputStream> filesBeingWritten
|
|
|
|
+ = new HashMap<String, DFSOutputStream>();
|
|
|
|
+
|
|
|
|
+ /** Create a {@link NameNode} proxy */
|
|
public static ClientProtocol createNamenode(Configuration conf) throws IOException {
|
|
public static ClientProtocol createNamenode(Configuration conf) throws IOException {
|
|
return createNamenode(NameNode.getAddress(conf), conf);
|
|
return createNamenode(NameNode.getAddress(conf), conf);
|
|
}
|
|
}
|
|
@@ -251,14 +261,14 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
|
|
this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
|
|
this.maxBlockAcquireFailures = getMaxBlockAcquireFailures(conf);
|
|
this.maxBlockAcquireFailures = getMaxBlockAcquireFailures(conf);
|
|
|
|
|
|
|
|
+ this.hdfsTimeout = Client.getTimeout(conf);
|
|
ugi = UserGroupInformation.getCurrentUser();
|
|
ugi = UserGroupInformation.getCurrentUser();
|
|
|
|
+ this.authority = nameNodeAddr == null? "null":
|
|
|
|
+ nameNodeAddr.getHostName() + ":" + nameNodeAddr.getPort();
|
|
|
|
+ String taskId = conf.get("mapred.task.id", "NONMAPREDUCE");
|
|
|
|
+ this.clientName = "DFSClient_" + taskId + "_" +
|
|
|
|
+ r.nextInt() + "_" + Thread.currentThread().getId();
|
|
|
|
|
|
- String taskId = conf.get("mapred.task.id");
|
|
|
|
- if (taskId != null) {
|
|
|
|
- this.clientName = "DFSClient_" + taskId;
|
|
|
|
- } else {
|
|
|
|
- this.clientName = "DFSClient_" + r.nextInt();
|
|
|
|
- }
|
|
|
|
defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
|
|
defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
|
|
defaultReplication = (short) conf.getInt("dfs.replication", 3);
|
|
defaultReplication = (short) conf.getInt("dfs.replication", 3);
|
|
|
|
|
|
@@ -310,20 +320,116 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
throw result;
|
|
throw result;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ /** Return the lease renewer instance. The renewer thread won't start
|
|
|
|
+ * until the first output stream is created. The same instance will
|
|
|
|
+ * be returned until all output streams are closed.
|
|
|
|
+ */
|
|
|
|
+ public synchronized LeaseRenewer getLeaseRenewer() throws IOException {
|
|
|
|
+ return LeaseRenewer.getInstance(authority, ugi, this);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /** Get a lease and start automatic renewal */
|
|
|
|
+ private void beginFileLease(final String src, final DFSOutputStream out)
|
|
|
|
+ throws IOException {
|
|
|
|
+ getLeaseRenewer().put(src, out, this);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /** Stop renewal of lease for the file. */
|
|
|
|
+ void endFileLease(final String src) throws IOException {
|
|
|
|
+ getLeaseRenewer().closeFile(src, this);
|
|
|
|
+ }
|
|
|
|
|
|
|
|
+
|
|
|
|
+ /** Put a file. Only called from LeaseRenewer, where proper locking is
|
|
|
|
+ * enforced to consistently update its local dfsclients array and
|
|
|
|
+ * client's filesBeingWritten map.
|
|
|
|
+ */
|
|
|
|
+ void putFileBeingWritten(final String src, final DFSOutputStream out) {
|
|
|
|
+ synchronized(filesBeingWritten) {
|
|
|
|
+ filesBeingWritten.put(src, out);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /** Remove a file. Only called from LeaseRenewer. */
|
|
|
|
+ void removeFileBeingWritten(final String src) {
|
|
|
|
+ synchronized(filesBeingWritten) {
|
|
|
|
+ filesBeingWritten.remove(src);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /** Is file-being-written map empty? */
|
|
|
|
+ boolean isFilesBeingWrittenEmpty() {
|
|
|
|
+ synchronized(filesBeingWritten) {
|
|
|
|
+ return filesBeingWritten.isEmpty();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Renew leases.
|
|
|
|
+ * @return true if lease was renewed. May return false if this
|
|
|
|
+ * client has been closed or has no files open.
|
|
|
|
+ **/
|
|
|
|
+ boolean renewLease() throws IOException {
|
|
|
|
+ if (clientRunning && !isFilesBeingWrittenEmpty()) {
|
|
|
|
+ namenode.renewLease(clientName);
|
|
|
|
+ return true;
|
|
|
|
+ }
|
|
|
|
+ return false;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /** Abort and release resources held. Ignore all errors. */
|
|
|
|
+ void abort() {
|
|
|
|
+ clientRunning = false;
|
|
|
|
+ closeAllFilesBeingWritten(true);
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ // remove reference to this client and stop the renewer,
|
|
|
|
+ // if there is no more clients under the renewer.
|
|
|
|
+ getLeaseRenewer().closeClient(this);
|
|
|
|
+ } catch (IOException ioe) {
|
|
|
|
+ LOG.info("Exception occurred while aborting the client. " + ioe);
|
|
|
|
+ }
|
|
|
|
+ RPC.stopProxy(rpcNamenode); // close connections to the namenode
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /** Close/abort all files being written. */
|
|
|
|
+ private void closeAllFilesBeingWritten(final boolean abort) {
|
|
|
|
+ for(;;) {
|
|
|
|
+ final String src;
|
|
|
|
+ final DFSOutputStream out;
|
|
|
|
+ synchronized(filesBeingWritten) {
|
|
|
|
+ if (filesBeingWritten.isEmpty()) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ src = filesBeingWritten.keySet().iterator().next();
|
|
|
|
+ out = filesBeingWritten.remove(src);
|
|
|
|
+ }
|
|
|
|
+ if (out != null) {
|
|
|
|
+ try {
|
|
|
|
+ if (abort) {
|
|
|
|
+ out.abort();
|
|
|
|
+ } else {
|
|
|
|
+ out.close();
|
|
|
|
+ }
|
|
|
|
+ } catch(IOException ie) {
|
|
|
|
+ LOG.error("Failed to " + (abort? "abort": "close") + " file " + src,
|
|
|
|
+ ie);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Close the file system, abandoning all of the leases and files being
|
|
* Close the file system, abandoning all of the leases and files being
|
|
* created and close connections to the namenode.
|
|
* created and close connections to the namenode.
|
|
*/
|
|
*/
|
|
public synchronized void close() throws IOException {
|
|
public synchronized void close() throws IOException {
|
|
if(clientRunning) {
|
|
if(clientRunning) {
|
|
- leasechecker.close();
|
|
|
|
|
|
+ closeAllFilesBeingWritten(false);
|
|
clientRunning = false;
|
|
clientRunning = false;
|
|
- try {
|
|
|
|
- leasechecker.interruptAndJoin();
|
|
|
|
- } catch (InterruptedException ie) {
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ getLeaseRenewer().closeClient(this);
|
|
// close connections to the namenode
|
|
// close connections to the namenode
|
|
RPC.stopProxy(rpcNamenode);
|
|
RPC.stopProxy(rpcNamenode);
|
|
}
|
|
}
|
|
@@ -757,10 +863,10 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
}
|
|
}
|
|
FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf));
|
|
FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf));
|
|
LOG.debug(src + ": masked=" + masked);
|
|
LOG.debug(src + ": masked=" + masked);
|
|
- OutputStream result = new DFSOutputStream(src, masked,
|
|
|
|
|
|
+ final DFSOutputStream result = new DFSOutputStream(src, masked,
|
|
overwrite, createParent, replication, blockSize, progress, buffersize,
|
|
overwrite, createParent, replication, blockSize, progress, buffersize,
|
|
conf.getInt("io.bytes.per.checksum", 512));
|
|
conf.getInt("io.bytes.per.checksum", 512));
|
|
- leasechecker.put(src, result);
|
|
|
|
|
|
+ beginFileLease(src, result);
|
|
return result;
|
|
return result;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -815,7 +921,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
}
|
|
}
|
|
final DFSOutputStream result = new DFSOutputStream(src, buffersize, progress,
|
|
final DFSOutputStream result = new DFSOutputStream(src, buffersize, progress,
|
|
lastBlock, stat, conf.getInt("io.bytes.per.checksum", 512));
|
|
lastBlock, stat, conf.getInt("io.bytes.per.checksum", 512));
|
|
- leasechecker.put(src, result);
|
|
|
|
|
|
+ beginFileLease(src, result);
|
|
return result;
|
|
return result;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1392,117 +1498,6 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
throw new IOException("No live nodes contain current block");
|
|
throw new IOException("No live nodes contain current block");
|
|
}
|
|
}
|
|
|
|
|
|
- boolean isLeaseCheckerStarted() {
|
|
|
|
- return leasechecker.daemon != null;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /** Lease management*/
|
|
|
|
- class LeaseChecker implements Runnable {
|
|
|
|
- /** A map from src -> DFSOutputStream of files that are currently being
|
|
|
|
- * written by this client.
|
|
|
|
- */
|
|
|
|
- private final SortedMap<String, OutputStream> pendingCreates
|
|
|
|
- = new TreeMap<String, OutputStream>();
|
|
|
|
-
|
|
|
|
- private Daemon daemon = null;
|
|
|
|
-
|
|
|
|
- synchronized void put(String src, OutputStream out) {
|
|
|
|
- if (clientRunning) {
|
|
|
|
- if (daemon == null) {
|
|
|
|
- daemon = new Daemon(this);
|
|
|
|
- daemon.start();
|
|
|
|
- }
|
|
|
|
- pendingCreates.put(src, out);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- synchronized void remove(String src) {
|
|
|
|
- pendingCreates.remove(src);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- void interruptAndJoin() throws InterruptedException {
|
|
|
|
- Daemon daemonCopy = null;
|
|
|
|
- synchronized (this) {
|
|
|
|
- if (daemon != null) {
|
|
|
|
- daemon.interrupt();
|
|
|
|
- daemonCopy = daemon;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if (daemonCopy != null) {
|
|
|
|
- LOG.debug("Wait for lease checker to terminate");
|
|
|
|
- daemonCopy.join();
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- void close() {
|
|
|
|
- while (true) {
|
|
|
|
- String src;
|
|
|
|
- OutputStream out;
|
|
|
|
- synchronized (this) {
|
|
|
|
- if (pendingCreates.isEmpty()) {
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
- src = pendingCreates.firstKey();
|
|
|
|
- out = pendingCreates.remove(src);
|
|
|
|
- }
|
|
|
|
- if (out != null) {
|
|
|
|
- try {
|
|
|
|
- out.close();
|
|
|
|
- } catch (IOException ie) {
|
|
|
|
- LOG.error("Exception closing file " + src+ " : " + ie, ie);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private void renew() throws IOException {
|
|
|
|
- synchronized(this) {
|
|
|
|
- if (pendingCreates.isEmpty()) {
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- namenode.renewLease(clientName);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Periodically check in with the namenode and renew all the leases
|
|
|
|
- * when the lease period is half over.
|
|
|
|
- */
|
|
|
|
- public void run() {
|
|
|
|
- long lastRenewed = 0;
|
|
|
|
- while (clientRunning && !Thread.interrupted()) {
|
|
|
|
- if (System.currentTimeMillis() - lastRenewed > (LEASE_SOFTLIMIT_PERIOD / 2)) {
|
|
|
|
- try {
|
|
|
|
- renew();
|
|
|
|
- lastRenewed = System.currentTimeMillis();
|
|
|
|
- } catch (IOException ie) {
|
|
|
|
- LOG.warn("Problem renewing lease for " + clientName, ie);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- try {
|
|
|
|
- Thread.sleep(1000);
|
|
|
|
- } catch (InterruptedException ie) {
|
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
|
- LOG.debug(this + " is interrupted.", ie);
|
|
|
|
- }
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /** {@inheritDoc} */
|
|
|
|
- public String toString() {
|
|
|
|
- String s = getClass().getSimpleName();
|
|
|
|
- if (LOG.isTraceEnabled()) {
|
|
|
|
- return s + "@" + DFSClient.this + ": "
|
|
|
|
- + StringUtils.stringifyException(new Throwable("for testing"));
|
|
|
|
- }
|
|
|
|
- return s;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
/** Utility class to encapsulate data node info and its address. */
|
|
/** Utility class to encapsulate data node info and its address. */
|
|
private static class DNAddrPair {
|
|
private static class DNAddrPair {
|
|
DatanodeInfo info;
|
|
DatanodeInfo info;
|
|
@@ -3994,12 +3989,12 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
throw e;
|
|
throw e;
|
|
}
|
|
}
|
|
closeInternal();
|
|
closeInternal();
|
|
- leasechecker.remove(src);
|
|
|
|
|
|
|
|
if (s != null) {
|
|
if (s != null) {
|
|
s.close();
|
|
s.close();
|
|
s = null;
|
|
s = null;
|
|
}
|
|
}
|
|
|
|
+ endFileLease(src);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -4012,6 +4007,20 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
closed = true;
|
|
closed = true;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Aborts this output stream and releases any system
|
|
|
|
+ * resources associated with this stream.
|
|
|
|
+ */
|
|
|
|
+ synchronized void abort() throws IOException {
|
|
|
|
+ if (closed) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ setLastException(new IOException("Lease timeout of "
|
|
|
|
+ + (hdfsTimeout / 1000) + " seconds expired."));
|
|
|
|
+ closeThreads();
|
|
|
|
+ endFileLease(src);
|
|
|
|
+ }
|
|
|
|
+
|
|
// shutdown datastreamer and responseprocessor threads.
|
|
// shutdown datastreamer and responseprocessor threads.
|
|
private void closeThreads() throws IOException {
|
|
private void closeThreads() throws IOException {
|
|
try {
|
|
try {
|
|
@@ -4080,6 +4089,16 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
while (!fileComplete) {
|
|
while (!fileComplete) {
|
|
fileComplete = namenode.complete(src, clientName);
|
|
fileComplete = namenode.complete(src, clientName);
|
|
if (!fileComplete) {
|
|
if (!fileComplete) {
|
|
|
|
+ if (!clientRunning ||
|
|
|
|
+ (hdfsTimeout > 0 &&
|
|
|
|
+ localstart + hdfsTimeout < System.currentTimeMillis())) {
|
|
|
|
+ String msg = "Unable to close file because dfsclient " +
|
|
|
|
+ " was unable to contact the HDFS servers." +
|
|
|
|
+ " clientRunning " + clientRunning +
|
|
|
|
+ " hdfsTimeout " + hdfsTimeout;
|
|
|
|
+ LOG.info(msg);
|
|
|
|
+ throw new IOException(msg);
|
|
|
|
+ }
|
|
try {
|
|
try {
|
|
Thread.sleep(400);
|
|
Thread.sleep(400);
|
|
if (System.currentTimeMillis() - localstart > 5000) {
|
|
if (System.currentTimeMillis() - localstart > 5000) {
|