web3-flight-rpc-server icon indicating copy to clipboard operation
web3-flight-rpc-server copied to clipboard

Blockchain data streaming in Apache Arrow format. Web3 data for data engineers, ready for large scale analysis

web3-flight-rpc-server

Blockchain data streaming in Apache Arrow format. Web3 data for data engineers, ready for large-scale analysis

Project Overview

This is an implementation of Apache Arrow Flight RPC server for blockchain data. Apache Arrow format allows zero-copy streaming data processing on top of standard eth-web3 RPC.

There are a few key points why to consider flight rpc:

  • Plenty of clients implementations including JavaScript, Go, Python, R and Rust (see: https://arrow.apache.org/docs/implementations.html)

  • Rich libraries support. Process eth json-rpc data with your favorite data framework:

    • pandas
    • polars
    • apache datafusion
    • and many more natively supporting apache arrow format, all with zero copy and without time loss on serialization and deserialization between the formats
  • ADBC Support Ready-made connectors to most popular databases and formats allow you to write data effortlessly

Supported Datasets

All datasets requests supports following params:

  • start_block block number to start streaming from (default LATEST)
  • end_block block number to stop streaming (default NULL)
  • batch_size

Blocks

See schema for stream return type

Schema

private static final Schema BLOCK_SCHEMA = new Schema(List.of(
        Field.nullable(BLOCK_NUMBER, new ArrowType.Int(64, true)),
        Field.nullable(BLOCK_HASH_FIELD, new ArrowType.Utf8()),
        Field.nullable(BLOCK_PARENT_HASH, new ArrowType.Utf8()),
        Field.nullable(BLOCK_TIMESTAMP, new ArrowType.Int(64, true)),
        Field.nullable(BLOCK_AUTHOR, new ArrowType.Utf8()),
        Field.nullable(BLOCK_MINER, new ArrowType.Utf8()),
        Field.nullable(BLOCK_TRANSACTIONS_ROOT, new ArrowType.Utf8()),
        Field.nullable(BLOCK_STATE_ROOT, new ArrowType.Utf8()),
        Field.nullable(BLOCK_RECEIPTS_ROOT, new ArrowType.Utf8()),
        Field.nullable(BLOCK_LOGS_BLOOM, new ArrowType.Utf8()),
        Field.nullable(BLOCK_GAS_LIMIT, new ArrowType.Int(64, true)),
        Field.nullable(BLOCK_GAS_USED, new ArrowType.Int(64, true)),
        Field.nullable(BLOCK_SIZE, new ArrowType.Int(64, true)),
        Field.nullable(BLOCK_EXTRA_DATA, new ArrowType.Utf8()),
        Field.nullable(BLOCK_NONCE, new ArrowType.Utf8()),
        Field.nullable(BLOCK_DIFFICULTY, new ArrowType.Utf8()),
        Field.nullable(BLOCK_TOTAL_DIFFICULTY, new ArrowType.Utf8()),
        Field.nullable(BLOCK_MIX_HASH, new ArrowType.Utf8()),
        Field.nullable(BLOCK_SHA3_UNCLES, new ArrowType.Utf8()),
        new Field(BLOCK_TRANSACTIONS, new FieldType(true, new ArrowType.List(), null),
                Lists.newArrayList(Field.nullable(BLOCK_TX_CHILD_NAME, new ArrowType.Utf8()))),
        new Field(BLOCK_UNCLES, new FieldType(true, new ArrowType.List(), null),
                Lists.newArrayList(Field.nullable(BLOCK_UNCLE_CHILD_NAME, new ArrowType.Utf8()))),
        new Field(BLOCK_SEAL_FIELDS, new FieldType(true, new ArrowType.List(), null),
                Lists.newArrayList(Field.nullable(BLOCK_SEAL_FIELD_CHILD_NAME, new ArrowType.Utf8())))
));

Logs

Additional parameters:

  • topics - list of topics to pass to web3 rpc filter
  • addresses - list of addresses to pass to web3 rpc filter

Schema

private static final Schema LOG_SCHEMA = new Schema(List.of(
        Field.nullable(LOG_ADDRESS, new ArrowType.Utf8()),
        Field.nullable(LOG_DATA, new ArrowType.Utf8()),
        new Field(LOG_TOPICS, new FieldType(true, new ArrowType.List(), null),
                Lists.newArrayList(Field.nullable(LOG_TOPIC_CHILD_NAME, new ArrowType.Utf8()))),
        Field.nullable(LOG_BLOCK_NUMBER, new ArrowType.Int(64, true)),
        Field.nullable(LOG_TRANSACTION_HASH, new ArrowType.Utf8()),
        Field.nullable(LOG_TRANSACTION_INDEX, new ArrowType.Int(32, true)),
        Field.nullable(LOG_BLOCK_HASH, new ArrowType.Utf8()),
        Field.nullable(LOG_INDEX, new ArrowType.Int(32, true)),
        Field.nullable(LOG_REMOVED, new ArrowType.Bool())
));

Requirements

  • Java 21
  • Access to an Ethereum node (for the server). Infura was used for development.

Building the Project

To build the project, run:

mvn clean package

This will create JAR file:

  • server/target/server.jar - The server component

Running the Applications

Important: VM Options

Application requires the following VM options:

--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED

Running the Server

The server requires an Ethereum node URL and can optionally specify a port for the Flight server.

Using environment variables:

java --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED -jar server/target/server.jar

With environment variables:

  • ETHEREUM_NODE_URL - URL of the Ethereum node (required)
  • FLIGHT_PORT - Port for the Flight server (default: 8815)

Using command line arguments:

java --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED -jar server/target/server.jar <ethereum_node_url> <flight_port>

Example:

java --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED -jar server/target/server.jar https://mainnet.infura.io/v3/your-api-key 8815

Running the Client

The client can connect to a Flight server at a specified host and port.

java --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED -jar client/target/client.jar [host] [port]

By default, it connects to 127.0.0.1:8815 if no arguments are provided.

Example:

java --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED -jar client/target/client.jar 127.0.0.1 8815

Example Workflow

  1. Start the server with an Ethereum node URL:

    java --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED -jar server/target/server.jar https://mainnet.infura.io/v3/your-api-key 8815
    
  2. In another terminal, start the client:

    java --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED -jar client/target/client.jar 127.0.0.1 8815
    
  3. The client will connect to the server and start receiving Ethereum logs data and displaying it in TSV format.

Notes

  • The server requires a valid Ethereum node URL to function properly

More client examples

Stay tuned for Python and Datafusion examples :)