brick icon indicating copy to clipboard operation
brick copied to clipboard

Subscribing to Remote Supabase Events

Open tshedor opened this issue 1 year ago • 12 comments

A continuation of #451 and #448, this issue will track work integrating Supabase's channels with Brick. @Shreemanarjun has expressed interest in leading the initiative (but I won't hold you to it).

If you want to help develop this feature, please comment and tag me and @Shreemanarjun.

Get

From #448, this is a crude starting point with some TODOs to resolve:

mixin SubscribeWithChannels on OfflineFirstWithSupabaseRepository {
  Stream<List<TModel extends OfflineFirstWithSupabaseModel>> subscribe({
    OfflineFirstGetPolicy policy = OfflineFirstGetPolicy.localOnly,
    Query? query,
  }) {
    final adapter = supabaseProvider.modelDictionary.adapterFor[TModel]!;
    
    // TODO respect policy and ignore remote subscription if requested

    final channel = supabaseProvider.client
        .channel(adapter.supabaseTableName)
        .onPostgresChanges(
          event: PostgresChangeEvent.update,
          schema: 'public',
          // TODO handle filters based on the query
          table: adapter.supabaseTableName,
          callback: (data) async {
            final remoteResults = await Future.wait<TModel>(data.map(adapter.fromSupabase));
            await storeRemoteResults(remoteResults);
          }
        )
        .subscribe();

    // TODO handle channel.unsubscribe logic

    return super.subscribe(policy: policy, query: query);
  }
}

Delete

Syncing from a delete will be tricky, as a delete operation is not guaranteed to have a remote fetch. @Shreemanarjun work is an example of how to delete from a listener, however, it may be best to use the DestructiveLocalSyncFromRemoteMixin to compare local and remote state (or some version of it).

This code is untested but may be another crude starting point:

final adapter = supabaseProvider.modelDictionary.adapterFor[TModel]!;
final channel = supabaseProvider.client
  .channel(adapter.supabaseTableName)
  .onPostgresChanges(
    event: PostgresChangeEvent.delete,
    schema: 'public',
    // TODO handle filters based on the query
    table: adapter.supabaseTableName,
    callback: (data) async {
      final remoteResults = await Future.wait<TModel>(data.map(adapter.fromSupabase));
      final localResults = await sqliteProvider.get<TModel>(repository: this);
      final toDelete = localResults.where((r) => !remoteResults.contains(r));

      for (final deletableModel in toDelete) {
        await sqliteProvider.delete<TModel>(deletableModel);
        memoryCacheProvider.delete<TModel>(deletableModel);
      }
    },
  ).subscribe();

// TODO handle channel.unsubscribe logic

tshedor avatar Oct 08 '24 00:10 tshedor

@tshedor I am interested in picking this up.

Josiassejod1 avatar Oct 08 '24 14:10 Josiassejod1

@Josiassejod1 Welcome aboard, happy to have you.


For the #subscribe mixin, I had a thought last night about overriding it fully to resolve the unsubscribe issue:

mixin SubscribeWithChannels on OfflineFirstWithSupabaseRepository {
  @override
  Stream<List<TModel extends OfflineFirstWithSupabaseModel>> subscribe({
    OfflineFirstGetPolicy policy = OfflineFirstGetPolicy.localOnly,
    Query? query,
  }) {    
    query ??= Query();
    if (subscriptions[TModel]?[query] != null) {
      return subscriptions[TModel]![query]!.stream as Stream<List<TModel>>;
    }

    final adapter = supabaseProvider.modelDictionary.adapterFor[TModel]!;
    
    if (policy == OfflineFirstGetPolicy.localOnly) {
      return super.subscribe<TModel>(policy: policy, query: query);
    }

    final channel = supabaseProvider.client
        .channel(adapter.supabaseTableName)
        .onPostgresChanges(
          // TODO accept different Postgres events
          event: PostgresChangeEvent.update,
          schema: 'public',
          // TODO handle filters based on the query
          table: adapter.supabaseTableName,
          callback: (data) async {
            final instance = await adapter.fromSupabase(data);
            await storeRemoteResults(instance);
          }
        )
        .subscribe();

    // Should the logic be duplicated from the super class
    // or should there be a protected method to build a stream controller 
    // in the super class? 
    final controller = StreamController<List<TModel>>(
      onCancel: () async {
        await channel.unsubscribe();
        await subscriptions[TModel]?[query]?.close();
        subscriptions[TModel]?.remove(query);
        if (subscriptions[TModel]?.isEmpty ?? false) {
          subscriptions.remove(TModel);
        }
      },
    );

    subscriptions[TModel] ??= {};
    subscriptions[TModel]?[query] = controller;

    // ignore: discarded_futures
    get<TModel>(query: query, policy: policy).then((results) {
      if (!controller.isClosed) controller.add(results);
    });

    return controller.stream;
  }
}

Also, streaming will need to be added to the SupabaseMockServer. The Supabase team tests the realtime feature in their client and some of that code could be recycled.

tshedor avatar Oct 08 '24 15:10 tshedor

They did a good job of syncing in this example: https://github.com/SkillDevs/electric_dart/tree/master/todos_flutter

jtkeyva avatar Oct 09 '24 15:10 jtkeyva

@tshedor sorry for an update comment but I am planning to integrate Brick & can't continue if deletion from remote side doesn't update the client side as well in the future.

Just wanted to know if it will be possible in the future based on what you've tried, so that I can integrate with peace of mind. Sorry again for an update comment.

alterhuman avatar Oct 17 '24 10:10 alterhuman

@Josiassejod1 @Shreemanarjun any updates?

tshedor avatar Oct 17 '24 17:10 tshedor

Sorry for late reply. i solved this way.

import 'dart:async';
import 'package:brick_offline_first/brick_offline_first.dart';
import 'package:brick_offline_first_with_supabase/brick_offline_first_with_supabase.dart';
import 'package:supabase_flutter/supabase_flutter.dart';

mixin SubscribeWithChannels on OfflineFirstWithSupabaseRepository {
  @override
  Stream<List<TModel>> subscribe<TModel extends OfflineFirstWithSupabaseModel>({
    OfflineFirstGetPolicy policy = OfflineFirstGetPolicy.localOnly,
    Query? query,
  }) {
    query ??= Query();
    if (subscriptions[TModel]?[query] != null) {
      return subscriptions[TModel]![query]!.stream as Stream<List<TModel>>;
    }

    final adapter = remoteProvider.modelDictionary.adapterFor[TModel]!;

    if (policy == OfflineFirstGetPolicy.localOnly) {
      return super.subscribe<TModel>(policy: policy, query: query);
    }

    final channel = remoteProvider.client
        .channel(adapter.supabaseTableName)
        .onPostgresChanges(
          event: PostgresChangeEvent.all,
          schema: 'public',
          table: adapter.supabaseTableName,
          callback: (payload) async {
            final event = payload.eventType;
            final record = payload.newRecord;

            switch (event) {
              case PostgresChangeEvent.insert:
              case PostgresChangeEvent.update:
                final instance = await adapter.fromSupabase(record,
                    provider: remoteProvider);
                await upsert<TModel>(
                  instance as TModel,
                );
                break;
              case PostgresChangeEvent.delete:
                final instance = await adapter.fromSupabase(record,
                    provider: remoteProvider);
                await delete<TModel>(instance as TModel);
                break;
              default:
            }
          },
        )
        .subscribe();

    final controller = StreamController<List<TModel>>(
      onCancel: () async {
        await channel.unsubscribe();
        await subscriptions[TModel]?[query]?.close();
        subscriptions[TModel]?.remove(query);
        if (subscriptions[TModel]?.isEmpty ?? false) {
          subscriptions.remove(TModel);
        }
      },
    );

    subscriptions[TModel] ??= {};
    subscriptions[TModel]?[query] = controller;

    // Fetch initial data
    get<TModel>(query: query, policy: policy).then((results) {
      if (!controller.isClosed) controller.add(results);
    });

    return controller.stream;
  }
}



Shreemanarjun avatar Oct 19 '24 19:10 Shreemanarjun

@Shreemanarjun will you make tests or a PR?

tshedor avatar Oct 19 '24 21:10 tshedor

@Shreemanarjun is that a realtime only solution?

jtkeyva avatar Oct 20 '24 00:10 jtkeyva

@tshedor any chance on implementing this for non-realtime? i was about to commit to using Brick but this is a major roadblock making it essentially unusable

jtkeyva avatar Oct 20 '24 00:10 jtkeyva

@jtkeyva this is open source software. PRs from anyone are welcome at anytime

tshedor avatar Oct 20 '24 00:10 tshedor

@tshedor any chance on implementing this for non-realtime? i was about to commit to using Brick but this is a major roadblock making it essentially unusable

I don't think this is possible without realtime. Even external services require either real time or logical replication to work. Do you have a possible approach without these?

alterhuman avatar Oct 20 '24 06:10 alterhuman

@tshedor I have locally tested this code manually. Please review if you have any suggestions. Thanks for this amazing library.


import 'dart:async';
import 'package:audionotes/bootstrap.dart';
import 'package:brick_offline_first/brick_offline_first.dart';
import 'package:brick_offline_first_with_supabase/brick_offline_first_with_supabase.dart';
import 'package:supabase_flutter/supabase_flutter.dart';
import 'package:talker/talker.dart';

/// Custom log type for repository operations
class RepoCustomLog extends TalkerLog {
  RepoCustomLog(String message) : super(message);

  @override
  String get title => '🗄️ Repository';

  @override
  AnsiPen get pen => AnsiPen()..cyan();
}

/// A mixin that adds real-time subscription capabilities using Supabase channels
/// with integrated logging functionality.
///
/// IMPORTANT: This mixin will only work if your Supabase table has real-time enabled.
/// If real-time is not enabled on your Supabase table, this will function the same as
/// the default `subscribe` method without any real-time updates.
///
/// To enable real-time for your Supabase table:
/// 1. Go to your Supabase dashboard
/// 2. Navigate to Database > Replication
/// 3. Enable real-time for the specific table
///
/// Events handled:
/// - INSERT: Handles new record creation
/// - UPDATE: Handles record modifications
/// - DELETE: Handles record deletions
mixin SubscribeWithChannels on OfflineFirstWithSupabaseRepository {
  /// Helper method for consistent repository logging
  void _log(String message) {
    talker.logTyped(RepoCustomLog(message));
  }

  /// Subscribes to real-time updates for a specific model type using Supabase channels.
  ///
  /// Parameters:
  /// - [policy]: Determines how data is fetched (local or remote)
  /// - [query]: Optional query to filter the data
  ///
  /// Returns a Stream of List<TModel> that updates in real-time
  Stream<List<TModel>>
      subscribeWithChannels<TModel extends OfflineFirstWithSupabaseModel>({
    OfflineFirstGetPolicy policy = OfflineFirstGetPolicy.localOnly,
    Query? query,
  }) {
    query ??= Query();

    _log('Initializing subscription for ${TModel.toString()}');

    if (subscriptions[TModel]?[query] != null) {
      _log('Returning existing subscription for ${TModel.toString()}');
      return subscriptions[TModel]![query]!.stream as Stream<List<TModel>>;
    }

    final adapter = remoteProvider.modelDictionary.adapterFor[TModel]!;
    if (policy == OfflineFirstGetPolicy.localOnly) {
      _log('Using local-only policy for ${TModel.toString()}');
      return super.subscribe<TModel>(policy: policy, query: query);
    }

    final channel = remoteProvider.client
        .channel(adapter.supabaseTableName)
        .onPostgresChanges(
          event: PostgresChangeEvent.all,
          schema: 'public',
          table: adapter.supabaseTableName,
          callback: (payload) async {
            final event = payload.eventType;
            final record = payload.newRecord;
            final oldRecord = payload.oldRecord;

            try {
              switch (event) {
                case PostgresChangeEvent.insert:
                  try {
                    /// Handle INSERT events for new records:
                    /// 1. Log the incoming insert event for debugging
                    /// 2. Convert Supabase JSON to Brick model instance
                    /// 3. Save to local SQLite database for offline access
                    /// 4. Update memory cache for fast retrieval
                    /// 5. Notify subscribers of the change
                    _log(
                        'Received insert event for ${TModel.toString()} - Record: $record');
                    final instance = await adapter.fromSupabase(record,
                        provider: remoteProvider);
                    await sqliteProvider.upsert<TModel>(instance as TModel);
                    await memoryCacheProvider.upsert<TModel>(instance);
                    await notifySubscriptionsWithLocalData<TModel>();
                  } catch (e, stackTrace) {
                    talker.error(
                      'Failed to process insert event for ${TModel.toString()}',
                      e,
                      stackTrace,
                    );
                  }
                  break;

                case PostgresChangeEvent.update:
                  try {
                    /// Handle UPDATE events for existing records:
                    /// 1. Log the update event for debugging
                    /// 2. Convert updated Supabase JSON to Brick model
                    /// 3. Update local SQLite database to maintain sync
                    /// 4. Refresh memory cache with new data
                    /// 5. Notify subscribers of the update
                    _log(
                        'Received update event for ${TModel.toString()} - Record: $record');
                    final instance = await adapter.fromSupabase(record,
                        provider: remoteProvider);
                    await sqliteProvider.upsert<TModel>(instance as TModel);
                    await memoryCacheProvider.upsert<TModel>(instance);
                    await notifySubscriptionsWithLocalData<TModel>();
                    _log("Update completed successfully");
                  } catch (e, stackTrace) {
                    talker.error(
                      'Failed to process update event for ${TModel.toString()}',
                      e,
                      stackTrace,
                    );
                  }
                  break;

                case PostgresChangeEvent.delete:
                  try {
                    /// Handle DELETE events for removing records:
                    /// 1. Log the delete event for debugging
                    /// 2. Extract the primary key (id) from the old record
                    /// 3. Verify the record exists locally
                    /// 4. Remove from SQLite database if found
                    /// 5. Clear from memory cache
                    /// 6. Notify subscribers of deletion
                    _log(
                        'Received delete event for ${TModel.toString()} - Record: $record OldRecord $oldRecord');
                    final primaryKey = 'id';
                    final modelID = oldRecord[primaryKey] as String?;

                    if (modelID == null) {
                      talker.error(
                          'Delete failed: Invalid or null modelID for ${TModel.toString()}');
                      break;
                    }

                    final localModels = await sqliteProvider.get<TModel>(
                      query: Query.where(
                        primaryKey,
                        modelID,
                        compare: Compare.exact,
                        limit1: true,
                      ),
                    );

                    if (localModels.isEmpty) {
                      _log("No local data found for deletion - ID: $modelID");
                      break;
                    }

                    await sqliteProvider.delete<TModel>(localModels.first);
                    await memoryCacheProvider.delete<TModel>(localModels.first);
                    await notifySubscriptionsWithLocalData<TModel>();
                    _log(
                        "${TModel.toString()} with ID $modelID deleted successfully");
                  } catch (e, stackTrace) {
                    talker.error(
                      'Failed to process delete event for ${TModel.toString()}',
                      e,
                      stackTrace,
                    );
                  }
                  break;

                default:
                  talker.error(
                      'Unhandled PostgresChangeEvent: $event for ${TModel.toString()}');
              }
            } catch (e, stackTrace) {
              _log(
                  'Error processing ${event.toString()} event for ${TModel.toString()}: $e\n$stackTrace');
            }
          },
        )
        .subscribe();

    final controller = StreamController<List<TModel>>(
      onCancel: () async {
        _log('Cleaning up subscription for ${TModel.toString()}');
        await channel.unsubscribe();
        await subscriptions[TModel]?[query]?.close();
        subscriptions[TModel]?.remove(query);
        if (subscriptions[TModel]?.isEmpty ?? false) {
          subscriptions.remove(TModel);
        }
      },
    );

    subscriptions[TModel] ??= {};
    subscriptions[TModel]?[query] = controller;

    // Fetch initial data
    _log('Fetching initial data for ${TModel.toString()}');
    get<TModel>(
      query: query,
      policy: OfflineFirstGetPolicy.alwaysHydrate,
    ).then(
      (results) {
        if (!controller.isClosed) {
          controller.add(results);
          _log(
              'Initial data fetched for ${TModel.toString()} - Count: ${results.length}');
        }
      },
      onError: (e, stackTrace) {
        talker.error(
          'Failed to fetch initial data for ${TModel.toString()}',
          e,
          stackTrace,
        );
      },
    );

    return controller.stream;
  }
}





Shreemanarjun avatar Oct 25 '24 21:10 Shreemanarjun

Testing your code locally at least seems to be working like a charm for me! Great work 🎉

lohnn avatar Oct 28 '24 15:10 lohnn

Looking through the code I do have some super small code pointers:

  • When you are inserting values to a String with String interpolation, you don't need to write .toString(), as that is implicitly used by Dart. Example: 'Initializing subscription for ${TModel.toString()}' could as well be 'Initializing subscription for $TModel', as that would mean the exact same thing.
  • You have a couple of Strings that are defined with double quotes. Codebases usually picks one and sticks to it, and as far as I can see single quotes are preferred in Brick.
  • You are awaiting memoryCacheProvider.xxx() which is not needed, as they are not asynchronous.
  • In the delete case there is a value called primaryKey that can be declared const.
  • Personally I would add trailing commas to a few of the function calls to get prettier line breaks, but that is 100% just based on preferences.
  • I believe that with the oldest supported Dart version that brick_offline_with_supabase supports (3.0.0) you should not have to manually write break; in switch/cases if it is the last line before next case:
break; // <-- These ones should be able to be removed if I am not mistaken
case PostgresChangeEvent.delete:

All of my pointers here should have no effect on functionality, it should just help keep clean and concise code 👍 As previously stated, great work! 🙏

lohnn avatar Oct 28 '24 15:10 lohnn

@Shreemanarjun @lohnn @alterhuman @jtkeyva @devj3ns I have a PR in #472 that captures the back-and-forth work from this issue and all of the TODOs.

Could you please apply the following to your projects' pubspec.yaml and verify that this PR works in all of your cases?

dependency_overrides:
  brick_offline_first_with_supabase:
    git:
      url: [email protected]:GetDutchie/brick.git
      ref: supabase-subscription
      path: packages/brick_offline_first_with_supabase
  brick_offline_first:
    git:
      url: [email protected]:GetDutchie/brick.git
      ref: supabase-subscription
      path: packages/brick_offline_first
  brick_sqlite:
    git:
      url: [email protected]:GetDutchie/brick.git
      ref: supabase-subscription
      path: packages/brick_sqlite
  brick_supabase:
    git:
      url: [email protected]:GetDutchie/brick.git
      ref: supabase-subscription
      path: packages/brick_supabase

tshedor avatar Oct 28 '24 23:10 tshedor

I'll try to try it out, and review the PR during the day :)

lohnn avatar Oct 29 '24 08:10 lohnn

This feature has been released in the latest Brick packages. Please run dart pub upgrade or update your pubspec.yaml to brick_offline_first: 3.3.0 and brick_offline_first_with_supabase: 1.1.0.

To subscribe to realtime events (as documented here):

// Listen to all changes
final customers = MyRepository().subscribeToRealtime<Customer>();
// Or listen to results of a specific filter
final customers = MyRepository().subscribeToRealtime<Customer>(query: Query.where('id', 1));

// Use the stream results
final customersSubscription = customers.listen((value) {});

// Always close your streams
await customersSubscription.cancel();

Thank you to @Shreemanarjun for the foundational code and @lohnn for manually verification.

For bugs or feature requests related to realtime, please open a new issue.

tshedor avatar Oct 30 '24 17:10 tshedor

is this THE way (or only way) to let our app know that something got deleted from the server and thus remove it locally?

or would you recommend re-qeurying data? like for a crm with thousands of contacts, how do you rekon this works? or setting an expiration day/time for offline data?

jtkeyva avatar Nov 01 '24 00:11 jtkeyva

This certainly is not the only way, it's just the way that uses Supabase's Realtime offering.

Other architectures would require a specific backend setup (or specific client polling and reconciliation), which is out of scope for Brick. I personally only have the capacity to answer Brick-related questions and problems on this repo.

tshedor avatar Nov 01 '24 00:11 tshedor

got it thanks

jtkeyva avatar Nov 01 '24 00:11 jtkeyva