Commit 6a43d26c authored by Hiro's avatar Hiro 🏄
Browse files

Resolve "Missing data for bridge users in China (and other countries)"

parent 49339cf1
Loading
Loading
Loading
Loading
+88 −44
Original line number Diff line number Diff line
@@ -214,32 +214,48 @@ DECLARE
  -- which is why we're storing the updated values.
  undo_end TIMESTAMP WITHOUT TIME ZONE;
  undo_val DOUBLE PRECISION;
  merged_part_name TEXT;
  current_timestamp BIGINT;

BEGIN
  RAISE NOTICE '% Starting to merge.', timeofday();

  RAISE NOTICE '% Creating temporary merged_part table.', timeofday();

  SELECT EXTRACT(EPOCH FROM current_timestamp)::bigint INTO current_timestamp;
  -- Generate a unique name for merged_part table into the variable
  -- merged_part_name

  merged_part_name := 'pg_temp.merged_part_name_' || TO_CHAR(current_timestamp, 'YYYYMMDDHH24MISSUS');


  -- Materialize a temporary merged_part table that only contains dates of
  -- newly imported rows for the query below.
  CREATE TEMPORARY TABLE merged_part AS
  -- We use the variable name merged_part_name
  CREATE TEMPORARY TABLE IF NOT EXISTS merged_part_name AS
  SELECT * FROM merged
  WHERE DATE(merged.stats_start) IN (
    SELECT DISTINCT DATE(stats_start) FROM imported);


  -- Explicitly lock the temporary table
  LOCK TABLE merged_part_name IN SHARE MODE;

  RAISE NOTICE '% Joining imported and merged_part tables.', timeofday();

  -- Loop over results from a query that joins new entries in the imported
  -- table with existing entries in the merged_part table.
  -- The table name is now unique and contained in the mergerd_part_name
  -- variable generated above.
  FOR cur IN SELECT DISTINCT

    -- Select id, interval start and end, and value of the existing entry
    -- in merged_part; all these fields may be null if the imported entry
    -- is not adjacent to an existing one.
    merged_part.id AS merged_id,
    merged_part.stats_start AS merged_start,
    merged_part.stats_end AS merged_end,
    merged_part.val AS merged_val,
    merged_part_name.id AS merged_id,
    merged_part_name.stats_start AS merged_start,
    merged_part_name.stats_end AS merged_end,
    merged_part_name.val AS merged_val,

    -- Select interval start and end and value of the newly imported
    -- entry.
@@ -263,30 +279,31 @@ BEGIN
    -- that we handle the same imported entry twice, if it starts directly
    -- after one existing entry and ends directly before another existing
    -- entry.
    FROM imported LEFT JOIN merged_part
    -- The table name is now substituted via the variable merged_part_name.
    FROM imported LEFT JOIN merged_part_name

    -- First two join conditions are to find adjacent intervals.  In fact,
    -- we also include overlapping intervals here, so that we can skip the
    -- overlapping entry in the imported table.
    ON imported.stats_end >= merged_part.stats_start AND
       imported.stats_start <= merged_part.stats_end AND
    ON imported.stats_end >= merged_part_name.stats_start AND
       imported.stats_start <= merged_part_name.stats_end AND

       -- Further join conditions are same date, fingerprint, node, etc.,
       -- so that we don't merge entries that don't belong together.
       DATE(imported.stats_start) = DATE(merged_part.stats_start) AND
       imported.fingerprint = merged_part.fingerprint AND
       imported.nickname = merged_part.nickname AND
       imported.node = merged_part.node AND
       imported.metric = merged_part.metric AND
       imported.country = merged_part.country AND
       imported.transport = merged_part.transport AND
       imported.version = merged_part.version
       DATE(imported.stats_start) = DATE(merged_part_name.stats_start) AND
       imported.fingerprint = merged_part_name.fingerprint AND
       imported.nickname = merged_part_name.nickname AND
       imported.node = merged_part_name.node AND
       imported.metric = merged_part_name.metric AND
       imported.country = merged_part_name.country AND
       imported.transport = merged_part_name.transport AND
       imported.version = merged_part_name.version

    -- Ordering is key, or our approach to merge subsequent entries is
    -- going to break.
    ORDER BY imported.fingerprint, imported.nickname, imported.node,
             imported.metric, imported.country, imported.transport,
             imported.version, imported.stats_start, merged_part.stats_start,
             imported.version, imported.stats_start, merged_part_name.stats_start,
             imported.stats_end

  -- Now go through the results one by one.
@@ -440,8 +457,10 @@ BEGIN
    END IF;
  END LOOP;

  RAISE NOTICE '% Deleting temporary table merged_part.', timeofday();
  DROP TABLE merged_part;
  -- Log that we're done with the query and about to start merging.
  RAISE NOTICE '% Deleting temporary table %', timeofday(),  merged_part_name;
  -- Drop the temporary table
  EXECUTE 'DROP TABLE IF EXISTS ' || merged_part_name;

  -- That's it, we're done merging.
  RAISE NOTICE '% Finishing merge.', timeofday();
@@ -455,8 +474,17 @@ $$ LANGUAGE plpgsql;
-- the dates to be updated, and finally inserts newly computed aggregates
-- for these dates.
CREATE OR REPLACE FUNCTION aggregate() RETURNS VOID AS $$
DECLARE
  current_timestamp BIGINT;
  update_temp_name TEXT;
  update_no_dimensions_temp_name TEXT;

BEGIN
  RAISE NOTICE '% Starting aggregate step.', timeofday();
  SELECT EXTRACT(EPOCH FROM current_timestamp)::bigint INTO current_timestamp;
  -- Generate a unique name for merged_part
  update_temp_name := 'pg_temp.update_temp_name_' || TO_CHAR(current_timestamp, 'YYYYMMDDHH24MISSUS');
  update_no_dimensions_temp_name := 'pg_temp.update_no_dimensions_temp_name_' || TO_CHAR(current_timestamp, 'YYYYMMDDHH24MISSUS');

  -- Create a new temporary table containing all relevant information
  -- needed to update the aggregated table.  In this table, we sum up all
@@ -464,7 +492,7 @@ BEGIN
  -- (temporarily) materialized, because we need to combine its entries
  -- multiple times in various ways.  A (non-materialized) view would have
  -- meant to re-compute this query multiple times.
  CREATE TEMPORARY TABLE update AS
  CREATE TEMPORARY TABLE update_temp_name AS
    SELECT fingerprint, nickname, node, metric, country, transport, version,
           DATE(stats_start), SUM(val) AS val,
           SUM(CAST(EXTRACT(EPOCH FROM stats_end - stats_start)
@@ -475,9 +503,12 @@ BEGIN
    GROUP BY fingerprint, nickname, node, metric, country, transport, version,
             DATE(stats_start);

  -- Explicitly lock the temporary table
  LOCK TABLE update_temp_name IN SHARE MODE;

  -- Delete all entries from the aggregated table that we're about to
  -- re-compute.
  DELETE FROM aggregated WHERE date IN (SELECT DISTINCT date FROM update);
  DELETE FROM aggregated WHERE date IN (SELECT DISTINCT date FROM update_temp_name);

  -- Insert partly empty results for all existing combinations of date,
  -- node ('relay' or 'bridge'), country, transport, and version.  Only
@@ -488,18 +519,20 @@ BEGIN
      nrx)
    SELECT date, node, country, transport, version, SUM(val) AS rrx,
    SUM(seconds) AS nrx
    FROM update WHERE metric = 'responses'
    FROM update_temp_name WHERE metric = 'responses'
    GROUP BY date, node, country, transport, version;

  -- Create another temporary table with only those entries that aren't
  -- broken down by any dimension.  This table is much smaller, so the
  -- following operations are much faster.
  CREATE TEMPORARY TABLE update_no_dimensions AS
    SELECT fingerprint, nickname, node, metric, date, val, seconds FROM update
  CREATE TEMPORARY TABLE update_no_dimensions_temp_name AS
    SELECT fingerprint, nickname, node, metric, date, val, seconds FROM update_temp_name
    WHERE country = ''
    AND transport = ''
    AND version = '';

  -- Explicitly lock the temporary table
  LOCK TABLE update_no_dimensions_temp_name IN SHARE MODE;
  -- Update results in the aggregated table by setting aggregates based
  -- on reported directory bytes.  These aggregates are only based on
  -- date and node, so that the same values are set for all combinations
@@ -508,7 +541,7 @@ BEGIN
    SET hh = aggregated_bytes.hh, nh = aggregated_bytes.nh
    FROM (
      SELECT date, node, SUM(val) AS hh, SUM(seconds) AS nh
      FROM update_no_dimensions
      FROM update_no_dimensions_temp_name
      WHERE metric = 'bytes'
      GROUP BY date, node
    ) aggregated_bytes
@@ -520,7 +553,7 @@ BEGIN
    SET nn = aggregated_status.nn
    FROM (
      SELECT date, node, SUM(seconds) AS nn
      FROM update_no_dimensions
      FROM update_no_dimensions_temp_name
      WHERE metric = 'status'
      GROUP BY date, node
    ) aggregated_status
@@ -534,8 +567,8 @@ BEGIN
      SELECT bytes.date, bytes.node,
             SUM((LEAST(bytes.seconds, responses.seconds)
                 * bytes.val) / bytes.seconds) AS hrh
      FROM update_no_dimensions bytes
      LEFT JOIN update_no_dimensions responses
      FROM update_no_dimensions_temp_name bytes
      LEFT JOIN update_no_dimensions_temp_name responses
      ON bytes.date = responses.date
      AND bytes.fingerprint = responses.fingerprint
      AND bytes.nickname = responses.nickname
@@ -555,8 +588,8 @@ BEGIN
      SELECT responses.date, responses.node,
             SUM(GREATEST(0, responses.seconds
                             - COALESCE(bytes.seconds, 0))) AS nrh
      FROM update_no_dimensions responses
      LEFT JOIN update_no_dimensions bytes
      FROM update_no_dimensions_temp_name responses
      LEFT JOIN update_no_dimensions_temp_name bytes
      ON responses.date = bytes.date
      AND responses.fingerprint = bytes.fingerprint
      AND responses.nickname = bytes.nickname
@@ -568,9 +601,9 @@ BEGIN
    WHERE aggregated.date = aggregated_responses_bytes.date
    AND aggregated.node = aggregated_responses_bytes.node;

    RAISE NOTICE '% Deleting temporary tables update_no_dimensions and update.', timeofday();
    DROP TABLE update_no_dimensions;
    DROP TABLE update;
    RAISE NOTICE '% Deleting temporary tables % %', timeofday(), update_no_dimensions_temp_name, update_temp_name;
    DROP TABLE update_no_dimensions_temp_name;
    DROP TABLE update_temp_name;

  -- We're done aggregating new data.
  RAISE NOTICE '% Finishing aggregate step.', timeofday();
@@ -586,16 +619,22 @@ $$ LANGUAGE plpgsql;
-- response numbers for the dates to be updated, and finally inserts newly
-- combined response numbers for these dates.
CREATE OR REPLACE FUNCTION combine() RETURNS VOID AS $$
DECLARE
  current_timestamp BIGINT;
  update2_temp_name TEXT;
BEGIN
  RAISE NOTICE '% Starting combine step.', timeofday();
  SELECT EXTRACT(EPOCH FROM current_timestamp)::bigint INTO current_timestamp;
  -- Generate a unique name for update_temp_name
  update2_temp_name := 'pg_temp.update_temp_name_' || TO_CHAR(current_timestamp, 'YYYYMMDDHH24MISSUS');

  RAISE NOTICE '% Starting combine step.', timeofday();
  -- Create a new temporary table containing all relevant information
  -- needed to update the combined_country_transport table.  In this
  -- table, we sum up all responses by reporting node.  This query is
  -- (temporarily) materialized, because we need to combine its entries
  -- multiple times in various ways.  A (non-materialized) view would have
  -- meant to re-compute this query multiple times.
  CREATE TEMPORARY TABLE update2 AS
  CREATE TEMPORARY TABLE update2_temp_name AS
    SELECT fingerprint, nickname, country, transport,
           DATE(stats_start) AS date, SUM(val) AS val
    FROM merged
@@ -607,10 +646,15 @@ BEGIN
        SELECT DISTINCT DATE(stats_start) FROM imported)
    GROUP BY fingerprint, nickname, country, transport, date;

  -- Explicitly lock the temporary table
  LOCK TABLE update2_temp_name IN SHARE MODE;

  -- Delete all entries from the combined table that we're about to
  -- re-compute.
  -- Explicitly lock the table
  LOCK TABLE combined_country_transport IN SHARE MODE;
  DELETE FROM combined_country_transport
  WHERE date IN (SELECT DISTINCT date FROM update2);
  WHERE date IN (SELECT DISTINCT date FROM update2_temp_name);

  -- Combine each country with each transport that a bridge reported
  -- responses for and also consider total responses reported by the
@@ -623,9 +667,9 @@ BEGIN
           SUM(GREATEST(0, transport.val + country.val - total.val))
             AS low,
           SUM(LEAST(transport.val, country.val)) AS high
    FROM update2 country,
         update2 transport,
         update2 total
    FROM update2_temp_name country,
         update2_temp_name transport,
         update2_temp_name total
    WHERE country.country <> ''
    AND transport.transport <> ''
    AND total.country = ''
@@ -644,8 +688,8 @@ BEGIN
    AND transport.nickname = total.nickname
    GROUP BY country.date, country.country, transport.transport;

  RAISE NOTICE '% Deleting temporary table update2.', timeofday();
  DROP TABLE update2;
  RAISE NOTICE '% Deleting temporary table %', timeofday(), update2_temp_name;
  DROP TABLE update2_temp_name;

  -- We're done combining new data.
  RAISE NOTICE '% Finishing combine step.', timeofday();