Set up Kafka Connect Elasticsearch Connector
Kafka Connect Elasticsearch Connector vs. Logstash
TBD.
Common issues

Problems encountered while running Kafka Connect Elasticsearch Connector v15.0.0, and how to resolve them.
Fields with JSON object values get duplicated and re-nested

For example, the record

```json
{
  "dept": "r&d",
  "person": {
    "eye": "black",
    "name": "john"
  }
}
```

becomes

```json
{
  "@timestamp": "2025-05-24T15:00:03.000Z",
  "dept": "r&d",
  "person": {
    "@timestamp": "2025-05-24T15:00:03.000Z",
    "dept": "r&d",
    "person": {
      "eye": "black",
      "name": "john"
    },
    "eye": "black",
    "name": "john"
  }
}
```

Fix: adjust the connector configuration:

```properties
transforms=unwrap
transforms.unwrap.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.unwrap.field=person
key.ignore=true
compact.map.entries=true
schema.ignore=true
...
```

Consuming schemaless records

Modify the connector configuration:

```properties
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
...
```

Errors when reusing the SSL certificates generated by Elasticsearch and used by Filebeat
```
[2025-05-22 01:30:20,290] ERROR Failed to create client to verify connection. (io.confluent.connect.elasticsearch.Validator:120) org.apache.kafka.common.KafkaException: java.security.UnrecoverableKeyException: Get Key failed: Given final block not properly padded. Such issues can arise if a bad key is used during decryption.
```
By default, Elasticsearch generates its certificates in PKCS12 format (http.p12), while the Kafka Connect Elasticsearch Connector requires the JKS format, so the keystore has to be regenerated or converted. See Elasticsearch SSL for details.
References
- https://docs.confluent.io/kafka-connectors/elasticsearch/current/security.html
- https://docs.confluent.io/platform/current/connect/userguide.html
- https://github.com/confluentinc/kafka-connect-elasticsearch
- https://www.elastic.co/search-labs/blog/elasticsearch-apache-kafka-ingest-data
- https://kafka.apache.org/documentation.html#connect
