Commit 597e715a authored by Karsten Loesing's avatar Karsten Loesing
Browse files

Add webstats module and webstats-tb graph.

Implements #21236.
parent 8bf149b0
Loading
Loading
Loading
Loading
+3 −0
Original line number Diff line number Diff line
/stats/*.csv
/RData/*.RData
+65 −0
Original line number Diff line number Diff line
<project default="run" name="webstats" basedir=".">

  <property name="webstats-sources" value="src/main/java"/>
  <property name="webstats-tests" value="src/test/java"/>
  <property name="webstats-libs" value="../../shared/lib"/>
  <property name="webstats-classes" value="classes"/>
  <path id="classpath">
    <pathelement path="${webstats-classes}"/>
    <fileset dir="${webstats-libs}">
      <include name="commons-compress-1.9.jar"/>
      <include name="postgresql-jdbc3-9.2.jar"/>
      <include name="xz-1.5.jar"/>
      <include name="junit4-4.11.jar"/>
      <include name="slf4j-api-1.7.7.jar"/>
      <include name="logback-core-1.1.2.jar"/>
      <include name="logback-classic-1.1.2.jar"/>
    </fileset>
  </path>

  <target name="compile">
    <mkdir dir="${webstats-classes}"/>
    <javac destdir="${webstats-classes}"
           srcdir="${webstats-sources}"
           source="1.7"
           target="1.7"
           debug="true"
           deprecation="true"
           optimize="false"
           failonerror="true"
           includeantruntime="false">
      <classpath refid="classpath"/>
    </javac>
  </target>

  <target name="test" depends="compile">
    <javac destdir="${webstats-classes}"
           srcdir="${webstats-tests}"
           source="1.7"
           target="1.7"
           debug="true"
           deprecation="true"
           optimize="false"
           failonerror="true"
           includeantruntime="false">
      <classpath refid="classpath"/>
    </javac>
    <junit fork="true" haltonfailure="true" printsummary="off">
      <classpath refid="classpath"/>
      <formatter type="plain" usefile="false"/>
      <batchtest>
        <fileset dir="${webstats-classes}"
                 includes="**/*Test.class"/>
      </batchtest>
    </junit>
  </target>

  <target name="run" depends="compile">
    <java fork="true"
          maxmemory="1g"
          classname="org.torproject.metrics.webstats.Main">
      <classpath refid="classpath"/>
    </java>
  </target>
</project>
+157 −0
Original line number Diff line number Diff line
-- Copyright 2016--2017 The Tor Project
-- See LICENSE for licensing information

CREATE TYPE method AS ENUM ('GET', 'HEAD');

CREATE TABLE files (
  file_id SERIAL PRIMARY KEY,
  url CHARACTER VARYING(2048) UNIQUE NOT NULL,
  server CHARACTER VARYING(32) NOT NULL,
  site CHARACTER VARYING(128) NOT NULL,
  log_date DATE NOT NULL,
  UNIQUE (server, site, log_date)
);

CREATE TABLE resources (
  resource_id SERIAL PRIMARY KEY,
  resource_string CHARACTER VARYING(2048) UNIQUE NOT NULL
);

CREATE TABLE requests (
  file_id INTEGER REFERENCES files (file_id) NOT NULL,
  method METHOD NOT NULL,
  resource_id INTEGER REFERENCES resources (resource_id) NOT NULL,
  response_code SMALLINT NOT NULL,
  count INTEGER NOT NULL,
  UNIQUE (file_id, method, resource_id, response_code)
);

CREATE OR REPLACE VIEW webstats AS
  SELECT log_date,
    CASE WHEN resource_string LIKE '%.asc' THEN 'tbsd'
      ELSE 'tbid' END AS request_type,
    CASE WHEN resource_string LIKE '%.exe%' THEN 'w'
      WHEN resource_string LIKE '%.dmg%' THEN 'm'
      WHEN resource_string LIKE '%.tar.xz%' THEN 'l'
      ELSE 'o' END AS platform,
    CASE WHEN resource_string LIKE '%-hardened%' THEN 'h'
      WHEN resource_string LIKE '%/%.%a%/%' THEN 'a'
      ELSE 'r' END AS channel,
    SUBSTRING(resource_string FROM '.*_([^\.]*)\..*') AS locale,
    NULL::BOOLEAN AS incremental,
    SUM(count) AS count
  FROM files NATURAL JOIN requests NATURAL JOIN resources
  WHERE (resource_string LIKE '%/torbrowser/%.exe'
    OR resource_string LIKE '%/torbrowser/%.dmg'
    OR resource_string LIKE '%/torbrowser/%.tar.xz'
    OR resource_string LIKE '%/torbrowser/%.exe.asc'
    OR resource_string LIKE '%/torbrowser/%.dmg.asc'
    OR resource_string LIKE '%/torbrowser/%.tar.xz.asc')
  AND response_code = 200
  AND method = 'GET'
  GROUP BY 1, 2, 3, 4, 5, 6
  UNION
  SELECT log_date,
    'tbup' AS request_type,
    CASE WHEN resource_string LIKE '%/WINNT%' THEN 'w'
      WHEN resource_string LIKE '%/Darwin%' THEN 'm'
      ELSE 'l' END AS platform,
    CASE WHEN resource_string LIKE '%/hardened/%' THEN 'h'
      WHEN resource_string LIKE '%/alpha/%' THEN 'a'
      WHEN resource_string LIKE '%/release/%' THEN 'r'
      ELSE 'o' END AS channel,
    SUBSTRING(resource_string FROM '.*/([^/\?]*)\??') AS locale,
    NULL::BOOLEAN AS incremental,
    SUM(count) AS count
  FROM files NATURAL JOIN requests NATURAL JOIN resources
  WHERE resource_string LIKE '%/torbrowser/update_2/%'
  AND resource_string NOT LIKE '%.xml'
  AND response_code = 200
  AND method = 'GET'
  GROUP BY 1, 2, 3, 4, 5, 6
  UNION
  SELECT log_date,
    'tbur' AS request_type,
    CASE WHEN resource_string LIKE '%-win32-%' THEN 'w'
      WHEN resource_string LIKE '%-osx%' THEN 'm'
      ELSE 'l' END AS platform,
    CASE WHEN resource_string LIKE '%-hardened%' THEN 'h'
      WHEN resource_string LIKE '%/%.%a%/%' THEN 'a'
      ELSE 'r' END AS channel,
    SUBSTRING(resource_string FROM '.*_([^\.]*)\..*') AS locale,
    CASE WHEN resource_string LIKE '%.incremental.%' THEN TRUE
      ELSE FALSE END AS incremental,
    SUM(count) AS count
  FROM files NATURAL JOIN requests NATURAL JOIN resources
  WHERE resource_string LIKE '%/torbrowser/%.mar'
  AND response_code = 302
  AND method = 'GET'
  GROUP BY 1, 2, 3, 4, 5, 6
  UNION
  SELECT log_date,
    'tmid' AS request_type,
    CASE WHEN resource_string LIKE '%.exe' THEN 'w'
      WHEN resource_string LIKE '%.dmg' THEN 'm'
      WHEN resource_string LIKE '%.tar.xz' THEN 'l'
      ELSE 'o' END AS platform,
    NULL AS channel,
    SUBSTRING(resource_string FROM '.*_([^\.]*)\..*') AS locale,
    NULL::BOOLEAN AS incremental,
    SUM(count) AS count
  FROM files NATURAL JOIN requests NATURAL JOIN resources
  WHERE (resource_string LIKE '%/tormessenger/%.exe'
    OR resource_string LIKE '%/tormessenger/%.dmg'
    OR resource_string LIKE '%/tormessenger/%.tar.xz')
  AND response_code = 200
  AND method = 'GET'
  GROUP BY 1, 2, 3, 4, 5, 6
  UNION
  SELECT log_date,
    'tmup' AS request_type,
    CASE WHEN resource_string LIKE '%/WINNT%' THEN 'w'
      WHEN resource_string LIKE '%/Darwin%' THEN 'm'
      WHEN resource_string LIKE '%/Linux%' THEN 'l'
      ELSE 'o' END AS platform,
    NULL AS channel,
    SUBSTRING(resource_string FROM '.*/([^/\?]*)\??') AS locale,
    NULL::BOOLEAN AS incremental,
    SUM(count) AS count
  FROM files NATURAL JOIN requests NATURAL JOIN resources
  WHERE resource_string LIKE '%/tormessenger/update_2/%'
  AND resource_string NOT LIKE '%/'
  AND response_code = 200
  AND method = 'GET'
  GROUP BY 1, 2, 3, 4, 5, 6
  UNION
  SELECT log_date,
    'twhph' AS request_type,
    NULL AS platform,
    NULL AS channel,
    NULL AS locale,
    NULL::BOOLEAN AS incremental,
    SUM(count) AS count
  FROM files NATURAL JOIN requests NATURAL JOIN resources
  WHERE (resource_string = '/'
    OR resource_string LIKE '/index%')
  AND response_code = 200
  AND (site = 'torproject.org'
    OR site = 'www.torproject.org')
  AND method = 'GET'
  GROUP BY 1, 2, 3, 4, 5, 6
  UNION
  SELECT log_date,
    'twdph' AS request_type,
    NULL AS platform,
    NULL AS channel,
    NULL AS locale,
    NULL::BOOLEAN AS incremental,
    SUM(count) AS count
  FROM files NATURAL JOIN requests NATURAL JOIN resources
  WHERE (resource_string LIKE '/download/download%'
    OR resource_string LIKE '/projects/torbrowser.html%')
  AND response_code = 200
  AND (site = 'torproject.org'
    OR site = 'www.torproject.org')
  AND method = 'GET'
  GROUP BY 1, 2, 3, 4, 5, 6;
+376 −0
Original line number Diff line number Diff line
/* Copyright 2016--2017 The Tor Project
 * See LICENSE for licensing information */

package org.torproject.metrics.webstats;

import org.apache.commons.compress.compressors.xz.XZCompressorInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.SortedSet;
import java.util.TimeZone;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Main class of the webstats module that downloads log files from the server,
 * imports them into a database, and exports aggregate statistics to a CSV
 * file. */
public class Main {

  /** Logger for this class. */
  private static Logger log = LoggerFactory.getLogger(Main.class);

  /** Pattern for links contained in directory listings. */
  private static final Pattern URL_STRING_PATTERN =
      Pattern.compile(".*<a href=\"([^\"]+)\">.*");

  private static final Pattern LOG_FILE_URL_PATTERN =
      Pattern.compile("^.*/([^/]+)/([^/]+)-access.log-(\\d{8}).xz$");

  private static DateFormat logDateFormat = new SimpleDateFormat("yyyyMMdd");

  private static final Pattern LOG_LINE_PATTERN = Pattern.compile(
      "^0.0.0.[01] - - \\[\\d{2}/\\w{3}/\\d{4}:00:00:00 \\+0000\\] "
      + "\"(GET|HEAD) ([^ ]+) HTTP[^ ]+\" (\\d+) (-|\\d+) \"-\" \"-\" -$");

  /** Executes this data-processing module. */
  public static void main(String[] args) throws Exception {
    log.info("Starting webstats module.");
    Connection connection = connectToDatabase(
        "jdbc:postgresql://localhost/webstats?user=vagrant&password=password");
    SortedSet<String> importedLogFileUrls = queryImportedFiles(connection);
    String baseUrl = "https://webstats.torproject.org/out/";
    SortedSet<String> newLogFileUrls = downloadDirectoryListings(baseUrl,
        importedLogFileUrls);
    importLogFiles(connection, newLogFileUrls);
    SortedSet<String> statistics = queryWebstats(connection);
    writeStatistics(new File("stats", "webstats.csv"), statistics);
    disconnectFromDatabase(connection);
    log.info("Terminated webstats module.");
  }

  private static Connection connectToDatabase(String jdbcString)
      throws SQLException {
    log.info("Connecting to database.");
    Connection connection = DriverManager.getConnection(jdbcString);
    connection.setAutoCommit(false);
    log.info("Successfully connected to database.");
    return connection;
  }

  private static SortedSet<String> queryImportedFiles(Connection connection)
      throws SQLException {
    log.info("Querying URLs of previously imported log files.");
    SortedSet<String> importedLogFileUrls = new TreeSet<>();
    Statement st = connection.createStatement();
    ResultSet rs = st.executeQuery("SELECT url FROM files");
    while (rs.next()) {
      importedLogFileUrls.add(rs.getString(1));
    }
    log.info("Found {} URLs of previously imported log files.",
        importedLogFileUrls.size());
    return importedLogFileUrls;
  }

  private static SortedSet<String> downloadDirectoryListings(String baseUrl,
      SortedSet<String> importedLogFileUrls) throws IOException {
    log.info("Downloading directory listings from {}.", baseUrl);
    List<String> directoryListings = new ArrayList<>();
    directoryListings.add(baseUrl);
    SortedSet<String> newLogFileUrls = new TreeSet<>();
    while (!directoryListings.isEmpty()) {
      String urlString = directoryListings.remove(0);
      if (urlString.endsWith("/")) {
        directoryListings.addAll(downloadDirectoryListing(urlString));
      } else if (urlString.endsWith(".xz")
          && !importedLogFileUrls.contains(urlString)) {
        newLogFileUrls.add(urlString);
      }
    }
    log.info("Found {} URLs of log files that have not yet been imported.",
        newLogFileUrls.size());
    return newLogFileUrls;
  }

  private static List<String> downloadDirectoryListing(String urlString)
      throws IOException {
    log.debug("Downloading directory listing from {}.", urlString);
    List<String> urlStrings = new ArrayList<>();
    try (BufferedReader br = new BufferedReader(new InputStreamReader(
        new URL(urlString).openStream()))) {
      String line;
      while ((line = br.readLine()) != null) {
        Matcher matcher = URL_STRING_PATTERN.matcher(line);
        if (matcher.matches() && !matcher.group(1).startsWith("/")) {
          urlStrings.add(urlString + matcher.group(1));
        }
      }
    }
    return urlStrings;
  }

  private static void importLogFiles(Connection connection,
      SortedSet<String> newLogFileUrls) {
    log.info("Downloading, parsing, and importing {} log files.",
        newLogFileUrls.size());
    for (String urlString : newLogFileUrls) {
      if (!urlString.contains("2017011")) {
        // TODO take me out after testing
        continue;
      }
      try {
        Object[] metaData = parseMetaData(urlString);
        if (metaData == null) {
          continue;
        }
        List<String> downloadedLogLines = downloadLogFile(urlString);
        Map<String, Integer> parsedLogLines = parseLogLines(urlString,
            downloadedLogLines);
        importLogLines(connection, urlString, metaData, parsedLogLines);
      } catch (IOException | ParseException exc) {
        log.warn("Cannot download or parse log file with URL {}.  Retrying "
            + "in the next run.", urlString, exc);
      } catch (SQLException exc) {
        log.warn("Cannot import log file with URL {} into the database.  "
            + "Rolling back and retrying in the next run.", urlString, exc);
        try {
          connection.rollback();
        } catch (SQLException exceptionWhileRollingBack) {
          /* Ignore. */
        }
      }
    }
  }

  private static Object[] parseMetaData(String urlString)
      throws ParseException {
    log.debug("Importing log file {}.", urlString);
    if (urlString.contains("-ssl-access.log-")) {
      log.debug("Skipping log file containing SSL requests with URL {}.",
          urlString);
      return null;
    }
    Matcher logFileUrlMatcher = LOG_FILE_URL_PATTERN.matcher(urlString);
    if (!logFileUrlMatcher.matches()) {
      log.debug("Skipping log file with unrecognized URL {}.", urlString);
      return null;
    }
    String server = logFileUrlMatcher.group(1);
    String site = logFileUrlMatcher.group(2);
    long logDateMillis = logDateFormat.parse(logFileUrlMatcher.group(3))
        .getTime();
    return new Object[] { server, site, new Long(logDateMillis) };
  }

  private static List<String> downloadLogFile(String urlString)
      throws IOException {
    List<String> downloadedLogLines = new ArrayList<>();
    try (BufferedReader br = new BufferedReader(new InputStreamReader(
        new XZCompressorInputStream(new URL(urlString).openStream())))) {
      String line;
      while ((line = br.readLine()) != null) {
        downloadedLogLines.add(line);
      }
    }
    return downloadedLogLines;
  }

  private static Map<String, Integer> parseLogLines(String urlString,
      List<String> logLines) {
    int skippedLines = 0;
    Map<String, Integer> parsedLogLines = new HashMap<>();
    for (String logLine : logLines) {
      Matcher logLineMatcher = LOG_LINE_PATTERN.matcher(logLine);
      if (!logLineMatcher.matches()) {
        skippedLines++;
        continue;
      }
      String method = logLineMatcher.group(1);
      String resource = truncateString(logLineMatcher.group(2), 2048);
      int responseCode = Integer.parseInt(logLineMatcher.group(3));
      String combined = String.format("%s %s %d", method, resource,
          responseCode);
      if (!parsedLogLines.containsKey(combined)) {
        parsedLogLines.put(combined, 1);
      } else {
        parsedLogLines.put(combined, parsedLogLines.get(combined) + 1);
      }
    }
    if (skippedLines > 0) {
      log.debug("Skipped {} lines while parsing log file {}.", skippedLines,
          urlString);
    }
    return parsedLogLines;
  }

  private static void importLogLines(Connection connection, String urlString,
      Object[] metaData, Map<String, Integer> parsedLogLines)
      throws SQLException {
    PreparedStatement psFiles = connection.prepareStatement(
        "INSERT INTO files (url, server, site, log_date) "
        + "VALUES (?, ?, ?, ?)", Statement.RETURN_GENERATED_KEYS);
    PreparedStatement psResourcesSelect = connection.prepareStatement(
        "SELECT resource_id FROM resources WHERE resource_string = ?");
    PreparedStatement psResourcesInsert = connection.prepareStatement(
        "INSERT INTO resources (resource_string) VALUES (?)",
        Statement.RETURN_GENERATED_KEYS);
    PreparedStatement psRequests = connection.prepareStatement(
        "INSERT INTO requests (file_id, method, resource_id, response_code, "
        + "count) VALUES (?, CAST(? AS method), ?, ?, ?)");
    String server = (String) metaData[0];
    String site = (String) metaData[1];
    long logDateMillis = (long) metaData[2];
    int fileId = insertFile(psFiles, urlString, server, site, logDateMillis);
    if (fileId < 0) {
      log.debug("Skipping previously imported log file {}.", urlString);
      return;
    }
    for (Map.Entry<String, Integer> requests : parsedLogLines.entrySet()) {
      String[] keyParts = requests.getKey().split(" ");
      String method = keyParts[0];
      String resource = keyParts[1];
      int responseCode = Integer.parseInt(keyParts[2]);
      int count = requests.getValue();
      int resourceId = insertResource(psResourcesSelect, psResourcesInsert,
          resource);
      if (resourceId < 0) {
        log.debug("Could not retrieve auto-generated key for new resources "
            + "entry.");
        connection.rollback();
        return;
      }
      insertRequest(psRequests, fileId, method, resourceId, responseCode,
          count);
    }
    connection.commit();
    log.debug("Finished importing log file with URL {} into database.",
        urlString);
  }

  private static int insertFile(PreparedStatement psFiles, String urlString,
      String server, String site, long logDateMillis) throws SQLException {
    int fileId = -1;
    psFiles.clearParameters();
    psFiles.setString(1, urlString);
    psFiles.setString(2, truncateString(server, 32));
    psFiles.setString(3, truncateString(site, 128));
    psFiles.setDate(4, new Date(logDateMillis));
    psFiles.execute();
    ResultSet resultSet = psFiles.getGeneratedKeys();
    if (resultSet.next()) {
      fileId = resultSet.getInt(1);
    }
    resultSet.close();
    return fileId;
  }

  private static void insertRequest(PreparedStatement psRequests, int fileId,
      String method, int resourceId, int responseCode, int count)
      throws SQLException {
    psRequests.clearParameters();
    psRequests.setInt(1, fileId);
    psRequests.setString(2, method);
    psRequests.setInt(3, resourceId);
    psRequests.setInt(4, responseCode);
    psRequests.setInt(5, count);
    psRequests.execute();
  }

  private static int insertResource(PreparedStatement psResourcesSelect,
      PreparedStatement psResourcesInsert, String resource)
      throws SQLException {
    int resourceId = -1;
    psResourcesSelect.clearParameters();
    psResourcesSelect.setString(1, resource);
    ResultSet rs = psResourcesSelect.executeQuery();
    if (rs.next()) {
      resourceId = rs.getInt(1);
    } else {
      /* There's a small potential for a race condition between the previous
       * SELECT and this INSERT INTO, but that will be resolved by the UNIQUE
       * constraint when committing the transaction. */
      psResourcesInsert.clearParameters();
      psResourcesInsert.setString(1, resource);
      psResourcesInsert.execute();
      ResultSet resultSet = psResourcesInsert.getGeneratedKeys();
      if (resultSet.next()) {
        resourceId = resultSet.getInt(1);
      }
      resultSet.close();
    }
    return resourceId;
  }

  private static String truncateString(String originalString,
      int truncateAfter) {
    if (originalString.length() > truncateAfter) {
      originalString = originalString.substring(0, truncateAfter);
    }
    return originalString;
  }

  private static SortedSet<String> queryWebstats(Connection connection)
      throws SQLException {
    SortedSet<String> statistics = new TreeSet<>();
    Statement st = connection.createStatement();
    ResultSet rs = st.executeQuery("SELECT log_date, request_type, platform, "
        + "channel, locale, incremental, count FROM webstats");
    DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd", Locale.US);
    dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
    while (rs.next()) {
      statistics.add(String.format("%s,%s,%s,%s,%s,%s,%d",
          dateFormat.format(rs.getDate(1)),
          null == rs.getString(2) ? "" : rs.getString(2),
          null == rs.getString(3) ? "" : rs.getString(3),
          null == rs.getString(4) ? "" : rs.getString(4),
          null == rs.getString(5) ? "" : rs.getString(5),
          null == rs.getString(6) ? "" : rs.getString(6),
          rs.getLong(7)));
    }
    return statistics;
  }

  private static void writeStatistics(File webstatsFile,
      SortedSet<String> statistics) throws IOException {
    webstatsFile.getParentFile().mkdirs();
    try (BufferedWriter bw = new BufferedWriter(new FileWriter(webstatsFile))) {
      bw.write("log_date,request_type,platform,channel,locale,incremental,"
          + "count");
      bw.newLine();
      for (String line : statistics) {
        bw.write(line);
        bw.newLine();
      }
    }
  }

  private static void disconnectFromDatabase(Connection connection)
      throws SQLException {
    log.info("Disconnecting from database.");
    connection.close();
  }
}
+9 −0
Original line number Diff line number Diff line
dir.create("RData", showWarnings = FALSE)

d <- read.csv("stats/webstats.csv", stringsAsFactors = FALSE)
d <- d[d$request_type %in% c('tbid', 'tbsd', 'tbup', 'tbur'), ]
data <- aggregate(list(count = d$count),
    by = list(log_date = as.Date(d$log_date), request_type = d$request_type),
    FUN = sum)
save(data, file = "RData/webstats-tb.RData")
Loading