|
@@ -1721,68 +1721,67 @@ class DFSClient implements FSConstants {
|
|
|
while (!closed && clientRunning && !lastPacketInBlock) {
|
|
|
// process responses from datanodes.
|
|
|
try {
|
|
|
- // verify seqno from datanode
|
|
|
- int numTargets = -1;
|
|
|
- long seqno = blockReplyStream.readLong();
|
|
|
- LOG.debug("DFSClient received ack for seqno " + seqno);
|
|
|
- if (seqno == -1) {
|
|
|
- continue;
|
|
|
- } else if (seqno == -2) {
|
|
|
- // no nothing
|
|
|
- } else {
|
|
|
- Packet one = null;
|
|
|
- synchronized (ackQueue) {
|
|
|
- one = ackQueue.getFirst();
|
|
|
- }
|
|
|
- if (one.seqno != seqno) {
|
|
|
- throw new IOException("Responseprocessor: Expecting seqno " +
|
|
|
- " for block " + block +
|
|
|
- one.seqno + " but received " + seqno);
|
|
|
- }
|
|
|
- lastPacketInBlock = one.lastPacketInBlock;
|
|
|
- }
|
|
|
-
|
|
|
- // processes response status from all datanodes.
|
|
|
- for (int i = 0; i < targets.length && clientRunning; i++) {
|
|
|
- short reply = blockReplyStream.readShort();
|
|
|
- if (reply != OP_STATUS_SUCCESS) {
|
|
|
- errorIndex = i; // first bad datanode
|
|
|
- throw new IOException("Bad response " + reply +
|
|
|
- " for block " + block +
|
|
|
- " from datanode " +
|
|
|
- targets[i].getName());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
+ // verify seqno from datanode
|
|
|
+ long seqno = blockReplyStream.readLong();
|
|
|
+ LOG.debug("DFSClient received ack for seqno " + seqno);
|
|
|
+ if (seqno == -1) {
|
|
|
+ continue;
|
|
|
+ } else if (seqno == -2) {
|
|
|
+ // no nothing
|
|
|
+ } else {
|
|
|
+ Packet one = null;
|
|
|
synchronized (ackQueue) {
|
|
|
- ackQueue.removeFirst();
|
|
|
- ackQueue.notifyAll();
|
|
|
+ one = ackQueue.getFirst();
|
|
|
}
|
|
|
- } catch (Exception e) {
|
|
|
- if (!closed) {
|
|
|
- hasError = true;
|
|
|
- LOG.warn("DFSOutputStream ResponseProcessor exception " +
|
|
|
- " for block " + block +
|
|
|
- StringUtils.stringifyException(e));
|
|
|
- closed = true;
|
|
|
+ if (one.seqno != seqno) {
|
|
|
+ throw new IOException("Responseprocessor: Expecting seqno " +
|
|
|
+ " for block " + block +
|
|
|
+ one.seqno + " but received " + seqno);
|
|
|
}
|
|
|
+ lastPacketInBlock = one.lastPacketInBlock;
|
|
|
}
|
|
|
|
|
|
- synchronized (dataQueue) {
|
|
|
- dataQueue.notifyAll();
|
|
|
+ // processes response status from all datanodes.
|
|
|
+ for (int i = 0; i < targets.length && clientRunning; i++) {
|
|
|
+ short reply = blockReplyStream.readShort();
|
|
|
+ if (reply != OP_STATUS_SUCCESS) {
|
|
|
+ errorIndex = i; // first bad datanode
|
|
|
+ throw new IOException("Bad response " + reply +
|
|
|
+ " for block " + block +
|
|
|
+ " from datanode " +
|
|
|
+ targets[i].getName());
|
|
|
+ }
|
|
|
}
|
|
|
+
|
|
|
synchronized (ackQueue) {
|
|
|
+ ackQueue.removeFirst();
|
|
|
ackQueue.notifyAll();
|
|
|
}
|
|
|
+ } catch (Exception e) {
|
|
|
+ if (!closed) {
|
|
|
+ hasError = true;
|
|
|
+ LOG.warn("DFSOutputStream ResponseProcessor exception " +
|
|
|
+ " for block " + block +
|
|
|
+ StringUtils.stringifyException(e));
|
|
|
+ closed = true;
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- void close() {
|
|
|
- closed = true;
|
|
|
- this.interrupt();
|
|
|
+ synchronized (dataQueue) {
|
|
|
+ dataQueue.notifyAll();
|
|
|
+ }
|
|
|
+ synchronized (ackQueue) {
|
|
|
+ ackQueue.notifyAll();
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ void close() {
|
|
|
+ closed = true;
|
|
|
+ this.interrupt();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
// If this stream has encountered any errors so far, shutdown
|
|
|
// threads and mark stream as closed.
|
|
|
//
|