Set up Kafka Connect Elasticsearch Connector

Kafka Connect Elasticsearch Connector vs Logstash

TBD.

Common issues

Problems encountered with Kafka Connect Elasticsearch Connector v15.0.0 and how to resolve them.

Fields whose value is a JSON object get nested repeatedly

json
{
  "dept": "r&d",
  "person": {
    "eye": "black",
    "name": "john"
  }
}

becomes

json
{
  "@timestamp": "2025-05-24T15:00:03.000Z",
  "dept": "r&d",
  "person": {
    "@timestamp": "2025-05-24T15:00:03.000Z",
    "dept": "r&d",
    "person": {
      "eye": "black",
      "name": "john"
    },
    "eye": "black",
    "name": "john"
  }
}

Fix: modify the Connect configuration

properties
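# Replace the record value with the contents of its "person" field
transforms=unwrap
transforms.unwrap.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.unwrap.field=person

# Derive document IDs from topic+partition+offset instead of the record key
key.ignore=true
# Write map entries with string keys compactly as "key": "value"
compact.map.entries=true
# Ignore record schemas and let Elasticsearch infer the mapping
schema.ignore=true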

...

Consuming records without a schema

Modify the Connect configuration

properties
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=false
value.converter.schemas.enable=false

...
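
For context, here is a minimal sketch of how these converter settings might sit inside a complete sink connector configuration. The connector name, topic, and connection URL are placeholder assumptions, not values from this setup. `schema.ignore=true` is included because schemaless records give the connector no schema from which to derive an Elasticsearch mapping.

```properties
# Hypothetical connector name, topic, and URL: adjust to your environment
name=es-sink-example
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
topics=example-topic
connection.url=https://localhost:9200

# Read keys and values as plain JSON without the schema/payload envelope
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

# No schema is available, so let Elasticsearch infer the mapping
schema.ignore=true
# Derive document IDs from topic+partition+offset instead of the record key
key.ignore=true
```

The converter properties can also be set once at the worker level; setting them on the connector overrides the worker defaults for that connector only.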

Errors when reusing the SSL certificates generated by Elasticsearch / used by Filebeat

[2025-05-22 01:30:20,290] ERROR Failed to create client to verify connection. (io.confluent.connect.elasticsearch.Validator:120) org.apache.kafka.common.KafkaException: java.security.UnrecoverableKeyException: Get Key failed: Given final block not properly padded. Such issues can arise if a bad key is used during decryption.

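The certificate that Elasticsearch generates during its default initialization is in PKCS12 format (http.p12), whereas the Kafka Connect Elasticsearch Connector requires JKS format. The certificate has to be regenerated and converted. See ElasticSearch SSL for details.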
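
As a rough sketch of where the converted keystore ends up, the fragment below shows the connector's `elastic.https.ssl.*` settings pointing at JKS files. All paths and passwords are placeholders, and whether you need a keystore, a truststore, or both depends on how TLS is set up on your cluster, so treat this as an illustration rather than the exact configuration used here. The `keytool` command in the comment is one common way to do the PKCS12 to JKS conversion.

```properties
# One possible conversion step, run separately (file names are placeholders):
#   keytool -importkeystore -srckeystore http.p12 -srcstoretype PKCS12 \
#           -destkeystore http.jks -deststoretype JKS

# Talk to Elasticsearch over TLS
elastic.security.protocol=SSL

# Keystore converted to JKS (placeholder path and passwords)
elastic.https.ssl.keystore.type=JKS
elastic.https.ssl.keystore.location=/path/to/http.jks
elastic.https.ssl.keystore.password=<keystore-password>
elastic.https.ssl.key.password=<key-password>

# Truststore holding the CA that signed the Elasticsearch certificate
elastic.https.ssl.truststore.type=JKS
elastic.https.ssl.truststore.location=/path/to/truststore.jks
elastic.https.ssl.truststore.password=<truststore-password>
```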

Released under the CC-BY-NC-4.0