Calculate a 24-hour block-by-block moving average of the ETH-USDC pair on Uniswap

Suppose the user wants to calculate the 24-hour (~7200 blocks) moving average for the ETH-USDC pair on Uniswap. This can be done with the following steps:

  1. Use the filtered collections filtered_receipts_logs and token0_prices from the previous example as input streams. We assume that the block numbers were mixed into the input stream @input("ETHEREUM_RECEIPTS_LOGS") using the map operator for later use.

  2. Merge the two streams into a collection of tuples (log, price) using the zip operator.

  3. Transform the logs into tuples (block_number, price) using the map operator.

  4. Perform the following steps in parallel:

    • Sum the prices of all swaps in each block.

    • Count the number of swaps in each block.

  5. Combine the two outputs of the previous step into a collection of tuples using the zip operator.

  6. Use the map operator to transform the tuple (sum, count) into average values (i.e., divide sum by count).

  7. Use the built-in operator arrange_by_key to index the average prices by block number.

  8. Use the custom operator moving_avg, which can be applied to the arrangement collection, to compute the average over the block_window_len "largest" elements in the arrangement (i.e., the average of the per-block values for the last 7200 blocks).
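As a point of reference, the end-to-end computation the steps above describe can be sketched in plain Python. The function name and the flat-list representation are illustrative assumptions, not part of the dataflow language:

```python
from collections import defaultdict

def moving_avg_by_block(logs_with_blocks, prices, window):
    # Steps 2-3: zip logs with prices and keep (block_number, price) tuples.
    tuples = [(log["block_number"], price)
              for log, price in zip(logs_with_blocks, prices)]

    # Step 4: per-block sum and count of swap prices.
    sums, counts = defaultdict(float), defaultdict(int)
    for block, price in tuples:
        sums[block] += price
        counts[block] += 1

    # Steps 5-7: per-block average price, ordered by block number.
    averages = {b: sums[b] / counts[b] for b in sorted(sums)}

    # Step 8: mean of the per-block averages over the last `window` blocks.
    last = list(averages.values())[-window:]
    return sum(last) / len(last)
```

Unlike this batch sketch, the dataflow version maintains the result incrementally as new blocks arrive.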

Below is the code for the dataflow that performs the described steps:

// points 1-3
collection batched_price_tuples = filtered_receipts_logs
    .zip(token0_prices)
    .map((log, price) => (log.block_number, price))
    .batch_by_key()

The batch_by_key operator informs the compiler that the keys (in our case, log.block_number) obtained using the map operator will be clustered together in the underlying stream of the collection. This allows the compiler to choose a more efficient implementation for subsequent operators. In this case, all logs within the same block will be grouped together in the stream, appearing consecutively. The batch_by_key operator utilizes an iterative context, which avoids indexing (transforming into an arrangement collection) in each of the following two parallel operations.

// point 4
collection volumes = batched_price_tuples.sum_by_key()
collection counts = batched_price_tuples.count_by_key()
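The advantage batch_by_key gives the compiler can be illustrated in Python: when tuples with equal keys appear consecutively, itertools.groupby aggregates them in a single streaming pass without building an index. The helper below is a hypothetical illustration, not part of the language runtime:

```python
from itertools import groupby

def sum_and_count_batched(batched_tuples):
    # Works in one pass only because equal keys are adjacent in the
    # stream -- the same property batch_by_key guarantees to the compiler.
    sums, counts = {}, {}
    for block, group in groupby(batched_tuples, key=lambda t: t[0]):
        prices = [price for _, price in group]
        sums[block] = sum(prices)
        counts[block] = len(prices)
    return sums, counts
```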

To build an arrangement collection, we need to order the average price values by block number. For this purpose, we can use the built-in operator arrange_by_key. Internally, it constructs a Merkle tree (not a multiset hash) of the delta average prices and orders them by block number (key), providing efficient "random access" to historical updates using membership proofs. Then, we use the consolidate operator to instruct the data stream to always combine delta average prices for identical keys.

// points 5-7
arrangement collection average_prices_by_block = volumes
    .zip(counts)
    .map(((block_num, volume), (_, count)) => (block_num, volume / count))
    .arrange_by_key()
    .consolidate()
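The consolidate step can be pictured as collapsing a stream of keyed deltas so that each key carries a single net update. This is a hypothetical Python sketch of that behavior only; a real arrangement also maintains the Merkle tree described above:

```python
from collections import defaultdict

def consolidate(deltas):
    # Each delta is (block_number, value_change); consolidation sums the
    # changes per key and drops keys whose net change is zero.
    net = defaultdict(float)
    for block, change in deltas:
        net[block] += change
    return {b: v for b, v in sorted(net.items()) if v != 0}
```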

This scheme can be reused by any number of operators. To demonstrate this, the user also calculates a 1-hour (~300 blocks) moving average using the same ordered collection:

// point 8
export collection moving_avg_24hr =
    average_prices_by_block.moving_avg(7200)

export collection moving_avg_1hr =
    average_prices_by_block.moving_avg(300)

In the custom operator moving_avg, the user specifies that the arranged_items in the input stream are an ordered collection using the keyword arranged. Ordered collections allow the operator to perform key-based search and iterate over the values in the collection. The keyword single in the output declaration indicates that the output is a collection that should always contain a single element. This is syntactic sugar that, in combination with the sum operator, tells the compiler that each new element in the input collection should retract the old value of the single-element collection current_moving_avg and add a new one, so the sum accumulates incrementally. The operator also uses a generic type parameter, indicating that it can operate on any ordered collection of elements of any type "T" for which division and addition operations are well-defined.

operator moving_avg<T: Div<T> + Add<T>>(block_window_len: i32) {
    input arranged arranged_items: T
    output single current_moving_avg = arranged_items
        .iter_descending()
        .take(block_window_len)
        .map(block_average_price =>
            block_average_price / block_window_len)
        .sum()
}
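The incremental behavior that the single output enables can be mimicked in Python with a fixed-size window: each new per-block average retracts the oldest contribution and adds the new one, so the running average is updated in O(1) per block. This is an illustrative sketch under those assumptions, not the code the compiler generates:

```python
from collections import deque

class MovingAvg:
    def __init__(self, block_window_len):
        self.window = deque()
        self.capacity = block_window_len
        self.total = 0.0

    def push(self, block_average_price):
        # Mirror the remove/add semantics of the `single` collection:
        # retract the oldest per-block average once the window is full,
        # then add the new one and re-emit the single current value.
        self.window.append(block_average_price)
        self.total += block_average_price
        if len(self.window) > self.capacity:
            self.total -= self.window.popleft()
        return self.total / len(self.window)
```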
