Construct the Kafka Apply Engine script

Connect CDC (SQData) Kafka Quickstart

Three simple Apply Engine scripts are provided below. The first two process Db2 changed data and the third IMS changed data. All three use Connect CDC (SQData)'s ability to transform the data described by a source DESCRIPTION, or schema, into the desired JSON or AVRO formatted Kafka message payload. See the Apply Engine Reference for more details regarding its highly extensible capabilities.
Note: Apply Engines utilizing AVRO and the Confluent Schema Registry may not use both APPLY and REPLICATE functions for the same Target Datastore.
Example 1 - Db2 to JSON formatted Kafka
Replicate Db2 changed data (CDC) for the IVP EMPLOYEE and DEPARTMENT tables into unique JSON formatted Kafka Topics with default partitioning. An example of the JSON output can be seen above in Determine Kafka output format. The example also includes a filter for the EMPLOYEE table. Only updates to employees with a bonus over $5,000 will cause the record to be written to Kafka. All changes to the DEPARTMENT table are Replicated with no filter applied.

----------------------------------------------------------------------
-- Name: DB2TOKAF: Z/OS DB2 To Kafka JSON on Linux
-- Client/Project: client/project
----------------------------------------------------------------------
-- SUBSTITUTION PARMS USED IN THIS SCRIPT:
-- %(ENGINE) - ENGINE Name
-- %(SHOST) - Source HOST of the Capture/Publisher
-- %(SPORT) - Source HOST SQDaemon PORT
-- %(PUBNM) - Source Capture/Publisher Agent Name
----------------------------------------------------------------------
-- Change Log:
----------------------------------------------------------------------
-- 2019-02-01 INITIAL RELEASE using JSON
----------------------------------------------------------------------
JOBNAME DB2TOKAF;
OPTIONS
 CDCOP('I','U','D') -- Set CHANGE OP Constants
 ,USE AVRO COMPATIBLE NAMES -- Recommended for JSON
;
----------------------------------------------------------------------
-- Data Definition Section
----------------------------------------------------------------------
----------------------------------------------------------------------
-- Source Descriptions
----------------------------------------------------------------------
BEGIN GROUP DB2_SOURCE;
DESCRIPTION DB2SQL ./DB2DDL/EMP.ddl AS EMPLOYEE
 KEY IS EMP_NO;
DESCRIPTION DB2SQL ./DB2DDL/DEPT.ddl AS DEPARTMENT
 KEY IS DEPT_NO;
END GROUP;
----------------------------------------------------------------------
-- Target Descriptions
----------------------------------------------------------------------
-- None required
----------------------------------------------------------------------
-- Datastore Section
----------------------------------------------------------------------
----------------------------------------------------------------------
-- Source Datastore
----------------------------------------------------------------------
DATASTORE cdc://%(SHOST):%(SPORT)/%(PUBNM)/%(ENGINE)
 OF UTSCDC
 AS CDCIN
 DESCRIBED BY GROUP DB2_SOURCE
;
----------------------------------------------------------------------
-- Target Datastore(s)
----------------------------------------------------------------------
DATASTORE kafka:///*/key -- specify dynamic topic
 OF JSON -- specify JSON format
 AS TARGET
 DESCRIBED BY GROUP DB2_SOURCE -- use source for REPLICATE
;
----------------------------------------------------------------------
-- Field Specification Section
----------------------------------------------------------------------
DATEFORMAT 'ISOIBM';
----------------------------------------------------------------------
-- Procedure Section
----------------------------------------------------------------------
CREATE PROC P_EMPLOYEE AS SELECT
{
 IF EMPLOYEE.BONUS > '5000'
 {
REPLICATE(TARGET, EMPLOYEE)
 }
}
FROM CDCIN;
----------------------------------------------------------------------
-- Main Section
----------------------------------------------------------------------
PROCESS INTO TARGET
SELECT
{
CASE RECNAME(CDCIN)
 WHEN 'EMP' CALLPROC(P_EMPLOYEE)
 WHEN 'DEPT' REPLICATE(TARGET, DEPARTMENT)
}
FROM CDCIN;
Specification of the Kafka Datastore is thus simplified, with only the static portion of the Topic specified, and looks like the following:

DATASTORE kafka:///hr_*_cdc/key
  OF JSON
  AS TARGET
  DESCRIBED BY GROUP DB2_SOURCE;

REPLICATE (TARGET)
Example 2 - Db2 JSON
Using Source DESCRIPTIONS similar to those in Example 1, with the Kafka Cluster dynamically specified in the sqdata_kafka_producer.conf file, a single Topic ID is used for all CDC records, with randomized partitioning and a single REPLICATE command:
BEGIN GROUP DB2_SOURCE;
DESCRIPTION DB2SQL ./DB2DDL/EMP.ddl AS EMPLOYEE;
DESCRIPTION DB2SQL ./DB2DDL/DEPT.ddl AS DEPARTMENT;
END GROUP;
...

DATASTORE kafka:///hr_all_cdc
  OF JSON
  AS TARGET
  DESCRIBED BY GROUP DB2_SOURCE;
...
 
REPLICATE (TARGET)
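The sqdata_kafka_producer.conf file itself is not shown in this Quickstart. As an illustration only, assuming it accepts standard librdkafka-style producer properties (the property names and broker host names below are assumptions, not taken from this document), it might contain entries such as:

# sqdata_kafka_producer.conf - illustrative sketch only
# Assumes librdkafka-style producer properties; host names are placeholders
metadata.broker.list=kafka-broker1.example.com:9092,kafka-broker2.example.com:9092
security.protocol=plaintext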
Example 3 - Db2 JSON
Using the same Source DESCRIPTIONS as Example 2, with the Kafka Cluster again dynamically specified in the sqdata_kafka_producer.conf file, the Topic ID and Partition are instead specified explicitly using the SETURL and SETURLKEY functions:
DATASTORE kafka:///*
...
 
Used with the following logic in the Apply Engine script:
 
CASE RECNAME(CDCIN)
  WHEN 'EMP' CALLPROC(P_EMP)
  WHEN 'DEPT' CALLPROC(P_DEPT)

CREATE PROC P_EMP AS SELECT
{
  SETURL(TARGET, 'kafka:///hr_EMPLOYEE_cdc/')
  SETURLKEY(TARGET, EMP_NO)
  REPLICATE(TARGET, EMP)
}
FROM CDCIN;

CREATE PROC P_DEPT AS SELECT
{
  SETURL(TARGET, 'kafka:///hr_DEPARTMENT_cdc/')
  SETURLKEY(TARGET, DEPT_NO)
  REPLICATE(TARGET, DEPT)
}
FROM CDCIN;
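These procedures would be invoked from the Main Section in the same way as in Example 1. A minimal sketch of how the snippets above might be assembled, following the structure of the full scripts in this Quickstart and assuming the same CDCIN source Datastore, is:

----------------------------------------------------------------------
--       Main Section (sketch; Datastore and Description sections omitted)
----------------------------------------------------------------------
PROCESS INTO TARGET
SELECT
{
CASE RECNAME(CDCIN)
  WHEN 'EMP' CALLPROC(P_EMP)
  WHEN 'DEPT' CALLPROC(P_DEPT)
}
FROM CDCIN;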
Example 4 - Db2 AVRO
Using Source DESCRIPTIONS similar to Example 2, and with the Kafka Cluster dynamically specified as in Example 3, a Confluent Schema Registry will be used to automatically manage AVRO Topic Schemas for each source table as those schemas evolve over time:

BEGIN GROUP DB2_SOURCE;
DESCRIPTION DB2SQL ./DB2DDL/EMP.ddl AS EMPLOYEE
          KEY IS EMP_NO
          TOPIC hr_EMPLOYEE_cdc
          SUBJECT hr_EMPLOYEE_cdc-value;
DESCRIPTION DB2SQL ./DB2DDL/DEPT.ddl AS DEPARTMENT
          KEY IS DEPT_NO
          TOPIC hr_DEPARTMENT_cdc
          SUBJECT hr_DEPARTMENT_cdc-value;
END GROUP;
Specification of the Kafka Datastore is simplified and looks like the following:

DATASTORE kafka:///*
  OF AVRO
  FORMAT CONFLUENT
  AS TARGET
  DESCRIBED BY GROUP DB2_SOURCE
;
REPLICATE (TARGET)
Example 5 - IMS AVRO
An IMS Source is very different from a Relational source in that data relationships are defined by keys, foreign keys, and physical hierarchy. Those differences are minimized as much as possible by using the REPLICATE command with Kafka targets. One critical difference is how partitions are handled. By specifying "root_key", rather than "key" or defaulting to random partitioning, you can ensure that Kafka consumers will process all the data associated with a particular root segment key together and in the proper sequence within a unit-of-work. Like Example 4, a Confluent Schema Registry will be used to automatically manage AVRO Topic Schemas for each source segment as those COBOL descriptions evolve over time:

BEGIN GROUP IMS_DBD;
DESCRIPTION IMSDBD ./IMSDBD/HREMPLDB.dbd AS HREMPLDB;
END GROUP;
 
BEGIN GROUP IMS_SEG;
DESCRIPTION COBOL ./IMSSEG/HREMPLDB/EMPLOYEE.cob AS EMPLOYEE
          FOR SEGMENT EMPLOYEE
          IN DATABASE HREMPLDB
          TOPIC hr_EMPLOYEE_cdc
          SUBJECT hr_EMPLOYEE_cdc-value;
DESCRIPTION COBOL ./IMSSEG/HREMPLDB/ANNULREV.cob AS ANNULREV
          FOR SEGMENT ANNULREV
          IN DATABASE HREMPLDB
          TOPIC hr_ANNUAL_REVIEW_cdc
          SUBJECT hr_ANNUAL_REVIEW_cdc-value;
END GROUP;
Specification of the Kafka Datastore is simplified and looks like the following:

DATASTORE kafka:///*/root_key
  OF AVRO
  FORMAT CONFLUENT
  AS TARGET
  DESCRIBED BY GROUP IMS_SEG;
Processing requires only one statement:

REPLICATE (TARGET)
Example 2 - Db2 to AVRO formatted Kafka

After confirming the desired results from the Apply Engine script in Example 1, the output format will be switched to AVRO, including a Confluent Schema Registry. Only a few elements of the script in Example 1 need to be added or altered.
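The modified script is not reproduced here. Based on the Example 4 snippet above and the AVRO options used in the IMS script of Example 3 below, a sketch of the kind of changes involved (the Schema Registry URL and Topic/Subject names are illustrative) would be:

OPTIONS
   CDCOP('I','U','D')                  -- Set CHANGE OP Constants
  ,USE AVRO COMPATIBLE NAMES           -- Required for AVRO Targets
  ,CONFLUENT REPOSITORY 'http://schema_registry.precisely.com:8081'
;
...
DESCRIPTION DB2SQL ./DB2DDL/EMP.ddl AS EMPLOYEE
          KEY IS EMP_NO
          TOPIC hr_EMPLOYEE_cdc
          SUBJECT hr_EMPLOYEE_cdc-value;
DESCRIPTION DB2SQL ./DB2DDL/DEPT.ddl AS DEPARTMENT
          KEY IS DEPT_NO
          TOPIC hr_DEPARTMENT_cdc
          SUBJECT hr_DEPARTMENT_cdc-value;
...
DATASTORE kafka:///*/key                  -- specify dynamic topic
        OF AVRO                           -- specify AVRO format
        FORMAT CONFLUENT                  -- use Confluent Schema Registry
        AS TARGET
        DESCRIBED BY GROUP DB2_SOURCE     -- use source for REPLICATE
;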

Example 3 - IMS to AVRO formatted Kafka
Replicate IMS changed data (CDC) for the IVP EMPLOYEE and ANNULREV segments in the HREMPLDB IMS database into unique AVRO formatted Kafka Topics with partitioning based on the Root Segment key. The example also includes a filter for the EMPLOYEE segment. Only updates to employees with a bonus over $5,000 will cause the record to be written to Kafka. All changes to the ANNULREV segment are Replicated with no filter applied.
Note: The user-friendly AS <alias> names specified in the source DESCRIPTION statements will be used in the AVRO schema header.
----------------------------------------------------------------------
-- Name: IMSTOKAF:  Z/OS IMS To Kafka AVRO on Linux
-- Client/Project: client/project
----------------------------------------------------------------------
--  SUBSTITUTION PARMS USED IN THIS SCRIPT:
--    %(ENGINE) - ENGINE Name
--    %(SHOST) - Source HOST of the Capture/Publisher
--    %(SPORT) - Source HOST SQDaemon PORT
--    %(PUBNM) - Source Capture/Publisher Agent Name
----------------------------------------------------------------------
--       Change Log:
----------------------------------------------------------------------
-- 2019-02-01 INITIAL RELEASE using AVRO
----------------------------------------------------------------------
JOBNAME IMSTOKAF;
OPTIONS
   CDCOP('I','U','D')                  -- Set CHANGE OP Constants
  ,USE AVRO COMPATIBLE NAMES           -- Required for AVRO Targets
  ,CONFLUENT REPOSITORY 'http://schema_registry.precisely.com:8081'
;
----------------------------------------------------------------------
--       Data Definition Section
----------------------------------------------------------------------
----------------------------------------------------------------------
--       Source Descriptions
----------------------------------------------------------------------
BEGIN GROUP IMS_DBD;
DESCRIPTION IMSDBD ./IMSDBD/HREMPLDB.dbd AS HREMPLDB;
END GROUP;
 
BEGIN GROUP IMS_SEG;
DESCRIPTION COBOL ./IMSSEG/EMPLOYEE.cob AS EMPLOYEE   -- User friendly alias
                FOR SEGMENT EMPLOYEE
                IN DATABASE HREMPLDB
                TOPIC IVP_HR_EMPLOYEE
                SUBJECT IVP_HR_EMPLOYEE-value;
DESCRIPTION COBOL ./IMSSEG/ANNULREV.cob AS ANNULREV    -- User friendly alias
                FOR SEGMENT ANNULREV
                IN DATABASE HREMPLDB
                TOPIC IVP_HR_ANNUAL_REVIEW
                SUBJECT IVP_HR_ANNUAL_REVIEW-value;
END GROUP;
----------------------------------------------------------------------
--       Target Descriptions
----------------------------------------------------------------------
-- None required 
----------------------------------------------------------------------
--       Datastore Section
----------------------------------------------------------------------
----------------------------------------------------------------------
--       Source Datastore
----------------------------------------------------------------------
DATASTORE cdc://%(SHOST):%(SPORT)/%(PUBNM)/%(ENGINE)
        OF IMSCDC
        AS CDCIN
        DESCRIBED BY GROUP IMS_SEG
;
----------------------------------------------------------------------
--       Target Datastore(s)
----------------------------------------------------------------------
DATASTORE kafka:///*/root_key             -- specify dynamic topic
        OF AVRO                           -- specify AVRO format
        FORMAT CONFLUENT                 -- use Confluent Schema Registry
        AS TARGET
        DESCRIBED BY GROUP IMS_SEG       -- use source for REPLICATE
;
----------------------------------------------------------------------
--       Field Specification Section
----------------------------------------------------------------------
DATEFORMAT 'ISOIBM';
----------------------------------------------------------------------
--       Procedure Section
----------------------------------------------------------------------
CREATE PROC P_EMPLOYEE AS SELECT
{
  IF EMPLOYEE.BONUS > '5000'
  {
REPLICATE(TARGET, EMPLOYEE)
  }
}
FROM CDCIN;
----------------------------------------------------------------------
--       Main Section
----------------------------------------------------------------------
PROCESS INTO TARGET
SELECT
{
CASE RECNAME(CDCIN)
WHEN 'EMPLOYEE' CALLPROC(P_EMPLOYEE)
WHEN 'ANNULREV' REPLICATE(TARGET, ANNULREV)
}
FROM CDCIN;
Note: Replication of IMS requires that the Target message descriptions maintain the full parent key sequence. This is ensured by Connect CDC (SQData) when it generates the AVRO schema / JSON message from the Source Datastore Segment Descriptions.