
tetraodonpuffer yesterday at 2:48 PM

Thanks for the write-up! In my current application I have a few scenarios that differ a bit from yours but still require processing aggregated data in order:

1. Reading from various files where each file has lines with a unique identifier I can use to process them in order: I open all the files and build a min-heap from the first line of each, then process by repeatedly popping the lowest entry. After consuming a line from a file, I read that file's next line and push it onto the min-heap (each heap cell holds the open file descriptor for its file).

2. Aggregating across goroutines that service data generators with different latencies and throughputs. Each generator gets its own goroutine that interfaces with it, and I treat these goroutines as “producers”. Using a global atomic integer I can quickly assign a unique, increasing index to incoming messages, which can then be serviced with a min-heap as above. There are some considerations about dropping messages that are too old, so an alternative approach for some cases is to key the min-heap on received time and process only up to time.Now() minus some buffering time, which gives things more time to settle before anything is dropped (trading total latency for this).

3. Similar to the above, I have another scenario where ingestion throughput is more important. Processing happens repeatedly and in order, but there is no requirement that all messages have been processed on every pass, just that they are processed in order (this is the backing for a log viewer). In this case I just slab-allocate and dump what I receive without ordering concerns, but I also keep a btree of the indexes that I iterate over when it’s time to process. I originally used buffering like in (2) to guarantee mostly ordered insertions in the slabs themselves (which I simply iterated over), but if a goroutine stalled, shifting the items in the slab when the old items finally arrived became very expensive and could spiral badly.
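
As an illustration of scenario (1), a k-way merge over a min-heap might look roughly like this in Go. This is only a minimal sketch under assumptions not stated above: each source is already sorted by ID, each line starts with an integer ID followed by a space, and the names `entry`, `mergeByID`, and `mergeStrings` are hypothetical. It reads from `io.Reader` values rather than open files so that it runs standalone.

```go
package main

import (
	"bufio"
	"container/heap"
	"fmt"
	"io"
	"strconv"
	"strings"
)

// entry is one heap cell: the current line of a source, its parsed ID,
// and the scanner used to fetch that source's next line.
type entry struct {
	id   int64
	line string
	src  *bufio.Scanner
}

// minHeap orders entries by ascending ID (implements heap.Interface).
type minHeap []entry

func (h minHeap) Len() int           { return len(h) }
func (h minHeap) Less(i, j int) bool { return h[i].id < h[j].id }
func (h minHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *minHeap) Push(x any)        { *h = append(*h, x.(entry)) }
func (h *minHeap) Pop() any {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

// next reads one line from src and parses its leading ID.
// Assumed line format: "<integer id> <payload>".
func next(src *bufio.Scanner) (entry, bool, error) {
	if !src.Scan() {
		return entry{}, false, src.Err()
	}
	line := src.Text()
	field, _, _ := strings.Cut(line, " ")
	id, err := strconv.ParseInt(field, 10, 64)
	if err != nil {
		return entry{}, false, err
	}
	return entry{id: id, line: line, src: src}, true, nil
}

// mergeByID calls emit for every line of every source in ascending ID order,
// assuming each individual source is already sorted by ID.
func mergeByID(sources []io.Reader, emit func(line string)) error {
	h := &minHeap{}
	// Seed the heap with the first line of each source.
	for _, r := range sources {
		e, ok, err := next(bufio.NewScanner(r))
		if err != nil {
			return err
		}
		if ok {
			heap.Push(h, e)
		}
	}
	// Pop the lowest ID, then refill from the source it came from.
	for h.Len() > 0 {
		e := heap.Pop(h).(entry)
		emit(e.line)
		refill, ok, err := next(e.src)
		if err != nil {
			return err
		}
		if ok {
			heap.Push(h, refill)
		}
	}
	return nil
}

// mergeStrings is a demo helper that treats each string as one "file".
func mergeStrings(inputs ...string) ([]string, error) {
	readers := make([]io.Reader, len(inputs))
	for i, s := range inputs {
		readers[i] = strings.NewReader(s)
	}
	var out []string
	err := mergeByID(readers, func(l string) { out = append(out, l) })
	return out, err
}

func main() {
	out, err := mergeStrings("1 a\n4 d\n", "2 b\n3 c\n5 e\n")
	if err != nil {
		panic(err)
	}
	fmt.Println(strings.Join(out, ", ")) // prints: 1 a, 2 b, 3 c, 4 d, 5 e
}
```

The heap never holds more than one entry per source, so memory stays proportional to the number of files rather than their total size.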


Replies

destel yesterday at 4:55 PM

Wow, that’s some seriously sophisticated stuff - it’s not that often you see a heap used in typical production code (outside of libraries)!

Your first example definitely gives me merge-sort vibes - a really clean way to keep things ordered across multiple sources. The second and third scenarios are a bit beyond what I’ve tackled so far, but super interesting to read about.

This also reminded me of a WIP PR I drafted for rill (probably too niche, so I’m not sure I’ll ever merge it). It implements a channel buffer that behaves like a heap - basically a fixed-size priority queue where re-prioritization only happens for items that pile up due to backpressure. Maybe some of that code could be useful for your future use cases: https://github.com/destel/rill/pull/50
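
To make the idea concrete, here is a rough Go sketch of a heap-backed channel buffer. It is not the code from the PR and does not use rill's API; `priorityBuffer` and `intHeap` are hypothetical names, and the semantics (smallest value first, fixed capacity, plain `int` items) are assumptions for illustration only.

```go
package main

import (
	"container/heap"
	"fmt"
	"time"
)

// intHeap is a min-heap of ints (implements heap.Interface).
type intHeap []int

func (h intHeap) Len() int           { return len(h) }
func (h intHeap) Less(i, j int) bool { return h[i] < h[j] }
func (h intHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *intHeap) Push(x any)        { *h = append(*h, x.(int)) }
func (h *intHeap) Pop() any {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

// priorityBuffer forwards values from in to the returned channel through a
// heap holding at most size items. Reordering only affects items that are
// waiting in the buffer because the consumer is slower than the producer.
func priorityBuffer(in <-chan int, size int) <-chan int {
	if size < 1 {
		size = 1
	}
	out := make(chan int)
	go func() {
		defer close(out)
		h := &intHeap{}
		for in != nil || h.Len() > 0 {
			// A nil channel never becomes ready in select, so setting
			// recv or send to nil disables that case for this iteration.
			recv := in
			if h.Len() >= size {
				recv = nil // buffer full: apply backpressure upstream
			}
			var send chan<- int
			var smallest int
			if h.Len() > 0 {
				send = out
				smallest = (*h)[0] // peek at the current minimum
			}
			select {
			case v, ok := <-recv:
				if !ok {
					in = nil // input closed: drain what is left
					continue
				}
				heap.Push(h, v)
			case send <- smallest:
				heap.Pop(h)
			}
		}
	}()
	return out
}

func main() {
	in := make(chan int, 5)
	for _, v := range []int{5, 3, 4, 1, 2} {
		in <- v
	}
	close(in)

	out := priorityBuffer(in, 5)
	time.Sleep(50 * time.Millisecond) // simulate a slow consumer so items pile up
	for v := range out {
		fmt.Println(v)
	}
}
```

With a consumer that keeps up, items pass through nearly in arrival order; when it falls behind, whatever has piled up in the buffer is released smallest first.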
