Thanks for the write-up! In my current application I have a few scenarios that are a bit different from yours but still require processing aggregated data in order:
1. Reading from several files where each file has lines with a unique identifier I can use to process in order: I open all the files, read the first line of each, and seed a min-heap with those entries (each heap cell holds the open file descriptor for its file). I then repeatedly pop the lowest entry, process it, read the next line from the same file, and push that back onto the min-heap (see the merge sketch after this list).
2. Aggregating across goroutines that service data generators with different latencies and throughputs. Each generator gets one goroutine that interfaces with it; I consider these “producers”. A global atomic integer cheaply assigns a unique, increasing index to incoming messages, which can then be serviced with a min-heap as above. There are some considerations around dropping messages that arrive too late, so an alternative for some cases is to key the min-heap on received time and process only entries older than time.Now() minus a buffering window, allowing more time for things to settle before anything is dropped (trading total latency for this; see the time-keyed sketch after this list).
3. Similar to the above, but ingestion throughput matters more, and repeated processing happens in order without any requirement that every message has been processed on each pass; they just have to come out in order (this is the backing for a log viewer). In this case I slab-allocate and dump whatever I receive with no ordering concerns, but I also keep a btree of the indexes, which I iterate over when it's time to process (see the btree sketch below). I originally buffered like (2) to keep insertions into the slabs themselves mostly ordered (and simply iterated over them), but if a stall happened in a goroutine, shifting the slab's items over when the old items finally arrived became very expensive and could spiral badly.
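Here's roughly what (1) looks like in code. This is a simplified sketch: I'm assuming each line starts with an integer identifier followed by a space, and names like fileEntry and mergeFiles are made up for illustration.

    package main

    import (
        "bufio"
        "container/heap"
        "fmt"
        "os"
        "strconv"
        "strings"
    )

    // fileEntry holds the current line's identifier plus the scanner for
    // the file it came from, so the heap can refill from the same file.
    type fileEntry struct {
        id   int
        line string
        scan *bufio.Scanner
    }

    type entryHeap []*fileEntry

    func (h entryHeap) Len() int           { return len(h) }
    func (h entryHeap) Less(i, j int) bool { return h[i].id < h[j].id }
    func (h entryHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
    func (h *entryHeap) Push(x any)        { *h = append(*h, x.(*fileEntry)) }
    func (h *entryHeap) Pop() any {
        old := *h
        e := old[len(old)-1]
        *h = old[:len(old)-1]
        return e
    }

    // parseID reads the integer identifier that prefixes each line
    // (an assumption about the line format, for illustration).
    func parseID(line string) (int, bool) {
        idStr, _, _ := strings.Cut(line, " ")
        id, err := strconv.Atoi(idStr)
        return id, err == nil
    }

    func mergeFiles(paths []string, process func(string)) error {
        h := &entryHeap{}
        for _, p := range paths {
            f, err := os.Open(p)
            if err != nil {
                return err
            }
            defer f.Close()
            s := bufio.NewScanner(f)
            // Seed the heap with the first line of each file.
            if s.Scan() {
                if id, ok := parseID(s.Text()); ok {
                    heap.Push(h, &fileEntry{id: id, line: s.Text(), scan: s})
                }
            }
        }
        for h.Len() > 0 {
            // Pop the globally lowest identifier, process it, then
            // refill the heap from the file it came from.
            e := heap.Pop(h).(*fileEntry)
            process(e.line)
            if e.scan.Scan() {
                if id, ok := parseID(e.scan.Text()); ok {
                    heap.Push(h, &fileEntry{id: id, line: e.scan.Text(), scan: e.scan})
                }
            }
        }
        return nil
    }

    func main() {
        if err := mergeFiles(os.Args[1:], func(l string) { fmt.Println(l) }); err != nil {
            fmt.Fprintln(os.Stderr, err)
        }
    }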
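And a sketch of the time-keyed variant in (2). The window constant and names here are made up; the real producers stamp the message when they receive it from their generator, so entries can land in the central heap out of order, and Drain gives slower producers time to catch up.

    package ordering

    import (
        "container/heap"
        "sync"
        "time"
    )

    // bufferWindow is how long we wait for stragglers before processing
    // (an illustrative value, not a recommendation).
    const bufferWindow = 200 * time.Millisecond

    type msg struct {
        received time.Time
        payload  string
    }

    type timedHeap []msg

    func (h timedHeap) Len() int           { return len(h) }
    func (h timedHeap) Less(i, j int) bool { return h[i].received.Before(h[j].received) }
    func (h timedHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
    func (h *timedHeap) Push(x any)        { *h = append(*h, x.(msg)) }
    func (h *timedHeap) Pop() any {
        old := *h
        m := old[len(old)-1]
        *h = old[:len(old)-1]
        return m
    }

    type buffer struct {
        mu sync.Mutex
        h  timedHeap
    }

    // Add is called by each producer goroutine; received is the time the
    // producer stamped on the message when it got it from its generator.
    func (b *buffer) Add(received time.Time, payload string) {
        b.mu.Lock()
        defer b.mu.Unlock()
        heap.Push(&b.h, msg{received: received, payload: payload})
    }

    // Drain processes, in receive order, only entries stamped earlier
    // than time.Now() minus the buffering window, giving slower producers
    // time for their earlier messages to arrive (total latency is traded
    // for this).
    func (b *buffer) Drain(process func(msg)) {
        cutoff := time.Now().Add(-bufferWindow)
        b.mu.Lock()
        defer b.mu.Unlock()
        for b.h.Len() > 0 && b.h[0].received.Before(cutoff) {
            process(heap.Pop(&b.h).(msg))
        }
    }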
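For (3), the ingest/scan split looks roughly like this. I'm using github.com/google/btree as a stand-in for the index, and the slab is simplified to an append-only slice of strings; LogStore and its methods are placeholder names.

    package logstore

    import (
        "sync"

        "github.com/google/btree"
    )

    // slot keys the btree on the message's global index and records
    // where the payload landed in the slab.
    type slot struct {
        index uint64
        pos   int
    }

    func (a slot) Less(b btree.Item) bool { return a.index < b.(slot).index }

    type LogStore struct {
        mu   sync.Mutex
        slab []string     // append-only, arrival order, never shifted
        idx  *btree.BTree // ordered view over the slab
    }

    func New() *LogStore {
        return &LogStore{idx: btree.New(32)}
    }

    // Ingest is cheap regardless of arrival order: one append plus one
    // btree insert, never a shift of existing slab entries.
    func (s *LogStore) Ingest(index uint64, payload string) {
        s.mu.Lock()
        defer s.mu.Unlock()
        s.slab = append(s.slab, payload)
        s.idx.ReplaceOrInsert(slot{index: index, pos: len(s.slab) - 1})
    }

    // Scan walks the index in order when it's time to render, resolving
    // each entry back into the slab; not every message needs to have
    // arrived, they just come out in index order.
    func (s *LogStore) Scan(process func(string)) {
        s.mu.Lock()
        defer s.mu.Unlock()
        s.idx.Ascend(func(i btree.Item) bool {
            process(s.slab[i.(slot).pos])
            return true
        })
    }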
Hi everyone, I’m the author of the article. Happy to answer any questions or discuss concurrency patterns in Go. Curious how others tackle such problems.
Another scenario where order matters is in Temporal workflows. Temporal’s replay capability requires deterministic execution.
For something like this, I would instinctively reach for an external queue mechanism instead of trying to work through the complexity of golang's concurrency.
Create a bunch of sequentially numbered jobs that write their output into a Postgres database, then have N workers process the jobs. Something like GCP's Cloud Tasks is perfect for this because the "workers" are just GCP Cloud Functions, so you can have a near-infinite number of them (limited by concurrent DB connections).
This approach also buys you durability of the queue for free (i.e., what happens when you need to stop your golang process mid-queue?).
Then it is just a query:
select * from finished_jobs order by job_num;
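A sketch of the worker and read sides under that scheme. The finished_jobs(job_num, output) table, the process function, and the loop standing in for Cloud Tasks dispatch are all assumptions for illustration:

    package main

    import (
        "database/sql"
        "fmt"

        _ "github.com/lib/pq"
    )

    // process stands in for the real work a job performs.
    func process(jobNum int) string { return fmt.Sprintf("result-%d", jobNum) }

    // runJob is the body of one worker: do the work, write the output
    // keyed by the job's sequence number. Workers can run in any order
    // and at any concurrency; ordering is recovered at read time.
    func runJob(db *sql.DB, jobNum int) error {
        _, err := db.Exec(
            `INSERT INTO finished_jobs (job_num, output) VALUES ($1, $2)`,
            jobNum, process(jobNum),
        )
        return err
    }

    // readInOrder lets the database do the ordering.
    func readInOrder(db *sql.DB, handle func(int, string)) error {
        rows, err := db.Query(`SELECT job_num, output FROM finished_jobs ORDER BY job_num`)
        if err != nil {
            return err
        }
        defer rows.Close()
        for rows.Next() {
            var n int
            var out string
            if err := rows.Scan(&n, &out); err != nil {
                return err
            }
            handle(n, out)
        }
        return rows.Err()
    }

    func main() {
        db, err := sql.Open("postgres", "postgres://localhost/jobs?sslmode=disable")
        if err != nil {
            panic(err)
        }
        defer db.Close()
        for i := 0; i < 10; i++ { // stand-in for dispatching jobs to Cloud Tasks
            if err := runJob(db, i); err != nil {
                panic(err)
            }
        }
        _ = readInOrder(db, func(n int, out string) { fmt.Println(n, out) })
    }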
Personally, I've come to really hate channels in Go. They are a source of some seriously heinous deadlock bugs that are really hard to debug, and closing channels in the wrong spot can crash your entire app. I try using plain locks until it hurts before I reach for channels these days.
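For example, this is all it takes to bring a whole program down; a send on a closed channel panics, and an unrecovered panic kills the process:

    package main

    func main() {
        ch := make(chan int)
        close(ch)
        ch <- 1 // panic: send on closed channel
    }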
UPD.
I've just made a small but important clarification to the article: while in many cases it's easier, and even preferred, to calculate all results, accumulate them somewhere, and then sort, this article focuses on memory-bound algorithms that support infinite streams and backpressure.