|
@@ -680,7 +680,7 @@ class DataStreamer extends Daemon {
|
|
try {
|
|
try {
|
|
dataQueue.wait(timeout);
|
|
dataQueue.wait(timeout);
|
|
} catch (InterruptedException e) {
|
|
} catch (InterruptedException e) {
|
|
- LOG.warn("Caught exception", e);
|
|
|
|
|
|
+ LOG.debug("Thread interrupted", e);
|
|
}
|
|
}
|
|
doSleep = false;
|
|
doSleep = false;
|
|
now = Time.monotonicNow();
|
|
now = Time.monotonicNow();
|
|
@@ -695,7 +695,7 @@ class DataStreamer extends Daemon {
|
|
try {
|
|
try {
|
|
backOffIfNecessary();
|
|
backOffIfNecessary();
|
|
} catch (InterruptedException e) {
|
|
} catch (InterruptedException e) {
|
|
- LOG.warn("Caught exception", e);
|
|
|
|
|
|
+ LOG.debug("Thread interrupted", e);
|
|
}
|
|
}
|
|
one = dataQueue.getFirst(); // regular data packet
|
|
one = dataQueue.getFirst(); // regular data packet
|
|
SpanId[] parents = one.getTraceParents();
|
|
SpanId[] parents = one.getTraceParents();
|
|
@@ -708,9 +708,8 @@ class DataStreamer extends Daemon {
|
|
}
|
|
}
|
|
|
|
|
|
// get new block from namenode.
|
|
// get new block from namenode.
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
|
- LOG.debug("stage=" + stage + ", " + this);
|
|
|
|
- }
|
|
|
|
|
|
+ LOG.debug("stage={}, {}", stage, this);
|
|
|
|
+
|
|
if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
|
|
if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
|
|
LOG.debug("Allocating new block: {}", this);
|
|
LOG.debug("Allocating new block: {}", this);
|
|
setPipeline(nextBlockOutputStream());
|
|
setPipeline(nextBlockOutputStream());
|
|
@@ -738,7 +737,7 @@ class DataStreamer extends Daemon {
|
|
// wait for acks to arrive from datanodes
|
|
// wait for acks to arrive from datanodes
|
|
dataQueue.wait(1000);
|
|
dataQueue.wait(1000);
|
|
} catch (InterruptedException e) {
|
|
} catch (InterruptedException e) {
|
|
- LOG.warn("Caught exception", e);
|
|
|
|
|
|
+ LOG.debug("Thread interrupted", e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -893,6 +892,7 @@ class DataStreamer extends Daemon {
|
|
}
|
|
}
|
|
checkClosed();
|
|
checkClosed();
|
|
} catch (ClosedChannelException cce) {
|
|
} catch (ClosedChannelException cce) {
|
|
|
|
+ LOG.debug("Closed channel exception", cce);
|
|
}
|
|
}
|
|
long duration = Time.monotonicNow() - begin;
|
|
long duration = Time.monotonicNow() - begin;
|
|
if (duration > dfsclientSlowLogThresholdMs) {
|
|
if (duration > dfsclientSlowLogThresholdMs) {
|
|
@@ -946,7 +946,8 @@ class DataStreamer extends Daemon {
|
|
}
|
|
}
|
|
checkClosed();
|
|
checkClosed();
|
|
queuePacket(packet);
|
|
queuePacket(packet);
|
|
- } catch (ClosedChannelException ignored) {
|
|
|
|
|
|
+ } catch (ClosedChannelException cce) {
|
|
|
|
+ LOG.debug("Closed channel exception", cce);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -985,7 +986,8 @@ class DataStreamer extends Daemon {
|
|
response.close();
|
|
response.close();
|
|
response.join();
|
|
response.join();
|
|
} catch (InterruptedException e) {
|
|
} catch (InterruptedException e) {
|
|
- LOG.warn("Caught exception", e);
|
|
|
|
|
|
+ LOG.debug("Thread interrupted", e);
|
|
|
|
+ Thread.currentThread().interrupt();
|
|
} finally {
|
|
} finally {
|
|
response = null;
|
|
response = null;
|
|
}
|
|
}
|
|
@@ -1097,9 +1099,7 @@ class DataStreamer extends Daemon {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
|
- LOG.debug("DFSClient {}", ack);
|
|
|
|
- }
|
|
|
|
|
|
+ LOG.debug("DFSClient {}", ack);
|
|
|
|
|
|
long seqno = ack.getSeqno();
|
|
long seqno = ack.getSeqno();
|
|
// processes response status from datanodes.
|
|
// processes response status from datanodes.
|
|
@@ -1616,7 +1616,8 @@ class DataStreamer extends Daemon {
|
|
// good reports should follow bad ones, if client committed
|
|
// good reports should follow bad ones, if client committed
|
|
// with those nodes.
|
|
// with those nodes.
|
|
Thread.sleep(2000);
|
|
Thread.sleep(2000);
|
|
- } catch (InterruptedException ignored) {
|
|
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ LOG.debug("Thread interrupted", e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|