Real-Time Data with WebSockets
Learning Objectives
Subscribe to ledger streams for real-time blockchain state
Monitor account activity with transaction subscriptions
Track order book changes for trading applications
Handle connection lifecycle including reconnection logic
Build event-driven architectures that respond to XRPL events
Polling approach (inefficient):
// DON'T DO THIS
setInterval(async () => {
const balance = await getBalance(account);
if (balance !== lastBalance) {
handleBalanceChange(balance);
}
}, 5000); // Check every 5 seconds
// Problems:
// - Wastes API calls when nothing changes
// - Misses events between polls
// - Delays reaction by up to 5 seconds
// - Doesn't scale with many accounts
```
WebSocket approach (efficient):
// DO THIS
client.request({
command: 'subscribe',
accounts: [account]
});
client.on('transaction', (tx) => {
handleTransaction(tx); // Instant notification
});
// Benefits:
// - Only notified when something happens
// - Millisecond latency
// - Scales to many subscriptions
// - Lower resource usage
```
// src/websocket/connection.js
const xrpl = require('xrpl');
class XRPLConnection {
constructor(url = 'wss://s.altnet.rippletest.net:51233') {
this.url = url;
this.client = null;
this.isConnected = false;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 10;
this.handlers = new Map();
}
async connect() {
this.client = new xrpl.Client(this.url);
// Set up event handlers
this.client.on('connected', () => {
console.log('Connected to XRPL');
this.isConnected = true;
this.reconnectAttempts = 0;
this.emit('connected');
});
this.client.on('disconnected', (code) => {
console.log(Disconnected: ${code});
this.isConnected = false;
this.emit('disconnected', code);
this.handleDisconnect();
});
this.client.on('error', (error) => {
console.error('Connection error:', error);
this.emit('error', error);
});
// Forward XRPL events
this.client.on('ledgerClosed', (ledger) => {
this.emit('ledger', ledger);
});
this.client.on('transaction', (tx) => {
this.emit('transaction', tx);
});
await this.client.connect();
return this;
}
async handleDisconnect() {
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
console.error('Max reconnection attempts reached');
this.emit('maxReconnectReached');
return;
}
this.reconnectAttempts++;
const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
console.log(Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts}));
setTimeout(async () => {
try {
await this.connect();
// Re-establish subscriptions
await this.resubscribe();
} catch (error) {
console.error('Reconnection failed:', error);
this.handleDisconnect();
}
}, delay);
}
async resubscribe() {
// Override in subclass to restore subscriptions
this.emit('resubscribed');
}
on(event, handler) {
if (!this.handlers.has(event)) {
this.handlers.set(event, []);
}
this.handlers.get(event).push(handler);
}
emit(event, data) {
const handlers = this.handlers.get(event) || [];
handlers.forEach(handler => handler(data));
}
async disconnect() {
if (this.client) {
await this.client.disconnect();
}
}
}
module.exports = { XRPLConnection };
```
// src/websocket/health-monitor.js
class ConnectionHealthMonitor {
constructor(connection, options = {}) {
this.connection = connection;
this.pingInterval = options.pingInterval || 30000;
this.pingTimeout = options.pingTimeout || 5000;
this.lastPong = Date.now();
this.intervalId = null;
}
start() {
this.intervalId = setInterval(() => this.checkHealth(), this.pingInterval);
console.log('Health monitoring started');
}
stop() {
if (this.intervalId) {
clearInterval(this.intervalId);
this.intervalId = null;
}
}
async checkHealth() {
if (!this.connection.isConnected) {
return;
}
try {
const start = Date.now();
// Use server_info as a health check
const response = await Promise.race([
this.connection.client.request({ command: 'server_info' }),
new Promise((_, reject) =>
setTimeout(() => reject(new Error('Ping timeout')), this.pingTimeout)
)
]);
const latency = Date.now() - start;
this.lastPong = Date.now();
this.connection.emit('healthCheck', {
healthy: true,
latency,
serverState: response.result.info.server_state
});
} catch (error) {
console.error('Health check failed:', error);
this.connection.emit('healthCheck', {
healthy: false,
error: error.message
});
}
}
}
module.exports = { ConnectionHealthMonitor };
```
Subscribe to new ledger closes:
// src/websocket/ledger-stream.js
const xrpl = require('xrpl');
async function subscribeLedger() {
const client = new xrpl.Client('wss://s.altnet.rippletest.net:51233');
await client.connect();
// Subscribe to ledger stream
await client.request({
command: 'subscribe',
streams: ['ledger']
});
console.log('Subscribed to ledger stream');
// Handle ledger events
client.on('ledgerClosed', (ledger) => {
console.log(`\nNew ledger: ${ledger.ledger_index}`);
console.log(` Hash: ${ledger.ledger_hash}`);
console.log(` Close time: ${new Date(ledger.ledger_time * 1000 + 946684800000).toISOString()}`);
console.log(` Transactions: ${ledger.txn_count}`);
console.log(` Base fee: ${ledger.fee_base} drops`);
console.log(` Reserve base: ${ledger.reserve_base / 1000000} XRP`);
});
return client;
}
// Track ledger statistics
class LedgerTracker {
constructor() {
this.ledgers = [];
this.maxHistory = 100;
}
addLedger(ledger) {
this.ledgers.push({
index: ledger.ledger_index,
time: ledger.ledger_time,
txCount: ledger.txn_count,
receivedAt: Date.now()
});
if (this.ledgers.length > this.maxHistory) {
this.ledgers.shift();
}
}
getStats() {
if (this.ledgers.length < 2) {
return null;
}
const intervals = [];
for (let i = 1; i < this.ledgers.length; i++) {
intervals.push(this.ledgers[i].time - this.ledgers[i-1].time);
}
const avgInterval = intervals.reduce((a, b) => a + b, 0) / intervals.length;
const totalTx = this.ledgers.reduce((sum, l) => sum + l.txCount, 0);
return {
ledgerCount: this.ledgers.length,
avgCloseTime: avgInterval.toFixed(2) + ' seconds',
totalTransactions: totalTx,
avgTxPerLedger: (totalTx / this.ledgers.length).toFixed(1)
};
}
}
module.exports = { subscribeLedger, LedgerTracker };
Monitor specific accounts for any activity:
// src/websocket/account-monitor.js
const xrpl = require('xrpl');
class AccountMonitor {
constructor(client) {
this.client = client;
this.accounts = new Set();
this.handlers = new Map();
}
async subscribe(address, handler) {
// Add to subscription
if (!this.accounts.has(address)) {
await this.client.request({
command: 'subscribe',
accounts: [address]
});
this.accounts.add(address);
}
// Register handler
if (!this.handlers.has(address)) {
this.handlers.set(address, []);
}
this.handlers.get(address).push(handler);
console.log(`Monitoring account: ${address}`);
}
async unsubscribe(address) {
if (this.accounts.has(address)) {
await this.client.request({
command: 'unsubscribe',
accounts: [address]
});
this.accounts.delete(address);
this.handlers.delete(address);
console.log(`Stopped monitoring: ${address}`);
}
}
setupEventHandler() {
this.client.on('transaction', (tx) => {
// Find affected accounts
const affected = this.findAffectedAccounts(tx);
for (const address of affected) {
const handlers = this.handlers.get(address) || [];
handlers.forEach(handler => handler(tx, address));
}
});
}
findAffectedAccounts(tx) {
const affected = new Set();
// Check transaction fields
if (tx.transaction.Account && this.accounts.has(tx.transaction.Account)) {
affected.add(tx.transaction.Account);
}
if (tx.transaction.Destination && this.accounts.has(tx.transaction.Destination)) {
affected.add(tx.transaction.Destination);
}
// Check affected nodes in metadata
if (tx.meta?.AffectedNodes) {
for (const node of tx.meta.AffectedNodes) {
const fields = node.ModifiedNode?.FinalFields ||
node.CreatedNode?.NewFields ||
node.DeletedNode?.FinalFields;
if (fields?.Account && this.accounts.has(fields.Account)) {
affected.add(fields.Account);
}
}
}
return affected;
}
getSubscribedAccounts() {
return Array.from(this.accounts);
}
}
// Usage example
async function monitorAccountActivity(address) {
const client = new xrpl.Client('wss://s.altnet.rippletest.net:51233');
await client.connect();
const monitor = new AccountMonitor(client);
monitor.setupEventHandler();
await monitor.subscribe(address, (tx, account) => {
console.log(`\nTransaction affecting ${account}:`);
console.log(` Type: ${tx.transaction.TransactionType}`);
console.log(` Hash: ${tx.transaction.hash}`);
console.log(` Result: ${tx.meta.TransactionResult}`);
// Parse balance change
const balanceChange = parseBalanceChange(tx, account);
if (balanceChange) {
console.log(` Balance change: ${balanceChange} XRP`);
}
});
return { client, monitor };
}
function parseBalanceChange(tx, address) {
if (!tx.meta?.AffectedNodes) return null;
for (const node of tx.meta.AffectedNodes) {
if (node.ModifiedNode?.LedgerEntryType === 'AccountRoot') {
const finalFields = node.ModifiedNode.FinalFields;
const prevFields = node.ModifiedNode.PreviousFields;
if (finalFields?.Account === address && prevFields?.Balance) {
const newBalance = Number(finalFields.Balance);
const oldBalance = Number(prevFields.Balance);
return (newBalance - oldBalance) / 1_000_000;
}
}
}
return null;
}
module.exports = { AccountMonitor, monitorAccountActivity };
Real-time order book updates for trading:
// src/websocket/orderbook-stream.js
const xrpl = require('xrpl');
class OrderBookStream {
constructor(client) {
this.client = client;
this.books = new Map();
this.handlers = [];
}
async subscribe(takerGets, takerPays, handler) {
const bookKey = this.makeBookKey(takerGets, takerPays);
// Subscribe to order book
const response = await this.client.request({
command: 'subscribe',
books: [{
taker_gets: takerGets,
taker_pays: takerPays,
snapshot: true, // Get initial state
both: true // Get both sides
}]
});
// Store initial snapshot
if (response.result.offers) {
this.books.set(bookKey, {
asks: response.result.offers,
bids: response.result.bids || []
});
}
this.handlers.push({ bookKey, handler });
console.log(`Subscribed to order book: ${this.formatBook(takerGets, takerPays)}`);
return response.result;
}
setupEventHandler() {
this.client.on('transaction', (tx) => {
if (this.isOrderBookTransaction(tx)) {
this.handleOrderBookChange(tx);
}
});
}
isOrderBookTransaction(tx) {
const type = tx.transaction.TransactionType;
return type === 'OfferCreate' || type === 'OfferCancel' || type === 'Payment';
}
handleOrderBookChange(tx) {
// Notify all handlers
for (const { bookKey, handler } of this.handlers) {
const book = this.books.get(bookKey);
handler({
transaction: tx,
book: book,
type: this.classifyChange(tx)
});
}
}
classifyChange(tx) {
const type = tx.transaction.TransactionType;
if (type === 'OfferCreate') {
// Check if it was consumed or placed
const created = tx.meta.AffectedNodes.find(
n => n.CreatedNode?.LedgerEntryType === 'Offer'
);
return created ? 'offer_placed' : 'offer_filled';
}
if (type === 'OfferCancel') {
return 'offer_cancelled';
}
return 'trade';
}
makeBookKey(takerGets, takerPays) {
return `${JSON.stringify(takerGets)}:${JSON.stringify(takerPays)}`;
}
formatBook(takerGets, takerPays) {
const gets = takerGets.currency || 'XRP';
const pays = takerPays.currency || 'XRP';
return `${gets}/${pays}`;
}
}
// Real-time price ticker
class PriceTicker {
constructor() {
this.lastPrice = null;
this.high24h = null;
this.low24h = null;
this.volume24h = 0;
this.trades = [];
}
recordTrade(price, amount, timestamp) {
this.lastPrice = price;
this.trades.push({ price, amount, timestamp });
// Update 24h stats
const oneDayAgo = Date.now() - 24 * 60 * 60 * 1000;
this.trades = this.trades.filter(t => t.timestamp > oneDayAgo);
const prices = this.trades.map(t => t.price);
this.high24h = Math.max(...prices);
this.low24h = Math.min(...prices);
this.volume24h = this.trades.reduce((sum, t) => sum + t.amount, 0);
}
getStats() {
return {
lastPrice: this.lastPrice,
high24h: this.high24h,
low24h: this.low24h,
volume24h: this.volume24h,
tradeCount24h: this.trades.length
};
}
}
module.exports = { OrderBookStream, PriceTicker };
// src/websocket/tx-confirmation.js
const xrpl = require('xrpl');
class TransactionTracker {
constructor(client) {
this.client = client;
this.pending = new Map(); // hash -> { resolve, reject, timeout }
}
async track(hash, timeoutMs = 60000) {
return new Promise((resolve, reject) => {
// Set up timeout
const timeout = setTimeout(() => {
this.pending.delete(hash);
reject(new Error(Transaction ${hash} confirmation timeout));
}, timeoutMs);
this.pending.set(hash, { resolve, reject, timeout });
});
}
setupEventHandler() {
this.client.on('transaction', (tx) => {
const hash = tx.transaction.hash;
if (this.pending.has(hash)) {
const { resolve, timeout } = this.pending.get(hash);
clearTimeout(timeout);
this.pending.delete(hash);
resolve({
hash: hash,
result: tx.meta.TransactionResult,
validated: tx.validated,
ledgerIndex: tx.ledger_index,
meta: tx.meta
});
}
});
}
async submitAndTrack(signedTx) {
// Submit transaction
const submitResult = await this.client.request({
command: 'submit',
tx_blob: signedTx
});
if (submitResult.result.engine_result !== 'tesSUCCESS' &&
!submitResult.result.engine_result.startsWith('tes')) {
throw new Error(Submit failed: ${submitResult.result.engine_result});
}
const hash = submitResult.result.tx_json.hash;
// Subscribe to account for this transaction
await this.client.request({
command: 'subscribe',
accounts: [submitResult.result.tx_json.Account]
});
// Track confirmation
return this.track(hash);
}
}
// Batch transaction tracking
class BatchTransactionTracker {
constructor(client) {
this.tracker = new TransactionTracker(client);
this.tracker.setupEventHandler();
}
async trackMultiple(hashes, timeoutMs = 60000) {
const results = await Promise.allSettled(
hashes.map(hash => this.tracker.track(hash, timeoutMs))
);
return results.map((result, index) => ({
hash: hashes[index],
status: result.status,
value: result.status === 'fulfilled' ? result.value : null,
error: result.status === 'rejected' ? result.reason : null
}));
}
}
module.exports = { TransactionTracker, BatchTransactionTracker };
```
// src/websocket/payment-notifier.js
const xrpl = require('xrpl');
class PaymentNotificationService {
constructor() {
this.client = null;
this.watchedAccounts = new Map(); // address -> callback
}
async start(serverUrl = 'wss://s.altnet.rippletest.net:51233') {
this.client = new xrpl.Client(serverUrl);
await this.client.connect();
this.client.on('transaction', (tx) => {
this.processTransaction(tx);
});
console.log('Payment notification service started');
}
async watchAccount(address, callback) {
if (!this.watchedAccounts.has(address)) {
await this.client.request({
command: 'subscribe',
accounts: [address]
});
}
this.watchedAccounts.set(address, callback);
console.log(Watching payments for: ${address});
}
processTransaction(tx) {
// Only process successful payments
if (tx.transaction.TransactionType !== 'Payment') return;
if (tx.meta.TransactionResult !== 'tesSUCCESS') return;
const destination = tx.transaction.Destination;
if (this.watchedAccounts.has(destination)) {
const callback = this.watchedAccounts.get(destination);
const paymentInfo = this.extractPaymentInfo(tx);
callback(paymentInfo);
}
}
extractPaymentInfo(tx) {
const delivered = tx.meta.delivered_amount;
return {
hash: tx.transaction.hash,
from: tx.transaction.Account,
to: tx.transaction.Destination,
amount: this.formatAmount(delivered),
destinationTag: tx.transaction.DestinationTag,
ledgerIndex: tx.ledger_index,
timestamp: new Date()
};
}
formatAmount(amount) {
if (typeof amount === 'string') {
return { currency: 'XRP', value: Number(amount) / 1_000_000 };
}
return {
currency: amount.currency,
issuer: amount.issuer,
value: Number(amount.value)
};
}
async stop() {
if (this.client) {
await this.client.disconnect();
}
}
}
// Example: E-commerce payment notifications
async function ecommerceExample() {
const notifier = new PaymentNotificationService();
await notifier.start();
const merchantAddress = 'rMerchant...';
await notifier.watchAccount(merchantAddress, (payment) => {
console.log('\nđź’° Payment received!');
console.log( From: ${payment.from});
console.log( Amount: ${payment.amount.value} ${payment.amount.currency});
console.log( Order ID: ${payment.destinationTag || 'none'});
// Process order...
if (payment.destinationTag) {
processOrder(payment.destinationTag, payment);
}
});
}
function processOrder(orderId, payment) {
console.log(Processing order ${orderId}...);
// Update order status, send confirmation email, etc.
}
module.exports = { PaymentNotificationService };
```
// src/websocket/balance-aggregator.js
class BalanceChangeAggregator {
constructor(client) {
this.client = client;
this.balances = new Map(); // address -> { xrp, tokens }
this.changeCallbacks = [];
}
async watchAccount(address) {
// Get initial balances
const accountInfo = await this.client.request({
command: 'account_info',
account: address
});
const accountLines = await this.client.request({
command: 'account_lines',
account: address
});
this.balances.set(address, {
xrp: Number(accountInfo.result.account_data.Balance) / 1_000_000,
tokens: accountLines.result.lines.map(line => ({
currency: line.currency,
issuer: line.account,
balance: Number(line.balance)
}))
});
// Subscribe
await this.client.request({
command: 'subscribe',
accounts: [address]
});
}
onBalanceChange(callback) {
this.changeCallbacks.push(callback);
}
setupEventHandler() {
this.client.on('transaction', (tx) => {
if (tx.meta.TransactionResult !== 'tesSUCCESS') return;
const changes = this.parseBalanceChanges(tx);
for (const change of changes) {
if (this.balances.has(change.account)) {
this.updateBalance(change);
this.notifyChange(change);
}
}
});
}
parseBalanceChanges(tx) {
const changes = [];
for (const node of tx.meta.AffectedNodes || []) {
// XRP balance changes
if (node.ModifiedNode?.LedgerEntryType === 'AccountRoot') {
const fields = node.ModifiedNode.FinalFields;
const prev = node.ModifiedNode.PreviousFields;
if (fields && prev?.Balance) {
changes.push({
account: fields.Account,
currency: 'XRP',
previous: Number(prev.Balance) / 1_000_000,
current: Number(fields.Balance) / 1_000_000,
change: (Number(fields.Balance) - Number(prev.Balance)) / 1_000_000
});
}
}
// Token balance changes
if (node.ModifiedNode?.LedgerEntryType === 'RippleState') {
// Parse trust line changes
const fields = node.ModifiedNode.FinalFields;
const prev = node.ModifiedNode.PreviousFields;
if (fields && prev?.Balance) {
// Determine which account's perspective
const highAccount = fields.HighLimit.issuer;
const lowAccount = fields.LowLimit.issuer;
const newBalance = Number(fields.Balance.value);
const oldBalance = Number(prev.Balance.value);
// High account sees positive balance as their holding
changes.push({
account: highAccount,
currency: fields.Balance.currency,
issuer: lowAccount,
previous: oldBalance,
current: newBalance,
change: newBalance - oldBalance
});
}
}
}
return changes;
}
updateBalance(change) {
const balances = this.balances.get(change.account);
if (!balances) return;
if (change.currency === 'XRP') {
balances.xrp = change.current;
} else {
const token = balances.tokens.find(
t => t.currency === change.currency && t.issuer === change.issuer
);
if (token) {
token.balance = change.current;
}
}
}
notifyChange(change) {
for (const callback of this.changeCallbacks) {
callback(change);
}
}
getBalance(address) {
return this.balances.get(address);
}
}
module.exports = { BalanceChangeAggregator };
```
// src/websocket/subscription-manager.js
class SubscriptionManager {
constructor(client) {
this.client = client;
this.subscriptions = {
streams: new Set(),
accounts: new Set(),
books: new Map()
};
}
async subscribeStream(stream) {
if (this.subscriptions.streams.has(stream)) {
return { alreadySubscribed: true };
}
await this.client.request({
command: 'subscribe',
streams: [stream]
});
this.subscriptions.streams.add(stream);
return { subscribed: stream };
}
async subscribeAccount(address) {
if (this.subscriptions.accounts.has(address)) {
return { alreadySubscribed: true };
}
await this.client.request({
command: 'subscribe',
accounts: [address]
});
this.subscriptions.accounts.add(address);
return { subscribed: address };
}
async subscribeBook(takerGets, takerPays) {
const key = ${JSON.stringify(takerGets)}:${JSON.stringify(takerPays)};
if (this.subscriptions.books.has(key)) {
return { alreadySubscribed: true };
}
const result = await this.client.request({
command: 'subscribe',
books: [{
taker_gets: takerGets,
taker_pays: takerPays,
snapshot: true
}]
});
this.subscriptions.books.set(key, { takerGets, takerPays });
return { subscribed: key, snapshot: result.result };
}
async unsubscribeAll() {
// Unsubscribe streams
if (this.subscriptions.streams.size > 0) {
await this.client.request({
command: 'unsubscribe',
streams: Array.from(this.subscriptions.streams)
});
}
// Unsubscribe accounts
if (this.subscriptions.accounts.size > 0) {
await this.client.request({
command: 'unsubscribe',
accounts: Array.from(this.subscriptions.accounts)
});
}
// Unsubscribe books
for (const [key, book] of this.subscriptions.books) {
await this.client.request({
command: 'unsubscribe',
books: [{
taker_gets: book.takerGets,
taker_pays: book.takerPays
}]
});
}
// Clear local state
this.subscriptions.streams.clear();
this.subscriptions.accounts.clear();
this.subscriptions.books.clear();
}
async resubscribeAll() {
// Re-establish all subscriptions after reconnect
const streams = Array.from(this.subscriptions.streams);
const accounts = Array.from(this.subscriptions.accounts);
const books = Array.from(this.subscriptions.books.values());
if (streams.length > 0) {
await this.client.request({
command: 'subscribe',
streams: streams
});
}
if (accounts.length > 0) {
await this.client.request({
command: 'subscribe',
accounts: accounts
});
}
for (const book of books) {
await this.client.request({
command: 'subscribe',
books: [{
taker_gets: book.takerGets,
taker_pays: book.takerPays
}]
});
}
console.log(Resubscribed: ${streams.length} streams, ${accounts.length} accounts, ${books.length} books);
}
getStatus() {
return {
streams: Array.from(this.subscriptions.streams),
accounts: this.subscriptions.accounts.size,
books: this.subscriptions.books.size
};
}
}
module.exports = { SubscriptionManager };
```
WebSocket subscriptions are essential for responsive XRPL applications. The technical implementation is straightforward; the challenge is robust connection management and efficient event processing. Always plan for disconnections and design your event handlers to be idempotent.
Assignment: Build a real-time dashboard showing XRPL activity.
Requirements:
Robust WebSocket connection with reconnection
Health monitoring and status display
Subscription management
Display latest ledger information
Show ledger close statistics
Track transaction volume
Monitor multiple accounts
Display balance changes in real-time
Show transaction notifications
Real-time order book for a currency pair
Display price updates
Track trade activity
Connection management robust (25%)
Ledger display accurate (25%)
Account monitoring works (25%)
Order book updates correctly (25%)
Time investment: 4 hours
Value: Foundation for any real-time XRPL application
Knowledge Check
Question 1 of 1Your WebSocket connection drops. What must you do after reconnecting?
- Subscribe method: https://xrpl.org/subscribe.html
- Unsubscribe method: https://xrpl.org/unsubscribe.html
- WebSocket API: https://xrpl.org/websocket-api.html
- Ledger stream: https://xrpl.org/subscribe.html#ledger-stream
- Transactions stream: https://xrpl.org/subscribe.html#transactions-stream
- Order book stream: https://xrpl.org/subscribe.html#order-book-streams
For Next Lesson:
You now understand real-time data. Lesson 14 covers error handling and debugging—essential skills for building reliable production applications.
End of Lesson 13
Total words: ~5,200
Estimated completion time: 50 minutes reading + 4 hours for deliverable
Key Takeaways
Subscribe, don't poll
: WebSockets provide instant updates with lower overhead than polling.
Handle disconnections
: Connections will drop. Implement reconnection with subscription restoration.
Track subscriptions
: Know what you're subscribed to for proper cleanup and resubscription.
Process events efficiently
: Don't block on event handlers; queue and process asynchronously if needed.
Use appropriate subscription types
: Ledger for blockchain state, accounts for activity, books for trading. ---