我不知道为什么我的StreamInsight引擎在每秒发送超过8000个事件的情况下,不能处理超过140个事件/秒。我在性能监视器中看到查询的事件数。而在应用程序的控制台中,就像引擎避免了很多事件一样。我有一个id为200的事件,然后是下一个id为3330的事件。有人知道问题出在哪里吗?
对于我的测试,我有一系列的查询。第一个查询的输出是第二个查询的输入,依此类推,对于这个场景,结果不超过140个事件/秒。
现在,我使用一个简单的查询来测试我的应用程序,该查询输出从输入流接收的所有事件,在这种情况下,服务器每秒处理大约2000个事件。我有一些包含结果的图像,但不幸的是,我还不能放入它们。让我困扰的是为什么事件/秒的数量突然减少到0,为什么引擎仍然没有考虑到所有的输入事件。
下面是我的服务器配置和我使用的查询。
using (var server = Server.Create("SIInstance23"))
{
log.Info("StreamInsight Server started");
Application application = server.CreateApplication("StreamInsight Application Test");
ServiceHost host = new ServiceHost(server.CreateManagementService());
WSHttpBinding binding = new WSHttpBinding(SecurityMode.Message);
binding.HostNameComparisonMode = HostNameComparisonMode.Exact;
host.AddServiceEndpoint(typeof(IManagementService),
binding,
"http://localhost:8081/StreamInsight/SIInstance23");
ScenarioWorkflow.NormalScenarioWorkflow(application, server, host);
}
public static void NormalScenarioWorkflow(this Application application, Server server, ServiceHost host)
{
host.Open();
IQStreamable<SensorDataEvent> inputStream = application.DefineSensorDataEventStream();
var simpleQuery = from e in inputStream
select e;
var simpleQueryConsumer = application.DefineConsumer(simpleQuery);
var simpleQueryBinding = simpleQuery.Bind(simpleQueryConsumer);
using (simpleQueryBinding.Run("process"))
{
Thread.Sleep(1000);
Console.WriteLine(string.Empty);
DiagnosticSettings settings = server.GetDiagnosticSettings(new Uri("cep:/Server/Application/StreamInsight Application Test/Entity/process/Query/StreamableBinding_1"));
settings.Aspects |= DiagnosticAspect.PerformanceCounters;
server.SetDiagnosticSettings(new Uri("cep:/Server/Application/StreamInsight Application Test/Entity/process/Query/StreamableBinding_1"), settings);
Console.WriteLine("***Hit Return to exit after viewing query output***");
Console.WriteLine();
Console.ReadLine();
}
host.Close();
}接下来,当我尝试运行我的最后一个查询(来自链)时,我得到了大约1500个事件/秒。仍然存在事件/秒突然减少的问题。我认为对事件进行的每个转换都会产生一个新的转换,需要由引擎处理。因此,来自一系列查询的事件数应该超过1500。如果我错了,请纠正我。我是这个领域的新手,欢迎任何建议。
我认为问题出在下面的班级。我也尝试使用PointEvent<SendorDataEvent>而不是SensorDataEvent,也尝试插入CTI,但没有结果。
public class SocketEventInputAdapter : IObservable<SensorDataEvent>, IDisposable
{
public List<IObserver<SensorDataEvent>> observers { get; set; }
public object sync { get; set; }
public bool done { get; set; }
public SocketEventInputAdapter()
{
this.done = false;
this.observers = new List<IObserver<SensorDataEvent>>();
this.sync = new object();
SocketServer serverSocket = new SocketServer(4444, this);
}
internal void NotifyObservers(SensorDataEvent value)
{
lock (sync)
{
if (!done)
{
foreach (var observer in observers)
{
observer.OnNext(value);
}
}
}
}
public IDisposable Subscribe(IObserver<SensorDataEvent> observer)
{
lock (sync)
{
observers.Add(observer);
}
return new Subscription(this, observer);
}
void IDisposable.Dispose()
{
}
private sealed class Subscription : IDisposable
{
private readonly SocketEventInputAdapter _subject;
private IObserver<SensorDataEvent> _observer;
public Subscription(SocketEventInputAdapter subject, IObserver<SensorDataEvent> observer)
{
_subject = subject;
_observer = observer;
}
public void Dispose()
{
IObserver<SensorDataEvent> observer = _observer;
if (null != observer)
{
lock (_subject.sync)
{
_subject.observers.Remove(observer);
}
_observer = null;
}
}
}
}谢谢。
发布于 2015-01-27 14:11:58
正如Paul所说,您需要提供更多的信息。可能会发生很多事情...CTI未对齐/错误配置,源代码中不必要的锁定,接收器中的锁定或长时间运行的任务...或者任何数量的其他东西。在i7笔记本电脑上,我的StreamInsight进程平均每秒超过120K个事件……超过一周的压力测试。引擎会处理好的。
https://stackoverflow.com/questions/28150554
复制相似问题