By default, input components are designed to receive a one-time batch of data to process. By enabling the streaming mode, you can instead set your component to process a continuous incoming flow of data.
When streaming is enabled on an input component, the component tries to pull data from its producer. When no data is pulled, it waits for a defined period of time before trying to pull data again, and so on. This period of time between tries is defined by a strategy.
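The pull-then-wait cycle can be pictured with a small standalone loop. This is a minimal sketch under stated assumptions, not the Component Kit runtime: PullLoop, run() and the IntToLongFunction "strategy" are hypothetical names, and resetting the retry count when data arrives is an assumption.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.IntToLongFunction;
import java.util.function.Supplier;

/**
 * Hypothetical sketch of the pull-then-wait loop: pull from a producer and,
 * when nothing comes back, pause for a duration chosen by a strategy.
 * Not part of the Component Kit API.
 */
public class PullLoop {

    /**
     * Pulls until maxRetries consecutive empty pulls have happened.
     * pauseForRetry is the strategy: it maps the current retry count to a pause in ms.
     * Returns the number of records received.
     */
    static int run(Supplier<String> producer, int maxRetries, IntToLongFunction pauseForRetry) {
        int received = 0;
        int retries = 0;
        while (retries < maxRetries) {
            String record = producer.get();
            if (record != null) {
                received++;
                retries = 0; // assumption: data arriving resets the retry count
                continue;
            }
            try {
                Thread.sleep(pauseForRetry.applyAsLong(retries)); // no data: wait before the next try
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt flag and stop pulling
                break;
            }
            retries++;
        }
        return received;
    }

    public static void main(String[] args) {
        Iterator<String> source = List.of("a", "b", "c").iterator();
        // Fake producer: returns null once the three records are consumed.
        Supplier<String> producer = () -> source.hasNext() ? source.next() : null;
        // Constant strategy of 10 ms, giving up after 3 empty pulls in a row.
        int count = run(producer, 3, retry -> 10L);
        System.out.println("received " + count + " records"); // received 3 records
    }
}
```

Passing the strategy as a function keeps the loop itself unchanged whether the pause is constant or grows with each retry.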
This document explains how to configure this strategy and the cases where it can fit your needs.
Choosing between batch and streaming
Before enabling streaming on your component, make sure that it fits the scope and requirements of your project and that regular batch processing cannot be used instead.
Streaming is designed to help you deal with real-time or near real-time data processing cases, and should be used only for such cases. Enabling streaming impacts performance when processing batches of data.
Enabling streaming from the Component Kit starter
You can enable streaming right from the design phase of the project by enabling the Stream toggle in the basic configuration of your future component in the Component Kit Starter.
Doing so adds a default streaming-ready configuration to your component when generating the project.
This default configuration implements a constant pause duration of 500 ms between retries, with no limit on the number of retries.
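As a rough illustration of what this default means in practice, the sketch below models a constant 500 ms pause. ConstantPause and its methods are hypothetical and only restate the behavior described above; using Integer.MAX_VALUE to stand for "no limit" mirrors the documented default of the maxRetries setting.

```java
import java.time.Duration;

/**
 * Hypothetical sketch of the starter's default streaming behavior:
 * a constant 500 ms pause between retries and no practical retry limit.
 * Not a Component Kit class.
 */
public class ConstantPause {
    static final Duration PAUSE = Duration.ofMillis(500);
    // Integer.MAX_VALUE stands in for "no limit on the number of retries".
    static final int MAX_RETRIES = Integer.MAX_VALUE;

    /** With a constant strategy, the pause ignores how many retries already happened. */
    static Duration pauseBeforeRetry(int retry) {
        return PAUSE;
    }

    /** Total time spent waiting after a given number of consecutive empty pulls. */
    static long totalWaitMillis(int emptyPulls) {
        return PAUSE.toMillis() * emptyPulls;
    }

    public static void main(String[] args) {
        System.out.println(pauseBeforeRetry(0).toMillis() + " ms");  // 500 ms
        System.out.println(pauseBeforeRetry(42).toMillis() + " ms"); // 500 ms
        System.out.println(totalWaitMillis(10) + " ms");             // 5000 ms
    }
}
```

For example, ten empty pulls in a row cost 10 × 500 ms = 5 seconds of waiting.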
Enabling limits (stoppers) for streaming components
You can configure your input component to stop reading data once it reaches given limits: a maximum duration and a maximum number of records. To do so, set PartitionMapper#stoppable to true. PartitionMapper#infinite must also be set to true for this to work. You can then receive these limits in the @PostConstruct method of your emitter, through parameters annotated with @Option. The available options are Option.MAX_DURATION_PARAMETER and Option.MAX_RECORDS_PARAMETER. By default, both values are set to -1, which means no limit.
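```java
import javax.annotation.PostConstruct;
import org.talend.sdk.component.api.component.Version;
import org.talend.sdk.component.api.configuration.Option;
import org.talend.sdk.component.api.input.Emitter;
import org.talend.sdk.component.api.input.PartitionMapper;

@Version
@PartitionMapper(name = "Input", infinite = true, stoppable = true)
public class YourPartitionMapper {
    private YourConfiguration configuration; // hypothetical: your component's configuration class
    // other partition mapper code
    @Emitter
    public YourEmitter createWorker() {
        return new YourEmitter(configuration);
    }
}

public class YourEmitter {
    private final YourConfiguration configuration;
    public YourEmitter(final YourConfiguration configuration) {
        this.configuration = configuration;
    }
    @PostConstruct
    public void init(@Option(Option.MAX_DURATION_PARAMETER) long maxDurationMs,
                     @Option(Option.MAX_RECORDS_PARAMETER) long maxRecords) {
        // connector-specific logic here
    }
    // other component code
}
```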
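One way a connector might act on the two values received in init() is sketched below. StreamLimits is a hypothetical helper, not Component Kit code, and the sketch assumes the connector checks the limits itself; it treats any negative value (the default -1) as "no limit".

```java
/**
 * Hypothetical helper: tracks the maxDurationMs and maxRecords limits
 * received in @PostConstruct. Negative values (the default -1) disable a check.
 */
public class StreamLimits {
    private final long maxDurationMs;
    private final long maxRecords;
    private final long startMs;
    private long emitted;

    public StreamLimits(long maxDurationMs, long maxRecords, long startMs) {
        this.maxDurationMs = maxDurationMs;
        this.maxRecords = maxRecords;
        this.startMs = startMs;
    }

    /** Call once for every record actually emitted. */
    public void recordEmitted() {
        emitted++;
    }

    /** True as soon as either enabled limit is reached. */
    public boolean shouldStop(long nowMs) {
        boolean durationReached = maxDurationMs >= 0 && nowMs - startMs >= maxDurationMs;
        boolean recordsReached = maxRecords >= 0 && emitted >= maxRecords;
        return durationReached || recordsReached;
    }

    public static void main(String[] args) {
        StreamLimits limits = new StreamLimits(-1, 2, 0); // no time limit, at most 2 records
        limits.recordEmitted();
        System.out.println(limits.shouldStop(10_000)); // false: 1 record, duration unlimited
        limits.recordEmitted();
        System.out.println(limits.shouldStop(10_000)); // true: 2 records reached
    }
}
```

Passing the current time as a parameter, rather than reading the clock inside shouldStop, keeps the logic easy to test.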
Configuring streaming from the project
If streaming was not enabled during project generation, or if you need a more specific configuration, you can change the default settings according to your needs:
- Add the infinite=true parameter to the @PartitionMapper annotation of your component class.
- Define the number of retries allowed in the component family LocalConfiguration, using the talend.input.streaming.retry.maxRetries parameter. It is set to Integer.MAX_VALUE by default.
- Define the pausing strategy between retries in the component family LocalConfiguration, using the talend.input.streaming.retry.strategy parameter. Possible values are:
  - constant (default): sets a constant pause duration between retries.
  - exponential: sets an exponential backoff pause duration.

See the tables below for more details about each strategy.
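The two family-level settings listed above, and their defaults, can be summarized in a few lines. This sketch uses java.util.Properties as a stand-in for the family's LocalConfiguration, which is an assumption made for illustration; RetrySettings and its Strategy enum are hypothetical.

```java
import java.util.Locale;
import java.util.Properties;

/**
 * Hypothetical sketch: reads the two streaming retry settings and applies
 * the documented defaults. Properties stands in for LocalConfiguration.
 */
public class RetrySettings {
    static final String MAX_RETRIES_KEY = "talend.input.streaming.retry.maxRetries";
    static final String STRATEGY_KEY = "talend.input.streaming.retry.strategy";

    enum Strategy { CONSTANT, EXPONENTIAL }

    final int maxRetries;
    final Strategy strategy;

    RetrySettings(Properties config) {
        // Defaults: Integer.MAX_VALUE retries and the constant strategy.
        this.maxRetries = Integer.parseInt(
                config.getProperty(MAX_RETRIES_KEY, String.valueOf(Integer.MAX_VALUE)).trim());
        // Unknown strategy names make valueOf throw IllegalArgumentException.
        this.strategy = Strategy.valueOf(
                config.getProperty(STRATEGY_KEY, "constant").trim().toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty(STRATEGY_KEY, "exponential");
        RetrySettings settings = new RetrySettings(config);
        System.out.println(settings.strategy + ", maxRetries=" + settings.maxRetries);
        // EXPONENTIAL, maxRetries=2147483647
    }
}
```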
Constant strategy

| Parameter | Description | Default value |
|---|---|---|
| | Pause duration for the constant strategy. | |

Exponential strategy

| Parameter | Description | Default value |
|---|---|---|
| exponent | Exponent of the exponential calculation. | |
| randomizationFactor | Randomization factor used in the calculation. | |
| maxDuration | Maximum pausing duration between two retries. | |
| initialBackOff | Initial backoff value. | |
The values of these parameters are then used in the following calculations to determine the exact pausing duration between two retries.
For more clarity in the formulas below, parameter names have been replaced with variables.
First, the current interval duration is calculated:

$$A = \min\left(B \times E^{I},\ F\right)$$

Where:
- A: currentIntervalMillis
- B: initialBackOff
- E: exponent
- I: current number of retries
- F: maxDuration
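The first formula translates directly into code. The sketch below is a hypothetical helper whose parameter names follow the variable list above. The values in main (B = 1000 ms, E = 1.5, F = 5000 ms) are illustrative only and are not the documented defaults.

```java
/**
 * Hypothetical sketch of A = min(B × E^I, F).
 */
public class CurrentInterval {
    static long currentIntervalMillis(long initialBackOff, double exponent, int retries, long maxDuration) {
        double raw = initialBackOff * Math.pow(exponent, retries); // B × E^I
        return Math.round(Math.min(raw, maxDuration));            // capped at F
    }

    public static void main(String[] args) {
        // Illustrative values: B = 1000 ms, E = 1.5, F = 5000 ms.
        for (int i = 0; i <= 5; i++) {
            System.out.println("I=" + i + " -> A=" + currentIntervalMillis(1000, 1.5, i, 5000) + " ms");
        }
        // A grows 1000, 1500, 2250, 3375 ms, then stays at the 5000 ms cap.
    }
}
```

With these values, the fourth retry would give 1000 × 1.5⁴ = 5062.5 ms, so maxDuration takes over from that point on.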
Then, from the current interval duration, the next interval duration is calculated:

$$D = \min\left(F,\ A + (R \times 2 - 1) \times C \times A\right)$$

Where:
- D: nextBackoffMillis
- F: maxDuration
- A: currentIntervalMillis
- R: random
- C: randomizationFactor
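The second formula adds a random spread around the current interval. In this sketch, R is assumed to be drawn uniformly from [0, 1), as java.util.Random#nextDouble does, so (R × 2 − 1) lies in [−1, 1) and D stays within ±C × A of A before the cap is applied. NextBackoff is a hypothetical helper, and the values in main are illustrative only.

```java
import java.util.Random;

/**
 * Hypothetical sketch of D = min(F, A + ((R × 2 − 1) × C × A)).
 * Assumption: random is in [0, 1).
 */
public class NextBackoff {
    static long nextBackoffMillis(long currentIntervalMillis, double randomizationFactor,
                                  long maxDuration, double random) {
        double jitter = (random * 2 - 1) * randomizationFactor * currentIntervalMillis;
        return Math.round(Math.min(maxDuration, currentIntervalMillis + jitter));
    }

    public static void main(String[] args) {
        Random rng = new Random();
        // Illustrative values: A = 2000 ms, C = 0.5, F = 5000 ms, so D falls in [1000, 3000) ms.
        for (int i = 0; i < 3; i++) {
            System.out.println(nextBackoffMillis(2000, 0.5, 5000, rng.nextDouble()) + " ms");
        }
    }
}
```

Randomizing the pause spreads out retries when several components poll the same source, while maxDuration still bounds the worst case.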