Add AnsiRequestScheduler that prevents 2+ queries executing at once and has a global throttle

This commit is contained in:
tznind
2024-10-20 20:30:03 +01:00
parent db1977aa85
commit d345fc0130
3 changed files with 183 additions and 16 deletions

View File

@@ -1,7 +1,91 @@
#nullable enable
using System.Collections.Concurrent;
using System.Runtime.ConstrainedExecution;
namespace Terminal.Gui;
public class AnsiRequestScheduler(IAnsiResponseParser parser)
{
public static int sent = 0;
public List<AnsiEscapeSequenceRequest> Requsts = new ();
private ConcurrentDictionary<string, DateTime> _lastSend = new ();
private TimeSpan _throttle = TimeSpan.FromMilliseconds (100);
/// <summary>
/// Sends the <paramref name="request"/> immediately or queues it if there is already
/// an outstanding request for the given <see cref="AnsiEscapeSequenceRequest.Terminator"/>.
/// </summary>
/// <param name="request"></param>
/// <returns><see langword="true"/> if request was sent immediately. <see langword="false"/> if it was queued.</returns>
public bool SendOrSchedule (AnsiEscapeSequenceRequest request )
{
if (CanSend(request))
{
Send (request);
return true;
}
else
{
Requsts.Add (request);
return false;
}
}
/// <summary>
/// Identifies and runs any <see cref="Requsts"/> that can be sent based on the
/// current outstanding requests of the parser.
/// </summary>
/// <returns><see langword="true"/> if a request was found and run. <see langword="false"/>
/// if no outstanding requests or all have existing outstanding requests underway in parser.</returns>
public bool RunSchedule ()
{
var opportunity = Requsts.FirstOrDefault (CanSend);
if (opportunity != null)
{
Requsts.Remove (opportunity);
Send (opportunity);
return true;
}
return false;
}
private void Send (AnsiEscapeSequenceRequest r)
{
Interlocked.Increment(ref sent);
_lastSend.AddOrUpdate (r.Terminator,(s)=>DateTime.Now,(s,v)=>DateTime.Now);
parser.ExpectResponse (r.Terminator,r.ResponseReceived);
r.Send ();
}
public bool CanSend (AnsiEscapeSequenceRequest r)
{
if (ShouldThrottle (r))
{
return false;
}
return !parser.IsExpecting (r.Terminator);
}
private bool ShouldThrottle (AnsiEscapeSequenceRequest r)
{
if (_lastSend.TryGetValue (r.Terminator, out DateTime value))
{
return DateTime.Now - value < _throttle;
}
return false;
}
}
internal abstract class AnsiResponseParserBase : IAnsiResponseParser
{
protected readonly List<(string terminator, Action<string> response)> expectedResponses = new ();
@@ -227,6 +311,13 @@ internal abstract class AnsiResponseParserBase : IAnsiResponseParser
/// completed.
/// </summary>
public void ExpectResponse (string terminator, Action<string> response) { expectedResponses.Add ((terminator, response)); }
/// <inheritdoc />
public bool IsExpecting (string requestTerminator)
{
// If any of the new terminator matches any existing terminators characters it's a collision so true.
return expectedResponses.Any (r => r.terminator.Intersect (requestTerminator).Any());
}
}
internal class AnsiResponseParser<T> : AnsiResponseParserBase
@@ -298,3 +389,56 @@ internal class AnsiResponseParser : AnsiResponseParserBase
protected override void AddToHeld (object o) { held.Append ((char)o); }
}
/// <summary>
/// Describes an ongoing ANSI request sent to the console.
/// Use <see cref="ResponseReceived"/> to handle the response
/// when console answers the request.
/// </summary>
public class AnsiEscapeSequenceRequest
{
/// <summary>
/// Request to send e.g. see
/// <see>
/// <cref>EscSeqUtils.CSI_SendDeviceAttributes.Request</cref>
/// </see>
/// </summary>
public required string Request { get; init; }
/// <summary>
/// Invoked when the console responds with an ANSI response code that matches the
/// <see cref="Terminator"/>
/// </summary>
public Action<string> ResponseReceived;
/// <summary>
/// <para>
/// The terminator that uniquely identifies the type of response as responded
/// by the console. e.g. for
/// <see>
/// <cref>EscSeqUtils.CSI_SendDeviceAttributes.Request</cref>
/// </see>
/// the terminator is
/// <see>
/// <cref>EscSeqUtils.CSI_SendDeviceAttributes.Terminator</cref>
/// </see>
/// .
/// </para>
/// <para>
/// After sending a request, the first response with matching terminator will be matched
/// to the oldest outstanding request.
/// </para>
/// </summary>
public required string Terminator { get; init; }
/// <summary>
/// Sends the <see cref="Request"/> to the raw output stream of the current <see cref="ConsoleDriver"/>.
/// Only call this method from the main UI thread. You should use <see cref="AnsiRequestScheduler"/> if
/// sending many requests.
/// </summary>
public void Send ()
{
Application.Driver?.RawWrite (Request);
}
}

View File

@@ -5,4 +5,12 @@ public interface IAnsiResponseParser
{
AnsiResponseParserState State { get; }
void ExpectResponse (string terminator, Action<string> response);
/// <summary>
/// Returns true if there is an existing expectation (i.e. we are waiting a response
/// from console) for the given <paramref name="requestTerminator"/>.
/// </summary>
/// <param name="requestTerminator"></param>
/// <returns></returns>
bool IsExpecting (string requestTerminator);
}

View File

@@ -24,10 +24,14 @@ public class AnsiRequestsScenario : Scenario
private List<DateTime> sends = new ();
private Dictionary<DateTime,string> answers = new ();
private Label _lblSummary;
private AnsiRequestScheduler _scheduler;
public override void Main ()
{
Application.Init ();
_scheduler = new AnsiRequestScheduler (Application.Driver.GetParser ());
_win = new Window { Title = $"{Application.QuitKey} to Quit - Scenario: {GetName ()}" };
var lbl = new Label ()
@@ -45,7 +49,7 @@ public class AnsiRequestsScenario : Scenario
UpdateResponses ();
return _win.DisposedCount == 0;
return true;
});
var tv = new TextView ()
@@ -79,24 +83,32 @@ public class AnsiRequestsScenario : Scenario
_win.Add (cbDar);
int lastSendTime = Environment.TickCount;
object lockObj = new object ();
Application.AddTimeout (
TimeSpan.FromMilliseconds (50),
TimeSpan.FromMilliseconds (100),
() =>
{
if (cbDar.Value > 0)
lock (lockObj)
{
int interval = 1000 / cbDar.Value; // Calculate the desired interval in milliseconds
int currentTime = Environment.TickCount; // Current system time in milliseconds
// Check if the time elapsed since the last send is greater than the interval
if (currentTime - lastSendTime >= interval)
if (cbDar.Value > 0)
{
SendDar (); // Send the request
lastSendTime = currentTime; // Update the last send time
int interval = 1000 / cbDar.Value; // Calculate the desired interval in milliseconds
int currentTime = Environment.TickCount; // Current system time in milliseconds
// Check if the time elapsed since the last send is greater than the interval
if (currentTime - lastSendTime >= interval)
{
SendDar (); // Send the request
lastSendTime = currentTime; // Update the last send time
}
}
// TODO: Scheduler probably should be part of core driver
// Also any that we didn't get a chance to send
_scheduler.RunSchedule();
}
return _win.DisposedCount == 0;
return true;
});
@@ -157,7 +169,7 @@ public class AnsiRequestsScenario : Scenario
_graphView.Series.Add (_answeredSeries = new ScatterSeries ());
_sentSeries.Fill = new GraphCellToRender (new Rune ('.'), new Attribute (ColorName16.BrightGreen, ColorName16.Black));
_answeredSeries.Fill = new GraphCellToRender (new Rune ('.'), new Attribute (ColorName16.BrightCyan, ColorName16.Black));
_answeredSeries.Fill = new GraphCellToRender (new Rune ('.'), new Attribute (ColorName16.BrightRed, ColorName16.Black));
// Todo:
// _graphView.Annotations.Add (_sentSeries new PathAnnotation {});
@@ -192,10 +204,13 @@ public class AnsiRequestsScenario : Scenario
private void SendDar ()
{
// Ask for device attributes (DAR)
var p = Application.Driver.GetParser ();
p.ExpectResponse ("c", HandleResponse);
Application.Driver.RawWrite (EscSeqUtils.CSI_SendDeviceAttributes);
_scheduler.SendOrSchedule (
new ()
{
Request = EscSeqUtils.CSI_SendDeviceAttributes,
Terminator = EscSeqUtils.CSI_ReportDeviceAttributes_Terminator,
ResponseReceived = HandleResponse
});
sends.Add (DateTime.Now);
}