使用Tokio0.2生成非静态未来

我有一个异步方法应该并行执行一些期货,并且只有在所有期货完成后才返回。但是,它通过引用传递了一些寿命不长的数据'static(它将在主方法中的某个时刻被删除)。从概念上讲,它类似于(Playground):

async fn do_sth(with: &u64) {
    delay_for(Duration::new(*with, 0)).await;
    println!("{}", with);
}

async fn parallel_stuff(array: &[u64]) {
    let mut tasks: Vec<JoinHandle<()>> = Vec::new();
    for i in array {
        let task = spawn(do_sth(i));
        tasks.push(task);
    }
    for task in tasks {
        task.await;
    }
}

#[tokio::main]
async fn main() {
    parallel_stuff(&[3, 1, 4, 2]);
}

现在,tokio 希望传递给的期货spawn'static一生中有效,因为我可以在不停止未来的情况下放下句柄。这意味着我上面的示例会产生此错误消息:

error[E0759]: `array` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
  --> src/main.rs:12:25
   |
12 | async fn parallel_stuff(array: &[u64]) {
   |                         ^^^^^  ------ this data with an anonymous lifetime `'_`...
   |                         |
   |                         ...is captured here...
...
15 |         let task = spawn(do_sth(i));
   |                    ----- ...and is required to live as long as `'static` here

所以我的问题是:如何生成仅对当前上下文有效的期货,然后我可以等到所有期货都完成?

(如果这在 tokio 0.3 上可行但在 0.2 上不可行,我仍然感兴趣,尽管目前这会涉及很多 git 依赖项)

回答

不可能'static从异步 Rust 中产生非未来。这是因为任何异步函数都可能在任何时候被取消,因此无法保证调用者确实比生成的任务寿命更长。

确实有各种 crate 允许异步任务的作用域生成,但这些 crate 不能从异步代码中使用。他们允许为产卵范围的异步从任务非异步代码。这并不违反上面的问题,因为产生它们的非异步代码在任何时候都不能取消,因为它不是异步的。

一般有两种方法可以解决这个问题:

  1. 'static通过使用Arc而不是普通引用生成任务。
  2. 使用 futures crate 中的并发原语而不是 spawning。

请注意,此答案适用于 Tokio0.2.x0.3.x.


通常要生成静态任务并使用Arc,您必须拥有相关值的所有权。这意味着由于您的函数通过引用获取参数,因此您不能在不克隆数据的情况下使用此技术。

async fn do_sth(with: Arc<[u64]>, idx: usize) {
    delay_for(Duration::new(with[idx], 0)).await;
    println!("{}", with[idx]);
}

async fn parallel_stuff(array: &[u64]) {
    // Make a clone of the data so we can shared it across tasks.
    let shared: Arc<[u64]> = Arc::from(array);
    
    let mut tasks: Vec<JoinHandle<()>> = Vec::new();
    for i in 0..array.len() {
        // Cloning an Arc does not clone the data.
        let shared_clone = shared.clone();
        let task = spawn(do_sth(shared_clone, i));
        tasks.push(task);
    }
    for task in tasks {
        task.await;
    }
}

请注意,如果您有一个对数据的可变引用,并且数据是Sized,即不是切片,则可以临时获得它的所有权。

async fn do_sth(with: Arc<Vec<u64>>, idx: usize) {
    delay_for(Duration::new(with[idx], 0)).await;
    println!("{}", with[idx]);
}

async fn parallel_stuff(array: &mut Vec<u64>) {
    // Swap the array with an empty one to temporarily take ownership.
    let vec = std::mem::take(array);
    let shared = Arc::new(vec);
    
    let mut tasks: Vec<JoinHandle<()>> = Vec::new();
    for i in 0..array.len() {
        // Cloning an Arc does not clone the data.
        let shared_clone = shared.clone();
        let task = spawn(do_sth(shared_clone, i));
        tasks.push(task);
    }
    for task in tasks {
        task.await;
    }
    
    // Put back the vector where we took it from.
    // This works because there is only one Arc left.
    *array = Arc::try_unwrap(shared).unwrap();
}

另一种选择是使用 futures crate 中的并发原语。这些具有处理非'static数据的优点,但缺点是任务无法同时在多个线程上运行。

对于许多工作流来说,这完全没问题,因为无论如何异步代码都应该花费大部分时间等待 IO。

一种方法是使用FuturesUnordered. 这是一个特殊的集合,可以存储许多不同的期货,它有一个next函数可以同时运行所有期货,并在第一个完成后返回。(该next功能仅在StreamExt导入时可用)

你可以这样使用它:

use futures::stream::{FuturesUnordered, StreamExt};

async fn do_sth(with: &u64) {
    delay_for(Duration::new(*with, 0)).await;
    println!("{}", with);
}

async fn parallel_stuff(array: &[u64]) {
    let mut tasks = FuturesUnordered::new();
    for i in array {
        let task = do_sth(i);
        tasks.push(task);
    }
    // This loop runs everything concurrently, and waits until they have
    // all finished.
    while let Some(()) = tasks.next().await { }
}

注意:FuturesUnordered必须定义的共享值。否则你会得到一个借用错误,这是由于它们以错误的顺序被删除。


另一种方法是使用Stream. 对于流,您可以使用buffer_unordered. 这是一个在FuturesUnordered内部使用的实用程序。

use futures::stream::StreamExt;

async fn do_sth(with: &u64) {
    delay_for(Duration::new(*with, 0)).await;
    println!("{}", with);
}

async fn parallel_stuff(array: &[u64]) {
    // Create a stream going through the array.
    futures::stream::iter(array)
    // For each item in the stream, create a future.
        .map(|i| do_sth(i))
    // Run at most 10 of the futures concurrently.
        .buffer_unordered(10)
    // Since Streams are lazy, we must use for_each or collect to run them.
    // Here we use for_each and do nothing with the return value from do_sth.
        .for_each(|()| async {})
        .await;
}

请注意,在这两种情况下,导入StreamExt都很重要,因为它提供了在不导入扩展特征的情况下无法在流上使用的各种方法。


以上是使用Tokio0.2生成非静态未来的全部内容。
THE END
分享
二维码
< <上一篇
下一篇>>