Part 4: Can Apache Flink be the solution?

Apache Flink isn't the solution for duplications and JOINs on ClickHouse.

Written by Armend Avdijaj, 28/03/2025, 14.58

Let's say you decide that Flink is the solution you want to use. I will respect your bold decision and even walk you through a step-by-step setup that performs JOINs and deduplication before ingesting into ClickHouse. At the end of this post, I will go into the expected maintenance effort and draw a conclusion. Be warned: a Flink implementation can get scary.


Step 1: Connect Flink to Kafka

Decisions to Make:

  • Kafka Broker & Topic Configuration: Define Kafka brokers, topic names, and partitions.
  • Data Format: Choose between JSON, Avro, or Protobuf. In the example, we choose JSON.
  • Consumer Group & Offset Strategy: Decide whether to start from the latest or earliest message. In the example, we decided on the earliest message.
  • Parallelism: Determine how many Flink tasks will consume from Kafka partitions. In the example, we decided on 4 parallel tasks consuming from Kafka.

Code Example:

  • 1. Add Dependencies (Flink & Kafka Connectors)

    In your Flink project, add the necessary dependencies:

  • 2. Create a Kafka Source

  • About Parallelism

    Parallelism in Flink (env.setParallelism(n))

    • Determines the number of concurrent Flink tasks that will process Kafka data.
    • Example: env.setParallelism(4) → 4 parallel tasks consuming from Kafka.
    • If Kafka has 6 partitions and Flink has 4 tasks, some tasks will handle multiple partitions.

    Parallelism per Operator (.setParallelism(n))

    • If not set, the job-wide parallelism is used (env.setParallelism(n)).
    • Example: kafkaStream.setParallelism(2) → Only this Kafka consumer runs with 2 parallel instances.

    Choosing the Right Parallelism

    • Flink parallelism = Kafka partitions: Best performance, each task reads from exactly one partition.
    • Flink parallelism < Kafka partitions: Some tasks handle multiple partitions (could cause lag).
    • Flink parallelism > Kafka partitions: Some tasks stay idle, wasting resources.
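
The three cases above can be illustrated with a small plain-Java model. This is only a sketch: it assumes partitions are dealt out round-robin to subtasks, which is a simplification (Flink's real split assignment may differ in detail), and the class name PartitionAssignment is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model (assumption): partitions are assigned round-robin to subtasks.
final class PartitionAssignment {
    /** Returns, for each subtask index, the list of partition ids it would read. */
    static List<List<Integer>> assign(int partitions, int parallelism) {
        List<List<Integer>> bySubtask = new ArrayList<>();
        for (int s = 0; s < parallelism; s++) {
            bySubtask.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            bySubtask.get(p % parallelism).add(p);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        System.out.println(assign(6, 4)); // [[0, 4], [1, 5], [2], [3]]  two subtasks read two partitions
        System.out.println(assign(4, 4)); // [[0], [1], [2], [3]]        one partition per subtask
        System.out.println(assign(2, 4)); // [[0], [1], [], []]          two subtasks stay idle
    }
}
```
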
  • 3. Set Up the Kafka Source in Flink

  • 4. Deserialize and Convert Data into a Flink Format

    Assuming JSON messages:


Step 2: Deduplication in Flink

Decisions to Make:

  • Define a Unique Key: Choose a field (e.g., event_id) for deduplication.
  • Time Window Strategy: Define how long to store seen records.
  • State Management: Use RocksDB or Heap Memory to track seen events.

Code Example:

  • Implementation with a window strategy for deduplication and RocksDB:
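
The core of the deduplication logic can be sketched in plain Java, independent of the Flink API. This is a minimal model, not the Flink job itself: the hypothetical TtlDeduplicator class keeps the first-seen time per event_id in an in-memory HashMap, whereas a Flink job would hold this in keyed state (for example in RocksDB) and rely on state TTL or timers for expiry.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Plain-Java model of keyed deduplication with a time-to-live (not Flink API).
final class TtlDeduplicator {
    private final long ttlMillis;
    private final Map<String, Long> firstSeen = new HashMap<>();

    TtlDeduplicator(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Returns true if the event should be emitted, false if it is a duplicate. */
    boolean accept(String eventId, long nowMillis) {
        Long seenAt = firstSeen.get(eventId);
        if (seenAt != null && nowMillis - seenAt < ttlMillis) {
            return false; // same id seen inside the window: drop it
        }
        firstSeen.put(eventId, nowMillis); // first sighting, or the previous one expired
        return true;
    }

    /** Removes expired entries so the state does not grow without bound. */
    int evictExpired(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = firstSeen.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().getValue() >= ttlMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    int size() {
        return firstSeen.size();
    }

    public static void main(String[] args) {
        long day = 24L * 60 * 60 * 1000;
        TtlDeduplicator dedup = new TtlDeduplicator(day);
        System.out.println(dedup.accept("evt-1", 0L));      // true  (first sighting)
        System.out.println(dedup.accept("evt-1", 1_000L));  // false (duplicate within 24 hours)
        System.out.println(dedup.accept("evt-1", day + 1)); // true  (previous entry expired)
    }
}
```

The window length is the only knob here: a shorter TTL shrinks the map but lets duplicates through once the earlier entry has expired.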

Where is the Time Window Strategy Set?

  • 1. Event Time Window (Handling Late Events)

    Handles out-of-order events by allowing a 10s delay.

  • 2. Deduplication Window (24 Hours)

    • Keeps track of events for 24 hours in RocksDB.
    • If the same event arrives within 24 hours, it’s ignored.
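
The 10-second allowance for out-of-order events can be modeled in plain Java as follows. This is a sketch under stated assumptions: the hypothetical BoundedOutOfOrderness class tracks the highest timestamp seen and treats an event as late when its timestamp is at or below "highest timestamp minus allowed delay minus 1 ms". Flink emits watermarks periodically rather than per event, so the model is simplified.

```java
// Plain-Java model of a bounded-out-of-orderness watermark (not Flink API).
final class BoundedOutOfOrderness {
    private final long maxDelayMillis;
    private long maxTimestamp;

    BoundedOutOfOrderness(long maxDelayMillis) {
        this.maxDelayMillis = maxDelayMillis;
        // Start low enough that the first watermark computation does not underflow.
        this.maxTimestamp = Long.MIN_VALUE + maxDelayMillis + 1;
    }

    /** Watermark: no event with a timestamp at or below this value is expected anymore. */
    long currentWatermark() {
        return maxTimestamp - maxDelayMillis - 1;
    }

    /** Records an event timestamp; returns true if it is on time, false if it is late. */
    boolean observe(long eventTimestamp) {
        boolean onTime = eventTimestamp > currentWatermark();
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        return onTime;
    }

    public static void main(String[] args) {
        BoundedOutOfOrderness wm = new BoundedOutOfOrderness(10_000L);
        System.out.println(wm.observe(100_000L)); // true
        System.out.println(wm.observe(95_000L));  // true  (5 s out of order, within the 10 s bound)
        System.out.println(wm.observe(80_000L));  // false (20 s behind the newest event: late)
    }
}
```
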
  • Why Use RocksDB?

    Scalability: Handles millions of events efficiently.

    Persistence: Avoids losing state after failures.

    Checkpointing Support: Enables fault tolerance.

    Efficient Lookups: Uses Flink's MapState, backed by RocksDB, for fast key lookups.

  • Key Considerations

    Choosing the Deduplication Window:

    • Short window (e.g., 1 min) → Less state storage, but duplicates might pass.
    • Long window (e.g., 24 hrs) → More storage, but better deduplication.
  • State Size & Cleanup:

    • Too large? Use TTL settings to expire old state.
    • Disk issues? Ensure enough storage for RocksDB.
  • Scaling Considerations:

    • Set parallelism to match Kafka partitions (env.setParallelism(n)).
    • Ensure sufficient disk space for RocksDB if handling millions of events.

Step 3: Running Joins in Flink

Decisions to Make:

  • Join Type: Inner, Left, Right, or Outer.
  • Time Window for Streaming Joins: Define how long to retain data for joins.
  • State Management: Ensure efficient storage of joined records.

Code Example:

  • Joining Two Kafka Streams in Flink

  • Example Input & Output

    Kafka Transactions Topic (transactions-topic)

    Kafka Users Topic (users-topic)

    Flink Output (Enriched Transactions)

    EnrichedTransaction{userId='123', amount=150.75, name='Alice', age=28}
    EnrichedTransaction{userId='456', amount=99.99, name='Bob', age=35}
    
    

Key Considerations

  • Event Time vs. Processing Time
    • WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)) uses event time and tolerates events that arrive up to 5 seconds out of order; anything later than that is treated as late.
  • Choosing the Join Window
    • between(Duration.ofMinutes(-5), Duration.ofMinutes(5)): Matches events that occurred within ±5 minutes.
    • Smaller windows reduce state size but may miss late events.
  • Scaling Considerations
    • Parallelism should match Kafka partitions for efficiency.
    • If data is large, use RocksDB as a state backend.
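
The ±5 minute matching rule can be sketched in plain Java. This is an illustrative model only: the class IntervalJoinSketch and the timestamps are hypothetical, and the nested loop stands in for the per-key buffered state that Flink would maintain. A user record matches a transaction when the user IDs are equal and the user timestamp lies within [transaction time + lower bound, transaction time + upper bound], bounds inclusive.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of an interval join on userId (not Flink API).
final class IntervalJoinSketch {
    static final class Transaction {
        final String userId; final double amount; final long timestamp;
        Transaction(String userId, double amount, long timestamp) {
            this.userId = userId; this.amount = amount; this.timestamp = timestamp;
        }
    }

    static final class User {
        final String userId; final String name; final int age; final long timestamp;
        User(String userId, String name, int age, long timestamp) {
            this.userId = userId; this.name = name; this.age = age; this.timestamp = timestamp;
        }
    }

    static List<String> join(List<Transaction> txs, List<User> users, long lowerMillis, long upperMillis) {
        List<String> out = new ArrayList<>();
        for (Transaction t : txs) {
            for (User u : users) {
                long diff = u.timestamp - t.timestamp;
                if (t.userId.equals(u.userId) && diff >= lowerMillis && diff <= upperMillis) {
                    out.add("EnrichedTransaction{userId='" + t.userId + "', amount=" + t.amount
                            + ", name='" + u.name + "', age=" + u.age + "}");
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long fiveMin = 5 * 60 * 1000L;
        List<Transaction> txs = Arrays.asList(
                new Transaction("123", 150.75, 600_000L),
                new Transaction("456", 99.99, 600_000L));
        List<User> users = Arrays.asList(
                new User("123", "Alice", 28, 480_000L), // 2 minutes before the transaction: matches
                new User("456", "Bob", 35, 0L));        // 10 minutes before: outside the window
        // Prints only the enriched transaction for Alice.
        System.out.println(join(txs, users, -fiveMin, fiveMin));
    }
}
```

Widening the bounds would let Bob's record match as well, at the cost of keeping more records buffered.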

Summary

This example shows how to join two Kafka streams in Flink:

  • Transactions stream (purchases) + Users stream (user details).
  • Interval join on user_id with a ±5-minute time window.
  • Outputs enriched transactions containing amount, name, and age.

Step 4: Batching and Writing to ClickHouse with a Custom Connector

Decisions to Make:

  • Batch Size: Optimize batch size to balance latency and throughput.
  • Insert Strategy: Use ClickHouse’s INSERT INTO ... VALUES or INSERT INTO ... SELECT.
  • Error Handling: Implement retry mechanisms.
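
As an illustration of the retry decision, here is a minimal plain-Java sketch of retries with exponential backoff. The class RetryingInsert and its parameters are hypothetical, and the action passed in stands in for whatever call performs the ClickHouse insert. A real sink would likely retry only on transient errors, and a retried batch may be written twice unless the insert is idempotent.

```java
import java.util.concurrent.Callable;

// Generic retry helper with exponential backoff (illustrative, not a ClickHouse API).
final class RetryingInsert {
    /** Delay before the retry that follows attempt number `attempt` (0-based), capped at maxMillis. */
    static long backoffMillis(int attempt, long baseMillis, long maxMillis) {
        long delay = baseMillis << Math.min(attempt, 20); // cap the shift to avoid overflow
        return Math.min(delay, maxMillis);
    }

    /** Runs the action up to maxAttempts times and rethrows the last failure if all attempts fail. */
    static <T> T withRetries(Callable<T> action, int maxAttempts, long baseMillis, long maxMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts - 1) {
                    Thread.sleep(backoffMillis(attempt, baseMillis, maxMillis));
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        final int[] calls = {0};
        String result = withRetries(() -> {
            if (++calls[0] < 3) {
                throw new RuntimeException("simulated transient failure");
            }
            return "inserted";
        }, 5, 10L, 1_000L);
        System.out.println(result + " after " + calls[0] + " attempts"); // inserted after 3 attempts
    }
}
```
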

Implementation example:

  • 1. Create a Flink Sink to ClickHouse

  • 2. Use Flink’s windowAll for Efficient Batching
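
The batching idea can be sketched in plain Java, independent of Flink's windowing API. This is a model under stated assumptions: the hypothetical BatchBuffer class flushes when a batch reaches a maximum size or when the oldest buffered element exceeds a maximum age, and the flush target stands in for the code that would send one multi-row INSERT to ClickHouse. The age check runs only when a new element arrives; a real implementation would also need a timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Size- and age-bounded batch buffer (illustrative, not Flink API).
final class BatchBuffer<T> {
    private final int maxSize;
    private final long maxAgeMillis;
    private final Consumer<List<T>> flushTarget;
    private final List<T> buffer = new ArrayList<>();
    private long firstElementAt;

    BatchBuffer(int maxSize, long maxAgeMillis, Consumer<List<T>> flushTarget) {
        this.maxSize = maxSize;
        this.maxAgeMillis = maxAgeMillis;
        this.flushTarget = flushTarget;
    }

    /** Buffers one element and flushes if the batch is full or old enough. */
    void add(T element, long nowMillis) {
        if (buffer.isEmpty()) {
            firstElementAt = nowMillis;
        }
        buffer.add(element);
        if (buffer.size() >= maxSize || nowMillis - firstElementAt >= maxAgeMillis) {
            flush();
        }
    }

    /** Hands a copy of the buffered elements to the flush target and clears the buffer. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flushTarget.accept(new ArrayList<>(buffer));
        buffer.clear();
    }

    public static void main(String[] args) {
        BatchBuffer<Integer> batches =
                new BatchBuffer<>(3, 5_000L, batch -> System.out.println(batch));
        for (int i = 1; i <= 7; i++) {
            batches.add(i, 0L); // prints [1, 2, 3] and then [4, 5, 6]
        }
        batches.flush();        // prints [7]
    }
}
```

Larger batches mean fewer, cheaper inserts but higher end-to-end latency, which is the trade-off behind the batch size decision.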


Overview of maintenance efforts for Flink

After setting up Flink for stream processing to ClickHouse, users must monitor the system and handle ongoing, often extensive, maintenance to ensure smooth operation and high performance. Here are the typical areas of maintenance:

Kafka Consumer Management:

Consumer lag (the gap between what has been written to Kafka and what Flink has consumed) can become an issue. When Flink processes too slowly, data can be lost or duplicated. You usually need to monitor the lag closely so that you can react early enough.
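
Lag is often tracked in messages rather than time: per partition, it is the latest offset in Kafka minus the offset the consumer has committed. The plain-Java sketch below shows that arithmetic. The class ConsumerLag and the sample offsets are hypothetical; in practice, the offsets would come from Kafka's admin tooling or from the connector's metrics.

```java
import java.util.Map;
import java.util.TreeMap;

// Offset-based consumer lag per partition (illustrative arithmetic only).
final class ConsumerLag {
    /** Lag per partition = latest offset in Kafka minus the committed consumer offset. */
    static Map<Integer, Long> perPartition(Map<Integer, Long> endOffsets,
                                           Map<Integer, Long> committedOffsets) {
        Map<Integer, Long> lag = new TreeMap<>();
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            // A partition with no committed offset is treated as unread from offset 0.
            long committed = committedOffsets.getOrDefault(e.getKey(), 0L);
            lag.put(e.getKey(), Math.max(0L, e.getValue() - committed));
        }
        return lag;
    }

    static long total(Map<Integer, Long> lag) {
        long sum = 0;
        for (long value : lag.values()) {
            sum += value;
        }
        return sum;
    }

    public static void main(String[] args) {
        Map<Integer, Long> end = new TreeMap<>();
        end.put(0, 100L);
        end.put(1, 250L);
        Map<Integer, Long> committed = new TreeMap<>();
        committed.put(0, 90L);
        committed.put(1, 250L);
        Map<Integer, Long> lag = perPartition(end, committed);
        System.out.println(lag);        // {0=10, 1=0}
        System.out.println(total(lag)); // 10
    }
}
```

An alert on the total, or on the largest per-partition value, gives an early signal that Flink is falling behind.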

Flink Job Stability & Failures:

Flink jobs can fail at any time, and data can be lost or duplicated as a result. This typically shows up when checkpointing is configured incorrectly. Checkpointing takes periodic snapshots of the job's state so that the system can recover from the latest snapshot. As a user, you need to decide, for example, where the checkpoints are stored (filesystem, database, etc.).

Debugging Challenges for Python Users in Flink:

One major challenge for Python users working with Flink is that debugging often requires working in Java. While Flink provides a PyFlink API, much of its underlying execution framework, including error messages, logs, and internal failures, is Java-based. This means that when a job fails or a checkpoint issue arises, Python users often need to dive into Java stack traces and Flink's mechanics to troubleshoot the problem. This creates a steep learning curve, requiring users to understand the Java runtime, memory management, and error handling, even if they primarily work in Python. As a result, debugging and maintaining Flink jobs can be complex and time-consuming for teams that rely on Python for data processing.

Deduplication Handling:

It is highly recommended to clean up old deduplication entries so that memory is not exhausted. Event timestamps can become another maintenance task: if they are set up incorrectly, duplicates can grow quickly.

JOIN Performance Tuning:

JOIN performance should always be monitored. Stateful JOINs in particular can exhaust memory quite quickly. A common mitigation is a broadcast join when one side of the join is a small data set: the small table is broadcast to all task nodes, so each task can join its share of the large stream locally and performance stays high.
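
The idea behind a broadcast join can be modeled in a few lines of plain Java. This is a sketch only: the class BroadcastJoinSketch and the sample data are hypothetical, and each call to joinLocally stands in for one parallel task holding its own full copy of the small table. In Flink, this is usually built on the broadcast state pattern.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a broadcast (map-side) join (not Flink API).
final class BroadcastJoinSketch {
    /** Joins one task's share of the stream against that task's local copy of the small table. */
    static List<String> joinLocally(List<String> userIds, Map<String, String> localCopy) {
        List<String> out = new ArrayList<>();
        for (String id : userIds) {
            String name = localCopy.get(id);
            if (name != null) {
                out.add(id + ":" + name); // inner join: ids without a match are dropped
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> users = new HashMap<>();
        users.put("123", "Alice");
        users.put("456", "Bob");
        // Two parallel tasks, each with its own full copy of the small table.
        System.out.println(joinLocally(Arrays.asList("123", "999"), new HashMap<>(users))); // [123:Alice]
        System.out.println(joinLocally(Arrays.asList("456"), new HashMap<>(users)));        // [456:Bob]
    }
}
```

Because every task holds the whole small table, memory use grows with parallelism, which is why this only pays off when that table really is small.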

ClickHouse Write Performance:

When monitoring write performance, watch for the following issues, each of which impacts latency:

  • If batches are too small → high insert overhead.
  • If partitions are incorrect → slow queries.
  • If ClickHouse is under high load → backpressure in Flink.

It is also recommended to monitor disk usage and to make sure MergeTree tables are compacted.

Resource Monitoring & Scaling:

Ensuring that enough memory is always available for Flink jobs helps you avoid building up a big backlog and exhausting the system. The same goes for the Kafka producer side, which shouldn't be overloaded.

Summary of Flink usage for Stream Processing of Kafka Events to ClickHouse

You can get very far with this setup, but as you can see, there are several challenges that you need to master.

  1. The initial setup takes a lot of effort. If you are not experienced with Flink, you will have a long ramp-up time.
  2. Certain decisions, such as batch sizes, windowing strategy, and parallelism, must be made and can greatly impact the pipeline's performance. A wrong configuration can turn the setup into a risky investment.
  3. Fine-tuning will keep coming up, depending on how dynamic your data volume is. The effort can become very high.
  4. Maintaining the setup can become a big effort, as you must monitor each piece of the system (memory usage, latency, duplications, etc.) to ensure it runs smoothly. I have seen several teams spend hours every day debugging the system.

OK, now you understand that Flink comes with its own challenges. The question now is: how does GlassFlow solve duplications and JOINs in a better way? Check out our next article to learn the details of our approach.




Next part: Part 5: How GlassFlow will solve Duplications and JOINs for ClickHouse
