Develop Data Transforms
Learn how to initialize a data transforms project and write transform functions in your chosen language.
Prerequisites
You must have the following development tools installed on your host machine:
- The rpk command-line client, configured to connect to your Redpanda cluster.
- For Go projects, Go version 1.20 or later.
- For Rust projects, the latest stable version of Rust.
- For JavaScript and TypeScript projects, the latest long-term support (LTS) release of Node.js.
Initialize a data transforms project
To initialize a data transforms project, use the following command to set up the project files in your current directory. This command adds the latest version of the SDK as a project dependency:
rpk transform init --language=<language> --name=<name>
If you do not include the --language flag, the command prompts you for the language. Supported languages include:
- tinygo-no-goroutines (does not include goroutines)
- tinygo-with-goroutines
- rust
- javascript
- typescript
For example, if you choose tinygo-no-goroutines, the following project files are created:
.
├── go.mod
├── go.sum
├── README.md
├── transform.go
└── transform.yaml
The transform.go file contains a boilerplate transform function.
The transform.yaml file specifies the configuration settings for the transform function.
Build transform functions
You can develop your transform logic with one of the available SDKs that allow your transform code to interact with a Redpanda cluster.
In Go, all transform functions must register a callback with the transform.OnRecordWritten() function.
Run any initialization steps in the main() function. It runs once, when the transform function starts, not once for every record. You can also use Go's standard init() function.
package main

import (
	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
)

func main() {
	// Register your transform function.
	// This is a good place to perform other setup too.
	transform.OnRecordWritten(myTransform)
}

// myTransform is where you read the record that was written, and then you can
// output new records that will be written to the destination topic.
func myTransform(event transform.WriteEvent, writer transform.RecordWriter) error {
	return writer.Write(event.Record())
}
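The following minimal sketch shows why setup belongs in main() and does not use the SDK. The expensive work (compiling a regular expression) happens once, and the returned callback reuses the result for every record. The handler type and the newMaskDigits function are hypothetical stand-ins for illustration only. They are not part of the transform SDK.

```go
package main

import (
	"fmt"
	"regexp"
)

// handler is a hypothetical stand-in for a transform callback: it receives a
// record value and returns the value to write. It is not an SDK type.
type handler func(value []byte) []byte

// newMaskDigits performs the one-time setup (compiling the pattern) and
// returns a callback that reuses the compiled pattern on every call.
func newMaskDigits() handler {
	digits := regexp.MustCompile(`[0-9]`)
	return func(value []byte) []byte {
		// Replace every ASCII digit with '#'.
		return digits.ReplaceAll(value, []byte("#"))
	}
}

func main() {
	// Setup runs once here, just as it would before registering a callback
	// with transform.OnRecordWritten() in a real transform.
	mask := newMaskDigits()
	for _, v := range []string{"card 1234", "pin 42"} {
		fmt.Println(string(mask([]byte(v))))
	}
}
```

If you compiled the pattern inside the callback, that work would repeat for every record.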
In Rust, all transform functions must register a callback with the on_record_written() function.
Run any initialization steps in the main() function. It runs once, when the transform function starts, not once for every record.
use redpanda_transform_sdk::*;
use std::error::Error;

fn main() {
    // Register your transform function.
    // This is a good place to perform other setup too.
    on_record_written(my_transform);
}

// my_transform is where you read the record that was written, and then you can
// write new records to the output topic.
fn my_transform(event: WriteEvent, writer: &mut RecordWriter) -> Result<(), Box<dyn Error>> {
    writer.write(event.record)?;
    Ok(())
}
In JavaScript, all transform functions must register a callback with the onRecordWritten() function.
Run any initialization steps outside of the callback. That way they run once, when the transform function starts, not once for every record.
// src/index.js
import { onRecordWritten } from "@redpanda-data/transform-sdk";

// This is a good place to perform setup steps.

// Register your transform function.
onRecordWritten((event, writer) => {
  // This is where you read the record that was written, and then you can
  // output new records that will be written to the destination topic.
  writer.write(event.record);
});
If you need to use Node.js standard modules in your transform function, you must configure the polyfillNode plugin for esbuild. This plugin allows you to polyfill Node.js APIs that are not natively available in the Redpanda JavaScript runtime environment.
esbuild.js
import * as esbuild from 'esbuild';
import { polyfillNode } from 'esbuild-plugin-polyfill-node';

await esbuild.build({
  // Keep your other build options (entry points, output file, and so on) here.
  plugins: [
    polyfillNode({
      globals: {
        buffer: true, // Allow a global Buffer variable if referenced.
        process: false, // Don't inject the process global; the Redpanda JavaScript runtime does that.
      },
      polyfills: {
        crypto: true, // Enable the crypto polyfill.
        // Add other polyfills as needed.
      },
    }),
  ],
});
Error handling
By distinguishing between recoverable and critical errors, you can ensure that your transform functions are both resilient and robust. Handling recoverable errors internally helps maintain continuous operation, while allowing critical errors to escape ensures that the system can address severe issues effectively.
Redpanda tracks the offsets of records that have been processed by transform functions. If an error escapes the Wasm virtual machine (VM), the VM fails. When the Wasm engine detects this failure and starts a new VM, the transform function retries processing the input topic from the last processed offset. If the underlying issue is not resolved, the same failure can repeat.
Handling errors internally by logging them and continuing to process subsequent records can help maintain continuous operation. However, this approach can result in silently discarding problematic records, which may lead to unnoticed data loss if the logs are not monitored closely.
The following Go, Rust, and JavaScript examples log and skip records that have no key, and let write errors escape:
package main

import (
	"log"

	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
)

func main() {
	transform.OnRecordWritten(myTransform)
}

func myTransform(event transform.WriteEvent, writer transform.RecordWriter) error {
	record := event.Record()
	if record.Key == nil {
		// Handle the error internally by logging it.
		log.Println("Error: Record key is nil")
		// Skip this record and continue to process other records.
		return nil
	}
	// Allow errors with writes to escape.
	return writer.Write(record)
}
use redpanda_transform_sdk::*;
use log::error;

fn main() {
    // Set up logging.
    env_logger::init();
    on_record_written(my_transform);
}

fn my_transform(event: WriteEvent, writer: &mut RecordWriter) -> anyhow::Result<()> {
    let record = event.record;
    if record.key().is_none() {
        // Handle the error internally by logging it.
        error!("Error: Record key is nil");
        // Skip this record and continue to process other records.
        return Ok(());
    }
    // Allow errors with writes to escape; `?` converts the write error into anyhow::Error.
    writer.write(record)?;
    Ok(())
}
import { onRecordWritten } from "@redpanda-data/transform-sdk";

// Register your transform function.
onRecordWritten((event, writer) => {
  const record = event.record;
  if (!record.key) {
    // Handle the error internally by logging it.
    console.error("Error: Record key is nil");
    // Skip this record and continue to process other records.
    return;
  }
  // Allow errors with writes to escape.
  writer.write(record);
});
When you deploy this transform function and produce a message without a key, the logs contain an entry like the following:
{
  "body": {
    "stringValue": "2024/06/20 08:17:33 Error: Record key is nil\n"
  },
  "timeUnixNano": 1718871455235337000,
  "severityNumber": 13,
  "attributes": [
    {
      "key": "transform_name",
      "value": {
        "stringValue": "test"
      }
    },
    {
      "key": "node",
      "value": {
        "intValue": 0
      }
    }
  ]
}
You can view logs for transform functions using the rpk transform logs <transform-function-name> command.
To ensure that you are notified of any errors or issues in your data transforms, Redpanda provides metrics that you can use to monitor the state of your data transforms.
Avoid state management
Relying on in-memory state across transform invocations can lead to inconsistencies and unpredictable behavior. Data transforms operate with at-least-once semantics, meaning a transform function might be executed more than once for a given record. Redpanda may also restart a transform function at any point, which causes its state to be lost.
Access environment variables
You can access both built-in and custom environment variables in your transform function. In the following Go, Rust, and JavaScript examples, environment variables are checked once during initialization:
package main

import (
	"fmt"
	"os"

	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
)

func main() {
	// Check environment variables before registering the transform function.
	outputTopic1, ok := os.LookupEnv("REDPANDA_OUTPUT_TOPIC_1")
	if ok {
		fmt.Printf("Output topic 1: %s\n", outputTopic1)
	} else {
		fmt.Println("Only one output topic is set")
	}
	// Register your transform function.
	transform.OnRecordWritten(myTransform)
}

func myTransform(event transform.WriteEvent, writer transform.RecordWriter) error {
	return writer.Write(event.Record())
}
use redpanda_transform_sdk::*;
use std::env;

fn main() {
    // Check environment variables before registering the transform function.
    match env::var("REDPANDA_OUTPUT_TOPIC_1") {
        Ok(output_topic_1) => println!("Output topic 1: {}", output_topic_1),
        Err(_) => println!("Only one output topic is set"),
    }
    // Register your transform function.
    on_record_written(my_transform);
}

fn my_transform(event: WriteEvent, writer: &mut RecordWriter) -> anyhow::Result<()> {
    // Pass each record through unchanged.
    writer.write(event.record)?;
    Ok(())
}
import { onRecordWritten } from "@redpanda-data/transform-sdk";

// Check environment variables before registering the transform function.
const outputTopic1 = process.env.REDPANDA_OUTPUT_TOPIC_1;
if (outputTopic1) {
  console.log(`Output topic 1: ${outputTopic1}`);
} else {
  console.log("Only one output topic is set");
}

// Register your transform function.
onRecordWritten((event, writer) => {
  return writer.write(event.record);
});
Write to specific output topics
You can configure your transform function to write records to specific output topics. This is useful for filtering or routing messages based on certain criteria. The following Go and Rust examples show a filter that writes only valid JSON from the input topic to the output topic. Invalid JSON is written to a different output topic.
package main

import (
	"encoding/json"

	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
)

func main() {
	transform.OnRecordWritten(filterValidJson)
}

func filterValidJson(event transform.WriteEvent, writer transform.RecordWriter) error {
	if json.Valid(event.Record().Value) {
		// Valid JSON goes to the default output topic.
		return writer.Write(event.Record())
	}
	// Send invalid records to a separate topic.
	return writer.Write(event.Record(), transform.ToTopic("invalid-json"))
}
use anyhow::Result;
use redpanda_transform_sdk::*;

fn main() {
    on_record_written(filter_valid_json);
}

fn filter_valid_json(event: WriteEvent, writer: &mut RecordWriter) -> Result<()> {
    let value = event.record.value().unwrap_or_default();
    if serde_json::from_slice::<serde_json::Value>(value).is_ok() {
        writer.write(event.record)?;
    } else {
        // Send invalid records to a separate topic.
        writer.write_with_options(event.record, WriteOptions::to_topic("invalid-json"))?;
    }
    Ok(())
}
The JavaScript SDK does not support writing records to a specific output topic.
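If you move the routing decision out of the SDK callback, you can unit test the filter logic without a Redpanda cluster. The following sketch is a hypothetical refactoring of the Go filter. The names routeFor and invalidJSONTopic are illustrative and are not SDK APIs. Like the Rust example, which treats a missing value as an empty slice, it routes records with an empty value to the invalid topic, because an empty input is not valid JSON.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// invalidJSONTopic matches the topic name used in the examples above.
const invalidJSONTopic = "invalid-json"

// routeFor decides where a record value should go. An empty result means
// "write to the default output topic"; any other result is the topic name
// to pass to transform.ToTopic.
func routeFor(value []byte) string {
	if json.Valid(value) {
		return ""
	}
	// json.Valid reports false for empty input, so records without a value
	// are also routed to the invalid topic.
	return invalidJSONTopic
}

func main() {
	for _, v := range []string{`{"ok": true}`, `{"ok": `, ``} {
		fmt.Printf("%q -> %q\n", v, routeFor([]byte(v)))
	}
}
```

Inside filterValidJson, you would call routeFor(event.Record().Value) and pass transform.ToTopic only when the result is non-empty.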