Create a MySQL (Debezium) Source Connector
You can use a MySQL (Debezium) Source connector to import a stream of changes from MySQL, Amazon RDS, and Amazon Aurora.
Prerequisites
-
A MySQL database that is accessible from the connector instance.
-
A MySQL user for the Debezium connector. This database user must have the LOCK TABLES privilege. For details, see MySQL Creating a user.
-
A binlog must be enabled for the source MySQL cluster.
Limitations
-
Only JSON, CloudEvents, or AVRO formats can be used as a Kafka message key and value format.
-
The MySQL (Debezium) Source connector can work with only a single task at a time.
Create a MySQL (Debezium) Source connector
To create the MySQL (Debezium) Source connector:
-
In Redpanda Cloud, click Connectors in the navigation menu, and then click Create Connector.
-
Select Import from MySQL (Debezium).
-
On the Create Connector page, specify the following required connector configuration options:
Property name | Property key | Description |
---|---|---|
Topic prefix | topic.prefix | A topic prefix that identifies and provides a namespace for the database server or cluster from which the connector captures changes. The topic prefix should be unique across all other connectors because it is used as a prefix for all Kafka topic names that receive events emitted by this connector. Only alphanumeric characters, hyphens, dots, and underscores are accepted. |
Hostname | database.hostname | A resolvable hostname or IP address of the MySQL database server. |
Port | database.port | Integer port number of the MySQL database server. |
User | database.user | Name of the MySQL user to be used when connecting to the MySQL database. |
Password | database.password | The password of the MySQL database user who will be connecting to the MySQL database. |
SSL mode | database.ssl.mode | Specifies whether to use an encrypted connection to the MySQL server. Select disable to use an unencrypted connection. Select preferred to use an encrypted connection if the server supports secure connections; if it does not, the connector falls back to an unencrypted connection. Select require to use a secure (encrypted) connection; if a secure connection cannot be established with this option selected, the connector fails. |
Kafka message key format | key.converter | Format of the key in the Redpanda topic. |
Message key JSON contains schema | key.converter.schemas.enable | Enable to specify that the message key contains schema in the schema field. |
Kafka message value format | value.converter | Format of the value in the Redpanda topic. |
Message value JSON contains schema | value.converter.schemas.enable | Enable to specify that the message value contains schema in the schema field. |
Connector name | name | Globally unique name to use for this connector. |
-
Click Next. Review the connector properties specified, then click Create.
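As an illustration of how the required properties fit together, the following Python sketch assembles them into a single configuration map and checks the topic prefix against the character rule described above. The function name, the placeholder hostname, user, and password, and the choice to pass every value as a string are assumptions of this sketch, not part of the Redpanda Cloud UI or API.

```python
import json
import re

# Characters accepted in topic.prefix according to the property description:
# alphanumerics, hyphens, dots, and underscores.
TOPIC_PREFIX_PATTERN = re.compile(r"[A-Za-z0-9._-]+")


def build_connector_config(name, topic_prefix, hostname, port, user, password,
                           ssl_mode="preferred"):
    """Assemble the required connector properties (hypothetical helper)."""
    if not TOPIC_PREFIX_PATTERN.fullmatch(topic_prefix):
        raise ValueError(f"invalid topic.prefix: {topic_prefix!r}")
    return {
        "name": name,
        "topic.prefix": topic_prefix,
        "database.hostname": hostname,
        "database.port": str(port),  # string values are an assumption here
        "database.user": user,
        "database.password": password,
        "database.ssl.mode": ssl_mode,
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
    }


# Placeholder connection details; replace with your own.
config = build_connector_config(
    name="mysql-source-example",
    topic_prefix="frommysql",
    hostname="mysql.example.internal",
    port=3306,
    user="debezium_user",
    password="change-me",
)
print(json.dumps(config, indent=2))
```

Validating the prefix before submitting the configuration catches disallowed characters early, since the prefix becomes part of every topic name the connector writes to.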
Map data
Use Include databases, Include tables, and Include columns to define data mapping. Alternatively, use Exclude databases, Exclude tables, and Exclude columns.
The following is an example table in the db database:
CREATE TABLE IF NOT EXISTS Persons
(
    Id int PRIMARY KEY,
    FirstName varchar(255),
    LastName varchar(255)
);
The table has one record:
INSERT INTO Persons (Id, FirstName, LastName) VALUES (1, 'Winnie', 'the Pooh');
The connector configuration for the table:
column.include.list = db\\.Persons\\.(Id|FirstName|LastName)
table.include.list = db\\.Persons
database.include.list = db
topic.prefix = frommysql
The connector configuration creates the Redpanda topic frommysql.db.Persons.
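The mapping above can be sketched in Python to show how the include lists select identifiers and how the topic name is derived. This sketch assumes that each comma-separated entry is a regular expression matched against the whole fully qualified name (db.table or db.table.column). It uses the unescaped form of the patterns shown above, and it does not model details such as case sensitivity. The helper names are hypothetical.

```python
import re


def matches_include_list(include_list, qualified_name):
    """Return True if any comma-separated regex matches the whole identifier.

    Splitting on commas is naive: a pattern that itself contains a comma
    (for example a {1,2} quantifier) would need different handling.
    """
    patterns = [p.strip() for p in include_list.split(",") if p.strip()]
    return any(re.fullmatch(p, qualified_name) for p in patterns)


def topic_name(topic_prefix, database, table):
    """Build the topic name as <topic.prefix>.<database>.<table>."""
    return f"{topic_prefix}.{database}.{table}"


# Unescaped equivalents of the configuration values shown above.
TABLE_INCLUDE = r"db\.Persons"
COLUMN_INCLUDE = r"db\.Persons\.(Id|FirstName|LastName)"

print(topic_name("frommysql", "db", "Persons"))
print(matches_include_list(TABLE_INCLUDE, "db.Persons"))
print(matches_include_list(TABLE_INCLUDE, "db.PersonsArchive"))
print(matches_include_list(COLUMN_INCLUDE, "db.Persons.LastName"))
```

Under the whole-name matching assumption, db.PersonsArchive is not selected by db\.Persons, which is why the dot is escaped and no trailing wildcard is used.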
For Kafka message value format = JSON (org.apache.kafka.connect.json.JsonConverter), the connector produces JSON messages with a schema like the following:
{
  "payload": {
    "schema": {
      // schema definition
    },
    "payload": {
      "before": null,
      "after": {
        "Id": 1,
        "FirstName": "Winnie",
        "LastName": "the Pooh"
      },
      ...
    }
  },
  "encoding": "json",
  "schemaId": 0
}
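A consumer of these messages usually needs the row state from the before and after fields. The following Python sketch shows one way to extract them from a JSON message value, whether or not the schema wrapper is present. It assumes the value has already been read from the topic as a JSON string; the function name and the sample message are illustrative only.

```python
import json


def extract_row_state(value_json):
    """Return (before, after) from a JSON change event value."""
    value = json.loads(value_json)
    # With "contains schema" enabled, the event is wrapped as
    # {"schema": ..., "payload": ...}; otherwise the event is the value itself.
    if "schema" in value and "payload" in value:
        envelope = value["payload"]
    else:
        envelope = value
    return envelope.get("before"), envelope.get("after")


# Sample value modeled on the example above, with the schema left empty.
sample = json.dumps({
    "schema": {},
    "payload": {
        "before": None,
        "after": {"Id": 1, "FirstName": "Winnie", "LastName": "the Pooh"},
    },
})

before, after = extract_row_state(sample)
print(before)
print(after["FirstName"], after["LastName"])
```

For an insert, before is null and after holds the new row, as in the example record.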
For Kafka message value format = AVRO (io.confluent.connect.avro.AvroConverter), the connector creates a Schema Registry frommysql.db.Persons-value record and produces messages like the following:
{
  "payload": {
    "before": null,
    "after": {
      "mysql.db.Persons.Value": {
        "Id": 1,
        "FirstName": {
          "string": "Winnie"
        },
        "LastName": {
          "string": "the Pooh"
        }
      }
    },
    ...
  },
  "encoding": "avro",
  "schemaId": 2
}
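In the AVRO example, nullable fields appear wrapped in a single-key object named after the type (for example "string"), and the after record is wrapped in its record name. The following Python sketch flattens that representation into plain values. It relies on a heuristic that is an assumption of this sketch: a single-key object is treated as a wrapper when its key is an Avro primitive type name or a dotted record name. A real record with exactly one field named like a primitive type would be unwrapped incorrectly.

```python
AVRO_PRIMITIVES = {
    "null", "boolean", "int", "long", "float", "double", "bytes", "string",
}


def unwrap_unions(node):
    """Recursively remove single-key type wrappers from a decoded message."""
    if isinstance(node, dict):
        if len(node) == 1:
            key, inner = next(iter(node.items()))
            # Heuristic: primitive type names and dotted record names
            # are treated as type wrappers, not as field names.
            if key in AVRO_PRIMITIVES or "." in key:
                return unwrap_unions(inner)
        return {k: unwrap_unions(v) for k, v in node.items()}
    if isinstance(node, list):
        return [unwrap_unions(v) for v in node]
    return node


# The "after" field from the example above.
after = {
    "mysql.db.Persons.Value": {
        "Id": 1,
        "FirstName": {"string": "Winnie"},
        "LastName": {"string": "the Pooh"},
    }
}

print(unwrap_unions(after))
```

When the schema is available, resolving wrappers against it is more reliable than this name-based heuristic.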
For Kafka message value format = CloudEvents (io.debezium.converters.CloudEventsConverter), the connector uses the JSON or AVRO data serializer.
-
For the JSON data serializer, enable Message value CloudEvents JSON contains schema to include the JSON schema in the message.
-
For the AVRO data serializer, the connector creates a schema in Schema Registry and produces messages in the CloudEvents data format.
Test the connection
After the connector is created:
-
Check the connector status and confirm that there are no errors in the logs or in Redpanda Console.
-
Review the Redpanda topic to confirm that it contains the expected data.
Troubleshoot
If the connector configuration is invalid, an error appears upon clicking Finish. If the connector fails, check the error message or select Show Logs to view error details.
-
Topics not created by the connector
Create the topic manually, or let the connector create it by setting the following properties (use the desired number of partitions and replication factor):
Topic creation enabled: true
Topic creation partitions: 1
Topic creation replication factor: -1
Or in JSON:
"topic.creation.enable": true,
"topic.creation.default.partitions": "1",
"topic.creation.default.replication.factor": "-1"
-
Connector requires binlog file 'mysql-bin-changelog.257116', but MySQL only has mysql-bin-changelog.257123
Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted" Connector requires binlog file 'mysql-bin-changelog.257116', but MySQL only has mysql-bin-changelog.257123, mysql-bin-changelog.257124, mysql-bin-changelog.257125
The connector needs a binlog file that was already purged. Change the Snapshot mode property from the default to when_needed.
Additional errors and corrective actions follow.
Message | Action |
---|---|
Unable to connect: Public Key Retrieval is not allowed |
Set |
Unable to connect: Communications link failure |
Confirm that |
Access denied for user |
Confirm that |
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Invalid schema Invalid namespace: from-mysql.db.Persons; error code: 422 |
The Schema Registry namespace is incorrect. Consider changing the |