Arrêt propre et nettoyage

Le code de l'encart 20-20 répond aux requêtes de manière asynchrone grâce à l'utilisation du groupe de tâches, comme nous l'espérions. Nous avons quelques avertissements sur les champs operateurs, id et tâche que nous n'utilisons pas directement et qui nous rappellent que nous ne nettoyons rien. Lorsque nous arrêtons brutalement la tâche principale en appuyant sur ctrl-c, toutes les autres tâches sont également immédiatement stoppées, même si elles sont en train de servir une requête.

Maintenant, nous allons implémenter le trait Drop afin d'appeler join sur chacune des tâches du groupe afin qu'elles puissent finir les requêtes qu'elles sont en train de traiter avant de s'arrêter. Ensuite, nous allons implémenter un moyen de demander aux tâches d'arrêter d'accepter de nouvelles requêtes et de s'arrêter. Pour voir ce code en action, nous allons modifier notre serveur pour n'accepter que deux requêtes avant d'arrêter proprement son groupe de tâches.

Implémenter le trait Drop sur GroupeTaches

Commençons par implémenter Drop sur notre groupe de tâches. Lorsque le groupe est nettoyé, nos tâches doivent toutes faire appel à join pour s'assurer qu'elles finissent leur travail. L'encart 20-22 montre une première tentative d'implémentation de Drop ; ce code ne fonctionne pas encore tout à fait.

Fichier : src/lib.rs

use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;

pub struct GroupeTaches {
    operateurs: Vec<Operateur>,
    envoi: mpsc::Sender<Mission>,
}

type Mission = Box<dyn FnOnce() + Send + 'static>;

impl GroupeTaches {
    /// Crée un nouveau GroupeTaches.
    ///
    /// La taille est le nom de tâches présentes dans le groupe.
    ///
    /// # Panics
    ///
    /// La fonction `new` devrait paniquer si la taille vaut zéro.
    pub fn new(taille: usize) -> GroupeTaches {
        assert!(taille > 0);

        let (envoi, reception) = mpsc::channel();

        let reception = Arc::new(Mutex::new(reception));

        let mut operateurs = Vec::with_capacity(taille);

        for id in 0..taille {
            operateurs.push(Operateur::new(id, Arc::clone(&reception)));
        }

        GroupeTaches { operateurs, envoi }
    }

    pub fn executer<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let mission = Box::new(f);

        self.envoi.send(mission).unwrap();
    }
}

impl Drop for GroupeTaches {
    fn drop(&mut self) {
        for operateur in &mut self.operateurs {
            println!("Arrêt de l'opérateur {}", operateur.id);

            operateur.tache.join().unwrap();
        }
    }
}

struct Operateur {
    id: usize,
    tache: thread::JoinHandle<()>,
}

impl Operateur {
    fn new(id: usize, reception: Arc<Mutex<mpsc::Receiver<Mission>>>) -> Operateur {
        let tache = thread::spawn(move || loop {
            let mission = reception.lock().unwrap().recv().unwrap();

            println!("L'opérateur {} a reçu une mission ; il l'exécute.", id);

            mission();
        });

        Operateur { id, tache }
    }
}

Encart 20-22 : utilisation de join sur chaque tâche lorsque le groupe de tâches sort de la portée

D'abord, nous faisons une boucle sur tous les operateurs du groupe de tâches. Pour ce faire, nous utilisons &mut car self n'est qu'une référence mutable du groupe de tâches mais nous aurons également besoin de pouvoir muter chaque operateur. Pour chaque opérateur, nous affichons un message qui indique qu'il s'arrête puis nous faisons appel à join sur la tâche de cet opérateur. Si l'appel à join échoue, nous utilisons unwrap pour faire paniquer Rust et ainsi procéder à un arrêt brutal.

Voici l'erreur que nous obtenons lorsque nous compilons ce code :

$ cargo check
    Checking salutations v0.1.0 (file:///projects/salutations)
error[E0507]: cannot move out of `operateur.tache` which is behind a mutable reference
  --> src/lib.rs:52:13
   |
52 |             operateur.tache.join().unwrap();
   |             ^^^^^^^^^^^^^^^ move occurs because `operateur.tache` has type `JoinHandle<()>`, which does not implement the `Copy` trait

For more information about this error, try `rustc --explain E0507`.
error: could not compile `hello` due to previous error

L'erreur nous informe que nous ne pouvons pas faire appel à join car nous avons seulement fait un emprunt mutable pour chacun des operateur alors que join prend possession de son argument. Pour résoudre ce problème, nous devons sortir la tache de l'instance de Operateur qui la possède afin que join puisse la consommer. Nous faisons ceci dans l'encart 17-15 : comme Operateur contient désormais un Option<thread::JoinHandle<()>>, nous pouvons utiliser la méthode take sur Option pour sortir la valeur de la variante Some et y mettre à la place une variante None. Autrement dit, un Operateur qui est en cours d'exécution aura une variante Some dans tache, et lorsque nous souhaiterons nettoyer Operateur, nous remplacerons Some par None afin que Operateur n'ait pas de tâche à exécuter.

Donc nous savons que nous voulons modifier la définition de Operateur comme ceci :

Fichier : src/lib.rs

use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;

pub struct GroupeTaches {
    operateurs: Vec<Operateur>,
    envoi: mpsc::Sender<Mission>,
}

type Mission = Box<dyn FnOnce() + Send + 'static>;

impl GroupeTaches {
    /// Crée un nouveau GroupeTaches.
    ///
    /// La taille est le nom de tâches présentes dans le groupe.
    ///
    /// # Panics
    ///
    /// La fonction `new` devrait paniquer si la taille vaut zéro.
    pub fn new(taille: usize) -> GroupeTaches {
        assert!(taille > 0);

        let (envoi, reception) = mpsc::channel();

        let reception = Arc::new(Mutex::new(reception));

        let mut operateurs = Vec::with_capacity(taille);

        for id in 0..taille {
            operateurs.push(Operateur::new(id, Arc::clone(&reception)));
        }

        GroupeTaches { operateurs, envoi }
    }

    pub fn executer<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let mission = Box::new(f);

        self.envoi.send(mission).unwrap();
    }
}

impl Drop for GroupeTaches {
    fn drop(&mut self) {
        for operateur in &mut self.operateurs {
            println!("Arrêt de l'opérateur {}", operateur.id);

            operateur.tache.join().unwrap();
        }
    }
}

struct Operateur {
    id: usize,
    tache: Option<thread::JoinHandle<()>>,
}

impl Operateur {
    fn new(id: usize, reception: Arc<Mutex<mpsc::Receiver<Mission>>>) -> Operateur {
        let tache = thread::spawn(move || loop {
            let mission = reception.lock().unwrap().recv().unwrap();

            println!("L'opérateur {} a reçu une mission ; il l'exécute.", id);

            mission();
        });

        Operateur { id, tache }
    }
}

Maintenant, aidons-nous du compilateur pour trouver les autres endroits qui ont besoin de changer. En vérifiant ce code, nous obtenons deux erreurs :

$ cargo check
    Checking salutations v0.1.0 (file:///projects/salutations)
error[E0599]: no method named `join` found for enum `Option` in the current scope
  --> src/lib.rs:52:27
   |
52 |             operateur.tache.join().unwrap();
   |                             ^^^^ method not found in `Option<JoinHandle<()>>`

error[E0308]: mismatched types
  --> src/lib.rs:72:22
   |
72 |         Operateur { id, tache }
   |                         ^^^^^ expected enum `Option`, found struct `JoinHandle`
   |
   = note: expected enum `Option<JoinHandle<()>>`
            found struct `JoinHandle<_>`
help: try wrapping the expression in `Some`
   |
72 |         Operateur { id, Some(tache) }
   |                         +++++     +

Some errors have detailed explanations: E0308, E0599.
For more information about an error, try `rustc --explain E0308`.
error: could not compile `hello` due to 2 previous errors

Corrigeons la seconde erreur, qui se situe dans le code à la fin de Operateur::new : nous devons intégrer la valeur de tache dans un Some lorsque nous créons un nouvel Operateur. Faites les changements suivants pour corriger cette erreur :

Fichier : src/lib.rs

use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;

pub struct GroupeTaches {
    operateurs: Vec<Operateur>,
    envoi: mpsc::Sender<Mission>,
}

type Mission = Box<dyn FnOnce() + Send + 'static>;

impl GroupeTaches {
    /// Crée un nouveau GroupeTaches.
    ///
    /// La taille est le nom de tâches présentes dans le groupe.
    ///
    /// # Panics
    ///
    /// La fonction `new` devrait paniquer si la taille vaut zéro.
    pub fn new(taille: usize) -> GroupeTaches {
        assert!(taille > 0);

        let (envoi, reception) = mpsc::channel();

        let reception = Arc::new(Mutex::new(reception));

        let mut operateurs = Vec::with_capacity(taille);

        for id in 0..taille {
            operateurs.push(Operateur::new(id, Arc::clone(&reception)));
        }

        GroupeTaches { operateurs, envoi }
    }

    pub fn executer<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let mission = Box::new(f);

        self.envoi.send(mission).unwrap();
    }
}

impl Drop for GroupeTaches {
    fn drop(&mut self) {
        for operateur in &mut self.operateurs {
            println!("Arrêt de l'opérateur {}", operateur.id);

            operateur.tache.join().unwrap();
        }
    }
}

struct Operateur {
    id: usize,
    tache: Option<thread::JoinHandle<()>>,
}

impl Operateur {
    fn new(id: usize, reception: Arc<Mutex<mpsc::Receiver<Mission>>>) -> Operateur {
        // -- partie masquée ici --

        let tache = thread::spawn(move || loop {
            let mission = reception.lock().unwrap().recv().unwrap();

            println!("L'opérateur {} a reçu une mission ; il l'exécute.", id);

            mission();
        });

        Operateur {
            id,
            tache: Some(tache),
        }
    }
}

La première erreur se situe dans notre implémentation de Drop. Nous avions mentionné plus tôt que nous voulions faire appel à take sur la valeur de Option pour déplacer tache en dehors de operateur. Voici les changements à apporter pour ceci :

Fichier : src/lib.rs

use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;

pub struct GroupeTaches {
    operateurs: Vec<Operateur>,
    envoi: mpsc::Sender<Mission>,
}

type Mission = Box<dyn FnOnce() + Send + 'static>;

impl GroupeTaches {
    /// Crée un nouveau GroupeTaches.
    ///
    /// La taille est le nom de tâches présentes dans le groupe.
    ///
    /// # Panics
    ///
    /// La fonction `new` devrait paniquer si la taille vaut zéro.
    pub fn new(taille: usize) -> GroupeTaches {
        assert!(taille > 0);

        let (envoi, reception) = mpsc::channel();

        let reception = Arc::new(Mutex::new(reception));

        let mut operateurs = Vec::with_capacity(taille);

        for id in 0..taille {
            operateurs.push(Operateur::new(id, Arc::clone(&reception)));
        }

        GroupeTaches { operateurs, envoi }
    }

    pub fn executer<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let mission = Box::new(f);

        self.envoi.send(mission).unwrap();
    }
}

impl Drop for GroupeTaches {
    fn drop(&mut self) {
        for operateur in &mut self.operateurs {
            println!("Arrêt de l'opérateur {}", operateur.id);

            if let Some(tache) = operateur.tache.take() {
                tache.join().unwrap();
            }
        }
    }
}

struct Operateur {
    id: usize,
    tache: Option<thread::JoinHandle<()>>,
}

impl Operateur {
    fn new(id: usize, reception: Arc<Mutex<mpsc::Receiver<Mission>>>) -> Operateur {
        let tache = thread::spawn(move || loop {
            let mission = reception.lock().unwrap().recv().unwrap();

            println!("L'opérateur {} a reçu une mission ; il l'exécute.", id);

            mission();
        });

        Operateur {
            id,
            tache: Some(tache),
        }
    }
}

Comme nous l'avons vu au chapitre 17, la méthode take sur Option sort la variante Some et laisse un None à la place. Nous utilisons if let pour destructurer le Some et obtenir la tâche ; ensuite nous faisons appel à join sur cette tâche. Si la tâche d'un opérateur est déjà un None, nous savons qu'il a déjà nettoyé sa tâche et que dans ce cas nous n'avons rien à faire.

Demander aux tâches d'arrêter d'attendre des missions

Avec tous ces changements, notre code se compile désormais sans aucun avertissement. Mais la mauvaise nouvelle est que pour l'instant ce code ne fonctionne comme nous le souhaitons. La cause se situe dans la logique des fermetures qui sont exécutées par les tâches des instances de Operateur : pour le moment, nous faisons appel à join, mais cela ne va pas arrêter les tâches car elles font une boucle infinie avec loop pour attendre des missions. Si nous essayons de nettoyer notre GroupeTaches avec l'implémentation actuelle de drop, la tâche principale va se bloquer pour toujours en attendant en vain que la première tâche se termine.

Pour corriger ce problème, nous allons modifier les tâches pour qu'elles attendent soit une Mission à exécuter, soit le signal qui leur dit qu'elles doivent arrêter d'attendre des missions et sortir de la boucle infinie. Notre canal va envoyer une de ces deux variantes d'énumération au lieu d'instances de Mission.

Fichier : src/lib.rs

use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;

pub struct GroupeTaches {
    operateurs: Vec<Operateur>,
    envoi: mpsc::Sender<Mission>,
}

type Mission = Box<dyn FnOnce() + Send + 'static>;

enum Message {
    NouvelleMission(Mission),
    Extinction,
}

impl GroupeTaches {
    /// Crée un nouveau GroupeTaches.
    ///
    /// La taille est le nom de tâches présentes dans le groupe.
    ///
    /// # Panics
    ///
    /// La fonction `new` devrait paniquer si la taille vaut zéro.
    pub fn new(taille: usize) -> GroupeTaches {
        assert!(taille > 0);

        let (envoi, reception) = mpsc::channel();

        let reception = Arc::new(Mutex::new(reception));

        let mut operateurs = Vec::with_capacity(taille);

        for id in 0..taille {
            operateurs.push(Operateur::new(id, Arc::clone(&reception)));
        }

        GroupeTaches { operateurs, envoi }
    }

    pub fn executer<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let mission = Box::new(f);

        self.envoi.send(mission).unwrap();
    }
}

impl Drop for GroupeTaches {
    fn drop(&mut self) {
        for operateur in &mut self.operateurs {
            println!("Arrêt de l'opérateur {}", operateur.id);

            if let Some(tache) = operateur.tache.take() {
                tache.join().unwrap();
            }
        }
    }
}

struct Operateur {
    id: usize,
    tache: Option<thread::JoinHandle<()>>,
}

impl Operateur {
    fn new(id: usize, reception: Arc<Mutex<mpsc::Receiver<Mission>>>) -> Operateur {
        let tache = thread::spawn(move || loop {
            let mission = reception.lock().unwrap().recv().unwrap();

            println!("L'opérateur {} a reçu une mission ; il l'exécute.", id);

            mission();
        });

        Operateur {
            id,
            tache: Some(tache),
        }
    }
}

Cette énumération Message aura pour valeurs une variante NouvelleMission qui contiendra la Mission que la tâche devra exécuter, ou la variante Extinction qui va faire en sorte que la tâche sorte de sa boucle et se termine.

Nous devons corriger le canal pour utiliser les valeurs du type Message à la place du type Mission, comme dans l'encart 20-23.

Fichier : src/lib.rs

use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;

pub struct GroupeTaches {
    operateurs: Vec<Operateur>,
    envoi: mpsc::Sender<Message>,
}

// -- partie masquée ici --

type Mission = Box<dyn FnOnce() + Send + 'static>;

enum Message {
    NouvelleMission(Mission),
    Extinction,
}

impl GroupeTaches {
    // -- partie masquée ici --

    /// Crée un nouveau GroupeTaches.
    ///
    /// La taille est le nom de tâches présentes dans le groupe.
    ///
    /// # Panics
    ///
    /// La fonction `new` devrait paniquer si la taille vaut zéro.
    pub fn new(taille: usize) -> GroupeTaches {
        assert!(taille > 0);

        let (envoi, reception) = mpsc::channel();

        let reception = Arc::new(Mutex::new(reception));

        let mut operateurs = Vec::with_capacity(taille);

        for id in 0..taille {
            operateurs.push(Operateur::new(id, Arc::clone(&reception)));
        }

        GroupeTaches { operateurs, envoi }
    }

    pub fn executer<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let mission = Box::new(f);

        self.envoi.send(Message::NouvelleMission(mission)).unwrap();
    }
}

// -- partie masquée ici --

impl Drop for GroupeTaches {
    fn drop(&mut self) {
        for operateur in &mut self.operateurs {
            println!("Arrêt de l'opérateur {}", operateur.id);

            if let Some(tache) = operateur.tache.take() {
                tache.join().unwrap();
            }
        }
    }
}

struct Operateur {
    id: usize,
    tache: Option<thread::JoinHandle<()>>,
}

impl Operateur {
    fn new(id: usize, reception: Arc<Mutex<mpsc::Receiver<Message>>>) -> Operateur {
        let tache = thread::spawn(move || loop {
            let message = reception.lock().unwrap().recv().unwrap();

            match message {
                Message::NouvelleMission(mission) => {
                    println!("L'opérateur {} a reçu une mission ; il l'exécute.", id);

                    mission();
                }
                Message::Extinction => {
                    println!("L'opérateur {} a reçu l'instruction d'arrêt.", id);

                    break;
                }
            }
        });

        Operateur {
            id,
            tache: Some(tache),
        }
    }
}

Encart 20-23 : envoi et réception de valeurs de Message et sortie de la boucle si un Operateur reçoit Message:Extinction

Pour intégrer l'énumération Message, nous devons changer Mission par Message à deux endroits : dans la définition de GroupeTaches et dans la signature de Operateur::new. La méthode executer de GroupeTaches doit envoyer des missions encapsulées dans des variantes de Message::NouvelleTache. Ensuite, dans Operateur::new où nous recevons des Message du canal, la mission sera traitée si la variante NouvelleTache est reçue, ou bien la tâche arrêtera la boucle si la variante Extinction est reçue.

Grâce à ces changements, le code va se compiler et continuer de fonctionner de la même manière qu'il le faisait après l'encart 20-20. Mais nous allons obtenir un avertissement car nous ne créons aucun message de la variante Extinction. Corrigeons cet avertissement en modifiant notre implémentation de Drop pour qu'elle ressemble à l'encart 20-24.

Fichier : src/lib.rs

use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;

pub struct GroupeTaches {
    operateurs: Vec<Operateur>,
    envoi: mpsc::Sender<Message>,
}

type Mission = Box<dyn FnOnce() + Send + 'static>;

enum Message {
    NouvelleMission(Mission),
    Extinction,
}

impl GroupeTaches {
    /// Crée un nouveau GroupeTaches.
    ///
    /// La taille est le nom de tâches présentes dans le groupe.
    ///
    /// # Panics
    ///
    /// La fonction `new` devrait paniquer si la taille vaut zéro.
    pub fn new(taille: usize) -> GroupeTaches {
        assert!(taille > 0);

        let (envoi, reception) = mpsc::channel();

        let reception = Arc::new(Mutex::new(reception));

        let mut operateurs = Vec::with_capacity(taille);

        for id in 0..taille {
            operateurs.push(Operateur::new(id, Arc::clone(&reception)));
        }

        GroupeTaches { operateurs, envoi }
    }

    pub fn executer<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let mission = Box::new(f);

        self.envoi.send(Message::NouvelleMission(mission)).unwrap();
    }
}

impl Drop for GroupeTaches {
    fn drop(&mut self) {
        println!("Envoi du message d'extinction à tous les opérateurs.");

        for _ in &self.operateurs {
            self.envoi.send(Message::Extinction).unwrap();
        }

        println!("Arrêt de tous les opérateurs.");

        for operateur in &mut self.operateurs {
            println!("Arrêt de l'opérateur {}", operateur.id);

            if let Some(tache) = operateur.tache.take() {
                tache.join().unwrap();
            }
        }
    }
}

struct Operateur {
    id: usize,
    tache: Option<thread::JoinHandle<()>>,
}

impl Operateur {
    fn new(id: usize, reception: Arc<Mutex<mpsc::Receiver<Message>>>) -> Operateur {
        let tache = thread::spawn(move || loop {
            let message = reception.lock().unwrap().recv().unwrap();

            match message {
                Message::NouvelleMission(mission) => {
                    println!("L'opérateur {} a reçu une mission ; il l'exécute.", id);

                    mission();
                }
                Message::Extinction => {
                    println!("L'opérateur {} a reçu l'instruction d'arrêt.", id);

                    break;
                }
            }
        });

        Operateur {
            id,
            tache: Some(tache),
        }
    }
}

Encart 20-24 : envoi de Message::Extinction aux opérateurs avant de d'appeler join sur toutes les tâches de ces opérateurs

Nous itérons deux fois sur les opérateurs : une fois pour envoyer un message Extinction pour chaque opérateur, et une seconde fois pour utiliser join sur leur tâche. Si nous avions essayé d'envoyer le message et d'utiliser immédiatement join dans la même boucle, nous n'aurions pas pu garantir que l'opérateur de l'itération en cours serait celui qui obtiendrait le message envoyé dans le canal.

Pour mieux comprendre pourquoi nous avons besoin de deux boucles distinctes, imaginez un scénario avec deux opérateurs. Si nous avions utilisé une seule boucle pour itérer sur chacun des opérateurs, dans la première itération un message d'extinction aurait été envoyé dans le canal et join aurait été utilisé sur la tâche du premier opérateur. Si ce premier opérateur était occupé à traiter une requête à ce moment-là, le second opérateur aurait alors récupéré le message d'extinction dans le canal et se serait arrêté. Nous serions alors restés à attendre que le premier opérateur s'arrête, mais cela ne se serait jamais produit car c'est la seconde tâche qui aurait obtenu le message d'extinction. Nous serions alors dans une situation d'interblocage !

Pour éviter ce scénario, nous allons commencer par émettre tous nos messages Extinction dans le canal en utilisant une boucle ; puis nous utilisons join sur toutes les tâches dans une seconde boucle. Chaque opérateur va arrêter de recevoir de nouvelles requêtes du canal dès qu'il aura reçu le message d'extinction. Donc, nous sommes maintenant assurés que si nous envoyons autant de messages d'extinction qu'il y a d'opérateurs, chaque opérateur recevra un message d'extinction avant que join ne soit utilisé sur leur tâche.

Pour observer ce code en action, modifions notre main pour accepter uniquement deux requêtes avant d'arrêter proprement le serveur, comme dans l'encart 20-25.

Fichier : src/bin/main.rs

use salutations::GroupeTaches;
use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
use std::thread;
use std::time::Duration;

fn main() {
    let ecouteur = TcpListener::bind("127.0.0.1:7878").unwrap();
    let groupe = GroupeTaches::new(4);

    for flux in ecouteur.incoming().take(2) {
        let flux = flux.unwrap();

        groupe.executer(|| {
            gestion_connexion(flux);
        });
    }

    println!("Arrêt complet.");
}

fn gestion_connexion(mut flux: TcpStream) {
    let mut tampon = [0; 1024];
    flux.read(&mut tampon).unwrap();

    let get = b"GET / HTTP/1.1\r\n";
    let pause = b"GET /pause HTTP/1.1\r\n";

    let (ligne_statut, nom_fichier) = if tampon.starts_with(get) {
        ("HTTP/1.1 200 OK", "hello.html")
    } else if tampon.starts_with(pause) {
        thread::sleep(Duration::from_secs(5));
        ("HTTP/1.1 200 OK", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND", "404.html")
    };

    let contenu = fs::read_to_string(nom_fichier).unwrap();

    let reponse = format!(
        "{}\r\nContent-Length: {}\r\n\r\n{}",
        ligne_statut,
        contenu.len(),
        contenu
    );

    flux.write(reponse.as_bytes()).unwrap();
    flux.flush().unwrap();
}

Encart 20-25 : arrêt du serveur après avoir servi deux requêtes en sortant de la boucle

Dans la réalité on ne voudrait pas qu'un serveur web s'arrête après avoir servi seulement deux requêtes. Ce code sert uniquement à montrer que l'arrêt et le nettoyage s'effectuent bien proprement.

La méthode take est définie dans le trait Iterator et limite l'itération aux deux premiers éléments au maximum. Le GroupeTaches va sortir de la portée à la fin du main et l'implémentation de drop va s'exécuter.

Démarrez le serveur avec cargo run et faites trois requêtes. La troisième requête devrait renvoyer une erreur tandis que dans votre terminal vous devriez avoir une sortie similaire à ceci :

$ cargo run
   Compiling salutations v0.1.0 (file:///projects/salutations)
    Finished dev [unoptimized + debuginfo] target(s) in 1.0s
     Running `target/debug/main`
L'opérateur 0 a reçu une mission ; il l'exécute.
L'opérateur 3 a reçu une mission ; il l'exécute.
Arrêt.
Envoi du message d'extinction à tous les opérateurs.
Arrêt de tous les opérateurs.
Arrêt de l'opérateur 0
L'opérateur 1 a reçu l'instruction d'arrêt.
L'opérateur 2 a reçu l'instruction d'arrêt.
L'opérateur 0 a reçu l'instruction d'arrêt.
L'opérateur 3 a reçu l'instruction d'arrêt.
Arrêt de l'opérateur 1
Arrêt de l'opérateur 2
Arrêt de l'opérateur 3

Vous pourriez avoir un ordre différent entre les opérateurs et les messages affichés. Nous pouvons constater la façon dont ce code fonctionne grâce aux messages : les opérateurs 0 et 3 obtiennent les deux premières requêtes puis, à la troisième requête, le serveur arrête d'accepter des connexions. Lorsque le GroupeTaches sort de la portée à la fin du main, son implémentation de Drop entre en action et le groupe demande à tous les opérateurs de s'arrêter. Chaque opérateur va afficher un message lorsqu'il recevra le message d'extinction puis le groupe de tâche utilisera join pour arrêter la tâche de chaque opérateur.

Remarquez un aspect intéressant spécifique à cette exécution : le GroupeTaches a envoyé les messages d'extinction dans le canal, et avant que tous les opérateurs aient reçu les messages, nous avons essayé d'utiliser join sur l'opérateur 0. L'opérateur 0 n'avait pas encore reçu le message d'extinction, donc la tâche principale a attendu que l'opérateur 0 finisse. Pendant ce temps, tous les autres opérateurs ont reçu les messages d'extinction. Lorsque l'opérateur 0 a fini, la tâche principale a attendu que les autres opérateurs se terminent. A ce stade, ils avaient alors tous reçu le message d'extinction et étaient en mesure de s'arrêter.

Félicitations ! Nous avons maintenant terminé notre projet ; nous avons un serveur web basique qui utilise un groupe de tâches pour répondre de manière asynchrone. Nous pouvons demander un arrêt propre du serveur qui va nettoyer toutes les tâches du groupe.

Voici le code complet afin que vous puissiez vous y référer :

Fichier : src/bin/main.rs

use salutations::GroupeTaches;
use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
use std::thread;
use std::time::Duration;

fn main() {
    let ecouteur = TcpListener::bind("127.0.0.1:7878").unwrap();
    let groupe = GroupeTaches::new(4);

    for flux in ecouteur.incoming() {
        let flux = flux.unwrap();

        groupe.executer(|| {
            gestion_connexion(flux);
        });
    }

    println!("Shutting down.");
}

fn gestion_connexion(mut flux: TcpStream) {
    let mut tampon = [0; 1024];
    flux.read(&mut tampon).unwrap();

    let get = b"GET / HTTP/1.1\r\n";
    let pause = b"GET /pause HTTP/1.1\r\n";

    let (ligne_statut, nom_fichier) = if tampon.starts_with(get) {
        ("HTTP/1.1 200 OK", "hello.html")
    } else if tampon.starts_with(pause) {
        thread::sleep(Duration::from_secs(5));
        ("HTTP/1.1 200 OK", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND", "404.html")
    };

    let contenu = fs::read_to_string(nom_fichier).unwrap();

    let reponse = format!(
        "{}\r\nContent-Length: {}\r\n\r\n{}",
        ligne_statut,
        contenu.len(),
        contenu
    );

    flux.write(reponse.as_bytes()).unwrap();
    flux.flush().unwrap();
}

Fichier : src/lib.rs

use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;

pub struct GroupeTaches {
    operateurs: Vec<Operateur>,
    envoi: mpsc::Sender<Message>,
}

type Mission = Box<dyn FnOnce() + Send + 'static>;

enum Message {
    NouvelleMission(Mission),
    Extinction,
}

impl GroupeTaches {
    /// Crée un nouveau GroupeTaches.
    ///
    /// La taille est le nom de tâches présentes dans le groupe.
    ///
    /// # Panics
    ///
    /// La fonction `new` devrait paniquer si la taille vaut zéro.
    pub fn new(taille: usize) -> GroupeTaches {
        assert!(taille > 0);

        let (envoi, reception) = mpsc::channel();

        let reception = Arc::new(Mutex::new(reception));

        let mut operateurs = Vec::with_capacity(taille);

        for id in 0..taille {
            operateurs.push(Operateur::new(id, Arc::clone(&reception)));
        }

        GroupeTaches { operateurs, envoi }
    }

    pub fn executer<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let mission = Box::new(f);

        self.envoi.send(Message::NouvelleMission(mission)).unwrap();
    }
}

impl Drop for GroupeTaches {
    fn drop(&mut self) {
        println!("Envoi du message d'extinction à tous les opérateurs.");

        for _ in &self.operateurs {
            self.envoi.send(Message::Extinction).unwrap();
        }

        println!("Arrêt de tous les opérateurs.");

        for operateur in &mut self.operateurs {
            println!("Arrêt de l'opérateur {}", operateur.id);

            if let Some(tache) = operateur.tache.take() {
                tache.join().unwrap();
            }
        }
    }
}

struct Operateur {
    id: usize,
    tache: Option<thread::JoinHandle<()>>,
}

impl Operateur {
    fn new(id: usize, reception: Arc<Mutex<mpsc::Receiver<Message>>>) -> Operateur {
        let tache = thread::spawn(move || loop {
            let message = reception.lock().unwrap().recv().unwrap();

            match message {
                Message::NouvelleMission(mission) => {
                    println!("L'opérateur {} a reçu une mission ; il l'exécute.", id);

                    mission();
                }
                Message::Extinction => {
                    println!("L'opérateur {} a reçu l'instruction d'arrêt.", id);

                    break;
                }
            }
        });

        Operateur {
            id,
            tache: Some(tache),
        }
    }
}

Nous aurions pu faire bien plus ! Si vous souhaitez continuer à améliorer ce projet, voici quelques idées :

  • Ajouter de la documentation à GroupeTaches et aux méthodes publiques.
  • Ajouter des tests sur les fonctionnalités de la bibliothèque.
  • Remplacer les appels à unwrap pour fournir une meilleure gestion des erreurs.
  • Utiliser GroupeTaches pour exécuter d'autres tâches que de répondre à des requêtes web.
  • Trouver une crate de groupe de tâches (NdT : thread pool) sur crates.io et implémenter un serveur web similaire en l'utilisant. Comparer ensuite son API et sa robustesse au groupe de tâches que nous avons implémenté.

Résumé

Bravo ! Vous êtes arrivé à la fin du livre ! Nous tenons à vous remercier chaleureusement de nous avoir accompagné pendant cette présentation de Rust. Vous êtes maintenant fin prêt(e) à créer vos propres projets Rust et aider les projets des autres développeurs. Rappelez-vous qu'il existe une communauté accueillante de Rustacés qui adorerait vous aider à relever tous les défis que vous rencontrerez dans votre aventure avec Rust.