Intro to Dynamic Data
Dynamic Data (DD) applies the concepts of Reactive Observable (Rx) Streams to collections.
DD was created by Roland Pheasant to greatly simplify updates to data collections.
It allows monitoring the source collections and updating target collection(s) in a way that the target collection satisfies conditions specified by the Dynamic Data operators.
For example, assume that we have a source collection of Trade objects representing pending trades for various stocks. We can filter the target collection to contain only Trades for IBM stock.
Filtering is only one Dynamic Data operator out of many. As with LINQ or Rx operators, DD operators can be chained together to perform tasks as complex as needed.
Essentially all LINQ operators have counterparts in Dynamic Data (the Dynamic Data operators have different names in order to avoid clashes with Rx operators). For example:
- Rx (or LINQ) operator Where(...) is named Filter(...) in DD.
- Select(...) operator is named Transform(...) in DD.
- OrderBy(...) is called Sort(...) in DD.
- GroupBy(...) is named Group(...) in DD.
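To make the mapping concrete, here is a minimal sketch that runs the same filter and projection once with LINQ and once with DD. It assumes the DynamicData NuGet package is referenced; the Quote record and the OperatorMappingDemo class are hypothetical names used only for this illustration, and SourceCache is the DD source type described later in this article.

```csharp
using System;
using System.Collections.ObjectModel;
using System.Linq;
using DynamicData;

// Hypothetical stand-in type, used only for this illustration.
public record Quote(int Id, string Symbol, decimal Price);

public static class OperatorMappingDemo
{
    // Returns the item counts seen by the LINQ snapshot and by the
    // DD pipeline after one more IBM quote is added to the source.
    public static (int LinqCount, int LiveCount) Run()
    {
        using var cache = new SourceCache<Quote, int>(q => q.Id);
        cache.AddOrUpdate(new[]
        {
            new Quote(1, "IBM", 120m),
            new Quote(2, "ORCL", 95m),
            new Quote(3, "IBM", 118m)
        });

        // LINQ: Where/Select produce a one-off snapshot of the current items.
        var snapshot = cache.Items
            .Where(q => q.Symbol == "IBM")
            .Select(q => $"{q.Id}:{q.Price}")
            .ToList();

        // DD: Filter/Transform describe a result that is kept up to date.
        using var subscription = cache.Connect()
            .Filter(q => q.Symbol == "IBM")
            .Transform(q => $"{q.Id}:{q.Price}")
            .Bind(out ReadOnlyObservableCollection<string> live)
            .Subscribe();

        // A later change reaches the DD result but not the LINQ snapshot.
        cache.AddOrUpdate(new Quote(4, "IBM", 121m));

        return (snapshot.Count, live.Count);
    }
}
```

Calling OperatorMappingDemo.Run() returns (2, 3): the LINQ snapshot still holds the two IBM quotes that existed when it was taken, while the bound DD collection has picked up the third.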
Dynamic Data has two special types representing the observable collections that can serve as source collections:
- SourceList - for collections whose items are immutable (or at least whose item changes do not need to be observed by the DD operators).
- SourceCache - for collections with updatable items.
In this article we shall talk about SourceCache containing updatable objects (while the previous article in the series was dedicated to SourceLists).
Source Code Location
All the source code for this article is located under Intro to Observable Cache within the NP.Samples repository.
The samples are built as XUnit Facts defined within the SimpleObservableCacheExamples class. Each of the facts can be run separately from the rest.
For more on XUnit read Everything you need to know to create XUnit C# Tests in Visual Studio.
Simplified Trade class
All our samples are built around the Trade class representing a simplification of a stock trade:
public class Trade :
    AbstractNotifyPropertyChanged
{
    // primary key (used for distinguishing
    // between the new entries and the
    // updates)
    public int TradeId { get; }
    // Stock Symbol
    public Symbol TheSymbol { get; }
    // updatable total trade price
    decimal _totalTradePrice;
    public decimal TotalTradePrice
    {
        get => _totalTradePrice;
        // SetAndRaise fires PropertyChanged event
        // when TotalTradePrice property changes
        set => SetAndRaise(ref _totalTradePrice, value);
    }
    public Trade
    (
        int tradeId,
        Symbol symbol,
        decimal totalTradeAmount)
    {
        this.TradeId = tradeId;
        this.TheSymbol = symbol;
        this.TotalTradePrice = totalTradeAmount;
    }
    ...
    public override bool Equals(object? obj)
    {
        if (obj is Trade trade)
        {
            return this.TradeId.Equals(trade.TradeId) &&
                (this.TheSymbol == trade.TheSymbol) &&
                (this.TotalTradePrice == trade.TotalTradePrice);
        }
        return false;
    }
}
Trade has only 3 properties:
int TradeId { get; }
Symbol TheSymbol { get; }
decimal TotalTradePrice { get { ... } set {...} }
The first two properties, TradeId and TheSymbol, cannot be modified, while the third, TotalTradePrice, can be updated and fires the INotifyPropertyChanged.PropertyChanged event after it is updated.
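For readers who have not used this notification pattern, the sketch below shows roughly what a SetAndRaise-style helper does, using only the .NET base class library. It is an illustration, not DD's actual implementation: PriceHolder and PropertyChangedDemo are hypothetical names, and skipping the event when the value is unchanged is an assumption of this sketch.

```csharp
using System.Collections.Generic;
using System.ComponentModel;
using System.Runtime.CompilerServices;

// Hypothetical class for illustration; the article's Trade class derives
// from DD's AbstractNotifyPropertyChanged instead of doing this by hand.
public class PriceHolder : INotifyPropertyChanged
{
    public event PropertyChangedEventHandler? PropertyChanged;

    private decimal _price;
    public decimal Price
    {
        get => _price;
        set => SetAndRaise(ref _price, value);
    }

    // Assigns the backing field and raises PropertyChanged,
    // but only when the value actually changes.
    private void SetAndRaise<T>(
        ref T field, T value, [CallerMemberName] string? propertyName = null)
    {
        if (EqualityComparer<T>.Default.Equals(field, value))
            return;
        field = value;
        PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(propertyName));
    }
}

public static class PropertyChangedDemo
{
    // Returns the property names for which the event fired.
    public static List<string?> Run()
    {
        var fired = new List<string?>();
        var holder = new PriceHolder();
        holder.PropertyChanged += (_, e) => fired.Add(e.PropertyName);

        holder.Price = 100m; // value changes: event fires
        holder.Price = 100m; // same value: no event in this sketch
        holder.Price = 250m; // value changes: event fires

        return fired;
    }
}
```

PropertyChangedDemo.Run() returns a list with two "Price" entries, one per actual change of value.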
Property TradeId is the primary key - it determines if the trade is a new trade within the cache or an update of an existing trade. If a Trade with the same TradeId already exists in the SourceCache then the incoming entry is an update, otherwise it is a new Trade object.
Property TheSymbol uniquely specifies the stock of the trade. Note that there can be different trades for the same stock at the same time within the system. This property is of type Symbol, which is a C# enum (since this sample deals with very few stocks, I wanted to take advantage of enum's strong typing instead of using strings, which leave a strong chance of spelling errors).
Property TotalTradePrice essentially represents the number of shares multiplied by the per share price for the trade.
Those who deal with stock trades will easily understand that this Trade class is greatly simplified. I already mentioned number-of-shares and price-per-share properties are missing. We also do not specify whether it is a Buy or Sell trade (let's assume they are all Sells), trade timing, trade status and other possible data.
Basic SourceCache API
Here we shall talk about the most important parts of the SourceCache API used within our samples below.
As was mentioned above, all the samples are built around the Trade class with the int TradeId property playing the role of the primary key for specifying whether an update or a new trade is coming.
Creating SourceCache
To create the SourceCache use the following constructor:
var tradeSourceCache = new SourceCache<Trade, int>(t => t.TradeId);
The argument to the constructor (in our case it is t => t.TradeId) is a lambda expression of type Func<Trade, int> specifying how to obtain the unique trade key from a Trade object (remember we use this key to figure out if we deal with an update or with a new trade).
Adding or Updating a Trade to Cache
To add or update a trade (depending on whether the trade with the same TradeId already exists in the cache), we use the method sourceCache.AddOrUpdate(Trade trade), e.g.:
// TradeId and TheSymbol are get-only, so the
// values are passed through the constructor
Trade metaTrade =
    new Trade
    (
        tradeId: 21,
        symbol: Symbol.META,
        totalTradeAmount: 2000
    );
sourceCache.AddOrUpdate(metaTrade);
The sourceCache itself will check whether a Trade with TradeId equal to 21 already exists in it. If yes, it will do an update; if no, it will insert a new Trade object.
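The add-versus-update decision can be seen in a small self-contained sketch. It assumes the DynamicData NuGet package is referenced; the Order record and AddOrUpdateDemo class are hypothetical stand-ins for the article's Trade class.

```csharp
using System;
using DynamicData;

// Hypothetical stand-in for the article's Trade class.
public record Order(int Id, decimal Amount);

public static class AddOrUpdateDemo
{
    // Returns the number of cached items and the amount stored under key 21.
    public static (int Count, decimal Amount) Run()
    {
        using var cache = new SourceCache<Order, int>(o => o.Id);

        cache.AddOrUpdate(new Order(21, 2000m)); // key 21 is new: added
        cache.AddOrUpdate(new Order(21, 1500m)); // key 21 exists: replaced

        // Lookup returns an Optional<Order>; HasValue is false for a missing key.
        var found = cache.Lookup(21);

        return (cache.Count, found.HasValue ? found.Value.Amount : -1m);
    }
}
```

AddOrUpdateDemo.Run() returns (1, 1500): the second call did not add a second entry, it replaced the one stored under the same key.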
Using IObservableCache
IObservableCache has all the functionality for observing changes to the SourceCache, but does not have any functionality for modifying it. In a sense it is a read-only view into SourceCache.
You might want to pass IObservableCache to methods that do not modify the cache. To convert SourceCache into IObservableCache, it is best to use the AsObservableCache() method:
IObservableCache<Trade, int> observableCache =
    sourceCache.AsObservableCache();
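As a rough sketch of why this is useful, the method below accepts only the read-only view, so it can query the items but has no way to add or remove them. The DynamicData NuGet package is assumed; Order, ReadOnlyViewDemo and CountLargeOrders are hypothetical names.

```csharp
using System;
using System.Linq;
using DynamicData;

// Hypothetical stand-in for the article's Trade class.
public record Order(int Id, decimal Amount);

public static class ReadOnlyViewDemo
{
    // A consumer that can read and observe the cache but cannot modify it,
    // because IObservableCache exposes no mutating members.
    private static int CountLargeOrders(IObservableCache<Order, int> view) =>
        view.Items.Count(o => o.Amount >= 1000m);

    public static int Run()
    {
        using var source = new SourceCache<Order, int>(o => o.Id);
        using IObservableCache<Order, int> view = source.AsObservableCache();

        // Changes made through the source are visible through the view.
        source.AddOrUpdate(new Order(1, 2000m));
        source.AddOrUpdate(new Order(2, 500m));

        return CountLargeOrders(view);
    }
}
```

ReadOnlyViewDemo.Run() returns 1, since only one of the two orders reaches the 1000 threshold.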
Turning SourceCache (or IObservableCache) into an Rx Observable Stream of Changes to the Collection using Connect() Method
Same as with ObservableList or SourceList discussed in the previous installment of the series, all the composable LINQ-like operators should be applied not to the SourceCache itself but to the Rx Stream of collection changes, IObservable<IChangeSet<Trade, int>>.
To convert SourceCache (or IObservableCache) into the Rx Stream of collection changes, use the Connect() method, e.g.:
IObservable<IChangeSet<Trade, int>> changeSetStream =
sourceCache.Connect();
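To get a feel for what travels through this stream, the following sketch subscribes to it directly and logs the reason and key of every change. It assumes the DynamicData NuGet package; Order and ConnectDemo are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using DynamicData;

// Hypothetical stand-in for the article's Trade class.
public record Order(int Id, decimal Amount);

public static class ConnectDemo
{
    // Returns one log entry per change pushed through the stream.
    public static List<string> Run()
    {
        var log = new List<string>();
        using var cache = new SourceCache<Order, int>(o => o.Id);

        // Each notification is a change set: a batch of individual changes.
        using var subscription = cache.Connect().Subscribe(changeSet =>
        {
            foreach (var change in changeSet)
                log.Add($"{change.Reason} {change.Key}");
        });

        cache.AddOrUpdate(new Order(1, 10m)); // new key
        cache.AddOrUpdate(new Order(1, 20m)); // existing key
        cache.Remove(1);

        return log;
    }
}
```

ConnectDemo.Run() returns the entries "Add 1", "Update 1" and "Remove 1", in that order. The DD operators used in the rest of the article consume and produce change sets of this kind.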
Turning ObservableCollection into an Rx Observable Stream of Collection Changes
ObservableCollection is a special collection that implements the INotifyCollectionChanged interface. After every change to the collection it fires the INotifyCollectionChanged.CollectionChanged event, passing the information about the change in the event's arguments.
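The event can be observed with nothing but the .NET base class library, as in this small sketch (CollectionChangedDemo is a hypothetical name):

```csharp
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Collections.Specialized;

public static class CollectionChangedDemo
{
    // Returns the action reported by each CollectionChanged event.
    public static List<string> Run()
    {
        var log = new List<string>();
        var symbols = new ObservableCollection<string>();

        symbols.CollectionChanged +=
            (object? sender, NotifyCollectionChangedEventArgs e) =>
                log.Add(e.Action.ToString());

        symbols.Add("ORCL");    // reported as Add
        symbols[0] = "META";    // reported as Replace
        symbols.Remove("META"); // reported as Remove

        return log;
    }
}
```

CollectionChangedDemo.Run() returns "Add", "Replace" and "Remove". Note that these events describe changes to the collection itself; a change to a property of an item already in the collection does not raise CollectionChanged.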
People who work with the MVVM pattern and with WPF, Avalonia or other XAML frameworks know how useful ObservableCollections are - they provide a collection within a view model whose visuals will be changed automatically as elements are added, removed or updated in it.
Dynamic Data provides the method ToObservableChangeSet(...), which converts an ObservableCollection straight into the Rx Observable Stream of Collection Changes without using SourceCache explicitly, e.g.:
ObservableCollection<Trade> myObservableCollection =
new ObservableCollection<Trade>();
IObservable<IChangeSet<Trade, int>> changeSetStream =
myObservableCollection.ToObservableChangeSet(t => t.TradeId);
Once the stream is subscribed to, whatever changes are made to myObservableCollection will be reflected within changeSetStream.
Code Samples
Now we are ready for the code samples.
As was mentioned above, the code sample solution is IntoToObservableCache.sln from Intro to Observable Cache folder within NP.Samples repository.
All the samples are created as XUnit static [Fact] methods within the SimpleObservableCacheExamples.cs file.
Auxiliary Methods and APIs used throughout the Tests
As was mentioned above, all the stock symbols were defined as enum Symbol for the sake of strong typing:
public enum Symbol
{
META,
ORCL,
TSLA,
MSFT
}
Static class TradesGenerator contains an extension method CreateTrade(...) which returns a new trade or a trade update:
internal static Trade CreateTrade
(
this Symbol symbol,
decimal totalTradeAmount,
int tradeId = -1
)
{
...
}
If the passed argument tradeId is the default (-1), then a new trade with a new TradeId is generated. Otherwise the caller can pass any non-negative value for tradeId, and if such a value already exists in the SourceCache, this trade will be considered an update.
The SimpleObservableCacheExamples static class contains a static method Verify which compares two collections:
private static void Verify
(
this IEnumerable<Trade> sourceTradeCollection,
IEnumerable<Trade> targetCollection,
Func<Trade, bool> sourceFilter
)
{
var filteredAndOrdered =
sourceTradeCollection
.Where(sourceFilter)
.OrderBy(t => t.TotalTradePrice)
.ToList();
Assert.True(filteredAndOrdered.SequenceEqual(targetCollection));
}
One of these collections (sourceTradeCollection) is the source collection of trades, the other (targetCollection) is the result of applying Dynamic Data operators.
The purpose of this method is to assert that both collections have the same data arranged in the same order.
We assume that our Dynamic Data operators always sort the targetCollection in ascending order of TotalTradePrice.
We also pass a filter to be applied to the source collection (it should be the same filter as the one we apply to obtain the targetCollection).
Filtering, Sorting and Binding Test
Here is the code for the FilterSortAndBindTest() method (do not try to understand it all at once, since we shall describe it piece by piece below):
[Fact]
public static void FilterSortAndBindTest()
{
// create sourceCache
ISourceCache<Trade, int>? sourceCache =
new SourceCache<Trade, int>(t => t.TradeId);
// create several trades and add them to
// the sourceCache
var metaTrade1 = Symbol.META.CreateTrade(2000);
sourceCache.AddOrUpdate(metaTrade1);
var oracleTrade1 = Symbol.ORCL.CreateTrade(1000);
sourceCache.AddOrUpdate(oracleTrade1);
var metaTrade2 = Symbol.META.CreateTrade(1900);
sourceCache.AddOrUpdate(metaTrade2);
var oracleTrade2 = Symbol.ORCL.CreateTrade(900);
sourceCache.AddOrUpdate(oracleTrade2);
// create stream of IChange parameters
// from the source collection
IObservable<IChangeSet<Trade, int>> changeSetStream =
sourceCache!.Connect();
// create the target collection
IObservableCollection<Trade> targetCollection =
new ObservableCollectionExtended<Trade>();
// filter, sort and bind the source collection
// to the targetCollection
IObservable<IChangeSet<Trade, int>> resultObservable =
changeSetStream
.Filter(t => t.TheSymbol == Symbol.ORCL)
.SortAndBind
(
targetCollection,
SortExpressionComparer<Trade>
.Ascending(t => t.TotalTradePrice)
);
// now subscribe to start pulling data
// using clause will dispose the subscription
using var subscription = resultObservable.Subscribe();
// create the original source collection
IEnumerable<Trade> sourceTradeCollection =
[metaTrade1, oracleTrade1, metaTrade2, oracleTrade2];
// assert that the filter, and sort operators
// result in the correct targetCollection
sourceTradeCollection
.Verify(targetCollection, t => t.TheSymbol == Symbol.ORCL);
// create another entry with the same TradeId key
// as oracleTrade1 and add it to the sourceCache
// to override the previous oracleTrade1 entry
var oracleTrade1_modified =
Symbol.ORCL.CreateTrade(100, oracleTrade1.TradeId);
sourceCache.AddOrUpdate(oracleTrade1_modified);
// change the sourceTradeCollection to contain
// oracleTrade1_modified trade instead of oracleTrade1
sourceTradeCollection =
[metaTrade1, oracleTrade1_modified, metaTrade2, oracleTrade2];
// make sure the collections are identical
sourceTradeCollection
.Verify(targetCollection, t => t.TheSymbol == Symbol.ORCL);
// create a new trade oracleTrade3 and add it
// to the sourceCache
var oracleTrade3 =
Symbol.ORCL.CreateTrade(50);
sourceCache.AddOrUpdate(oracleTrade3);
// create the new sourceTradeCollection (adding
// oracleTrade3)
sourceTradeCollection =
[
metaTrade1,
oracleTrade1_modified,
metaTrade2,
oracleTrade2,
oracleTrade3
];
// verify that the targetCollection is correct
sourceTradeCollection
.Verify(targetCollection, t => t.TheSymbol == Symbol.ORCL);
// remove all oracle entries
sourceCache.Remove(oracleTrade1.TradeId);
sourceCache.Remove(oracleTrade2.TradeId);
sourceCache.Remove(oracleTrade3.TradeId);
// make sure the targetCollection is empty
Assert.True(targetCollection.Count() == 0);
}
First, we create the source cache:
ISourceCache<Trade, int> sourceCache =
new SourceCache<Trade, int>(t => t.TradeId);
Then we create 4 trades (all of them are new trades, none are updates) and add each one of them to the cache:
// create a meta trade with TotalTradePrice 2000
var metaTrade1 = Symbol.META.CreateTrade(2000);
// add metaTrade1 to the sourceCache
sourceCache.AddOrUpdate(metaTrade1);
var oracleTrade1 = Symbol.ORCL.CreateTrade(1000);
sourceCache.AddOrUpdate(oracleTrade1);
var metaTrade2 = Symbol.META.CreateTrade(1900);
sourceCache.AddOrUpdate(metaTrade2);
var oracleTrade2 = Symbol.ORCL.CreateTrade(900);
sourceCache.AddOrUpdate(oracleTrade2);
Then we create the Rx Observable Stream of cache changes:
IObservable<IChangeSet<Trade, int>> changeSetStream =
sourceCache.Connect();
Now, we create the target collection that will contain the DD result:
IObservableCollection<Trade> targetCollection =
new ObservableCollectionExtended<Trade>();
IObservableCollection and ObservableCollectionExtended are both introduced by DD and represent the same functionality as ObservableCollection with some extra capability (e.g. batching multiple updates).
Now we apply the filter, sort and bind operations (bind will place the output into the targetCollection):
IObservable<IChangeSet<Trade, int>> resultObservable =
changeSetStream
.Filter(t => t.TheSymbol == Symbol.ORCL)
.SortAndBind
(
targetCollection,
SortExpressionComparer<Trade>
.Ascending(t => t.TotalTradePrice)
);
We filter only trades with ORCL symbol and sort them by TotalTradePrice in ascending order.
Note, to improve the performance we use the method SortAndBind(...) instead of two separate DD methods Sort(...) and Bind(...).
Now, to start pulling the data, we call the Subscribe() method:
using IDisposable subscription = resultObservable.Subscribe();
The using declaration ensures that the subscription is disposed automatically when the test method goes out of scope.
We build the sourceTradeCollection and verify that under the trade.TheSymbol == Symbol.ORCL filter, the collections match:
IEnumerable<Trade> sourceTradeCollection =
[metaTrade1, oracleTrade1, metaTrade2, oracleTrade2];
sourceTradeCollection
.Verify(targetCollection, t => t.TheSymbol == Symbol.ORCL);
Next we test that an update to the first oracle trade works - we create a trade update with the same TradeId and TheSymbol as oracleTrade1 and with TotalTradePrice equal to 100, then we use it to update the sourceCache and we check that this update is reflected in the targetCollection:
var oracleTrade1_modified =
Symbol.ORCL.CreateTrade(100, oracleTrade1.TradeId);
sourceCache.AddOrUpdate(oracleTrade1_modified);
sourceTradeCollection =
[metaTrade1, oracleTrade1_modified, metaTrade2, oracleTrade2];
sourceTradeCollection
.Verify(targetCollection, t => t.TheSymbol == Symbol.ORCL);
Next we check that once we add a trade to the source, it is also added to the target (if it is filtered in):
// new trade
var oracleTrade3 =
Symbol.ORCL.CreateTrade(50);
sourceCache.AddOrUpdate(oracleTrade3);
sourceTradeCollection =
[
metaTrade1,
oracleTrade1_modified,
metaTrade2,
oracleTrade2,
oracleTrade3
];
// verify that the targetCollection is correct
sourceTradeCollection
.Verify(targetCollection, t => t.TheSymbol == Symbol.ORCL);
Now, we shall remove all the oracle entries from the sourceCache and assert that the target collection is empty (since targetCollection contains only oracle entries because of the filter):
// remove all oracle entries
sourceCache.Remove(oracleTrade1.TradeId);
sourceCache.Remove(oracleTrade2.TradeId);
sourceCache.Remove(oracleTrade3.TradeId);
// make sure the targetCollection is empty
Assert.True(targetCollection.Count() == 0);
Filtering, Sorting and Binding Test with ObservableCollection as Source
Our next test, defined by the static method FilterSortAndBindFromObservableCollectionTest(), is almost exactly the same as the previous one (defined by FilterSortAndBindTest()), only instead of SourceCache we are using an ObservableCollection as the source:
[Fact]
public static void
FilterSortAndBindFromObservableCollectionTest()
{
// create individual trades
var metaTrade1 = Symbol.META.CreateTrade(2000);
var oracleTrade1 = Symbol.ORCL.CreateTrade(1000);
var metaTrade2 = Symbol.META.CreateTrade(1900);
var oracleTrade2 = Symbol.ORCL.CreateTrade(900);
// create the input ObservableCollection
ObservableCollection<Trade> sourceTradeCollection =
new ObservableCollection<Trade>
{
metaTrade1, oracleTrade1, metaTrade2, oracleTrade2
};
// create stream of IChange parameters
// from the source collection
IObservable<IChangeSet<Trade, int>> changeSetStream =
sourceTradeCollection.ToObservableChangeSet(t => t.TradeId);
// create the target collection
IObservableCollection<Trade> targetCollection =
new ObservableCollectionExtended<Trade>();
IObservable<IChangeSet<Trade, int>> resultObservable =
changeSetStream
.Filter(t => t.TheSymbol == Symbol.ORCL)
.AutoRefresh() // listens to PropertyChanged
// events and updates
// downstream results when
// PropertyChanged is fired
.SortAndBind
(
targetCollection,
SortExpressionComparer<Trade>
.Ascending(t => t.TotalTradePrice)
);
// now subscribe to start pulling data
// using clause will dispose the subscription
using var subscription = resultObservable.Subscribe();
sourceTradeCollection
.Verify(targetCollection, t => t.TheSymbol == Symbol.ORCL);
// update the source entry and
// make sure that the target got re-sorted
oracleTrade1.TotalTradePrice = 100;
sourceTradeCollection
.Verify(targetCollection, t => t.TheSymbol == Symbol.ORCL);
// add another oracle trade
var oracleTrade3 =
Symbol.ORCL.CreateTrade(50);
sourceTradeCollection.Add(oracleTrade3);
sourceTradeCollection
.Verify(targetCollection, t => t.TheSymbol == Symbol.ORCL);
// remove all oracle entries
sourceTradeCollection.Remove(oracleTrade1);
sourceTradeCollection.Remove(oracleTrade2);
sourceTradeCollection.Remove(oracleTrade3);
// the target collection should become empty
Assert.True(targetCollection.Count == 0);
}
Below, we shall go over only the differences between this and the previous example.
The first difference is that instead of creating a SourceCache, we create an ObservableCollection and populate it with trades:
// create individual trades
var metaTrade1 = Symbol.META.CreateTrade(2000);
var oracleTrade1 = Symbol.ORCL.CreateTrade(1000);
var metaTrade2 = Symbol.META.CreateTrade(1900);
var oracleTrade2 = Symbol.ORCL.CreateTrade(900);
// create the input ObservableCollection
ObservableCollection<Trade> sourceTradeCollection =
new ObservableCollection<Trade>
{
metaTrade1, oracleTrade1, metaTrade2, oracleTrade2
};
Next we use the ToObservableChangeSet(...) extension method to create the Rx observable stream of collection changes:
IObservable<IChangeSet<Trade, int>> changeSetStream =
    sourceTradeCollection.ToObservableChangeSet(t => t.TradeId);
Among our DD operators we insert AutoRefresh(), which will update the sorting every time the PropertyChanged event is fired on any of the trade objects in the collection (which happens when its TotalTradePrice changes):
IObservable<IChangeSet<Trade, int>> resultObservable =
changeSetStream
.Filter(t => t.TheSymbol == Symbol.ORCL)
.AutoRefresh() // listens to PropertyChanged
// events and updates
// downstream results when
// PropertyChanged is fired
.SortAndBind
(
targetCollection,
SortExpressionComparer<Trade>
.Ascending(t => t.TotalTradePrice)
);
To update a trade, instead of adding another Trade object with the same TradeId to the cache, we modify the instance that is already within the ObservableCollection:
oracleTrade1.TotalTradePrice = 100;
This will trigger the PropertyChanged event on the object, which will be handled by the AutoRefresh() operator and passed to the downstream sorting operator.
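One way to see what AutoRefresh() hands to the downstream operators is to log the change reasons it emits. The sketch below assumes the DynamicData NuGet package and uses a SourceCache as the source for brevity; Position and AutoRefreshDemo are hypothetical names. With the default arguments (no buffering or throttling) the refresh notification is expected to arrive as soon as the property changes.

```csharp
using System;
using System.Collections.Generic;
using DynamicData;
using DynamicData.Binding;

// Hypothetical item type with one updatable, notifying property.
public class Position : AbstractNotifyPropertyChanged
{
    public int Id { get; }

    private decimal _value;
    public decimal Value
    {
        get => _value;
        set => SetAndRaise(ref _value, value);
    }

    public Position(int id, decimal value)
    {
        Id = id;
        _value = value;
    }
}

public static class AutoRefreshDemo
{
    // Returns the reasons of all changes seen downstream of AutoRefresh().
    public static List<string> Run()
    {
        var reasons = new List<string>();
        var position = new Position(1, 1000m);

        using var cache = new SourceCache<Position, int>(p => p.Id);
        cache.AddOrUpdate(position);

        using var subscription = cache.Connect()
            .AutoRefresh() // watches PropertyChanged on every item
            .Subscribe(changeSet =>
            {
                foreach (var change in changeSet)
                    reasons.Add(change.Reason.ToString());
            });

        // No AddOrUpdate call here: only a property of the cached item changes.
        position.Value = 100m;

        return reasons;
    }
}
```

The returned list should start with the initial Add and contain a Refresh entry produced by the property change. Operators placed after AutoRefresh(), such as the sorting used in the test, react to that Refresh change by re-evaluating the item.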
Dynamic Filter Test
This test demonstrates using a dynamic filter that can be easily changed at run time.
In this test we shall use the ObservableCollection source instead of the SourceCache.
Here is the full code for the [Fact] public static void DynamicFilterTest() test method:
[Fact]
public static void
DynamicFilterTest()
{
// create trades
var metaTrade1 = Symbol.META.CreateTrade(2000);
var oracleTrade1 = Symbol.ORCL.CreateTrade(1000);
var metaTrade2 = Symbol.META.CreateTrade(1900);
var oracleTrade2 = Symbol.ORCL.CreateTrade(900);
// creates the source ObservableCollection
ObservableCollection<Trade> sourceTradeCollection =
new ObservableCollection<Trade>
{
metaTrade1, oracleTrade1, metaTrade2, oracleTrade2
};
// create stream of IChange parameters
// from the source collection
IObservable<IChangeSet<Trade, int>> changeSetStream =
sourceTradeCollection.ToObservableChangeSet(t => t.TradeId);
// create filterObservable subject to allow
// changing the filters
Subject<Func<Trade, bool>> filterObservable =
new Subject<Func<Trade, bool>>();
// create the target collection
IObservableCollection<Trade> targetCollection =
new ObservableCollectionExtended<Trade>();
IObservable<IChangeSet<Trade, int>> resultObservable =
changeSetStream
// use the Filter(...) method
// that accepts filterObservable
.Filter(filterObservable)
.AutoRefresh()
.SortAndBind
(
targetCollection,
SortExpressionComparer<Trade>
.Ascending(t => t.TotalTradePrice)
);
// now subscribe to start pulling data
// using clause will dispose the subscription
using var subscription = resultObservable.Subscribe();
// push the lambda expression to filter in only
// ORCL symbol into filterObservable
filterObservable.OnNext(t => t.TheSymbol == Symbol.ORCL);
sourceTradeCollection
.Verify(targetCollection, t => t.TheSymbol == Symbol.ORCL);
oracleTrade1.TotalTradePrice = 100;
sourceTradeCollection
.Verify(targetCollection, t => t.TheSymbol == Symbol.ORCL);
// clear the source collection of oracle
// entries
sourceTradeCollection.Remove(oracleTrade1);
sourceTradeCollection.Remove(oracleTrade2);
Assert.True(targetCollection.Count == 0);
// change filter to META
filterObservable.OnNext(t => t.TheSymbol == Symbol.META);
// we should get some META entries in the target Collection
// right away
Assert.Contains<Trade>
(targetCollection, t => t.TheSymbol == Symbol.META);
}
Now we shall go over the main differences between this and the previous samples.
The major difference is that we create an Rx subject to push various filters of type Func<Trade, bool> into it:
Subject<Func<Trade, bool>> filterObservable =
new Subject<Func<Trade, bool>>();
Then we use the overload of the Filter(...) extension method that accepts an IObservable<Func<Trade, bool>> as the source of filters:
IObservable<IChangeSet<Trade, int>> resultObservable =
changeSetStream
// use the Filter(...) method
// that accepts filterObservable
.Filter(filterObservable)
.AutoRefresh()
.SortAndBind
(
targetCollection,
SortExpressionComparer<Trade>
.Ascending(t => t.TotalTradePrice)
);
Note the .Filter(filterObservable) line.
Then, after we start pulling data from the changeSetStream by calling using var subscription = resultObservable.Subscribe();, we push the first filter we want into the filterObservable subject to filter in only ORCL symbols:
filterObservable.OnNext(t => t.TheSymbol == Symbol.ORCL);
At the end of the test method, when we want to change the filter to, say, filtering in META symbols, we call:
filterObservable.OnNext(t => t.TheSymbol == Symbol.META);
and then assert that there are some trades with META symbol in the output collection:
// we should get some META entries in the target Collection
// right away
Assert.Contains<Trade>
(targetCollection, t => t.TheSymbol == Symbol.META);
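A possible variation, not used in the article's test, is to hold the filter in an Rx BehaviorSubject, which replays its latest value to new subscribers, so the pipeline has a filter from the moment it is subscribed. The sketch assumes the DynamicData and System.Reactive NuGet packages; Order and DynamicFilterDemo are hypothetical names.

```csharp
using System;
using System.Collections.ObjectModel;
using System.Reactive.Subjects;
using DynamicData;

// Hypothetical stand-in for the article's Trade class.
public record Order(int Id, string Symbol);

public static class DynamicFilterDemo
{
    // Returns the size of the target collection before and after
    // the filter is switched from ORCL to META.
    public static (int Before, int After) Run()
    {
        using var cache = new SourceCache<Order, int>(o => o.Id);
        cache.AddOrUpdate(new[]
        {
            new Order(1, "ORCL"),
            new Order(2, "META"),
            new Order(3, "ORCL")
        });

        // The initial filter is supplied as the subject's starting value.
        using var filter =
            new BehaviorSubject<Func<Order, bool>>(o => o.Symbol == "ORCL");

        using var subscription = cache.Connect()
            .Filter(filter)
            .Bind(out ReadOnlyObservableCollection<Order> target)
            .Subscribe();

        int before = target.Count;

        // Pushing a new predicate re-evaluates every item in the cache.
        filter.OnNext(o => o.Symbol == "META");
        int after = target.Count;

        return (before, after);
    }
}
```

DynamicFilterDemo.Run() returns (2, 1): two ORCL orders pass the initial filter, and after the switch only the single META order remains in the target collection.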
Grouping Test
The grouping test shows how to use Dynamic Data operators for grouping trades by Symbol.
The static test method for grouping is called GroupingTest():
[Fact]
public static void GroupingTest()
{
// create trades
var metaTrade1 = Symbol.META.CreateTrade(2000);
var oracleTrade1 = Symbol.ORCL.CreateTrade(1000);
var metaTrade2 = Symbol.META.CreateTrade(1900);
var oracleTrade2 = Symbol.ORCL.CreateTrade(900);
// create the source collection of trades as
// observable collection
ObservableCollection<Trade> sourceTradeCollection =
new ObservableCollection<Trade>
{
metaTrade1, oracleTrade1, metaTrade2, oracleTrade2
};
// create stream of IChange parameters
// from the source collection
IObservable<IChangeSet<Trade, int>> changeSetStream =
sourceTradeCollection.ToObservableChangeSet(t => t.TradeId);
// do the grouping
IObservable<IGroupChangeSet<Trade, int, Symbol>>
groupedObservable =
changeSetStream
.AutoRefresh()
.Group(t => t.TheSymbol);
// transform the grouped entries into
// SymbolTradeGroup objects
var transformedGroups =
groupedObservable.Transform(g => new SymbolTradeGroup(g));
//output groups
ReadOnlyObservableCollection<SymbolTradeGroup>? symbolTradeGroups;
// create and populate an observable collection
// symbolTradeGroups that contains those SymbolTradeGroup
// objects
using IDisposable subscription =
transformedGroups
.Bind(out symbolTradeGroups) // create and populate
.DisposeMany() // make sure that if an item is removed
// from the collection, it is disposed
.Subscribe(); // start the subscription
// Assert there are two groups
// (one for ORCL and the other for META trades)
Assert.True(symbolTradeGroups.Count == 2);
// get the oracleGroup and assert it exists and single
SymbolTradeGroup oracleGroup =
symbolTradeGroups.Single(g => g.TheSymbol == Symbol.ORCL);
Assert.True(oracleGroup.Trades.Count() == 2);
Assert
.True(oracleGroup.TotalPrice ==
oracleGroup.Trades.Sum(t => t.TotalTradePrice));
// get the metaGroup and assert it exists and single
SymbolTradeGroup metaGroup =
symbolTradeGroups.Single(g => g.TheSymbol == Symbol.META);
Assert.True(metaGroup.Trades.Count() == 2);
Assert
.True(metaGroup.TotalPrice ==
metaGroup.Trades.Sum(t => t.TotalTradePrice));
// change the total price of oracleTrade1
oracleTrade1.TotalTradePrice = 5000;
// assert that the aggregation's total price got updated
Assert
.True
(oracleGroup.TotalPrice ==
oracleGroup.Trades.Sum(t => t.TotalTradePrice));
// add another oracle trade to the source collection
var oracleTrade3 = Symbol.ORCL.CreateTrade(7000);
sourceTradeCollection.Add(oracleTrade3);
// make sure that the number of trades within the ORCL group
// is now 3 and that the total price of the aggregation
// also got updated
Assert.True(oracleGroup.Trades.Count() == 3);
Assert
.True(oracleGroup.TotalPrice ==
oracleGroup.Trades.Sum(t => t.TotalTradePrice));
// add a trade for another instrument (e.g. Symbol.TSLA)
// to the source collection
var tslaTrade1 = Symbol.TSLA.CreateTrade(10);
sourceTradeCollection.Add(tslaTrade1);
// make sure that the number of groups is now 3:
Assert.True(symbolTradeGroups.Count() == 3);
// remove all Oracle trades from the source collection
sourceTradeCollection.Remove(oracleTrade1);
sourceTradeCollection.Remove(oracleTrade2);
sourceTradeCollection.Remove(oracleTrade3);
// make sure that the corresponding group is also removed
Assert.True(symbolTradeGroups.Count() == 2);
Assert.True
(
symbolTradeGroups
.Where(group => group.TheSymbol == Symbol.ORCL)
.Count() == 0);
}
This test uses an extra type SymbolTradeGroup representing a group of trades grouped by the TheSymbol property:
public class SymbolTradeGroup : IDisposable
{
// Dynamic Data group
IGroup<Trade, int, Symbol> _group;
// trades within the group
public IEnumerable<Trade> Trades => _group.Cache.Items;
// group key
public Symbol TheSymbol => _group.Key;
// sum of TotalTradePrice across all
// the trades within the group
public decimal TotalPrice { get; private set; }
private IDisposable? _disposableSubscription;
public void Dispose()
{
// destroy the group cache
_group.Cache.Dispose();
// remove the aggregation subscription
_disposableSubscription?.Dispose();
_disposableSubscription = null;
}
public SymbolTradeGroup(IGroup<Trade, int, Symbol> group)
{
_group = group;
// set up the TotalPrice to be
// dynamically calculated when the group
// or individual trades are changed
_disposableSubscription =
_group
.Cache
.Connect()
.ToCollection()
.Select(collection =>
collection.Sum(t => t.TotalTradePrice))
.Subscribe(sum => this.TotalPrice = sum);
}
}
Its TheSymbol property represents the common symbol (the grouping key). Its property TotalPrice is the sum of TotalTradePrice properties from each of its Trade objects. The TotalPrice will automatically update when the group content changes or when the individual Trade objects are updated.
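The aggregation technique used in the SymbolTradeGroup constructor can be tried in isolation. The following sketch keeps a running total over a plain SourceCache; it assumes the DynamicData and System.Reactive NuGet packages, and Order and RunningTotalDemo are hypothetical names.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;
using DynamicData;

// Hypothetical stand-in for the article's Trade class.
public record Order(int Id, decimal Amount);

public static class RunningTotalDemo
{
    // Returns the total after two adds and then after one removal.
    public static (decimal AfterAdds, decimal AfterRemove) Run()
    {
        decimal total = 0m;
        using var cache = new SourceCache<Order, int>(o => o.Id);

        using var subscription = cache.Connect()
            .ToCollection()                            // full content after every change
            .Select(items => items.Sum(o => o.Amount)) // Rx Select over the snapshots
            .Subscribe(sum => total = sum);

        cache.AddOrUpdate(new Order(1, 1000m));
        cache.AddOrUpdate(new Order(2, 900m));
        decimal afterAdds = total;

        cache.Remove(1);
        decimal afterRemove = total;

        return (afterAdds, afterRemove);
    }
}
```

RunningTotalDemo.Run() returns (1900, 900). Recomputing the sum over the whole collection on every change is simple and adequate for small groups like the ones in these samples.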
Note that SymbolTradeGroup depends on the IGroup implementation from DD and is IDisposable because of that (whenever a group is removed, the underlying DD functionality needs to be disposed of).
Now, let us dive into the GroupingTest() code.
The very beginning is exactly the same as in the previous sample - we create 4 trades, build an ObservableCollection out of them and then obtain the IObservable<IChangeSet<Trade, int>> by calling the ToObservableChangeSet(...) static extension method.
Then we do the grouping by the TheSymbol property:
IObservable<IGroupChangeSet<Trade, int, Symbol>>
groupedObservable =
changeSetStream
.AutoRefresh()
.Group(t => t.TheSymbol);
Note that AutoRefresh() will help to update the aggregated TotalPrice when TotalTradePrice changes on an individual trade object.
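Before looking at the transformation into SymbolTradeGroup objects, it may help to see Group(...) on its own. In this sketch the groups are bound directly to a collection; it assumes the DynamicData NuGet package, and Order and GroupDemo are hypothetical names.

```csharp
using System;
using System.Collections.ObjectModel;
using System.Linq;
using DynamicData;

// Hypothetical stand-in for the article's Trade class.
public record Order(int Id, string Symbol);

public static class GroupDemo
{
    // Returns the number of groups, the size of the ORCL group,
    // and the number of groups after the only META order is removed.
    public static (int GroupsBefore, int OrclItems, int GroupsAfter) Run()
    {
        using var cache = new SourceCache<Order, int>(o => o.Id);
        cache.AddOrUpdate(new[]
        {
            new Order(1, "ORCL"),
            new Order(2, "META"),
            new Order(3, "ORCL")
        });

        using var subscription = cache.Connect()
            .Group(o => o.Symbol)
            .Bind(out ReadOnlyObservableCollection<IGroup<Order, int, string>> groups)
            .Subscribe();

        int groupsBefore = groups.Count;

        // Every group exposes its members through its own observable cache.
        int orclItems = groups.Single(g => g.Key == "ORCL").Cache.Count;

        // Removing the last member of a group removes the group itself.
        cache.Remove(2);

        return (groupsBefore, orclItems, groups.Count);
    }
}
```

GroupDemo.Run() returns (2, 2, 1): two groups to start with, two orders in the ORCL group, and a single group left once the META group has lost its only member.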
After grouping we employ the Transform(...) extension method to convert each group into a SymbolTradeGroup object:
var transformedGroups =
groupedObservable.Transform(g => new SymbolTradeGroup(g));
Next, we define the collection symbolTradeGroups to contain the groups, call the Bind(...) extension method to get the groups into the collection and call Subscribe(...) to start pulling data:
using IDisposable subscription =
transformedGroups
.Bind(out symbolTradeGroups) // create and populate
.DisposeMany() // make sure that if an item is removed
// from the collection, it is disposed
.Subscribe(); // start the subscription
Note the method DisposeMany() called before Subscribe() - it ensures that once a group is removed from the symbolTradeGroups collection, its Dispose() method is called.
After that we assert the number of groups, the TotalPrice value within each group and the content of each of the groups:
Assert.True(metaGroup.Trades.Count() == 2);
Assert
.True(metaGroup.TotalPrice ==
metaGroup.Trades.Sum(t => t.TotalTradePrice));
// change the total price of oracleTrade1
oracleTrade1.TotalTradePrice = 5000;
// assert that the aggregation's total price got updated
Assert
.True
(oracleGroup.TotalPrice == oracleGroup.Trades.Sum(t => t.TotalTradePrice));
After that, we update some TotalTradePrice values for individual trades, add and remove some trades to and from sourceTradeCollection, and after each operation verify that the number of the groups, their TotalPrice properties and trades are what they are supposed to be.