Intro to Dynamic Data

Dynamic Data (DD) applies the concepts of Reactive Extensions (Rx) observable streams to collections.

DD was created by Roland Pheasant to greatly simplify handling updates to data collections.

It lets you monitor source collections and keep one or more target collections updated so that they always satisfy the conditions specified by the Dynamic Data operators.

For example, assume that we have a source collection of Trade objects representing pending trades for various stocks. We can filter it so that the target collection contains only the Trades for IBM stock.

Filtering is only one of many Dynamic Data operators. As with LINQ or Rx operators, DD operators can be chained together to perform tasks of arbitrary complexity.

Essentially all LINQ operators have counterparts in Dynamic Data (although the DD operators have different names in order to avoid clashes with the Rx operators). For example:

  • Rx (or LINQ) Operator Where(...) is named Filter(...) in DD.
  • Select(...) operator is named Transform(...) in DD.
  • OrderBy(...) is called Sort(...) in DD.
  • GroupBy(...) is named Group(...) in DD.
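
To give a feeling for how the renamed operators read when chained, here is a minimal sketch that is not part of the article's sample code. It assumes the DynamicData NuGet package is referenced and uses a hypothetical value tuple (Id, Symbol, Price) as a stand-in for a real trade class; the names source and orclPrices are made up for the illustration.

```csharp
using System;
using System.Collections.ObjectModel;
using System.Linq;
using DynamicData;

// Hypothetical stand-in for a trade: a tuple keyed by its Id element
var source =
    new SourceCache<(int Id, string Symbol, decimal Price), int>(t => t.Id);

using var subscription = source
    .Connect()                        // stream of change sets
    .Filter(t => t.Symbol == "ORCL")  // LINQ Where  -> DD Filter
    .Transform(t => t.Price)          // LINQ Select -> DD Transform
    .Bind(out ReadOnlyObservableCollection<decimal> orclPrices)
    .Subscribe();

source.AddOrUpdate((1, "ORCL", 1000m));
source.AddOrUpdate((2, "META", 2000m));
source.AddOrUpdate((3, "ORCL", 900m));

// Only the two ORCL prices reach the bound collection
Console.WriteLine(string.Join(", ", orclPrices.OrderBy(p => p)));
```

Unlike a LINQ query, the chain stays live: every later change to source is pushed through the same operators into orclPrices.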

Dynamic Data has two special types representing the observable collections that can serve as source collections:

  • SourceList - for collections whose items are immutable (or at least whose item changes do not need to be observed by the DD operators).
  • SourceCache - for collections with updatable items identified by a unique key.

In this article we shall talk about SourceCache containing updatable objects (the previous article in the series was dedicated to SourceList).

Source Code Location

All the source code for this article is located under Intro to Observable Cache within the NP.Samples repository.

The samples are built as XUnit Facts defined within the SimpleObservableCacheExamples class. Each of the facts can be run separately from the rest.

For more on XUnit read Everything you need to know to create XUnit C# Tests in Visual Studio.

Simplified Trade class

All our samples are built around the Trade class, a simplified representation of a stock trade:

public class Trade : 
    AbstractNotifyPropertyChanged
{
    // primary key (used for distinguishing 
    // between the new entries and the 
    // updates)
    public int TradeId { get; }

    // Stock Symbol
    public Symbol TheSymbol { get; }

    // updatable total trade price
    decimal _totalTradePrice;
    public decimal TotalTradePrice 
    {
        get => _totalTradePrice; 

        // SetAndRaise fires PropertyChanged event
        // when TotalTradePrice property changes
        set => SetAndRaise(ref _totalTradePrice, value);
    }

    public Trade
    (
        int tradeId,
        Symbol symbol,
        decimal totalTradeAmount)
    {
        this.TradeId = tradeId;
        this.TheSymbol = symbol;
        this.TotalTradePrice = totalTradeAmount;
    }

    ...

    public override bool Equals(object? obj)
    {
        if (obj is Trade trade)
        {
            return this.TradeId.Equals(trade.TradeId) && 
                   (this.TheSymbol == trade.TheSymbol) && 
                   (this.TotalTradePrice == trade.TotalTradePrice);
        }

        return false;
    }
}

Trade has only 3 properties:

  1. int TradeId { get; }
  2. Symbol TheSymbol { get; }
  3. decimal TotalTradePrice { get { ... } set {...} }

The first two properties, TradeId and TheSymbol, cannot be modified, while the third, TotalTradePrice, can be updated and fires the INotifyPropertyChanged.PropertyChanged event when its value changes.

Property TradeId is the primary key - it determines whether a trade is new to the cache or an update of an existing trade. If a Trade with the same TradeId already exists in the SourceCache, then the incoming entry is an update; otherwise it is a new Trade object.

Property TheSymbol uniquely specifies the stock of the trade. Note that there can be several trades for the same stock within the system at the same time. This property is of type Symbol, which is a C# enum (since this sample deals with very few stocks, I wanted to take advantage of the enum's strong typing instead of using strings, which leave a strong chance of spelling errors).

Property TotalTradePrice essentially represents the number of shares multiplied by the per share price for the trade.

Those who deal with stock trades will easily see that this Trade class is greatly simplified. As already mentioned, the number-of-shares and price-per-share properties are missing. We also do not specify whether it is a Buy or a Sell trade (let's assume they are all Sells), the trade timing, the trade status and other possible data.

Basic SourceCache API

Here we shall talk about the most important parts of the SourceCache API used within our samples below.

As was mentioned above, all the samples are built around the Trade class, with the int TradeId property playing the role of the primary key that determines whether an incoming entry is an update or a new trade.

Creating SourceCache

To create the SourceCache use the following constructor:

var sourceCache = new SourceCache<Trade, int>(t => t.TradeId);

The argument to the constructor (in our case it is t => t.TradeId) is a lambda expression of type Func<Trade, int> specifying how to obtain the unique trade key from a Trade object (remember, we use this key to figure out whether we are dealing with an update or with a new trade).

Adding or Updating a Trade to Cache

To add or update a trade (depending on whether the trade with the same TradeId already exists in the cache), we use the method sourceCache.AddOrUpdate(Trade trade), e.g.:

Trade metaTrade = 
    new Trade
    (
        tradeId: 21,
        symbol: Symbol.META,
        totalTradeAmount: 2000
    );

sourceCache.AddOrUpdate(metaTrade);

The sourceCache itself will check whether a Trade with TradeId equal to 21 already exists in it. If it does, the cache will do an update; if not, it will insert a new Trade object.
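
The add-or-update behaviour can be checked with a small sketch like the one below. It is only an illustration, assuming the DynamicData NuGet package is referenced; the tuple (Id, Symbol, Price) and the variable names cache and found are hypothetical stand-ins and not the article's Trade class.

```csharp
using System;
using DynamicData;

// Hypothetical stand-in for Trade: a tuple keyed by its Id element
var cache =
    new SourceCache<(int Id, string Symbol, decimal Price), int>(t => t.Id);

cache.AddOrUpdate((21, "META", 2000m)); // key 21 is new    -> insert
cache.AddOrUpdate((21, "META", 1500m)); // key 21 exists    -> update
cache.AddOrUpdate((22, "ORCL", 900m));  // key 22 is new    -> insert

// Two distinct keys, so two entries
Console.WriteLine(cache.Count);

// Lookup returns an Optional<T>; the entry for key 21 carries the updated price
var found = cache.Lookup(21);
Console.WriteLine(found.HasValue ? found.Value.Price : -1m);
```

The second call replaces the entry stored under key 21 instead of adding a duplicate, which is why the count stays at two.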

Using IObservableCache

IObservableCache has all the functionality for observing changes to the SourceCache, but does not have any functionality for modifying it. In a sense it is a read-only view into SourceCache.

You might want to pass IObservableCache to methods that should not modify the cache. To convert a SourceCache into an IObservableCache, it is best to use the AsObservableCache() method:

IObservableCache<Trade, int> observableCache = 
                                 sourceCache.AsObservableCache();
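
A possible way of using the read-only view is sketched below. This is an illustration only, assuming the DynamicData NuGet package is referenced; the tuple type, the readOnlyView variable and the CountFor helper are hypothetical names introduced for the example.

```csharp
using System;
using System.Linq;
using DynamicData;

var sourceCache = new SourceCache<(int Id, string Symbol), int>(t => t.Id);

// Read-only view over the same data
IObservableCache<(int Id, string Symbol), int> readOnlyView =
    sourceCache.AsObservableCache();

// Changes are made through the SourceCache only
sourceCache.AddOrUpdate((1, "ORCL"));
sourceCache.AddOrUpdate((2, "META"));
sourceCache.AddOrUpdate((3, "ORCL"));

// Hypothetical consumer that can inspect but not modify the cache
int CountFor(IObservableCache<(int Id, string Symbol), int> cache, string symbol) =>
    cache.Items.Count(t => t.Symbol == symbol);

int orclCount = CountFor(readOnlyView, "ORCL");
Console.WriteLine(orclCount);
```

Since the consumer only receives IObservableCache, it can query and observe the data but has no way to add, update or remove entries.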

Turning SourceCache (or IObservableCache) into an Rx Observable Stream of Changes to the Collection using Connect() Method

As with the ObservableList or SourceList discussed in the previous installment of the series, all the composable LINQ-like operators should be applied not to the SourceCache itself but to the Rx stream of collection changes of type IObservable<IChangeSet<Trade, int>>.

To convert a SourceCache (or an IObservableCache) into the Rx stream of collection changes, use the Connect() method, e.g.:

IObservable<IChangeSet<Trade, int>> changeSetStream =
                                      sourceCache.Connect();
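
To see what actually flows through such a stream, one can subscribe to it directly and record each change. The sketch below is an illustration, assuming the DynamicData NuGet package is referenced; the tuple type and the log list are hypothetical and exist only for the example.

```csharp
using System;
using System.Collections.Generic;
using DynamicData;

var sourceCache = new SourceCache<(int Id, decimal Price), int>(t => t.Id);
var log = new List<string>();

// Each notification is a change set: a batch of individual changes,
// every one carrying a reason, a key and the current item
using var subscription = sourceCache
    .Connect()
    .Subscribe(changeSet =>
    {
        foreach (var change in changeSet)
        {
            log.Add($"{change.Reason} {change.Key}");
        }
    });

sourceCache.AddOrUpdate((21, 2000m)); // new key
sourceCache.AddOrUpdate((21, 1500m)); // same key again
sourceCache.Remove(21);               // remove by key

Console.WriteLine(string.Join("; ", log)); // Add 21; Update 21; Remove 21
```

The DD operators discussed below consume exactly these change sets and emit new ones, which is what makes them composable.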

Turning ObservableCollection into an Rx Observable Stream of Collection Changes

ObservableCollection is a special collection that implements the INotifyCollectionChanged interface. After every change to the collection it fires the INotifyCollectionChanged.CollectionChanged event, passing the information about the change in the event's arguments.

People who work with the MVVM pattern and with WPF, Avalonia or another XAML framework know how useful ObservableCollections are - they provide a collection within a view model whose visuals change automatically as elements are added to it, removed from it or replaced in it.

Dynamic Data provides the ToObservableChangeSet(...) method, which converts an ObservableCollection directly into the Rx Observable Stream of Collection Changes without using SourceCache explicitly, e.g.:

ObservableCollection<Trade> myObservableCollection = 
                              new ObservableCollection<Trade>();
IObservable<IChangeSet<Trade, int>> changeSetStream =
          myObservableCollection.ToObservableChangeSet(t => t.TradeId);

Once the stream is subscribed to, whatever changes are made to myObservableCollection will be reflected within changeSetStream.
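
The following sketch illustrates the idea on a tiny collection. It is not part of the article's samples and assumes the DynamicData NuGet package is referenced; the tuple type and the liveKeys set are hypothetical names used only to track which keys the stream reports as present.

```csharp
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using DynamicData;
using DynamicData.Binding;

var collection =
    new ObservableCollection<(int Id, string Symbol)> { (1, "ORCL") };

// Keys currently present according to the change stream
var liveKeys = new HashSet<int>();

using var subscription = collection
    .ToObservableChangeSet(t => t.Id)
    .Subscribe(changeSet =>
    {
        foreach (var change in changeSet)
        {
            if (change.Reason == ChangeReason.Add) liveKeys.Add(change.Key);
            if (change.Reason == ChangeReason.Remove) liveKeys.Remove(change.Key);
        }
    });

// Ordinary collection operations, no DD calls needed on the source side
collection.Add((2, "META"));
collection.RemoveAt(0);

Console.WriteLine(string.Join(", ", liveKeys));
```

After the item with key 1 is removed and the item with key 2 is added, only key 2 should remain in liveKeys.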

Code Samples

Now we are ready for the code samples.

As was mentioned above, the code sample solution is IntoToObservableCache.sln from the Intro to Observable Cache folder within the NP.Samples repository.

All the samples are created as XUnit static [Fact] methods within the SimpleObservableCacheExamples.cs file.

Auxiliary Methods and APIs used throughout the Tests

As was mentioned above, all the stock symbols are defined as enum Symbol for the sake of strong typing:

public enum Symbol
{
    META,
    ORCL,
    TSLA,
    MSFT
}

Static class TradesGenerator contains an extension method CreateTrade(...) which returns a new trade or a trade update:

internal static Trade CreateTrade
(
    this Symbol symbol,
    decimal totalTradeAmount,
    int tradeId = -1
)
{
    ...
}

If the passed tradeId argument has its default value (-1), then a new trade with a new TradeId is generated. Otherwise the caller can pass any non-negative value for tradeId, and if a trade with that key already exists in the SourceCache, this trade will be considered an update.

SimpleObservableCacheExamples static class contains a static method Verify which compares two collections:

private static void Verify
(
    this IEnumerable<Trade> sourceTradeCollection,
    IEnumerable<Trade> targetCollection, 
    Func<Trade, bool> sourceFilter
)
{
    var filteredAndOrdered =
        sourceTradeCollection
        .Where(sourceFilter)
        .OrderBy(t => t.TotalTradePrice)
        .ToList();

    Assert.True(filteredAndOrdered.SequenceEqual(targetCollection));
}

One of these collections (sourceTradeCollection) is the source collection of trades; the other (targetCollection) is the result of applying the Dynamic Data operators.

The purpose of this method is to assert that both collections have the same data arranged in the same order.

We assume that our Dynamic Data operators always sort the targetCollection by TotalTradePrice in ascending order.

We also pass a filter to be applied to the source collection (it should be the same filter that we apply to obtain the targetCollection).

Filtering, Sorting and Binding Test

Here is the code for FilterSortAndBindTest() method (do not try to understand it all at once, since we shall describe it piece by piece below).

[Fact]
public static void FilterSortAndBindTest()
{
    // create sourceCache
    ISourceCache<Trade, int>? sourceCache =
        new SourceCache<Trade, int>(t => t.TradeId);

    // create several trades and add them to 
    // the sourceCache
    var metaTrade1 = Symbol.META.CreateTrade(2000);
    sourceCache.AddOrUpdate(metaTrade1);

    var oracleTrade1 = Symbol.ORCL.CreateTrade(1000);
    sourceCache.AddOrUpdate(oracleTrade1);

    var metaTrade2 = Symbol.META.CreateTrade(1900);
    sourceCache.AddOrUpdate(metaTrade2);

    var oracleTrade2 = Symbol.ORCL.CreateTrade(900);
    sourceCache.AddOrUpdate(oracleTrade2);

    // create stream of IChange parameters
    // from the source collection
    IObservable<IChangeSet<Trade, int>> changeSetStream =
        sourceCache!.Connect();

    // create the target collection
    IObservableCollection<Trade> targetCollection =
        new ObservableCollectionExtended<Trade>();

    // filter, sort and bind the source collection
    // to the targetCollection
    IObservable<IChangeSet<Trade, int>> resultObservable =
        changeSetStream
            .Filter(t => t.TheSymbol == Symbol.ORCL)
            .SortAndBind
            (
                targetCollection,
                SortExpressionComparer<Trade>
                     .Ascending(t => t.TotalTradePrice)
            );

    // now subscribe to start pulling data
    // using clause will dispose the subscription
    using var subscription = resultObservable.Subscribe();

    // create the original source collection
    IEnumerable<Trade> sourceTradeCollection =
        [metaTrade1, oracleTrade1, metaTrade2, oracleTrade2];

    // assert that the filter, and sort operators
    // result in the correct targetCollection
    sourceTradeCollection
        .Verify(targetCollection, t => t.TheSymbol == Symbol.ORCL);

    // create another entry with the same TradeId key
    // as oracleTrade1 and add it to the sourceCache
    // to override the previous oracleTrade1 entry
    var oracleTrade1_modified =
        Symbol.ORCL.CreateTrade(100, oracleTrade1.TradeId);
    sourceCache.AddOrUpdate(oracleTrade1_modified);

    // change the sourceTradeCollection to contain
    // oracleTrade1_modified trade instead of oracleTrade1
    sourceTradeCollection =
        [metaTrade1, oracleTrade1_modified, metaTrade2, oracleTrade2];

    // make sure the collections are identical
    sourceTradeCollection
        .Verify(targetCollection, t => t.TheSymbol == Symbol.ORCL);

    // create a new trade oracleTrade3 and add it 
    // to the sourceCache
    var oracleTrade3 =
        Symbol.ORCL.CreateTrade(50);
    sourceCache.AddOrUpdate(oracleTrade3);

    // create the new sourceTradeCollection (adding
    // oracleTrade3)
    sourceTradeCollection =
        [
            metaTrade1,
            oracleTrade1_modified,
            metaTrade2,
            oracleTrade2,
            oracleTrade3
        ];

    // verify that the targetCollection is correct
    sourceTradeCollection
        .Verify(targetCollection, t => t.TheSymbol == Symbol.ORCL);

    // remove all oracle entries
    sourceCache.Remove(oracleTrade1.TradeId);
    sourceCache.Remove(oracleTrade2.TradeId);
    sourceCache.Remove(oracleTrade3.TradeId);

    // make sure the targetCollection is empty
    Assert.True(targetCollection.Count() == 0);
}

First, we create the source cache:

ISourceCache<Trade, int> sourceCache =
            new SourceCache<Trade, int>(t => t.TradeId);

Then we create 4 trades (all of them are new trades, none are updates) and add each one of them to the cache:

// create a meta trade with TotalTradePrice 2000
var metaTrade1 = Symbol.META.CreateTrade(2000);
// add metaTrade1 to the sourceCache
sourceCache.AddOrUpdate(metaTrade1);

var oracleTrade1 = Symbol.ORCL.CreateTrade(1000);
sourceCache.AddOrUpdate(oracleTrade1);

var metaTrade2 = Symbol.META.CreateTrade(1900);
sourceCache.AddOrUpdate(metaTrade2);

var oracleTrade2 = Symbol.ORCL.CreateTrade(900);
sourceCache.AddOrUpdate(oracleTrade2);

Then we create the Rx Observable Stream of cache changes:

IObservable<IChangeSet<Trade, int>> changeSetStream =
    sourceCache.Connect();

Now, we create the target collection that will contain the DD result:

IObservableCollection<Trade> targetCollection =
    new ObservableCollectionExtended<Trade>();

IObservableCollection and ObservableCollectionExtended are both introduced by DD and represent the same functionality as ObservableCollection with some extra capability (e.g. batching multiple updates).

Now we apply the filter, sort and bind operations (bind will place the output into the targetCollection):

IObservable<IChangeSet<Trade, int>> resultObservable =
    changeSetStream
        .Filter(t => t.TheSymbol == Symbol.ORCL)
        .SortAndBind
        (
            targetCollection,
            SortExpressionComparer<Trade>
                      .Ascending(t => t.TotalTradePrice)
        );

We filter only trades with ORCL symbol and sort them by TotalTradePrice in ascending order.

Note, to improve the performance we use method SortAndBind(...) instead of two separate DD methods Sort(...) and Bind(...).
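
A stripped-down sketch of SortAndBind(...) on its own may help to see what it does. It is an illustration only, assuming a DynamicData version that ships SortAndBind (the one used by the article's samples does); the tuple type and the sorted collection are hypothetical names, not the article's Trade objects.

```csharp
using System;
using System.Linq;
using DynamicData;
using DynamicData.Binding;

var sourceCache = new SourceCache<(int Id, decimal Price), int>(t => t.Id);

// Target collection kept in sorted order by SortAndBind
var sorted = new ObservableCollectionExtended<(int Id, decimal Price)>();

using var subscription = sourceCache
    .Connect()
    .SortAndBind
    (
        sorted,
        SortExpressionComparer<(int Id, decimal Price)>
            .Ascending(t => t.Price)
    )
    .Subscribe();

sourceCache.AddOrUpdate((1, 1000m));
sourceCache.AddOrUpdate((2, 900m));
sourceCache.AddOrUpdate((3, 50m));
sourceCache.AddOrUpdate((1, 10m)); // update key 1 with a lower price

// Ids in ascending price order: 10 (id 1), 50 (id 3), 900 (id 2)
Console.WriteLine(string.Join(", ", sorted.Select(t => t.Id))); // 1, 3, 2
```

The update of key 1 moves that entry to the front of the target collection without any manual re-sorting.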

Now, to start pulling the data, we call Subscribe() method:

using IDisposable subscription = resultObservable.Subscribe();

The using declaration ensures that the subscription is disposed automatically when the test method exits.

We build the sourceTradeCollection and verify that under trade.TheSymbol == Symbol.ORCL filter, the collections match:

IEnumerable<Trade> sourceTradeCollection =
    [metaTrade1, oracleTrade1, metaTrade2, oracleTrade2];
sourceTradeCollection
    .Verify(targetCollection, t => t.TheSymbol == Symbol.ORCL);

Next we test that an update to the first oracle trade works. We create a trade update with the same TradeId and TheSymbol as oracleTrade1 and with TotalTradePrice equal to 100, use it to update the sourceCache and check that this update is reflected in the targetCollection:

var oracleTrade1_modified =
    Symbol.ORCL.CreateTrade(100, oracleTrade1.TradeId);
sourceCache.AddOrUpdate(oracleTrade1_modified);

sourceTradeCollection =
    [metaTrade1, oracleTrade1_modified, metaTrade2, oracleTrade2];
sourceTradeCollection
    .Verify(targetCollection, t => t.TheSymbol == Symbol.ORCL);

Next we check that once we add a trade to the source, it is also added to the target (if it is filtered in):

// new trade
var oracleTrade3 =
    Symbol.ORCL.CreateTrade(50);
sourceCache.AddOrUpdate(oracleTrade3);

sourceTradeCollection =
    [
        metaTrade1,
        oracleTrade1_modified,
        metaTrade2,
        oracleTrade2,
        oracleTrade3
    ];

// verify that the targetCollection is correct
sourceTradeCollection
    .Verify(targetCollection, t => t.TheSymbol == Symbol.ORCL);

Now, we shall remove all the oracle entries from the sourceCache and assert that the target collection is empty (since targetCollection contains only oracle entries because of the filter):

// remove all oracle entries
sourceCache.Remove(oracleTrade1.TradeId);
sourceCache.Remove(oracleTrade2.TradeId);
sourceCache.Remove(oracleTrade3.TradeId);

// make sure the targetCollection is empty
Assert.True(targetCollection.Count() == 0);

Filtering, Sorting and Binding Test with ObservableCollection as Source

Our next test, defined by the static method FilterSortAndBindFromObservableCollectionTest(), is almost exactly the same as the previous one (defined by FilterSortAndBindTest()), only instead of a SourceCache we use an ObservableCollection as the source:

[Fact]
public static void 
    FilterSortAndBindFromObservableCollectionTest()
{
    // create individual trades
    var metaTrade1 = Symbol.META.CreateTrade(2000);

    var oracleTrade1 = Symbol.ORCL.CreateTrade(1000);

    var metaTrade2 = Symbol.META.CreateTrade(1900);

    var oracleTrade2 = Symbol.ORCL.CreateTrade(900);

    // create the input ObservableCollection
    ObservableCollection<Trade> sourceTradeCollection =
        new ObservableCollection<Trade>
        {
            metaTrade1, oracleTrade1, metaTrade2, oracleTrade2
        };

    // create stream of IChange parameters
    // from the source collection
    IObservable<IChangeSet<Trade, int>> changeSetStream =
        sourceTradeCollection.ToObservableChangeSet(t => t.TradeId);

    // create the target collection
    IObservableCollection<Trade> targetCollection =
        new ObservableCollectionExtended<Trade>();

    IObservable<IChangeSet<Trade, int>> resultObservable =
        changeSetStream
            .Filter(t => t.TheSymbol == Symbol.ORCL)
            .AutoRefresh() // listens to PropertyChanged
                            // events and updates 
                            // downstream results when 
                            // PropertyChanged is fired
            .SortAndBind
            (
                targetCollection,
                SortExpressionComparer<Trade>
                        .Ascending(t => t.TotalTradePrice)
            );

    // now subscribe to start pulling data
    // using clause will dispose the subscription
    using var subscription = resultObservable.Subscribe();

    sourceTradeCollection
        .Verify(targetCollection, t => t.TheSymbol == Symbol.ORCL);

    // update the source entry and 
    // make sure that the target got re-sorted
    oracleTrade1.TotalTradePrice = 100;

    sourceTradeCollection
        .Verify(targetCollection, t => t.TheSymbol == Symbol.ORCL);

    // add another oracle trade
    var oracleTrade3 =
        Symbol.ORCL.CreateTrade(50);

    sourceTradeCollection.Add(oracleTrade3);

    sourceTradeCollection
        .Verify(targetCollection, t => t.TheSymbol == Symbol.ORCL);

    // remove all oracle entries
    sourceTradeCollection.Remove(oracleTrade1);
    sourceTradeCollection.Remove(oracleTrade2);
    sourceTradeCollection.Remove(oracleTrade3);

    // the target collection should become empty
    Assert.True(targetCollection.Count == 0);
}

Below, we shall go over only the differences between this and the previous example.

The first difference is that instead of creating a SourceCache, we create an ObservableCollection and populate it with trades:

// create individual trades
var metaTrade1 = Symbol.META.CreateTrade(2000);

var oracleTrade1 = Symbol.ORCL.CreateTrade(1000);

var metaTrade2 = Symbol.META.CreateTrade(1900);

var oracleTrade2 = Symbol.ORCL.CreateTrade(900);

// create the input ObservableCollection
ObservableCollection<Trade> sourceTradeCollection =
    new ObservableCollection<Trade>
    {
        metaTrade1, oracleTrade1, metaTrade2, oracleTrade2
    };

Next we use ToObservableChangeSet(...) extension method to create the Rx observable stream of collection changes:

IObservable<IChangeSet<Trade, int>> changeSetStream =
   sourceTradeCollection.ToObservableChangeSet(t => t.TradeId);

Among our DD operators we insert AutoRefresh(), which will update the sorting every time the PropertyChanged event is fired on any of the trade objects in the collection (which happens when its TotalTradePrice changes):

IObservable<IChangeSet<Trade, int>> resultObservable =
    changeSetStream
        .Filter(t => t.TheSymbol == Symbol.ORCL)
        .AutoRefresh() // listens to PropertyChanged
                       // events and updates 
                       // downstream results when 
                       // PropertyChanged is fired
        .SortAndBind
        (
            targetCollection,
            SortExpressionComparer<Trade>
                            .Ascending(t => t.TotalTradePrice)
        );

To update a trade, instead of adding another Trade object with the same TradeId to the cache, update the instance within the ObservableCollection:

oracleTrade1.TotalTradePrice = 100;

This will trigger the PropertyChanged event on the object, which will be handled by the AutoRefresh() method and passed to the downstream sorting operator.
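
The effect of AutoRefresh() can be tried in isolation with a sketch like the one below. It is an illustration under stated assumptions: the DynamicData NuGet package is referenced, PricedItem is a hypothetical minimal class (not the article's Trade), and AutoRefresh() is called without buffering arguments so that the refresh is expected to arrive synchronously, as the article's tests also rely on.

```csharp
using System;
using System.Linq;
using DynamicData;
using DynamicData.Binding;

var cheap = new PricedItem(1, 100m);
var pricey = new PricedItem(2, 500m);

var sourceCache = new SourceCache<PricedItem, int>(i => i.Id);
sourceCache.AddOrUpdate(cheap);
sourceCache.AddOrUpdate(pricey);

var sorted = new ObservableCollectionExtended<PricedItem>();

using var subscription = sourceCache
    .Connect()
    .AutoRefresh() // turns PropertyChanged events into Refresh changes
    .SortAndBind
    (
        sorted,
        SortExpressionComparer<PricedItem>.Ascending(i => i.Price)
    )
    .Subscribe();

// Ids in price order before the change (100 then 500)
string before = string.Join(", ", sorted.Select(i => i.Id));

// Mutate the instance in place; no AddOrUpdate call is made
cheap.Price = 900m;

// Ids in price order after the change (500 then 900)
string after = string.Join(", ", sorted.Select(i => i.Id));

Console.WriteLine($"{before} -> {after}");

// Hypothetical minimal item type used only for this sketch
public class PricedItem : AbstractNotifyPropertyChanged
{
    private decimal _price;

    public PricedItem(int id, decimal price)
    {
        Id = id;
        _price = price;
    }

    public int Id { get; }

    public decimal Price
    {
        get => _price;
        set => SetAndRaise(ref _price, value); // fires PropertyChanged
    }
}
```

Without the AutoRefresh() line the downstream operators would not be told about the property change, so the bound collection would be expected to keep its old order.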

Dynamic Filter Test

This test demonstrates the use of a dynamic filter that can easily be changed at run time.

In this test we shall use the ObservableCollection source instead of the SourceCache.

Here is the full code for the [Fact] public static void DynamicFilterTest() test method:

[Fact]
public static void
    DynamicFilterTest()
{
    // create trades
    var metaTrade1 = Symbol.META.CreateTrade(2000);

    var oracleTrade1 = Symbol.ORCL.CreateTrade(1000);

    var metaTrade2 = Symbol.META.CreateTrade(1900);

    var oracleTrade2 = Symbol.ORCL.CreateTrade(900);

    // creates the source ObservableCollection
    ObservableCollection<Trade> sourceTradeCollection =
        new ObservableCollection<Trade>
        {
            metaTrade1, oracleTrade1, metaTrade2, oracleTrade2
        };

    // create stream of IChange parameters
    // from the source collection
    IObservable<IChangeSet<Trade, int>> changeSetStream =
        sourceTradeCollection.ToObservableChangeSet(t => t.TradeId);

    // create filterObservable subject to allow
    // changing the filters
    Subject<Func<Trade, bool>> filterObservable = 
        new Subject<Func<Trade, bool>>();

    // create the target collection
    IObservableCollection<Trade> targetCollection =
        new ObservableCollectionExtended<Trade>();

    IObservable<IChangeSet<Trade, int>> resultObservable =
        changeSetStream
            // use the Filter(...) method
            // that accepts filterObservable
            .Filter(filterObservable)
            .AutoRefresh()
            .SortAndBind
            (
                targetCollection,
                SortExpressionComparer<Trade>
                            .Ascending(t => t.TotalTradePrice)
            );

    // now subscribe to start pulling data
    // using clause will dispose the subscription
    using var subscription = resultObservable.Subscribe();

    // push the lambda expression to filter in only
    // ORCL symbol into filterObservable
    filterObservable.OnNext(t => t.TheSymbol == Symbol.ORCL);

    sourceTradeCollection
        .Verify(targetCollection, t => t.TheSymbol == Symbol.ORCL);

    oracleTrade1.TotalTradePrice = 100;

    sourceTradeCollection
        .Verify(targetCollection, t => t.TheSymbol == Symbol.ORCL);

    // clear the source collection of oracle 
    // entries
    sourceTradeCollection.Remove(oracleTrade1);
    sourceTradeCollection.Remove(oracleTrade2);

    Assert.True(targetCollection.Count == 0);

    // change filter to META
    filterObservable.OnNext(t => t.TheSymbol == Symbol.META);

    // we should get some META entries in the target Collection
    // right away
    Assert.Contains<Trade>
        (targetCollection, t => t.TheSymbol == Symbol.META);
}

Now we shall go over the main differences between this and the previous samples.

The major difference is that we create an Rx subject in order to push various filters of type Func<Trade, bool> into it:

Subject<Func<Trade, bool>> filterObservable = 
    new Subject<Func<Trade, bool>>();

Then we use the Filter(...) extension method overload that accepts an IObservable<Func<Trade, bool>> as the filtering operator:

IObservable<IChangeSet<Trade, int>> resultObservable =
    changeSetStream
        // use the Filter(...) method
        // that accepts filterObservable
        .Filter(filterObservable)
        .AutoRefresh()
        .SortAndBind
        (
            targetCollection,
            SortExpressionComparer<Trade>
                      .Ascending(t => t.TotalTradePrice)
        );

Note the .Filter(filterObservable) line: it takes the observable of filters rather than a fixed predicate.

Then, after we start pulling data from the changeSetStream by calling using var subscription = resultObservable.Subscribe();, we push the first filter we want into the filterObservable subject to filter only ORCL symbols:

filterObservable.OnNext(t => t.TheSymbol == Symbol.ORCL);

At the end of the test method, when we want to change the filter so that it lets through, say, only META symbols, we call:

filterObservable.OnNext(t => t.TheSymbol == Symbol.META);

and then assert that there are some trades with META symbol in the output collection:

// we should get some META entries in the target Collection
// right away
Assert.Contains<Trade>
    (targetCollection, t => t.TheSymbol == Symbol.META);
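
The same run-time switching of filters can be reproduced in a few lines. The sketch below is an illustration only, assuming the DynamicData NuGet package (and the System.Reactive package it depends on) is referenced; the tuple type and the names filter, visible, orclCount and metaCount are hypothetical.

```csharp
using System;
using System.Collections.ObjectModel;
using System.Reactive.Subjects;
using DynamicData;

var sourceCache = new SourceCache<(int Id, string Symbol), int>(t => t.Id);
sourceCache.AddOrUpdate((1, "ORCL"));
sourceCache.AddOrUpdate((2, "META"));
sourceCache.AddOrUpdate((3, "ORCL"));

// Subject through which new predicates are pushed at run time
var filter = new Subject<Func<(int Id, string Symbol), bool>>();

using var subscription = sourceCache
    .Connect()
    .Filter(filter) // overload taking an observable of predicates
    .Bind(out ReadOnlyObservableCollection<(int Id, string Symbol)> visible)
    .Subscribe();

// First predicate: only ORCL entries pass
filter.OnNext(t => t.Symbol == "ORCL");
int orclCount = visible.Count;

// Replace the predicate: all items are re-evaluated against it
filter.OnNext(t => t.Symbol == "META");
int metaCount = visible.Count;

Console.WriteLine($"{orclCount} ORCL, then {metaCount} META");
```

Each call to OnNext(...) swaps the predicate for the whole pipeline, so the bound collection is rebuilt from the items already in the source without resubscribing.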

Grouping Test

Grouping test shows how to use Dynamic Data operators for grouping trades by Symbol.

Static test method for grouping is called GroupingTest():

[Fact]
public static void GroupingTest()
{
    // create trades
    var metaTrade1 = Symbol.META.CreateTrade(2000);

    var oracleTrade1 = Symbol.ORCL.CreateTrade(1000);

    var metaTrade2 = Symbol.META.CreateTrade(1900);

    var oracleTrade2 = Symbol.ORCL.CreateTrade(900);

    // create the source collection of trades as
    // observable collection
    ObservableCollection<Trade> sourceTradeCollection =
        new ObservableCollection<Trade>
        {
            metaTrade1, oracleTrade1, metaTrade2, oracleTrade2
        };

    // create stream of IChange parameters
    // from the source collection
    IObservable<IChangeSet<Trade, int>> changeSetStream = 
        sourceTradeCollection.ToObservableChangeSet(t => t.TradeId);

    // do the grouping
    IObservable<IGroupChangeSet<Trade, int, Symbol>>
        groupedObservable =
            changeSetStream
                .AutoRefresh()
                .Group(t => t.TheSymbol);

    // transform the grouped entries into 
    // SymbolTradeGroup objects
    var transformedGroups =
        groupedObservable.Transform(g => new SymbolTradeGroup(g));

    //output groups
    ReadOnlyObservableCollection<SymbolTradeGroup>? symbolTradeGroups;

    // create and populate an observable collection
    // symbolTradeGroups that contains those SymbolTradeGroup
    // objects
    using IDisposable subscription =
        transformedGroups
            .Bind(out symbolTradeGroups) // create and populate
            .DisposeMany() // make sure that if an item is removed
                            // from the collection, it is disposed
            .Subscribe();  // start the subscription

    // Assert there are two groups
    // (one for ORCL and the other for META trades)
    Assert.True(symbolTradeGroups.Count == 2);

    // get the oracleGroup and assert it exists and single
    SymbolTradeGroup oracleGroup = 
        symbolTradeGroups.Single(g => g.TheSymbol == Symbol.ORCL);

    Assert.True(oracleGroup.Trades.Count() == 2);
    Assert
        .True(oracleGroup.TotalPrice == 
                    oracleGroup.Trades.Sum(t => t.TotalTradePrice));


    // get the metaGroup and assert it exists and single
    SymbolTradeGroup metaGroup =
        symbolTradeGroups.Single(g => g.TheSymbol == Symbol.META);

    Assert.True(metaGroup.Trades.Count() == 2);
    Assert
        .True(metaGroup.TotalPrice == 
                  metaGroup.Trades.Sum(t => t.TotalTradePrice));

    // change the total trade price of oracleTrade1
    oracleTrade1.TotalTradePrice = 5000;
    // assert that the aggregation's total price got updated
    Assert
        .True
        (oracleGroup.TotalPrice == 
              oracleGroup.Trades.Sum(t => t.TotalTradePrice));

    // add another oracle trade to the source collection
    var oracleTrade3 = Symbol.ORCL.CreateTrade(7000);
    sourceTradeCollection.Add(oracleTrade3);

    // make sure that the number of trades within the ORCL group
    // is now 3 and that the total price of the aggregation
    // also got updated
    Assert.True(oracleGroup.Trades.Count() == 3);
    Assert
        .True(oracleGroup.TotalPrice == 
                   oracleGroup.Trades.Sum(t => t.TotalTradePrice));

    // add a trade for another instrument (e.g. Symbol.TSLA)
    // to the source collection
    var tslaTrade1 = Symbol.TSLA.CreateTrade(10);
    sourceTradeCollection.Add(tslaTrade1);

    // make sure that the number of groups is now 3:
    Assert.True(symbolTradeGroups.Count() == 3);

    // remove all Oracle trades from the source collection
    sourceTradeCollection.Remove(oracleTrade1);
    sourceTradeCollection.Remove(oracleTrade2);
    sourceTradeCollection.Remove(oracleTrade3);

    // make sure that the corresponding group is also removed
    Assert.True(symbolTradeGroups.Count() == 2);
    Assert.True
    (
        symbolTradeGroups
            .Where(group => group.TheSymbol == Symbol.ORCL)
            .Count() == 0);
}

This test uses an extra type SymbolTradeGroup representing a group of trades grouped by TheSymbol property:

public class SymbolTradeGroup : IDisposable
{
    // Dynamic Data group
    IGroup<Trade, int, Symbol> _group;

    // trades within the group
    public IEnumerable<Trade> Trades => _group.Cache.Items;

    // group key
    public Symbol TheSymbol => _group.Key;

    // sum of TotalTradePrice across all
    // the trades within the group
    public decimal TotalPrice { get; private set; }


    private IDisposable? _disposableSubscription;
    public void Dispose()
    {
        // destroy the group cache
        _group.Cache.Dispose();

        // remove the aggregation subscription
        _disposableSubscription?.Dispose();
        _disposableSubscription = null;
    }


    public SymbolTradeGroup(IGroup<Trade, int, Symbol> group)
    {
        _group = group;

        // set up the TotalPrice to be 
        // dynamically calculated when the group
        // or individual trades are changed
        _disposableSubscription =
            _group
                .Cache
                .Connect()
                .ToCollection()
                .Select(collection => 
                             collection.Sum(t => t.TotalTradePrice))
                .Subscribe(sum => this.TotalPrice = sum);
    }
}

Its TheSymbol property represents the common symbol (the grouping key). Its property TotalPrice is the sum of TotalTradePrice properties from each of its Trade objects. The TotalPrice will automatically update when the group content changes or when the individual Trade objects are updated.
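
The aggregation idea behind TotalPrice (Connect(), then ToCollection(), then a sum) can be tried in isolation. The sketch below is an illustration, assuming the DynamicData and System.Reactive NuGet packages are referenced; it works on a plain SourceCache of hypothetical (Id, Price) tuples rather than on a DD group cache, and the total variable is made up for the example.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;
using DynamicData;

var sourceCache = new SourceCache<(int Id, decimal Price), int>(t => t.Id);

decimal total = 0m;

using var subscription = sourceCache
    .Connect()
    .ToCollection()                              // full snapshot after every change set
    .Select(items => items.Sum(t => t.Price))    // recompute the sum over the snapshot
    .Subscribe(sum => total = sum);

sourceCache.AddOrUpdate((1, 1000m));
sourceCache.AddOrUpdate((2, 900m));   // total is now 1900
sourceCache.AddOrUpdate((1, 100m));   // update key 1: total is now 1000
sourceCache.Remove(2);                // total is now 100

Console.WriteLine(total);
```

Recomputing the sum over the whole snapshot is simple but does work proportional to the collection size on every change, which is a reasonable trade-off for small groups like the ones in these samples.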

Note, that SymbolTradeGroup depends on IGroup implementation from DD and is IDisposable because of that (whenever a group is removed the underlying DD functionality needs to be disposed of).

Now, let us dive into the GroupingTest() code.

The very beginning is exactly the same as in the previous sample - we create 4 trades, build an ObservableCollection out of them and then obtain the IObservable<IChangeSet<Trade, int>> changeSetStream by calling the ToObservableChangeSet(...) static extension method.

Then we do the grouping by TheSymbol property:

IObservable<IGroupChangeSet<Trade, int, Symbol>>
    groupedObservable =
        changeSetStream
            .AutoRefresh()
            .Group(t => t.TheSymbol);

Note, that AutoRefresh() will help to update the aggregated TotalPrice when TotalTradePrice changes on an individual trade object.
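
To see what Group(...) produces on its own, without AutoRefresh() or the SymbolTradeGroup wrapper, consider the following sketch. It is an illustration only, assuming the DynamicData NuGet package is referenced; the tuple type and the names groups, groupCount, orclSize and afterRemoval are hypothetical.

```csharp
using System;
using System.Collections.ObjectModel;
using System.Linq;
using DynamicData;

var sourceCache = new SourceCache<(int Id, string Symbol), int>(t => t.Id);
sourceCache.AddOrUpdate((1, "ORCL"));
sourceCache.AddOrUpdate((2, "META"));
sourceCache.AddOrUpdate((3, "ORCL"));

using var subscription = sourceCache
    .Connect()
    .Group(t => t.Symbol) // one IGroup per distinct Symbol
    .Bind(out ReadOnlyObservableCollection<
              IGroup<(int Id, string Symbol), int, string>> groups)
    .Subscribe();

// One group per symbol, each with its own cache of members
int groupCount = groups.Count;
int orclSize = groups.Single(g => g.Key == "ORCL").Cache.Count;

// Removing every ORCL item removes the ORCL group as well
sourceCache.Remove(1);
sourceCache.Remove(3);
int afterRemoval = groups.Count;

Console.WriteLine($"{groupCount} groups, {orclSize} ORCL items, {afterRemoval} group left");
```

Each IGroup exposes its Key and a Cache of its members, which is what the SymbolTradeGroup class wraps.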

After grouping, we employ the Transform(...) extension method to convert each group into a SymbolTradeGroup object:

var transformedGroups =
       groupedObservable.Transform(g => new SymbolTradeGroup(g));

Next, we define collection symbolTradeGroups to contain the groups, call Bind(...) extension method to get the groups into the collection and call Subscribe(...) to start pulling data:

using IDisposable subscription =
    transformedGroups
        .Bind(out symbolTradeGroups) // create and populate
        .DisposeMany() // make sure that if an item is removed
                       // from the collection, it is disposed
        .Subscribe();  // start the subscription

Note the DisposeMany() method called before Subscribe() - it ensures that once a group is removed from the symbolTradeGroups collection, its Dispose() method is called.

After that we assert the number of groups, the TotalPrice value within each group and the content of each of the groups:

Assert.True(metaGroup.Trades.Count() == 2);
Assert
    .True(metaGroup.TotalPrice == 
             metaGroup.Trades.Sum(t => t.TotalTradePrice));

// change the total trade price of oracleTrade1
oracleTrade1.TotalTradePrice = 5000;
// assert that the aggregation's total price got updated
Assert
    .True
    (oracleGroup.TotalPrice == oracleGroup.Trades.Sum(t => t.TotalTradePrice));

After that, we update some TotalTradePrice values for individual trades, add and remove some trades to sourceTradeCollection and after each operation verify that the number of the groups, their TotalPrice properties and trades are what they are supposed to be.