Commit b998073d authored by iwakeh's avatar iwakeh 🌴
Browse files

Implements task-18910 sync-functionality.

ReferenceChecker had to be disabled (cf. #20335).
Added persistence and sync package as well as tests and test resources.
parent f9ca243b
Loading
Loading
Loading
Loading
+6 −0
Original line number Diff line number Diff line
# Changes in version 1.1.0 - 2016-10-XX

 * Major changes
   - Added sync-functionality
   - Changed path structure.

 * Medium changes
   - Added Annotation enum and modular file persistence.
   - Replace four properties for configuring where to write
     descriptors by a single 'OutPath' property.
   - Introduced *Sources and *Origins properties to simplify
     data source definition.
   - Replaced unnecessary configuration values with hard-coded ones.


# Changes in version 1.0.2 - 2016-10-07
+19 −3
Original line number Diff line number Diff line
@@ -3,10 +3,14 @@

package org.torproject.collector.bridgedescs;

import org.torproject.collector.conf.Annotation;
import org.torproject.collector.conf.Configuration;
import org.torproject.collector.conf.ConfigurationException;
import org.torproject.collector.conf.Key;
import org.torproject.collector.cron.CollecTorMain;
import org.torproject.descriptor.BridgeExtraInfoDescriptor;
import org.torproject.descriptor.BridgeNetworkStatus;
import org.torproject.descriptor.BridgeServerDescriptor;

import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Base64;
@@ -56,8 +60,15 @@ public class SanitizedBridgesWriter extends CollecTorMain {
      SanitizedBridgesWriter.class);
  private static final String BRIDGE_DESCRIPTORS = "bridge-descriptors";

  /** Initialize configuration. */
  public SanitizedBridgesWriter(Configuration config) {
    super(config);
    this.mapPathDescriptors.put("recent/bridge-descriptors/statuses",
        BridgeNetworkStatus.class);
    this.mapPathDescriptors.put("recent/bridge-descriptors/server-descriptors",
        BridgeServerDescriptor.class);
    this.mapPathDescriptors.put("recent/bridge-descriptors/extra-infos",
        BridgeExtraInfoDescriptor.class);
  }

  private String rsyncCatString;
@@ -92,6 +103,11 @@ public class SanitizedBridgesWriter extends CollecTorMain {
    return "bridgedescs";
  }

  @Override
  protected String syncMarker() {
    return "Bridge";
  }

  @Override
  protected void startProcessing() throws ConfigurationException {

@@ -609,7 +625,7 @@ public class SanitizedBridgesWriter extends CollecTorMain {
        outputFile.getParentFile().mkdirs();
        BufferedWriter bw = new BufferedWriter(new FileWriter(
            outputFile));
        bw.write("@type bridge-network-status 1.1\n");
        bw.write(Annotation.Status.toString());
        bw.write("published " + publicationTime + "\n");
        bw.write(header.toString());
        for (String scrubbed : scrubbedLines.values()) {
@@ -998,7 +1014,7 @@ public class SanitizedBridgesWriter extends CollecTorMain {
        outputFile.getParentFile().mkdirs();
        BufferedWriter bw = new BufferedWriter(new FileWriter(
            outputFile, appendToFile));
        bw.write("@type bridge-server-descriptor 1.2\n");
        bw.write(Annotation.BridgeServer.toString());
        bw.write(scrubbedDesc);
        if (descriptorDigestSha256Base64 != null) {
          bw.write("router-digest-sha256 " + descriptorDigestSha256Base64
@@ -1281,7 +1297,7 @@ public class SanitizedBridgesWriter extends CollecTorMain {
        outputFile.getParentFile().mkdirs();
        BufferedWriter bw = new BufferedWriter(new FileWriter(
            outputFile, appendToFile));
        bw.write("@type bridge-extra-info 1.3\n");
        bw.write(Annotation.BridgeExtraInfo.toString());
        bw.write(scrubbedDesc);
        if (descriptorDigestSha256Base64 != null) {
          bw.write("router-digest-sha256 " + descriptorDigestSha256Base64
+38 −0
Original line number Diff line number Diff line
/* Copyright 2016 The Tor Project
 * See LICENSE for licensing information */

package org.torproject.collector.conf;

/** This enum contains all currently valid descriptor annotations. */
public enum Annotation {

  BridgeExtraInfo("@type bridge-extra-info 1.3\n"),
  BridgeServer("@type bridge-server-descriptor 1.2\n"),
  Cert("@type dir-key-certificate-3 1.0\n"),
  Consensus("@type network-status-consensus-3 1.0\n"),
  ExitList("@type tordnsel 1.0\n"),
  ExtraInfo("@type extra-info 1.0\n"),
  MicroConsensus("@type network-status-microdesc-consensus-3 1.0\n"),
  Microdescriptor("@type microdescriptor 1.0\n"),
  Server("@type server-descriptor 1.0\n"),
  Status("@type bridge-network-status 1.1\n"),
  Torperf("@type torperf 1.0\n"),
  Vote("@type network-status-vote-3 1.0\n");

  private final String annotation;
  private final byte[] bytes;

  private Annotation(String annotation) {
    this.annotation = annotation;
    this.bytes = annotation.getBytes();
  }

  public byte[] bytes() {
    return bytes;
  }

  @Override
  public String toString() {
    return annotation;
  }
}
+15 −1
Original line number Diff line number Diff line
@@ -5,6 +5,8 @@ package org.torproject.collector.conf;

import java.net.URL;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Set;

/**
 * Enum containing all the properties keys of the configuration.
@@ -61,9 +63,10 @@ public enum Key {
  ReplaceIpAddressesWithHashes(Boolean.class),
  BridgeDescriptorMappingsLimit(Integer.class),
  TorperfFilesLines(String[].class),
  TorperfSources(String[][].class);
  TorperfHosts(String[][].class);

  private Class clazz;
  private static Set<String> keys;

  /**
   * @param Class of key value.
@@ -76,4 +79,15 @@ public enum Key {
    return clazz;
  }

  /** Verifies, if the given string corresponds to an enum value. */
  public static boolean has(String someKey) {
    if (null == keys) {
      keys = new HashSet<>();
      for (Key key : values()) {
        keys.add(key.name());
      }
    }
    return keys.contains(someKey);
  }

}
+48 −5
Original line number Diff line number Diff line
@@ -5,6 +5,10 @@ package org.torproject.collector.cron;

import org.torproject.collector.conf.Configuration;
import org.torproject.collector.conf.ConfigurationException;
import org.torproject.collector.conf.Key;
import org.torproject.collector.conf.SourceType;
import org.torproject.collector.sync.SyncManager;
import org.torproject.descriptor.Descriptor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -12,13 +16,16 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Observable;
import java.util.Observer;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;

public abstract class CollecTorMain implements Callable<Object>, Observer,
    Runnable {
public abstract class CollecTorMain extends SyncManager
    implements Callable<Object>, Observer, Runnable {

  private static final Logger logger = LoggerFactory.getLogger(
      CollecTorMain.class);
@@ -31,6 +38,9 @@ public abstract class CollecTorMain implements Callable<Object>, Observer,

  private Configuration newConfig;

  protected final Map<String, Class<? extends Descriptor>> mapPathDescriptors
      = new HashMap<>();

  public CollecTorMain(Configuration conf) {
    this.config.putAll(conf.getPropertiesCopy());
    conf.addObserver(this);
@@ -51,13 +61,36 @@ public abstract class CollecTorMain implements Callable<Object>, Observer,
        }
      }
    }
    logger.info("Starting {} module of CollecTor.", module());
    try {
      if (!isSyncOnly()) {
        logger.info("Starting {} module of CollecTor.", module());
        startProcessing();
        logger.info("Terminating {} module of CollecTor.", module());
      }
    } catch (Throwable th) { // Catching all to prevent #19771
      logger.error("The {} module failed: {}", module(), th.getMessage(), th);
    }
    logger.info("Terminating {} module of CollecTor.", module());
    try {
      if (isSync()) {
        logger.info("Starting sync-run of module {} of CollecTor.", module());
        this.merge(this.config, this.syncMarker(),
            this.syncMapPathsDescriptors());
        logger.info("Finished sync-run of module {} of CollecTor.", module());
      }
    } catch (Throwable th) { // Catching all (cf. above).
      logger.error("Sync-run of {} module failed: {}", module(), th.getMessage(), th);
    }
  }

  private boolean isSync() throws ConfigurationException {
    String key = this.syncMarker() + "Sources";
    return Key.has(key) && config.getSourceTypeSet(Key.valueOf(key))
        .contains(SourceType.Sync);
  }

  private boolean isSyncOnly() throws ConfigurationException {
    String key = this.syncMarker() + "Sources";
    return this.isSync() && config.getSourceTypeSet(Key.valueOf(key)).size() == 1;
  }

  /** Wrapper for <code>run</code>. */
@@ -81,11 +114,21 @@ public abstract class CollecTorMain implements Callable<Object>, Observer,
   */
  protected abstract void startProcessing() throws ConfigurationException;

  /**
   * Returns property prefix/infix/postfix for Sync related properties.
   */
  protected abstract String syncMarker();

  /**
   * Returns the module name for logging purposes.
   */
  public abstract String module();

  /** Returns map of path and descriptor type for download. */
  public Map<String, Class<? extends Descriptor>> syncMapPathsDescriptors() {
    return Collections.unmodifiableMap(mapPathDescriptors);
  }

  /**
   * Checks the available space for the storage the given path is located on and
   * logs a warning, if 200 MiB or less are available, and otherwise logs
Loading