Three simple Apply Engine scripts are provided below. The first two process Db2 changed data and the third processes IMS changed data. All three use the ability of Connect CDC (SQData) to transform the source data DESCRIPTION or schema into the desired JSON- or AVRO-formatted Kafka message payload. See the Apply Engine Reference for more details on the Apply Engine's capabilities.
This example replicates IMS changed data (CDC) for the IVP EMPLOYEE and ANNULREV segments in the HREMPLDB IMS database into separate AVRO-formatted Kafka topics, one per segment, partitioned on the root segment key. The example also includes a filter for the EMPLOYEE segment: only updates to employees with a bonus over $5,000 cause the record to be written to Kafka. All changes to the ANNULREV segment are replicated with no filter applied.
- The user-friendly AS <alias> names specified in the source DESCRIPTION statements are used in the AVRO schema header.
- Replication of IMS requires that the Target message descriptions maintain the full parent key sequence. This is ensured by SQData when it generates the AVRO schema / JSON message from the Source Datastore Segment Descriptions.
----------------------------------------------------------------------
-- Name: IMSTOKAF: Z/OS IMS To Kafka AVRO on Linux
-- Client/Project: client/project
----------------------------------------------------------------------
-- SUBSTITUTION PARMS USED IN THIS SCRIPT:
-- %(ENGINE) - ENGINE Name
-- %(SHOST) - Source HOST of the Capture/Publisher
-- %(SPORT) - Source HOST SQDaemon PORT
-- %(PUBNM) - Source Capture/Publisher Agent Name
----------------------------------------------------------------------
-- Change Log:
----------------------------------------------------------------------
-- 2019-02-01 INITIAL RELEASE using AVRO
----------------------------------------------------------------------
JOBNAME IMSTOKAF;
OPTIONS
CDCOP('I','U','D') -- Set CHANGE OP Constants
,USE AVRO COMPATIBLE NAMES -- Required for AVRO Targets
,CONFLUENT REPOSITORY 'http://schema_registry.precisely.com:8081'
;
----------------------------------------------------------------------
-- Data Definition Section
----------------------------------------------------------------------
----------------------------------------------------------------------
-- Source Descriptions
----------------------------------------------------------------------
BEGIN GROUP IMS_DBD;
DESCRIPTION IMSDBD ./IMSDBD/HREMPLDB.dbd AS HREMPLDB;
END GROUP;
BEGIN GROUP IMS_SEG;
DESCRIPTION COBOL ./IMSSEG/EMPLOYEE.cob AS EMPLOYEE -- User friendly alias
FOR SEGMENT EMPLOYEE
IN DATABASE HREMPLDB
TOPIC IVP_HR_EMPLOYEE
SUBJECT IVP_HR_EMPLOYEE-value;
DESCRIPTION COBOL ./IMSSEG/ANNULREV.cob AS ANNULREV -- User friendly alias
FOR SEGMENT ANNULREV
IN DATABASE HREMPLDB
TOPIC IVP_HR_ANNUAL_REVIEW
SUBJECT IVP_HR_ANNUAL_REVIEW-value;
END GROUP;
----------------------------------------------------------------------
-- Target Descriptions
----------------------------------------------------------------------
-- None required
----------------------------------------------------------------------
-- Datastore Section
----------------------------------------------------------------------
----------------------------------------------------------------------
-- Source Datastore
----------------------------------------------------------------------
DATASTORE cdc://%(SHOST):%(SPORT)/%(PUBNM)/%(ENGINE)
OF IMSCDC
AS CDCIN
DESCRIBED BY GROUP IMS_SEG
;
----------------------------------------------------------------------
-- Target Datastore(s)
----------------------------------------------------------------------
DATASTORE kafka:///*/root_key -- specify dynamic topic
OF AVRO -- specify AVRO format
FORMAT CONFLUENT -- use Confluent Schema Registry
AS TARGET
DESCRIBED BY GROUP IMS_SEG -- use source for REPLICATE
;
----------------------------------------------------------------------
-- Field Specification Section
----------------------------------------------------------------------
DATEFORMAT 'ISOIBM';
----------------------------------------------------------------------
-- Procedure Section
----------------------------------------------------------------------
CREATE PROC P_EMPLOYEE AS SELECT
{
    IF EMPLOYEE.BONUS > '5000'
    {
        REPLICATE(TARGET, EMPLOYEE)
    }
}
FROM CDCIN;
----------------------------------------------------------------------
-- Main Section
----------------------------------------------------------------------
PROCESS INTO TARGET
SELECT
{
    CASE RECNAME(CDCIN)
        WHEN 'EMPLOYEE' CALLPROC(P_EMPLOYEE)
        WHEN 'ANNULREV' REPLICATE(TARGET, ANNULREV)
}
FROM CDCIN;
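
The routing and filtering performed by the Main and Procedure sections can be sketched outside the Apply Engine for readers who want to reason about which records reach which topic. The Python below is only an illustration, not Apply Engine code: the `route` function, the `TOPICS` map, and the dictionary shape of a change record (`segment`, `root_key`, `data`) are all hypothetical names introduced here. It also assumes the bonus comparison is numeric, which may differ from how the engine evaluates `EMPLOYEE.BONUS > '5000'`.

```python
from decimal import Decimal

# Hypothetical map mirroring the TOPIC clauses of the two DESCRIPTION statements.
TOPICS = {
    "EMPLOYEE": "IVP_HR_EMPLOYEE",
    "ANNULREV": "IVP_HR_ANNUAL_REVIEW",
}

# Threshold taken from the P_EMPLOYEE filter; numeric comparison is an assumption.
BONUS_THRESHOLD = Decimal("5000")


def route(record):
    """Return (topic, key, payload) for a change record, or None if it is dropped.

    `record` is an assumed shape, not an SQData structure:
    {"segment": str, "root_key": str, "data": dict}
    """
    segment = record["segment"]
    topic = TOPICS.get(segment)
    if topic is None:
        # Segment has no WHEN branch in the CASE statement, so nothing is written.
        return None
    if segment == "EMPLOYEE":
        # Mirrors P_EMPLOYEE: replicate only when the bonus exceeds the threshold.
        if Decimal(str(record["data"]["BONUS"])) <= BONUS_THRESHOLD:
            return None
    # ANNULREV records, and EMPLOYEE records that pass the filter, are keyed
    # on the root segment key so related changes share a partition.
    return topic, record["root_key"], record["data"]
```

For example, `route({"segment": "EMPLOYEE", "root_key": "000010", "data": {"BONUS": "4000.00"}})` returns `None`, while the same call for an ANNULREV record returns the `IVP_HR_ANNUAL_REVIEW` topic with `000010` as the message key.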