
Stream Real-Time Blockchain Data

When you need to react to blockchain events as they happen — new blocks mined, tokens transferred, DEX swaps executed — you subscribe to a stream rather than polling in a loop. Nethereum provides two approaches:

| Approach | Transport | Best for |
| --- | --- | --- |
| WebSocket subscriptions | Persistent WSS connection | Sub-second latency, no missed events while connected |
| Rx polling | Standard HTTP | HTTP-only providers, simpler setup |

Both use Reactive Extensions (Rx) for a uniform IObservable<T> API — you compose streams with .Where(), .Select(), .Buffer(), and other Rx operators.

Prerequisites

dotnet add package Nethereum.RPC.Reactive
dotnet add package Nethereum.JsonRpc.WebSocketStreamingClient

You need a WebSocket endpoint (starts with wss://). Most providers offer one — Infura, Alchemy, Ankr, or a local node with --ws enabled. For HTTP-only fallback, see Polling-Based Streams below.


WebSocket: Connect and Subscribe

Create the Client

using Nethereum.JsonRpc.WebSocketStreamingClient;
using Nethereum.RPC.Reactive.Eth.Subscriptions;

var client = new StreamingWebSocketClient("wss://mainnet.infura.io/ws/v3/YOUR_KEY");
await client.StartAsync();

The client maintains a single persistent connection. All subscriptions share it — you don't create a new connection per subscription.

New Block Headers

The most common subscription. Fires every time a new block is mined (roughly every 12 seconds on mainnet):

var subscription = new EthNewBlockHeadersObservableSubscription(client);

subscription.GetSubscriptionDataResponsesAsObservable()
    .Subscribe(block =>
    {
        // Number and Timestamp are hex-encoded quantities; Timestamp is Unix seconds
        Console.WriteLine($"Block {block.Number} at timestamp {block.Timestamp}");
        Console.WriteLine($"  Gas used: {block.GasUsed}");
    });

await subscription.SubscribeAsync();

Block headers include number, timestamp, gas used/limit, base fee, and miner — but not the full transaction list. If you need transactions, fetch the full block by number when a header arrives.
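The header-then-fetch pattern described above could be sketched as follows. This is a minimal sketch, not the library's prescribed approach: `WithTransactions` is a hypothetical helper name, and it assumes you have a separate HTTP `Web3` instance available for the block lookup.

```csharp
using System;
using System.Reactive.Linq;
using Nethereum.RPC.Eth.DTOs;
using Nethereum.Web3;

// Hypothetical helper: maps each incoming header to its full block.
IObservable<BlockWithTransactions> WithTransactions(IObservable<Block> headers, Web3 web3)
{
    return headers
        // Wrap each lookup in a deferred observable so nothing is sent until subscribed
        .Select(header => Observable.FromAsync(() =>
            web3.Eth.Blocks.GetBlockWithTransactionsByNumber.SendRequestAsync(header.Number)))
        // Concat runs the lookups one at a time and preserves block order
        .Concat();
}
```

You would pass `subscription.GetSubscriptionDataResponsesAsObservable()` as `headers` and subscribe to the result. `Concat` trades throughput for ordering; if order does not matter, `Merge` with a concurrency limit is an alternative.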

Pending Transactions

Watch the mempool for transactions before they're mined. Returns transaction hashes only (not full transaction objects):

var subscription = new EthNewPendingTransactionObservableSubscription(client);

subscription.GetSubscriptionDataResponsesAsObservable()
    .Subscribe(txHash =>
    {
        Console.WriteLine($"Pending tx: {txHash}");
    });

await subscription.SubscribeAsync();

Warning: Pending transaction streams can be extremely high-volume on mainnet (thousands per second). Always filter or buffer to avoid overwhelming your application.

Event Logs

Subscribe to specific contract events by address and topic filters. This is the foundation for monitoring token transfers, DEX activity, governance votes, etc.:

using Nethereum.RPC.Eth.DTOs;

// Keccak256 of "Transfer(address,address,uint256)" — the ERC-20 Transfer event signature.
// For typed filter construction, see the ERC-20 example below or the [Events & Logs](../smart-contracts/guide-events) guide.
var transferTopic = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef";

var filter = new NewFilterInput
{
    Address = new[] { "0x6B175474E89094C44Da98b954EedeAC495271d0F" },
    Topics = new[] { new[] { transferTopic } }
};

var subscription = new EthLogsObservableSubscription(client);

subscription.GetSubscriptionDataResponsesAsObservable()
    .Subscribe(log =>
    {
        Console.WriteLine($"Log from {log.Address} in block {log.BlockNumber}");
    });

await subscription.SubscribeAsync(filter);
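If you would rather derive the topic than paste a constant, the hash can be computed from the signature string. A minimal sketch, assuming the `Sha3Keccack` helper from `Nethereum.Util` (which normally ships as a dependency of the packages above); `TopicFromSignature` is a hypothetical name used only for illustration:

```csharp
using System;
using Nethereum.Util;

// Returns the 0x-prefixed Keccak-256 hash of a canonical event signature.
string TopicFromSignature(string signature)
{
    var hash = Sha3Keccack.Current.CalculateHash(signature);
    // Add the prefix only if the helper did not already include it
    return hash.StartsWith("0x") ? hash : "0x" + hash;
}

Console.WriteLine(TopicFromSignature("Transfer(address,address,uint256)"));
```

The printed value should match the `transferTopic` constant used above. The signature must be canonical: no spaces, no parameter names, and full type names such as `uint256` rather than `uint`.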

Real-World Example: ERC-20 Transfer Streaming

Rather than building raw topic filters, use the typed event DTO to create the filter automatically:

using Nethereum.Contracts;
using Nethereum.Contracts.Standards.ERC20.ContractDefinition;
using Nethereum.Web3; // for Web3.Convert

// Create a typed filter for Transfer events on the DAI contract
var filterTransfers = Event<TransferEventDTO>.GetEventABI()
    .CreateFilterInput("0x6B175474E89094C44Da98b954EedeAC495271d0F");

var subscription = new EthLogsObservableSubscription(client);

subscription.GetSubscriptionDataResponsesAsObservable()
    .Subscribe(log =>
    {
        var decoded = Event<TransferEventDTO>.DecodeEvent(log);
        if (decoded != null)
        {
            Console.WriteLine($"Transfer: {decoded.Event.From} -> {decoded.Event.To}");
            // See [Unit Conversion](guide-unit-conversion) for FromWei / ToWei helpers
            Console.WriteLine($"  Value: {Web3.Convert.FromWei(decoded.Event.Value)}");
        }
    });

await subscription.SubscribeAsync(filterTransfers);

The CreateFilterInput method builds the correct topic hash from the event signature. DecodeEvent returns null for logs that don't match the type — safe to use on multi-event streams.


Real-World Example: DEX Swap Monitoring

Monitor Uniswap V2 pair swaps with price calculation:

using System.Numerics;
using Nethereum.ABI.FunctionEncoding.Attributes;
using Nethereum.Contracts;
using Nethereum.Util;

[Event("Swap")]
public class SwapEventDTO : IEventDTO
{
    [Parameter("address", "sender", 1, true)] public string Sender { get; set; }
    [Parameter("uint256", "amount0In", 2)] public BigInteger Amount0In { get; set; }
    [Parameter("uint256", "amount1In", 3)] public BigInteger Amount1In { get; set; }
    [Parameter("uint256", "amount0Out", 4)] public BigInteger Amount0Out { get; set; }
    [Parameter("uint256", "amount1Out", 5)] public BigInteger Amount1Out { get; set; }
    [Parameter("address", "to", 6, true)] public string To { get; set; }
}

var pairAddress = "0xa478c2975ab1ea89e8196811f51a7b7ade33eb11"; // DAI-ETH
var filter = Event<SwapEventDTO>.GetEventABI().CreateFilterInput(pairAddress);

var subscription = new EthLogsObservableSubscription(client);
subscription.GetSubscriptionDataResponsesAsObservable()
    .Subscribe(log =>
    {
        var swap = log.DecodeEvent<SwapEventDTO>();
        if (swap != null)
        {
            // In this pair token0 is DAI and token1 is WETH, both with 18 decimals
            var amount0Out = UnitConversion.Convert.FromWei(swap.Event.Amount0Out);
            var amount1In = UnitConversion.Convert.FromWei(swap.Event.Amount1In);

            // ETH in, DAI out: the trader sold ETH
            if (swap.Event.Amount0In == 0 && swap.Event.Amount1Out == 0 && amount1In > 0)
            {
                var price = amount0Out / amount1In;
                Console.WriteLine($"Sell ETH, price: {price:F4} DAI/ETH");
            }
        }
    });

await subscription.SubscribeAsync(filter);
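The example above only reports the sell direction. A sketch of how both directions might be classified is shown below; it uses only `System.Numerics`, so it can be tried without a node. `ClassifySwap` and `Ratio` are hypothetical helper names, and the code assumes (as the example above does) that token0 is DAI, token1 is WETH, and both use 18 decimals, so the raw amount ratio equals the price.

```csharp
using System;
using System.Numerics;

// Integer division scaled by 1e6 keeps six decimal places without floating point.
decimal Ratio(BigInteger numerator, BigInteger denominator)
{
    var scaled = numerator * 1_000_000 / denominator;
    return (decimal)scaled / 1_000_000m;
}

// Returns the trade side and DAI/ETH price, or null for swaps that are not
// a plain one-directional trade (for example, flash swaps with both inputs set).
(string Side, decimal Price)? ClassifySwap(
    BigInteger amount0In, BigInteger amount1In,
    BigInteger amount0Out, BigInteger amount1Out)
{
    // WETH in, DAI out: trader sold ETH
    if (amount1In > 0 && amount0Out > 0 && amount0In == 0 && amount1Out == 0)
        return ("Sell ETH", Ratio(amount0Out, amount1In));

    // DAI in, WETH out: trader bought ETH
    if (amount0In > 0 && amount1Out > 0 && amount1In == 0 && amount0Out == 0)
        return ("Buy ETH", Ratio(amount0In, amount1Out));

    return null;
}
```

Inside the subscription callback you would pass the four amounts from `swap.Event`. For pairs whose tokens have different decimals, the amounts would need to be normalized before dividing.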

Connection Management

WebSocket connections drop — providers timeout idle connections, networks hiccup, and servers restart. Production code needs reconnection logic.

Error Handling

client.Error += async (sender, ex) =>
{
    Console.WriteLine($"WebSocket error: {ex.Message}");
    // Stop and recreate: subscriptions need to be re-established
    await ((StreamingWebSocketClient)sender).StopAsync();
    // Implement your own reconnection logic here:
    // 1. Create a new StreamingWebSocketClient and call StartAsync()
    // 2. Re-create your subscriptions (they are not transferable to a new client)
    // 3. Consider exponential back-off to avoid hammering the provider
};
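The three reconnection steps listed in the handler could be wrapped in a retry loop along these lines. This is one possible sketch rather than a Nethereum feature: `BackoffDelay` and `ConnectWithRetryAsync` are hypothetical helpers, and `connectAndSubscribe` stands in for your own code that creates a new client, calls `StartAsync()`, and re-creates the subscriptions.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Delay before retrying after a failed attempt (0-based): baseDelay * 2^attempt, capped at maxDelay.
TimeSpan BackoffDelay(int attempt, TimeSpan baseDelay, TimeSpan maxDelay)
{
    var factor = Math.Pow(2, Math.Min(attempt, 30)); // clamp the exponent to avoid huge values
    var ms = Math.Min(baseDelay.TotalMilliseconds * factor, maxDelay.TotalMilliseconds);
    return TimeSpan.FromMilliseconds(ms);
}

// Runs connectAndSubscribe until it succeeds or maxAttempts is reached.
// Returns the number of attempts used; the last failure is rethrown.
async Task<int> ConnectWithRetryAsync(
    Func<Task> connectAndSubscribe, int maxAttempts,
    TimeSpan baseDelay, TimeSpan maxDelay, CancellationToken ct = default)
{
    if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));

    for (var attempt = 0; ; attempt++)
    {
        try
        {
            await connectAndSubscribe();
            return attempt + 1;
        }
        catch (Exception) when (attempt < maxAttempts - 1)
        {
            // Not the final attempt: wait, then try again
            await Task.Delay(BackoffDelay(attempt, baseDelay, maxDelay), ct);
        }
    }
}
```

With a 1-second base and a 30-second cap, the waits grow as 1 s, 2 s, 4 s, 8 s, 16 s, then stay at 30 s. Adding random jitter to each delay is a common refinement when many clients reconnect at once.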

Keep-Alive Pinging

Most hosted providers (Infura, Alchemy) close idle WebSocket connections after 1-2 minutes. Send periodic RPC calls to keep the connection alive:

using Nethereum.RPC.Reactive.Eth;

_ = Task.Run(async () =>
{
    while (true)
    {
        var handler = new EthBlockNumberObservableHandler(client);
        handler.GetResponseAsObservable()
            .Subscribe(x => Console.WriteLine($"Keepalive, block {x.Value}"));
        await handler.SendRequestAsync();
        await Task.Delay(30000); // ping every 30 seconds
    }
});

Enriching Pending Transactions

Pending transaction subscriptions only give you hashes. To get the full transaction, fetch it on demand:

using Nethereum.RPC.Reactive.Eth.Transactions;

var pendingSub = new EthNewPendingTransactionObservableSubscription(client);

pendingSub.GetSubscriptionDataResponsesAsObservable()
    .Subscribe(txHash =>
    {
        var txByHash = new EthGetTransactionByHashObservableHandler(client);
        txByHash.GetResponseAsObservable().Subscribe(tx =>
        {
            if (tx != null)
                Console.WriteLine($"Pending: {tx.From} -> {tx.To} ({Web3.Convert.FromWei(tx.Value)} ETH)");
        });
        // .Wait() is used here because Subscribe takes a synchronous Action.
        // Prefer `await` in async code: .Wait() blocks the thread and can
        // cause deadlocks in UI applications (WPF, MAUI, Blazor Server).
        txByHash.SendRequestAsync(txHash).Wait();
    });

await pendingSub.SubscribeAsync();
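One way to avoid the blocking `.Wait()` is to express the lookup as part of the Rx pipeline. The sketch below is a generic helper under stated assumptions: `Enrich` is a hypothetical name, and the `lookup` delegate is whatever async call you use to fetch the details.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Maps each item through an async lookup without blocking a thread.
// At most maxConcurrent lookups run at once; null results are dropped.
IObservable<TOut> Enrich<TIn, TOut>(
    IObservable<TIn> source, Func<TIn, Task<TOut>> lookup, int maxConcurrent)
    where TOut : class
{
    return source
        // FromAsync defers the call until Merge subscribes to it
        .Select(item => Observable.FromAsync(() => lookup(item)))
        .Merge(maxConcurrent)
        .Where(result => result != null);
}
```

For pending transactions this might be called as `Enrich(pendingSub.GetSubscriptionDataResponsesAsObservable(), hash => web3.Eth.Transactions.GetTransactionByHash.SendRequestAsync(hash), 4)`, where `web3` is an assumed HTTP `Web3` instance. Results can arrive out of order because lookups run concurrently, and the concurrency limit keeps a busy mempool from flooding your provider with requests.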

Polling-Based Streams (HTTP)

If your node only supports HTTP, use polling-based Rx streams. Same IObservable<T> API, but backed by periodic eth_getFilterChanges calls:

using Nethereum.RPC.Reactive.Polling;
using Nethereum.Web3;

var web3 = new Web3("https://mainnet.infura.io/v3/YOUR_KEY");

Poll for New Blocks

var blockStream = web3.Eth.Blocks
    .GetBlockWithTransactionsByNumber
    .CreateObservable(intervalMs: 2000);

blockStream.Subscribe(block =>
{
    Console.WriteLine($"Block {block.Number} with {block.Transactions.Length} txs");
});

The polling interval controls the trade-off between latency and request volume. 2 seconds is reasonable for most use cases.
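To make the trade-off concrete, the request volume for a given interval can be estimated with simple arithmetic. A rough sketch, assuming one RPC request per poll (a real poller may issue more); `PollsPerDay` is a hypothetical helper:

```csharp
using System;

// Number of polls in 24 hours for a given interval in milliseconds.
long PollsPerDay(int intervalMs)
{
    if (intervalMs <= 0) throw new ArgumentOutOfRangeException(nameof(intervalMs));
    return 86_400_000L / intervalMs; // milliseconds per day / interval
}

Console.WriteLine(PollsPerDay(2000));  // 43200
Console.WriteLine(PollsPerDay(12000)); // 7200
```

Polling every 2 seconds costs about six times as many requests as polling once per 12-second block, in exchange for seeing a new block at most 2 seconds late. Compare the daily total against your provider's rate limits before choosing an interval.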


Rx Operators for Stream Processing

Since subscriptions return IObservable<T>, you get the full power of Rx LINQ operators:

using System.Reactive.Linq;

// Only blocks using more than 15M gas (above the EIP-1559 target when the gas limit is 30M)
subscription.GetSubscriptionDataResponsesAsObservable()
    .Where(block => block.GasUsed.Value > 15_000_000)
    .Select(block => new { block.Number, block.GasUsed })
    .Subscribe(b => Console.WriteLine($"High-gas block: {b.Number}"));

// Buffer events into 10-second windows for batch processing
subscription.GetSubscriptionDataResponsesAsObservable()
    .Buffer(TimeSpan.FromSeconds(10))
    .Where(batch => batch.Count > 0)
    .Subscribe(batch =>
    {
        // Process the batch, e.g. write to a database, update the UI, or aggregate metrics
        Console.WriteLine($"Received {batch.Count} events in window");
    });

Choosing Between WebSocket and Polling

| Feature | WebSocket | Polling |
| --- | --- | --- |
| Latency | Sub-second | Up to the polling interval |
| Node requirement | WSS endpoint | HTTP endpoint |
| Missed events | None while connected (server pushes) | Possible between polls |
| Connection overhead | Single persistent connection | New request per poll |
| Provider support | Most (Infura, Alchemy, Ankr) | All |

Use WebSocket when you need instant notifications and your provider supports it. Use polling as a fallback for HTTP-only environments or when WebSocket stability is a concern.
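If the endpoint comes from configuration, the choice can be made from the URL scheme. A small sketch, assuming the convention stated in the prerequisites (WebSocket endpoints start with `wss://`, or `ws://` for a local node); `IsWebSocketEndpoint` is a hypothetical helper:

```csharp
using System;

// True for ws:// and wss:// endpoints (use subscriptions),
// false for http:// and https:// endpoints (fall back to polling).
bool IsWebSocketEndpoint(string endpoint)
{
    var scheme = new Uri(endpoint).Scheme; // Uri normalizes the scheme to lowercase
    switch (scheme)
    {
        case "ws":
        case "wss":
            return true;
        case "http":
        case "https":
            return false;
        default:
            throw new ArgumentException($"Unsupported endpoint scheme: {scheme}", nameof(endpoint));
    }
}
```

Your startup code can then create a `StreamingWebSocketClient` when this returns true and a polling `Web3` instance otherwise.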
