It’s no secret that automation is essential to streamlined development. Nowadays, we typically collaborate across teams and time zones, which requires a structured deployment process to stay organized.

That’s why, when we built a proof of concept using KSQL for data transformation, we decided to document our journey. Exploring the capabilities of Confluent technologies was exciting, but developing a build automation pipeline proved to be a challenge. This post outlines our solution for managing the deployment of KSQL streams and tables.

To integrate with our existing CI/CD, we used Jenkins pipelines to run Ansible playbooks, but these concepts can be applied to other configuration tools. 

Setting up and tearing down 

Our project manipulated data from several input topics, creating a sequence of streams and tables to calculate results. Each query was built on previous ones, so if a change was made to the KSQL syntax (e.g. modifying a time window, or changing the type of join), then the entire sequence had to be torn down and rebuilt from scratch. 

To illustrate this concept, consider a simple example with just two queries. The first creates a stream from an existing input topic, and the second re-partitions the stream: 

CREATE STREAM PERSON_IDENTIFIER (first VARCHAR, last VARCHAR) 
    WITH (KAFKA_TOPIC='person_topic', VALUE_FORMAT='JSON', KEY='last'); 
CREATE STREAM PERSON_IDENTIFIER_FIRST 
    AS SELECT * FROM PERSON_IDENTIFIER PARTITION BY first; 

These queries can be placed in a .sql file to run using KSQL’s RUN SCRIPT command. 

Tearing down looks similar, except the streams must be dropped in the opposite order, because the second depends on the first: 

DROP STREAM IF EXISTS PERSON_IDENTIFIER_FIRST DELETE TOPIC; 
DROP STREAM IF EXISTS PERSON_IDENTIFIER; 

In an Ansible playbook, copy the setup and teardown scripts to the host, then run them using RUN SCRIPT. 

- name: "Tear down and set up KSQL queries" 
  hosts: "{{ hostname }}" 
  any_errors_fatal: True 

  tasks: 
    - name: "Copy teardown script file to host" 
      copy: 
        src: /dir/ksql-teardown-commands.sql 
        dest: "/dir/ksql-teardown-commands.sql" 

    - name: "Run teardown commands in KSQL" 
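      shell: "./current/bin/ksql http://{{ hostname }}:8088 <<< \"RUN SCRIPT '/dir/ksql-teardown-commands.sql';\""
      args:
        # The <<< here-string is a bash feature; the shell module defaults to /bin/sh
        executable: /bin/bash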

    - name: "Copy setup script file to host" 
      copy: 
        src: /dir/ksql-setup-commands.sql 
        dest: "/dir/ksql-setup-commands.sql"  

    - name: "Run setup commands in KSQL" 
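      shell: "./current/bin/ksql http://{{ hostname }}:8088 <<< \"RUN SCRIPT '/dir/ksql-setup-commands.sql';\""
      args:
        # The <<< here-string is a bash feature; the shell module defaults to /bin/sh
        executable: /bin/bash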

This playbook can be run from the command line or triggered using an automation server such as Jenkins. 

Terminating the queries 

The above commands handle the destruction and reconstruction of streams, but they fail to terminate the underlying queries.

Attempting to drop PERSON_IDENTIFIER will result in an error because the CSAS (CREATE STREAM AS SELECT) query used to construct PERSON_IDENTIFIER_FIRST is still running and reads from it.

As of Confluent 5.3, it is not possible to automatically terminate a query when a stream or table is dropped, nor is bulk termination supported. The simplest automated solution is to fetch the queries and terminate them one by one: 

- name: "Terminate running queries on all hosts in group" 
  hosts: "{{ hostname }}" 
  any_errors_fatal: True  

  tasks: 
    # Fetch and terminate currently running queries 
    - name: "Get currently running queries via REST" 
      uri: 
        url: "http://{{ hostname }}:8088/ksql" 
        method: POST 
        body: "{\"ksql\": \"SHOW QUERIES;\"}" 
        headers: 
          Content-Type: "application/vnd.ksql.v1+json; charset=utf-8" 
        return_content: yes 
        body_format: json 
      register: all_queries  

    - name: "Parse query ids" 
      set_fact: 
        query_ids: "{{ all_queries | to_json | from_json | json_query(query) | list }}" 
      vars: 
        query: "json[0].queries[?starts_with(id,'CSAS_PERSON_') || starts_with(id,'CTAS_PERSON_')].id" 
      delegate_to: 127.0.0.1  

    - name: "Terminate queries by id" 
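      shell: "./current/bin/ksql http://{{ hostname }}:8088 <<< \"TERMINATE {{ item }};\""
      args:
        # The <<< here-string is a bash feature; the shell module defaults to /bin/sh
        executable: /bin/bash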
      with_items: "{{ query_ids }}" 

This playbook uses KSQL's REST API to fetch all of the currently running queries, then filters for the ones associated with this project (i.e. those whose IDs begin with CSAS_PERSON_ for streams or CTAS_PERSON_ for tables). The TERMINATE command is then issued for each query through the KSQL CLI.
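A possible variation is to send TERMINATE through the same /ksql REST endpoint, so that the uri module can fail the task when the server rejects a statement. The sketch below assumes the endpoint accepts TERMINATE statements and signals failure with a non-200 HTTP status; the task name is illustrative.

```yaml
    # Sketch: terminate each query through the REST API instead of the CLI.
    # Assumption: /ksql accepts TERMINATE and returns a non-200 status on failure.
    - name: "Terminate queries by id via REST"
      uri:
        url: "http://{{ hostname }}:8088/ksql"
        method: POST
        body:
          ksql: "TERMINATE {{ item }};"
        body_format: json
        headers:
          Content-Type: "application/vnd.ksql.v1+json; charset=utf-8"
        # Any other status code makes the task, and therefore the play, fail
        status_code: 200
      with_items: "{{ query_ids }}"
```

This task would take the place of the shell-based "Terminate queries by id" task and reuses the query_ids fact set in the previous step.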

If a query fails to terminate, Ansible will not detect the error, because the shell task only reports on the CLI process and not on the outcome of the KSQL statement. As we only want to proceed if all relevant queries are terminated, we repeat the process of fetching and filtering, and fail the playbook if the filtered list is not empty:

    # Pause for 5 seconds to allow the queries to finish terminating
    - pause: 
        seconds: 5 

    # Confirm that all PERSON queries were terminated 
    - name: "Get remaining queries via REST" 
      uri: 
        url: "http://{{ hostname }}:8088/ksql" 
        method: POST 
        body: "{\"ksql\": \"SHOW QUERIES;\"}" 
        headers: 
          Content-Type: "application/vnd.ksql.v1+json; charset=utf-8" 
        return_content: yes 
        body_format: json 
      register: remaining_queries  

    - name: "Parse remaining query ids" 
      set_fact: 
        remaining_ids: "{{ remaining_queries | to_json | from_json | json_query(remaining_query) | list }}" 
      vars: 
        remaining_query: "json[0].queries[?starts_with(id,'CSAS_PERSON_') || starts_with(id,'CTAS_PERSON_')].id" 
      delegate_to: 127.0.0.1  

    - fail: 
        msg: "Some queries were not successfully terminated: {{ remaining_ids }}" 
      when: remaining_ids|length != 0 
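
The fixed five-second pause could also be replaced by polling. The sketch below reuses the SHOW QUERIES request with Ansible's until, retries and delay keywords, so the task succeeds as soon as no matching query IDs remain and fails once the retries are exhausted. The retry count and delay are illustrative assumptions, and Ansible's match test stands in for the JMESPath filter used above.

```yaml
    # Sketch: poll SHOW QUERIES until no PERSON queries remain.
    # retries and delay are illustrative values, not tuned recommendations.
    - name: "Wait until all PERSON queries are terminated"
      uri:
        url: "http://{{ hostname }}:8088/ksql"
        method: POST
        body: "{\"ksql\": \"SHOW QUERIES;\"}"
        headers:
          Content-Type: "application/vnd.ksql.v1+json; charset=utf-8"
        return_content: yes
        body_format: json
      register: remaining_queries
      # Repeat the request until no query id carries one of the project prefixes
      until: >-
        remaining_queries.json[0].queries
        | map(attribute='id')
        | select('match', '^(CSAS|CTAS)_PERSON_')
        | list | length == 0
      retries: 6
      delay: 5
```

Because the task itself fails when the condition is never met, this form would cover the pause, the second parse step and the explicit fail task in one place.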

Once query termination completes successfully, the streams can be dropped and rebuilt. New streams and tables can be added to the setup .sql file, and existing ones can be modified, as long as the corresponding teardown .sql file is kept up to date. Redeployment then rebuilds the KSQL environment to reflect the changes.
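
The overall redeployment order can be captured in a small top-level playbook. This is a sketch that assumes the termination tasks and the teardown/setup tasks are saved as two separate playbook files; both file names are hypothetical.

```yaml
# Sketch of a top-level playbook; both file names are hypothetical.
# 1. Terminate the running CSAS/CTAS queries and verify that none remain.
- import_playbook: ksql-terminate-queries.yml

# 2. Drop the streams in reverse dependency order, then recreate them.
- import_playbook: ksql-teardown-setup.yml
```

Since any_errors_fatal is set in each play, a failed termination check stops the run before anything is dropped.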

Other solutions 

There are other approaches to bulk query termination (see Robin Moffatt’s post for an interesting one), but this method worked best with our existing automation tools. 

Our sequence of KSQL statements grew increasingly complex as we added streams and tables, so maintaining an organized method of setup and teardown was essential. Had this project been more than just a proof of concept, we would have sought out a more refined solution, but the steps above worked well to support our rapid development process.