Replicate IMS changed data (CDC) for the IVP HR and Facilities databases into a PostgreSQL RDBMS running on a cloud platform such as AWS RDS PostgreSQL. Two Engines will be used: a Replicator Engine operating as a Parallel Processing Distributor that publishes to Kafka, and multiple instances of a PostgreSQL Apply Engine, each operating as a Kafka Consumer.
Example 1 - Replicator engine in distributor mode
The volume of changes is anticipated to be quite large, so Kafka will be used to split the source CDC data into separate streams, partitioned by database root key, that are processed in parallel in a cloud VM. The values for TOPIC and SUBJECT used here are arbitrary but were selected based on the source DBD names, in this example the IMS DBDs IVPHRDBD and IVPFCDBD. Operation of the Replicator will be optimized by using a single worker thread.
----------------------------------------------------------------------
-- Name: IMSTOKAF: Z/OS IMS CDC MESSAGE To Apply Engine Consumer
-- Client/Project: client/project
----------------------------------------------------------------------
-- Change Log:
----------------------------------------------------------------------
-- 2022-09-01 INITIAL RELEASE using Replicator / Distributor Engine
--
----------------------------------------------------------------------
-- Replicate Source/Target
----------------------------------------------------------------------
-- Replicate IMS CDC records from the publisher/subscription named in the
-- cdc:// URL to Kafka as CDC messages, using a single worker.
-- Every <...> token is a placeholder that must be replaced with a real
-- value before the script is run.
REPLICATE
    IMS cdc://<host_name>:<daemon_port>/<publisher_name>/<subscription_name>
    TO CDC MESSAGE 'kafka://<host_name>:<host_port>/<topic>'
    WITH 1 WORKER
;
-- Replicator options: a comma-separated list terminated by ';'.
-- <name_space> is a placeholder to be replaced before use.
OPTIONS
    AVRO COMPATIBLE NAMES
  , STRIP TRAILING SPACES
  , NAMESPACE '<name_space>'
;
-- One mapping entry per source IMS DBD. Each entry names the SUBJECT and
-- ALIAS used for that source and the length of its root key (ROOTKEYLEN).
-- NOTE(review): the root key lengths (8 and 5) are presumably taken from
-- the DBD definitions - confirm against IVPHRDBD / IVPFCDBD.
MAPPINGS
    SOURCE 'IVPHRDBD'
        SUBJECT 'IVPHRCDC-value'
        ALIAS 'IMSCDC1'
        ROOTKEYLEN 8
    SOURCE 'IVPFCDBD'
        SUBJECT 'IVPFCCDC-value'
        ALIAS 'IMSCDC2'
        ROOTKEYLEN 5
;
Example 2 - Apply engine Kafka consumer
Multiple instances of the same Apply Engine script will run in a cloud VM. Each instance processes, in parallel, a portion of the original CDC data partitioned by root segment key. This maximizes RDBMS Insert/Update/Delete performance, which is the slowest part of the replication process.
----------------------------------------------------------------------
-- Name: IMSTOPGS: Z/OS IMS To PostgreSQL via Kafka on Linux
-- Client/Project: client/project
----------------------------------------------------------------------
-- SUBSTITUTION PARMS USED IN THIS SCRIPT:
-- %(ENGINE) - ENGINE Name
-- %(SHOST) - Kafka Cluster
-- %(SPORT) - Kafka Cluster port
-- %(SCHEMA) - Target SCHEMA Name
----------------------------------------------------------------------
-- Change Log:
----------------------------------------------------------------------
-- 2022-09-01 INITIAL RELEASE using AVRO
----------------------------------------------------------------------
-- Job name for this Apply Engine script.
JOBNAME IMSTOPGS;

-- Commit interval, stated explicitly even though 500 is also the default.
COMMIT EVERY 500;

-- Set the change-operation constants.
-- NOTE(review): presumably insert / update / delete in that order - confirm.
OPTIONS
    CDCOP('I','U','D')
;
-- Target RDBMS connection over ODBC. The text between /+ and +/ following
-- passcmd is the command ./getpw.sh.
-- NOTE(review): presumably ORGDATA_FACILITIES is the ODBC data source name,
-- FACILRPL the user id, and ./getpw.sh is run to obtain the password, so that
-- no password is stored in this script - confirm against the RDBMS command
-- reference.
RDBMS ODBC ORGDATA_FACILITIES FACILRPL passcmd
/+
./getpw.sh
+/
;
----------------------------------------------------------------------
-- Data Definition Section
----------------------------------------------------------------------
----------------------------------------------------------------------
-- Source Descriptions
----------------------------------------------------------------------
-- IMS DBD descriptions for the two source databases. The aliases are the
-- names referenced by IN DATABASE in the segment descriptions.
BEGIN GROUP IMS_DBD;
    DESCRIPTION IMSDBD ./IMSDBD/IVPHRDBD.dbd
        AS IVPHRDBD;
    DESCRIPTION IMSDBD ./IMSDBD/IVPFCDBD.dbd
        AS IVPFCDBD;
END GROUP;
-- COBOL copybook descriptions of the IMS segments. Each description is tied
-- to its segment and owning database and is given a user-friendly alias.
BEGIN GROUP IMS_SEG;
    -- EMPLOYEE segment of database IVPHRDBD
    DESCRIPTION COBOL ./IMSSEG/EMPLOYEE.cob
        AS EMPLOYEE
        FOR SEGMENT EMPLOYEE
        IN DATABASE IVPHRDBD
    ;
    -- FACILITY segment of database IVPFCDBD
    DESCRIPTION COBOL ./IMSSEG/FACILITY.cob
        AS FACILITY
        FOR SEGMENT FACILITY
        IN DATABASE IVPFCDBD
    ;
END GROUP;
----------------------------------------------------------------------
-- Target Descriptions
----------------------------------------------------------------------
-- SQL DDL descriptions of the target tables. The T_ aliases are the names
-- used by the mapping procedures and by APPLY in the main section.
BEGIN GROUP PGS_DDL;
    DESCRIPTION SQLDDL ./PGSDDL/IVPHRDBD/EMPLOYEE.ddl
        AS T_EMPLOYEE;
    DESCRIPTION SQLDDL ./PGSDDL/IVPFCDBD/FACILITY.ddl
        AS T_FACILITY;
END GROUP;
----------------------------------------------------------------------
-- Datastore Section
----------------------------------------------------------------------
----------------------------------------------------------------------
-- Source Datastore
----------------------------------------------------------------------
-- Source datastore: IMS CDC records read from the Kafka cluster given by the
-- %(SHOST) / %(SPORT) substitution parameters.
-- The alias must be CDCIN because the mapping procedures and the main
-- section all reference the source by that name (FROM CDCIN,
-- IMSSEGNAME(CDCIN), CDCOP(CDCIN), CDCTSTMP(CDCIN)).
DATASTORE kafka://%(SHOST):%(SPORT)          -- specify Kafka cluster
    OF IMSCDC                                -- specify IMS CDC format
    AS CDCIN                                 -- alias used by FROM / CDC functions
    DESCRIBED BY GROUP IMS_SEG
;
----------------------------------------------------------------------
-- Target Datastore(s)
----------------------------------------------------------------------
-- Target datastore: the relational tables described by group PGS_DDL,
-- aliased TARGET (the name used by PROCESS INTO and APPLY). Table names
-- are qualified with the %(SCHEMA) substitution parameter.
DATASTORE RDBMS
    OF RELATIONAL
    AS TARGET
    FORCE QUALIFIER %(SCHEMA)
    DESCRIBED BY GROUP PGS_DDL
    FOR CHANGE
;
----------------------------------------------------------------------
-- Field Specification Section
----------------------------------------------------------------------
----------------------------------------------------------------------
-- Procedure Section
----------------------------------------------------------------------
-- Mapping procedure for the EMPLOYEE segment: copies the CDC audit variables
-- and the EMPLOYEE source fields into the T_EMPLOYEE target columns.
-- The V_SOURCE_* variables are expected to be set by the caller before
-- CALLPROC(M_EMPLOYEE) is executed.
CREATE PROC M_EMPLOYEE AS SELECT
{
    -- CDC audit columns
    T_EMPLOYEE.CDC_SOURCE_APPLY_DTTM   = V_SOURCE_APPLY_DTTM
    T_EMPLOYEE.CDC_SOURCE_CAPTURE_DTTM = V_SOURCE_CAPTURE_DTTM
    T_EMPLOYEE.CDC_SOURCE_ACTION_CODE  = V_SOURCE_CDC_ACTION_CODE
    -- Parent Keys
    -- Segment Keys
    T_EMPLOYEE.EMP_NO                  = EMPLOYEE.EMP-NO
    -- Segment Data
    -- NOTE(review): FULL_NAME is the only source field here written with an
    -- underscore; the others use hyphens. Confirm against EMPLOYEE.cob.
    T_EMPLOYEE.FULL_NAME               = EMPLOYEE.FULL_NAME
    T_EMPLOYEE.LAST_NAME               = EMPLOYEE.LAST-NAME
    T_EMPLOYEE.FIRST_NAME              = EMPLOYEE.FIRST-NAME
    T_EMPLOYEE.PHONE                   = EMPLOYEE.PHONE
    -- Target column uses an underscore like every other target column.
    T_EMPLOYEE.ZIP_CODE                = EMPLOYEE.ZIP-CODE
}
FROM CDCIN;
-- Mapping procedure for the FACILITY segment: copies the CDC audit variables
-- and the FACILITY source fields into the T_FACILITY target columns.
-- The V_SOURCE_* variables are expected to be set by the caller before
-- CALLPROC(M_FACILITY) is executed.
CREATE PROC M_FACILITY AS SELECT
{
    -- CDC audit columns
    T_FACILITY.CDC_SOURCE_APPLY_DTTM   = V_SOURCE_APPLY_DTTM
    T_FACILITY.CDC_SOURCE_CAPTURE_DTTM = V_SOURCE_CAPTURE_DTTM
    T_FACILITY.CDC_SOURCE_ACTION_CODE  = V_SOURCE_CDC_ACTION_CODE
    -- Parent Keys
    -- Segment Keys
    T_FACILITY.STATE_CODE              = FACILITY.STATE-CODE
    T_FACILITY.OFFICE_NO               = FACILITY.OFFICE-NUMBER
    -- Segment Data
    -- NOTE(review): the source fields below use underscores while the key
    -- fields above use hyphens. Confirm the spellings against FACILITY.cob.
    T_FACILITY.CITY_NAME               = FACILITY.CITY_NAME
    T_FACILITY.ADDRESS_1               = FACILITY.ADDRESS_1
    T_FACILITY.ADDRESS_2               = FACILITY.ADDRESS_2
    T_FACILITY.ADDRESS_3               = FACILITY.ADDRESS_3
    T_FACILITY.ZIP_CODE                = FACILITY.ZIP_CODE
    T_FACILITY.MANAGER_ID              = FACILITY.MANAGER_ID
}
FROM CDCIN;
----------------------------------------------------------------------
-- Main Section
----------------------------------------------------------------------
-- Main processing loop: for every CDC record read from CDCIN, set the CDC
-- audit variables, then call the mapping procedure that matches the IMS
-- segment name and apply the result to the corresponding target table.
-- NOTE(review): no DECLARE for the V_SOURCE_* variables is visible in this
-- script; confirm they are declared (e.g. in the Field Specification Section).
PROCESS INTO TARGET
SELECT
{
    -- OUTMSG statement(s) are for debugging ONLY and should be commented
    -- out when not needed
    -- OUTMSG(0,'Segment=',IMSSEGNAME(CDCIN)
    --         ,' CDCOP=',CDCOP(CDCIN)
    --         )

    -- Variable names must match those read by M_EMPLOYEE and M_FACILITY.
    V_SOURCE_APPLY_DTTM      = LOCAL2GMT(TIMESTAMP ())
    V_SOURCE_CAPTURE_DTTM    = CDCTSTMP(CDCIN)
    V_SOURCE_CDC_ACTION_CODE = CDCOP(CDCIN)

    -- Segments without a WHEN branch get no mapping and no APPLY.
    CASE IMSSEGNAME(CDCIN)
        WHEN 'EMPLOYEE' { CALLPROC(M_EMPLOYEE) APPLY(TARGET,T_EMPLOYEE) }
        WHEN 'FACILITY' { CALLPROC(M_FACILITY) APPLY(TARGET,T_FACILITY) }
}
FROM CDCIN;