🚧 Attention, peinture fraîche !

Cette page a été traduite par une seule personne et n'a pas été relue et vérifiée par quelqu'un d'autre ! Les informations peuvent par exemple être erronées, être formulées maladroitement, ou contenir d'autres types de fautes.

Gérer les connexions en concurrence

Le problème avec notre code précédent est que ecouteur.incoming() est un itérateur bloquant. L'exécuteur ne peut pas exécuter d'autres futures pendant que ecouteur attends les connexions entrantes, et nous ne pouvons pas gérer une nouvelle connexion jusqu'à ce que nous ayons terminé avec la précédente.

Pour corriger cela, nous allons transformer l'itérateur bloquant ecouteur.incoming() en Stream non bloquant. Les Streams ressemblent aux itérateurs, mais peuvent être consommés de manière asynchrone. Pour plus d'informations, vous pouvez consulter le chapitre sur les Streams.

Remplaçons notre std::net::TcpListener bloquant par le async_std::net::TcpListener non bloquant, et mettons à jour notre gestion de connexion pour accepter un async_std::net::TcpStream :

use async_std::prelude::*;

async fn gestion_connexion(mut flux: TcpStream) {
    let mut tampon = [0; 1024];
    flux.read(&mut tampon).await.unwrap();

    //<-- partie masquée ici -->
    flux.write(reponse.as_bytes()).await.unwrap();
    flux.flush().await.unwrap();
}

La version asynchrone de TcpListener implémente le trait Stream sur ecouteur.incoming(), ce qui apporte deux avantages. Le premier est que ecouteur.incoming() ne bloque plus l'exécuteur. L'exécuteur peut maintenant transférer l'exécution à d'autres futures en attente lorsqu'il n'y a plus de connexions TCP entrantes à traiter.

Le second bienfait est que les éléments du Stream peuvent optionnellement être traités en concurrence, en utilisant la méthode for_each_concurrent des Streams. Ici, nous allons profiter de cette méthode pour traiter chaque requête entrante de manière concurrente. Nous avons besoin d'importer le trait Stream de la crate futures, donc notre Cargo.toml ressemble maintenant à ceci :

+[dependencies]
+futures = "0.3"

 [dependencies.async-std]
 version = "1.6"
 features = ["attributes"]

Maintenant, nous pouvons traiter chaque connexion en concurrence en passant gestion_connexion dans une fermeture. La fermeture prend possession de chaque TcpStream, et est exécuté dès qu'un nouveau TcpStream est disponible. Tant que gestion_connexion ne bloque pas, une réponse lente ne va plus empêcher les autres requêtes de se compléter.

use async_std::net::TcpListener;
use async_std::net::TcpStream;
use futures::stream::StreamExt;

#[async_std::main]
async fn main() {
    let ecouteur = TcpListener::bind("127.0.0.1:7878").await.unwrap();
    ecouteur
        .incoming()
        .for_each_concurrent(/* limite */ None, |flux_tcp| async move {
            let flux_tcp = flux_tcp.unwrap();
            gestion_connexion(flux_tcp).await;
        })
        .await;
}

Servir les requêtes en parallèle

Notre exemple jusqu'à présent a largement présenté la concurrence (en utilisant du code asynchrone) comme étant une alternative au parallélisme (en utilisant des processus). Cependant, le code asynchrone et les processus ne s'excluent pas mutuellement. Dans notre exemple, for_each_concurrent traite chaque connexion en concurrence, mais sur le même processus. La crate async-std nous permet également de créer des tâches sur des processus séparés. Comme gestion_connexion est à la fois Send et non bloquant, il est sûr à utiliser avec async_std::task::spawn. Voici à quoi cela devrait ressembler :

use async_std::task::spawn;

#[async_std::main]
async fn main() {
    let ecouteur = TcpListener::bind("127.0.0.1:7878").await.unwrap();
    ecouteur
        .incoming()
        .for_each_concurrent(/* limite */ None, |flux| async move {
            let flux = flux.unwrap();
            spawn(gestion_connexion(flux));
        })
        .await;
}

Maintenant nous utilisons à la fois la concurrence et le parallélisme pour traiter plusieurs requêtes en même temps ! Lisez la section sur les exécuteurs multi-processus pour en savoir plus.