Being able to access the unsynchronized data is important to us developers who want to do our job in advanced scenarios without creating deadlocks.
So thank you.
In order to understand the unsynchronized Instrument.MarketData.Update event a little better, I wrote the code that is attached and saw the output that is shown in the image below.
To understand it better, I'd like to make some observations and ask some questions:
Observation 1:
There are two threads used to raise the events.
Each thread seems to raise its own events in the correct chronological sequence.
Combined, the two threads raise duplicate copies of each event, and the combined chronological sequence will vary depending on each thread's progress.
Question 1:
Why is a single event subscription receiving events from both threads and thereby receiving duplicate copies of each event?
Observation 2:
There seems to be a pattern with the threads that can be more easily explained if we name the threads:
Let the thread with the first event, in this case having ThreadId=49, be called the "primary thread".
Let the thread with the second event, i this case having ThreadId=57, be called the "secondary thread".
When the indicator is replaced, it will again subscribe and receive events from two threads.
The primary thread (id=49) will still be there. The secondary thread will have a different id.
It seems that the "primary thread" is like a "constant" thread that is always there and always pumping events.
It seems that the "secondary thread" is more like a temporary thread that was created just for this one particular subscription and a different thread will be used for the next subscription.
Question 2:
Can I rely on these observations about the threads to be consistent?
Question 3:
It seems like the best way to use the event data is to choose a thread id and only accept events from that thread id.
Do you agree?
NB: Ignore the '0' price in the first entry in the image below. It happened because of a minor code issue at the time I took the screenshot and does not happen with the code now posted below.
Here's the code I used to generate the view above:
using NinjaTrader.Core; using NinjaTrader.Data; using NinjaTrader.Gui; using System; using System.Globalization; using System.Runtime.CompilerServices; using System.Runtime.ExceptionServices; using System.Text; using System.Threading; using System.Threading.Tasks; using System.Windows.Media; namespace NinjaTrader.NinjaScript.Indicators { public class MarketDataSynchronizationTests : Indicator { #region Static stuff related to the unique indicator id private static int __id = 0; // The source of the unique indicator ids. private static int GetUniqueId() { return Interlocked.Increment(ref __id); } #endregion private readonly object Mutex = new object(); private readonly int Id = GetUniqueId(); // A unique id assigned only to this indicator instance. private bool _subscribed = false; // True when this indicator has subscribed to market data updates. private TickSequencePosition sequencePosition = new TickSequencePosition(); protected override void OnStateChange() { switch (State) { case State.SetDefaults: IsOverlay = false; Name = "_Market Data Synchronization Tests"; break; case State.DataLoaded: { if (ChartControl is object) { Instrument.MarketData.Update += MarketData_Update; _subscribed = true; } break; } case State.Terminated: if (_subscribed) { try { Instrument.MarketData.Update -= MarketData_Update; } catch { } } break; } } private void MarketData_Update(object sender, MarketDataEventArgs e) { lock (Mutex) { if (e.Instrument != Instrument) return; if (e.MarketDataType != MarketDataType.Last) return; var sb = new StringBuilder(); /// Figure out if the tick time is out of sequence, so we can flag it in the output window with a "*" character. var isBehind = false; try { sequencePosition.OnTick(e.Time); } catch { isBehind = true; } sb.Append(isBehind ? "* " : " "); sb.Append(e.Time.ToString("HH:mm:ss.fffffff")); sb.Append(' '); sb.Append(e.Volume.ToString().PadLeft(3)); sb.Append('@'); sb.Append(Instrument.MasterInstrument.FormatPrice(e.Price)); sb.Append(" Id: "); sb.Append(Id.ToString().PadLeft(3)); sb.Append(" ThreadId: "); sb.Append(Thread.CurrentThread.ManagedThreadId.ToString().PadLeft(3)); Print(sb.ToString()); } } /// <summary> /// This object keeps track of a tick's position within a sequence of ticks, and can be used to compare that tick's position /// with another tick's position. /// A tick's position in the sequence of ticks is defined firstly by its time. But because multiple ticks can occur at the same /// time, we also need to keep track of the tick's position within the group of ticks that occured at the same time. /// </summary> public struct TickSequencePosition : IComparable<TickSequencePosition>, IEquatable<TickSequencePosition> { /// <summary> /// The tick's time. /// </summary> public DateTime Time { get; private set; } /// <summary> /// The tick's position within the group of ticks that occurred at the same time. /// </summary> public int CountAtTime { get; private set; } /// <summary> /// Use this to advance the position counter. /// </summary> /// <exception cref="Exception">Thrown when time has gone backward. When thrown, it shows that there is an error in your code.</exception> public void OnTick(DateTime time) { if (time > Time) { Time = time; CountAtTime = 0; } else if (time == Time) { CountAtTime++; } else { throw new Exception("Tick time went backwards"); } } [MethodImpl(MethodImplOptions.AggressiveInlining)] public override int GetHashCode() { return (Time.GetHashCode() * (-1521134295)) + CountAtTime.GetHashCode(); } [MethodImpl(MethodImplOptions.AggressiveInlining)] public override bool Equals(object obj) { if (obj is TickSequencePosition) return Equals((TickSequencePosition)obj); return false; } [MethodImpl(MethodImplOptions.AggressiveInlining)] public bool Equals(TickSequencePosition other) { return this == other; } [MethodImpl(MethodImplOptions.AggressiveInlining)] public int CompareTo(TickSequencePosition other) { var timeCompare = Time.CompareTo(other.Time); return timeCompare == 0 ? CountAtTime.CompareTo(other.CountAtTime) : timeCompare; } [MethodImpl(MethodImplOptions.AggressiveInlining)] public static bool operator ==(TickSequencePosition left, TickSequencePosition right) { return left.Time == right.Time && left.CountAtTime == right.CountAtTime; } [MethodImpl(MethodImplOptions.AggressiveInlining)] public static bool operator !=(TickSequencePosition left, TickSequencePosition right) { return !(left == right); } [MethodImpl(MethodImplOptions.AggressiveInlining)] public static bool operator >(TickSequencePosition left, TickSequencePosition right) { return left.CompareTo(right) == 1; } [MethodImpl(MethodImplOptions.AggressiveInlining)] public static bool operator <(TickSequencePosition left, TickSequencePosition right) { return left.CompareTo(right) == -1; } [MethodImpl(MethodImplOptions.AggressiveInlining)] public static bool operator >=(TickSequencePosition left, TickSequencePosition right) { return left.CompareTo(right) >= 0; } [MethodImpl(MethodImplOptions.AggressiveInlining)] public static bool operator <=(TickSequencePosition left, TickSequencePosition right) { return left.CompareTo(right) <= 0; } } } }
Comment