調べても英語の情報しかないRust。今回は、WebSocketを取り上げていく。
WebSocketとは
一言で言えば、双方向通信技術になります。通話やSNSのメッセージのやり取り、ビデオ通話、生放送でよく使われる技術になります。
ライブラリ
メイン:https://github.com/snapview/tokio-tungstenite
非同期処理は、Tokioのライブラリを使う。選んだ理由としては、海外の記事に多く取り上げられていたことと、RustのコミュニティDISCORDでこっちが安定しているという話があったから。
cargo.toml
[dependencies]
futures-util = "0.3.29"
tokio = { version = "1", features = ["full"] }
tokio-tungstenite = { version = "0.21.0", features = ["native-tls"] }
プログラミングコード
ライブラリのGithubに書き方の例があるが、わかりやすく、Typescriptとかに寄せて書いています。テスト用に使えるURLも入れてあるので、そのまま実行できます。
また、受信したメッセージをprintして、他のスレッドで読み取るようなコードがサンプルとして見かけたが、私の環境では上手く読み取れていなかったので、受診したらそのまま送信するコードになっています。
main.rs
use futures_util::{StreamExt, SinkExt, stream::{SplitSink, SplitStream}};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_tungstenite::{connect_async, tungstenite::Message, WebSocketStream};
// 通信のはじめ
// write: 送信用の関数
// msg: メッセージ
async fn on_open(
write: &mut SplitSink<WebSocketStream<impl AsyncRead + AsyncWrite + Unpin>, Message>,
msg: &str
) {
let request_msg = Message::Text(format!("{}", msg));
write.send(request_msg).await.expect("Failed to send registration message");
}
// メッセージの送受信
// 接続したwebsocketのやり取りを行う
// read: 受信用の関数
// write: 送信用の関数
async fn on_message(
mut read: SplitStream<WebSocketStream<impl AsyncRead + AsyncWrite + Unpin>>,
mut write: SplitSink<WebSocketStream<impl AsyncRead + AsyncWrite + Unpin>, Message>
) {
while let Some(message) = read.next().await {
match message {
Ok(msg) => {
recive_send(&mut write, &msg).await;
},
Err(e) => {
eprintln!("Error reading message: {:?}", e);
}
}
}
}
// メッセージを受けたあとの処理
// ここでは、メッセージを受け取ったら、適当なメッセージを返すようにしている
async fn recive_send(
write: &mut SplitSink<WebSocketStream<impl AsyncRead + AsyncWrite + Unpin>, Message>,
msg: &Message,
){
write.send(Message::Text(format!("{}", "hogehoge"))).await.expect("Failed to send message");
println!("Received a message: {}", msg);
}
pub async fn client() {
let url = "wss://echo.websocket.events";
// URLのセッティング
println!("Connecting to - {}", url);
let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");
println!("Connected to Agent Network");
// 受信と送信用関数を切り離し
let (mut write, read) = ws_stream.split();
// wss://echo.websocket.eventsには必要ないが、オブジェクトの描き方がてら、適当な文字を入れておく。
let start_msg = r#"{{
"context": "message",
"object_tmp": {{
"hoge": "hoge"
}}
}}"#;
// websocketの接続
// register the weboscoket
on_open(&mut write, start_msg).await;
// メッセージの受信用にスレッドを開始
// Handle incoming messages in a separate task
let on_message = tokio::spawn(async move {
on_message(read, write).await;
});
// tokioにスレッドをぶん投げ
// Await both tasks (optional, depending on your use case)
let _ = tokio::try_join!(on_message);
}
fn main() {
tokio::runtime::Runtime::new().unwrap().block_on(client());
}
それぞれの関数について
on_open:Webで作ったことある人ならある程度予想つくと思うが、通信の開始に送信する情報になります。こちらを送信してから、WebSocketとの双方向通信が始まります。
async fn on_open(
write: &mut SplitSink<WebSocketStream<impl AsyncRead + AsyncWrite + Unpin>, Message>,
msg: &str
) {
let request_msg = Message::Text(format!("{}", msg));
write.send(request_msg).await.expect("Failed to send registration message");
}
on_message:メッセージの送受信用の関数になります。処理が入れ子になりすぎないように、取得したメッセージは、recive_send(&mut write, &msg)に流しています。
async fn on_message(
mut read: SplitStream<WebSocketStream<impl AsyncRead + AsyncWrite + Unpin>>,
mut write: SplitSink<WebSocketStream<impl AsyncRead + AsyncWrite + Unpin>, Message>
) {
while let Some(message) = read.next().await {
match message {
Ok(msg) => {
recive_send(&mut write, &msg).await;
},
Err(e) => {
eprintln!("Error reading message: {:?}", e);
}
}
}
}
client:通信の管理するための関数としておいています。WebSocketの処理は基本的に非同期処理なるので、client関数をおいて、mainからtokioのライブラリを使って呼び出しています。
pub async fn client() {
let url = "wss://echo.websocket.events";
// URLのセッティング
println!("Connecting to - {}", url);
let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");
println!("Connected to Agent Network");
// 受信と送信用関数を切り離し
let (mut write, read) = ws_stream.split();
// wss://echo.websocket.eventsには必要ないが、オブジェクトの描き方がてら、適当な文字を入れておく。
let start_msg = r#"{{
"context": "message",
"object_tmp": {{
"hoge": "hoge"
}}
}}"#;
// websocketの接続
// register the weboscoket
on_open(&mut write, start_msg).await;
// メッセージの受信用にスレッドを開始
// Handle incoming messages in a separate task
let on_message = tokio::spawn(async move {
on_message(read, write).await;
});
// tokioにスレッドをぶん投げ
// Await both tasks (optional, depending on your use case)
let _ = tokio::try_join!(on_message);
}
あとがき
Rustの文献がそもそも少なく、ライブラリも豊富とは言えないので、調べるのがとても大変だった。また、最近の記事のものを見つけても動かないことが多かったので、触って理解するってのもできない;;文献は英語orGithub、変態以外は諦めそうな環境です笑
このコードがあなたの環境で動いて為になればいいです。
参考
Github: https://github.com/chrishayuk/rust-websockets-time/tree/main
