Analytics and Data Extraction | XRPL APIs & Integration | XRP Academy
3 free lessons remaining this month

Free preview access resets monthly

Upgrade for Unlimited
Skip to main content
Intermediate · 55 min

Analytics and Data Extraction

Learning Objectives

Extract historical transaction data efficiently using pagination

Optimize queries using Clio servers for reporting workloads

Calculate network statistics (volume, active accounts, transaction counts)

Design data pipelines that process ledger data at scale

Build real-time analytics dashboards with appropriate refresh strategies

/**
 * Fetch an account's full transaction history via `account_tx`,
 * following pagination markers until the server stops returning one
 * (or `maxPages` is reached).
 *
 * @param {object} client - connected xrpl.js client (only `request` is used)
 * @param {string} account - classic address to query
 * @param {object} [options]
 * @param {number} [options.minLedger=-1] - earliest ledger (-1 = oldest available)
 * @param {number} [options.maxLedger=-1] - latest ledger (-1 = newest validated)
 * @param {number} [options.limit=400] - per-page transaction limit
 * @param {number} [options.maxPages=1000] - safety cap on pages fetched
 * @param {Function|null} [options.onPage] - async callback (pageTxs, pageCount, totalSoFar)
 * @returns {Promise<{transactions: Array, complete: boolean, pageCount: number}>}
 *          `complete` is false when `maxPages` stopped the walk early.
 */
async function getAllAccountTransactions(client, account, options = {}) {
  const {
    minLedger = -1,
    maxLedger = -1,
    limit = 400,
    maxPages = 1000,
    onPage = null
  } = options

  const transactions = []
  let marker
  let pageCount = 0

  while (true) {
    const response = await client.request({
      command: 'account_tx',
      account: account,
      ledger_index_min: minLedger,
      ledger_index_max: maxLedger,
      limit: limit,
      marker: marker,
      forward: true // Oldest first for consistent processing
    })

    const pageTxs = response.result.transactions
    transactions.push(...pageTxs)
    marker = response.result.marker
    pageCount += 1

    if (onPage) {
      await onPage(pageTxs, pageCount, transactions.length)
    }

    // Rate limiting
    await sleep(100)

    // Stop once the server reports no further pages, or the cap is hit.
    if (!marker || pageCount >= maxPages) break
  }

  return {
    transactions,
    complete: !marker,
    pageCount
  }
}
```

/**
 * Walk a contiguous, inclusive range of ledgers, handing each ledger's
 * transactions to `processor`. A failure on one ledger is recorded in
 * `stats.errors` and does not abort the run.
 *
 * Fixes: the progress log used interpolation without a template literal
 * (a syntax error), and fired when zero ledgers had been processed.
 *
 * @param {object} client - connected xrpl.js client (only `request` is used)
 * @param {number} startLedger - first ledger index (inclusive)
 * @param {number} endLedger - last ledger index (inclusive)
 * @param {Function} processor - async callback receiving
 *        {ledgerIndex, closeTime, transactions, txCount}
 * @returns {Promise<{ledgersProcessed: number, transactionsProcessed: number, errors: Array<{ledger: number, error: string}>}>}
 */
async function extractLedgerRange(client, startLedger, endLedger, processor) {
  const stats = {
    ledgersProcessed: 0,
    transactionsProcessed: 0,
    errors: []
  }

  for (let ledger = startLedger; ledger <= endLedger; ledger++) {
    try {
      const response = await client.request({
        command: 'ledger',
        ledger_index: ledger,
        transactions: true,
        expand: true
      })

      const ledgerData = response.result.ledger
      const transactions = ledgerData.transactions || []

      await processor({
        ledgerIndex: ledger,
        closeTime: ledgerData.close_time,
        transactions: transactions,
        txCount: transactions.length
      })

      stats.ledgersProcessed++
      stats.transactionsProcessed += transactions.length

      // Rate limiting - adjust based on server capacity
      await sleep(50)

    } catch (error) {
      stats.errors.push({ ledger, error: error.message })
    }

    // Progress logging — skip the zero case so error-only runs stay quiet
    if (stats.ledgersProcessed > 0 && stats.ledgersProcessed % 100 === 0) {
      console.log(`Processed ${stats.ledgersProcessed} ledgers, ${stats.transactionsProcessed} txs`)
    }
  }

  return stats
}
```


CLIO VS RIPPLED FOR ANALYTICS:

Clio:
✓ Optimized for historical queries
✓ Faster pagination
✓ Lower resource impact
✓ Better for reporting workloads

rippled:
✓ Real-time data
✓ Transaction submission
✓ Admin methods
✓ Subscription streams
/**
 * Routes XRPL queries to the appropriate backend: Clio for historical /
 * reporting workloads, rippled for real-time ledger state.
 */
class AnalyticsClient {
  constructor(clioUrl, rippledUrl) {
    this.clioClient = new xrpl.Client(clioUrl)      // For historical
    this.rippledClient = new xrpl.Client(rippledUrl) // For real-time
  }

  // Open both WebSocket connections concurrently.
  async connect() {
    const pending = [
      this.clioClient.connect(),
      this.rippledClient.connect()
    ]
    await Promise.all(pending)
  }

  // Historical queries → Clio
  async getAccountHistory(account, options) {
    return getAllAccountTransactions(this.clioClient, account, options)
  }

  // Real-time queries → rippled
  async getCurrentBalance(account) {
    const request = {
      command: 'account_info',
      account: account,
      ledger_index: 'validated'
    }
    return this.rippledClient.request(request)
  }
}
```


/**
 * Computes aggregate network statistics from validated ledger data.
 */
class NetworkStatsCalculator {
  constructor(client) {
    this.client = client
  }

  /**
   * Summarize one ledger: transaction counts by type, total fees (drops),
   * successful native-XRP payment volume (drops), and the number of unique
   * accounts touched (senders plus destinations).
   */
  async getLedgerStats(ledgerIndex) {
    const response = await this.client.request({
      command: 'ledger',
      ledger_index: ledgerIndex,
      transactions: true,
      expand: true
    })

    const ledger = response.result.ledger
    const txList = ledger.transactions || []

    const byType = {}
    const seenAccounts = new Set()
    let feeTotal = 0
    let volume = 0

    for (const tx of txList) {
      // Count by type
      const kind = tx.TransactionType
      byType[kind] = (byType[kind] || 0) + 1

      // Sum fees
      feeTotal += parseInt(tx.Fee || 0)

      // Track accounts
      seenAccounts.add(tx.Account)
      if (tx.Destination) {
        seenAccounts.add(tx.Destination)
      }

      // Native-XRP payments carry Amount as a drops string; count the
      // delivered amount of successful transactions only.
      if (kind === 'Payment' && typeof tx.Amount === 'string') {
        const meta = tx.metaData || tx.meta
        if (meta?.TransactionResult === 'tesSUCCESS') {
          const delivered = meta.delivered_amount || tx.Amount
          if (typeof delivered === 'string') {
            volume += parseInt(delivered)
          }
        }
      }
    }

    // The Set itself is not returned — only its size.
    return {
      ledgerIndex,
      closeTime: ledger.close_time,
      txCount: txList.length,
      byType,
      xrpVolume: volume,
      fees: feeTotal,
      uniqueAccounts: seenAccounts.size
    }
  }

  /**
   * Aggregate per-ledger stats across an inclusive ledger range, with a
   * small delay between fetches to respect server capacity.
   */
  async getDailyStats(startLedger, endLedger) {
    const totals = {
      totalTx: 0,
      totalFees: 0,
      totalXrpVolume: 0,
      byType: {},
      ledgers: 0
    }

    for (let idx = startLedger; idx <= endLedger; idx++) {
      const ledgerStats = await this.getLedgerStats(idx)

      totals.totalTx += ledgerStats.txCount
      totals.totalFees += ledgerStats.fees
      totals.totalXrpVolume += ledgerStats.xrpVolume
      totals.ledgers++

      Object.entries(ledgerStats.byType).forEach(([kind, count]) => {
        totals.byType[kind] = (totals.byType[kind] || 0) + count
      })

      // Rate limiting between ledger fetches
      await sleep(50)
    }

    return totals
  }
}
```

/**
 * Summarize an account's recent activity: counts by type, sent/received
 * split, daily buckets, and native-XRP volumes, limited to the last
 * `days` days.
 *
 * Fix: `cutoff` was computed from `days` but never applied, so the
 * analysis silently covered the entire history. Transactions older than
 * the window are now skipped; all counters (including
 * `totalTransactions`) cover only the window.
 *
 * @param {object} client - connected xrpl.js client
 * @param {string} account - classic address to analyze
 * @param {number} [days=30] - look-back window in days
 * @returns {Promise<object>} aggregate counters for the window
 */
async function analyzeAccountActivity(client, account, days = 30) {
  const now = Date.now()
  const cutoff = now - (days * 24 * 60 * 60 * 1000)

  // Ripple epoch (2000-01-01T00:00Z) → Unix epoch offset, milliseconds.
  const RIPPLE_EPOCH_MS = 946684800000

  const { transactions } = await getAllAccountTransactions(client, account)

  const analysis = {
    totalTransactions: 0,
    sent: 0,
    received: 0,
    byType: {},
    dailyActivity: {},
    totalXrpSent: 0,
    totalXrpReceived: 0
  }

  for (const tx of transactions) {
    // tx.date is seconds since the Ripple epoch.
    const txTimeMs = tx.tx.date * 1000 + RIPPLE_EPOCH_MS

    // Enforce the look-back window (previously computed but ignored).
    if (txTimeMs < cutoff) continue

    analysis.totalTransactions++

    const type = tx.tx.TransactionType
    analysis.byType[type] = (analysis.byType[type] || 0) + 1

    if (tx.tx.Account === account) {
      analysis.sent++
    } else {
      analysis.received++
    }

    // Daily bucketing (UTC date string, YYYY-MM-DD)
    const date = new Date(txTimeMs).toISOString().split('T')[0]
    analysis.dailyActivity[date] = (analysis.dailyActivity[date] || 0) + 1

    // XRP volumes (drops → XRP); only native-XRP payments have string Amounts
    if (type === 'Payment' && typeof tx.tx.Amount === 'string') {
      const amount = parseInt(tx.tx.Amount) / 1_000_000
      if (tx.tx.Account === account) {
        analysis.totalXrpSent += amount
      }
      if (tx.tx.Destination === account) {
        analysis.totalXrpReceived += amount
      }
    }
  }

  return analysis
}
```


┌─────────────────────────────────────────────────────────────────┐
│                    XRPL Data Pipeline                            │
└─────────────────────────────────────────────────────────────────┘
  1. EXTRACTION
  2. TRANSFORMATION
  3. LOADING
  4. SERVING
/**
 * Incremental ETL pipeline: extracts validated ledgers from the XRPL,
 * transforms them into flat records, and loads them into `db`. Tracks the
 * last processed ledger so interrupted runs resume where they left off.
 *
 * Fixes: two console calls used interpolation without template literals
 * (syntax errors); `parseInt(tx.Fee)` produced NaN when Fee was absent.
 */
class XRPLDataPipeline {
  constructor(client, db) {
    this.client = client
    this.db = db
    this.lastProcessedLedger = null
  }

  // Restore the resume point from persistent storage.
  async initialize() {
    this.lastProcessedLedger = await this.db.getLastProcessedLedger()
    console.log(`Resuming from ledger ${this.lastProcessedLedger}`)
  }

  /**
   * Process every ledger from the resume point up to the server's current
   * validated ledger. Individual ledger failures are logged and skipped.
   */
  async processNewLedgers() {
    const serverInfo = await this.client.request({ command: 'server_info' })
    const currentLedger = serverInfo.result.info.validated_ledger.seq

    const startLedger = this.lastProcessedLedger
      ? this.lastProcessedLedger + 1
      : currentLedger - 1000 // Start 1000 ledgers back if fresh

    for (let ledger = startLedger; ledger <= currentLedger; ledger++) {
      try {
        // Extract
        const data = await this.extractLedger(ledger)

        // Transform
        const transformed = this.transformLedger(data)

        // Load
        await this.loadLedger(transformed)

        // NOTE(review): the resume point is only advanced in memory here —
        // presumably it should also be persisted via `db`; confirm against
        // getLastProcessedLedger's backing store.
        this.lastProcessedLedger = ledger

        await sleep(50) // Rate limiting
      } catch (error) {
        console.error(`Error processing ledger ${ledger}:`, error)
        // Continue with next ledger
      }
    }
  }

  // Fetch one ledger with fully-expanded transactions.
  async extractLedger(ledgerIndex) {
    const response = await this.client.request({
      command: 'ledger',
      ledger_index: ledgerIndex,
      transactions: true,
      expand: true
    })
    return response.result.ledger
  }

  /**
   * Flatten a raw ledger into a summary object plus one record per
   * transaction. Ripple close_time is seconds since 2000-01-01, hence the
   * 946684800-second offset to reach the Unix epoch.
   */
  transformLedger(ledger) {
    // close_time is identical for every tx in the ledger — compute once.
    const closeTime = new Date((ledger.close_time + 946684800) * 1000)

    const transactions = (ledger.transactions || []).map(tx => ({
      hash: tx.hash,
      ledger: parseInt(ledger.ledger_index),
      type: tx.TransactionType,
      account: tx.Account,
      destination: tx.Destination,
      amount: this.parseAmount(tx.Amount),
      fee: parseInt(tx.Fee || 0), // Fee may be absent (e.g. pseudo-transactions); avoid NaN
      result: tx.metaData?.TransactionResult || tx.meta?.TransactionResult,
      timestamp: closeTime
    }))

    return {
      ledgerIndex: parseInt(ledger.ledger_index),
      closeTime,
      txCount: transactions.length,
      transactions
    }
  }

  // Normalize an Amount field: drops string → XRP number; object → issued currency.
  parseAmount(amount) {
    if (!amount) return null
    if (typeof amount === 'string') {
      return { currency: 'XRP', value: parseInt(amount) / 1_000_000 }
    }
    return { currency: amount.currency, value: parseFloat(amount.value), issuer: amount.issuer }
  }

  // Persist the ledger summary, then batch-insert its transactions.
  async loadLedger(data) {
    // Insert ledger summary
    await this.db.insertLedger({
      index: data.ledgerIndex,
      close_time: data.closeTime,
      tx_count: data.txCount
    })

    // Batch insert transactions
    if (data.transactions.length > 0) {
      await this.db.insertTransactions(data.transactions)
    }
  }
}
```


/**
 * Read-side service backing dashboard endpoints. Serves aggregate queries
 * from `db`, with short-lived caching for the hottest query.
 *
 * Fixes: the SQL was not wrapped in (template) string literals — syntax
 * errors as written — and `hours`/`bucket` were interpolated into SQL
 * unsanitized (injection risk). Both are now validated before use.
 */
class DashboardService {
  constructor(db, cache) {
    this.db = db
    this.cache = cache
  }

  // Coerce a caller-supplied hour count to a safe positive integer before
  // it is interpolated into SQL. Throws on anything else.
  #safeHours(hours) {
    const h = Number(hours)
    if (!Number.isInteger(h) || h <= 0) {
      throw new Error(`Invalid hours value: ${hours}`)
    }
    return h
  }

  /**
   * Network-wide 24-hour summary (tx count, active accounts, payments,
   * total fees). Cached for 60 seconds.
   */
  async getNetworkOverview() {
    const cacheKey = 'network_overview'
    let data = await this.cache.get(cacheKey)

    if (!data) {
      data = await this.db.query(`
        SELECT COUNT(*) as total_tx,
               COUNT(DISTINCT account) as active_accounts,
               SUM(CASE WHEN type = 'Payment' THEN 1 ELSE 0 END) as payments,
               SUM(fee) as total_fees
        FROM transactions
        WHERE timestamp > NOW() - INTERVAL '24 hours'
      `)

      await this.cache.set(cacheKey, data, 60) // Cache 1 minute
    }

    return data
  }

  // Transaction counts grouped by type over the last `hours` hours.
  async getTxTypeDistribution(hours = 24) {
    const h = this.#safeHours(hours)
    return this.db.query(`
      SELECT type, COUNT(*) as count
      FROM transactions
      WHERE timestamp > NOW() - INTERVAL '${h} hours'
      GROUP BY type
      ORDER BY count DESC
    `)
  }

  // Time-bucketed tx counts and XRP volume (uses TimescaleDB time_bucket;
  // amount is stored as JSON with ->> extraction).
  async getVolumeTimeSeries(hours = 24, bucket = '1 hour') {
    const h = this.#safeHours(hours)
    // `bucket` is interpolated into SQL — accept only known-safe intervals.
    const allowedBuckets = new Set(['1 minute', '5 minutes', '15 minutes', '1 hour', '1 day'])
    if (!allowedBuckets.has(bucket)) {
      throw new Error(`Invalid bucket interval: ${bucket}`)
    }
    return this.db.query(`
      SELECT time_bucket('${bucket}', timestamp) as bucket,
             COUNT(*) as tx_count,
             SUM(CASE WHEN amount->>'currency' = 'XRP'
                 THEN (amount->>'value')::numeric ELSE 0 END) as xrp_volume
      FROM transactions
      WHERE timestamp > NOW() - INTERVAL '${h} hours'
      GROUP BY bucket
      ORDER BY bucket
    `)
  }
}
```



  • Extracts ledger data with proper pagination
  • Calculates daily transaction volumes by type
  • Tracks active account counts
  • Provides real-time dashboard endpoints

Time Investment: 3-4 hours


End of Lesson 11

Key Takeaways

1

Use Clio for historical queries:

Optimized for reporting workloads; save rippled for real-time.

2

Implement pagination correctly:

Always handle markers; never assume single-page results.

3

Rate limit extraction:

Respect server capacity; add delays between requests.

4

Design for incremental processing:

Track last processed ledger; resume from gaps.

5

Cache dashboard queries:

Pre-compute aggregates; refresh on reasonable intervals. ---