|
@@ -1632,9 +1632,9 @@ class DFSClient implements FSConstants {
|
|
|
" lastPacketInBlock:" + one.lastPacketInBlock);
|
|
|
} catch (IOException e) {
|
|
|
LOG.warn("DataStreamer Exception: " + e);
|
|
|
- hasError = true;
|
|
|
- }
|
|
|
- }
|
|
|
+ hasError = true;
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
if (closed || hasError || !clientRunning) {
|
|
|
continue;
|
|
@@ -1684,8 +1684,8 @@ class DFSClient implements FSConstants {
|
|
|
Thread.sleep(artificialSlowdown);
|
|
|
} catch (InterruptedException e) {}
|
|
|
}
|
|
|
- }
|
|
|
- }
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
// shutdown thread
|
|
|
void close() {
|
|
@@ -1698,12 +1698,12 @@ class DFSClient implements FSConstants {
|
|
|
}
|
|
|
this.interrupt();
|
|
|
}
|
|
|
- }
|
|
|
-
|
|
|
+ }
|
|
|
+
|
|
|
+ //
|
|
|
+ // Processes reponses from the datanodes. A packet is removed
|
|
|
+ // from the ackQueue when its response arrives.
|
|
|
//
|
|
|
- // Processes reponses from the datanodes. A packet is removed
|
|
|
- // from the ackQueue when its response arrives.
|
|
|
- //
|
|
|
private class ResponseProcessor extends Thread {
|
|
|
|
|
|
private volatile boolean closed = false;
|
|
@@ -1714,31 +1714,31 @@ class DFSClient implements FSConstants {
|
|
|
this.targets = targets;
|
|
|
}
|
|
|
|
|
|
- public void run() {
|
|
|
+ public void run() {
|
|
|
|
|
|
this.setName("ResponseProcessor for block " + block);
|
|
|
|
|
|
while (!closed && clientRunning && !lastPacketInBlock) {
|
|
|
- // process responses from datanodes.
|
|
|
- try {
|
|
|
- // verify seqno from datanode
|
|
|
+ // process responses from datanodes.
|
|
|
+ try {
|
|
|
+ // verify seqno from datanode
|
|
|
int numTargets = -1;
|
|
|
- long seqno = blockReplyStream.readLong();
|
|
|
+ long seqno = blockReplyStream.readLong();
|
|
|
LOG.debug("DFSClient received ack for seqno " + seqno);
|
|
|
if (seqno == -1) {
|
|
|
continue;
|
|
|
} else if (seqno == -2) {
|
|
|
// no nothing
|
|
|
} else {
|
|
|
- Packet one = null;
|
|
|
- synchronized (ackQueue) {
|
|
|
- one = ackQueue.getFirst();
|
|
|
- }
|
|
|
- if (one.seqno != seqno) {
|
|
|
- throw new IOException("Responseprocessor: Expecting seqno " +
|
|
|
+ Packet one = null;
|
|
|
+ synchronized (ackQueue) {
|
|
|
+ one = ackQueue.getFirst();
|
|
|
+ }
|
|
|
+ if (one.seqno != seqno) {
|
|
|
+ throw new IOException("Responseprocessor: Expecting seqno " +
|
|
|
" for block " + block +
|
|
|
- one.seqno + " but received " + seqno);
|
|
|
- }
|
|
|
+ one.seqno + " but received " + seqno);
|
|
|
+ }
|
|
|
lastPacketInBlock = one.lastPacketInBlock;
|
|
|
}
|
|
|
|