The examples show how to define Kafka Datastores for both AVRO and JSON targets, including source table descriptions, key selection, and Apply Engine processing.
Db2z to Avro Kafka
-- Source table definition for DEPT (presumably the content of DEPT.ddl, which
-- the job script names in its DESCRIPTION statement -- confirm).
-- MGRNO and LOCATION are the only nullable columns.
CREATE TABLE DEPT
(
    DEPTNO   CHAR(3)     NOT NULL,
    DEPTNAME VARCHAR(36) NOT NULL,
    MGRNO    CHAR(6),
    ADMRDEPT CHAR(3)     NOT NULL,
    LOCATION CHAR(16)
);
---------------------------------------------------------------
-- Job: FILETOKAFKA
-- Reads DEPT change records from a CDC source and writes them
-- to Kafka as AVRO.
---------------------------------------------------------------
-- Substitution parameters used in this script: none listed.
---------------------------------------------------------------
JOBNAME FILETOKAFKA;

-- Job-wide options: AVRO-compatible naming plus the Confluent repository URL.
-- The XXXX parts of the URL are placeholders to be filled in per environment.
OPTIONS
    USE AVRO COMPATIBLE NAMES,
    CONFLUENT REPOSITORY "https://XXXX:XXXX@XXXX.aws.confluent.cloud"
;
----------------------------
-- Table descriptions
----------------------------
-- A single group holds the source description; both datastores in this
-- script refer to it by name.
BEGIN GROUP SOURCE_TABLES;
    -- DEPT.ddl is exposed under the alias DEPT2, with one subject named for
    -- the record value and one for the record key.
    DESCRIPTION Db2SQL DEPT.ddl
        AS DEPT2
        SUBJECT DEPT2-value
        KEY SUBJECT DEPT2-key;
END GROUP;
------------------------------------------------------------
-- Datastore section
------------------------------------------------------------
-- Source datastore: CDC input, aliased CDCIN for use in the PROCESS statement.
-- NOTE(review): server, port, capture and target in the URL look like
-- placeholders -- replace with real values.
DATASTORE cdc://server:port/capture/target
    OF UTSCDC
    AS CDCIN
    DESCRIBED BY GROUP SOURCE_TABLES;
-- TARGET DATASTORE
-- Kafka topic DEPT2, written as AVRO and aliased TARGET.
-- FORMAT takes ONE of the following (the bracketed list is syntax notation,
-- not script text):
--     CONFLUENT | CONFLUENT TOMBSTONE | CONTAINER | PLAIN
-- CONFLUENT is used here because this job sets a CONFLUENT REPOSITORY option
-- and the DEPT2 description names value/key subjects.
-- NOTE(review): MGRNO is nullable in the DEPT DDL; confirm that a nullable
-- column is acceptable as part of the message key.
DATASTORE kafka:///DEPT2/key
OF AVRO FORMAT CONFLUENT
AS TARGET
KEY IS DEPTNO, MGRNO
DESCRIBED BY GROUP SOURCE_TABLES;
----------------------------------
-- Apply processing: records read from CDCIN are replicated to TARGET.
PROCESS INTO TARGET
SELECT
{
    REPLICATE(TARGET)
}
FROM CDCIN;
Db2z to JSON Kafka
-- Source table definition for DEPT (presumably the content of DEPT.ddl, which
-- the job script names in its DESCRIPTION statement -- confirm).
-- MGRNO and LOCATION are the only nullable columns.
CREATE TABLE DEPT
(
    DEPTNO   CHAR(3)     NOT NULL,
    DEPTNAME VARCHAR(36) NOT NULL,
    MGRNO    CHAR(6),
    ADMRDEPT CHAR(3)     NOT NULL,
    LOCATION CHAR(16)
);
----------------------------------------------------------------------
-- Name: FILETOKAFKA: Db2z CDC to Kafka (JSON)
----------------------------------------------------------------------
-- SUBSTITUTION PARMS USED IN THIS SCRIPT: (none)
----------------------------------------------------------------------
-- This job declares no OPTIONS statement; the job name is the only
-- job-level setting.
JOBNAME FILETOKAFKA;
----------------------------
-- Table descriptions
----------------------------
-- A single group holds the source description; both datastores in this
-- script refer to it by name.
BEGIN GROUP SOURCE_TABLES;
    -- DEPT.ddl is exposed under the alias DEPTJSON.
    DESCRIPTION Db2SQL DEPT.ddl
        AS DEPTJSON;
END GROUP;
------------------------------------------------------------
-- Datastore section
------------------------------------------------------------
-- Source datastore: CDC input, aliased CDCIN for use in the PROCESS statement.
-- NOTE(review): server, port, capture and target in the URL look like
-- placeholders -- replace with real values.
DATASTORE cdc://server:port/capture/target
    OF UTSCDC
    AS CDCIN
    DESCRIBED BY GROUP SOURCE_TABLES;
-- TARGET DATASTORE
-- Kafka topic DEPTJSON, written as JSON and aliased TARGET.
-- The original listing showed an optional, bracketed clause after JSON:
--     FORMAT [CONFLUENT [TOMBSTONE]]
-- That is syntax notation, not script text, and it is left out here because
-- this job declares no CONFLUENT REPOSITORY option and no SUBJECT names.
-- NOTE(review): presumably the CONFLUENT variants need a repository to be
-- configured -- verify against the product reference before adding the clause.
-- NOTE(review): MGRNO is nullable in the DEPT DDL; confirm that a nullable
-- column is acceptable as part of the message key.
DATASTORE kafka:///DEPTJSON/key
OF JSON
AS TARGET
KEY IS DEPTNO, MGRNO
DESCRIBED BY GROUP SOURCE_TABLES;
----------------------------------
-- Apply processing: records read from CDCIN are replicated to TARGET.
PROCESS INTO TARGET
SELECT
{
    REPLICATE(TARGET)
}
FROM CDCIN;