|
@@ -1198,17 +1198,22 @@ public abstract class Server {
|
|
|
error = StringUtils.stringifyException(e);
|
|
|
}
|
|
|
CurCall.set(null);
|
|
|
- setupResponse(buf, call,
|
|
|
- (error == null) ? Status.SUCCESS : Status.ERROR,
|
|
|
- value, errorClass, error);
|
|
|
- // Discard the large buf and reset it back to
|
|
|
- // smaller size to freeup heap
|
|
|
- if (buf.size() > MAX_RESP_BUF_SIZE) {
|
|
|
- LOG.warn("Large response size " + buf.size() + " for call " +
|
|
|
- call.toString());
|
|
|
- buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
|
|
|
+ synchronized (call.connection.responseQueue) {
|
|
|
+ // setupResponse() needs to be sync'ed together with
|
|
|
+ // responder.doResponse() since setupResponse may use
|
|
|
+ // SASL to encrypt response data and SASL enforces
|
|
|
+ // its own message ordering.
|
|
|
+ setupResponse(buf, call, (error == null) ? Status.SUCCESS
|
|
|
+ : Status.ERROR, value, errorClass, error);
|
|
|
+ // Discard the large buf and reset it back to
|
|
|
+ // smaller size to freeup heap
|
|
|
+ if (buf.size() > MAX_RESP_BUF_SIZE) {
|
|
|
+ LOG.warn("Large response size " + buf.size() + " for call "
|
|
|
+ + call.toString());
|
|
|
+ buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
|
|
|
+ }
|
|
|
+ responder.doRespond(call);
|
|
|
}
|
|
|
- responder.doRespond(call);
|
|
|
} catch (InterruptedException e) {
|
|
|
if (running) { // unexpected -- log it
|
|
|
LOG.info(getName() + " caught: " +
|