Subscribing to Remote Supabase Events
A continuation of #451 and #448, this issue will track work integrating Supabase's channels with Brick. @Shreemanarjun has expressed interest in leading the initiative (but I won't hold you to it).
If you want to help develop this feature, please comment and tag me and @Shreemanarjun.
Get
From #448, this is a crude starting point with some TODOs to resolve:
```dart
mixin SubscribeWithChannels on OfflineFirstWithSupabaseRepository {
  Stream<List<TModel>> subscribe<TModel extends OfflineFirstWithSupabaseModel>({
    OfflineFirstGetPolicy policy = OfflineFirstGetPolicy.localOnly,
    Query? query,
  }) {
    final adapter = supabaseProvider.modelDictionary.adapterFor[TModel]!;

    // TODO respect policy and ignore remote subscription if requested
    final channel = supabaseProvider.client
        .channel(adapter.supabaseTableName)
        .onPostgresChanges(
          event: PostgresChangeEvent.update,
          schema: 'public',
          // TODO handle filters based on the query
          table: adapter.supabaseTableName,
          callback: (data) async {
            final remoteResults = await Future.wait<TModel>(data.map(adapter.fromSupabase));
            await storeRemoteResults(remoteResults);
          },
        )
        .subscribe();

    // TODO handle channel.unsubscribe logic
    return super.subscribe<TModel>(policy: policy, query: query);
  }
}
```
Delete
Syncing from a delete will be tricky, as a delete operation is not guaranteed to have a remote fetch. @Shreemanarjun's work is an example of how to delete from a listener; however, it may be best to use the DestructiveLocalSyncFromRemoteMixin to compare local and remote state (or some version of it).
This code is untested but may be another crude starting point:
```dart
final adapter = supabaseProvider.modelDictionary.adapterFor[TModel]!;
final channel = supabaseProvider.client
    .channel(adapter.supabaseTableName)
    .onPostgresChanges(
      event: PostgresChangeEvent.delete,
      schema: 'public',
      // TODO handle filters based on the query
      table: adapter.supabaseTableName,
      callback: (data) async {
        final remoteResults = await Future.wait<TModel>(data.map(adapter.fromSupabase));
        final localResults = await sqliteProvider.get<TModel>(repository: this);
        final toDelete = localResults.where((r) => !remoteResults.contains(r));

        for (final deletableModel in toDelete) {
          await sqliteProvider.delete<TModel>(deletableModel);
          memoryCacheProvider.delete<TModel>(deletableModel);
        }
      },
    )
    .subscribe();
// TODO handle channel.unsubscribe logic
```
@tshedor I am interested in picking this up.
@Josiassejod1 Welcome aboard, happy to have you.
For the #subscribe mixin, I had a thought last night about overriding it fully to resolve the unsubscribe issue:
```dart
mixin SubscribeWithChannels on OfflineFirstWithSupabaseRepository {
  @override
  Stream<List<TModel>> subscribe<TModel extends OfflineFirstWithSupabaseModel>({
    OfflineFirstGetPolicy policy = OfflineFirstGetPolicy.localOnly,
    Query? query,
  }) {
    query ??= Query();
    if (subscriptions[TModel]?[query] != null) {
      return subscriptions[TModel]![query]!.stream as Stream<List<TModel>>;
    }

    final adapter = supabaseProvider.modelDictionary.adapterFor[TModel]!;
    if (policy == OfflineFirstGetPolicy.localOnly) {
      return super.subscribe<TModel>(policy: policy, query: query);
    }

    final channel = supabaseProvider.client
        .channel(adapter.supabaseTableName)
        .onPostgresChanges(
          // TODO accept different Postgres events
          event: PostgresChangeEvent.update,
          schema: 'public',
          // TODO handle filters based on the query
          table: adapter.supabaseTableName,
          callback: (data) async {
            final instance = await adapter.fromSupabase(data);
            await storeRemoteResults([instance]);
          },
        )
        .subscribe();

    // Should the logic be duplicated from the super class,
    // or should there be a protected method to build a stream controller
    // in the super class?
    final controller = StreamController<List<TModel>>(
      onCancel: () async {
        await channel.unsubscribe();
        await subscriptions[TModel]?[query]?.close();
        subscriptions[TModel]?.remove(query);
        if (subscriptions[TModel]?.isEmpty ?? false) {
          subscriptions.remove(TModel);
        }
      },
    );

    subscriptions[TModel] ??= {};
    subscriptions[TModel]?[query] = controller;

    // ignore: discarded_futures
    get<TModel>(query: query, policy: policy).then((results) {
      if (!controller.isClosed) controller.add(results);
    });

    return controller.stream;
  }
}
```
Also, streaming will need to be added to the SupabaseMockServer. The Supabase team tests the realtime feature in their client and some of that code could be recycled.
They did a good job of syncing in this example: https://github.com/SkillDevs/electric_dart/tree/master/todos_flutter
@tshedor Sorry for the bump, but I am planning to integrate Brick and can't proceed if a deletion on the remote side doesn't eventually update the client side as well.
Just wanted to know if this will be possible in the future based on what you've tried, so that I can integrate with peace of mind. Sorry again for the bump.
@Josiassejod1 @Shreemanarjun any updates?
Sorry for the late reply. I solved it this way:
```dart
import 'dart:async';

import 'package:brick_offline_first/brick_offline_first.dart';
import 'package:brick_offline_first_with_supabase/brick_offline_first_with_supabase.dart';
import 'package:supabase_flutter/supabase_flutter.dart';

mixin SubscribeWithChannels on OfflineFirstWithSupabaseRepository {
  @override
  Stream<List<TModel>> subscribe<TModel extends OfflineFirstWithSupabaseModel>({
    OfflineFirstGetPolicy policy = OfflineFirstGetPolicy.localOnly,
    Query? query,
  }) {
    query ??= Query();
    if (subscriptions[TModel]?[query] != null) {
      return subscriptions[TModel]![query]!.stream as Stream<List<TModel>>;
    }

    final adapter = remoteProvider.modelDictionary.adapterFor[TModel]!;
    if (policy == OfflineFirstGetPolicy.localOnly) {
      return super.subscribe<TModel>(policy: policy, query: query);
    }

    final channel = remoteProvider.client
        .channel(adapter.supabaseTableName)
        .onPostgresChanges(
          event: PostgresChangeEvent.all,
          schema: 'public',
          table: adapter.supabaseTableName,
          callback: (payload) async {
            switch (payload.eventType) {
              case PostgresChangeEvent.insert:
              case PostgresChangeEvent.update:
                final instance = await adapter.fromSupabase(payload.newRecord,
                    provider: remoteProvider);
                await upsert<TModel>(instance as TModel);
                break;
              case PostgresChangeEvent.delete:
                // Delete payloads only populate oldRecord; newRecord is empty
                final instance = await adapter.fromSupabase(payload.oldRecord,
                    provider: remoteProvider);
                await delete<TModel>(instance as TModel);
                break;
              default:
                break;
            }
          },
        )
        .subscribe();

    final controller = StreamController<List<TModel>>(
      onCancel: () async {
        await channel.unsubscribe();
        await subscriptions[TModel]?[query]?.close();
        subscriptions[TModel]?.remove(query);
        if (subscriptions[TModel]?.isEmpty ?? false) {
          subscriptions.remove(TModel);
        }
      },
    );

    subscriptions[TModel] ??= {};
    subscriptions[TModel]?[query] = controller;

    // Fetch initial data
    get<TModel>(query: query, policy: policy).then((results) {
      if (!controller.isClosed) controller.add(results);
    });

    return controller.stream;
  }
}
```
@Shreemanarjun will you make tests or a PR?
@Shreemanarjun is that a realtime-only solution?
@tshedor any chance of implementing this for non-realtime? I was about to commit to using Brick, but this is a major roadblock making it essentially unusable.
@jtkeyva this is open source software. PRs from anyone are welcome at anytime
> @tshedor any chance of implementing this for non-realtime? I was about to commit to using Brick, but this is a major roadblock making it essentially unusable.

I don't think this is possible without realtime. Even external services require either realtime or logical replication to work. Do you have a possible approach without these?
@tshedor I have locally tested this code manually. Please review if you have any suggestions. Thanks for this amazing library.
```dart
import 'dart:async';

import 'package:audionotes/bootstrap.dart';
import 'package:brick_offline_first/brick_offline_first.dart';
import 'package:brick_offline_first_with_supabase/brick_offline_first_with_supabase.dart';
import 'package:supabase_flutter/supabase_flutter.dart';
import 'package:talker/talker.dart';

/// Custom log type for repository operations
class RepoCustomLog extends TalkerLog {
  RepoCustomLog(String message) : super(message);

  @override
  String get title => '🗄️ Repository';

  @override
  AnsiPen get pen => AnsiPen()..cyan();
}

/// A mixin that adds real-time subscription capabilities using Supabase channels
/// with integrated logging functionality.
///
/// IMPORTANT: This mixin will only work if your Supabase table has real-time enabled.
/// If real-time is not enabled on your Supabase table, this will function the same as
/// the default `subscribe` method without any real-time updates.
///
/// To enable real-time for your Supabase table:
/// 1. Go to your Supabase dashboard
/// 2. Navigate to Database > Replication
/// 3. Enable real-time for the specific table
///
/// Events handled:
/// - INSERT: Handles new record creation
/// - UPDATE: Handles record modifications
/// - DELETE: Handles record deletions
mixin SubscribeWithChannels on OfflineFirstWithSupabaseRepository {
  /// Helper method for consistent repository logging
  void _log(String message) {
    talker.logTyped(RepoCustomLog(message));
  }

  /// Subscribes to real-time updates for a specific model type using Supabase channels.
  ///
  /// Parameters:
  /// - [policy]: Determines how data is fetched (local or remote)
  /// - [query]: Optional query to filter the data
  ///
  /// Returns a Stream of List<TModel> that updates in real-time
  Stream<List<TModel>>
      subscribeWithChannels<TModel extends OfflineFirstWithSupabaseModel>({
    OfflineFirstGetPolicy policy = OfflineFirstGetPolicy.localOnly,
    Query? query,
  }) {
    query ??= Query();
    _log('Initializing subscription for ${TModel.toString()}');

    if (subscriptions[TModel]?[query] != null) {
      _log('Returning existing subscription for ${TModel.toString()}');
      return subscriptions[TModel]![query]!.stream as Stream<List<TModel>>;
    }

    final adapter = remoteProvider.modelDictionary.adapterFor[TModel]!;

    if (policy == OfflineFirstGetPolicy.localOnly) {
      _log('Using local-only policy for ${TModel.toString()}');
      return super.subscribe<TModel>(policy: policy, query: query);
    }

    final channel = remoteProvider.client
        .channel(adapter.supabaseTableName)
        .onPostgresChanges(
          event: PostgresChangeEvent.all,
          schema: 'public',
          table: adapter.supabaseTableName,
          callback: (payload) async {
            final event = payload.eventType;
            final record = payload.newRecord;
            final oldRecord = payload.oldRecord;
            try {
              switch (event) {
                case PostgresChangeEvent.insert:
                  try {
                    /// Handle INSERT events for new records:
                    /// 1. Log the incoming insert event for debugging
                    /// 2. Convert Supabase JSON to Brick model instance
                    /// 3. Save to local SQLite database for offline access
                    /// 4. Update memory cache for fast retrieval
                    /// 5. Notify subscribers of the change
                    _log(
                        'Received insert event for ${TModel.toString()} - Record: $record');
                    final instance = await adapter.fromSupabase(record,
                        provider: remoteProvider);
                    await sqliteProvider.upsert<TModel>(instance as TModel);
                    await memoryCacheProvider.upsert<TModel>(instance);
                    await notifySubscriptionsWithLocalData<TModel>();
                  } catch (e, stackTrace) {
                    talker.error(
                      'Failed to process insert event for ${TModel.toString()}',
                      e,
                      stackTrace,
                    );
                  }
                  break;
                case PostgresChangeEvent.update:
                  try {
                    /// Handle UPDATE events for existing records:
                    /// 1. Log the update event for debugging
                    /// 2. Convert updated Supabase JSON to Brick model
                    /// 3. Update local SQLite database to maintain sync
                    /// 4. Refresh memory cache with new data
                    /// 5. Notify subscribers of the update
                    _log(
                        'Received update event for ${TModel.toString()} - Record: $record');
                    final instance = await adapter.fromSupabase(record,
                        provider: remoteProvider);
                    await sqliteProvider.upsert<TModel>(instance as TModel);
                    await memoryCacheProvider.upsert<TModel>(instance);
                    await notifySubscriptionsWithLocalData<TModel>();
                    _log("Update completed successfully");
                  } catch (e, stackTrace) {
                    talker.error(
                      'Failed to process update event for ${TModel.toString()}',
                      e,
                      stackTrace,
                    );
                  }
                  break;
                case PostgresChangeEvent.delete:
                  try {
                    /// Handle DELETE events for removing records:
                    /// 1. Log the delete event for debugging
                    /// 2. Extract the primary key (id) from the old record
                    /// 3. Verify the record exists locally
                    /// 4. Remove from SQLite database if found
                    /// 5. Clear from memory cache
                    /// 6. Notify subscribers of deletion
                    _log(
                        'Received delete event for ${TModel.toString()} - Record: $record OldRecord $oldRecord');
                    final primaryKey = 'id';
                    final modelID = oldRecord[primaryKey] as String?;
                    if (modelID == null) {
                      talker.error(
                          'Delete failed: Invalid or null modelID for ${TModel.toString()}');
                      break;
                    }
                    final localModels = await sqliteProvider.get<TModel>(
                      query: Query.where(
                        primaryKey,
                        modelID,
                        compare: Compare.exact,
                        limit1: true,
                      ),
                    );
                    if (localModels.isEmpty) {
                      _log("No local data found for deletion - ID: $modelID");
                      break;
                    }
                    await sqliteProvider.delete<TModel>(localModels.first);
                    await memoryCacheProvider.delete<TModel>(localModels.first);
                    await notifySubscriptionsWithLocalData<TModel>();
                    _log(
                        "${TModel.toString()} with ID $modelID deleted successfully");
                  } catch (e, stackTrace) {
                    talker.error(
                      'Failed to process delete event for ${TModel.toString()}',
                      e,
                      stackTrace,
                    );
                  }
                  break;
                default:
                  talker.error(
                      'Unhandled PostgresChangeEvent: $event for ${TModel.toString()}');
              }
            } catch (e, stackTrace) {
              _log(
                  'Error processing ${event.toString()} event for ${TModel.toString()}: $e\n$stackTrace');
            }
          },
        )
        .subscribe();

    final controller = StreamController<List<TModel>>(
      onCancel: () async {
        _log('Cleaning up subscription for ${TModel.toString()}');
        await channel.unsubscribe();
        await subscriptions[TModel]?[query]?.close();
        subscriptions[TModel]?.remove(query);
        if (subscriptions[TModel]?.isEmpty ?? false) {
          subscriptions.remove(TModel);
        }
      },
    );

    subscriptions[TModel] ??= {};
    subscriptions[TModel]?[query] = controller;

    // Fetch initial data
    _log('Fetching initial data for ${TModel.toString()}');
    get<TModel>(
      query: query,
      policy: OfflineFirstGetPolicy.alwaysHydrate,
    ).then(
      (results) {
        if (!controller.isClosed) {
          controller.add(results);
          _log(
              'Initial data fetched for ${TModel.toString()} - Count: ${results.length}');
        }
      },
      onError: (e, stackTrace) {
        talker.error(
          'Failed to fetch initial data for ${TModel.toString()}',
          e,
          stackTrace,
        );
      },
    );

    return controller.stream;
  }
}
```
Testing your code locally, it seems to work like a charm for me at least! Great work 🎉
Looking through the code I do have some super small code pointers:
- When you are inserting values into a String with string interpolation, you don't need to write `.toString()`, as that is implicitly called by Dart. Example: `'Initializing subscription for ${TModel.toString()}'` could as well be `'Initializing subscription for $TModel'`, as that would mean the exact same thing.
- You have a couple of Strings that are defined with double quotes. Codebases usually pick one style and stick to it, and as far as I can see single quotes are preferred in Brick.
- You are awaiting `memoryCacheProvider.xxx()`, which is not needed, as those methods are not asynchronous.
- In the `delete` case there is a value called `primaryKey` that can be declared `const`.
- Personally I would add trailing commas to a few of the function calls to get prettier line breaks, but that is 100% just based on preference.
- I believe that with the oldest Dart version that brick_offline_first_with_supabase supports (3.0.0), you should not have to manually write `break;` in a switch case if it is the last line before the next case:

```dart
          break; // <-- These ones should be able to be removed if I am not mistaken
        case PostgresChangeEvent.delete:
```
All of my pointers here should have no effect on functionality; they should just help keep the code clean and concise 👍 As previously stated, great work! 🙏
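To make these pointers concrete, here is a small hypothetical Dart 3 snippet (the `ChangeEvent` enum and function below are made up for illustration, not from Brick) showing the interpolation, `const`, and `break;` points together:

```dart
// Hypothetical example illustrating the review pointers above.
enum ChangeEvent { insert, update, delete }

void logChange(ChangeEvent event, Type model) {
  // `$model` suffices; Dart calls toString() implicitly in interpolation.
  print('Received $event for $model');

  switch (event) {
    case ChangeEvent.insert:
    case ChangeEvent.update:
      // Since Dart 3.0, non-empty switch statement cases no longer fall
      // through, so no trailing `break;` is needed before the next case.
      print('upserting $model');
    case ChangeEvent.delete:
      // Values that never change can be declared const.
      const primaryKey = 'id';
      print('deleting $model by $primaryKey');
  }
}
```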
@Shreemanarjun @lohnn @alterhuman @jtkeyva @devj3ns I have a PR in #472 that captures the back-and-forth work from this issue and all of the TODOs.
Could you please apply the following to your projects' pubspec.yaml and verify that this PR works in all of your cases?
```yaml
dependency_overrides:
  brick_offline_first_with_supabase:
    git:
      url: git@github.com:GetDutchie/brick.git
      ref: supabase-subscription
      path: packages/brick_offline_first_with_supabase
  brick_offline_first:
    git:
      url: git@github.com:GetDutchie/brick.git
      ref: supabase-subscription
      path: packages/brick_offline_first
  brick_sqlite:
    git:
      url: git@github.com:GetDutchie/brick.git
      ref: supabase-subscription
      path: packages/brick_sqlite
  brick_supabase:
    git:
      url: git@github.com:GetDutchie/brick.git
      ref: supabase-subscription
      path: packages/brick_supabase
```
I'll try it out and review the PR during the day :)
This feature has been released in the latest Brick packages. Please run dart pub upgrade or update your pubspec.yaml to brick_offline_first: 3.3.0 and brick_offline_first_with_supabase: 1.1.0.
To subscribe to realtime events (as documented here):
```dart
// Listen to all changes
final customers = MyRepository().subscribeToRealtime<Customer>();

// Or listen to results of a specific filter
final customers = MyRepository().subscribeToRealtime<Customer>(query: Query.where('id', 1));

// Use the stream results
final customersSubscription = customers.listen((value) {});

// Always close your streams
await customersSubscription.cancel();
```
Thank you to @Shreemanarjun for the foundational code and @lohnn for manual verification.
For bugs or feature requests related to realtime, please open a new issue.
Is this THE way (or the only way) to let our app know that something got deleted from the server and thus remove it locally?
Or would you recommend re-querying data? Like for a CRM with thousands of contacts, how do you reckon this works? Or setting an expiration day/time for offline data?
This certainly is not the only way, it's just the way that uses Supabase's Realtime offering.
Other architectures would require a specific backend setup (or specific client polling and reconciliation), which is out of scope for Brick. I personally only have the capacity to answer Brick-related questions and problems on this repo.
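For anyone who can't enable Realtime, the "client polling and reconciliation" alternative mentioned above could be sketched roughly like this, using only the repository methods shown earlier in the thread (`Customer`, `MyRepository`, and the interval are illustrative assumptions, not part of Brick):

```dart
import 'dart:async';

// A crude polling loop: re-fetch from the remote on an interval and emit
// the hydrated results. Note that, unlike Realtime, a plain re-fetch does
// not by itself remove locally cached rows that were deleted on the server;
// a reconciliation pass comparing local and remote state (as discussed
// earlier in this thread) is still needed for deletes.
Stream<List<Customer>> pollCustomers(
  MyRepository repository, {
  Duration every = const Duration(minutes: 5),
}) async* {
  while (true) {
    // alwaysHydrate forces a remote fetch on each tick before emitting
    yield await repository.get<Customer>(
      policy: OfflineFirstGetPolicy.alwaysHydrate,
    );
    await Future.delayed(every);
  }
}
```

For a large dataset (thousands of contacts), a server-side `updated_at` filter on the query would keep each poll cheap, at the cost of still needing a separate strategy for deletions.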
got it thanks