Kafka Exactly once with Transactional Producer

I am trying to understand Kafka exactly-once semantics using a transactional producer/consumer.
I came across the example below, but I still have a hard time understanding exactly once. Is this code correct?
producer.sendOffsetsToTransaction - What does this call do? Should it be done against the same target topic?
What if the system crashes before consumer.commitSync();? Will the same messages be read again and duplicate messages be produced?
public class ExactlyOnceLowLevel {

    public void runConsumer() throws Exception {
        final KafkaConsumer<byte[], byte[]> consumer = createConsumer();
        final Producer<Long, String> producer = createProducer();
        producer.initTransactions();
        consumer.subscribe(Collections.singletonList(TOPIC));
        while (true) {
            try {
                // [Loop body lost in extraction. The discussion refers to calls made here:
                // producer.sendOffsetsToTransaction(...), consumer.commitSync() and currentOffsets.clear().]
            } catch (/* [other exception types lost in extraction] | */ AuthorizationException e) {
                e.printStackTrace();
                // We can't recover from these exceptions, so our only option is to close the producer and exit.
                producer.close();
            } catch (final KafkaException e) {
                e.printStackTrace();
                // For all other exceptions, just abort the transaction and try again.
                producer.abortTransaction();
            } finally {
                producer.flush();
                producer.close();
            }
        }
    }

    private static KafkaConsumer<byte[], byte[]> createConsumer() {
        final Properties consumerConfig = new Properties();
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        consumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED); // this has to be
        return new KafkaConsumer<>(consumerConfig);
    }

    private static Producer<Long, String> createProducer() {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.RETRIES_CONFIG, 3); // this is now safe !!!!
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // this has to be all
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // this has to be 1
        return new KafkaProducer<>(props);
    }

    public static void main(final String... args) throws Exception {
        final ExactlyOnceLowLevel example = new ExactlyOnceLowLevel();
        example.runConsumer();
    }
}
1 Answer
You should not attempt to commit offsets with the consumer when using the read/process/write pattern with Kafka transactions. As you hinted, this can cause issues.
In this use case, offsets need to be added to the transaction, and you should use only sendOffsetsToTransaction() to do that. That method ensures that these offsets are committed only if the full transaction succeeds. See the Javadoc:
sendOffsetsToTransaction()
Sends a list of specified offsets to the consumer group coordinator,
and also marks those offsets as part of the current transaction. These
offsets will be considered committed only if the transaction is
committed successfully. The committed offset should be the next
message your application will consume, i.e. lastProcessedMessageOffset
+ 1.
This method should be used when you need to batch consumed and
produced messages together, typically in a consume-transform-produce
pattern. Thus, the specified consumerGroupId should be the same as
config parameter group.id of the used consumer. Note, that the
consumer should have enable.auto.commit=false and should also not
commit offsets manually (via sync or async commits).
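The "lastProcessedMessageOffset + 1" rule from the Javadoc can be sketched with plain JDK types. This is only an illustration, not Kafka's API: the Rec class and the "topic-partition" string key are hypothetical stand-ins for ConsumerRecord and TopicPartition, and the topic name "input" is made up.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NextOffsets {

    /** Hypothetical stand-in for a consumed record: only the fields needed here. */
    public static final class Rec {
        final String topic;
        final int partition;
        final long offset;

        public Rec(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    /**
     * Returns, per "topic-partition" key, the offset to commit:
     * the highest processed offset in that partition plus one.
     */
    public static Map<String, Long> offsetsToCommit(List<Rec> processed) {
        Map<String, Long> next = new HashMap<>();
        for (Rec r : processed) {
            String key = r.topic + "-" + r.partition;
            // Keep the largest "offset + 1" seen so far for this partition.
            next.merge(key, r.offset + 1, Long::max);
        }
        return next;
    }

    public static void main(String[] args) {
        List<Rec> batch = List.of(
                new Rec("input", 0, 41),
                new Rec("input", 0, 42),
                new Rec("input", 1, 7));
        // Partition 0 maps to 43 and partition 1 maps to 8.
        System.out.println(offsetsToCommit(batch));
    }
}
```

In a real application the resulting map would be a Map<TopicPartition, OffsetAndMetadata> handed to sendOffsetsToTransaction() together with the consumer's group.id.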
In your example, you don't have to call clear(), as you will be re-creating a fresh map when starting the next iteration. Even if you do want to call clear(), that happens after the transaction is committed, so no data is lost.
– Mickael Maison
Aug 8 at 12:55
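To see why committing the offsets inside the transaction avoids duplicates after a failure, here is a toy in-memory model. It is a sketch under simplifying assumptions and does not use Kafka: ToyTransactionalPipeline and all of its methods are hypothetical, a single partition is assumed, and a "crash" is modelled as an abort before commit.

```java
import java.util.ArrayList;
import java.util.List;

public class ToyTransactionalPipeline {

    // Durable state in this toy model: the output log and the committed input offset.
    public final List<String> outputLog = new ArrayList<>();
    public long committedOffset = 0;

    // Pending state of the open transaction; discarded on abort.
    private final List<String> pendingOutput = new ArrayList<>();
    private long pendingOffset;

    public void begin() {
        pendingOutput.clear();
        pendingOffset = committedOffset;
    }

    public void send(String value) {
        pendingOutput.add(value);
    }

    // Toy counterpart of adding the consumed offsets to the transaction.
    public void sendOffset(long nextOffset) {
        pendingOffset = nextOffset;
    }

    // Output and offset become visible together.
    public void commit() {
        outputLog.addAll(pendingOutput);
        committedOffset = pendingOffset;
        pendingOutput.clear();
    }

    // Neither the output nor the offset is applied.
    public void abort() {
        pendingOutput.clear();
        pendingOffset = committedOffset;
    }

    /** Processes input from the committed offset; simulates a failure before commit if asked. */
    public void runOnce(List<String> input, boolean failBeforeCommit) {
        begin();
        for (long i = committedOffset; i < input.size(); i++) {
            send(input.get((int) i).toUpperCase());
            sendOffset(i + 1); // next offset to consume, i.e. last processed + 1
        }
        if (failBeforeCommit) {
            abort();
            return;
        }
        commit();
    }

    public static void main(String[] args) {
        ToyTransactionalPipeline p = new ToyTransactionalPipeline();
        List<String> input = List.of("a", "b", "c");
        p.runOnce(input, true);   // first attempt fails: nothing is committed
        p.runOnce(input, false);  // retry re-reads from offset 0 and commits once
        System.out.println(p.outputLog + " @ " + p.committedOffset); // prints "[A, B, C] @ 3"
    }
}
```

Because the offset only advances together with the output, a failed attempt leaves both untouched and the retry produces each result once. Committing the offset separately, as consumer.commitSync() would, opens a window in which the output is committed but the offset is not.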
Do we need a transactional consumer here? consumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED); What role does it play here, as it simply reads from another topic?
– user1578872
Aug 8 at 13:01
If the input topic does not contain any transactions, keep the default for this setting. If it does contain transactions, then that depends on your requirements.
– Mickael Maison
Aug 8 at 13:23
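For reference, the settings discussed in this thread can be sketched with plain string keys, so the snippet needs only the JDK. The keys are Kafka's documented configuration names; the class name, the bootstrap address, and the transactional id value are made up for illustration, and the serializer/deserializer settings a real client also needs are omitted. Note that isolation.level defaults to read_uncommitted, and that producer.initTransactions() requires transactional.id to be set.

```java
import java.util.Properties;

public class TransactionalConfigSketch {

    /** Consumer side: no auto commit; read only committed transactional records. */
    public static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties p = new Properties();
        p.put("bootstrap.servers", bootstrapServers);
        p.put("group.id", groupId);
        p.put("enable.auto.commit", "false");
        // Only needed if the input topic contains transactional writes.
        p.put("isolation.level", "read_committed");
        return p;
    }

    /** Producer side: a transactional.id enables the transactional API. */
    public static Properties producerProps(String bootstrapServers, String transactionalId) {
        Properties p = new Properties();
        p.put("bootstrap.servers", bootstrapServers);
        p.put("transactional.id", transactionalId);
        p.put("enable.idempotence", "true");
        p.put("acks", "all");
        return p;
    }

    public static void main(String[] args) {
        // "localhost:9092" and "my-transactional-id" are placeholder values.
        System.out.println(consumerProps("localhost:9092", "my-group"));
        System.out.println(producerProps("localhost:9092", "my-transactional-id"));
    }
}
```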
producer.sendOffsetsToTransaction -> Where are these transaction offsets stored? Is it in the __transaction_state topic, in the transaction commit log, or ....
– user1578872
Aug 8 at 14:54
So, sendOffsetsToTransaction should be for the source topic. What if something goes wrong when calling currentOffsets.clear();?
– user1578872
Aug 8 at 12:23