Commit d438cf1e authored by Andrea Shepard's avatar Andrea Shepard
Browse files

Implement scheduler mechanism to track lists of channels wanting cells or...

Implement scheduler mechanism to track lists of channels wanting cells or writes; doesn't actually drive the cell flow from it yet
parent 1987157d
......@@ -97,8 +97,10 @@
#define LD_HEARTBEAT (1u<<20)
/** Abstract channel_t code */
#define LD_CHANNEL (1u<<21)
/** Scheduler */
#define LD_SCHED (1u<<22)
/** Number of logging domains in the code. */
#define N_LOGGING_DOMAINS 22
#define N_LOGGING_DOMAINS 23
/** This log message is not safe to send to a callback-based logger
* immediately. Used as a flag, not a log domain. */
......
......@@ -63,6 +63,7 @@ LIBTOR_OBJECTS = \
routerlist.obj \
routerparse.obj \
routerset.obj \
scheduler.obj \
statefile.obj \
status.obj \
transports.obj
......
......@@ -29,6 +29,7 @@
#include "rephist.h"
#include "router.h"
#include "routerlist.h"
#include "scheduler.h"
/* Cell queue structure */
......@@ -788,6 +789,9 @@ channel_free(channel_t *chan)
"Freeing channel " U64_FORMAT " at %p",
U64_PRINTF_ARG(chan->global_identifier), chan);
/* Get this one out of the scheduler */
scheduler_release_channel(chan);
/*
* Get rid of cmux policy before we do anything, so cmux policies don't
* see channels in weird half-freed states.
......@@ -863,6 +867,9 @@ channel_force_free(channel_t *chan)
"Force-freeing channel " U64_FORMAT " at %p",
U64_PRINTF_ARG(chan->global_identifier), chan);
/* Get this one out of the scheduler */
scheduler_release_channel(chan);
/*
* Get rid of cmux policy before we do anything, so cmux policies don't
* see channels in weird half-freed states.
......@@ -1941,6 +1948,18 @@ channel_change_state(channel_t *chan, channel_state_t to_state)
}
}
/*
* If we're going to a closed/closing state, we don't need scheduling any
* more; in CHANNEL_STATE_MAINT we can't accept writes.
*/
if (to_state == CHANNEL_STATE_CLOSING ||
to_state == CHANNEL_STATE_CLOSED ||
to_state == CHANNEL_STATE_ERROR) {
scheduler_release_channel(chan);
} else if (to_state == CHANNEL_STATE_MAINT) {
scheduler_channel_doesnt_want_writes(chan);
}
/* Tell circuits if we opened and stuff */
if (to_state == CHANNEL_STATE_OPEN) {
channel_do_open_actions(chan);
......
......@@ -25,6 +25,7 @@
#include "relay.h"
#include "router.h"
#include "routerlist.h"
#include "scheduler.h"
/** How many CELL_PADDING cells have we received, ever? */
uint64_t stats_n_padding_cells_processed = 0;
......@@ -867,6 +868,10 @@ channel_tls_handle_state_change_on_orconn(channel_tls_t *chan,
* CHANNEL_STATE_MAINT on this.
*/
channel_change_state(base_chan, CHANNEL_STATE_OPEN);
/* We might have just become writeable; check and tell the scheduler */
if (connection_or_num_cells_writeable(conn) > 0) {
scheduler_channel_wants_writes(base_chan);
}
} else {
/*
* Not open, so from CHANNEL_STATE_OPEN we go to CHANNEL_STATE_MAINT,
......
......@@ -38,6 +38,8 @@
#include "router.h"
#include "routerlist.h"
#include "ext_orport.h"
#include "scheduler.h"
#ifdef USE_BUFFEREVENTS
#include <event2/bufferevent_ssl.h>
#endif
......@@ -595,6 +597,17 @@ connection_or_flushed_some(or_connection_t *conn)
* high water mark. */
datalen = connection_get_outbuf_len(TO_CONN(conn));
if (datalen < OR_CONN_LOWWATER) {
/* Let the scheduler know */
scheduler_channel_wants_writes(TLS_CHAN_TO_BASE(conn->chan));
/*
* TODO this will be done from the scheduler, so it will
* need a generic way to ask how many cells a channel can
* accept and if it still wants writes or not to know how
* to account for it in the case that it runs out of cells
* to send first.
*/
while ((conn->chan) && channel_tls_more_to_flush(conn->chan)) {
/* Compute how many more cells we want at most */
n = CEIL_DIV(OR_CONN_HIGHWATER - datalen, cell_network_size);
......@@ -616,6 +629,30 @@ connection_or_flushed_some(or_connection_t *conn)
return 0;
}
/** Report how many cells <b>conn</b> could accept right now, for the
 * benefit of channeltls.c.
 *
 * The answer is the number of cells (rounded up) that fit between the
 * current outbuf length and OR_CONN_HIGHWATER, or 0 if the outbuf is
 * already at or above the high water mark.  Note that this uses the high
 * water mark, unlike connection_or_flushed_some(), which uses the low
 * water mark to decide when to start writing again. */
ssize_t
connection_or_num_cells_writeable(or_connection_t *conn)
{
  size_t outbuf_len, cell_size;

  tor_assert(conn);

  outbuf_len = connection_get_outbuf_len(TO_CONN(conn));

  /* Already full: no room for any more cells. */
  if (outbuf_len >= OR_CONN_HIGHWATER)
    return 0;

  cell_size = get_cell_network_size(conn->wide_circ_ids);
  return CEIL_DIV(OR_CONN_HIGHWATER - outbuf_len, cell_size);
}
/** Connection <b>conn</b> has finished writing and has no bytes left on
* its outbuf.
*
......
......@@ -24,6 +24,7 @@ void connection_or_set_bad_connections(const char *digest, int force);
void connection_or_block_renegotiation(or_connection_t *conn);
int connection_or_reached_eof(or_connection_t *conn);
int connection_or_process_inbuf(or_connection_t *conn);
ssize_t connection_or_num_cells_writeable(or_connection_t *conn);
int connection_or_flushed_some(or_connection_t *conn);
int connection_or_finished_flushing(or_connection_t *conn);
int connection_or_finished_connecting(or_connection_t *conn);
......
......@@ -80,6 +80,7 @@ LIBTOR_A_SOURCES = \
src/or/routerlist.c \
src/or/routerparse.c \
src/or/routerset.c \
src/or/scheduler.c \
src/or/statefile.c \
src/or/status.c \
$(evdns_source) \
......@@ -185,6 +186,7 @@ ORHEADERS = \
src/or/routerlist.h \
src/or/routerset.h \
src/or/routerparse.h \
src/or/scheduler.h \
src/or/statefile.h \
src/or/status.h
......
......@@ -52,6 +52,7 @@
#include "router.h"
#include "routerlist.h"
#include "routerparse.h"
#include "scheduler.h"
#include "statefile.h"
#include "status.h"
#include "util_process.h"
......@@ -2455,6 +2456,14 @@ tor_init(int argc, char *argv[])
log_warn(LD_NET, "Problem initializing libevent RNG.");
}
/*
* Initialize the scheduler - this has to come after
* options_init_from_torrc() sets up libevent - why yes, that seems
* completely sensible to hide the libevent setup in the option parsing
* code!
*/
scheduler_init();
return 0;
}
......@@ -2552,6 +2561,7 @@ tor_free_all(int postfork)
channel_tls_free_all();
channel_free_all();
connection_free_all();
scheduler_free_all();
buf_shrink_freelists(1);
memarea_clear_freelist();
nodelist_free_all();
......
......@@ -39,6 +39,7 @@
#include "router.h"
#include "routerlist.h"
#include "routerparse.h"
#include "scheduler.h"
static edge_connection_t *relay_lookup_conn(circuit_t *circ, cell_t *cell,
cell_direction_t cell_direction,
......@@ -2868,6 +2869,10 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan,
log_debug(LD_GENERAL, "Made a circuit active.");
}
/* New way: mark this as having waiting cells for the scheduler */
scheduler_channel_has_waiting_cells(chan);
/* TODO remove this once scheduler does it */
if (!channel_has_queued_writes(chan)) {
/* There is no data at all waiting to be sent on the outbuf. Add a
* cell, so that we can notice when it gets flushed, flushed_some can
......
/* * Copyright (c) 2013, The Tor Project, Inc. */
/* See LICENSE for licensing information */
/**
* \file scheduler.c
* \brief Relay scheduling system
**/
#include "or.h"
#include "channel.h"
#include "compat_libevent.h"
#include "scheduler.h"
#ifdef HAVE_EVENT2_EVENT_H
#include <event2/event.h>
#else
#include <event.h>
#endif
/*
* Write scheduling works by keeping track of lists of channels that can
* accept cells, and have cells to write. From the scheduler's perspective,
* a channel can be in four possible states:
*
* 1.) Not open for writes, no cells to send
* - Not much to do here, and the channel will appear in neither list.
* - Transitions from:
* - Open for writes/has cells by simultaneously draining all circuit
* queues and filling the output buffer.
* - Transitions to:
* - Not open for writes/has cells by arrival of cells on an attached
* circuit (this would be driven from append_cell_to_circuit_queue())
* - Open for writes/no cells by a channel type specific path;
* driven from connection_or_flushed_some() for channel_tls_t.
*
* 2.) Open for writes, no cells to send
* - Not much here either; this will be the state an idle but open channel
* can be expected to settle in.
* - Transitions from:
* - Not open for writes/no cells by flushing some of the output
* buffer.
* - Open for writes/has cells by the scheduler moving cells from
* circuit queues to channel output queue, but not having enough
* to fill the output queue.
* - Transitions to:
* - Open for writes/has cells by arrival of new cells on an attached
* circuit, in append_cell_to_circuit_queue()
*
* 3.) Not open for writes, cells to send
* - This is the state of a busy circuit limited by output bandwidth;
* cells have piled up in the circuit queues waiting to be relayed.
* - Transitions from:
* - Not open for writes/no cells by arrival of cells on an attached
* circuit
* - Open for writes/has cells by filling an output buffer without
* draining all cells from attached circuits
* - Transitions to:
* - Open for writes/has cells by draining some of the output buffer
* via the connection_or_flushed_some() path (for channel_tls_t).
*
* 4.) Open for writes, cells to send
* - This connection is ready to relay some cells and waiting for
* the scheduler to choose it
* - Transitions from:
* - Not open for writes/has cells by the connection_or_flushed_some()
* path
* - Open for writes/no cells by the append_cell_to_circuit_queue()
* path
* - Transitions to:
* - Not open for writes/no cells by draining all circuit queues and
* simultaneously filling the output buffer.
* - Not open for writes/has cells by writing enough cells to fill the
* output buffer
* - Open for writes/no cells by draining all attached circuit queues
* without also filling the output buffer
*
* Other event-driven parts of the code move channels between these scheduling
* states by calling scheduler functions; the scheduler only runs on open-for-
* writes/has-cells channels and is the only path for those to transition to
* other states. The scheduler_run() function gives us the opportunity to do
* scheduling work, and is called from other scheduler functions whenever a
* state transition occurs, and periodically from the main event loop.
*/
/* Scheduler global data structures */
/*
* We keep lists of channels that either have cells queued, can accept
* writes, or both (states 2, 3 and 4 above) - no explicit list of state
* 1 channels is kept, so we don't have to worry about registering new
* channels here or anything. The scheduler will learn about them when
* it needs to. We can check how many channels in state 4 in O(1), so
* the test whether we have anything to do in scheduler_run() is fast
* and there's no harm in calling it opportunistically whenever we get
* the chance.
*
* Note that it takes time O(n) to search for a channel in these smartlists
* or move one; I don't think the number of channels on a relay will be large
* enough for this to be a severe problem, but this would benefit from using
* a doubly-linked list rather than smartlist_t, together with a hash map from
* channel identifiers to pointers to list entries, so we can perform those
* operations in O(log(n)).
*/
/*
 * The three lists below are created in scheduler_init() and freed in
 * scheduler_free_all(); the scheduler_channel_*() functions move channels
 * between them, and scheduler_release_channel() removes a channel from all
 * of them.
 */
/* List of channels that can write but have no cells (state 2 above) */
static smartlist_t *channels_waiting_for_cells = NULL;
/* List of channels with cells waiting to write (state 3 above) */
static smartlist_t *channels_waiting_to_write = NULL;
/* List of channels that can write and have cells (state 4 above; this is
 * the pending work that scheduler_run() looks at) */
static smartlist_t *channels_pending = NULL;
/*
 * This event runs the scheduler from its callback, and is manually
 * activated whenever a channel enters open for writes/cells to send.
 */
static struct event *run_sched_ev = NULL;
/* Timeout handed to evtimer_add() when arming run_sched_ev;
 * scheduler_trigger() sets it to zero. */
static struct timeval run_sched_tv;
/* Scheduler static function declarations */
static void scheduler_evt_callback(evutil_socket_t fd,
short events, void *arg);
static int scheduler_more_work(void);
static void scheduler_retrigger(void);
static void scheduler_trigger(void);
/* Scheduler function implementations */
/** Shut the scheduling system down: cancel and free the scheduler event,
 * then free each of the three channel lists.  Every global is reset to
 * NULL so that scheduler_init() may be called again afterwards. */
void
scheduler_free_all(void)
{
  log_debug(LD_SCHED, "Shutting down scheduler");

  /* Make sure the event can't fire before we release it. */
  if (run_sched_ev != NULL) {
    event_del(run_sched_ev);
    tor_event_free(run_sched_ev);
    run_sched_ev = NULL;
  }

  if (channels_pending != NULL) {
    smartlist_free(channels_pending);
    channels_pending = NULL;
  }

  if (channels_waiting_to_write != NULL) {
    smartlist_free(channels_waiting_to_write);
    channels_waiting_to_write = NULL;
  }

  if (channels_waiting_for_cells != NULL) {
    smartlist_free(channels_waiting_for_cells);
    channels_waiting_for_cells = NULL;
  }
}
/**
 * Scheduler event callback; this should get triggered once per event loop
 * if any scheduling work was created during the event loop.
 *
 * Runs the scheduler once and, if channels are still pending afterwards,
 * re-arms the event so we get called again.
 *
 * The <b>fd</b>, <b>events</b> and <b>arg</b> parameters are demanded by
 * the callback signature but are not used here.
 */
static void
scheduler_evt_callback(evutil_socket_t fd, short events, void *arg)
{
  /* Silence unused-parameter warnings. */
  (void)fd;
  (void)events;
  (void)arg;

  log_debug(LD_SCHED, "Scheduler event callback called");

  tor_assert(run_sched_ev);

  /* Run the scheduler */
  scheduler_run();

  /* Do we have more work to do? */
  if (scheduler_more_work())
    scheduler_retrigger();
}
/** Tell the scheduler that <b>chan</b> can no longer accept writes.
 *
 * A channel in channels_pending moves to channels_waiting_to_write; a
 * channel in channels_waiting_for_cells is dropped from that list; a
 * channel in neither of those lists is left alone. */
void
scheduler_channel_doesnt_want_writes(channel_t *chan)
{
  tor_assert(chan);
  tor_assert(channels_waiting_for_cells);
  tor_assert(channels_waiting_to_write);
  tor_assert(channels_pending);

  if (smartlist_contains(channels_pending, chan)) {
    /*
     * Pending channels are expected to be in no other list.  This one
     * still has cells but can't write any more, so it becomes
     * waiting_to_write.
     */
    smartlist_remove(channels_pending, chan);
    smartlist_add(channels_waiting_to_write, chan);
    log_debug(LD_SCHED,
              "Channel " U64_FORMAT " at %p went from pending "
              "to waiting_to_write",
              U64_PRINTF_ARG(chan->global_identifier), chan);
    return;
  }

  /*
   * Not pending, so it cannot become waiting_to_write.  If it isn't
   * waiting_for_cells either there is nothing to do.
   */
  if (!smartlist_contains(channels_waiting_for_cells, chan))
    return;

  /* It was writeable with no cells; now it is not writeable at all. */
  smartlist_remove(channels_waiting_for_cells, chan);
  log_debug(LD_SCHED,
            "Channel " U64_FORMAT " at %p left waiting_for_cells",
            U64_PRINTF_ARG(chan->global_identifier), chan);
}
/** Tell the scheduler that <b>chan</b> has cells waiting to be sent.
 *
 * A channel in channels_waiting_for_cells moves to channels_pending and the
 * scheduler event is retriggered; a channel in no list at all is added to
 * channels_waiting_to_write; a channel already in channels_waiting_to_write
 * or channels_pending is left where it is. */
void
scheduler_channel_has_waiting_cells(channel_t *chan)
{
  tor_assert(chan);
  tor_assert(channels_waiting_for_cells);
  tor_assert(channels_waiting_to_write);
  tor_assert(channels_pending);

  /* First, check if this one is also writeable */
  if (smartlist_contains(channels_waiting_for_cells, chan)) {
    /*
     * Channels in waiting_for_cells are expected to be in no other list.
     * Now that it has cells as well, it becomes pending.
     */
    smartlist_remove(channels_waiting_for_cells, chan);
    smartlist_add(channels_pending, chan);
    log_debug(LD_SCHED,
              "Channel " U64_FORMAT " at %p went from waiting_for_cells "
              "to pending",
              U64_PRINTF_ARG(chan->global_identifier), chan);

    /* A newly pending channel means there may be scheduling work. */
    scheduler_retrigger();
    return;
  }

  /* Already known to have cells?  Then nothing changes. */
  if (smartlist_contains(channels_waiting_to_write, chan) ||
      smartlist_contains(channels_pending, chan))
    return;

  /* Not in any list yet: it has cells but can't write. */
  smartlist_add(channels_waiting_to_write, chan);
  log_debug(LD_SCHED,
            "Channel " U64_FORMAT " at %p entered waiting_to_write",
            U64_PRINTF_ARG(chan->global_identifier), chan);
}
/** Bring up the scheduling system: allocate the three (initially empty)
 * channel lists and create the event whose callback runs the scheduler.
 * Asserts that the scheduler event does not already exist. */
void
scheduler_init(void)
{
  log_debug(LD_SCHED, "Initting scheduler");

  tor_assert(!run_sched_ev);

  /* Start out with no channels in any scheduling state. */
  channels_pending = smartlist_new();
  channels_waiting_to_write = smartlist_new();
  channels_waiting_for_cells = smartlist_new();

  /* No fd and no event flags: this event only runs when we arm it. */
  run_sched_ev = tor_event_new(tor_libevent_get_base(), -1,
                               0, scheduler_evt_callback, NULL);
}
/** Return 1 if at least one channel is in channels_pending (i.e. there is
 * scheduling work left to do), or 0 if the list is empty. */
static int
scheduler_more_work(void)
{
  int n_pending;

  tor_assert(channels_pending);

  n_pending = smartlist_len(channels_pending);
  return n_pending > 0;
}
/** Retrigger the scheduler in a way safe to use from the callback.
 *
 * If the scheduler event is already pending this does nothing; otherwise
 * the event is removed and re-added with the run_sched_tv timeout.  A
 * failure to re-add the event is logged, since it would otherwise leave
 * pending channels waiting with nothing to run the scheduler. */
static void
scheduler_retrigger(void)
{
  tor_assert(run_sched_ev);

  /* Already armed; it will fire without our help. */
  if (evtimer_pending(run_sched_ev, NULL))
    return;

  log_debug(LD_SCHED, "Retriggering scheduler event");

  event_del(run_sched_ev);
  if (evtimer_add(run_sched_ev, &run_sched_tv) < 0) {
    log_warn(LD_SCHED, "Couldn't re-add scheduler event");
  }
}
/** Forget about <b>chan</b> entirely: remove it from every scheduler list.
 * This is called when a channel is being freed or is moving to a closing,
 * closed or error state. */
void
scheduler_release_channel(channel_t *chan)
{
  smartlist_t *lists[3];
  int i;

  tor_assert(chan);
  tor_assert(channels_waiting_for_cells);
  tor_assert(channels_waiting_to_write);
  tor_assert(channels_pending);

  lists[0] = channels_waiting_for_cells;
  lists[1] = channels_waiting_to_write;
  lists[2] = channels_pending;

  /* We don't check which list it is in; just remove it from each. */
  for (i = 0; i < 3; ++i) {
    smartlist_remove(lists[i], chan);
  }
}
/** Run the scheduling algorithm if necessary.
 *
 * For now this is only a placeholder: it swaps channels_pending for a
 * fresh empty list, logs each channel that was pending, and frees the old
 * list.  No cells are actually moved, and the channels it saw end up in
 * none of the scheduler lists. */
void
scheduler_run(void)
{
  smartlist_t *drained = NULL;

  log_debug(LD_SCHED, "We have a chance to run the scheduler");

  /* Like every other entry point, insist that the scheduler is set up. */
  tor_assert(channels_pending);

  /*
   * TODO make this work properly
   *
   * For now, just empty the pending list and log that we saw stuff in it
   */
  drained = channels_pending;
  channels_pending = smartlist_new();

  SMARTLIST_FOREACH_BEGIN(drained, channel_t *, chan) {
    log_debug(LD_SCHED,
              "Scheduler saw pending channel " U64_FORMAT " at %p",
              U64_PRINTF_ARG(chan->global_identifier), chan);
  } SMARTLIST_FOREACH_END(chan);

  smartlist_free(drained);
}
/** Trigger the scheduling event so we run the scheduler later.
 *
 * Resets run_sched_tv to a zero timeout and adds the scheduler event with
 * it.  A failure to add the event is logged rather than silently ignored,
 * because nothing else would run the scheduler in that case. */
static void
scheduler_trigger(void)
{
  log_debug(LD_SCHED, "Triggering scheduler event");

  tor_assert(run_sched_ev);

  /* Zero timeout: fire as soon as the event loop gets to it. */
  run_sched_tv.tv_sec = 0;
  run_sched_tv.tv_usec = 0;

  if (evtimer_add(run_sched_ev, &run_sched_tv) < 0) {
    log_warn(LD_SCHED, "Couldn't add scheduler event");
  }
}
/** Tell the scheduler that <b>chan</b> is ready to accept writes.
 *
 * A channel in channels_waiting_to_write moves to channels_pending and the
 * scheduler event is retriggered; a channel in no list at all is added to
 * channels_waiting_for_cells; a channel already in
 * channels_waiting_for_cells or channels_pending is left where it is. */
void
scheduler_channel_wants_writes(channel_t *chan)
{
  tor_assert(chan);
  tor_assert(channels_waiting_for_cells);
  tor_assert(channels_waiting_to_write);
  tor_assert(channels_pending);

  if (smartlist_contains(channels_waiting_to_write, chan)) {
    /*
     * Channels in waiting_to_write are expected to be in no other list.
     * It had cells already and can write now, so it becomes pending.
     */
    smartlist_remove(channels_waiting_to_write, chan);
    smartlist_add(channels_pending, chan);
    log_debug(LD_SCHED,
              "Channel " U64_FORMAT " at %p went from waiting_to_write "
              "to pending",
              U64_PRINTF_ARG(chan->global_identifier), chan);

    /* A newly pending channel means there may be scheduling work. */
    scheduler_retrigger();
    return;
  }

  /* Already known to be writeable?  Then nothing changes. */
  if (smartlist_contains(channels_waiting_for_cells, chan) ||
      smartlist_contains(channels_pending, chan))
    return;

  /* Not in any list yet: it can write but has no cells. */
  smartlist_add(channels_waiting_for_cells, chan);
  log_debug(LD_SCHED,
            "Channel " U64_FORMAT " at %p entered waiting_for_cells",
            U64_PRINTF_ARG(chan->global_identifier), chan);
}
/* * Copyright (c) 2013, The Tor Project, Inc. */
/* See LICENSE for licensing information */
/**
* \file scheduler.h
* \brief Header file for scheduler.c
**/
#ifndef TOR_SCHEDULER_H
#define TOR_SCHEDULER_H