Twelve Days of Single Message Transforms - Day 6 - InsertField (message details, etc)

🎥 Recording



  1. Clone the repository

    git clone
    cd demo-scene/kafka-connect-single-message-transforms
  2. Bring the stack up

    docker-compose up -d
  3. Wait for Kafka Connect to start up

    bash -c ' \
    echo -e "\n\n=============\nWaiting for Kafka Connect to start listening on localhost ⏳\n=============\n"
    while [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) -ne 200 ] ; do
      echo -e "\t" $(date) " Kafka Connect listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) " (waiting for 200)"
      sleep 5
    done
    echo -e $(date) "\n\n--------------\n\o/ Kafka Connect is ready! Listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) "\n--------------\n"
    '

Adding fields to messages at ingest in the Kafka Connect source connector

When ingesting data from a source it can be useful to add fields to store information such as the server from which it was read.

Here’s an example source connector, which adds two static fields (sourceSystem and ingestAgent) to each message:

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/source-voluble-datagen-day6-00/config \
    -d '{
        "connector.class"                           : "io.mdrogalis.voluble.VolubleSourceConnector",
        "genkp.day6-transactions.with"              : "#{Internet.uuid}",
        "genv.day6-transactions.cost.with"          : "#{Commerce.price}",
        "genv.day6-transactions.card_type.with"     : "#{Business.creditCardType}",
        "genv.day6-transactions.item.with"          : "#{}",
        ""       : 5000,
        "transforms"                                : "insertStaticField1,insertStaticField2",
        "transforms.insertStaticField1.type"        : "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.insertStaticField1.static.field": "sourceSystem",
        "transforms.insertStaticField1.static.value": "NeverGonna",
        "transforms.insertStaticField2.type"        : "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.insertStaticField2.static.field": "ingestAgent",
        "transforms.insertStaticField2.static.value": "GiveYouUp"
    }'

The resulting message that’s written to Kafka includes the data from the source system (a data generator, in this case, writing fields cost, card_type, and item), plus the static fields configured (sourceSystem, ingestAgent):

docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -t day6-transactions -C -c1 -o end -u -q -J | jq  '.payload'
{
  "cost": {
    "string": "25.79"
  },
  "card_type": {
    "string": "visa"
  },
  "item": {
    "string": "Westmalle Trappist Tripel"
  },
  "sourceSystem": {
    "string": "NeverGonna"
  },
  "ingestAgent": {
    "string": "GiveYouUp"
  }
}

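Each InsertField instance takes a single static.field / static.value pair, which is why the source connector above chains two transforms. As a rough sketch of that naming pattern, the helper below prints the config entries for one more static-field transform. The function name static_field_transform, the alias insertStaticField3, the field ingestRegion, and the value emea are all hypothetical and chosen only for illustration.

```shell
#!/usr/bin/env bash
# Print the config entries for one InsertField static-field transform.
# $1 = transform alias, $2 = field name, $3 = static value (caller's choice).
static_field_transform() {
  # Single quotes keep $Value literal so the shell does not expand it.
  printf '"transforms.%s.type": "org.apache.kafka.connect.transforms.InsertField$Value",\n' "$1"
  printf '"transforms.%s.static.field": "%s",\n' "$1" "$2"
  printf '"transforms.%s.static.value": "%s"\n' "$1" "$3"
}

# Hypothetical third static field, for illustration only.
static_field_transform insertStaticField3 ingestRegion emea
```

The printed entries would be pasted into the connector config, and the alias also has to be appended to the comma-separated "transforms" list for the transform to run.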
Adding details about the Kafka message at egress with a Kafka Connect sink connector

It can often be useful to add information about the Kafka message (topic, partition, offset) when the data is sent to a target system. Here’s an example using the above topic; the static fields added at ingest are written to the target alongside the new metadata fields. You can also add the timestamp of the Kafka message, as shown previously, by setting timestamp.field.

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day6-00/config \
    -d '{
          "connector.class"                           : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"                            : "jdbc:mysql://mysql:3306/demo",
          "connection.user"                           : "mysqluser",
          "connection.password"                       : "mysqlpw",
          "topics"                                    : "day6-transactions",
          "tasks.max"                                 : "4",
          "auto.create"                               : "true",
          "auto.evolve"                               : "true",
          "transforms"                                : "insertPartition,insertOffset,insertTopic",
          "transforms.insertPartition.type"           : "org.apache.kafka.connect.transforms.InsertField$Value",
          "transforms.insertPartition.partition.field": "kafkaPartition",
          "transforms.insertOffset.type"              : "org.apache.kafka.connect.transforms.InsertField$Value",
          "transforms.insertOffset.offset.field"      : "kafkaOffset",
          "transforms.insertTopic.type"               : "org.apache.kafka.connect.transforms.InsertField$Value",
          "transforms.insertTopic.topic.field"        : "kafkaTopic"
        }'

Here’s the data as it appears in the target system:

mysql> SELECT * FROM `day6-transactions` LIMIT 5;
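+-------+-----------+---------------------+--------------+-------------+----------------+-------------+-------------------+
| cost  | card_type | item                | sourceSystem | ingestAgent | kafkaPartition | kafkaOffset | kafkaTopic        |
+-------+-----------+---------------------+--------------+-------------+----------------+-------------+-------------------+
| 11.78 | visa      | Schneider Aventinus | NeverGonna   | GiveYouUp   |              0 |           0 | day6-transactions |
| 17.65 | discover  | Two Hearted Ale     | NeverGonna   | GiveYouUp   |              0 |           1 | day6-transactions |
| 11.63 | jcb       | Stone IPA           | NeverGonna   | GiveYouUp   |              0 |           2 | day6-transactions |
| 67.51 | switch    | Shakespeare Oatmeal | NeverGonna   | GiveYouUp   |              0 |           3 | day6-transactions |
| 22.25 | discover  | Hop Rod Rye         | NeverGonna   | GiveYouUp   |              0 |           4 | day6-transactions |
+-------+-----------+---------------------+--------------+-------------+----------------+-------------+-------------------+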
5 rows in set (0.00 sec)
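
The sink example above uses one transform each for partition, offset, and topic. As a sketch of a variant, a single InsertField instance can carry several of these settings at once, including timestamp.field for the Kafka message timestamp, with a static field added by a second transform. The aliases insertMetadata and insertSinkLabel, the field names kafkaTimestamp and sinkLabel, the value mysql-demo, and the helper put_sink_config are hypothetical; the connection settings are copied from the sink example above.

```shell
#!/usr/bin/env bash
# Sketch only: transform aliases and the kafkaTimestamp / sinkLabel names
# are hypothetical. Connection settings are copied from the sink example.
SINK_CONFIG='{
  "connector.class"                          : "io.confluent.connect.jdbc.JdbcSinkConnector",
  "connection.url"                           : "jdbc:mysql://mysql:3306/demo",
  "connection.user"                          : "mysqluser",
  "connection.password"                      : "mysqlpw",
  "topics"                                   : "day6-transactions",
  "tasks.max"                                : "4",
  "auto.create"                              : "true",
  "auto.evolve"                              : "true",
  "transforms"                               : "insertMetadata,insertSinkLabel",
  "transforms.insertMetadata.type"           : "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.insertMetadata.topic.field"    : "kafkaTopic",
  "transforms.insertMetadata.partition.field": "kafkaPartition",
  "transforms.insertMetadata.offset.field"   : "kafkaOffset",
  "transforms.insertMetadata.timestamp.field": "kafkaTimestamp",
  "transforms.insertSinkLabel.type"          : "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.insertSinkLabel.static.field"  : "sinkLabel",
  "transforms.insertSinkLabel.static.value"  : "mysql-demo"
}'

# PUT the config above to the Kafka Connect REST API.
# $1 = connector name. Nothing is sent until this function is called.
put_sink_config() {
  curl -s -X PUT -H "Content-Type:application/json" \
       "http://localhost:8083/connectors/$1/config" \
       -d "$SINK_CONFIG"
}
```

Calling put_sink_config with a connector name sends the request. A PUT to the config endpoint replaces the connector's whole configuration, which is why the sketch repeats every setting rather than only the new transforms.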