Futures in Rust

Datetime:2016-08-22 23:59:03          Topic: Rust           Share

I stumbled upon Rust a few months ago but didn’t really bother to learn it. The ecosystem was tiny, the language seemed to be of little practical use and it was bothersome to get my head around its quirks like borrowing and lifetimes.

But things have changed. With growing interest and a fast-evolving ecosystem, the language is impossible to ignore. Go’s goroutines and Scala’s Future s had really spoiled me and I really needed something similar. Then I stumbled upon futures , a zero-cost futures and streams Rust library. And I thought of making a simple async web API wrapper as my first Rust project and a blog post to go along as a tutorial.

Background

I started off very naively, searching for “async HTTP client library Rust” on Google and the first couple of results included hyper , mio and rotor .

hyper

hyper is a well designed and almost defacto HTTP library in the Rust ecosystem but the async support was still a work-in-progress and used a very last-decade interface with handler functions ( hyper::client::Handler trait). I do not know if an alternative API exists but the one example that I found repelled me enough. I was looking for something flexible and composable, which the users of my library could naturally extend and not a mess of handlers which would be a nightmare to reason about and extend.

mio

mio is bare-bones but definitely the defacto async library that there is for Rust. It uses an event loop at its core and boasts of non-blocking sockets and thread safe message channels for communication. Unfortunately, it’s pretty low level and would need a lot more on top to make it suitable for my purpose.

rotor

Seeing rotor’s Github project page excited me because they promised “A Future type which allows communication between state machines in safe and efficient way”. I immediately went to look at the two server and client examples to see how this Future type works. Ctrl+F Future : no results. I go to the project docs and search for Future . Nothing.

I’m not a very patient man and abandoned rotor without bothering to investigate more. Besides, I’m still a Rust beginner and if a library doesn’t have an example that corresponds closely with what I want, I’m not diving deep into its guts to figure out how to make it work.

futures

Dejected, I was about to give up when I received This Week in Rust in my mailbox. And it featured tokio which then led me to futures ! The library reminded me of Scala’s Future s that I sorely missed. Moreover they seemed simple enough to work with. And thus I began writing my first Rust program.

First Steps

The simplest thing to do is to retrieve data and print it to the screen. Easy enough.

extern crate curl;
extern crate futures;
extern crate futures_curl;
extern crate futures_mio;

use std::io::Write;

use curl::easy::Easy;
use futures_mio::Loop;
use futures_curl::Session;

fn main() {
    // Create an event loop that we'll run on, as well as an HTTP `Session`
    // which we'll be routing all requests through.
    let mut lp = Loop::new().unwrap();
    let sess = Session::new(lp.pin());

    let mut req = Easy::new();
    req.get(true).unwrap();
    req.url("https://www.rust-lang.org").unwrap();

    req.write_function(|data| {
            std::io::stdout().write(data).unwrap();
            Ok(data.len())
        })
        .unwrap();


    lp.run(sess.perform(req)).unwrap();
}

And Cargo.toml :

[dependencies]
futures = { git = "https://github.com/alexcrichton/futures-rs" }
futures-io = { git = "https://github.com/alexcrichton/futures-rs" }
futures-mio = { git = "https://github.com/alexcrichton/futures-rs" }
futures-curl = { git = "https://github.com/alexcrichton/futures-rs" }
curl = "0.3"

OK, so at least we have something working.

Improvements

However, to be any useful as a library, the code has to reside in its own function and return the output as a String instead of printing it to the screen. The first part is easy, move the code that creates f to a new function and tada, done. However, the second part was slightly harder because of the write function.

The write function may be called multiple times with partial data and each time, it has to write those bytes somewhere. The problem was, where? If we declare a Vec<u8> on the stack, it would be lost as soon as the function returned (which would be very soon since everything is async).

Thus it needs to be on the heap. But a simple Box wouldn’t do because it needs to be accessed from two places, the write function and the point where the complete Vec<u8> is converted to a String . The ownership would be unclear.

OK! Rc<RefCell<Vec<u8>>> it must be then. Unfortunately, no. Look at where we reached so far:

extern crate curl;
extern crate futures;
extern crate futures_curl;
extern crate futures_io;
extern crate futures_mio;

use std::io::Write;
use std::rc::Rc;
use std::cell::RefCell;

use curl::easy::Easy;
use futures::{BoxFuture, Future};
use futures_io::IoFuture;
use futures_mio::Loop;
use futures_curl::Session;

fn get_page(sess: Session) -> BoxFuture<String, std::io::Error> {
    let response = Rc::new(RefCell::new(Vec::new()));
    let response1 = response.clone();

    let mut req = Easy::new();
    req.get(true).unwrap();
    req.url("https://www.rust-lang.org").unwrap();

    req.write_function(move |data| {
            response1.borrow_mut().extend_from_slice(data);
            Ok(data.len())
        })
        .unwrap();

    sess.perform(req)
        .map(move |_| String::from_utf8_lossy(&*response.borrow()).into_owned())
        .boxed()
}

fn main() {
    // Create an event loop that we'll run on, as well as an HTTP `Session`
    // which we'll be routing all requests through.
    let mut lp = Loop::new().unwrap();
    let session = Session::new(lp.pin());

    let f = get_page(session);
    println!("{}", lp.run(f).unwrap());
}

Try compiling it and the compiler will complain that the Send trait isn’t satisfied by response. Send is a marker trait that marks that the data is thread-safe. Unfortunately, Rc isn’t thread-safe as the internal reference count could be mutated by two threads at the same time resulting in race conditions.

OK, so we switch to Arc<Mutex<Vec<u8>>> instead of Rc<RefCell<Vec<u8>>> . And that works beautifully:

fn get_page(sess: Session) -> BoxFuture<String, std::io::Error> {
    let response = Arc::new(Mutex::new(Vec::new()));
    let response1 = response.clone();

    let mut req = Easy::new();
    req.get(true).unwrap();
    req.url("https://www.rust-lang.org").unwrap();

    req.write_function(move |data| {
            response1.lock().unwrap().extend_from_slice(data);
            Ok(data.len())
        })
        .unwrap();

    sess.perform(req)
        .map(move |_| {
            let s = response.lock().unwrap();
            String::from(String::from_utf8_lossy(&s).into_owned())
        })
        .boxed()
}

Now, can we do better? Of course! Now, remember that the only thread that’s going to be accessing the response vector is the one running the event loop. Locking and unlocking a Mutex are expensive operations so we don’t really need Mutex except to satisfy the Send marker trait. Fortunately futures-mio offers a LoopData struct built for this very purpose. But we also need to add a RefCell in as the data held by LoopData isn’t mutable.

The result so far:

fn get_page(lp: LoopPin, sess: Session) -> BoxFuture<String, std::io::Error> {
    let response = Arc::new(lp.add_loop_data(RefCell::new(Vec::new())));
    let response1 = response.clone();

    let mut req = Easy::new();
    req.get(true).unwrap();
    req.url("https://www.rust-lang.org").unwrap();

    req.write_function(move |data| {
            response1.get().unwrap().borrow_mut().extend_from_slice(data);
            Ok(data.len())
        })
        .unwrap();

    sess.perform(req)
        .map(move |_| {
            let s = response.get().unwrap().borrow();
            String::from(String::from_utf8_lossy(&s).into_owned())
        })
        .boxed()
}

Everything so far works but only on a single thread and just for a single function. LoopPin doesn’t implement Send so get_page can only be called from the same thread as the event loop itself. Utterly useless for a real application.

Almost There

We need to do better. In particular, Session is thread-safe and so is LoopHandle . So the signature of our updated function needs to be:

fn get_page(lph: LoopHandle, sess: Session) -> BoxFuture<String, std::io::Error> 

Moreover, we need to run the event loop on a separate thread and pass LoopHandle and Session to the main thread somehow. And since both these structs are Clone able and Send able, our get_page could be called from multiple threads and still be perfectly asynchronous with all I/O events passing through our event loop.

Not to forget, we must wrap everything in a nice looking struct so that we don’t need to pass LoopHandle , Session and other API information every time.

The Final Version

The final version with a few other additions like a couple of sanity checks and getting headers along with the response is as follows:

extern crate curl;
extern crate futures;
extern crate futures_curl;
extern crate futures_io;
extern crate futures_mio;
extern crate time;

use std::sync::Arc;
use std::cell::RefCell;
use std::thread;
use std::sync::mpsc;

use curl::easy::Easy;
use futures::{BoxFuture, Future};
use futures_mio::{Loop, LoopHandle};
use futures_curl::Session;

struct Api {
    lp: LoopHandle,
    sess: Session,
}

impl Api {
    fn new(lp: LoopHandle, sess: Session) -> Api {
        Api {
            lp: lp,
            sess: sess,
        }
    }

    fn get_page(&self) -> BoxFuture<String, std::io::Error> {

        let sess = self.sess.clone();

        // First one for response and second for the headers
        self.lp
            .add_loop_data(|| (RefCell::new(Vec::<u8>::new()), RefCell::new(Vec::<Vec<u8>>::new())))
            .and_then(move |data| {
                let data = Arc::new(data);

                let data1 = data.clone();
                let data2 = data.clone();

                let mut req = Easy::new();
                req.get(true).unwrap();
                req.url("https://www.rust-lang.org").unwrap();

                req.write_function(move |d| {
                        data1.get().unwrap().0.borrow_mut().extend_from_slice(d);
                        Ok(d.len())
                    })
                    .unwrap();

                req.header_function(move |header| {
                        data2.get().unwrap().1.borrow_mut().push(header.to_vec());
                        true
                    })
                    .unwrap();

                sess.perform(req)
                    .map(move |(mut resp, err)| {
                        assert!(err.is_none());
                        assert_eq!(resp.response_code().unwrap(), 200);

                        let response = data.get().unwrap().0.borrow();
                        let response = String::from_utf8_lossy(&response);
                        assert!(response.contains("<html>"));
                        let headers = data.get()
                            .unwrap()
                            .1
                            .borrow()
                            .iter()
                            .map(|h| String::from(String::from_utf8_lossy(&h).trim()))
                            .fold(String::new(), |acc, elem| acc + &elem + "\n");
                        assert!(headers.len() > 0);

                        format!("Headers:\n\n{}\n\nResponse:\n\n{}", headers, response)
                    })
            })
            .boxed()
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();

    // Make sure the event loop runs forever in a new thread.
    thread::spawn(move || {
        // Create an event loop that we'll run on, as well as an HTTP `Session`
        // which we'll be routing all requests through.
        let mut lp = Loop::new().unwrap();

        // Send loop handler and Session to the main thread so that async curl requests can be
        // made.
        tx.send((lp.handle(), Session::new(lp.pin()))).unwrap();

        lp.run(futures::empty::<(), ()>()) // Keep the event loop running forever
            .unwrap();
    });

    let (lph, sess) = rx.recv().unwrap();

    // Create a new API using the loop handle and Session.
    let api = Api::new(lph, sess);

    // Wait for the page and quit.
    let res = await(api.get_page());
    println!("{}", res.unwrap());
}

// Taken from tokio and modified to compile with upstream changes.
// Why this isn't in futures-rs, I do not know...
fn await<T: Future + Send>(f: T) -> Result<T::Item, T::Error>
    where T::Item: Send,
          T::Error: Send
{
    let (tx, rx) = mpsc::channel();

    f.then(move |res| {
            tx.send(res).unwrap();
            Ok::<(), ()>(())
        })
        .forget();

    rx.recv().unwrap()
}

Conclusion

Our goal in the beginning was to create a truly async web API wrapper and we have accomplished exactly that! But, futures is in constant flux and I had to revise this blog post twice even before the first publication to account for API changes.

With Rust, it’s great to be able to have zero cost abstractions and a GC free environment but it comes at a price: explicitness. I already had experience with C/C++ so lifetimes were easy to reason about and accept as necessary. Howewer, what was hard to swallow were the dozen different guarantees and marker traits. But I think I understand Rust and its philosophy much better now. Can’t wait to create more stuff!

Please leave comments if you found this tutorial useful and suggest changes. Afterall, I’m still a Rust noob.





About List