🚧 Attention, peinture fraîche !
Cette page a été traduite par une seule personne et n'a pas été relue et vérifiée par quelqu'un d'autre ! Les informations peuvent par exemple être erronées, être formulées maladroitement, ou contenir d'autres types de fautes.
Gérer les connexions en concurrence
Le problème avec notre code précédent est que ecouteur.incoming()
est un
itérateur bloquant. L'exécuteur ne peut pas exécuter d'autres futures pendant
que ecouteur
attends les connexions entrantes, et nous ne pouvons pas gérer
une nouvelle connexion jusqu'à ce que nous ayons terminé avec la précédente.
Pour corriger cela, nous allons transformer l'itérateur bloquant
ecouteur.incoming()
en Stream
non bloquant. Les Stream
s ressemblent aux
itérateurs, mais peuvent être consommés de manière asynchrone. Pour plus
d'informations, vous pouvez consulter le chapitre sur les
Stream
s.
Remplaçons notre std::net::TcpListener
bloquant par le
async_std::net::TcpListener
non bloquant, et mettons à jour notre gestion de
connexion pour accepter un async_std::net::TcpStream
:
use async_std::prelude::*;
async fn gestion_connexion(mut flux: TcpStream) {
let mut tampon = [0; 1024];
flux.read(&mut tampon).await.unwrap();
//<-- partie masquée ici -->
flux.write(reponse.as_bytes()).await.unwrap();
flux.flush().await.unwrap();
}
La version asynchrone de TcpListener
implémente le trait Stream
sur
ecouteur.incoming()
, ce qui apporte deux avantages.
Le premier est que ecouteur.incoming()
ne bloque plus l'exécuteur.
L'exécuteur peut maintenant transférer l'exécution à d'autres futures en
attente lorsqu'il n'y a plus de connexions TCP entrantes à traiter.
Le second bienfait est que les éléments du Stream
peuvent optionnellement
être traités en concurrence, en utilisant la méthode for_each_concurrent
des
Stream
s.
Ici, nous allons profiter de cette méthode pour traiter chaque requête entrante
de manière concurrente.
Nous avons besoin d'importer le trait Stream
de la crate futures
, donc
notre Cargo.toml ressemble maintenant à ceci :
+[dependencies]
+futures = "0.3"
[dependencies.async-std]
version = "1.6"
features = ["attributes"]
Maintenant, nous pouvons traiter chaque connexion en concurrence en passant
gestion_connexion
dans une fermeture. La fermeture prend possession de chaque
TcpStream
, et est exécuté dès qu'un nouveau TcpStream
est disponible. Tant
que gestion_connexion
ne bloque pas, une réponse lente ne va plus empêcher
les autres requêtes de se compléter.
use async_std::net::TcpListener;
use async_std::net::TcpStream;
use futures::stream::StreamExt;
#[async_std::main]
async fn main() {
let ecouteur = TcpListener::bind("127.0.0.1:7878").await.unwrap();
ecouteur
.incoming()
.for_each_concurrent(/* limite */ None, |flux_tcp| async move {
let flux_tcp = flux_tcp.unwrap();
gestion_connexion(flux_tcp).await;
})
.await;
}
Servir les requêtes en parallèle
Notre exemple jusqu'à présent a largement présenté la concurrence (en utilisant
du code asynchrone) comme étant une alternative au parallélisme (en utilisant
des processus).
Cependant, le code asynchrone et les processus ne s'excluent pas mutuellement.
Dans notre exemple, for_each_concurrent
traite chaque connexion en
concurrence, mais sur le même processus.
La crate async-std
nous permet également de créer des tâches sur des
processus séparés.
Comme gestion_connexion
est à la fois Send
et non bloquant, il est sûr à
utiliser avec async_std::task::spawn
.
Voici à quoi cela devrait ressembler :
use async_std::task::spawn; #[async_std::main] async fn main() { let ecouteur = TcpListener::bind("127.0.0.1:7878").await.unwrap(); ecouteur .incoming() .for_each_concurrent(/* limite */ None, |flux| async move { let flux = flux.unwrap(); spawn(gestion_connexion(flux)); }) .await; }
Maintenant nous utilisons à la fois la concurrence et le parallélisme pour traiter plusieurs requêtes en même temps ! Lisez la section sur les exécuteurs multi-processus pour en savoir plus.