The Kafka Topic Target Datastore is identified by the Kafka broker "url", which in its most basic form consists of the Kafka cluster host address and port number, a fully qualified Kafka Topic name, and an optional Partition.
DATASTORE kafka://[<hostname>[:<port_number>]]/[<kafka_topic_id>][/<partition> | /key | /root_key | /]
OF JSON | AVRO
AS <target_alias>
DESCRIBED BY GROUP <group_name>
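The parts of the datastore url can be illustrated with a short parsing sketch. This is purely illustrative Python, not the Apply Engine's actual parser; an empty host portion means the brokers come from the sqdata_kafka_producer.conf file:

```python
from urllib.parse import urlparse

def parse_kafka_url(url):
    """Split a kafka:// datastore URL into broker, topic, and partition parts.

    Illustrative only: the real parsing is internal to the Apply Engine.
    """
    parsed = urlparse(url)
    # Empty netloc: brokers are taken from sqdata_kafka_producer.conf
    broker = parsed.netloc or None
    parts = [p for p in parsed.path.split("/") if p]
    topic = parts[0] if parts else None
    partition = parts[1] if len(parts) > 1 else None
    return broker, topic, partition

print(parse_kafka_url("kafka://broker1:9092/hr_all_cdc/0"))  # ('broker1:9092', 'hr_all_cdc', '0')
print(parse_kafka_url("kafka:///hr_*_cdc/key"))              # (None, 'hr_*_cdc', 'key')
```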
- Target datastores described by Confluent managed schemas may only be written using the APPLY or REPLICATE function.
- The relationship between the DESCRIPTION Alias, Topic and Subject is determined by the planners and architects of the organization's Confluent Schema Registry. The examples used here are arbitrary but were chosen based on the source Table name and the source application: the EMPLOYEE and DEPARTMENT Tables of a Db2 "IVP_HR" Database. The SUBJECT was chosen to follow the default supported by the Confluent Control Center, which requires the SUBJECT to be the same as the TOPIC with "-value" appended.
- The Confluent Schema Registry supports multiple Topic naming strategies, and all are supported here, but they may or may not be compatible with other tools, including Confluent's own Control Center.
- The AVRO schema id will be supplied at run-time by Confluent, based on the TOPIC and SUBJECT parameters specified in the Source DESCRIPTIONs. See the Apply Engine Reference for alternative methods of assigning a schema id.
- The partition can be controlled through the url as described above and/or set explicitly using the SETURLKEY function.
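The default SUBJECT convention described above can be sketched as follows. This mimics Confluent's default TopicNameStrategy, under which value schemas register under "&lt;topic&gt;-value" (and key schemas under "&lt;topic&gt;-key"); it is an illustration, not part of the Apply Engine:

```python
def default_value_subject(topic):
    # Confluent's default TopicNameStrategy: the value schema for a topic
    # is registered under the subject "<topic>-value".
    return topic + "-value"

def default_key_subject(topic):
    # Likewise, key schemas register under "<topic>-key".
    return topic + "-key"

print(default_value_subject("hr_EMPLOYEE_cdc"))  # hr_EMPLOYEE_cdc-value
```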
A capture has been configured for the Department and Employee Db2 tables. An Apply Engine will stream Topics that provide the complete before and after image of the source data resulting from a z/OS business application's normal Insert, Update and Delete processing. Once DESCRIPTIONs have been provided for the source tables, the Apply Engine script may consist of only a single REPLICATE statement.
The url 'kafka:///hr_*_cdc/key' would be interpreted as follows, with the brokers specified in the sqdata_kafka_producer.conf file and the topic_id and partition based on the source DESCRIPTIONs.
A topic named 'hr_EMPLOYEE_cdc' will be used for each CDC source record from the EMPLOYEE table, whose description was aliased as 'EMPLOYEE'. The values of both the EMP_NO and EMP_STATE columns in the CDC record will be used by Kafka to determine the partition, rather than the default, which would be only the EMP_NO Key column.
Similarly, a topic named 'hr_DEPARTMENT_cdc' will be used for each CDC source record from the DEPARTMENT table, whose description was aliased as 'DEPARTMENT'. The value of the table's key column DEPT_NO will be used by default by Kafka to determine the partition.
BEGIN GROUP DB2_SOURCE;
DESCRIPTION DB2SQL ./DB2DDL/EMP.ddl AS EMPLOYEE
KEY IS EMP_NO, EMP_STATE;
DESCRIPTION DB2SQL ./DB2DDL/DEPT.ddl AS DEPARTMENT;
END GROUP;
DATASTORE kafka:///hr_*_cdc/key
OF JSON
AS TARGET
DESCRIBED BY GROUP DB2_SOURCE;
REPLICATE (TARGET)
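The wildcard in the topic portion of the url resolves per record: the '*' is replaced by the alias of the matching source DESCRIPTION. A minimal sketch of that substitution (illustrative Python, not the engine's implementation):

```python
def resolve_topic(pattern, alias):
    # The '*' in the datastore url's topic_id is replaced by the alias
    # assigned to the source DESCRIPTION that matched the CDC record.
    return pattern.replace("*", alias)

print(resolve_topic("hr_*_cdc", "EMPLOYEE"))    # hr_EMPLOYEE_cdc
print(resolve_topic("hr_*_cdc", "DEPARTMENT"))  # hr_DEPARTMENT_cdc
```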
Using similar Source DESCRIPTIONs to Example 1, the Kafka Cluster can again be specified dynamically in the sqdata_kafka_producer.conf file, but with a single Topic ID for all CDC records, a randomized partition, and a single REPLICATE command:
BEGIN GROUP DB2_SOURCE;
DESCRIPTION DB2SQL ./DB2DDL/EMP.ddl AS EMPLOYEE;
DESCRIPTION DB2SQL ./DB2DDL/DEPT.ddl AS DEPARTMENT;
END GROUP;
...
DATASTORE kafka:///hr_all_cdc
OF JSON
AS TARGET
DESCRIBED BY GROUP DB2_SOURCE;
...
REPLICATE (TARGET)
Using the same Source DESCRIPTIONs as Example 2, the Kafka Cluster can be specified dynamically in the sqdata_kafka_producer.conf file, but with explicit specification of the Topic ID and Partition using the SETURL and SETURLKEY functions:
DATASTORE kafka:///*
...
Used with the following logic in the Apply Engine script:
CASE RECNAME(CDCIN)
WHEN 'EMP' CALLPROC(P_EMP)
WHEN 'DEPT' CALLPROC(P_DEPT)
CREATE PROC P_EMP AS SELECT
{
SETURL(TARGET, 'kafka:///hr_EMPLOYEE_cdc/')
SETURLKEY(TARGET, EMP_NO)
REPLICATE(TARGET, EMP)
}
CREATE PROC P_DEPT AS SELECT
{
SETURL(TARGET, 'kafka:///hr_DEPARTMENT_cdc/')
SETURLKEY(TARGET, DEPT_NO)
REPLICATE(TARGET, DEPT)
}
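With SETURLKEY, the named column becomes the record key, and equal keys always land on the same partition. Kafka's default keyed partitioner hashes the serialized key (with murmur2) modulo the partition count; the sketch below substitutes CRC32 purely to illustrate the determinism, and is not Kafka's actual algorithm:

```python
import binascii

def pick_partition(key, num_partitions):
    # Simplified stand-in for Kafka's keyed partitioner: the real default
    # uses a murmur2 hash of the serialized key. CRC32 here only shows
    # that equal keys deterministically map to the same partition.
    return binascii.crc32(key.encode()) % num_partitions

# The same EMP_NO always routes to the same partition of hr_EMPLOYEE_cdc.
p1 = pick_partition("000010", 4)
p2 = pick_partition("000010", 4)
print(p1 == p2)  # True
```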
BEGIN GROUP DB2_SOURCE;
DESCRIPTION DB2SQL ./DB2DDL/EMP.ddl AS EMPLOYEE
KEY IS EMP_NO
TOPIC hr_EMPLOYEE_cdc
SUBJECT hr_EMPLOYEE_cdc-value;
DESCRIPTION DB2SQL ./DB2DDL/DEPT.ddl AS DEPARTMENT
KEY IS DEPT_NO
TOPIC hr_DEPARTMENT_cdc
SUBJECT hr_DEPARTMENT_cdc-value;
END GROUP;
DATASTORE kafka:///*
OF AVRO
FORMAT CONFLUENT
AS TARGET
DESCRIBED BY GROUP DB2_SOURCE
;
REPLICATE (TARGET)
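With OF AVRO FORMAT CONFLUENT, each message value is framed in the Confluent wire format: a magic byte of 0x00, the 4-byte big-endian schema id assigned by the Schema Registry, then the Avro-encoded body. A minimal sketch of that framing (the schema id 42 and the payload bytes below are made-up values for illustration):

```python
import struct

def confluent_frame(schema_id, avro_payload):
    # Confluent wire format: magic byte 0x00, then the 4-byte big-endian
    # schema id from the registry, then the Avro-encoded record body.
    return b"\x00" + struct.pack(">I", schema_id) + avro_payload

# Hypothetical schema id and Avro body, for illustration only.
frame = confluent_frame(42, b"\x02\x08EMP1")
print(frame[:5].hex())  # 000000002a
```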
BEGIN GROUP IMS_DBD;
DESCRIPTION IMSDBD ./IMSDBD/HREMPLDB.dbd AS HREMPLDB;
END GROUP;
BEGIN GROUP IMS_SEG;
DESCRIPTION COBOL ./IMSSEG/HREMPLDB/EMPLOYEE.cob AS EMPLOYEE
FOR SEGMENT EMPLOYEE
IN DATABASE HREMPLDB
TOPIC hr_EMPLOYEE_cdc
SUBJECT hr_EMPLOYEE_cdc-value;
DESCRIPTION COBOL ./IMSSEG/HREMPLDB/ANNULREV.cob AS ANNULREV
FOR SEGMENT ANNULREV
IN DATABASE HREMPLDB
TOPIC hr_ANNUAL_REVIEW_cdc
SUBJECT hr_ANNUAL_REVIEW_cdc-value;
END GROUP;
DATASTORE kafka:///*/root_key
OF AVRO
FORMAT CONFLUENT
AS TARGET
DESCRIBED BY GROUP IMS_SEG;
REPLICATE (TARGET)