🚧 Attention, peinture fraüche !

Cette page a Ă©tĂ© traduite par une seule personne et n'a pas Ă©tĂ© relue et vĂ©rifiĂ©e par quelqu'un d'autre ! Les informations peuvent par exemple ĂȘtre erronĂ©es, ĂȘtre formulĂ©es maladroitement, ou contenir d'autres types de fautes.

Vous pouvez contribuer à l'amélioration de cette page sur sa Pull Request.

select!

La macro futures::select exĂ©cute plusieurs futures en mĂȘme temps, permettant Ă  son utilisateur de rĂ©pondre dĂšs qu'une future est terminĂ©e.


#![allow(unused)]
fn main() {
    use futures::{
        future::FutureExt, // pour utiliser `.fuse()`
        pin_mut,
        select,
    };

    async fn premiere_tache() { /* ... */ }
    async fn seconde_tache() { /* ... */ }

    async fn course_de_taches() {
        let t1 = premiere_tache().fuse();
        let t2 = seconde_tache().fuse();

        pin_mut!(t1, t2);

        select! {
            () = t1 => println!("la premiÚre tùche s'est terminée en premier"),
            () = t2 => println!("la seconde tùche s'est terminée en premier"),
        }
    }
}

La fonction ci-dessus va exécuter t1 et t2 en concurrence. Lorsque t1 ou t2 se termine, la branche correspondante va appeler println! et la fonction va se terminer sans terminer la tùche restante.

La syntaxe classique pour select est <motif> = <expression> => <code>,, répétée par autant de futures que vous voulez gérer avec le select.

default => ... et complete => ...

select autorise aussi l'utilisation des branches default et complete.

La branche default va s'exécuter si aucune des futures dans le select n'est terminée. Un select avec une branche default toutefois retourner sa valeur immédiatement, puisque default sera exécuté si aucune des futures n'est terminée.

La branche complete peut ĂȘtre utilisĂ©e pour gĂ©rer le cas oĂč toutes les futures prĂ©sentes dans le select se sont terminĂ©es et ne vont pas plus progresser. C'est parfois utile lorsqu'on boucle sur un select!.


#![allow(unused)]
fn main() {
    use futures::{future, select};

    async fn compter() {
        let mut future_a = future::ready(4);
        let mut future_b = future::ready(6);
        let mut total = 0;

        loop {
            select! {
                a = future_a => total += a,
                b = future_b => total += b,
                complete => break,
                default => unreachable!(), // ne sera jamais exécuté (les futures
                                           // sont prĂȘtes, puis ensuite terminĂ©es)
            };
        }
        assert_eq!(total, 10);
    }
}

Utilisation avec Unpin et FusedFuture

Vous avez peut-ĂȘtre remarquĂ© dans le premier exemple ci-dessus que nous avons dĂ» appeller .fuse() sur les futures retournĂ©es par les deux fonctions asynchrones, ainsi que les Ă©pingler avec pin_mut. Chacun de ces appels sont nĂ©cessaires car les futures utilisĂ©es dans select doivent implĂ©menter les traits Unpin et FusedFuture.

Unpin est nĂ©cessaire car les futures utilisĂ©es par select ne sont pas des valeurs, mais des rĂ©fĂ©rences mutables. En Ă©vitant de prendre possession de la future, les futures non terminĂ©es peuvent toujours ĂȘtre utilisĂ©es aprĂšs l'appel Ă  select.

De la mĂȘme maniĂšre, le trait FusedFuture est nĂ©cessaire car select ne doit pas appeler une future aprĂšs qu'elle soit complĂ©tĂ©e. FusedFuture est implĂ©mentĂ©e par les futures qui ont besoin de savoir si oui ou non elles se sont terminĂ©es. Cela permet d'utiliser select dans une boucle, pour appeler uniquement les futures qui n'ont pas encore terminĂ©. Nous pouvons voir cela dans l'exemple ci-dessus, oĂč future_a ou future_b sont terminĂ©s dans le deuxiĂšme tour de boucle. Comme la future retournĂ©e par future::ready implĂ©mente FusedFuture, c'est possible d'indiquer au select de ne pas les appeler Ă  nouveau.

Remarquez que les Streams ont un trait FusedStream correspondant. Les Streams qui implémentent ce trait ou qui ont été enveloppés en utilisant .fuse() vont produire des futures FusedFutures à partir de leurs combinateurs .next() ou try_next().


#![allow(unused)]
fn main() {
    use futures::{
        select,
        stream::{FusedStream, Stream, StreamExt},
    };

    async fn ajouter_deux_streams(
        mut s1: impl Stream<Item = u8> + FusedStream + Unpin,
        mut s2: impl Stream<Item = u8> + FusedStream + Unpin,
    ) -> u8 {
        let mut total = 0;

        loop {
            let element = select! {
                x = s1.next() => x,
                x = s2.next() => x,
                complete => break,
            };
            if let Some(nombre_suivant) = element {
                total += nombre_suivant;
            }
        }

        total
    }
}

Des tĂąches concurrentes dans une boucle select avec Fuse et FuturesUnordered

Une fonction difficile Ă  aborder, mais qui est pratique, est Fuse::terminated(), ce qui permet de construire une future vide qui est dĂ©jĂ  terminĂ©e, et qui peut ĂȘtre rempli plus tard avec une future qui a besoin d'ĂȘtre exĂ©cutĂ©e.

Cela s'avĂšre utile lorsqu'une tĂąche nĂ©cessite d'ĂȘtre exĂ©cutĂ© dans une boucle select qui est elle-mĂȘme crĂ©Ă©e dans la boucle select.

Remarquez l'utilisation de la fonction .select_next_some(). Elle peut ĂȘtre utilisĂ©e avec select pour exĂ©cuter uniquement la branche pour les valeurs Some(_) retournĂ©es par le Stream, en ignorant les Nones.


#![allow(unused)]
fn main() {
    use futures::{
        future::{Fuse, FusedFuture, FutureExt},
        pin_mut, select,
        stream::{FusedStream, Stream, StreamExt},
    };

    async fn obtenir_nouveau_nombre() -> u8 { /* ... */ 5 }

    async fn executer_avec_nouveau_nombre(_: u8) { /* ... */ }

    async fn executer_boucle(
        mut temporisation: impl Stream<Item = ()> + FusedStream + Unpin,
        nombre_initial: u8,
    ) {
        let executer_avec_nouveau_nombre_future =
            executer_avec_nouveau_nombre(nombre_initial).fuse();
        let obtenir_nouveau_nombre_future = Fuse::terminated();
        pin_mut!(
            executer_avec_nouveau_nombre_future,
            obtenir_nouveau_nombre_future
        );
        loop {
            select! {
                () = temporisation.select_next_some() => {
                    // La temporisation s'est terminée. Démarre un nouveau
                    // `obtenir_nouveau_nombre_future` s'il n'y en a pas un qui est
                    // déjà en cours d'exécution.
                    if obtenir_nouveau_nombre_future.is_terminated() {
                        obtenir_nouveau_nombre_future.set(obtenir_nouveau_nombre().fuse());
                    }
                },
                new_num = obtenir_nouveau_nombre_future => {
                    // Un nouveau nombre est arrivé : cela démarrera un nouveau
                    // `executer_avec_nouveau_nombre_future`, ce qui libĂšrera
                    // l'ancien.
                    executer_avec_nouveau_nombre_future.set(executer_avec_nouveau_nombre(new_num).fuse());
                },
                // Execute le `executer_avec_nouveau_nombre_future`
                () = executer_avec_nouveau_nombre_future => {},
                // panique si tout est terminé, car la `temporisation` est censé
                // générer des valeurs à l'infini.
                complete => panic!("`temporisation` s'est terminé inopinément"),
            }
        }
    }
}

Lorsque de nombreuses copies d'une mĂȘme future a besoin d'ĂȘtre exĂ©cutĂ© en mĂȘme temps, utilisez le type FuturesUnordered. L'exemple suivant ressemble Ă  celui ci-dessus, mais va exĂ©cuter chaque copie de obtenir_nouveau_nombre_future jusqu'Ă  ce qu'elles soient terminĂ©es, plutĂŽt que de les arrĂȘter lorsqu'une nouvelle est gĂ©nĂ©rĂ©e. Cela va aussi afficher la valeur retournĂ©e par obtenir_nouveau_nombre_future.


#![allow(unused)]
fn main() {
    use futures::{
        future::{Fuse, FusedFuture, FutureExt},
        pin_mut, select,
        stream::{FusedStream, FuturesUnordered, Stream, StreamExt},
    };

    async fn obtenir_nouveau_nombre() -> u8 { /* ... */ 5 }

    async fn executer_avec_nouveau_nombre(_: u8) -> u8 { /* ... */ 5 }

    // Exécute `executer_avec_nouveau_nombre` avec le dernier nombre obtenu
    // auprĂšs de `obtenir_nouveau_nombre`.
    //
    // `obtenir_nouveau_nombre` est exécuté à nouveau à chaque fois que la
    // temporisation se termine, ce qui annule immédiatement le
    // `executer_avec_nouveau_nombre` en cours et la remplace avec la nouvelle
    // valeur retournée.
    async fn executer_boucle(
        mut temporisation: impl Stream<Item = ()> + FusedStream + Unpin,
        nombre_initial: u8,
    ) {
        let mut executer_avec_nouveau_nombre_futures = FuturesUnordered::new();
        executer_avec_nouveau_nombre_futures.push(executer_avec_nouveau_nombre(nombre_initial));
        let obtenir_nouveau_nombre_future = Fuse::terminated();
        pin_mut!(obtenir_nouveau_nombre_future);
        loop {
            select! {
                () = temporisation.select_next_some() => {
                    // La temporisation s'est terminée. Démarre un nouveau
                    // `obtenir_nouveau_nombre_future` s'il n'y en a pas un qui est
                    // déjà en cours d'exécution.
                    if obtenir_nouveau_nombre_future.is_terminated() {
                        obtenir_nouveau_nombre_future.set(obtenir_nouveau_nombre().fuse());
                    }
                },
                new_num = obtenir_nouveau_nombre_future => {
                    // Un nouveau nombre est arrivé : cela démarrera un nouveau
                    // `executer_avec_nouveau_nombre_future`..
                    executer_avec_nouveau_nombre_futures.push(executer_avec_nouveau_nombre(new_num));
                },
                // Exécute le `executer_avec_nouveau_nombre_futures` et vérifie si certaines ont terminé.
                res = executer_avec_nouveau_nombre_futures.select_next_some() => {
                    println!("executer_avec_nouveau_nombre_future a retourné {:?}", res);
                },
                // panique si tout est terminé, car la `temporisation` est censé
                // générer des valeurs à l'infini.
                complete => panic!("`temporisation` s'est terminé inopinément"),
            }
        }
    }

}