Implementing streaming on a component

By default, input components are designed to receive a one-time batch of data to process. By enabling the streaming mode, you can instead set your component to process a continuous incoming flow of data.

When streaming is enabled on an input component, the component tries to pull data from its producer. When no data is pulled, it waits for a defined period of time before trying to pull data again, and so on. This period of time between tries is defined by a strategy.

This document explains how to configure this strategy and the cases where it can fit your needs.

Choosing between batch and streaming

Before enabling streaming on your component, make sure that it fits the scope and requirements of your project and that regular batch processing cannot be used instead.

Streaming is designed to help you dealing with real-time or near real-time data processing cases, and should be used only for such cases. Enabling streaming will impact the performance when processing batches of data.

Enabling streaming from the Component Kit starter

You can enable streaming right from the design phase of the project by enabling the Stream toggle in the basic configuration of your future component in the Component Kit Starter.

Doing so adds a default streaming-ready configuration to your component when generating the project.
This default configuration implements a constant pause duration of 500 ms between retries, with no limit of retries.

Stream Toggle

Enabling limitations (stoppers) for Streaming components

You can add configuration that help you to stop the data reading in your input component when it reaches required limitations: max duration and max records. To do it you need to set true in PartitionMapper#stoppable. An important condition is that PartitionMapper#infinite should also be true. You can also pass those limits inside @PostConstruct method with @Option annotation. Available options: Option.MAX_DURATION_PARAMETER, Option.MAX_RECORDS_PARAMETER. By default in the setting those values are set to -1. It means "infinity" behavior.

@Version
@PartitionMapper(name = "Input", infinite = true, stoppable = true)
public class YourPartitionMapper {
	// partition mapper code

    @Emitter
    public YourEmitter createWorker() {
        return new YourEmitter(configuration);
    }
}

public class YourEmitter {

    @PostConstruct
    public void init(@Option(Option.MAX_DURATION_PARAMETER) long maxDurationMs, @Option(Option.MAX_RECORDS_PARAMETER) long maxRecords ) {
    	// connector's specific logic here
    }
    // other component's code
}

Configuring streaming from the project

If streaming was not enabled at all during the project generation or if you need to implement a more specific configuration, you can change the default settings according to your needs:

  1. Add the infinite=true parameter to your component class.

  2. Define the number of retries allowed in the component family LocalConfiguration, using the talend.input.streaming.retry.maxRetries parameter. It is set by default to Integer.MAX_VALUE.

  3. Define the pausing strategy between retries in the component family LocalConfiguration, using the talend.input.streaming.retry.strategy parameter. Possible values are:

    • constant (default). It sets a constant pause duration between retries.

    • exponential. It sets an exponential backoff pause duration.

      See the tables below for more details about each strategy.

Constant strategy

Parameter Description Default value

talend.input.streaming.retry.constant.timeout

Pause duration for the constant strategy, in ms.

500

Exponential strategy

Parameter Description Default value

talend.input.streaming.retry.exponential.exponent

Exponent of the exponential calculation.

1.5

talend.input.streaming.retry.exponential.randomizationFactor

Randomization factor used in the calculation.

0.5

talend.input.streaming.retry.exponential.maxDuration

Maximum pausing duration between two retries.

5*60*1000 (5 minutes)

talend.input.streaming.retry.exponential.initialBackOff

Initial backoff value.

1000 (1 second)

The values of these parameters are then used in the following calculations to determine the exact pausing duration between two retries.

For more clarity in the formulas below, parameter names have been replaced with variables.

First, the current interval duration is calculated:

\$A = min(B xx E^I, F)\$

Where:

  • A: currentIntervalMillis

  • B: initialBackOff

  • E: exponent

  • I: current number of retries

  • F: maxDuration

Then, from the current interval duration, the next interval duration is calculated:

\$D = min(F, A + ((R xx 2-1) xx C xx A))\$

Where:

  • D: nextBackoffMillis

  • F: maxDuration

  • A: currentIntervalMillis

  • R: random

  • C: randomizationFactor

Scroll to top