Unverified Commit 6da31ec4 authored by Nick Mathewson's avatar Nick Mathewson 🦀 Committed by Alexander Hansen Færøy
Browse files

consdiffmgr: Extract the code for compressing and storing

We're going to use this for consensuses too.
parent 7a3efe25
Loading
Loading
Loading
Loading
+99 −43
Original line number Diff line number Diff line
@@ -1071,6 +1071,91 @@ typedef struct compressed_result_t {
  size_t bodylen;
} compressed_result_t;

/**
 * Compress the bytestring <b>input</b> of length <b>len</b> using the
 * <n>n_methods</b> compression methods listed in the array <b>methods</b>.
 *
 * For each successful compression, set the fields in the <b>results_out</b>
 * array in the position corresponding to the compression method. Use
 * <b>labels_in</b> as a basis for the labels of the result.
 *
 * Return 0 if all compression succeeded; -1 if any failed.
 */
static int
compress_multiple(compressed_result_t *results_out, int n_methods,
                  const compress_method_t *methods,
                  const uint8_t *input, size_t len,
                  const config_line_t *labels_in)
{
  int rv = 0;
  int i;
  for (i = 0; i < n_methods; ++i) {
    compress_method_t method = methods[i];
    const char *methodname = compression_method_get_name(method);
    char *result;
    size_t sz;
    if (0 == tor_compress(&result, &sz, (const char*)input, len, method)) {
      results_out[i].body = (uint8_t*)result;
      results_out[i].bodylen = sz;
      results_out[i].labels = config_lines_dup(labels_in);
      cdm_labels_prepend_sha3(&results_out[i].labels, LABEL_SHA3_DIGEST,
                              results_out[i].body,
                              results_out[i].bodylen);
      config_line_prepend(&results_out[i].labels,
                          LABEL_COMPRESSION_TYPE,
                          methodname);
    } else {
      rv = -1;
    }
  }
  return rv;
}

/**
 * Given an array of <b>n</b> compressed_result_t in <b>results</b>,
 * as produced by compress_multiple, store them all into the
 * consdiffmgr, and store handles to them in the <b>handles_out</b>
 * array.
 *
 * Return CDM_DIFF_PRESENT if any was stored, and CDM_DIFF_ERROR if none
 * was stored.
 */
static cdm_diff_status_t
store_multiple(consensus_cache_entry_handle_t **handles_out,
               int n,
               const compress_method_t *methods,
               const compressed_result_t *results,
               const char *description)
{
  cdm_diff_status_t status = CDM_DIFF_ERROR;
  consdiffmgr_ensure_space_for_files(n);

  int i;
  for (i = 0; i < n; ++i) {
    compress_method_t method = methods[i];
    uint8_t *body_out = results[i].body;
    size_t bodylen_out = results[i].bodylen;
    config_line_t *labels = results[i].labels;
    const char *methodname = compression_method_get_name(method);
    if (body_out && bodylen_out && labels) {
      /* Success! Store the results */
      log_info(LD_DIRSERV, "Adding %s, compressed with %s",
               description, methodname);

      consensus_cache_entry_t *ent =
        consensus_cache_add(cdm_cache_get(),
                            labels,
                            body_out,
                            bodylen_out);

      status = CDM_DIFF_PRESENT;
      handles_out[i] = consensus_cache_entry_handle_new(ent);
      consensus_cache_entry_decref(ent);
    }
  }
  return status;
}

/**
 * An object passed to a worker thread that will try to produce a consensus
 * diff.
@@ -1235,24 +1320,10 @@ consensus_diff_worker_threadfn(void *state_, void *work_)
                          job->out[0].body,
                          job->out[0].bodylen);

  unsigned u;
  for (u = 1; u < n_diff_compression_methods(); ++u) {
    compress_method_t method = compress_diffs_with[u];
    const char *methodname = compression_method_get_name(method);
    char *result;
    size_t sz;
    if (0 == tor_compress(&result, &sz, consensus_diff, difflen, method)) {
      job->out[u].body = (uint8_t*)result;
      job->out[u].bodylen = sz;
      job->out[u].labels = config_lines_dup(common_labels);
      cdm_labels_prepend_sha3(&job->out[u].labels, LABEL_SHA3_DIGEST,
                              job->out[u].body,
                              job->out[u].bodylen);
      config_line_prepend(&job->out[u].labels,
                          LABEL_COMPRESSION_TYPE,
                          methodname);
    }
  }
  compress_multiple(job->out+1,
                    n_diff_compression_methods()-1,
                    compress_diffs_with+1,
                    (const uint8_t*)consensus_diff, difflen, common_labels);

  config_free_lines(common_labels);
  return WQ_RPL_REPLY;
@@ -1318,36 +1389,20 @@ consensus_diff_worker_replyfn(void *work_)
    cache = 0;
  }

  int status = CDM_DIFF_ERROR;
  consensus_cache_entry_handle_t *handles[ARRAY_LENGTH(compress_diffs_with)];
  memset(handles, 0, sizeof(handles));

  consdiffmgr_ensure_space_for_files(n_diff_compression_methods());
  char description[128];
  tor_snprintf(description, sizeof(description),
               "consensus diff from %s to %s",
               lv_from_digest, lv_to_digest);

  unsigned u;
  for (u = 0; u < n_diff_compression_methods(); ++u) {
    compress_method_t method = compress_diffs_with[u];
    uint8_t *body_out = job->out[u].body;
    size_t bodylen_out = job->out[u].bodylen;
    config_line_t *labels = job->out[u].labels;
    const char *methodname = compression_method_get_name(method);
    if (body_out && bodylen_out && labels) {
      /* Success! Store the results */
      log_info(LD_DIRSERV, "Adding consensus diff from %s to %s, "
               "compressed with %s",
               lv_from_digest, lv_to_digest, methodname);
  int status = store_multiple(handles,
                              n_diff_compression_methods(),
                              compress_diffs_with,
                              job->out,
                              description);

      consensus_cache_entry_t *ent =
        consensus_cache_add(cdm_cache_get(),
                            labels,
                            body_out,
                            bodylen_out);

      status = CDM_DIFF_PRESENT;
      handles[u] = consensus_cache_entry_handle_new(ent);
      consensus_cache_entry_decref(ent);
    }
  }
  if (status != CDM_DIFF_PRESENT) {
    /* Failure! Nothing to do but complain */
    log_warn(LD_DIRSERV,
@@ -1357,6 +1412,7 @@ consensus_diff_worker_replyfn(void *work_)
    status = CDM_DIFF_ERROR;
  }

  unsigned u;
  for (u = 0; u < ARRAY_LENGTH(handles); ++u) {
    compress_method_t method = compress_diffs_with[u];
    if (cache) {