Browse Source

ZOOKEEPER-4714: Improve syncRequestProcessor performance (#2024)

Yan Zhao 1 year ago
parent
commit
e2e8ec661f

+ 20 - 3
zookeeper-jute/src/main/java/org/apache/jute/BinaryOutputArchive.java

@@ -34,6 +34,8 @@ public class BinaryOutputArchive implements OutputArchive {
 
 
     private DataOutput out;
     private DataOutput out;
 
 
+    private long dataSize;
+
     public static BinaryOutputArchive getArchive(OutputStream strm) {
     public static BinaryOutputArchive getArchive(OutputStream strm) {
         return new BinaryOutputArchive(new DataOutputStream(strm));
         return new BinaryOutputArchive(new DataOutputStream(strm));
     }
     }
@@ -47,26 +49,32 @@ public class BinaryOutputArchive implements OutputArchive {
 
 
     public void writeByte(byte b, String tag) throws IOException {
     public void writeByte(byte b, String tag) throws IOException {
         out.writeByte(b);
         out.writeByte(b);
+        dataSize += 1;
     }
     }
 
 
     public void writeBool(boolean b, String tag) throws IOException {
     public void writeBool(boolean b, String tag) throws IOException {
         out.writeBoolean(b);
         out.writeBoolean(b);
+        dataSize += 1;
     }
     }
 
 
     public void writeInt(int i, String tag) throws IOException {
     public void writeInt(int i, String tag) throws IOException {
         out.writeInt(i);
         out.writeInt(i);
+        dataSize += 4;
     }
     }
 
 
     public void writeLong(long l, String tag) throws IOException {
     public void writeLong(long l, String tag) throws IOException {
         out.writeLong(l);
         out.writeLong(l);
+        dataSize += 8;
     }
     }
 
 
     public void writeFloat(float f, String tag) throws IOException {
     public void writeFloat(float f, String tag) throws IOException {
         out.writeFloat(f);
         out.writeFloat(f);
+        dataSize += 4;
     }
     }
 
 
     public void writeDouble(double d, String tag) throws IOException {
     public void writeDouble(double d, String tag) throws IOException {
         out.writeDouble(d);
         out.writeDouble(d);
+        dataSize += 8;
     }
     }
 
 
     /**
     /**
@@ -108,18 +116,22 @@ public class BinaryOutputArchive implements OutputArchive {
             return;
             return;
         }
         }
         ByteBuffer bb = stringToByteBuffer(s);
         ByteBuffer bb = stringToByteBuffer(s);
-        writeInt(bb.remaining(), "len");
+        int strLen = bb.remaining();
+        writeInt(strLen, "len");
         out.write(bb.array(), bb.position(), bb.limit());
         out.write(bb.array(), bb.position(), bb.limit());
+        dataSize += strLen;
     }
     }
 
 
     public void writeBuffer(byte[] barr, String tag)
     public void writeBuffer(byte[] barr, String tag)
             throws IOException {
             throws IOException {
         if (barr == null) {
         if (barr == null) {
-            out.writeInt(-1);
+            writeInt(-1, "len");
             return;
             return;
         }
         }
-        out.writeInt(barr.length);
+        int len = barr.length;
+        writeInt(len, "len");
         out.write(barr);
         out.write(barr);
+        dataSize += len;
     }
     }
 
 
     public void writeRecord(Record r, String tag) throws IOException {
     public void writeRecord(Record r, String tag) throws IOException {
@@ -150,4 +162,9 @@ public class BinaryOutputArchive implements OutputArchive {
     public void endMap(TreeMap<?, ?> v, String tag) throws IOException {
     public void endMap(TreeMap<?, ?> v, String tag) throws IOException {
     }
     }
 
 
+    @Override
+    public long getDataSize() {
+        return dataSize;
+    }
+
 }
 }

+ 2 - 0
zookeeper-jute/src/main/java/org/apache/jute/OutputArchive.java

@@ -59,4 +59,6 @@ public interface OutputArchive {
 
 
     void endMap(TreeMap<?, ?> v, String tag) throws IOException;
     void endMap(TreeMap<?, ?> v, String tag) throws IOException;
 
 
+    long getDataSize();
+
 }
 }

+ 27 - 4
zookeeper-jute/src/main/java/org/apache/jute/ToStringOutputArchive.java

@@ -32,6 +32,7 @@ public class ToStringOutputArchive implements OutputArchive {
 
 
     private PrintStream stream;
     private PrintStream stream;
     private boolean isFirst = true;
     private boolean isFirst = true;
+    private long dataSize;
 
 
     private void throwExceptionOnError(String tag) throws IOException {
     private void throwExceptionOnError(String tag) throws IOException {
         if (stream.checkError()) {
         if (stream.checkError()) {
@@ -42,6 +43,7 @@ public class ToStringOutputArchive implements OutputArchive {
     private void printCommaUnlessFirst() {
     private void printCommaUnlessFirst() {
         if (!isFirst) {
         if (!isFirst) {
             stream.print(",");
             stream.print(",");
+            dataSize += 1;
         }
         }
         isFirst = false;
         isFirst = false;
     }
     }
@@ -61,6 +63,7 @@ public class ToStringOutputArchive implements OutputArchive {
         printCommaUnlessFirst();
         printCommaUnlessFirst();
         String val = b ? "T" : "F";
         String val = b ? "T" : "F";
         stream.print(val);
         stream.print(val);
+        dataSize += 1;
         throwExceptionOnError(tag);
         throwExceptionOnError(tag);
     }
     }
 
 
@@ -70,7 +73,9 @@ public class ToStringOutputArchive implements OutputArchive {
 
 
     public void writeLong(long l, String tag) throws IOException {
     public void writeLong(long l, String tag) throws IOException {
         printCommaUnlessFirst();
         printCommaUnlessFirst();
-        stream.print(l);
+        String strValue = String.valueOf(l);
+        stream.print(strValue);
+        dataSize += strValue.length();
         throwExceptionOnError(tag);
         throwExceptionOnError(tag);
     }
     }
 
 
@@ -80,20 +85,26 @@ public class ToStringOutputArchive implements OutputArchive {
 
 
     public void writeDouble(double d, String tag) throws IOException {
     public void writeDouble(double d, String tag) throws IOException {
         printCommaUnlessFirst();
         printCommaUnlessFirst();
-        stream.print(d);
+        String strValue = String.valueOf(d);
+        stream.print(strValue);
+        dataSize += strValue.length();
         throwExceptionOnError(tag);
         throwExceptionOnError(tag);
     }
     }
 
 
     public void writeString(String s, String tag) throws IOException {
     public void writeString(String s, String tag) throws IOException {
         printCommaUnlessFirst();
         printCommaUnlessFirst();
-        stream.print(escapeString(s));
+        String strValue = escapeString(s);
+        stream.print(strValue);
+        dataSize += strValue.length();
         throwExceptionOnError(tag);
         throwExceptionOnError(tag);
     }
     }
 
 
     public void writeBuffer(byte[] buf, String tag)
     public void writeBuffer(byte[] buf, String tag)
             throws IOException {
             throws IOException {
         printCommaUnlessFirst();
         printCommaUnlessFirst();
-        stream.print(escapeBuffer(buf));
+        String strValue = escapeBuffer(buf);
+        stream.print(strValue);
+        dataSize += strValue.length();
         throwExceptionOnError(tag);
         throwExceptionOnError(tag);
     }
     }
 
 
@@ -108,6 +119,7 @@ public class ToStringOutputArchive implements OutputArchive {
         if (tag != null && !"".equals(tag)) {
         if (tag != null && !"".equals(tag)) {
             printCommaUnlessFirst();
             printCommaUnlessFirst();
             stream.print("s{");
             stream.print("s{");
+            dataSize += 2;
             isFirst = true;
             isFirst = true;
         }
         }
     }
     }
@@ -115,9 +127,11 @@ public class ToStringOutputArchive implements OutputArchive {
     public void endRecord(Record r, String tag) throws IOException {
     public void endRecord(Record r, String tag) throws IOException {
         if (tag == null || "".equals(tag)) {
         if (tag == null || "".equals(tag)) {
             stream.print("\n");
             stream.print("\n");
+            dataSize += 1;
             isFirst = true;
             isFirst = true;
         } else {
         } else {
             stream.print("}");
             stream.print("}");
+            dataSize += 1;
             isFirst = false;
             isFirst = false;
         }
         }
     }
     }
@@ -125,25 +139,34 @@ public class ToStringOutputArchive implements OutputArchive {
     public void startVector(List<?> v, String tag) throws IOException {
     public void startVector(List<?> v, String tag) throws IOException {
         printCommaUnlessFirst();
         printCommaUnlessFirst();
         stream.print("v{");
         stream.print("v{");
+        dataSize += 2;
         isFirst = true;
         isFirst = true;
     }
     }
 
 
     public void endVector(List<?> v, String tag) throws IOException {
     public void endVector(List<?> v, String tag) throws IOException {
         stream.print("}");
         stream.print("}");
+        dataSize += 1;
         isFirst = false;
         isFirst = false;
     }
     }
 
 
     public void startMap(TreeMap<?, ?> v, String tag) throws IOException {
     public void startMap(TreeMap<?, ?> v, String tag) throws IOException {
         printCommaUnlessFirst();
         printCommaUnlessFirst();
         stream.print("m{");
         stream.print("m{");
+        dataSize += 2;
         isFirst = true;
         isFirst = true;
     }
     }
 
 
     public void endMap(TreeMap<?, ?> v, String tag) throws IOException {
     public void endMap(TreeMap<?, ?> v, String tag) throws IOException {
         stream.print("}");
         stream.print("}");
+        dataSize += 1;
         isFirst = false;
         isFirst = false;
     }
     }
 
 
+    @Override
+    public long getDataSize() {
+        return dataSize;
+    }
+
     private static String escapeString(String s) {
     private static String escapeString(String s) {
         if (s == null) {
         if (s == null) {
             return "";
             return "";

+ 104 - 0
zookeeper-jute/src/test/java/org/apache/jute/BinaryOutputArchiveTest.java

@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jute;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import org.apache.zookeeper.data.ClientInfo;
+import org.apache.zookeeper.proto.WhoAmIResponse;
+import org.junit.jupiter.api.Test;
+
+public class BinaryOutputArchiveTest {
+
+    @Test
+    public void testDataSize() throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream(32);
+        BinaryOutputArchive outputArchive = BinaryOutputArchive.getArchive(baos);
+        int dataSize = 0;
+        checkDataSize(dataSize, baos, outputArchive);
+
+        int boolSize = 1;
+        dataSize += boolSize;
+        outputArchive.writeBool(true, "bool");
+        checkDataSize(dataSize, baos, outputArchive);
+
+        int byteSize = 1;
+        dataSize += byteSize;
+        outputArchive.writeByte(Byte.MAX_VALUE, "byte");
+        checkDataSize(dataSize, baos, outputArchive);
+
+        int intSize = 4;
+        dataSize += intSize;
+        outputArchive.writeInt(1, "int");
+        checkDataSize(dataSize, baos, outputArchive);
+
+        int longSize = 8;
+        dataSize += longSize;
+        outputArchive.writeLong(8L, "long");
+        checkDataSize(dataSize, baos, outputArchive);
+
+        int stringLengthSize = 4;
+        String str = "ab";
+        dataSize += stringLengthSize + str.length();
+        outputArchive.writeString(str, "string");
+        checkDataSize(dataSize, baos, outputArchive);
+
+        int floatSize = 4;
+        dataSize += floatSize;
+        outputArchive.writeFloat(12.0f, "float");
+        checkDataSize(dataSize, baos, outputArchive);
+
+        int doubleSize = 8;
+        dataSize += doubleSize;
+        outputArchive.writeDouble(12.44d, "double");
+        checkDataSize(dataSize, baos, outputArchive);
+
+        int bytesLengthSize = 4;
+        byte[] bytes = new byte[4];
+        bytes[0] = 'a';
+        bytes[1] = 'b';
+        bytes[2] = 'c';
+        bytes[3] = 'd';
+        dataSize += bytesLengthSize + bytes.length;
+        outputArchive.writeBuffer(bytes, "bytes");
+        checkDataSize(dataSize, baos, outputArchive);
+
+        String schema = "custom";
+        String user1 = "horizon";
+        String user2 = "zhao";
+        WhoAmIResponse whoAmIResponse = new WhoAmIResponse();
+        whoAmIResponse.setClientInfo(Arrays.asList(
+                new ClientInfo(schema, user1),
+                new ClientInfo(schema, user2)));
+
+        int listSizeLength = 4;
+        int clientInfo1Length = stringLengthSize + schema.length() + stringLengthSize + user1.length();
+        int clientInfo2Length = stringLengthSize + schema.length() + stringLengthSize + user2.length();
+        dataSize += listSizeLength + clientInfo1Length + clientInfo2Length;
+        outputArchive.writeRecord(whoAmIResponse, "record");
+        checkDataSize(dataSize, baos, outputArchive);
+    }
+
+    private void checkDataSize(int dataSize, ByteArrayOutputStream baos, OutputArchive outputArchive) {
+        assertEquals(dataSize, outputArchive.getDataSize());
+        assertEquals(baos.size(), outputArchive.getDataSize());
+    }
+
+}

+ 117 - 0
zookeeper-jute/src/test/java/org/apache/jute/ToStringOutputArchiveTest.java

@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jute;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import org.apache.zookeeper.data.ClientInfo;
+import org.apache.zookeeper.proto.WhoAmIResponse;
+import org.junit.jupiter.api.Test;
+
+public class ToStringOutputArchiveTest {
+
+    @Test
+    public void testDataSize() throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream(32);
+        ToStringOutputArchive outputArchive = new ToStringOutputArchive(baos);
+        int dataSize = 0;
+        assertEquals(dataSize, outputArchive.getDataSize());
+        assertEquals(dataSize, baos.size());
+
+        int boolSize = 1;
+        dataSize += boolSize;
+        outputArchive.writeBool(true, "bool");
+        checkDataSize(dataSize, baos, outputArchive);
+
+        String comma = ",";
+        byte b = Byte.MAX_VALUE;
+        int byteSize = String.valueOf(b).length();
+        dataSize += comma.length() + byteSize;
+        outputArchive.writeByte(b, "byte");
+        checkDataSize(dataSize, baos, outputArchive);
+
+        int i = 1;
+        int intSize = String.valueOf(i).length();
+        dataSize += comma.length() + intSize;
+        outputArchive.writeInt(i, "int");
+        checkDataSize(dataSize, baos, outputArchive);
+
+        long l = 8L;
+        int longSize = String.valueOf(l).length();
+        dataSize += comma.length() + longSize;
+        outputArchive.writeLong(l, "long");
+        checkDataSize(dataSize, baos, outputArchive);
+
+        String apostrophe = "'";
+        String str = "ab";
+        int strSize = str.length();
+        dataSize += comma.length() + apostrophe.length() + strSize;
+        outputArchive.writeString(str, "string");
+        checkDataSize(dataSize, baos, outputArchive);
+
+
+        float f = 12.0f;
+        int floatSize = String.valueOf(f).length();
+        dataSize += comma.length() + floatSize;
+        outputArchive.writeFloat(f, "float");
+        checkDataSize(dataSize, baos, outputArchive);
+
+        double d = 12.44d;
+        int doubleSize = String.valueOf(d).length();
+        dataSize += comma.length() + doubleSize;
+        outputArchive.writeDouble(d, "double");
+        checkDataSize(dataSize, baos, outputArchive);
+
+        byte[] bytes = new byte[4];
+        bytes[0] = 'a';
+        bytes[1] = 'b';
+        bytes[2] = 'c';
+        bytes[3] = 'd';
+        String poundSign = "#";
+        int bytesSize = Integer.toHexString(bytes[0]).length()
+                + Integer.toHexString(bytes[1]).length()
+                + Integer.toHexString(bytes[2]).length()
+                + Integer.toHexString(bytes[3]).length();
+        dataSize += comma.length() + poundSign.length() + bytesSize;
+        outputArchive.writeBuffer(bytes, "bytes");
+        checkDataSize(dataSize, baos, outputArchive);
+
+        String schema = "custom";
+        String user1 = "horizon";
+        String user2 = "zhao";
+        WhoAmIResponse whoAmIResponse = new WhoAmIResponse();
+        whoAmIResponse.setClientInfo(Arrays.asList(
+                new ClientInfo(schema, user1),
+                new ClientInfo(schema, user2)));
+        String whoAmIResponseStr = whoAmIResponse.toString().replace("\n", "");
+
+        String startRecordSign = "s{";
+        String endRecordSign = "}";
+        dataSize += comma.length() + startRecordSign.length() + whoAmIResponseStr.length() + endRecordSign.length();
+        outputArchive.writeRecord(whoAmIResponse, "record");
+        checkDataSize(dataSize, baos, outputArchive);
+
+    }
+
+    private void checkDataSize(int dataSize, ByteArrayOutputStream baos, OutputArchive outputArchive) {
+        assertEquals(dataSize, outputArchive.getDataSize());
+        assertEquals(baos.size(), outputArchive.getDataSize());
+    }
+}

+ 6 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FilePadding.java

@@ -73,7 +73,11 @@ public class FilePadding {
      * @throws IOException
      * @throws IOException
      */
      */
     long padFile(FileChannel fileChannel) throws IOException {
     long padFile(FileChannel fileChannel) throws IOException {
-        long newFileSize = calculateFileSizeWithPadding(fileChannel.position(), currentSize, preAllocSize);
+        return this.padFile(fileChannel, fileChannel.position());
+    }
+
+    long padFile(FileChannel fileChannel, long position) throws IOException {
+        long newFileSize = calculateFileSizeWithPadding(position, currentSize, preAllocSize);
         if (currentSize != newFileSize) {
         if (currentSize != newFileSize) {
             fileChannel.write((ByteBuffer) fill.position(0), newFileSize - fill.remaining());
             fileChannel.write((ByteBuffer) fill.position(0), newFileSize - fill.remaining());
             currentSize = newFileSize;
             currentSize = newFileSize;
@@ -81,6 +85,7 @@ public class FilePadding {
         return currentSize;
         return currentSize;
     }
     }
 
 
+
     /**
     /**
      * Calculates a new file size with padding. We only return a new size if
      * Calculates a new file size with padding. We only return a new size if
      * the current file position is sufficiently close (less than 4K) to end of
      * the current file position is sufficiently close (less than 4K) to end of

+ 32 - 5
zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnLog.java

@@ -168,6 +168,12 @@ public class FileTxnLog implements TxnLog, Closeable {
      */
      */
     private long prevLogsRunningTotal;
     private long prevLogsRunningTotal;
 
 
+    long filePosition = 0;
+
+    private long unFlushedSize = 0;
+
+    private long fileSize = 0;
+
     /**
     /**
      * constructor for FileTxnLog. Take the directory
      * constructor for FileTxnLog. Take the directory
      * where the txnlogs are stored
      * where the txnlogs are stored
@@ -208,7 +214,7 @@ public class FileTxnLog implements TxnLog, Closeable {
      */
      */
     public synchronized long getCurrentLogSize() {
     public synchronized long getCurrentLogSize() {
         if (logFileWrite != null) {
         if (logFileWrite != null) {
-            return logFileWrite.length();
+            return fileSize;
         }
         }
         return 0;
         return 0;
     }
     }
@@ -239,7 +245,9 @@ public class FileTxnLog implements TxnLog, Closeable {
             prevLogsRunningTotal += getCurrentLogSize();
             prevLogsRunningTotal += getCurrentLogSize();
             this.logStream = null;
             this.logStream = null;
             oa = null;
             oa = null;
-
+            fileSize = 0;
+            filePosition = 0;
+            unFlushedSize = 0;
             // Roll over the current log file into the running total
             // Roll over the current log file into the running total
         }
         }
     }
     }
@@ -280,22 +288,34 @@ public class FileTxnLog implements TxnLog, Closeable {
             logStream = new BufferedOutputStream(fos);
             logStream = new BufferedOutputStream(fos);
             oa = BinaryOutputArchive.getArchive(logStream);
             oa = BinaryOutputArchive.getArchive(logStream);
             FileHeader fhdr = new FileHeader(TXNLOG_MAGIC, VERSION, dbId);
             FileHeader fhdr = new FileHeader(TXNLOG_MAGIC, VERSION, dbId);
+            long dataSize = oa.getDataSize();
             fhdr.serialize(oa, "fileheader");
             fhdr.serialize(oa, "fileheader");
             // Make sure that the magic number is written before padding.
             // Make sure that the magic number is written before padding.
             logStream.flush();
             logStream.flush();
-            filePadding.setCurrentSize(fos.getChannel().position());
+            // Before writing data, first obtain the size of the OutputArchive.
+            // After writing the data, obtain the size of the OutputArchive again,
+            // so we can obtain the size of the data written this time.
+            // In this case, the data already flush into the channel, so add the size to filePosition.
+            filePosition += oa.getDataSize() - dataSize;
+            filePadding.setCurrentSize(filePosition);
             streamsToFlush.add(fos);
             streamsToFlush.add(fos);
         }
         }
-        filePadding.padFile(fos.getChannel());
+        fileSize = filePadding.padFile(fos.getChannel(), filePosition);
         byte[] buf = request.getSerializeData();
         byte[] buf = request.getSerializeData();
         if (buf == null || buf.length == 0) {
         if (buf == null || buf.length == 0) {
             throw new IOException("Faulty serialization for header " + "and txn");
             throw new IOException("Faulty serialization for header " + "and txn");
         }
         }
+        long dataSize = oa.getDataSize();
         Checksum crc = makeChecksumAlgorithm();
         Checksum crc = makeChecksumAlgorithm();
         crc.update(buf, 0, buf.length);
         crc.update(buf, 0, buf.length);
         oa.writeLong(crc.getValue(), "txnEntryCRC");
         oa.writeLong(crc.getValue(), "txnEntryCRC");
         Util.writeTxnBytes(oa, buf);
         Util.writeTxnBytes(oa, buf);
-
+        // Before writing data, first obtain the size of the OutputArchive.
+        // After writing the data, obtain the size of the OutputArchive again,
+        // so we can obtain the size of the data written this time.
+        // In this case, the data just write to the cache, not flushed, so add the size to unFlushedSize.
+        // After flushed, the unFlushedSize will add to the filePosition.
+        unFlushedSize += oa.getDataSize() - dataSize;
         return true;
         return true;
     }
     }
 
 
@@ -367,6 +387,13 @@ public class FileTxnLog implements TxnLog, Closeable {
     public synchronized void commit() throws IOException {
     public synchronized void commit() throws IOException {
         if (logStream != null) {
         if (logStream != null) {
             logStream.flush();
             logStream.flush();
+            filePosition += unFlushedSize;
+            // If we have written more than we have previously preallocated,
+            // we should override the fileSize by filePosition.
+            if (filePosition > fileSize) {
+                fileSize = filePosition;
+            }
+            unFlushedSize = 0;
         }
         }
         for (FileOutputStream log : streamsToFlush) {
         for (FileOutputStream log : streamsToFlush) {
             log.flush();
             log.flush();

+ 5 - 5
zookeeper-server/src/test/java/org/apache/zookeeper/server/persistence/FileTxnLogTest.java

@@ -195,15 +195,15 @@ public class FileTxnLogTest extends ZKTestCase {
             log.append(new Request(0, 0, 0, new TxnHeader(0, 0, zxid++, 0, 0), record, 0));
             log.append(new Request(0, 0, 0, new TxnHeader(0, 0, zxid++, 0, 0), record, 0));
             logSize += PREALLOCATE;
             logSize += PREALLOCATE;
             assertEquals(logSize, log.getCurrentLogSize());
             assertEquals(logSize, log.getCurrentLogSize());
-            assertEquals(position, log.fos.getChannel().position());
+            assertEquals(position, log.filePosition);
         }
         }
         log.commit();
         log.commit();
         TxnHeader mockHeader = new TxnHeader(0, 0, 0, 0, 0);
         TxnHeader mockHeader = new TxnHeader(0, 0, 0, 0, 0);
         int totalSize =  fileHeaderSize + calculateSingleRecordLength(mockHeader, record) * 4;
         int totalSize =  fileHeaderSize + calculateSingleRecordLength(mockHeader, record) * 4;
         assertEquals(totalSize, log.getCurrentLogSize());
         assertEquals(totalSize, log.getCurrentLogSize());
-        assertEquals(totalSize, log.fos.getChannel().position());
+        assertEquals(totalSize, log.filePosition);
         assertTrue(log.getCurrentLogSize() > (zxid - 1) * NODE_SIZE);
         assertTrue(log.getCurrentLogSize() > (zxid - 1) * NODE_SIZE);
-        logSize = FilePadding.calculateFileSizeWithPadding(log.fos.getChannel().position(), PREALLOCATE * 4, PREALLOCATE);
+        logSize = FilePadding.calculateFileSizeWithPadding(log.filePosition, PREALLOCATE * 4, PREALLOCATE);
         position = totalSize;
         position = totalSize;
         boolean recalculate = true;
         boolean recalculate = true;
         for (int i = 0; i < 4; i++) {
         for (int i = 0; i < 4; i++) {
@@ -214,12 +214,12 @@ public class FileTxnLogTest extends ZKTestCase {
                 logSize += PREALLOCATE;
                 logSize += PREALLOCATE;
             }
             }
             assertEquals(logSize, log.getCurrentLogSize());
             assertEquals(logSize, log.getCurrentLogSize());
-            assertEquals(position, log.fos.getChannel().position());
+            assertEquals(position, log.filePosition);
         }
         }
         log.commit();
         log.commit();
         totalSize += calculateSingleRecordLength(mockHeader, record) * 4;
         totalSize += calculateSingleRecordLength(mockHeader, record) * 4;
         assertEquals(totalSize, log.getCurrentLogSize());
         assertEquals(totalSize, log.getCurrentLogSize());
-        assertEquals(totalSize, log.fos.getChannel().position());
+        assertEquals(totalSize, log.filePosition);
         assertTrue(log.getCurrentLogSize() > (zxid - 1) * NODE_SIZE);
         assertTrue(log.getCurrentLogSize() > (zxid - 1) * NODE_SIZE);
     }
     }