# Implementing streaming on a component

By default, input components are designed to receive a one-time batch of data to process. By enabling the streaming mode, you can instead set your component to process a continuous incoming flow of data.

When streaming is enabled on an input component, the component tries to pull data from its producer. When no data is pulled, it waits for a defined period of time before trying to pull data again, and so on. This period of time between tries is defined by a strategy.

This document explains how to configure this strategy and the cases where it can fit your needs.

## Choosing between batch and streaming

Before enabling streaming on your component, make sure that it fits the scope and requirements of your project and that regular batch processing cannot be used instead.

Streaming is designed to help you deal with real-time or near real-time data processing cases, and should be used only for such cases. Enabling streaming has a negative impact on performance when processing batches of data.

## Enabling streaming from the Component Kit Starter

You can enable streaming right from the design phase of the project by enabling the Stream toggle in the basic configuration of your future component in the Component Kit Starter.

This default configuration implements a constant pause duration of 500 ms between retries, with no limit of retries.

## Enabling limitations (stop conditions) for Streaming components

Without any configuration, streaming components have an infinite lifecycle and never stop. Sometimes, you may need to stop the component after a certain number of records has been read or a certain amount of time has elapsed.

You can add a configuration that stops data reading in your input component when it reaches the required limits. To enable it, set `PartitionMapper#stoppable` to `true`. Note that `PartitionMapper#infinite` must also be `true`.

Here is a code sample:

```java
@PartitionMapper(name = "Input",
        infinite = true,   // (1) defines your component as a streaming component
        stoppable = true)  // (2) defines that your component may be stopped under conditions
public class YourPartitionMapper {
    // ...
}
```

Two stop conditions are available for reading, and they can be combined:

• `maxDurationMs` : stop after n milliseconds elapsed.

• `maxRecords` : stop after n records read.

See the following subsections to configure these values.

If you use a `stoppable` streaming component, you will most likely have to adapt your code to the backend technology and to the way values are read. For instance, if your component reads 100 values at once and the `maxRecords` value is 50, you may lose 50 values (if all 100 values were acknowledged).

In that case, to build a correct strategy in your component, you can access the stop condition values. To do so, use a `@PostConstruct` method with `@Option`-annotated parameters in your Emitter class.

Available options:

• `Option.MAX_DURATION_PARAMETER` : reflects the `maxDurationMs` parameter.

• `Option.MAX_RECORDS_PARAMETER` : reflects the `maxRecords` parameter.

Here is a code sample:

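```java
import javax.annotation.PostConstruct;

import org.talend.sdk.component.api.component.Version;
import org.talend.sdk.component.api.configuration.Option;
import org.talend.sdk.component.api.input.Emitter;
import org.talend.sdk.component.api.input.PartitionMapper;

@Version
@PartitionMapper(name = "Input", infinite = true, stoppable = true)
public class YourPartitionMapper {
    // partition mapper code

    @Emitter
    public YourEmitter createWorker() {
        return new YourEmitter(configuration);
    }
}

public class YourEmitter {

    // The framework injects the configured stop condition values here.
    @PostConstruct
    public void init(@Option(Option.MAX_DURATION_PARAMETER) long maxDurationMs,
                     @Option(Option.MAX_RECORDS_PARAMETER) long maxRecords) {
        // connector's specific logic here
    }
    // other component's code
}
```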
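The 100-versus-50 case described above can be avoided by never fetching more records than the remaining budget allows. The following is a minimal sketch of that idea, not part of the Component Kit API: the `RecordBudget` class and its methods are hypothetical names, and it assumes that a negative `maxRecords` means "no limit".

```java
// Hypothetical helper (not a Component Kit class): caps each fetch so that
// the emitter never acknowledges more records than maxRecords allows.
final class RecordBudget {

    // Value received through @Option(Option.MAX_RECORDS_PARAMETER).
    private final long maxRecords;
    private long emitted;

    RecordBudget(long maxRecords) {
        this.maxRecords = maxRecords;
    }

    /** Returns how many records the next fetch may request from the backend. */
    int nextFetchSize(int preferredBatchSize) {
        if (maxRecords < 0) { // assumption: a negative value means no limit
            return preferredBatchSize;
        }
        long remaining = Math.max(0, maxRecords - emitted);
        return (int) Math.min(preferredBatchSize, remaining);
    }

    /** Call this after records have actually been emitted. */
    void recordEmitted(int count) {
        emitted += count;
    }

    public static void main(String[] args) {
        RecordBudget budget = new RecordBudget(50);
        System.out.println(budget.nextFetchSize(100)); // prints 50
        budget.recordEmitted(50);
        System.out.println(budget.nextFetchSize(100)); // prints 0
    }
}
```

An emitter could create such a budget in its `@PostConstruct` method and consult it before each read. Whether the backend lets you choose the fetch size depends on the technology you connect to.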

### Configuring streaming stop strategy during job design time

If your streaming connector (`infinite=true`) is defined with `stoppable=true`, a design-time UI for specifying the stop strategy is available in:

• Cloud

• Studio

By default, both values are set to -1 in these settings, which means that no limit applies.
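To make the meaning of these values concrete, here is a small sketch of how a connector could evaluate them on its side. It is an illustration only: `StopConditions` is a hypothetical class, and it assumes that reading stops as soon as either limit is reached and that any negative value (such as -1) disables the corresponding limit. How the framework itself evaluates the conditions is not shown here.

```java
// Hypothetical helper (not a Component Kit class) that combines both limits.
final class StopConditions {

    private final long maxRecords;    // -1 means no record limit
    private final long maxDurationMs; // -1 means no duration limit

    StopConditions(long maxRecords, long maxDurationMs) {
        this.maxRecords = maxRecords;
        this.maxDurationMs = maxDurationMs;
    }

    /** Assumption: reading stops as soon as either enabled limit is reached. */
    boolean shouldStop(long recordsRead, long elapsedMs) {
        boolean recordLimitReached = maxRecords >= 0 && recordsRead >= maxRecords;
        boolean durationLimitReached = maxDurationMs >= 0 && elapsedMs >= maxDurationMs;
        return recordLimitReached || durationLimitReached;
    }

    public static void main(String[] args) {
        StopConditions unlimited = new StopConditions(-1, -1);
        System.out.println(unlimited.shouldStop(1_000_000, 3_600_000)); // prints false

        StopConditions limited = new StopConditions(50, 10_000);
        System.out.println(limited.shouldStop(10, 10_000)); // prints true
    }
}
```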

### Other ways for configuring streaming stop strategy

At runtime, you can set system properties to apply the strategy. The property names must be prefixed with the component’s family:

• `<family>.talend.input.streaming.maxRecords`

• `<family>.talend.input.streaming.maxDurationMs`

You can also use the LocalConfiguration (see next section) with the following properties:

• `talend.input.streaming.maxRecords`

• `talend.input.streaming.maxDurationMs`
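As an illustration of the system property form, the sketch below builds the family-prefixed keys and sets them programmatically. The family name `myfamily`, the `StreamingStopProperties` class and the chosen values are placeholders. It also assumes that the properties are set before the component reads them, which in practice often means passing them on the JVM command line instead (for example `-Dmyfamily.talend.input.streaming.maxRecords=1000`).

```java
// Hypothetical helper: builds "<family>.<property>" keys for the stop strategy.
final class StreamingStopProperties {

    static final String MAX_RECORDS = "talend.input.streaming.maxRecords";
    static final String MAX_DURATION_MS = "talend.input.streaming.maxDurationMs";

    /** Prefixes a streaming property with the component family. */
    static String key(String family, String property) {
        return family + "." + property;
    }

    public static void main(String[] args) {
        String family = "myfamily"; // placeholder: use your component's family

        // Stop after 1000 records or 60 seconds (example values).
        System.setProperty(key(family, MAX_RECORDS), "1000");
        System.setProperty(key(family, MAX_DURATION_MS), "60000");

        System.out.println(key(family, MAX_RECORDS) + "="
                + System.getProperty(key(family, MAX_RECORDS)));
    }
}
```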

## Configuring streaming from the project

If streaming was not enabled at all during the project generation or if you need to implement a more specific configuration, you can change the default settings according to your needs:

1. Add the `infinite=true` parameter to your component class.

2. Define the number of retries allowed in the component family `LocalConfiguration`, using the `talend.input.streaming.retry.maxRetries` parameter. It is set by default to `Integer.MAX_VALUE`.

3. Define the pausing strategy between retries in the component family `LocalConfiguration`, using the `talend.input.streaming.retry.strategy` parameter. Possible values are:

• `constant` (default). It sets a constant pause duration between retries.

• `exponential`. It sets an exponential backoff pause duration.

 See the tables below for more details about each strategy.

### Constant strategy

| Parameter | Description | Default value |
|---|---|---|
| `talend.input.streaming.retry.constant.timeout` | Pause duration for the `constant` strategy, in ms. | `500` |
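The behavior of the `constant` strategy can be pictured as a simple pull-and-wait loop. The sketch below is not the framework's implementation: `ConstantRetryPoller` is a hypothetical class, it assumes that a `null` value means "no data was pulled", and it returns `null` once the retries are exhausted because this document does not describe what the framework does in that case.

```java
import java.util.function.Supplier;

// Hypothetical illustration of a constant pause between pull attempts.
final class ConstantRetryPoller {

    /**
     * Pulls from the producer until it yields a value, pausing timeoutMs
     * between attempts, for at most maxRetries retries after the first try.
     */
    static <T> T pull(Supplier<T> producer, int maxRetries, long timeoutMs) {
        for (int retries = 0; ; retries++) {
            T record = producer.get();
            if (record != null) {
                return record; // data was pulled
            }
            if (retries >= maxRetries) {
                return null; // assumption: give up silently
            }
            try {
                Thread.sleep(timeoutMs); // constant pause duration
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // The producer has no data for the first two attempts.
        String record = pull(() -> ++calls[0] < 3 ? null : "record", Integer.MAX_VALUE, 500);
        System.out.println(record + " after " + calls[0] + " attempts");
    }
}
```

With the default values (`500` ms and `Integer.MAX_VALUE` retries), such a loop effectively polls twice per second until data arrives.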

### Exponential strategy

| Parameter | Description | Default value |
|---|---|---|
| `talend.input.streaming.retry.exponential.exponent` | Exponent of the exponential calculation. | `1.5` |
| `talend.input.streaming.retry.exponential.randomizationFactor` | Randomization factor used in the calculation. | `0.5` |
| `talend.input.streaming.retry.exponential.maxDuration` | Maximum pausing duration between two retries. | `5*60*1000` (5 minutes) |
| `talend.input.streaming.retry.exponential.initialBackOff` | Initial backoff value. | `1000` (1 second) |

The values of these parameters are then used in the following calculations to determine the exact pausing duration between two retries.

 For more clarity in the formulas below, parameter names have been replaced with variables.

First, the current interval duration is calculated:

$$A = \min(B \times E^{I},\ F)$$

Where:

• A: currentIntervalMillis

• B: initialBackOff

• E: exponent

• I: current number of retries

• F: maxDuration

Then, from the current interval duration, the next interval duration is calculated:

$$D = \min\bigl(F,\ A + (R \times 2 - 1) \times C \times A\bigr)$$

Where:

• D: nextBackoffMillis

• F: maxDuration

• A: currentIntervalMillis

• R: random

• C: randomizationFactor
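The two formulas can be translated into code as follows. This is a sketch under stated assumptions, not the framework's actual implementation: `ExponentialBackoff` is a hypothetical class, the constants are the default values from the table above, R is assumed to be a random number drawn from [0, 1), and results are truncated to whole milliseconds.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical illustration of the exponential strategy formulas.
final class ExponentialBackoff {

    // Default values from the parameter table.
    static final double EXPONENT = 1.5;             // E
    static final double RANDOMIZATION_FACTOR = 0.5; // C
    static final long MAX_DURATION = 5 * 60 * 1000L; // F
    static final long INITIAL_BACKOFF = 1000L;       // B

    /** A = min(B x E^I, F) */
    static long currentIntervalMillis(int retries) {
        return (long) Math.min(INITIAL_BACKOFF * Math.pow(EXPONENT, retries), MAX_DURATION);
    }

    /** D = min(F, A + ((R x 2 - 1) x C x A)) */
    static long nextBackoffMillis(int retries, double random) {
        long a = currentIntervalMillis(retries);
        double jitter = (random * 2 - 1) * RANDOMIZATION_FACTOR * a;
        return (long) Math.min(MAX_DURATION, a + jitter);
    }

    public static void main(String[] args) {
        for (int retries = 0; retries < 5; retries++) {
            double r = ThreadLocalRandom.current().nextDouble(); // assumption: R in [0, 1)
            System.out.println("retry " + retries
                    + ": A=" + currentIntervalMillis(retries)
                    + " ms, D=" + nextBackoffMillis(retries, r) + " ms");
        }
    }
}
```

With these defaults and no retries yet (I = 0), A is 1000 ms and D falls between 500 ms (R = 0) and just under 1500 ms (R close to 1). After enough retries, both A and D are capped at the 5-minute maximum.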
