🚧 Attention, peinture fraîche !
Cette page a été traduite par une seule personne et n'a pas été relue et vérifiée par quelqu'un d'autre ! Les informations peuvent par exemple être erronées, être formulées maladroitement, ou contenir d'autres types de fautes.
Vous pouvez contribuer à l'amélioration de cette page sur sa Pull Request.
select!
La macro futures::select exécute plusieurs futures en même temps, permettant
à son utilisateur de répondre dès qu'une future est terminée.
#![allow(unused)] fn main() { use futures::{ future::FutureExt, // pour utiliser `.fuse()` pin_mut, select, }; async fn premiere_tache() { /* ... */ } async fn seconde_tache() { /* ... */ } async fn course_de_taches() { let t1 = premiere_tache().fuse(); let t2 = seconde_tache().fuse(); pin_mut!(t1, t2); select! { () = t1 => println!("la première tâche s'est terminée en premier"), () = t2 => println!("la seconde tâche s'est terminée en premier"), } } }
La fonction ci-dessus va exécuter t1 et t2 en concurrence. Lorsque t1 ou
t2 se termine, la branche correspondante va appeler println! et la fonction
va se terminer sans terminer la tâche restante.
La syntaxe classique pour select est <motif> = <expression> => <code>,,
répétée par autant de futures que vous voulez gérer avec le select.
default => ... et complete => ...
select autorise aussi l'utilisation des branches default et complete.
La branche default va s'exécuter si aucune des futures dans le select n'est
terminée. Un select avec une branche default toutefois retourner sa valeur
immédiatement, puisque default sera exécuté si aucune des futures n'est
terminée.
La branche complete peut être utilisée pour gérer le cas où toutes les
futures présentes dans le select se sont terminées et ne vont pas plus
progresser. C'est parfois utile lorsqu'on boucle sur un select!.
#![allow(unused)] fn main() { use futures::{future, select}; async fn compter() { let mut future_a = future::ready(4); let mut future_b = future::ready(6); let mut total = 0; loop { select! { a = future_a => total += a, b = future_b => total += b, complete => break, default => unreachable!(), // ne sera jamais exécuté (les futures // sont prêtes, puis ensuite terminées) }; } assert_eq!(total, 10); } }
Utilisation avec Unpin et FusedFuture
Vous avez peut-être remarqué dans le premier exemple ci-dessus que nous avons
dû appeller .fuse() sur les futures retournées par les deux fonctions
asynchrones, ainsi que les épingler avec pin_mut. Chacun de ces appels sont
nécessaires car les futures utilisées dans select doivent implémenter les
traits Unpin et FusedFuture.
Unpin est nécessaire car les futures utilisées par select ne sont pas des
valeurs, mais des références mutables. En évitant de prendre possession de la
future, les futures non terminées peuvent toujours être utilisées après l'appel
à select.
De la même manière, le trait FusedFuture est nécessaire car select ne doit
pas appeler une future après qu'elle soit complétée. FusedFuture est
implémentée par les futures qui ont besoin de savoir si oui ou non elles se
sont terminées. Cela permet d'utiliser select dans une boucle, pour appeler
uniquement les futures qui n'ont pas encore terminé. Nous pouvons voir cela
dans l'exemple ci-dessus, où future_a ou future_b sont terminés dans le
deuxième tour de boucle. Comme la future retournée par future::ready
implémente FusedFuture, c'est possible d'indiquer au select de ne pas les
appeler à nouveau.
Remarquez que les Streams ont un trait FusedStream correspondant. Les
Streams qui implémentent ce trait ou qui ont été enveloppés en utilisant
.fuse() vont produire des futures FusedFutures à partir de leurs
combinateurs .next() ou try_next().
#![allow(unused)] fn main() { use futures::{ select, stream::{FusedStream, Stream, StreamExt}, }; async fn ajouter_deux_streams( mut s1: impl Stream<Item = u8> + FusedStream + Unpin, mut s2: impl Stream<Item = u8> + FusedStream + Unpin, ) -> u8 { let mut total = 0; loop { let element = select! { x = s1.next() => x, x = s2.next() => x, complete => break, }; if let Some(nombre_suivant) = element { total += nombre_suivant; } } total } }
Des tâches concurrentes dans une boucle select avec Fuse et FuturesUnordered
Une fonction difficile à aborder, mais qui est pratique, est
Fuse::terminated(), ce qui permet de construire une future vide qui est déjà
terminée, et qui peut être rempli plus tard avec une future qui a besoin d'être
exécutée.
Cela s'avère utile lorsqu'une tâche nécessite d'être exécuté dans une boucle
select qui est elle-même créée dans la boucle select.
Remarquez l'utilisation de la fonction .select_next_some(). Elle peut être
utilisée avec select pour exécuter uniquement la branche pour les valeurs
Some(_) retournées par le Stream, en ignorant les Nones.
#![allow(unused)] fn main() { use futures::{ future::{Fuse, FusedFuture, FutureExt}, pin_mut, select, stream::{FusedStream, Stream, StreamExt}, }; async fn obtenir_nouveau_nombre() -> u8 { /* ... */ 5 } async fn executer_avec_nouveau_nombre(_: u8) { /* ... */ } async fn executer_boucle( mut temporisation: impl Stream<Item = ()> + FusedStream + Unpin, nombre_initial: u8, ) { let executer_avec_nouveau_nombre_future = executer_avec_nouveau_nombre(nombre_initial).fuse(); let obtenir_nouveau_nombre_future = Fuse::terminated(); pin_mut!( executer_avec_nouveau_nombre_future, obtenir_nouveau_nombre_future ); loop { select! { () = temporisation.select_next_some() => { // La temporisation s'est terminée. Démarre un nouveau // `obtenir_nouveau_nombre_future` s'il n'y en a pas un qui est // déjà en cours d'exécution. if obtenir_nouveau_nombre_future.is_terminated() { obtenir_nouveau_nombre_future.set(obtenir_nouveau_nombre().fuse()); } }, new_num = obtenir_nouveau_nombre_future => { // Un nouveau nombre est arrivé : cela démarrera un nouveau // `executer_avec_nouveau_nombre_future`, ce qui libèrera // l'ancien. executer_avec_nouveau_nombre_future.set(executer_avec_nouveau_nombre(new_num).fuse()); }, // Execute le `executer_avec_nouveau_nombre_future` () = executer_avec_nouveau_nombre_future => {}, // panique si tout est terminé, car la `temporisation` est censé // générer des valeurs à l'infini. complete => panic!("`temporisation` s'est terminé inopinément"), } } } }
Lorsque de nombreuses copies d'une même future a besoin d'être exécuté en même
temps, utilisez le type FuturesUnordered. L'exemple suivant ressemble à celui
ci-dessus, mais va exécuter chaque copie de obtenir_nouveau_nombre_future
jusqu'à ce qu'elles soient terminées, plutôt que de les arrêter lorsqu'une
nouvelle est générée. Cela va aussi afficher la valeur retournée par
obtenir_nouveau_nombre_future.
#![allow(unused)] fn main() { use futures::{ future::{Fuse, FusedFuture, FutureExt}, pin_mut, select, stream::{FusedStream, FuturesUnordered, Stream, StreamExt}, }; async fn obtenir_nouveau_nombre() -> u8 { /* ... */ 5 } async fn executer_avec_nouveau_nombre(_: u8) -> u8 { /* ... */ 5 } // Exécute `executer_avec_nouveau_nombre` avec le dernier nombre obtenu // auprès de `obtenir_nouveau_nombre`. // // `obtenir_nouveau_nombre` est exécuté à nouveau à chaque fois que la // temporisation se termine, ce qui annule immédiatement le // `executer_avec_nouveau_nombre` en cours et la remplace avec la nouvelle // valeur retournée. async fn executer_boucle( mut temporisation: impl Stream<Item = ()> + FusedStream + Unpin, nombre_initial: u8, ) { let mut executer_avec_nouveau_nombre_futures = FuturesUnordered::new(); executer_avec_nouveau_nombre_futures.push(executer_avec_nouveau_nombre(nombre_initial)); let obtenir_nouveau_nombre_future = Fuse::terminated(); pin_mut!(obtenir_nouveau_nombre_future); loop { select! { () = temporisation.select_next_some() => { // La temporisation s'est terminée. Démarre un nouveau // `obtenir_nouveau_nombre_future` s'il n'y en a pas un qui est // déjà en cours d'exécution. if obtenir_nouveau_nombre_future.is_terminated() { obtenir_nouveau_nombre_future.set(obtenir_nouveau_nombre().fuse()); } }, new_num = obtenir_nouveau_nombre_future => { // Un nouveau nombre est arrivé : cela démarrera un nouveau // `executer_avec_nouveau_nombre_future`.. executer_avec_nouveau_nombre_futures.push(executer_avec_nouveau_nombre(new_num)); }, // Exécute le `executer_avec_nouveau_nombre_futures` et vérifie si certaines ont terminé. res = executer_avec_nouveau_nombre_futures.select_next_some() => { println!("executer_avec_nouveau_nombre_future a retourné {:?}", res); }, // panique si tout est terminé, car la `temporisation` est censé // générer des valeurs à l'infini. complete => panic!("`temporisation` s'est terminé inopinément"), } } } }