On a current client project, I was tasked with optimizing a very large, very slow, very CPU-bound stream-based pipeline. Before I even started to think about optimizing this pipeline, I needed an objective way to measure the execution time of each step of the pipeline.

Imagine the pipeline in question looks something like this:


// Promise-based pipeline (Node.js 15+)
const { pipeline } = require('stream/promises');

await pipeline(
    httpStream,
    decodeStream,
    parseStream,
    batchStream,
    processStream
);

We’re reading in a stream of JSON-encoded events (httpStream), making sure they’re appropriately decoded (decodeStream), JSON-parsing each incoming event (parseStream), batching events together (batchStream), and finally processing each batch of events (processStream).

Ideally I’d like to measure any or all of these individual steps.

However, many of these stream implementations are out of our hands, so we can’t simply reach in and add timing code. Thankfully, we can write a function that decorates any provided stream with a simple runtime measurement.

Let’s call our decorator function time:


const time = (stream, name) => {
    return stream;
};

Our time function accepts the stream we’ll be decorating, along with a name that describes it, and returns that same stream. Note that time assumes the provided stream implements the Readable interface.
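If you’d rather have time fail loudly than silently attach listeners that never fire, one option is to make that Readable assumption explicit. The guard below is my own sketch, not something the pipeline required. It duck-types on `on` and `read` methods (a Writable has no `read`). I chose duck typing over `instanceof Readable` because an `instanceof` check can reject streams from libraries that ship their own implementation, such as the `readable-stream` package.

```javascript
// Sketch: the pass-through decorator with the Readable assumption made explicit.
// The duck-typed check is an assumption of this sketch, not a Node.js API.
const time = (stream, name) => {
    if (!stream || typeof stream.on !== 'function' || typeof stream.read !== 'function') {
        throw new TypeError(`time(): "${name}" is not a Readable stream`);
    }
    return stream;
};
```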

What we’re trying to accomplish here is relatively simple. We want to measure the amount of time that elapses between data emission events on our stream. We can use console.time/console.timeEnd and an event listener to make short work of this task:


const time = (stream, name) => {
    let timing = false;
    stream.on('data', () => {
        if (timing) {
            console.timeEnd(name);
        }
        console.time(name);
        timing = true;
    });
    return stream;
};

Every time we receive a 'data' event on our stream, we log the duration since the last received 'data' event, and start a new timer. We’re using a timing flag to ensure that console.timeEnd isn’t called the first time we receive a 'data' event.
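To see what this version measures, here’s a variant that pushes each interval into an array instead of logging it. The name timeIntervals and the durations array are hypothetical, introduced only for illustration. In this variant, `performance.now()` from `perf_hooks` stands in for console.time. A three-chunk stream yields only two measurements, because nothing starts the clock before the first chunk arrives.

```javascript
const { performance } = require('perf_hooks');
const { Readable } = require('stream');

// Hypothetical variant of the 'data'-only time() that records the
// interval between consecutive 'data' events (in milliseconds).
const timeIntervals = (stream, durations) => {
    let last = null; // timestamp of the previous 'data' event, if any
    stream.on('data', () => {
        const now = performance.now();
        if (last !== null) {
            durations.push(now - last);
        }
        last = now;
    });
    return stream;
};

const durations = [];
timeIntervals(Readable.from(['a', 'b', 'c']), durations)
    .on('end', () => console.log(durations.length)); // 2
```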

Notice that we’re also using the provided name as the label in our console.time/console.timeEnd calls. This keeps us from getting confused when we start measuring multiple stages of our pipeline.

This solution mostly works. Unfortunately, no 'data' event is emitted when the stream starts working on its first chunk, only when that chunk is ready. As a result, we’re missing a measurement for the first chunk’s execution time. Thankfully, we can capture that missing metric by also listening for the 'resume' event. The stream emits 'resume' when it switches into flowing mode, before its first 'data' event:


const time = (stream, name) => {
    stream.on('resume', () => {
        console.time(name);
    });
    stream.on('data', () => {
        console.timeEnd(name);
        console.time(name);
    });
    return stream;
};

Notice that we no longer need to guard the console.timeEnd call in our 'data' event listener. The 'resume' event handler always calls console.time before the first 'data' event arrives, so the timing flag can go.
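With a 'resume' listener added, the same array-recording variant now produces one measurement per chunk. One caveat: 'resume' is emitted whenever a paused stream is resumed, not just the first time. A piped stream can be paused and resumed repeatedly under backpressure. With console.time, a second call for a label that’s still running only emits a warning, and the original timer keeps going. The sketch mirrors that behavior by starting the clock only once, so an interval that spans a pause includes the pause. As before, timeIntervals and durations are hypothetical names.

```javascript
const { performance } = require('perf_hooks');
const { Readable } = require('stream');

// Hypothetical array-recording variant of the 'resume' + 'data' time().
const timeIntervals = (stream, durations) => {
    let last = null;
    stream.on('resume', () => {
        // 'resume' can fire again after a pause (e.g. backpressure);
        // only the first one starts the clock.
        if (last === null) {
            last = performance.now();
        }
    });
    stream.on('data', () => {
        const now = performance.now();
        durations.push(now - last);
        last = now;
    });
    return stream;
};

const durations = [];
timeIntervals(Readable.from(['a', 'b', 'c']), durations)
    .on('end', () => console.log(durations.length)); // 3
```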

We can use our time function by decorating any or all of the stages of our pipeline:


await pipeline(
    httpStream,
    decodeStream,
    parseStream,
    time(batchStream, 'batch'),
    time(processStream, 'process')
);
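To try the decorator without an HTTP source, here’s a self-contained sketch (Node.js 15+ for `stream/promises`). Every stage is a hypothetical stand-in. `Readable.from` plays httpStream, and small object-mode Transforms mimic decoding, parsing, batching, and processing. The loop in processStream is arbitrary CPU busywork. processStream is a Transform rather than a Writable here because time listens for 'data', so the stage it decorates has to be readable. In this setup, that 'data' listener is also what drains the final stage’s output.

```javascript
const { Readable, Transform } = require('stream');
const { pipeline } = require('stream/promises');

// The time() decorator from above.
const time = (stream, name) => {
    stream.on('resume', () => {
        console.time(name);
    });
    stream.on('data', () => {
        console.timeEnd(name);
        console.time(name);
    });
    return stream;
};

// Hypothetical stand-in for httpStream: ten JSON-encoded events as Buffers.
const httpStream = Readable.from(
    Array.from({ length: 10 }, (_, id) => Buffer.from(JSON.stringify({ id })))
);

// Hypothetical decode stage: Buffer -> UTF-8 string.
const decodeStream = new Transform({
    objectMode: true,
    transform(chunk, _encoding, callback) {
        callback(null, chunk.toString('utf8'));
    },
});

// Hypothetical parse stage: JSON string -> object.
const parseStream = new Transform({
    objectMode: true,
    transform(chunk, _encoding, callback) {
        try {
            callback(null, JSON.parse(chunk));
        } catch (err) {
            callback(err);
        }
    },
});

// Hypothetical batch stage: groups events into arrays of `size`.
const makeBatchStream = (size) => {
    let batch = [];
    return new Transform({
        objectMode: true,
        transform(event, _encoding, callback) {
            batch.push(event);
            if (batch.length < size) {
                return callback();
            }
            const full = batch;
            batch = [];
            callback(null, full);
        },
        flush(callback) {
            // Emit the final partial batch, if there is one.
            callback(null, batch.length > 0 ? batch : undefined);
        },
    });
};
const batchStream = makeBatchStream(4);

// Hypothetical process stage: burns some CPU per batch and counts events.
let processed = 0;
const processStream = new Transform({
    objectMode: true,
    transform(events, _encoding, callback) {
        let checksum = 0;
        for (let i = 0; i < 5e6; i++) {
            checksum = (checksum + i) % 1000003;
        }
        processed += events.length;
        callback(null, { size: events.length, checksum });
    },
});

const done = (async () => {
    await pipeline(
        httpStream,
        decodeStream,
        parseStream,
        time(batchStream, 'batch'),
        time(processStream, 'process')
    );
    return processed;
})();

done.then((count) => console.log(`processed ${count} events`)); // processed 10 events
```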

Now that our runtime durations are finding their way to the logs, we can either use them as-is or take things a step further and aggregate them for more in-depth analysis:

...
batch: 258.213ms
process: 512.493ms
batch: 239.112ms
process: 475.293ms
...
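Here’s one way to do that aggregation: a sketch that parses `label: <number>ms` lines, as printed by console.timeEnd, and reduces them to a count, mean, and max per stage. The summarize helper is hypothetical. It also accepts an `s` suffix, because Node.js switches to seconds for longer durations. Lines in any other shape, including the `...` placeholders and the minute-scale format, are skipped. It’s fed the sample lines above.

```javascript
// Hypothetical helper: aggregates `label: <number>ms` / `label: <number>s`
// lines (as printed by console.timeEnd) into per-label statistics.
const summarize = (lines) => {
    const stats = new Map();
    for (const line of lines) {
        const match = /^(.+): (\d+(?:\.\d+)?)(ms|s)$/.exec(line.trim());
        if (!match) continue; // skip '...' and anything else
        const [, label, value, unit] = match;
        const ms = Number(value) * (unit === 's' ? 1000 : 1);
        const entry = stats.get(label) ?? { count: 0, total: 0, max: 0 };
        entry.count += 1;
        entry.total += ms;
        entry.max = Math.max(entry.max, ms);
        stats.set(label, entry);
    }
    const result = {};
    for (const [label, { count, total, max }] of stats) {
        result[label] = { count, mean: total / count, max };
    }
    return result;
};

const log = `...
batch: 258.213ms
process: 512.493ms
batch: 239.112ms
process: 475.293ms
...`;

console.log(summarize(log.split('\n')));
```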

As a warning to the reader, I’ll be the first to admit that I’m no stream expert. That said, this utility function proved invaluable to me, so I thought I’d record what I learned and pass it along for posterity.

Stream on.