main.rs 8.8 KB
Newer Older
1
2
3
4
//! A minimal client for connecting to the tor network

#![warn(missing_docs)]

Nick Mathewson's avatar
Nick Mathewson committed
5
use argh::FromArgs;
6
7
use futures::io::{AsyncReadExt, AsyncWriteExt};
use futures::stream::StreamExt;
8
use log::{error, info, warn, LevelFilter};
9
use std::net::{Ipv4Addr, Ipv6Addr};
10
use std::sync::Arc;
Nick Mathewson's avatar
Nick Mathewson committed
11
12

use tor_chanmgr::transport::nativetls::NativeTlsTransport;
13
use tor_circmgr::TargetPort;
14
use tor_dirmgr::{DirMgr, NetworkConfig};
15
16
use tor_proto::circuit::IPVersionPreference;
use tor_socksproto::{SocksCmd, SocksRequest};
17

18
use anyhow::{Context, Result};
19
use serde::Deserialize;
20

21
#[derive(FromArgs, Debug, Clone)]
22
23
/// Make a connection to the Tor network, open a SOCKS port, and proxy
/// traffic.
24
25
26
///
/// This is a demo; you get no stability guarantee.
struct Args {
27
28
29
30
31
32
33
34
35
    /// override the default location(s) for the configuration file
    #[argh(option, short = 'f')]
    rc: Vec<String>,
    /// override a configuration option (uses toml syntax)
    #[argh(option, short = 'c')]
    cfg: Vec<String>,
}

/// Default options to use for our configuration.
36
37
38
39
40
const ARTI_DEFAULTS: &str = concat!(
    include_str!("./arti_defaults.toml"),
    include_str!("./fallback_caches.toml"),
    include_str!("./authorities.toml"),
);
41
42
43
44
45
46
47
48
49
50
51
52

/// Structure to hold our configuration options, whether from a
/// configuration file or the command line.
///
/// NOTE: These are NOT the final options or their final layout.
/// Expect NO stability here.
#[derive(Deserialize, Debug, Clone)]
struct ArtiConfig {
    /// Port to listen on (at localhost) for incoming SOCKS
    /// connections.
    socks_port: Option<u16>,
    /// Whether to log at trace level.
53
    trace: bool,
54
55
56

    /// Information about the Tor network we want to connect to.
    network: NetworkConfig,
57
58
}

59
60
61
62
63
64
65
66
67
68
69
70
fn ip_preference(req: &SocksRequest, addr: &str) -> IPVersionPreference {
    if addr.parse::<Ipv4Addr>().is_ok() {
        IPVersionPreference::Ipv4Only
    } else if addr.parse::<Ipv6Addr>().is_ok() {
        IPVersionPreference::Ipv6Only
    } else if req.version() == 4 {
        IPVersionPreference::Ipv4Only
    } else {
        IPVersionPreference::Ipv4Preferred
    }
}

71
72
async fn handle_socks_conn(
    dir: Arc<tor_netdir::NetDir>,
73
    circmgr: Arc<tor_circmgr::CircMgr>,
74
    stream: tor_rtcompat::net::TcpStream,
75
) -> Result<()> {
76
    let mut handshake = tor_socksproto::SocksHandshake::new();
77
78
79
80
81
82

    let (mut r, mut w) = stream.split();
    let mut inbuf = [0_u8; 1024];
    let mut n_read = 0;
    let request = loop {
        // Read some more stuff.
83
84
85
86
        n_read += r
            .read(&mut inbuf[n_read..])
            .await
            .context("Error while reading SOCKS handshake")?;
87
88
89

        // try to advance the handshake.
        let action = match handshake.handshake(&inbuf[..n_read]) {
90
            Err(tor_socksproto::Error::Truncated) => continue,
91
92
93
94
95
96
97
98
99
100
            Err(e) => return Err(e.into()),
            Ok(action) => action,
        };

        // reply if needed.
        if action.drain > 0 {
            (&mut inbuf).copy_within(action.drain..action.drain + n_read, 0);
            n_read -= action.drain;
        }
        if !action.reply.is_empty() {
101
102
103
            w.write(&action.reply[..])
                .await
                .context("Error while writing reply to SOCKS handshake")?;
104
105
106
107
108
109
110
111
112
113
        }
        if action.finished {
            break handshake.into_request();
        }
    }
    .unwrap();

    let addr = request.addr().to_string();
    let port = request.port();
    info!("Got a socks request for {}:{}", addr, port);
114
115
116
117
    if request.command() != SocksCmd::CONNECT {
        warn!("Dropping request; {:?} is unsupported", request.command());
        return Ok(());
    }
118

119
120
121
122
123
    if addr.to_lowercase().ends_with(".onion") {
        info!("That's an onion address; rejecting it.");
        return Ok(());
    }

124
125
126
127
128
129
    let begin_flags = ip_preference(&request, &addr);
    let exit_ports = [if begin_flags == IPVersionPreference::Ipv6Only {
        TargetPort::ipv6(port)
    } else {
        TargetPort::ipv4(port)
    }];
130
    let circ = circmgr
131
        .get_or_launch_exit(dir.as_ref().into(), &exit_ports)
132
133
        .await
        .context("Unable to launch circuit for request")?;
134
    info!("Got a circuit for {}:{}", addr, port);
135
    drop(dir); // This decreases the refcount on the netdir.
136

137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
    let stream = circ.begin_stream(&addr, port, Some(begin_flags)).await;
    let stream = match stream {
        Ok(s) => s,
        // In the case of a stream timeout, send the right SOCKS reply.
        Err(tor_proto::Error::StreamTimeout) => {
            let reply = request.reply(tor_socksproto::SocksStatus::TTL_EXPIRED, None);
            w.write(&reply[..])
                .await
                .context("Couldn't write SOCKS reply")?;
            return Err(tor_proto::Error::StreamTimeout.into());
        }
        // In any other case, just propagate the error downwards
        Err(e) => return Err(e.into()),
    };

152
    info!("Got a stream for {}:{}", addr, port);
153
    // TODO: XXXX-A1 Should send a SOCKS reply if something fails.
154

155
    let reply = request.reply(tor_socksproto::SocksStatus::SUCCEEDED, None);
156
157
158
    w.write(&reply[..])
        .await
        .context("Couldn't write SOCKS reply")?;
159

160
    let (mut rstream, wstream) = stream.split();
161

162
    let _t1 = tor_rtcompat::task::spawn(async move {
163
164
165
166
        let mut buf = [0u8; 1024];
        loop {
            let n = match r.read(&mut buf[..]).await {
                Err(e) => break e.into(),
167
                Ok(0) => break tor_proto::Error::StreamClosed("closed"),
168
169
                Ok(n) => n,
            };
170
            if let Err(e) = wstream.write_bytes(&buf[..n]).await {
171
172
173
174
                break e;
            }
        }
    });
175
    let _t2 = tor_rtcompat::task::spawn(async move {
176
177
        let mut buf = [0u8; 1024];
        loop {
178
            let n = match rstream.read_bytes(&mut buf[..]).await {
179
180
181
182
183
184
185
186
187
                Err(e) => break e,
                Ok(n) => n,
            };
            if let Err(e) = w.write(&buf[..n]).await {
                break e.into();
            }
        }
    });

188
    // TODO: XXXX-A1 we should close the TCP stream if either task fails.
189

190
191
192
193
    Ok(())
}

async fn run_socks_proxy(
194
    dir: Arc<tor_dirmgr::DirMgr>,
195
    circmgr: Arc<tor_circmgr::CircMgr>,
196
    args: &ArtiConfig,
197
) -> Result<()> {
198
199
    use tor_rtcompat::net::TcpListener;

200
201
202
203
204
    if args.socks_port.is_none() {
        info!("Nothing to do: no socks_port configured.");
        return Ok(());
    }
    let socksport = args.socks_port.unwrap();
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
    let mut listeners = Vec::new();

    for localhost in &["127.0.0.1", "::1"] {
        let addr = (*localhost, socksport);
        match TcpListener::bind(addr).await {
            Ok(listener) => {
                info!("Listening on {:?}.", addr);
                listeners.push(listener);
            }
            Err(e) => warn!("Can't listen on {:?}: {}", addr, e),
        }
    }
    if listeners.is_empty() {
        error!("Couldn't open any listeners.");
        return Ok(());
    }
    let mut incoming = futures::stream::select_all(listeners.iter().map(TcpListener::incoming));
222
223

    while let Some(stream) = incoming.next().await {
224
        let stream = stream.context("Failed to receive incoming stream on SOCKS port")?;
225
        let d = dir.netdir().await;
226
        let ci = Arc::clone(&circmgr);
227
        tor_rtcompat::task::spawn(async move {
228
229
            let res = handle_socks_conn(d, ci, stream).await;
            if let Err(e) = res {
Nick Mathewson's avatar
Nick Mathewson committed
230
                warn!("connection exited with error: {}", e);
231
232
233
234
235
236
237
            }
        });
    }

    Ok(())
}

238
fn main() -> Result<()> {
239
    let args: Args = argh::from_env();
240
241
242
243
244
245
246
247
248
249
    let dflt_config = tor_config::default_config_file();

    let mut cfg = config::Config::new();
    cfg.merge(config::File::from_str(
        ARTI_DEFAULTS,
        config::FileFormat::Toml,
    ))?;
    tor_config::load(&mut cfg, dflt_config, &args.rc, &args.cfg)?;

    let config: ArtiConfig = cfg.try_into()?;
250

251
    let filt = if config.trace {
252
253
254
255
256
257
        LevelFilter::Trace
    } else {
        LevelFilter::Debug
    };
    simple_logging::log_to_stderr(filt);

258
    let mut dircfg = tor_dirmgr::NetDirConfigBuilder::new();
259
260
    dircfg.set_network_config(config.network.clone());
    let dircfg = dircfg.finalize()?;
261

262
    tor_rtcompat::task::block_on(async {
Nick Mathewson's avatar
Nick Mathewson committed
263
        let transport = NativeTlsTransport::new();
264
        let chanmgr = Arc::new(tor_chanmgr::ChanMgr::new(transport));
265
        let circmgr = Arc::new(tor_circmgr::CircMgr::new(Arc::clone(&chanmgr)));
266
        let dirmgr = DirMgr::bootstrap_from_config(dircfg, Arc::clone(&circmgr)).await?;
267

268
        run_socks_proxy(dirmgr, circmgr, &config).await
269
    })
270
}
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289

#[cfg(test)]
mod test {

    use super::*;

    #[test]
    fn load_default_config() -> Result<()> {
        // TODO: this is duplicate code.
        let mut cfg = config::Config::new();
        cfg.merge(config::File::from_str(
            ARTI_DEFAULTS,
            config::FileFormat::Toml,
        ))?;

        let _parsed: ArtiConfig = cfg.try_into()?;
        Ok(())
    }
}