I'm trying to spawn multiple parallel (not concurrent) tasks. Every task is running a PUT operation to a custom S3 storage using AWS SDK for Rust.

The function body looks like the following (the different approaches are inspired by this post):

    async fn copy_local_to_s3(
        &self,
        global_args: &mut GlobalArgs,
        args: CopyArgs,
        cfg: &mut S3Config,
    ) -> anyhow::Result<()> {
        let mut from = Vec::new();

        for p in args.pos_args.first.clone().into_iter() {
            let path = p.get_path()?.to_path_buf();
            check_if_local_exists(&path)?;
            from.push(path);
        }

        let to = args.pos_args.last.get_s3string()?.clone();

        // Here some lines left out....

            // First try with tokio::spawn
            let jobs = from.into_iter().map({
                |p| {
                    let args = args.clone();
                    let cfg = cfg.clone();
                    let to = to.clone();
                    tokio::spawn(async move {
                        println!("Here I am for {}", &p.display());
                        Self::put_local_to_s3(args, cfg, p.clone(), to, None)
                    })
                }
            });
            futures::future::join_all(jobs).await;

            // Second try with simple async move
            let jobs = from.into_iter().map({
                |p| {
                    let args = args.clone();
                    let cfg = cfg.clone();
                    let to = to.clone();
                    async move {
                        println!("Here I am for {}", &p.display());
                        Self::put_local_to_s3(args, cfg, p.clone(), to, None)
                    }
                }
            });
            futures::future::join_all(jobs).await;

            // Third try with direct for-loop and without .map() closure
            let mut jobs = Vec::new();
            for p in from.into_iter() {
                let mb = multi_bar.clone();
                let args = args.clone();
                let config = cfg.clone();
                let to = to.clone();
                let job = tokio::spawn(async move {
                    println!("Job: {}", &p.display());
                    let _ = Self::put_local_to_s3(args, config, p.clone(), to, None);
                });
                jobs.push(job);
            }
            futures::future::join_all(jobs).await;

            // Fourth: direct for-loop and simple async move
            let mut jobs = Vec::new();
            for p in from.into_iter() {
                let mb = multi_bar.clone();
                let args = args.clone();
                let config = cfg.clone();
                let to = to.clone();
                let job = async move {
                    println!("Job: {}", &p.display());
                    Self::put_local_to_s3(args, config, p.clone(), to, None)
                };
                jobs.push(job);
            }
            futures::future::join_all(jobs).await;

In all cases I run the program with two files to upload, and it exits with error code 0. I can also see the printed line Job: /path/of/local/file for both files, which shows that the async tasks were started correctly. But the function put_local_to_s3 never seems to run.

The head of the named function looks like this:

    async fn put_local_to_s3(
        args: CopyArgs,
        cfg: S3Config,
        from: PathBuf,
        to: S3String,
        multi_bar: Option<Arc<Mutex<MultiProgress>>>,
    ) -> anyhow::Result<()> {

The body is very long and handles all the S3 work. It also spawns a further async task that renders a progress bar, but that does not seem to be the problem: if I call the function directly rather than from an async task/block, it works just fine.

1 Answer

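put_local_to_s3 is an async function, and you never await it. Calling an async function only creates a future; none of its body runs until that future is awaited (or otherwise polled). join_all only awaits the outer futures (your async blocks or spawned tasks), not the future created inside them, so the fix is to add .await to the call: Self::put_local_to_s3(args, cfg, p.clone(), to, None).await. Here's an MRE of what is happening: main1 does what your code does (it calls an async function without awaiting it), and main2 shows how to correct it. Note that the compiler emits warnings saying that a future does nothing unless it is awaited. Playground link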
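The difference can also be shown without any S3 code. The following is a minimal sketch, not the MRE from the playground: it uses only the standard library, with a tiny hand-written block_on standing in for the tokio runtime, and the names upload, job_without_await, job_with_await and demo are hypothetical stand-ins for put_local_to_s3 and the jobs in the question.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; sufficient here because no future returns `Pending`.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Tiny stand-in for a runtime (assumption: replaces tokio for this demo only).
/// It polls a single future in a loop until it completes.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Hypothetical stand-in for `put_local_to_s3`: counts how often its body runs.
async fn upload(path: &str, done: &AtomicUsize) -> Result<(), String> {
    println!("uploading {path}");
    done.fetch_add(1, Ordering::SeqCst);
    Ok(())
}

/// Mirrors the question: the inner future is created and dropped, never polled.
async fn job_without_await(path: &str, done: &AtomicUsize) {
    println!("Job: {path}");
    let _ = upload(path, done);
}

/// The fix: `.await` drives the inner future to completion.
async fn job_with_await(path: &str, done: &AtomicUsize) -> Result<(), String> {
    println!("Job: {path}");
    upload(path, done).await
}

/// Runs both variants over two paths and returns how many uploads actually ran.
fn demo() -> (usize, usize) {
    let paths = ["a.txt", "b.txt"];

    let skipped = AtomicUsize::new(0);
    for p in paths {
        block_on(job_without_await(p, &skipped));
    }

    let ran = AtomicUsize::new(0);
    for p in paths {
        block_on(job_with_await(p, &ran)).expect("upload failed");
    }

    (skipped.load(Ordering::SeqCst), ran.load(Ordering::SeqCst))
}
```

Calling demo() from a main function returns (0, 2): both variants print their Job: line, but only the awaited variant ever enters upload. The same applies under tokio::spawn or join_all, which drive the outer future but have no way to poll a future that the outer one merely created.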

1 Comment

Ok, thank you. That did it. I thought the awaiting was taken care of by join_all. Still learning... And too lazy to read the whole tokio book (but that's my own fault) ;)
