WebSocket.Rx
<div align="center">
<img alt="WebSocket.Rx logo" height="128" src="https://raw.githubusercontent.com/st0o0/WebSocket.Rx/refs/heads/main/docs/logo/logo.png" width="128"/>
A powerful .NET library for reactive WebSocket communication using R3 (Reactive Extensions)
</div>
Table of Contents
- Features
- Installation
- Quick Start
- Core Concepts
- Advanced Usage
- API Reference
- Inspiration
- Contributing
- License
Features
Client Features
- Automatic Reconnection - Built-in reconnection logic with configurable strategies
- Reactive Streams - Observable sequences for messages, connections, and disconnections
- Thread-Safe - Safe concurrent message sending and receiving
- Message Queuing - Automatic buffering with channel-based send queue
- High Performance - Built on `System.Threading.Channels` and `ArrayPool<byte>`
- Type-Safe - Strong typing with text/binary message support
- Proper Resource Management - Full `IAsyncDisposable` support with graceful shutdown
Server Features
- Multi-Client Support - Handle multiple WebSocket connections simultaneously
- Client Tracking - Built-in client metadata and connection management
- Event Streams - Observables for client connect/disconnect events
- Selective Messaging - Send to specific clients or broadcast to all
- Robust Cleanup - Automatic client cleanup on disconnect
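Client tracking with cleanup on disconnect can be pictured as a concurrent registry keyed by client id. The following is a minimal sketch of that idea using only `ConcurrentDictionary` from the base class library. It is not the library's implementation, and the local functions `Register` and `Unregister` as well as the string value standing in for client metadata are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical registry: client id -> display name (real metadata would carry more fields).
var clients = new ConcurrentDictionary<Guid, string>();

// Called when a client connects; returns the generated id.
Guid Register(string name)
{
    var id = Guid.NewGuid();
    clients.TryAdd(id, name);
    return id;
}

// Called on disconnect; safe to call twice because TryRemove returns false the second time.
bool Unregister(Guid id) => clients.TryRemove(id, out _);

Guid alice = Register("alice");
Guid bob = Register("bob");

bool removedOnce = Unregister(alice);
bool removedTwice = Unregister(alice); // already gone, no exception

Console.WriteLine($"{clients.Count} client(s) left, removed: {removedOnce}, again: {removedTwice}");
```

Because removal is idempotent, a disconnect handler and a dispose path can both call it without coordinating with each other.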
Installation
dotnet add package WebSocket.Rx
Requirements: .NET 10.0 or higher
Quick Start
Client Example
using WebSocket.Rx;
using R3;
// Create and configure client
await using var client = new ReactiveWebSocketClient(new Uri("wss://echo.websocket.org"))
{
    IsReconnectionEnabled = true,
    KeepAliveInterval = TimeSpan.FromSeconds(30),
    IsTextMessageConversionEnabled = true
};
// Subscribe to messages
client.MessageReceived
    .Subscribe(msg => Console.WriteLine($"Received: {msg.Text}"));
// Subscribe to connection events
client.ConnectionHappened
    .Subscribe(info => Console.WriteLine($"Connected: {info.Reason}"));
client.DisconnectionHappened
    .Subscribe(info => Console.WriteLine($"Disconnected: {info.Reason}"));
// Connect and send messages
await client.StartAsync();
await client.SendAsTextAsync("Hello WebSocket!");
Server Example
using WebSocket.Rx;
using R3;
// Create and start server
await using var server = new ReactiveWebSocketServer("http://localhost:8080/")
{
    IsTextMessageConversionEnabled = true
};
// Subscribe to client events
server.ClientConnected
    .Subscribe(client => Console.WriteLine($"Client connected: {client.Metadata.Id}"));
server.Messages
    .Subscribe(msg =>
    {
        Console.WriteLine($"From {msg.Metadata.Id}: {msg.Message.Text}");
        // Echo back to sender; the send task is deliberately not awaited (fire-and-forget)
        _ = server.SendAsTextAsync(msg.Metadata.Id, $"Echo: {msg.Message.Text}");
    });
await server.StartAsync();
Console.WriteLine($"Server running with {server.ClientCount} clients");
Core Concepts
Observable Streams
WebSocket.Rx is built around reactive streams using R3:
// Filter and transform messages
client.MessageReceived
    .Where(msg => msg.MessageType == MessageType.Text)
    .Select(msg => msg.Text.ToUpper())
    .Subscribe(text => Console.WriteLine(text));
// Debounce reconnection events (R3 names this operator Debounce; Rx.NET calls it Throttle)
client.ConnectionHappened
    .Debounce(TimeSpan.FromSeconds(1))
    .Subscribe(info => Console.WriteLine("Stable connection established"));
Message Types
// Send text message (queued)
await client.SendAsTextAsync("Hello");
// Send binary message (queued)
await client.SendAsBinaryAsync(new byte[] { 0x01, 0x02 });
// Send instant (bypasses queue)
await client.SendInstantAsync("Urgent message");
// Try send (non-blocking)
bool sent = client.TrySendAsText("Optional message");
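The difference between a queued send and a non-blocking "try" send can be sketched with `System.Threading.Channels`, which the feature list names as the basis of the send queue. This is an illustration under assumptions, not the library's actual code: the bounded capacity of 2 and the local functions `TryEnqueue` and `DrainAsync` are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical bounded queue; the capacity of 2 is chosen only for the demo.
var queue = Channel.CreateBounded<string>(new BoundedChannelOptions(2)
{
    FullMode = BoundedChannelFullMode.Wait, // TryWrite fails (and WriteAsync waits) when full
    SingleReader = true
});

// Non-blocking send: returns false instead of waiting when the queue is full.
bool TryEnqueue(string message) => queue.Writer.TryWrite(message);

// Consumer loop: drains the queue in FIFO order until the writer completes.
async Task<List<string>> DrainAsync()
{
    var sent = new List<string>();
    await foreach (var message in queue.Reader.ReadAllAsync())
    {
        sent.Add(message); // a real client would write to the socket here
    }
    return sent;
}

bool first = TryEnqueue("a");
bool second = TryEnqueue("b");
bool third = TryEnqueue("c"); // queue is full, so this one is rejected

queue.Writer.Complete();
List<string> drained = await DrainAsync();

Console.WriteLine($"{first} {second} {third} -> {string.Join(",", drained)}");
// prints "True True False -> a,b"
```

A `bool` result like the one from `TrySendAsText` lets the caller decide whether to drop, retry, or fall back to an awaited send when the queue cannot take the message.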
Connection Lifecycle
// Start connection
await client.StartAsync();
// Reconnect manually
await client.ReconnectAsync();
// Stop gracefully
await client.StopAsync(WebSocketCloseStatus.NormalClosure, "Goodbye");
// Dispose (automatic cleanup)
await client.DisposeAsync();
Advanced Usage
Custom Configuration
using System.Text; // needed for Encoding

var client = new ReactiveWebSocketClient(new Uri("wss://example.com"))
{
    // Connection settings
    ConnectTimeout = TimeSpan.FromSeconds(10),
    KeepAliveInterval = TimeSpan.FromSeconds(30),
    KeepAliveTimeout = TimeSpan.FromSeconds(10),
    // Reconnection
    IsReconnectionEnabled = true,
    // Message handling
    IsTextMessageConversionEnabled = true,
    MessageEncoding = Encoding.UTF8
};
Server Broadcasting
// Broadcast to all clients
foreach (var clientId in server.ConnectedClients.Keys)
{
    await server.SendAsTextAsync(clientId, "Broadcast message");
}
// Send to specific clients
var targetClients = server.ConnectedClients
    .Where(c => c.Value.CustomData?.Contains("premium") == true)
    .Select(c => c.Key);
foreach (var clientId in targetClients)
{
    await server.SendAsTextAsync(clientId, "Premium feature alert!");
}
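The loops above await each send before starting the next one. Where ordering between clients does not matter, the sends could instead be started together and awaited with `Task.WhenAll`. The sketch below assumes a send delegate shaped like `Task<bool> SendAsTextAsync(clientId, message)` from the API reference, and assumes that `false` signals a failed send. `FakeSendAsync` and `BroadcastAsync` are hypothetical names used only so the example runs without a server.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Stand-in for server.SendAsTextAsync(clientId, message); hypothetical, for the demo only.
// It "fails" for Guid.Empty so the result handling can be shown.
async Task<bool> FakeSendAsync(Guid clientId, string message)
{
    await Task.Delay(10); // simulate network latency
    return clientId != Guid.Empty;
}

// Starts all sends first, then awaits them together.
// Returns the ids whose send returned false.
async Task<List<Guid>> BroadcastAsync(
    IEnumerable<Guid> clientIds, string message, Func<Guid, string, Task<bool>> sendAsync)
{
    var ids = clientIds.ToList(); // snapshot, so the set cannot change mid-broadcast
    bool[] results = await Task.WhenAll(ids.Select(id => sendAsync(id, message)));
    return ids.Where((_, index) => !results[index]).ToList();
}

var clients = new[] { Guid.NewGuid(), Guid.Empty, Guid.NewGuid() };
List<Guid> failed = await BroadcastAsync(clients, "Broadcast message", FakeSendAsync);

Console.WriteLine($"Failed sends: {failed.Count}");
```

Taking a snapshot of the client ids before sending avoids enumerating a collection that may change while clients connect or disconnect during the broadcast.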
Error Handling
client.DisconnectionHappened
    .Subscribe(info =>
    {
        Console.WriteLine($"Disconnect reason: {info.Reason}");
        if (info.Exception != null)
        {
            Console.WriteLine($"Error: {info.Exception.Message}");
        }
    });
Combining Observables
// Wait for connection before sending
client.ConnectionHappened
    .Take(1)
    .Subscribe(_ => client.SendAsTextAsync("Connected!"));
// Process messages in batches (R3 names this operator Chunk; Rx.NET calls it Buffer)
client.MessageReceived
    .Chunk(TimeSpan.FromSeconds(1))
    .Where(batch => batch.Length > 0)
    .Subscribe(batch => Console.WriteLine($"Processed {batch.Length} messages"));
API Reference
ReactiveWebSocketClient
| Property | Type | Description |
|---|---|---|
| `Url` | `Uri` | WebSocket server URL |
| `IsStarted` | `bool` | Client started state |
| `IsRunning` | `bool` | Client running state |
| `IsReconnectionEnabled` | `bool` | Enable auto-reconnect |
| `MessageReceived` | `Observable<ReceivedMessage>` | Message stream |
| `ConnectionHappened` | `Observable<Connected>` | Connection stream |
| `DisconnectionHappened` | `Observable<Disconnected>` | Disconnection stream |
Key Methods:
- `Task StartAsync()` - Start the client
- `Task StopAsync(status, description)` - Stop gracefully
- `Task ReconnectAsync()` - Manual reconnect
- `Task SendAsTextAsync(message)` - Send text (queued)
- `Task SendAsBinaryAsync(data)` - Send binary (queued)
- `ValueTask DisposeAsync()` - Clean up resources
ReactiveWebSocketServer
| Property | Type | Description |
|---|---|---|
| `IsRunning` | `bool` | Server running state |
| `ClientCount` | `int` | Number of connected clients |
| `ConnectedClients` | `IReadOnlyDictionary<Guid, Metadata>` | Client metadata |
| `ClientConnected` | `Observable<ClientConnected>` | Client connect stream |
| `ClientDisconnected` | `Observable<ClientDisconnected>` | Client disconnect stream |
| `Messages` | `Observable<ServerReceivedMessage>` | Server message stream |
Key Methods:
- `Task StartAsync()` - Start the server
- `Task<bool> StopAsync(status, description)` - Stop server
- `Task<bool> SendAsTextAsync(clientId, message)` - Send to client
- `ValueTask DisposeAsync()` - Clean up resources
Inspiration
This library is inspired by and builds upon the excellent work of:
Websocket.Client by Marfusios
WebSocket.Rx takes inspiration from Websocket.Client's elegant reactive approach to WebSocket communication. Key influences include:
- Reactive-First Design - Using observables for all events and messages
- Automatic Reconnection - Built-in reconnection logic for robust connections
- Clean API - Intuitive and easy-to-use interface
What's Different?
While honoring the spirit of Websocket.Client, WebSocket.Rx offers:
- R3 Integration - Built on the modern R3 reactive library (a third-generation reimplementation of Rx for .NET)
- Server Support - Full-featured WebSocket server implementation
- Modern .NET - Built for .NET 10+ with latest performance optimizations
- IAsyncDisposable - Proper async resource cleanup
- Channel-Based Queuing - High-performance message queue using `System.Threading.Channels`
- Enhanced Memory Management - Uses `ArrayPool<byte>` and `RecyclableMemoryStream`
Both libraries share the same core philosophy: WebSocket communication should be simple, reactive, and reliable.
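The pooled-buffer approach mentioned in the list above follows a rent/use/return pattern. Here is a minimal sketch of that pattern with `ArrayPool<byte>.Shared`, encoding a text message into a rented buffer. It is an illustration rather than the library's code, and the local function `EncodePooled` is a hypothetical name.

```csharp
using System;
using System.Buffers;
using System.Text;

// Encodes text into a rented buffer, hands the used slice to a callback,
// and always returns the buffer to the pool afterwards.
int EncodePooled(string text, Action<ReadOnlyMemory<byte>> consume)
{
    int maxBytes = Encoding.UTF8.GetMaxByteCount(text.Length);
    byte[] buffer = ArrayPool<byte>.Shared.Rent(maxBytes); // may be larger than requested
    try
    {
        int written = Encoding.UTF8.GetBytes(text, 0, text.Length, buffer, 0);
        consume(buffer.AsMemory(0, written)); // only the first 'written' bytes are valid
        return written;
    }
    finally
    {
        ArrayPool<byte>.Shared.Return(buffer);
    }
}

string roundTripped = "";
int byteCount = EncodePooled("Hello WebSocket!", payload =>
    roundTripped = Encoding.UTF8.GetString(payload.Span));

Console.WriteLine($"{byteCount} bytes: {roundTripped}");
// prints "16 bytes: Hello WebSocket!"
```

The callback must not keep a reference to the payload after it returns, because the underlying array goes back to the pool and may be handed to another caller.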
Contributing
Contributions are welcome! This library grows with the community's needs.
How to Contribute
- Fork the repository
- Create a feature branch: `git checkout -b feature/amazing-feature`
- Write tests for your changes
- Ensure all tests pass: `dotnet test`
- Submit a Pull Request
Guidelines
- Follow existing code style and conventions
- Include unit tests for new features
- Update documentation for API changes
- Keep PRs focused and atomic
- Write meaningful commit messages
Development Setup
git clone https://github.com/st0o0/WebSocket.Rx.git
cd WebSocket.Rx
dotnet restore
dotnet build
dotnet test
License
This project is licensed under the MIT License - see the LICENSE file for details.
<div align="center">
Built with ❤️ for the .NET community
Report Bug · Request Feature · Documentation
</div>
| Product | Compatible and additional computed target framework versions |
|---|---|
| .NET | net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
Dependencies:
- net10.0
  - Microsoft.IO.RecyclableMemoryStream (>= 3.0.1)
  - R3 (>= 1.3.0)
🎯 WebSocket.Rx v0.1.6
📦 NuGet: https://nuget.org/packages/WebSocket.Rx
🔗 Release: https://github.com/st0o0/WebSocket.Rx/releases/v0.1.6
See release page for full changelog and details!
Built with ❤️