Single Message Transforms
Single Message Transforms (SMTs) help you modify data and its characteristics as it passes through a connector, without needing additional stream processors.
Prior to using an SMT with production data, test the configuration on a smaller subset of data to verify the behavior of the SMT.
Cast
Cast SMT lets you change the data type of fields in a Redpanda message, updating the schema if one is present.
Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.Cast$Key) or value (org.apache.kafka.connect.transforms.Cast$Value).
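As a rough illustration of what a cast does to a schemaless record, the following Python sketch applies a comma-separated list of field:type pairs to a map. It only emulates the idea: the cast_fields helper, the spec string format, and the reduced type table are assumptions made for this example, not the connector's implementation.

```python
# Simplified stand-in for Connect type names; only a few types are modeled here.
CASTERS = {
    "int32": int,
    "int64": int,
    "float64": float,
    "string": str,
}


def cast_fields(value, spec):
    """Return a copy of `value` with the fields named in `spec` cast to new types."""
    if value is None:
        return None
    result = dict(value)
    for pair in spec.split(","):
        field, type_name = pair.strip().split(":")
        # Missing or null fields are left untouched in this sketch.
        if field in result and result[field] is not None:
            result[field] = CASTERS[type_name](result[field])
    return result


record = {"product_id": "9987", "price": 1234}
print(cast_fields(record, "product_id:int64,price:string"))
```

The sketch copies the input rather than mutating it, which keeps the original record available for comparison.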
EventRouter (Debezium)
The outbox pattern is a way to safely and reliably exchange data between multiple (micro) services. An outbox pattern implementation avoids inconsistencies between a service’s internal state (as typically persisted in its database) and state in events consumed by services that need the same data.
To implement the outbox pattern in a Debezium application, configure a Debezium connector to:
- Capture changes in an outbox table
- Apply the Debezium outbox EventRouter Single Message Transformation
EventRouter SMT is available for managed Debezium connectors only.
Configuration
Property key | Description |
---|---|
route.by.field | Specifies the name of a column in the outbox table. By default, the value in this column becomes part of the name of the topic to which the connector emits the outbox messages. |
route.topic.replacement | Specifies the name of the topic to which the connector emits outbox messages. The default topic name is outbox.event.${routedByValue}. |
table.expand.json.payload | Specifies whether to expand a String payload as JSON. If no content is found, or if there is a parsing error, the content is kept as is. |
table.fields.additional.placement | Specifies one or more outbox table columns to add to outbox message headers or envelopes. Specify a comma-separated list of pairs. In each pair, specify the name of a column and whether you want the value to be in the header or the envelope. |
table.field.event.key | Specifies the outbox table column that contains the event key. When this column contains a value, the SMT uses that value as the key in the emitted outbox message. This is important for maintaining the correct order in Kafka partitions. |
Example
Sample JSON configuration:
"transforms": "outbox",
"transforms.outbox.route.by.field": "type",
"transforms.outbox.route.topic.replacement": "my-topic.${routedByValue}",
"transforms.outbox.table.expand.json.payload": "true",
"transforms.outbox.table.field.event.key": "aggregate_id",
"transforms.outbox.table.fields.additional.placement": "before:envelope",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter"
ExtractField
ExtractField SMT pulls the specified field from a Struct when a schema is present, or a Map for schemaless data. Any null values are passed through unmodified.
Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.ExtractField$Key) or value (org.apache.kafka.connect.transforms.ExtractField$Value).
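The extraction described above can be pictured with a few lines of Python for the schemaless (Map) case. This is a minimal sketch: extract_field is a hypothetical helper, not part of any connector API.

```python
def extract_field(value, field):
    """Return the named field of a map; a null (None) value passes through unchanged."""
    if value is None:
        return None
    return value.get(field)


print(extract_field({"id": 11, "name": "Harry Wilson"}, "id"))  # prints 11
print(extract_field(None, "id"))  # prints None
```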
Filter
Filter SMT drops all records, filtering them from subsequent transformations in the chain. This is intended to be used conditionally to filter out records matching (or not matching) a particular predicate.
Example
Sample configuration:
"transforms": "Filter",
"transforms.Filter.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.Filter.predicate": "IsMyTopic",
"predicates": "IsMyTopic",
"predicates.IsMyTopic.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.IsMyTopic.pattern": "my-topic"
Predicates
Managed connectors support the following predicates:
TopicNameMatches
org.apache.kafka.connect.transforms.predicates.TopicNameMatches: A predicate that is true for records with a topic name that matches the configured regular expression.
Property key | Description |
---|---|
pattern | A Java regular expression for matching against the name of a record's topic. |
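To show how a predicate and the Filter SMT work together, here is a small Python emulation. It assumes the pattern must match the whole topic name and uses Python's re module in place of Java regular expressions, so syntax details may differ. The helper names topic_name_matches and apply_filter are hypothetical.

```python
import re


def topic_name_matches(pattern):
    """Build a predicate that is true when the whole topic name matches `pattern`."""
    compiled = re.compile(pattern)
    return lambda record: compiled.fullmatch(record["topic"]) is not None


def apply_filter(records, predicate):
    """Drop every record for which the predicate is true; keep the rest."""
    return [record for record in records if not predicate(record)]


records = [
    {"topic": "my-topic", "value": 1},
    {"topic": "other-topic", "value": 2},
]
kept = apply_filter(records, topic_name_matches("my-topic"))
print(kept)
```

Only the record from other-topic remains, because the predicate is true for my-topic and Filter drops the records it is applied to.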
Flatten
Flatten SMT flattens a nested data structure, generating names for each field by concatenating the field names at each level with a configurable delimiter character. Applies to Struct when a schema is present, or a Map for schemaless data. Array fields and their contents are not modified. The default delimiter is a period (.).
Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.Flatten$Key) or value (org.apache.kafka.connect.transforms.Flatten$Value).
Configuration
Property key | Description |
---|---|
delimiter | Delimiter to insert between field names from the input record when generating field names for the output record. |
Example
"transforms": "flatten",
"transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter": "."
Before:
{
  "user": {
    "id": 10,
    "name": {
      "first": "Red",
      "last": "Panda"
    }
  }
}
After:
{
  "user.id": 10,
  "user.name.first": "Red",
  "user.name.last": "Panda"
}
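The name-concatenation rule can be sketched in Python for schemaless data. This is an illustrative emulation rather than the connector's code; the flatten function below is hypothetical and treats lists as opaque values, in line with the note that array fields are not modified.

```python
def flatten(value, delimiter=".", prefix=""):
    """Flatten nested dicts; lists and scalar values are copied as they are."""
    flat = {}
    for name, field in value.items():
        full_name = f"{prefix}{delimiter}{name}" if prefix else name
        if isinstance(field, dict):
            # Recurse, carrying the path built so far as the new prefix.
            flat.update(flatten(field, delimiter, full_name))
        else:
            flat[full_name] = field
    return flat


nested = {"user": {"id": 10, "name": {"first": "Red", "last": "Panda"}}}
print(flatten(nested))
```

Passing a different delimiter, for example flatten(nested, "_"), produces names such as user_name_first.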
HeaderFrom
HeaderFrom SMT moves or copies fields in the key or value of a record into that record's headers. Corresponding elements of fields and headers together identify a field and the header it should be moved or copied to.
Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.HeaderFrom$Key) or value (org.apache.kafka.connect.transforms.HeaderFrom$Value).
Configuration
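Property key | Description |
---|---|
fields | Comma-separated list of field names in the record whose values are to be copied or moved to headers. |
headers | Comma-separated list of header names, in the same order as the field names listed in the fields configuration property. |
operation | Either move if the fields are to be moved to the headers (removed from the key or value), or copy if the fields are to be copied to the headers (retained in the key or value). |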
Example
"transforms": "HeaderFrom",
"transforms.HeaderFrom.type": "org.apache.kafka.connect.transforms.HeaderFrom$Value",
"transforms.HeaderFrom.fields": "id,last_login_ts",
"transforms.HeaderFrom.headers": "user_id,timestamp",
"transforms.HeaderFrom.operation": "move"
Before:
- Record value: { "id": 11, "name": "Harry Wilson", "last_login_ts": 1715242380 }
- Record header: { "conv_id": "uier923" }
After:
- Record value: { "name": "Harry Wilson" }
- Record header: { "conv_id": "uier923", "user_id": 11, "timestamp": 1715242380 }
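The pairing of field names with header names can be emulated in Python as follows. This is a simplified sketch: header_from is a hypothetical helper, and headers are modeled as a plain dict even though real record headers are an ordered collection.

```python
def header_from(value, headers, fields, header_names, operation):
    """Copy or move `fields` from the value into headers named by `header_names`."""
    if operation not in ("move", "copy"):
        raise ValueError("operation must be 'move' or 'copy'")
    new_value = dict(value)
    new_headers = dict(headers)
    # The i-th field is written to the i-th header name.
    for field, header in zip(fields, header_names):
        new_headers[header] = value[field]
        if operation == "move":
            del new_value[field]
    return new_value, new_headers


value = {"id": 11, "name": "Harry Wilson", "last_login_ts": 1715242380}
headers = {"conv_id": "uier923"}
print(header_from(value, headers, ["id", "last_login_ts"], ["user_id", "timestamp"], "move"))
```

With operation set to copy, the value keeps all three fields and only the headers change.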
HoistField
HoistField SMT wraps data using the specified field name in a Struct when a schema is present, or a Map in the case of schemaless data.
Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.HoistField$Key) or value (org.apache.kafka.connect.transforms.HoistField$Value).
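For schemaless data, the wrapping amounts to placing the original value under a single named field. A minimal Python sketch, with hoist_field as a hypothetical helper:

```python
def hoist_field(value, field):
    """Wrap the whole value in a map with a single entry named `field`."""
    return {field: value}


print(hoist_field("Harry Wilson", "name"))  # prints {'name': 'Harry Wilson'}
```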
InsertField
InsertField SMT inserts field(s) using attributes from the record metadata or a configured static value.
Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.InsertField$Key) or value (org.apache.kafka.connect.transforms.InsertField$Value).
Configuration
Property key | Description |
---|---|
offset.field | Field name for Redpanda offset. |
partition.field | Field name for Redpanda partition. |
static.field | Field name for static data field. |
static.value | The static field value. |
timestamp.field | Field name for record timestamp. |
topic.field | Field name for Redpanda topic. |
Example
Sample configuration:
"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.static.field": "cluster_id",
"transforms.InsertField.static.value": "19423"
Before:
{"product_id":9987,"price":1234}
After:
{"price":1234,"cluster_id":"19423","product_id":9987}
MaskField
MaskField SMT replaces the contents of fields in a record.
Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.MaskField$Key) or value (org.apache.kafka.connect.transforms.MaskField$Value).
Configuration
Property key | Description |
---|---|
fields | Comma-separated list of fields to mask. |
replacement | Custom value replacement used to mask field values. |
Example
"transforms": "MaskField",
"transforms.MaskField.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.MaskField.fields": "metadata",
"transforms.MaskField.replacement": "***"
Before:
{"product_id":9987,"price":1234,"metadata":"test"}
After:
{"metadata":"***","price":1234,"product_id":9987}
RegexRouter
RegexRouter SMT updates the record topic using the configured regular expression and replacement string. Under the hood, the regex is compiled to a java.util.regex.Pattern. If the pattern matches the input topic, java.util.regex.Matcher#replaceFirst() is used with the replacement string to obtain the new topic.
Configuration
Property key | Description |
---|---|
regex | Regular expression to use for matching. |
replacement | Replacement string. |
Example
This configuration snippet shows how to add the prefix prefix_ to the beginning of a topic.
"transforms": "AppendPrefix",
"transforms.AppendPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AppendPrefix.regex": ".*",
"transforms.AppendPrefix.replacement": "prefix_$0"
Before: topic-name
After: prefix_topic-name
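The match-then-replace behavior can be approximated in Python. This sketch assumes the regex must match the entire topic name before any rewrite happens, and it translates Java-style group references such as $0 into Python's form. Java and Python regex syntax differ in places, so treat regex_router as a hypothetical illustration only.

```python
import re


def regex_router(topic, regex, replacement):
    """Return the rewritten topic, or the original topic if the regex does not match."""
    pattern = re.compile(regex)
    # Assumption: the pattern has to match the whole topic name.
    if pattern.fullmatch(topic) is None:
        return topic
    # Translate Java-style group references ($0, $1, ...) to Python's \g<n> form.
    py_replacement = re.sub(r"\$(\d+)", r"\\g<\1>", replacement)
    # count=1 mirrors replaceFirst(): only the first match is replaced.
    return pattern.sub(py_replacement, topic, count=1)


print(regex_router("topic-name", ".*", "prefix_$0"))  # prints prefix_topic-name
```

A topic that the regex does not match is returned unchanged, so regex_router("orders", "(.*)-raw", "$1") yields orders.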
ReplaceField
ReplaceField SMT filters or renames fields in a Redpanda record.
Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.ReplaceField$Key) or value (org.apache.kafka.connect.transforms.ReplaceField$Value).
Configuration
Property key | Description |
---|---|
exclude | Fields to exclude. This takes precedence over the fields to include. |
include | Fields to include. If specified, only these fields are used. |
renames | List of comma-separated pairs. For example: foo:bar,abc:xyz |
Example
Sample configuration:
"transforms": "ReplaceField",
"transforms.ReplaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.ReplaceField.renames": "product_id:item_number"
Before:
{"product_id":9987,"price":1234}
After:
{"item_number":9987,"price":1234}
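The interplay between excluding, including, and renaming fields can be sketched in Python for a schemaless record. This is an emulation under the precedence rules stated in the configuration table; replace_field and its keyword arguments are hypothetical names.

```python
def replace_field(value, exclude=(), include=(), renames=None):
    """Filter and rename the fields of a map."""
    renames = renames or {}
    result = {}
    for name, field in value.items():
        if name in exclude:
            continue  # exclusion takes precedence over inclusion
        if include and name not in include:
            continue  # when an include list is given, only those fields are kept
        result[renames.get(name, name)] = field
    return result


record = {"product_id": 9987, "price": 1234}
print(replace_field(record, renames={"product_id": "item_number"}))
```

When a field appears in both lists, it is dropped: replace_field({"a": 1, "b": 2}, exclude=["a"], include=["a", "b"]) keeps only b.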
ReplaceTimestamp (Redpanda)
ReplaceTimestamp (Redpanda) SMT uses a field from the record key or value as the record timestamp, which can then be used to partition data with an S3 connector.
Use the concrete transformation type designed for the record key (com.redpanda.connectors.transforms.ReplaceTimestamp$Key) or value (com.redpanda.connectors.transforms.ReplaceTimestamp$Value).
ReplaceTimestamp is available for sink connectors only.
Configuration
Property key | Description |
---|---|
field | Specifies the name of a field to be used as a source of timestamp. |
Example
To use the my-timestamp field as the source of the record timestamp, update the connector configuration with:
"transforms": "ReplaceTimestamp",
"transforms.ReplaceTimestamp.type": "com.redpanda.connectors.transforms.ReplaceTimestamp$Value",
"transforms.ReplaceTimestamp.field": "my-timestamp"
for messages in the following format:
{ "name": "my-name", ... "my-timestamp": 1707928150868, ... }
The SMT needs structured data to extract the field, which means either a Map for schemaless data or a Struct when a schema is present. The timestamp value should be of a numeric type (epoch milliseconds) or a Java Date object (which is the case when using "connect.name":"org.apache.kafka.connect.data.Timestamp" in the schema).
SchemaRegistryReplicator (Redpanda)
SchemaRegistryReplicator (Redpanda) SMT is a transform to replicate schemas.
SchemaRegistryReplicator SMT is designed to be used with the MirrorMaker2 connector only.
To use it, remove the _schema topic from the topic exclude list.
SetSchemaMetadata
SetSchemaMetadata SMT sets the schema name, version, or both on the record's key (org.apache.kafka.connect.transforms.SetSchemaMetadata$Key) or value (org.apache.kafka.connect.transforms.SetSchemaMetadata$Value) schema.
TimestampConverter
TimestampConverter SMT converts timestamps between different formats, such as Unix epoch, strings, and Connect Date/Timestamp types. It applies to individual fields or to the entire value.
Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.TimestampConverter$Key) or value (org.apache.kafka.connect.transforms.TimestampConverter$Value).
Configuration
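Property key | Description |
---|---|
field | The field containing the timestamp, or empty if the entire value is a timestamp. Default: "" (empty). |
target.type | The desired timestamp representation: string, unix, Date, Time, or Timestamp. |
format | A SimpleDateFormat-compatible format for the timestamp. Used to generate the output when type=string or used to parse the input if the input is a string. |
unix.precision | The desired Unix precision for the timestamp: seconds, milliseconds, microseconds, or nanoseconds. Used to generate the output when type=unix or used to parse the input if the input is a Long. Note: This SMT causes precision loss during conversions from, and to, values with sub-millisecond components. Default: milliseconds. |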
Example
Sample configuration:
"transforms": "TimestampConverter",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.field": "last_login_date",
"transforms.TimestampConverter.format": "yyyy-MM-dd",
"transforms.TimestampConverter.target.type": "string"
Before: 1702041416
After: 2023-12-08
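The result of a Unix-to-string conversion depends on the precision used to read the input number. The sample value 1702041416 corresponds to 2023-12-08 only when it is read as epoch seconds; read as milliseconds, the same number falls in January 1970. The Python sketch below illustrates the arithmetic. It assumes UTC and uses the strftime pattern %Y-%m-%d as a stand-in for yyyy-MM-dd; unix_to_string is a hypothetical helper.

```python
from datetime import datetime, timezone

# Divisors that turn a Unix value of the given precision into seconds.
PRECISION_DIVISOR = {"seconds": 1, "milliseconds": 1_000}


def unix_to_string(value, precision, fmt="%Y-%m-%d"):
    """Format a Unix timestamp of the given precision as a UTC date string."""
    seconds = value / PRECISION_DIVISOR[precision]
    return datetime.fromtimestamp(seconds, tz=timezone.utc).strftime(fmt)


print(unix_to_string(1702041416, "seconds"))       # prints 2023-12-08
print(unix_to_string(1702041416, "milliseconds"))  # prints 1970-01-20
```

If converted dates land unexpectedly close to 1970, checking the precision of the source field is a reasonable first step.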
TimestampRouter
TimestampRouter SMT updates the record’s topic field as a function of the original topic value and the record timestamp. This is mainly useful for sink connectors, because the topic field is often used to determine the equivalent entity name in the destination system (for example, a database table or search index name).
TimestampRouter SMT should be used with sink connectors only.
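To make "a function of the original topic value and the record timestamp" concrete, the following Python sketch builds a new topic name from a template. The template placeholders ${topic} and ${timestamp}, the date pattern, and the timestamp_router helper are assumptions for this illustration, and the timestamp is assumed to be epoch milliseconds interpreted in UTC.

```python
from datetime import datetime, timezone


def timestamp_router(topic, timestamp_ms,
                     topic_format="${topic}-${timestamp}",
                     timestamp_format="%Y%m%d"):
    """Build a new topic name from the original topic and the record timestamp."""
    stamp = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
    formatted = stamp.strftime(timestamp_format)
    return topic_format.replace("${topic}", topic).replace("${timestamp}", formatted)


print(timestamp_router("orders", 1702041416000))  # prints orders-20231208
```

A sink that maps topic names to tables or indexes would then write this record to a per-day destination such as orders-20231208.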
ValueToKey
Error handling
By default, Error tolerance is set to NONE, so the connector fails on any SMT exception (notably, data parsing or data processing errors). To keep the connector from crashing on data issues, set Error tolerance to ALL and specify a Dead Letter Queue Topic Name, the topic to which failed messages are redirected.
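The difference between the two tolerance settings can be emulated in a few lines of Python. This is a conceptual sketch, not how the connector runtime is implemented: run_transform, parse_price, and the in-memory dead letter queue list are hypothetical stand-ins.

```python
def run_transform(records, transform, tolerance="none"):
    """Apply `transform` to each record, honoring the given error tolerance."""
    output, dead_letter_queue = [], []
    for record in records:
        try:
            output.append(transform(record))
        except Exception:
            if tolerance == "none":
                raise  # the first bad record stops processing
            dead_letter_queue.append(record)  # tolerance "all": set the record aside
    return output, dead_letter_queue


def parse_price(record):
    """Example transform that fails on non-numeric prices."""
    return {**record, "price": int(record["price"])}


records = [{"price": "1234"}, {"price": "n/a"}]
ok, dlq = run_transform(records, parse_price, tolerance="all")
print(ok)   # prints [{'price': 1234}]
print(dlq)  # prints [{'price': 'n/a'}]
```

With tolerance left at "none", the same input raises an error on the second record instead of returning a result.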