Implementing streaming on a component

By default, input components are designed to receive a one-time batch of data to process. By enabling the streaming mode, you can instead set your component to process a continuous incoming flow of data.

When streaming is enabled on an input component, the component tries to pull data from its producer. When no data is pulled, it waits for a defined period of time before trying to pull data again, and so on. This period of time between tries is defined by a strategy.
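
The pull-and-wait cycle described above can be pictured with the following sketch. It is not the framework's implementation: `PollingSketch`, `pollUntilData`, and the `Supplier` standing in for the producer are hypothetical names, and the pause is a fixed value, as in a constant strategy.

```java
import java.util.function.Supplier;

final class PollingSketch {

    // Hypothetical helper, not framework code: one initial pull plus at most
    // maxRetries retries, sleeping pauseMs between pulls.
    // Returns null when no data was obtained.
    static <T> T pollUntilData(Supplier<T> producer, long pauseMs, int maxRetries) {
        for (long attempt = 0; attempt <= maxRetries; attempt++) {
            T record = producer.get();
            if (record != null) {
                return record;
            }
            if (attempt < maxRetries) {
                try {
                    Thread.sleep(pauseMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    return null;
                }
            }
        }
        return null;
    }
}
```

In this sketch, a producer that returns nothing twice and then a record causes two pauses before the record is returned.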

This document explains how to configure this strategy and the cases where it can fit your needs.

Choosing between batch and streaming

Before enabling streaming on your component, make sure that it fits the scope and requirements of your project and that regular batch processing cannot be used instead.

Streaming is designed to help you deal with real-time or near real-time data processing cases, and should be used only for such cases. Enabling streaming impacts performance when processing batches of data.

Enabling streaming from the Component Kit Starter

You can enable streaming right from the design phase of the project by enabling the Stream toggle in the basic configuration of your future component in the Component Kit Starter.

Doing so adds a default streaming-ready configuration to your component when generating the project.
This default configuration implements a constant pause duration of 500 ms between retries, with no limit on the number of retries.

Stream Toggle

Enabling limitations (stop conditions) for Streaming components

Without any configuration, streaming components have an infinite lifecycle and never stop. Sometimes, you may need to stop a component after a certain number of records have been read or a certain amount of time has elapsed.

You can add a configuration that stops the data reading in your input component when it reaches the required limits. To enable it, set PartitionMapper#stoppable to true. Note that PartitionMapper#infinite must also be true.

Here is a code sample:

@PartitionMapper(name = "Input",
                 infinite = true,   // (1)
                 stoppable = true)  // (2)
public class YourPartitionMapper {
  ...
}
1 Define your component as a streaming component.
2 Define that your component may be stopped under conditions.

Two stop conditions are available for reading, and they can be combined:

  • maxDurationMs : stop after n milliseconds elapsed.

  • maxRecords : stop after n records read.

See the next subsections to configure these values.

If you choose to use a stoppable streaming component, you will most likely have to adapt your code to the backend technology and to the way values are read. For instance, if your component reads 100 values at once and the maxRecords value is 50, you may lose 50 values (if all 100 values were acknowledged).
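
One way to avoid this kind of loss, sketched below under assumptions, is to never request more values from the backend than the remaining record budget allows. `BatchBudget` and `nextBatchSize` are hypothetical names, not part of the Component Kit API, and a negative maxRecords is treated as "no limit".

```java
final class BatchBudget {

    // Returns how many records may still be requested from the backend.
    // Assumption: a negative maxRecords (for example -1) means "no limit".
    static int nextBatchSize(int preferredBatchSize, long maxRecords, long recordsRead) {
        if (maxRecords < 0) {
            return preferredBatchSize;
        }
        long remaining = Math.max(0L, maxRecords - recordsRead);
        return (int) Math.min(preferredBatchSize, remaining);
    }
}
```

With a preferred batch size of 100 and maxRecords set to 50, the first call returns 50, so only 50 values are fetched and acknowledged.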

In that case, to build a correct strategy in your component, you can access the stop condition values. To access this information, use the @PostConstruct method with the @Option annotation in your Emitter class.

Available options:

  • Option.MAX_DURATION_PARAMETER : reflects the maxDurationMs parameter.

  • Option.MAX_RECORDS_PARAMETER : reflects the maxRecords parameter.

Here is a code sample:

@Version
@PartitionMapper(name = "Input", infinite = true, stoppable = true)
public class YourPartitionMapper {
    // partition mapper code

    @Emitter
    public YourEmitter createWorker() {
        return new YourEmitter(configuration);
    }
}

public class YourEmitter {

    @PostConstruct
    public void init(@Option(Option.MAX_DURATION_PARAMETER) long maxDurationMs,
                     @Option(Option.MAX_RECORDS_PARAMETER) long maxRecords) {
        // connector's specific logic here
    }
    // other component's code
}

Configuring streaming stop strategy during job design time

If your streaming connector (infinite=true) is defined with stoppable=true, a design-time UI lets you specify the stop strategy:

  • Cloud

Stream Toggle

  • Studio

Stream Toggle

By default, these values are set to -1 in the settings, which means that no limit applies.

Other ways for configuring streaming stop strategy

At runtime, you can set system properties to apply the strategy. You need to prefix properties with the component’s family.

  • <family>.talend.input.streaming.maxRecords

  • <family>.talend.input.streaming.maxDurationMs

You can also use the LocalConfiguration (see next section) with the following properties:

  • talend.input.streaming.maxRecords

  • talend.input.streaming.maxDurationMs
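
As an illustration of the system property approach, the sketch below sets both stop conditions programmatically for a hypothetical component family named `MyFamily`. The `StreamingStopProperties` class and its methods are illustrative helpers only. The same keys could also be passed as `-D` options on the JVM command line.

```java
final class StreamingStopProperties {

    // Builds the family-prefixed key, for example
    // "MyFamily.talend.input.streaming.maxRecords".
    static String key(String family, String name) {
        return family + ".talend.input.streaming." + name;
    }

    // Sets both stop conditions as JVM system properties for the given family.
    static void apply(String family, long maxRecords, long maxDurationMs) {
        System.setProperty(key(family, "maxRecords"), Long.toString(maxRecords));
        System.setProperty(key(family, "maxDurationMs"), Long.toString(maxDurationMs));
    }
}
```

Calling `StreamingStopProperties.apply("MyFamily", 1000, 60000)` before the job starts would request a stop after 1000 records or 60 seconds.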

Configuring streaming from the project

If streaming was not enabled at all during the project generation or if you need to implement a more specific configuration, you can change the default settings according to your needs:

  1. Add the infinite=true parameter to your component class.

  2. Define the number of retries allowed in the component family LocalConfiguration, using the talend.input.streaming.retry.maxRetries parameter. It is set by default to Integer.MAX_VALUE.

  3. Define the pausing strategy between retries in the component family LocalConfiguration, using the talend.input.streaming.retry.strategy parameter. Possible values are:

    • constant (default). It sets a constant pause duration between retries.

    • exponential. It sets an exponential backoff pause duration.

      See the tables below for more details about each strategy.

Constant strategy

| Parameter | Description | Default value |
|---|---|---|
| talend.input.streaming.retry.constant.timeout | Pause duration for the constant strategy, in ms. | 500 |

Exponential strategy

| Parameter | Description | Default value |
|---|---|---|
| talend.input.streaming.retry.exponential.exponent | Exponent of the exponential calculation. | 1.5 |
| talend.input.streaming.retry.exponential.randomizationFactor | Randomization factor used in the calculation. | 0.5 |
| talend.input.streaming.retry.exponential.maxDuration | Maximum pausing duration between two retries, in ms. | 5*60*1000 (5 minutes) |
| talend.input.streaming.retry.exponential.initialBackOff | Initial backoff value, in ms. | 1000 (1 second) |

The values of these parameters are then used in the following calculations to determine the exact pausing duration between two retries.

For more clarity in the formulas below, parameter names have been replaced with variables.

First, the current interval duration is calculated:

$A = \min(B \times E^{I}, F)$

Where:

  • A: currentIntervalMillis

  • B: initialBackOff

  • E: exponent

  • I: current number of retries

  • F: maxDuration
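
As a worked example, assuming the default values (B = 1000, E = 1.5, F = 300000), the interval for I = 3 is:

$A = \min(1000 \times 1.5^{3}, 300000) = \min(3375, 300000) = 3375$

With these defaults, the cap F is first reached at I = 15, since $1.5^{14} \approx 291.9$ and $1.5^{15} \approx 437.9$.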

Then, from the current interval duration, the next interval duration is calculated:

$D = \min(F, A + ((R \times 2 - 1) \times C \times A))$

Where:

  • D: nextBackoffMillis

  • F: maxDuration

  • A: currentIntervalMillis

  • R: random

  • C: randomizationFactor
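
The two formulas above can be transcribed into code as follows. This is a minimal sketch, not the framework's source: `ExponentialBackoffSketch` and its method names are hypothetical, and the random value R is assumed to be drawn uniformly from [0, 1).

```java
import java.util.concurrent.ThreadLocalRandom;

final class ExponentialBackoffSketch {

    // A = min(B * E^I, F)
    static double currentIntervalMillis(double initialBackOff, double exponent,
                                        int retries, double maxDuration) {
        return Math.min(initialBackOff * Math.pow(exponent, retries), maxDuration);
    }

    // D = min(F, A + ((R * 2 - 1) * C * A)); random is assumed to be in [0, 1).
    static double nextBackoffMillis(double currentInterval, double random,
                                    double randomizationFactor, double maxDuration) {
        return Math.min(maxDuration,
                currentInterval + ((random * 2 - 1) * randomizationFactor * currentInterval));
    }

    public static void main(String[] args) {
        double maxDuration = 5 * 60 * 1000;
        double a = currentIntervalMillis(1000, 1.5, 3, maxDuration);
        double r = ThreadLocalRandom.current().nextDouble();
        double d = nextBackoffMillis(a, r, 0.5, maxDuration);
        System.out.println("current=" + a + " ms, next=" + d + " ms");
    }
}
```

Under that assumption on R, a randomization factor of 0.5 spreads the next pause between 0.5 and 1.5 times the current interval, still capped by maxDuration.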
