Arrêt propre et nettoyage
Le code de l'encart 20-20 répond aux requêtes de manière asynchrone grâce à
l'utilisation du groupe de tâches, comme nous l'espérions. Nous avons quelques
avertissements sur les champs operateurs
, id
et tâche
que nous
n'utilisons pas directement et qui nous rappellent que nous ne nettoyons rien.
Lorsque nous arrêtons brutalement la tâche principale en appuyant sur
ctrl-c, toutes les autres tâches sont également
immédiatement stoppées, même si elles sont en train de servir une requête.
Maintenant, nous allons implémenter le trait Drop
afin d'appeler join
sur chacune
des tâches du groupe afin qu'elles puissent finir les requêtes qu'elles
sont en train de traiter avant de s'arrêter. Ensuite, nous allons implémenter un
moyen de demander aux tâches d'arrêter d'accepter de nouvelles requêtes et de
s'arrêter. Pour voir ce code en action, nous allons modifier notre serveur
pour n'accepter que deux requêtes avant d'arrêter proprement son groupe de
tâches.
Implémenter le trait Drop
sur GroupeTaches
Commençons par implémenter Drop
sur notre groupe de tâches. Lorsque le groupe
est nettoyé, nos tâches doivent toutes faire appel à join
pour s'assurer
qu'elles finissent leur travail. L'encart 20-22 montre une première tentative
d'implémentation de Drop
; ce code ne fonctionne pas encore tout à fait.
Fichier : src/lib.rs
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
pub struct GroupeTaches {
operateurs: Vec<Operateur>,
envoi: mpsc::Sender<Mission>,
}
type Mission = Box<dyn FnOnce() + Send + 'static>;
impl GroupeTaches {
/// Crée un nouveau GroupeTaches.
///
/// La taille est le nom de tâches présentes dans le groupe.
///
/// # Panics
///
/// La fonction `new` devrait paniquer si la taille vaut zéro.
pub fn new(taille: usize) -> GroupeTaches {
assert!(taille > 0);
let (envoi, reception) = mpsc::channel();
let reception = Arc::new(Mutex::new(reception));
let mut operateurs = Vec::with_capacity(taille);
for id in 0..taille {
operateurs.push(Operateur::new(id, Arc::clone(&reception)));
}
GroupeTaches { operateurs, envoi }
}
pub fn executer<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let mission = Box::new(f);
self.envoi.send(mission).unwrap();
}
}
impl Drop for GroupeTaches {
fn drop(&mut self) {
for operateur in &mut self.operateurs {
println!("Arrêt de l'opérateur {}", operateur.id);
operateur.tache.join().unwrap();
}
}
}
struct Operateur {
id: usize,
tache: thread::JoinHandle<()>,
}
impl Operateur {
fn new(id: usize, reception: Arc<Mutex<mpsc::Receiver<Mission>>>) -> Operateur {
let tache = thread::spawn(move || loop {
let mission = reception.lock().unwrap().recv().unwrap();
println!("L'opérateur {} a reçu une mission ; il l'exécute.", id);
mission();
});
Operateur { id, tache }
}
}
D'abord, nous faisons une boucle sur tous les operateurs
du groupe de tâches.
Pour ce faire, nous utilisons &mut
car self
n'est qu'une référence mutable
du groupe de tâches mais nous aurons également besoin de pouvoir muter chaque
operateur
. Pour chaque opérateur, nous affichons un message qui indique qu'il
s'arrête puis nous faisons appel à join
sur la tâche de cet opérateur. Si
l'appel à join
échoue, nous utilisons unwrap
pour faire paniquer Rust et
ainsi procéder à un arrêt brutal.
Voici l'erreur que nous obtenons lorsque nous compilons ce code :
$ cargo check
Checking salutations v0.1.0 (file:///projects/salutations)
error[E0507]: cannot move out of `operateur.tache` which is behind a mutable reference
--> src/lib.rs:52:13
|
52 | operateur.tache.join().unwrap();
| ^^^^^^^^^^^^^^^ move occurs because `operateur.tache` has type `JoinHandle<()>`, which does not implement the `Copy` trait
For more information about this error, try `rustc --explain E0507`.
error: could not compile `hello` due to previous error
L'erreur nous informe que nous ne pouvons pas faire appel à join
car nous
avons seulement fait un emprunt mutable pour chacun des operateur
alors que
join
prend possession de son argument. Pour résoudre ce problème, nous devons
sortir la tache
de l'instance de Operateur
qui la possède afin que join
puisse la consommer. Nous faisons ceci dans l'encart 17-15 : comme Operateur
contient désormais un Option<thread::JoinHandle<()>>
, nous pouvons utiliser
la méthode take
sur Option
pour sortir la valeur de la variante Some
et
y mettre à la place une variante None
. Autrement dit, un Operateur
qui est
en cours d'exécution aura une variante Some
dans tache
, et lorsque nous
souhaiterons nettoyer Operateur
, nous remplacerons Some
par None
afin que
Operateur
n'ait pas de tâche à exécuter.
Donc nous savons que nous voulons modifier la définition de Operateur
comme
ceci :
Fichier : src/lib.rs
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
pub struct GroupeTaches {
operateurs: Vec<Operateur>,
envoi: mpsc::Sender<Mission>,
}
type Mission = Box<dyn FnOnce() + Send + 'static>;
impl GroupeTaches {
/// Crée un nouveau GroupeTaches.
///
/// La taille est le nom de tâches présentes dans le groupe.
///
/// # Panics
///
/// La fonction `new` devrait paniquer si la taille vaut zéro.
pub fn new(taille: usize) -> GroupeTaches {
assert!(taille > 0);
let (envoi, reception) = mpsc::channel();
let reception = Arc::new(Mutex::new(reception));
let mut operateurs = Vec::with_capacity(taille);
for id in 0..taille {
operateurs.push(Operateur::new(id, Arc::clone(&reception)));
}
GroupeTaches { operateurs, envoi }
}
pub fn executer<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let mission = Box::new(f);
self.envoi.send(mission).unwrap();
}
}
impl Drop for GroupeTaches {
fn drop(&mut self) {
for operateur in &mut self.operateurs {
println!("Arrêt de l'opérateur {}", operateur.id);
operateur.tache.join().unwrap();
}
}
}
struct Operateur {
id: usize,
tache: Option<thread::JoinHandle<()>>,
}
impl Operateur {
fn new(id: usize, reception: Arc<Mutex<mpsc::Receiver<Mission>>>) -> Operateur {
let tache = thread::spawn(move || loop {
let mission = reception.lock().unwrap().recv().unwrap();
println!("L'opérateur {} a reçu une mission ; il l'exécute.", id);
mission();
});
Operateur { id, tache }
}
}
Maintenant, aidons-nous du compilateur pour trouver les autres endroits qui ont besoin de changer. En vérifiant ce code, nous obtenons deux erreurs :
$ cargo check
Checking salutations v0.1.0 (file:///projects/salutations)
error[E0599]: no method named `join` found for enum `Option` in the current scope
--> src/lib.rs:52:27
|
52 | operateur.tache.join().unwrap();
| ^^^^ method not found in `Option<JoinHandle<()>>`
error[E0308]: mismatched types
--> src/lib.rs:72:22
|
72 | Operateur { id, tache }
| ^^^^^ expected enum `Option`, found struct `JoinHandle`
|
= note: expected enum `Option<JoinHandle<()>>`
found struct `JoinHandle<_>`
help: try wrapping the expression in `Some`
|
72 | Operateur { id, Some(tache) }
| +++++ +
Some errors have detailed explanations: E0308, E0599.
For more information about an error, try `rustc --explain E0308`.
error: could not compile `hello` due to 2 previous errors
Corrigeons la seconde erreur, qui se situe dans le code à la fin de
Operateur::new
: nous devons intégrer la valeur de tache
dans un Some
lorsque nous créons un nouvel Operateur
. Faites les changements suivants pour
corriger cette erreur :
Fichier : src/lib.rs
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
pub struct GroupeTaches {
operateurs: Vec<Operateur>,
envoi: mpsc::Sender<Mission>,
}
type Mission = Box<dyn FnOnce() + Send + 'static>;
impl GroupeTaches {
/// Crée un nouveau GroupeTaches.
///
/// La taille est le nom de tâches présentes dans le groupe.
///
/// # Panics
///
/// La fonction `new` devrait paniquer si la taille vaut zéro.
pub fn new(taille: usize) -> GroupeTaches {
assert!(taille > 0);
let (envoi, reception) = mpsc::channel();
let reception = Arc::new(Mutex::new(reception));
let mut operateurs = Vec::with_capacity(taille);
for id in 0..taille {
operateurs.push(Operateur::new(id, Arc::clone(&reception)));
}
GroupeTaches { operateurs, envoi }
}
pub fn executer<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let mission = Box::new(f);
self.envoi.send(mission).unwrap();
}
}
impl Drop for GroupeTaches {
fn drop(&mut self) {
for operateur in &mut self.operateurs {
println!("Arrêt de l'opérateur {}", operateur.id);
operateur.tache.join().unwrap();
}
}
}
struct Operateur {
id: usize,
tache: Option<thread::JoinHandle<()>>,
}
impl Operateur {
fn new(id: usize, reception: Arc<Mutex<mpsc::Receiver<Mission>>>) -> Operateur {
// -- partie masquée ici --
let tache = thread::spawn(move || loop {
let mission = reception.lock().unwrap().recv().unwrap();
println!("L'opérateur {} a reçu une mission ; il l'exécute.", id);
mission();
});
Operateur {
id,
tache: Some(tache),
}
}
}
La première erreur se situe dans notre implémentation de Drop
. Nous avions
mentionné plus tôt que nous voulions faire appel à take
sur la valeur de
Option
pour déplacer tache
en dehors de operateur
. Voici les changements
à apporter pour ceci :
Fichier : src/lib.rs
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
pub struct GroupeTaches {
operateurs: Vec<Operateur>,
envoi: mpsc::Sender<Mission>,
}
type Mission = Box<dyn FnOnce() + Send + 'static>;
impl GroupeTaches {
/// Crée un nouveau GroupeTaches.
///
/// La taille est le nom de tâches présentes dans le groupe.
///
/// # Panics
///
/// La fonction `new` devrait paniquer si la taille vaut zéro.
pub fn new(taille: usize) -> GroupeTaches {
assert!(taille > 0);
let (envoi, reception) = mpsc::channel();
let reception = Arc::new(Mutex::new(reception));
let mut operateurs = Vec::with_capacity(taille);
for id in 0..taille {
operateurs.push(Operateur::new(id, Arc::clone(&reception)));
}
GroupeTaches { operateurs, envoi }
}
pub fn executer<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let mission = Box::new(f);
self.envoi.send(mission).unwrap();
}
}
impl Drop for GroupeTaches {
fn drop(&mut self) {
for operateur in &mut self.operateurs {
println!("Arrêt de l'opérateur {}", operateur.id);
if let Some(tache) = operateur.tache.take() {
tache.join().unwrap();
}
}
}
}
struct Operateur {
id: usize,
tache: Option<thread::JoinHandle<()>>,
}
impl Operateur {
fn new(id: usize, reception: Arc<Mutex<mpsc::Receiver<Mission>>>) -> Operateur {
let tache = thread::spawn(move || loop {
let mission = reception.lock().unwrap().recv().unwrap();
println!("L'opérateur {} a reçu une mission ; il l'exécute.", id);
mission();
});
Operateur {
id,
tache: Some(tache),
}
}
}
Comme nous l'avons vu au chapitre 17, la méthode take
sur Option
sort la
variante Some
et laisse un None
à la place. Nous utilisons if let
pour
destructurer le Some
et obtenir la tâche ; ensuite nous faisons appel à join
sur cette tâche. Si la tâche d'un opérateur est déjà un None
, nous savons
qu'il a déjà nettoyé sa tâche et que dans ce cas nous n'avons rien à faire.
Demander aux tâches d'arrêter d'attendre des missions
Avec tous ces changements, notre code se compile désormais sans aucun
avertissement. Mais la mauvaise nouvelle est que pour l'instant ce code ne
fonctionne comme nous le souhaitons. La cause se situe dans la logique des
fermetures qui sont exécutées par les tâches des instances de Operateur
:
pour le moment, nous faisons appel à join
, mais cela ne va pas arrêter les
tâches car elles font une boucle infinie avec loop
pour attendre des
missions. Si nous essayons de nettoyer notre GroupeTaches
avec
l'implémentation actuelle de drop
, la tâche principale va se bloquer pour
toujours en attendant en vain que la première tâche se termine.
Pour corriger ce problème, nous allons modifier les tâches pour qu'elles
attendent soit une Mission
à exécuter, soit le signal qui leur dit qu'elles
doivent arrêter d'attendre des missions et sortir de la boucle infinie. Notre
canal va envoyer une de ces deux variantes d'énumération au lieu d'instances de
Mission
.
Fichier : src/lib.rs
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
pub struct GroupeTaches {
operateurs: Vec<Operateur>,
envoi: mpsc::Sender<Mission>,
}
type Mission = Box<dyn FnOnce() + Send + 'static>;
enum Message {
NouvelleMission(Mission),
Extinction,
}
impl GroupeTaches {
/// Crée un nouveau GroupeTaches.
///
/// La taille est le nom de tâches présentes dans le groupe.
///
/// # Panics
///
/// La fonction `new` devrait paniquer si la taille vaut zéro.
pub fn new(taille: usize) -> GroupeTaches {
assert!(taille > 0);
let (envoi, reception) = mpsc::channel();
let reception = Arc::new(Mutex::new(reception));
let mut operateurs = Vec::with_capacity(taille);
for id in 0..taille {
operateurs.push(Operateur::new(id, Arc::clone(&reception)));
}
GroupeTaches { operateurs, envoi }
}
pub fn executer<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let mission = Box::new(f);
self.envoi.send(mission).unwrap();
}
}
impl Drop for GroupeTaches {
fn drop(&mut self) {
for operateur in &mut self.operateurs {
println!("Arrêt de l'opérateur {}", operateur.id);
if let Some(tache) = operateur.tache.take() {
tache.join().unwrap();
}
}
}
}
struct Operateur {
id: usize,
tache: Option<thread::JoinHandle<()>>,
}
impl Operateur {
fn new(id: usize, reception: Arc<Mutex<mpsc::Receiver<Mission>>>) -> Operateur {
let tache = thread::spawn(move || loop {
let mission = reception.lock().unwrap().recv().unwrap();
println!("L'opérateur {} a reçu une mission ; il l'exécute.", id);
mission();
});
Operateur {
id,
tache: Some(tache),
}
}
}
Cette énumération Message
aura pour valeurs une variante NouvelleMission
qui contiendra la Mission
que la tâche devra exécuter, ou la variante
Extinction
qui va faire en sorte que la tâche sorte de sa boucle et se
termine.
Nous devons corriger le canal pour utiliser les valeurs du type Message
à la place du type Mission
, comme dans l'encart 20-23.
Fichier : src/lib.rs
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
pub struct GroupeTaches {
operateurs: Vec<Operateur>,
envoi: mpsc::Sender<Message>,
}
// -- partie masquée ici --
type Mission = Box<dyn FnOnce() + Send + 'static>;
enum Message {
NouvelleMission(Mission),
Extinction,
}
impl GroupeTaches {
// -- partie masquée ici --
/// Crée un nouveau GroupeTaches.
///
/// La taille est le nom de tâches présentes dans le groupe.
///
/// # Panics
///
/// La fonction `new` devrait paniquer si la taille vaut zéro.
pub fn new(taille: usize) -> GroupeTaches {
assert!(taille > 0);
let (envoi, reception) = mpsc::channel();
let reception = Arc::new(Mutex::new(reception));
let mut operateurs = Vec::with_capacity(taille);
for id in 0..taille {
operateurs.push(Operateur::new(id, Arc::clone(&reception)));
}
GroupeTaches { operateurs, envoi }
}
pub fn executer<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let mission = Box::new(f);
self.envoi.send(Message::NouvelleMission(mission)).unwrap();
}
}
// -- partie masquée ici --
impl Drop for GroupeTaches {
fn drop(&mut self) {
for operateur in &mut self.operateurs {
println!("Arrêt de l'opérateur {}", operateur.id);
if let Some(tache) = operateur.tache.take() {
tache.join().unwrap();
}
}
}
}
struct Operateur {
id: usize,
tache: Option<thread::JoinHandle<()>>,
}
impl Operateur {
fn new(id: usize, reception: Arc<Mutex<mpsc::Receiver<Message>>>) -> Operateur {
let tache = thread::spawn(move || loop {
let message = reception.lock().unwrap().recv().unwrap();
match message {
Message::NouvelleMission(mission) => {
println!("L'opérateur {} a reçu une mission ; il l'exécute.", id);
mission();
}
Message::Extinction => {
println!("L'opérateur {} a reçu l'instruction d'arrêt.", id);
break;
}
}
});
Operateur {
id,
tache: Some(tache),
}
}
}
Pour intégrer l'énumération Message
, nous devons changer Mission
par
Message
à deux endroits : dans la définition de GroupeTaches
et dans la
signature de Operateur::new
. La méthode executer
de GroupeTaches
doit
envoyer des missions encapsulées dans des variantes de
Message::NouvelleTache
. Ensuite, dans Operateur::new
où nous recevons des
Message
du canal, la mission sera traitée si la variante NouvelleTache
est
reçue, ou bien la tâche arrêtera la boucle si la variante Extinction
est
reçue.
Grâce à ces changements, le code va se compiler et continuer de fonctionner de
la même manière qu'il le faisait après l'encart 20-20. Mais nous allons obtenir
un avertissement car nous ne créons aucun message de la variante Extinction
.
Corrigeons cet avertissement en modifiant notre implémentation de Drop
pour qu'elle ressemble à l'encart 20-24.
Fichier : src/lib.rs
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
pub struct GroupeTaches {
operateurs: Vec<Operateur>,
envoi: mpsc::Sender<Message>,
}
type Mission = Box<dyn FnOnce() + Send + 'static>;
enum Message {
NouvelleMission(Mission),
Extinction,
}
impl GroupeTaches {
/// Crée un nouveau GroupeTaches.
///
/// La taille est le nom de tâches présentes dans le groupe.
///
/// # Panics
///
/// La fonction `new` devrait paniquer si la taille vaut zéro.
pub fn new(taille: usize) -> GroupeTaches {
assert!(taille > 0);
let (envoi, reception) = mpsc::channel();
let reception = Arc::new(Mutex::new(reception));
let mut operateurs = Vec::with_capacity(taille);
for id in 0..taille {
operateurs.push(Operateur::new(id, Arc::clone(&reception)));
}
GroupeTaches { operateurs, envoi }
}
pub fn executer<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let mission = Box::new(f);
self.envoi.send(Message::NouvelleMission(mission)).unwrap();
}
}
impl Drop for GroupeTaches {
fn drop(&mut self) {
println!("Envoi du message d'extinction à tous les opérateurs.");
for _ in &self.operateurs {
self.envoi.send(Message::Extinction).unwrap();
}
println!("Arrêt de tous les opérateurs.");
for operateur in &mut self.operateurs {
println!("Arrêt de l'opérateur {}", operateur.id);
if let Some(tache) = operateur.tache.take() {
tache.join().unwrap();
}
}
}
}
struct Operateur {
id: usize,
tache: Option<thread::JoinHandle<()>>,
}
impl Operateur {
fn new(id: usize, reception: Arc<Mutex<mpsc::Receiver<Message>>>) -> Operateur {
let tache = thread::spawn(move || loop {
let message = reception.lock().unwrap().recv().unwrap();
match message {
Message::NouvelleMission(mission) => {
println!("L'opérateur {} a reçu une mission ; il l'exécute.", id);
mission();
}
Message::Extinction => {
println!("L'opérateur {} a reçu l'instruction d'arrêt.", id);
break;
}
}
});
Operateur {
id,
tache: Some(tache),
}
}
}
Nous itérons deux fois sur les opérateurs : une fois pour envoyer un message
Extinction
pour chaque opérateur, et une seconde fois pour utiliser join
sur leur tâche. Si nous avions essayé d'envoyer le message et d'utiliser
immédiatement join
dans la même boucle, nous n'aurions pas pu garantir que
l'opérateur de l'itération en cours serait celui qui obtiendrait le message
envoyé dans le canal.
Pour mieux comprendre pourquoi nous avons besoin de deux boucles distinctes,
imaginez un scénario avec deux opérateurs. Si nous avions utilisé une seule
boucle pour itérer sur chacun des opérateurs, dans la première itération un
message d'extinction aurait été envoyé dans le canal et join
aurait été
utilisé sur la tâche du premier opérateur. Si ce premier opérateur était occupé
à traiter une requête à ce moment-là, le second opérateur aurait alors récupéré
le message d'extinction dans le canal et se serait arrêté. Nous serions alors
restés à attendre que le premier opérateur s'arrête, mais cela ne se serait
jamais produit car c'est la seconde tâche qui aurait obtenu le message
d'extinction. Nous serions alors dans une situation d'interblocage !
Pour éviter ce scénario, nous allons commencer par émettre tous nos messages
Extinction
dans le canal en utilisant une boucle ; puis nous utilisons join
sur toutes les tâches dans une seconde boucle. Chaque opérateur va arrêter de
recevoir de nouvelles requêtes du canal dès qu'il aura reçu le message
d'extinction. Donc, nous sommes maintenant assurés que si nous envoyons autant
de messages d'extinction qu'il y a d'opérateurs, chaque opérateur
recevra un message d'extinction avant que join
ne soit utilisé sur leur
tâche.
Pour observer ce code en action, modifions notre main
pour accepter
uniquement deux requêtes avant d'arrêter proprement le serveur, comme dans
l'encart 20-25.
Fichier : src/bin/main.rs
use salutations::GroupeTaches;
use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
use std::thread;
use std::time::Duration;
fn main() {
let ecouteur = TcpListener::bind("127.0.0.1:7878").unwrap();
let groupe = GroupeTaches::new(4);
for flux in ecouteur.incoming().take(2) {
let flux = flux.unwrap();
groupe.executer(|| {
gestion_connexion(flux);
});
}
println!("Arrêt complet.");
}
fn gestion_connexion(mut flux: TcpStream) {
let mut tampon = [0; 1024];
flux.read(&mut tampon).unwrap();
let get = b"GET / HTTP/1.1\r\n";
let pause = b"GET /pause HTTP/1.1\r\n";
let (ligne_statut, nom_fichier) = if tampon.starts_with(get) {
("HTTP/1.1 200 OK", "hello.html")
} else if tampon.starts_with(pause) {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
} else {
("HTTP/1.1 404 NOT FOUND", "404.html")
};
let contenu = fs::read_to_string(nom_fichier).unwrap();
let reponse = format!(
"{}\r\nContent-Length: {}\r\n\r\n{}",
ligne_statut,
contenu.len(),
contenu
);
flux.write(reponse.as_bytes()).unwrap();
flux.flush().unwrap();
}
Dans la réalité on ne voudrait pas qu'un serveur web s'arrête après avoir servi seulement deux requêtes. Ce code sert uniquement à montrer que l'arrêt et le nettoyage s'effectuent bien proprement.
La méthode take
est définie dans le trait Iterator
et limite l'itération
aux deux premiers éléments au maximum. Le GroupeTaches
va sortir de la portée
à la fin du main
et l'implémentation de drop
va s'exécuter.
Démarrez le serveur avec cargo run
et faites trois requêtes. La troisième
requête devrait renvoyer une erreur tandis que dans votre terminal vous devriez
avoir une sortie similaire à ceci :
$ cargo run
Compiling salutations v0.1.0 (file:///projects/salutations)
Finished dev [unoptimized + debuginfo] target(s) in 1.0s
Running `target/debug/main`
L'opérateur 0 a reçu une mission ; il l'exécute.
L'opérateur 3 a reçu une mission ; il l'exécute.
Arrêt.
Envoi du message d'extinction à tous les opérateurs.
Arrêt de tous les opérateurs.
Arrêt de l'opérateur 0
L'opérateur 1 a reçu l'instruction d'arrêt.
L'opérateur 2 a reçu l'instruction d'arrêt.
L'opérateur 0 a reçu l'instruction d'arrêt.
L'opérateur 3 a reçu l'instruction d'arrêt.
Arrêt de l'opérateur 1
Arrêt de l'opérateur 2
Arrêt de l'opérateur 3
Vous pourriez avoir un ordre différent entre les opérateurs et les messages
affichés. Nous pouvons constater la façon dont ce code fonctionne grâce aux
messages : les opérateurs 0 et 3 obtiennent les deux premières requêtes puis, à
la troisième requête, le serveur arrête d'accepter des connexions. Lorsque
le GroupeTaches
sort de la portée à la fin du main
, son implémentation de
Drop
entre en action et le groupe demande à tous les opérateurs de
s'arrêter. Chaque opérateur va afficher un message lorsqu'il recevra le message
d'extinction puis le groupe de tâche utilisera join
pour arrêter
la tâche de chaque opérateur.
Remarquez un aspect intéressant spécifique à cette exécution : le
GroupeTaches
a envoyé les messages d'extinction dans le canal, et avant que
tous les opérateurs aient reçu les messages, nous avons essayé d'utiliser
join
sur l'opérateur 0. L'opérateur 0 n'avait pas encore reçu le message
d'extinction, donc la tâche principale a attendu que l'opérateur 0 finisse.
Pendant ce temps, tous les autres opérateurs ont reçu les messages
d'extinction. Lorsque l'opérateur 0 a fini, la tâche principale a attendu que
les autres opérateurs se terminent. A ce stade, ils avaient alors tous reçu le
message d'extinction et étaient en mesure de s'arrêter.
Félicitations ! Nous avons maintenant terminé notre projet ; nous avons un serveur web basique qui utilise un groupe de tâches pour répondre de manière asynchrone. Nous pouvons demander un arrêt propre du serveur qui va nettoyer toutes les tâches du groupe.
Voici le code complet afin que vous puissiez vous y référer :
Fichier : src/bin/main.rs
use salutations::GroupeTaches;
use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
use std::thread;
use std::time::Duration;
fn main() {
let ecouteur = TcpListener::bind("127.0.0.1:7878").unwrap();
let groupe = GroupeTaches::new(4);
for flux in ecouteur.incoming() {
let flux = flux.unwrap();
groupe.executer(|| {
gestion_connexion(flux);
});
}
println!("Shutting down.");
}
fn gestion_connexion(mut flux: TcpStream) {
let mut tampon = [0; 1024];
flux.read(&mut tampon).unwrap();
let get = b"GET / HTTP/1.1\r\n";
let pause = b"GET /pause HTTP/1.1\r\n";
let (ligne_statut, nom_fichier) = if tampon.starts_with(get) {
("HTTP/1.1 200 OK", "hello.html")
} else if tampon.starts_with(pause) {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
} else {
("HTTP/1.1 404 NOT FOUND", "404.html")
};
let contenu = fs::read_to_string(nom_fichier).unwrap();
let reponse = format!(
"{}\r\nContent-Length: {}\r\n\r\n{}",
ligne_statut,
contenu.len(),
contenu
);
flux.write(reponse.as_bytes()).unwrap();
flux.flush().unwrap();
}
Fichier : src/lib.rs
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
pub struct GroupeTaches {
operateurs: Vec<Operateur>,
envoi: mpsc::Sender<Message>,
}
type Mission = Box<dyn FnOnce() + Send + 'static>;
enum Message {
NouvelleMission(Mission),
Extinction,
}
impl GroupeTaches {
/// Crée un nouveau GroupeTaches.
///
/// La taille est le nom de tâches présentes dans le groupe.
///
/// # Panics
///
/// La fonction `new` devrait paniquer si la taille vaut zéro.
pub fn new(taille: usize) -> GroupeTaches {
assert!(taille > 0);
let (envoi, reception) = mpsc::channel();
let reception = Arc::new(Mutex::new(reception));
let mut operateurs = Vec::with_capacity(taille);
for id in 0..taille {
operateurs.push(Operateur::new(id, Arc::clone(&reception)));
}
GroupeTaches { operateurs, envoi }
}
pub fn executer<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let mission = Box::new(f);
self.envoi.send(Message::NouvelleMission(mission)).unwrap();
}
}
impl Drop for GroupeTaches {
fn drop(&mut self) {
println!("Envoi du message d'extinction à tous les opérateurs.");
for _ in &self.operateurs {
self.envoi.send(Message::Extinction).unwrap();
}
println!("Arrêt de tous les opérateurs.");
for operateur in &mut self.operateurs {
println!("Arrêt de l'opérateur {}", operateur.id);
if let Some(tache) = operateur.tache.take() {
tache.join().unwrap();
}
}
}
}
struct Operateur {
id: usize,
tache: Option<thread::JoinHandle<()>>,
}
impl Operateur {
fn new(id: usize, reception: Arc<Mutex<mpsc::Receiver<Message>>>) -> Operateur {
let tache = thread::spawn(move || loop {
let message = reception.lock().unwrap().recv().unwrap();
match message {
Message::NouvelleMission(mission) => {
println!("L'opérateur {} a reçu une mission ; il l'exécute.", id);
mission();
}
Message::Extinction => {
println!("L'opérateur {} a reçu l'instruction d'arrêt.", id);
break;
}
}
});
Operateur {
id,
tache: Some(tache),
}
}
}
Nous aurions pu faire bien plus ! Si vous souhaitez continuer à améliorer ce projet, voici quelques idées :
- Ajouter de la documentation à
GroupeTaches
et aux méthodes publiques. - Ajouter des tests sur les fonctionnalités de la bibliothèque.
- Remplacer les appels à
unwrap
pour fournir une meilleure gestion des erreurs. - Utiliser
GroupeTaches
pour exécuter d'autres tâches que de répondre à des requêtes web. - Trouver une crate de groupe de tâches (NdT : thread pool) sur crates.io et implémenter un serveur web similaire en l'utilisant. Comparer ensuite son API et sa robustesse au groupe de tâches que nous avons implémenté.
Résumé
Bravo ! Vous êtes arrivé à la fin du livre ! Nous tenons à vous remercier chaleureusement de nous avoir accompagné pendant cette présentation de Rust. Vous êtes maintenant fin prêt(e) à créer vos propres projets Rust et aider les projets des autres développeurs. Rappelez-vous qu'il existe une communauté accueillante de Rustacés qui adorerait vous aider à relever tous les défis que vous rencontrerez dans votre aventure avec Rust.