Commit cb8212bf authored by Roger Dingledine's avatar Roger Dingledine
Browse files

clean up receiver buckets; prepare for payloads in relay_end; note a few bugs


svn:r502
parent 798bb6ab
......@@ -125,6 +125,7 @@ static aci_t get_unique_aci_by_addr_port(uint32_t addr, uint16_t port, int aci_t
high_bit = (aci_type == ACI_TYPE_HIGHER) ? 1<<15 : 0;
conn = connection_exact_get_by_addr_port(addr,port);
/* XXX race condition: if conn is marked_for_close it won't be noticed */
if (!conn)
return (1|high_bit); /* No connection exists; conflict is impossible. */
......@@ -910,7 +911,7 @@ int circuit_truncated(circuit_t *circ, crypt_path_t *layer) {
for(stream = circ->p_streams; stream; stream=stream->next_stream) {
if(stream->cpath_layer == victim) {
log_fn(LOG_INFO, "Marking stream %d for close.", *(int*)stream->stream_id);
stream->marked_for_close = 1;
/*ENDCLOSE*/ stream->marked_for_close = 1;
}
}
......
......@@ -88,7 +88,7 @@ static void command_process_create_cell(cell_t *cell, connection_t *conn) {
circ = circuit_get_by_aci_conn(cell->aci, conn);
if(circ) {
log_fn(LOG_WARNING,"received CREATE cell for known circ. Dropping.");
log_fn(LOG_WARNING,"received CREATE cell (aci %d) for known circ. Dropping.", cell->aci);
return;
}
......@@ -118,7 +118,7 @@ static void command_process_created_cell(cell_t *cell, connection_t *conn) {
circ = circuit_get_by_aci_conn(cell->aci, conn);
if(!circ) {
log_fn(LOG_WARNING,"received CREATED cell for unknown circ. Dropping.");
log_fn(LOG_WARNING,"received CREATED cell (aci %d) for unknown circ. Dropping.", cell->aci);
return;
}
......
......@@ -83,9 +83,6 @@ connection_t *connection_new(int type) {
conn->inbuf = buf_new();
conn->outbuf = buf_new();
conn->receiver_bucket = 50000; /* should be enough to do the handshake */
conn->bandwidth = conn->receiver_bucket / 10; /* give it a default */
conn->timestamp_created = now.tv_sec;
conn->timestamp_lastread = now.tv_sec;
conn->timestamp_lastwritten = now.tv_sec;
......@@ -149,8 +146,6 @@ int connection_create_listener(struct sockaddr_in *bindaddr, int type) {
conn = connection_new(type);
conn->s = s;
conn->receiver_bucket = -1; /* non-cell connections don't do receiver buckets */
conn->bandwidth = -1;
if(connection_add(conn) < 0) { /* no space, forget it */
log_fn(LOG_WARNING,"connection_add failed. Giving up.");
......@@ -197,11 +192,6 @@ int connection_handle_listener_read(connection_t *conn, int new_type) {
newconn = connection_new(new_type);
newconn->s = news;
if(!connection_speaks_cells(newconn)) {
newconn->receiver_bucket = -1;
newconn->bandwidth = -1;
}
newconn->address = strdup(inet_ntoa(remote.sin_addr)); /* remember the remote address */
newconn->addr = ntohl(remote.sin_addr.s_addr);
newconn->port = ntohs(remote.sin_port);
......@@ -305,7 +295,7 @@ static int connection_tls_finish_handshake(connection_t *conn) {
}
crypto_free_pk_env(pk);
} else { /* it's an OP */
conn->bandwidth = DEFAULT_BANDWIDTH_OP;
conn->receiver_bucket = conn->bandwidth = DEFAULT_BANDWIDTH_OP;
}
} else { /* I'm a client */
if(!tor_tls_peer_has_cert(conn->tls)) { /* it's a client too?! */
......@@ -330,7 +320,7 @@ static int connection_tls_finish_handshake(connection_t *conn) {
}
log_fn(LOG_DEBUG,"The router's pk matches the one we meant to connect to. Good.");
crypto_free_pk_env(pk);
conn->bandwidth = DEFAULT_BANDWIDTH_OP;
conn->receiver_bucket = conn->bandwidth = DEFAULT_BANDWIDTH_OP;
circuit_n_conn_open(conn); /* send the pending create */
}
return 0;
......@@ -446,10 +436,6 @@ int connection_handle_read(connection_t *conn) {
//log_fn(LOG_DEBUG,"connection_process_inbuf returned %d.",retval);
return -1;
}
if(!connection_state_is_open(conn) && conn->receiver_bucket == 0) {
log_fn(LOG_WARNING,"receiver bucket reached 0 before handshake finished. Closing.");
return -1;
}
return 0;
}
......@@ -458,9 +444,6 @@ int connection_read_to_buf(connection_t *conn) {
int result;
int at_most;
assert((connection_speaks_cells(conn) && conn->receiver_bucket >= 0) ||
(!connection_speaks_cells(conn) && conn->receiver_bucket < 0));
if(options.LinkPadding) {
at_most = global_read_bucket;
} else {
......@@ -477,14 +460,13 @@ int connection_read_to_buf(connection_t *conn) {
at_most = global_read_bucket;
}
if(conn->receiver_bucket >= 0 && at_most > conn->receiver_bucket)
at_most = conn->receiver_bucket;
if(connection_speaks_cells(conn) && conn->state != OR_CONN_STATE_CONNECTING) {
if(conn->state == OR_CONN_STATE_HANDSHAKING)
return connection_tls_continue_handshake(conn);
/* else open, or closing */
if(at_most > conn->receiver_bucket)
at_most = conn->receiver_bucket;
result = read_to_buf_tls(conn->tls, at_most, conn->inbuf);
switch(result) {
......@@ -510,14 +492,21 @@ int connection_read_to_buf(connection_t *conn) {
}
global_read_bucket -= result; assert(global_read_bucket >= 0);
if(connection_speaks_cells(conn))
conn->receiver_bucket -= result;
if(conn->receiver_bucket == 0 || global_read_bucket == 0) {
log_fn(LOG_DEBUG,"buckets (%d, %d) exhausted. Pausing.", global_read_bucket, conn->receiver_bucket);
if(global_read_bucket == 0) {
log_fn(LOG_DEBUG,"global bucket exhausted. Pausing.");
conn->wants_to_read = 1;
connection_stop_reading(conn);
return 0;
}
if(connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) {
conn->receiver_bucket -= result; assert(conn->receiver_bucket >= 0);
if(conn->receiver_bucket == 0) {
log_fn(LOG_DEBUG,"receiver bucket exhausted. Pausing.");
conn->wants_to_read = 1;
connection_stop_reading(conn);
return 0;
}
}
if(connection_speaks_cells(conn) && conn->state != OR_CONN_STATE_CONNECTING)
if(result == at_most)
return connection_read_to_buf(conn);
......@@ -627,7 +616,10 @@ int connection_receiver_bucket_should_increase(connection_t *conn) {
if(!connection_speaks_cells(conn))
return 0; /* edge connections don't use receiver_buckets */
if(conn->state != OR_CONN_STATE_OPEN)
return 0; /* only open connections play the rate limiting game */
assert(conn->bandwidth > 0);
if(conn->receiver_bucket > 9*conn->bandwidth)
return 0;
......@@ -660,7 +652,7 @@ int connection_send_destroy(aci_t aci, connection_t *conn) {
if(!connection_speaks_cells(conn)) {
log_fn(LOG_INFO,"Aci %d: At an edge. Marking connection for close.", aci);
conn->marked_for_close = 1;
/*ENDCLOSE*/ conn->marked_for_close = 1;
return 0;
}
......@@ -746,13 +738,13 @@ void assert_connection_ok(connection_t *conn, time_t now)
#endif
if (conn->type != CONN_TYPE_OR) {
assert(conn->bandwidth == -1);
assert(conn->receiver_bucket == -1);
assert(!conn->tls);
} else {
assert(conn->bandwidth);
assert(conn->receiver_bucket >= 0);
assert(conn->receiver_bucket <= 10*conn->bandwidth);
if(conn->state == OR_CONN_STATE_OPEN) {
assert(conn->bandwidth > 0);
assert(conn->receiver_bucket >= 0);
assert(conn->receiver_bucket <= 10*conn->bandwidth);
}
assert(conn->addr && conn->port);
assert(conn->address);
if (conn->state != OR_CONN_STATE_CONNECTING)
......
......@@ -29,7 +29,7 @@ int connection_edge_process_inbuf(connection_t *conn) {
conn->done_receiving = 1;
shutdown(conn->s, 0); /* XXX check return, refactor NM */
if (conn->done_sending)
conn->marked_for_close = 1;
/*ENDCLOSE*/ conn->marked_for_close = 1;
/* XXX Factor out common logic here and in circuit_about_to_close NM */
circ = circuit_get_by_conn(conn);
......@@ -51,17 +51,17 @@ int connection_edge_process_inbuf(connection_t *conn) {
#else
/* eof reached, kill it. */
log_fn(LOG_INFO,"conn (fd %d) reached eof. Closing.", conn->s);
return -1;
/*ENDCLOSE*/ return -1;
#endif
}
switch(conn->state) {
case AP_CONN_STATE_SOCKS_WAIT:
return connection_ap_handshake_process_socks(conn);
/*ENDCLOSE*/ return connection_ap_handshake_process_socks(conn);
case AP_CONN_STATE_OPEN:
case EXIT_CONN_STATE_OPEN:
if(connection_package_raw_inbuf(conn) < 0)
return -1;
/*ENDCLOSE*/ return -1;
return 0;
case EXIT_CONN_STATE_CONNECTING:
log_fn(LOG_INFO,"text from server while in 'connecting' state at exit. Leaving it on buffer.");
......@@ -133,10 +133,11 @@ int connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, connection
log_fn(LOG_INFO,"...and informing resolver we don't want the answer anymore.");
dns_cancel_pending_resolve(conn->address, conn);
}
return 0;
} else {
log_fn(LOG_WARNING,"Got an unexpected relay cell, not in 'open' state. Dropping.");
log_fn(LOG_WARNING,"Got an unexpected relay cell, not in 'open' state. Closing.");
return -1;
}
return 0;
}
switch(relay_command) {
......@@ -174,11 +175,11 @@ int connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, connection
// printf("New text for buf (%d bytes): '%s'", cell->length - RELAY_HEADER_SIZE, cell->payload + RELAY_HEADER_SIZE);
if(connection_write_to_buf(cell->payload + RELAY_HEADER_SIZE,
cell->length - RELAY_HEADER_SIZE, conn) < 0) {
conn->marked_for_close = 1;
/*ENDCLOSE*/ conn->marked_for_close = 1;
return 0;
}
if(connection_consider_sending_sendme(conn, edge_type) < 0)
conn->marked_for_close = 1;
/*ENDCLOSE*/ conn->marked_for_close = 1;
return 0;
case RELAY_COMMAND_END:
if(!conn) {
......@@ -191,9 +192,9 @@ int connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, connection
conn->done_sending = 1;
shutdown(conn->s, 1); /* XXX check return; refactor NM */
if (conn->done_receiving)
conn->marked_for_close = 1;
/*ENDCLOSE*/ conn->marked_for_close = 1;
#endif
conn->marked_for_close = 1;
/*ENDCLOSE*/ conn->marked_for_close = 1;
break;
case RELAY_COMMAND_EXTEND:
if(conn) {
......@@ -240,7 +241,7 @@ int connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, connection
}
log_fn(LOG_INFO,"Connected! Notifying application.");
if(connection_ap_handshake_socks_reply(conn, SOCKS4_REQUEST_GRANTED) < 0) {
conn->marked_for_close = 1;
/*ENDCLOSE*/ conn->marked_for_close = 1;
}
break;
case RELAY_COMMAND_SENDME:
......@@ -331,7 +332,7 @@ repeat_connection_package_raw_inbuf:
return 0;
if(conn->package_window <= 0) {
log_fn(LOG_WARNING,"called with package_window 0. Tell Roger.");
log_fn(LOG_WARNING,"called with package_window %d. Tell Roger.", conn->package_window);
connection_stop_reading(conn);
return 0;
}
......@@ -526,7 +527,7 @@ static int connection_ap_handshake_socks_reply(connection_t *conn, char result)
return connection_flush_buf(conn); /* try to flush it, in case we're about to close the conn */
}
static int connection_exit_begin_conn(cell_t *cell, circuit_t *circ) {
/*ENDCLOSE*/ static int connection_exit_begin_conn(cell_t *cell, circuit_t *circ) {
connection_t *n_stream;
char *colon;
......@@ -553,8 +554,6 @@ static int connection_exit_begin_conn(cell_t *cell, circuit_t *circ) {
n_stream->address = strdup(cell->payload + RELAY_HEADER_SIZE + STREAM_ID_SIZE);
n_stream->port = atoi(colon+1);
n_stream->state = EXIT_CONN_STATE_RESOLVING;
n_stream->receiver_bucket = -1; /* edge connections don't do receiver buckets */
n_stream->bandwidth = -1;
n_stream->s = -1; /* not yet valid */
n_stream->package_window = STREAMWINDOW_START;
n_stream->deliver_window = STREAMWINDOW_START;
......
......@@ -77,7 +77,7 @@ int connection_or_finished_flushing(connection_t *conn) {
void connection_or_init_conn_from_router(connection_t *conn, routerinfo_t *router) {
conn->addr = router->addr;
conn->port = router->or_port;
conn->bandwidth = router->bandwidth;
conn->receiver_bucket = conn->bandwidth = router->bandwidth;
conn->onion_pkey = crypto_pk_dup_key(router->onion_pkey);
conn->link_pkey = crypto_pk_dup_key(router->link_pkey);
conn->identity_pkey = crypto_pk_dup_key(router->identity_pkey);
......
......@@ -183,8 +183,6 @@ static int spawn_cpuworker(void) {
set_socket_nonblocking(fd[0]);
/* set up conn so it's got all the data we need to remember */
conn->receiver_bucket = -1; /* non-cell connections don't do receiver buckets */
conn->bandwidth = -1;
conn->s = fd[0];
conn->address = strdup("localhost");
......
......@@ -48,8 +48,6 @@ void directory_initiate_command(routerinfo_t *router, int command) {
conn->addr = router->addr;
conn->port = router->dir_port;
conn->address = strdup(router->address);
conn->receiver_bucket = -1; /* edge connections don't do receiver buckets */
conn->bandwidth = -1;
if (router->identity_pkey)
conn->identity_pkey = crypto_pk_dup_key(router->identity_pkey);
else {
......
......@@ -225,7 +225,7 @@ void dns_cancel_pending_resolve(char *question, connection_t *onlyconn) {
/* mark all pending connections to fail */
while(resolve->pending_connections) {
pend = resolve->pending_connections;
pend->conn->marked_for_close = 1;
/*ENDCLOSE*/ pend->conn->marked_for_close = 1;
resolve->pending_connections = pend->next;
free(pend);
}
......@@ -278,7 +278,7 @@ static void dns_found_answer(char *question, uint32_t answer) {
pend = resolve->pending_connections;
pend->conn->addr = resolve->answer;
if(resolve->state == CACHE_STATE_FAILED || connection_exit_connect(pend->conn) < 0) {
pend->conn->marked_for_close = 1;
/*ENDCLOSE*/ pend->conn->marked_for_close = 1;
}
resolve->pending_connections = pend->next;
free(pend);
......@@ -386,8 +386,6 @@ static int spawn_dnsworker(void) {
set_socket_nonblocking(fd[0]);
/* set up conn so it's got all the data we need to remember */
conn->receiver_bucket = -1; /* non-cell connections don't do receiver buckets */
conn->bandwidth = -1;
conn->s = fd[0];
conn->address = strdup("localhost");
......@@ -420,6 +418,7 @@ static void spawn_enough_dnsworkers(void) {
dnsconn->marked_for_close = 1;
num_dnsworkers_busy--;
num_dnsworkers--;
}
if(num_dnsworkers_busy >= MIN_DNSWORKERS)
......
......@@ -391,7 +391,8 @@ static int prepare_for_poll(void) {
if(conn->wants_to_read == 1 /* it's marked to turn reading back on now */
&& global_read_bucket > 0 /* and we're allowed to read */
&& conn->receiver_bucket != 0) { /* and either an edge conn or non-empty bucket */
&& (!connection_speaks_cells(conn) || conn->receiver_bucket > 0)) {
/* and either a non-cell conn or a cell conn with non-empty bucket */
conn->wants_to_read = 0;
connection_start_reading(conn);
if(conn->wants_to_write == 1) {
......
......@@ -273,12 +273,6 @@ struct connection_t {
long timestamp_created; /* when was this connection_t created? */
uint32_t bandwidth; /* connection bandwidth. Set to -1 for non-OR conns. */
int receiver_bucket; /* when this hits 0, stop receiving. Every second we
* add 'bandwidth' to this, capping it at 10*bandwidth.
* Set to -1 for non-OR conns.
*/
uint32_t addr; /* these two uniquely identify a router. Both in host order. */
uint16_t port; /* if non-zero, they identify the guy on the other end
* of the connection. */
......@@ -294,6 +288,12 @@ struct connection_t {
uint16_t next_aci; /* Which ACI do we try to use next on this connection?
* This is always in the range 0..1<<15-1.*/
/* bandwidth and receiver_bucket only used by ORs in OPEN state: */
uint32_t bandwidth; /* connection bandwidth. */
int receiver_bucket; /* when this hits 0, stop receiving. Every second we
* add 'bandwidth' to this, capping it at 10*bandwidth.
*/
/* Used only by edge connections: */
char stream_id[STREAM_ID_SIZE];
struct connection_t *next_stream; /* points to the next stream at this edge, if any */
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment