Subscription Streams and Event-Driven Architecture | XRPL APIs & Integration | XRP Academy

Intermediate · 50 min

Subscription Streams and Event-Driven Architecture

Learning Objectives

  • Subscribe to XRPL streams (ledger, transactions, accounts, order books)
  • Handle subscription events with proper parsing and routing
  • Design event-driven systems with queuing and deduplication
  • Manage backpressure when events arrive faster than processing
  • Recover from disconnections with gap detection and filling

Polling (Pull):

Every 5 seconds: "Any new transactions?"
Every 5 seconds: "Any new transactions?"
Every 5 seconds: "Any new transactions?"
...payment happens...
Up to 5 second delay to detect

Subscriptions (Push):

Subscribe: "Tell me about transactions"
...payment happens...
Server immediately: "Here's a transaction"
Milliseconds to detect

Subscriptions provide real-time awareness and eliminate wasted queries. The trade-off is complexity: you must handle a continuous stream of events, manage connection state, and recover gracefully from disconnections.


STREAM                 DESCRIPTION                     USE CASE
─────────────────────────────────────────────────────────────────────────────
ledger                 Ledger close notifications      Timing, health monitoring
transactions           All validated transactions      Network-wide monitoring
transactions_proposed  Transactions in proposed set    Pre-validation alerts
validations            Validation messages             Consensus monitoring
peer_status            Peer connection changes         Network health (admin)
consensus              Consensus state changes         Consensus monitoring (admin)
server                 Server state changes            Health monitoring (admin)

Monitor specific accounts without receiving all network transactions:

ACCOUNT SUBSCRIPTION:
accounts: ["rAddress1", "rAddress2"]

- Payments sent/received
- Offers created/filled
- Trust lines modified
- Any transaction where account is involved

Real-time order book updates:

BOOK SUBSCRIPTION:
books: [{
  taker_gets: { currency: "XRP" },
  taker_pays: { currency: "USD", issuer: "rIssuer..." }
}]

- New offers
- Offers filled
- Offers cancelled
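Book events carry offers in the XRPL's two amount shapes: XRP as a string of drops, issued currencies as an object with currency, issuer, and value. A small sketch of turning an offer into a human-readable price (the toUnits and offerPrice helpers are illustrative, not part of xrpl.js):

```javascript
// Convert either amount shape to a decimal number of whole units
function toUnits(amount) {
  if (typeof amount === 'string') {
    return Number(amount) / 1_000_000  // XRP drops → XRP
  }
  return Number(amount.value)          // IOU value is already in units
}

// Price the taker pays per unit received
function offerPrice(takerGets, takerPays) {
  return toUnits(takerPays) / toUnits(takerGets)
}

// Example: an offer selling 100 XRP for 52 USD
const price = offerPrice('100000000', {
  currency: 'USD',
  issuer: 'rIssuer...',
  value: '52'
})
console.log(price) // 0.52 USD per XRP
```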

---

```javascript
// Subscribe to ledger stream
await client.request({
  command: 'subscribe',
  streams: ['ledger']
})

// Handle events
client.on('ledgerClosed', (ledger) => {
  console.log(`Ledger ${ledger.ledger_index} closed`)
  console.log(`Transactions: ${ledger.txn_count}`)
  console.log(`Close time: ${new Date(ledger.ledger_time * 1000 + 946684800000)}`)
})
```

```javascript
// Subscribe to account transactions
await client.request({
  command: 'subscribe',
  accounts: ['rN7n3473SaZBCG4dFL83w7a1RXtXtbk2D9']
})

// Handle account transactions
client.on('transaction', (tx) => {
  // Filter for our account if needed
  if (tx.transaction.Account === ourAccount ||
      tx.transaction.Destination === ourAccount) {
    console.log('Transaction affecting our account:', tx.transaction.hash)
    processTransaction(tx)
  }
})
```

```javascript
// Subscribe to multiple streams at once
await client.request({
  command: 'subscribe',
  streams: ['ledger', 'transactions'],
  accounts: ['rWatchAddress1', 'rWatchAddress2'],
  books: [{
    taker_gets: { currency: 'XRP' },
    taker_pays: { currency: 'USD', issuer: 'rBitstamp...' },
    both: true  // Receive both sides of the book
  }]
})

// Unsubscribe from streams
await client.request({
  command: 'unsubscribe',
  streams: ['ledger'],
  accounts: ['rOldAddress']
})
```

Ledger Close Event:

```javascript
{
  type: 'ledgerClosed',
  fee_base: 10,
  fee_ref: 10,
  ledger_hash: 'ABC...',
  ledger_index: 87654321,
  ledger_time: 750000000,
  reserve_base: 10000000,  // 10 XRP in drops
  reserve_inc: 2000000,    // 2 XRP in drops
  txn_count: 42,
  validated_ledgers: '32570-87654321'
}
```
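The ledger_time field counts seconds from the Ripple epoch (2000-01-01T00:00:00Z) rather than the Unix epoch, so a naive conversion produces dates 30 years off. A minimal sketch of the conversion (the helper name is illustrative):

```javascript
// The Ripple epoch starts 946684800 seconds after the Unix epoch
const RIPPLE_EPOCH_OFFSET = 946_684_800

function rippleTimeToDate(ledgerTime) {
  return new Date((ledgerTime + RIPPLE_EPOCH_OFFSET) * 1000)
}

// ledger_time: 750000000 from the event above
const closeTime = rippleTimeToDate(750_000_000)
console.log(closeTime.toISOString()) // 2023-10-07T13:20:00.000Z
```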

Transaction Event:

```javascript
{
  type: 'transaction',
  engine_result: 'tesSUCCESS',
  engine_result_code: 0,
  engine_result_message: 'The transaction was applied.',
  ledger_hash: 'ABC...',
  ledger_index: 87654321,
  meta: {
    TransactionResult: 'tesSUCCESS',
    delivered_amount: '25000000',
    // ... balance changes, affected nodes
  },
  status: 'closed',
  transaction: {
    Account: 'rSender...',
    Destination: 'rRecipient...',
    Amount: '25000000',
    TransactionType: 'Payment',
    hash: 'XYZ...',
    // ... full transaction
  },
  validated: true
}
```
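Note that meta.delivered_amount, not transaction.Amount, is the authoritative figure: with partial payments the Amount field is only an upper bound on what was delivered. A small sketch of a safe extraction helper (the function name is illustrative):

```javascript
// Returns the amount actually delivered by a Payment event: XRP drops as a
// string, or an IOU amount object, or null when nothing was delivered.
function getDeliveredAmount(event) {
  if (event.meta?.TransactionResult !== 'tesSUCCESS') {
    return null  // Failed transactions deliver nothing
  }
  // delivered_amount is authoritative; Amount is only the requested maximum
  return event.meta.delivered_amount ?? event.transaction.Amount
}

const amount = getDeliveredAmount({
  meta: { TransactionResult: 'tesSUCCESS', delivered_amount: '25000000' },
  transaction: { Amount: '30000000' }
})
console.log(amount) // '25000000' (5 XRP less than the requested 30000000)
```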

```javascript
class EventRouter {
  constructor() {
    this.handlers = new Map()
  }

  on(eventType, handler) {
    if (!this.handlers.has(eventType)) {
      this.handlers.set(eventType, [])
    }
    this.handlers.get(eventType).push(handler)
  }

  route(event) {
    const handlers = this.handlers.get(event.type) || []
    for (const handler of handlers) {
      try {
        handler(event)
      } catch (error) {
        console.error(`Handler error for ${event.type}:`, error)
      }
    }
  }

  setupClientHandlers(client) {
    client.on('ledgerClosed', (data) => {
      this.route({ type: 'ledgerClosed', ...data })
    })

    client.on('transaction', (data) => {
      this.route({ type: 'transaction', ...data })
    })
  }
}

// Usage
const router = new EventRouter()

router.on('ledgerClosed', (event) => {
  console.log(`Ledger: ${event.ledger_index}`)
})

router.on('transaction', (event) => {
  if (event.transaction.TransactionType === 'Payment') {
    processPayment(event)
  }
})

router.setupClientHandlers(client)
```

```javascript
class TransactionFilter {
  constructor(options = {}) {
    this.watchAddresses = new Set(options.watchAddresses || [])
    this.transactionTypes = new Set(options.transactionTypes || [])
    this.minAmount = options.minAmount || 0
  }

  matches(tx) {
    // Filter by address
    if (this.watchAddresses.size > 0) {
      const involved = this.getInvolvedAddresses(tx)
      const matches = [...this.watchAddresses].some(addr => involved.has(addr))
      if (!matches) return false
    }

    // Filter by transaction type
    if (this.transactionTypes.size > 0) {
      if (!this.transactionTypes.has(tx.transaction.TransactionType)) {
        return false
      }
    }

    // Filter by amount (for payments)
    if (this.minAmount > 0 && tx.transaction.TransactionType === 'Payment') {
      const amount = this.getPaymentAmount(tx)
      if (amount < this.minAmount) return false
    }

    return true
  }

  getInvolvedAddresses(tx) {
    const addresses = new Set()
    addresses.add(tx.transaction.Account)

    if (tx.transaction.Destination) {
      addresses.add(tx.transaction.Destination)
    }

    // Add affected addresses from meta
    if (tx.meta?.AffectedNodes) {
      for (const node of tx.meta.AffectedNodes) {
        const fields = node.ModifiedNode?.FinalFields ||
                       node.CreatedNode?.NewFields ||
                       node.DeletedNode?.FinalFields
        if (fields?.Account) {
          addresses.add(fields.Account)
        }
      }
    }

    return addresses
  }

  getPaymentAmount(tx) {
    const delivered = tx.meta?.delivered_amount || tx.transaction.Amount
    if (typeof delivered === 'string') {
      return parseInt(delivered)
    }
    return parseFloat(delivered.value) * 1_000_000 // Approximate for IOU
  }
}

// Usage
const filter = new TransactionFilter({
  watchAddresses: ['rMyAddress...'],
  transactionTypes: ['Payment', 'OfferCreate'],
  minAmount: 1_000_000 // 1 XRP minimum
})

client.on('transaction', (tx) => {
  if (filter.matches(tx)) {
    handleRelevantTransaction(tx)
  }
})
```


┌─────────────────────────────────────────────────────────────────┐
│                      XRPL Network                                │
└─────────────────────────────────────────────────────────────────┘
                              │
                              │ WebSocket
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│                    Event Collector                               │
│  • Manages WebSocket connection                                  │
│  • Handles reconnection                                          │
│  • Tracks last seen ledger                                       │
└─────────────────────────────────────────────────────────────────┘
                              │
                              │ Raw Events
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│                    Event Queue (Redis/RabbitMQ)                  │
│  • Buffers events during processing spikes                       │
│  • Enables multiple consumers                                    │
│  • Persistence for crash recovery                                │
└─────────────────────────────────────────────────────────────────┘
                              │
                    ┌─────────┴─────────┐
                    │                   │
                    ▼                   ▼
┌───────────────────────┐    ┌───────────────────────┐
│  Payment Processor    │    │  Analytics Engine     │
│  • Process deposits   │    │  • Update metrics     │
│  • Credit users       │    │  • Track volumes      │
│  • Send notifications │    │  • Detect anomalies   │
└───────────────────────┘    └───────────────────────┘

```javascript
const Redis = require('ioredis')

class EventQueue {
  constructor(redisUrl, queueName = 'xrpl:events') {
    this.redis = new Redis(redisUrl)
    this.queueName = queueName
  }

  async push(event) {
    const serialized = JSON.stringify({
      ...event,
      _enqueued: Date.now()
    })
    await this.redis.rpush(this.queueName, serialized)
  }

  async pop(timeout = 5) {
    const result = await this.redis.blpop(this.queueName, timeout)
    if (!result) return null
    return JSON.parse(result[1])
  }

  async length() {
    return this.redis.llen(this.queueName)
  }
}

// Producer (Event Collector)
const queue = new EventQueue(process.env.REDIS_URL)
const deadLetterQueue = new EventQueue(process.env.REDIS_URL, 'xrpl:events:dead')

client.on('transaction', async (tx) => {
  await queue.push({
    type: 'transaction',
    data: tx,
    ledger: tx.ledger_index
  })
})

// Consumer (Payment Processor)
async function processEvents() {
  while (true) {
    const event = await queue.pop()
    if (event) {
      try {
        await handleEvent(event)
      } catch (error) {
        console.error('Processing error:', error)
        // Push to dead letter queue for manual review
        await deadLetterQueue.push({ event, error: error.message })
      }
    }
  }
}
```

Events can be received multiple times (reconnection replays, network issues):

```javascript
class EventDeduplicator {
  constructor(redis, ttlSeconds = 3600) {
    this.redis = redis
    this.ttl = ttlSeconds
    this.prefix = 'xrpl:seen:'
  }

  async isDuplicate(eventId) {
    const key = this.prefix + eventId
    const exists = await this.redis.exists(key)
    return exists === 1
  }

  async markSeen(eventId) {
    const key = this.prefix + eventId
    await this.redis.setex(key, this.ttl, '1')
  }

  async processIfNew(eventId, handler) {
    if (await this.isDuplicate(eventId)) {
      console.log(`Skipping duplicate: ${eventId}`)
      return false
    }

    await handler()
    await this.markSeen(eventId)
    return true
  }
}

// Usage
const dedup = new EventDeduplicator(redis)

client.on('transaction', async (tx) => {
  const eventId = `tx:${tx.transaction.hash}`

  await dedup.processIfNew(eventId, async () => {
    await processTransaction(tx)
  })
})
```

When events arrive faster than you can process them:

Events arriving:     100/second
Processing capacity:  50/second

Result: Queue grows indefinitely → Memory exhaustion → Crash

```javascript
class BackpressureManager {
  constructor(options = {}) {
    this.maxQueueSize = options.maxQueueSize || 10000
    this.dropPolicy = options.dropPolicy || 'oldest'  // 'oldest', 'newest', 'none'
    this.queue = []
    this.processing = false
    this.stats = {
      received: 0,
      processed: 0,
      dropped: 0
    }
  }

  async enqueue(event, processor) {
    this.stats.received++

    // Check queue capacity
    if (this.queue.length >= this.maxQueueSize) {
      switch (this.dropPolicy) {
        case 'oldest':
          this.queue.shift() // Drop oldest
          this.stats.dropped++
          break
        case 'newest':
          this.stats.dropped++
          return // Don't add new event
        case 'none':
          // Allow queue to grow (dangerous)
          break
      }
    }

    this.queue.push(event)

    // Start processing if not already
    if (!this.processing) {
      this.processQueue(processor)
    }
  }

  async processQueue(processor) {
    this.processing = true

    while (this.queue.length > 0) {
      const event = this.queue.shift()
      try {
        await processor(event)
        this.stats.processed++
      } catch (error) {
        console.error('Processing error:', error)
      }
    }

    this.processing = false
  }

  getStats() {
    return {
      ...this.stats,
      queueLength: this.queue.length,
      dropRate: this.stats.dropped / this.stats.received
    }
  }
}

// Usage
const backpressure = new BackpressureManager({
  maxQueueSize: 5000,
  dropPolicy: 'oldest'
})

client.on('transaction', (tx) => {
  backpressure.enqueue(tx, async (event) => {
    await processTransaction(event)
  })
})

// Monitor
setInterval(() => {
  const stats = backpressure.getStats()
  if (stats.dropRate > 0.01) {
    console.warn('High drop rate:', stats)
  }
}, 60000)
```

```javascript
// Small promise-based delay helper used throughout these examples
const sleep = (ms) => new Promise(resolve => setTimeout(resolve, ms))

class RateLimitedProcessor {
  constructor(maxPerSecond) {
    this.maxPerSecond = maxPerSecond
    this.tokens = maxPerSecond
    this.lastRefill = Date.now()
  }

  async acquire() {
    // Refill tokens based on elapsed time
    const now = Date.now()
    const elapsed = (now - this.lastRefill) / 1000
    this.tokens = Math.min(
      this.maxPerSecond,
      this.tokens + elapsed * this.maxPerSecond
    )
    this.lastRefill = now

    // Wait if no tokens
    while (this.tokens < 1) {
      await sleep(100)
      const elapsed = (Date.now() - this.lastRefill) / 1000
      this.tokens += elapsed * this.maxPerSecond
      this.lastRefill = Date.now()
    }

    this.tokens--
  }

  async process(event, handler) {
    await this.acquire()
    await handler(event)
  }
}
```
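The refill arithmetic above can be hard to follow inside async code. Here is the same token-bucket logic isolated in a synchronous sketch with an injectable clock (the TokenBucket name and fake-clock setup are illustrative), which makes the behaviour deterministic:

```javascript
class TokenBucket {
  constructor(maxPerSecond, now = Date.now) {
    this.maxPerSecond = maxPerSecond
    this.tokens = maxPerSecond  // Bucket starts full
    this.now = now
    this.lastRefill = now()
  }

  // Refill based on elapsed time, then take a token if one is available
  tryAcquire() {
    const now = this.now()
    const elapsed = (now - this.lastRefill) / 1000
    this.tokens = Math.min(this.maxPerSecond, this.tokens + elapsed * this.maxPerSecond)
    this.lastRefill = now
    if (this.tokens < 1) return false
    this.tokens--
    return true
  }
}

// A fake clock makes the refill behaviour deterministic
let t = 0
const bucket = new TokenBucket(2, () => t)  // 2 events/second
console.log(bucket.tryAcquire()) // true  (bucket starts with 2 tokens)
console.log(bucket.tryAcquire()) // true
console.log(bucket.tryAcquire()) // false (empty, no time has passed)
t += 500                         // half a second refills one token
console.log(bucket.tryAcquire()) // true
```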


When your connection drops, you miss events:

Timeline:
Ledger 100 → Received
Ledger 101 → Received
-- Connection drops --
Ledger 102 → MISSED
Ledger 103 → MISSED
-- Reconnected --
Ledger 104 → Received

Gap: Ledgers 102-103 were missed

```javascript
// Small promise-based delay helper
const sleep = (ms) => new Promise(resolve => setTimeout(resolve, ms))

class GapDetector {
  constructor(client) {
    this.client = client
    this.lastLedger = null
    this.gaps = []
  }

  onLedgerClosed(ledger) {
    const currentLedger = ledger.ledger_index

    if (this.lastLedger !== null) {
      const expected = this.lastLedger + 1
      if (currentLedger > expected) {
        const gap = {
          start: expected,
          end: currentLedger - 1,
          detectedAt: Date.now()
        }
        this.gaps.push(gap)
        console.warn(`Gap detected: ledgers ${gap.start}-${gap.end}`)
      }
    }

    this.lastLedger = currentLedger
  }

  async fillGaps(processor) {
    while (this.gaps.length > 0) {
      const gap = this.gaps.shift()
      console.log(`Filling gap: ${gap.start}-${gap.end}`)

      try {
        await this.fillGap(gap, processor)
      } catch (error) {
        console.error('Error filling gap:', error)
        // Re-queue for retry
        this.gaps.push(gap)
        await sleep(5000)
      }
    }
  }

  async fillGap(gap, processor) {
    for (let ledger = gap.start; ledger <= gap.end; ledger++) {
      const ledgerData = await this.client.request({
        command: 'ledger',
        ledger_index: ledger,
        transactions: true,
        expand: true
      })

      // Process each transaction in the ledger
      const transactions = ledgerData.result.ledger.transactions || []
      for (const tx of transactions) {
        await processor({
          type: 'transaction',
          transaction: tx,
          ledger_index: ledger,
          isGapFill: true
        })
      }

      // Rate limit
      await sleep(100)
    }
  }
}

// Usage
const gapDetector = new GapDetector(client)

client.on('ledgerClosed', (ledger) => {
  gapDetector.onLedgerClosed(ledger)
})

// Fill gaps in background
setInterval(() => {
  gapDetector.fillGaps(processTransaction)
}, 10000)
```

```javascript
async function recoverAccountGap(client, account, startLedger, endLedger) {
  let marker
  const transactions = []

  do {
    const response = await client.request({
      command: 'account_tx',
      account: account,
      ledger_index_min: startLedger,
      ledger_index_max: endLedger,
      limit: 400,
      marker: marker
    })

    transactions.push(...response.result.transactions)
    marker = response.result.marker

    await sleep(100) // Rate limit
  } while (marker)

  return transactions
}

// After reconnection
async function handleReconnect(lastSeenLedger, currentLedger) {
  if (currentLedger - lastSeenLedger > 1) {
    console.log(`Recovering gap: ${lastSeenLedger + 1} to ${currentLedger}`)

    // For each watched account
    for (const account of watchedAccounts) {
      const missed = await recoverAccountGap(
        client,
        account,
        lastSeenLedger + 1,
        currentLedger
      )

      for (const tx of missed) {
        await processTransaction({
          ...tx,
          isGapRecovery: true
        })
      }
    }
  }
}
```


```javascript
const xrpl = require('xrpl')

class XRPLEventMonitor {
  constructor(options) {
    this.serverUrl = options.serverUrl
    this.watchAccounts = options.watchAccounts || []
    this.client = null
    this.lastLedger = null
    this.subscriptions = []
    this.eventHandlers = new Map()
    this.deduplicator = new EventDeduplicator(options.redis)
    this.queue = new EventQueue(options.redis)
  }

  on(event, handler) {
    if (!this.eventHandlers.has(event)) {
      this.eventHandlers.set(event, [])
    }
    this.eventHandlers.get(event).push(handler)
  }

  async connect() {
    this.client = new xrpl.Client(this.serverUrl)

    this.client.on('disconnected', () => this.handleDisconnect())
    this.client.on('ledgerClosed', (l) => this.handleLedger(l))
    this.client.on('transaction', (tx) => this.handleTransaction(tx))

    await this.client.connect()
    await this.subscribe()
  }

  async subscribe() {
    const request = {
      command: 'subscribe',
      streams: ['ledger']
    }

    if (this.watchAccounts.length > 0) {
      request.accounts = this.watchAccounts
    }

    await this.client.request(request)
    this.subscriptions = [...this.watchAccounts]
  }

  async handleDisconnect() {
    console.log('Disconnected, will reconnect...')
    // Reconnection handled by client or wrapper
  }

  async handleLedger(ledger) {
    const current = ledger.ledger_index

    // Gap detection
    if (this.lastLedger && current > this.lastLedger + 1) {
      await this.fillGap(this.lastLedger + 1, current - 1)
    }

    this.lastLedger = current
    this.emit('ledger', ledger)
  }

  async handleTransaction(tx) {
    const txHash = tx.transaction.hash

    // Deduplication
    await this.deduplicator.processIfNew(
      `tx:${txHash}`,
      async () => {
        // Queue for processing
        await this.queue.push({
          type: 'transaction',
          data: tx,
          receivedAt: Date.now()
        })

        // Emit to local handlers
        this.emit('transaction', tx)
      }
    )
  }

  async fillGap(start, end) {
    console.log(`Filling gap: ledgers ${start}-${end}`)

    for (const account of this.watchAccounts) {
      const transactions = await recoverAccountGap(
        this.client, account, start, end
      )

      for (const tx of transactions) {
        await this.handleTransaction({
          transaction: tx.tx,
          meta: tx.meta,
          ledger_index: tx.tx.ledger_index,
          isGapFill: true
        })
      }
    }
  }

  emit(event, data) {
    const handlers = this.eventHandlers.get(event) || []
    for (const handler of handlers) {
      try {
        handler(data)
      } catch (error) {
        console.error(`Handler error for ${event}:`, error)
      }
    }
  }

  addWatchAccount(account) {
    if (!this.watchAccounts.includes(account)) {
      this.watchAccounts.push(account)
      if (this.client?.isConnected()) {
        this.client.request({
          command: 'subscribe',
          accounts: [account]
        })
      }
    }
  }

  async disconnect() {
    if (this.client) {
      await this.client.disconnect()
    }
  }
}

// Usage
const monitor = new XRPLEventMonitor({
  serverUrl: 'wss://s1.ripple.com:51233',
  watchAccounts: ['rMyAddress...'],
  redis: redisClient
})

monitor.on('ledger', (ledger) => {
  console.log(`Ledger ${ledger.ledger_index}`)
})

monitor.on('transaction', (tx) => {
  if (tx.transaction.TransactionType === 'Payment') {
    processPayment(tx)
  }
})

await monitor.connect()
```


Subscriptions beat polling: Faster detection, lower resource usage

Deduplication is necessary: Events can repeat; idempotent processing prevents issues

Gaps happen: Disconnections are normal; gap recovery is essential

Queuing helps: Decouples event collection from processing, handles spikes

⚠️ Optimal queue configuration: Size limits and drop policies depend on your use case

⚠️ Gap recovery completeness: Very long gaps may be impractical to recover

⚠️ Event ordering guarantees: Events may arrive out of order; handle appropriately

🔴 Assuming perfect delivery: Events can be missed; always track last seen state

🔴 Processing without deduplication: Double-processing can cause double-credits

🔴 Unbounded queues: Memory exhaustion when events exceed processing capacity

🔴 Ignoring backpressure: Slow processing eventually causes data loss or crashes

Event-driven architecture is more complex than polling but essential for real-time systems. The complexity is manageable with proper patterns: deduplication, gap detection, backpressure handling. Don't build a payment monitor without these patterns—you'll discover their necessity through production incidents.
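As a minimal illustration of how these patterns fit together, here is an in-memory sketch (class and method names are illustrative; a production system would use Redis as shown earlier) combining a Set for deduplication, last-seen ledger tracking for gap detection, and a bounded buffer for backpressure:

```javascript
class MiniMonitor {
  constructor(maxQueue = 1000) {
    this.seen = new Set()       // deduplication
    this.lastLedger = null      // gap detection
    this.queue = []             // bounded buffer
    this.maxQueue = maxQueue
    this.gaps = []
  }

  // Record a closed ledger; note any missed range as a gap
  onLedger(index) {
    if (this.lastLedger !== null && index > this.lastLedger + 1) {
      this.gaps.push({ start: this.lastLedger + 1, end: index - 1 })
    }
    this.lastLedger = index
  }

  // Accept a transaction event exactly once, dropping oldest on overflow
  onTransaction(hash, event) {
    if (this.seen.has(hash)) return false  // duplicate, skip
    this.seen.add(hash)
    if (this.queue.length >= this.maxQueue) this.queue.shift()  // drop oldest
    this.queue.push(event)
    return true
  }
}

const m = new MiniMonitor()
m.onLedger(100)
m.onLedger(103)                         // missed 101-102
console.log(m.gaps)                     // one gap: { start: 101, end: 102 }
console.log(m.onTransaction('ABC', {})) // true: first delivery
console.log(m.onTransaction('ABC', {})) // false: replay suppressed
```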


Assignment: Build a production-ready account monitoring system using subscriptions.

Requirements:

Part 1: Core Monitoring (40%)

  • WebSocket subscription to account transactions
  • Event handling with proper parsing
  • Ledger tracking for gap detection
  • Payment-specific processing with partial payment protection

Part 2: Reliability Features (40%)

  • Event deduplication (using Set, Map, or Redis)
  • Gap detection on ledger close
  • Gap recovery by querying missed ledger range
  • Reconnection handling with subscription restoration

Part 3: Webhook Delivery (20%)

  • Extract payment details (amount, sender, destination tag)
  • Deliver to configured webhook URL
  • Handle delivery failures with retry

Grading:

  • Correct subscription handling: 25%
  • Deduplication implementation: 25%
  • Gap detection and recovery: 25%
  • Webhook delivery with retry: 15%
  • Code quality: 10%

Time Investment: 3-4 hours

Submission: Code with documentation and test results


1. Deduplication Purpose (Tests Understanding):

Why must event processing be idempotent (handle the same event multiple times safely)?

A) To improve performance
B) Events can be delivered more than once due to reconnections or network issues
C) The XRPL protocol requires it
D) To reduce memory usage

Correct Answer: B

Explanation: When connections drop and reconnect, you may receive some events again. Network issues can also cause retransmission. Without deduplication, you might credit a payment twice or process the same transaction multiple times.


2. Gap Detection (Tests Knowledge):

You receive ledger 1000, then ledger 1005. What should your system do?

A) Nothing—ledgers aren't guaranteed to arrive in order
B) Detect a gap (1001-1004) and query for missed transactions
C) Restart the connection to fix the issue
D) Ignore it—gaps are normal and don't matter

Correct Answer: B

Explanation: A gap from 1000 to 1005 means you missed ledgers 1001-1004 (likely due to a brief disconnection). For any monitored accounts, you should query account_tx for transactions in those ledgers to ensure nothing was missed.


3. Backpressure Strategy (Tests Critical Thinking):

Your system receives 1000 transactions/second but can only process 500/second. What happens without backpressure handling?

A) The excess transactions are automatically saved
B) The queue grows indefinitely until memory is exhausted
C) The server stops sending transactions
D) Processing automatically speeds up

Correct Answer: B

Explanation: Without backpressure handling, events accumulate faster than they're processed. The queue grows without bound, eventually exhausting memory and crashing. Proper backpressure handling includes queue size limits and drop policies.


4. Subscription Restoration (Tests Application):

After reconnecting to XRPL, what must you do before receiving events again?

A) Nothing—subscriptions persist across reconnections
B) Re-send the subscribe command with your accounts/streams
C) Wait for the server to re-establish subscriptions
D) Clear your event queue

Correct Answer: B

Explanation: Subscriptions are per-connection. When a WebSocket connection drops and reconnects, it's a new connection with no subscriptions. You must re-send subscribe commands for all streams and accounts you want to monitor.


5. Account Subscription (Tests Comprehension):

You subscribe to account "rExchange...". Which events will you receive?

A) Only payments to that account
B) Only transactions originated by that account
C) All transactions involving that account in any way
D) All transactions on the entire network

Correct Answer: C

Explanation: Account subscriptions deliver all transactions where the account is involved—as sender, receiver, or affected party (e.g., offers being filled, trust line changes). This includes payments sent, payments received, offers created, and any transaction that modifies the account's state.


  • Event-Driven Architecture patterns
  • Message queuing (Redis, RabbitMQ)
  • Backpressure handling strategies

For Next Lesson:
Lesson 9 covers DEX integration—querying order books, creating offers, and building trading features using the XRPL's built-in decentralized exchange.


End of Lesson 8

Total words: ~4,800
Estimated completion time: 50 minutes reading + 3-4 hours for deliverable

Key Takeaways

1. Subscribe instead of poll: Subscriptions provide real-time updates without wasted queries. Use them for any monitoring or real-time feature.

2. Always deduplicate events: Events can be delivered multiple times (reconnection, network issues). Process each unique event exactly once.

3. Track ledger sequence for gap detection: Monitor for gaps in ledger sequence; recover missed transactions after reconnection.

4. Handle backpressure: Events can arrive faster than processing. Queue with limits and appropriate drop policies.

5. Design for disconnection: Connections will drop. Your system should reconnect, restore subscriptions, and fill gaps automatically.