OPC Reactive Extensions (Rx/OPC)
The reactive programming model merges the world of Microsoft Reactive Extensions (Rx) for .NET with OPC. The Reactive Extensions (http://msdn.microsoft.com/en-us/data/gg577609.aspx) is a library to compose asynchronous and event-based programs using observable collections (data streams) and LINQ-style query operators.
OPC Reactive Extensions (Rx/OPC) are part of our QuickOPC product, a toolkit for OPC client development.
The reactive model for OPC client development, along with the original and traditional imperative programming model, can be freely mixed in the same project. For even higher level and demands, the reactive programming model in QuickOPC can be used in complex event processing (CEP) applications, with the use of the StreamInsight Extensions.
Introduction to Reactive Programming for OPC
OPC Reactive Extensions (Rx/OPC) are OPC-specific classes that implement the Rx interfaces. With Rx/OPC, you can e.g.:
- Create observable collections (data streams) that provide OPC data.
- Consume (observe) data streams, and process them using OPC.
- Combine data flows to create more complex operations.
For OPC Data Access, Rx/OPC gives you the ability to work with flows representing OPC item changes, or flows that represents values to be written into an OPC item. Rx/OPC also works with OPC Alarms&Events, and OPC Unified Architecture data sources.
For example, your application may be required to continuously monitor the value of an OPC item, and when it is available (there is no failure obtaining the value), make some computations with it (we will multiply the value by 1000), and write the results into a different OPC item. This logic can be expressed by following code:
DAItemChangedObservable.Create<int>( "", "OPCLabs.KitServer.2", "Simulation.Incrementing (1 s)", 100) .Where(e => e.Exception == null) .Select(e => e.Vtq.Value * 1000) .Subscribe(DAWriteItemValueObserver.Create<int>( "", "OPCLabs.KitServer.2", "Simulation.Register_I4"));
Let's dissect what this example does:
- It creates an observable sequence for significant changes in OPC-DA item "Simulation.Incrementing (1 s)".
- The "Where" clause filters (from the observable sequence) only the changes that have a null Exception, i.e. those that carry a valid Vtq object (value/timestamp/quality).
- The "Select" clause takes the actual value from the Vtq property (it is of type DAVtq<int>), and returns the value multiplied by 1000.
- An observer that writes incoming vales into the "Simulation.Register_I4" OPC-DA item is created.
- The observer is subscribed to the transformed (processed) observable sequence.
As you can see, the code that does this kind of OPC data processing is very concise - all the extra "plumbing" needed in imperative programming model is gone, and only the meaningful pieces remain. Programs written in this way clearly express their intent, and the logic that handles certain functionality is concentrated in one place and not spread around various classes and methods.
Useful links: Documentation / Knowledge Base
Footnote & required disclosure: QuickOPC (including its Options) is a software development kit (SDK) for development of OPC clients and subscribers. Installing QuickOPC or its Options does not change system settings.