
Exactly-Once Semantics

Exactly-once semantics in Kafka is a combination of two key features:

  • Idempotent producers, which avoid duplicates caused by producer retries
  • Transactional semantics, which guarantee exactly-once processing in stream processing applications

Idempotent Producer

A service is called idempotent if performing the same operation multiple times has the same result as performing it a single time.

How can messages be duplicated in a Kafka system?

For example, a partition leader receives a record from the producer and replicates it successfully to the followers, but the broker on which the leader resides crashes before it can send a response to the producer. After waiting a certain time without a response, the producer resends the message. The message arrives at the new leader, which already has a copy of the message from the previous attempt, and the result is a duplicate.

How Does the Idempotent Producer Work?

When we enable the idempotent producer, each message includes a unique producer ID (PID) and a sequence number. These, together with the target topic and partition, uniquely identify each message.

The broker keeps track, per partition, of the largest PID-sequence number combination it has successfully written. It will reject a produce request if its sequence number is not exactly one greater than that of the last committed message from the same PID/topic-partition pair.
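A minimal sketch of that check, for illustration only; this is the logic the broker applies, not its actual implementation:

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: the real check lives inside the broker's log layer.
public class SequenceCheckSketch {
    // Last successfully written sequence number per (producer ID, topic-partition).
    private final Map<String, Integer> lastSequence = new HashMap<>();

    /** Returns true if the batch should be appended, false if it is a duplicate or out of order. */
    boolean shouldAppend(long producerId, String topicPartition, int sequence) {
        String key = producerId + "-" + topicPartition;
        Integer last = lastSequence.get(key);
        if (last != null && sequence <= last) {
            return false; // already written: a retry of an acknowledged batch, do not append again
        }
        if (last != null && sequence != last + 1) {
            return false; // gap in the sequence: reject as out of order
        }
        lastSequence.put(key, sequence);
        return true;
    }
}
```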

The producer ID is a unique identifier assigned to the producer by the broker. It is not exposed to users but is passed on every request to the broker.

If transactional.id is not specified, a fresh PID is generated every time the producer is initialized.

If transactional.id is specified, the broker stores a mapping of the transactional ID to the PID so that it can return the same PID when the producer restarts.

How Do I Use the Kafka Idempotent Producer?

This is the easy part. Add enable.idempotence=true to the producer configuration.
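A minimal configuration sketch; the broker address and topic name are placeholders:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class IdempotentProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Enable the idempotent producer: retries stay safe because the broker
        // deduplicates batches by PID + sequence number.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        // acks=all is required when idempotence is enabled.
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("example-topic", "key", "value"));
        }
    }
}
```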

Transactions

Transactions give us the ability to atomically update data in multiple topic partitions. All the records included in a transaction will be successfully saved, or none of them will be.
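A minimal sketch of a transactional producer writing atomically to two topics; the broker address, topic names, and transactional.id are placeholders, and the error handling follows the usual pattern of aborting on retriable errors and closing on fatal ones:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Setting transactional.id implicitly enables idempotence as well.
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "example-transactional-id");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions(); // registers the transactional.id and fences older instances

        try {
            producer.beginTransaction();
            // Both records are committed atomically: either both become visible
            // to read_committed consumers, or neither does.
            producer.send(new ProducerRecord<>("topic-a", "key", "value-a"));
            producer.send(new ProducerRecord<>("topic-b", "key", "value-b"));
            producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // Fatal errors: we cannot recover, so close the producer.
            producer.close();
        } catch (KafkaException e) {
            // Abortable error: abort the transaction and it can be retried.
            producer.abortTransaction();
        }
        producer.close();
    }
}
```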

Prerequisite:

The topic must have a replication factor >= 3 and min.insync.replicas >= 2.
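If topics are created programmatically, a sketch with the AdminClient could look like this; the topic name, partition count, and broker address are placeholders:

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTransactionalTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (Admin admin = Admin.create(props)) {
            // 6 partitions (arbitrary), replication factor 3, min.insync.replicas 2
            NewTopic topic = new NewTopic("example-output-topic", 6, (short) 3)
                    .configs(Map.of("min.insync.replicas", "2"));
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
```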

What Problems Do Transactions Solve?

Consider a simple stream processing application: it reads events from a source topic, maybe processes them, and writes results to another topic. We want to be sure that for each message we process, the results are written exactly once. What can possibly go wrong?

Reprocessing caused by application crashes

After consuming a message from the source cluster and processing it, the application has to do two things:

  • Produce the result to the output topic
  • Commit the offset of the message that we consumed

What happens if the application crashes after the output was produced but before the offset of the input was committed? After several heartbeats are missed, the application will be assumed dead and its partitions reassigned to another consumer in the consumer group. That consumer will begin consuming records from those partitions, starting at the last committed offset. This means that all the records that were processed by the application between the last committed offset and the crash will be processed again, and the results will be written to the output topic again—resulting in duplicates.
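To make the failure window concrete, here is a sketch of such a non-transactional read-process-write loop; the topic names and the process() helper are placeholders, and client construction and configuration are omitted. The comment marks the point where a crash causes reprocessing:

```java
import java.time.Duration;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NaiveReadProcessWriteLoop {
    // Consumer subscribed to "source-topic", producer writing to "output-topic";
    // configuration (group.id, serializers, etc.) is omitted for brevity.
    void run(KafkaConsumer<String, String> consumer, KafkaProducer<String, String> producer) {
        consumer.subscribe(List.of("source-topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                String result = process(record.value()); // hypothetical processing step
                producer.send(new ProducerRecord<>("output-topic", record.key(), result));
            }
            producer.flush();
            // <-- a crash here (after producing, before committing offsets) means the same
            //     batch will be consumed, processed, and produced again after the rebalance
            consumer.commitSync();
        }
    }

    String process(String value) {
        return value; // placeholder transformation
    }
}
```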

Reprocessing caused by zombie applications

What happens if our application just consumed a batch of records from Kafka and then froze or lost connectivity to Kafka before doing anything else with this batch of records?

After several heartbeats are missed, the application will be assumed dead and its partitions reassigned to another consumer in the consumer group. That consumer will reread that batch of records and process it. Meanwhile, the first instance of the application, the one that froze, may resume its activity: it will process the batch of records it recently consumed and produce the results to the output topic, so the same results are written twice.
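For completeness, this is roughly how transactions address both scenarios: the consumed offsets are committed through the producer inside the same transaction as the results, and the transactional.id lets the broker fence a zombie instance. A sketch, again with placeholder topic names and a hypothetical process() helper; the consumer is assumed to run with enable.auto.commit=false and isolation.level=read_committed, and error handling is omitted:

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

public class TransactionalReadProcessWriteLoop {
    void run(KafkaConsumer<String, String> consumer, KafkaProducer<String, String> producer) {
        consumer.subscribe(List.of("source-topic"));
        producer.initTransactions(); // fences any older instance using the same transactional.id

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            if (records.isEmpty()) continue;

            producer.beginTransaction();
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (ConsumerRecord<String, String> record : records) {
                String result = process(record.value()); // hypothetical processing step
                producer.send(new ProducerRecord<>("output-topic", record.key(), result));
                offsets.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1));
            }
            // The consumed offsets are committed as part of the same transaction,
            // so results and offsets become visible (or are aborted) together.
            producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
            producer.commitTransaction();
        }
    }

    String process(String value) {
        return value; // placeholder transformation
    }
}
```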

What Problems Aren't Solved by Transactions?

Side effects during stream processing

Let's say that the record processing step in our stream processing app includes sending email to users. Enabling exactly-once semantics will not guarantee that the email will only be sent once. The guarantee only applies to records written to Kafka.

Reading from a Kafka topic and writing to a database

In this case, the application writes to an external database rather than to Kafka. We have two steps here:

  • Writing the record to the database
  • Committing the offset of the consumed message to Kafka

There is no mechanism that allows writing results to an external database and committing offsets to Kafka within a single transaction. Instead, we could manage the offsets in the database and commit both the data and the offsets to the database in a single transaction; this would rely on the database's transaction guarantees rather than Kafka's.

Reading data from a database, writing to Kafka, and from there writing to another database

Copying data from one Kafka cluster to another

Publish/subscribe pattern