Run Data Transforms in Linux
Data transforms is generally available for all Redpanda Community and Redpanda Enterprise Edition users. To unlock this feature in Redpanda Cloud, contact Redpanda support. |
Data transforms let you run common data streaming tasks, like filtering, scrubbing, and transcoding, within Redpanda. For example, you may have consumers that require you to redact credit card numbers or convert JSON to Avro. Data transforms can also interact with the Redpanda Schema Registry to work with encoded data types.
Data transforms use a WebAssembly (Wasm) engine inside a Redpanda broker. A Wasm function acts on a single record in an input topic. You can develop and manage data transforms with rpk transform
commands.
You should build and deploy transforms from a separate, non-production machine (host machine). Using a separate host machine avoids potential resource conflicts and stability issues on the nodes that run your brokers.
See also: How Data Transforms Work and Limitations
Prerequisites
You must have the following:
-
A Redpanda cluster running version 24.1.
-
External access to the Kafka API and the Admin API.
-
Development tools installed on your host machine:
-
Install
rpk
on your host machine and configure it to connect to your Redpanda cluster.
Limitations
-
Transforms have no external access to disk or network resources.
-
Only single record transforms is supported, but multiple output records from a single input record is supported. For aggregations, joins, or complex transformations, use Apache Flink.
-
Up to 8 output topics are supported.
-
Transforms have at-least-once delivery.
-
When clients use the Kafka Transactions API on partitions of an input topic, transforms process only committed records.
-
Because data transforms are powered by Wasm, transform functions can be authored in any language. However, a data transforms SDK currently is only available in Golang and Rust.
Enable data transforms
-
To enable data transforms, set the
data_transforms_enabled
cluster property totrue
:rpk cluster config set data_transforms_enabled true
bash -
Restart all brokers:
rpk redpanda stop rpk redpanda start
bash
Configure memory for data transforms
Redpanda reserves memory for each transform function within the broker. You need enough memory for your input record and output record to be in memory at the same time.
Set the following properties based on the number of functions you have and the amount of memory you anticipate needing.
-
wasm_per_core_memory_reservation
: Total amount of memory (in bytes) to reserve per shard for all Wasm VMs. Default = 20 MiB. Requires restart. -
wasm_per_function_memory_limit
: Amount of memory (in bytes) to reserve per instance of a Wasm VM. Default = 2 MiB. Requires restart.
For example, to set wasm_per_core_memory_reservation
to 40 MiB:
rpk cluster config set wasm_per_core_memory_reservation=41943040
The maximum number of functions that can be deployed to a cluster is equal to wasm_per_core_memory_reservation
/ wasm_per_function_memory_limit
. When that limit is hit, Redpanda cannot allocate memory for the VM and the transforms stay in error states.
See also: How Data Transforms Work
Create a data transforms project
-
Go
-
Rust
-
Create and initialize a data transforms project:
rpk transform init --language=tinygo
bashIf you do not include the
--language
flag, thetransform init
command will prompt you for the language.A successful command generates project files in your current directory:
. ├── go.mod ├── go.sum ├── README.md ├── transform.go └── transform.yaml
The
transform.go
file contains the transform logic, and thetransform.yaml
file specifies the transform’s configuration.When creating a custom data transform, initialization steps can be done either in main
(because it’s only run once at the start of the package) or in Go’s standard predefinedinit()
function. Although state can be cached in global variables, Redpanda may restart a Wasm module at any point, which causes the state to be lost. -
Implement your project by adding transform logic.
The following examples show some basic transforms. Each example can be copied into the
transform.go
file.-
Identity transform
-
Transcoder transform
-
Validation filter transform
package main import ( "github.com/redpanda-data/redpanda/src/transform-sdk/go/transform" ) // This example shows the basic usage of the package: // This transform does nothing but copy the same data from an // input topic to an output topic. func main() { // Make sure to register your callback and perform other setup in main transform.OnRecordWritten(identityTransform) } // This will be called for each record in the source topic. // // The output records returned will be written to the destination topic. func identityTransform(e transform.WriteEvent, w transform.RecordWriter) error { return w.Write(e.Record()) }
gopackage main import ( "bytes" "encoding/csv" "encoding/json" "errors" "io" "strconv" "github.com/redpanda-data/redpanda/src/transform-sdk/go/transform" ) // This example shows a transform that converts CSV inputs into JSON outputs. func main() { transform.OnRecordWritten(csvToJsonTransform) } type Foo struct { A string `json:"a"` B int `json:"b"` } func csvToJsonTransform(e transform.WriteEvent, w transform.RecordWriter) error { // The input data is a CSV (without a header row) that is the structure of: // key, a, b reader := csv.NewReader(bytes.NewReader(e.Record().Value)) // Improve performance by reusing the result slice. reader.ReuseRecord = true for { row, err := reader.Read() if err == io.EOF { break } else if err != nil { return err } if len(row) != 3 { return errors.New("unexpected number of rows") } // Convert the last column into an int b, err := strconv.Atoi(row[2]) if err != nil { return err } // Marshal our JSON value f := Foo{ A: row[1], B: b, } v, err := json.Marshal(&f) if err != nil { return err } // Add our output record using the first column as the key. r := transform.Record{ Key: []byte(row[0]), Value: v, } if err := w.Write(r); err != nil { return err } } return nil }
goimport ( "encoding/json" "github.com/redpanda-data/redpanda/src/transform-sdk/go/transform" ) // This example shows a filter that outputs only valid JSON into the // output topic. func main() { transform.OnRecordWritten(filterValidJson) } func filterValidJson(e transform.WriteEvent, w transform.RecordWriter) error { if json.Valid(e.Record().Value) { return w.Write(e.Record()) } // Send invalid records to separate topic return w.Write(e.Record(), transform.ToTopic("invalid-json")) }
go -
-
Create and initialize a data transforms project:
rpk transform init --language=rust
bashIf you do not include the
--language
flag, thetransform init
command will prompt you for the language.A successful command generates project files in your current directory:
. ├── Cargo.lock ├── Cargo.toml ├── README.md ├── src │ └── main.rs └── transform.yaml
The
src/main.rs
file contains the transform logic, and thetransform.yaml
file specifies the transform’s configuration.When creating a custom data transform, initialization steps can be done in main()
because it’s only run once at the start of the package. Although state can be cached in global variables, Redpanda may restart a Wasm module at any point, which causes the state to be lost.The following examples show some basic transforms. Each example can be copied into the
main.rs
file.-
Identity transform
-
Transcoder transform
-
Validation filter transform
use anyhow::Result; use redpanda_transform_sdk::*; // This example shows the basic usage of the crate: // This transform does nothing but copy the same data from an // input topic to an output topic. fn main() { // Make sure to register your callback and perform other setup in main on_record_written(my_transform); } // This will be called for each record in the source topic. // // The output records returned will be written to the destination topic. fn my_transform(event: WriteEvent, writer: &mut RecordWriter) -> Result<()> { writer.write(event.record)?; Ok(()) }
rustuse anyhow::Result; use redpanda_transform_sdk::*; use serde::{Deserialize, Serialize}; // This example shows a transform that converts CSV inputs into JSON outputs. fn main() { on_record_written(my_transform); } #[derive(Serialize, Deserialize)] struct Foo { a: String, b: i32, } fn my_transform(event: WriteEvent, writer: &mut RecordWriter) -> Result<()> { // The input data is a CSV (without a header row) that is defined as our Foo structure. let mut reader = csv::Reader::from_reader(event.record.value().unwrap_or_default()); // For each record in our CSV for result in reader.deserialize() { let foo: Foo = result?; // Convert it to JSON let value = serde_json::to_vec(&foo)?; // Then output it with the same key. writer.write(BorrowedRecord::new(event.record.key(), Some(&value)))?; } Ok(()) }
rustuse anyhow::Result; use redpanda_transform_sdk::*; // This example shows a filter that outputs only valid JSON to the output topic. fn main() { on_record_written(filter_valid_json); } fn filter_valid_json(event: WriteEvent, writer: &mut RecordWriter) -> Result<()> { let value = event.record.value().unwrap_or_default(); if serde_json::from_slice::<serde_json::Value>(value).is_ok() { writer.write(event.record)?; } else { // Send invalid records to separate topic write.write_with_options(event.record, WriteOptions::to_topic("invalid-json"))?; } Ok(()) }
rust -
Build and deploy the transform
-
Build the transform into a Wasm module with metadata:
rpk transform build
bash -
Create demo topics to apply the transform function to:
rpk topic create demo-1 demo-2
bash -
Deploy the Wasm module to your cluster. For example, with the identity transform:
rpk transform deploy --input-topic=demo-1 --output-topic=demo-2
bash -
Validate that your transform is running. For example:
-
Produce a few records to the
demo-1
topic.echo "foo\nbar" | rpk topic produce demo-1
bash -
Consume from the
demo-2
topic.rpk topic consume demo-2
bash{ "topic": "demo-2", "value": "foo", "timestamp": 1687545891433, "partition": 0, "offset": 0 } { "topic": "demo-2", "value": "bar", "timestamp": 1687545892434, "partition": 0, "offset": 1 }
json
-
View data transform logs
Runtime logs for transforms are written to an internal topic called _redpanda.transform_logs
.
To view the logs, run the following rpk
command:
rpk transform logs <transform_name>
Or, in the Redpanda Console, go to Topics, and make sure to check Show internal topics to see _redpanda.transform_logs
in the topic list.
For usage details, see the rpk transform logs
reference.
Monitor data transforms
You can monitor your transforms with the following metrics:
-
transform_execution_latency_sec
-
transform_execution_errors
-
wasm_engine_cpu_seconds_total
-
wasm_engine_memory_usage
-
wasm_engine_max_memory
-
wasm_binary_executable_memory_usage
-
transform_read_bytes
-
transform_write_bytes
-
transform_lag
-
transform_failures
-
transform_state
See Public Metrics