Commit 8f18647a authored by Roger Dingledine's avatar Roger Dingledine
Browse files

create cells are now queued and processed only when idle

we also queue data cells destined for a circuit that is
pending, and process them once the circuit opens

destroys reach into the queue and remove the pending onion,
along with its collected data cells


svn:r142
parent 147879ab
......@@ -10,8 +10,9 @@ static circuit_t *global_circuitlist=NULL;
char *circuit_state_to_string[] = {
"receiving the onion", /* 0 */
"connecting to firsthop", /* 1 */
"open" /* 2 */
"waiting to process create", /* 1 */
"connecting to firsthop", /* 2 */
"open" /* 3 */
};
/********* END VARIABLES ************/
......@@ -57,7 +58,7 @@ circuit_t *circuit_new(aci_t p_aci, connection_t *p_conn) {
circ->p_aci = p_aci;
circ->p_conn = p_conn;
circ->state = CIRCUIT_STATE_OPEN_WAIT;
circ->state = CIRCUIT_STATE_ONION_WAIT;
/* ACIs */
circ->p_aci = p_aci;
......@@ -128,6 +129,10 @@ int circuit_init(circuit_t *circ, int aci_type) {
unsigned char iv[16];
unsigned char digest1[20];
unsigned char digest2[20];
struct timeval start, end;
int time_passed;
assert(circ);
......@@ -146,8 +151,27 @@ int circuit_init(circuit_t *circ, int aci_type) {
log(LOG_DEBUG,"circuit_init(): aci_type = %u.",aci_type);
gettimeofday(&start,NULL);
circ->n_aci = get_unique_aci_by_addr_port(circ->n_addr, circ->n_port, aci_type);
gettimeofday(&end,NULL);
if(end.tv_usec < start.tv_usec) {
end.tv_sec--;
end.tv_usec += 1000000;
}
time_passed = ((end.tv_sec - start.tv_sec)*1000000) + (end.tv_usec - start.tv_usec);
if(time_passed > 1000) { /* more than 1ms */
log(LOG_NOTICE,"circuit_init(): get_unique_aci just took %d us!",time_passed);
}
log(LOG_DEBUG,"circuit_init(): Chosen ACI %u.",circ->n_aci);
/* keys */
......
......@@ -70,7 +70,6 @@ void command_process_cell(cell_t *cell, connection_t *conn) {
/* do nothing */
break;
case CELL_CREATE:
log(LOG_INFO,"Starting to process create cell.");
command_time_process_cell(cell, conn, &num_create, &create_time,
command_process_create_cell);
break;
......@@ -96,61 +95,19 @@ void command_process_cell(cell_t *cell, connection_t *conn) {
}
}
/* helper function for command_process_create_cell */
static int deliver_onion_to_conn(aci_t aci, unsigned char *onion, uint32_t onionlen, connection_t *conn) {
char *buf;
int buflen, dataleft;
cell_t cell;
assert(aci && onion && onionlen);
buflen = onionlen+4;
buf = malloc(buflen);
if(!buf)
return -1;
log(LOG_DEBUG,"deliver_onion_to_conn(): Setting onion length to %u.",onionlen);
*(uint32_t*)buf = htonl(onionlen);
memcpy((void *)(buf+4),(void *)onion,onionlen);
dataleft = buflen;
while(dataleft > 0) {
memset(&cell,0,sizeof(cell_t));
cell.command = CELL_CREATE;
cell.aci = aci;
if(dataleft >= CELL_PAYLOAD_SIZE)
cell.length = CELL_PAYLOAD_SIZE;
else
cell.length = dataleft;
memcpy(cell.payload, buf+buflen-dataleft, cell.length);
dataleft -= cell.length;
log(LOG_DEBUG,"deliver_onion_to_conn(): Delivering create cell, payload %d bytes.",cell.length);
if(connection_write_cell_to_buf(&cell, conn) < 0) {
log(LOG_DEBUG,"deliver_onion_to_conn(): Could not buffer new create cells. Closing.");
free(buf);
return -1;
}
}
free(buf);
return 0;
}
void command_process_create_cell(cell_t *cell, connection_t *conn) {
circuit_t *circ;
connection_t *n_conn;
int retval;
circ = circuit_get_by_aci_conn(cell->aci, conn);
if(circ && circ->state != CIRCUIT_STATE_OPEN_WAIT) {
log(LOG_DEBUG,"command_process_create_cell(): received CREATE cell, not in open_wait. Dropping.");
if(circ && circ->state != CIRCUIT_STATE_ONION_WAIT) {
log(LOG_DEBUG,"command_process_create_cell(): received CREATE cell, not in onion_wait. Dropping.");
return;
}
if(!circ) { /* if it's not there, create it */
circ = circuit_new(cell->aci, conn);
circ->state = CIRCUIT_STATE_OPEN_WAIT;
circ->state = CIRCUIT_STATE_ONION_WAIT;
circ->onionlen = ntohl(*(int*)cell->payload);
log(LOG_DEBUG,"command_process_create_cell(): Onion length is %u.",circ->onionlen);
if(circ->onionlen > 50000 || circ->onionlen < 1) { /* too big or too small */
......@@ -191,9 +148,17 @@ void command_process_create_cell(cell_t *cell, connection_t *conn) {
return;
}
/* we're all ready to go now. */
circ->state = CIRCUIT_STATE_OPEN;
/* add it to the pending onions queue, and then return */
circ->state = CIRCUIT_STATE_ONION_PENDING;
if(onion_pending_add(circ) < 0) {
log(LOG_DEBUG,"command_process_create_cell(): Failed to queue onion. Closing.");
circuit_close(circ);
}
return;
}
#if 0
conn->onions_handled_this_second++;
log(LOG_DEBUG,"command_process_create_cell(): Processing onion %d for this second.",conn->onions_handled_this_second);
if(conn->onions_handled_this_second > options.OnionsPerSecond) {
......@@ -201,69 +166,7 @@ void command_process_create_cell(cell_t *cell, connection_t *conn) {
circuit_close(circ);
return;
}
if(process_onion(circ, conn) < 0) {
log(LOG_DEBUG,"command_process_create_cell(): Onion processing failed. Closing.");
circuit_close(circ);
return;
}
if(circ->n_addr && circ->n_port) { /* must send create cells to the next router */
n_conn = connection_twin_get_by_addr_port(circ->n_addr,circ->n_port);
if(!n_conn || n_conn->type != CONN_TYPE_OR) {
/* i've disabled making connections through OPs, but it's definitely
* possible here. I'm not sure if it would be a bug or a feature. -RD
*/
/* note also that this will close circuits where the onion has the same
* router twice in a row in the path. i think that's ok. -RD
*/
log(LOG_DEBUG,"command_process_create_cell(): Next router not connected. Closing.");
circuit_close(circ);
return;
}
circ->n_addr = n_conn->addr; /* these are different if we found a twin instead */
circ->n_port = n_conn->port;
circ->n_conn = n_conn;
log(LOG_DEBUG,"command_process_create_cell(): n_conn is %s:%u",n_conn->address,n_conn->port);
/* send the CREATE cells on to the next hop */
pad_onion(circ->onion,circ->onionlen, sizeof(onion_layer_t));
log(LOG_DEBUG,"command_process_create_cell(): Padded the onion with random data.");
retval = deliver_onion_to_conn(circ->n_aci, circ->onion, circ->onionlen, n_conn);
// retval = pack_create(circ->n_aci, circ->onion, circ->onionlen, &cellbuf, &cellbuflen);
free((void *)circ->onion);
circ->onion = NULL;
if (retval == -1) {
log(LOG_DEBUG,"command_process_create_cell(): Could not deliver the onion to next conn. Closing.");
circuit_close(circ);
}
return;
} else { /* this is destined for an exit */
log(LOG_DEBUG,"command_process_create_cell(): Creating new exit connection.");
n_conn = connection_new(CONN_TYPE_EXIT);
if(!n_conn) {
log(LOG_DEBUG,"command_process_create_cell(): connection_new failed. Closing.");
circuit_close(circ);
return;
}
n_conn->state = EXIT_CONN_STATE_CONNECTING_WAIT;
n_conn->receiver_bucket = -1; /* edge connections don't do receiver buckets */
n_conn->bandwidth = -1;
n_conn->s = -1; /* not yet valid */
if(connection_add(n_conn) < 0) { /* no space, forget it */
log(LOG_DEBUG,"command_process_create_cell(): connection_add failed. Closing.");
connection_free(n_conn);
circuit_close(circ);
return;
}
circ->n_conn = n_conn;
return;
}
}
#endif
void command_process_sendme_cell(cell_t *cell, connection_t *conn) {
circuit_t *circ;
......@@ -275,8 +178,8 @@ void command_process_sendme_cell(cell_t *cell, connection_t *conn) {
return;
}
if(circ->state == CIRCUIT_STATE_OPEN_WAIT) {
log(LOG_DEBUG,"command_process_sendme_cell(): circuit in open_wait. Dropping.");
if(circ->state == CIRCUIT_STATE_ONION_WAIT) {
log(LOG_DEBUG,"command_process_sendme_cell(): circuit in onion_wait. Dropping.");
return;
}
if(circ->state == CIRCUIT_STATE_OR_WAIT) {
......@@ -330,15 +233,19 @@ void command_process_data_cell(cell_t *cell, connection_t *conn) {
return;
}
if(circ->state == CIRCUIT_STATE_OPEN_WAIT) {
log(LOG_DEBUG,"command_process_data_cell(): circuit in open_wait. Dropping data cell.");
if(circ->state == CIRCUIT_STATE_ONION_WAIT) {
log(LOG_DEBUG,"command_process_data_cell(): circuit in onion_wait. Dropping data cell.");
return;
}
if(circ->state == CIRCUIT_STATE_OR_WAIT) {
log(LOG_DEBUG,"command_process_data_cell(): circuit in or_wait. Dropping data cell.");
return;
}
if(circ->state == CIRCUIT_STATE_ONION_PENDING) {
log(LOG_DEBUG,"command_process_data_cell(): circuit in create_wait. Queueing data cell.");
onion_pending_data_add(circ, cell);
return;
}
/* at this point both circ->n_conn and circ->p_conn are guaranteed to be set */
if(cell->aci == circ->p_aci) { /* it's an outgoing cell */
......@@ -389,6 +296,9 @@ void command_process_destroy_cell(cell_t *cell, connection_t *conn) {
}
log(LOG_DEBUG,"command_process_destroy_cell(): Received for aci %d.",cell->aci);
if(circ->state == CIRCUIT_STATE_ONION_PENDING) {
onion_pending_remove(circ);
}
circuit_remove(circ);
if(cell->aci == circ->p_aci) /* the destroy came from behind */
connection_send_destroy(circ->n_aci, circ->n_conn);
......
......@@ -181,7 +181,7 @@ void config_assign(or_options_t *options, struct config_line *list) {
config_compare(list, "DirRebuildPeriod",CONFIG_TYPE_INT, &options->DirRebuildPeriod) ||
config_compare(list, "DirFetchPeriod", CONFIG_TYPE_INT, &options->DirFetchPeriod) ||
config_compare(list, "KeepalivePeriod", CONFIG_TYPE_INT, &options->KeepalivePeriod) ||
config_compare(list, "OnionsPerSecond", CONFIG_TYPE_INT, &options->OnionsPerSecond) ||
config_compare(list, "MaxOnionsPending",CONFIG_TYPE_INT, &options->MaxOnionsPending) ||
/* float options */
config_compare(list, "CoinWeight", CONFIG_TYPE_DOUBLE, &options->CoinWeight)
......@@ -214,7 +214,7 @@ int getconfig(int argc, char **argv, or_options_t *options) {
options->DirRebuildPeriod = 600;
options->DirFetchPeriod = 6000;
options->KeepalivePeriod = 300;
options->OnionsPerSecond = 50;
options->MaxOnionsPending = 10;
// options->ReconnectPeriod = 6001;
options->Role = ROLE_OR_LISTEN | ROLE_OR_CONNECT_ALL | ROLE_OP_LISTEN | ROLE_AP_LISTEN;
......
......@@ -301,7 +301,7 @@ void check_conn_marked(int i) {
int prepare_for_poll(int *timeout) {
int i;
int need_to_wake_soon = 0;
// int need_to_wake_soon = 0;
connection_t *conn = NULL;
connection_t *tmpconn;
struct timeval now, soonest;
......@@ -436,6 +436,11 @@ int prepare_for_poll(int *timeout) {
}
}
if(onion_pending_check()) {
/* there's an onion pending. check for new things to do, but don't wait any time */
*timeout = 0;
}
return 0;
}
......@@ -497,7 +502,7 @@ int do_main_loop(void) {
*/
/* if the timeout is less than 10, set it to 10 */
if(timeout >= 0 && timeout < 10)
if(timeout > 0 && timeout < 10)
timeout = 10;
/* poll until we have an event, or it's time to do something */
......@@ -511,6 +516,11 @@ int do_main_loop(void) {
}
#endif
if(poll_result == 0) {
/* poll timed out without anything to do. process a pending onion, if any. */
onion_pending_process_one();
}
if(poll_result > 0) { /* we have at least one connection to deal with */
/* do all the reads first, so we can detect closed sockets */
for(i=0;i<nfds;i++)
......
......@@ -5,14 +5,17 @@
#include "or.h"
extern int global_role; /* from main.c */
extern or_options_t options; /* command-line and config-file options */
/********* START VARIABLES **********/
tracked_onion_t *tracked_onions = NULL; /* linked list of tracked onions */
tracked_onion_t *last_tracked_onion = NULL;
static tracked_onion_t *tracked_onions = NULL; /* linked list of tracked onions */
static tracked_onion_t *last_tracked_onion = NULL;
/********* END VARIABLES ************/
static int onion_process(circuit_t *circ);
static int onion_deliver_to_conn(aci_t aci, unsigned char *onion, uint32_t onionlen, connection_t *conn);
int decide_aci_type(uint32_t local_addr, uint16_t local_port,
uint32_t remote_addr, uint16_t remote_port) {
......@@ -27,7 +30,201 @@ int decide_aci_type(uint32_t local_addr, uint16_t local_port,
return ACI_TYPE_LOWER;
}
int process_onion(circuit_t *circ, connection_t *conn) {
struct data_queue_t {
cell_t *cell;
struct data_queue_t *next;
};
struct onion_queue_t {
circuit_t *circ;
struct data_queue_t *data_cells;
struct onion_queue_t *next;
};
/* global (within this file) variables used by the next few functions */
static struct onion_queue_t *ol_list=NULL;
static struct onion_queue_t *ol_tail=NULL;
static int ol_length=0;
int onion_pending_add(circuit_t *circ) {
struct onion_queue_t *tmp;
tmp = malloc(sizeof(struct onion_queue_t));
memset(tmp, 0, sizeof(struct onion_queue_t));
tmp->circ = circ;
if(!ol_tail) {
assert(!ol_list);
assert(!ol_length);
ol_list = tmp;
ol_tail = tmp;
ol_length++;
return 0;
}
assert(ol_list);
assert(!ol_tail->next);
if(ol_length >= options.MaxOnionsPending) {
log(LOG_INFO,"onion_pending_add(): Already have %d onions queued. Closing.", ol_length);
free(tmp);
return -1;
}
ol_length++;
ol_tail->next = tmp;
ol_tail = tmp;
return 0;
}
int onion_pending_check(void) {
if(ol_list)
return 1;
else
return 0;
}
void onion_pending_process_one(void) {
struct data_queue_t *tmpd;
if(!ol_list)
return; /* no onions pending, we're done */
assert(ol_list->circ && ol_list->circ->p_conn);
assert(ol_length > 0);
if(onion_process(ol_list->circ) < 0) {
log(LOG_DEBUG,"onion_pending_process_one(): Failed. Closing.");
onion_pending_remove(ol_list->circ);
circuit_close(ol_list->circ);
} else {
log(LOG_DEBUG,"onion_pending_process_one(): Succeeded. Delivering queued data cells.");
for(tmpd = ol_list->data_cells; tmpd; tmpd=tmpd->next) {
command_process_data_cell(tmpd->cell, ol_list->circ->p_conn);
}
onion_pending_remove(ol_list->circ);
}
return;
}
/* go through ol_list, find the element which points to circ, remove and
* free that element. leave circ itself alone.
*/
void onion_pending_remove(circuit_t *circ) {
struct onion_queue_t *tmpo, *victim;
struct data_queue_t *tmpd;
if(!ol_list)
return; /* nothing here. */
/* first check to see if it's the first entry */
tmpo = ol_list;
if(tmpo->circ == circ) {
/* it's the first one. remove it from the list. */
ol_list = tmpo->next;
if(!ol_list)
ol_tail = NULL;
ol_length--;
victim = tmpo;
} else { /* we need to hunt through the rest of the list */
for( ;tmpo->next && tmpo->next->circ != circ; tmpo=tmpo->next) ;
if(!tmpo->next) {
log(LOG_WARNING,"onion_pending_remove(): circ (p_aci %d), not in list!",circ->p_aci);
return;
}
/* now we know tmpo->next->circ == circ */
victim = tmpo->next;
tmpo->next = victim->next;
if(ol_tail == victim)
ol_tail = tmpo;
ol_length--;
}
/* now victim points to the element that needs to be removed */
/* first dump the attached data cells too, if any */
while(victim->data_cells) {
tmpd = victim->data_cells;
victim->data_cells = tmpd->next;
free(tmpd->cell);
free(tmpd);
}
free(victim);
}
/* a data cell has arrived for a circuit which is still pending. Find
* the right entry in ol_list, and add it to the end of the 'data_cells'
* list.
*/
void onion_pending_data_add(circuit_t *circ, cell_t *cell) {
struct onion_queue_t *tmpo;
struct data_queue_t *tmpd, *newd;
newd = malloc(sizeof(struct data_queue_t));
memset(newd, 0, sizeof(struct data_queue_t));
newd->cell = malloc(sizeof(cell_t));
memcpy(newd->cell, cell, sizeof(cell_t));
for(tmpo=ol_list; tmpo; tmpo=tmpo->next) {
if(tmpo->circ == circ) {
if(!tmpo->data_cells) {
tmpo->data_cells = newd;
return;
}
for(tmpd = tmpo->data_cells; tmpd->next; tmpd=tmpd->next) ;
/* now tmpd->next is null */
tmpd->next = newd;
return;
}
}
}
/* helper function for onion_process */
static int onion_deliver_to_conn(aci_t aci, unsigned char *onion, uint32_t onionlen, connection_t *conn) {
char *buf;
int buflen, dataleft;
cell_t cell;
assert(aci && onion && onionlen);
buflen = onionlen+4;
buf = malloc(buflen);
if(!buf)
return -1;
log(LOG_DEBUG,"onion_deliver_to_conn(): Setting onion length to %u.",onionlen);
*(uint32_t*)buf = htonl(onionlen);
memcpy((void *)(buf+4),(void *)onion,onionlen);
dataleft = buflen;
while(dataleft > 0) {
memset(&cell,0,sizeof(cell_t));
cell.command = CELL_CREATE;
cell.aci = aci;
if(dataleft >= CELL_PAYLOAD_SIZE)
cell.length = CELL_PAYLOAD_SIZE;
else
cell.length = dataleft;
memcpy(cell.payload, buf+buflen-dataleft, cell.length);
dataleft -= cell.length;
log(LOG_DEBUG,"onion_deliver_to_conn(): Delivering create cell, payload %d bytes.",cell.length);
if(connection_write_cell_to_buf(&cell, conn) < 0) {
log(LOG_DEBUG,"onion_deliver_to_conn(): Could not buffer new create cells. Closing.");
free(buf);
return -1;
}
}
free(buf);
return 0;
}
static int onion_process(circuit_t *circ) {
connection_t *n_conn;
int retval;
aci_t aci_type;
struct sockaddr_in me; /* my router identity */
......@@ -66,6 +263,55 @@ int process_onion(circuit_t *circ, connection_t *conn) {
log(LOG_DEBUG,"process_onion(): Onion tracking failed. Will ignore.");
}
/* now we must send create cells to the next router */
if(circ->n_addr && circ->n_port) {
n_conn = connection_twin_get_by_addr_port(circ->n_addr,circ->n_port);
if(!n_conn || n_conn->type != CONN_TYPE_OR) {
/* i've disabled making connections through OPs, but it's definitely
* possible here. I'm not sure if it would be a bug or a feature. -RD
*/
/* note also that this will close circuits where the onion has the same
* router twice in a row in the path. i think that's ok. -RD
*/
log(LOG_DEBUG,"command_process_create_cell(): Next router not connected. Closing.");
return -1;
}
circ->n_addr = n_conn->addr; /* these are different if we found a twin instead */
circ->n_port = n_conn->port;
circ->n_conn = n_conn;
log(LOG_DEBUG,"command_process_create_cell(): n_conn is %s:%u",n_conn->address,n_conn->port);
/* send the CREATE cells on to the next hop */
pad_onion(circ->onion,circ->onionlen, sizeof(onion_layer_t));
log(LOG_DEBUG,"command_process_create_cell(): Padded the onion with random data.");
retval = onion_deliver_to_conn(circ->n_aci, circ->onion, circ->onionlen, n_conn);
free((void *)circ->onion);
circ->onion = NULL;
if (retval == -1) {
log(LOG_DEBUG,"command_process_create_cell(): Could not deliver the onion to next conn. Closing.");
return -1;
}
} else { /* this is destined for an exit */
log(LOG_DEBUG,"command_process_create_cell(): Creating new exit connection.");
n_conn = connection_new(CONN_TYPE_EXIT);
if(!n_conn) {
log(LOG_DEBUG,"command_process_create_cell(): connection_new failed. Closing.");
return -1;
}
n_conn->state = EXIT_CONN_STATE_CONNECTING_WAIT;
n_conn->receiver_bucket = -1; /* edge connections don't do receiver buckets */
n_conn->bandwidth = -1;
n_conn->s = -1; /* not yet valid */
if(connection_add(n_conn) < 0) { /* no space, forget it */
log(LOG_DEBUG,"command_process_create_cell(): connection_add failed. Closing.");
connection_free(n_conn);
return -1;
}
circ->n_conn = n_conn;
}
return 0;
}
......
......@@ -114,12 +114,13 @@
#define DIR_CONN_STATE_COMMAND_WAIT 3
#define DIR_CONN_STATE_WRITING 4
#define CIRCUIT_STATE_OPEN_WAIT 0 /* receiving/processing the onion */
#define CIRCUIT_STATE_OR_WAIT 1 /* I'm at the beginning of the path, my firsthop is still connecting */
#define CIRCUIT_STATE_OPEN 2 /* onion processed, ready to send data along the connection */
#define CIRCUIT_STATE_CLOSE_WAIT1 3 /* sent two "destroy" signals, waiting for acks */
#define CIRCUIT_STATE_CLOSE_WAIT2 4 /* received one ack, waiting for one more
(or if just one was sent, waiting for that one */
#define CIRCUIT_STATE_ONION_WAIT 0 /* receiving the onion */
#define CIRCUIT_STATE_ONION_PENDING 1 /* waiting to process the onion */
#define CIRCUIT_STATE_OR_WAIT 2 /* I'm at the beginning of the path, my firsthop is still connecting */
#define CIRCUIT_STATE_OPEN 3 /* onion processed, ready to send data along the connection */
//#define CIRCUIT_STATE_CLOSE_WAIT1 4 /* sent two "destroy" signals, waiting for acks */
//#define CIRCUIT_STATE_CLOSE_WAIT2 5 /* received one ack, waiting for one more
// (or if just one was sent, waiting for that one */
//#define CIRCUIT_STATE_CLOSE 4 /* both acks received, connection is dead */ /* NOT USED */
/* available cipher functions */
......@@ -378,7 +379,7 @@ typedef struct
int DirRebuildPeriod;
int DirFetchPeriod;
int KeepalivePeriod;
int OnionsPerSecond;