Read Multiple Signals Aligned

This guide assumes that you’ve read the background section about the common behavior of Readers or gone through the Read Basic Value And Domain Data how-to guide.

For brevity, in C++ we also assume that all the code is in namespace daq or it has been imported via using namespace daq; and omit any otherwise necessary header includes until the final complete listing.

When you start reading more than one Signal at once, you immediately run into a problem. The fact that the next sample in all the signals read doesn’t represent the same point in a Domain (usually time domain). This happens because the signals can have different Origins, produce data with different Tick Resolutions representing different Units and with Data Rules taken into account at different rates (usually called sample-rate).

This is quite a complex problem, and even more if you take into account that the read Signals can completely change at any time during measurement. But the fact is also that most Signals don’t change at all or at least not radically, and almost all of them are in a time domain, so in practice, we can cover reading a large portion of Signals when we limit ourselves to a subset of allowed configurations.

With this in mind, we limit ourselves to Signals that have:

  1. A domain Signal assigned

  2. Implicit domain where:

    • Domain ticks represent time in seconds

    • Domain origin is specified in ISO-8601 format

  3. A combination of Tick Resolution and Data Rule that result in the same sample rate

    • Inter-sample offsets are ignored

  4. Values that can be converted to a common data-type

Some of the above limitations might be lifted in the future.

Using the Multi-Reader

Using the Multi Reader is largely the same as any other Reader and is basically a Stream Reader for multiple Signals at once except that with multiple Signals it performs some additional checks to keep the Signals compatible and synchronized to the same domain point.

Creating a default Multi Reader with list of signals
  • Cpp

// These calls all create the same reader
auto reader = MultiReader(signals);
auto reader = MultiReader<double, Int>(signals);
auto reader = MultiReader(signals, SampleType::Float64, SampleType::Int64);

Developer can set list of input ports instead of signals.

Creating a default Multi Reader with list of ports
  • Cpp

// These calls all create the same reader
auto reader = MultiReaderFromPort(ports);
auto reader = MultiReaderFromPort<double, Int>(ports);
auto reader = MultiReaderFromPort(ports, SampleType::Float64, SampleType::Int64);

On creation the Reader first checks the preconditions listed in [limitations] and throws an error if they aren’t met. Once constructed, the Signals are not yet synchronized. This happens on the first read call or any call where the number of samples ready needs to be determined. There the Reader tries to align all signals to the first common domain-point which serves as the new start from which the reader will provide the data.

It does this by dropping samples until the domain value is equal or greater than the chosen start one. Until there are enough samples in the queue of all signals to reach the synchronized start domain value the count parameter will be 0 even if in the background the reader actually reads as much as possible accounting for the timeout.

Aligning Signals start

The basic example we will follow uses 3 Signals with:

  • Value type of SampleType::Float64

  • Domain signal with

    • Tick resolution of 1 / 1000

    • Linear rule with

      • Delta: 1

      • Offset: 0

But different origins of:

  • "2022-09-27T00:02:03+00:00"

  • "2022-09-27T00:02:04+00:00"

  • "2022-09-27T00:02:04.123+00:00"

The third signal has the highest domain value. This makes the first signal start 1.123s or 1123 samples and second 0.123s or 123 samples after. Since the Offset is 0 and tick-resolutions are the same no additional adjustments are made.

Aligning Signals with MultiReader 1
  • Cpp

    ReadSignal sig0 = demoSignal(context, 523, "2022-09-27T00:02:03+00:00");
    ReadSignal sig1 = demoSignal(context, 732, "2022-09-27T00:02:04+00:00");
    ReadSignal sig2 = demoSignal(context, 843, "2022-09-27T00:02:04.123+00:00");

    ListPtr<ISignal> signals{sig0.signal, sig1.signal, sig2.signal};
    auto reader = MultiReader(signals);

    sig0.sendPacket();
    sig1.sendPacket();
    sig2.sendPacket();

    auto available = reader.getAvailableCount(); // 0

After the Reader construction, Signals produce Data Packets of differing sizes but not enough to align on the start domain point. So the reader reports it has 0 samples available as it dropped them on the call to check the number of samples available as they are below the start domain-point.

In the examples, a helper function demoSignal() is used to set-up a custom simulated signal with preset epoch / origin and packet size. This is not a real device signal but one with manually generated data to illustrate and support the example.

Reading synchronized data

After some time, more data packets arrive and the Reader finally has enough samples to align the start. The situation after 3 data packets for each signal is:

  • 523 * 3 = 1569 samples (1.569s)

    • need 1123 to sync

    • 1569 - 1123 = 446 remaining

  • 732 * 3 = 2196 samples (2.196s)

    • need 123 to sync

    • 2196 - 123 = 2073 remaining

  • 843 * 3 = 2529 samples (2.529s)

    • need 0 to sync

    • 2529 remaining

To issue read calls, you first need to pre-allocate buffers for the Reader to fill. The procedure is the same as with other Readers except that instead of providing a pointer to the start of the buffer, you now specify an array of per signal pointers to buffers (called a jagged array);

We request 523 samples from the reader but as it needed to align the start and drop 1123 samples from the first signal only 446 aligned samples remain which are then returned.

Aligning Signals with MultiReader 3
  • Cpp

    constexpr const auto NUM_SIGNALS = 3;

    ReadSignal sig0 = demoSignal(context, 523, "2022-09-27T00:02:03+00:00");
    ReadSignal sig1 = demoSignal(context, 732, "2022-09-27T00:02:04+00:00");
    ReadSignal sig2 = demoSignal(context, 843, "2022-09-27T00:02:04.123+00:00");

    ListPtr<ISignal> signals{sig0.signal, sig1.signal, sig2.signal};
    auto reader = MultiReader(signals);

    sig0.sendPacket();
    sig1.sendPacket();
    sig2.sendPacket();

    auto available = reader.getAvailableCount(); // 0

    sig0.sendPacket();
    sig1.sendPacket();
    sig2.sendPacket();

    sig0.sendPacket();
    sig1.sendPacket();
    sig2.sendPacket();

    // Samples per signal
    // 523 * 3 = 1569 (1.569s) need 1123 to sync
    // 732 * 3 = 2196 (2.196s) need  123 to sync
    // 843 * 3 = 2529 (2.529s) need    0 to sync

    auto available = reader.getAvailableCount(); // 446

    constexpr const SizeT SAMPLES = 523;

    std::array<double[SAMPLES], NUM_SIGNALS> values{};
    std::array<ClockTick[SAMPLES], NUM_SIGNALS> domain{};

    void* valuesPerSignal[NUM_SIGNALS]{values[0], values[1], values[2]};
    void* domainPerSignal[NUM_SIGNALS]{domain[0], domain[1], domain[2]};

    count = SAMPLES;
    reader.readWithDomain(valuesPerSignal, domainPerSignal, &count);
    // count = 446

    available = reader.getAvailableCount(); // 0

Using the Multi-Reader to read time-stamps

The Time Reader presented in Read With Absolute Time-Stamps can also be used with Multi-Reader.

Aligning Signals with MultiReader 4
  • Cpp

    constexpr const auto NUM_SIGNALS = 3;

    auto logger = Logger();
    auto context = Context(Scheduler(logger, 1), logger, nullptr);

    ReadSignal sig0 = demoSignal(context, 523, "2022-09-27T00:02:03+00:00");
    ReadSignal sig1 = demoSignal(context, 732, "2022-09-27T00:02:04+00:00");
    ReadSignal sig2 = demoSignal(context, 843, "2022-09-27T00:02:04.123+00:00");

    ListPtr<ISignal> signals{sig0.signal, sig1.signal, sig2.signal};

    auto reader = MultiReader(signals);

    //
    // To install the domain transform function to system time-stamps
    //
    TimeReader timeReader(reader);

    sig0.sendPacket();
    sig1.sendPacket();
    sig2.sendPacket();

    auto available = reader.getAvailableCount();  // 0

    sig0.sendPacket();
    sig1.sendPacket();
    sig2.sendPacket();

    sig0.sendPacket();
    sig1.sendPacket();
    sig2.sendPacket();

    // Samples per signal
    // 523 * 3 = 1569 (1.569s) need 1123 to sync
    // 732 * 3 = 2196 (2.196s) need  123 to sync
    // 843 * 3 = 2529 (2.529s) need    0 to sync

    auto available = reader.getAvailableCount();  // 446

    constexpr const SizeT SAMPLES = 523;

    std::array<double[SAMPLES], NUM_SIGNALS> values{};

    //
    // Use time-stamps as a buffer instead of the domain-type
    //
    std::array<std::chrono::system_clock::time_point[SAMPLES], NUM_SIGNALS> domain{};

    void* valuesPerSignal[NUM_SIGNALS]{values[0], values[1], values[2]};
    void* domainPerSignal[NUM_SIGNALS]{domain[0], domain[1], domain[2]};

    SizeT count{SAMPLES};
    reader.readWithDomain(valuesPerSignal, domainPerSignal, &count);

    count = SAMPLES;
    reader.readWithDomain(valuesPerSignal, domainPerSignal, &count);
    // count = 446

    available = reader.getAvailableCount();  // 0

Creating MutiReader with builder

With the builder, developers can add signals and input ports using the methods addSignal and addInputPort. For signals, the builder creates an internal input port connected to the input signal.

By default, the value type is set as daq::SampleType::Float64 and the domain type as daq::SampleType::Int64. This can be overridden with the methods setValueReadType and setDomainReadType.

The default value of the read mode is daq::ReadMode::Scaled, which can be configured using the method setReadMode to daq::ReadMode::Unscaled or daq::ReadMode::RawValue.

In Multi-Reader, developers can set the read timeout type. The default value is daq::ReadTimeoutType::All, which waits for the requested amount or until the timeout is exceeded. It can be set as daq::ReadTimeoutType::Any, meaning the timeout will wait until any available data or the timeout is reached.

The builder has fields for a common sample rate, which is disabled by default (set to -1), and for starting on the full unit of the domain (also disabled by default). These members can be overridden with the methods setRequiredCommonSampleRate and setStartOnFullUnitOfDomain.

Creating multi reader with default builder
  • Cpp

    MultiReaderBuilderPtr builder = MultiReaderBuilder();
    builder.addSignal(signal1).addSignal(signal2).addInputPort(port1).addInputPort(port2);
    builder.setValueReadType(daq::SampleType::Int64);
    builder.setDomainReadType(daq::SampleType::Float64);

    // use can use build function for creating reader as well
    // auto reader = builder.build();
    auto reader = MultiReaderFromBuilder(builder);

When creating a MultiReader from the same builder multiple times, developers should be cautious, especially if they are using input ports as input sources. This is because when creating an input port, it is bound to the first reader. Therefore, attempting to create another reader with the same input port will result in an exception, indicating that the input port is already in use.

Full listing

The following is a self-contained file with all above examples of aligning the reading multiple signals. To properly illustrate the point and provide reproducibility, the data is manually generated, but the same should hold when connecting to a real device.

Full listing
  • Cpp

#include <opendaq/opendaq.h>

using namespace daq;

struct ReadSignal
{
    explicit ReadSignal(const SignalConfigPtr& signal, std::int64_t packetSize);
    void sendPacket();

    int packetIndex{0};
    std::int64_t packetSize;

    SignalConfigPtr signal;
    DataDescriptorPtr valueDescriptor;
};

template <typename T, typename U>
void printData(std::int64_t samples, T& times, U& values);

SignalConfigPtr createDomainSignal(const ContextPtr& context, std::string epoch);
ReadSignal demoSignal(const ContextPtr& context, std::int64_t packetSize, const std::string& domainOrigin);

/*
 * Aligns 3 signals to the same domain position and starts reading from there
 */
void exampleSimple()
{
    constexpr const auto NUM_SIGNALS = 3;

    auto logger = Logger();
    auto context = Context(Scheduler(logger, 1), logger, nullptr);

    ReadSignal sig0 = demoSignal(context, 523, "2022-09-27T00:02:03+00:00");
    ReadSignal sig1 = demoSignal(context, 732, "2022-09-27T00:02:04+00:00");
    ReadSignal sig2 = demoSignal(context, 843, "2022-09-27T00:02:04.123+00:00");

    ListPtr<ISignal> signals{sig0.signal, sig1.signal, sig2.signal};
    auto reader = MultiReader(signals);

    sig0.sendPacket();
    sig1.sendPacket();
    sig2.sendPacket();

    [[maybe_unused]]
    auto available = reader.getAvailableCount(); // 0
    assert(available == 0);

    sig0.sendPacket();
    sig1.sendPacket();
    sig2.sendPacket();

    sig0.sendPacket();
    sig1.sendPacket();
    sig2.sendPacket();

    // Samples per signal
    // 523 * 3 = 1569 (1.569s) need 1123 to sync
    // 732 * 3 = 2196 (2.196s) need  123 to sync
    // 843 * 3 = 2529 (2.529s) need    0 to sync

    available = reader.getAvailableCount();  // 446
    assert(available == 446);

    constexpr const SizeT SAMPLES = 523;

    std::array<double[SAMPLES], NUM_SIGNALS> values{};
    std::array<ClockTick[SAMPLES], NUM_SIGNALS> domain{};

    void* valuesPerSignal[NUM_SIGNALS]{values[0], values[1], values[2]};
    void* domainPerSignal[NUM_SIGNALS]{domain[0], domain[1], domain[2]};

    SizeT count = SAMPLES;
    reader.readWithDomain(valuesPerSignal, domainPerSignal, &count);
    // count = 446
    assert(count == 446);

    available = reader.getAvailableCount(); // 0
    assert(available == 0);

    /* Should print:
     *
     *   Signal 0
     *    |d: 1123 |v: 1123.0
     *    |d: 1124 |v: 1124.0
     *    |d: 1125 |v: 1125.0
     *    |d: 1126 |v: 1126.0
     *    |d: 1127 |v: 1127.0
     *   --------
     *   Signal 1
     *    |d: 123 |v: 123.0
     *    |d: 124 |v: 124.0
     *    |d: 125 |v: 125.0
     *    |d: 126 |v: 126.0
     *    |d: 127 |v: 127.0
     *   --------
     *   Signal 2
     *    |d: 0 |v: 0.0
     *    |d: 1 |v: 1.0
     *    |d: 2 |v: 2.0
     *    |d: 3 |v: 3.0
     *    |d: 4 |v: 4.0
     */

    printData(5, domain, values);
}

/*
 * The same as example 1 but read domain in `std::chrono::system_clock::time_point` values
 */
void exampleWithTimeStamps()
{
    constexpr const auto NUM_SIGNALS = 3;

    auto logger = Logger();
    auto context = Context(Scheduler(logger, 1), logger, nullptr);

    ReadSignal sig0 = demoSignal(context, 523, "2022-09-27T00:02:03+00:00");
    ReadSignal sig1 = demoSignal(context, 732, "2022-09-27T00:02:04+00:00");
    ReadSignal sig2 = demoSignal(context, 843, "2022-09-27T00:02:04.123+00:00");

    ListPtr<ISignal> signals{sig0.signal, sig1.signal, sig2.signal};

    auto reader = MultiReader(signals);
    TimeReader timeReader(reader);

    sig0.sendPacket();
    sig1.sendPacket();
    sig2.sendPacket();

    [[maybe_unused]]
    auto available = reader.getAvailableCount();  // 0
    assert(available == 0);

    sig0.sendPacket();
    sig1.sendPacket();
    sig2.sendPacket();

    sig0.sendPacket();
    sig1.sendPacket();
    sig2.sendPacket();

    // Samples per signal
    // 523 * 3 = 1569 (1.569s) need 1123 to sync
    // 732 * 3 = 2196 (2.196s) need  123 to sync
    // 843 * 3 = 2529 (2.529s) need    0 to sync

    available = reader.getAvailableCount();  // 446
    assert(available == 446);

    constexpr const SizeT SAMPLES = 523;

    std::array<double[SAMPLES], NUM_SIGNALS> values{};
    std::array<std::chrono::system_clock::time_point[SAMPLES], NUM_SIGNALS> domain{};

    void* valuesPerSignal[NUM_SIGNALS]{values[0], values[1], values[2]};
    void* domainPerSignal[NUM_SIGNALS]{domain[0], domain[1], domain[2]};

    SizeT count = SAMPLES;
    reader.readWithDomain(valuesPerSignal, domainPerSignal, &count);
    // count = 446
    assert(count == 446);

    available = reader.getAvailableCount();  // 0
    assert(available == 0);

    /* Should print:
     *
     *  Signal 0
     *   |d: 2022-09-27 00:02:04.1230000 |v: 1123.0
     *   |d: 2022-09-27 00:02:04.1240000 |v: 1124.0
     *   |d: 2022-09-27 00:02:04.1250000 |v: 1125.0
     *   |d: 2022-09-27 00:02:04.1260000 |v: 1126.0
     *   |d: 2022-09-27 00:02:04.1270000 |v: 1127.0
     *  --------
     *  Signal 1
     *   |d: 2022-09-27 00:02:04.1230000 |v: 123.0
     *   |d: 2022-09-27 00:02:04.1240000 |v: 124.0
     *   |d: 2022-09-27 00:02:04.1250000 |v: 125.0
     *   |d: 2022-09-27 00:02:04.1260000 |v: 126.0
     *   |d: 2022-09-27 00:02:04.1270000 |v: 127.0
     *  --------
     *  Signal 2
     *   |d: 2022-09-27 00:02:04.1230000 |v: 0.0
     *   |d: 2022-09-27 00:02:04.1240000 |v: 1.0
     *   |d: 2022-09-27 00:02:04.1250000 |v: 2.0
     *   |d: 2022-09-27 00:02:04.1260000 |v: 3.0
     *   |d: 2022-09-27 00:02:04.1270000 |v: 4.0
     */

    printData(5, domain, values);
}

void drawBoxMessage(const std::string& message);

int main(int /*argc*/, const char* /*argv*/[])
{
    drawBoxMessage("Example 1");
    exampleSimple();

    drawBoxMessage("Example 2");
    exampleWithTimeStamps();
    return 0;
}

/////////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////// Utility functions ///////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////////

SignalConfigPtr createDomainSignal(const ContextPtr& context, std::string epoch)
{
    daq::DataDescriptorPtr dataDescriptor = daq::DataDescriptorBuilder()
                                                .setSampleType(SampleTypeFromType<ClockTick>::SampleType)
                                                .setOrigin(epoch)
                                                .setTickResolution(Ratio(1, 1000))
                                                .setRule(LinearDataRule(1, 0))
                                                .setUnit(daq::Unit("s", -1, "seconds", "time"))
                                                .build();

    auto domain = Signal(context, nullptr, "time");
    domain.setDescriptor(dataDescriptor);

    return domain;
}

ReadSignal demoSignal(const ContextPtr& context, std::int64_t packetSize, const std::string& domainOrigin)
{
    static int counter = 0;

    auto newSignal = Signal(context, nullptr, fmt::format("sig{}", counter++));
    newSignal.setDescriptor(DataDescriptorBuilder().setSampleType(SampleType::Float64).build());
    newSignal.setDomainSignal(createDomainSignal(context, domainOrigin));

    return ReadSignal(newSignal, packetSize);
}

ReadSignal::ReadSignal(const SignalConfigPtr& signal, std::int64_t packetSize)
    : packetSize(packetSize)
    , signal(signal)
    , valueDescriptor(signal.getDescriptor())
{
}

void ReadSignal::sendPacket()
{
    auto domainSignal = signal.getDomainSignal();
    auto domainDescriptor = domainSignal.getDescriptor();

    Int delta = domainDescriptor.getRule().getParameters()["delta"];

    auto offset = (packetSize * delta) * packetIndex;
    auto domainPacket = daq::DataPacket(domainDescriptor, packetSize, offset);
    auto packet = daq::DataPacketWithDomain(domainPacket, valueDescriptor, packetSize);

    // Zero-out data
    memset(packet.getData(), 0, packet.getSampleCount() * packet.getSampleMemSize());

    auto* data = static_cast<double*>(packet.getData());
    for (auto i = 0; i < packetSize; ++i)
    {
        data[i] = offset + i;
    }

    signal.sendPacket(packet);
    packetIndex++;
}

template <typename T, typename U>
void printData(std::int64_t samples, T& times, U& values)
{
    using namespace std::chrono;
    using namespace reader;

    int numSignals = std::size(times);
    for (int sigIndex = 0; sigIndex < numSignals; ++sigIndex)
    {
        fmt::print("--------\n");
        fmt::print("Signal {}\n", sigIndex);

        for (int sampleIndex = 0; sampleIndex < samples; ++sampleIndex)
        {
            std::stringstream ss;
            ss << times[sigIndex][sampleIndex];

            fmt::print(" |d: {} |v: {}\n", ss.str(), values[sigIndex][sampleIndex]);
        }
    }
}

void drawBoxMessage(const std::string& message)
{
    fmt::print("┌{0:─^{2}}┐\n"
               "│{1: ^{2}}│\n"
               "└{0:─^{2}}┘\n",
               "",
               message,
               20);
}