IMS to PostgreSQL distributor

Connect CDC (SQData) Replicator Engine

Product type: Software
Portfolio: Integrate
Product family: Connect
Product: Connect > Connect CDC (SQData)
Version: Latest
Language: English
Product name: Connect CDC (SQData)
Title: Connect CDC (SQData) Replicator Engine
Copyright: 2024
First publish date: 2000
Last edition: 2024-08-01
Last publication: 2024-08-01T16:58:30.842568

This example replicates IMS changed data (CDC) from the IVP HR and Facilities databases into a PostgreSQL RDBMS running on a cloud platform such as Amazon RDS for PostgreSQL. Two engines are used:

- a Replicator Engine that operates as a parallel-processing distributor and writes to Kafka
- multiple instances of a PostgreSQL Apply Engine that operate as Kafka consumers

Example 1 - Replicator engine in distributor mode

The volume of changes is expected to be large. Kafka is therefore used to split the source CDC data into separate streams, partitioned by database root key, so that the streams can be processed in parallel in a cloud VM.

The TOPIC and SUBJECT values used here are arbitrary. They were chosen to match the source DBD names, in this example the IMS DBDs IVPHRDBD and IVPFCDBD. The Replicator is configured to run with one worker thread (WITH 1 WORKER) to optimize its operation.
----------------------------------------------------------------------
-- Name: IMSTOKAF:  Z/OS IMS CDC MESSAGE To Apply Engine Consumer
-- Client/Project: client/project
----------------------------------------------------------------------
--       Change Log:
----------------------------------------------------------------------
-- 2022-09-01 INITIAL RELEASE using Replicator / Distributor Engine
--
----------------------------------------------------------------------
--       Replicate Source/Target
----------------------------------------------------------------------
REPLICATE
  IMS cdc://<host_name>:<daemon_port>/<publisher_name>/<subscription_name>
  TO CDC MESSAGE 'kafka://<host_name>:<host_port>/<topic>'
  WITH 1 WORKER
;
OPTIONS
AVRO COMPATIBLE NAMES,
STRIP TRAILING SPACES,
NAMESPACE '<name_space>'
;
MAPPINGS
  SOURCE 'IVPHRDBD'
          SUBJECT 'IVPHRCDC-value'
          ALIAS 'IMSCDC1'
          ROOTKEYLEN 8
  SOURCE 'IVPFCDBD'
          SUBJECT 'IVPFCCDC-value'
          ALIAS 'IMSCDC2'
          ROOTKEYLEN 5
;
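The root-key partitioning described above can be sketched in Python. This sketch is not how the Replicator is implemented. It is a minimal illustration that rests on two assumptions:

- ROOTKEYLEN gives the number of leading root-key bytes used to choose a partition.
- A CRC-32 hash maps that key to one of a fixed number of partitions. CRC-32 is a stand-in chosen here; Kafka's own default partitioner uses murmur2 for keyed messages.

`CdcRecord`, `partition_for` and `route` are hypothetical names.

```python
import zlib
from dataclasses import dataclass

# ROOTKEYLEN per source DBD, mirroring the MAPPINGS clause above.
ROOTKEYLEN = {"IVPHRDBD": 8, "IVPFCDBD": 5}


@dataclass
class CdcRecord:
    dbd: str         # source DBD name, e.g. "IVPHRDBD"
    root_key: bytes  # root segment key of the changed database record
    payload: bytes   # the captured change, treated as opaque here


def partition_for(record: CdcRecord, num_partitions: int) -> int:
    """Map the first ROOTKEYLEN bytes of the root key to a partition.

    Assumption: CRC-32 stands in for whatever hash the Replicator uses.
    """
    key = record.root_key[:ROOTKEYLEN[record.dbd]]
    return zlib.crc32(key) % num_partitions


def route(records, num_partitions):
    """Split records into per-partition streams, keeping arrival order."""
    streams = {p: [] for p in range(num_partitions)}
    for record in records:
        streams[partition_for(record, num_partitions)].append(record)
    return streams
```

Every change for a given root key hashes to the same partition. Changes to one IMS database record therefore stay in their original order within a single stream, while different records can be processed in parallel.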

Example 2 - Apply engine Kafka consumer

Multiple instances of the same Apply Engine script run in a cloud VM. Each instance processes its portion of the original CDC data, partitioned by root segment key, in parallel with the other instances. This maximizes RDBMS insert/update/delete performance, which is the slowest part of the replication process.
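To picture how the instances divide the work, the sketch below spreads Kafka partitions round-robin over a set of Apply Engine instances. It assumes the instances join the same Kafka consumer group. In that case Kafka performs the assignment itself, and its built-in assignors include range, round-robin and sticky strategies. The function and the instance names are hypothetical and only illustrate the outcome.

```python
from typing import Dict, List


def assign_partitions(num_partitions: int, instances: List[str]) -> Dict[str, List[int]]:
    """Spread partitions round-robin over Apply Engine instances.

    Each partition is owned by exactly one instance, so all changes for a
    given root key are applied by a single instance, in order.
    """
    if not instances:
        raise ValueError("need at least one consumer instance")
    assignment: Dict[str, List[int]] = {name: [] for name in instances}
    for partition in range(num_partitions):
        owner = instances[partition % len(instances)]
        assignment[owner].append(partition)
    return assignment
```

For example, `assign_partitions(6, ["pgs-1", "pgs-2", "pgs-3"])` returns `{"pgs-1": [0, 3], "pgs-2": [1, 4], "pgs-3": [2, 5]}`. Running more instances than partitions leaves the extra instances idle, so the partition count caps the useful degree of parallelism.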
----------------------------------------------------------------------
-- Name: IMSTOPGS:  Z/OS IMS To PostgreSQL via Kafka on Linux
-- Client/Project: client/project
----------------------------------------------------------------------
--  SUBSTITUTION PARMS USED IN THIS SCRIPT:
--   %(ENGINE) - ENGINE Name
--   %(SHOST) - Kafka Cluster
--   %(SPORT) - Kafka Cluster port
--   %(SCHEMA) - Target SCHEMA Name
----------------------------------------------------------------------
--       Change Log:
----------------------------------------------------------------------
-- 2022-09-01 INITIAL RELEASE using AVRO
----------------------------------------------------------------------
JOBNAME IMSTOPGS;
COMMIT EVERY 500;           -- The Default is also 500

OPTIONS
  CDCOP('I','U','D')     -- Set CHANGE OP Constants
;

RDBMS ODBC ORGDATA_FACILITIES FACILRPL passcmd 
/+
 ./getpw.sh
 +/
 ; 
----------------------------------------------------------------------
--       Data Definition Section
----------------------------------------------------------------------
----------------------------------------------------------------------
--       Source Descriptions
----------------------------------------------------------------------
BEGIN GROUP IMS_DBD;
DESCRIPTION IMSDBD ./IMSDBD/IVPHRDBD.dbd AS IVPHRDBD;
DESCRIPTION IMSDBD ./IMSDBD/IVPFCDBD.dbd AS IVPFCDBD;
END GROUP;
BEGIN GROUP IMS_SEG;
DESCRIPTION COBOL ./IMSSEG/EMPLOYEE.cob AS EMPLOYEE -- User friendly alias
                FOR SEGMENT EMPLOYEE
                IN DATABASE IVPHRDBD

;
DESCRIPTION COBOL ./IMSSEG/FACILITY.cob AS FACILITY -- User friendly alias
                FOR SEGMENT FACILITY
                IN DATABASE IVPFCDBD
;
END GROUP;
----------------------------------------------------------------------
--       Target Descriptions
----------------------------------------------------------------------
BEGIN GROUP PGS_DDL;
DESCRIPTION SQLDDL ./PGSDDL/IVPHRDBD/EMPLOYEE.ddl AS T_EMPLOYEE;
DESCRIPTION SQLDDL ./PGSDDL/IVPFCDBD/FACILITY.ddl AS T_FACILITY;
END GROUP;
----------------------------------------------------------------------
--       Datastore Section
----------------------------------------------------------------------
----------------------------------------------------------------------
--       Source Datastore
----------------------------------------------------------------------
DATASTORE kafka://%(SHOST):%(SPORT)         -- specify Kafka cluster
        OF IMSCDC                         -- specify IMS CDC format
        AS CDCIN                          -- alias referenced as CDCIN below
        DESCRIBED BY GROUP IMS_SEG        
;
----------------------------------------------------------------------
--       Target Datastore(s)
----------------------------------------------------------------------
DATASTORE RDBMS
        OF RELATIONAL
        AS TARGET
        FORCE QUALIFIER %(SCHEMA)
        DESCRIBED BY GROUP PGS_DDL
        FOR CHANGE
;
----------------------------------------------------------------------
--       Field Specification Section
----------------------------------------------------------------------
----------------------------------------------------------------------
--       Procedure Section
----------------------------------------------------------------------
CREATE PROC M_EMPLOYEE AS SELECT
{
   T_EMPLOYEE.CDC_SOURCE_APPLY_DTTM             = V_SOURCE_APPLY_DTTM
   T_EMPLOYEE.CDC_SOURCE_CAPTURE_DTTM           = V_SOURCE_CAPTURE_DTTM
   T_EMPLOYEE.CDC_SOURCE_ACTION_CODE           = V_SOURCE_CDC_ACTION_CODE
-- Parent Keys
-- Segment Keys
   T_EMPLOYEE.EMP_NO                           = EMPLOYEE.EMP-NO
-- Segment Data
   T_EMPLOYEE.FULL_NAME                         = EMPLOYEE.FULL_NAME
   T_EMPLOYEE.LAST_NAME                         = EMPLOYEE.LAST-NAME
   T_EMPLOYEE.FIRST_NAME                       = EMPLOYEE.FIRST-NAME
   T_EMPLOYEE.PHONE                             = EMPLOYEE.PHONE
   T_EMPLOYEE.ZIP_CODE                          = EMPLOYEE.ZIP-CODE
}  
FROM CDCIN;

CREATE PROC M_FACILITY AS SELECT
{
   T_FACILITY.CDC_SOURCE_APPLY_DTTM             = V_SOURCE_APPLY_DTTM
   T_FACILITY.CDC_SOURCE_CAPTURE_DTTM           = V_SOURCE_CAPTURE_DTTM
   T_FACILITY.CDC_SOURCE_ACTION_CODE           = V_SOURCE_CDC_ACTION_CODE
-- Parent Keys

-- Segment Keys
   T_FACILITY.STATE_CODE                       = FACILITY.STATE-CODE
   T_FACILITY.OFFICE_NO                         = FACILITY.OFFICE-NUMBER
-- Segment Data
   T_FACILITY.CITY_NAME                         = FACILITY.CITY_NAME  
   T_FACILITY.ADDRESS_1                         = FACILITY.ADDRESS_1  
   T_FACILITY.ADDRESS_2                         = FACILITY.ADDRESS_2  
   T_FACILITY.ADDRESS_3                         = FACILITY.ADDRESS_3  
   T_FACILITY.ZIP_CODE                         = FACILITY.ZIP_CODE  
   T_FACILITY.MANAGER_ID                       = FACILITY.MANAGER_ID    
}
FROM CDCIN;
----------------------------------------------------------------------
--       Main Section
----------------------------------------------------------------------
PROCESS INTO TARGET
SELECT
{
-- OUTMSG statement(s) are for debugging ONLY and should be commented
-- out when not needed
--   OUTMSG(0,'Segment=',IMSSEGNAME(CDCIN)
--             ,' CDCOP=',CDCOP(CDCIN)
--           )
   V_SOURCE_APPLY_DTTM                          = LOCAL2GMT(TIMESTAMP())
   V_SOURCE_CAPTURE_DTTM                       = CDCTSTMP(CDCIN)
   V_SOURCE_CDC_ACTION_CODE                     = CDCOP(CDCIN)    

   CASE IMSSEGNAME(CDCIN)
      WHEN 'EMPLOYEE' { CALLPROC(M_EMPLOYEE) APPLY(TARGET,T_EMPLOYEE) }
      WHEN 'FACILITY' { CALLPROC(M_FACILITY) APPLY(TARGET,T_FACILITY) }
}
FROM CDCIN;
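The Apply Engine script handles each change in four steps: dispatch on the IMS segment name, map segment fields to target columns plus the three CDC metadata columns, apply the change, and commit every 500 changes. That flow can be approximated in Python against SQLite. The sketch illustrates the flow under stated assumptions; it does not reproduce the engine's behavior:

- APPLY is assumed to issue an INSERT, UPDATE or DELETE according to the change operation code ('I', 'U', 'D').
- Only a subset of the columns in M_EMPLOYEE and M_FACILITY is mapped.
- The names (`SEGMENT_MAPS`, `map_row`, `apply_change`, `process`) are hypothetical.

```python
import sqlite3
from datetime import datetime, timezone

# Hypothetical mapping tables mirroring M_EMPLOYEE / M_FACILITY (subset of
# columns): segment -> (target table, key columns, {target column: source field}).
SEGMENT_MAPS = {
    "EMPLOYEE": ("T_EMPLOYEE", ("EMP_NO",),
                 {"EMP_NO": "EMP-NO", "LAST_NAME": "LAST-NAME"}),
    "FACILITY": ("T_FACILITY", ("STATE_CODE", "OFFICE_NO"),
                 {"STATE_CODE": "STATE-CODE", "OFFICE_NO": "OFFICE-NUMBER",
                  "CITY_NAME": "CITY_NAME"}),
}
COMMIT_EVERY = 500  # mirrors COMMIT EVERY 500


def map_row(segment, op, capture_ts, fields):
    """Build the target row, including the three CDC metadata columns."""
    table, keys, columns = SEGMENT_MAPS[segment]
    row = {target: fields.get(source) for target, source in columns.items()}
    # Stand-in for LOCAL2GMT(TIMESTAMP()): current time in UTC.
    row["CDC_SOURCE_APPLY_DTTM"] = datetime.now(timezone.utc).isoformat()
    row["CDC_SOURCE_CAPTURE_DTTM"] = capture_ts
    row["CDC_SOURCE_ACTION_CODE"] = op
    return table, keys, row


def apply_change(conn, table, keys, row, op):
    """Issue INSERT, UPDATE or DELETE according to the change op code.

    Table and column names come from SEGMENT_MAPS, never from the change data.
    """
    where = " AND ".join(f"{k} = ?" for k in keys)
    key_values = [row[k] for k in keys]
    if op == "I":
        cols = ", ".join(row)
        marks = ", ".join("?" for _ in row)
        conn.execute(f"INSERT INTO {table} ({cols}) VALUES ({marks})",
                     list(row.values()))
    elif op == "U":
        non_keys = [c for c in row if c not in keys]
        sets = ", ".join(f"{c} = ?" for c in non_keys)
        conn.execute(f"UPDATE {table} SET {sets} WHERE {where}",
                     [row[c] for c in non_keys] + key_values)
    elif op == "D":
        conn.execute(f"DELETE FROM {table} WHERE {where}", key_values)
    else:
        raise ValueError(f"unknown change op {op!r}")


def process(conn, changes):
    """changes: iterable of (segment, op, capture_ts, fields) tuples."""
    pending = 0
    for segment, op, capture_ts, fields in changes:
        if segment not in SEGMENT_MAPS:
            continue  # no WHEN branch for this segment; skipped in this sketch
        table, keys, row = map_row(segment, op, capture_ts, fields)
        apply_change(conn, table, keys, row, op)
        pending += 1
        if pending >= COMMIT_EVERY:
            conn.commit()
            pending = 0
    conn.commit()  # flush the final partial batch
```

Committing once every COMMIT_EVERY changes, rather than after each change, reduces the number of commits the target database must perform. The cost is a larger batch of work to redo if the apply fails mid-batch.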