Commit c0ba389d authored by Karsten Loesing's avatar Karsten Loesing
Browse files

Add consensuses to totalcw graph.

Implements #28352.
parent 7b39042e
Loading
Loading
Loading
Loading
+4 −1
Original line number Diff line number Diff line
@@ -1558,7 +1558,7 @@ write_advbw_ipv6 <- function(start_p = NULL, end_p = NULL, path_p) {

prepare_totalcw <- function(start_p, end_p) {
  read.csv(paste(stats_dir, "totalcw.csv", sep = ""),
    colClasses = c("valid_after_date" = "Date")) %>%
    colClasses = c("valid_after_date" = "Date", "nickname" = "character")) %>%
    filter(if (!is.null(start_p))
        valid_after_date >= as.Date(start_p) else TRUE) %>%
    filter(if (!is.null(end_p))
@@ -1569,6 +1569,9 @@ prepare_totalcw <- function(start_p, end_p) {

plot_totalcw <- function(start_p, end_p, path_p) {
  prepare_totalcw(start_p, end_p) %>%
    mutate(nickname = ifelse(nickname == "", "consensus", nickname)) %>%
    mutate(nickname = factor(nickname,
      levels = c("consensus", unique(nickname[nickname != "consensus"])))) %>%
    complete(valid_after_date = full_seq(valid_after_date, period = 1),
        nesting(nickname)) %>%
    ggplot(aes(x = valid_after_date, y = measured_sum_avg,
+45 −17
Original line number Diff line number Diff line
@@ -10,6 +10,8 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.sql.Types;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
@@ -63,23 +65,36 @@ class Database implements AutoCloseable {
        "INSERT INTO authority (nickname, identity_hex) VALUES (?, ?)",
        Statement.RETURN_GENERATED_KEYS);
    this.psVoteSelect = this.connection.prepareStatement(
        "SELECT EXISTS (SELECT 1 FROM vote "
        "SELECT EXISTS (SELECT 1 FROM status "
            + "WHERE valid_after = ? AND authority_id = ?)");
    this.psVoteInsert = this.connection.prepareStatement(
        "INSERT INTO vote (valid_after, authority_id, have_guard_flag, "
        "INSERT INTO status (valid_after, authority_id, have_guard_flag, "
        + "have_exit_flag, measured_sum) VALUES (?, ?, ?, ?, ?)");
  }

  /** Insert a parsed vote into the vote table. */
  void insertVote(TotalcwRelayNetworkStatusVote vote) throws SQLException {
    if (null == vote) {
      /* Nothing to insert. */
      return;
  /** Insert a parsed consensus into the status table. */
  void insertConsensus(TotalcwRelayNetworkStatus consensus)
      throws SQLException {
    if (null != consensus) {
      insertStatusIfAbsent(consensus.validAfter, null, consensus.measuredSums);
    }
  }

  /** Insert a parsed vote into the status table. */
  void insertVote(TotalcwRelayNetworkStatus vote) throws SQLException {
    if (null != vote) {
      int authorityId = insertAuthorityIfAbsent(vote.nickname,
          vote.identityHex);
      insertStatusIfAbsent(vote.validAfter, authorityId, vote.measuredSums);
    }
  }

  private int insertAuthorityIfAbsent(String nickname, String identityHex)
      throws SQLException {
    int authorityId = -1;
    this.psAuthoritySelect.clearParameters();
    this.psAuthoritySelect.setString(1, vote.nickname);
    this.psAuthoritySelect.setString(2, vote.identityHex);
    this.psAuthoritySelect.setString(1, nickname);
    this.psAuthoritySelect.setString(2, identityHex);
    try (ResultSet rs = this.psAuthoritySelect.executeQuery()) {
      if (rs.next()) {
        authorityId = rs.getInt(1);
@@ -87,8 +102,8 @@ class Database implements AutoCloseable {
    }
    if (authorityId < 0) {
      this.psAuthorityInsert.clearParameters();
      this.psAuthorityInsert.setString(1, vote.nickname);
      this.psAuthorityInsert.setString(2, vote.identityHex);
      this.psAuthorityInsert.setString(1, nickname);
      this.psAuthorityInsert.setString(2, identityHex);
      this.psAuthorityInsert.execute();
      try (ResultSet rs = this.psAuthorityInsert.getGeneratedKeys()) {
        if (rs.next()) {
@@ -100,13 +115,22 @@ class Database implements AutoCloseable {
            + "authority entry.");
      }
    }
    return authorityId;
  }

  private void insertStatusIfAbsent(LocalDateTime validAfter,
      Integer authorityId, long[] measuredSums) throws SQLException {
    Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"),
        Locale.US);
    this.psVoteSelect.clearParameters();
    this.psVoteSelect.setTimestamp(1,
        Timestamp.from(ZonedDateTime.of(vote.validAfter,
        Timestamp.from(ZonedDateTime.of(validAfter,
            ZoneId.of("UTC")).toInstant()), calendar);
    if (null == authorityId) {
      this.psVoteSelect.setNull(2, Types.INTEGER);
    } else {
      this.psVoteSelect.setInt(2, authorityId);
    }
    try (ResultSet rs = this.psVoteSelect.executeQuery()) {
      if (rs.next()) {
        if (rs.getBoolean(1)) {
@@ -119,12 +143,16 @@ class Database implements AutoCloseable {
         measuredSumsIndex++) {
      this.psVoteInsert.clearParameters();
      this.psVoteInsert.setTimestamp(1,
          Timestamp.from(ZonedDateTime.of(vote.validAfter,
          Timestamp.from(ZonedDateTime.of(validAfter,
              ZoneId.of("UTC")).toInstant()), calendar);
      if (null == authorityId) {
        this.psVoteInsert.setNull(2, Types.INTEGER);
      } else {
        this.psVoteInsert.setInt(2, authorityId);
      }
      this.psVoteInsert.setBoolean(3, 1 == (measuredSumsIndex & 1));
      this.psVoteInsert.setBoolean(4, 2 == (measuredSumsIndex & 2));
      this.psVoteInsert.setLong(5, vote.measuredSums[measuredSumsIndex]);
      this.psVoteInsert.setLong(5, measuredSums[measuredSumsIndex]);
      this.psVoteInsert.execute();
    }
  }
+9 −2
Original line number Diff line number Diff line
@@ -6,6 +6,7 @@ package org.torproject.metrics.stats.totalcw;
import org.torproject.descriptor.Descriptor;
import org.torproject.descriptor.DescriptorReader;
import org.torproject.descriptor.DescriptorSourceFactory;
import org.torproject.descriptor.RelayNetworkStatusConsensus;
import org.torproject.descriptor.RelayNetworkStatusVote;

import org.slf4j.Logger;
@@ -24,6 +25,8 @@ public class Main {
  private static Logger log = LoggerFactory.getLogger(Main.class);

  private static String[][] paths =  {
    {"recent", "relay-descriptors", "consensuses"},
    {"archive", "relay-descriptors", "consensuses"},
    {"recent", "relay-descriptors", "votes"},
    {"archive", "relay-descriptors", "votes"}};

@@ -32,7 +35,8 @@ public class Main {

    log.info("Starting totalcw module.");

    log.info("Reading votes and inserting relevant parts into the database.");
    log.info("Reading consensuses and votes and inserting relevant parts into "
        + "the database.");
    DescriptorReader reader = DescriptorSourceFactory.createDescriptorReader();
    File historyFile = new File(Configuration.history);
    reader.setHistoryFile(historyFile);
@@ -43,7 +47,10 @@ public class Main {
            Arrays.stream(paths).map((String[] path)
                -> Paths.get(Configuration.descriptors, path).toFile())
            .toArray(File[]::new))) {
          if (descriptor instanceof RelayNetworkStatusVote) {
          if (descriptor instanceof RelayNetworkStatusConsensus) {
            database.insertConsensus(parser.parseRelayNetworkStatusConsensus(
                (RelayNetworkStatusConsensus) descriptor));
          } else if (descriptor instanceof RelayNetworkStatusVote) {
            database.insertVote(parser.parseRelayNetworkStatusVote(
                (RelayNetworkStatusVote) descriptor));
          } else {
+5 −3
Original line number Diff line number Diff line
@@ -26,7 +26,8 @@ class OutputLine {
  /** Date. */
  LocalDate validAfterDate;

  /** Server type, which can be "relay" or "bridge". */
  /** Nickname of the authority generating votes, or <code>null</code> in case
   * of consensuses. */
  String nickname;

  /** Whether contained relays all have the "Guard" flag. */
@@ -42,8 +43,9 @@ class OutputLine {
   * file. */
  @Override
  public String toString() {
    return String.format("%s,%s,%s,%s,%d", validAfterDate, nickname,
        haveGuardFlag ? "t" : "f", haveExitFlag ? "t" : "f", measuredSumAvg);
    return String.format("%s,%s,%s,%s,%d", validAfterDate,
        null == nickname ? "" : nickname, haveGuardFlag ? "t" : "f",
        haveExitFlag ? "t" : "f", measuredSumAvg);
  }
}
+39 −8
Original line number Diff line number Diff line
@@ -4,6 +4,7 @@
package org.torproject.metrics.stats.totalcw;

import org.torproject.descriptor.NetworkStatusEntry;
import org.torproject.descriptor.RelayNetworkStatusConsensus;
import org.torproject.descriptor.RelayNetworkStatusVote;

import java.time.Instant;
@@ -13,9 +14,40 @@ import java.time.ZoneId;
 * data objects for them. */
class Parser {

  /** Parse and return a consensus, but return <code>null</code> if the
   * consensus did not contain any bandwidth values. */
  TotalcwRelayNetworkStatus parseRelayNetworkStatusConsensus(
      RelayNetworkStatusConsensus consensus) {
    boolean containsBandwidthValues = false;
    long[] measuredSums = new long[4];
    for (NetworkStatusEntry entry : consensus.getStatusEntries().values()) {
      if (null == entry.getFlags() || !entry.getFlags().contains("Running")
          || entry.getBandwidth() < 0L) {
        continue;
      }
      containsBandwidthValues = true;
      /* Encode flags as sum of Guard = 1 and (Exit and !BadExit) = 2. */
      int measuredSumsIndex = (entry.getFlags().contains("Guard") ? 1 : 0)
          + (entry.getFlags().contains("Exit")
          && !entry.getFlags().contains("BadExit") ? 2 : 0);
      measuredSums[measuredSumsIndex] += entry.getBandwidth();
    }
    if (!containsBandwidthValues) {
      /* Return null, because we wouldn't want to add this consensus to the
       * database anyway. */
      return null;
    }
    TotalcwRelayNetworkStatus parsedStatus = new TotalcwRelayNetworkStatus();
    parsedStatus.validAfter = Instant.ofEpochMilli(
        consensus.getValidAfterMillis())
        .atZone(ZoneId.of("UTC")).toLocalDateTime();
    parsedStatus.measuredSums = measuredSums;
    return parsedStatus;
  }

  /** Parse and return a vote, but return <code>null</code> if the vote did not
   * contain any bandwidth measurements. */
  TotalcwRelayNetworkStatusVote parseRelayNetworkStatusVote(
  TotalcwRelayNetworkStatus parseRelayNetworkStatusVote(
      RelayNetworkStatusVote vote) {
    boolean containsMeasuredBandwidths = false;
    long[] measuredSums = new long[4];
@@ -36,14 +68,13 @@ class Parser {
       * anyway. */
      return null;
    }
    TotalcwRelayNetworkStatusVote parsedVote
        = new TotalcwRelayNetworkStatusVote();
    parsedVote.validAfter = Instant.ofEpochMilli(vote.getValidAfterMillis())
    TotalcwRelayNetworkStatus parsedStatus = new TotalcwRelayNetworkStatus();
    parsedStatus.validAfter = Instant.ofEpochMilli(vote.getValidAfterMillis())
        .atZone(ZoneId.of("UTC")).toLocalDateTime();
    parsedVote.identityHex = vote.getIdentity();
    parsedVote.nickname = vote.getNickname();
    parsedVote.measuredSums = measuredSums;
    return parsedVote;
    parsedStatus.identityHex = vote.getIdentity();
    parsedStatus.nickname = vote.getNickname();
    parsedStatus.measuredSums = measuredSums;
    return parsedStatus;
  }
}
Loading