首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >opcua会话被客户关闭。

opcua会话被客户关闭。
EN

Stack Overflow用户
提问于 2017-11-24 06:23:56
回答 2查看 1.9K关注 0票数 0

我已经为opc连接相关活动编写了附加的OpcUaConnector类.

但它不是在处理会话。例如:

  1. 在opc ua配置中,禁用端点。
  2. 在kepserver配置中运行时>重新初始化

windows服务正在抛出:

来源: system.Reactive.Core InnerException :会话被客户端关闭

并停止windows服务,因为此错误无法处理。

有人能建议如何在opc中处理会话吗?

代码语言:javascript
复制
public class OpcUaConnector
{
    private static SimplerAES simplerAES = new SimplerAES();

    private DataContainer dataCointainer = null;

    private UaTcpSessionChannel channel;

    private string opcServerName = string.Empty;
    private string opcUserId = string.Empty;
    private string opcPassword = string.Empty;

    private static ILog LogOpcStore;

    private static System.IDisposable token;

    private static uint id;
    public OpcConnector(ILog Log)
    {
        IntializeLogOpcStore(Log);
    }

    private static void IntializeLogOpcStore(ILog Log)
    {
        LogOpcStore = Log;
    }

    public async Task OpenOpcConnection()
    {
        try
        {
            if ((!string.IsNullOrEmpty(this.opcServerName) & (this.opcServerName != AppMain.MyAppSettings.OpcServer)) ||
            (!string.IsNullOrEmpty(this.opcUserId) & (this.opcUserId != AppMain.MyAppSettings.OpcUserId)) ||
            (!string.IsNullOrEmpty(this.opcPassword) & (this.opcPassword != AppMain.MyAppSettings.OpcPassword)))
            {
                await channel.CloseAsync();
                this.opcServerName = AppMain.MyAppSettings.OpcServer;
                this.opcUserId = AppMain.MyAppSettings.OpcUserId;
                this.opcPassword = AppMain.MyAppSettings.OpcPassword;
            }

            if (channel==null || (channel != null && (channel.State == CommunicationState.Closed || channel.State == CommunicationState.Faulted)))
            {
                var appDescription = new ApplicationDescription()
                {
                    ApplicationName = "MyAppName",
                    ApplicationUri = $"urn:{System.Net.Dns.GetHostName()}:MyAppName",
                    ApplicationType = ApplicationType.Client,
                };

                //application data won't be deleted when uninstall
                var certificateStore = new DirectoryStore(
                    Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.CommonApplicationData), MyAppName", "pki"),
                    true, true
                    );

                //if the Ethernet cable unplugs or the Wifi drops out,
                //you have some timeouts that can keep the session open for a while. 
                //There is a SessionTimeout (default of 2 min).

                this.channel = new UaTcpSessionChannel(
                                appDescription,
                                certificateStore,
                                SignInOpc,
                                AppMain.MyAppSettings.OpcServer,
                                null,
                                options: new UaTcpSessionChannelOptions { SessionTimeout = 120000 });

                await channel.OpenAsync();

                //LogOpcStore.Info(String.Format("Opc connection sucessful"));
            }

            this.opcServerName = AppMain.MyAppSettings.OpcServer;
            this.opcUserId = AppMain.MyAppSettings.OpcUserId;
            this.opcPassword = AppMain.MyAppSettings.OpcPassword;
        }
        catch (Exception ex)
        {
            ServiceException serviceException = new ServiceException(ex.HResult + " " + ex.Message, "C052");

            throw serviceException;
        }
    }

    private static async Task RecursivelyFindNode(UaTcpSessionChannel channel, NodeId nodeid)
    {
        BrowseRequest browseRequest = new BrowseRequest
        {
            NodesToBrowse = new BrowseDescription[] { new BrowseDescription { NodeId = nodeid, BrowseDirection = BrowseDirection.Forward, ReferenceTypeId = NodeId.Parse(ReferenceTypeIds.HierarchicalReferences), NodeClassMask = (uint)NodeClass.Variable | (uint)NodeClass.Object, IncludeSubtypes = true, ResultMask = (uint)BrowseResultMask.All } },
        };

        BrowseResponse browseResponse = await channel.BrowseAsync(browseRequest);

        foreach (var rd1 in browseResponse.Results[0].References ?? new ReferenceDescription[0])
        {
            uint chid = AppMain.MyTagDatabase.GetClientHandleByTag(rd1.DisplayName.ToString());

            if (chid > 0)
            {
                AppMain.MyTagDatabase.UpdateNodeByClientHandle(chid, rd1.NodeId.ToString());
            }

            await RecursivelyFindNode(channel, ExpandedNodeId.ToNodeId(rd1.NodeId, channel.NamespaceUris));
        }
    }

    public async Task CreateSubscription(DataContainer dc)
    {
        double curReadingValue;

        try
        {
            dataCointainer = dc;

            await RecursivelyFindNode(channel, NodeId.Parse(ObjectIds.RootFolder));

            if (AppMain.MyTagDatabase.GetCntTagsNotInOpcServer() == AppMain.MyTagDatabase.GetTagCount())
            {
                //no need to create subscription
                return;
            }


            //subscription timeout that is the product of PublishingInterval * LifetimeCount:

            var subscriptionRequest = new CreateSubscriptionRequest
            {
                RequestedPublishingInterval = 1000f,
                RequestedMaxKeepAliveCount = 30,
                RequestedLifetimeCount = 30 * 3,
                PublishingEnabled = true,
            };

            var subscriptionResponse = await channel.CreateSubscriptionAsync(subscriptionRequest);

            id = subscriptionResponse.SubscriptionId;

            var itemsToCreate = new MonitoredItemCreateRequest[AppMain.MyTagDatabase.GetTagHavingNodeCount()];

            int i = 0;

            foreach (var item in AppMain.MyTagDatabase.GetMyTagDatabase())
            {
                var itemKey = item.Key;
                var itemValue = item.Value;

                itemsToCreate[i] = new MonitoredItemCreateRequest { ItemToMonitor = new ReadValueId { NodeId = NodeId.Parse(itemValue.NodeId), AttributeId = AttributeIds.Value }, MonitoringMode = MonitoringMode.Reporting, RequestedParameters = new MonitoringParameters { ClientHandle = itemKey, SamplingInterval = -1, QueueSize = 0, DiscardOldest = true } };
                i++;
            }

            var itemsRequest = new CreateMonitoredItemsRequest
            {
                SubscriptionId = id,
                ItemsToCreate = itemsToCreate,
            };

            var itemsResponse = await channel.CreateMonitoredItemsAsync(itemsRequest);

            token = channel.Where(pr => pr.SubscriptionId == id).Subscribe(pr =>
            {
            // loop thru all the data change notifications
            // receiving data change notifications here
            var dcns = pr.NotificationMessage.NotificationData.OfType<DataChangeNotification>();

                foreach (var dcn in dcns)
                {
                    foreach (var min in dcn.MonitoredItems)
                    {
                        MyTag MyTag = new MyTag();

                        bool hasValue = AppMain.MyTagDatabase.GetMyTag(min.ClientHandle, out MyTag);

                        if (hasValue)
                        {
                            if (double.TryParse(min.Value.Value.ToString(), out curReadingValue))
                            {
                                //LogOpcStore.Info(String.Format("ClientHandle : {0}  TagName : {1} SourceTimestamp : {2}  ServerTimeStamp : {3}  curReadingValue : {4}", min.ClientHandle, MyTag.TagName, min.Value.SourceTimestamp, min.Value.ServerTimestamp, curReadingValue));
                                AddDataPointToContainer(1, MyTag.TagName, min.Value.SourceTimestamp, curReadingValue);
                            }
                        }
                    }
                }
            });
        }
        catch (Exception ex)
        {
            //If the interruption lasts longer than these timeouts then the SessionChannel and Subscriptions will need to be recreated.

            channel = null;

            FatalServiceException fatalserviceException = new FatalServiceException(ex.Message, "C052");
            throw fatalserviceException;
        }
    }

    public async Task DeleteSubscription()
    {
        try
        {
            var request = new DeleteSubscriptionsRequest
            {
                SubscriptionIds = new uint[] { id }
            };

            await channel.DeleteSubscriptionsAsync(request);

            token.Dispose();
        }
        catch (Exception ex)
        {
            ServiceException serviceException = new ServiceException(ex.Message, "C052");

            throw serviceException;
        }
    }

    private static async Task<IUserIdentity> SignInOpc(EndpointDescription endpoint)
    {
        IUserIdentity userIdentity = null;

        if (endpoint.UserIdentityTokens.Any(p => p.TokenType == UserTokenType.Anonymous))
        {
            userIdentity = new AnonymousIdentity();
        }
        else if (endpoint.UserIdentityTokens.Any(p => p.TokenType == UserTokenType.UserName))
        {
            var userName = AppMain.MyAppSettings.OpcUserId;
            var password = simplerAES.Decrypt(AppMain.MyAppSettings.OpcPassword);

            userIdentity = new UserNameIdentity(userName, password);
        }

        return userIdentity;
    }

    private void AddDataPointToContainer(int dataType, string source, DateTime SourceTimestampUTC, double value)
    {
        ConditionValue conditionValue = new ConditionValue();

        long timestamp = AppMain.ServerSyncTimeStore.ConvertDateTimeToTimeStampUTC(SourceTimestampUTC);

        conditionValue.dataType = dataType;
        conditionValue.source = source;
        conditionValue.timestamp = timestamp;
        conditionValue.SourceTimestampUTC = SourceTimestampUTC;
        conditionValue.LocalTime = SourceTimestampUTC.ToLocalTime();
        conditionValue.value = value;

        //LogOpcStore.Info(String.Format("TagName : {0} SourceTimestampUTC : {1} timestamp : {2} LocalTime : {3} curReadingValue : {4}", source, SourceTimestampUTC, timestamp, SourceTimestampUTC.ToLocalTime(),  value));

        dataCointainer.AddDataPoint(conditionValue);
    }
}
EN

回答 2

Stack Overflow用户

发布于 2017-11-27 00:05:08

我看到您正在使用项目https://github.com/convertersystems/opc-ua-client

当服务器关闭会话和套接字(如重新初始化Kepware时发生的情况),客户端会立即收到导致客户端通道出错的通知。错误的通道不能重新打开,它应该中止,并且应该创建一个新的通道。

我做了这个独立的测试,以表明您可能必须捕获一个异常并重新创建通道和订阅。此测试的目的是订阅CurrentTime节点并收集60个数据池。测试应该持续一分钟。如果您在测试过程中重新插入Kepware服务器,代码将捕获异常并重新创建通道和订阅。

代码语言:javascript
复制
    [TestMethod]
    public async Task OpcConnectorTest()
    {
        var count = 0;
        UaTcpSessionChannel channel = null;

        while (count < 60)
        {
            try
            {
                channel = new UaTcpSessionChannel(
                        this.localDescription,
                        this.certificateStore,
                        new AnonymousIdentity(),
                        EndpointUrl,
                        SecurityPolicyUris.None,
                        loggerFactory: this.loggerFactory);

                await channel.OpenAsync();

                // create the keep alive subscription.
                var subscriptionRequest = new CreateSubscriptionRequest
                {
                    RequestedPublishingInterval = 1000f,
                    RequestedMaxKeepAliveCount = 30,
                    RequestedLifetimeCount = 30 * 3,
                    PublishingEnabled = true,
                };
                var subscriptionResponse = await channel.CreateSubscriptionAsync(subscriptionRequest).ConfigureAwait(false);
                var id = subscriptionResponse.SubscriptionId;

                var token = channel.Where(pr => pr.SubscriptionId == id).Subscribe(pr =>
                {
                    // loop thru all the data change notifications
                    var dcns = pr.NotificationMessage.NotificationData.OfType<DataChangeNotification>();
                    foreach (var dcn in dcns)
                    {
                        foreach (var min in dcn.MonitoredItems)
                        {
                            Console.WriteLine($"sub: {pr.SubscriptionId}; handle: {min.ClientHandle}; value: {min.Value}");
                            count++;
                        }
                    }

                });

                var itemsRequest = new CreateMonitoredItemsRequest
                {
                    SubscriptionId = id,
                    ItemsToCreate = new MonitoredItemCreateRequest[]
                    {
                        new MonitoredItemCreateRequest { ItemToMonitor = new ReadValueId { NodeId = NodeId.Parse("i=2258"), AttributeId = AttributeIds.Value }, MonitoringMode = MonitoringMode.Reporting, RequestedParameters = new MonitoringParameters { ClientHandle = 12345, SamplingInterval = -1, QueueSize = 0, DiscardOldest = true } }
                    },
                };
                var itemsResponse = await channel.CreateMonitoredItemsAsync(itemsRequest);

                while (channel.State == CommunicationState.Opened && count < 60)
                {
                    await Task.Delay(1000);
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Exception: {ex.GetType()}. {ex.Message}");
            }
        }

        if (channel != null)
        {
            Console.WriteLine($"Closing session '{channel.SessionId}'.");
            await channel.CloseAsync();
        }
    }
票数 0
EN

Stack Overflow用户

发布于 2018-01-10 17:23:00

我知道这是个老职位,但我也偶然发现了这个问题。对感兴趣的人:

这个问题与订阅有关。

运行以下代码时:

代码语言:javascript
复制
token = channel.Where(pr => pr.SubscriptionId == id).Subscribe(pr =>
{
    // loop thru all the data change notifications
    // receiving data change notifications here
    var dcns = pr.NotificationMessage.NotificationData.OfType<DataChangeNotification>();

    foreach (var dcn in dcns)
    {
        foreach (var min in dcn.MonitoredItems)
        {
            MyTag MyTag = new MyTag();

            bool hasValue = AppMain.MyTagDatabase.GetMyTag(min.ClientHandle, out MyTag);

            if (hasValue)
            {
                if (double.TryParse(min.Value.Value.ToString(), out curReadingValue))
                {
                    //LogOpcStore.Info(String.Format("ClientHandle : {0}  TagName : {1} SourceTimestamp : {2}  ServerTimeStamp : {3}  curReadingValue : {4}", min.ClientHandle, MyTag.TagName, min.Value.SourceTimestamp, min.Value.ServerTimestamp, curReadingValue));
                            AddDataPointToContainer(1, MyTag.TagName, min.Value.SourceTimestamp, curReadingValue);
                }
            }
        }
    }
});

Observable.subscribe()接受多个参数。您应该包括在发生错误时应该做些什么。例如:

代码语言:javascript
复制
token = channel.Where(pr => pr.SubscriptionId == id).Subscribe(
    pr => { code to run normally... }, 
    ex => { Log.Info(ex.Message); }, 
    () => { }
);

有关详细信息,请参阅http://reactivex.io/documentation/operators/subscribe.html

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/47467454

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档