Snažím se integrovat Kafka v mém projektu. Mám 2 jarní aplikace musí být nakonfigurována jako producent a druhá jako spotřebitel. Snažím se odeslat uvítací zprávu. Pozdrav je třída má dvě oblasti, bude vidět níže. Ale myslím, že tato chyba ve spotřebitelské aplikace konzoly:
2021-11-23 13:31:53.412 ERROR 8452 --- [ntainer#1-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:194) ~[spring-kafka-2.7.2.jar:2.7.2]
at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112) ~[spring-kafka-2.7.2.jar:2.7.2]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1598) ~[spring-kafka-2.7.2.jar:2.7.2]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1210) ~[spring-kafka-2.7.2.jar:2.7.2]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition greetingTopic-0 at offset 3. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'assignment2.kafka.Greeting' is not in the trusted packages: [java.util, java.lang, ro.tuc.ds2020.kafkaconsumer, ro.tuc.ds2020.kafkaconsumer.*]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
Kód Kafka výrobce konfiguraci:
package assignment2.kafka;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaProducerConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapAddress);
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ProducerFactory<String, Greeting> greetingProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
configProps.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
configProps.put(JsonSerializer.TYPE_MAPPINGS, "greeting:assignment2.kafka.Greeting");
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
return new KafkaTemplate<>(greetingProducerFactory());
}
}
Mám následující kód pro Kafka spotřebitele konfigurace:
package ro.tuc.ds2020.kafkaconsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapAddress);
props.put(
ConsumerConfig.GROUP_ID_CONFIG,
"sensorGroup");
props.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
public ConsumerFactory<String, Greeting> greetingConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "greeting");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(JsonSerializer.TYPE_MAPPINGS, "greeting:assignment2.kafka.Greeting");
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "assignment2.kafka.Greeting");
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(Greeting.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Greeting> greetingKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Greeting> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(greetingConsumerFactory());
return factory;
}
}
A Pozdrav třídy, který je přítomen v obou aplikací. Na spotřebitelské aplikace, v ro.tuc.ds2020.kafkaconsumer obalu a na výrobce, část, na assignment2.kafka balíček.
public class Greeting {
private String msg;
private String name;
public Greeting() {
}
public Greeting(String msg, String name) {
this.msg = msg;
this.name = name;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return msg + ", " + name + "!";
}
}
Mám podezření, že je něco špatně s mojí konfiguraci, našel jsem podobné vyřešit otázku zde: Spring Boot Kafka Spotřebitele házení chyby ve smyčce . Přidal jsem zvláštní linky, které vyřešil tento člověk chyba, ale bohužel nemůžu, aby to fungovalo. Myslím, že to souvisí s tím, že mám dvě samostatné aplikace, takže objekt typu Pozdrav , který je poslat od výrobce ke spotřebiteli není v jednom balení. Děkuji!!! Doufám, že někdo může pomoci
props.put(JsonSerializer.TYPE_MAPPINGS, "greeting:ro.tuc.ds2020.kafkaconsumer.Greeting"); props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "ro.tuc.ds2020.kafkaconsumer.Greeting");
ale chyba stále objevuje