Kafka Datastores - connect_cdc_sqdata - Latest

Connect CDC (SQData) Apply engine

Product type: Software
Portfolio: Integrate
Product family: Connect
Product: Connect > Connect CDC (SQData)
Version: Latest
Language: English
Product name: Connect CDC (SQData)
Title: Connect CDC (SQData) Apply engine
Copyright: 2024
First publish date: 2000
Last updated: 2024-07-30
Published on: 2024-07-30

While the target for data captured by Connect CDC (SQData) is often another database, a Kafka target may be the best, if not the only, way to identify events used to trigger or feed other processes or applications. Historically, many z/OS environments used IBM MQ for distributed communication of events. Apache Kafka is a robust, clustered, distributed streaming platform used to build real-time streaming data pipelines on open systems platforms using a similar publish and subscribe architecture. Connect CDC (SQData) supports Kafka by bridging the gap between z/OS and other operating systems, publishing a stream of data to Kafka topics based on real-time capture of source datastore changes.

Kafka is typically utilized when the requirement is more than simple replication of captured source data into a legacy relational datastore like Db2/LUW, Oracle or MS SQL Server. Change data capture can be used to identify "events" and to stream Kafka messages that trigger subsequent downstream business processes. Others use Kafka to populate "big data" repositories where other tools will be used for analytics or to answer questions that may not have been asked at the start of a project.

Kafka's architecture includes four core APIs, two of which are of particular interest in heterogeneous z/OS and Open Systems environments:

  1. The Producer API, which allows a business application to publish a stream of events to one or more Kafka topics.
  2. The Connector API, which can connect Kafka topics to existing source databases; for example, a connector to a relational database that captures every change to a table.

The Apply Engine views Kafka as simply another type of target datastore, written using the Producer API. The Apply Engine writes messages to Kafka "topics", formatted as either JSON or AVRO among other formats, containing the information captured by any of the Apply Engine's data capture agents. Connect CDC (SQData)'s capture / publish / apply architecture provides a unique two-platform solution for z/OS, where the Kafka Connector API is not supported natively. Precisely's high-performance capture agents, paired with an Apply Engine running on Linux, provide direct point-to-point (source to target) transfer of captured data without any sort of staging area. When properly configured, captured data does not even land on an intermediate storage device before being loaded into the target Kafka topic.

Environmental Requirements

Connect CDC (SQData)'s Kafka implementation supports change data capture on both z/OS and open systems platforms, with certain environmental requirements for the Apply Engine:
  • The Apply Engine for Kafka is supported only on the Linux platform, although some customers have successfully implemented it on IBM AIX as well.
  • The external Kafka library librdkafka.so, version 0.8 or higher, is required by Connect CDC (SQData) in order to apply to Kafka. This library can be downloaded from GitHub: https://github.com/edenhill/librdkafka.
  • The Kafka external library must be accessible either in the system path or through the environment variable SQDATA_KAFKA_LIBRARY, which points to the external library (see the sketch after this list).
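
For instance, the library location might be supplied before launching the Apply Engine roughly as follows. This is a minimal sketch; the installation path shown is an assumption, not a required location:

# Illustrative only: the path below is an assumed install location for librdkafka.
export SQDATA_KAFKA_LIBRARY=/usr/local/lib/librdkafka.so
ls -l "$SQDATA_KAFKA_LIBRARY"    # confirm the library exists and is readable before launching the Apply Engine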

The Kafka topic target datastore is identified by the Kafka broker "url", consisting in its most basic form of the Kafka cluster host address and port number, along with a fully qualified Kafka topic name and an optional partition.

Syntax
------------------------------------------------------------
--       DATASTORE SECTION
------------------------------------------------------------
-- SOURCE DATASTORE
DATASTORE cdc://server:port/capture/target
  OF UTSCDC
  AS CDCIN
  DESCRIBED BY GROUP SOURCE_TABLES;
                        
                        
-- TARGET DATASTORE
DATASTORE kafka://[<hostname>[:<port_number>]] / [<kafka_topic_id>][/ | /<partition> | /key | /root_key]     
   OF AVRO FORMAT [CONFLUENT | CONFLUENT TOMBSTONE | CONTAINER | PLAIN]
   AS TARGET
   KEY IS DEPTNO, MGRNO
   DESCRIBED BY GROUP SOURCE_TABLES;
Keyword and Parameter Descriptions
<hostname>:<port_number>
Optionally identifies a specific Kafka Broker hostname and TCP/IP port. Precisely recommends dynamic specification of the Kafka Cluster, including its host names, using the sqdata_kafka_producer.conf file located in the Working Directory of the Apply Engine at launch. The file may contain any of the configuration options documented for librdkafka at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md. Typically, however, only a small subset of those options is specified, including producer-specific security information and a list of Kafka Broker hosts. While you will find good reading at https://kafka.apache.org/documentation/#security and https://docs.confluent.io/4.0.0/security.html, Precisely recommends that you speak to your Kafka Cluster administrator regarding the configuration. These are just three examples:
security.protocol=SSL
ssl.ca.location=/app/certificates/dev/abc_root_ca.cert
ssl.certificate.location=/home/<kafka_app_user>/kafkassl/client.pem   <-- Client's public key/certificate (PEM format) used for authentication
ssl.key.location=/home/<kafka_app_user>/kafkassl/client.key           <-- Client's private key (PEM format) used for authentication
ssl.key.password=test1234
metadata.broker.list=<broker_host_01>:<port>,<broker_host_02>:<port>,<broker_host_03>:<port>

security.protocol=SSL
ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
metadata.broker.list=<broker_host_01>:<port>,<broker_host_02>:<port>,<broker_host_03>:<port>
 
security.protocol=SASL_SSL
sasl.kerberos.service.name=kafka
sasl.kerberos.principal=<kafka_app_user@domain>
sasl.kerberos.keytab=/app/kafkalib/<kafka_app_user>.keytab
metadata.broker.list=<broker_host_01>:<port>,<broker_host_02>:<port>,<broker_host_03>:<port>
<kafka_topic_id> | <prefix>_*_<suffix>

Optionally specify a unique Kafka Topic ID, or dynamically specify the Topic using a wildcard. This is particularly useful when creating many topics or when topic IDs are unwieldy and long. A topic containing an "*" indicates that the url is dynamic; the "*" will be replaced with the alias name of the source DESCRIPTION by default, or with the TOPIC <name> specified as part of the DESCRIPTION. The "*" may be preceded and followed by a string of characters to complete the full Topic name. Whether topics need to be defined to Kafka in advance depends on how Kafka has been configured.

[/<partition> | /key | /root_key | / ]

Optional parameter; in its absence, random partitioning amongst the available partitions for a topic will be used. While a specific valid partition number may be specified, Precisely strongly advises against using partition numbers because they become an additional point of maintenance.

The keyword "key" is used for Relational, VSAM and Keyed File sources by Kafka to determine the target partition. This can be very important to insure that successive changes to the same row/record are sent to the same partition, ensuring they will be processed by the consumer in the order of capture. The default functionality for Relational sources is to use the full concatenated list of source key columns while VSAM and Keyed File sources must specify a KEY IS clause on each source DESCRIPTIONS. A KEY IS clause may also be specified for Relational sources to override the default with a specific set of column names from the source description to be used by Kafka for partitioning.

The keyword "root_key" is used only for IMS sources and by default specifies that the only the root key of any captured IMS Segments will be used used by Kafka to determine the target partition. Using the root key for all the segments captured in the hierarchy ensures that they will be processed by the consumer in the order of capture and together with all segments updated under a particular root segment.

"/" is required as a placeholder if the SETURLKEY function is to be used to create a custom partitioning Key.

OF
  • JSON | AVRO: Kafka "Topics" formatted as either JSON or AVRO.
  • CONFLUENT TOMBSTONE: defined as a record that has a non-null key and a null value. For more information, refer to the Kafka documentation.
AS

<target_alias> Alias of the Target DATASTORE

DESCRIBED BY GROUP

<group_name> DESCRIPTION Group

Notes:

  1. Target datastores described by Confluent managed schemas may only be written using the APPLY or the REPLICATE Function.
  2. The relationship between the DESCRIPTION Alias, Topic and Subject is determined by the planners and architects of the organization's Confluent Schema Registry. The examples used here are arbitrary but were selected based on the source Table Name and the source application; in the examples below, the EMPLOYEE and DEPARTMENT Tables and a Db2 "IVP_HR" Database. The choice of SUBJECT was based on the default supported by the Confluent Control Center, which requires the SUBJECT to be the same as the TOPIC with the addition of "-value".
  3. The Confluent Schema Registry supports multiple Topic naming strategies, all of which are supported, but they may or may not be compatible with other tools, including Confluent's own Control Center.
  4. The AVRO "schema id" will be supplied at run-time by Confluent based on the TOPIC and SUBJECT parameters specified on the Source DESCRIPTIONs. See the Apply Engine Reference for alternative methods of assigning a schema id.
  5. The partition can be controlled as described above and/or explicitly using the SETURLKEY Function.

Example 1 - Db2 JSON

A capture has been configured for the Department and Employee Db2 tables. An Apply Engine will stream messages to Topics that provide the complete before and after image of the source data resulting from a z/OS business application's normal Insert, Update and Delete processing. The Apply Engine script may consist of only a single REPLICATE statement after providing Descriptions for the source tables.

The url 'kafka:///hr_*_cdc/key' would be interpreted as follows, with the brokers specified in the sqdata_kafka_producer.conf file and the topic_id and partition based on the source DESCRIPTIONS.

A topic named 'hr_EMPLOYEE_cdc' will be used for each CDC record from the EMPLOYEE table, whose description was aliased as 'EMPLOYEE'. The values of the EMP_NO and EMP_STATE columns in the CDC record will be used by Kafka to determine the partition, rather than the default, which would be only the EMP_NO key column.

Similarly, a topic named 'hr_DEPARTMENT_cdc' will be used for each CDC record from the DEPARTMENT table, whose description was aliased as 'DEPARTMENT'. The value of the table's key column DEPT_NO will be used by default by Kafka to determine the partition.
BEGIN GROUP DB2_SOURCE;
DESCRIPTION DB2SQL ./DB2DDL/EMP.ddl AS EMPLOYEE
          KEY IS EMP_NO, EMP_STATE;
DESCRIPTION DB2SQL ./DB2DDL/DEPT.ddl AS DEPARTMENT;
END GROUP;
Specification of the Kafka Datastore is thus simplified, with only the static portion of the Topic specified, and looks like the following:
DATASTORE kafka:///hr_*_cdc/key
  OF JSON
  AS TARGET
  DESCRIBED BY GROUP DB2_SOURCE;
REPLICATE (TARGET)
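
To spot-check what the Apply Engine is publishing, a standard Kafka console consumer can be pointed at one of the resulting topics; printing the message key also shows the value Kafka used for partitioning. This is a minimal sketch, assuming the Kafka command-line tools are installed and a broker is reachable at broker_host_01:9092 (both assumptions, not part of the example):

# Illustrative verification only; broker host and port are assumed values.
kafka-console-consumer.sh \
  --bootstrap-server broker_host_01:9092 \
  --topic hr_EMPLOYEE_cdc \
  --property print.key=true \
  --from-beginning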

Example 2 - Db2 JSON

Using Source DESCRIPTIONS similar to those in Example 1, the Kafka Cluster can be dynamically specified in the sqdata_kafka_producer.conf file, but with a single Topic ID for all CDC records, a randomized partition and a single REPLICATE command:
BEGIN GROUP DB2_SOURCE;
DESCRIPTION DB2SQL ./DB2DDL/EMP.ddl AS EMPLOYEE;
DESCRIPTION DB2SQL ./DB2DDL/DEPT.ddl AS DEPARTMENT;
END GROUP;
...

DATASTORE kafka:///hr_all_cdc
  OF JSON
  AS TARGET
  DESCRIBED BY GROUP DB2_SOURCE;
...

REPLICATE (TARGET)

Example 3 - Db2 JSON

Using the same Source DESCRIPTIONS from Example 2, the Kafka Cluster can be dynamically specified in the sqdata_kafka_producer.conf file but with explicit specification of Topic ID and Partition using the SETURL and SETURLKEY functions:
DATASTORE kafka:///*
...
Used with the following logic in the Apply Engine script:

CASE RECNAME(CDCIN)
WHEN 'EMP' CALLPROC(P_EMP)
WHEN 'DEPT' CALLPROC(P_DEPT)

CREATE PROC P_EMP AS SELECT
{
   SETURL(TARGET, 'kafka:///hr_EMPLOYEE_cdc/')
   SETURLKEY(TARGET, EMP_NO)
   REPLICATE(TARGET, EMP)
}
CREATE PROC P_DEPT AS SELECT
{
   SETURL(TARGET, 'kafka:///hr_DEPARTMENT_cdc/')
   SETURLKEY(TARGET, DEPT_NO)
   REPLICATE(TARGET, DEPT)
}

Example 4 - Db2 AVRO

Using Source DESCRIPTIONS similar to those in Example 2, and with the Kafka Cluster dynamically specified as in Example 3, a Confluent Schema Registry will be used to automatically manage the AVRO Topic Schemas for each source table as those schemas evolve over time:
BEGIN GROUP DB2_SOURCE;
DESCRIPTION DB2SQL ./DB2DDL/EMP.ddl AS EMPLOYEE
          KEY IS EMP_NO
          TOPIC hr_EMPLOYEE_cdc
          SUBJECT hr_EMPLOYEE_cdc-value;
DESCRIPTION DB2SQL ./DB2DDL/DEPT.ddl AS DEPARTMENT
          KEY IS DEPT_NO
          TOPIC hr_DEPARTMENT_cdc
          SUBJECT hr_DEPARTMENT_cdc-value;
END GROUP;
Specification of the Kafka Datastore is simplified and looks like the following:
DATASTORE kafka:///*
  OF AVRO
  FORMAT CONFLUENT
  AS TARGET
  DESCRIBED BY GROUP DB2_SOURCE
;
REPLICATE (TARGET)
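
The AVRO messages produced above can be inspected with Confluent's Avro console consumer, which resolves each message's schema id against the Schema Registry. This is a minimal sketch, assuming Confluent Platform tooling is installed and the registry listens at http://schema_registry_host:8081 (both assumptions, not part of the example):

# Illustrative verification only; host names and ports are assumed values.
kafka-avro-console-consumer \
  --bootstrap-server broker_host_01:9092 \
  --topic hr_EMPLOYEE_cdc \
  --property schema.registry.url=http://schema_registry_host:8081 \
  --from-beginning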

Example 5 - IMS AVRO

An IMS Source is very different from a Relational source in that data relationships are defined by keys, foreign keys and the physical hierarchy. Those differences are minimized as much as possible by using the REPLICATE Command with Kafka targets. One critical difference is how partitions are handled. By specifying "root_key", rather than "key" or defaulting to random partitioning, you can ensure that Kafka consumers will process all the data associated with a particular root segment key together and in the proper sequence within a unit-of-work. Like Example 4, a Confluent Schema Registry will be used to automatically manage AVRO Topic Schemas for each source segment as those COBOL descriptions evolve over time:
BEGIN GROUP IMS_DBD;
DESCRIPTION IMSDBD ./IMSDBD/HREMPLDB.dbd AS HREMPLDB;
END GROUP;

BEGIN GROUP IMS_SEG;
DESCRIPTION COBOL ./IMSSEG/HREMPLDB/EMPLOYEE.cob AS EMPLOYEE
          FOR SEGMENT EMPLOYEE
          IN DATABASE HREMPLDB
          TOPIC hr_EMPLOYEE_cdc
          SUBJECT hr_EMPLOYEE_cdc-value;
DESCRIPTION COBOL ./IMSSEG/HREMPLDB/ANNULREV.cob AS ANNULREV
          FOR SEGMENT ANNULREV
          IN DATABASE HREMPLDB
          TOPIC hr_ANNUAL_REVIEW_cdc
          SUBJECT hr_ANNUAL_REVIEW_cdc-value;
END GROUP;
Specification of the Kafka Datastore is simplified and looks like the following:
DATASTORE kafka:///*/root_key
  OF AVRO
  FORMAT CONFLUENT
  AS TARGET
  DESCRIBED BY GROUP IMS_SEG;
Processing requires only one statement:
REPLICATE (TARGET)