Read with time-outs
This guide assumes that you’ve read the background section about the common behavior of Readers or gone through the Read basic value and Domain data how-to guide. For brevity, in |
In this guide, you’ll learn how to handle reading with time-outs and all the associated available time-out options on the example of a Stream Reader.
The basic functionality of a read call is to be non-blocking and returns only the samples currently available without any waiting. This is great for reading as fast as possible but can be a problem when you need an exact number of samples. In this case, you’d have to figure out a way to cobble together samples from multiple read calls in some way. To ease this, it is why most Readers provide a time-out in milliseconds as an additional optional parameter.
-
Cpp
-
Python
-
C#
auto reader = StreamReaderBuilder()
.setSignal(signal)
.setValueReadType(SampleType::Float64)
.setSkipEvents(true)
.build();
// Signal produces 2 samples
// Normal call will only return existing 2 samples immediately
SizeT count{5};
double values[5]{};
reader.read(values, &count); // count = 2
// Signal produces 2 samples, then in 100 ms after the read call another 2
count = 5;
double newValues[5]{};
reader.read(newValues, &count, 200); // count = 4
reader = opendaq.StreamReader(signal)
# Signal produces 2 samples
# Normal call will only return existing 2 samples immediately
values = reader.read(5)
len(values) # 2
# Signal produces 2 samples, then in 100 ms after the read call another 2
values = reader.read(5, 200)
len(values) # 4
var reader = OpenDAQFactory.CreateStreamReader<double, long>(signal);
// Signal produces 2 samples
// Normal call will only return existing 2 samples immediately
nuint count = 5;
double[] values = new double[5];
reader.Read(values, ref count); // count = 2
// Signal produces 2 samples, then in 100 ms after the read call another 2
count = 5;
double[] newValues = new double[5];
reader.Read(newValues, ref count, 200); // count = 4
The above example shows how to make the Reader wait for a specified amount of time to fill the requested number of samples. The Reader first reads the currently available samples, and if that is not enough, checks whether there is still time available and enters a wait until it receives a new Data Packet or the time-out expires. When it receives a Data Packet before the time-out and there are still samples to read, it starts waiting again repeating the whole process until it either reads all the requested samples or the time-out expires.
In the above Example 1 the Reader first reads the 2
currently available samples then enters a wait until after 100 ms
it receives another Data Packet with 2
more samples.
As it still needs to read 1
more sample it waits the remaining time until the timeout expires.
Timeout options
The time-out functionality described in the previous section is used most often and is the default, but sometimes you only wish to wait if there are actually no samples available.
This can be accomplished using the option ReadTimeoutType::Any
.
Using this option, the Reader will return immediately if any samples are available ignoring the specified timeout. Only if there are none available, it waits for the time-out or the next Data Packet and then returns immediately even if there is time remaining.
-
Cpp
-
Python
-
C#
auto reader = StreamReaderBuilder()
.setSignal(signal)
.setValueReadType(SampleType::Float64)
.setReadTimeoutType(ReadTimeoutType::Any)
.setSkipEvents(true)
.build();
// Signal produces 2 Packets with 3 samples
// [Packet 1]: { 1 }
// [Packet 2]: { 2, 3 }
auto available = reader.getAvailableCount(); // available = 3
// Returns immediately with the currently available samples
SizeT count{5};
double values[5]{};
reader.read(values, &count, 200); // count = 3, values = { 1, 2, 3 }
// There are no samples left in the Reader
available = reader.getAvailableCount(); // available = 0
// 50 ms after the read call the Signal produces a Packet with 2 samples { 4, 5 }
// then, after another 20 ms, produces the next 3 samples { 6, 7, 8 }
count = 5;
double newValues[5]{};
reader.read(newValues, &count, 200); // count = 2, newValues = { 4, 5 }
reader = opendaq.StreamReader(signal, timeout_type=opendaq.ReadTimeoutType.Any)
# Signal produces 2 packets with 3 samples
# [Packet 1]: [ 1 ]
# [Packet 2]: [ 2, 3 ]
available = reader.available_count # 3
# Returns immediately with the currently available samples
values = reader.read(5, 200) # [1, 2, 3]
# There are no samples left in the Reader
available = reader.available_count # 0
# 50 ms after the read call the Signal produces a Packet with 2 samples [ 4, 5 ]
# then after another 20ms produces the next 3 samples [ 6, 7, 8 ]
values = reader.read(5, 200) # [4, 5]
var reader = OpenDAQFactory.CreateStreamReader<double, long>(signal);
// Signal produces 2 Packets with 3 samples
// [Packet 1]: { 1 }
// [Packet 2]: { 2, 3 }
var available = reader.AvailableCount; // available = 3
// Returns immediately with the currently available samples
nuint count = 5;
double[] values = new double[5];
reader.Read(values, ref count, 200); // count = 3, values = { 1, 2, 3 }
// There are no samples left in the Reader
available = reader.AvailableCount; // available = 0
// 50 ms after the read call the Signal produces a Packet with 2 samples { 4, 5 }
// then, after another 20 ms, produces the next 3 samples { 6, 7, 8 }
count = 5;
double[] newValues = new double[5];
reader.Read(newValues, ref count, 200); // count = 2, newValues = { 4, 5 }
Full listing
The following is a self-contained file with all the examples of reading with time-out and time-out options. To properly illustrate the point and provide reproducibility, the data is manually generated, but the same should hold when connecting to a real device.
-
Cpp
#include <opendaq/context_factory.h>
#include <opendaq/packet_factory.h>
#include <opendaq/reader.h>
#include <opendaq/reader_factory.h>
#include <opendaq/scheduler_factory.h>
#include <opendaq/signal_factory.h>
#include <cassert>
#include <thread>
using namespace daq;
using namespace std::chrono_literals;
SignalConfigPtr setupExampleSignal();
DataDescriptorPtr setupDescriptor(SampleType type);
DataPacketPtr createPacketForSignal(const SignalPtr& signal, SizeT numSamples);
/*
* Example 1: Reading with time-outs
*/
void example1(const SignalConfigPtr& signal)
{
auto reader = StreamReaderBuilder()
.setSignal(signal)
.setValueReadType(SampleType::Float64)
.setSkipEvents(true)
.build();
// Signal produces 2 samples
auto packet1 = createPacketForSignal(signal, 2);
signal.sendPacket(packet1);
[[maybe_unused]] auto available = reader.getAvailableCount();
assert(available == 2u);
// Normal call will only return existing 2 samples immediately
SizeT count{5};
double values[5]{};
reader.read(values, &count); // count = 2
assert(count == 2u);
// Signal produces 2 samples, then in 100 ms after the read call another 2
auto packet2 = createPacketForSignal(signal, 2);
signal.sendPacket(packet2);
std::thread t(
[&signal]
{
std::this_thread::sleep_for(100ms);
auto packet3 = createPacketForSignal(signal, 2);
signal.sendPacket(packet3);
});
count = 5;
double newValues[5]{};
reader.read(newValues, &count, 200); // count = 4
if (t.joinable())
t.join();
assert(count == 4u);
}
/*
* Example 2: Reading with the time-out option "Any"
*/
void example2(const SignalConfigPtr& signal)
{
auto reader = StreamReaderBuilder()
.setSignal(signal)
.setValueReadType(SampleType::Float64)
.setReadTimeoutType(ReadTimeoutType::Any)
.setSkipEvents(true)
.build();
// Signal produces 2 Packets with 3 samples
// [Packet 1]: { 1 }
// [Packet 2]: { 2, 3 }
{
auto packet1 = createPacketForSignal(signal, 1);
auto data1 = static_cast<double*>(packet1.getData());
data1[0] = 1;
signal.sendPacket(packet1);
auto packet2 = createPacketForSignal(signal, 2);
auto data2 = static_cast<double*>(packet2.getData());
data2[0] = 2;
data2[1] = 3;
signal.sendPacket(packet2);
}
[[maybe_unused]] auto available = reader.getAvailableCount(); // available = 3
// Returns immediately with the currently available samples
SizeT count{5};
double values[5]{};
reader.read(values, &count, 200); // count = 3, values = { 1, 2, 3 }
assert(count == 3u);
assert(values[0] == 1);
assert(values[1] == 2);
assert(values[2] == 3);
// There are no samples left in the Reader
available = reader.getAvailableCount(); // available = 0
assert(available == 0u);
std::thread t(
[&signal]
{
// 50 ms after the read call the Signal produces a Packet with 2 samples { 4, 5 }
std::this_thread::sleep_for(50ms);
auto packet3 = createPacketForSignal(signal, 2);
auto data3 = static_cast<double*>(packet3.getData());
data3[0] = 4;
data3[1] = 5;
signal.sendPacket(packet3);
// then, after another 20 ms, produces the next 3 samples { 6, 7, 8 }
std::this_thread::sleep_for(20ms);
auto packet4 = createPacketForSignal(signal, 3);
auto data4 = static_cast<double*>(packet4.getData());
data4[0] = 6;
data4[1] = 7;
data4[2] = 8;
signal.sendPacket(packet3);
});
count = {5};
double newValues[5]{};
reader.read(newValues, &count, 200); // count = 2, newValues = { 4, 5 }
if (t.joinable())
t.join();
assert(count == 2u);
assert(newValues[0] == 4);
assert(newValues[1] == 5);
}
/*
* ENTRY POINT
*/
int main(int /*argc*/, const char* /*argv*/[])
{
SignalConfigPtr signal = setupExampleSignal();
example1(signal);
example2(signal);
return 0;
}
/*
* Set up the Signal with Float64 data
*/
SignalConfigPtr setupExampleSignal()
{
auto logger = Logger();
auto context = Context(Scheduler(logger, 1), logger, nullptr, nullptr);
auto signal = Signal(context, nullptr, "example signal");
signal.setDescriptor(setupDescriptor(SampleType::Float64));
return signal;
}
DataDescriptorPtr setupDescriptor(SampleType type)
{
// Set-up the Data Descriptor with the provided Sample Type
return DataDescriptorBuilder().setSampleType(type).build();
}
DataPacketPtr createPacketForSignal(const SignalPtr& signal, SizeT numSamples)
{
return daq::DataPacket(signal.getDescriptor(), numSamples);
}