DynamicData
MergeMany not behaving as expected
Just to preface this, I think this issue is due to a misunderstanding on my part but I'm hoping to clear this up for others and come up with a solution that works.
I was attempting to use MergeMany(parent => parent.ChildCache) in conjunction with DistinctValues(child => child) to union/or all the children items together given a cache of parents. This was not working and I realised this was down to MergeMany not working as I expected.
Here is a fairly simple example based on the existing unit tests:
var parent = new SourceCache<SourceCache<Person, string>, object>(o => o);
var source1 = new SourceCache<Person, string>(p => p.Name);
var source2 = new SourceCache<Person, string>(p => p.Name);
var person1 = new Person("Person1", 11);
var person2 = new Person("Person2", 12);
var results = parent.Connect().MergeMany(p => p.Connect()).AsAggregator();
source1.AddOrUpdate(person1);
source2.AddOrUpdate(person2);
parent.AddOrUpdate(source1);
parent.AddOrUpdate(source2);
parent.Remove(source1);
// this expectation fails: the removal is never reported, so only the two additions arrive
results.Messages.Should().HaveCount(3);
In this case the removal of source1 does not result in a removal of person1 from the results as MergeMany is not 'change set aware' like I had assumed it was and does not remove all previous items when the source is removed.
What I'd really like to do is very similar to the existing DynamicCombiner functionality (which works well) except to do so in a fluent way without having to manually create and maintain a SourceList<IObservable<IChangeSet<TObject, TKey>>>
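For comparison, here is a minimal sketch of the manual approach described above, assuming the DynamicData package is referenced. The `Person` record is a stand-in for the test type used in the earlier snippet, and the variable names are illustrative only.

```csharp
using System;
using DynamicData;

var source1 = new SourceCache<Person, string>(p => p.Name);
var source2 = new SourceCache<Person, string>(p => p.Name);
source1.AddOrUpdate(new Person("Person1", 11));
source2.AddOrUpdate(new Person("Person2", 12));

// The list of change set streams has to be created and maintained by hand.
var streams = new SourceList<IObservable<IChangeSet<Person, string>>>();

// Or() on the list unions the children and reacts to streams being added or removed.
using var union = streams.Or().AsObservableCache();

// Keep the stream references: the same instance must later be passed to Remove.
var stream1 = source1.Connect();
var stream2 = source2.Connect();
streams.Add(stream1);
streams.Add(stream2);
Console.WriteLine(union.Count);

// Removing a stream should also withdraw the items it contributed.
streams.Remove(stream1);
Console.WriteLine(union.Count);

// Stand-in for the Person type from the unit tests (assumption).
public record Person(string Name, int Age);
```

The bookkeeping around `streams` (adding, removing, holding on to each `Connect()` result) is the part a fluent operator would hide.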
I did come up with a new class and extension method leveraging the existing functionality to make this happen. Is this the right approach to take or am I missing something here?
Is this worth a PR with further work? It needs to be extended to handle all change reasons and other variants apart from union/or may be useful. The usage is identical to MergeMany above but you call UnionMany instead.
public static class DynamicDataExtensions
{
    public static IObservable<IChangeSet<TObjectOut, TKeyOut>> UnionMany<TObjectOut, TKeyOut, TObjectIn, TKeyIn>(
        this IObservable<IChangeSet<TObjectIn, TKeyIn>> source,
        Func<TObjectIn, IObservable<IChangeSet<TObjectOut, TKeyOut>>> cacheSelector)
    {
        return new Combiner<TObjectOut, TKeyOut, TKeyIn>(source.Transform(cacheSelector)).Run();
    }
}

public class Combiner<TObject, TKey, TKeyIn> : IDisposable
{
    private readonly SourceList<IObservable<IChangeSet<TObject, TKey>>> _list;
    private readonly CompositeDisposable _disposables;

    public Combiner(IObservable<IChangeSet<IObservable<IChangeSet<TObject, TKey>>, TKeyIn>> source)
    {
        _list = new SourceList<IObservable<IChangeSet<TObject, TKey>>>();
        var subscription = source.Subscribe(changes =>
        {
            foreach (var change in changes)
            {
                if (change.Reason == ChangeReason.Add)
                {
                    _list.Add(change.Current);
                }
                else if (change.Reason == ChangeReason.Remove)
                {
                    _list.Remove(change.Current);
                }
            }
        });
        _disposables = new CompositeDisposable(subscription, _list);
    }

    public IObservable<IChangeSet<TObject, TKey>> Run()
    {
        return _list.Or();
    }

    public void Dispose()
    {
        _disposables?.Dispose();
    }
}
What you say makes sense regarding merge many. It was never meant to be change set aware, but that's caught many consumers out. From an API point of view, my concern is that providing yet another operator could further confuse matters.
Thanks for the feedback Roland, I can understand not wanting to introduce more operators.
Perhaps the better question is: should MergeMany be change set aware? It's possible to introduce an overload that works as expected on IChangeSet, but the downside would be backwards compatibility / breaking change issues for some users. We could introduce a boolean parameter with a default value to control this behaviour. Maybe a bit obtuse.
I did see a blog post (linked by another issue) for a MergeManyEx: https://www.magentaize.xyz/posts/mergemany-do-not-remove-elements-of-removed-observables-in-dynamicdata/
The simple answer here may just be to clarify the comments and documentation and leave it to users to implement as needed.
I have been looking at the overloads for the first time in years (and with fresh eyes). We can achieve the semantic equivalent of the Or operator when using MergeMany with either a child list or a child cache. Locally, I just created the following overloads; they compile fine and are picked up when I select either a child list or a child cache:
using System;

// ReSharper disable once CheckNamespace
namespace DynamicData
{
    public static class ObservableCacheEx_MergeMany
    {
        /*
         * The next 2 overloads are for selecting an item which has a child cache
         */
        public static IObservable<TDestination> MergeMany<TObject, TKey, TDestination, TDestinationKey>(
            this IObservable<IChangeSet<TObject, TKey>> source,
            Func<TObject, IObservable<IChangeSet<TDestination, TDestinationKey>>> observableSelector)
            where TKey : notnull
            where TDestinationKey : notnull
        {
            return source.MergeMany((t, k) => observableSelector(t));
        }

        public static IObservable<TDestination> MergeMany<TObject, TKey, TDestination, TDestinationKey>(
            this IObservable<IChangeSet<TObject, TKey>> source,
            Func<TObject, TKey, IObservable<IChangeSet<TDestination, TDestinationKey>>> observableSelector)
            where TKey : notnull
            where TDestinationKey : notnull
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (observableSelector == null) throw new ArgumentNullException(nameof(observableSelector));

            //Construct an Or operator - similar to example
            throw new NotImplementedException();
        }

        /*
         * The next 2 overloads are for selecting an item which has a child list
         */
        public static IObservable<TDestination> MergeMany<TObject, TKey, TDestination>(
            this IObservable<IChangeSet<TObject, TKey>> source,
            Func<TObject, IObservable<IChangeSet<TDestination>>> observableSelector)
            where TKey : notnull
        {
            return source.MergeMany((t, k) => observableSelector(t));
        }

        public static IObservable<TDestination> MergeMany<TObject, TKey, TDestination>(
            this IObservable<IChangeSet<TObject, TKey>> source,
            Func<TObject, TKey, IObservable<IChangeSet<TDestination>>> observableSelector)
            where TKey : notnull
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (observableSelector == null) throw new ArgumentNullException(nameof(observableSelector));

            //Construct an Or operator - similar to example
            throw new NotImplementedException();
        }
    }
}
With your example, the following snippets use the above overloads:
var cacheOfCache = new SourceCache<SourceCache<Person, string>, object>(o => o);
var resultsCache1 = cacheOfCache.Connect().MergeMany(p => p.Connect());
var resultsCache2 = cacheOfCache.Connect().MergeMany((p,key) => p.Connect());
var cacheOfList = new SourceCache<SourceList<Person>, object>(o => o);
var resultsList1 = cacheOfList.Connect().MergeMany(p => p.Connect());
var resultsList2 = cacheOfList.Connect().MergeMany((p, key) => p.Connect());
This means we can implement the solution without introducing a new operator.
Hi, I'm the author of the blog post mentioned above. First, please let me thank RolandPheasant for this amazing project. I'm glad that someone else has run into the same problem with MergeMany.
I don't like using the Or operator after Connect. It looks like magic to me, and making the connection between MergeMany and Or is hard. On the other hand, using Combiner without hitting an exception requires that the parent list is not empty:
private static IObservable<IChangeSet<T>> Combine<T>(
    this IObservable<IChangeSet<T>> source,
    CombineOperator type,
    params IObservable<IChangeSet<T>>[] others)
{
    if (source is null)
    {
        throw new ArgumentNullException(nameof(source));
    }

    if (others is null)
    {
        throw new ArgumentNullException(nameof(others));
    }

    // Here!
    if (others.Length == 0)
    {
        throw new ArgumentException("Must be at least one item to combine with", nameof(others));
    }

    var items = source.EnumerateOne().Union(others).ToList();
    return new Combiner<T>(items, type).Run();
}
In fact, in my scenario the merged list is almost always empty right after it is created, because reactive objects are declarative, like statements, and no workload has been applied yet.
I would also prefer modifying MergeMany over introducing a new operator, provided it can meet the requirement of processing observable changes of the parent list.
Is this still an open issue? I implemented it for my current project and would be delighted for the chance to contribute as this library is amazing and has been very helpful. If there is interest, I'm happy to submit a full PR with unit tests, etc.
The implementation uses SubscribeMany to maintain a local IntermediateCache (and a custom operator that is like PopulateInto except with auto-remove).
Here is a simplified version:
public static IObservable<IChangeSet<TDestination, TDestinationKey>> MergeMany<TObject, TKey, TDestination, TDestinationKey>(
    this IObservable<IChangeSet<TObject, TKey>> source,
    Func<TObject, TKey, IObservable<IChangeSet<TDestination, TDestinationKey>>> targetSelector)
    where TKey : notnull
    where TDestinationKey : notnull =>
    Observable.Create<IChangeSet<TDestination, TDestinationKey>>(observer =>
    {
        var intermediateCache = new IntermediateCache<TDestination, TDestinationKey>();

        IDisposable subscribeManySubscription = source
            .SubscribeMany((item, key) => targetSelector(item, key).AutoRemovePopulateInto(intermediateCache))
            .Subscribe();

        IDisposable cacheSubscription = intermediateCache.Connect().SubscribeSafe(observer);

        return Disposable.Create(() =>
        {
            subscribeManySubscription.Dispose();
            cacheSubscription.Dispose();
            intermediateCache.Dispose();
        });
    });
AutoRemovePopulateInto looks like:
private static IDisposable AutoRemovePopulateInto<TObject, TKey>(
    this IObservable<IChangeSet<TObject, TKey>> observableChangeSet,
    IIntermediateCache<TObject, TKey> intermediateCache)
    where TKey : notnull
{
    IIntermediateCache<TObject, TKey> localCache = new IntermediateCache<TObject, TKey>();

    // Atomically apply the ChangeSet to both the main cache and the local one
    IDisposable editSubscription = observableChangeSet.Synchronize().Subscribe(changeSet =>
    {
        intermediateCache.Edit(updater => updater.Clone(changeSet));
        localCache.Edit(updater => updater.Clone(changeSet));
    });

    return Disposable.Create(() =>
    {
        editSubscription.Dispose();
        intermediateCache.Edit(updater => updater.RemoveKeys(localCache.Keys));
        localCache.Dispose();
    });
}
AutoRemovePopulateInto has been very useful in my project. I would be open to submitting a PR for that as well (perhaps as an overload for PopulateInto), but for now we can keep it private and use it only to implement this new MergeMany overload.
This sure is an open issue, and I would welcome a PR.
My only concern with using merge many this way is whether it scales well with a large number of sources. For combining records dynamically the Or operator is optimised accordingly, but MergeMany is not. One option would be to allow passing in either a time buffer or a throttle, so that multiple changes accumulate and are then combined in a minimal number of batches.
That said, maybe there is no need to worry too much. If we get the overload into Dynamic Data together with many unit tests, we can optimize or refactor safely.
I would be open to buffering up the changes, but I don't think we'll need that because, after further consideration, I think the OP had the right of it: the Or operator is probably the way to go. My original suggestion is fine for when the contents of the sub-caches don't collide, but if different sub-caches have the same key, then the Intermediate Cache's values will get overridden.
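To make the collision problem concrete, here is a toy model in plain C#. It does not use DynamicData: ordinary dictionaries stand in for the shared intermediate cache and for each sub-cache, and all names are illustrative assumptions.

```csharp
using System;
using System.Collections.Generic;

// Toy model only: a plain dictionary stands in for the shared intermediate cache,
// and one dictionary per child stands in for each sub-cache.
var shared = new Dictionary<string, int>();
var child1 = new Dictionary<string, int> { ["Person1"] = 11 };
var child2 = new Dictionary<string, int> { ["Person1"] = 99 }; // same key, different value

// Each child copies its items into the shared cache, so the last writer wins.
foreach (var kv in child1) shared[kv.Key] = kv.Value;
foreach (var kv in child2) shared[kv.Key] = kv.Value;
Console.WriteLine(shared["Person1"]); // 99

// Auto-removal on unsubscribe: child2's keys are removed from the shared cache...
foreach (var key in child2.Keys) shared.Remove(key);

// ...so "Person1" disappears even though child1 still contains it.
Console.WriteLine(shared.ContainsKey("Person1")); // False
```

Avoiding this requires tracking which sub-caches currently hold each key, rather than keeping a single value per key.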
Correct me if I'm wrong, but the Or operator will handle these collisions properly... Even restoring the original entry if an overriding value gets subsequently removed.
I haven't looked at the DynamicCombiner code super carefully, but it seems that using it would neatly side-step the need for something like AutoRemovePopulateInto, because the overload that uses an observable list handles the removal events appropriately. I think.
I will make a PR, but it might be that we just need a simple adapter to the Or operator, which is exactly what the person who opened this issue suggested. I would defer to them, but they seemed to have turned into a Ghost.
Should this be an overload of MergeMany (even if it uses Or) because that is what people might expect? Or should it be an overload of Or?
@dwcullop referring back to the original post plus historic comments on the #dynamicdata slack channel, I think we should keep to the merge many semantics and create a specialization for it. I suggest this because merge many in its current form has never worked, and will never work, for change sets. In this regard it will not be a breaking change (hopefully).
I am open to DynamicCombiner being cloned and adapted as it keeps any changes isolated, and allows for specific case optimisations.
"Correct me if I'm wrong, but the Or operator will handle these collisions properly... Even restoring the original entry if an overriding value gets subsequently removed."
The current operator will ignore an add or update if the item has already been included from another data set. Currently this uses reference equality, but it would almost certainly be better to provide an overload so a consumer can supply a value-based equality comparison.
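As a small illustration of the difference between the two kinds of equality, the sketch below uses plain C# only. The `Person` class and `PersonComparer` are hypothetical stand-ins, not types from DynamicData.

```csharp
using System;
using System.Collections.Generic;

var a = new Person("Person1", 11);
var b = new Person("Person1", 11); // same data, different instance

// Reference equality treats the two instances as different items.
Console.WriteLine(object.ReferenceEquals(a, b));          // False

// A consumer-supplied comparer can treat them as the same item.
Console.WriteLine(new PersonComparer().Equals(a, b));     // True

// Hypothetical item type (a class, so default equality is by reference).
public sealed class Person
{
    public Person(string name, int age) { Name = name; Age = age; }
    public string Name { get; }
    public int Age { get; }
}

// Hypothetical value-based comparer that a consumer might pass to such an overload.
public sealed class PersonComparer : IEqualityComparer<Person>
{
    public bool Equals(Person x, Person y) =>
        object.ReferenceEquals(x, y) ||
        (x is not null && y is not null && x.Name == y.Name && x.Age == y.Age);

    public int GetHashCode(Person p) => HashCode.Combine(p.Name, p.Age);
}
```

With reference equality, two children that each build their own `Person` for the same record would always look like a conflicting update.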
Okay, I will pursue implementing an overload for the MergeMany operator that works specifically on ChangeSets and use the implementation of DynamicCombiner (specifically the Or operations) as the starting point (optimizing for this specific case when possible).
I think an overload that supports a custom IEqualityComparer implementation is a great idea, but we should also consider supporting a custom IComparer implementation. If more than one of the changesets add a value with the same key, there needs to be some way to prioritize one over the other, so a "worse" value doesn't replace a "better" one. It will also help decide which of the possible values to use as the replacement when 3 (or more) changesets add the same key and then the highest priority one gets removed. That's probably an edge-case scenario and I don't want it to impact performance for the usual case. I'll know more when I get into it a bit more.
I used DynamicCombiner as a starting point, but since I only needed to handle the "Or" case, I could optimize for each of the operations instead of needing to have one update method to handle any type of operation.
I also added support for IEqualityComparer and IComparer (or both). If not supplied, the performance impact is negligible. Even when provided, I tried to minimize using them as much as possible. For example, if a new item is added, there is no reason to sort all of the possible values to find the best one. If there is a published value already, then we only need to compare that value with the value being added because all of the other candidates have already been eliminated. However, when an item is removed, there is no way to avoid searching all the candidates for the best value to replace it.
If the IEqualityComparer is not provided, it will just use ReferenceEquals as a fallback.
If the IComparer is not provided, it will just use the first value observed as the one to publish downstream. If that value is removed, it will be replaced by the first one that it can find, which is going to be the best solution when the consumers don't care because they know their dataset won't have overlapping keys.
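The selection rules described above can be sketched in plain C#. This is a toy model under stated assumptions, not the code from the PR: `KeyCandidates<T>` is a hypothetical helper that tracks the candidate values for a single key, and the comparer is assumed to order the best value first.

```csharp
using System;
using System.Collections.Generic;

var byPriority = new KeyCandidates<int>(Comparer<int>.Default);
byPriority.Add(5);
byPriority.Add(3);                        // beats 5, becomes the published value
byPriority.Add(7);                        // loses to 3, published value unchanged
Console.WriteLine(byPriority.Published);  // 3
byPriority.Remove(3);                     // rescan picks the best survivor
Console.WriteLine(byPriority.Published);  // 5

var firstWins = new KeyCandidates<int>(); // no comparer: first value observed wins
firstWins.Add(5);
firstWins.Add(3);
Console.WriteLine(firstWins.Published);   // 5

// Hypothetical helper: all candidate values for one key, plus the published one.
public sealed class KeyCandidates<T>
{
    private readonly List<T> _candidates = new List<T>();
    private readonly IComparer<T> _comparer; // may be null

    public KeyCandidates(IComparer<T> comparer = null) => _comparer = comparer;

    public int Count => _candidates.Count;

    // Only meaningful while Count > 0.
    public T Published { get; private set; }

    public void Add(T value)
    {
        bool first = _candidates.Count == 0;
        _candidates.Add(value);

        // Only the published value needs comparing: it already beat every other candidate.
        if (first || (_comparer != null && _comparer.Compare(value, Published) < 0))
            Published = value;
    }

    public void Remove(T value)
    {
        _candidates.Remove(value);
        if (_candidates.Count == 0) return; // nothing left to publish

        // On removal there is no shortcut: scan the remaining candidates for the best one.
        T best = _candidates[0];
        if (_comparer != null)
            foreach (T candidate in _candidates)
                if (_comparer.Compare(candidate, best) < 0) best = candidate;
        Published = best;
    }
}
```

Without a comparer the helper simply falls back to the oldest remaining candidate, which matches the "first one it can find" behaviour described above.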
Please check out #736 and let me know what you think.