|
@@ -24,8 +24,10 @@ import org.apache.commons.cli.OptionBuilder;
|
|
|
import org.apache.commons.cli.Options;
|
|
|
import org.apache.commons.cli.ParseException;
|
|
|
import org.apache.hadoop.conf.Configured;
|
|
|
+import org.apache.hadoop.hdfs.DFSUtil;
|
|
|
import org.apache.hadoop.hdfs.DFSUtilClient;
|
|
|
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Pipeline;
|
|
|
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
|
|
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
|
|
|
import org.apache.hadoop.util.Tool;
|
|
|
import org.apache.hadoop.util.ToolRunner;
|
|
@@ -50,9 +52,15 @@ import java.util.Set;
|
|
|
|
|
|
import static org.apache.hadoop.ozone.OzoneConsts.BLOCK_DB;
|
|
|
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_DB;
|
|
|
+import static org.apache.hadoop.ozone.OzoneConsts.NODEPOOL_DB;
|
|
|
|
|
|
/**
|
|
|
* This is the CLI that can be use to convert a levelDB into a sqlite DB file.
|
|
|
+ *
|
|
|
+ * NOTE: user should use this CLI in an offline fashion. Namely, this should not
|
|
|
+ * be used to convert a levelDB that is currently being used by Ozone. Instead,
|
|
|
+ * this should be used to debug and diagnose closed levelDB instances.
|
|
|
+ *
|
|
|
*/
|
|
|
public class SQLCLI extends Configured implements Tool {
|
|
|
|
|
@@ -65,10 +73,10 @@ public class SQLCLI extends Configured implements Tool {
|
|
|
"CREATE TABLE containerInfo (" +
|
|
|
"containerName TEXT PRIMARY KEY NOT NULL, " +
|
|
|
"leaderUUID TEXT NOT NULL)";
|
|
|
- private static final String CREATE_CONTAINER_MACHINE =
|
|
|
+ private static final String CREATE_CONTAINER_MEMBERS =
|
|
|
"CREATE TABLE containerMembers (" +
|
|
|
- "containerName INTEGER NOT NULL, " +
|
|
|
- "datanodeUUID INTEGER NOT NULL," +
|
|
|
+ "containerName TEXT NOT NULL, " +
|
|
|
+ "datanodeUUID TEXT NOT NULL," +
|
|
|
"PRIMARY KEY(containerName, datanodeUUID));";
|
|
|
private static final String CREATE_DATANODE_INFO =
|
|
|
"CREATE TABLE datanodeInfo (" +
|
|
@@ -98,6 +106,16 @@ public class SQLCLI extends Configured implements Tool {
|
|
|
private static final String INSERT_BLOCK_CONTAINER =
|
|
|
"INSERT INTO blockContainer (blockKey, containerName) " +
|
|
|
"VALUES (\"%s\", \"%s\")";
|
|
|
+ // for nodepool.db
|
|
|
+ private static final String CREATE_NODE_POOL =
|
|
|
+ "CREATE TABLE nodePool (" +
|
|
|
+ "datanodeUUID TEXT NOT NULL," +
|
|
|
+ "poolName TEXT NOT NULL," +
|
|
|
+ "PRIMARY KEY(datanodeUUID, poolName))";
|
|
|
+ private static final String INSERT_NODE_POOL =
|
|
|
+ "INSERT INTO nodePool (datanodeUUID, poolName) " +
|
|
|
+ "VALUES (\"%s\", \"%s\")";
|
|
|
+ // and reuse CREATE_DATANODE_INFO and INSERT_DATANODE_INFO
|
|
|
|
|
|
|
|
|
private static final Logger LOG =
|
|
@@ -170,6 +188,9 @@ public class SQLCLI extends Configured implements Tool {
|
|
|
} else if (dbName.toString().equals(BLOCK_DB)) {
|
|
|
LOG.info("Converting block DB");
|
|
|
convertBlockDB(dbPath, outPath);
|
|
|
+ } else if (dbName.toString().equals(NODEPOOL_DB)) {
|
|
|
+ LOG.info("Converting node pool DB");
|
|
|
+ convertNodePoolDB(dbPath, outPath);
|
|
|
} else {
|
|
|
LOG.error("Unrecognized db name {}", dbName);
|
|
|
}
|
|
@@ -183,10 +204,6 @@ public class SQLCLI extends Configured implements Tool {
|
|
|
return DriverManager.getConnection(connectPath);
|
|
|
}
|
|
|
|
|
|
- private void closeDB(Connection conn) throws SQLException {
|
|
|
- conn.close();
|
|
|
- }
|
|
|
-
|
|
|
private void executeSQL(Connection conn, String sql) throws SQLException {
|
|
|
try (Statement stmt = conn.createStatement()) {
|
|
|
stmt.executeUpdate(sql);
|
|
@@ -226,24 +243,22 @@ public class SQLCLI extends Configured implements Tool {
|
|
|
LOG.info("Create tables for sql container db.");
|
|
|
File dbFile = dbPath.toFile();
|
|
|
org.iq80.leveldb.Options dbOptions = new org.iq80.leveldb.Options();
|
|
|
- LevelDBStore dbStore = new LevelDBStore(dbFile, dbOptions);
|
|
|
+ try (LevelDBStore dbStore = new LevelDBStore(dbFile, dbOptions);
|
|
|
+ Connection conn = connectDB(outPath.toString())) {
|
|
|
+ executeSQL(conn, CREATE_CONTAINER_INFO);
|
|
|
+ executeSQL(conn, CREATE_CONTAINER_MEMBERS);
|
|
|
+ executeSQL(conn, CREATE_DATANODE_INFO);
|
|
|
|
|
|
- Connection conn = connectDB(outPath.toString());
|
|
|
- executeSQL(conn, CREATE_CONTAINER_INFO);
|
|
|
- executeSQL(conn, CREATE_CONTAINER_MACHINE);
|
|
|
- executeSQL(conn, CREATE_DATANODE_INFO);
|
|
|
-
|
|
|
- DBIterator iter = dbStore.getIterator();
|
|
|
- iter.seekToFirst();
|
|
|
- HashSet<String> uuidChecked = new HashSet<>();
|
|
|
- while(iter.hasNext()) {
|
|
|
- Map.Entry<byte[], byte[]> entry = iter.next();
|
|
|
- String containerName = new String(entry.getKey(), encoding);
|
|
|
- Pipeline pipeline = Pipeline.parseFrom(entry.getValue());
|
|
|
- insertContainerDB(conn, containerName, pipeline, uuidChecked);
|
|
|
+ DBIterator iter = dbStore.getIterator();
|
|
|
+ iter.seekToFirst();
|
|
|
+ HashSet<String> uuidChecked = new HashSet<>();
|
|
|
+ while (iter.hasNext()) {
|
|
|
+ Map.Entry<byte[], byte[]> entry = iter.next();
|
|
|
+ String containerName = new String(entry.getKey(), encoding);
|
|
|
+ Pipeline pipeline = Pipeline.parseFrom(entry.getValue());
|
|
|
+ insertContainerDB(conn, containerName, pipeline, uuidChecked);
|
|
|
+ }
|
|
|
}
|
|
|
- closeDB(conn);
|
|
|
- dbStore.close();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -304,23 +319,79 @@ public class SQLCLI extends Configured implements Tool {
|
|
|
LOG.info("Create tables for sql block db.");
|
|
|
File dbFile = dbPath.toFile();
|
|
|
org.iq80.leveldb.Options dbOptions = new org.iq80.leveldb.Options();
|
|
|
- LevelDBStore dbStore = new LevelDBStore(dbFile, dbOptions);
|
|
|
+ try (LevelDBStore dbStore = new LevelDBStore(dbFile, dbOptions);
|
|
|
+ Connection conn = connectDB(outPath.toString())) {
|
|
|
+ executeSQL(conn, CREATE_BLOCK_CONTAINER);
|
|
|
|
|
|
- Connection conn = connectDB(outPath.toString());
|
|
|
- executeSQL(conn, CREATE_BLOCK_CONTAINER);
|
|
|
+ DBIterator iter = dbStore.getIterator();
|
|
|
+ iter.seekToFirst();
|
|
|
+ while (iter.hasNext()) {
|
|
|
+ Map.Entry<byte[], byte[]> entry = iter.next();
|
|
|
+ String blockKey = DFSUtilClient.bytes2String(entry.getKey());
|
|
|
+ String containerName = DFSUtilClient.bytes2String(entry.getValue());
|
|
|
+ String insertBlockContainer = String.format(
|
|
|
+ INSERT_BLOCK_CONTAINER, blockKey, containerName);
|
|
|
+ executeSQL(conn, insertBlockContainer);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- DBIterator iter = dbStore.getIterator();
|
|
|
- iter.seekToFirst();
|
|
|
- while (iter.hasNext()) {
|
|
|
- Map.Entry<byte[], byte[]> entry = iter.next();
|
|
|
- String blockKey = DFSUtilClient.bytes2String(entry.getKey());
|
|
|
- String containerName = DFSUtilClient.bytes2String(entry.getValue());
|
|
|
- String insertBlockContainer = String.format(
|
|
|
- INSERT_BLOCK_CONTAINER, blockKey, containerName);
|
|
|
- executeSQL(conn, insertBlockContainer);
|
|
|
+ /**
|
|
|
+ * Converts nodePool.db to sqlite. The schema of sql db:
|
|
|
+ * two tables, nodePool and datanodeInfo (the same datanode Info as for
|
|
|
+ * container.db).
|
|
|
+ *
|
|
|
+ * nodePool
|
|
|
+ * ---------------------------------------------------------
|
|
|
+ * datanodeUUID* | poolName*
|
|
|
+ * ---------------------------------------------------------
|
|
|
+ *
|
|
|
+ * datanodeInfo:
|
|
|
+ * ---------------------------------------------------------
|
|
|
+ * hostname | datanodeUUid* | xferPort | infoPort | ipcPort
|
|
|
+ * ---------------------------------------------------------
|
|
|
+ *
|
|
|
+ * --------------------------------
|
|
|
+ * | infoSecurePort | containerPort
|
|
|
+ * --------------------------------
|
|
|
+ *
|
|
|
+ * @param dbPath path to node pool db.
|
|
|
+ * @param outPath path to output sqlite
|
|
|
+ * @throws Exception if the conversion fails.
|
|
|
+ */
|
|
|
+ private void convertNodePoolDB(Path dbPath, Path outPath) throws Exception {
|
|
|
+ LOG.info("Create table for sql node pool db.");
|
|
|
+ File dbFile = dbPath.toFile();
|
|
|
+ org.iq80.leveldb.Options dbOptions = new org.iq80.leveldb.Options();
|
|
|
+ try (LevelDBStore dbStore = new LevelDBStore(dbFile, dbOptions);
|
|
|
+ Connection conn = connectDB(outPath.toString())) {
|
|
|
+ executeSQL(conn, CREATE_NODE_POOL);
|
|
|
+ executeSQL(conn, CREATE_DATANODE_INFO);
|
|
|
+
|
|
|
+ DBIterator iter = dbStore.getIterator();
|
|
|
+ iter.seekToFirst();
|
|
|
+ while (iter.hasNext()) {
|
|
|
+ Map.Entry<byte[], byte[]> entry = iter.next();
|
|
|
+ DatanodeID nodeId = DatanodeID.getFromProtoBuf(
|
|
|
+ HdfsProtos.DatanodeIDProto.PARSER.parseFrom(entry.getKey()));
|
|
|
+ String blockPool = DFSUtil.bytes2String(entry.getValue());
|
|
|
+ insertNodePoolDB(conn, blockPool, nodeId);
|
|
|
+ }
|
|
|
}
|
|
|
- closeDB(conn);
|
|
|
- dbStore.close();
|
|
|
+ }
|
|
|
+
|
|
|
+ private void insertNodePoolDB(Connection conn, String blockPool,
|
|
|
+ DatanodeID datanodeID) throws SQLException {
|
|
|
+ String insertNodePool = String.format(INSERT_NODE_POOL,
|
|
|
+ datanodeID.getDatanodeUuid(), blockPool);
|
|
|
+ executeSQL(conn, insertNodePool);
|
|
|
+
|
|
|
+ String insertDatanodeID = String.format(INSERT_DATANODE_INFO,
|
|
|
+ datanodeID.getHostName(), datanodeID.getDatanodeUuid(),
|
|
|
+ datanodeID.getIpAddr(), datanodeID.getXferPort(),
|
|
|
+ datanodeID.getInfoPort(), datanodeID.getIpcPort(),
|
|
|
+ datanodeID.getInfoSecurePort(), datanodeID.getContainerPort());
|
|
|
+ executeSQL(conn, insertDatanodeID);
|
|
|
}
|
|
|
|
|
|
private CommandLine parseArgs(String[] argv)
|