|
@@ -74,6 +74,7 @@ import org.apache.hadoop.security.token.Token;
|
|
|
import org.apache.hadoop.util.Daemon;
|
|
|
import org.apache.hadoop.util.DataChecksum;
|
|
|
import org.apache.hadoop.util.Progressable;
|
|
|
+import org.apache.hadoop.util.Time;
|
|
|
|
|
|
|
|
|
/****************************************************************
|
|
@@ -396,7 +397,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
|
|
|
* and closes them. Any error recovery is also done by this thread.
|
|
|
*/
|
|
|
public void run() {
|
|
|
- long lastPacket = System.currentTimeMillis();
|
|
|
+ long lastPacket = Time.now();
|
|
|
while (!streamerClosed && dfsClient.clientRunning) {
|
|
|
|
|
|
// if the Responder encountered an error, shutdown Responder
|
|
@@ -420,7 +421,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
|
|
|
|
|
|
synchronized (dataQueue) {
|
|
|
// wait for a packet to be sent.
|
|
|
- long now = System.currentTimeMillis();
|
|
|
+ long now = Time.now();
|
|
|
while ((!streamerClosed && !hasError && dfsClient.clientRunning
|
|
|
&& dataQueue.size() == 0 &&
|
|
|
(stage != BlockConstructionStage.DATA_STREAMING ||
|
|
@@ -435,7 +436,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
|
|
|
} catch (InterruptedException e) {
|
|
|
}
|
|
|
doSleep = false;
|
|
|
- now = System.currentTimeMillis();
|
|
|
+ now = Time.now();
|
|
|
}
|
|
|
if (streamerClosed || hasError || !dfsClient.clientRunning) {
|
|
|
continue;
|
|
@@ -518,7 +519,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
|
|
|
errorIndex = 0;
|
|
|
throw e;
|
|
|
}
|
|
|
- lastPacket = System.currentTimeMillis();
|
|
|
+ lastPacket = Time.now();
|
|
|
|
|
|
if (one.isHeartbeatPacket()) { //heartbeat packet
|
|
|
}
|
|
@@ -981,7 +982,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
|
|
|
errorIndex = -1;
|
|
|
success = false;
|
|
|
|
|
|
- long startTime = System.currentTimeMillis();
|
|
|
+ long startTime = Time.now();
|
|
|
DatanodeInfo[] excluded = excludedNodes.toArray(
|
|
|
new DatanodeInfo[excludedNodes.size()]);
|
|
|
block = oldBlock;
|
|
@@ -1107,7 +1108,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
|
|
|
int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
|
|
|
long sleeptime = 400;
|
|
|
while (true) {
|
|
|
- long localstart = System.currentTimeMillis();
|
|
|
+ long localstart = Time.now();
|
|
|
while (true) {
|
|
|
try {
|
|
|
return dfsClient.namenode.addBlock(src, dfsClient.clientName, block, excludedNodes);
|
|
@@ -1130,9 +1131,9 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
|
|
|
} else {
|
|
|
--retries;
|
|
|
DFSClient.LOG.info("Exception while adding a block", e);
|
|
|
- if (System.currentTimeMillis() - localstart > 5000) {
|
|
|
+ if (Time.now() - localstart > 5000) {
|
|
|
DFSClient.LOG.info("Waiting for replication for "
|
|
|
- + (System.currentTimeMillis() - localstart) / 1000
|
|
|
+ + (Time.now() - localstart) / 1000
|
|
|
+ " seconds");
|
|
|
}
|
|
|
try {
|
|
@@ -1721,14 +1722,14 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
|
|
|
// should be called holding (this) lock since setTestFilename() may
|
|
|
// be called during unit tests
|
|
|
private void completeFile(ExtendedBlock last) throws IOException {
|
|
|
- long localstart = System.currentTimeMillis();
|
|
|
+ long localstart = Time.now();
|
|
|
boolean fileComplete = false;
|
|
|
while (!fileComplete) {
|
|
|
fileComplete = dfsClient.namenode.complete(src, dfsClient.clientName, last);
|
|
|
if (!fileComplete) {
|
|
|
if (!dfsClient.clientRunning ||
|
|
|
(dfsClient.hdfsTimeout > 0 &&
|
|
|
- localstart + dfsClient.hdfsTimeout < System.currentTimeMillis())) {
|
|
|
+ localstart + dfsClient.hdfsTimeout < Time.now())) {
|
|
|
String msg = "Unable to close file because dfsclient " +
|
|
|
" was unable to contact the HDFS servers." +
|
|
|
" clientRunning " + dfsClient.clientRunning +
|
|
@@ -1738,7 +1739,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
|
|
|
}
|
|
|
try {
|
|
|
Thread.sleep(400);
|
|
|
- if (System.currentTimeMillis() - localstart > 5000) {
|
|
|
+ if (Time.now() - localstart > 5000) {
|
|
|
DFSClient.LOG.info("Could not complete file " + src + " retrying...");
|
|
|
}
|
|
|
} catch (InterruptedException ie) {
|