@@ -0,0 +1,418 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.examples;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.LongSumReducer;
+import org.apache.hadoop.mapred.lib.db.DBConfiguration;
+import org.apache.hadoop.mapred.lib.db.DBInputFormat;
+import org.apache.hadoop.mapred.lib.db.DBOutputFormat;
+import org.apache.hadoop.mapred.lib.db.DBWritable;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.hsqldb.Server;
+
+/**
+ * A demonstration program that uses DBInputFormat to read its input data from
+ * a database and DBOutputFormat to write its output back to the database.
+ * <br>
+ * The program first creates the necessary tables, populates the input table,
+ * and then runs the mapred job.
+ * <br>
+ * The input data is a mini access log with a <code>&lt;url,referrer,time&gt;</code>
+ * schema. The output is the number of pageviews of each url in the log,
+ * with the schema <code>&lt;url,pageview&gt;</code>.
+ *
+ * When called with no arguments, the program starts a local HSQLDB server and
+ * uses this database for storing/retrieving the data.
+ */
+public class DBCountPageView extends Configured implements Tool {
+
+  private static final Log LOG = LogFactory.getLog(DBCountPageView.class);
+
+  private Connection connection;
+  private boolean initialized = false;
+
+  private static final String[] AccessFieldNames = {"url", "referrer", "time"};
+  private static final String[] PageviewFieldNames = {"url", "pageview"};
+
+  private static final String DB_URL = "jdbc:hsqldb:hsql://localhost/URLAccess";
+  private static final String DRIVER_CLASS = "org.hsqldb.jdbcDriver";
+
+  private Server server;
+
+  private void startHsqldbServer() {
+    server = new Server();
+    server.setDatabasePath(0,
+        System.getProperty("test.build.data", ".") + "/URLAccess");
+    server.setDatabaseName(0, "URLAccess");
+    server.start();
+  }
+
+  private void createConnection(String driverClassName, String url)
+      throws Exception {
+    Class.forName(driverClassName);
+    connection = DriverManager.getConnection(url);
+    connection.setAutoCommit(false);
+  }
+
+  private void shutdown() throws SQLException {
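+    // Commit any pending work and close the JDBC connection, then stop the
+    // embedded HSQLDB server if one was started.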
+    connection.commit();
+    connection.close();
+
+    if(server != null) {
+      server.stop();
+      server.shutdown();
+    }
+  }
+
+  private void initialize(String driverClassName, String url)
+      throws Exception {
+    if(!this.initialized) {
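+      // The embedded HSQLDB server is started only for the default local
+      // driver; an externally supplied driver/url is assumed to point at an
+      // already running database.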
+      if(driverClassName.equals(DRIVER_CLASS)) {
+        startHsqldbServer();
+      }
+      createConnection(driverClassName, url);
+      dropTables();
+      createTables();
+      populateAccess();
+      this.initialized = true;
+    }
+  }
+
+  private void dropTables() {
+    String dropAccess = "DROP TABLE Access";
+    String dropPageview = "DROP TABLE Pageview";
+
+    try {
+      Statement st = connection.createStatement();
+      st.executeUpdate(dropAccess);
+      st.executeUpdate(dropPageview);
+      connection.commit();
+      st.close();
+    } catch (SQLException ex) {
+      // ignore: the tables may not exist yet
+    }
+  }
+
+  private void createTables() throws SQLException {
+
+    String createAccess =
+      "CREATE TABLE " +
+      "Access(url VARCHAR(100) NOT NULL," +
+      " referrer VARCHAR(100)," +
+      " time BIGINT NOT NULL, " +
+      " PRIMARY KEY (url, time))";
+
+    String createPageview =
+      "CREATE TABLE " +
+      "Pageview(url VARCHAR(100) NOT NULL," +
+      " pageview BIGINT NOT NULL, " +
+      " PRIMARY KEY (url))";
+
+    Statement st = connection.createStatement();
+    try {
+      st.executeUpdate(createAccess);
+      st.executeUpdate(createPageview);
+      connection.commit();
+    } finally {
+      st.close();
+    }
+  }
+
+  /**
+   * Populates the Access table with generated records.
+   */
+  private void populateAccess() throws SQLException {
+
+    PreparedStatement statement = null;
+    try {
+      statement = connection.prepareStatement(
+          "INSERT INTO Access(url, referrer, time)" +
+          " VALUES (?, ?, ?)");
+
+      Random random = new Random();
+
+      int time = random.nextInt(50) + 50;
+
+      final int PROBABILITY_PRECISION = 100; // 1 / 100
+      final int NEW_PAGE_PROBABILITY = 15;   // 15 / 100
+
+      // Pages in the site:
+      String[] pages = {"/a", "/b", "/c", "/d", "/e", "/f", "/g", "/h", "/i", "/j"};
+      // linkMatrix[i] is the array of pages (indexes) that page_i links to.
+      int[][] linkMatrix = {{1,5,7}, {0,7,4,6}, {0,1,7,8}, {0,2,4,6,7,9}, {0,1},
+          {0,3,5,9}, {0}, {0,1,3}, {0,2,6}, {0,2,6}};
+
+      // A mini model of a user browsing, a la PageRank.
+      int currentPage = random.nextInt(pages.length);
+      String referrer = null;
+
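+      // Random-walk sketch of a user session: with probability
+      // NEW_PAGE_PROBABILITY / PROBABILITY_PRECISION the user jumps to a
+      // random page with no referrer; otherwise a random outgoing link from
+      // linkMatrix is followed.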
+      for(int i = 0; i < time; i++) {
+
+        statement.setString(1, pages[currentPage]);
+        statement.setString(2, referrer);
+        statement.setLong(3, i);
+        statement.execute();
+
+        int action = random.nextInt(PROBABILITY_PRECISION);
+
+        // Go to a new page with probability NEW_PAGE_PROBABILITY / PROBABILITY_PRECISION.
+        if(action < NEW_PAGE_PROBABILITY) {
+          currentPage = random.nextInt(pages.length); // a random page
+          referrer = null;
+        } else {
+          referrer = pages[currentPage];
+          action = random.nextInt(linkMatrix[currentPage].length);
+          currentPage = linkMatrix[currentPage][action];
+        }
+      }
+
+      connection.commit();
+
+    } catch (SQLException ex) {
+      connection.rollback();
+      throw ex;
+    } finally {
+      if(statement != null) {
+        statement.close();
+      }
+    }
+  }
+
+  /** Verifies that the results are correct. */
+  private boolean verify() throws SQLException {
+    // Check the total number of pageviews.
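+    // Each row in Access contributes exactly one pageview, so COUNT(*) over
+    // Access must equal SUM(pageview) over Pageview when the job is correct.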
+    String countAccessQuery = "SELECT COUNT(*) FROM Access";
+    String sumPageviewQuery = "SELECT SUM(pageview) FROM Pageview";
+    Statement st = null;
+    ResultSet rs = null;
+    try {
+      st = connection.createStatement();
+      rs = st.executeQuery(countAccessQuery);
+      rs.next();
+      long totalPageview = rs.getLong(1);
+
+      rs = st.executeQuery(sumPageviewQuery);
+      rs.next();
+      long sumPageview = rs.getLong(1);
+
+      LOG.info("totalPageview=" + totalPageview);
+      LOG.info("sumPageview=" + sumPageview);
+
+      return totalPageview == sumPageview && totalPageview != 0;
+    } finally {
+      if(rs != null)
+        rs.close();
+      if(st != null)
+        st.close();
+    }
+  }
+
+  /** Holds a &lt;url, referrer, time&gt; tuple. */
+  static class AccessRecord implements Writable, DBWritable {
+    String url;
+    String referrer;
+    long time;
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      this.url = Text.readString(in);
+      this.referrer = Text.readString(in);
+      this.time = in.readLong();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      Text.writeString(out, url);
+      Text.writeString(out, referrer);
+      out.writeLong(time);
+    }
+
+    @Override
+    public void readFields(ResultSet resultSet) throws SQLException {
+      this.url = resultSet.getString(1);
+      this.referrer = resultSet.getString(2);
+      this.time = resultSet.getLong(3);
+    }
+
+    @Override
+    public void write(PreparedStatement statement) throws SQLException {
+      statement.setString(1, url);
+      statement.setString(2, referrer);
+      statement.setLong(3, time);
+    }
+  }
+
+  /** Holds a &lt;url, pageview&gt; tuple. */
+  static class PageviewRecord implements Writable, DBWritable {
+    String url;
+    long pageview;
+
+    public PageviewRecord(String url, long pageview) {
+      this.url = url;
+      this.pageview = pageview;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      this.url = Text.readString(in);
+      this.pageview = in.readLong();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      Text.writeString(out, url);
+      out.writeLong(pageview);
+    }
+
+    @Override
+    public void readFields(ResultSet resultSet) throws SQLException {
+      this.url = resultSet.getString(1);
+      this.pageview = resultSet.getLong(2);
+    }
+
+    @Override
+    public void write(PreparedStatement statement) throws SQLException {
+      statement.setString(1, url);
+      statement.setLong(2, pageview);
+    }
+
+    @Override
+    public String toString() {
+      return url + " " + pageview;
+    }
+  }
+
+  /**
+   * Mapper extracts the URL from each AccessRecord (a tuple read from the db)
+   * and emits a &lt;url,1&gt; pair for each access record.
+   */
+  static class PageviewMapper extends MapReduceBase
+      implements Mapper<LongWritable, AccessRecord, Text, LongWritable> {
+
+    LongWritable ONE = new LongWritable(1L);
+
+    @Override
+    public void map(LongWritable key, AccessRecord value,
+        OutputCollector<Text, LongWritable> output, Reporter reporter)
+        throws IOException {
+      Text oKey = new Text(value.url);
+      output.collect(oKey, ONE);
+    }
+  }
+
+  /**
+   * Reducer sums up the pageviews and emits a PageviewRecord, which will
+   * correspond to one tuple in the db.
+   */
+  static class PageviewReducer extends MapReduceBase
+      implements Reducer<Text, LongWritable, PageviewRecord, NullWritable> {
+
+    NullWritable n = NullWritable.get();
+
+    @Override
+    public void reduce(Text key, Iterator<LongWritable> values,
+        OutputCollector<PageviewRecord, NullWritable> output, Reporter reporter)
+        throws IOException {
+      long sum = 0L;
+      while(values.hasNext()) {
+        sum += values.next().get();
+      }
+      output.collect(new PageviewRecord(key.toString(), sum), n);
+    }
+  }
+
+  // Usage: DBCountPageView [driverClass dburl]
+  @Override
+  public int run(String[] args) throws Exception {
+
+    String driverClassName = DRIVER_CLASS;
+    String url = DB_URL;
+
+    if(args.length > 1) {
+      driverClassName = args[0];
+      url = args[1];
+    }
+
+    initialize(driverClassName, url);
+
+    JobConf job = new JobConf(getConf(), DBCountPageView.class);
+
+    job.setJobName("Count Pageviews of URLs");
+
+    job.setMapperClass(PageviewMapper.class);
+    job.setCombinerClass(LongSumReducer.class);
+    job.setReducerClass(PageviewReducer.class);
+
+    DBConfiguration.configureDB(job, driverClassName, url);
+
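+    // Read tuples from the Access table: the conditions argument is null (no
+    // WHERE clause), results are ordered by the "url" column, and the
+    // AccessFieldNames columns are fetched.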
+    DBInputFormat.setInput(job, AccessRecord.class, "Access",
+        null, "url", AccessFieldNames);
+
+    DBOutputFormat.setOutput(job, "Pageview", PageviewFieldNames);
+
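+    // Intermediate map output is <Text url, LongWritable count>; the final
+    // job output is <PageviewRecord, NullWritable>, which DBOutputFormat
+    // writes into the Pageview table.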
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(LongWritable.class);
+
+    job.setOutputKeyClass(PageviewRecord.class);
+    job.setOutputValueClass(NullWritable.class);
+
+    try {
+      JobClient.runJob(job);
+
+      boolean correct = verify();
+      if(!correct) {
+        throw new RuntimeException("Evaluation was not correct!");
+      }
+    } finally {
+      shutdown();
+    }
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int ret = ToolRunner.run(new DBCountPageView(), args);
+    System.exit(ret);
+  }
+
+}