Utiliser l'envoi de messages pour transférer des données entre les tâches

Une approche de plus en plus populaire pour garantir la sécurité de la concurrence est l'envoi de message, avec lequel les tâches ou les acteurs communiquent en envoyant aux autres des messages contenant des données. Voici l'idée résumée, tirée d'un slogan provenant de la documentation du langage Go : “Ne communiquez pas en partageant la mémoire ; partagez plutôt la mémoire en communiquant”.

Un des outils majeurs que Rust a pour accomplir l'envoi de messages pour la concurrence est le canal, un concept de programmation dont la bibliothèque standard de Rust fournit une implémentation. Vous pouvez imaginer un canal de programmation comme étant un canal d'eau, comme un ruisseau ou une rivière. Si vous posez quelque chose comme un canard en plastique ou un bateau sur un ruisseau, il se déplacera en descendant le long de la voie d'eau.

Un canal de programmation est divisé en deux parties : un transmetteur et un receveur. La partie du transmetteur est le lieu en amont où vous déposez les canards en plastique sur la rivière et la partie du receveur est celle où les canards en plastique finissent leur voyage. Une partie de votre code appelle des méthodes du transmetteur en lui passant les données que vous souhaitez envoyer, tandis qu'une autre partie attend que des messages arrivent. Un canal est déclaré fermé lorsque l'une des parties, le transmetteur ou le récepteur, est libérée.

Ici, nous allons concevoir un programme qui a une tâche pour générer des valeurs et les envoyer dans un canal, et une autre tâche qui va recevoir les valeurs et les afficher. Nous allons envoyer de simples valeurs entre les tâches en utilisant un canal pour illustrer cette fonctionnalité. Une fois que vous serez familier avec cette technique, vous pourrez utiliser les canaux pour créer un système de dialogue en ligne ou un système où de nombreuses tâches font chacune une partie d'un gros calcul et envoient leur résultat à une tâche chargée de les agréger.

Pour commencer, dans l'encart 16-6, nous allons créer un canal mais nous n'allons rien faire avec. Remarquez qu'il ne se compilera pas encore car Rust ne peut pas savoir le type de valeurs que nous souhaitons envoyer dans le canal.

Fichier : src/main.rs

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
}

Encart 16-6 : création d'un canal et assignation de ses deux parties à tx et rx

Nous créons un nouveau canal en utilisant la fonction mpsc::channel ; mpsc signifie multiple producer, single consumer, c'est-à-dire plusieurs producteurs, un seul consommateur. En bref, la façon dont la bibliothèque standard de Rust a implémenté ces canaux permet d'avoir plusieurs extrémités émettrices qui produisent des valeurs, mais seulement une seule extrémité réceptrice qui consomme ces valeurs. Imaginez plusieurs ruisseaux qui se rejoignent en une seule grosse rivière : tout ce qui est déposé sur les ruisseaux va finir dans une seule rivière à la fin. Nous allons commencer avec un seul producteur pour le moment, mais nous allons ajouter d'autres producteurs lorsque notre exemple fonctionnera.

La fonction mpsc::channel retourne un tuple, le premier élément est celui qui permet d'envoyer et le second est celui qui reçoit. Les abréviations tx et rx sont utilisés traditionnellement dans de nombreux domaines pour signifier respectivement transmetteur et récepteur, nous avons donc nommé nos variables ainsi pour indiquer clairement le rôle de chaque élément. Nous utilisons une instruction let avec un motif qui déstructure les tuples ; nous verrons l'utilisation des motifs dans les instructions let et la déstructuration au chapitre 18. L'utilisation d'une instruction let est une façon d'extraire facilement les éléments du tuple retourné par mpsc::channel.

Déplaçons maintenant l'élément de transmission dans une nouvelle tâche et faisons-lui envoyer une chaîne de caractères afin que la nouvelle tâche communique avec la tâche principale, comme dans l'encart 16-7. C'est comme poser un canard en plastique sur l'amont de la rivière ou envoyer un message instantané d'une tâche à une autre.

Fichier : src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let valeur = String::from("salut");
        tx.send(valeur).unwrap();
    });
}

Encart 16-7 : déplacement de tx dans la nouvelle tâche et envoi de “salut”

Nous utilisons à nouveau thread::spawn pour créer une nouvelle tâche et ensuite utiliser move pour déplacer tx dans la fermeture afin que la nouvelle tâche possède désormais tx. La nouvelle tâche a besoin de posséder la partie émettrice du canal pour être en capacité d'envoyer des messages dans ce canal.

La partie émettrice a une méthode send qui prend en argument la valeur que nous souhaitons envoyer. La méthode send retourne un type Result<T, E>, donc si la partie réceptrice a déjà été libérée et qu'il n'y a nulle part où envoyer la valeur, l'opération d'envoi va retourner une erreur. Dans cet exemple, nous faisons appel à unwrap pour paniquer en cas d'erreur. Mais dans un vrai programme, nous devrions gérer ce cas correctement : retournez au chapitre 9 pour revoir les stratégies permettant de gérer correctement les erreurs.

Dans l'encart 16-8, nous allons obtenir la valeur de l'extrémité réceptrice du canal dans la tâche principale. C'est comme récupérer le canard en plastique dans l'eau à la fin de la rivière, ou récupérer un message instantané.

Fichier : src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let valeur = String::from("salut");
        tx.send(valeur).unwrap();
    });

    let recu = rx.recv().unwrap();
    println!("On a reçu : {}", recu);
}

Encart 16-8 : réception de la valeur “salut” dans la tâche principale pour l'afficher

La partie réception d'un canal a deux modes intéressants : recv et try_recv. Nous avons utilisé recv, un raccourci pour recevoir, qui va bloquer l'exécution de la tâche principale et attendre jusqu'à ce qu'une valeur soit envoyée dans le canal. Une fois qu'une valeur est envoyée, recv va la retourner dans un Result<T, E>. Lorsque la partie transmission du canal se ferme, recv va retourner une erreur pour signaler qu'il n'y aura plus de valeurs qui arriveront.

La méthode try_recv ne bloque pas, mais va plutôt retourner immédiatement un Result<T, E> : une valeur Ok qui contiendra un message s'il y en a un de disponible, et une valeur Err s'il n'y a pas de message cette fois-ci. L'utilisation de try_recv est pratique si cette tâche à d'autres choses à faire pendant qu'elle attend les messages : nous pouvons ainsi écrire une boucle qui appelle régulièrement try_recv, gère le message s'il y en a un, et sinon fait d'autres choses avant de vérifier à nouveau.

Nous avons utilisé recv dans cet exemple pour des raisons de simplicité ; nous n'avons rien d'autres à faire dans la tâche principale que d'attendre les messages, donc bloquer la tâche principale est acceptable.

Lorsque nous exécutons le code de l'encart 16-8, nous allons voir la valeur s'afficher grâce à la tâche principale :

On a reçu : salut

C'est parfait ainsi !

Les canaux et le transfert de possession

Les règles de possession jouent un rôle vital dans l'envoi de messages car elles vous aident à écrire du code sûr et concurrent. Réfléchir à la possession avec vos programmes Rust vous offre l'avantage d'éviter des erreurs de développement avec la concurrence. Faisons une expérience pour montrer comment la possession et les canaux fonctionnent ensemble pour éviter les problèmes : nous allons essayer d'utiliser la valeur dans la nouvelle tâche après que nous l'avons envoyée dans le canal. Essayez de compiler le code de l'encart 16-9 pour découvrir pourquoi ce code n'est pas autorisé :

Fichier : src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let valeur = String::from("hi");
        tx.send(valeur).unwrap();
        println!("valeur vaut {}", valeur);
    });

    let recu = rx.recv().unwrap();
    println!("On a reçu : {}", recu);
}

Encart 16-9 : tentative d'utiliser valeur après que nous l'avons envoyée dans le canal

Ici, nous essayons d'afficher valeur après que nous l'avons envoyée dans le canal avec tx.send. Ce serait une mauvaise idée de permettre cela : une fois que la valeur a été envoyée à une autre tâche, cette tâche peut la modifier ou la libérer avant que nous essayions de l'utiliser à nouveau. Il est possible que des modifications faites par l'autre tâche puissent causer des erreurs ou des résultats inattendus à cause de données incohérentes ou manquantes. Toutefois, Rust nous affiche une erreur si nous essayons de compiler le code de l'encart 16-9 :

$ cargo run
   Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `valeur`
  --> src/main.rs:10:31
   |
8  |         let valeur = String::from("salut");
   |             ------ move occurs because `valeur` has type `String`, which does not implement the `Copy` trait
9  |         tx.send(valeur).unwrap();
   |                 ------ value moved here
10 |         println!("valeur vaut {}", valeur);
   |                                    ^^^^^^ value borrowed here after move

For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` due to previous error

Notre erreur de concurrence a provoqué une erreur à la compilation. La fonction send prend possession de ses paramètres, et lorsque la valeur est déplacée, le récepteur en prend possession. Cela nous évite d'utiliser à nouveau accidentellement la valeur après l'avoir envoyée ; le système de possession vérifie que tout est en ordre.

Envoyer plusieurs valeurs et voir le récepteur les attendre

Le code de l'encart 16-8 s'est compilé et exécuté, mais il ne nous a pas clairement indiqué que deux tâches séparées communiquaient entre elles via le canal. Dans l'encart 16-10 nous avons fait quelques modifications qui prouvent que le code de l'encart 16-8 est exécuté avec de la concurrence : la nouvelle tâche va maintenant envoyer plusieurs messages et faire une pause d'une seconde entre chaque message.

Fichier : src/main.rs

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let valeurs = vec![
            String::from("salutations"),
            String::from("à partir"),
            String::from("de la"),
            String::from("nouvelle tâche"),
        ];

        for valeur in valeurs {
            tx.send(valeur).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for recu in rx {
        println!("On a reçu : {}", recu);
    }
}

Encart 16-10 : envoi de plusieurs messages en faisant une pause entre chacun

Cette fois-ci, la nouvelle tâche a un vecteur de chaînes de caractères que nous souhaitons envoyer à la tâche principale. Nous itérons sur celui-ci, on envoie les chaînes une par une en faisant une pause entre chaque envoi en appelant la fonction thread::sleep avec une valeur Duration de 1 seconde.

Dans la tâche principale, nous n'appelons plus explicitement la fonction recv : à la place, nous utilisons rx comme un itérateur. Pour chaque valeur reçue, nous l'affichons. Lorsque le canal se fermera, l'itération se terminera.

Lorsque nous exécutons le code de l'encart 16-10, nous devrions voir la sortie suivante, avec une pause de 1 seconde entre chaque ligne :

On a reçu : salutations
On a reçu : à partir
On a reçu : de la
On a reçu : nouvelle tâche

Comme nous n'avons pas de code qui met en pause ou retarde la boucle for de la tâche principale, nous pouvons dire que la tâche principale est en attente de réception des valeurs de la part de la nouvelle tâche.

Créer plusieurs producteurs en clonant le transmetteur

Précédemment, nous avions évoqué que mpsc était un acronyme pour multiple producer, single consumer. Mettons mpsc en œuvre en élargissant le code de l'encart 16-10 pour créer plusieurs tâches qui vont toutes envoyer des valeurs au même récepteur. Nous pouvons faire ceci en clonant la partie émettrice du canal, comme dans l'encart 16-11 :

Fichier : src/main.rs

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // -- partie masquée ici --

    let (tx, rx) = mpsc::channel();

    let tx1 = tx.clone();
    thread::spawn(move || {
        let valeurs = vec![
            String::from("salutations"),
            String::from("à partir"),
            String::from("de la"),
            String::from("nouvelle tâche"),
        ];

        for valeur in valeurs {
            tx1.send(valeur).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let valeurs = vec![
            String::from("encore plus"),
            String::from("de messages"),
            String::from("pour"),
            String::from("vous"),
        ];

        for valeur in valeurs {
            tx.send(valeur).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for recu in rx {
        println!("On a reçu : {}", recu);
    }

    // -- partie masquée ici --
}

Encart 16-11 : envoi de plusieurs messages à partir de plusieurs producteurs

Cette fois-ci, avant de créer la première nouvelle tâche, nous appelons clone sur la partie émettrice du canal. Cela va nous donner un nouveau transmetteur que nous pourrons passer à la première nouvelle tâche. Nous passons ensuite le transmetteur original à une seconde nouvelle tâche. Cela va nous donner deux tâches, chacune envoyant des messages différents à la partie réceptrice du canal.

Lorsque vous exécuterez ce code, votre sortie devrait ressembler à ceci :

On a reçu : salutations
On a reçu : encore plus
On a reçu : de messages
On a reçu : pour
On a reçu : à partir
On a reçu : de la
On a reçu : nouvelle tâche
On a reçu : pour vous

Vous pourrez peut-être constater que les valeurs sont dans un autre ordre chez vous ; cela dépend de votre système. C'est ce qui rend la concurrence aussi intéressante que difficile. Si vous jouez avec la valeur de thread::sleep en lui donnant différentes valeurs dans différentes tâches, chaque exécution sera encore moins déterministe et créera une sortie différente à chaque fois.

Maintenant que nous avons découvert le fonctionnement des canaux, examinons un autre genre de concurrence.