4/25/2025 at 1:42:09 PM
Agreed. The head-of-line problem is worth solving for certain use cases. But today, all streaming systems (or workarounds) with per-message-key acknowledgements incur O(n^2) costs in computation, bandwidth, or storage per n messages. This applies to Pulsar, for example, which is often used for this feature.
Now, this degenerate time/space complexity might not show up every day, but when it does, you're toast and you have to wait it out.
My colleagues and I have studied this problem in depth for years, and our conclusion is that a fundamental architectural change is needed to support scalable per-message-key acknowledgements. Furthermore, the architecture will fundamentally require a sorted index, meaning that any such queuing/streaming system will process n messages in O(n log n).
We've wanted to blog about this for a while, but never found the time. I hope this comment helps if you're thinking of relying on per-message-key acknowledgements: you should expect sporadic outages/delays.
by galeaspablo
4/25/2025 at 3:06:10 PM
Check out the parallel consumer: https://github.com/confluentinc/parallel-consumer
It processes unrelated keys in parallel within a partition. It has to track which offsets have been processed between the partition's last committed offset and the tip (i.e., only what's currently being processed out of order). When it commits, it saves this state in the commit metadata, highly compressed.
Most of the time it was only processing a small number of records out of order, so this bookkeeping was insignificant, but if one key got stuck, it would scale to at least 100,000 offsets ahead, at which point enough alarms would go off that we would do something. That's definitely a huge improvement over head-of-line blocking.
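Roughly, the bookkeeping is just a committed watermark plus the set of offsets completed above it (a sketch of the idea only, not the library's actual code; the names here are made up):

    # Sketch of per-partition out-of-order offset tracking (illustrative only).
    class OffsetTracker:
        def __init__(self, committed_offset: int = -1):
            self.committed = committed_offset   # highest contiguous acked offset
            self.done_above = set()             # offsets acked out of order

        def ack(self, offset: int) -> None:
            """Mark one offset processed, advancing the watermark if possible."""
            self.done_above.add(offset)
            while self.committed + 1 in self.done_above:
                self.committed += 1
                self.done_above.remove(self.committed)

        def commit_state(self):
            """What gets persisted: the watermark plus the (compressible) gap set."""
            return self.committed, sorted(self.done_above)

    tracker = OffsetTracker()
    for off in [0, 2, 3, 5]:        # offset 1 is stuck behind a slow key
        tracker.ack(off)
    print(tracker.commit_state())   # (0, [2, 3, 5]) -- everything past the stuck key is extra state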
by singron
4/25/2025 at 4:31:18 PM
Disclosure (given this is from Confluent): I'm ex-MSK (Managed Streaming for Kafka at AWS), and my current company was competing with Confluent before we pivoted.
Yup, this is one more example, just like Pulsar. There are definitely great optimizations to be made for the average case. In the case of the parallel consumer, if you'd like to keep ordering guarantees, you retain O(n^2) processing time in the worst case.
The issues arise when you try to traverse arbitrary dependency topologies in your messages. So you're left with two options:
1. Make damn sure that causal dependencies don't exhibit O(n^2) behavior, which requires formal models to be 100% sure.
2. Give up ordering or make some other nasty tradeoff.
At a high level the problem boils down to traversing a DAG in topological order. From computer science theory, we know that this requires a sorted index. And if you're implementing an index on top of Kafka, you might as well embed your data into and consume directly from the index. Of course, this is easier said than done, and that's why no one has cracked this problem yet. We were going to try, but alas we pivoted :)
Edit: Topological sort does not require a sorted index (or similar) if you don't care about concurrency. But then you've lost the advantages of your queue.
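To make "traverse a DAG in topological order, concurrently" concrete, here's a toy sketch (my own illustration, not any broker's implementation) of consuming messages wave by wave once the dependency edges are available:

    # Kahn-style traversal: each wave contains messages whose causal
    # dependencies are all done, so everything in a wave can run in parallel.
    from collections import defaultdict, deque

    deps = {                       # message -> messages it causally depends on
        "A2": ["A1"], "A3": ["A2"],
        "B1": ["A1"], "B2": ["B1"],
        "C1": ["B1"], "C2": ["C1"],
    }
    indegree = defaultdict(int)
    children = defaultdict(list)
    nodes = set(deps) | {d for ds in deps.values() for d in ds}
    for node, parents in deps.items():
        indegree[node] = len(parents)
        for p in parents:
            children[p].append(node)

    ready = deque(n for n in nodes if indegree[n] == 0)   # the frontier
    while ready:
        wave = list(ready)         # safe to process concurrently
        ready.clear()
        print("process concurrently:", sorted(wave))
        for msg in wave:
            for child in children[msg]:
                indegree[child] -= 1
                if indegree[child] == 0:
                    ready.append(child)

Each message is visited once, so the traversal itself is linear in messages and edges; the hard part is having the broker store and serve those dependency edges at scale, which is where the index comes in.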
by galeaspablo
4/25/2025 at 5:22:30 PM
> traverse arbitrary dependency topologies
Is there another way to state this? It's very difficult for me to grok.
> DAG
Directed acyclic graph, right?
by taurath
4/25/2025 at 5:59:34 PM
Apologies, we've been so deep into this problem that we take our slang for granted :)
A graphical representation might be worth a thousand words, keeping in mind it's just one example. Imagine you're traversing the following.
A1 -> A2 -> A3...
|
v
B1 -> B2 -> B3...
|
v
C1 -> C2 -> C3...
|
v
D1 -> D2 -> D3...
|
v
E1 -> E2 -> E3...
|
v
F1 -> F2 -> F3...
|
v
...
Efficient concurrent consumption of these messages (while respecting causal dependencies) would take O(w + h), where w = the _width_ (left to right) of the longest sequence and h = the _height_ (top to bottom) of the first column.
But Pulsar, Kafka + parallel consumer, et al. would take O(n^2) in either processing time or space complexity. This is because, at a fundamental level, the underlying data storage looks like this:
A1 -> A2 -> A3...
B1 -> B2 -> B3...
C1 -> C2 -> C3...
D1 -> D2 -> D3...
E1 -> E2 -> E3...
F1 -> F2 -> F3...
Notice that the underlying data storage loses the information about nodes with multiple children (e.g., A1 previously parented both A2 and B1).
If we want to respect order, the consumer is responsible for declining to process messages that would violate causal order, e.g., attempting to process F1 before E1. Thus we could get into a situation where we try to process F1, then E1, then D1, then C1, then B1, then A1. Now that A1 is processed, Kafka tries again, but it tries F1, then E1, then D1, then C1, then B1... and so on and so forth. This is O(n^2) behavior.
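To see the quadratic behavior concretely, here's a toy simulation of that retry pattern (my own sketch, not Kafka code):

    # h "column heads" F1..A1 are delivered in the wrong causal order, and the
    # consumer declines anything whose parent isn't processed yet. Count how
    # many delivery attempts it takes before everything is processed.
    heads = ["F1", "E1", "D1", "C1", "B1", "A1"]    # A1 must go first, F1 last
    parent = {heads[i]: heads[i + 1] for i in range(len(heads) - 1)}

    done = set()
    attempts = 0
    while len(done) < len(heads):
        for msg in heads:                # redelivery always starts from the front
            if msg in done:
                continue
            attempts += 1
            if parent.get(msg) is None or parent[msg] in done:
                done.add(msg)            # causal dependency satisfied, process it

    print(attempts)   # 21 attempts for 6 messages, i.e. roughly h*(h+1)/2 = O(h^2)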
Without changing the underlying data storage architecture, you will do one of the following:
1. Incur O(n^2) space or time complexity.
2. Reimplement the queuing mechanism at the consumer level, but then you might as well not use Kafka (or others) at all. In practice this isn't viable (my evidence being that no one has pulled it off).
3. Face other nasty issues (e.g., with the Kafka parallel consumer you can run out of memory, or your processing time can become O(n^2)).
by galeaspablo
4/25/2025 at 7:03:04 PM
Do you have an example use case for this? This does seem like something unsuited to Kafka, but I'm having a hard time imagining why you would structure something like this.
by singron
4/26/2025 at 12:05:42 AM
Great follow-up question, thank you. I could talk about this "topic" for days, so I appreciate the opportunity to expand. :)
Let's imagine ourselves as a couple of engineers at Acme Foreign Exchange House. We'd like to track Acme's net cash position across multiple currencies and execute trades accordingly (e.g., hedging). And we'd like to retrospectively analyze our hedges to assess their effectiveness.
Let's say I have this set of transactions (for accounts A, B, C, D, E, F, etc.)
A1 -> A2 -> A3 -> A4
B1 -> B2 -> B3 -> B4
C1 -> C2
D1 -> D2 -> D3 -> D4
E1 -> E2
F1
Let's say that:
- E1 was a deposit made into account E for $2M USD.
- E2 was an outgoing transfer of $2M USD sent to account F (incoming £1.7M GBP at F1).
If we consume our transactions and partition our consumption by account id, we could get into a state where E1 and F1 are reflected in our net position, but E2 isn't. That is, our calculation has both $2M USD and £1.7M GBP, when in reality we only ever held either $2M USD or £1.7M GBP.
So what could we do?
1. Make sure that we respect causal order. I.e., there's no F1 reflected in our net position if we haven't processed E2.
2. Make sure that pairs of transactions (e.g., E2 and F1) update our net position atomically.
This is otherwise known as a "consistent cut" (see slide 25 here https://www.cs.cornell.edu/courses/cs6410/2011fa/lectures/19...).
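As a toy illustration of option 1 (a hypothetical data model, not tied to any particular system), the consumer simply refuses to apply an effect before its cause, so every observed state is a consistent cut:

    # Apply a transaction to the net position only once everything it causally
    # depends on has been applied; otherwise buffer it and retry later.
    transactions = [
        {"id": "E1", "deps": [],     "delta": {"USD": +2_000_000}},
        {"id": "F1", "deps": ["E2"], "delta": {"GBP": +1_700_000}},   # arrives early
        {"id": "E2", "deps": ["E1"], "delta": {"USD": -2_000_000}},
    ]

    position = {}       # currency -> amount
    applied = set()
    buffered = []

    def try_apply(tx):
        if all(dep in applied for dep in tx["deps"]):
            for ccy, amount in tx["delta"].items():
                position[ccy] = position.get(ccy, 0) + amount
            applied.add(tx["id"])
            return True
        return False

    for tx in transactions:
        if not try_apply(tx):
            buffered.append(tx)
        buffered = [b for b in buffered if not try_apply(b)]   # retry anything unblocked

    print(position)   # {'USD': 0, 'GBP': 1700000} -- never both $2M USD and £1.7M GBP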
Opinion: the world is causally ordered in arbitrary ways as above. But the tools, frameworks, and infrastructure more readily available to us struggle at modeling arbitrary partially ordered causality graphs. So we shrug our shoulders, and we learn to live with the edge cases. But it doesn't have to be so.
by galeaspablo
4/25/2025 at 4:34:42 PM
I suppose it depends on your message volume. To me, processing 100k messages and then getting a page however long later as the broker (or whatever) falls apart sounds much worse than head-of-line blocking and seeing the problem directly in my consumer. If I need to avoid head-of-line blocking, I can build whatever failsafe mechanisms I need for the problematic data and defer to some other queueing system (typically, just add an attempt counter and replay the message to the same Kafka topic, and then if attempts > X, send it off to wherever).
I'd rather debug a worker problem than an infra scaling problem every day of the week and twice on Sundays.
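That pattern is roughly the following (a sketch with hypothetical topic names and stand-in produce/handle functions, not any specific Kafka client's API):

    MAX_ATTEMPTS = 5

    def process(message, produce, handle):
        """message is a dict with 'value' and 'attempts'; produce(topic, msg)
        and handle(value) stand in for the real client and the business logic."""
        try:
            handle(message["value"])
        except Exception:
            attempts = message.get("attempts", 0) + 1
            retried = dict(message, attempts=attempts)
            if attempts > MAX_ATTEMPTS:
                produce("orders.dead-letter", retried)   # hypothetical DLQ topic
            else:
                produce("orders", retried)               # replay to the same topic

    # Tiny demo with a handler that always fails:
    def always_fails(value):
        raise RuntimeError(f"cannot process {value}")

    sent = []
    process({"value": "poison-pill", "attempts": 5},
            produce=lambda topic, msg: sent.append((topic, msg)),
            handle=always_fails)
    print(sent)   # sixth failure -> routed to the dead-letter topic

Replaying to the same topic puts the retried message at the back of the line, which is exactly the ordering tradeoff being discussed in this thread.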
by Misdicorl
4/25/2025 at 7:13:43 PM
It's interesting you say that, since this turned an infra scaling problem into a worker problem for us. Previously, we would get terrible head-of-line throughput issues, so we would use an egregious number of partitions to try to alleviate that. Lots of partitions are hard to manage, since resizing topics is operationally tedious and it puts a lot of strain on brokers. But no matter how many partitions you have, the head of line still blocks. Even cases where certain keys had slightly slower throughput would clog up the whole partition with normal consumers.
The parallel consumer nearly entirely solved this problem. Only the most egregious cases, where keys were ~3000 times slower than other keys, would cause an issue, and then you could solve it by disabling that key for a while.
by singron
4/25/2025 at 9:30:29 PM
Yeah, I'd say Kafka is not a great technology if your median and 99ths (or 999ths if volume is large enough) are wildly different, which sounds like your situation. I use Kafka in contexts where 99ths going awry usually aren't key-dependent, so I don't have the issues you see.
I tend to prefer other queueing mechanisms in those cases, although I still work hard to make 99ths and medians align, as misalignment can still cause issues (especially for monitoring).
by Misdicorl
4/25/2025 at 4:43:05 PM
Follow-on: if you're using Kafka to publish messages to multiple consumers, this is even worse, as now you're infecting every consumer with data processing issues from every other consumer. Bad juju.
by Misdicorl
4/25/2025 at 2:14:57 PM
> streaming system will process n messages in O(n log n)
I'm guessing this is mostly around how backed up the stream is. n isn't the total number of messages but rather the current number of unacked messages.
Would a radix structure work better here? If you throw something like a UUID7 on the messages and store them in a radix structure, you should be able to get O(n) performance, correct? Or am I not understanding the problem well?
by cogman10
4/25/2025 at 3:22:06 PM
I think the problem is that if you want quick access to all messages with a particular key, then you have to maintain some kind of index over all persisted messages. So n would be the total number of persisted messages as I read it, which can be quite large. But even storing them in the first place is O(n), so O(n log n) might not be so bad.
by bjornsing
4/25/2025 at 4:33:18 PM
That's correct. And keep in mind that you might have new consumers starting from the beginning come into play, so you have to retain the indexes permanently.
And yes, O(n log n) is not bad at all. Sorted database indexes (whether SQL, NoSQL, AcmeVendorSQL, etc.) already take O(n log n) to insert n elements into data storage or to read n elements from data storage.
by galeaspablo