Skip to main content

Nethereum.JsonRpc.WebSocketStreamingClient

NuGet: Nethereum.JsonRpc.WebSocketStreamingClient | Source: src/Nethereum.JsonRpc.WebSocketStreamingClient/

Nethereum.JsonRpc.WebSocketStreamingClient

Reactive Extensions (Rx.NET) wrapper for Ethereum WebSocket streaming providing IObservable-based subscriptions and polling.

Overview

Nethereum.JsonRpc.WebSocketStreamingClient provides IObservable wrappers around WebSocket streaming functionality, enabling reactive programming patterns for Ethereum real-time data streams. This package combines Nethereum's WebSocket client with System.Reactive (Rx.NET) to provide a powerful, composable API for handling asynchronous event streams from Ethereum nodes.

What is Rx.NET?

Reactive Extensions (Rx.NET) is a library for composing asynchronous and event-based programs using observable sequences. It treats asynchronous data streams as first-class citizens, allowing you to query, filter, transform, and combine them using LINQ-style operators.

Why Use Observables for Ethereum Streaming?

Traditional event-based patterns can become complex when you need to:

  • Filter events based on criteria
  • Combine multiple event streams
  • Throttle or debounce rapid events
  • Apply backpressure to handle event overflow
  • Implement timeout and retry logic
  • Transform and aggregate events over time windows

Rx.NET solves these problems with a composable, declarative API.

Key Features:

  • IObservable wrappers for eth_subscribe subscriptions (newHeads, logs, newPendingTransactions)
  • IObservable wrappers for polling-based RPC requests (eth_blockNumber, eth_getBalance)
  • Automatic subscription lifecycle management (subscribe, unsubscribe)
  • Error handling through Rx error channels
  • Full System.Reactive operator support (Where, Select, Buffer, Throttle, etc.)
  • Type-safe subscription responses (Block, FilterLog, string)

Installation

dotnet add package Nethereum.JsonRpc.WebSocketStreamingClient

Or via Package Manager Console:

Install-Package Nethereum.JsonRpc.WebSocketStreamingClient

Dependencies

Package References:

  • System.Reactive 4.1.2

Project References:

  • Nethereum.Hex
  • Nethereum.JsonRpc.Client
  • Nethereum.RPC

Target Framework:

  • netstandard2.0

Architecture

┌──────────────────────────────────────────────────────┐
│ Your Application │
│ (LINQ-style Rx operators) │
└──────────────────────────────────────────────────────┘

│ subscribes to

┌──────────────────────────────────────────────────────┐
│ Observable Subscription Handlers │
│ - EthLogsObservableSubscription │
│ - EthNewBlockHeadersObservableSubscription │
│ - EthNewPendingTransactionObservableSubscription │
└──────────────────────────────────────────────────────┘

│ wraps

┌──────────────────────────────────────────────────────┐
│ RpcStreamingSubscriptionObservableHandler<T> │
│ - Subject<string> SubscribeResponseSubject │
│ - Subject<T> SubscriptionDataResponseSubject │
│ - Subject<bool> UnsubscribeResponseSubject │
└──────────────────────────────────────────────────────┘

│ uses

┌──────────────────────────────────────────────────────┐
│ IStreamingClient (WebSocket) │
│ - SendRequestAsync / Subscribe / Unsubscribe │
└──────────────────────────────────────────────────────┘


Ethereum Node WebSocket Endpoint
(ws://localhost:8546)

Key Concepts

Observable Subscriptions vs Polling

Subscription-Based (eth_subscribe):

True server push - node sends events as they occur:

  • EthLogsObservableSubscription - Contract event logs matching filter criteria
  • EthNewBlockHeadersObservableSubscription - New block headers
  • EthNewPendingTransactionObservableSubscription - New pending transactions

Polling-Based (Repeated RPC):

Client-initiated requests at intervals:

  • EthBlockNumberObservableHandler - Latest block number
  • EthGetBalanceObservableHandler - Account balance

Observable Subjects

Each subscription handler exposes three observable subjects:

IObservable<string> GetSubscribeResponseAsObservable()
// Emits subscription ID when eth_subscribe succeeds
// Completes immediately after emitting one value

IObservable<TResponse> GetSubscriptionDataResponsesAsObservable()
// Emits stream of subscription data (blocks, logs, etc.)
// Continues emitting until unsubscribe

IObservable<bool> GetUnsubscribeResponseAsObservable()
// Emits true when eth_unsubscribe succeeds
// Completes after emitting, also completes data stream

Error Handling

Rx.NET propagates errors through the observable pipeline:

subscription.GetSubscriptionDataResponsesAsObservable()
.Subscribe(
onNext: block => Console.WriteLine($"Block: {block.Number}"),
onError: ex => Console.WriteLine($"Error: {ex.Message}"),
onCompleted: () => Console.WriteLine("Subscription completed")
);

Errors automatically trigger OnError and complete the observable sequence.

Quick Start

1. Create WebSocket Client

using Nethereum.JsonRpc.WebSocketClient;
using Nethereum.JsonRpc.WebSocketStreamingClient;

var wsClient = new StreamingWebSocketClient("ws://localhost:8546");
await wsClient.StartAsync();

2. Subscribe to New Block Headers

var blockHeaderSubscription = new EthNewBlockHeadersObservableSubscription(wsClient);

// Subscribe to the observable stream
var subscription = blockHeaderSubscription
.GetSubscriptionDataResponsesAsObservable()
.Subscribe(block =>
{
Console.WriteLine($"New Block #{block.Number.Value}");
Console.WriteLine($"Hash: {block.BlockHash}");
Console.WriteLine($"Timestamp: {block.Timestamp.Value}");
});

// Start the subscription
await blockHeaderSubscription.SubscribeAsync();

// Let it run...
await Task.Delay(60000);

// Clean up
await blockHeaderSubscription.UnsubscribeAsync();
subscription.Dispose();

3. Filter and Transform Events

var logsSubscription = new EthLogsObservableSubscription(wsClient);

// Subscribe with Rx operators
var subscription = logsSubscription
.GetSubscriptionDataResponsesAsObservable()
.Where(log => log.Topics.Length > 0) // Filter: only logs with topics
.Select(log => new
{
ContractAddress = log.Address,
EventSignature = log.Topics[0],
BlockNumber = log.BlockNumber.Value
})
.Subscribe(evt =>
{
Console.WriteLine($"Event from {evt.ContractAddress}");
Console.WriteLine($"Signature: {evt.EventSignature}");
Console.WriteLine($"Block: {evt.BlockNumber}");
});

// Subscribe to specific contract events
var filter = new NewFilterInput
{
Address = new[] { "0xYourContractAddress" },
Topics = new[] { "0xYourEventSignature" }
};

await logsSubscription.SubscribeAsync(filter);

Usage Examples

Example 1: Monitor New Blocks with Throttling

using Nethereum.JsonRpc.WebSocketClient;
using Nethereum.JsonRpc.WebSocketStreamingClient;
using System.Reactive.Linq;

var wsClient = new StreamingWebSocketClient("ws://localhost:8546");
await wsClient.StartAsync();

var blockSubscription = new EthNewBlockHeadersObservableSubscription(wsClient);

// Throttle to max 1 block per 2 seconds (prevents flooding)
var subscription = blockSubscription
.GetSubscriptionDataResponsesAsObservable()
.Throttle(TimeSpan.FromSeconds(2))
.Subscribe(block =>
{
Console.WriteLine($"Block #{block.Number.Value} at {DateTime.Now}");
Console.WriteLine($"Transactions: {block.TransactionCount()}");
Console.WriteLine($"Gas Used: {block.GasUsed.Value}");
});

await blockSubscription.SubscribeAsync();

// Run for 5 minutes
await Task.Delay(TimeSpan.FromMinutes(5));

await blockSubscription.UnsubscribeAsync();
subscription.Dispose();
await wsClient.StopAsync();

Example 2: Contract Event Logs with Buffering

using Nethereum.JsonRpc.WebSocketClient;
using Nethereum.JsonRpc.WebSocketStreamingClient;
using Nethereum.RPC.Eth.DTOs;
using System.Reactive.Linq;

var wsClient = new StreamingWebSocketClient("ws://localhost:8546");
await wsClient.StartAsync();

var logsSubscription = new EthLogsObservableSubscription(wsClient);

// Buffer logs into batches of 10, then process as group
var subscription = logsSubscription
.GetSubscriptionDataResponsesAsObservable()
.Buffer(10)
.Subscribe(logBatch =>
{
Console.WriteLine($"Processing batch of {logBatch.Count} logs:");
foreach (var log in logBatch)
{
Console.WriteLine($" - {log.Address} in block {log.BlockNumber.Value}");
}
});

// Subscribe to ERC-20 Transfer events
var transferSignature = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef";
var filter = new NewFilterInput
{
Topics = new[] { transferSignature }
};

await logsSubscription.SubscribeAsync(filter);

// Run for 10 minutes
await Task.Delay(TimeSpan.FromMinutes(10));

await logsSubscription.UnsubscribeAsync();
subscription.Dispose();
await wsClient.StopAsync();

Example 3: Real-World ERC-20 Transfer Monitoring

Based on Nethereum console test example for monitoring DAI transfers:

using Nethereum.Contracts;
using Nethereum.Contracts.Standards.ERC20.ContractDefinition;
using Nethereum.JsonRpc.WebSocketClient;
using Nethereum.JsonRpc.WebSocketStreamingClient;
using System.Reactive.Linq;

var wsClient = new StreamingWebSocketClient("wss://mainnet.infura.io/ws/v3/YOUR-PROJECT-ID");
await wsClient.StartAsync();

// DAI contract address
var daiAddress = "0x6B175474E89094C44Da98b954EedeAC495271d0F";

// Create filter for Transfer events from DAI contract
var filterTransfers = Event<TransferEventDTO>.GetEventABI().CreateFilterInput(daiAddress);

var ethLogsTokenTransfer = new EthLogsObservableSubscription(wsClient);

ethLogsTokenTransfer.GetSubscriptionDataResponsesAsObservable().Subscribe(log =>
{
try
{
// Decode the Transfer event
var decoded = Event<TransferEventDTO>.DecodeEvent(log);
if (decoded != null)
{
Console.WriteLine($"DAI Transfer:");
Console.WriteLine($" From: {decoded.Event.From}");
Console.WriteLine($" To: {decoded.Event.To}");
Console.WriteLine($" Value: {Web3.Convert.FromWei(decoded.Event.Value)} DAI");
Console.WriteLine($" Block: {log.BlockNumber.Value}");
Console.WriteLine($" Tx: {log.TransactionHash}");
}
else
{
Console.WriteLine("Found non-standard transfer log");
}
}
catch (Exception ex)
{
Console.WriteLine($"Error decoding log: {ex.Message}");
}
},
onError: exception =>
{
Console.WriteLine($"Logs subscription error: {exception.Message}");
});

await ethLogsTokenTransfer.SubscribeAsync(filterTransfers);

// Monitor indefinitely
Console.WriteLine("Monitoring DAI transfers. Press Ctrl+C to exit.");
await Task.Delay(Timeout.Infinite);

Example 4: Combining Multiple Streams

using Nethereum.JsonRpc.WebSocketClient;
using Nethereum.JsonRpc.WebSocketStreamingClient;
using System.Reactive.Linq;

var wsClient = new StreamingWebSocketClient("ws://localhost:8546");
await wsClient.StartAsync();

var blockSubscription = new EthNewBlockHeadersObservableSubscription(wsClient);
var txSubscription = new EthNewPendingTransactionObservableSubscription(wsClient);

// Combine streams: emit tuple of (block count, tx count) every 10 seconds
var blockStream = blockSubscription
.GetSubscriptionDataResponsesAsObservable()
.Select(_ => 1);

var txStream = txSubscription
.GetSubscriptionDataResponsesAsObservable()
.Select(_ => 1);

var combined = Observable.CombineLatest(
blockStream.Buffer(TimeSpan.FromSeconds(10)).Select(buf => buf.Count),
txStream.Buffer(TimeSpan.FromSeconds(10)).Select(buf => buf.Count),
(blockCount, txCount) => new { BlockCount = blockCount, TxCount = txCount }
);

var subscription = combined.Subscribe(stats =>
{
Console.WriteLine($"Last 10s: {stats.BlockCount} blocks, {stats.TxCount} pending txs");
});

await blockSubscription.SubscribeAsync();
await txSubscription.SubscribeAsync();

// Monitor for 5 minutes
await Task.Delay(TimeSpan.FromMinutes(5));

await blockSubscription.UnsubscribeAsync();
await txSubscription.UnsubscribeAsync();
subscription.Dispose();
await wsClient.StopAsync();

Example 5: Timeout and Retry Logic

using Nethereum.JsonRpc.WebSocketClient;
using Nethereum.JsonRpc.WebSocketStreamingClient;
using System.Reactive.Linq;

var wsClient = new StreamingWebSocketClient("ws://localhost:8546");
await wsClient.StartAsync();

var blockSubscription = new EthNewBlockHeadersObservableSubscription(wsClient);

// Timeout if no block received within 30 seconds
var subscription = blockSubscription
.GetSubscriptionDataResponsesAsObservable()
.Timeout(TimeSpan.FromSeconds(30))
.Retry(3) // Retry up to 3 times on timeout
.Subscribe(
onNext: block => Console.WriteLine($"Block #{block.Number.Value}"),
onError: ex => Console.WriteLine($"Failed after retries: {ex.Message}"),
onCompleted: () => Console.WriteLine("Stream completed")
);

await blockSubscription.SubscribeAsync();

Example 6: Windowing and Aggregation

using Nethereum.JsonRpc.WebSocketClient;
using Nethereum.JsonRpc.WebSocketStreamingClient;
using System.Reactive.Linq;

var wsClient = new StreamingWebSocketClient("ws://localhost:8546");
await wsClient.StartAsync();

var blockSubscription = new EthNewBlockHeadersObservableSubscription(wsClient);

// Calculate average gas used per block over 1-minute windows
var subscription = blockSubscription
.GetSubscriptionDataResponsesAsObservable()
.Window(TimeSpan.FromMinutes(1))
.SelectMany(window => window
.Select(block => (double)block.GasUsed.Value)
.Average()
)
.Subscribe(avgGas =>
{
Console.WriteLine($"Average gas used (last minute): {avgGas:N0}");
});

await blockSubscription.SubscribeAsync();

// Monitor for 30 minutes
await Task.Delay(TimeSpan.FromMinutes(30));

await blockSubscription.UnsubscribeAsync();
subscription.Dispose();
await wsClient.StopAsync();

Example 7: Filtering Logs by Multiple Criteria

using Nethereum.JsonRpc.WebSocketClient;
using Nethereum.JsonRpc.WebSocketStreamingClient;
using Nethereum.RPC.Eth.DTOs;
using System.Reactive.Linq;
using Nethereum.Hex.HexTypes;

var wsClient = new StreamingWebSocketClient("ws://localhost:8546");
await wsClient.StartAsync();

var logsSubscription = new EthLogsObservableSubscription(wsClient);

// Filter for high-value ERC-20 transfers (value in topic[3])
var subscription = logsSubscription
.GetSubscriptionDataResponsesAsObservable()
.Where(log => log.Topics != null && log.Topics.Length == 4)
.Where(log =>
{
// Parse value from topic[3] (indexed uint256)
var value = new HexBigInteger(log.Topics[3]);
return value.Value > 1000000000000000000; // > 1 token (18 decimals)
})
.Subscribe(log =>
{
Console.WriteLine($"High-value transfer detected:");
Console.WriteLine($" Contract: {log.Address}");
Console.WriteLine($" Block: {log.BlockNumber.Value}");
Console.WriteLine($" Tx: {log.TransactionHash}");
});

// Subscribe to ERC-20 Transfer events
var transferSignature = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef";
var filter = new NewFilterInput
{
Topics = new[] { transferSignature }
};

await logsSubscription.SubscribeAsync(filter);

Example 8: Subscribe to Subscription ID

using Nethereum.JsonRpc.WebSocketClient;
using Nethereum.JsonRpc.WebSocketStreamingClient;

var wsClient = new StreamingWebSocketClient("ws://localhost:8546");
await wsClient.StartAsync();

var blockSubscription = new EthNewBlockHeadersObservableSubscription(wsClient);

// Track subscription lifecycle
var subscribeSubscription = blockSubscription
.GetSubscribeResponseAsObservable()
.Subscribe(subscriptionId =>
{
Console.WriteLine($"Subscription created with ID: {subscriptionId}");
});

var unsubscribeSubscription = blockSubscription
.GetUnsubscribeResponseAsObservable()
.Subscribe(success =>
{
Console.WriteLine($"Unsubscribed successfully: {success}");
});

var dataSubscription = blockSubscription
.GetSubscriptionDataResponsesAsObservable()
.Subscribe(block =>
{
Console.WriteLine($"Block #{block.Number.Value}");
});

await blockSubscription.SubscribeAsync();
await Task.Delay(30000);
await blockSubscription.UnsubscribeAsync();

// Wait for all observables to complete
await Task.Delay(1000);

subscribeSubscription.Dispose();
unsubscribeSubscription.Dispose();
dataSubscription.Dispose();
await wsClient.StopAsync();

Example 9: Error Handling with OnError

using Nethereum.JsonRpc.WebSocketClient;
using Nethereum.JsonRpc.WebSocketStreamingClient;
using System.Reactive.Linq;

var wsClient = new StreamingWebSocketClient("ws://localhost:8546");
await wsClient.StartAsync();

var blockSubscription = new EthNewBlockHeadersObservableSubscription(wsClient);

var subscription = blockSubscription
.GetSubscriptionDataResponsesAsObservable()
.Subscribe(
onNext: block =>
{
Console.WriteLine($"Block #{block.Number.Value}");
},
onError: ex =>
{
if (ex is RpcResponseException rpcEx)
{
Console.WriteLine($"RPC Error {rpcEx.RpcError.Code}: {rpcEx.RpcError.Message}");
}
else if (ex is TimeoutException)
{
Console.WriteLine("Subscription timed out");
}
else
{
Console.WriteLine($"Unexpected error: {ex.Message}");
}
},
onCompleted: () =>
{
Console.WriteLine("Subscription completed successfully");
}
);

await blockSubscription.SubscribeAsync();

// If an error occurs, OnError will be called and stream completes
await Task.Delay(60000);

await blockSubscription.UnsubscribeAsync();
subscription.Dispose();
await wsClient.StopAsync();

API Reference

EthLogsObservableSubscription

Observable subscription for contract event logs.

public class EthLogsObservableSubscription : RpcStreamingSubscriptionObservableHandler<FilterLog>
{
// Constructor
public EthLogsObservableSubscription(IStreamingClient client);

// Subscribe Methods
public Task SubscribeAsync(object id = null);
public Task SubscribeAsync(NewFilterInput filterInput, object id = null);
public RpcRequest BuildRequest(NewFilterInput filterInput, object id = null);

// Observable Streams (inherited)
public IObservable<string> GetSubscribeResponseAsObservable();
public IObservable<FilterLog> GetSubscriptionDataResponsesAsObservable();
public IObservable<bool> GetUnsubscribeResponseAsObservable();

// Lifecycle (inherited)
public Task UnsubscribeAsync();
}

EthNewBlockHeadersObservableSubscription

Observable subscription for new block headers.

public class EthNewBlockHeadersObservableSubscription : RpcStreamingSubscriptionObservableHandler<Block>
{
// Constructor
public EthNewBlockHeadersObservableSubscription(IStreamingClient client);

// Subscribe Methods
public Task SubscribeAsync(object id = null);
public RpcRequest BuildRequest(object id);

// Observable Streams (inherited)
public IObservable<string> GetSubscribeResponseAsObservable();
public IObservable<Block> GetSubscriptionDataResponsesAsObservable();
public IObservable<bool> GetUnsubscribeResponseAsObservable();

// Lifecycle (inherited)
public Task UnsubscribeAsync();
}

EthNewPendingTransactionObservableSubscription

Observable subscription for new pending transactions.

public class EthNewPendingTransactionObservableSubscription : RpcStreamingSubscriptionObservableHandler<string>
{
// Constructor
public EthNewPendingTransactionObservableSubscription(IStreamingClient client);

// Subscribe Methods
public Task SubscribeAsync(object id = null);

// Observable Streams (inherited)
public IObservable<string> GetSubscribeResponseAsObservable();
public IObservable<string> GetSubscriptionDataResponsesAsObservable(); // Emits transaction hashes
public IObservable<bool> GetUnsubscribeResponseAsObservable();
}

EthBlockNumberObservableHandler

Observable handler for polling block number.

public class EthBlockNumberObservableHandler : RpcStreamingResponseNoParamsObservableHandler<HexBigInteger, EthBlockNumber>
{
// Constructor
public EthBlockNumberObservableHandler(IStreamingClient streamingClient);

// Observable Stream (inherited)
public IObservable<HexBigInteger> GetResponseAsObservable();

// Polling Methods (inherited)
public Task SendRequestAsync(object id = null);
}

EthGetBalanceObservableHandler

Observable handler for polling account balance.

public class EthGetBalanceObservableHandler : RpcStreamingResponseParamsObservableHandler<HexBigInteger, EthGetBalance>
{
// Constructor
public EthGetBalanceObservableHandler(IStreamingClient streamingClient);

// Observable Stream (inherited)
public IObservable<HexBigInteger> GetResponseAsObservable();

// Polling Methods (inherited)
public Task SendRequestAsync(string address, BlockParameter block, object id = null);
}

RpcStreamingSubscriptionObservableHandler

Base class for subscription-based observable handlers.

public abstract class RpcStreamingSubscriptionObservableHandler<TSubscriptionDataResponse>
{
// Observables
public IObservable<string> GetSubscribeResponseAsObservable();
public IObservable<TSubscriptionDataResponse> GetSubscriptionDataResponsesAsObservable();
public IObservable<bool> GetUnsubscribeResponseAsObservable();

// Lifecycle
public Task SubscribeAsync(RpcRequest request);
public Task UnsubscribeAsync();

// Internal Subjects
protected Subject<string> SubscribeResponseSubject { get; set; }
protected Subject<bool> UnsubscribeResponseSubject { get; set; }
protected Subject<TSubscriptionDataResponse> SubscriptionDataResponseSubject { get; set; }
}

RpcStreamingResponseObservableHandler

Base class for polling-based observable handlers.

public abstract class RpcStreamingResponseObservableHandler<TResponse>
{
// Observable
public IObservable<TResponse> GetResponseAsObservable();

// Internal Subject
protected Subject<TResponse> ResponseSubject { get; set; }
}

Important Notes

WebSocket Connection Required

All observable handlers require an active IStreamingClient connection:

var wsClient = new StreamingWebSocketClient("ws://localhost:8546");
await wsClient.StartAsync(); // MUST call before creating subscriptions

// Use subscriptions...

await wsClient.StopAsync(); // Clean up when done

Observable Lifecycle

Subscriptions auto-complete when unsubscribed:

var dataStream = subscription.GetSubscriptionDataResponsesAsObservable();
var disposable = dataStream.Subscribe(data => { /* ... */ });

// Later...
await subscription.UnsubscribeAsync(); // Triggers OnCompleted, disposes dataStream

disposable.Dispose(); // Clean up subscription

Dispose Pattern

Always dispose subscriptions to prevent memory leaks:

var disposable = observable.Subscribe(/* ... */);

// When done:
disposable.Dispose();

Or use using:

using (var disposable = observable.Subscribe(/* ... */))
{
// Subscription active
}
// Automatically disposed

Cold vs Hot Observables

These are cold observables - each subscription creates a new underlying RPC subscription:

var blockSub = new EthNewBlockHeadersObservableSubscription(client);
var observable = blockSub.GetSubscriptionDataResponsesAsObservable();

// First subscription creates eth_subscribe
var sub1 = observable.Subscribe(block => Console.WriteLine("Sub1: " + block.Number));
await blockSub.SubscribeAsync(); // Creates subscription

// Second subscription to same observable does NOT create new eth_subscribe
var sub2 = observable.Subscribe(block => Console.WriteLine("Sub2: " + block.Number));

Both subscribers receive same data stream from single eth_subscribe.

Subscription ID Tracking

Use subscribe response observable to track subscription IDs:

string subscriptionId = null;
subscription.GetSubscribeResponseAsObservable()
.Subscribe(id => subscriptionId = id);

await subscription.SubscribeAsync();

// subscriptionId now contains the eth_subscribe response
Console.WriteLine($"Subscription ID: {subscriptionId}");

System.Reactive Operators

All standard Rx operators work:

observable
.Where(block => block.Number.Value % 100 == 0) // Filter
.Select(block => block.Number.Value) // Transform
.Buffer(10) // Batch
.Throttle(TimeSpan.FromSeconds(1)) // Rate limit
.Timeout(TimeSpan.FromSeconds(30)) // Timeout
.Retry(3) // Retry on error
.Subscribe(/* ... */);

See System.Reactive documentation for complete operator list.

Dependencies

  • Nethereum.JsonRpc.Client - WebSocket client and streaming abstractions
  • Nethereum.RPC - RPC request builders and DTOs
  • Nethereum.Hex - Hex encoding/decoding
  • System.Reactive - Reactive Extensions for .NET

Alternative Approaches

  • Nethereum.RPC.Reactive - Event-based subscriptions without Rx.NET
  • Nethereum.Web3 - High-level Web3 client with built-in subscription support