Analytics and Data Extraction
Learning Objectives
Extract historical transaction data efficiently using pagination
Optimize queries using Clio servers for reporting workloads
Calculate network statistics (volume, active accounts, transaction counts)
Design data pipelines that process ledger data at scale
Build real-time analytics dashboards with appropriate refresh strategies
/**
 * Fetch the complete account_tx history for an account by following
 * pagination markers until the server stops returning one.
 *
 * @param {object} client - connected xrpl client (Clio or rippled)
 * @param {string} account - account address to query
 * @param {object} [options]
 * @param {number} [options.minLedger=-1] - lower ledger bound (-1 = earliest available)
 * @param {number} [options.maxLedger=-1] - upper ledger bound (-1 = latest validated)
 * @param {number} [options.limit=400] - page size requested per call
 * @param {number} [options.maxPages=1000] - safety cap on pages fetched
 * @param {Function} [options.onPage] - async callback (pageTxs, pageNumber, totalSoFar)
 * @returns {Promise<{transactions: Array, complete: boolean, pageCount: number}>}
 *          `complete` is false when maxPages was hit with a marker still pending.
 */
async function getAllAccountTransactions(client, account, options = {}) {
  const {
    minLedger = -1,
    maxLedger = -1,
    limit = 400,
    maxPages = 1000,
    onPage = null
  } = options

  const collected = []
  let cursor = undefined
  let pagesFetched = 0

  while (true) {
    const { result } = await client.request({
      command: 'account_tx',
      account,
      ledger_index_min: minLedger,
      ledger_index_max: maxLedger,
      limit,
      marker: cursor,
      forward: true // oldest first for consistent processing
    })

    const pageTxs = result.transactions
    collected.push(...pageTxs)
    cursor = result.marker
    pagesFetched += 1

    if (onPage) {
      await onPage(pageTxs, pagesFetched, collected.length)
    }

    // Rate limiting between pages
    await sleep(100)

    // Stop when the server returns no marker, or the safety cap is reached
    if (!cursor || pagesFetched >= maxPages) break
  }

  return {
    transactions: collected,
    complete: !cursor,
    pageCount: pagesFetched
  }
}
```
/**
 * Walk an inclusive ledger range, fetching each ledger with expanded
 * transactions and handing it to `processor`. Per-ledger failures are
 * recorded in `stats.errors` and do not abort the run.
 *
 * BUG FIX: the progress log was a bare expression missing template-literal
 * backticks (a syntax error), and it sat outside the try/catch so it fired
 * when ledgersProcessed was still 0 and repeatedly after errors. It now
 * runs only after a ledger is successfully processed.
 *
 * @param {object} client - connected xrpl client
 * @param {number} startLedger - first ledger index (inclusive)
 * @param {number} endLedger - last ledger index (inclusive)
 * @param {Function} processor - async callback ({ledgerIndex, closeTime, transactions, txCount})
 * @returns {Promise<{ledgersProcessed: number, transactionsProcessed: number, errors: Array}>}
 */
async function extractLedgerRange(client, startLedger, endLedger, processor) {
  const stats = {
    ledgersProcessed: 0,
    transactionsProcessed: 0,
    errors: []
  }
  for (let ledger = startLedger; ledger <= endLedger; ledger++) {
    try {
      const response = await client.request({
        command: 'ledger',
        ledger_index: ledger,
        transactions: true,
        expand: true
      })
      const ledgerData = response.result.ledger
      const transactions = ledgerData.transactions || []
      await processor({
        ledgerIndex: ledger,
        closeTime: ledgerData.close_time,
        transactions: transactions,
        txCount: transactions.length
      })
      stats.ledgersProcessed++
      stats.transactionsProcessed += transactions.length
      // Progress logging every 100 successfully processed ledgers
      if (stats.ledgersProcessed % 100 === 0) {
        console.log(`Processed ${stats.ledgersProcessed} ledgers, ${stats.transactionsProcessed} txs`)
      }
      // Rate limiting - adjust based on server capacity
      await sleep(50)
    } catch (error) {
      stats.errors.push({ ledger, error: error.message })
    }
  }
  return stats
}
```
CLIO VS RIPPLED FOR ANALYTICS:
Clio:
✓ Optimized for historical queries
✓ Faster pagination
✓ Lower resource impact
✓ Better for reporting workloads
rippled:
✓ Real-time data
✓ Transaction submission
✓ Admin methods
✓ Subscription streams
/**
 * Dual-connection client that routes historical reads to a Clio server
 * and real-time reads to a rippled server.
 */
class AnalyticsClient {
  constructor(clioUrl, rippledUrl) {
    this.clioClient = new xrpl.Client(clioUrl) // historical / reporting queries
    this.rippledClient = new xrpl.Client(rippledUrl) // real-time queries
  }

  /** Open both connections concurrently. */
  async connect() {
    const connections = [this.clioClient.connect(), this.rippledClient.connect()]
    await Promise.all(connections)
  }

  /** Historical queries → Clio. Delegates pagination to the shared helper. */
  async getAccountHistory(account, options) {
    return getAllAccountTransactions(this.clioClient, account, options)
  }

  /** Real-time queries → rippled. Latest validated account_info. */
  async getCurrentBalance(account) {
    const query = {
      command: 'account_info',
      account: account,
      ledger_index: 'validated'
    }
    return this.rippledClient.request(query)
  }
}
```
/**
 * Computes per-ledger and multi-ledger network statistics:
 * transaction counts by type, fees, successful XRP payment volume
 * (in drops), and unique account counts.
 */
class NetworkStatsCalculator {
  constructor(client) {
    this.client = client
  }

  /**
   * Summarize a single ledger.
   * @param {number|string} ledgerIndex - ledger to fetch
   * @returns {Promise<object>} stats with byType map, fees, xrpVolume, uniqueAccounts
   */
  async getLedgerStats(ledgerIndex) {
    const response = await this.client.request({
      command: 'ledger',
      ledger_index: ledgerIndex,
      transactions: true,
      expand: true
    })
    const ledger = response.result.ledger
    const txList = ledger.transactions || []

    const seenAccounts = new Set()
    const stats = {
      ledgerIndex,
      closeTime: ledger.close_time,
      txCount: txList.length,
      byType: {},
      xrpVolume: 0,
      fees: 0
    }

    for (const tx of txList) {
      // Tally by transaction type
      const kind = tx.TransactionType
      stats.byType[kind] = (stats.byType[kind] || 0) + 1

      // Accumulate fees (drops)
      stats.fees += parseInt(tx.Fee || 0)

      // Track every account touched (sender + destination when present)
      seenAccounts.add(tx.Account)
      if (tx.Destination) seenAccounts.add(tx.Destination)

      // XRP volume: successful native-XRP payments only (string Amount)
      if (kind === 'Payment' && typeof tx.Amount === 'string') {
        const meta = tx.metaData || tx.meta
        if (meta?.TransactionResult === 'tesSUCCESS') {
          const delivered = meta.delivered_amount || tx.Amount
          if (typeof delivered === 'string') {
            stats.xrpVolume += parseInt(delivered)
          }
        }
      }
    }

    // Expose only the count; the Set itself is not returned
    stats.uniqueAccounts = seenAccounts.size
    return stats
  }

  /**
   * Aggregate getLedgerStats over an inclusive ledger range.
   * Rate-limited with a 50ms pause between ledgers.
   */
  async getDailyStats(startLedger, endLedger) {
    const totals = {
      totalTx: 0,
      totalFees: 0,
      totalXrpVolume: 0,
      byType: {},
      ledgers: 0
    }
    for (let idx = startLedger; idx <= endLedger; idx++) {
      const ledgerStats = await this.getLedgerStats(idx)
      totals.totalTx += ledgerStats.txCount
      totals.totalFees += ledgerStats.fees
      totals.totalXrpVolume += ledgerStats.xrpVolume
      totals.ledgers += 1
      Object.entries(ledgerStats.byType).forEach(([type, count]) => {
        totals.byType[type] = (totals.byType[type] || 0) + count
      })
      await sleep(50)
    }
    return totals
  }
}
```
/**
 * Summarize an account's activity over a trailing window of `days`.
 *
 * BUG FIX: `days` was previously computed into `cutoff` but never applied,
 * so the "30-day" analysis silently covered the account's entire history.
 * Transactions are now filtered to the window before aggregation.
 *
 * @param {object} client - xrpl client (passed through to pagination helper)
 * @param {string} account - account address being analyzed
 * @param {number} [days=30] - trailing window size in days
 * @returns {Promise<object>} counts, per-type breakdown, daily buckets, XRP totals
 */
async function analyzeAccountActivity(client, account, days = 30) {
  const RIPPLE_EPOCH_MS = 946684800000 // 2000-01-01T00:00:00Z in Unix ms
  const cutoff = Date.now() - days * 24 * 60 * 60 * 1000
  const { transactions } = await getAllAccountTransactions(client, account)

  // Keep only transactions inside the window. Entries without a close date
  // are retained rather than guessed at. (Ripple time is seconds since the
  // Ripple epoch, hence the offset.)
  const inWindow = transactions.filter(entry => {
    const rippleDate = entry.tx?.date
    return rippleDate == null || rippleDate * 1000 + RIPPLE_EPOCH_MS >= cutoff
  })

  const analysis = {
    totalTransactions: inWindow.length,
    sent: 0,
    received: 0,
    byType: {},
    dailyActivity: {},
    totalXrpSent: 0,
    totalXrpReceived: 0
  }

  for (const entry of inWindow) {
    const tx = entry.tx
    const type = tx.TransactionType
    analysis.byType[type] = (analysis.byType[type] || 0) + 1

    if (tx.Account === account) {
      analysis.sent++
    } else {
      analysis.received++
    }

    // Daily bucketing: Ripple epoch → Unix epoch → YYYY-MM-DD
    const date = new Date(tx.date * 1000 + RIPPLE_EPOCH_MS)
      .toISOString().split('T')[0]
    analysis.dailyActivity[date] = (analysis.dailyActivity[date] || 0) + 1

    // XRP volumes: a string Amount means native XRP, denominated in drops
    if (type === 'Payment' && typeof tx.Amount === 'string') {
      const amount = parseInt(tx.Amount, 10) / 1_000_000
      if (tx.Account === account) {
        analysis.totalXrpSent += amount
      }
      if (tx.Destination === account) {
        analysis.totalXrpReceived += amount
      }
    }
  }

  return analysis
}
```
┌─────────────────────────────────────────────────────────────────┐
│ XRPL Data Pipeline │
└─────────────────────────────────────────────────────────────────┘
- EXTRACTION
- TRANSFORMATION
- LOADING
- SERVING
/**
 * Incremental extract → transform → load pipeline for XRPL ledger data.
 * Persists the last processed ledger index in the DB so restarts resume
 * from the checkpoint rather than reprocessing history.
 *
 * BUG FIXES: two log statements were missing template-literal backticks
 * (syntax errors); a missing Fee produced NaN via parseInt(undefined).
 */
class XRPLDataPipeline {
  constructor(client, db) {
    this.client = client
    this.db = db
    this.lastProcessedLedger = null
  }

  /** Load the resume checkpoint from persistent storage. */
  async initialize() {
    this.lastProcessedLedger = await this.db.getLastProcessedLedger()
    console.log(`Resuming from ledger ${this.lastProcessedLedger}`)
  }

  /**
   * Process all validated ledgers newer than the checkpoint.
   * Fresh installs backfill the most recent 1000 ledgers.
   * Per-ledger errors are logged and skipped, not fatal.
   */
  async processNewLedgers() {
    const serverInfo = await this.client.request({ command: 'server_info' })
    const currentLedger = serverInfo.result.info.validated_ledger.seq
    const startLedger = this.lastProcessedLedger
      ? this.lastProcessedLedger + 1
      : currentLedger - 1000 // Start 1000 ledgers back if fresh
    for (let ledger = startLedger; ledger <= currentLedger; ledger++) {
      try {
        const data = await this.extractLedger(ledger) // Extract
        const transformed = this.transformLedger(data) // Transform
        await this.loadLedger(transformed) // Load
        this.lastProcessedLedger = ledger
        await sleep(50) // rate limit against server capacity
      } catch (error) {
        console.error(`Error processing ledger ${ledger}:`, error)
        // Continue with next ledger
      }
    }
  }

  /** Fetch one ledger with fully expanded transactions. */
  async extractLedger(ledgerIndex) {
    const response = await this.client.request({
      command: 'ledger',
      ledger_index: ledgerIndex,
      transactions: true,
      expand: true
    })
    return response.result.ledger
  }

  /**
   * Flatten a raw ledger into DB-ready rows.
   * Ripple close_time is seconds since 2000-01-01; +946684800s converts to Unix.
   */
  transformLedger(ledger) {
    const transactions = (ledger.transactions || []).map(tx => ({
      hash: tx.hash,
      ledger: parseInt(ledger.ledger_index, 10),
      type: tx.TransactionType,
      account: tx.Account,
      destination: tx.Destination,
      amount: this.parseAmount(tx.Amount),
      // Default a missing Fee to 0 so parseInt never yields NaN
      fee: parseInt(tx.Fee || 0, 10),
      result: tx.metaData?.TransactionResult || tx.meta?.TransactionResult,
      timestamp: new Date((ledger.close_time + 946684800) * 1000)
    }))
    return {
      ledgerIndex: parseInt(ledger.ledger_index, 10),
      closeTime: new Date((ledger.close_time + 946684800) * 1000),
      txCount: transactions.length,
      transactions
    }
  }

  /**
   * Normalize an XRPL Amount field: a string means native XRP in drops;
   * an object is an issued currency. Returns null for absent amounts.
   */
  parseAmount(amount) {
    if (!amount) return null
    if (typeof amount === 'string') {
      return { currency: 'XRP', value: parseInt(amount, 10) / 1_000_000 }
    }
    return { currency: amount.currency, value: parseFloat(amount.value), issuer: amount.issuer }
  }

  /** Persist the ledger summary, then batch-insert its transactions. */
  async loadLedger(data) {
    await this.db.insertLedger({
      index: data.ledgerIndex,
      close_time: data.closeTime,
      tx_count: data.txCount
    })
    if (data.transactions.length > 0) {
      await this.db.insertTransactions(data.transactions)
    }
  }
}
```
/**
 * Serves pre-aggregated dashboard data from the analytics DB, with a
 * short-lived cache in front of the most expensive query.
 *
 * BUG FIXES: all three SQL statements were passed as bare tokens instead
 * of backtick template literals (syntax errors). `hours` and `bucket`
 * were interpolated into SQL unvalidated — `hours` is now coerced to a
 * finite number and `bucket` is validated against a strict pattern to
 * prevent SQL injection.
 */
class DashboardService {
  constructor(db, cache) {
    this.db = db
    this.cache = cache
  }

  /**
   * 24-hour network overview (tx count, active accounts, payments, fees).
   * Cached for 60 seconds so dashboard refreshes don't hammer the DB.
   */
  async getNetworkOverview() {
    const cacheKey = 'network_overview'
    let data = await this.cache.get(cacheKey)
    if (!data) {
      data = await this.db.query(`
        SELECT COUNT(*) as total_tx,
               COUNT(DISTINCT account) as active_accounts,
               SUM(CASE WHEN type = 'Payment' THEN 1 ELSE 0 END) as payments,
               SUM(fee) as total_fees
        FROM transactions
        WHERE timestamp > NOW() - INTERVAL '24 hours'
      `)
      await this.cache.set(cacheKey, data, 60) // Cache 1 minute
    }
    return data
  }

  /**
   * Transaction counts grouped by type over a trailing window.
   * @param {number} [hours=24] - window size; coerced to a number before
   *        interpolation so non-numeric input cannot reach the SQL.
   */
  async getTxTypeDistribution(hours = 24) {
    const windowHours = Number(hours)
    if (!Number.isFinite(windowHours)) throw new Error(`invalid hours: ${hours}`)
    return this.db.query(`
      SELECT type, COUNT(*) as count
      FROM transactions
      WHERE timestamp > NOW() - INTERVAL '${windowHours} hours'
      GROUP BY type
      ORDER BY count DESC
    `)
  }

  /**
   * Per-bucket tx count and XRP volume time series.
   * @param {number} [hours=24] - window size (validated as above)
   * @param {string} [bucket='1 hour'] - time_bucket width; must match
   *        "<n> second|minute|hour|day(s)" — rejected otherwise, since it
   *        is interpolated into the SQL string.
   */
  async getVolumeTimeSeries(hours = 24, bucket = '1 hour') {
    const windowHours = Number(hours)
    if (!Number.isFinite(windowHours)) throw new Error(`invalid hours: ${hours}`)
    if (!/^\d+ (second|minute|hour|day)s?$/.test(bucket)) {
      throw new Error(`invalid bucket: ${bucket}`)
    }
    return this.db.query(`
      SELECT time_bucket('${bucket}', timestamp) as bucket,
             COUNT(*) as tx_count,
             SUM(CASE WHEN amount->>'currency' = 'XRP'
                      THEN (amount->>'value')::numeric ELSE 0 END) as xrp_volume
      FROM transactions
      WHERE timestamp > NOW() - INTERVAL '${windowHours} hours'
      GROUP BY bucket
      ORDER BY bucket
    `)
  }
}
```
- Extracts ledger data with proper pagination
- Calculates daily transaction volumes by type
- Tracks active account counts
- Provides real-time dashboard endpoints
Time Investment: 3-4 hours
End of Lesson 11
Key Takeaways
Use Clio for historical queries:
Optimized for reporting workloads; save rippled for real-time.
Implement pagination correctly:
Always handle markers; never assume single-page results.
Rate limit extraction:
Respect server capacity; add delays between requests.
Design for incremental processing:
Track last processed ledger; resume from gaps.
Cache dashboard queries:
Pre-compute aggregates; refresh on reasonable intervals.