|
@@ -61,7 +61,7 @@ import javax.security.auth.login.LoginException;
|
|
|
* filesystem tasks.
|
|
|
*
|
|
|
********************************************************/
|
|
|
-public class DFSClient implements FSConstants {
|
|
|
+public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
public static final Log LOG = LogFactory.getLog(DFSClient.class);
|
|
|
public static final int MAX_BLOCK_ACQUIRE_FAILURES = 3;
|
|
|
private static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
|
|
@@ -71,7 +71,7 @@ public class DFSClient implements FSConstants {
|
|
|
volatile boolean clientRunning = true;
|
|
|
Random r = new Random();
|
|
|
String clientName;
|
|
|
- Daemon leaseChecker;
|
|
|
+ private final LeaseChecker leasechecker = new LeaseChecker();
|
|
|
private Configuration conf;
|
|
|
private long defaultBlockSize;
|
|
|
private short defaultReplication;
|
|
@@ -81,12 +81,6 @@ public class DFSClient implements FSConstants {
|
|
|
final int writePacketSize;
|
|
|
private FileSystem.Statistics stats;
|
|
|
|
|
|
- /**
|
|
|
- * A map from name -> DFSOutputStream of files that are currently being
|
|
|
- * written by this client.
|
|
|
- */
|
|
|
- private TreeMap<String, OutputStream> pendingCreates =
|
|
|
- new TreeMap<String, OutputStream>();
|
|
|
|
|
|
public static ClientProtocol createNamenode(Configuration conf) throws IOException {
|
|
|
return createNamenode(NameNode.getAddress(conf), conf);
|
|
@@ -186,8 +180,6 @@ public class DFSClient implements FSConstants {
|
|
|
}
|
|
|
defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
|
|
|
defaultReplication = (short) conf.getInt("dfs.replication", 3);
|
|
|
- this.leaseChecker = new Daemon(new LeaseChecker());
|
|
|
- this.leaseChecker.start();
|
|
|
}
|
|
|
|
|
|
public DFSClient(InetSocketAddress nameNodeAddr,
|
|
@@ -207,33 +199,13 @@ public class DFSClient implements FSConstants {
|
|
|
* Close the file system, abandoning all of the leases and files being
|
|
|
* created and close connections to the namenode.
|
|
|
*/
|
|
|
- public void close() throws IOException {
|
|
|
- // synchronize in here so that we don't need to change the API
|
|
|
- synchronized (this) {
|
|
|
- checkOpen();
|
|
|
- synchronized (pendingCreates) {
|
|
|
- while (!pendingCreates.isEmpty()) {
|
|
|
- String name = pendingCreates.firstKey();
|
|
|
- OutputStream out = pendingCreates.remove(name);
|
|
|
- if (out != null) {
|
|
|
- try {
|
|
|
- out.close();
|
|
|
- } catch (IOException ie) {
|
|
|
- System.err.println("Exception closing file " + name);
|
|
|
- ie.printStackTrace();
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- this.clientRunning = false;
|
|
|
- try {
|
|
|
- leaseChecker.join();
|
|
|
- } catch (InterruptedException ie) {
|
|
|
- }
|
|
|
-
|
|
|
- // close connections to the namenode
|
|
|
- RPC.stopProxy(rpcNamenode);
|
|
|
- }
|
|
|
+ public synchronized void close() throws IOException {
|
|
|
+ checkOpen();
|
|
|
+ clientRunning = false;
|
|
|
+ leasechecker.close();
|
|
|
+
|
|
|
+ // close connections to the namenode
|
|
|
+ RPC.stopProxy(rpcNamenode);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -477,9 +449,7 @@ public class DFSClient implements FSConstants {
|
|
|
OutputStream result = new DFSOutputStream(src, masked,
|
|
|
overwrite, replication, blockSize, progress, buffersize,
|
|
|
conf.getInt("io.bytes.per.checksum", 512));
|
|
|
- synchronized (pendingCreates) {
|
|
|
- pendingCreates.put(src, result);
|
|
|
- }
|
|
|
+ leasechecker.put(src, result);
|
|
|
return result;
|
|
|
}
|
|
|
|
|
@@ -508,9 +478,7 @@ public class DFSClient implements FSConstants {
|
|
|
}
|
|
|
OutputStream result = new DFSOutputStream(src, buffersize, progress,
|
|
|
lastBlock, stat, conf.getInt("io.bytes.per.checksum", 512));
|
|
|
- synchronized(pendingCreates) {
|
|
|
- pendingCreates.put(src, result);
|
|
|
- }
|
|
|
+ leasechecker.put(src, result);
|
|
|
return result;
|
|
|
}
|
|
|
|
|
@@ -803,35 +771,98 @@ public class DFSClient implements FSConstants {
|
|
|
throw new IOException("No live nodes contain current block");
|
|
|
}
|
|
|
|
|
|
- /***************************************************************
|
|
|
- * Periodically check in with the namenode and renew all the leases
|
|
|
- * when the lease period is half over.
|
|
|
- ***************************************************************/
|
|
|
- class LeaseChecker implements Runnable {
|
|
|
+ boolean isLeaseCheckerStarted() {
|
|
|
+ return leasechecker.daemon != null;
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Lease management*/
|
|
|
+ private class LeaseChecker implements Runnable {
|
|
|
+ /** A map from src -> DFSOutputStream of files that are currently being
|
|
|
+ * written by this client.
|
|
|
+ */
|
|
|
+ private final SortedMap<String, OutputStream> pendingCreates
|
|
|
+ = new TreeMap<String, OutputStream>();
|
|
|
+
|
|
|
+ private Daemon daemon = null;
|
|
|
+
|
|
|
+ synchronized void put(String src, OutputStream out) {
|
|
|
+ if (clientRunning) {
|
|
|
+ if (daemon == null) {
|
|
|
+ daemon = new Daemon(this);
|
|
|
+ daemon.start();
|
|
|
+ }
|
|
|
+ pendingCreates.put(src, out);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ synchronized void remove(String src) {
|
|
|
+ pendingCreates.remove(src);
|
|
|
+ }
|
|
|
+
|
|
|
+ synchronized void close() {
|
|
|
+ while (!pendingCreates.isEmpty()) {
|
|
|
+ String src = pendingCreates.firstKey();
|
|
|
+ OutputStream out = pendingCreates.remove(src);
|
|
|
+ if (out != null) {
|
|
|
+ try {
|
|
|
+ out.close();
|
|
|
+ } catch (IOException ie) {
|
|
|
+ System.err.println("Exception closing file " + src);
|
|
|
+ ie.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (daemon != null) {
|
|
|
+ daemon.interrupt();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void renew() throws IOException {
|
|
|
+ synchronized(this) {
|
|
|
+ if (pendingCreates.isEmpty()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ namenode.renewLease(clientName);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
+ * Periodically check in with the namenode and renew all the leases
|
|
|
+ * when the lease period is half over.
|
|
|
*/
|
|
|
public void run() {
|
|
|
long lastRenewed = 0;
|
|
|
while (clientRunning) {
|
|
|
if (System.currentTimeMillis() - lastRenewed > (LEASE_SOFTLIMIT_PERIOD / 2)) {
|
|
|
try {
|
|
|
- synchronized (pendingCreates) {
|
|
|
- if (pendingCreates.size() > 0)
|
|
|
- namenode.renewLease(clientName);
|
|
|
- }
|
|
|
+ renew();
|
|
|
lastRenewed = System.currentTimeMillis();
|
|
|
} catch (IOException ie) {
|
|
|
- String err = StringUtils.stringifyException(ie);
|
|
|
- LOG.warn("Problem renewing lease for " + clientName +
|
|
|
- ": " + err);
|
|
|
+ LOG.warn("Problem renewing lease for " + clientName, ie);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
try {
|
|
|
Thread.sleep(1000);
|
|
|
} catch (InterruptedException ie) {
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug(this + " is interrupted.", ie);
|
|
|
+ }
|
|
|
+ return;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /** {@inheritDoc} */
|
|
|
+ public String toString() {
|
|
|
+ String s = getClass().getSimpleName();
|
|
|
+ if (LOG.isTraceEnabled()) {
|
|
|
+ return s + "@" + DFSClient.this + ": "
|
|
|
+ + StringUtils.stringifyException(new Throwable("for testing"));
|
|
|
+ }
|
|
|
+ return s;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/** Utility class to encapsulate data node info and its ip address. */
|
|
@@ -2792,10 +2823,7 @@ public class DFSClient implements FSConstants {
|
|
|
@Override
|
|
|
public void close() throws IOException {
|
|
|
closeInternal();
|
|
|
-
|
|
|
- synchronized (pendingCreates) {
|
|
|
- pendingCreates.remove(src);
|
|
|
- }
|
|
|
+ leasechecker.remove(src);
|
|
|
|
|
|
if (s != null) {
|
|
|
s.close();
|
|
@@ -2918,4 +2946,10 @@ public class DFSClient implements FSConstants {
|
|
|
+ StringUtils.stringifyException(ie));
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /** {@inheritDoc} */
|
|
|
+ public String toString() {
|
|
|
+ return getClass().getSimpleName() + "[clientName=" + clientName
|
|
|
+ + ", ugi=" + ugi + "]";
|
|
|
+ }
|
|
|
}
|