Commit bdeaf7d4 authored by Nick Mathewson's avatar Nick Mathewson 🤹
Browse files

Code to manage publish/subscribe setup via subsystem interface.

This commit has the necessary logic to run the publish/subscribe
system from the mainloop, and to initialize it on startup and tear
it down later.
parent 24df14eb
Loading
Loading
Loading
Loading
+51 −1
Original line number Diff line number Diff line
@@ -5,9 +5,14 @@

#include "orconfig.h"
#include "app/main/subsysmgr.h"
#include "lib/err/torerr.h"

#include "lib/dispatch/dispatch_naming.h"
#include "lib/dispatch/msgtypes.h"
#include "lib/err/torerr.h"
#include "lib/log/log.h"
#include "lib/malloc/malloc.h"
#include "lib/pubsub/pubsub_build.h"
#include "lib/pubsub/pubsub_connect.h"

#include <stdio.h>
#include <stdlib.h>
@@ -105,6 +110,51 @@ subsystems_init_upto(int target_level)
  return 0;
}

/**
 * Add publish/subscribe relationships to <b>builder</b> for all
 * initialized subsystems of level no more than <b>target_level</b>.
 **/
int
subsystems_add_pubsub_upto(pubsub_builder_t *builder,
                           int target_level)
{
  for (unsigned i = 0; i < n_tor_subsystems; ++i) {
    const subsys_fns_t *sys = tor_subsystems[i];
    if (!sys->supported)
      continue;
    if (sys->level > target_level)
      break;
    if (! sys_initialized[i])
      continue;
    int r = 0;
    if (sys->add_pubsub) {
      subsys_id_t sysid = get_subsys_id(sys->name);
      raw_assert(sysid != ERROR_ID);
      pubsub_connector_t *connector;
      connector = pubsub_connector_for_subsystem(builder, sysid);
      r = sys->add_pubsub(connector);
      pubsub_connector_free(connector);
    }
    if (r < 0) {
      fprintf(stderr, "BUG: subsystem %s (at %u) could not connect to "
              "publish/subscribe system.", sys->name, sys->level);
      raw_assert_unreached_msg("A subsystem couldn't be connected.");
    }
  }

  return 0;
}

/**
 * Add publish/subscribe relationships to <b>builder</b> for all
 * initialized subsystems.
 **/
int
subsystems_add_pubsub(pubsub_builder_t *builder)
{
  return subsystems_add_pubsub_upto(builder, MAX_SUBSYS_LEVEL);
}

/**
 * Shut down all the subsystems.
 **/
+5 −0
Original line number Diff line number Diff line
@@ -14,6 +14,11 @@ extern const unsigned n_tor_subsystems;
int subsystems_init(void);
int subsystems_init_upto(int level);

struct pubsub_builder_t;
int subsystems_add_pubsub_upto(struct pubsub_builder_t *builder,
                               int target_level);
int subsystems_add_pubsub(struct pubsub_builder_t *builder);

void subsystems_shutdown(void);
void subsystems_shutdown_downto(int level);

+2 −0
Original line number Diff line number Diff line
@@ -22,6 +22,7 @@ LIBTOR_APP_A_SOURCES = \
	src/core/mainloop/connection.c		\
	src/core/mainloop/cpuworker.c		\
	src/core/mainloop/mainloop.c		\
	src/core/mainloop/mainloop_pubsub.c	\
	src/core/mainloop/netstatus.c		\
	src/core/mainloop/periodic.c		\
	src/core/or/address_set.c		\
@@ -213,6 +214,7 @@ noinst_HEADERS += \
	src/core/mainloop/connection.h			\
	src/core/mainloop/cpuworker.h			\
	src/core/mainloop/mainloop.h			\
	src/core/mainloop/mainloop_pubsub.h		\
	src/core/mainloop/netstatus.h			\
	src/core/mainloop/periodic.h			\
	src/core/or/addr_policy_st.h			\
+149 −0
Original line number Diff line number Diff line
/* Copyright (c) 2001, Matej Pfajfar.
 * Copyright (c) 2001-2004, Roger Dingledine.
 * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
 * Copyright (c) 2007-2018, The Tor Project, Inc. */
/* See LICENSE for licensing information */

#include "orconfig.h"

#include "src/core/or/or.h"
#include "src/core/mainloop/mainloop.h"
#include "src/core/mainloop/mainloop_pubsub.h"

#include "lib/container/smartlist.h"
#include "lib/dispatch/dispatch.h"
#include "lib/dispatch/dispatch_naming.h"
#include "lib/evloop/compat_libevent.h"
#include "lib/pubsub/pubsub.h"
#include "lib/pubsub/pubsub_build.h"

/**
 * Dispatcher to use for delivering messages.
 **/
static dispatch_t *the_dispatcher = NULL;
static pubsub_items_t *the_pubsub_items = NULL;
/**
 * A list of mainloop_event_t, indexed by channel ID, to flush the messages
 * on a channel.
 **/
static smartlist_t *alert_events = NULL;

/**
 * Mainloop event callback: flush all the messages in a channel.
 *
 * The channel is encoded as a pointer, and passed via arg.
 **/
static void
flush_channel_event(mainloop_event_t *ev, void *arg)
{
  (void)ev;
  if (!the_dispatcher)
    return;

  channel_id_t chan = (channel_id_t)(uintptr_t)(arg);
  dispatch_flush(the_dispatcher, chan, INT_MAX);
}

int
tor_mainloop_connect_pubsub(struct pubsub_builder_t *builder)
{
  int rv = -1;
  tor_mainloop_disconnect_pubsub();

  the_dispatcher = pubsub_builder_finalize(builder, &the_pubsub_items);
  if (! the_dispatcher)
    goto err;

  const size_t num_channels = get_num_channel_ids();
  alert_events = smartlist_new();
  for (size_t i = 0; i < num_channels; ++i) {
    smartlist_add(alert_events,
                  mainloop_event_postloop_new(flush_channel_event,
                                              (void*)(uintptr_t)(i)));
  }

  rv = 0;
 err:
  tor_mainloop_disconnect_pubsub();
  return rv;
}

/**
 * Dispatch alertfn callback: do nothing. Implements DELIV_NEVER.
 **/
static void
alertfn_never(dispatch_t *d, channel_id_t chan, void *arg)
{
  (void)d;
  (void)chan;
  (void)arg;
}

/**
 * Dispatch alertfn callback: activate a mainloop event. Implements
 * DELIV_PROMPT.
 **/
static void
alertfn_prompt(dispatch_t *d, channel_id_t chan, void *arg)
{
  (void)d;
  (void)chan;
  mainloop_event_t *event = arg;
  mainloop_event_activate(event);
}

/**
 * Dispatch alertfn callback: flush all messages right now. Implements
 * DELIV_IMMEDIATE.
 **/
static void
alertfn_immediate(dispatch_t *d, channel_id_t chan, void *arg)
{
  (void) arg;
  dispatch_flush(d, chan, INT_MAX);
}

/**
 * Set the strategy to be used for delivering messages on the named channel.
 **/
int
tor_mainloop_set_delivery_strategy(const char *msg_channel_name,
                                   deliv_strategy_t strategy)
{
  channel_id_t chan = get_channel_id(msg_channel_name);
  if (BUG(chan == ERROR_ID) ||
      BUG(chan >= smartlist_len(alert_events)))
    return -1;

  switch (strategy) {
    case DELIV_NEVER:
      dispatch_set_alert_fn(the_dispatcher, chan, alertfn_never, NULL);
      break;
    case DELIV_PROMPT:
      dispatch_set_alert_fn(the_dispatcher, chan, alertfn_prompt,
                            smartlist_get(alert_events, chan));
      break;
    case DELIV_IMMEDIATE:
      dispatch_set_alert_fn(the_dispatcher, chan, alertfn_immediate, NULL);
      break;
  }
  return 0;
}

/**
 * Remove all pubsub dispatchers and events from the mainloop.
 **/
void
tor_mainloop_disconnect_pubsub(void)
{
  if (the_pubsub_items) {
    pubsub_items_clear_bindings(the_pubsub_items);
    pubsub_items_free(the_pubsub_items);
  }
  if (alert_events) {
    SMARTLIST_FOREACH(alert_events, mainloop_event_t *, ev,
                      mainloop_event_free(ev));
    smartlist_free(alert_events);
  }
  dispatch_free(the_dispatcher);
}
+23 −0
Original line number Diff line number Diff line
/* Copyright (c) 2001, Matej Pfajfar.
 * Copyright (c) 2001-2004, Roger Dingledine.
 * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
 * Copyright (c) 2007-2018, The Tor Project, Inc. */
/* See LICENSE for licensing information */

#ifndef TOR_MAINLOOP_PUBSUB_H
#define TOR_MAINLOOP_PUBSUB_H

struct pubsub_builder_t;

typedef enum {
   DELIV_NEVER=0,
   DELIV_PROMPT,
   DELIV_IMMEDIATE,
} deliv_strategy_t;

int tor_mainloop_connect_pubsub(struct pubsub_builder_t *builder);
int tor_mainloop_set_delivery_strategy(const char *msg_channel_name,
                                        deliv_strategy_t strategy);
void tor_mainloop_disconnect_pubsub(void);

#endif
Loading