Manage Apache Kafka Connect connectors with kcctl

kcctl is the newest tool on the block for managing Kafka Connect. Francesco tried it out to see how it fared in managing a service deck that includes Aiven for Apache Kafka Connect.

13 October 2021
Francesco Tisiot
Developer Advocate at Aiven

Apache Kafka is widely used as a company's data backbone, with Kafka Connect acting as the bridge to other systems. This way Kafka can be integrated with other technologies easily, reliably and at scale. The Kafka Connect REST API provides a way to manage connectors via web calls, but crafting URLs in a terminal can be tricky.
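To see why, here is a minimal Python sketch (standard library only) of building a Kafka Connect REST URL by hand. The `endpoint` helper and the `http://localhost:8083` address (the default Kafka Connect REST port) are assumptions for illustration; the `/connectors/<name>/status` path is part of the Kafka Connect REST API. Connector names must be percent-encoded when used as path segments, one of the details that is easy to get wrong when typing URLs.

```python
from urllib.parse import quote


def endpoint(base_url: str, *segments: str) -> str:
    """Join a Kafka Connect base URL with percent-encoded path segments.

    Hypothetical helper: each segment is encoded with safe="" so that
    characters such as spaces in a connector name are escaped.
    """
    path = "/".join(quote(segment, safe="") for segment in segments)
    return base_url.rstrip("/") + "/" + path


# Status URL for a connector whose name contains spaces
print(endpoint("http://localhost:8083", "connectors", "pg incremental source", "status"))
# http://localhost:8083/connectors/pg%20incremental%20source/status
```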

In this blog post we explore kcctl, a new open source command line tool for Kafka Connect. You'll find out how to point it at a Kafka Connect cluster and use it to manage connectors to other systems.

Aiven offers similar functionality with the Aiven Command Line Interface. You can use the CLI to create, drop, change, verify, pause and resume any Kafka Connect connector running on Aiven services. If all your Kafka Connect instances are Aiven services, then the Aiven CLI is all you need.

But if you want to use the same tool for any Kafka Connect instance, be it on-premises, on Aiven, or in other cloud providers, then kcctl is your friend.

Create an Apache Kafka instance with Kafka Connect

To follow along with this article, make sure you have an Apache Kafka environment with Kafka Connect up and running. If you don't have one, don't worry: Aiven can provide one in minutes. Just create one in the Aiven Console, or with the Aiven Command Line Interface using the following command:

avn service create demo-kafka               \
    --service-type kafka                    \
    --cloud google-europe-west3             \
    --plan business-4                       \
    -c kafka_connect=true                   \
    -c kafka.auto_create_topics_enable=true

This command creates an Aiven for Apache Kafka cluster named demo-kafka with three nodes (the business-4 plan) in the google-europe-west3 region, with Kafka Connect and automatic topic creation enabled. With Aiven, you can deploy Kafka Connect as part of your Kafka cluster on business and premium plans, or as a separate, standalone cluster. To read more about Aiven for Apache Kafka and Kafka Connect, check out the dedicated page.

Let's wait until the service is ready:

avn service wait demo-kafka

Install kcctl

At the time of writing, kcctl is an early access release. The current set of installation instructions can be found in its GitHub repository.

Once kcctl is installed, add its bin subdirectory to your PATH and test that it works by running:

kcctl

If we did everything correctly, then we should see the usage information. Now it's time to connect to our Kafka Connect cluster.

Connect

To plug in to Kafka Connect, first retrieve the cluster URL. You can find it with the Aiven CLI, using jq to parse the JSON output:

avn service get demo-kafka --json | jq -r '.connection_info.kafka_connect_uri'
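If jq isn't available, the same field can be read with a few lines of Python. This sketch assumes, as the jq filter does, that the JSON printed by `avn service get demo-kafka --json` has a `connection_info.kafka_connect_uri` key; `kafka_connect_uri` is a hypothetical helper and the sample document is a placeholder, not real output.

```python
import json


def kafka_connect_uri(service_json: str) -> str:
    """Return connection_info.kafka_connect_uri from `avn service get --json` output."""
    service = json.loads(service_json)
    # Same path as the jq filter '.connection_info.kafka_connect_uri'
    return service["connection_info"]["kafka_connect_uri"]


# Placeholder document with only the field we need
sample = '{"connection_info": {"kafka_connect_uri": "https://avnadmin:PASSWORD@demo-kafka-PROJECT.aivencloud.com:443"}}'
print(kafka_connect_uri(sample))
```

To use it on live output, pipe `avn service get demo-kafka --json` into a script that calls `kafka_connect_uri(sys.stdin.read())`.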

Now create a kcctl configuration context by executing the following command, replacing the --cluster value with the URI retrieved above.

kcctl config set-context \
    --cluster https://avnadmin:PASSWORD@demo-kafka-<PROJECT_NAME>.aivencloud.com:443 \
    my_kafka_cluster

The above creates a context named my_kafka_cluster pointing to the demo-kafka instance. To verify the configuration:

kcctl info

This retrieves information about the Kafka Connect cluster that the current kcctl context points to:

URL:               https://avnadmin:PASSWORD@demo-kafka-<PROJECT_NAME>.aivencloud.com:443
Version:           2.7.2-SNAPSHOT
Commit:            d15ddddd3ef3f5ef
Kafka Cluster ID:  -DvILyiXQxSpnFSK9M1qgQ
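The same fields are available from the root endpoint (`GET /`) of the Kafka Connect REST API, which returns `version`, `commit` and `kafka_cluster_id`. Here is a minimal Python sketch of that call, assuming the credentials-embedded URL format used in the context above. It moves the user and password out of the URL into a Basic auth header and only builds the request; `info_request` and `format_info` are hypothetical helpers.

```python
import base64
import json
from urllib.parse import unquote, urlsplit, urlunsplit
from urllib.request import Request


def info_request(cluster_url: str) -> Request:
    """Build GET / for Kafka Connect, turning user:password@ into a Basic auth header."""
    parts = urlsplit(cluster_url)
    netloc = parts.hostname or ""
    if parts.port:
        netloc += f":{parts.port}"
    request = Request(urlunsplit((parts.scheme, netloc, "/", "", "")), method="GET")
    if parts.username is not None:
        credentials = f"{unquote(parts.username)}:{unquote(parts.password or '')}"
        token = base64.b64encode(credentials.encode("utf-8")).decode("ascii")
        request.add_header("Authorization", f"Basic {token}")
    return request


def format_info(body: str) -> str:
    """Render the JSON returned by GET / in a layout similar to `kcctl info`."""
    info = json.loads(body)
    return "\n".join([
        f"Version:           {info['version']}",
        f"Commit:            {info['commit']}",
        f"Kafka Cluster ID:  {info['kafka_cluster_id']}",
    ])
```

Passing the request to `urllib.request.urlopen` and its decoded body to `format_info` would complete the round trip.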

Create a data source in PostgreSQL

To see the connectors in action, create a PostgreSQL database and configure a Kafka Connect JDBC source connector to bring the data into Kafka. The connector reads data from a table named pasta in a PostgreSQL database and writes it to a Kafka topic.

If you don't have a PostgreSQL database handy, you can create one at Aiven with the following Aiven CLI command:

avn service create demo-pg               \
    --service-type pg                    \
    --cloud google-europe-west3          \
    --plan hobbyist

Once the demo-pg PostgreSQL instance is up and running (use avn service wait demo-pg to wait for it), connect to it:

avn service cli demo-pg

Now create a sample pasta table and fill it with data by running the following statements in the psql session:

create table pasta (id serial, name varchar, cooking_minutes int);
insert into pasta (name, cooking_minutes) values ('spaghetti', 8);
insert into pasta (name, cooking_minutes) values ('spaghettini', 6);
insert into pasta (name, cooking_minutes) values ('fusilli', 9);
insert into pasta (name, cooking_minutes) values ('trofie', 5);

Create a new Kafka Connect connector

Once the source data is available, create the Kafka Connect JDBC source connector, which reads the pasta table in incrementing mode, using the id column to detect new rows. To get the required PostgreSQL connection details such as hostname, port, user and password, use this command:

avn service get demo-pg --format '{service_uri_params}'

Create a file named my_jdbc_connect_source.json with the following JSON content (substituting the <HOST>, <PORT> and <PASSWORD> with the actual information retrieved in the previous step):

{
    "connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://<HOST>:<PORT>/defaultdb?sslmode=require",
    "connection.user": "avnadmin",
    "connection.password": "<PASSWORD>",
    "table.whitelist": "pasta",
    "mode": "incrementing",
    "incrementing.column.name":"id",
    "poll.interval.ms": "2000",
    "topic.prefix": "pg_source_"
}
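If you script this step, the same file can be generated from the connection details. A minimal sketch: `jdbc_source_config` and `write_config` are hypothetical helpers, host, port and password are the values returned by `avn service get demo-pg`, and the keys mirror the JSON above.

```python
import json
from pathlib import Path


def jdbc_source_config(host: str, port: int, password: str) -> dict:
    """Return the JDBC source connector configuration shown above."""
    return {
        "connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
        "connection.url": f"jdbc:postgresql://{host}:{port}/defaultdb?sslmode=require",
        "connection.user": "avnadmin",
        "connection.password": password,
        "table.whitelist": "pasta",
        "mode": "incrementing",
        "incrementing.column.name": "id",
        "poll.interval.ms": "2000",
        "topic.prefix": "pg_source_",
    }


def write_config(config: dict, path: str = "my_jdbc_connect_source.json") -> None:
    """Write the configuration as indented JSON, ready for `kcctl apply -f`."""
    Path(path).write_text(json.dumps(config, indent=4) + "\n")


# Usage (placeholders): write_config(jdbc_source_config("<HOST>", <PORT>, "<PASSWORD>"))
```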

Now create the connector with kcctl, from a new terminal window:

kcctl apply -f my_jdbc_connect_source.json --name pg-incremental-source
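For reference, the Kafka Connect REST call that creates or updates a connector by name is `PUT /connectors/<name>/config`, whose body is the flat configuration map, the same shape as the JSON file above. A sketch that builds (but does not send) that request; `apply_request` is a hypothetical name, and the base URL is assumed to carry no embedded credentials (an Aiven endpoint would also need an Authorization header).

```python
import json
from urllib.parse import quote
from urllib.request import Request


def apply_request(base_url: str, name: str, config: dict) -> Request:
    """Build PUT /connectors/<name>/config: creates the connector if it is
    missing, otherwise replaces its configuration."""
    url = f"{base_url.rstrip('/')}/connectors/{quote(name, safe='')}/config"
    body = json.dumps(config).encode("utf-8")
    return Request(url, data=body, method="PUT",
                   headers={"Content-Type": "application/json"})


req = apply_request("http://localhost:8083", "pg-incremental-source", {"mode": "incrementing"})
print(req.get_method(), req.full_url)
# PUT http://localhost:8083/connectors/pg-incremental-source/config
```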

Verify that the connector was successfully created:

kcctl describe connector pg-incremental-source

The command output shows the pg-incremental-source connector in RUNNING state and all the details associated with it.
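The state reported here corresponds to the Kafka Connect REST endpoint `GET /connectors/<name>/status`, which returns the state of the connector and of each of its tasks. A small sketch that checks such a response; `is_running` is a hypothetical helper, and the sample payload is illustrative rather than captured from the demo.

```python
import json


def is_running(status_json: str) -> bool:
    """True if the connector and every one of its tasks report RUNNING."""
    status = json.loads(status_json)
    states = [status["connector"]["state"]]
    states += [task["state"] for task in status.get("tasks", [])]
    return all(state == "RUNNING" for state in states)


sample = """{
  "name": "pg-incremental-source",
  "connector": {"state": "RUNNING", "worker_id": "10.0.0.1:8083"},
  "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "10.0.0.1:8083"}],
  "type": "source"
}"""
print(is_running(sample))  # True
```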

Check the data in Apache Kafka with kcat

You can also check, via kcat, that a new Kafka topic called pg_source_pasta contains the same data stored in PostgreSQL. Start by downloading the required certificates:

avn service user-creds-download demo-kafka \
    --username avnadmin                    \
    -d certs

Then create a kcat.config file containing the following entries, replacing <HOST>:<PORT> with the demo-kafka service's hostname and port:

bootstrap.servers=<HOST>:<PORT>
security.protocol=ssl
ssl.key.location=certs/service.key
ssl.certificate.location=certs/service.cert
ssl.ca.location=certs/ca.pem

Then read from the pg_source_pasta topic with the following kcat invocation:

kcat -F kcat.config -C -t pg_source_pasta

If we now insert some rows in the PostgreSQL pasta table, the same changes appear in Kafka via kcat:

(Animation: rows inserted into the PostgreSQL pasta table appearing in Kafka via kcat)
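Why do only the new rows show up? In `incrementing` mode the connector remembers the highest `id` it has already published and, every `poll.interval.ms` (2000 ms here), queries for rows with a larger `id`. Each row goes to the topic named `topic.prefix` plus the table name, hence `pg_source_pasta`. The following is a simplified in-memory model of that behaviour, not the connector's actual code: it ignores offset storage and transactions, and, like the real mode, it does not pick up updates to existing rows. The `IncrementingPoller` class and the extra `penne` row are hypothetical.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class IncrementingPoller:
    """Toy model of the JDBC source connector's incrementing mode (hypothetical)."""

    table: str
    topic_prefix: str
    last_id: int = -1  # largest id published so far; -1 means none yet
    published: list = field(default_factory=list)  # (topic, row) pairs

    @property
    def topic(self) -> str:
        # topic.prefix + table name, e.g. "pg_source_" + "pasta"
        return self.topic_prefix + self.table

    def poll(self, rows: list[dict]) -> list[dict]:
        # Roughly: SELECT * FROM <table> WHERE id > <last_id> ORDER BY id
        new_rows = sorted(
            (row for row in rows if row["id"] > self.last_id),
            key=lambda row: row["id"],
        )
        if new_rows:
            self.last_id = new_rows[-1]["id"]
        self.published.extend((self.topic, row) for row in new_rows)
        return new_rows


pasta = [
    {"id": 1, "name": "spaghetti", "cooking_minutes": 8},
    {"id": 2, "name": "spaghettini", "cooking_minutes": 6},
    {"id": 3, "name": "fusilli", "cooking_minutes": 9},
    {"id": 4, "name": "trofie", "cooking_minutes": 5},
]
poller = IncrementingPoller(table="pasta", topic_prefix="pg_source_")
print(len(poller.poll(pasta)))  # 4: the initial load
pasta.append({"id": 5, "name": "penne", "cooking_minutes": 11})  # hypothetical insert
print(len(poller.poll(pasta)))  # 1: only the new row
print(poller.topic)  # pg_source_pasta
```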

Managing Kafka Connect connectors with kcctl

Creating connectors is only part of the game: with kcctl you can also manage them. Need a list of all the deployed connectors? Just run the following command:

kcctl get connectors

Need to pause and resume connectors? The following commands, for example, pause the pg-incremental-source connector and then resume it:

kcctl pause connector pg-incremental-source
kcctl resume connector pg-incremental-source
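These operations also have counterparts in the Kafka Connect REST API: listing is `GET /connectors`, while pausing and resuming are `PUT /connectors/<name>/pause` and `PUT /connectors/<name>/resume`. The sketch below only builds the requests; the `ACTIONS` table and the `connector_request` helper are hypothetical names, and the base URL is assumed to carry no embedded credentials.

```python
from urllib.parse import quote
from urllib.request import Request

# Kafka Connect REST endpoints for listing, pausing and resuming connectors
ACTIONS = {
    "list": ("GET", "/connectors"),
    "pause": ("PUT", "/connectors/{name}/pause"),
    "resume": ("PUT", "/connectors/{name}/resume"),
}


def connector_request(base_url: str, action: str, name: str = "") -> Request:
    """Build (without sending) the REST request for a connector action."""
    method, template = ACTIONS[action]
    path = template.format(name=quote(name, safe=""))  # unused for "list"
    return Request(base_url.rstrip("/") + path, method=method)


req = connector_request("http://localhost:8083", "pause", "pg-incremental-source")
print(req.get_method(), req.full_url)
# PUT http://localhost:8083/connectors/pg-incremental-source/pause
```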

What type of connectors can we create? Glad you asked! The full plugin list is available with:

kcctl get plugins

The command shows all the available connector plugins with their type (source or sink) and version. On Aiven for Apache Kafka, this is also a quick way to check which managed Kafka Connect connector types you can create.

TYPE     CLASS                                                                                VERSION
source   com.couchbase.connect.kafka.CouchbaseSourceConnector                                 4.0.6
source   com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceConnector  2.1.3
source   com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector            2.1.3
source   com.google.pubsub.kafka.source.CloudPubSubSourceConnector                            2.7.2-SNAPSHOT
source   com.google.pubsublite.kafka.source.PubSubLiteSourceConnector                         2.7.2-SNAPSHOT
...
sink     io.aiven.kafka.connect.gcs.GcsSinkConnector                                          0.9.0
sink     io.aiven.kafka.connect.http.HttpSinkConnector                                        0.4.0
sink     io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector                           2.12.0
...
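The same list is served by the Kafka Connect REST endpoint `GET /connector-plugins`, which returns one JSON object per plugin with `class`, `type` and `version` fields. A short sketch that groups such a response by type; `plugins_by_type` is a hypothetical helper, and the sample payload reuses three entries from the output above.

```python
from __future__ import annotations

import json
from collections import defaultdict


def plugins_by_type(body: str) -> dict[str, list[str]]:
    """Group plugin class names from GET /connector-plugins by their type."""
    grouped = defaultdict(list)
    for plugin in json.loads(body):
        grouped[plugin["type"]].append(plugin["class"])
    return dict(grouped)


sample = json.dumps([
    {"class": "com.couchbase.connect.kafka.CouchbaseSourceConnector", "type": "source", "version": "4.0.6"},
    {"class": "io.aiven.kafka.connect.gcs.GcsSinkConnector", "type": "sink", "version": "0.9.0"},
    {"class": "io.aiven.kafka.connect.http.HttpSinkConnector", "type": "sink", "version": "0.4.0"},
])
print(plugins_by_type(sample)["sink"])
# ['io.aiven.kafka.connect.gcs.GcsSinkConnector', 'io.aiven.kafka.connect.http.HttpSinkConnector']
```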

Wrapping up

Managing Kafka Connect connectors from the terminal is just a few commands away. kcctl makes it easy to inspect, deploy, update, pause and resume any connector in our environments, unifying the end-user experience for Apache Kafka instances deployed on-premises, self-hosted or on Aiven.

Further reading

Not using Aiven services yet? Sign up now for your free trial at https://console.aiven.io/signup!

In the meantime, make sure you follow our changelog and blog RSS feeds or our LinkedIn and Twitter accounts to stay up-to-date with product and feature-related news.


Start your free 30 day trial!

Build your platform, and throw in any data you want for 30 days, with no ifs, ands, or buts.
