PhD Aside: Reading a file descriptor line-by-line from multiple Node.js processes
Phew, that's a bit of a mouthful. We're taking a short break from the cluster series of posts (though those should be back next week, I hope), because I've just run into a fascinating problem, the solution to which I thought I'd share here - since I didn't find one elsewhere on the web.
For my PhD, I've got a big old lump of data, and it all needs preprocessing before I train an AI model (or a variant thereof, since I'm effectively doing video-to-image translation). Unfortunately, one of the preprocessing steps is really slow. And because I'll naturally be training my AI for multiple epochs, the problem is multiplied.....
The solution, of course, is to do all the preprocessing up front, such that I can just read the data in and push it directly into a Tensor in the right format. However, doing this on such a large dataset would take forever if I processed the items 1 by 1. The catch is that Javascript isn't inherently multithreaded. I like this quote, as it describes the situation rather well:
In Javascript everything runs in parallel... except your code
In other words, when Node.js is reading or writing to and from the network, disk, or other places, it can do lots of things at the same time because it does them asynchronously. The Javascript that actually gets executed, though, runs on a single thread.
This is great for io-bound tasks (such as a web server), as Node.js (a Javascript runtime) can handle many requests at the same time. On a side note, this is also why Nginx generally handles large numbers of concurrent connections more efficiently than Apache (Nginx is event-based like Javascript, whereas Apache is thread-based).
It's not so great though for CPU bound tasks, such as the one I've got on my hands. All is not lost though, because Node.js has a number of useful functions inbuilt that we can use to tackle the issue.
Firstly, Node.js has a clever forking system. By using child_process.fork(), a single Node.js process can create multiple copies of itself to act as workers:
// main.js
import child_process from 'child_process';
import os from 'os';

let workers = [];

for(let i = 0; i < os.cpus().length; i++) {
	workers.push(
		child_process.fork("worker.mjs")
	);
}
// worker.mjs
console.log(`Hello, world from a child process!`);
Very useful! The next, much stickier problem is how to actually preprocess the data in a performant manner. In my specific case, I'm piping the data in from a shell script that decompresses a number of gzip archives in a specific order (as of the time of typing, I have yet to implement this).
Because this is a single pipe we're talking about here, the question now arises of how to allow all the child processes to access the data that's coming in from the standard input of the master process.
I've actually encountered an issue like this one before. I initially tried reading the data in on the master process and then using worker.send(message) to send it to the worker processes for processing. This didn't end up working very well, because the master process became a bottleneck: it couldn't read from the standard input and ship data out to the workers fast enough.
With this in mind, I came up with a new plan. In Node.js, when you fork to create a worker process, you can supply it with custom file descriptors upon initialisation. So long as it has at least an IPC (inter-process communication) channel for passing messages back and forth via the .send() method and .on("message", (message) => ....) listener, it doesn't actually care what you do with the others.
Cue file descriptor cloning:
// main.js
import child_process from 'child_process';
import os from 'os';

let workers = [];

for(let i = 0; i < os.cpus().length; i++) {
	workers.push(
		child_process.fork("worker.mjs", [], {
			stdio: [ 0, 1, 2, "ipc" ]
		})
	);
}
The key line here is line 10: the stdio option. Here we tell it to clone file descriptors 0, 1, and 2 - which refer to stdin, stdout, and stderr respectively. This allows the worker processes direct access to the master process' stdin, stdout, and stderr.
With this, we can read from the same pipe with as many worker processes as we like - so long as they do so 1 at a time.
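To illustrate what that means in practice: inside a worker, file descriptor 0 now refers to the very same pipe as the master's stdin, so the worker can read from it directly with the synchronous fs API. A quick sketch (the 64-byte read here is arbitrary):

// worker.mjs
import fs from 'fs';

let buffer = Buffer.alloc(64);
// fd 0 here is the master process' stdin, shared via the stdio option above
let bytes_read = fs.readSync(0, buffer, 0, buffer.length, null);
console.log(`Read ${bytes_read} bytes: ${buffer.toString("utf-8", 0, bytes_read)}`);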
With this sorted, the next issue arises: reading line-by-line. Packages exist on npm (such as nexline, my personal favourite) to read from a stream line-by-line, but they have the unfortunate side-effect of maintaining a read buffer. While this is great for performance, it's not so great in my situation because it ends up scrambling the input! This is because said read buffer is local to each worker process, so each worker pulls more bytes out of the shared pipe than it actually processes - meaning the next worker to read starts at an effectively random offset, usually partway through a line.
This means that I need to implement a custom method that reads a single line from a given file descriptor without maintaining a read buffer. I came up with this:
import fs from 'fs';

// .....

// Global buffer to avoid unnecessary memory churn
let buffer = Buffer.alloc(4096);

function read_line_unbuffered(fd) {
	let i = 0;
	while(true) {
		// Read exactly 1 byte from the stream's current position
		let bytes_read = fs.readSync(fd, buffer, i, 1, null);
		// Stop at the end of the stream or when we hit a newline
		if(bytes_read !== 1 || buffer[i] == 0x0A) {
			if(i == 0 && bytes_read !== 1) return null; // End of stream and nothing read
			return buffer.toString("utf-8", 0, i); // The end index isn't inclusive, so we can abuse it to trim the \n off the end
		}
		i++;
		// Out of space? Grow the buffer by 1.5x and keep going
		if(i == buffer.length) {
			let new_buffer = Buffer.alloc(Math.ceil(buffer.length * 1.5));
			buffer.copy(new_buffer);
			buffer = new_buffer;
		}
	}
}
It reads from the given file descriptor character by character, directly into a buffer. As soon as it detects a newline character (\n, or character code 0x0A), it returns everything read so far as a string. If we run out of space in the buffer, then we create a new, larger one, copy the old buffer's contents into it, and keep going.
I maintain a global buffer here, because this helps to avoid unnecessary memory churn. In my case, the lines I'm reading in are rather long (hence the need to clone the file descriptor in the first place), and if I didn't keep a shared buffer I'd be allocating and deallocating a new, fairly large buffer on every call.
This also has the nice side-effect that we keep the largest buffer we've had to use so far around for next time, avoiding the need for subsequent copies to larger and larger buffers.
Finally, we can also guarantee that calling this function multiple times within a single process won't be a problem, because as I explained above Javascript is single-threaded, so if we call the function multiple times in quick succession each read will happen one after another.
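In a worker, usage then looks something like this (process_line here is a stand-in for whatever preprocessing actually needs doing):

// Keep pulling lines from the shared stdin pipe (fd 0) until the stream ends
let line;
while((line = read_line_unbuffered(0)) !== null) {
	process_line(line); // Placeholder for the actual preprocessing step
}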
With this chain of Node.js features, we can read a large amount of data from a pipe and process its contents efficiently. The trick from here is to implement a proper messaging and locking system, so that no 2 workers read from the stream at the same time - and likewise so that no 2 workers write to the standard output at the same time.
Taking this further, I ended up with this:
(Licence: Mozilla Public Licence 2.0)
This correctly ensures that only 1 worker process reads from the stream at a time. It doesn't do anything with the result yet except log a message to the console, but when I implement that part I'll add a similar messaging system to ensure that only 1 process writes to the output at once.
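The core of it is a read token passed back and forth over the IPC channel: a worker asks the master for permission to read, the master grants it to 1 worker at a time, and the worker hands the token back once it has read its line. A condensed sketch of that protocol (not the full listing above - the message names here are just illustrative):

// In the master process: serialise access to the shared stdin pipe
let queue = [], locked = false;
function grant_next() {
	if(locked || queue.length === 0) return;
	locked = true;
	queue.shift().send({ type: "read-granted" });
}
for(let worker of workers) {
	worker.on("message", (message) => {
		if(message.type === "read-request") { queue.push(worker); grant_next(); }
		if(message.type === "read-done") { locked = false; grant_next(); }
	});
}

// In the worker process: ask before reading, release as soon as the read is done
process.on("message", (message) => {
	if(message.type !== "read-granted") return;
	let line = read_line_unbuffered(0);
	process.send({ type: "read-done" });
	if(line !== null) {
		console.log(`Got a line ${line.length} characters long`);
		process.send({ type: "read-request" }); // Ask for the next one
	}
});
process.send({ type: "read-request" });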
On that note, my data is also ordered, so I'll have to implement a cache / ordering system to ensure that I write the results to the standard output in the same order I read them in. When I do implement that, I'll probably blog about it too....
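For what it's worth, the usual shape of such a system is a reorder buffer: tag each line with its index as it's read, cache results that come back early, and only write to the output once the next expected index is available. A rough sketch of the idea (none of this is implemented yet - the names are placeholders):

// In the master process: hold out-of-order results until they can be written
let pending = new Map(), next_to_write = 0;
function submit_result(index, result) {
	pending.set(index, result);
	// Flush every consecutive result we now have, in order
	while(pending.has(next_to_write)) {
		process.stdout.write(pending.get(next_to_write));
		pending.delete(next_to_write);
		next_to_write++;
	}
}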
The main problem I still have with this solution is that I'm reading from the input stream 1 byte at a time. I haven't done any proper testing, but I'm pretty sure that doing so will be really slow. I'm not sure I can avoid this and read a few KiB at a time though, because I don't currently know of any way to put the extra characters back into the input stream.
If anyone has a solution to that that increases performance, I'd love to know. Leave a comment below!