Configure Streaming
Server-side configuration
Typically, in addition to publishing its own structure (Signals, Channels, Function Blocks, etc.), an openDAQ™ Device also publishes information about the Streaming protocols it supports. This information includes the Streaming protocol ID (e.g. "OpenDAQLTStreaming" or "OpenDAQNativeStreaming") and a range of optional parameters (e.g. port number). When a Streaming server is started on the Device, the server automatically generates this piece of information, known as a Server capability, and prepares it for publication alongside the Device’s structural details. To enable the publication of this information, the server responsible for transferring structural information should be added to the openDAQ™ Instance last, after all Streaming servers have been added.
The example below demonstrates the correct sequence for adding various servers to the openDAQ™ Instance.
Cpp
#include <chrono>
#include <thread>
#include <opendaq/opendaq.h>

using namespace daq;

int main()
{
    using namespace std::chrono_literals;

    const InstancePtr instance = Instance();
    instance.setRootDevice("daqref://device1");

    // Creates and registers a Server capability with the ID `OpenDAQLTStreaming` and the default port number 7414
    instance.addServer("OpenDAQLTStreaming", nullptr);

    // Creates and registers a Server capability with the ID `OpenDAQNativeStreaming` and the default port number 7420
    instance.addServer("OpenDAQNativeStreaming", nullptr);

    // As the Streaming servers were added first, the registered Server capabilities are published over OPC UA
    instance.addServer("OpenDAQOPCUA", nullptr);

    while (true)
        std::this_thread::sleep_for(100ms);

    return 0;
}
Configure Streaming for structure-enabled Device automatically
Most openDAQ™ Devices support transferring structural information via a configuration protocol. These are openDAQ™ OPC UA-compatible Devices running an OPC UA server, as well as Devices that are compatible with the openDAQ™ Native configuration protocol. Regardless of the protocol type, the configuration protocol transfers not only the Device’s structural details but also the set of Streaming Server capabilities available for the Device.
Each Streaming Server capability is identified by its Streaming protocol ID, which is transformed into the prefix of a connection string. The rest of the string is formed from the parameters in the Streaming Server capability and the Device’s known IP address. This connection string allows Streaming instantiation to be delegated to the appropriate data Streaming Module. As a result, when connecting to an openDAQ™ Device, a Streaming connection can be established automatically using the published connection details. When connecting to a gateway Device that contains nested Devices, the default behavior is to ignore direct Streaming connections to the nested Devices and to establish the minimum number of Streaming connections possible.
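As an illustration of how a connection string could be assembled from a Server capability, here is a minimal standalone sketch. It does not use the openDAQ™ API: `StreamingCapability`, `prefixForProtocol`, and `buildConnectionString` are hypothetical names, and appending the port as `:<port>` is an assumption. The protocol IDs, prefixes, and default ports are the ones mentioned in this guide.

```cpp
#include <map>
#include <optional>
#include <string>

// Hypothetical, simplified view of a published Streaming Server capability.
struct StreamingCapability
{
    std::string protocolId;  // e.g. "OpenDAQNativeStreaming"
    int port;                // e.g. 7420
};

// Maps a Streaming protocol ID to its connection string prefix.
// Only the two protocols named in this guide are covered.
std::optional<std::string> prefixForProtocol(const std::string& protocolId)
{
    static const std::map<std::string, std::string> prefixes = {
        {"OpenDAQNativeStreaming", "daq.ns"},
        {"OpenDAQLTStreaming", "daq.lt"},
    };

    const auto it = prefixes.find(protocolId);
    if (it == prefixes.end())
        return std::nullopt;
    return it->second;
}

// Combines the prefix, the known Device address, and the capability parameters.
// Assumption: the port is appended to the address as ":<port>".
std::optional<std::string> buildConnectionString(const StreamingCapability& capability,
                                                 const std::string& deviceAddress)
{
    const auto prefix = prefixForProtocol(capability.protocolId);
    if (!prefix)
        return std::nullopt;
    return *prefix + "://" + deviceAddress + ":" + std::to_string(capability.port);
}
```

Under these assumptions, a capability with the ID `OpenDAQNativeStreaming` and port 7420 on a Device at `127.0.0.1` gives `daq.ns://127.0.0.1:7420`, and an unknown protocol ID gives no connection string at all.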
However, users can customize the rules for automatically establishing Streaming connections. This is done by passing the Device Configuration Property object as the second parameter to the addDevice call.
The configuration mechanism allows filtering the available Streaming protocols by enabling or disabling specific ones. The protocol listed first among the enabled ones is given the highest priority and is selected as the active Streaming source for all of the Device’s Signals.
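The priority rule can be pictured as a first-match search over the prioritized list. The sketch below is a standalone illustration, not openDAQ™ code: `selectActiveProtocol` is a hypothetical helper, and treating protocols that are absent from the prioritized list as disabled is an assumption.

```cpp
#include <algorithm>
#include <optional>
#include <string>
#include <vector>

// Returns the first protocol in `prioritized` that the Device also publishes.
// Assumption: protocols that do not appear in `prioritized` count as disabled.
std::optional<std::string> selectActiveProtocol(const std::vector<std::string>& prioritized,
                                                const std::vector<std::string>& published)
{
    for (const auto& protocol : prioritized)
    {
        if (std::find(published.begin(), published.end(), protocol) != published.end())
            return protocol;
    }

    // None of the enabled protocols is offered by the Device
    return std::nullopt;
}
```

With the prioritized list `{"OpenDAQLTStreaming", "OpenDAQNativeStreaming"}`, a Device that publishes both protocols would stream over `OpenDAQLTStreaming`, while a Device that publishes only `OpenDAQNativeStreaming` would fall back to that one.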
Furthermore, there’s an option to specify a Streaming path heuristic, which is particularly useful for multiple nested Devices connected in a tree structure (as illustrated in the diagram). The allowed heuristics include:
- "Minimize-connections" mode (ID 0): establishes the fewest Streaming connections possible, at the cost of routing Signals' data through gateway Devices and thus increasing the hop count (default mode).
- "Minimize-hops" mode (ID 1): attempts to stream data directly from nested Devices to minimize the number of hops the data must make between Devices.
- "Not connected" mode (ID 2): the information about supported Streaming protocols published by the Device is not used to automatically establish Streaming connections.
Cpp
#include <opendaq/opendaq.h>
#include <iostream>

using namespace daq;

int main()
{
    // Create a new Instance that we will use for all the interactions with the SDK
    InstancePtr instance = Instance();

    // Create an empty Property object
    PropertyObjectPtr deviceConfig = PropertyObject();

    // Add property to allow multiple Streaming protocols, with the LT Streaming protocol
    // (listed first) having the highest priority
    auto prioritizedStreamingProtocols = List<IString>("OpenDAQLTStreaming", "OpenDAQNativeStreaming");
    deviceConfig.addProperty(ListProperty("PrioritizedStreamingProtocols", prioritizedStreamingProtocols));

    // Set property to disregard direct Streaming connections for nested Devices,
    // and establish the minimum number of streaming connections possible.
    const auto streamingConnectionHeuristicProp = SelectionProperty("StreamingConnectionHeuristic",
                                                                    List<IString>("MinConnections",
                                                                                  "MinHops",
                                                                                  "NotConnected"),
                                                                    0);
    deviceConfig.addProperty(streamingConnectionHeuristicProp);

    // Find and connect to a Device hosting an OPC UA TMS server
    const auto availableDevices = instance.getAvailableDevices();
    DevicePtr device;
    for (const auto& deviceInfo : availableDevices)
    {
        for (const auto& capability : deviceInfo.getServerCapabilities())
        {
            if (capability.getProtocolName() == "OpenDAQOPCUA")
            {
                device = instance.addDevice(capability.getConnectionString(), deviceConfig);
                break;
            }
        }

        // Stop searching once a Device has been added
        if (device.assigned())
            break;
    }

    if (!device.assigned())
        std::cerr << "No relevant Device found!" << std::endl;
    else
        // Output the name of the added Device
        std::cout << device.getInfo().getName() << std::endl;

    return 0;
}
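To make the trade-off between the heuristic modes more concrete, the following standalone sketch counts the Streaming connections a client would open in a toy model of a Device tree. It is not openDAQ™ code: `DeviceNode`, `makeGateway`, `countDevices`, and `connectionCount` are hypothetical names, and the model assumes that every Device in the tree offers its own Streaming server. Only the enum values 0, 1 and 2 are taken from the mode IDs listed earlier.

```cpp
#include <cstddef>
#include <vector>

// Mode IDs as listed in this guide
enum class StreamingConnectionHeuristic : int
{
    MinConnections = 0,
    MinHops = 1,
    NotConnected = 2
};

// Hypothetical Device tree node: a gateway Device holds nested Devices
struct DeviceNode
{
    std::vector<DeviceNode> nested;
};

// Builds a gateway Device with `nestedCount` directly nested leaf Devices
DeviceNode makeGateway(std::size_t nestedCount)
{
    DeviceNode gateway;
    gateway.nested.resize(nestedCount);
    return gateway;
}

// Counts the Devices in the tree rooted at `device`, including `device` itself
std::size_t countDevices(const DeviceNode& device)
{
    std::size_t total = 1;
    for (const auto& child : device.nested)
        total += countDevices(child);
    return total;
}

// Number of Streaming connections opened by the client in this toy model
std::size_t connectionCount(const DeviceNode& root, StreamingConnectionHeuristic heuristic)
{
    switch (heuristic)
    {
        case StreamingConnectionHeuristic::MinConnections:
            return 1;  // all data is routed through the root Device
        case StreamingConnectionHeuristic::MinHops:
            return countDevices(root);  // one direct connection per Device
        case StreamingConnectionHeuristic::NotConnected:
            return 0;  // nothing is connected automatically
    }
    return 0;
}
```

In this model, a gateway with two nested Devices needs one connection in "Minimize-connections" mode and three in "Minimize-hops" mode, which is the price paid for the shorter data path.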
Add Streaming for structure-enabled Device manually
Additional Streaming connections for a Device can be instantiated manually at any time after the Device is connected.
Cpp
#include <opendaq/opendaq.h>
#include <iostream>

using namespace daq;

int main()
{
    // Create a new Instance that we will use for all the interactions with the SDK
    InstancePtr instance = Instance();

    // Create an empty Property object
    PropertyObjectPtr deviceConfig = PropertyObject();

    // Set property to disable automatic Streaming connection
    const auto streamingConnectionHeuristicProp = SelectionProperty("StreamingConnectionHeuristic",
                                                                    List<IString>("MinConnections",
                                                                                  "MinHops",
                                                                                  "NotConnected"),
                                                                    2);
    deviceConfig.addProperty(streamingConnectionHeuristicProp);

    // Connect to a Device hosting an OPC UA TMS server using connection string
    DevicePtr device = instance.addDevice("daq.opcua://127.0.0.1", deviceConfig);
    if (!device.assigned())
    {
        std::cerr << "No relevant Device found!" << std::endl;
        return 0;
    }
    else
    {
        // Output the name of the added Device
        std::cout << device.getInfo().getName() << std::endl;
    }

    // Connect to a Native Streaming protocol using connection string
    StreamingPtr streaming = device.addStreaming("daq.ns://127.0.0.1");

    // Get all Device's Signals recursively
    const auto deviceSignals = device.getSignals(search::Recursive(search::Any()));

    // Associate Device's Signals with Streaming
    streaming.addSignals(deviceSignals);

    return 0;
}
Connecting to Streaming protocol based Pseudo-Devices
Pseudo-Devices belong to a category of openDAQ™ Devices whose implementation solely relies on the Streaming protocol. Such Devices offer a flat list of Signals without detailed structural information. These Devices are created using the Module responsible for establishing the corresponding Streaming connection. The Device connection string serves to route and delegate Device object instantiation to the relevant Module. This connection string is identical to the Streaming connection string used for Streaming connection instantiation, with the exception that the prefix indicating the Streaming protocol type might be replaced with the prefix representing the appropriate Device type. Following this prefix, the same set of parameters unique to each Streaming protocol type is appended.
For example, the prefix "daq.ns" in the Device connection string aligns with the Native Streaming protocol, which is identified by the same prefix "daq.ns" in the Streaming connection string. Similarly, the Device connection string prefix "daq.lt" corresponds to the Websocket Streaming protocol, recognized by the Streaming connection string prefix "daq.lt".
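The routing role of the prefix can be sketched with a small standalone parser. This is an illustration only and does not reflect how openDAQ™ Modules inspect connection strings: `connectionPrefix` and `protocolForPrefix` are hypothetical helpers, and only the two prefixes named above are recognized.

```cpp
#include <optional>
#include <string>
#include <string_view>

// Returns the part of a connection string that precedes "://", e.g. "daq.ns".
std::optional<std::string> connectionPrefix(std::string_view connectionString)
{
    const auto separator = connectionString.find("://");
    if (separator == std::string_view::npos || separator == 0)
        return std::nullopt;
    return std::string(connectionString.substr(0, separator));
}

// Names the Streaming protocol behind a pseudo-Device prefix mentioned in this guide.
std::string_view protocolForPrefix(std::string_view prefix)
{
    if (prefix == "daq.ns")
        return "Native Streaming";
    if (prefix == "daq.lt")
        return "Websocket Streaming";
    return "unknown";
}
```

Extracting the prefix and comparing it exactly is stricter than searching for a substring such as `"daq.ns://"` anywhere in the connection string, which is the simpler check used in the listings later in this guide.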
Cpp
#include <opendaq/opendaq.h>
#include <iostream>

using namespace daq;

int main()
{
    // Create a new Instance that we will use for all the interactions with the SDK
    InstancePtr instance = Instance();

    // Find and connect to a Device hosting a Native Streaming server
    const auto availableDevices = instance.getAvailableDevices();
    DevicePtr device;
    for (const auto& deviceInfo : availableDevices)
    {
        for (const auto& capability : deviceInfo.getServerCapabilities())
        {
            if (capability.getProtocolName() == "OpenDAQNativeStreaming")
            {
                device = instance.addDevice(capability.getConnectionString());
                break;
            }
        }

        // Stop searching once a Device has been added
        if (device.assigned())
            break;
    }

    if (!device.assigned())
        std::cerr << "No relevant Device found!" << std::endl;
    else
        // Output the name of the added Device
        std::cout << device.getInfo().getName() << std::endl;

    return 0;
}
Configure Streaming per Signal
Once the Device is connected, the Streaming sources of its Signals can be examined and modified for each Signal individually at any given time.
The Streaming sources are identified by a connection string that includes the protocol prefix, indicating
the protocol type ID, and parameters based on the protocol type (IP address, port number etc.).
To manipulate the Streaming sources of a particular Signal, the MirroredSignalConfig object is used. It provides the ability to:
- retrieve the list of Streaming sources available for the Signal with the getStreamingSources call,
- get the currently active Streaming source with the getActiveStreamingSource call,
- change the active Streaming source of the Signal with the setActiveStreamingSource call,
- enable or disable data streaming for the Signal with the setStreamed call,
- check whether streaming is enabled or disabled for the Signal with the getStreamed call.
Cpp
#include <opendaq/opendaq.h>
#include <iostream>

using namespace daq;

int main()
{
    // ...

    // Get the first Signal of connected Device
    MirroredSignalConfigPtr signal = device.getSignalsRecursive()[0];

    // Find and output the Streaming sources available for Signal
    StringPtr nativeStreamingSource;
    StringPtr websocketStreamingSource;
    std::cout << "Signal supports " << signal.getStreamingSources().getCount() << " streaming sources:" << std::endl;
    for (const auto& source : signal.getStreamingSources())
    {
        std::cout << source << std::endl;
        if (source.toView().find("daq.ns://") != std::string::npos)
            nativeStreamingSource = source;
        if (source.toView().find("daq.lt://") != std::string::npos)
            websocketStreamingSource = source;
    }

    // Output the active Streaming source of Signal
    std::cout << "Active streaming source of signal: " << signal.getActiveStreamingSource() << std::endl;

    // Output the Streaming status for the Signal to verify that streaming is enabled
    std::cout << "Streaming enabled status for signal is: " << (signal.getStreamed() ? "true" : "false") << std::endl;

    // Change the active Streaming source of Signal, provided a native Streaming source was found
    if (nativeStreamingSource.assigned())
        signal.setActiveStreamingSource(nativeStreamingSource);

    std::cout << "Press \"enter\" to exit the application..." << std::endl;
    std::cin.get();
    return 0;
}
Full listing
The following is a fully working example of configuring Streaming and reading Signal data using different Streaming sources.
Cpp
#include <opendaq/opendaq.h>
#include <chrono>
#include <iostream>
#include <thread>

using namespace daq;

void readSamples(const MirroredSignalConfigPtr& signal)
{
    using namespace std::chrono_literals;
    StreamReaderPtr reader = StreamReader<double, uint64_t>(signal);

    // Get the resolution and origin
    DataDescriptorPtr descriptor = signal.getDomainSignal().getDescriptor();
    RatioPtr resolution = descriptor.getTickResolution();
    StringPtr origin = descriptor.getOrigin();
    StringPtr unitSymbol = descriptor.getUnit().getSymbol();

    std::cout << "\nReading signal: " << signal.getName() << "; active Streaming source: " << signal.getActiveStreamingSource()
              << std::endl;
    std::cout << "Origin: " << origin << std::endl;

    // Allocate buffer for reading double samples
    double samples[100];
    uint64_t domainSamples[100];
    for (int i = 0; i < 40; ++i)
    {
        std::this_thread::sleep_for(25ms);

        // Read up to 100 samples every 25 ms, storing the amount read into `count`
        SizeT count = 100;
        reader.readWithDomain(samples, domainSamples, &count);
        if (count > 0)
        {
            Float domainValue = (Int) domainSamples[count - 1] * resolution;
            std::cout << "Value: " << samples[count - 1] << ", Domain: " << domainValue << unitSymbol << std::endl;
        }
    }
}

int main(int /*argc*/, const char* /*argv*/[])
{
    // Create a new Instance that we will use for all the interactions with the SDK
    InstancePtr instance = Instance();

    // Create an empty Property object
    PropertyObjectPtr deviceConfig = PropertyObject();

    // Add property to allow multiple Streaming protocols, with the LT Streaming protocol
    // (listed first) having the highest priority
    auto prioritizedStreamingProtocols = List<IString>("OpenDAQLTStreaming", "OpenDAQNativeStreaming");
    deviceConfig.addProperty(ListProperty("PrioritizedStreamingProtocols", prioritizedStreamingProtocols));

    // Set property to ignore streaming sources of nested Devices
    // (the selection values match the three heuristic modes described earlier)
    const auto streamingConnectionHeuristicProp = SelectionProperty("StreamingConnectionHeuristic",
                                                                    List<IString>("MinConnections",
                                                                                  "MinHops",
                                                                                  "NotConnected"),
                                                                    0);
    deviceConfig.addProperty(streamingConnectionHeuristicProp);

    // Find and connect to a Device using the device info connection string
    const auto availableDevices = instance.getAvailableDevices();
    DevicePtr device;
    for (const auto& deviceInfo : availableDevices)
    {
        if (deviceInfo.getConnectionString().toView().find("daq://") != std::string::npos)
        {
            device = instance.addDevice(deviceInfo.getConnectionString(), deviceConfig);
            break;
        }
    }

    // Exit if no Device is found
    if (!device.assigned())
    {
        std::cerr << "No relevant Device found!" << std::endl;
        return 0;
    }

    // Output the name of the added Device
    std::cout << device.getInfo().getName() << std::endl;

    // Find the AI Signal
    auto signals = device.getSignalsRecursive();
    ChannelPtr channel;
    MirroredSignalConfigPtr signal;
    for (const auto& sig : signals)
    {
        auto name = sig.getDescriptor().getName();
        if (name.toView().find("AI") != std::string_view::npos)
        {
            signal = sig;
            channel = signal.getParent().getParent();
            break;
        }
    }

    if (!signal.assigned())
    {
        std::cerr << "No AI signal found!" << std::endl;
        return 1;
    }

    // Find and output the Streaming sources of Signal
    StringPtr nativeStreamingSource;
    StringPtr websocketStreamingSource;
    std::cout << "AI signal has " << signal.getStreamingSources().getCount() << " Streaming sources:" << std::endl;
    for (const auto& source : signal.getStreamingSources())
    {
        std::cout << source << std::endl;
        if (source.toView().find("daq.ns://") != std::string::npos)
            nativeStreamingSource = source;
        if (source.toView().find("daq.lt://") != std::string::npos)
            websocketStreamingSource = source;
    }

    // Check the active Streaming source of Signal
    if (signal.getActiveStreamingSource() != websocketStreamingSource)
    {
        std::cerr << "Wrong active Streaming source of AI signal" << std::endl;
        return 1;
    }

    // Output samples using Reader with Streaming LT
    readSamples(signal);

    // Change the active Streaming source of Signal
    signal.setActiveStreamingSource(nativeStreamingSource);

    // Output samples using Reader with native Streaming
    readSamples(signal);

    std::cout << "Press \"enter\" to exit the application..." << std::endl;
    std::cin.get();
    return 0;
}
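The domain value printed by `readSamples` is the raw tick count of the last sample multiplied by the tick resolution of the domain Signal. The arithmetic can be sketched without the SDK as follows. `TickResolution` and `ticksToDomainValue` are hypothetical stand-ins rather than openDAQ™ types, and the resolution of one microsecond per tick used in the note below is an assumed example value, not something a Device is guaranteed to report.

```cpp
#include <cstdint>

// Hypothetical stand-in for a tick resolution given as a rational number:
// one tick corresponds to numerator / denominator domain units.
struct TickResolution
{
    std::int64_t numerator;
    std::int64_t denominator;
};

// Scales a raw tick count into a value expressed in the domain's unit.
double ticksToDomainValue(std::uint64_t ticks, const TickResolution& resolution)
{
    return static_cast<double>(ticks) * static_cast<double>(resolution.numerator) /
           static_cast<double>(resolution.denominator);
}
```

With a resolution of 1/1000000, a tick count of 2500000 corresponds to a domain value of 2.5 in the domain's unit, counted from the origin that the listing prints before reading.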