Engineering

Real-Time Data Processing With Actian Zen and Kafka Connectors

Johnson Varughese

July 17, 2024


Welcome back to the world of Actian Zen, a versatile and powerful edge data management solution designed to help you build low-latency embedded apps. In Part 1, we explored how to leverage BtrievePython to run Btrieve2 Python applications using the Zen 16.0 Enterprise/Server Database Engine.

This is Part 2 of the quickstart blog series that focuses on helping embedded app developers get started with Actian Zen. In this blog post, we’ll walk through setting up a Kafka demo using Actian Zen, demonstrating how to manage and process real-time financial transactions seamlessly. This includes configuring environment variables, using an orchestration script, generating mock transaction data, leveraging Docker for streamlined deployment, and utilizing Docker Compose for orchestration.

Introduction to Actian Zen Kafka Connectors

In the dynamic world of finance, processing and managing real-time transactions efficiently is a must-have. The Actian Zen Kafka Connectors offer a robust solution, facilitating seamless integration between Actian Zen databases and Apache Kafka. These connectors support both source and sink operations, allowing you to stream data out of a Zen Btrieve database into Kafka topics or vice versa.

Source Connector

The Zen Source connector streams JSON data from a Zen Btrieve database into a Kafka topic. It employs change capture polling to pick up new data at user-defined intervals, ensuring that your Kafka topics are always updated with the latest information from your Zen databases.

Sink Connector

The Zen Sink connector streams JSON data from a Kafka topic into a Zen Btrieve database. You can choose to stream data into an existing database or create a new one when starting the connector.
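
For the sink direction, the configuration takes the same key-value shape as the source example shown later in this post. The sketch below is an illustrative assumption by analogy (the sink connector class name and database parameters are not confirmed here, so verify them against the Zen connector documentation); note that Kafka Connect sink connectors subscribe with a topics key rather than a topic key:

{
    "name": "financial-transactions-sink",
    "config": {
        "connector.class": "com.actian.zen.kafka.connect.sink.BtrieveSinkConnector",
        "db.filename.param": "transactions_sink.mkd",
        "server.name.param": "financial_db",
        "tasks.max": "1",
        "topics": "transactionLog",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
}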

Setting Up Environment Variables

Before diving into the configuration, it’s essential to set up the necessary environment variables. These variables ensure that your system paths and library paths are correctly configured, and that you accept the Zen End User License Agreement (EULA).

Here’s an example of the environment variables you need to set:

export PATH="/usr/local/actianzen/bin:/usr/local/actianzen/lib64:$PATH"
export LD_LIBRARY_PATH="/usr/local/actianzen/lib64:/usr/lib64:/usr/lib"
export CLASSPATH="/usr/local/actianzen/lib64"
export CONNECT_PLUGIN_PATH="/usr/share/java"
export ZEN_ACCEPT_EULA="YES"

Configuring the Kafka Connectors

The configuration parameters for the Kafka connectors are provided as key-value pairs. These configurations can be set via a properties file, the Kafka REST API, or programmatically. Here’s an example JSON configuration for a source connector:

{
    "name": "financial-transactions-source",
    "config": {
        "connector.class": "com.actian.zen.kafka.connect.source.BtrieveSourceConnector",
        "db.filename.param": "transactions.mkd",
        "server.name.param": "financial_db",
        "poll.interval.ms": "2000",
        "tasks.max": "1",
        "topic": "transactionLog",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "topic.creation.enable": "true",
        "topic.creation.default.replication.factor": "-1",
        "topic.creation.default.partitions": "-1"
    }
}

The -1 values for topic.creation.default.replication.factor and topic.creation.default.partitions tell Kafka Connect to fall back to the broker's defaults when it creates the topic. You can also define user queries for more granular data filtering using the JSON query language detailed in the Btrieve2 API Documentation. For example, to filter for transactions greater than or equal to $1000:

"\"Transaction\":{\"Amount\":{\"$gte\":1000}}"

Orchestration Script: kafkasetup.py

The kafkasetup.py script automates the process of starting and stopping the Kafka connectors. Here’s a snippet showing how the script sets up connectors:

import requests

def main():
    # Map a human-readable name to a (connector config, Connect REST port) pair.
    requestMap = {}
    requestMap["Financial Transactions"] = ({
        "name": "financial-transactions-source",
        "config": {
            "connector.class": "com.actian.zen.kafka.connect.source.BtrieveSourceConnector",
            "db.filename.param": "transactions.mkd",
            "server.name.param": "financial_db",
            "poll.interval.ms": "2000",
            "tasks.max": "1",
            "topic": "transactionLog",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "value.converter": "org.apache.kafka.connect.storage.StringConverter",
            "topic.creation.enable": "true",
            "topic.creation.default.replication.factor": "-1",
            "topic.creation.default.partitions": "-1"
        }
    }, "8083")

    # Start each connector via the Kafka Connect REST API.
    for name, requestTuple in requestMap.items():
        input("Press Enter to continue...")
        (request, port) = requestTuple
        print("Now starting " + name + " connector")
        try:
            r = requests.post("http://localhost:" + port + "/connectors", json=request)
            print("Response:", r.json())
        except Exception as e:
            print("ERROR: ", e)
    print("Finished setup!")

    input("\n\nPress Enter to begin shutdown")
    # Delete each connector by name on shutdown.
    for name, requestTuple in requestMap.items():
        (request, port) = requestTuple
        try:
            r = requests.delete("http://localhost:" + port + "/connectors/" + request["name"])
        except Exception as e:
            print("ERROR: ", e)

if __name__ == "__main__":
    main()

When you run the script, it prompts you to start each connector one by one, ensuring everything is set up correctly.
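
If you want to confirm a connector is healthy before moving on, the Kafka Connect REST API exposes listing and status endpoints. A quick check (assuming Connect is listening on port 8083, as in the script above) could look like this:

import requests

# List every connector registered with the Connect worker...
print(requests.get("http://localhost:8083/connectors").json())

# ...and inspect the state of the source connector ("RUNNING" when healthy).
status = requests.get(
    "http://localhost:8083/connectors/financial-transactions-source/status"
).json()
print(status["connector"]["state"])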

Generating Transaction Data With data_generator.py

The data_generator.py script simulates financial transaction data, creating transaction records at specified intervals. Here’s a look at the core function:

import sys
import os
import signal
import json
import random
from time import sleep
from datetime import datetime

sys.path.append("/usr/local/actianzen/lib64")
import btrievePython as BP

class GracefulKiller:
    """Flips a flag on SIGINT/SIGTERM so the generator loop can exit cleanly."""
    kill_now = False

    def __init__(self):
        signal.signal(signal.SIGINT, self.exit_gracefully)
        signal.signal(signal.SIGTERM, self.exit_gracefully)

    def exit_gracefully(self, *args):
        self.kill_now = True

def generate_transactions():
    client = BP.BtrieveClient()
    assert client is not None
    collection = BP.BtrieveCollection()
    assert collection is not None

    # Create (if needed) and open the collection named by GENERATOR_DB_URI.
    collectionName = os.getenv("GENERATOR_DB_URI")
    rc = client.CollectionCreate(collectionName)
    rc = client.CollectionOpen(collection, collectionName)
    assert rc == BP.Btrieve.STATUS_CODE_NO_ERROR, BP.Btrieve.StatusCodeToString(rc)

    interval = int(os.getenv("GENERATOR_INTERVAL"))
    kill_condition = GracefulKiller()
    while not kill_condition.kill_now:
        # Build a mock transaction document and insert it into the collection.
        transaction = {
            "Transaction": {
                "ID": random.randint(1000, 9999),
                "Amount": round(random.uniform(10.0, 5000.0), 2),
                "Currency": "USD",
                "Timestamp": str(datetime.now())
            }
        }
        print(f"Generated transaction: {transaction}")
        documentId = collection.DocumentCreate(json.dumps(transaction))
        if documentId < 0:
            print("DOCUMENT CREATE ERROR: " + BP.Btrieve.StatusCodeToString(collection.GetLastStatusCode()))
        sleep(interval)

    rc = client.CollectionClose(collection)
    assert rc == BP.Btrieve.STATUS_CODE_NO_ERROR, BP.Btrieve.StatusCodeToString(rc)

if __name__ == "__main__":
    generate_transactions()

This script runs an infinite loop, continuously generating and inserting transaction data into a Btrieve collection.

Using Docker for Deployment

To facilitate this setup, we use a Docker container. Here’s the Dockerfile that sets up the environment to run our data generator script:

FROM actian/zen-client:16.00
USER root
RUN apt update && apt install python3 -y
COPY --chown=zen-svc:zen-data data_generator.py /usr/local/actianzen/bin
ADD _btrievePython.so /usr/local/actianzen/lib64
ADD btrievePython.py /usr/local/actianzen/lib64
USER zen-svc
CMD ["python3", "/usr/local/actianzen/bin/data_generator.py"]

This Dockerfile extends from the Actian Zen client image, installs Python, and includes the data generation script. By building and running this Docker container, we can generate and stream transaction data into Kafka topics as configured.

Docker Compose for Orchestration

To manage and orchestrate multiple containers, including Kafka, Zookeeper, and our data generator, we use Docker Compose. Here’s the docker-compose.yml file that brings everything together:

version: '3.8'
services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:2.13-2.7.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_LOG_RETENTION_HOURS: 1
      KAFKA_MESSAGE_MAX_BYTES: 10485760
      KAFKA_BROKER_ID: 1
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  actianzen:
    build: .
    environment:
      GENERATOR_DB_URI: "transactions.mkd"
      GENERATOR_LOCALE: "Austin"
      GENERATOR_INTERVAL: "5"
    volumes:
      - ./data:/usr/local/actianzen/data

This docker-compose.yml file sets up Zookeeper, Kafka, and our Actian Zen data generator in a single configuration. By running docker-compose up, we can spin up the entire stack and start streaming financial transaction data into Kafka topics in real-time.

Visualizing the Kafka Stream

To give you a better understanding of the data flow in this setup, here’s a diagram illustrating the Kafka stream:

[Diagram: Actian Zen database with Kafka source connector]

In this diagram, the financial transaction data flows from the Actian Zen database through the Kafka source connector into the Kafka topics. The data can then be consumed and processed by various downstream applications.
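
To see the stream end to end, you can attach a quick consumer to the transactionLog topic. This sketch uses the third-party kafka-python package and assumes it runs inside the Docker network, where the broker is reachable at kafka:9092 (the advertised listener in the Compose file); from the host you may need to adjust the bootstrap address or listener configuration:

from kafka import KafkaConsumer

# Tail the topic that the source connector publishes to.
consumer = KafkaConsumer(
    "transactionLog",
    bootstrap_servers="kafka:9092",  # advertised listener from docker-compose.yml
    auto_offset_reset="earliest",    # start from the beginning of the topic
    value_deserializer=lambda v: v.decode("utf-8"),
)
for message in consumer:
    print(message.value)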

Once the stack is running, the logs confirm two things:

Kafka Connect: Connect instances join their groups and sync properly, and tasks and connectors are configured and started as expected.

Financial Transactions: Transactions from both New York and San Francisco are processed and logged correctly, including a variety of credit and debit actions with varying amounts and timestamps.


Conclusion

Integrating Actian Zen with Kafka Connectors provides a powerful solution for real-time data streaming and processing. By following this guide, you can set up a robust system to handle financial transactions, ensuring data is efficiently streamed, processed, and stored. This setup not only demonstrates the capabilities of Actian Zen and Kafka but also highlights the ease of deployment using Docker and Docker Compose. Whether you’re dealing with financial transactions or other data-intensive applications, this solution offers a scalable and reliable approach to real-time data management.

For further details and visual guides, refer to the Actian Academy and the comprehensive documentation. Happy coding!


About Johnson Varughese

Johnson Varughese is the Manager of Support Engineering at Actian, where he supports developers creating applications using ZEN interfaces such as Btrieve, ODBC, JDBC, ADO.NET, Btrieve2, and others. His role involves providing technical guidance, troubleshooting, and ensuring developers can effectively utilize these interfaces to build robust and efficient applications in different programming environments. Johnson's expertise in these technologies helps streamline development processes and enhance the performance and functionality of the applications being developed.