Sample values:
NAU-1100
NAU-1101
NAP-1200
NAP-1201
Topics are where your application produces messages to and consumes messages from.
A typical naming convention for Kafka topics looks like this:
<PRODUCER_SYSTEM>-<CONSUMER_SYSTEM>-<REQUEST>-<ENVIRONMENT>
A topic where apigateway is the producer, payment is the consumer, and incoming payment requests are sent over it in the uat environment would look like this:
apigateway-payment-sg-request-uat
A topic where payment is the producer, apigateway is the consumer, and payment responses are sent over it in the uat environment would look like this:
apigateway-payment-sg-response-uat
The corresponding topics in the prod environment:
apigateway-payment-sg-request-prod
apigateway-payment-sg-response-prod
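As an illustration, a minimal Java producer publishing an incoming payment request to the uat request topic above could look like the sketch below; the broker address, record key, and payload are assumptions, not part of the actual integration:
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // kafka broker (assumed address)
props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // serialize record key as simple strings
props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // serialize record value as simple strings
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// Hypothetical key and payload; the real schema depends on the payment request contract
producer.send(new ProducerRecord<>("apigateway-payment-sg-request-uat", "payment-123", "{\"amount\":100}"));
producer.close();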
TODO
TODO
Summary:
Topics:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.1.1</version>
</dependency>
Simple usage of Kafka’s consumer API that relies on automatic offset committing.
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // kafka broker
props.setProperty("group.id", "test"); // consumer group id
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000"); // offsets are committed automatically at 1000 ms intervals
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // deserialize record key as simple strings
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // deserialize record value as simple strings
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar")); // topics
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}
Connection to the cluster is bootstrapped by specifying a list of one or more brokers to contact using the configuration bootstrap.servers. This list is just used to discover the rest of the brokers in the cluster and need not be an exhaustive list of the servers in the cluster (though you may want to specify more than one in case a server is down when the client is connecting).
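For example, a comma-separated list lets the client still bootstrap if one of the brokers is unavailable (the hostnames below are placeholders):
props.setProperty("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092"); // any one reachable broker is enough to discover the rest of the cluster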
Manual offset committing is useful when consumption of a message is coupled with some processing logic, and hence a message should not be considered consumed until its processing has completed.
Consume a batch of records and buffer them in memory. Once enough records are batched, insert them into a database.
Manually commit the offsets only after the records have been inserted into the database. This gives exact control of when a record is considered consumed.
However, the consumer process could fail in the short interval (a few milliseconds) after the insert into the database but before the commit. In that case, the batch of records would be consumed again by the next process that takes over the partitions.
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // kafka broker
props.setProperty("group.id", "test"); // consumer group id
props.setProperty("enable.auto.commit", "false"); // auto commit is disabled
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // deserialize record key as simple strings
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // deserialize record value as simple strings
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar")); // topics
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    if (buffer.size() >= minBatchSize) {
        insertIntoDb(buffer);
        consumer.commitSync();
        buffer.clear();
    }
}
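Because a failure between the database insert and the commit means the same batch can be redelivered, it helps if insertIntoDb is idempotent. A minimal sketch, assuming a java.sql.Connection field named connection and a PostgreSQL table with a unique constraint on (topic, record_partition, record_offset); the table name and schema are assumptions:
void insertIntoDb(List<ConsumerRecord<String, String>> buffer) throws SQLException {
    // Re-inserting an already-stored record is a no-op thanks to ON CONFLICT DO NOTHING
    String sql = "INSERT INTO consumed_records (topic, record_partition, record_offset, record_value) "
            + "VALUES (?, ?, ?, ?) ON CONFLICT DO NOTHING";
    try (PreparedStatement stmt = connection.prepareStatement(sql)) { // 'connection' is an assumed field
        for (ConsumerRecord<String, String> record : buffer) {
            stmt.setString(1, record.topic());
            stmt.setInt(2, record.partition());
            stmt.setLong(3, record.offset());
            stmt.setString(4, record.value());
            stmt.addBatch();
        }
        stmt.executeBatch();
    }
}
Offsets can also be committed per partition once that partition's records have been processed, as the next example shows: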
try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
            for (ConsumerRecord<String, String> record : partitionRecords) {
                System.out.println(record.offset() + ": " + record.value());
            }
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
        }
    }
} finally {
    consumer.close();
}
TODO
TODO
Kafka node liveness has two conditions:
A node must be able to maintain its session with ZooKeeper (via ZooKeeper's heartbeat mechanism).
If the node is a follower, it must replicate the writes happening on the leader and not fall too far behind.
If a follower dies, gets stuck, or falls behind, the leader will remove the follower from the list of in-sync replicas.
Have one or more consumer threads that do all data consumption and hand off records to a pool of processor threads that actually handle the record processing.
public class Consumer {
    KafkaConsumer<String, String> consumer; // assumed to be configured and subscribed elsewhere
    MessageProcessor messageProcessor;

    public void consume() {
        try {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    System.out.println(record.offset() + ": " + record.value());
                    // Pass record value to messageProcessor
                    messageProcessor.process(record.value());
                }
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
            }
        } finally {
            consumer.close();
        }
    }
}

public class MessageProcessor {
    @Async
    public void process(String record) {
        // Process record in a separate processor thread...
    }
}
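MessageProcessor relies on Spring's @Async, so the processor thread pool would come from Spring. A sketch of the supporting configuration, assuming Spring's @EnableAsync and a ThreadPoolTaskExecutor; the pool sizes are assumptions and should be tuned to the workload:
@Configuration
@EnableAsync
public class AsyncConfig {
    @Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        // Pool of processor threads that the consumer thread hands records off to
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4); // assumed size, tune for the workload
        executor.setMaxPoolSize(8); // assumed size
        executor.setQueueCapacity(500); // bounded queue to avoid unbounded memory growth under load
        executor.setThreadNamePrefix("processor-");
        executor.initialize();
        return executor;
    }
}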
Pro: consumption and processing scale independently, so a single consumer thread can feed many processor threads.
Cons: ordering across records is no longer guaranteed once processing is handed off to the pool, and committing offsets only after processing completes becomes harder because the processor threads finish asynchronously.
TODO