|
@@ -160,7 +160,7 @@ public class DFSClient implements java.io.Closeable {
|
|
|
final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
|
|
|
final FileSystem.Statistics stats;
|
|
|
final int hdfsTimeout; // timeout value for a DFS operation.
|
|
|
- final LeaseRenewer leaserenewer;
|
|
|
+ private final String authority;
|
|
|
final SocketCache socketCache;
|
|
|
final Conf dfsClientConf;
|
|
|
|
|
@@ -310,10 +310,10 @@ public class DFSClient implements java.io.Closeable {
|
|
|
// The hdfsTimeout is currently the same as the ipc timeout
|
|
|
this.hdfsTimeout = Client.getTimeout(conf);
|
|
|
this.ugi = UserGroupInformation.getCurrentUser();
|
|
|
- final String authority = nameNodeAddr == null? "null":
|
|
|
+ this.authority = nameNodeAddr == null? "null":
|
|
|
nameNodeAddr.getHostName() + ":" + nameNodeAddr.getPort();
|
|
|
- this.leaserenewer = LeaseRenewer.getInstance(authority, ugi, this);
|
|
|
- this.clientName = leaserenewer.getClientName(dfsClientConf.taskId);
|
|
|
+ this.clientName = "DFSClient_" + dfsClientConf.taskId + "_" +
|
|
|
+ DFSUtil.getRandom().nextInt() + "_" + Thread.currentThread().getId();
|
|
|
this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity);
|
|
|
if (nameNodeAddr != null && rpcNamenode == null) {
|
|
|
this.rpcNamenode = DFSUtil.createRPCNamenode(nameNodeAddr, conf, ugi);
|
|
@@ -373,7 +373,30 @@ public class DFSClient implements java.io.Closeable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /** Put a file. */
|
|
|
+ /** Return the lease renewer instance. The renewer thread won't start
|
|
|
+ * until the first output stream is created. The same instance will
|
|
|
+ * be returned until all output streams are closed.
|
|
|
+ */
|
|
|
+ public synchronized LeaseRenewer getLeaseRenewer() throws IOException {
|
|
|
+ return LeaseRenewer.getInstance(authority, ugi, this);
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Get a lease and start automatic renewal */
|
|
|
+ private void beginFileLease(final String src, final DFSOutputStream out)
|
|
|
+ throws IOException {
|
|
|
+ getLeaseRenewer().put(src, out, this);
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Stop renewal of lease for the file. */
|
|
|
+ void endFileLease(final String src) throws IOException {
|
|
|
+ getLeaseRenewer().closeFile(src, this);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /** Put a file. Only called from LeaseRenewer, where proper locking is
|
|
|
+ * enforced to consistently update its local dfsclients array and
|
|
|
+ * client's filesBeingWritten map.
|
|
|
+ */
|
|
|
void putFileBeingWritten(final String src, final DFSOutputStream out) {
|
|
|
synchronized(filesBeingWritten) {
|
|
|
filesBeingWritten.put(src, out);
|
|
@@ -386,7 +409,7 @@ public class DFSClient implements java.io.Closeable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /** Remove a file. */
|
|
|
+ /** Remove a file. Only called from LeaseRenewer. */
|
|
|
void removeFileBeingWritten(final String src) {
|
|
|
synchronized(filesBeingWritten) {
|
|
|
filesBeingWritten.remove(src);
|
|
@@ -450,12 +473,26 @@ public class DFSClient implements java.io.Closeable {
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Close connections the Namenode.
|
|
|
+ */
|
|
|
+ void closeConnectionToNamenode() {
|
|
|
+ RPC.stopProxy(rpcNamenode);
|
|
|
+ }
|
|
|
+
|
|
|
/** Abort and release resources held. Ignore all errors. */
|
|
|
void abort() {
|
|
|
clientRunning = false;
|
|
|
closeAllFilesBeingWritten(true);
|
|
|
socketCache.clear();
|
|
|
- RPC.stopProxy(rpcNamenode); // close connections to the namenode
|
|
|
+ try {
|
|
|
+ // remove reference to this client and stop the renewer,
|
|
|
+ // if there is no more clients under the renewer.
|
|
|
+ getLeaseRenewer().closeClient(this);
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ LOG.info("Exception occurred while aborting the client. " + ioe);
|
|
|
+ }
|
|
|
+ closeConnectionToNamenode();
|
|
|
}
|
|
|
|
|
|
/** Close/abort all files being written. */
|
|
@@ -494,9 +531,9 @@ public class DFSClient implements java.io.Closeable {
|
|
|
closeAllFilesBeingWritten(false);
|
|
|
socketCache.clear();
|
|
|
clientRunning = false;
|
|
|
- leaserenewer.closeClient(this);
|
|
|
+ getLeaseRenewer().closeClient(this);
|
|
|
// close connections to the namenode
|
|
|
- RPC.stopProxy(rpcNamenode);
|
|
|
+ closeConnectionToNamenode();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -950,7 +987,7 @@ public class DFSClient implements java.io.Closeable {
|
|
|
final DFSOutputStream result = new DFSOutputStream(this, src, masked, flag,
|
|
|
createParent, replication, blockSize, progress, buffersize,
|
|
|
dfsClientConf.createChecksum());
|
|
|
- leaserenewer.put(src, result, this);
|
|
|
+ beginFileLease(src, result);
|
|
|
return result;
|
|
|
}
|
|
|
|
|
@@ -1000,7 +1037,7 @@ public class DFSClient implements java.io.Closeable {
|
|
|
flag, createParent, replication, blockSize, progress, buffersize,
|
|
|
checksum);
|
|
|
}
|
|
|
- leaserenewer.put(src, result, this);
|
|
|
+ beginFileLease(src, result);
|
|
|
return result;
|
|
|
}
|
|
|
|
|
@@ -1086,7 +1123,7 @@ public class DFSClient implements java.io.Closeable {
|
|
|
+ src + " on client " + clientName);
|
|
|
}
|
|
|
final DFSOutputStream result = callAppend(stat, src, buffersize, progress);
|
|
|
- leaserenewer.put(src, result, this);
|
|
|
+ beginFileLease(src, result);
|
|
|
return result;
|
|
|
}
|
|
|
|