【Rust】WebSocketを使ってみる(クライアント)

調べても英語の情報しかないRust。今回は、WebSocketを取り上げていく。

WebSocketとは

一言で言えば、双方向通信技術になります。通話やSNSのメッセージのやり取り、ビデオ通話、生放送でよく使われる技術になります。

ライブラリ

メイン:https://github.com/snapview/tokio-tungstenite

非同期処理は、Tokioのライブラリを使う。選んだ理由としては、海外の記事に多く取り上げられていたことと、RustのコミュニティDISCORDでこっちが安定しているという話があったから。

cargo.toml

[dependencies]
futures-util = "0.3.29"
tokio = { version = "1", features = ["full"] }
tokio-tungstenite = { version = "0.21.0", features = ["native-tls"] }

プログラミングコード

ライブラリのGithubに書き方の例があるが、わかりやすく、Typescriptとかに寄せて書いています。テスト用に使えるURLも入れてあるので、そのまま実行できます。

また、受信したメッセージをprintして、他のスレッドで読み取るようなコードがサンプルとして見かけたが、私の環境では上手く読み取れていなかったので、受診したらそのまま送信するコードになっています。

main.rs

use futures_util::{StreamExt, SinkExt, stream::{SplitSink, SplitStream}};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_tungstenite::{connect_async, tungstenite::Message, WebSocketStream};

// 通信のはじめ
// write: 送信用の関数
// msg: メッセージ
async fn on_open(
    write: &mut SplitSink<WebSocketStream<impl AsyncRead + AsyncWrite + Unpin>, Message>,
    msg: &str
) {
    let request_msg = Message::Text(format!("{}", msg));
    write.send(request_msg).await.expect("Failed to send registration message");
}

// メッセージの送受信
// 接続したwebsocketのやり取りを行う
// read: 受信用の関数
// write: 送信用の関数
async fn on_message(
    mut read: SplitStream<WebSocketStream<impl AsyncRead + AsyncWrite + Unpin>>,
    mut write: SplitSink<WebSocketStream<impl AsyncRead + AsyncWrite + Unpin>, Message>
) {
    while let Some(message) = read.next().await {
        match message {
            Ok(msg) => {
                recive_send(&mut write, &msg).await;
            },
            Err(e) => {
                eprintln!("Error reading message: {:?}", e);
            }
        }
    }
}
// メッセージを受けたあとの処理
// ここでは、メッセージを受け取ったら、適当なメッセージを返すようにしている
async fn recive_send(
    write: &mut SplitSink<WebSocketStream<impl AsyncRead + AsyncWrite + Unpin>, Message>,
    msg: &Message,
){
    write.send(Message::Text(format!("{}", "hogehoge"))).await.expect("Failed to send message");
    println!("Received a message: {}", msg);
}

pub async fn client() {
    let url = "wss://echo.websocket.events";

    // URLのセッティング
    println!("Connecting to - {}", url);
    let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");
    println!("Connected to Agent Network");

    // 受信と送信用関数を切り離し
    let (mut write, read) = ws_stream.split();

    // wss://echo.websocket.eventsには必要ないが、オブジェクトの描き方がてら、適当な文字を入れておく。
    let start_msg = r#"{{
        "context": "message",
        "object_tmp": {{
            "hoge": "hoge"
        }}
    }}"#;
    // websocketの接続
    // register the weboscoket
    on_open(&mut write, start_msg).await;

    // メッセージの受信用にスレッドを開始
    // Handle incoming messages in a separate task
    let on_message = tokio::spawn(async move {
        on_message(read, write).await;
    });

    // tokioにスレッドをぶん投げ
    // Await both tasks (optional, depending on your use case)
    let _ = tokio::try_join!(on_message);
}


fn main() {
    tokio::runtime::Runtime::new().unwrap().block_on(client());
}

それぞれの関数について

on_open:Webで作ったことある人ならある程度予想つくと思うが、通信の開始に送信する情報になります。こちらを送信してから、WebSocketとの双方向通信が始まります。

async fn on_open(
    write: &mut SplitSink<WebSocketStream<impl AsyncRead + AsyncWrite + Unpin>, Message>,
    msg: &str
) {
    let request_msg = Message::Text(format!("{}", msg));
    write.send(request_msg).await.expect("Failed to send registration message");
}

on_message:メッセージの送受信用の関数になります。処理が入れ子になりすぎないように、取得したメッセージは、recive_send(&mut write, &msg)に流しています。

async fn on_message(
    mut read: SplitStream<WebSocketStream<impl AsyncRead + AsyncWrite + Unpin>>,
    mut write: SplitSink<WebSocketStream<impl AsyncRead + AsyncWrite + Unpin>, Message>
) {
    while let Some(message) = read.next().await {
        match message {
            Ok(msg) => {
                recive_send(&mut write, &msg).await;
            },
            Err(e) => {
                eprintln!("Error reading message: {:?}", e);
            }
        }
    }
}

client:通信の管理するための関数としておいています。WebSocketの処理は基本的に非同期処理なるので、client関数をおいて、mainからtokioのライブラリを使って呼び出しています。

pub async fn client() {
    let url = "wss://echo.websocket.events";

    // URLのセッティング
    println!("Connecting to - {}", url);
    let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");
    println!("Connected to Agent Network");

    // 受信と送信用関数を切り離し
    let (mut write, read) = ws_stream.split();

    // wss://echo.websocket.eventsには必要ないが、オブジェクトの描き方がてら、適当な文字を入れておく。
    let start_msg = r#"{{
        "context": "message",
        "object_tmp": {{
            "hoge": "hoge"
        }}
    }}"#;
    // websocketの接続
    // register the weboscoket
    on_open(&mut write, start_msg).await;

    // メッセージの受信用にスレッドを開始
    // Handle incoming messages in a separate task
    let on_message = tokio::spawn(async move {
        on_message(read, write).await;
    });

    // tokioにスレッドをぶん投げ
    // Await both tasks (optional, depending on your use case)
    let _ = tokio::try_join!(on_message);
}

あとがき

Rustの文献がそもそも少なく、ライブラリも豊富とは言えないので、調べるのがとても大変だった。また、最近の記事のものを見つけても動かないことが多かったので、触って理解するってのもできない;;文献は英語orGithub、変態以外は諦めそうな環境です笑

このコードがあなたの環境で動いて為になればいいです。

参考

Github: https://github.com/chrishayuk/rust-websockets-time/tree/main

コメントを残す