Starbeamrainbowlabs

Stardust
Blog

PhD Aside: Reading a file descriptor line-by-line from multiple Node.js processes

Phew, that's a bit of a mouthful. We're taking a short break from the cluster series of posts (though those will be back next week I hope), because I've just run into a fascinating problem, the solution to which I thought I'd share here - since I didn't find a solution elsewhere on the web.

For my PhD, I've got a big old lump of data, and it all needs preprocessing before I train an AI model (or a variant thereof, since I'm effectively doing video-to-image translation). Unfortunately, one of the preprocessing steps is really slow. And because I'll naturally be training my AI for multiple epochs, the problem is multiplied.....

The solution, of course, is to do all the preprocessing up front such that I can just read the data in and push it directly into a Tensor in the right format. However, doing this on such a large dataset would take forever if I did the items 1 by 1. The thing is that Javascript isn't inherently multithreaded. I like this quote, as it describes the situation rather well:

In Javascript everything runs in parallel... except your code

--Felix Geisendörfer

In other words, when Node.js is reading or writing to and from the network, disk, or other places it can do lots of things at the same time because it does them asynchronously. The Javascript that gets executed though is only done on a single thread though.

This is great for io-bound tasks (such as a web server), as Node.js (a Javascript runtime) can handle many requests at the same time. On a side note, this is also the reason why Nginx is more efficient than Apache (because Nginx is event based too like Javascript, unlike Apache which is thread based).

It's not so great though for CPU bound tasks, such as the one I've got on my hands. All is not lost though, because Node.js has a number of useful functions inbuilt that we can use to tackle the issue.

Firstly, Node.js has a clever forking system. By using child_process.fork(), a single Node.js process can create multiple copies of itself to act as workers:

// main.js
import child_process from 'child_process';
import os from 'os';

let workers = [];

for(let i = 0; i < os.cpus().length; i++) {
    workers.push(
        child_process.fork("worker.mjs")
    );
}
// worker.js
console.log(`Hello, world from a child process!`);

Very useful! The next much more sticky problem though is how to actually preprocess the data in a performant manner. In my specific case, I'm piping the data in from a shell script that decompresses a number of gzip archives in a specific order (as of the time of typing I have yet to implement this).

Because this is a single pipe we're talking about here, the question now arises of how to allow all the child processes to access the data that's coming in from the standard input of the master process.

I've actually encountered an issue like this one before. I initially tried reading it in on the master process, and then using worker.send(message) to send it to the worker processes for processing. This didn't end up working very well, because the master process became a bottleneck as it couldn't read from the standard input and send stuff to the workers fast enough.

With this in mind, I came up with a new plan. In Node.js, when you're forking to create a worker process, you can supply it with some custom file descriptors upon initialisation. So long as it has at least IPC (inter-process communication) channel for passing messages back and forth with the .send() and .on("message", (message) => ....) method and listeners, it doesn't actually care what you do with the others.

Cue file descriptor cloning:


// main.js
import child_process from 'child_process';
import os from 'os';

let workers = [];

for(let i = 0; i 

I've highlighted the key line here (line 10 for those who can't see it). Here we tell it to clone file descriptors 0, 1, and 2 - which refer to stdin, stdout, and stderr respectively. This allows the worker processes direct access to the master process' stdin, stdout, and stderr.

With this, we can read from the same pipe with as many worker processes as we like - so long as they do so 1 at a time.

With this sorted, it gives rise to the next issue: reading line-by-line. Packages exist on npm (such as nexline, my personal favourite) to read from a stream line-by-line, but they have the unfortunate side-effect of maintaining a read buffer. While this is great for performance, it's not so great in my situation because it ends up scrambling the input! This is because said read buffer would be local to each worker process, so when the next worker along reads, it will skip a random number of bytes and start reading from the next bit along.

This means that I need to implement a custom method that reads a single line from a given file descriptor without maintaining a read buffer. I came up with this:

import fs from 'fs';

//  .....

// Global buffer to avoid unnecessary memory churn
let buffer = Buffer.alloc(4096);
function read_line_unbuffered(fd) {
    let i = 0;
    while(true) {
        let bytes_read = fs.readSync(fd, buffer, i, 1);
        if(bytes_read !== 1 || buffer[i] == 0x0A) {
            if(i == 0 && bytes_read == null) return null;
            return buffer.toString("utf-8", 0, i); // This is not inclusive, so we can abuse it to trim the \n off the end
        }

        i++;
        if(i == buffer.length) {
            let new_buffer = new Buffer(Math.ceil(buffer.length * 1.5));
            buffer.copy(new_buffer);
            buffer = new_buffer;
        }
    }
}

I read from the given file descriptor character by character directly into a buffer. As soon as it detects a new line character (\n, or character code 0x0A), it returns the new line. If we run out of space in the buffer, then we create a new larger one, copy the old buffer's contents into it, and keep going.

I maintain a global buffer here, because this helps to avoid unnecessary memory churn. In my case, the lines I'm reading in a rather long (hence the need to clone the file descriptor in the first place), and if I didn't keep a shared buffer I'd be allocating and deallocating a new pretty large buffer every time.

This also has the nice side-effect that we keep the largest buffer we've had to use so far around for next time, avoiding the need for subsequent copies to larger and larger buffers.

Finally, we can also guarantee that it won't be a problem if we call this multiple times, because as I explained above Javascript is single-threaded, so if we call the function multiple times in quick succession each read will happen 1 after another.

With this chain of Node.js features, we can read a large amount of data from and efficiently process the content of a pipe. The trick from here is to implement a proper messaging and locking system to avoid reading from the stream at the same time, and avoid write to the standard output at the same time.

Taking this further, I ended up with this:

(Licence: Mozilla Public Licence 2.0)

This correctly ensures that only 1 worker process reads from the stream at the same time. It doesn't do anything with the result though except log a message to the console, but when I implement that I'll implement a similar messaging system to ensure that only 1 process writes to the output at once.

On that note, my data is also ordered, so I'll have to implement a complicated cache system // ordering system to ensure that I write them to the standard output in the same order I read them in. When I do implement that, I'll probably blog about that too....

The main problem I still have with this solution is that I'm reading from the input stream. I haven't done any proper testing, but I'm pretty sure that doing so will be really slow. I not sure I can avoid this though and read a few KiBs at a time, because I don't currently know of any way to put the extra characters back into the input stream.

If anyone has a solution to that that increases performance, I'd love to know. Leave a comment below!

Tag Cloud

3d 3d printing account algorithms android announcement architecture archives arduino artificial intelligence artix assembly async audio automation backups bash batch blender blog bookmarklet booting bug hunting c sharp c++ challenge chrome os cluster code codepen coding conundrums coding conundrums evolved command line compilers compiling compression conference conferences containerisation css dailyprogrammer data analysis debugging defining ai demystification distributed computing dns docker documentation downtime electronics email embedded systems encryption es6 features ethics event experiment external first impressions freeside future game github github gist gitlab graphics guide hardware hardware meetup holiday holidays html html5 html5 canvas infrastructure interfaces internet interoperability io.js jabber jam javascript js bin labs latex learning library linux lora low level lua maintenance manjaro minetest network networking nibriboard node.js open source operating systems optimisation outreach own your code pepperminty wiki performance phd photos php pixelbot portable privacy problem solving programming problems project projects prolog protocol protocols pseudo 3d python reddit redis reference release releases rendering research resource review rust searching secrets security series list server software sorting source code control statistics storage svg systemquery talks technical terminal textures thoughts three thing game three.js tool tutorial twitter ubuntu university update updates upgrade version control virtual reality virtualisation visual web website windows windows 10 worldeditadditions xmpp xslt

Archive

Art by Mythdael