Three simple Apply Engine scripts are provided below. The first two process Db2 changed data and the third processes IMS changed data. All three use Connect CDC (SQData)'s ability to transform the source data DESCRIPTION or schema into the desired JSON or AVRO formatted Kafka message payload. See the Apply Engine Reference for more details regarding its highly extensible capabilities.
- Db2 to JSON formatted Kafka
- Db2 to AVRO formatted Kafka
- IMS to AVRO formatted Kafka
Note: Apply Engines utilizing AVRO and the Confluent Schema Registry may not use both APPLY and REPLICATE functions for the same Target Datastore.
Example 1 - Db2 to JSON formatted Kafka
Replicate Db2 changed data (CDC) for the IVP EMPLOYEE and DEPARTMENT tables into unique JSON formatted Kafka Topics with default partitioning. An example of the JSON output can be seen above in Determine Kafka output format. The example also includes a filter for the EMPLOYEE table. Only updates to employees with a bonus over $5,000 will cause the record to be written to Kafka. All changes to the DEPT table are Replicated with no filter applied.
----------------------------------------------------------------------
-- Name: DB2TOKAF: Z/OS DB2 To Kafka JSON on Linux
-- Client/Project: client/project
----------------------------------------------------------------------
-- SUBSTITUTION PARMS USED IN THIS SCRIPT:
-- %(ENGINE) - ENGINE Name
-- %(SHOST) - Source HOST of the Capture/Publisher
-- %(SPORT) - Source HOST SQDaemon PORT
-- %(PUBNM) - Source Capture/Publisher Agent Name
----------------------------------------------------------------------
-- Change Log:
----------------------------------------------------------------------
-- 2019-02-01 INITIAL RELEASE using JSON
----------------------------------------------------------------------
JOBNAME DB2TOKAF;
OPTIONS
CDCOP('I','U','D') -- Set CHANGE OP Constants
,USE AVRO COMPATIBLE NAMES -- Recommended for JSON
;
----------------------------------------------------------------------
-- Data Definition Section
----------------------------------------------------------------------
----------------------------------------------------------------------
-- Source Descriptions
----------------------------------------------------------------------
BEGIN GROUP DB2_SOURCE;
DESCRIPTION DB2SQL ./DB2DDL/EMP.ddl AS EMPLOYEE
KEY IS EMP_NO;
DESCRIPTION DB2SQL ./DB2DDL/DEPT.ddl AS DEPARTMENT
KEY IS DEPT_NO;
END GROUP;
----------------------------------------------------------------------
-- Target Descriptions
----------------------------------------------------------------------
-- None required
----------------------------------------------------------------------
-- Datastore Section
----------------------------------------------------------------------
----------------------------------------------------------------------
-- Source Datastore
----------------------------------------------------------------------
DATASTORE cdc://%(SHOST):%(SPORT)/%(PUBNM)/%(ENGINE)
OF UTSCDC
AS CDCIN
DESCRIBED BY GROUP DB2_SOURCE
;
----------------------------------------------------------------------
-- Target Datastore(s)
----------------------------------------------------------------------
DATASTORE kafka:///*/key -- specify dynamic topic
OF JSON -- specify JSON format
AS TARGET
DESCRIBED BY GROUP DB2_SOURCE -- use source for REPLICATE
;
----------------------------------------------------------------------
-- Field Specification Section
----------------------------------------------------------------------
DATEFORMAT 'ISOIBM';
----------------------------------------------------------------------
-- Procedure Section
----------------------------------------------------------------------
CREATE PROC P_EMPLOYEE AS SELECT
{
IF EMPLOYEE.BONUS > '5000'
{
REPLICATE(TARGET, EMPLOYEE)
}
}
FROM CDCIN;
----------------------------------------------------------------------
-- Main Section
----------------------------------------------------------------------
PROCESS INTO TARGET
SELECT
{
CASE RECNAME(CDCIN)
WHEN 'EMP' CALLPROC(P_EMPLOYEE)
WHEN 'DEPT' REPLICATE(TARGET, DEPARTMENT)
}
FROM CDCIN;
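The routing performed by the Main Section and P_EMPLOYEE above can be pictured with a small Python sketch. It is an illustration only, not part of the Apply Engine: the function name route, the dictionary row layout and the handling of a missing BONUS value are assumptions made for the example. As in the script, the bonus test is applied to every EMP change record regardless of the change operation.

```python
from decimal import Decimal

# Threshold taken from the IF condition in P_EMPLOYEE.
BONUS_THRESHOLD = Decimal("5000")


def route(record_name, row):
    """Return the source alias to replicate, or None to skip the record.

    record_name plays the role of RECNAME(CDCIN); row is a hypothetical
    dict holding the column values of the change record.
    """
    if record_name == "EMP":
        # P_EMPLOYEE: replicate only when BONUS exceeds the threshold.
        bonus = row.get("BONUS")
        if bonus is not None and Decimal(str(bonus)) > BONUS_THRESHOLD:
            return "EMPLOYEE"
        return None
    if record_name == "DEPT":
        # No filter: every DEPT change is replicated.
        return "DEPARTMENT"
    # Assumption: record names not listed in the CASE are ignored.
    return None


if __name__ == "__main__":
    print(route("EMP", {"EMP_NO": "000010", "BONUS": "6000.00"}))
    print(route("EMP", {"EMP_NO": "000020", "BONUS": "800.00"}))
    print(route("DEPT", {"DEPT_NO": "A00"}))
```

A bonus of exactly 5000 is not replicated, because the script uses a strict greater-than comparison.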
Specification of the Kafka Datastore is thus simplified, with only the static portion of the Topic specified, and looks like the following:
DATASTORE kafka:///hr_*_cdc/key
OF JSON
AS TARGET
DESCRIBED BY GROUP DB2_SOURCE;
REPLICATE (TARGET)
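The effect of the wildcard in a URL such as kafka:///hr_*_cdc/key can be sketched in Python. This is a hedged illustration, not the product's implementation: it assumes the "*" is replaced by the alias of the source DESCRIPTION being replicated (consistent with the hr_EMPLOYEE_cdc and hr_DEPARTMENT_cdc topic names used in the later examples), and the helper names parse_kafka_url and resolve_topic are hypothetical.

```python
def parse_kafka_url(url):
    """Split 'kafka:///<topic>[/<partition mode>]' into (topic, mode).

    mode is None when no partition mode (such as 'key' or 'root_key')
    follows the topic.
    """
    prefix = "kafka:///"
    if not url.startswith(prefix):
        raise ValueError(f"not a kafka:/// URL: {url!r}")
    topic, _, mode = url[len(prefix):].partition("/")
    return topic, mode or None


def resolve_topic(topic_pattern, alias):
    """Replace the '*' placeholder with a source DESCRIPTION alias.

    Assumption: the alias is substituted verbatim for the wildcard.
    """
    return topic_pattern.replace("*", alias, 1)


if __name__ == "__main__":
    pattern, mode = parse_kafka_url("kafka:///hr_*_cdc/key")
    for alias in ("EMPLOYEE", "DEPARTMENT"):
        print(resolve_topic(pattern, alias), mode)
```

Under that assumption, only the static portion of the topic name (hr_ and _cdc) has to be maintained in the script; adding another source DESCRIPTION to the group yields another topic without touching the DATASTORE statement.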
Example 2 - Db2 JSON
Using similar Source DESCRIPTIONS from Example 1, the Kafka Cluster can be dynamically specified in the sqdata_kafka_producer.conf file but with a single Topic ID for all CDC records, a randomized partition and a single REPLICATE command:
BEGIN GROUP DB2_SOURCE;
DESCRIPTION DB2SQL ./DB2DDL/EMP.ddl AS EMPLOYEE;
DESCRIPTION DB2SQL ./DB2DDL/DEPT.ddl AS DEPARTMENT;
END GROUP;
...
DATASTORE kafka:///hr_all_cdc
OF JSON
AS TARGET
DESCRIBED BY GROUP DB2_SOURCE;
...
REPLICATE (TARGET)
Example 3 - Db2 JSON
Using the same Source DESCRIPTIONS from Example 2, the Kafka Cluster can be dynamically specified in the sqdata_kafka_producer.conf file but with explicit specification of Topic ID and Partition using the SETURL and SETURLKEY functions:
DATASTORE kafka:///*
...
Used with the following logic in the Apply Engine script:
CASE RECNAME(CDCIN)
WHEN 'EMP' CALLPROC(P_EMP)
WHEN 'DEPT' CALLPROC(P_DEPT)
CREATE PROC P_EMP AS SELECT
{
SETURL(TARGET, 'kafka:///hr_EMPLOYEE_cdc/')
SETURLKEY(TARGET, EMP_NO)
REPLICATE(TARGET, EMP)
}
CREATE PROC P_DEPT AS SELECT
{
SETURL(TARGET, 'kafka:///hr_DEPARTMENT_cdc/')
SETURLKEY(TARGET, DEPT_NO)
REPLICATE(TARGET, DEPT)
}
Example 4 - Db2 AVRO
Using similar Source DESCRIPTIONS from Example 2 and the Kafka Cluster dynamically specified as in Example 3, a Confluent Schema Registry will be used to automatically manage AVRO Topic Schemas for each source table as those schemas evolve over time:
BEGIN GROUP DB2_SOURCE;
DESCRIPTION DB2SQL ./DB2DDL/EMP.ddl AS EMPLOYEE
KEY IS EMP_NO
TOPIC hr_EMPLOYEE_cdc
SUBJECT hr_EMPLOYEE_cdc-value;
DESCRIPTION DB2SQL ./DB2DDL/DEPT.ddl AS DEPARTMENT
KEY IS DEPT_NO
TOPIC hr_DEPARTMENT_cdc
SUBJECT hr_DEPARTMENT_cdc-value;
END GROUP;
Specification of the Kafka Datastore is simplified and looks like the following:
DATASTORE kafka:///*
OF AVRO
FORMAT CONFLUENT
AS TARGET
DESCRIBED BY GROUP DB2_SOURCE
;
REPLICATE (TARGET)
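For readers building consumers, the following Python sketch shows what a message typically looks like when a Confluent Schema Registry is involved. It assumes that FORMAT CONFLUENT produces the standard Confluent wire format (one zero "magic" byte, a 4-byte big-endian schema ID, then the AVRO-encoded payload) and that subjects follow the topic-name convention seen in the SUBJECT clauses above. The helper names, the schema ID 42 and the payload bytes are placeholders, not values produced by the Apply Engine.

```python
import struct

MAGIC_BYTE = 0  # first byte of a Confluent-framed message


def subject_for(topic):
    """Subject name for a topic's value schema, e.g. hr_EMPLOYEE_cdc-value."""
    return f"{topic}-value"


def frame(schema_id, avro_payload):
    """Prefix an AVRO-encoded payload with magic byte and schema ID."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + avro_payload


def unframe(message):
    """Return (schema_id, avro_payload) from a Confluent-framed message."""
    if len(message) < 5:
        raise ValueError("message shorter than the 5-byte header")
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, message[5:]


if __name__ == "__main__":
    # Placeholder bytes; a real payload would be AVRO binary data.
    framed = frame(42, b"avro-bytes")
    print(subject_for("hr_EMPLOYEE_cdc"), unframe(framed))
```

A consumer uses the schema ID from the header to fetch the matching schema from the registry before decoding the payload, which is what allows the schema for each source table to evolve without redeploying consumers.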
Example 5 - IMS AVRO
An IMS Source is very different from Relational in that data relationships are defined by keys, foreign keys and physical hierarchy. Those differences are minimized as much as possible by using the REPLICATE Command with Kafka targets. One critical difference is how partitions are handled. By specifying "root_key" rather than "key" or defaulting to random partitioning, you can ensure that Kafka consumers will process all the data associated with a particular root segment key together and in the proper sequence within a unit-of-work. Like Example 2, a Confluent Schema Registry will be used to automatically manage AVRO Topic Schemas for each source segment as those COBOL descriptions evolve over time:
BEGIN GROUP IMS_DBD;
DESCRIPTION IMSDBD ./IMSDBD/HREMPLDB.dbd AS HREMPLDB;
END GROUP;
BEGIN GROUP IMS_SEG;
DESCRIPTION COBOL ./IMSSEG/HREMPLDB/EMPLOYEE.cob AS EMPLOYEE
FOR SEGMENT EMPLOYEE
IN DATABASE HREMPLDB
TOPIC hr_EMPLOYEE_cdc
SUBJECT hr_EMPLOYEE_cdc-value;
DESCRIPTION COBOL ./IMSSEG/HREMPLDB/ANNULREV.cob AS ANNULREV
FOR SEGMENT ANNULREV
IN DATABASE HREMPLDB
TOPIC hr_ANNUAL_REVIEW_cdc
SUBJECT hr_ANNUAL_REVIEW_cdc-value;
END GROUP;
Specification of the Kafka Datastore is simplified and looks like the following:
DATASTORE kafka:///*/root_key
OF AVRO
FORMAT CONFLUENT
AS TARGET
DESCRIBED BY GROUP IMS_SEG;
Processing requires only one statement:
REPLICATE (TARGET)
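Why "root_key" keeps related segments together can be illustrated with a short Python sketch. It is a simplified model under stated assumptions: zlib.crc32 stands in for whatever hash the Kafka producer's partitioner actually applies, and the sample segment keys are invented for the example. The point is only that a partition derived from the root segment key is the same for a root segment and all of its children.

```python
import zlib


def partition_for(key, num_partitions):
    """Map a message key to a partition (stand-in hash, not Kafka's own)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def partitions_by(segments, field, num_partitions):
    """Group the partitions used per root key when keying on `field`."""
    used = {}
    for seg in segments:
        part = partition_for(seg[field], num_partitions)
        used.setdefault(seg["root_key"], set()).add(part)
    return used


# Hypothetical change records: one EMPLOYEE root and its ANNULREV children.
SEGMENTS = [
    {"segment": "EMPLOYEE", "root_key": "000010", "key": "000010"},
    {"segment": "ANNULREV", "root_key": "000010", "key": "000010/2018"},
    {"segment": "ANNULREV", "root_key": "000010", "key": "000010/2019"},
    {"segment": "EMPLOYEE", "root_key": "000020", "key": "000020"},
    {"segment": "ANNULREV", "root_key": "000020", "key": "000020/2019"},
]

if __name__ == "__main__":
    # Keyed on root_key: each root maps to exactly one partition.
    print(partitions_by(SEGMENTS, "root_key", 6))
    # Keyed on the full segment key: children may land elsewhere.
    print(partitions_by(SEGMENTS, "key", 6))
```

Because Kafka preserves ordering only within a partition, keeping a root segment and its dependents in one partition is what lets a consumer see them in their original sequence.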
Example 2 - Db2 to AVRO formatted Kafka
After confirming the desired results from the Apply Engine script in Example 1, the Output format will be switched to AVRO, including a Confluent Schema Registry. Only a few elements of the script in Example 1 need to be added or altered as identified by a green bar in the first character of modified lines:
Example 3 - IMS to AVRO formatted Kafka
Replicate IMS changed data (CDC) for the IVP EMPLOYEE and ANNULREV segments in the HREMPLDB IMS database into unique AVRO formatted Kafka Topics with partitioning based on the Root Segment key. The example also includes a filter for the EMPLOYEE segment. Only updates to employees with a bonus over $5,000 will cause the record to be written to Kafka. All changes to the ANNULREV segment are Replicated with no filter applied.
Note: The user-friendly AS <alias> names specified in the source DESCRIPTION statements will be used in the AVRO schema header.
----------------------------------------------------------------------
-- Name: IMSTOKAF: Z/OS IMS To Kafka AVRO on Linux
-- Client/Project: client/project
----------------------------------------------------------------------
-- SUBSTITUTION PARMS USED IN THIS SCRIPT:
-- %(ENGINE) - ENGINE Name
-- %(SHOST) - Source HOST of the Capture/Publisher
-- %(SPORT) - Source HOST SQDaemon PORT
-- %(PUBNM) - Source Capture/Publisher Agent Name
----------------------------------------------------------------------
-- Change Log:
----------------------------------------------------------------------
-- 2019-02-01 INITIAL RELEASE using AVRO
----------------------------------------------------------------------
JOBNAME IMSTOKAF;
OPTIONS
CDCOP('I','U','D') -- Set CHANGE OP Constants
,USE AVRO COMPATIBLE NAMES -- Required for AVRO Targets
,CONFLUENT REPOSITORY 'http://schema_registry.precisely.com:8081'
;
----------------------------------------------------------------------
-- Data Definition Section
----------------------------------------------------------------------
----------------------------------------------------------------------
-- Source Descriptions
----------------------------------------------------------------------
BEGIN GROUP IMS_DBD;
DESCRIPTION IMSDBD ./IMSDBD/HREMPLDB.dbd AS HREMPLDB;
END GROUP;
BEGIN GROUP IMS_SEG;
DESCRIPTION COBOL ./IMSSEG/EMPLOYEE.cob AS EMPLOYEE -- User friendly alias
FOR SEGMENT EMPLOYEE
IN DATABASE HREMPLDB
TOPIC IVP_HR_EMPLOYEE
SUBJECT IVP_HR_EMPLOYEE-value;
DESCRIPTION COBOL ./IMSSEG/ANNULREV.cob AS ANNULREV -- User friendly alias
FOR SEGMENT ANNULREV
IN DATABASE HREMPLDB
TOPIC IVP_HR_ANNUAL_REVIEW
SUBJECT IVP_HR_ANNUAL_REVIEW-value;
END GROUP;
----------------------------------------------------------------------
-- Target Descriptions
----------------------------------------------------------------------
-- None required
----------------------------------------------------------------------
-- Datastore Section
----------------------------------------------------------------------
----------------------------------------------------------------------
-- Source Datastore
----------------------------------------------------------------------
DATASTORE cdc://%(SHOST):%(SPORT)/%(PUBNM)/%(ENGINE)
OF IMSCDC
AS CDCIN
DESCRIBED BY GROUP IMS_SEG
;
----------------------------------------------------------------------
-- Target Datastore(s)
----------------------------------------------------------------------
DATASTORE kafka:///*/root_key -- specify dynamic topic
OF AVRO -- specify AVRO format
FORMAT CONFLUENT -- use Confluent Schema Registry
AS TARGET
DESCRIBED BY GROUP IMS_SEG -- use source for REPLICATE
;
----------------------------------------------------------------------
-- Field Specification Section
----------------------------------------------------------------------
DATEFORMAT 'ISOIBM';
----------------------------------------------------------------------
-- Procedure Section
----------------------------------------------------------------------
CREATE PROC P_EMPLOYEE AS SELECT
{
IF EMPLOYEE.BONUS > '5000'
{
REPLICATE(TARGET, EMPLOYEE)
}
}
FROM CDCIN;
----------------------------------------------------------------------
-- Main Section
----------------------------------------------------------------------
PROCESS INTO TARGET
SELECT
{
CASE RECNAME(CDCIN)
WHEN 'EMPLOYEE' CALLPROC(P_EMPLOYEE)
WHEN 'ANNULREV' REPLICATE(TARGET, ANNULREV)
}
FROM CDCIN;
Note: Replication of IMS requires that the Target message descriptions maintain the full parent key sequence. This is ensured by Connect CDC (SQData) when it generates the AVRO schema / JSON message from the Source Datastore Segment Descriptions.