🚧 Attention, peinture fraîche !

Cette page a été traduite par une seule personne et n'a pas été relue et vérifiée par quelqu'un d'autre ! Les informations peuvent par exemple être erronées, être formulées maladroitement, ou contenir d'autres types de fautes.

Vous pouvez contribuer à l'amélioration de cette page sur sa Pull Request.

L'itération et la concurrence

Comme pour les Iterators synchrones, il existe de nombreuses façons pour itérer sur les valeurs dans un Stream et pour les traiter. Il existe des méthodes conçues pour se combiner, comme map, filter et fold, et leurs cousines conçues pour s'arrêter dès qu'elles rencontrent une erreur, comme try_map, try_filter, et try_fold.

Malheureusement, les boucles for ne sont pas utilisables avec les Stream, mais du code plus impératif peut être utilisé, comme while let et les fonctions next et try_next :

async fn somme_avec_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 {
    use futures::stream::StreamExt; // pour utiliser `next`
    let mut somme = 0;
    while let Some(valeur) = stream.next().await {
        somme += valeur;
    }
    somme
}

async fn somme_avec_try_next(
    mut stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>,
) -> Result<i32, io::Error> {
    use futures::stream::TryStreamExt; // pour utiliser `try_next`
    let mut somme = 0;
    while let Some(valeur) = stream.try_next().await? {
        somme += valeur;
    }
    Ok(somme)
}

Cependant, si nous ne traitions qu'un seul élément à la fois, nous aurions probablement gaspillé des occasions de concurrence, ce qui, après tout, est la raison principale pour laquelle nous écrivons du code asynchrone. Pour traiter en concurrence plusieurs éléments d'un Stream, utilisez les méthodes for_each_concurrent et try_for_each_concurrent :

async fn sauter_partout(
    mut stream: Pin<&mut dyn Stream<Item = Result<u8, io::Error>>>,
) -> Result<(), io::Error> {
    use futures::stream::TryStreamExt; // pour utiliser `try_for_each_concurrent`
    const SAUTS_CONCURRENTS_MAXI: usize = 100;

    stream.try_for_each_concurrent(SAUTS_CONCURRENTS_MAXI, |nombre| async move {
        saute_x_fois(nombre).await?;
        reporter_x_sauts(nombre).await?;
        Ok(())
    }).await?;

    Ok(())
}