🚧 Attention, peinture fraîche !

Cette page a été traduite par une seule personne et n'a pas été relue et vérifiée par quelqu'un d'autre ! Les informations peuvent par exemple être erronées, être formulées maladroitement, ou contenir d'autres types de fautes.

Vous pouvez contribuer à l'amélioration de cette page sur sa Pull Request.

select!

La macro futures::select exécute plusieurs futures en même temps, permettant à son utilisateur de répondre dès qu'une future est terminée.


#![allow(unused)]
fn main() {
    use futures::{
        future::FutureExt, // pour utiliser `.fuse()`
        pin_mut,
        select,
    };

    async fn premiere_tache() { /* ... */ }
    async fn seconde_tache() { /* ... */ }

    async fn course_de_taches() {
        let t1 = premiere_tache().fuse();
        let t2 = seconde_tache().fuse();

        pin_mut!(t1, t2);

        select! {
            () = t1 => println!("la première tâche s'est terminée en premier"),
            () = t2 => println!("la seconde tâche s'est terminée en premier"),
        }
    }
}

La fonction ci-dessus va exécuter t1 et t2 en concurrence. Lorsque t1 ou t2 se termine, la branche correspondante va appeler println! et la fonction va se terminer sans terminer la tâche restante.

La syntaxe classique pour select est <motif> = <expression> => <code>,, répétée par autant de futures que vous voulez gérer avec le select.

default => ... et complete => ...

select autorise aussi l'utilisation des branches default et complete.

La branche default va s'exécuter si aucune des futures dans le select n'est terminée. Un select avec une branche default toutefois retourner sa valeur immédiatement, puisque default sera exécuté si aucune des futures n'est terminée.

La branche complete peut être utilisée pour gérer le cas où toutes les futures présentes dans le select se sont terminées et ne vont pas plus progresser. C'est parfois utile lorsqu'on boucle sur un select!.


#![allow(unused)]
fn main() {
    use futures::{future, select};

    async fn compter() {
        let mut future_a = future::ready(4);
        let mut future_b = future::ready(6);
        let mut total = 0;

        loop {
            select! {
                a = future_a => total += a,
                b = future_b => total += b,
                complete => break,
                default => unreachable!(), // ne sera jamais exécuté (les futures
                                           // sont prêtes, puis ensuite terminées)
            };
        }
        assert_eq!(total, 10);
    }
}

Utilisation avec Unpin et FusedFuture

Vous avez peut-être remarqué dans le premier exemple ci-dessus que nous avons dû appeller .fuse() sur les futures retournées par les deux fonctions asynchrones, ainsi que les épingler avec pin_mut. Chacun de ces appels sont nécessaires car les futures utilisées dans select doivent implémenter les traits Unpin et FusedFuture.

Unpin est nécessaire car les futures utilisées par select ne sont pas des valeurs, mais des références mutables. En évitant de prendre possession de la future, les futures non terminées peuvent toujours être utilisées après l'appel à select.

De la même manière, le trait FusedFuture est nécessaire car select ne doit pas appeler une future après qu'elle soit complétée. FusedFuture est implémentée par les futures qui ont besoin de savoir si oui ou non elles se sont terminées. Cela permet d'utiliser select dans une boucle, pour appeler uniquement les futures qui n'ont pas encore terminé. Nous pouvons voir cela dans l'exemple ci-dessus, où future_a ou future_b sont terminés dans le deuxième tour de boucle. Comme la future retournée par future::ready implémente FusedFuture, c'est possible d'indiquer au select de ne pas les appeler à nouveau.

Remarquez que les Streams ont un trait FusedStream correspondant. Les Streams qui implémentent ce trait ou qui ont été enveloppés en utilisant .fuse() vont produire des futures FusedFutures à partir de leurs combinateurs .next() ou try_next().


#![allow(unused)]
fn main() {
    use futures::{
        select,
        stream::{FusedStream, Stream, StreamExt},
    };

    async fn ajouter_deux_streams(
        mut s1: impl Stream<Item = u8> + FusedStream + Unpin,
        mut s2: impl Stream<Item = u8> + FusedStream + Unpin,
    ) -> u8 {
        let mut total = 0;

        loop {
            let element = select! {
                x = s1.next() => x,
                x = s2.next() => x,
                complete => break,
            };
            if let Some(nombre_suivant) = element {
                total += nombre_suivant;
            }
        }

        total
    }
}

Des tâches concurrentes dans une boucle select avec Fuse et FuturesUnordered

Une fonction difficile à aborder, mais qui est pratique, est Fuse::terminated(), ce qui permet de construire une future vide qui est déjà terminée, et qui peut être rempli plus tard avec une future qui a besoin d'être exécutée.

Cela s'avère utile lorsqu'une tâche nécessite d'être exécuté dans une boucle select qui est elle-même créée dans la boucle select.

Remarquez l'utilisation de la fonction .select_next_some(). Elle peut être utilisée avec select pour exécuter uniquement la branche pour les valeurs Some(_) retournées par le Stream, en ignorant les Nones.


#![allow(unused)]
fn main() {
    use futures::{
        future::{Fuse, FusedFuture, FutureExt},
        pin_mut, select,
        stream::{FusedStream, Stream, StreamExt},
    };

    async fn obtenir_nouveau_nombre() -> u8 { /* ... */ 5 }

    async fn executer_avec_nouveau_nombre(_: u8) { /* ... */ }

    async fn executer_boucle(
        mut temporisation: impl Stream<Item = ()> + FusedStream + Unpin,
        nombre_initial: u8,
    ) {
        let executer_avec_nouveau_nombre_future =
            executer_avec_nouveau_nombre(nombre_initial).fuse();
        let obtenir_nouveau_nombre_future = Fuse::terminated();
        pin_mut!(
            executer_avec_nouveau_nombre_future,
            obtenir_nouveau_nombre_future
        );
        loop {
            select! {
                () = temporisation.select_next_some() => {
                    // La temporisation s'est terminée. Démarre un nouveau
                    // `obtenir_nouveau_nombre_future` s'il n'y en a pas un qui est
                    // déjà en cours d'exécution.
                    if obtenir_nouveau_nombre_future.is_terminated() {
                        obtenir_nouveau_nombre_future.set(obtenir_nouveau_nombre().fuse());
                    }
                },
                new_num = obtenir_nouveau_nombre_future => {
                    // Un nouveau nombre est arrivé : cela démarrera un nouveau
                    // `executer_avec_nouveau_nombre_future`, ce qui libèrera
                    // l'ancien.
                    executer_avec_nouveau_nombre_future.set(executer_avec_nouveau_nombre(new_num).fuse());
                },
                // Execute le `executer_avec_nouveau_nombre_future`
                () = executer_avec_nouveau_nombre_future => {},
                // panique si tout est terminé, car la `temporisation` est censé
                // générer des valeurs à l'infini.
                complete => panic!("`temporisation` s'est terminé inopinément"),
            }
        }
    }
}

Lorsque de nombreuses copies d'une même future a besoin d'être exécuté en même temps, utilisez le type FuturesUnordered. L'exemple suivant ressemble à celui ci-dessus, mais va exécuter chaque copie de obtenir_nouveau_nombre_future jusqu'à ce qu'elles soient terminées, plutôt que de les arrêter lorsqu'une nouvelle est générée. Cela va aussi afficher la valeur retournée par obtenir_nouveau_nombre_future.


#![allow(unused)]
fn main() {
    use futures::{
        future::{Fuse, FusedFuture, FutureExt},
        pin_mut, select,
        stream::{FusedStream, FuturesUnordered, Stream, StreamExt},
    };

    async fn obtenir_nouveau_nombre() -> u8 { /* ... */ 5 }

    async fn executer_avec_nouveau_nombre(_: u8) -> u8 { /* ... */ 5 }

    // Exécute `executer_avec_nouveau_nombre` avec le dernier nombre obtenu
    // auprès de `obtenir_nouveau_nombre`.
    //
    // `obtenir_nouveau_nombre` est exécuté à nouveau à chaque fois que la
    // temporisation se termine, ce qui annule immédiatement le
    // `executer_avec_nouveau_nombre` en cours et la remplace avec la nouvelle
    // valeur retournée.
    async fn executer_boucle(
        mut temporisation: impl Stream<Item = ()> + FusedStream + Unpin,
        nombre_initial: u8,
    ) {
        let mut executer_avec_nouveau_nombre_futures = FuturesUnordered::new();
        executer_avec_nouveau_nombre_futures.push(executer_avec_nouveau_nombre(nombre_initial));
        let obtenir_nouveau_nombre_future = Fuse::terminated();
        pin_mut!(obtenir_nouveau_nombre_future);
        loop {
            select! {
                () = temporisation.select_next_some() => {
                    // La temporisation s'est terminée. Démarre un nouveau
                    // `obtenir_nouveau_nombre_future` s'il n'y en a pas un qui est
                    // déjà en cours d'exécution.
                    if obtenir_nouveau_nombre_future.is_terminated() {
                        obtenir_nouveau_nombre_future.set(obtenir_nouveau_nombre().fuse());
                    }
                },
                new_num = obtenir_nouveau_nombre_future => {
                    // Un nouveau nombre est arrivé : cela démarrera un nouveau
                    // `executer_avec_nouveau_nombre_future`..
                    executer_avec_nouveau_nombre_futures.push(executer_avec_nouveau_nombre(new_num));
                },
                // Exécute le `executer_avec_nouveau_nombre_futures` et vérifie si certaines ont terminé.
                res = executer_avec_nouveau_nombre_futures.select_next_some() => {
                    println!("executer_avec_nouveau_nombre_future a retourné {:?}", res);
                },
                // panique si tout est terminé, car la `temporisation` est censé
                // générer des valeurs à l'infini.
                complete => panic!("`temporisation` s'est terminé inopinément"),
            }
        }
    }

}