Configure Streaming

Server side configuration

Typically, in addition to publishing its own structure (Signals, Channels, Function Blocks, etc.), an openDAQ™ Device also publishes information about the Streaming protocols it supports. This information includes the streaming protocol ID (e.g. "opendaq_lt_streaming" or "opendaq_native_streaming") and a range of optional parameters (e.g. port number). When a streaming server is started on the device, it automatically creates this piece of information, known as a Server capability, and prepares it for publication alongside the device’s structural details. For the Server capabilities to be published, the server responsible for transferring structural information must be added to the openDAQ™ instance last, after all streaming servers have been added.

The example below demonstrates the correct sequence for adding various servers to the openDAQ™ instance.

  • Cpp

#include <chrono>
#include <thread>
#include <opendaq/opendaq.h>

using namespace daq;

int main()
{
    using namespace std::chrono_literals;

    const InstancePtr instance = Instance(MODULE_PATH);

    instance.setRootDevice("daqref://device1");

    // Creates and registers a Server capability with the ID `opendaq_lt_streaming` and the default port number 7414
    instance.addServer("openDAQ LT Streaming", nullptr);

    // Creates and registers a Server capability with the ID `opendaq_native_streaming` and the default port number 7420
    instance.addServer("openDAQ Native Streaming", nullptr);

    // As the Streaming servers were added first, the registered Server capabilities are published over OPC UA
    instance.addServer("openDAQ OpcUa", nullptr);

    while(true)
        std::this_thread::sleep_for(100ms);

    return 0;
}
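
Why the order matters can be illustrated with a toy model. The sketch below does not use the openDAQ™ SDK; `ToyInstance`, `addStreamingServer` and `addStructureServer` are hypothetical names, and the assumption that the structure server publishes only the capabilities already registered when it is added is a simplification of the behavior described above.

```cpp
#include <string>
#include <vector>

// Hypothetical stand-in for a Server capability: protocol ID plus port.
struct ServerCapability
{
    std::string protocolId;
    int port;
};

// Hypothetical toy instance (not the openDAQ API).
class ToyInstance
{
public:
    // A streaming server registers its capability when it is added.
    void addStreamingServer(const std::string& protocolId, int port)
    {
        registered.push_back({protocolId, port});
    }

    // Assumption: the structure server publishes whatever capabilities
    // are registered at the moment it is added.
    std::vector<ServerCapability> addStructureServer() const
    {
        return registered;
    }

private:
    std::vector<ServerCapability> registered;
};
```

In this model, calling `addStructureServer()` before any streaming server yields an empty list, while calling it after both streaming servers yields both capabilities.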

Configure Streaming for structure-enabled Device

Most openDAQ™ devices support the transfer of structural information. An example of such a device is an openDAQ™ OPC UA-compatible device that is running a compatible OPC UA server. Clients connect to these devices using a connection string prefixed with "daq.opcua://", which is recognized by the openDAQ™ opcua_client_module. This module not only manages the Device’s structural details but also handles the received set of Streaming Server capabilities associated with the Device.

Each Streaming Server capability is identified by its Streaming protocol ID, which is mapped to the prefix of a connection string. The rest of the connection string is formed from the parameters in the Streaming Server capability together with the device’s known IP address. This connection string allows Streaming instantiation to be delegated to the appropriate data Streaming Module. As a result, when connecting to an openDAQ™ device, a streaming connection can be established automatically using the published streaming connection details.
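
The mapping from a Server capability to a connection string can be sketched as follows. This is a minimal illustration, not the SDK's implementation: `StreamingCapability`, `prefixForProtocol` and `buildStreamingConnectionString` are hypothetical names, and the `prefix://address:port` layout is an assumption (real connection strings may carry further parameters such as a path).

```cpp
#include <optional>
#include <string>

// Hypothetical view of a Streaming Server capability.
struct StreamingCapability
{
    std::string protocolId;
    int port;
};

// Maps the protocol IDs named in this guide to their connection string prefixes.
std::optional<std::string> prefixForProtocol(const std::string& protocolId)
{
    if (protocolId == "opendaq_native_streaming")
        return std::string("daq.ns");
    if (protocolId == "opendaq_lt_streaming")
        return std::string("daq.lt");
    return std::nullopt;  // unknown protocol: no module to delegate to
}

// Assumed layout: <prefix>://<address>:<port>
std::optional<std::string> buildStreamingConnectionString(const StreamingCapability& capability,
                                                          const std::string& address)
{
    const auto prefix = prefixForProtocol(capability.protocolId);
    if (!prefix.has_value())
        return std::nullopt;
    return *prefix + "://" + address + ":" + std::to_string(capability.port);
}
```

With these assumptions, a capability with ID "opendaq_native_streaming" and port 7420 on a device at 192.168.0.10 yields "daq.ns://192.168.0.10:7420".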

The Device configuration Property object provides a mechanism for filtering the available Streaming protocols by enabling or disabling specific ones. It also allows setting the "primary" Streaming protocol, whose connection is initially used as the active Streaming source for all of the Signals.
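
The effect of the allowed-protocol filter and the primary protocol can be sketched with plain standard-library code. `StreamingSelection` and `selectStreaming` are hypothetical names; in particular, falling back to the first enabled protocol when the requested primary is unavailable is an assumption made for this sketch, not documented SDK behavior.

```cpp
#include <algorithm>
#include <string>
#include <vector>

struct StreamingSelection
{
    std::vector<std::string> enabled;  // published protocols that pass the filter
    std::string primary;               // empty when no protocol is usable
};

StreamingSelection selectStreaming(const std::vector<std::string>& published,
                                   const std::vector<std::string>& allowed,
                                   const std::string& requestedPrimary)
{
    StreamingSelection result;

    // Keep only the published protocols that the configuration allows.
    for (const auto& id : published)
        if (std::find(allowed.begin(), allowed.end(), id) != allowed.end())
            result.enabled.push_back(id);

    const bool primaryUsable =
        std::find(result.enabled.begin(), result.enabled.end(), requestedPrimary) != result.enabled.end();

    if (primaryUsable)
        result.primary = requestedPrimary;
    else if (!result.enabled.empty())
        result.primary = result.enabled.front();  // assumed fallback, see lead-in

    return result;
}
```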

Furthermore, there’s an option to specify a Streaming path heuristic, particularly useful for multiple nested devices connected in a tree-structured manner (as illustrated in the diagram). The allowed heuristics include:

  • "Minimize-connections" mode (ID 0): establishes the fewest Streaming connections possible, at the cost of routing Signals' data through gateway Devices, which increases the hop count.

  • "Minimize-hops" mode (ID 1): attempts to stream data directly from nested Devices, minimizing the number of hops the data must make between Devices.

  • "Fallback" mode (ID 2): establishes all possible Streaming connections.

  • "Not connected" mode (ID 3): the information about supported streaming protocols published by the Device is not used to automatically establish Streaming connections.
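
Because the heuristic is passed as a bare integer, a named enumeration can make configuration code easier to read. The sketch below is a local convenience only: the enum and helper names are hypothetical and not part of the SDK, while the numeric IDs are the ones listed above.

```cpp
#include <optional>
#include <string>

// Local names for the heuristic IDs listed above (names are hypothetical).
enum class StreamingConnectionHeuristic : int
{
    MinimizeConnections = 0,
    MinimizeHops = 1,
    Fallback = 2,
    NotConnected = 3
};

// Converts a raw ID into the enum, rejecting values outside 0..3.
std::optional<StreamingConnectionHeuristic> heuristicFromId(int id)
{
    if (id < 0 || id > 3)
        return std::nullopt;
    return static_cast<StreamingConnectionHeuristic>(id);
}

// Short human-readable summary of each mode.
std::string describe(StreamingConnectionHeuristic heuristic)
{
    switch (heuristic)
    {
        case StreamingConnectionHeuristic::MinimizeConnections:
            return "fewest connections, data routed through gateway devices";
        case StreamingConnectionHeuristic::MinimizeHops:
            return "direct connections to nested devices where possible";
        case StreamingConnectionHeuristic::Fallback:
            return "all possible connections";
        case StreamingConnectionHeuristic::NotConnected:
            return "no automatic streaming connections";
    }
    return "unknown";
}
```

The integer to pass to the "StreamingConnectionHeuristic" property is then `static_cast<int>(StreamingConnectionHeuristic::MinimizeConnections)`, which is 0.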

Currently, openDAQ™ structure-enabled devices that operate through the Native configuration protocol support only the Native streaming protocol for transmitting acquired data.

  • Cpp

#include <iostream>
#include <opendaq/opendaq.h>

using namespace daq;

int main()
{
    // Create a new Instance that we will use for all the interactions with the SDK
    daq::InstancePtr instance = daq::Instance(MODULE_PATH);

    // Get the default configuration Property object for OPC UA enabled device type
    daq::PropertyObjectPtr deviceConfig = instance.getAvailableDeviceTypes().get("opendaq_opcua_config").createDefaultConfig();

    // Allow multiple streaming protocols by device configuration
    deviceConfig.setPropertyValue("AllowedStreamingProtocols", daq::List<daq::IString>("opendaq_native_streaming", "opendaq_lt_streaming"));

    // Set websocket streaming protocol as primary by device configuration
    deviceConfig.setPropertyValue("PrimaryStreamingProtocol", "opendaq_lt_streaming");

    // Disregard direct streaming connections for nested devices,
    // establish the minimum number of streaming connections possible.
    deviceConfig.setPropertyValue("StreamingConnectionHeuristic", 0);

    // Find and connect to a device hosting an OPC UA TMS server
    const auto availableDevices = instance.getAvailableDevices();
    daq::DevicePtr device;
    for (const auto& deviceInfo : availableDevices)
    {
        for (const auto & capability : deviceInfo.getServerCapabilities())
        {
            if (capability.getProtocolName() == "openDAQ OpcUa")
            {
                device = instance.addDevice(capability.getConnectionString(), deviceConfig);
                break;
            }
        }

        // Stop searching once a device has been added
        if (device.assigned())
            break;
    }

    if (!device.assigned())
        std::cerr << "No relevant device found!" << std::endl;
    else
        // Output the name of the added device
        std::cout << device.getInfo().getName() << std::endl;

    return 0;
}

Connecting to Streaming protocol based pseudo-devices

Pseudo-devices belong to a category of openDAQ™ Devices whose implementation solely relies on the Streaming protocol. Such Devices offer a flat list of Signals without detailed structural information. These devices are created using the Module responsible for establishing the corresponding Streaming connection. The Device connection string serves to route and delegate Device object instantiation to the relevant Module. This connection string is identical to the Streaming connection string used for Streaming connection instantiation, with the exception that the prefix indicating the Streaming protocol type might be replaced with the prefix representing the appropriate Device type. Following this prefix, the same set of parameters unique to each Streaming protocol type is appended.

For example, the prefix "daq.ns" in the Device connection string aligns with the Native Streaming protocol, which is identified by the same prefix "daq.ns" in the Streaming connection string. Similarly, the Device connection string prefix "daq.lt" corresponds to the Websocket Streaming protocol, recognized by the Streaming connection string prefix "daq.lt".
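
The routing role of the prefix can be illustrated by splitting a connection string at "://" and looking up the protocol family. This is a standard-library sketch with hypothetical helper names (`splitConnectionString`, `protocolForPrefix`); the SDK performs its own routing internally, and the address used in the usage note is a placeholder.

```cpp
#include <optional>
#include <string>
#include <string_view>

struct ParsedConnectionString
{
    std::string prefix;  // e.g. "daq.ns"
    std::string rest;    // protocol-specific parameters, e.g. "192.168.0.10:7420"
};

// Splits "<prefix>://<rest>"; returns nullopt when no non-empty prefix is present.
std::optional<ParsedConnectionString> splitConnectionString(std::string_view connectionString)
{
    const auto pos = connectionString.find("://");
    if (pos == std::string_view::npos || pos == 0)
        return std::nullopt;
    return ParsedConnectionString{std::string(connectionString.substr(0, pos)),
                                  std::string(connectionString.substr(pos + 3))};
}

// Prefixes mentioned in this guide and the protocols they correspond to.
std::string protocolForPrefix(std::string_view prefix)
{
    if (prefix == "daq.ns")
        return "Native Streaming";
    if (prefix == "daq.lt")
        return "Websocket Streaming";
    if (prefix == "daq.opcua")
        return "OPC UA";
    return "unknown";
}
```

For example, `splitConnectionString("daq.ns://192.168.0.10:7420")` yields the prefix "daq.ns", which `protocolForPrefix` maps to "Native Streaming".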

  • Cpp

#include <iostream>
#include <opendaq/opendaq.h>

using namespace daq;

int main()
{
    // Create a new Instance that we will use for all the interactions with the SDK
    daq::InstancePtr instance = daq::Instance(MODULE_PATH);

    // Find and connect to a device hosting a Native Streaming server
    const auto availableDevices = instance.getAvailableDevices();
    daq::DevicePtr device;
    for (const auto& deviceInfo : availableDevices)
    {
        for (const auto & capability : deviceInfo.getServerCapabilities())
        {
            if (capability.getProtocolName() == "openDAQ Native Streaming")
            {
                // No device configuration is defined in this example, so the defaults are used
                device = instance.addDevice(capability.getConnectionString());
                break;
            }
        }

        // Stop searching once a device has been added
        if (device.assigned())
            break;
    }

    if (!device.assigned())
        std::cerr << "No relevant device found!" << std::endl;
    else
        // Output the name of the added device
        std::cout << device.getInfo().getName() << std::endl;

    return 0;
}

Configure Streaming per Signal

Once the Device is connected, the Streaming sources of its Signals can be examined and modified for each Signal individually at any given time.

The Streaming sources are identified by a connection string that includes the protocol prefix, indicating the protocol type ID, and parameters that depend on the protocol type (IP address, port number, etc.). The Streaming sources of a particular Signal are manipulated through the MirroredSignalConfig object, which makes it possible to:

  • retrieve the list of streaming sources available for the signal with the getStreamingSources call,

  • get the currently active streaming source with the getActiveStreamingSource call,

  • change the active streaming source of the signal with the setActiveStreamingSource call,

  • enable or disable data streaming for the signal with the setStreamed call,

  • check whether streaming is enabled or disabled for the signal with the getStreamed call.

  • Cpp

#include <iostream>
#include <opendaq/opendaq.h>

using namespace daq;

int main()
{
    ...

    // Get the first signal of the connected device.
    // The streaming source calls below are provided by MirroredSignalConfig.
    daq::MirroredSignalConfigPtr signal = device.getSignalsRecursive()[0];

    daq::StringPtr nativeStreamingSource;
    daq::StringPtr websocketStreamingSource;

    // Find and output the streaming sources available for signal
    std::cout << "Signal supports " << signal.getStreamingSources().getCount()
              << " streaming sources:" << std::endl;
    for (const auto& source : signal.getStreamingSources())
    {
        std::cout << source << std::endl;
        if (source.toView().find("daq.ns://") != std::string::npos)
            nativeStreamingSource = source;
        if (source.toView().find("daq.lt://") != std::string::npos)
            websocketStreamingSource = source;
    }

    // Output the active streaming source of signal
    std::cout << "Active streaming source of signal: " << signal.getActiveStreamingSource() << std::endl;

    // Output the streaming status for the signal to verify that streaming is enabled.
    std::cout << "Streaming enabled status for signal is: " << signal.getStreamed() << std::endl;

    // Change the active streaming source of signal, if a native streaming source was found
    if (nativeStreamingSource.assigned())
        signal.setActiveStreamingSource(nativeStreamingSource);

    std::cout << "Press \"enter\" to exit the application..." << std::endl;
    std::cin.get();
    return 0;
}
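
The source lookup in the example above can be factored into a small helper. The sketch below works on plain `std::string` values rather than SDK types, and `findSourceByPrefix` is a hypothetical name. Unlike the substring search in the example, it matches only at the start of the connection string, which is a slightly stricter choice made for this sketch.

```cpp
#include <optional>
#include <string>
#include <string_view>
#include <vector>

// Returns the first streaming source whose connection string starts with `prefix`.
std::optional<std::string> findSourceByPrefix(const std::vector<std::string>& sources,
                                              std::string_view prefix)
{
    for (const auto& source : sources)
    {
        // Compare only the leading characters of the connection string.
        if (std::string_view(source).substr(0, prefix.size()) == prefix)
            return source;
    }
    return std::nullopt;  // no source uses the requested protocol
}
```

Returning `std::optional` makes the "no such source" case explicit, so the caller can skip the switch of the active streaming source when the requested protocol is not offered.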

Full listing

The following is a fully working example of configuring Streaming and reading Signal data using different Streaming sources.

The full example code listing

  • Cpp

#include <chrono>
#include <iostream>
#include <thread>
#include <opendaq/opendaq.h>

void readSamples(const daq::MirroredSignalConfigPtr signal)
{
    using namespace std::chrono_literals;
    daq::StreamReaderPtr reader = daq::StreamReader<double, uint64_t>(signal);

    // Get the resolution and origin
    daq::DataDescriptorPtr descriptor = signal.getDomainSignal().getDescriptor();
    daq::RatioPtr resolution = descriptor.getTickResolution();
    daq::StringPtr origin = descriptor.getOrigin();
    daq::StringPtr unitSymbol = descriptor.getUnit().getSymbol();

    std::cout << "\nReading signal: " << signal.getName()
              << "; active streaming source: " << signal.getActiveStreamingSource() << std::endl;
    std::cout << "Origin: " << origin << std::endl;

    // Allocate buffer for reading double samples
    double samples[100];
    uint64_t domainSamples[100];
    for (int i = 0; i < 40; ++i)
    {
        std::this_thread::sleep_for(25ms);

        // Read up to 100 samples every 25ms, storing the amount read into `count`
        daq::SizeT count = 100;
        reader.readWithDomain(samples, domainSamples, &count);
        if (count > 0)
        {
            daq::Float domainValue = (daq::Int) domainSamples[count - 1] * resolution;
            std::cout << "Value: " << samples[count - 1] << ", Domain: " << domainValue << unitSymbol << std::endl;
        }
    }
}

int main(int /*argc*/, const char* /*argv*/[])
{
    // Create a new Instance that we will use for all the interactions with the SDK
    daq::InstancePtr instance = daq::Instance(MODULE_PATH);

    // Get the default configuration Property object for OPC UA enabled device type
    daq::PropertyObjectPtr deviceConfig = instance.getAvailableDeviceTypes().get("opendaq_opcua_config").createDefaultConfig();

    // Allow multiple streaming protocols by device configuration
    deviceConfig.setPropertyValue("AllowedStreamingProtocols", daq::List<daq::IString>("opendaq_native_streaming", "opendaq_lt_streaming"));

    // Set websocket streaming protocol as primary by device configuration
    deviceConfig.setPropertyValue("PrimaryStreamingProtocol", "opendaq_lt_streaming");

    // Find and connect to a device hosting an OPC UA TMS server
    const auto availableDevices = instance.getAvailableDevices();
    daq::DevicePtr device;
    for (const auto& deviceInfo : availableDevices)
    {
        for (const auto & capability : deviceInfo.getServerCapabilities())
        {
            if (capability.getProtocolName() == "openDAQ OpcUa")
            {
                device = instance.addDevice(capability.getConnectionString(), deviceConfig);
                break;
            }
        }

        // Stop searching once a device has been added
        if (device.assigned())
            break;
    }

    // Exit if no device is found
    if (!device.assigned())
    {
        std::cerr << "No relevant device found!" << std::endl;
        return 1;
    }

    // Output the name of the added device
    std::cout << device.getInfo().getName() << std::endl;

    // Find the AI signal
    auto signals = device.getSignalsRecursive();

    daq::ChannelPtr channel;
    daq::MirroredSignalConfigPtr signal;
    for (const auto& sig : signals)
    {
        auto name = sig.getDescriptor().getName();

        if (name.toView().find("AI") != std::string_view::npos)
        {
            signal = sig;
            channel = signal.getParent().getParent();
            break;
        }
    }

    if (!signal.assigned())
    {
        std::cerr << "No AI signal found!" << std::endl;
        return 1;
    }

    // Find and output the streaming sources of signal
    daq::StringPtr nativeStreamingSource;
    daq::StringPtr websocketStreamingSource;
    std::cout << "AI signal has " << signal.getStreamingSources().getCount()
              << " streaming sources:" << std::endl;
    for (const auto& source : signal.getStreamingSources())
    {
        std::cout << source << std::endl;
        if (source.toView().find("daq.ns://") != std::string::npos)
            nativeStreamingSource = source;
        if (source.toView().find("daq.lt://") != std::string::npos)
            websocketStreamingSource = source;
    }

    // Check the active streaming source of signal
    if (signal.getActiveStreamingSource() != websocketStreamingSource)
    {
        std::cerr << "Wrong active streaming source of AI signal" << std::endl;
        return 1;
    }
    // Output samples using reader with websocket streaming
    readSamples(signal);

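    // Change the active streaming source of signal
    if (!nativeStreamingSource.assigned())
    {
        std::cerr << "No native streaming source found for AI signal" << std::endl;
        return 1;
    }
    signal.setActiveStreamingSource(nativeStreamingSource);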
    // Output samples using reader with native streaming
    readSamples(signal);

    std::cout << "Press \"enter\" to exit the application..." << std::endl;
    std::cin.get();
    return 0;
}
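
The domain value printed by `readSamples` is the raw tick count scaled by the tick resolution, which is a ratio. The arithmetic can be sketched without the SDK as follows; `TickResolution` and `ticksToDomainValue` are hypothetical names, and the resolution of 1/1000000 used in the usage note is an assumed example value, not one read from a device.

```cpp
#include <cstdint>

// Hypothetical stand-in for a tick resolution ratio (numerator / denominator).
struct TickResolution
{
    std::int64_t numerator;
    std::int64_t denominator;
};

// Scales a raw domain tick count by the resolution: ticks * numerator / denominator.
double ticksToDomainValue(std::uint64_t ticks, TickResolution resolution)
{
    return static_cast<double>(ticks) * static_cast<double>(resolution.numerator) /
           static_cast<double>(resolution.denominator);
}
```

With an assumed resolution of 1/1000000, a tick count of 2500000 corresponds to a domain value of 2.5 in the unit reported by the domain Signal's descriptor, counted from its origin.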