rooch icon indicating copy to clipboard operation
rooch copied to clipboard

[JS-SDK] Support subscribes via WebSocket

Open yubing744 opened this issue 9 months ago • 6 comments

Proposal Details

Support subscribes via WebSocket in JS-SDK

  • [ ] New Transaction
  • [ ] New Contract Event

yubing744 avatar Mar 28 '25 13:03 yubing744

RoochClient Subscription Feature Design

This document outlines the design of the subscription functionality for RoochClient, a TypeScript-based client for interacting with the Rooch blockchain. The design leverages the distinct JSON-RPC methods (rooch_subscribeEvents for events and rooch_subscribeTransactions for transactions) as observed in the provided RoochWebSocketClient implementation. The API remains developer-friendly, abstracting the complexity of filter conversion and method selection.

Design Goals

  • Simplicity: Provide an intuitive API for subscribing to events and transactions with minimal configuration.
  • Type Safety: Use TypeScript to enforce correct filter usage and provide autocompletion.
  • Alignment with Backend: Reflect the distinct JSON-RPC methods (rooch_subscribeEvents and rooch_subscribeTransactions) in the design.
  • Encapsulation: Handle filter conversion and WebSocket communication internally.

Subscription Types and Methods

The subscription feature supports two distinct types, each mapped to a specific JSON-RPC method:

  • "event": Subscribes to contract events using rooch_subscribeEvents.
  • "transaction": Subscribes to transactions using rooch_subscribeTransactions.

API Design

1. RoochClient.subscribe Method

The subscribe method serves as the unified entry point, dispatching to the appropriate JSON-RPC method based on the subscription type.

Interface Definition

interface SubscriptionOptions {
  type: 'event' | 'transaction';          // Subscription type
  filter?: EventFilter | TransactionFilter; // Optional filter based on type
  onEvent: (event: SubscriptionEvent) => void; // Callback for received events
  onError?: (error: Error) => void;       // Optional callback for errors
}

interface Subscription {
  id: string;                            // Unique subscription identifier
  unsubscribe: () => void;               // Method to cancel the subscription
}

class RoochClient {
  subscribe(options: SubscriptionOptions): Subscription {
    // Implementation details below
  }
}
  • type: Determines the subscription type ("event" or "transaction") and maps to rooch_subscribeEvents or rooch_subscribeTransactions.
  • filter: An optional filter specific to the subscription type.
  • onEvent: Callback invoked with event or transaction data.
  • onError: Optional callback for error handling.
  • Return Value: A Subscription object with an id and unsubscribe method.

2. Filter Types

Filters are designed to be simple and combinable, mirroring the EventFilterView and TransactionFilterView structures while keeping the API user-friendly.

2.1 EventFilter

interface EventFilter {
  eventType?: string;                   // Event type (StructTagView)
  sender?: string;                      // Sender address (UnitedAddressView)
  eventHandleId?: string;               // Event handle ID (ObjectIDView)
  txHash?: string;                      // Transaction hash (H256View)
  timeRange?: { start: number; end: number }; // Time range in seconds
  orderRange?: { from: number; to: number };  // Order range
  all?: boolean;                        // Subscribe to all events if true
}
  • Fields: Combinable (e.g., { eventType: "SomeEvent", sender: "0x123" }).
  • all: Subscribes to all events if true.

2.2 TransactionFilter

interface TransactionFilter {
  sender?: string;                      // Sender address (UnitedAddressView)
  txHashes?: string[];                  // List of transaction hashes (H256View[])
  timeRange?: { start: number; end: number }; // Time range in seconds
  orderRange?: { from: number; to: number };  // Order range
  all?: boolean;                        // Subscribe to all transactions if true
}
  • Fields: Combinable (e.g., { sender: "0x123", txHashes: ["0xabc"] }).
  • all: Subscribes to all transactions if true.

3. Event Data

The onEvent callback receives data structured according to the subscription type, reflecting the IndexerEventView for events and TransactionWithInfoView for transactions from the Rooch RPC API.

SubscriptionEvent

interface IndexerEventView {
  indexer_event_id: { tx_order: string; event_index: string };
  event_id: { event_handle_id: string; event_seq: string };
  event_type: string;
  event_data: string;
  tx_hash: string;
  sender: string;
  created_at: string;
  decoded_event_data?: any;
}

interface TransactionWithInfoView {
  transaction: {
    data:
      | { type: 'l1_block'; data: { chain_id: string; block_height: string; block_hash: string; bitcoin_block_hash?: string } }
      | { type: 'l1_tx'; data: { chain_id: string; block_hash: string; bitcoin_block_hash?: string; txid: string; bitcoin_txid?: string } }
      | { type: 'l2_tx'; data: { sender: string /* + other fields */ } };
    sequence_info: { tx_order: string; tx_timestamp: string /* + other fields */ };
  };
  execution_info?: { tx_hash: string /* + other fields */ };
}

type SubscriptionEvent =
  | { type: 'event'; data: IndexerEventView }
  | { type: 'transaction'; data: TransactionWithInfoView };
  • type: 'event': Returns an IndexerEventView object, which includes detailed event data such as event_type, sender, tx_hash, and optional decoded_event_data.
  • type: 'transaction': Returns a TransactionWithInfoView object, which includes the transaction data (L1 block, L1 transaction, or L2 transaction) and optional execution info.

Filter Conversion Logic

The subscribe method converts the user-provided filters into the corresponding EventFilterView or TransactionFilterView structures required by the JSON-RPC methods.

Event Filter Conversion

function toEventFilterView(filter: EventFilter): EventFilterView {
  if (filter.all) return 'All';
  
  if (filter.eventType && filter.sender) {
    return { event_type: filter.eventType, sender: { rooch_address: filter.sender } };
  }
  if (filter.eventType) {
    return { event_type: filter.eventType };
  }
  if (filter.eventHandleId && filter.sender) {
    return { event_handle_id: filter.eventHandleId, sender: { rooch_address: filter.sender } };
  }
  if (filter.eventHandleId) {
    return { event_handle_id: filter.eventHandleId };
  }
  if (filter.sender) {
    return { sender: { rooch_address: filter.sender } };
  }
  if (filter.txHash) {
    return { tx_hash: filter.txHash };
  }
  if (filter.timeRange) {
    return { 
      start_time: { 0: filter.timeRange.start.toString() }, 
      end_time: { 0: filter.timeRange.end.toString() } 
    };
  }
  if (filter.orderRange) {
    return { 
      from_order: { 0: filter.orderRange.from.toString() }, 
      to_order: { 0: filter.orderRange.to.toString() } 
    };
  }
  throw new Error('Invalid event filter configuration');
}

Transaction Filter Conversion

function toTransactionFilterView(filter: TransactionFilter): TransactionFilterView {
  if (filter.all) return 'All';
  
  if (filter.sender) {
    return { sender: { rooch_address: filter.sender } };
  }
  if (filter.txHashes) {
    return { tx_hashes: filter.txHashes };
  }
  if (filter.timeRange) {
    return { 
      start_time: { 0: filter.timeRange.start.toString() }, 
      end_time: { 0: filter.timeRange.end.toString() } 
    };
  }
  if (filter.orderRange) {
    return { 
      from_order: { 0: filter.orderRange.from.toString() }, 
      to_order: { 0: filter.orderRange.to.toString() } 
    };
  }
  throw new Error('Invalid transaction filter configuration');
}

Implementation Details

1. WebSocket Integration

  • Method Selection: The subscribe method selects the appropriate JSON-RPC method based on type:
    • "event"rooch_subscribeEvents
    • "transaction"rooch_subscribeTransactions
  • Subscription Request: Constructs and sends a JSON-RPC message via the WebSocket transport.
    • Example for events:
      {
        "jsonrpc": "2.0",
        "method": "rooch_subscribeEvents",
        "params": [{ "event_type": "SomeEvent", "sender": { "rooch_address": "0x123" } }],
        "id": 1
      }
      
    • Example for transactions:
      {
        "jsonrpc": "2.0",
        "method": "rooch_subscribeTransactions",
        "params": [{ "sender": { "rooch_address": "0x123" } }],
        "id": 1
      }
      
  • Event Handling: Listens for WebSocket onmessage events, parsing responses with "method": "subscription" and invoking onEvent.

2. Unsubscription

  • Unsubscribe Request: The unsubscribe method sends a rooch_unsubscribe message with the subscription ID.
    {
      "jsonrpc": "2.0",
      "method": "rooch_unsubscribe",
      "params": ["sub_123456"],
      "id": 2
    }
    

3. Subscription Management

  • Maintains a Set of subscription IDs internally, similar to RoochWebSocketClient.
  • Ensures cleanup when unsubscribe is called or the client is destroyed.

Usage Examples

Example 1: Subscribe to All Events

const subscription = client.subscribe({
  type: 'event',
  filter: { all: true },
  onEvent: (event) => {
    console.log(`New event: ${event.eventType}`);
  },
});

subscription.unsubscribe(); // Cancel later

Example 2: Subscribe to Transactions by Sender

const subscription = client.subscribe({
  type: 'transaction',
  filter: { sender: '0x1234...' },
  onEvent: (event) => {
    console.log(`New transaction from ${event.sender}: ${event.txHash}`);
  },
});

Example 3: Subscribe to Events by Type and Sender

const subscription = client.subscribe({
  type: 'event',
  filter: { eventType: 'SomeEvent', sender: '0x1234...' },
  onEvent: (event) => {
    console.log(`Event ${event.eventType} from ${event.sender}`);
  },
});

Conclusion

This design aligns with the Rooch backend's use of distinct JSON-RPC methods (rooch_subscribeEvents and rooch_subscribeTransactions) while maintaining a simple, unified API for developers. The subscribe method encapsulates method selection and filter conversion, ensuring ease of use and type safety. The implementation leverages the existing RoochWebSocketTransport for seamless integration with the SDK.

yubing744 avatar Mar 28 '25 14:03 yubing744

@jolestar @wow-sven Please help review the technical solution for JS-SDK subscription.

yubing744 avatar Mar 28 '25 14:03 yubing744

The subscribe argument follows the RPC.

jolestar avatar Mar 28 '25 15:03 jolestar

The subscribe argument follows the RPC.

OK, I will remove the Filter Conversion Logic and directly expose the RPC filter parameters

yubing744 avatar Mar 29 '25 07:03 yubing744

Revised RoochClient Subscription Feature Design

This document outlines the updated design for the subscription functionality in RoochClient, a TypeScript-based client for interacting with the Rooch blockchain. The design leverages the JSON-RPC methods rooch_subscribeEvents for events and rooch_subscribeTransactions for transactions, as implemented in RoochWebSocketClient. Per the requirement, the filter conversion logic is removed, and the subscription filters directly mirror the Rust-defined EventFilterView and TransactionFilterView enums.

Design Goals

  • Simplicity: Provide an intuitive API for subscribing to events and transactions.
  • Type Safety: Leverage TypeScript to enforce correct filter structures and provide autocompletion.
  • Alignment with Backend: Use the Rust-defined filters directly, matching the rooch_subscribeEvents and rooch_subscribeTransactions expectations.
  • Encapsulation: Handle WebSocket communication internally while exposing Rust-aligned filter types.

Subscription Types and Methods

The subscription feature supports two types, each tied to a specific JSON-RPC method:

  • "event": Subscribes to contract events via rooch_subscribeEvents.
  • "transaction": Subscribes to transactions via rooch_subscribeTransactions.

API Design

1. RoochClient.subscribe Method

The subscribe method remains the unified entry point, dispatching to the appropriate JSON-RPC method based on the subscription type. The filter parameter now directly accepts EventFilterView or TransactionFilterView types without conversion.

Interface Definition

interface SubscriptionOptions {
  type: 'event' | 'transaction';          // Subscription type
  filter?: EventFilterView | TransactionFilterView; // Optional Rust-defined filter
  onEvent: (event: SubscriptionEvent) => void; // Callback for received events
  onError?: (error: Error) => void;       // Optional callback for errors
}

interface Subscription {
  id: string;                            // Unique subscription identifier
  unsubscribe: () => void;               // Method to cancel the subscription
}

class RoochClient {
  subscribe(options: SubscriptionOptions): Subscription {
    const { type, filter, onEvent, onError } = options;
    const method = type === 'event' ? 'rooch_subscribeEvents' : 'rooch_subscribeTransactions';
    const params = filter ? [filter] : [];
    // Implementation: Send JSON-RPC request via WebSocket with method and params
    // Return Subscription object with id and unsubscribe method
  }
}
  • type: Specifies the subscription type ("event" or "transaction"), mapping to rooch_subscribeEvents or rooch_subscribeTransactions.
  • filter: An optional filter matching the Rust EventFilterView or TransactionFilterView type, based on type.
  • onEvent: Callback invoked with event or transaction data.
  • onError: Optional callback for error handling.
  • Return Value: A Subscription object containing an id and unsubscribe method.

2. Filter Types

Filters are defined in TypeScript to directly reflect the Rust EventFilterView and TransactionFilterView enums, ensuring compatibility with the JSON-RPC methods without additional conversion.

2.1 EventFilterView

Mirrors the Rust EventFilterView enum:

interface UnitedAddressView {
  rooch_address: string; // Represents UnitedAddressView as a string address
}

type EventFilterView =
  | { event_type_with_sender: { event_type: string; sender: UnitedAddressView } }
  | { event_type: string }
  | { event_handle_with_sender: { event_handle_id: string; sender: UnitedAddressView } }
  | { event_handle: string }
  | { sender: UnitedAddressView }
  | { tx_hash: string }
  | { time_range: { start_time: string; end_time: string } }
  | { tx_order_range: { from_order: string; to_order: string } }
  | 'All';
  • Fields: Each variant corresponds to a Rust enum variant:
    • event_type_with_sender: Filters events by event type and sender.
    • event_type: Filters by event type (StructTagView as string).
    • event_handle_with_sender: Filters by event handle ID and sender.
    • event_handle: Filters by event handle ID (ObjectIDView as string).
    • sender: Filters by sender address (UnitedAddressView).
    • tx_hash: Filters by transaction hash (H256View as string).
    • time_range: Filters by time interval (StrView<u64> fields as strings).
    • tx_order_range: Filters by transaction order range (StrView<u64> fields as strings).
    • 'All': Subscribes to all events.

2.2 TransactionFilterView

Mirrors the Rust TransactionFilterView enum:

type TransactionFilterView =
  | { sender: UnitedAddressView }
  | { tx_hashes: string[] }
  | { time_range: { start_time: string; end_time: string } }
  | { tx_order_range: { from_order: string; to_order: string } }
  | 'All';
  • Fields: Each variant corresponds to a Rust enum variant:
    • sender: Filters by sender address (UnitedAddressView).
    • tx_hashes: Filters by a list of transaction hashes (Vec<H256View> as string array).
    • time_range: Filters by time interval (StrView<u64> fields as strings).
    • tx_order_range: Filters by transaction order range (StrView<u64> fields as strings).
    • 'All': Subscribes to all transactions.

Notes on Type Definitions:

  • StructTagView, ObjectIDView, H256View, and StrView<u64> are represented as strings in TypeScript, assuming they serialize to strings in JSON.
  • UnitedAddressView is modeled as an object { rooch_address: string }, reflecting its likely serialization format.

3. Event Data

The onEvent callback receives data as defined in the original design, reflecting IndexerEventView for events and TransactionWithInfoView for transactions.

SubscriptionEvent

interface IndexerEventView {
  indexer_event_id: { tx_order: string; event_index: string };
  event_id: { event_handle_id: string; event_seq: string };
  event_type: string;
  event_data: string;
  tx_hash: string;
  sender: string;
  created_at: string;
  decoded_event_data?: any;
}

interface TransactionWithInfoView {
  transaction: {
    data:
      | { type: 'l1_block'; data: { chain_id: string; block_height: string; block_hash: string; bitcoin_block_hash?: string } }
      | { type: 'l1_tx'; data: { chain_id: string; block_hash: string; bitcoin_block_hash?: string; txid: string; bitcoin_txid?: string } }
      | { type: 'l2_tx'; data: { sender: string /* + other fields */ } };
    sequence_info: { tx_order: string; tx_timestamp: string /* + other fields */ };
  };
  execution_info?: { tx_hash: string /* + other fields */ };
}

type SubscriptionEvent =
  | { type: 'event'; data: IndexerEventView }
  | { type: 'transaction'; data: TransactionWithInfoView };
  • type: 'event': Contains IndexerEventView with event details.
  • type: 'transaction': Contains TransactionWithInfoView with transaction details.

Implementation Details

1. WebSocket Integration

  • Method Selection: Based on type:
    • "event"rooch_subscribeEvents
    • "transaction"rooch_subscribeTransactions
  • Subscription Request: Sends a JSON-RPC message with the filter directly as the first params element.
    • Example for events:
      {
        "jsonrpc": "2.0",
        "method": "rooch_subscribeEvents",
        "params": [{ "event_type": "SomeEvent" }],
        "id": 1
      }
      
    • Example for transactions:
      {
        "jsonrpc": "2.0",
        "method": "rooch_subscribeTransactions",
        "params": [{ "sender": { "rooch_address": "0x123" } }],
        "id": 1
      }
      
  • Event Handling: Parses WebSocket messages with "method": "subscription" and invokes onEvent.

2. Unsubscription

  • Sends a rooch_unsubscribe message with the subscription ID:
    {
      "jsonrpc": "2.0",
      "method": "rooch_unsubscribe",
      "params": ["sub_123456"],
      "id": 2
    }
    

3. Subscription Management

  • Maintains a Set of subscription IDs, ensuring cleanup on unsubscribe or client destruction.

Usage Examples

Example 1: Subscribe to All Events

const subscription = client.subscribe({
  type: 'event',
  filter: 'All',
  onEvent: (event) => {
    if (event.type === 'event') {
      console.log(`New event: ${event.data.event_type}`);
    }
  },
});

subscription.unsubscribe();

Example 2: Subscribe to Transactions by Sender

const subscription = client.subscribe({
  type: 'transaction',
  filter: { sender: { rooch_address: '0x1234...' } },
  onEvent: (event) => {
    if (event.type === 'transaction') {
      console.log(`New transaction: ${event.data.transaction.data}`);
    }
  },
});

Example 3: Subscribe to Events by Type and Sender

const subscription = client.subscribe({
  type: 'event',
  filter: { event_type_with_sender: { event_type: 'SomeEvent', sender: { rooch_address: '0x1234...' } } },
  onEvent: (event) => {
    if (event.type === 'event') {
      console.log(`Event ${event.data.event_type} from ${event.data.sender}`);
    }
  },
});

Conclusion

This revised design removes the filter conversion logic, directly using the Rust-defined EventFilterView and TransactionFilterView enums in TypeScript. The subscribe method accepts these filters as-is, ensuring alignment with the Rooch backend’s JSON-RPC methods while maintaining type safety and developer usability. The implementation leverages the existing WebSocket transport for seamless integration.

yubing744 avatar Mar 29 '25 23:03 yubing744

Appendix Module: RoochSubscriptionTransport Design

Design Goals

RoochSubscriptionTransport is a transport layer module designed for the subscription functionality of RoochClient. Its goals are as follows:

  • Abstract Subscription Transport Layer: Provide a unified interface to support multiple transport methods (e.g., WebSocket, SSE, etc.).
  • Decouple from RoochClient: Separate subscription logic from RoochClient, allowing it to focus on general request-response functionality.
  • Extensibility: Support the addition of new transport methods in the future without modifying existing code.
  • Event Stream Support: Ensure stability and efficiency for long-lived connections and real-time event streams.
  • Reconnection Handling: Manage reconnection logic for interrupted connections, minimizing disruption to active subscriptions.
  • Error Handling: Provide mechanisms to handle transport-level errors, such as failed reconnections or network interruptions.

Interface Definition

The RoochSubscriptionTransport interface defines core methods for subscription functionality, connection management, and error handling:

interface RoochSubscriptionTransport {
  /**
   * Subscribe to a specific request
   * @param request JSON-RPC request object containing method and parameters
   * @returns A Subscription object containing the subscription ID and an unsubscribe method
   */
  subscribe(request: JsonRpcRequest): Promise<Subscription>;

  /**
   * Unsubscribe from a specific subscription
   * @param subscriptionId The subscription ID
   */
  unsubscribe(subscriptionId: string): void;

  /**
   * Register a callback to handle subscription events
   * @param callback Function to handle subscription events
   */
  onEvent(callback: (event: any) => void): void;

  /**
   * Register a callback to handle reconnection events
   * @param callback Function called when the connection is re-established
   */
  onReconnected(callback: () => void): void;

  /**
   * Register a callback to handle transport-level errors
   * @param callback Function to handle errors such as failed reconnections or network issues
   */
  onError(callback: (error: Error) => void): void;

  /**
   * Destroy transport layer resources and close the connection
   */
  destroy(): void;
}
  • subscribe: Initiates a subscription request and returns a Subscription object.
  • unsubscribe: Cancels a specific subscription.
  • onEvent: Registers a callback to handle subscription events.
  • onReconnected: Registers a callback to be invoked when the connection is re-established.
  • onError: Registers a callback to handle transport-level errors.
  • destroy: Releases resources and closes the connection.

SubscriptionOptions Definition

SubscriptionOptions is used to configure subscriptions at the client level:

interface SubscriptionOptions {
  type: 'event' | 'transaction';          // Subscription type
  filter?: EventFilterView | TransactionFilterView; // Optional filter
  onEvent: (event: SubscriptionEvent) => void; // Event callback
}
  • type: Specifies the subscription type (event or transaction).
  • filter: An optional filter based on the Rust-defined EventFilterView or TransactionFilterView.
  • onEvent: Callback function to receive events.

Subscription Definition

The Subscription object represents an active subscription:

interface Subscription {
  id: string;                            // Subscription ID
  unsubscribe: () => void;               // Method to cancel the subscription
}
  • id: Unique identifier for the subscription.
  • unsubscribe: Method to cancel the subscription.

Implementation Example: RoochWebSocketTransport

RoochWebSocketTransport is a concrete implementation based on the WebSocket protocol, handling reconnections, subscription event management, and error handling:

class RoochWebSocketTransport implements RoochSubscriptionTransport {
  private ws: WebSocket;
  private eventEmitter = new EventEmitter();
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 5;
  private reconnectDelay = 1000;
  private subscriptions: Map<string, SubscriptionOptions> = new Map();

  constructor(url: string) {
    this.ws = new WebSocket(url);
    this.ws.onmessage = (event) => this.handleMessage(event);
    this.ws.onclose = () => this.handleReconnect();
    this.ws.onopen = () => this.eventEmitter.emit('reconnected');
  }

  subscribe(request: JsonRpcRequest): Promise<Subscription> {
    return new Promise((resolve, reject) => {
      if (this.ws.readyState !== WebSocket.OPEN) {
        const error = new Error('WebSocket connection is not open');
        this.eventEmitter.emit('error', error); // Emit error if connection is not open
        return reject(error);
      }
      this.ws.send(JSON.stringify(request));
      // Assume the server returns a subscription ID, simulated here
      const subscriptionId = `sub_${Date.now()}`;
      resolve({
        id: subscriptionId,
        unsubscribe: () => this.unsubscribe(subscriptionId),
      });
    });
  }

  unsubscribe(subscriptionId: string): void {
    if (this.subscriptions.has(subscriptionId)) {
      const unsubscribeRequest = {
        jsonrpc: '2.0',
        method: 'rooch_unsubscribe',
        params: [subscriptionId],
        id: Date.now(),
      };
      this.ws.send(JSON.stringify(unsubscribeRequest));
      this.subscriptions.delete(subscriptionId);
    }
  }

  onEvent(callback: (event: any) => void): void {
    this.eventEmitter.on('subscriptionEvent', callback);
  }

  onReconnected(callback: () => void): void {
    this.eventEmitter.on('reconnected', callback);
  }

  onError(callback: (error: Error) => void): void {
    this.eventEmitter.on('error', callback);
  }

  private handleMessage(event: MessageEvent): void {
    const data = JSON.parse(event.data);
    if (data.method === 'subscription') {
      this.eventEmitter.emit('subscriptionEvent', data.params.result);
    }
  }

  private handleReconnect(): void {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      const error = new Error('Max reconnect attempts reached');
      this.eventEmitter.emit('error', error); // Emit error when max attempts are exceeded
      return;
    }
    this.reconnectAttempts++;
    setTimeout(() => {
      this.ws = new WebSocket(this.url);
      this.ws.onmessage = (event) => this.handleMessage(event);
      this.ws.onclose = () => this.handleReconnect();
      this.ws.onopen = () => {
        this.reconnectAttempts = 0;
        this.eventEmitter.emit('reconnected');
      };
    }, this.reconnectDelay * this.reconnectAttempts);
  }

  destroy(): void {
    this.ws.close();
    this.subscriptions.clear();
  }
}
  • Features:
    • Manages WebSocket connections, including reconnection logic.
    • Emits 'subscriptionEvent' events upon receiving subscription messages.
    • Emits 'reconnected' events when the connection is re-established.
    • Emits 'error' events for transport-level errors, such as failed reconnections or network interruptions.
    • Supports subscription and unsubscription requests.

Integration in RoochClient

RoochClient integrates subscription functionality, handles re-subscription after reconnection, and provides error handling:

export class RoochClient {
  private transport: RoochSubscriptionTransport;
  private subscriptions: Map<string, SubscriptionOptions> = new Map();

  constructor(options: RoochClientOptions) {
    this.transport = options.subscriptionTransport ?? new RoochWebSocketTransport(options.url);
    // Register subscription event listeners
    this.transport.onEvent((event) => this.handleEvent(event));
    // Register reconnection listener for re-subscription
    this.transport.onReconnected(() => this.resubscribeAll());
    // Register error listener for transport-level errors
    this.transport.onError((error) => this.handleError(error));
  }

  subscribe(options: SubscriptionOptions): Subscription {
    const { type, filter, onEvent } = options;
    const method = type === 'event' ? 'rooch_subscribeEvents' : 'rooch_subscribeTransactions';
    const params = filter ? [filter] : [];
    const request = { method, params };

    const subscription = await this.transport.subscribe(request);
    this.subscriptions.set(subscription.id, options);
    return subscription;
  }

  private handleEvent(event: any): void {
    const subscriptionId = event.subscription;
    const options = this.subscriptions.get(subscriptionId);
    if (options) {
      options.onEvent(event.result);
    }
  }

  private resubscribeAll(): void {
    for (const [id, options] of this.subscriptions) {
      const method = options.type === 'event' ? 'rooch_subscribeEvents' : 'rooch_subscribeTransactions';
      const params = options.filter ? [options.filter] : [];
      const request = { method, params };
      this.transport.subscribe(request);
    }
  }

  private handleError(error: Error): void {
    console.error('Transport error:', error.message);
    // Custom logic: Notify users, fallback to polling, etc.
  }

  destroy(): void {
    this.transport.destroy();
    this.subscriptions.clear();
  }
}
  • Advantages:
    • RoochSubscriptionTransport handles connection management, reconnections, and error reporting.
    • RoochClient manages subscription logic, automatically re-subscribes after reconnection, and provides centralized error handling.
    • Separates transport layer concerns from business logic, ensuring flexibility and maintainability.

Conclusion

The updated design ensures that RoochSubscriptionTransport focuses on managing connections, event streams, and error handling, while RoochClient handles subscription business logic, including re-subscription after reconnection and centralized error reporting. This separation of concerns improves system robustness, extensibility, and maintainability, making it suitable for production-grade applications.

yubing744 avatar Mar 29 '25 23:03 yubing744