Commit 8bffcda7 authored by David Goulet's avatar David Goulet
Browse files

Progress in async world



Signed-off-by: default avatarDavid Goulet <dgoulet@ev0ke.net>
parent a68abe61
Loading
Loading
Loading
Loading
+83 −1
Original line number Diff line number Diff line
@@ -2,6 +2,12 @@
# It is not intended for manual editing.
version = 3

[[package]]
name = "anyhow"
version = "1.0.53"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94a45b455c14666b85fc40a019e8ab9eb75e3a124e05494f5397122bc9eb06e0"

[[package]]
name = "atty"
version = "0.2.14"
@@ -114,6 +120,12 @@ version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ca88d725a0a943b096803bd34e73a4437208b6077654cc4ecb2947a5f91618d"

[[package]]
name = "memchr"
version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a"

[[package]]
name = "mio"
version = "0.7.14"
@@ -164,6 +176,16 @@ dependencies = [
 "autocfg",
]

[[package]]
name = "num_cpus"
version = "1.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1"
dependencies = [
 "hermit-abi",
 "libc",
]

[[package]]
name = "onion-tunnel"
version = "0.1.0"
@@ -175,12 +197,40 @@ dependencies = [
 "tokio",
]

[[package]]
name = "onionmasq"
version = "0.1.0"
dependencies = [
 "anyhow",
 "onion-tunnel",
 "simple_logger",
 "tokio",
]

[[package]]
name = "pin-project-lite"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e280fbe77cc62c91527259e9442153f4688736748d24660126286329742b4c6c"

[[package]]
name = "proc-macro2"
version = "1.0.36"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7342d5883fbccae1cc37a2353b09c87c9b0f3afd73f5fb9bba687a1f733b029"
dependencies = [
 "unicode-xid",
]

[[package]]
name = "quote"
version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "864d3e96a899863136fc6e99f3d7cae289dafe43bf2c5ac19b70df7210c0a145"
dependencies = [
 "proc-macro2",
]

[[package]]
name = "rand_core"
version = "0.6.3"
@@ -206,7 +256,7 @@ dependencies = [
[[package]]
name = "smoltcp"
version = "0.8.0"
source = "git+https://github.com/smoltcp-rs/smoltcp?rev=7d633aa3204cbaaa4659395621e1239efece03ca#7d633aa3204cbaaa4659395621e1239efece03ca"
source = "git+https://github.com/dgoulet-tor/smoltcp.git?rev=8cb96670e672dfe6f7ef271ef1a3648734f94bfc#8cb96670e672dfe6f7ef271ef1a3648734f94bfc"
dependencies = [
 "bitflags",
 "byteorder",
@@ -216,6 +266,17 @@ dependencies = [
 "rand_core",
]

[[package]]
name = "syn"
version = "1.0.86"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a65b3f4ffa0092e9887669db0eae07941f023991ab58ea44da8fe8e2d511c6b"
dependencies = [
 "proc-macro2",
 "quote",
 "unicode-xid",
]

[[package]]
name = "time"
version = "0.1.43"
@@ -232,12 +293,33 @@ version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fbbf1c778ec206785635ce8ad57fe52b3009ae9e0c9f574a728f3049d3e55838"
dependencies = [
 "bytes",
 "libc",
 "memchr",
 "mio",
 "num_cpus",
 "pin-project-lite",
 "tokio-macros",
 "winapi",
]

[[package]]
name = "tokio-macros"
version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b557f72f448c511a979e2564e55d74e6c4432fc96ff4f6241bc6bded342643b7"
dependencies = [
 "proc-macro2",
 "quote",
 "syn",
]

[[package]]
name = "unicode-xid"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3"

[[package]]
name = "wasi"
version = "0.10.2+wasi-snapshot-preview1"
+1 −1
Original line number Diff line number Diff line
@@ -2,10 +2,10 @@ cargo-features = ["resolver"]

[workspace]
members = [
    "crates/onionmasq",
    "crates/onion-tunnel",
]


[profile.release]
codegen-units = 1
debug = 2
+2 −2
Original line number Diff line number Diff line
@@ -7,5 +7,5 @@ edition = "2021"
bytes = "~1.1.0"
log = "0.4"
simple_logger = "1"
smoltcp = { version = "0.8.0", default-features = false, features = ["log", "phy-tuntap_interface", "phy-raw_socket", "medium-ip", "socket-udp", "socket-tcp", "proto-ipv4"], git = "https://github.com/smoltcp-rs/smoltcp", rev = "7d633aa3204cbaaa4659395621e1239efece03ca" }
tokio = { version = "1", features = ["net", "rt", "time"] }
smoltcp = { version = "0.8.0", default-features = false, features = ["log", "phy-tuntap_interface", "phy-raw_socket", "medium-ip", "socket-udp", "socket-tcp", "proto-ipv4", "socket", "async"], git = "https://github.com/dgoulet-tor/smoltcp.git", rev = "8cb96670e672dfe6f7ef271ef1a3648734f94bfc" }
tokio = { version = "1", features = ["net", "rt", "time", "sync", "io-util", "macros"] }
+20 −9
Original line number Diff line number Diff line
@@ -2,9 +2,12 @@
// See LICENSE for licensing information.

use std::collections::VecDeque;
use std::os::unix::prelude::{AsRawFd, RawFd};

use log::info;
use smoltcp::phy::{Device, DeviceCapabilities, RxToken as SmolRxToken};
use smoltcp::time::Instant;
use tokio::io::unix::AsyncFd;

pub type Packet = Vec<u8>;

@@ -16,25 +19,33 @@ enum DevicePollState {

pub struct VirtualDevice<D>
where
    D: for<'a> smoltcp::phy::Device<'a>,
    D: for<'a> smoltcp::phy::Device<'a> + AsRawFd,
{
    device: D,
    device: AsyncFd<D>,
    recv_queue: VecDeque<Packet>,
    poll_mode: DevicePollState,
}

impl<'a, D> VirtualDevice<D>
where
    D: for<'d> smoltcp::phy::Device<'d>,
    D: for<'d> smoltcp::phy::Device<'d> + AsRawFd,
{
    pub fn new(device: D) -> Self {
        Self {
            device,
            device: AsyncFd::new(device).unwrap(),
            recv_queue: VecDeque::new(),
            poll_mode: DevicePollState::Queuing,
        }
    }

    pub fn device(&self) -> &AsyncFd<D> {
        &self.device
    }

    pub fn as_raw_fd(&self) -> RawFd {
        self.device.get_ref().as_raw_fd()
    }

    pub fn take_recv_queue(&mut self) -> VecDeque<Packet> {
        std::mem::take(&mut self.recv_queue)
    }
@@ -50,7 +61,7 @@ where

        // We receive packets from our internal device and discard the Tx-token since we can
        // regenerate that from the transmit() method later.
        while let Some((rx_token, _tx_token)) = self.device.receive() {
        while let Some((rx_token, _tx_token)) = self.device.get_mut().receive() {
            // We consume the Rx-token to extract the packet content for inspection. The call to
            // unwrap() should be safe here since we return Ok(...) explicitly.
            let packet = rx_token
@@ -75,7 +86,7 @@ where
        self.poll_mode = DevicePollState::Queuing;

        self.recv_queue.pop_front().map(|packet| {
            let device_tx_token = self.device.transmit().unwrap();
            let device_tx_token = self.device.get_mut().transmit().unwrap();

            let rx_token = RxToken { packet };
            let tx_token = TxToken {
@@ -115,13 +126,13 @@ impl<Tx: smoltcp::phy::TxToken> smoltcp::phy::TxToken for TxToken<Tx> {

impl<'a, D> smoltcp::phy::Device<'a> for VirtualDevice<D>
where
    D: for<'d> smoltcp::phy::Device<'d>,
    D: for<'d> smoltcp::phy::Device<'d> + AsRawFd,
{
    type RxToken = RxToken;
    type TxToken = TxToken<<D as Device<'a>>::TxToken>;

    fn capabilities(&self) -> DeviceCapabilities {
        self.device.capabilities()
        self.device.get_ref().capabilities()
    }

    fn receive(&'a mut self) -> Option<(Self::RxToken, Self::TxToken)> {
@@ -132,6 +143,6 @@ where
    }

    fn transmit(&'a mut self) -> Option<Self::TxToken> {
        self.device.transmit().map(|lower| TxToken { lower })
        self.device.get_mut().transmit().map(|lower| TxToken { lower })
    }
}
+33 −6
Original line number Diff line number Diff line
mod device;
mod parser;
mod proxy;
mod socket;

use device::VirtualDevice;
use log::{debug, info};
use proxy::ArtiProxy;
use smoltcp::{
    iface::{Interface, InterfaceBuilder, Routes},
    phy::{Medium, TunTapInterface},
@@ -13,14 +16,14 @@ use std::{
    sync::{Arc, Mutex},
};

use crate::parser::Parser;
use crate::{parser::Parser, socket::TcpSocket};

pub struct OnionTunnel<'a> {
pub struct OnionTunnel {
    iface_name: String,
    iface: Arc<Mutex<Interface<'a, VirtualDevice<TunTapInterface>>>>,
    iface: Arc<Mutex<Interface<'static, VirtualDevice<TunTapInterface>>>>,
}

impl<'a> OnionTunnel<'a> {
impl OnionTunnel {
    pub fn new(iface_name: &str) -> Self {
        let tun = TunTapInterface::new(iface_name, Medium::Ip).expect("Unable to create TUN iface");
        let device = VirtualDevice::new(tun);
@@ -50,7 +53,12 @@ impl<'a> OnionTunnel<'a> {
        }
    }

    pub async fn start(&'a self) {
    fn proxy(&mut self, socket: TcpSocket) {
        let mut proxy = ArtiProxy::new(socket);
        tokio::spawn(async move { proxy.start().await });
    }

    pub async fn start(&mut self) {
        info!("Starting onion tunnel on TUN iface {}", self.iface_name);

        loop {
@@ -64,12 +72,31 @@ impl<'a> OnionTunnel<'a> {
            // Handle incoming packet. Drain packets as we process them.
            while let Some(packet) = packets.pop_front() {
                if let Some(tcp_socket) = Parser::parse(packet).take() {
                    let _handle = self.iface.lock().unwrap().add_socket(tcp_socket);
                    let socket = TcpSocket::new(self.iface.clone(), tcp_socket);
                    self.proxy(socket);
                }
            }

            // The second poll we do process packets in the receive queue from the first poll.
            self.iface_poll(timestamp);

            let fd = self.iface.lock().unwrap().device().as_raw_fd();
            /*
            //let fd = self.iface.lock().unwrap().device().as_raw_fd();
            //let async_fd = AsyncFd::new(fd).unwrap();
            let iface = self.iface.lock().unwrap();
            let async_fd = iface.device().device();

            let timeout = if let Some(delay) = iface.poll_delay(timestamp) {
                delay
            } else {
                smoltcp::time::Duration::from_secs(0)
            };

            let _ = tokio::time::timeout(timeout.into(), async_fd.readable()).await;
            */
            let timeout = self.iface.lock().unwrap().poll_delay(timestamp);
            smoltcp::phy::wait(fd, timeout).expect("wait error");
        }
    }
}
Loading