Commit 2dda97e8 authored by Roger Dingledine's avatar Roger Dingledine
Browse files

implemented cpuworkers

please poke at it and report bugs

still needs polishing, and only handles onions now (should handle
OR handshakes too)


svn:r402
parent d43f145d
......@@ -88,6 +88,21 @@ void tv_addms(struct timeval *a, long ms) {
a->tv_usec %= 1000000;
}
/* a wrapper for write(2) that makes sure to write all count bytes.
* Only use if fd is a blocking socket. */
int write_all(int fd, const void *buf, size_t count) {
int written = 0;
int result;
while(written != count) {
result = write(fd, buf+written, count-written);
if(result<0)
return -1;
written += result;
}
return count;
}
void set_socket_nonblocking(int socket)
{
#ifdef MS_WINDOWS
......
......@@ -51,6 +51,8 @@ void tv_addms(struct timeval *a, long ms);
void tv_add(struct timeval *a, struct timeval *b);
int tv_cmp(struct timeval *a, struct timeval *b);
int write_all(int fd, const void *buf, size_t count);
void set_socket_nonblocking(int socket);
/* Minimalist interface to run a void function in the background. On
......
......@@ -7,14 +7,14 @@ bin_PROGRAMS = or
or_SOURCES = buffers.c circuit.c command.c connection.c \
connection_exit.c connection_ap.c connection_or.c config.c \
onion.c routers.c directory.c dns.c connection_edge.c \
main.c tor_main.c
cpuworker.c main.c tor_main.c
or_LDADD = ../common/libor.a
test_SOURCES = buffers.c circuit.c command.c connection.c \
connection_exit.c connection_ap.c connection_or.c config.c \
onion.c routers.c directory.c dns.c connection_edge.c \
main.c test.c
cpuworker.c main.c test.c
test_LDADD = ../common/libor.a
......
......@@ -97,13 +97,13 @@ void command_process_create_cell(cell_t *cell, connection_t *conn) {
memcpy(circ->onionskin,cell->payload,cell->length);
/* add it to the pending onions queue, and then return */
if(onion_pending_add(circ) < 0) {
log_fn(LOG_DEBUG,"Failed to queue onionskin. Closing.");
/* hand it off to the cpuworkers, and then return */
if(assign_to_cpuworker(NULL, CPUWORKER_TASK_ONION, circ) < 0) {
log_fn(LOG_DEBUG,"Failed to hand off onionskin. Closing.");
circuit_close(circ);
return;
}
log_fn(LOG_DEBUG,"success: queued onionskin.");
return;
log_fn(LOG_DEBUG,"success: handed off onionskin.");
}
void command_process_created_cell(cell_t *cell, connection_t *conn) {
......@@ -153,7 +153,6 @@ void command_process_created_cell(cell_t *cell, connection_t *conn) {
return;
}
}
return;
}
void command_process_relay_cell(cell_t *cell, connection_t *conn) {
......
......@@ -21,7 +21,8 @@ char *conn_type_to_string[] = {
"App", /* 7 */
"Dir listener",/* 8 */
"Dir", /* 9 */
"DNS master", /* 10 */
"DNS worker", /* 10 */
"CPU worker", /* 11 */
};
char *conn_state_to_string[][15] = {
......@@ -58,7 +59,11 @@ char *conn_state_to_string[][15] = {
"reading", /* 2 */
"awaiting command", /* 3 */
"writing" }, /* 4 */
{ "open" }, /* dns master, 0 */
{ "idle", /* dns worker, 0 */
"busy" }, /* 1 */
{ "idle", /* cpu worker, 0 */
"busy with onion", /* 1 */
"busy with handshake" }, /* 2 */
};
/********* END VARIABLES ************/
......@@ -542,6 +547,8 @@ int connection_process_inbuf(connection_t *conn) {
return connection_dir_process_inbuf(conn);
case CONN_TYPE_DNSWORKER:
return connection_dns_process_inbuf(conn);
case CONN_TYPE_CPUWORKER:
return connection_cpu_process_inbuf(conn);
default:
log_fn(LOG_DEBUG,"got unexpected conn->type.");
return -1;
......@@ -679,6 +686,8 @@ int connection_finished_flushing(connection_t *conn) {
return connection_dir_finished_flushing(conn);
case CONN_TYPE_DNSWORKER:
return connection_dns_finished_flushing(conn);
case CONN_TYPE_CPUWORKER:
return connection_cpu_finished_flushing(conn);
default:
log_fn(LOG_DEBUG,"got unexpected conn->type.");
return -1;
......
/* Copyright 2003 Roger Dingledine. */
/* See LICENSE for licensing information */
/* $Id$ */
#include "or.h"
extern or_options_t options; /* command-line and config-file options */
#define MAX_QUESTIONLEN 256
#define MAX_CPUWORKERS 17
#define MIN_CPUWORKERS 2
#define LEN_ONION_RESPONSE (1+DH_KEY_LEN+32)
#define LEN_HANDSHAKE_RESPONSE (somethingelse)
int num_cpuworkers=0;
int num_cpuworkers_busy=0;
int cpuworker_main(void *data);
static int spawn_cpuworker(void);
static void spawn_enough_cpuworkers(void);
static int process_pending_task(connection_t *cpuworker);
void cpu_init(void) {
spawn_enough_cpuworkers();
}
int connection_cpu_finished_flushing(connection_t *conn) {
assert(conn && conn->type == CONN_TYPE_CPUWORKER);
connection_stop_writing(conn);
return 0;
}
int connection_cpu_process_inbuf(connection_t *conn) {
unsigned char buf[MAX_QUESTIONLEN];
assert(conn && conn->type == CONN_TYPE_CPUWORKER);
if(conn->inbuf_reached_eof) {
log_fn(LOG_ERR,"Read eof. Worker dying.");
if(conn->state != CPUWORKER_STATE_IDLE) {
onion_pending_remove(conn->circ);
circuit_close(conn->circ);
conn->circ = NULL;
num_cpuworkers_busy--;
}
num_cpuworkers--;
return -1;
}
if(conn->state == CPUWORKER_STATE_BUSY_ONION) {
assert(conn->circ);
if(conn->inbuf_datalen < LEN_ONION_RESPONSE) /* entire answer available? */
return 0; /* not yet */
assert(conn->inbuf_datalen == LEN_ONION_RESPONSE);
connection_fetch_from_buf(buf,LEN_ONION_RESPONSE,conn);
if(*buf == 0 || conn->circ->p_conn == NULL ||
onionskin_process(conn->circ, buf+1, buf+1+DH_KEY_LEN) < 0) {
log_fn(LOG_DEBUG,"decoding onion, onionskin_process, or p_conn failed. Closing.");
// onion_pending_remove(conn->circ);
circuit_close(conn->circ);
} else {
log_fn(LOG_DEBUG,"onionskin_process succeeded. Yay.");
// onion_pending_remove(conn->circ);
}
conn->circ = NULL;
} else {
assert(conn->state == CPUWORKER_STATE_BUSY_HANDSHAKE);
assert(0); /* don't ask me to do handshakes yet */
}
conn->state = CPUWORKER_STATE_IDLE;
num_cpuworkers_busy--;
process_pending_task(conn); /* discard return value */
return 0;
}
int cpuworker_main(void *data) {
unsigned char question[MAX_QUESTIONLEN];
unsigned char question_type;
int *fdarray = data;
int fd;
int len;
/* variables for onion processing */
unsigned char keys[32];
unsigned char response[DH_KEY_LEN];
unsigned char buf[MAX_QUESTIONLEN];
close(fdarray[0]); /* this is the side of the socketpair the parent uses */
fd = fdarray[1]; /* this side is ours */
for(;;) {
if(read(fd, &question_type, 1) != 1) {
log_fn(LOG_INFO,"read type failed. Exiting.");
spawn_exit();
}
assert(question_type == CPUWORKER_TASK_ONION ||
question_type == CPUWORKER_TASK_HANDSHAKE);
if(question_type == CPUWORKER_TASK_ONION)
len = DH_ONIONSKIN_LEN;
else
len = 0; /* XXX */
if(read(fd, question, len) != len) {
log(LOG_INFO,"cpuworker_main(): read question failed. Exiting.");
spawn_exit();
}
if(question_type == CPUWORKER_TASK_ONION) {
if(onion_skin_server_handshake(question, get_privatekey(),
response, keys, 32) < 0) {
/* failure */
log_fn(LOG_ERR,"onion_skin_server_handshake failed.");
memset(buf,0,LEN_ONION_RESPONSE); /* send all zeros for failure */
} else {
/* success */
log_fn(LOG_DEBUG,"onion_skin_server_handshake succeeded.");
buf[0] = 1; /* 1 means success */
memcpy(buf+1,response,DH_KEY_LEN);
memcpy(buf+1+DH_KEY_LEN,keys,32);
}
if(write_all(fd, buf, LEN_ONION_RESPONSE) != LEN_ONION_RESPONSE) {
log_fn(LOG_INFO,"writing response buf failed. Exiting.");
spawn_exit();
}
log_fn(LOG_DEBUG,"finished writing response/keys.");
} else { /* we've been asked to do a handshake. not implemented yet. */
spawn_exit();
}
}
return 0; /* windows wants this function to return an int */
}
static int spawn_cpuworker(void) {
int fd[2];
connection_t *conn;
if(tor_socketpair(AF_UNIX, SOCK_STREAM, 0, fd) < 0) {
perror("socketpair");
exit(1);
}
spawn_func(cpuworker_main, (void*)fd);
log_fn(LOG_DEBUG,"just spawned a worker.");
close(fd[1]); /* we don't need the worker's side of the pipe */
conn = connection_new(CONN_TYPE_CPUWORKER);
if(!conn) {
close(fd[0]);
return -1;
}
set_socket_nonblocking(fd[0]);
/* set up conn so it's got all the data we need to remember */
conn->receiver_bucket = -1; /* non-cell connections don't do receiver buckets */
conn->bandwidth = -1;
conn->s = fd[0];
if(connection_add(conn) < 0) { /* no space, forget it */
log_fn(LOG_INFO,"connection_add failed. Giving up.");
connection_free(conn); /* this closes fd[0] */
return -1;
}
conn->state = CPUWORKER_STATE_IDLE;
connection_start_reading(conn);
return 0; /* success */
}
static void spawn_enough_cpuworkers(void) {
int num_cpuworkers_needed = options.NumCpus + 1;
if(num_cpuworkers_needed < MIN_CPUWORKERS)
num_cpuworkers_needed = MIN_CPUWORKERS;
if(num_cpuworkers_needed > MAX_CPUWORKERS)
num_cpuworkers_needed = MAX_CPUWORKERS;
while(num_cpuworkers < num_cpuworkers_needed) {
if(spawn_cpuworker() < 0) {
log_fn(LOG_ERR,"spawn failed!");
return;
}
num_cpuworkers++;
}
}
static int process_pending_task(connection_t *cpuworker) {
circuit_t *circ;
assert(cpuworker);
/* for now only process onion tasks */
circ = onion_next_task();
if(!circ)
return 0;
return assign_to_cpuworker(cpuworker, CPUWORKER_TASK_ONION, circ);
}
/* if cpuworker is defined, assert that he's idle, and use him. else,
* look for an idle cpuworker and use him. if none idle, queue task onto
* the pending onion list and return.
* If question_type is CPUWORKER_TASK_ONION then task is a circ, else
* (something else)
*/
int assign_to_cpuworker(connection_t *cpuworker, unsigned char question_type,
void *task) {
circuit_t *circ;
if(question_type == CPUWORKER_TASK_ONION) {
circ = task;
if(num_cpuworkers_busy == num_cpuworkers) {
log_fn(LOG_DEBUG,"No idle cpuworkers. Queuing.");
if(onion_pending_add(circ) < 0)
return -1;
return 0;
}
if(!cpuworker)
cpuworker = connection_get_by_type_state(CONN_TYPE_CPUWORKER, CPUWORKER_STATE_IDLE);
assert(cpuworker);
cpuworker->circ = circ;
cpuworker->state = CPUWORKER_STATE_BUSY_ONION;
num_cpuworkers_busy++;
if(connection_write_to_buf(&question_type, 1, cpuworker) < 0 ||
connection_write_to_buf(circ->onionskin, DH_ONIONSKIN_LEN, cpuworker) < 0) {
log_fn(LOG_NOTICE,"Write failed. Closing worker and failing circ.");
cpuworker->marked_for_close = 1;
return -1;
}
}
return 0;
}
/*
Local Variables:
mode:c
indent-tabs-mode:nil
c-basic-offset:2
End:
*/
......@@ -16,15 +16,15 @@
#define MIN_DNSWORKERS 3
#define MAX_IDLE_DNSWORKERS 10
int num_workers=0;
int num_workers_busy=0;
int num_dnsworkers=0;
int num_dnsworkers_busy=0;
static void purge_expired_resolves(uint32_t now);
static int dns_assign_to_worker(connection_t *exitconn);
static int assign_to_dnsworker(connection_t *exitconn);
static void dns_found_answer(char *question, uint32_t answer);
int dnsworker_main(void *data);
static int dns_spawn_worker(void);
static void spawn_enough_workers(void);
static int spawn_dnsworker(void);
static void spawn_enough_dnsworkers(void);
struct pending_connection_t {
struct connection_t *conn;
......@@ -60,7 +60,7 @@ static void init_cache_tree(void) {
void dns_init(void) {
init_cache_tree();
spawn_enough_workers();
spawn_enough_dnsworkers();
}
static struct cached_resolve *oldest_cached_resolve = NULL; /* linked list, */
......@@ -144,42 +144,42 @@ int dns_resolve(connection_t *exitconn) {
newest_cached_resolve = resolve;
SPLAY_INSERT(cache_tree, &cache_root, resolve);
return dns_assign_to_worker(exitconn);
return assign_to_dnsworker(exitconn);
}
assert(0);
return 0; /* not reached; keep gcc happy */
}
static int dns_assign_to_worker(connection_t *exitconn) {
static int assign_to_dnsworker(connection_t *exitconn) {
connection_t *dnsconn;
unsigned char len;
spawn_enough_workers(); /* respawn here, to be sure there are enough */
spawn_enough_dnsworkers(); /* respawn here, to be sure there are enough */
dnsconn = connection_get_by_type_state(CONN_TYPE_DNSWORKER, DNSWORKER_STATE_IDLE);
if(!dnsconn) {
log(LOG_INFO,"dns_assign_to_worker(): no idle dns workers. Failing.");
log_fn(LOG_INFO,"no idle dns workers. Failing.");
dns_cancel_pending_resolve(exitconn->address, NULL);
return -1;
}
dnsconn->address = strdup(exitconn->address);
dnsconn->state = DNSWORKER_STATE_BUSY;
num_workers_busy++;
num_dnsworkers_busy++;
len = strlen(dnsconn->address);
/* FFFF we should have it retry if the first worker bombs out */
if(connection_write_to_buf(&len, 1, dnsconn) < 0 ||
connection_write_to_buf(dnsconn->address, len, dnsconn) < 0) {
log(LOG_NOTICE,"dns_assign_to_worker(): Write failed. Closing worker and failing resolve.");
log_fn(LOG_NOTICE,"Write failed. Closing worker and failing resolve.");
dnsconn->marked_for_close = 1;
dns_cancel_pending_resolve(exitconn->address, NULL);
return -1;
}
// log(LOG_DEBUG,"dns_assign_to_worker(): submitted '%s'", exitconn->address);
// log_fn(LOG_DEBUG,"submitted '%s'", exitconn->address);
return 0;
}
......@@ -301,9 +301,9 @@ int connection_dns_process_inbuf(connection_t *conn) {
log(LOG_ERR,"connection_dnsworker_process_inbuf(): Read eof. Worker dying.");
if(conn->state == DNSWORKER_STATE_BUSY) {
dns_cancel_pending_resolve(conn->address, NULL);
num_workers_busy--;
num_dnsworkers_busy--;
}
num_workers--;
num_dnsworkers--;
return -1;
}
......@@ -319,7 +319,7 @@ int connection_dns_process_inbuf(connection_t *conn) {
free(conn->address);
conn->address = NULL;
conn->state = DNSWORKER_STATE_IDLE;
num_workers_busy--;
num_dnsworkers_busy--;
return 0;
}
......@@ -328,8 +328,8 @@ int dnsworker_main(void *data) {
char question[MAX_ADDRESSLEN];
unsigned char question_len;
struct hostent *rent;
int fd;
int *fdarray = data;
int fd;
close(fdarray[0]); /* this is the side of the socketpair the parent uses */
fd = fdarray[1]; /* this side is ours */
......@@ -351,14 +351,13 @@ int dnsworker_main(void *data) {
rent = gethostbyname(question);
if (!rent) {
log(LOG_INFO,"dnsworker_main(): Could not resolve dest addr %s. Returning nulls.",question);
/* XXX it's conceivable write could return 1 through 3. but that's never gonna happen, right? */
if(write(fd, "\0\0\0\0", 4) != 4) {
if(write_all(fd, "\0\0\0\0", 4) != 4) {
log(LOG_INFO,"dnsworker_main(): writing nulls failed. Exiting.");
spawn_exit();
}
} else {
assert(rent->h_length == 4); /* break to remind us if we move away from ipv4 */
if(write(fd, rent->h_addr, 4) != 4) {
if(write_all(fd, rent->h_addr, 4) != 4) {
log(LOG_INFO,"dnsworker_main(): writing answer failed. Exiting.");
spawn_exit();
}
......@@ -368,7 +367,7 @@ int dnsworker_main(void *data) {
return 0; /* windows wants this function to return an int */
}
static int dns_spawn_worker(void) {
static int spawn_dnsworker(void) {
int fd[2];
connection_t *conn;
......@@ -378,7 +377,7 @@ static int dns_spawn_worker(void) {
}
spawn_func(dnsworker_main, (void*)fd);
log(LOG_DEBUG,"dns_spawn_worker(): just spawned a worker.");
log_fn(LOG_DEBUG,"just spawned a worker.");
close(fd[1]); /* we don't need the worker's side of the pipe */
conn = connection_new(CONN_TYPE_DNSWORKER);
......@@ -395,7 +394,7 @@ static int dns_spawn_worker(void) {
conn->s = fd[0];
if(connection_add(conn) < 0) { /* no space, forget it */
log(LOG_INFO,"dns_spawn_worker(): connection_add failed. Giving up.");
log_fn(LOG_INFO,"connection_add failed. Giving up.");
connection_free(conn); /* this closes fd[0] */
return -1;
}
......@@ -406,12 +405,12 @@ static int dns_spawn_worker(void) {
return 0; /* success */
}
static void spawn_enough_workers(void) {
int num_workers_needed; /* aim to have 1 more than needed,
static void spawn_enough_dnsworkers(void) {
int num_dnsworkers_needed; /* aim to have 1 more than needed,
* but no less than min and no more than max */
connection_t *dnsconn;
if(num_workers_busy == MAX_DNSWORKERS) {
if(num_dnsworkers_busy == MAX_DNSWORKERS) {
/* We always want at least one worker idle.
* So find the oldest busy worker and kill it.
*/
......@@ -422,28 +421,28 @@ static void spawn_enough_workers(void) {
dns_cancel_pending_resolve(dnsconn->address, NULL);
dnsconn->marked_for_close = 1;
num_workers_busy--;
num_dnsworkers_busy--;
}
if(num_workers_busy >= MIN_DNSWORKERS)
num_workers_needed = num_workers_busy+1;
if(num_dnsworkers_busy >= MIN_DNSWORKERS)
num_dnsworkers_needed = num_dnsworkers_busy+1;
else
num_workers_needed = MIN_DNSWORKERS;
num_dnsworkers_needed = MIN_DNSWORKERS;
while(num_workers < num_workers_needed) {
if(dns_spawn_worker() < 0) {
log(LOG_ERR,"spawn_enough_workers(): spawn failed!");
while(num_dnsworkers < num_dnsworkers_needed) {
if(spawn_dnsworker() < 0) {
log(LOG_ERR,"spawn_enough_dnsworkers(): spawn failed!");
return;
}
num_workers++;
num_dnsworkers++;
}
while(num_workers > num_workers_needed+MAX_IDLE_DNSWORKERS) { /* too many idle? */
while(num_dnsworkers > num_dnsworkers_needed+MAX_IDLE_DNSWORKERS) { /* too many idle? */
/* cull excess workers */
dnsconn = connection_get_by_type_state(CONN_TYPE_DNSWORKER, DNSWORKER_STATE_IDLE);
assert(dnsconn);
dnsconn->marked_for_close = 1;
num_workers--;
num_dnsworkers--;
}
}
......
......@@ -434,12 +434,7 @@ static int prepare_for_poll(int *timeout) {
current_second = now.tv_sec; /* remember which second it is, for next time */
}
if(onion_pending_check()) {
/* there's an onion pending. check for new things to do, but don't wait any time */
*timeout = 0;
} else {
*timeout = 1000 - (now.tv_usec / 1000); /* how many milliseconds til the next second? */
}
*timeout = 1000 - (now.tv_usec / 1000); /* how many milliseconds til the next second? */
return 0;
}
......@@ -502,9 +497,10 @@ static int do_main_loop(void) {
return -1;
}
set_privatekey(prkey);
cpu_init(); /* launch cpuworkers. Need to do this *after* we've read the private key. */
}
/* load the private key, if we're supposed to have one */
/* load the directory private key, if we're supposed to have one */
if(options.DirPort) {
prkey = crypto_new_pk_env(CRYPTO_PK_RSA);
if (!prkey) {
......@@ -523,8 +519,8 @@ static int do_main_loop(void) {
* and start the listeners
*/
retry_all_connections((uint16_t) options.ORPort,
(uint16_t) options.APPort,
(uint16_t) options.DirPort);
(uint16_t) options.APPort,
(uint16_t) options.DirPort);
for(;;) {
#ifndef MS_WIN32 /* do signal stuff only on unix */
......@@ -568,11 +564,6 @@ static int do_main_loop(void) {
}
#endif
if(poll_result == 0) {
/* poll timed out without anything to do. process a pending onion, if any. */
onion_pending_process_one();
}
if(poll_result > 0) { /* we have at least one connection to deal with */
/* do all the reads and errors first, so we can detect closed sockets */
for(i=0;i<nfds;i++)
......
......@@ -7,7 +7,6 @@
extern or_options_t options; /* command-line and config-file options */
static int count_acceptable_routers(routerinfo_t **rarray, int rarray_len);
static int onionskin_process(circuit_t *circ);
int decide_aci_type(uint32_t local_addr, uint16_t local_port,
uint32_t remote_addr, uint16_t remote_port) {
......@@ -47,7 +46,7 @@ int onion_pending_add(circuit_t *circ) {