Transformer notre serveur monotâche en serveur multitâches

Pour le moment, le serveur va traiter chaque requête l'une après l'autre, ce qui signifie qu'il ne traitera pas une deuxième connexion tant que la première n'a pas fini d'être traitée. Si le serveur reçoit encore plus de requêtes, cette exécution en série sera de moins en moins adaptée. Si le serveur reçoit une requête qui prend longtemps à traiter, les demandes suivantes devront attendre que la longue requête à traiter soit terminée, même si les nouvelles requêtes peuvent être traitées rapidement. Nous devons corriger cela, mais d'abord, observons le problème se produire pour de vrai.

Simuler une longue requête à traiter avec l'implémentation actuelle du serveur

Nous allons voir comment une requête longue à traiter peut affecter le traitement des autres requêtes avec l'implémentation actuelle de notre serveur. L'encart 20-10 rajoute le traitement d'une requête pour /pause qui va simuler une longue réponse qui va faire en sorte que le serveur soit en pause pendant 5 secondes avant de pouvoir répondre à nouveau.

Fichier : src/main.rs

use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
use std::thread;
use std::time::Duration;
// -- partie masquée ici--

fn main() {
    let ecouteur = TcpListener::bind("127.0.0.1:7878").unwrap();

    for flux in ecouteur.incoming() {
        let flux = flux.unwrap();

        gestion_connexion(flux);
    }
}

fn gestion_connexion(mut flux: TcpStream) {
    // -- partie masquée ici--

    let mut tampon = [0; 1024];
    flux.read(&mut tampon).unwrap();

    let get = b"GET / HTTP/1.1\r\n";
    let pause = b"GET /pause HTTP/1.1\r\n";

    let (ligne_statut, nom_fichier) = if tampon.starts_with(get) {
        ("HTTP/1.1 200 OK", "hello.html")
    } else if tampon.starts_with(pause) {
        thread::sleep(Duration::from_secs(5));
        ("HTTP/1.1 200 OK", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND", "404.html")
    };

    // -- partie masquée ici--

    let contenu = fs::read_to_string(nom_fichier).unwrap();

    let reponse = format!(
        "{}\r\nContent-Length: {}\r\n\r\n{}",
        ligne_statut,
        contenu.len(),
        contenu
    );

    flux.write(reponse.as_bytes()).unwrap();
    flux.flush().unwrap();
}

Encart 20-10 : simulation d'une requête provoquant un long traitement en détectant /pause et en faisant une pause de 5 secondes

Ce code est peu brouillon, mais est suffisant pour nos besoins de simulation. Nous avons créé une deuxième possibilité de requête pause avec des données que notre serveur peut reconnaître. Nous avons ajouté un else if après le bloc if pour tester les requêtes destinées à /pause. Lorsque cette requête est reçue, le serveur va se mettre en pause pendant 5 secondes avant de générer la page HTML de succès.

Vous pouvez constater à quel point notre serveur est primitif : une bibliothèque digne de ce nom devrait gérer la détection de différents types de requêtes de manière bien moins verbeuse !

Démarrez le serveur en utilisant cargo run. Ouvrez ensuite deux fenêtres de navigateur web : une pour http://127.0.0.1:7878/ et l'autre pour http://127.0.0.1:7878/pause. Si vous demandez l'URI / plusieurs fois, comme vous l'avez fait précédemment, vous constaterez que le serveur répond rapidement. Mais lorsque vous saisirez /pause et que vous chargerez ensuite /, vous constaterez que / attend que pause ait fini sa pause de 5 secondes avant de se charger.

Il y a plusieurs manières de changer le fonctionnement de notre serveur web pour éviter d'accumuler des requêtes après une requête dont le traitement est long ; celle que nous allons implémenter est un groupe de tâches.

Améliorer le débit avec un groupe de tâches

Un groupe de tâches est un groupe constitué de tâches qui ont été créées et qui attendent des missions. Lorsque le programme reçoit une nouvelle mission, il assigne une des tâches du groupe pour cette mission, et cette tâche va traiter la mission. Les tâches restantes dans le groupe restent disponibles pour traiter d'autres missions qui peuvent arriver pendant que la première tâche est en cours de traitement. Lorsque la première tâche en a fini avec sa mission, elle retourne dans le groupe de tâches inactives, prête à gérer une nouvelle tâche. Un groupe de tâches vous permet de traiter plusieurs connexions en simultané, ce qui augmente le débit de votre serveur.

Nous allons limiter le nombre de tâches dans le groupe à un petit nombre pour nous protéger d'attaques par déni de service (Denial of Service, DoS) ; si notre programme créait une nouvelle tâche à chaque requête qu'il reçoit, quelqu'un qui ferait 10 millions de requêtes à notre serveur pourrait faire des ravages en utilisant toutes les ressources de notre serveur et bloquer ainsi le traitement de toute nouvelle requête.

Plutôt que de générer des tâches en quantité illimitée, nous allons faire en sorte qu'il y ait un nombre fixe de tâches qui seront en attente dans le groupe. Lorsqu'une requête arrive, une tâche sera choisie dans le groupe pour procéder au traitement. Le groupe gèrera une file d'attente pour les requêtes entrantes. Chaque tâche dans le groupe va récupérer une requête dans cette liste d'attente, la traiter puis demander une autre requête à la file d'attente. Avec ce fonctionnement, nous pouvons traiter N requêtes en concurrence, où N est le nombre de tâches. Si toutes les tâches répondent chacune à une requête longue à traiter, les requêtes suivantes vont se stocker dans la file d'attente, mais nous aurons quand même augmenté le nombre de requêtes longues que nous pouvons traiter avant d'en arriver là.

Cette technique n'est qu'une des nombreuses manières d'améliorer le débit d'un serveur web. D'autres options que vous devriez envisager sont le modèle fork/join et le modèle d'entrée-sortie asynchrone monotâche. Si vous êtes intéressés par ce sujet, vous pouvez aussi en apprendre plus sur ces autres solutions et essayer de les implémenter en Rust ; avec un langage bas niveau comme Rust, toutes les options restent possibles.

Avant que nous ne commencions l'implémentation du groupe de tâches, parlons de l'utilisation du groupe. Lorsque vous essayez de concevoir du code, commencer par écrire l'interface client peut vous aider à vous guider dans la conception. Ecrivez l'API du code afin qu'il soit structuré de la manière dont vous souhaitez l'appeler ; puis implémentez ensuite la fonctionnalité au sein de cette structure, plutôt que d'implémenter la fonctionnalité puis de concevoir l'API publique.

De la même manière que nous avons utilisé le développement piloté par les tests dans le projet du chapitre 12, nous allons utiliser ici le développement orienté par le compilateur. Nous allons écrire le code qui appelle les fonctions que nous souhaitons, et ensuite nous analyserons les erreurs du compilateur pour déterminer ce qu'il faut ensuite corriger pour que le code fonctionne.

La structure du code si nous pouvions créer une tâche pour chaque requête

Pour commencer, voyons à quoi ressemblerait notre code s'il créait une nouvelle tâche pour chaque connexion. Comme nous l'avons évoqué précédemment, cela ne sera pas notre solution finale à cause des problèmes liés à la création potentielle d'un nombre illimité de tâches, mais c'est un début. L'encart 20-11 montre les changements à apporter au main pour créer une nouvelle tâche pour gérer chaque flux avec une boucle for.

Fichier : src/main.rs

use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
use std::thread;
use std::time::Duration;

fn main() {
    let ecouteur = TcpListener::bind("127.0.0.1:7878").unwrap();

    for flux in ecouteur.incoming() {
        let flux = flux.unwrap();

        thread::spawn(|| {
            gestion_connexion(flux);
        });
    }
}

fn gestion_connexion(mut flux: TcpStream) {
    let mut tampon = [0; 1024];
    flux.read(&mut tampon).unwrap();

    let get = b"GET / HTTP/1.1\r\n";
    let pause = b"GET /pause HTTP/1.1\r\n";

    let (ligne_statut, nom_fichier) = if tampon.starts_with(get) {
        ("HTTP/1.1 200 OK", "hello.html")
    } else if tampon.starts_with(pause) {
        thread::sleep(Duration::from_secs(5));
        ("HTTP/1.1 200 OK", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND", "404.html")
    };

    let contenu = fs::read_to_string(nom_fichier).unwrap();

    let reponse = format!(
        "{}\r\nContent-Length: {}\r\n\r\n{}",
        ligne_statut,
        contenu.len(),
        contenu
    );

    flux.write(reponse.as_bytes()).unwrap();
    flux.flush().unwrap();
}

Encart 20-11 : création d'une nouvelle tâche pour chaque flux

Comme vous l'avez appris au chapitre 16, thread::spawn va créer une nouvelle tâche puis exécuter dans cette nouvelle tâche le code présent dans la fermeture. Si vous exécutez ce code et chargez /pause dans votre navigateur, et que vous ouvrez / dans deux nouveaux onglets, vous constaterez en effet que les requêtes vers / n'aurons pas à attendre que /pause se finisse. Mais comme nous l'avons mentionné, cela peut potentiellement surcharger le système si vous créez des nouvelles tâches sans aucune limite.

Créer une interface similaire pour un nombre fini de tâches

Nous souhaitons faire en sorte que notre groupe de tâches fonctionne de la même manière, donc passer des tâches à un groupe de tâches ne devrait pas nécessiter de gros changements au code qui utilise notre API. L'encart 20-12 montre une interface possible pour une structure GroupeTaches que nous souhaitons utiliser à la place de thread::spawn.

Fichier : src/main.rs

use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
use std::thread;
use std::time::Duration;

fn main() {
    let ecouteur = TcpListener::bind("127.0.0.1:7878").unwrap();
    let groupe = GroupeTaches::new(4);

    for flux in ecouteur.incoming() {
        let flux = flux.unwrap();

        groupe.executer(|| {
            gestion_connexion(flux);
        });
    }
}

fn gestion_connexion(mut flux: TcpStream) {
    let mut tampon = [0; 1024];
    flux.read(&mut tampon).unwrap();

    let get = b"GET / HTTP/1.1\r\n";
    let pause = b"GET /pause HTTP/1.1\r\n";

    let (ligne_statut, nom_fichier) = if tampon.starts_with(get) {
        ("HTTP/1.1 200 OK", "hello.html")
    } else if tampon.starts_with(pause) {
        thread::sleep(Duration::from_secs(5));
        ("HTTP/1.1 200 OK", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND", "404.html")
    };

    let contenu = fs::read_to_string(nom_fichier).unwrap();

    let reponse = format!(
        "{}\r\nContent-Length: {}\r\n\r\n{}",
        ligne_statut,
        contenu.len(),
        contenu
    );

    flux.write(reponse.as_bytes()).unwrap();
    flux.flush().unwrap();
}

Encart 20-12 : Notre interface idéale GroupeTaches

Nous avons utilisé GroupeTaches::new pour créer un nouveau groupe de tâches avec un nombre configurable de tâches, dans notre cas, quatre. Ensuite, dans la boucle for, groupe.executer a une interface similaire à thread::spawn qui prend une fermeture que le groupe devra exécuter pour chaque flux. Nous devons implémenter groupe.executer pour qu'il prenne la fermeture et la donne à une tâche dans le groupe pour qu'elle l'exécute. Ce code ne se compile pas encore, mais nous allons faire comme si c'était le cas pour que le compilateur puisse nous guider dans la résolution des problèmes.

Construire la structure GroupeTaches en utilisant le développement orienté par le compilateur

Faites les changements de l'encart 20-12 dans votre src/main.rs, et utilisez ensuite les erreurs du compilateur lors du cargo check pour orienter votre développement. Voici la première erreur que nous obtenons :

$ cargo check
    Checking salutations v0.1.0 (file:///projects/salutations)
error[E0433]: failed to resolve: use of undeclared type `GroupeTaches`
  --> src/main.rs:10:16
   |
10 |     let groupe = GroupeTaches::new(4);
   |                  ^^^^^^^^^^^^ use of undeclared type `GroupeTaches`

For more information about this error, try `rustc --explain E0433`.
error: could not compile `hello` due to previous error

Bien ! Cette erreur nous informe que nous avons besoin d'un type ou d'un module qui s'appelle GroupeTaches, donc nous allons le créer. Notre implémentation de GroupeTaches sera indépendante du type de travail qu'accomplira notre serveur web. Donc, transformons la crate binaire salutations en crate de bibliothèque pour y implémenter notre GroupeTaches. Après l'avoir changé en crate de bibliothèque, nous pourrons utiliser ensuite cette bibliothèque de groupe de tâches dans n'importe quel projet où nous aurons besoin d'un groupe de tâches, et pas seulement pour servir des requêtes web.

Créez un src/lib.rs qui contient ce qui suit et qui est la définition la plus simple d'une structure GroupeTaches que nous pouvons avoir pour le moment :

Fichier : src/lib.rs

pub struct GroupeTaches;

Créez ensuite un nouveau dossier, src/bin, et déplacez-y la crate binaire src/main.rs qui sera donc désormais src/bin/main.rs. Ceci va faire que la crate de bibliothèque sera la crate principale dans le dossier salutations ; nous pouvons quand même continuer à exécuter le binaire dans src/bin/main.rs en utilisant cargo run. Après avoir déplacé le fichier main.rs, modifiez-le pour importer la crate de bibliothèque et importer GroupeTaches dans la portée en ajoutant le code suivant en haut de src/bin/main.rs :

Fichier : src/bin/main.rs

use salutations::GroupeTaches;
use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
use std::thread;
use std::time::Duration;

fn main() {
    let ecouteur = TcpListener::bind("127.0.0.1:7878").unwrap();
    let groupe = GroupeTaches::new(4);

    for flux in ecouteur.incoming() {
        let flux = flux.unwrap();

        groupe.executer(|| {
            gestion_connexion(flux);
        });
    }
}

fn gestion_connexion(mut flux: TcpStream) {
    let mut tampon = [0; 1024];
    flux.read(&mut tampon).unwrap();

    let get = b"GET / HTTP/1.1\r\n";
    let pause = b"GET /pause HTTP/1.1\r\n";

    let (ligne_statut, nom_fichier) = if tampon.starts_with(get) {
        ("HTTP/1.1 200 OK", "hello.html")
    } else if tampon.starts_with(pause) {
        thread::sleep(Duration::from_secs(5));
        ("HTTP/1.1 200 OK", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND", "404.html")
    };

    let contenu = fs::read_to_string(nom_fichier).unwrap();

    let reponse = format!(
        "{}\r\nContent-Length: {}\r\n\r\n{}",
        ligne_statut,
        contenu.len(),
        contenu
    );

    flux.write(reponse.as_bytes()).unwrap();
    flux.flush().unwrap();
}

Ce code ne fonctionne toujours pas, mais vérifions-le à nouveau pour obtenir l'erreur que nous devons maintenant résoudre :

$ cargo check
    Checking salutations v0.1.0 (file:///projects/salutations)
error[E0599]: no function or associated item named `new` found for struct `GroupeTaches` in the current scope
  --> src/bin/main.rs:11:28
   |
11 |     let groupe = GroupeTaches::new(4);
   |                                ^^^ function or associated item not found in `GroupeTaches`

For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` due to previous error

Cette erreur indique que nous devons ensuite créer une fonction associée new pour GroupeTaches. Nous savons aussi que new nécessite d'avoir un paramètre qui peut accepter 4 comme argument et doit retourner une instance de GroupeTaches. Implémentons la fonction new la plus simple possible qui aura ces caractéristiques :

Fichier : src/lib.rs

pub struct ThreadPool;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }
}

Nous avons choisi usize comme type du paramètre taille, car nous savons qu'un nombre négatif de tâches n'a pas de sens. Nous savons également que nous allons utiliser ce 4 comme étant le nombre d'éléments dans une collection de tâches, ce qui est à quoi sert le type usize, comme nous l'avons vu dans la section “Types de nombres entiers” du chapitre 3.

Vérifions à nouveau le code :

$ cargo check
    Checking salutations v0.1.0 (file:///projects/salutations)
error[E0599]: no method named `executer` found for struct `GroupeTaches` in the current scope
  --> src/bin/main.rs:16:14
   |
16 |         groupe.executer(|| {
   |                ^^^^^^^^ method not found in `GroupeTaches`

For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` due to previous error

Désormais, nous obtenons une erreur car nous n'avons pas implémenté la méthode executer sur GroupeTaches. Souvenez-vous que nous avions décidé dans la section “Créer une interface similaire pour un nombre fini de tâches” que notre groupe de tâches devrait avoir une interface similaire à thread::spawn. C'est pourquoi nous allons implémenter la fonction executer pour qu'elle prenne en argument la fermeture qu'on lui donne et qu'elle la passe à une tâche inactive du groupe pour qu'elle l'exécute.

Nous allons définir la méthode executer sur GroupeTaches pour prendre en paramètre une fermeture. Souvenez-vous que nous avions vu dans une section du chapitre 13 que nous pouvions prendre en paramètre les fermetures avec trois types de traits différents : Fn, FnMut, et FnOnce. Nous devons décider quel genre de fermeture nous allons utiliser ici. Nous savons que nous allons faire quelque chose de sensiblement identique à l'implémentation du thread::spawn de la bibliothèque standard, donc nous pouvons nous inspirer de ce qui lie la signature de thread::spawn à son paramètre. La documentation nous donne ceci :

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

Le paramètre de type F est celui qui nous intéresse ici ; le paramètre de type T est lié à la valeur de retour, et ceci ne nous intéresse pas ici. Nous pouvons constater que spawn utilise le trait FnOnce lié à F. C'est probablement ce dont nous avons besoin, parce que nous allons sûrement passer cet argument dans le execute de spawn. Nous pouvons aussi être sûr que FnOnce est le trait dont nous avons besoin car la tâche qui va traiter une requête ne va le faire qu'une seule fois, ce qui correspond à la partie Once dans FnOnce.

Le paramètre de type F a aussi le trait lié Send et la durée de vie liée 'static, qui sont utiles dans notre situation : nous avons besoin de Send pour transférer la fermeture d'une tâche vers une autre et de 'static car nous ne connaissons pas la durée d'exécution de la tâche. Créons donc une méthode executer sur GroupeTaches qui va utiliser un paramètre générique de type F avec les liens suivants :

Fichier : src/lib.rs

pub struct GroupeTaches;

impl GroupeTaches {
    // -- partie masquée ici--
    pub fn new(size: usize) -> GroupeTaches {
        GroupeTaches
    }

    pub fn executer<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

Nous utilisons toujours le () après FnOne car ce FnOnce représente une fermeture qui ne prend pas de paramètres et retourne le type unité (). Exactement comme les définitions de fonctions, le type de retour peut être omis de la signature, mais même si elle ne contient pas de paramètre, nous avons tout de même besoin des parenthèses.

A nouveau, c'est l'implémentation la plus simpliste de la méthode executer : elle ne fait rien, mais nous essayons seulement de faire en sorte que notre code se compile. Vérifions-le à nouveau :

$ cargo check
    Checking salutations v0.1.0 (file:///projects/salutations)
    Finished dev [unoptimized + debuginfo] target(s) in 0.24s

Cela se compile ! Mais remarquez que si vous lancez cargo run et faites la requête dans votre navigateur web, vous verrez l'erreur dans le navigateur que nous avions tout au début du chapitre. Notre bibliothèque n'exécute pas encore la fermeture envoyée à executer !

Remarque : un dicton que vous avez probablement déjà entendu à propos des compilateurs stricts, comme Haskell et Rust, est que “si le code se compile, il fonctionne”. Mais ce dicton n'est pas toujours vrai. Notre projet se compile, mais il ne fait absolument rien ! Si nous construisions un vrai projet, complexe, il serait bon de commencer à écrire des tests unitaires pour vérifier que ce code compile et qu'il suit le comportement que nous souhaitons.

Valider le nombre de tâches envoyé à new

Nous ne faisons rien avec les paramètres passés à new et executer. Implémentons le corps de ces fonctions avec le comportement que nous souhaitons. Pour commencer, réfléchissons à new. Précédemment, nous avions choisi un type sans signe pour le paramètre taille, car un groupe avec un nombre négatif de tâches n'a pas de sens. Cependant, un groupe avec aucune tâche n'a pas non plus de sens, alors que zéro est une valeur parfaitement valide pour usize. Nous allons ajouter du code pour vérifier que taille est plus grand que zéro avant de retourner une instance de GroupeTaches et faire en sorte que le programme panique s'il reçoit un zéro, en utilisant la macro assert! comme dans l'encart 20-13.

Filename : src/lib.rs

pub struct GroupeTaches;

impl GroupeTaches {
    /// Crée un nouveau GroupeTaches.
    ///
    /// La taille est le nom de tâches présentes dans le groupe.
    ///
    /// # Panics
    ///
    /// La fonction `new` devrait paniquer si la taille vaut zéro.
    pub fn new(taille: usize) -> GroupeTaches {
        assert!(taille > 0);

        GroupeTaches
    }

    // -- partie masquée ici --

    pub fn executer<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

Encart 20-13 : implémentation de GroupeTaches::new qui devrait paniquer si taille vaut zéro

Nous avons ajouté un peu de documentation pour notre GroupeTaches avec des commentaires de documentation. Remarquez que nous avons suivi les pratiques de bonne documentation en ajoutant une section qui liste les situations pour lesquelles notre fonction peut paniquer, comme nous l'avons vu dans le chapitre 14. Essayez de lancer cargo doc --open et de cliquer sur la structure GroupeTaches pour voir à quoi ressemble la documentation générée pour new !

Au lieu d'ajouter la macro assert! comme nous venons de le faire, nous aurions pu faire en sorte que new retourne un Result comme nous l'avions fait avec Config::new dans le projet d'entrée/sortie dans l'encart 12-9. Mais nous avons décidé que dans le cas présent, la création d'un groupe de tâches sans aucune tâche devait être une erreur irrécupérable. Si vous en sentez l'envie, essayez d'écrire une version de new avec la signature suivante, pour comparer les deux versions :

pub fn new(taille: usize) -> Result<GroupeTaches, ErreurGroupeTaches> {

Créer l'espace de rangement des tâches

Maintenant que nous avons une manière de savoir si nous avons un nombre valide de tâches à stocker dans le groupe, nous pouvons créer ces tâches et les stocker dans la structure GroupeTaches avant de la retourner. Mais comment “stocker” une tâche ? Regardons à nouveau la signature de thread::spawn :

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

La fonction spawn retourne un JoinHandle<T>, où T est le type que retourne notre fermeture. Essayons d'utiliser nous aussi JoinHandle pour voir ce qu'il va se passer. Dans notre cas, les fermetures que nous passons dans le groupe de tâches vont traiter les connexions mais ne vont rien retourner, donc T sera le type unité, ().

Le code de l'encart 20-14 va se compiler mais ne va pas encore créer de tâches pour le moment. Nous avons changé la définition de GroupeTaches pour qu'elle possède un vecteur d'instances thread::JoinHandle<()>, nous avons initialisé le vecteur avec une capacité de la valeur de taille, mis en place une boucle for qui va exécuter du code pour créer les tâches puis nous avons retourné une instance de GroupeTaches qui les contient.

Fichier : src/lib.rs

use std::thread;

pub struct GroupeTaches {
    taches: Vec<thread::JoinHandle<()>>,
}

impl GroupeTaches {
    // -- partie masquée ici --
    /// Crée un nouveau GroupeTaches.
    ///
    /// La taille est le nom de tâches présentes dans le groupe.
    ///
    /// # Panics
    ///
    /// La fonction `new` devrait paniquer si la taille vaut zéro.
    pub fn new(taille: usize) -> GroupeTaches {
        assert!(taille > 0);

        let mut taches = Vec::with_capacity(taille);

        for _ in 0..taille {
            // on crée quelques tâches ici et on les stocke dans le vecteur
        }

        GroupeTaches { taches }
    }

    // -- partie masquée ici --

    pub fn executer<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

Encart 20-14 : création d'un vecteur pour GroupeTaches pour stocker les tâches

Nous avons importé std::thread dans la portée de la crate de bibliothèque, car nous utilisons thread::JoinHandle comme étant le type des éléments du vecteur dans GroupeTaches.

Une fois qu'une taille valide est reçue, notre GroupeTaches crée un nouveau vecteur qui peut stocker taille éléments. Nous n'avons pas encore utilisé la fonction with_capacity dans ce livre, qui fait la même chose que Vec::new mais avec une grosse différence : elle pré-alloue l'espace dans le vecteur. Comme nous savons que nous avons besoin de stocker taille éléments dans le vecteur, faire cette allocation en amont est bien plus efficace que d'utiliser Vec::new qui va se redimensionner lorsque des éléments lui seront ajoutés.

Lorsque vous lancez à nouveau cargo check, vous devriez avoir quelques avertissements en plus, mais cela devrait être un succès.

Une structure Operateur chargée d'envoyer le code de GroupeTaches à une tâche

Nous avions laissé un commentaire dans la boucle for dans l'encart 20-14 qui concernait la création des tâches. Maintenant, nous allons voir comment créer ces tâches. La bibliothèque standard fournit un moyen de créer des tâches avec thread::spawn à qui il faut passer le code que la tâche doit exécuter dès qu'elle est créée. Cependant, dans notre cas, nous souhaitons créer des tâches et faire en sorte qu'elles attendent du code que nous leur enverrons plus tard. L'implémentation des tâches de la bibliothèque standard n'offre aucun moyen de faire ceci ; nous devons donc implémenter cela nous-même.

Nous allons implémenter ce comportement en introduisant une nouvelle structure de données entre le GroupeTaches et les tâches qui va gérer ce nouveau comportement. Nous allons appeler cette structure Operateur, nom qui lui est traditionnellement donné avec Worker dans les implémentations de groupe de tâches. Imaginez des personnes qui travaillent dans la cuisine d'un restaurant : les opérateurs attendent les commandes des clients puis sont chargés de prendre en charge ces commandes et d'y répondre.

Au lieu de stocker un vecteur d'instances JoinHandle<()> dans le groupe de tâches, nous allons stocker des instances de structure Operateur. Chaque Operateur va stocker une seule instance de JoinHandle<()>. Ensuite nous implémenterons une méthode sur Operateur qui va prendre en argument une fermeture de code à exécuter et l'envoyer à la tâche qui fonctionne déjà pour exécution. Nous allons aussi donner à chacun des opérateurs un identifiant id afin que nous puissions distinguer les différents opérateurs dans le groupe dans les journaux ou lors de débogages.

Appliquons ces changements à l'endroit où nous créons un GroupeTaches. Nous allons implémenter le code de Operateur qui envoie la fermeture à la tâche en suivant ces étapes :

  1. Définir une structure Operateur qui possède un id et un JoinHandle<()>.
  2. Modifier le GroupeTaches afin qu'il possède un vecteur d'instances de Operateur.
  3. Définir une fonction Operateur::new qui prend en argument un numéro d'id et retourne une instance de Operateur qui contient l' id et une tâche créée avec une fermeture vide.
  4. Dans GroupeTaches::new, utiliser le compteur de la boucle for pour générer un id, créer un nouveau Operateur avec cet id et stocker l'opérateur dans le vecteur.

Si vous vous sentez prêt(e) à relever le défi, essayez de faire ces changements de votre côté avant de regarder le code de l'encart 20-15.

Vous êtes prêt(e) ? Voici l'encart 20-15 qui propose une solution pour procéder aux changements listés précédemment.

Fichier : src/lib.rs

use std::thread;

pub struct GroupeTaches {
    operateurs: Vec<Operateur>,
}

impl GroupeTaches {
    // -- partie masquée ici --
    /// Crée un nouveau GroupeTaches.
    ///
    /// La taille est le nom de tâches présentes dans le groupe.
    ///
    /// # Panics
    ///
    /// La fonction `new` devrait paniquer si la taille vaut zéro.
    pub fn new(taille: usize) -> GroupeTaches {
        assert!(taille > 0);

        let mut operateurs = Vec::with_capacity(taille);

        for id in 0..taille {
            operateurs.push(Operateur::new(id));
        }

        GroupeTaches { operateurs }
    }
    // -- partie masquée ici --

    pub fn executer<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

struct Operateur {
    id: usize,
    tache: thread::JoinHandle<()>,
}

impl Operateur {
    fn new(id: usize) -> Operateur {
        let tache = thread::spawn(|| {});

        Operateur { id, tache }
    }
}

Encart 20-15 : modification de GroupeTaches pour stocker des instances de Operateur plutôt que de stocker directement des tâches

Nous avons changé le nom du champ taches de GroupeTaches en operateurs car il stocke maintenant des instances de Operateur plutôt que des instances de JoinHandle<()>. Nous utilisons le compteur de la boucle for comme argument de Operateur::new et nous stockons chacun des nouveaux Operateur dans le vecteur operateurs.

Le code externe (comme celui de notre serveur dans src/bin/main.rs) n'a pas besoin de connaître les détails de l'implémentation qui utilise une structure Operateur dans GroupeTaches, donc nous faisons en sorte que la structure Operateur et sa fonction new soient privées. La fonction Operateur::new utilise l' id que nous lui donnons et stocke une instance de JoinHandle<()> qui est créée en instanciant une nouvelle tâche utilisant une fermeture vide.

Ce code va se compiler et stocker le nombre d'instances de Operateur que nous avons renseigné en argument de GroupeTaches::new. Mais nous n'exécutons toujours pas la fermeture que nous obtenons de executer. Voyons maintenant comment faire cela.

Envoyer des requêtes à des tâches via des canaux

Maintenant nous allons nous attaquer au problème qui fait que les fermetures passées à thread::spawn ne font absolument rien. Actuellement, nous obtenons la fermeture que nous souhaitons exécuter dans la méthode executer. Mais nous avons besoin de donner une fermeture à thread::spawn à exécuter lorsque nous créons chaque Operateur lors de la création de GroupeTaches.

Nous souhaitons que les structures Operateur que nous venons de créer récupèrent du code à exécuter dans une liste d'attente présente dans le GroupeTaches et renvoient ce code à leur tâche pour l'exécuter.

Dans le chapitre 16, vous avez appris les canaux (une manière simple de communiquer entre deux tâches) qui seront parfaits pour ce cas d'emploi. Nous allons utiliser un canal pour les fonctions pour créer la liste d'attente des missions, et executer devrait envoyer une mission de GroupeTaches vers les instances Operateur, qui vont passer la mission à leurs tâches. Voici le plan :

  1. Le GroupeTaches va créer un canal et se connecter à la partie émettrice de ce canal.
  2. Chaque Operateur va se connecter à la partie réceptrice du canal.
  3. Nous allons créer une nouvelle structure Mission qui va stocker les fermetures que nous souhaitons envoyer dans le canal.
  4. La méthode executer va envoyer la mission qu'elle souhaite executer à la partie émettrice du canal.
  5. Dans sa propre tâche, l' Operateur va vérifier en permanence la partie réceptrice du canal et exécuter les fermetures des missions qu'il va recevoir.

Commençons par créer un canal dans GroupeTaches::new et stocker la partie émettrice dans l'instance de GroupeTaches, comme dans l'encart 20-16. La structure Mission ne contient rien pour le moment mais sera le type d'éléments que nous enverrons dans le canal.

Fichier : src/lib.rs

use std::thread;
// -- partie masquée ici --
use std::sync::mpsc;

pub struct GroupeTaches {
    operateurs: Vec<Operateur>,
    envoi: mpsc::Sender<Mission>,
}

struct Mission;

impl GroupeTaches {
    // -- partie masquée ici --
    /// Crée un nouveau GroupeTaches.
    ///
    /// La taille est le nom de tâches présentes dans le groupe.
    ///
    /// # Panics
    ///
    /// La fonction `new` devrait paniquer si la taille vaut zéro.
    pub fn new(taille: usize) -> GroupeTaches {
        assert!(taille > 0);

        let (envoi, reception) = mpsc::channel();

        let mut operateurs = Vec::with_capacity(taille);

        for id in 0..taille {
            operateurs.push(Operateur::new(id));
        }

        GroupeTaches { operateurs, envoi }
    }
    // -- partie masquée ici --

    pub fn executer<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

struct Operateur {
    id: usize,
    tache: thread::JoinHandle<()>,
}

impl Operateur {
    fn new(id: usize) -> Operateur {
        let tache = thread::spawn(|| {});

        Operateur { id, tache }
    }
}

Encart 20-16 : modification de GroupeTaches pour stocker la partie émettrice du canal qui émet des instances de Mission

Dans GroupeTaches::new, nous créons notre nouveau canal et faisons en sorte que le groupe stocke la partie émettrice. Cela devrait pouvoir se compiler, mais il subsiste des avertissements.

Essayons de donner la partie réceptrice du canal à chacun des opérateurs lorsque le groupe de tâches crée le canal. Nous savons que nous voulons utiliser la partie réceptrice dans la tâche que l'opérateur utilise, donc nous allons créer une référence vers le paramètre reception dans la fermeture. Le code de l'encart 20-17 ne se compile pas encore.

Fichier : src/lib.rs

use std::sync::mpsc;
use std::thread;

pub struct GroupeTaches {
    operateurs: Vec<Operateur>,
    envoi: mpsc::Sender<Mission>,
}

struct Mission;

impl GroupeTaches {
    // -- partie masquée ici --
    /// Crée un nouveau GroupeTaches.
    ///
    /// La taille est le nom de tâches présentes dans le groupe.
    ///
    /// # Panics
    ///
    /// La fonction `new` devrait paniquer si la taille vaut zéro.
    pub fn new(taille: usize) -> GroupeTaches {
        assert!(taille > 0);

        let (envoi, reception) = mpsc::channel();

        let mut operateurs = Vec::with_capacity(taille);

        for id in 0..taille {
            operateurs.push(Operateur::new(id, reception));
        }

        GroupeTaches { operateurs, envoi }
    }
    // -- partie masquée ici --

    pub fn executer<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

// -- partie masquée ici --


struct Operateur {
    id: usize,
    tache: thread::JoinHandle<()>,
}

impl Operateur {
    fn new(id: usize, reception: mpsc::Receiver<Mission>) -> Operateur {
        let tache = thread::spawn(|| {
            reception;
        });

        Operateur { id, tache }
    }
}

Encart 20-17 : envoi de la partie réceptrice du canal aux opérateurs

Nous avons juste fait de petites modifications simples : nous envoyons la partie réceptrice du canal dans Operateur::new puis nous l'utilisons dans la fermeture.

Lorsque nous essayons de vérifier ce code, nous obtenons cette erreur :

$ cargo check
    Checking salutations v0.1.0 (file:///projects/salutations)
error[E0382]: use of moved value: `reception`
  --> src/lib.rs:27:42
   |
22 |         let (envoi, reception) = mpsc::channel();
   |                     --------- move occurs because `reception` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
27 |             operateurs.push(Worker::new(id, reception));
   |                                             ^^^^^^^^^ value moved here, in previous iteration of loop

For more information about this error, try `rustc --explain E0382`.
error: could not compile `hello` due to previous error

Le code essaye d'envoyer reception dans plusieurs instances de Operateur. Ceci ne fonctionne pas, comme vous l'avez appris au chapitre 16 : l'implémentation du canal que fournit Rust est du type plusieurs producteurs, un seul consommateur. Cela signifie que nous ne pouvons pas simplement cloner la partie réceptrice du canal pour corriger ce code. Même si nous aurions pu le faire, ce n'est pas la solution que nous souhaitons utiliser ; nous voulons plutôt distribuer les missions entre les tâches en partageant la même réception entre tous les opérateurs.

De plus, obtenir une mission de la file d'attente du canal implique de modifier la reception, donc les tâches ont besoin d'une méthode sécurisée pour partager et modifier reception ; autrement, nous risquons de nous trouver dans des situations de concurrence (comme nous l'avons vu dans le chapitre 16).

Souvenez-vous des pointeurs intelligents conçus pour les échanges entre les tâches que nous avons vus au chapitre 16 : pour partager la possession entre plusieurs tâches et permettre aux tâches de modifier la valeur, nous avons besoin d'utiliser Arc<Mutex<T>>. Le type Arc va permettre à plusieurs opérateurs de posséder la réception tandis que Mutex va s'assurer qu'un seul opérateur obtienne une mission dans la réception à un moment donné. L'encart 20-18 montre les changements que nous devons apporter.

Fichier : src/lib.rs

use std::sync::mpsc;
use std::thread;
use std::sync::Arc;
use std::sync::Mutex;
// -- partie masquée ici --

pub struct GroupeTaches {
    operateurs: Vec<Operateur>,
    envoi: mpsc::Sender<Mission>,
}

struct Mission;

impl GroupeTaches {
    // -- partie masquée ici --
    /// Crée un nouveau GroupeTaches.
    ///
    /// La taille est le nom de tâches présentes dans le groupe.
    ///
    /// # Panics
    ///
    /// La fonction `new` devrait paniquer si la taille vaut zéro.
    pub fn new(taille: usize) -> GroupeTaches {
        assert!(taille > 0);

        let (envoi, reception) = mpsc::channel();

        let reception = Arc::new(Mutex::new(reception));

        let mut operateurs = Vec::with_capacity(taille);

        for id in 0..taille {
            operateurs.push(Operateur::new(id, Arc::clone(&reception)));
        }

        GroupeTaches { operateurs, envoi }
    }

    // -- partie masquée ici --

    pub fn executer<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

// -- partie masquée ici --

struct Operateur {
    id: usize,
    tache: thread::JoinHandle<()>,
}

impl Operateur {
    fn new(id: usize, reception: Arc<Mutex<mpsc::Receiver<Mission>>>) -> Operateur {
        // -- partie masquée ici --
        let tache = thread::spawn(|| {
            reception;
        });

        Operateur { id, tache }
    }
}

Encart 20-18 : partage de la partie réceptrice du canal entre les opérateurs en utilisant Arc et Mutex

Dans GroupeTaches::new, nous installons la partie réceptrice du canal dans un Arc et un Mutex. Pour chaque nouvel opérateur, nous clonons le Arc pour augmenter le compteur de références afin que les opérateurs puissent se partager la possession de la partie réceptrice.

Grâce à ces changements, le code se compile ! Nous touchons au but !

Implémenter la méthode executer

Finissons en implémentant la méthode executer de GroupeTaches. Nous allons également modifier la structure Mission pour la transformer en un alias de type pour un objet trait qui contiendra le type de fermeture que executer recevra. Comme nous l'avons vu dans une section du chapitre 19, les alias de type nous permettent de raccourcir les types un peu trop longs. Voyez cela dans l'encart 20-19.

Fichier : src/lib.rs

use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;

pub struct GroupeTaches {
    operateurs: Vec<Operateur>,
    envoi: mpsc::Sender<Mission>,
}

// -- partie masquée ici --

type Mission = Box<dyn FnOnce() + Send + 'static>;

impl GroupeTaches {
    // -- partie masquée ici --
    /// Crée un nouveau GroupeTaches.
    ///
    /// La taille est le nom de tâches présentes dans le groupe.
    ///
    /// # Panics
    ///
    /// La fonction `new` devrait paniquer si la taille vaut zéro.
    pub fn new(taille: usize) -> GroupeTaches {
        assert!(taille > 0);

        let (envoi, reception) = mpsc::channel();

        let reception = Arc::new(Mutex::new(reception));

        let mut operateurs = Vec::with_capacity(taille);

        for id in 0..taille {
            operateurs.push(Operateur::new(id, Arc::clone(&reception)));
        }

        GroupeTaches { operateurs, envoi }
    }

    pub fn executer<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let mission = Box::new(f);

        self.envoi.send(mission).unwrap();
    }
}

// -- partie masquée ici --

struct Operateur {
    id: usize,
    tache: thread::JoinHandle<()>,
}

impl Operateur {
    fn new(id: usize, reception: Arc<Mutex<mpsc::Receiver<Mission>>>) -> Operateur {
        let tache = thread::spawn(|| {
            reception;
        });

        Operateur { id, tache }
    }
}

Encart 20-19 : création d'un alias de type Mission pour une Box qui contient chaque fermeture et qui transportera la mission dans le canal

Après avoir créé une nouvelle instance Mission en utilisant la fermeture que nous obtenons dans executer, nous envoyons cette mission dans le canal via la partie émettrice. Nous utilisons unwrap sur send pour les cas où l'envoi échoue. Cela peut arriver si, par exemple, nous stoppons l'exécution de toutes les tâches, ce qui signifiera que les parties réceptrices auront finis de recevoir des nouveaux messages. Pour le moment, nous ne pouvons pas stopper l'exécution de nos tâches : nos tâches continuerons à s'exécuter aussi longtemps que le groupe existe. La raison pour laquelle nous utilisons unwrap est que nous savons que le cas d'échec ne va pas se produire, mais le compilateur ne le sait pas.

Mais nous n'avons pas encore tout à fait fini ! Dans l'opérateur, notre fermeture envoyée à thread::spawn ne fait que référencer la partie réception du canal. Au lieu de ça, nous avons besoin que la fermeture boucle à l'infini, demandant une mission à la partie réceptrice du canal et l'exécutant quand elle en obtient une. Appliquons les changements montrés dans l'encart 20-20 à Operateur::new.

Fichier : src/lib.rs

use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;

pub struct GroupeTaches {
    operateurs: Vec<Operateur>,
    envoi: mpsc::Sender<Mission>,
}

type Mission = Box<dyn FnOnce() + Send + 'static>;

impl GroupeTaches {
    /// Crée un nouveau GroupeTaches.
    ///
    /// La taille est le nom de tâches présentes dans le groupe.
    ///
    /// # Panics
    ///
    /// La fonction `new` devrait paniquer si la taille vaut zéro.
    pub fn new(taille: usize) -> GroupeTaches {
        assert!(taille > 0);

        let (envoi, reception) = mpsc::channel();

        let reception = Arc::new(Mutex::new(reception));

        let mut operateurs = Vec::with_capacity(taille);

        for id in 0..taille {
            operateurs.push(Operateur::new(id, Arc::clone(&reception)));
        }

        GroupeTaches { operateurs, envoi }
    }

    pub fn executer<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let mission = Box::new(f);

        self.envoi.send(mission).unwrap();
    }
}

struct Operateur {
    id: usize,
    tache: thread::JoinHandle<()>,
}

// -- partie masquée ici --

impl Operateur {
    fn new(id: usize, reception: Arc<Mutex<mpsc::Receiver<Mission>>>) -> Operateur {
        let tache = thread::spawn(move || loop {
            let mission = reception.lock().unwrap().recv().unwrap();

            println!("L'opérateur {} a obtenu une mission ; il l'exécute.", id);

            mission();
        });

        Operateur { id, tache }
    }
}

Encart 20-20 : réception et exécution des missions dans la tâche de l'opérateur

Ici, nous faisons d'abord appel à lock sur reception pour obtenir le mutex, puis nous faisons appel à unwrap pour paniquer dès qu'il y a une erreur. L'acquisition d'un verrou peut échouer si le mutex est dans un état empoisonné, ce qui peut arriver si d'autres tâches ont paniqué pendant qu'elles avaient le verrou au lieu de le rendre. Dans cette situation, l'appel à unwrap fera paniquer la tâche, ce qui est la bonne chose à faire. Vous pouvez aussi changer ce unwrap en un expect avec un message d'erreur qui sera plus explicite pour vous.

Si nous obtenons le verrou du mutex, nous faisons appel à recv pour recevoir une Mission provenant du canal. Un unwrap final s'occupe lui aussi des cas d'erreurs qui peuvent se produire si la tâche qui est connectée à la partie émettrice du canal se termine, de la même manière que la méthode send enverrait Err si la partie réceptrice se fermerait.

L'appel à recv bloque l'exécution, donc s'il n'y a pas encore de mission, la tâche courante va attendre jusqu'à ce qu'une mission soit disponible. Le Mutex<T> s'assure qu'une seule tâche d'Operateur essaie d'obtenir une mission à un instant donné.

Notre groupe de tâches est désormais en état de fonctionner ! Faites un cargo run et faites quelques requêtes :

$ cargo run
   Compiling salutations v0.1.0 (file:///projects/salutations)
warning: field is never read: `operateurs`
 -- > src/lib.rs:7:5
  |
7 |     operateurs: Vec<Operateur>,
  |     ^^^^^^^^^^^^^^^^^^^^^^^^^^
  |
  = note: `#[warn(dead_code)]` on by default

warning: field is never read: `id`
  -- > src/lib.rs:48:5
   |
48 |     id: usize,
   |     ^^^^^^^^^

warning: field is never read: `tache`
  -- > src/lib.rs:49:5
   |
49 |     tache: thread::JoinHandle<()>,
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

warning: 3 warnings emitted

    Finished dev [unoptimized + debuginfo] target(s) in 1.40s
     Running `target/debug/main`
L'opérateur 0 a obtenu une mission ; il l'exécute.
L'opérateur 2 a obtenu une mission ; il l'exécute.
L'opérateur 1 a obtenu une mission ; il l'exécute.
L'opérateur 3 a obtenu une mission ; il l'exécute.
L'opérateur 0 a obtenu une mission ; il l'exécute.
L'opérateur 2 a obtenu une mission ; il l'exécute.
L'opérateur 1 a obtenu une mission ; il l'exécute.
L'opérateur 3 a obtenu une mission ; il l'exécute.
L'opérateur 0 a obtenu une mission ; il l'exécute.
L'opérateur 2 a obtenu une mission ; il l'exécute.

Parfait ! Nous avons maintenant un groupe de tâches qui exécute des connexions de manière asynchrone. Il n'y a jamais plus de quatre tâches qui sont créées, donc notre système ne sera pas surchargé si le serveur reçoit beaucoup de requêtes. Si nous faisons une requête vers /pause, le serveur sera toujours capable de servir les autres requêtes grâce aux autres tâches qui pourront les exécuter.

Remarque : si vous ouvrez /pause dans plusieurs fenêtres de navigation en simultané, elles peuvent parfois être chargées une par une avec 5 secondes d'intervalle. Certains navigateurs web exécutent plusieurs instances de la même requête de manière séquentielle pour des raisons de mise en cache. Cette limitation n'est pas imputable à notre serveur web.

Ayant appris la boucle while let dans le chapitre 18, vous pourriez vous demander pourquoi nous n'avons pas écrit le code des tâches des opérateurs comme dans l'encart 20-21.

Fichier : src/lib.rs

use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;

pub struct GroupeTaches {
    operateurs: Vec<Operateur>,
    envoi: mpsc::Sender<Mission>,
}

type Mission = Box<dyn FnOnce() + Send + 'static>;

impl GroupeTaches {
    /// Crée un nouveau GroupeTaches.
    ///
    /// La taille est le nom de tâches présentes dans le groupe.
    ///
    /// # Panics
    ///
    /// La fonction `new` devrait paniquer si la taille vaut zéro.
    pub fn new(taille: usize) -> GroupeTaches {
        assert!(taille > 0);

        let (envoi, reception) = mpsc::channel();

        let reception = Arc::new(Mutex::new(reception));

        let mut operateurs = Vec::with_capacity(taille);

        for id in 0..taille {
            operateurs.push(Operateur::new(id, Arc::clone(&reception)));
        }

        GroupeTaches { operateurs, envoi }
    }

    pub fn executer<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let mission = Box::new(f);

        self.envoi.send(mission).unwrap();
    }
}

struct Operateur {
    id: usize,
    tache: thread::JoinHandle<()>,
}
// -- partie masquée ici --

impl Operateur {
    fn new(id: usize, reception: Arc<Mutex<mpsc::Receiver<Mission>>>) -> Operateur {
        let tache = thread::spawn(move || {
            while let Ok(mission) = reception.lock().unwrap().recv() {
                println!("L'opérateur {} a obtenu une mission ; il l'exécute.", id);

                mission();
            }
        });

        Operateur { id, tache }
    }
}

Encart 20-21 : une implémentation alternative de Operateur::new qui utilise while let

Ce code se compile et s'exécute mais ne se produit pas le comportement des tâches que nous souhaitons : une requête lente à traiter va continuer à mettre en attente de traitement les autres requêtes. La raison à cela est subtile : la structure Mutex n'a pas de méthode publique unlock car la propriété du verrou se base sur la durée de vie du MutexGuard<T> au sein du LockResult<MutexGuard<T>> que retourne la méthode lock. A la compilation, le vérificateur d'emprunt peut ensuite vérifier la règle qui dit qu'une ressource gardée par un Mutex ne peut être accessible que si nous avons ce verrou. Mais cette implémentation peut aussi conduire à ce que nous gardions le verrou plus longtemps que prévu si nous ne réfléchissons pas avec attention à la durée de vie du MutexGuard<T>.

Le code de l'encart 20-20 qui utilise let mission = reception.lock().unwrap().recv().unwrap(); fonctionne, car avec let, toute valeur temporaire utilisée dans la partie droite du signe égal est libérée immédiatement lorsque l'instruction let se termine. Cependant, while let ( ainsi que if let et match) ne libèrent pas les valeurs temporaires avant la fin du bloc associé. Dans l'encart 20-21, le verrou continue à être maintenu pendant toute la durée de l'appel à mission(), ce qui veut dire que les autres opérateurs ne peuvent pas recevoir de tâches.