+ synchronized long addEntry(long ledger, ByteBuffer entry) throws IOException {
+ if (logChannel.position() + entry.remaining() + 4 > LOG_SIZE_LIMIT) {
+ openNewChannel();
+ }
+ ByteBuffer buff = ByteBuffer.allocate(4);
+ buff.putInt(entry.remaining());
+ buff.flip();
+ logChannel.write(buff);
+ long pos = logChannel.position();
+ logChannel.write(entry);
+ //logChannel.flush(false);
+ somethingWritten = true;
+ return (logId << 32L) | pos;
+ }
+
+ byte[] readEntry(long ledgerId, long entryId, long location) throws IOException {
+ long entryLogId = location >> 32L;
+ long pos = location & 0xffffffffL;
+ ByteBuffer sizeBuff = ByteBuffer.allocate(4);
+ pos -= 4; // we want to get the ledgerId and length to check
+ BufferedChannel fc;
+ try {
+ fc = getChannelForLogId(entryLogId);
+ } catch (FileNotFoundException e) {
+ FileNotFoundException newe = new FileNotFoundException(e.getMessage() + " for " + ledgerId + " with location " + location);
+ newe.setStackTrace(e.getStackTrace());
+ throw newe;
+ }
+ if (fc.read(sizeBuff, pos) != sizeBuff.capacity()) {
+ throw new IOException("Short read from entrylog " + entryLogId);
+ }
+ pos += 4;
+ sizeBuff.flip();
+ int entrySize = sizeBuff.getInt();
+ // entrySize does not include the ledgerId
+ if (entrySize > 1024*1024) {
+ LOG.error("Sanity check failed for entry size of " + entrySize + " at location " + pos + " in " + entryLogId);
+
+ }
+ byte data[] = new byte[entrySize];
+ ByteBuffer buff = ByteBuffer.wrap(data);
+ int rc = fc.read(buff, pos);
+ if ( rc != data.length) {
+ throw new IOException("Short read for " + ledgerId + "@" + entryId + " in " + entryLogId + "@" + pos + "("+rc+"!="+data.length+")");
+ }
+ buff.flip();
+ long thisLedgerId = buff.getLong();
+ if (thisLedgerId != ledgerId) {
+ throw new IOException("problem found in " + entryLogId + "@" + entryId + " at position + " + pos + " entry belongs to " + thisLedgerId + " not " + ledgerId);
+ }
+ long thisEntryId = buff.getLong();
+ if (thisEntryId != entryId) {
+ throw new IOException("problem found in " + entryLogId + "@" + entryId + " at position + " + pos + " entry is " + thisEntryId + " not " + entryId);
+ public void setOffset(long offset, int position) {
+ checkPage();
+ version++;
+ this.clean = false;
+ page.putLong(position, offset);
+ }
+ public long getOffset(int position) {
+ checkPage();
+ return page.getLong(position);
+ }
+ static final byte zeroPage[] = new byte[64*1024];
+ public void zeroPage() {
+ checkPage();
+ page.clear();
+ page.put(zeroPage, 0, page.remaining());
+ clean = true;
+ }
+ public void readPage(FileInfo fi) throws IOException {
+ checkPage();
+ page.clear();
+ while(page.remaining() != 0) {
+ if (fi.read(page, getFirstEntry()*8) <= 0) {
+ throw new IOException("Short page read of ledger " + getLedger() + " tried to get " + page.capacity() + " from position " + getFirstEntry()*8 + " still need " + page.remaining());
+ }
+ }
+ clean = true;
+ }
+ public ByteBuffer getPageToWrite() {
+ checkPage();
+ page.clear();
+ return page;
+ }
+ void setLedger(long ledger) {
+ this.ledger = ledger;
+ }
+ long getLedger() {
+ return ledger;
+ }
+ int getVersion() {
+ return version;
+ }
+ void setFirstEntry(long firstEntry) {
+ if (firstEntry % ENTRIES_PER_PAGES != 0) {
+ throw new IllegalArgumentException(firstEntry + " is not a multiple of " + ENTRIES_PER_PAGES);
+ }
+ this.firstEntry = firstEntry;
+ }
+ long getFirstEntry() {
+ return firstEntry;
+ }
+ public boolean inUse() {
+ return useCount > 0;
+ }
+ public long getLastEntry() {
+ for(int i = ENTRIES_PER_PAGES - 1; i >= 0; i--) {
this.bookie = new Bookie(journalDirectory, ledgerDirectories);
this.bookie = new Bookie(journalDirectory, ledgerDirectories);
}
}
+
public void start() throws IOException {
public void start() throws IOException {
nioServerFactory = new NIOServerFactory(port, this);
nioServerFactory = new NIOServerFactory(port, this);
}
}
+
public void shutdown() throws InterruptedException {
public void shutdown() throws InterruptedException {
down = true;
down = true;
nioServerFactory.shutdown();
nioServerFactory.shutdown();
bookie.shutdown();
bookie.shutdown();
}
}
- public boolean isDown(){
+
+ public boolean isDown() {
return down;
return down;
}
}
+
public void join() throws InterruptedException {
public void join() throws InterruptedException {
nioServerFactory.join();
nioServerFactory.join();
}
}
+
/**
/**
* @param args
* @param args
- * @throws IOException
- * @throws InterruptedException
+ * @throws IOException
+ * @throws InterruptedException
*/
*/
public static void main(String[] args) throws IOException, InterruptedException {
public static void main(String[] args) throws IOException, InterruptedException {
- if (args.length < 3) {
+ if (args.length < 3) {
System.err.println("USAGE: BookieServer port journalDirectory ledgerDirectory [ledgerDirectory]*");
System.err.println("USAGE: BookieServer port journalDirectory ledgerDirectory [ledgerDirectory]*");
return;
return;
}
}
int port = Integer.parseInt(args[0]);
int port = Integer.parseInt(args[0]);
File journalDirectory = new File(args[1]);
File journalDirectory = new File(args[1]);
- File ledgerDirectory[] = new File[args.length-2];
+ File ledgerDirectory[] = new File[args.length-2];
StringBuilder sb = new StringBuilder();
StringBuilder sb = new StringBuilder();
- for(int i = 0; i < ledgerDirectory.length; i++) {
- ledgerDirectory[i] = new File(args[i+2]);
+ for(int i = 0; i < ledgerDirectory.length; i++) {
+ ledgerDirectory[i] = new File(args[i+2]);
if (i != 0) {
if (i != 0) {
sb.append(',');
sb.append(',');
}
}
sb.append(ledgerDirectory[i]);
sb.append(ledgerDirectory[i]);
}
}
- String hello = String.format("Hello, I'm your bookie, listening on port %1$s. Journals are in %2$s. Ledgers are stored in %3$s.", port, journalDirectory, sb);
+ String hello = String.format(
+ "Hello, I'm your bookie, listening on port %1$s. Journals are in %2$s. Ledgers are stored in %3$s.",
+ port, journalDirectory, sb);
LOG.info(hello);
LOG.info(hello);
BookieServer bs = new BookieServer(port, journalDirectory, ledgerDirectory);
BookieServer bs = new BookieServer(port, journalDirectory, ledgerDirectory);
bs.start();
bs.start();
bs.join();
bs.join();
}
}
-
public void processPacket(ByteBuffer packet, Cnxn src) {
public void processPacket(ByteBuffer packet, Cnxn src) {
int type = packet.getInt();
int type = packet.getInt();
- switch(type) {
+ switch(type) {
case BookieProtocol.ADDENTRY:
case BookieProtocol.ADDENTRY:
try {
try {
byte[] masterKey = new byte[20];
byte[] masterKey = new byte[20];
packet.get(masterKey, 0, 20);
packet.get(masterKey, 0, 20);
- //LOG.debug("Master key: " + new String(masterKey));
+ //LOG.debug("Master key: " + new String(masterKey));