Read With Time-Outs

This guide assumes that you’ve read the background section about the common behavior of Readers or gone through the Read Basic Value And Domain Data how-to guide.

For brevity, in C++ we also assume that all the code is in namespace daq or it has been imported via using namespace daq; and omit any otherwise necessary header includes until the final complete listing.

In this guide, you’ll learn how to handle reading with time-outs and all the associated available time-out options on the example of a Stream reader.

The basic functionality of a read call is to be non-blocking and returns only the samples currently available without any waiting. This is great for reading as fast as possible but can be a problem when you need an exact number of samples. In this case, you’d have to figure out a way to cobble together samples from multiple read calls in some way. To ease this, it is why most Readers provide a time-out in milliseconds as an additional optional parameter.

Reading with time-outs
  • Cpp

  • Python

auto reader = StreamReader(signal);

// Signal produces 2 samples

// Normal call will only return existing 2 samples immediately.
SizeT count{5};
double values[5]{};
reader.read(values, &count); // count = 2

// Signal produces 2 samples, then in 100ms after the read call another 2

count = 5;
double newValues[5]{};
reader.read(newValues, &count, 200); // count = 4
reader = opendaq.StreamReader(signal)

# Signal produces 2 samples

# Normal call will only return existing 2 samples immediately.
values = reader.read(5)
len(values) # 2

# Signal produces 2 samples, then in 100ms after the read call another 2

values = reader.read(5, 200)
len(values) # 4

The above example shows how to make the Reader wait for a specified amount of time to fill the requested number of samples. The Reader first reads the currently available samples, and if that is not enough, checks whether there is still time available and enters a wait until it receives a new Data Packet or the time-out expires. When it receives a Data Packet before the time-out and there are still samples to read, it starts waiting again repeating the whole process until it either reads all the requested samples or the time-out expires.

In the above Example 1 the Reader first reads the 2 currently available samples then enters a wait until after 100ms it receives another Data Packet with 2 more samples. As it still needs to read 1 more sample it waits the remaining time until the timeout expires.

Timeout options

The time-out functionality described in the previous section is used most often and is the default, but sometimes you only wish to wait if there are actually no samples available. This can be accomplished using the option ReadTimeoutType::Any.

Using this option, the Reader will return immediately if any samples are available ignoring the specified timeout. Only if there are none available, it waits for the time-out or the next Data Packet and then returns immediately even if there is time remaining.

Reading with the time-out option "Any"
  • Cpp

  • Python

auto reader = StreamReader(signal, ReadTimeoutType::Any);

// Signal produces 2 packets with 3 samples
// [Packet 1]: { 1 }
// [Packet 2]: { 2, 3 }

auto available = reader.getAvailableCount(); // available = 3

// Returns immediately with the currently available samples
SizeT count{5};
double values[5]{};
reader.read(values, &count, 200); // count = 3, values = { 1, 2, 3 }

// There are no samples left in the Reader
auto available = reader.getAvailableCount(); // available = 0

// 50ms after the read call the Signal produces a Packet with 2 samples { 4, 5 }
// then after another 20ms produces the next 3 samples { 6, 7, 8 }

count = {5};
double newValues[5]{};
reader.read(newValues, &count, 200); // count = 2, newValues = { 4, 5 }
reader = opendaq.StreamReader(signal, timeout_type=opendaq.ReadTimeoutType.Any)

# Signal produces 2 packets with 3 samples
# [Packet 1]: [ 1 ]
# [Packet 2]: [ 2, 3 ]

available = reader.available_count # 3

# Returns immediately with the currently available samples
values = reader.read(5, 200) # [1, 2, 3]

# There are no samples left in the Reader
available = reader.available_count # 0

# 50ms after the read call the Signal produces a Packet with 2 samples [ 4, 5 ]
# then after another 20ms produces the next 3 samples [ 6, 7, 8 ]

values = reader.read(5, 200) # [4, 5]

Full listing

The following is a self-contained file with all the examples of reading with time-out and time-out options. To properly illustrate the point and provide reproducibility, the data is manually generated, but the same should hold when connecting to a real device.

Full listing
  • Cpp

#include <opendaq/context_factory.h>
#include <opendaq/packet_factory.h>
#include <opendaq/reader.h>
#include <opendaq/reader_factory.h>
#include <opendaq/scheduler_factory.h>
#include <opendaq/signal_factory.h>

#include <thread>
#include <cassert>

using namespace daq;
using namespace std::chrono_literals;

SignalConfigPtr setupExampleSignal();
DataDescriptorPtr setupDescriptor(SampleType type);
DataPacketPtr createPacketForSignal(const SignalPtr& signal, SizeT numSamples);

/*
 * Example 1: Reading with time-outs
 */
void example1(const SignalConfigPtr& signal)
{
    auto reader = StreamReader(signal);

    // Signal produces 2 samples
    auto packet1 = createPacketForSignal(signal, 2);
    signal.sendPacket(packet1);

    auto available = reader.getAvailableCount();
    assert(available == 2u);

    // Normal call will only return existing 2 samples immediately.
    SizeT count{5};
    double values[5]{};
    reader.read(values, &count);  // count = 2

    assert(count == 2u);

    // Signal produces 2 samples, then in 100ms after the read call another 2
    auto packet2 = createPacketForSignal(signal, 2);
    signal.sendPacket(packet2);

    std::thread t([&signal]
    {
        std::this_thread::sleep_for(100ms);

        auto packet3 = createPacketForSignal(signal, 2);
        signal.sendPacket(packet3);
    });

    count = 5;
    double newValues[5]{};
    reader.read(newValues, &count, 200);  // count = 4

    if (t.joinable())
        t.join();

    assert(count == 4u);
}

/*
 * Example 2: Reading with the time-out option "Any"
 */
void example2(const SignalConfigPtr& signal)
{
    auto reader = StreamReader(signal, ReadTimeoutType::Any);

    // Signal produces 2 packets with 3 samples
    // [Packet 1]: { 1 }
    // [Packet 2]: { 2, 3 }
    {
        auto packet1 = createPacketForSignal(signal, 1);
        auto data1 = static_cast<double*>(packet1.getData());
        data1[0] = 1;

        signal.sendPacket(packet1);

        auto packet2 = createPacketForSignal(signal, 2);
        auto data2 = static_cast<double*>(packet2.getData());
        data2[0] = 2;
        data2[1] = 3;

        signal.sendPacket(packet2);
    }

    auto available = reader.getAvailableCount();  // available = 3

    // Returns immediately with the currently available samples
    SizeT count{5};
    double values[5]{};
    reader.read(values, &count, 200);  // count = 3, values = { 1, 2, 3 }

    assert(count == 3u);
    assert(values[0] == 1);
    assert(values[1] == 2);
    assert(values[2] == 3);

    // There are no samples left in the Reader
    available = reader.getAvailableCount();  // available = 0
    assert(available == 0u);

    std::thread t([&signal]
    {
        // 50ms after the read call the Signal produces a Packet with 2 samples { 4, 5 }
        std::this_thread::sleep_for(50ms);

        auto packet3 = createPacketForSignal(signal, 2);
        auto data3 = static_cast<double*>(packet3.getData());
        data3[0] = 4;
        data3[1] = 5;

        signal.sendPacket(packet3);

        // Then after another 20ms produces the next 3 samples { 6, 7, 8 }
        std::this_thread::sleep_for(20ms);

        auto packet4 = createPacketForSignal(signal, 3);
        auto data4 = static_cast<double*>(packet4.getData());
        data4[0] = 6;
        data4[1] = 7;
        data4[2] = 8;
        signal.sendPacket(packet3);
    });

    count = {5};
    double newValues[5]{};
    reader.read(newValues, &count, 200);  // count = 2, newValues = { 4, 5 }

    if (t.joinable())
        t.join();

    assert(count == 2u);
    assert(newValues[0] == 4);
    assert(newValues[1] == 5);
}
/*
 * ENTRY POINT
 */
int main(int /*argc*/, const char* /*argv*/ [])
{
    SignalConfigPtr signal = setupExampleSignal();

    example1(signal);
    example2(signal);

    return 0;
}

/*
 * Set up the Signal with Float64 data
 */
SignalConfigPtr setupExampleSignal()
{
    auto logger = Logger();
    auto context = Context(Scheduler(logger, 1), logger, nullptr);

    auto signal = Signal(context, nullptr, "example signal");
    signal.setDescriptor(setupDescriptor(SampleType::Float64));

    return signal;
}

DataDescriptorPtr setupDescriptor(SampleType type)
{
    // Set-up the data descriptor with the provided Sample-Type
    return DataDescriptorBuilder().setSampleType(type).build();
}

DataPacketPtr createPacketForSignal(const SignalPtr& signal, SizeT numSamples)
{
    return daq::DataPacket(
        signal.getDescriptor(),
        numSamples
    );
}