Ver código fonte

ZOOKEEPER-4723. Cleanup more in-place bytebuffer manipulations (#2036)

Signed-off-by: tison <wander4096@gmail.com>
tison 1 ano atrás
pai
commit
31c7a7e4c2

+ 0 - 83
zookeeper-jute/src/main/java/org/apache/jute/RecordReader.java

@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.jute;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.HashMap;
-
-/**
- * Front-end interface to deserializers. Also acts as a factory
- * for deserializers.
- */
-public class RecordReader {
-
-    private static  HashMap<String, Method> archiveFactory;
-
-    private InputArchive archive;
-
-    static {
-        archiveFactory = new HashMap<>();
-
-        try {
-            archiveFactory.put(
-                    "binary",
-                    BinaryInputArchive.class.getDeclaredMethod("getArchive", InputStream.class));
-        } catch (SecurityException | NoSuchMethodException ex) {
-            ex.printStackTrace();
-        }
-    }
-
-    private static InputArchive createArchive(InputStream in, String format) {
-        Method factory = archiveFactory.get(format);
-
-        if (factory != null) {
-            Object[] params = {in};
-            try {
-                return (InputArchive) factory.invoke(null, params);
-            } catch (IllegalArgumentException | InvocationTargetException | IllegalAccessException ex) {
-                ex.printStackTrace();
-            }
-        }
-
-        return null;
-    }
-
-    /**
-     * Creates a new instance of RecordReader.
-     *
-     * @param in     Stream from which to deserialize a record
-     * @param format Deserialization format ("binary", "xml", or "csv")
-     */
-    public RecordReader(InputStream in, String format) {
-        archive = createArchive(in, format);
-    }
-
-    /**
-     * Deserialize a record.
-     *
-     * @param r Record to be deserialized
-     */
-    public void read(Record r) throws IOException {
-        r.deserialize(archive, "");
-    }
-
-}

+ 0 - 81
zookeeper-jute/src/main/java/org/apache/jute/RecordWriter.java

@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.jute;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.HashMap;
-
-/**
- * Front-end for serializers. Also serves as a factory for serializers.
- */
-public class RecordWriter {
-
-    private OutputArchive archive;
-
-    static HashMap<String, Method> constructFactory() {
-        HashMap<String, Method> factory = new HashMap<>();
-
-        try {
-            factory.put(
-                    "binary",
-                    BinaryOutputArchive.class.getDeclaredMethod("getArchive", OutputStream.class));
-        } catch (SecurityException | NoSuchMethodException ex) {
-            ex.printStackTrace();
-        }
-
-        return factory;
-    }
-
-    private static HashMap<String, Method> archiveFactory = constructFactory();
-
-    private static OutputArchive createArchive(OutputStream out, String format) {
-        Method factory = archiveFactory.get(format);
-        if (factory != null) {
-            Object[] params = {out};
-            try {
-                return (OutputArchive) factory.invoke(null, params);
-            } catch (IllegalArgumentException | InvocationTargetException | IllegalAccessException ex) {
-                ex.printStackTrace();
-            }
-        }
-        return null;
-    }
-
-    /**
-     * Creates a new instance of RecordWriter.
-     *
-     * @param out    Output stream where the records will be serialized
-     * @param format Serialization format ("binary", "xml", or "csv")
-     */
-    public RecordWriter(OutputStream out, String format) {
-        archive = createArchive(out, format);
-    }
-
-    /**
-     * Serialize a record.
-     *
-     * @param r record to be serialized
-     */
-    public void write(Record r) throws IOException {
-        r.serialize(archive, "");
-    }
-}

+ 0 - 57
zookeeper-server/src/main/java/org/apache/zookeeper/server/ByteBufferOutputStream.java

@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import javax.annotation.Nonnull;
-import org.apache.jute.BinaryOutputArchive;
-import org.apache.jute.Record;
-
-public class ByteBufferOutputStream extends OutputStream {
-
-    private final ByteBuffer bb;
-
-    public ByteBufferOutputStream(ByteBuffer bb) {
-        this.bb = bb;
-    }
-
-    @Override
-    public void write(int b) throws IOException {
-        bb.put((byte) b);
-    }
-
-    @Override
-    public void write(@Nonnull byte[] b) throws IOException {
-        bb.put(b);
-    }
-
-    @Override
-    public void write(@Nonnull byte[] b, int off, int len) throws IOException {
-        bb.put(b, off, len);
-    }
-
-    public static void record2ByteBuffer(Record record, ByteBuffer bb) throws IOException {
-        BinaryOutputArchive oa;
-        oa = BinaryOutputArchive.getArchive(new ByteBufferOutputStream(bb));
-        record.serialize(oa, "request");
-    }
-
-}

+ 12 - 15
zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java

@@ -21,7 +21,6 @@ package org.apache.zookeeper.server;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.PrintWriter;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -34,6 +33,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
 import org.apache.jute.InputArchive;
 import org.apache.jute.OutputArchive;
 import org.apache.jute.Record;
@@ -969,46 +969,43 @@ public class DataTree {
 
                 boolean post_failed = false;
                 for (Txn subtxn : txns) {
-                    ByteBuffer bb = ByteBuffer.wrap(subtxn.getData());
-                    Record record;
+                    final Supplier<Record> supplier;
                     switch (subtxn.getType()) {
                     case OpCode.create:
                     case OpCode.create2:
-                        record = new CreateTxn();
+                        supplier = CreateTxn::new;
                         break;
                     case OpCode.createTTL:
-                        record = new CreateTTLTxn();
+                        supplier = CreateTTLTxn::new;
                         break;
                     case OpCode.createContainer:
-                        record = new CreateContainerTxn();
+                        supplier = CreateContainerTxn::new;
                         break;
                     case OpCode.delete:
                     case OpCode.deleteContainer:
-                        record = new DeleteTxn();
+                        supplier = DeleteTxn::new;
                         break;
                     case OpCode.setData:
-                        record = new SetDataTxn();
+                        supplier = SetDataTxn::new;
                         break;
                     case OpCode.error:
-                        record = new ErrorTxn();
+                        supplier = ErrorTxn::new;
                         post_failed = true;
                         break;
                     case OpCode.check:
-                        record = new CheckVersionTxn();
+                        supplier = CheckVersionTxn::new;
                         break;
                     default:
                         throw new IOException("Invalid type of op: " + subtxn.getType());
                     }
 
-                    assert record != null;
-
-                    ByteBufferInputStream.byteBuffer2Record(bb, record);
-
+                    final Record record;
                     if (failed && subtxn.getType() != OpCode.error) {
                         int ec = post_failed ? Code.RUNTIMEINCONSISTENCY.intValue() : Code.OK.intValue();
-
                         subtxn.setType(OpCode.error);
                         record = new ErrorTxn(ec);
+                    } else {
+                        record = RequestRecord.fromBytes(subtxn.getData()).readRecord(supplier);
                     }
 
                     assert !failed || (subtxn.getType() == OpCode.error);

+ 1 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java

@@ -687,7 +687,7 @@ public class NIOServerCnxn extends ServerCnxn {
     public int sendResponse(ReplyHeader h, Record r, String tag, String cacheKey, Stat stat, int opCode) {
         int responseSize = 0;
         try {
-            ByteBuffer[] bb = serialize(h, r, tag, cacheKey, stat, opCode);
+            ByteBuffer[] bb = serialize(h, r, cacheKey, stat, opCode);
             responseSize = bb[0].getInt();
             bb[0].rewind();
             sendBuffer(bb);

+ 1 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java

@@ -190,7 +190,7 @@ public class NettyServerCnxn extends ServerCnxn {
         if (closingChannel || !channel.isOpen()) {
             return 0;
         }
-        ByteBuffer[] bb = serialize(h, r, tag, cacheKey, stat, opCode);
+        ByteBuffer[] bb = serialize(h, r, cacheKey, stat, opCode);
         int responseSize = bb[0].getInt();
         bb[0].rewind();
         sendBuffer(bb);

+ 2 - 9
zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java

@@ -18,10 +18,8 @@
 
 package org.apache.zookeeper.server;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.StringReader;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -32,7 +30,6 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.jute.BinaryOutputArchive;
 import org.apache.jute.Record;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.DeleteContainerRequest;
@@ -868,12 +865,8 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
                     // TODO: I don't want to have to serialize it here and then
                     //       immediately deserialize in next processor. But I'm
                     //       not sure how else to get the txn stored into our list.
-                    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-                        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
-                        txn.serialize(boa, "request");
-                        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
-                        txns.add(new Txn(type, bb.array()));
-                    }
+                    byte[] bb = RequestRecord.fromRecord(txn).readBytes();
+                    txns.add(new Txn(type, bb));
                 }
 
                 request.setTxn(new MultiTxn(txns));

+ 14 - 18
zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java

@@ -18,7 +18,6 @@
 
 package org.apache.zookeeper.server;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
@@ -35,7 +34,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.jute.BinaryOutputArchive;
 import org.apache.jute.Record;
 import org.apache.zookeeper.Quotas;
 import org.apache.zookeeper.WatchedEvent;
@@ -173,29 +171,27 @@ public abstract class ServerCnxn implements Stats, Watcher {
      * @param r reply payload, can be null
      * @param tag Jute serialization tag, can be null
      * @param cacheKey Key for caching the serialized payload. A null value prevents caching.
-     * @param stat Stat information for the the reply payload, used for cache invalidation.
+     * @param stat Stat information for the reply payload, used for cache invalidation.
      *             A value of 0 prevents caching.
      * @param opCode The op code appertains to the corresponding request of the response,
      *               used to decide which cache (e.g. read response cache,
      *               list of children response cache, ...) object to look up to when applicable.
      */
-    public abstract int sendResponse(ReplyHeader h, Record r, String tag,
-                                      String cacheKey, Stat stat, int opCode) throws IOException;
+    public abstract int sendResponse(
+            ReplyHeader h,
+            Record r,
+            String tag,
+            String cacheKey,
+            Stat stat,
+            int opCode
+    ) throws IOException;
 
     public int sendResponse(ReplyHeader h, Record r, String tag) throws IOException {
         return sendResponse(h, r, tag, null, null, -1);
     }
 
-    protected byte[] serializeRecord(Record record) throws IOException {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream(ZooKeeperServer.intBufferStartingSizeBytes);
-        BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
-        bos.writeRecord(record, null);
-        return baos.toByteArray();
-    }
-
-    protected ByteBuffer[] serialize(ReplyHeader h, Record r, String tag,
-                                     String cacheKey, Stat stat, int opCode) throws IOException {
-        byte[] header = serializeRecord(h);
+    protected ByteBuffer[] serialize(ReplyHeader h, Record r, String cacheKey, Stat stat, int opCode) throws IOException {
+        byte[] header = RequestRecord.fromRecord(h).readBytes();
         byte[] data = null;
         if (r != null) {
             ResponseCache cache = null;
@@ -221,18 +217,18 @@ public abstract class ServerCnxn implements Stats, Watcher {
                 // Use cache to get serialized data.
                 //
                 // NB: Tag is ignored both during cache lookup and serialization,
-                // since is is not used in read responses, which are being cached.
+                // since it is not used in read responses, which are being cached.
                 data = cache.get(cacheKey, stat);
                 if (data == null) {
                     // Cache miss, serialize the response and put it in cache.
-                    data = serializeRecord(r);
+                    data = RequestRecord.fromRecord(r).readBytes();
                     cache.put(cacheKey, data, stat);
                     cacheMiss.add(1);
                 } else {
                     cacheHit.add(1);
                 }
             } else {
-                data = serializeRecord(r);
+                data = RequestRecord.fromRecord(r).readBytes();
             }
         }
         int dataLength = data == null ? 0 : data.length;

+ 2 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/SimpleRequestRecord.java

@@ -50,7 +50,8 @@ public class SimpleRequestRecord implements RequestRecord {
             return bytes;
         }
 
-        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream(
+                ZooKeeperServer.intBufferStartingSizeBytes)) {
             BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
             record.serialize(boa, "request");
             bytes = baos.toByteArray();

+ 6 - 11
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java

@@ -53,9 +53,9 @@ import org.apache.zookeeper.ZKTestCase;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.ByteBufferInputStream;
-import org.apache.zookeeper.server.ByteBufferOutputStream;
 import org.apache.zookeeper.server.DataTree;
 import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.RequestRecord;
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
@@ -517,8 +517,7 @@ public class Zab1_0Test extends ZKTestCase {
 
                 /* we test a normal run. everything should work out well. */
                 LearnerInfo li = new LearnerInfo(1, 0x10000, 0);
-                byte[] liBytes = new byte[20];
-                ByteBufferOutputStream.record2ByteBuffer(li, ByteBuffer.wrap(liBytes));
+                byte[] liBytes = RequestRecord.fromRecord(li).readBytes();
                 QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 1, liBytes, null);
                 oa.writeRecord(qp, null);
 
@@ -830,8 +829,7 @@ public class Zab1_0Test extends ZKTestCase {
 
                 /* we test a normal run. everything should work out well. */
                 LearnerInfo li = new LearnerInfo(1, 0x10000, 0);
-                byte[] liBytes = new byte[20];
-                ByteBufferOutputStream.record2ByteBuffer(li, ByteBuffer.wrap(liBytes));
+                byte[] liBytes = RequestRecord.fromRecord(li).readBytes();
                 QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0, liBytes, null);
                 oa.writeRecord(qp, null);
 
@@ -871,8 +869,7 @@ public class Zab1_0Test extends ZKTestCase {
                 assertEquals(0, l.self.getCurrentEpoch());
 
                 LearnerInfo li = new LearnerInfo(1, 0x10000, 0);
-                byte[] liBytes = new byte[20];
-                ByteBufferOutputStream.record2ByteBuffer(li, ByteBuffer.wrap(liBytes));
+                byte[] liBytes = RequestRecord.fromRecord(li).readBytes();
                 QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0, liBytes, null);
                 oa.writeRecord(qp, null);
 
@@ -1074,8 +1071,7 @@ public class Zab1_0Test extends ZKTestCase {
             public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) throws IOException {
                 /* we test a normal run. everything should work out well. */
                 LearnerInfo li = new LearnerInfo(1, 0x10000, 0);
-                byte[] liBytes = new byte[20];
-                ByteBufferOutputStream.record2ByteBuffer(li, ByteBuffer.wrap(liBytes));
+                byte[] liBytes = RequestRecord.fromRecord(li).readBytes();
                 /* we are going to say we last acked epoch 20 */
                 QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, ZxidUtils.makeZxid(20, 0), liBytes, null);
                 oa.writeRecord(qp, null);
@@ -1112,8 +1108,7 @@ public class Zab1_0Test extends ZKTestCase {
             public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) throws IOException, InterruptedException {
                 /* we test a normal run. everything should work out well. */
                 LearnerInfo li = new LearnerInfo(1, 0x10000, 0);
-                byte[] liBytes = new byte[20];
-                ByteBufferOutputStream.record2ByteBuffer(li, ByteBuffer.wrap(liBytes));
+                byte[] liBytes = RequestRecord.fromRecord(li).readBytes();
                 QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0, liBytes, null);
                 oa.writeRecord(qp, null);
                 readPacketSkippingPing(ia, qp);