|
@@ -18,6 +18,7 @@
|
|
|
|
|
|
package org.apache.zookeeper.server;
|
|
package org.apache.zookeeper.server;
|
|
|
|
|
|
|
|
+import java.io.ByteArrayOutputStream;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.io.PrintWriter;
|
|
import java.io.PrintWriter;
|
|
import java.io.StringWriter;
|
|
import java.io.StringWriter;
|
|
@@ -34,6 +35,7 @@ import java.util.Set;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
|
|
|
+import org.apache.jute.BinaryOutputArchive;
|
|
import org.apache.jute.Record;
|
|
import org.apache.jute.Record;
|
|
import org.apache.zookeeper.WatchedEvent;
|
|
import org.apache.zookeeper.WatchedEvent;
|
|
import org.apache.zookeeper.Watcher;
|
|
import org.apache.zookeeper.Watcher;
|
|
@@ -55,6 +57,8 @@ public abstract class ServerCnxn implements Stats, Watcher {
|
|
|
|
|
|
private Set<Id> authInfo = Collections.newSetFromMap(new ConcurrentHashMap<Id, Boolean>());
|
|
private Set<Id> authInfo = Collections.newSetFromMap(new ConcurrentHashMap<Id, Boolean>());
|
|
|
|
|
|
|
|
+ private static final byte[] fourBytes = new byte[4];
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* If the client is of old version, we don't send r-o mode info to it.
|
|
* If the client is of old version, we don't send r-o mode info to it.
|
|
* The reason is that if we would, old C client doesn't read it, which
|
|
* The reason is that if we would, old C client doesn't read it, which
|
|
@@ -66,8 +70,26 @@ public abstract class ServerCnxn implements Stats, Watcher {
|
|
|
|
|
|
abstract void close();
|
|
abstract void close();
|
|
|
|
|
|
- public abstract void sendResponse(ReplyHeader h, Record r, String tag)
|
|
|
|
- throws IOException;
|
|
|
|
|
|
+ public void sendResponse(ReplyHeader h, Record r, String tag) throws IOException {
|
|
|
|
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
|
|
|
+ // Make space for length
|
|
|
|
+ BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
|
|
|
|
+ try {
|
|
|
|
+ baos.write(fourBytes);
|
|
|
|
+ bos.writeRecord(h, "header");
|
|
|
|
+ if (r != null) {
|
|
|
|
+ bos.writeRecord(r, tag);
|
|
|
|
+ }
|
|
|
|
+ baos.close();
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ LOG.error("Error serializing response");
|
|
|
|
+ }
|
|
|
|
+ byte b[] = baos.toByteArray();
|
|
|
|
+ serverStats().updateClientResponseSize(b.length - 4);
|
|
|
|
+ ByteBuffer bb = ByteBuffer.wrap(b);
|
|
|
|
+ bb.putInt(b.length - 4).rewind();
|
|
|
|
+ sendBuffer(bb);
|
|
|
|
+ }
|
|
|
|
|
|
/* notify the client the session is closing and close/cleanup socket */
|
|
/* notify the client the session is closing and close/cleanup socket */
|
|
abstract void sendCloseSession();
|
|
abstract void sendCloseSession();
|
|
@@ -133,12 +155,12 @@ public abstract class ServerCnxn implements Stats, Watcher {
|
|
incrPacketsSent();
|
|
incrPacketsSent();
|
|
ServerStats serverStats = serverStats();
|
|
ServerStats serverStats = serverStats();
|
|
if (serverStats != null) {
|
|
if (serverStats != null) {
|
|
- serverStats().incrementPacketsSent();
|
|
|
|
|
|
+ serverStats.incrementPacketsSent();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
protected abstract ServerStats serverStats();
|
|
protected abstract ServerStats serverStats();
|
|
-
|
|
|
|
|
|
+
|
|
protected final Date established = new Date();
|
|
protected final Date established = new Date();
|
|
|
|
|
|
protected final AtomicLong packetsReceived = new AtomicLong();
|
|
protected final AtomicLong packetsReceived = new AtomicLong();
|