Wednesday, August 15, 2018

Spring Kafka Producer Class diagram

Spring Kafka offers a Spring bean based structure for producing Kafka messages. For someone familiar with the plain Kafka producer API, Spring Kafka can seem a bit different.

Here is a code snippet from the Spring Kafka documentation showing the classes involved in a Spring Kafka producer.

@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // The key type is Integer, so the key serializer must be IntegerSerializer
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // See https://kafka.apache.org/documentation/#producerconfigs for more properties
    return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}
We can see that there are two important classes involved: DefaultKafkaProducerFactory and KafkaTemplate. Here is a UML class diagram showing how they are related.


Spring KafkaTemplate UML diagram



The KafkaTemplate bean needs an implementation of the ProducerFactory interface, which holds the information needed to produce messages. There is only one implementation available out of the box - the DefaultKafkaProducerFactory class.

DefaultKafkaProducerFactory takes a Map of properties such as the Kafka bootstrap server URLs, serializer classes, etc.
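
Once these beans are in place, sending a message is straightforward. Here is a minimal sketch of a service that uses the KafkaTemplate bean; the topic name "my-topic" and the MessageSender class are assumptions for illustration.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class MessageSender {

    @Autowired
    private KafkaTemplate<Integer, String> kafkaTemplate;

    public void send(Integer key, String message) {
        // send(topic, key, value) is asynchronous; it returns a future
        // that can be used to check the outcome of the send.
        kafkaTemplate.send("my-topic", key, message); // "my-topic" is a placeholder
    }
}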


Reference - https://docs.spring.io/spring-kafka/reference/htmlsingle/#_overview

Thursday, May 10, 2018

Some common errors seen after upgrading Elasticsearch, Logstash & Kibana stack from version 5 to version 6

I recently worked on upgrading an existing Elasticsearch, Logstash & Kibana (ELK) stack from version 5.2 to 6.2.4. There are several breaking changes in this upgrade.

I encountered the following errors caused by changes in the index mapping. Each error is shown below along with how to fix it. I hope this helps someone searching for these messages.

These errors will be displayed in the logstash-plain.log or logstash.log.

[2018-05-09T03:45:12,204][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"logstash-2018.05.09", :_type=>"doc", :_routing=>nil}, #<LogStash::Event:0x673f5e81>], :response=>{"index"=>{"_index"=>"logstash-2018.05.09", "_type"=>"doc", "_id"=>"t92BRGMBQMgRhS0WQ5xE", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to find type parsed [string] for [uriParams]"}}}}

Solution - Change the field type of the uriParams field from string to text or keyword.
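
Elasticsearch 6 removed the string field type in favor of text (analyzed) and keyword (not analyzed). Here is a minimal sketch of the corrected mapping for this field, assuming uriParams does not need full-text search:

"uriParams": {
    "type": "keyword"
}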

[2018-05-09T13:17:45,930][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"logstash-2018.05.09", :_type=>"doc", :_routing=>nil}, #<LogStash::Event:0x238de49c>], :response=>{"index"=>{"_index"=>"logstash-2018.05.09", "_type"=>"doc", "_id"=>"n06NRmMBQMgRhS0WdPQs", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"Could not convert [fielddata] to boolean", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"Failed to parse value [{format=false}] as only [true] or [false] are allowed."}}}}}}
Solution - Change the value of the "fielddata" attribute in the index mapping. If you have been running ELK since version 2, your "fielddata" value may have been set as:

"fielddata": {
   "format": "disabled" 


}                       


This is not supported in version 6. Change the value as follows:

"fielddata": true




Tuesday, January 23, 2018

Apache Kafka InvalidReplicationFactorException

Kafka may throw this exception when creating a topic.

Scenario

If we try to create a topic with a replication factor larger than the number of brokers in the cluster, we will see an error.


Exception

This is the exception thrown by Apache Kafka.

Error while executing topic command : Replication factor: 5 larger than available brokers: 2.
[2018-01-23 18:02:37,534] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 5 larger than available brokers: 2.
 (kafka.admin.TopicCommand$)



Resolution

Ensure that the replication factor is less than or equal to the number of brokers in the cluster.
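
For example, on a two-broker cluster a topic can have a replication factor of at most 2. Here is a sketch of the create command, following the Kafka quickstart of that era; the topic name and ZooKeeper address are assumptions:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic my-topic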

Kafka Exception - kafka.common.InconsistentBrokerIdException

I came across this exception when I was working with Apache Kafka.

Scenario

Set up multiple Kafka brokers in a cluster by copying server.properties to create a new properties file for each new broker. I was following the instructions here: https://kafka.apache.org/documentation/#quickstart_multibroker

Result

The first broker started without error. When I started the second broker, I saw this exception - kafka.common.InconsistentBrokerIdException.


[2018-01-23 17:30:19,004] FATAL Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.InconsistentBrokerIdException: Configured broker.id 2 doesn't match stored broker.id 1 in meta.properties. If you moved your data, make sure your configured broker.id matches. If you intend to create a new broker, you should remove all data in your data directories (log.dirs).
        at kafka.server.KafkaServer.getBrokerIdAndOfflineDirs(KafkaServer.scala:615)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:201)
        at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)
        at kafka.Kafka$.main(Kafka.scala:92)
        at kafka.Kafka.main(Kafka.scala)
[2018-01-23 17:30:19,007] INFO shutting down (kafka.server.KafkaServer)
[2018-01-23 17:30:19,010] INFO Terminate ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2018-01-23 17:30:19,017] INFO Session: 0x16125c42cd00004 closed (org.apache.zookeeper.ZooKeeper)
[2018-01-23 17:30:19,018] INFO EventThread shut down for session: 0x16125c42cd00004 (org.apache.zookeeper.ClientCnxn)
[2018-01-23 17:30:19,020] INFO shut down completed (kafka.server.KafkaServer)
[2018-01-23 17:30:19,022] FATAL Exiting Kafka. (kafka.server.KafkaServerStartable)
[2018-01-23 17:30:19,024] INFO shutting down (kafka.server.KafkaServer)

Resolution

I found that the cause was that when I copied the properties file, I had missed changing the "log.dirs" property. If two brokers share the same "log.dirs" value, the second broker reads the meta.properties written by the first and finds a stored broker.id that does not match its own configured broker.id, which causes this error. I restarted the second instance with a unique value for the "log.dirs" property, and the issue was resolved.
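
For reference, each broker in the cluster needs its own broker.id, listener port, and log.dirs. Here is a sketch of the second broker's properties file, using the example ports and paths from the Kafka quickstart:

# config/server-2.properties
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/kafka-logs-2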