Skip to main content

Nethereum.JsonRpc.WebSocketClient

NuGet: Nethereum.JsonRpc.WebSocketClient | Source: src/Nethereum.JsonRpc.WebSocketClient/

Nethereum.JsonRpc.WebSocketClient

WebSocket JSON-RPC client with support for real-time subscriptions and event streaming.

Overview

Nethereum.JsonRpc.WebSocketClient provides WebSocket transport implementations for Ethereum node communication, supporting both standard request/response patterns and real-time event subscriptions. WebSockets enable push-based notifications from the node for new blocks, pending transactions, and contract events without polling.

Key Features:

  • WebSocket transport (wss:// and ws://)
  • Real-time subscriptions (newHeads, logs, pendingTransactions, syncing)
  • Event streaming with automatic message routing
  • Request/response and streaming modes
  • Custom request headers support
  • Connection management and automatic reconnection
  • Thread-safe subscription handling
  • Production-tested reliability

Use Cases:

  • Real-time block monitoring
  • Contract event streaming
  • Pending transaction monitoring
  • Mempool watching (MEV, arbitrage)
  • Live dashboard updates
  • Blockchain indexers
  • Wallet notifications

Installation

dotnet add package Nethereum.JsonRpc.WebSocketClient

Requirements:

  • Ethereum node with WebSocket support (Geth, Erigon, Infura, Alchemy)
  • .NET Standard 2.0+ or .NET Core 2.1+

Dependencies

Nethereum:

  • Nethereum.JsonRpc.Client - Core RPC abstraction (which provides JSON serialization and logging support)

External:

  • System.Net.WebSockets.Client (v4.3.2) - WebSocket protocol implementation

Quick Start

Basic WebSocket Client (Request/Response)

using Nethereum.JsonRpc.WebSocketClient;
using Nethereum.RPC.Eth;

// Connect to WebSocket endpoint
var client = new WebSocketClient("ws://localhost:8546");

// Use like any other RPC client
var ethBlockNumber = new EthBlockNumber(client);
var blockNumber = await ethBlockNumber.SendRequestAsync();

Console.WriteLine($"Current block: {blockNumber.Value}");

// Always dispose when done
client.Dispose();

Streaming Client (Subscriptions)

using Nethereum.JsonRpc.WebSocketStreamingClient;
using Nethereum.RPC.Eth.DTOs;
using Nethereum.RPC.Reactive.Eth.Subscriptions;

// Create streaming client
var client = new StreamingWebSocketClient("wss://mainnet.infura.io/ws/v3/YOUR_PROJECT_ID");

// Create subscription for new blocks
var subscription = new EthNewBlockHeadersSubscription(client);

// Handle new block events
subscription.GetSubscriptionDataResponsesAsObservable().Subscribe(block =>
{
Console.WriteLine($"New block: {block.Number.Value}");
Console.WriteLine($"Hash: {block.BlockHash}");
Console.WriteLine($"Miner: {block.Miner}");
});

// Start streaming
await client.StartAsync();

// Subscribe
await subscription.SubscribeAsync();

// Keep running
Console.WriteLine("Monitoring new blocks. Press Enter to exit.");
Console.ReadLine();

// Cleanup
await subscription.UnsubscribeAsync();
await client.StopAsync();
client.Dispose();

Usage Examples

Example 1: Basic WebSocket Connection

using Nethereum.JsonRpc.WebSocketClient;
using Nethereum.RPC.Eth;

// Local Geth/Erigon
var client = new WebSocketClient("ws://localhost:8546");

// Infura
var infuraClient = new WebSocketClient(
"wss://mainnet.infura.io/ws/v3/YOUR_PROJECT_ID"
);

// Alchemy
var alchemyClient = new WebSocketClient(
"wss://eth-mainnet.g.alchemy.com/v2/YOUR_API_KEY"
);

// Use with RPC services
var ethChainId = new EthChainId(client);
var chainId = await ethChainId.SendRequestAsync();

var ethGasPrice = new EthGasPrice(client);
var gasPrice = await ethGasPrice.SendRequestAsync();

Console.WriteLine($"Chain ID: {chainId.Value}");
Console.WriteLine($"Gas Price: {gasPrice.Value} wei");

// Cleanup
client.Dispose();

Example 2: Real-Time Block Monitoring

using Nethereum.JsonRpc.WebSocketStreamingClient;
using Nethereum.RPC.Reactive.Eth.Subscriptions;
using Nethereum.RPC.Eth.DTOs;
using System.Reactive.Linq;

var client = new StreamingWebSocketClient("ws://localhost:8546");

// Create new block headers subscription
var subscription = new EthNewBlockHeadersSubscription(client);

// Subscribe to new blocks
subscription.GetSubscriptionDataResponsesAsObservable()
.Subscribe(block =>
{
Console.WriteLine($"[{DateTime.Now:HH:mm:ss}] New Block");
Console.WriteLine($" Number: {block.Number.Value}");
Console.WriteLine($" Hash: {block.BlockHash}");
Console.WriteLine($" Parent: {block.ParentHash}");
Console.WriteLine($" Timestamp: {DateTimeOffset.FromUnixTimeSeconds((long)block.Timestamp.Value)}");
Console.WriteLine($" Difficulty: {block.Difficulty.Value}");
Console.WriteLine($" Gas Used: {block.GasUsed.Value:N0}");
Console.WriteLine($" Transactions: {block.TransactionCount()}");
Console.WriteLine();
},
error => Console.WriteLine($"Error: {error.Message}"));

// Start client and subscribe
await client.StartAsync();
await subscription.SubscribeAsync();

Console.WriteLine("Monitoring blocks. Press Enter to stop.");
Console.ReadLine();

// Cleanup
await subscription.UnsubscribeAsync();
await client.StopAsync();
client.Dispose();

Example 3: Contract Event Streaming

using Nethereum.JsonRpc.WebSocketStreamingClient;
using Nethereum.RPC.Reactive.Eth.Subscriptions;
using Nethereum.RPC.Eth.DTOs;
using Nethereum.Hex.HexTypes;

var client = new StreamingWebSocketClient("wss://mainnet.infura.io/ws/v3/YOUR_PROJECT_ID");

// Create logs subscription for USDC Transfer events
var transferEventSignature = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef";
var usdcAddress = "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48";

var filterLogs = new NewFilterInput
{
Address = new[] { usdcAddress },
Topics = new[] { transferEventSignature }
};

var subscription = new EthLogsSubscription(client);
await subscription.SubscribeAsync(filterLogs);

// Handle Transfer events
subscription.GetSubscriptionDataResponsesAsObservable().Subscribe(log =>
{
var from = "0x" + log.Topics[1].ToString().Substring(26);
var to = "0x" + log.Topics[2].ToString().Substring(26);
var amount = new HexBigInteger(log.Data).Value;

Console.WriteLine($"USDC Transfer:");
Console.WriteLine($" From: {from}");
Console.WriteLine($" To: {to}");
Console.WriteLine($" Amount: {amount / 1000000m:N2} USDC"); // USDC has 6 decimals
Console.WriteLine($" Tx: {log.TransactionHash}");
Console.WriteLine();
});

await client.StartAsync();

Console.WriteLine("Monitoring USDC transfers. Press Enter to stop.");
Console.ReadLine();

await subscription.UnsubscribeAsync();
await client.StopAsync();
client.Dispose();

Example 4: Pending Transaction Monitoring (Mempool)

using Nethereum.JsonRpc.WebSocketStreamingClient;
using Nethereum.RPC.Reactive.Eth.Subscriptions;
using Nethereum.Web3;

var client = new StreamingWebSocketClient("ws://localhost:8546");

// Create pending transactions subscription
var subscription = new EthNewPendingTransactionSubscription(client);

// Handle new pending transactions
subscription.GetSubscriptionDataResponsesAsObservable()
.Buffer(TimeSpan.FromSeconds(1)) // Batch for 1 second
.Subscribe(async txHashes =>
{
if (txHashes.Count > 0)
{
Console.WriteLine($"[{DateTime.Now:HH:mm:ss}] {txHashes.Count} new pending transactions");

// Fetch details for first transaction
var web3 = new Web3(client);
var txDetails = await web3.Eth.Transactions.GetTransactionByHash.SendRequestAsync(txHashes[0]);

if (txDetails != null)
{
Console.WriteLine($" First tx hash: {txDetails.TransactionHash}");
Console.WriteLine($" From: {txDetails.From}");
Console.WriteLine($" To: {txDetails.To}");
Console.WriteLine($" Value: {Web3.Convert.FromWei(txDetails.Value)} ETH");
Console.WriteLine($" Gas Price: {Web3.Convert.FromWei(txDetails.GasPrice, Web3.Convert.UnitConversion.Gwei)} Gwei");
}
}
});

await client.StartAsync();
await subscription.SubscribeAsync();

Console.WriteLine("Monitoring mempool. Press Enter to stop.");
Console.ReadLine();

await subscription.UnsubscribeAsync();
await client.StopAsync();
client.Dispose();

Example 5: Multiple Subscriptions

using Nethereum.JsonRpc.WebSocketStreamingClient;
using Nethereum.RPC.Reactive.Eth.Subscriptions;

var client = new StreamingWebSocketClient("ws://localhost:8546");

// Create multiple subscriptions
var blockSubscription = new EthNewBlockHeadersSubscription(client);
var pendingTxSubscription = new EthNewPendingTransactionSubscription(client);
var syncSubscription = new EthSyncingSubscription(client);

// Handle new blocks
blockSubscription.GetSubscriptionDataResponsesAsObservable().Subscribe(block =>
{
Console.WriteLine($"[BLOCK] #{block.Number.Value}");
});

// Handle pending transactions (with throttling)
pendingTxSubscription.GetSubscriptionDataResponsesAsObservable()
.Buffer(TimeSpan.FromSeconds(5))
.Subscribe(txHashes =>
{
Console.WriteLine($"[MEMPOOL] {txHashes.Count} pending transactions in last 5s");
});

// Handle sync status
syncSubscription.GetSubscriptionDataResponsesAsObservable().Subscribe(syncStatus =>
{
if (syncStatus.IsSyncing)
{
Console.WriteLine($"[SYNC] Current: {syncStatus.CurrentBlock}, Highest: {syncStatus.HighestBlock}");
}
else
{
Console.WriteLine($"[SYNC] Node is synced");
}
});

// Start client and all subscriptions
await client.StartAsync();
await blockSubscription.SubscribeAsync();
await pendingTxSubscription.SubscribeAsync();
await syncSubscription.SubscribeAsync();

Console.WriteLine("Monitoring multiple streams. Press Enter to stop.");
Console.ReadLine();

// Cleanup all subscriptions
await blockSubscription.UnsubscribeAsync();
await pendingTxSubscription.UnsubscribeAsync();
await syncSubscription.UnsubscribeAsync();
await client.StopAsync();
client.Dispose();

Example 6: Custom Request Headers (Authentication)

using Nethereum.JsonRpc.WebSocketClient;
using Nethereum.RPC.Eth;

var client = new WebSocketClient("wss://api.example.com/ws");

// Add custom headers (e.g., API key)
client.RequestHeaders.Add("X-API-Key", "your-api-key-here");
client.RequestHeaders.Add("Authorization", "Bearer your-token");

var ethBlockNumber = new EthBlockNumber(client);
var blockNumber = await ethBlockNumber.SendRequestAsync();

Console.WriteLine($"Block (authenticated): {blockNumber.Value}");

client.Dispose();

Example 7: Production Reconnection Pattern (from Nethereum.WebSocketsStreamingTest)

CRITICAL for production: Automatic reconnection when WebSocket connection drops:

using Nethereum.JsonRpc.WebSocketStreamingClient;
using Nethereum.RPC.Reactive.Eth.Subscriptions;
using System.Reactive.Linq;

public class ProductionBlockMonitor
{
private readonly string url;
private StreamingWebSocketClient client;

public ProductionBlockMonitor(string url)
{
this.url = url;
}

public async Task SubscribeAndRunAsync()
{
if (client == null)
{
client = new StreamingWebSocketClient(url);

// Production pattern: auto-reconnect on error
client.Error += Client_Error;
}

var blockHeaderSubscription = new EthNewBlockHeadersObservableSubscription(client);

// Get subscription ID when subscribed
blockHeaderSubscription.GetSubscribeResponseAsObservable().Subscribe(subscriptionId =>
Console.WriteLine($"Block Header subscription Id: {subscriptionId}"));

// Process new blocks
blockHeaderSubscription.GetSubscriptionDataResponsesAsObservable().Subscribe(
block => Console.WriteLine($"New Block: {block.BlockHash}"),
exception => Console.WriteLine($"BlockHeaderSubscription error info: {exception.Message}")
);

// Handle unsubscribe confirmation
blockHeaderSubscription.GetUnsubscribeResponseAsObservable().Subscribe(response =>
Console.WriteLine($"Block Header unsubscribe result: {response}"));

await client.StartAsync();
await blockHeaderSubscription.SubscribeAsync();

Console.WriteLine("Monitoring blocks with auto-reconnect. Press Enter to stop.");
Console.ReadLine();

await blockHeaderSubscription.UnsubscribeAsync();
}

// Production reconnection handler
private async void Client_Error(object sender, Exception ex)
{
Console.WriteLine($"Client Error, restarting... ({ex.Message})");

// Stop the failed connection
await ((StreamingWebSocketClient)sender).StopAsync();

// Restart everything
await SubscribeAndRunAsync();
}
}

// Usage
var monitor = new ProductionBlockMonitor("ws://localhost:8546");
await monitor.SubscribeAndRunAsync();

Why this pattern works:

  • The Client_Error event catches all WebSocket failures
  • Automatically stops the failed connection
  • Recursively restarts the entire subscription flow
  • Ensures continuous monitoring even through network disruptions

Example 8: High-Frequency Event Processing with Reactive Extensions

using Nethereum.JsonRpc.WebSocketStreamingClient;
using Nethereum.RPC.Reactive.Eth.Subscriptions;
using System.Reactive.Linq;

var client = new StreamingWebSocketClient("ws://localhost:8546");
var subscription = new EthNewBlockHeadersSubscription(client);

// Advanced reactive processing
subscription.GetSubscriptionDataResponsesAsObservable()
.Window(TimeSpan.FromMinutes(1)) // 1-minute windows
.SelectMany(window => window
.Aggregate(new
{
Count = 0,
TotalGasUsed = BigInteger.Zero,
TotalTransactions = 0
}, (acc, block) => new
{
Count = acc.Count + 1,
TotalGasUsed = acc.TotalGasUsed + block.GasUsed.Value,
TotalTransactions = acc.TotalTransactions + (int)block.TransactionCount()
}))
.Subscribe(stats =>
{
Console.WriteLine($"=== 1-Minute Stats ===");
Console.WriteLine($"Blocks: {stats.Count}");
Console.WriteLine($"Avg Gas/Block: {stats.TotalGasUsed / stats.Count:N0}");
Console.WriteLine($"Total Transactions: {stats.TotalTransactions}");
Console.WriteLine();
});

await client.StartAsync();
await subscription.SubscribeAsync();

Console.WriteLine("Collecting statistics. Press Enter to stop.");
Console.ReadLine();

await subscription.UnsubscribeAsync();
await client.StopAsync();
client.Dispose();

Example 9: Using with Nethereum.Web3

using Nethereum.Web3;
using Nethereum.JsonRpc.WebSocketClient;

// Create WebSocket client
var wsClient = new WebSocketClient("ws://localhost:8546");

// Use with Web3
var web3 = new Web3(wsClient);

// Standard Web3 operations over WebSocket
var balance = await web3.Eth.GetBalance.SendRequestAsync(
"0x742d35Cc6634C0532925a3b844Bc9e7595f0bEb"
);

var blockNumber = await web3.Eth.Blocks.GetBlockNumber.SendRequestAsync();

Console.WriteLine($"Balance: {Web3.Convert.FromWei(balance)} ETH");
Console.WriteLine($"Block: {blockNumber.Value}");

// Cleanup
wsClient.Dispose();

API Reference

WebSocketClient (Basic)

public class WebSocketClient : ClientBase, IDisposable, IClientRequestHeaderSupport
{
public WebSocketClient(string path,
JsonSerializerSettings jsonSerializerSettings = null,
ILogger log = null)

public Dictionary<string, string> RequestHeaders { get; set; }
public TimeSpan ConnectionTimeout { get; set; }

public Task StopAsync()
public Task StopAsync(WebSocketCloseStatus webSocketCloseStatus, string status, CancellationToken timeOutToken)
}

StreamingWebSocketClient (Subscriptions)

public class StreamingWebSocketClient : IStreamingClient, IDisposable, IClientRequestHeaderSupport
{
public StreamingWebSocketClient(string path,
JsonSerializerSettings jsonSerializerSettings = null,
ILogger log = null)

public Dictionary<string, string> RequestHeaders { get; set; }
public static TimeSpan ConnectionTimeout { get; set; }
public WebSocketState WebSocketState { get; }
public bool IsStarted { get; }

public event WebSocketStreamingErrorEventHandler Error;

public Task StartAsync()
public Task StopAsync()
public bool AddSubscription(string subscriptionId, IRpcStreamingResponseHandler handler)
public bool RemoveSubscription(string subscriptionId)
}

Available Subscriptions

SubscriptionDescription
EthNewBlockHeadersSubscriptionNew block headers
EthNewPendingTransactionSubscriptionPending transactions (mempool)
EthLogsSubscriptionContract event logs
EthSyncingSubscriptionNode sync status

Important Notes

WebSocket Endpoints

Common WebSocket URLs:

Node/ProviderWebSocket URL
Geth (local)ws://localhost:8546
Erigon (local)ws://localhost:8545
Infurawss://mainnet.infura.io/ws/v3/PROJECT_ID
Alchemywss://eth-mainnet.g.alchemy.com/v2/API_KEY
QuickNodewss://your-endpoint.quiknode.pro/TOKEN/

Starting Geth/Erigon with WebSocket

Geth:

geth --ws --ws.addr 0.0.0.0 --ws.port 8546 --ws.api eth,net,web3

Erigon:

erigon --ws --ws.port 8545

Performance Considerations

SubscriptionEvent RateNotes
newHeads~12s (mainnet)One per block
pendingTransactions100-1000/sVery high volume
logs (filtered)VariableDepends on filter
syncingRareOnly during sync

Tips:

  • Use Buffer() or throttling for high-volume subscriptions
  • Filter logs as narrowly as possible (specific addresses/topics)
  • Consider multiple clients for heavy workloads

Thread Safety

  • StreamingWebSocketClient is thread-safe for subscriptions
  • Multiple subscriptions can run concurrently
  • Each subscription has isolated message handling

Connection Management

  • WebSocket connections can drop - implement error handling
  • Use the Error event to detect connection issues
  • Implement reconnection logic for production apps
  • Always call Dispose() to properly close connections

Subscription Limits

Some providers limit concurrent subscriptions:

  • Infura: Up to 5 subscriptions per connection
  • Alchemy: Up to 10 subscriptions per connection
  • Local nodes: Usually unlimited

Alternative Transports

  • Nethereum.JsonRpc.RpcClient - HTTP/HTTPS transport
  • Nethereum.JsonRpc.IpcClient - IPC transport
  • Nethereum.JsonRpc.SystemTextJsonRpcClient - HTTP with System.Text.Json

Core Dependencies

  • Nethereum.JsonRpc.Client - Abstraction layer

Higher-Level APIs

  • Nethereum.Web3 - Complete Web3 API
  • Nethereum.RPC.Reactive - Reactive Extensions for subscriptions

Additional Resources