|
@@ -39,9 +39,15 @@ import org.apache.log4j.Logger;
|
|
|
|
|
|
import org.apache.zookeeper.KeeperException;
|
|
|
|
|
|
+/**
|
|
|
+ * This is a simple test program to compare the performance of writing to
|
|
|
+ * BookKeeper and to the local file system.
|
|
|
+ *
|
|
|
+ */
|
|
|
+
|
|
|
public class TestClient
|
|
|
implements AddCallback, ReadCallback{
|
|
|
- Logger LOG = Logger.getLogger(QuorumEngine.class);
|
|
|
+ private static final Logger LOG = Logger.getLogger(TestClient.class);
|
|
|
|
|
|
BookKeeper x;
|
|
|
LedgerHandle lh;
|
|
@@ -63,7 +69,7 @@ public class TestClient
|
|
|
try{
|
|
|
lh = x.createLedger(new byte[] {'a', 'b'});
|
|
|
} catch (BKException e) {
|
|
|
- System.out.println(e.toString());
|
|
|
+ LOG.error(e.toString());
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -72,9 +78,9 @@ public class TestClient
|
|
|
this();
|
|
|
x = new BookKeeper(servers);
|
|
|
try{
|
|
|
- lh = x.createLedger(ensSize, new byte[] {'a', 'b'}, qSize, QMode.VERIFIABLE);
|
|
|
+ lh = x.createLedger(ensSize, qSize, QMode.VERIFIABLE, new byte[] {'a', 'b'});
|
|
|
} catch (BKException e) {
|
|
|
- System.out.println(e.toString());
|
|
|
+ LOG.error(e.toString());
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -95,15 +101,9 @@ public class TestClient
|
|
|
|
|
|
public boolean removeEntryId(Integer id){
|
|
|
boolean retVal = false;
|
|
|
- //int val;
|
|
|
synchronized (map) {
|
|
|
- //val = map.get(id);
|
|
|
- //if(--val == 0){
|
|
|
map.remove(id);
|
|
|
retVal = true;
|
|
|
- //} else {
|
|
|
- //map.put(id, val);
|
|
|
- //}
|
|
|
|
|
|
if(map.size() == 0) map.notifyAll();
|
|
|
else{
|
|
@@ -118,8 +118,10 @@ public class TestClient
|
|
|
x.closeLedger(lh);
|
|
|
}
|
|
|
/**
|
|
|
- * First parameter is an integer defining the length of the message
|
|
|
- * Second parameter is the number of writes
|
|
|
+ * First says if entries should be written to BookKeeper (0) or to the local
|
|
|
+ * disk (1). Second parameter is an integer defining the length of a ledger entry.
|
|
|
+ * Third parameter is the number of writes.
|
|
|
+ *
|
|
|
* @param args
|
|
|
*/
|
|
|
public static void main(String[] args) {
|
|
@@ -140,23 +142,18 @@ public class TestClient
|
|
|
|
|
|
String servers = servers_sb.toString().trim().replace(' ', ',');
|
|
|
try {
|
|
|
- /*int lenght = Integer.parseInt(args[1]);
|
|
|
- StringBuffer sb = new StringBuffer();
|
|
|
- while(lenght-- > 0){
|
|
|
- sb.append('a');
|
|
|
- }*/
|
|
|
TestClient c = new TestClient(servers, Integer.parseInt(args[3]), Integer.parseInt(args[4]));
|
|
|
c.writeSameEntryBatch(sb.toString().getBytes(), Integer.parseInt(args[2]));
|
|
|
//c.writeConsecutiveEntriesBatch(Integer.parseInt(args[0]));
|
|
|
c.closeHandle();
|
|
|
} catch (NumberFormatException e) {
|
|
|
- e.printStackTrace();
|
|
|
+ LOG.error(e);
|
|
|
} catch (InterruptedException e) {
|
|
|
- e.printStackTrace();
|
|
|
+ LOG.error(e);
|
|
|
} catch (KeeperException e) {
|
|
|
- e.printStackTrace();
|
|
|
+ LOG.error(e);
|
|
|
} catch (IOException e) {
|
|
|
- e.printStackTrace();
|
|
|
+ LOG.error(e);
|
|
|
}
|
|
|
break;
|
|
|
case 1:
|
|
@@ -165,7 +162,7 @@ public class TestClient
|
|
|
TestClient c = new TestClient(new FileOutputStream(args[2]));
|
|
|
c.writeSameEntryBatchFS(sb.toString().getBytes(), Integer.parseInt(args[3]));
|
|
|
} catch(FileNotFoundException e){
|
|
|
- e.printStackTrace();
|
|
|
+ LOG.error(e);
|
|
|
}
|
|
|
break;
|
|
|
case 2:
|
|
@@ -176,27 +173,18 @@ public class TestClient
|
|
|
void writeSameEntryBatch(byte[] data, int times) throws InterruptedException{
|
|
|
start = System.currentTimeMillis();
|
|
|
int count = times;
|
|
|
- System.out.println("Data: " + new String(data) + ", " + data.length);
|
|
|
+ LOG.debug("Data: " + new String(data) + ", " + data.length);
|
|
|
while(count-- > 0){
|
|
|
x.asyncAddEntry(lh, data, this, this.getFreshEntryId(2));
|
|
|
}
|
|
|
- System.out.println("Finished " + times + " async writes in ms: " + (System.currentTimeMillis() - start));
|
|
|
+ LOG.debug("Finished " + times + " async writes in ms: " + (System.currentTimeMillis() - start));
|
|
|
synchronized (map) {
|
|
|
if(map.size() != 0)
|
|
|
map.wait();
|
|
|
}
|
|
|
- System.out.println("Finished processing in ms: " + (System.currentTimeMillis() - start));
|
|
|
- /*Integer mon = Integer.valueOf(0);
|
|
|
- synchronized(mon){
|
|
|
-
|
|
|
- try{
|
|
|
- x.asyncReadEntries(lh, 0, times - 1, this, mon);
|
|
|
- mon.wait();
|
|
|
- } catch (BKException e){
|
|
|
- LOG.error(e);
|
|
|
- }
|
|
|
- } */
|
|
|
- LOG.error("Ended computation");
|
|
|
+ LOG.debug("Finished processing in ms: " + (System.currentTimeMillis() - start));
|
|
|
+
|
|
|
+ LOG.debug("Ended computation");
|
|
|
}
|
|
|
|
|
|
void writeConsecutiveEntriesBatch(int times) throws InterruptedException{
|
|
@@ -210,12 +198,12 @@ public class TestClient
|
|
|
write[1] = (byte) k;
|
|
|
x.asyncAddEntry(lh, write, this, this.getFreshEntryId(2));
|
|
|
}
|
|
|
- System.out.println("Finished " + times + " async writes in ms: " + (System.currentTimeMillis() - start));
|
|
|
+ LOG.debug("Finished " + times + " async writes in ms: " + (System.currentTimeMillis() - start));
|
|
|
synchronized (map) {
|
|
|
if(map.size() != 0)
|
|
|
map.wait();
|
|
|
}
|
|
|
- System.out.println("Finished processing writes (ms): " + (System.currentTimeMillis() - start));
|
|
|
+ LOG.debug("Finished processing writes (ms): " + (System.currentTimeMillis() - start));
|
|
|
|
|
|
Integer mon = Integer.valueOf(0);
|
|
|
synchronized(mon){
|
|
@@ -231,7 +219,7 @@ public class TestClient
|
|
|
|
|
|
void writeSameEntryBatchFS(byte[] data, int times) {
|
|
|
int count = times;
|
|
|
- System.out.println("Data: " + data.length + ", " + times);
|
|
|
+ LOG.debug("Data: " + data.length + ", " + times);
|
|
|
try{
|
|
|
start = System.currentTimeMillis();
|
|
|
while(count-- > 0){
|
|
@@ -239,28 +227,23 @@ public class TestClient
|
|
|
fStreamLocal.write(data);
|
|
|
fStream.flush();
|
|
|
}
|
|
|
- //fStream.flush();
|
|
|
fStream.close();
|
|
|
System.out.println("Finished processing writes (ms): " + (System.currentTimeMillis() - start));
|
|
|
} catch(IOException e){
|
|
|
- e.printStackTrace();
|
|
|
+ LOG.error(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
+
|
|
|
public void addComplete(int rc, long ledgerId, long entryId, Object ctx) {
|
|
|
this.removeEntryId((Integer) ctx);
|
|
|
- //if((entryId - lastId) > 1) LOG.error("Gap: " + entryId + ", " + lastId);
|
|
|
- //lastId = entryId;
|
|
|
- //if(entryId > 199000) LOG.error("Add completed: " + ledgerId + ", " + entryId + ", " + map.toString());
|
|
|
- //System.out.println((System.currentTimeMillis() - start));
|
|
|
}
|
|
|
- @Override
|
|
|
+
|
|
|
public void readComplete(int rc, long ledgerId, LedgerSequence seq, Object ctx){
|
|
|
System.out.println("Read callback: " + rc);
|
|
|
while(seq.hasMoreElements()){
|
|
|
LedgerEntry le = seq.nextElement();
|
|
|
- System.out.println(new String(le.getEntry()));
|
|
|
+ LOG.debug(new String(le.getEntry()));
|
|
|
}
|
|
|
synchronized(ctx){
|
|
|
ctx.notify();
|