Utiliser l'envoi de messages pour transférer des données entre les tâches
Une approche de plus en plus populaire pour garantir la sécurité de la concurrence est l'envoi de message, avec lequel les tâches ou les acteurs communiquent en envoyant aux autres des messages contenant des données. Voici l'idée résumée, tirée d'un slogan provenant de la documentation du langage Go : “Ne communiquez pas en partageant la mémoire ; partagez plutôt la mémoire en communiquant”.
Un des outils majeurs que Rust a pour accomplir l'envoi de messages pour la concurrence est le canal, un concept de programmation dont la bibliothèque standard de Rust fournit une implémentation. Vous pouvez imaginer un canal de programmation comme étant un canal d'eau, comme un ruisseau ou une rivière. Si vous posez quelque chose comme un canard en plastique ou un bateau sur un ruisseau, il se déplacera en descendant le long de la voie d'eau.
Un canal de programmation est divisé en deux parties : un transmetteur et un receveur. La partie du transmetteur est le lieu en amont où vous déposez les canards en plastique sur la rivière et la partie du receveur est celle où les canards en plastique finissent leur voyage. Une partie de votre code appelle des méthodes du transmetteur en lui passant les données que vous souhaitez envoyer, tandis qu'une autre partie attend que des messages arrivent. Un canal est déclaré fermé lorsque l'une des parties, le transmetteur ou le récepteur, est libérée.
Ici, nous allons concevoir un programme qui a une tâche pour générer des valeurs et les envoyer dans un canal, et une autre tâche qui va recevoir les valeurs et les afficher. Nous allons envoyer de simples valeurs entre les tâches en utilisant un canal pour illustrer cette fonctionnalité. Une fois que vous serez familier avec cette technique, vous pourrez utiliser les canaux pour créer un système de dialogue en ligne ou un système où de nombreuses tâches font chacune une partie d'un gros calcul et envoient leur résultat à une tâche chargée de les agréger.
Pour commencer, dans l'encart 16-6, nous allons créer un canal mais nous n'allons rien faire avec. Remarquez qu'il ne se compilera pas encore car Rust ne peut pas savoir le type de valeurs que nous souhaitons envoyer dans le canal.
Fichier : src/main.rs
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel();
}
Nous créons un nouveau canal en utilisant la fonction mpsc::channel
; mpsc
signifie multiple producer, single consumer, c'est-à-dire
plusieurs producteurs, un seul consommateur. En bref, la façon dont la
bibliothèque standard de Rust a implémenté ces canaux permet d'avoir plusieurs
extrémités émettrices qui produisent des valeurs, mais seulement une seule
extrémité réceptrice qui consomme ces valeurs. Imaginez plusieurs ruisseaux
qui se rejoignent en une seule grosse rivière : tout ce qui est déposé sur les
ruisseaux va finir dans une seule rivière à la fin. Nous allons commencer avec
un seul producteur pour le moment, mais nous allons ajouter d'autres
producteurs lorsque notre exemple fonctionnera.
La fonction mpsc::channel
retourne un tuple, le premier élément est celui qui
permet d'envoyer et le second est celui qui reçoit. Les abréviations tx
et
rx
sont utilisés traditionnellement dans de nombreux domaines pour signifier
respectivement transmetteur et récepteur, nous avons donc nommé nos
variables ainsi pour indiquer clairement le rôle de chaque élément. Nous
utilisons une instruction let
avec un motif qui déstructure les tuples ; nous
verrons l'utilisation des motifs dans les instructions let
et la
déstructuration au chapitre 18. L'utilisation d'une instruction let
est une
façon d'extraire facilement les éléments du tuple retourné par mpsc::channel
.
Déplaçons maintenant l'élément de transmission dans une nouvelle tâche et faisons-lui envoyer une chaîne de caractères afin que la nouvelle tâche communique avec la tâche principale, comme dans l'encart 16-7. C'est comme poser un canard en plastique sur l'amont de la rivière ou envoyer un message instantané d'une tâche à une autre.
Fichier : src/main.rs
use std::sync::mpsc; use std::thread; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let valeur = String::from("salut"); tx.send(valeur).unwrap(); }); }
Nous utilisons à nouveau thread::spawn
pour créer une nouvelle tâche et
ensuite utiliser move
pour déplacer tx
dans la fermeture afin que la
nouvelle tâche possède désormais tx
. La nouvelle tâche a besoin de posséder
la partie émettrice du canal pour être en capacité d'envoyer des messages
dans ce canal.
La partie émettrice a une méthode send
qui prend en argument la valeur que
nous souhaitons envoyer. La méthode send
retourne un type Result<T, E>
,
donc si la partie réceptrice a déjà été libérée et qu'il n'y a nulle part où
envoyer la valeur, l'opération d'envoi va retourner une erreur. Dans cet
exemple, nous faisons appel à unwrap
pour paniquer en cas d'erreur. Mais dans
un vrai programme, nous devrions gérer ce cas correctement : retournez au
chapitre 9 pour revoir les stratégies permettant de gérer correctement les erreurs.
Dans l'encart 16-8, nous allons obtenir la valeur de l'extrémité réceptrice du canal dans la tâche principale. C'est comme récupérer le canard en plastique dans l'eau à la fin de la rivière, ou récupérer un message instantané.
Fichier : src/main.rs
use std::sync::mpsc; use std::thread; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let valeur = String::from("salut"); tx.send(valeur).unwrap(); }); let recu = rx.recv().unwrap(); println!("On a reçu : {}", recu); }
La partie réception d'un canal a deux modes intéressants : recv
et
try_recv
. Nous avons utilisé recv
, un raccourci pour recevoir, qui va
bloquer l'exécution de la tâche principale et attendre jusqu'à ce qu'une valeur
soit envoyée dans le canal. Une fois qu'une valeur est envoyée, recv
va
la retourner dans un Result<T, E>
. Lorsque la partie transmission du canal se
ferme, recv
va retourner une erreur pour signaler qu'il n'y aura plus de
valeurs qui arriveront.
La méthode try_recv
ne bloque pas, mais va plutôt retourner immédiatement un
Result<T, E>
: une valeur Ok
qui contiendra un message s'il y en a un de
disponible, et une valeur Err
s'il n'y a pas de message cette fois-ci.
L'utilisation de try_recv
est pratique si cette tâche à d'autres choses à
faire pendant qu'elle attend les messages : nous pouvons ainsi écrire une
boucle qui appelle régulièrement try_recv
, gère le message s'il y en a un, et
sinon fait d'autres choses avant de vérifier à nouveau.
Nous avons utilisé recv
dans cet exemple pour des raisons de simplicité ;
nous n'avons rien d'autres à faire dans la tâche principale que d'attendre les
messages, donc bloquer la tâche principale est acceptable.
Lorsque nous exécutons le code de l'encart 16-8, nous allons voir la valeur s'afficher grâce à la tâche principale :
On a reçu : salut
C'est parfait ainsi !
Les canaux et le transfert de possession
Les règles de possession jouent un rôle vital dans l'envoi de messages car
elles vous aident à écrire du code sûr et concurrent. Réfléchir à la possession
avec vos programmes Rust vous offre l'avantage d'éviter des erreurs de
développement avec la concurrence. Faisons une expérience pour montrer comment
la possession et les canaux fonctionnent ensemble pour éviter les problèmes :
nous allons essayer d'utiliser la valeur
dans la nouvelle tâche après que
nous l'avons envoyée dans le canal. Essayez de compiler le code de l'encart 16-9
pour découvrir pourquoi ce code n'est pas autorisé :
Fichier : src/main.rs
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let valeur = String::from("hi");
tx.send(valeur).unwrap();
println!("valeur vaut {}", valeur);
});
let recu = rx.recv().unwrap();
println!("On a reçu : {}", recu);
}
Ici, nous essayons d'afficher valeur
après que nous l'avons envoyée dans le
canal avec tx.send
. Ce serait une mauvaise idée de permettre cela : une fois
que la valeur a été envoyée à une autre tâche, cette tâche peut la modifier ou
la libérer avant que nous essayions de l'utiliser à nouveau. Il est possible que
des modifications faites par l'autre tâche puissent causer des erreurs ou des résultats
inattendus à cause de données incohérentes ou manquantes. Toutefois, Rust nous
affiche une erreur si nous essayons de compiler le code de l'encart 16-9 :
$ cargo run
Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `valeur`
--> src/main.rs:10:31
|
8 | let valeur = String::from("salut");
| ------ move occurs because `valeur` has type `String`, which does not implement the `Copy` trait
9 | tx.send(valeur).unwrap();
| ------ value moved here
10 | println!("valeur vaut {}", valeur);
| ^^^^^^ value borrowed here after move
For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` due to previous error
Notre erreur de concurrence a provoqué une erreur à la compilation. La fonction
send
prend possession de ses paramètres, et lorsque la valeur est déplacée,
le récepteur en prend possession. Cela nous évite d'utiliser à nouveau
accidentellement la valeur après l'avoir envoyée ; le système de possession
vérifie que tout est en ordre.
Envoyer plusieurs valeurs et voir le récepteur les attendre
Le code de l'encart 16-8 s'est compilé et exécuté, mais il ne nous a pas clairement indiqué que deux tâches séparées communiquaient entre elles via le canal. Dans l'encart 16-10 nous avons fait quelques modifications qui prouvent que le code de l'encart 16-8 est exécuté avec de la concurrence : la nouvelle tâche va maintenant envoyer plusieurs messages et faire une pause d'une seconde entre chaque message.
Fichier : src/main.rs
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let valeurs = vec![
String::from("salutations"),
String::from("à partir"),
String::from("de la"),
String::from("nouvelle tâche"),
];
for valeur in valeurs {
tx.send(valeur).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for recu in rx {
println!("On a reçu : {}", recu);
}
}
Cette fois-ci, la nouvelle tâche a un vecteur de chaînes de caractères que nous
souhaitons envoyer à la tâche principale. Nous itérons sur celui-ci, on envoie
les chaînes une par une en faisant une pause entre chaque envoi en appelant la
fonction thread::sleep
avec une valeur Duration
de 1 seconde.
Dans la tâche principale, nous n'appelons plus explicitement la fonction
recv
: à la place, nous utilisons rx
comme un itérateur. Pour chaque valeur
reçue, nous l'affichons. Lorsque le canal se fermera, l'itération se terminera.
Lorsque nous exécutons le code de l'encart 16-10, nous devrions voir la sortie suivante, avec une pause de 1 seconde entre chaque ligne :
On a reçu : salutations
On a reçu : à partir
On a reçu : de la
On a reçu : nouvelle tâche
Comme nous n'avons pas de code qui met en pause ou retarde la boucle for
de
la tâche principale, nous pouvons dire que la tâche principale est en attente
de réception des valeurs de la part de la nouvelle tâche.
Créer plusieurs producteurs en clonant le transmetteur
Précédemment, nous avions évoqué que mpsc
était un acronyme pour
multiple producer, single consumer. Mettons mpsc
en œuvre en élargissant le
code de l'encart 16-10 pour créer plusieurs tâches qui vont toutes envoyer des
valeurs au même récepteur. Nous pouvons faire ceci en clonant la partie
émettrice du canal, comme dans l'encart 16-11 :
Fichier : src/main.rs
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
// -- partie masquée ici --
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
thread::spawn(move || {
let valeurs = vec![
String::from("salutations"),
String::from("à partir"),
String::from("de la"),
String::from("nouvelle tâche"),
];
for valeur in valeurs {
tx1.send(valeur).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
thread::spawn(move || {
let valeurs = vec![
String::from("encore plus"),
String::from("de messages"),
String::from("pour"),
String::from("vous"),
];
for valeur in valeurs {
tx.send(valeur).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for recu in rx {
println!("On a reçu : {}", recu);
}
// -- partie masquée ici --
}
Cette fois-ci, avant de créer la première nouvelle tâche, nous appelons clone
sur la partie émettrice du canal. Cela va nous donner un nouveau transmetteur
que nous pourrons passer à la première nouvelle tâche. Nous passons ensuite le
transmetteur original à une seconde nouvelle tâche. Cela va nous donner deux
tâches, chacune envoyant des messages différents à la partie réceptrice du
canal.
Lorsque vous exécuterez ce code, votre sortie devrait ressembler à ceci :
On a reçu : salutations
On a reçu : encore plus
On a reçu : de messages
On a reçu : pour
On a reçu : à partir
On a reçu : de la
On a reçu : nouvelle tâche
On a reçu : pour vous
Vous pourrez peut-être constater que les valeurs sont dans un autre ordre chez
vous ; cela dépend de votre système. C'est ce qui rend la concurrence aussi
intéressante que difficile. Si vous jouez avec la valeur de thread::sleep
en
lui donnant différentes valeurs dans différentes tâches, chaque exécution sera
encore moins déterministe et créera une sortie différente à chaque fois.
Maintenant que nous avons découvert le fonctionnement des canaux, examinons un autre genre de concurrence.