
futures-buffered

future concurrency primitives with emphasis on performance and low memory usage

8 releases

0.2.6 May 16, 2024
0.2.5 May 16, 2024
0.2.4 Jan 9, 2023
0.2.2 Nov 15, 2022
0.1.0 Oct 29, 2022

#76 in Asynchronous


7,116 downloads per month
Used in 14 crates (7 directly)

MIT license

130KB
2.5K SLoC


This project provides a single future structure: FuturesUnorderedBounded.

Much like futures::FuturesUnordered, this is a thread-safe, Pin-friendly, lifetime-friendly, concurrent processing stream.

This differs from FuturesUnordered in that FuturesUnorderedBounded has a fixed capacity for the number of futures it processes concurrently. That makes it less flexible, but more memory-efficient.
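To try it out, you can add the crate alongside futures (whose StreamExt trait drives the stream). A sketch of the Cargo.toml entries; version numbers here are assumptions based on the release list above, pin as appropriate:

```toml
[dependencies]
# version from the release list above
futures-buffered = "0.2"
# `futures` provides StreamExt::next, used to poll the queue
futures = "0.3"
```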

Benchmarks

Speed

Running 65536 100us timers with 256 concurrent jobs in a single threaded tokio runtime:

FuturesUnordered         time:   [420.47 ms 422.21 ms 423.99 ms]
FuturesUnorderedBounded  time:   [366.02 ms 367.54 ms 369.05 ms]

Memory usage

Running 512000 Ready<i32> futures with 256 concurrent jobs.

  • count: the number of times alloc/dealloc was called
  • alloc: the number of cumulative bytes allocated
  • dealloc: the number of cumulative bytes deallocated

FuturesUnordered
    count:    1024002
    alloc:    40960144 B
    dealloc:  40960000 B

FuturesUnorderedBounded
    count:    2
    alloc:    8264 B
    dealloc:  0 B
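Allocation counts like these can be gathered with a wrapping global allocator. Below is a minimal std-only sketch of that measurement technique, using a stand-in workload; it is an illustration, not the crate's actual benchmark harness:

```rust
use std::alloc::{GlobalAlloc, Layout, System};
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};

// Wraps the system allocator and tallies call counts and cumulative bytes.
struct Counting;

static COUNT: AtomicUsize = AtomicUsize::new(0);
static ALLOC: AtomicUsize = AtomicUsize::new(0);
static DEALLOC: AtomicUsize = AtomicUsize::new(0);

unsafe impl GlobalAlloc for Counting {
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        COUNT.fetch_add(1, Relaxed);
        ALLOC.fetch_add(layout.size(), Relaxed);
        System.alloc(layout)
    }
    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        COUNT.fetch_add(1, Relaxed);
        DEALLOC.fetch_add(layout.size(), Relaxed);
        System.dealloc(ptr, layout)
    }
}

#[global_allocator]
static A: Counting = Counting;

fn main() {
    // Stand-in workload: one up-front allocation, freed on drop --
    // the same shape as a bounded queue's preallocated slab.
    let buf: Vec<i32> = Vec::with_capacity(256);
    drop(buf);
    println!("count:   {}", COUNT.load(Relaxed));
    println!("alloc:   {} B", ALLOC.load(Relaxed));
    println!("dealloc: {} B", DEALLOC.load(Relaxed));
}
```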

Conclusion

As you can see, FuturesUnorderedBounded massively reduces your memory overhead while providing a small performance gain. Perfect if you want a fixed batch size.
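The saving comes from preallocating a fixed block of slots once and reusing them, rather than allocating for each future individually. A std-only sketch of that slab idea (a hypothetical Slab type for illustration, not the crate's internals):

```rust
// Fixed-capacity slab: a single allocation up front, slots recycled
// through a free list, never grown.
struct Slab<T> {
    slots: Vec<Option<T>>,
    free: Vec<usize>,
}

impl<T> Slab<T> {
    fn with_capacity(cap: usize) -> Self {
        Slab {
            slots: (0..cap).map(|_| None).collect(),
            free: (0..cap).rev().collect(),
        }
    }
    // Returns None when full instead of reallocating.
    fn insert(&mut self, value: T) -> Option<usize> {
        let i = self.free.pop()?;
        self.slots[i] = Some(value);
        Some(i)
    }
    fn remove(&mut self, i: usize) -> Option<T> {
        let v = self.slots[i].take();
        if v.is_some() {
            self.free.push(i);
        }
        v
    }
}

fn main() {
    let mut slab = Slab::with_capacity(2);
    let a = slab.insert("first").unwrap();
    assert!(slab.insert("second").is_some());
    assert!(slab.insert("third").is_none()); // capacity reached: no growth
    slab.remove(a);
    assert!(slab.insert("third").is_some()); // freed slot is reused
    println!("ok");
}
```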

Examples

// imports assumed by this example (hyper 0.14-era API)
use futures::StreamExt; // for `queue.next()`
use futures_buffered::FuturesUnorderedBounded;
use hyper::client::conn::{self, ResponseFuture, SendRequest};
use hyper::{Body, Request};
use tokio::net::TcpStream;

// create a tcp connection
let stream = TcpStream::connect("example.com:80").await?;

// perform the http handshakes
let (mut rs, conn) = conn::handshake(stream).await?;
runtime.spawn(conn); // `runtime`: a handle to an async runtime, e.g. tokio

/// Make an HTTP request to example.com and read the response.
fn make_req(rs: &mut SendRequest<Body>) -> ResponseFuture {
    let req = Request::builder()
        .header("Host", "example.com")
        .method("GET")
        .body(Body::from(""))
        .unwrap();
    rs.send_request(req)
}

// create a queue that can hold 128 concurrent requests
let mut queue = FuturesUnorderedBounded::new(128);

// start up 128 requests
for _ in 0..128 {
    queue.push(make_req(&mut rs));
}
// wait for a request to finish and start another to fill its place - up to 1024 total requests
for _ in 128..1024 {
    queue.next().await;
    queue.push(make_req(&mut rs));
}
// wait for the tail end to finish
for _ in 0..128 {
    queue.next().await;
}

use futures::future::join_all;

async fn foo(i: u32) -> u32 { i }

let futures = vec![foo(1), foo(2), foo(3)];

assert_eq!(join_all(futures).await, [1, 2, 3]);

Dependencies

~715KB
~14K SLoC