Read Multiple Signals Aligned
This guide assumes that you’ve read the background section about the common behavior of Readers or gone through the Read Basic Value And Domain Data how-to guide. For brevity, in |
When you start reading more than one Signal at once, you immediately run into a problem: the next sample in each of the Signals read doesn’t necessarily represent the same point in the Domain (usually the time domain). This happens because the Signals can have different Origins and different Tick Resolutions expressed in different Units, and, once their Data Rules are taken into account, can produce data at different rates (usually called sample rates).
This is quite a complex problem, even more so when you take into account that the Signals being read can change completely at any time during a measurement. In practice, however, most Signals don’t change at all, or at least not radically, and almost all of them are in the time domain, so we can cover a large portion of Signals by limiting ourselves to a subset of allowed configurations.
With this in mind, we limit ourselves to Signals that have:
- A domain Signal assigned
- Implicit domain where:
  - Domain ticks represent time in seconds
  - Domain origin is specified in ISO-8601 format
- A combination of Tick Resolution and Data Rule that result in the same sample rate
  - Inter-sample offsets are ignored
- Values that can be converted to a common data-type
Some of the above limitations might be lifted in the future.
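The sample-rate limitation above can be sketched with plain arithmetic. The following is a minimal, library-independent illustration, not the Reader's actual check: the `DomainTiming` struct and both functions are hypothetical names, and it assumes a linear Data Rule so that the time between samples is the Tick Resolution multiplied by the rule's delta.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for the parts of a domain descriptor that matter here.
struct DomainTiming
{
    std::int64_t resolutionNum;  // Tick Resolution numerator (seconds per tick)
    std::int64_t resolutionDen;  // Tick Resolution denominator
    std::int64_t delta;          // linear Data Rule delta, in ticks per sample
};

// Two signals share a sample rate when (resolution * delta) is equal for both.
// Cross-multiplying the fractions avoids comparing floating-point values.
bool sameSampleRate(const DomainTiming& a, const DomainTiming& b)
{
    assert(a.resolutionDen > 0 && b.resolutionDen > 0);
    return a.resolutionNum * a.delta * b.resolutionDen ==
           b.resolutionNum * b.delta * a.resolutionDen;
}

// Checks every signal against the first one.
bool allSameSampleRate(const std::vector<DomainTiming>& signals)
{
    for (std::size_t i = 1; i < signals.size(); ++i)
    {
        if (!sameSampleRate(signals[0], signals[i]))
            return false;
    }
    return true;
}
```

Under these assumptions, a resolution of 1/1000 with a delta of 1 and a resolution of 1/2000 with a delta of 2 both describe 1000 samples per second, so they count as compatible, while 1/1000 with a delta of 2 does not.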
Using the Multi-Reader
Using the Multi Reader is largely the same as using any other Reader. It is essentially a Stream Reader for multiple Signals at once, except that it performs some additional checks to keep the Signals compatible and synchronized to the same domain point.
-
Cpp
// These calls all create the same reader
auto reader = MultiReader(signals);
auto reader = MultiReader<double, Int>(signals);
auto reader = MultiReader(signals, SampleType::Float64, SampleType::Int64);
Developers can pass a list of input ports instead of signals.
-
Cpp
// These calls all create the same reader
auto reader = MultiReaderFromPort(ports);
auto reader = MultiReaderFromPort<double, Int>(ports);
auto reader = MultiReaderFromPort(ports, SampleType::Float64, SampleType::Int64);
On creation, the Reader first checks the preconditions listed in the limitations above and throws an error if they aren’t met. Once constructed, the Signals are not yet synchronized. Synchronization happens on the first read call, or on any call that needs to determine the number of samples ready. At that point the Reader tries to align all Signals to the first common domain point, which serves as the new start from which the Reader provides data.
It does this by dropping samples until the domain value is equal to or greater than the chosen start value.
Until the queues of all Signals hold enough samples to reach the synchronized start domain value, the count parameter will be 0, even if the Reader actually reads as much as possible in the background, accounting for the timeout.
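The dropping step described above can be pictured with a small standalone sketch. It is only a model under simple assumptions, not the Reader's implementation: each signal's queue is represented as a `std::deque` of domain ticks already expressed on a common scale, and the function names are hypothetical.

```cpp
#include <cstddef>
#include <cstdint>
#include <deque>

// Removes queued domain values that lie before `start` and
// returns how many samples were dropped.
std::size_t dropUntilAligned(std::deque<std::int64_t>& queuedTicks, std::int64_t start)
{
    std::size_t dropped = 0;
    while (!queuedTicks.empty() && queuedTicks.front() < start)
    {
        queuedTicks.pop_front();
        ++dropped;
    }
    return dropped;
}

// Number of samples left in one signal's queue once it is aligned to `start`.
// The queue is taken by value so the caller's data is left untouched.
std::size_t availableAfterAlign(std::deque<std::int64_t> queuedTicks, std::int64_t start)
{
    dropUntilAligned(queuedTicks, start);
    return queuedTicks.size();
}
```

A queue that ends before `start` is emptied completely, which corresponds to the case where the reported count stays at 0 until more data arrives.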
Aligning Signals start
The basic example we will follow uses 3 Signals with:
- Value type of SampleType::Float64
- Domain signal with
  - Tick resolution of 1/1000
  - Linear rule with
    - Delta: 1
    - Offset: 0
- But different origins of:
  - "2022-09-27T00:02:03+00:00"
  - "2022-09-27T00:02:04+00:00"
  - "2022-09-27T00:02:04.123+00:00"
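The third signal has the latest origin and therefore the highest starting domain value, so it defines the common start.
The first signal starts 1.123s (1123 samples) before that point and the second 0.123s (123 samples) before it, so that many samples have to be dropped from each to align.
Since the Offset is 0 and the tick resolutions are the same, no additional adjustments are made.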
-
Cpp
ReadSignal sig0 = demoSignal(context, 523, "2022-09-27T00:02:03+00:00");
ReadSignal sig1 = demoSignal(context, 732, "2022-09-27T00:02:04+00:00");
ReadSignal sig2 = demoSignal(context, 843, "2022-09-27T00:02:04.123+00:00");
ListPtr<ISignal> signals{sig0.signal, sig1.signal, sig2.signal};
auto reader = MultiReader(signals);
sig0.sendPacket();
sig1.sendPacket();
sig2.sendPacket();
auto available = reader.getAvailableCount(); // 0
After the Reader is constructed, each Signal produces one Data Packet. The packets differ in size, but together they are not enough to reach the common start domain point.
The Reader therefore reports 0 samples available: the call that checks the number of available samples drops the samples that lie before the start domain point.
In the examples, a helper function |
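The drop counts used in this section (1123, 123 and 0 samples) follow directly from the origins and the sample rate. The sketch below reproduces that arithmetic without any library calls. It assumes the origins have already been converted by hand to millisecond offsets from an arbitrary reference (0, 1000 and 1123 for the three origins above); `samplesToDrop` is a hypothetical helper name.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// For each signal, computes how many samples lie before the latest origin.
// originsMs: origin of each signal as a millisecond offset from a shared reference.
// samplesPerSecond: the common sample rate of all signals.
std::vector<std::int64_t> samplesToDrop(const std::vector<std::int64_t>& originsMs,
                                        std::int64_t samplesPerSecond)
{
    assert(!originsMs.empty());
    const std::int64_t latest = *std::max_element(originsMs.begin(), originsMs.end());

    std::vector<std::int64_t> drops;
    drops.reserve(originsMs.size());
    for (std::int64_t origin : originsMs)
    {
        // Time gap to the latest origin, converted from milliseconds to samples.
        drops.push_back((latest - origin) * samplesPerSecond / 1000);
    }
    return drops;
}
```

With a tick resolution of 1/1000 and a delta of 1 the sample rate is 1000 samples per second, so the millisecond gaps map one-to-one onto sample counts.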
Reading synchronized data
After some time, more data packets arrive and the Reader finally has enough samples to align the start. The situation after 3 data packets for each signal is:
- 523 * 3 = 1569 samples (1.569s)
  - need 1123 to sync
  - 1569 - 1123 = 446 remaining
- 732 * 3 = 2196 samples (2.196s)
  - need 123 to sync
  - 2196 - 123 = 2073 remaining
- 843 * 3 = 2529 samples (2.529s)
  - need 0 to sync
  - 2529 remaining
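The bookkeeping in the list above can be written down as a short calculation: the number of aligned samples is limited by the signal with the fewest samples left after its drop. This is an illustrative model only, assuming every signal has sent the same number of equally sized packets. `alignedAvailable` is a hypothetical name, not part of the Reader API.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <vector>

// Returns how many aligned samples can be read across all signals.
// packetSizes[i]   : samples per packet of signal i
// samplesToDrop[i] : samples signal i must discard to reach the common start
// packetsPerSignal : packets each signal has sent so far
std::int64_t alignedAvailable(const std::vector<std::int64_t>& packetSizes,
                              const std::vector<std::int64_t>& samplesToDrop,
                              std::int64_t packetsPerSignal)
{
    assert(!packetSizes.empty() && packetSizes.size() == samplesToDrop.size());

    std::int64_t available = std::numeric_limits<std::int64_t>::max();
    for (std::size_t i = 0; i < packetSizes.size(); ++i)
    {
        const std::int64_t received = packetSizes[i] * packetsPerSignal;
        // A signal that has not reached the start yet contributes 0.
        const std::int64_t remaining = std::max<std::int64_t>(0, received - samplesToDrop[i]);
        available = std::min(available, remaining);
    }
    return available;
}
```

For the three demo signals this gives 0 after one packet each (the first signal has only 523 of the 1123 samples it needs) and 446 after three packets each.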
To issue read calls, you first need to pre-allocate buffers for the Reader to fill. The procedure is the same as with other Readers, except that instead of providing a pointer to the start of a single buffer, you now provide an array of per-signal pointers to buffers (called a jagged array).
We request 523 samples from the Reader, but because it had to drop 1123 samples from the first signal to align the start, only 446 aligned samples remain, and those are returned.
-
Cpp
constexpr const auto NUM_SIGNALS = 3;
ReadSignal sig0 = demoSignal(context, 523, "2022-09-27T00:02:03+00:00");
ReadSignal sig1 = demoSignal(context, 732, "2022-09-27T00:02:04+00:00");
ReadSignal sig2 = demoSignal(context, 843, "2022-09-27T00:02:04.123+00:00");
ListPtr<ISignal> signals{sig0.signal, sig1.signal, sig2.signal};
auto reader = MultiReader(signals);
sig0.sendPacket();
sig1.sendPacket();
sig2.sendPacket();
auto available = reader.getAvailableCount(); // 0
sig0.sendPacket();
sig1.sendPacket();
sig2.sendPacket();
sig0.sendPacket();
sig1.sendPacket();
sig2.sendPacket();
// Samples per signal
// 523 * 3 = 1569 (1.569s) need 1123 to sync
// 732 * 3 = 2196 (2.196s) need 123 to sync
// 843 * 3 = 2529 (2.529s) need 0 to sync
available = reader.getAvailableCount(); // 446
constexpr const SizeT SAMPLES = 523;
std::array<double[SAMPLES], NUM_SIGNALS> values{};
std::array<ClockTick[SAMPLES], NUM_SIGNALS> domain{};
void* valuesPerSignal[NUM_SIGNALS]{values[0], values[1], values[2]};
void* domainPerSignal[NUM_SIGNALS]{domain[0], domain[1], domain[2]};
SizeT count = SAMPLES;
reader.readWithDomain(valuesPerSignal, domainPerSignal, &count);
// count = 446
available = reader.getAvailableCount(); // 0
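The example above sizes its buffers at compile time with `std::array`. When the number of signals is only known at run time, the same jagged layout can be built from vectors. The following is a sketch of one possible approach, not something prescribed by the library. `JaggedBuffers` is a hypothetical helper, and it is assumed that its `data()` result could be passed wherever the example passes `valuesPerSignal`.

```cpp
#include <cstddef>
#include <vector>

// Owns one sample buffer per signal plus the array of per-signal pointers
// (the "jagged array") that refers to those buffers.
struct JaggedBuffers
{
    std::vector<std::vector<double>> storage;  // one buffer per signal
    std::vector<void*> pointers;               // pointers[i] -> storage[i].data()

    JaggedBuffers(std::size_t signalCount, std::size_t samplesPerSignal)
        : storage(signalCount, std::vector<double>(samplesPerSignal))
    {
        pointers.reserve(signalCount);
        for (auto& buffer : storage)
            pointers.push_back(buffer.data());
    }

    // A copy would keep pointing at the original object's buffers, so copying is disabled.
    JaggedBuffers(const JaggedBuffers&) = delete;
    JaggedBuffers& operator=(const JaggedBuffers&) = delete;

    // Array of per-signal pointers, in signal order.
    void** data() { return pointers.data(); }
};
```

Keeping the storage and the pointer array in one object makes it harder for the pointers to outlive the buffers they refer to.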
Using the Multi-Reader to read time-stamps
The Time Reader presented in Read With Absolute Time-Stamps can also be used with Multi-Reader.
-
Cpp
constexpr const auto NUM_SIGNALS = 3;
auto logger = Logger();
auto context = Context(Scheduler(logger, 1), logger, nullptr);
ReadSignal sig0 = demoSignal(context, 523, "2022-09-27T00:02:03+00:00");
ReadSignal sig1 = demoSignal(context, 732, "2022-09-27T00:02:04+00:00");
ReadSignal sig2 = demoSignal(context, 843, "2022-09-27T00:02:04.123+00:00");
ListPtr<ISignal> signals{sig0.signal, sig1.signal, sig2.signal};
auto reader = MultiReader(signals);
//
// To install the domain transform function to system time-stamps
//
TimeReader timeReader(reader);
sig0.sendPacket();
sig1.sendPacket();
sig2.sendPacket();
auto available = reader.getAvailableCount(); // 0
sig0.sendPacket();
sig1.sendPacket();
sig2.sendPacket();
sig0.sendPacket();
sig1.sendPacket();
sig2.sendPacket();
// Samples per signal
// 523 * 3 = 1569 (1.569s) need 1123 to sync
// 732 * 3 = 2196 (2.196s) need 123 to sync
// 843 * 3 = 2529 (2.529s) need 0 to sync
available = reader.getAvailableCount(); // 446
constexpr const SizeT SAMPLES = 523;
std::array<double[SAMPLES], NUM_SIGNALS> values{};
//
// Use time-stamps as a buffer instead of the domain-type
//
std::array<std::chrono::system_clock::time_point[SAMPLES], NUM_SIGNALS> domain{};
void* valuesPerSignal[NUM_SIGNALS]{values[0], values[1], values[2]};
void* domainPerSignal[NUM_SIGNALS]{domain[0], domain[1], domain[2]};
SizeT count{SAMPLES};
reader.readWithDomain(valuesPerSignal, domainPerSignal, &count);
// count = 446
available = reader.getAvailableCount(); // 0
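Conceptually, turning a domain tick into an absolute time-stamp means scaling the tick by the Tick Resolution and adding the result to the origin. The sketch below shows that arithmetic with `std::chrono` only. It is an illustration under simple assumptions (origin already parsed into a `time_point`, tick counts small enough not to overflow 64-bit nanoseconds), not the Time Reader's implementation, and `tickToTimePoint` is a hypothetical name.

```cpp
#include <chrono>
#include <cstdint>

using SystemTime = std::chrono::system_clock::time_point;

// Converts a domain tick to an absolute time-stamp.
// origin        : the domain origin, already parsed from its ISO-8601 string
// tick          : domain value of the sample
// resolutionNum : Tick Resolution numerator (seconds per tick)
// resolutionDen : Tick Resolution denominator
SystemTime tickToTimePoint(SystemTime origin,
                           std::int64_t tick,
                           std::int64_t resolutionNum,
                           std::int64_t resolutionDen)
{
    using namespace std::chrono;

    // Elapsed time since the origin in nanoseconds (integer arithmetic).
    const std::int64_t elapsedNs = tick * resolutionNum * 1'000'000'000 / resolutionDen;
    return origin + duration_cast<system_clock::duration>(nanoseconds(elapsedNs));
}
```

With a resolution of 1/1000, tick 1123 lands 1.123s after the origin, which is why the first signal's first aligned sample in this example carries the time-stamp 00:02:04.123.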
Creating MultiReader with builder
With the builder, developers can add signals and input ports using the methods addSignal and addInputPort. For signals, the builder creates an internal input port connected to the input signal.
By default, the value type is set to daq::SampleType::Float64 and the domain type to daq::SampleType::Int64. These can be overridden with the methods setValueReadType and setDomainReadType.
The default read mode is daq::ReadMode::Scaled, which can be changed to daq::ReadMode::Unscaled or daq::ReadMode::RawValue using the method setReadMode.
In Multi-Reader, developers can set the read timeout type. The default value is daq::ReadTimeoutType::All, which waits until the requested number of samples is available or the timeout is exceeded. It can be set to daq::ReadTimeoutType::Any, in which case the read waits only until any data is available or the timeout is reached.
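The difference between the two timeout types can be modelled as a simple predicate that decides when a blocking read stops waiting. This is a hedged sketch of the behaviour as described in the paragraph above, not the library's code: `TimeoutModel` is a hypothetical stand-in for daq::ReadTimeoutType and `readShouldReturn` is an invented name.

```cpp
#include <chrono>
#include <cstddef>

// Hypothetical mirror of the two timeout types described above.
enum class TimeoutModel
{
    All,  // wait for the full requested amount
    Any   // return as soon as any data is available
};

// Decides whether a waiting read should return now.
bool readShouldReturn(TimeoutModel mode,
                      std::size_t available,
                      std::size_t requested,
                      std::chrono::milliseconds elapsed,
                      std::chrono::milliseconds timeout)
{
    // In both modes an expired timeout ends the wait.
    if (elapsed >= timeout)
        return true;

    if (mode == TimeoutModel::All)
        return available >= requested;

    return available > 0;
}
```

In this model, a read that requests 523 samples while only 100 are ready keeps waiting under `All` but returns immediately under `Any`.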
The builder has fields for a required common sample rate, which is disabled by default (set to -1), and for starting on a full unit of the domain (also disabled by default). These can be set with the methods setRequiredCommonSampleRate and setStartOnFullUnitOfDomain.
-
Cpp
MultiReaderBuilderPtr builder = MultiReaderBuilder();
builder.addSignal(signal1).addSignal(signal2).addInputPort(port1).addInputPort(port2);
builder.setValueReadType(daq::SampleType::Int64);
builder.setDomainReadType(daq::SampleType::Float64);
// You can also use the build function to create the reader
// auto reader = builder.build();
auto reader = MultiReaderFromBuilder(builder);
When creating a MultiReader from the same builder multiple times, developers should be cautious, especially if the builder uses input ports as input sources. An input port is bound to the first reader created with it, so attempting to create another reader with the same input port results in an exception indicating that the input port is already in use.
Full listing
The following is a self-contained file with all of the above examples of aligned reading of multiple signals. To properly illustrate the point and provide reproducibility, the data is generated manually, but the same should hold when connecting to a real device.
-
Cpp
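#include <opendaq/opendaq.h>
#include <array>
#include <cassert>
#include <chrono>
#include <cstring>
#include <sstream>
#include <string>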
using namespace daq;
struct ReadSignal
{
explicit ReadSignal(const SignalConfigPtr& signal, std::int64_t packetSize);
void sendPacket();
int packetIndex{0};
std::int64_t packetSize;
SignalConfigPtr signal;
DataDescriptorPtr valueDescriptor;
};
template <typename T, typename U>
void printData(std::int64_t samples, T& times, U& values);
SignalConfigPtr createDomainSignal(const ContextPtr& context, std::string epoch);
ReadSignal demoSignal(const ContextPtr& context, std::int64_t packetSize, const std::string& domainOrigin);
/*
* Aligns 3 signals to the same domain position and starts reading from there
*/
void exampleSimple()
{
constexpr const auto NUM_SIGNALS = 3;
auto logger = Logger();
auto context = Context(Scheduler(logger, 1), logger, nullptr);
ReadSignal sig0 = demoSignal(context, 523, "2022-09-27T00:02:03+00:00");
ReadSignal sig1 = demoSignal(context, 732, "2022-09-27T00:02:04+00:00");
ReadSignal sig2 = demoSignal(context, 843, "2022-09-27T00:02:04.123+00:00");
ListPtr<ISignal> signals{sig0.signal, sig1.signal, sig2.signal};
auto reader = MultiReader(signals);
sig0.sendPacket();
sig1.sendPacket();
sig2.sendPacket();
[[maybe_unused]]
auto available = reader.getAvailableCount(); // 0
assert(available == 0);
sig0.sendPacket();
sig1.sendPacket();
sig2.sendPacket();
sig0.sendPacket();
sig1.sendPacket();
sig2.sendPacket();
// Samples per signal
// 523 * 3 = 1569 (1.569s) need 1123 to sync
// 732 * 3 = 2196 (2.196s) need 123 to sync
// 843 * 3 = 2529 (2.529s) need 0 to sync
available = reader.getAvailableCount(); // 446
assert(available == 446);
constexpr const SizeT SAMPLES = 523;
std::array<double[SAMPLES], NUM_SIGNALS> values{};
std::array<ClockTick[SAMPLES], NUM_SIGNALS> domain{};
void* valuesPerSignal[NUM_SIGNALS]{values[0], values[1], values[2]};
void* domainPerSignal[NUM_SIGNALS]{domain[0], domain[1], domain[2]};
SizeT count = SAMPLES;
reader.readWithDomain(valuesPerSignal, domainPerSignal, &count);
// count = 446
assert(count == 446);
available = reader.getAvailableCount(); // 0
assert(available == 0);
/* Should print:
*
* Signal 0
* |d: 1123 |v: 1123.0
* |d: 1124 |v: 1124.0
* |d: 1125 |v: 1125.0
* |d: 1126 |v: 1126.0
* |d: 1127 |v: 1127.0
* --------
* Signal 1
* |d: 123 |v: 123.0
* |d: 124 |v: 124.0
* |d: 125 |v: 125.0
* |d: 126 |v: 126.0
* |d: 127 |v: 127.0
* --------
* Signal 2
* |d: 0 |v: 0.0
* |d: 1 |v: 1.0
* |d: 2 |v: 2.0
* |d: 3 |v: 3.0
* |d: 4 |v: 4.0
*/
printData(5, domain, values);
}
/*
* The same as example 1 but read domain in `std::chrono::system_clock::time_point` values
*/
void exampleWithTimeStamps()
{
constexpr const auto NUM_SIGNALS = 3;
auto logger = Logger();
auto context = Context(Scheduler(logger, 1), logger, nullptr);
ReadSignal sig0 = demoSignal(context, 523, "2022-09-27T00:02:03+00:00");
ReadSignal sig1 = demoSignal(context, 732, "2022-09-27T00:02:04+00:00");
ReadSignal sig2 = demoSignal(context, 843, "2022-09-27T00:02:04.123+00:00");
ListPtr<ISignal> signals{sig0.signal, sig1.signal, sig2.signal};
auto reader = MultiReader(signals);
TimeReader timeReader(reader);
sig0.sendPacket();
sig1.sendPacket();
sig2.sendPacket();
[[maybe_unused]]
auto available = reader.getAvailableCount(); // 0
assert(available == 0);
sig0.sendPacket();
sig1.sendPacket();
sig2.sendPacket();
sig0.sendPacket();
sig1.sendPacket();
sig2.sendPacket();
// Samples per signal
// 523 * 3 = 1569 (1.569s) need 1123 to sync
// 732 * 3 = 2196 (2.196s) need 123 to sync
// 843 * 3 = 2529 (2.529s) need 0 to sync
available = reader.getAvailableCount(); // 446
assert(available == 446);
constexpr const SizeT SAMPLES = 523;
std::array<double[SAMPLES], NUM_SIGNALS> values{};
std::array<std::chrono::system_clock::time_point[SAMPLES], NUM_SIGNALS> domain{};
void* valuesPerSignal[NUM_SIGNALS]{values[0], values[1], values[2]};
void* domainPerSignal[NUM_SIGNALS]{domain[0], domain[1], domain[2]};
SizeT count = SAMPLES;
reader.readWithDomain(valuesPerSignal, domainPerSignal, &count);
// count = 446
assert(count == 446);
available = reader.getAvailableCount(); // 0
assert(available == 0);
/* Should print:
*
* Signal 0
* |d: 2022-09-27 00:02:04.1230000 |v: 1123.0
* |d: 2022-09-27 00:02:04.1240000 |v: 1124.0
* |d: 2022-09-27 00:02:04.1250000 |v: 1125.0
* |d: 2022-09-27 00:02:04.1260000 |v: 1126.0
* |d: 2022-09-27 00:02:04.1270000 |v: 1127.0
* --------
* Signal 1
* |d: 2022-09-27 00:02:04.1230000 |v: 123.0
* |d: 2022-09-27 00:02:04.1240000 |v: 124.0
* |d: 2022-09-27 00:02:04.1250000 |v: 125.0
* |d: 2022-09-27 00:02:04.1260000 |v: 126.0
* |d: 2022-09-27 00:02:04.1270000 |v: 127.0
* --------
* Signal 2
* |d: 2022-09-27 00:02:04.1230000 |v: 0.0
* |d: 2022-09-27 00:02:04.1240000 |v: 1.0
* |d: 2022-09-27 00:02:04.1250000 |v: 2.0
* |d: 2022-09-27 00:02:04.1260000 |v: 3.0
* |d: 2022-09-27 00:02:04.1270000 |v: 4.0
*/
printData(5, domain, values);
}
void drawBoxMessage(const std::string& message);
int main(int /*argc*/, const char* /*argv*/[])
{
drawBoxMessage("Example 1");
exampleSimple();
drawBoxMessage("Example 2");
exampleWithTimeStamps();
return 0;
}
/////////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////// Utility functions ///////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////////
SignalConfigPtr createDomainSignal(const ContextPtr& context, std::string epoch)
{
daq::DataDescriptorPtr dataDescriptor = daq::DataDescriptorBuilder()
.setSampleType(SampleTypeFromType<ClockTick>::SampleType)
.setOrigin(epoch)
.setTickResolution(Ratio(1, 1000))
.setRule(LinearDataRule(1, 0))
.setUnit(daq::Unit("s", -1, "seconds", "time"))
.build();
auto domain = Signal(context, nullptr, "time");
domain.setDescriptor(dataDescriptor);
return domain;
}
ReadSignal demoSignal(const ContextPtr& context, std::int64_t packetSize, const std::string& domainOrigin)
{
static int counter = 0;
auto newSignal = Signal(context, nullptr, fmt::format("sig{}", counter++));
newSignal.setDescriptor(DataDescriptorBuilder().setSampleType(SampleType::Float64).build());
newSignal.setDomainSignal(createDomainSignal(context, domainOrigin));
return ReadSignal(newSignal, packetSize);
}
ReadSignal::ReadSignal(const SignalConfigPtr& signal, std::int64_t packetSize)
: packetSize(packetSize)
, signal(signal)
, valueDescriptor(signal.getDescriptor())
{
}
void ReadSignal::sendPacket()
{
auto domainSignal = signal.getDomainSignal();
auto domainDescriptor = domainSignal.getDescriptor();
Int delta = domainDescriptor.getRule().getParameters()["delta"];
auto offset = (packetSize * delta) * packetIndex;
auto domainPacket = daq::DataPacket(domainDescriptor, packetSize, offset);
auto packet = daq::DataPacketWithDomain(domainPacket, valueDescriptor, packetSize);
// Zero-out data
memset(packet.getData(), 0, packet.getSampleCount() * packet.getSampleMemSize());
auto* data = static_cast<double*>(packet.getData());
for (auto i = 0; i < packetSize; ++i)
{
data[i] = offset + i;
}
signal.sendPacket(packet);
packetIndex++;
}
template <typename T, typename U>
void printData(std::int64_t samples, T& times, U& values)
{
using namespace std::chrono;
using namespace reader;
int numSignals = std::size(times);
for (int sigIndex = 0; sigIndex < numSignals; ++sigIndex)
{
fmt::print("--------\n");
fmt::print("Signal {}\n", sigIndex);
for (int sampleIndex = 0; sampleIndex < samples; ++sampleIndex)
{
std::stringstream ss;
ss << times[sigIndex][sampleIndex];
fmt::print(" |d: {} |v: {}\n", ss.str(), values[sigIndex][sampleIndex]);
}
}
}
void drawBoxMessage(const std::string& message)
{
fmt::print("┌{0:─^{2}}┐\n"
"│{1: ^{2}}│\n"
"└{0:─^{2}}┘\n",
"",
message,
20);
}