Job Builder$
The Job builder lets you create a job pipeline programmatically using Talend components (Producers and Processors). The job pipeline is an acyclic graph, allowing you to build complex pipelines.
Let’s take a simple use case where two data sources (employee and salary) are formatted to CSV and the result is written to a file.
A job is defined based on components (nodes) and links (edges) to connect their branches together.
Every component is defined by a unique id and a URI that identifies the component.
The URI follows the form [family]://[component][?version][&configuration], where:
family is the name of the component family.
component is the name of the component.
version is the version of the component. It is represented in a key=value format. The key is __version and the value is a number.
configuration is the component configuration. It is represented in a key=value format. The key is the path of the configuration and the value is a string corresponding to the configuration value.
URI example (the csv component used in the job example below): file://out?__version=2
Configuration parameters must be URI/URL encoded.
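The URI form and the encoding rule above can be sketched in plain Java. The componentUri and encode helpers below are hypothetical (they are not part of the Talend API) and only assemble the string. The sketch assumes that the form encoding performed by java.net.URLEncoder is acceptable for configuration keys and values.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

class ComponentUriSketch {
    // Hypothetical helper: builds [family]://[component]?__version=N&key=value...
    static String componentUri(String family, String component, int version,
                               Map<String, String> configuration) {
        StringBuilder uri = new StringBuilder(family)
                .append("://").append(component)
                .append("?__version=").append(version);
        for (Map.Entry<String, String> entry : configuration.entrySet()) {
            uri.append('&').append(encode(entry.getKey()))
               .append('=').append(encode(entry.getValue()));
        }
        return uri.toString();
    }

    // URL-encodes one key or value (assumption: UTF-8 form encoding is suitable).
    static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> configuration = new LinkedHashMap<>();
        configuration.put("separator", ";");
        // prints transform://concat?__version=1&separator=%3B
        System.out.println(componentUri("transform", "concat", 1, configuration));
    }
}
```

Building the URI through a helper like this keeps the encoding in one place, so a separator such as ; or a file path containing spaces cannot break the query string.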
Job example:
Job.components() // (1)
        .component("employee", "db://input")
        .component("salary", "db://input")
        .component("concat", "transform://concat?separator=;")
        .component("csv", "file://out?__version=2")
    .connections() // (2)
        .from("employee").to("concat", "string1")
        .from("salary").to("concat", "string2")
        .from("concat").to("csv")
    .build() // (3)
    .run(); // (4)
1 | Defining all components used in the job pipeline. |
2 | Defining the connections between the components to construct the job pipeline. The from/to links use the component id and the default input/output branches. You can also connect a specific branch of a component, if it has multiple or named input/output branches, using the methods from(id, branchName) and to(id, branchName). In the example above, the concat component has two inputs ("string1" and "string2"). |
3 | Validating the job pipeline by asserting that:
4 | Running the job pipeline. |
In this version, the execution of the job is linear. Components are not executed in parallel, even if some steps are independent of each other.
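Because the pipeline is an acyclic graph and the execution is linear, one way to picture a run is as a topological ordering of the components. The sketch below is not Talend's implementation. The linearOrder method and its parameters are hypothetical names, and it applies Kahn's algorithm to plain component ids to derive an execution order and to reject cyclic connections.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class LinearOrderSketch {
    // Returns the ids ordered so that every component comes after its inputs.
    // Each connection is a {fromId, toId} pair. Throws if the graph has a cycle.
    static List<String> linearOrder(List<String> ids, List<String[]> connections) {
        Map<String, Integer> pendingInputs = new LinkedHashMap<>();
        Map<String, List<String>> targets = new LinkedHashMap<>();
        for (String id : ids) {
            pendingInputs.put(id, 0);
            targets.put(id, new ArrayList<>());
        }
        for (String[] connection : connections) {
            // Sanity check of this sketch: both ends must be declared components.
            if (!targets.containsKey(connection[0]) || !targets.containsKey(connection[1])) {
                throw new IllegalArgumentException("Undeclared component in connection");
            }
            targets.get(connection[0]).add(connection[1]);
            pendingInputs.merge(connection[1], 1, Integer::sum);
        }
        // Components without incoming connections can start immediately.
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> entry : pendingInputs.entrySet()) {
            if (entry.getValue() == 0) {
                ready.add(entry.getKey());
            }
        }
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String id = ready.poll();
            order.add(id);
            for (String target : targets.get(id)) {
                if (pendingInputs.merge(target, -1, Integer::sum) == 0) {
                    ready.add(target);
                }
            }
        }
        if (order.size() != ids.size()) {
            throw new IllegalStateException("Cyclic connections detected");
        }
        return order;
    }
}
```

With the ids employee, salary, concat, and csv and the three connections of the job example, this returns the components in that same order: both inputs first, then concat, then csv.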
Depending on the configuration, you can select the environment which you execute your job in.
The environment is selected according to the following logic:
If an ExecutionBuilder class is passed through the job properties, use it. The supported types are an ExecutionBuilder instance, a Class, or a String.
If an ExecutionBuilder SPI is present, use it. This is the case if component-runtime-beam is present in your classpath.
Otherwise, use a local/standalone execution.
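The precedence described above (explicit job property, then SPI, then local fallback) can be sketched with standard Java only. Everything in this block is illustrative: the Runner interface is a hypothetical stand-in for the execution builder contract, the property key is passed in by the caller, and the real runtime may resolve these values differently.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.ServiceLoader;

class RunnerSelectionSketch {
    /** Hypothetical stand-in for the execution builder contract. */
    interface Runner {
        String name();
    }

    static Runner select(Map<String, Object> jobProperties, String key) {
        Object configured = jobProperties.get(key);
        try {
            // 1. An explicit job property wins: instance, Class, or class name.
            if (configured instanceof Runner) {
                return (Runner) configured;
            }
            if (configured instanceof Class) {
                return (Runner) ((Class<?>) configured).getDeclaredConstructor().newInstance();
            }
            if (configured instanceof String) {
                return (Runner) Class.forName((String) configured)
                        .getDeclaredConstructor().newInstance();
            }
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + configured, e);
        }
        // 2. Otherwise, use the first implementation registered through the SPI.
        Iterator<Runner> spi = ServiceLoader.load(Runner.class).iterator();
        if (spi.hasNext()) {
            return spi.next();
        }
        // 3. Otherwise, fall back to a local/standalone execution.
        return () -> "local";
    }

    public static void main(String[] args) {
        Map<String, Object> noProperties = Collections.emptyMap();
        // With no property set and no provider registered, the fallback is selected.
        System.out.println(select(noProperties, "runner").name());
    }
}
```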
In the case of a Beam execution, you can customize the pipeline options using system properties. Each property name must start with the talend.beam.job. prefix. For example, to set the appName option, use -Dtalend.beam.job.appName=mytest.
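The prefix convention can be illustrated with a small sketch that filters a set of properties and strips the prefix. It only demonstrates the naming rule: the pipelineOptions helper is hypothetical and does not show how the Beam runtime actually reads these values.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

class BeamOptionsSketch {
    static final String PREFIX = "talend.beam.job.";

    // Keeps only the properties carrying the prefix and returns them without it.
    static Map<String, String> pipelineOptions(Properties properties) {
        Map<String, String> options = new TreeMap<>();
        for (String name : properties.stringPropertyNames()) {
            if (name.startsWith(PREFIX)) {
                options.put(name.substring(PREFIX.length()), properties.getProperty(name));
            }
        }
        return options;
    }

    public static void main(String[] args) {
        // Run with -Dtalend.beam.job.appName=mytest to see {appName=mytest}.
        System.out.println(pipelineOptions(System.getProperties()));
    }
}
```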
Key Provider$
The job builder lets you set a key provider to join your data when a component has multiple inputs. The key provider can be set contextually to a component or globally to the job.
Job.components()
        .component("employee", "db://input")
            .property(GroupKeyProvider.class.getName(),
                (GroupKeyProvider) context -> context.getData().getString("id")) // (1)
        .component("salary", "db://input")
        .component("concat", "transform://concat?separator=;")
    .connections()
        .from("employee").to("concat", "string1")
        .from("salary").to("concat", "string2")
    .build()
    .property(GroupKeyProvider.class.getName(), // (2)
        (GroupKeyProvider) context -> context.getData().getString("employee_id"))
    .run();
1 | Defining a key provider for the data produced by the employee component. |
2 | Defining a key provider for all data manipulated in the job. |
If the incoming data has different IDs, you can provide a complex global key provider that relies on the context given by the component id and the branch name.
GroupKeyProvider keyProvider = context -> {
    if ("employee".equals(context.getComponentId())) {
        return context.getData().getString("id");
    }
    return context.getData().getString("employee_id");
};
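To show what a key provider makes possible, the following sketch groups records coming from two components under the key computed for each of them. It is a simplified illustration, not the runtime's join logic: the Context class, the groupByKey method, and the sample values are all hypothetical, and records are plain maps instead of the real data type.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class KeyJoinSketch {
    /** Hypothetical, simplified stand-in for the context passed to a key provider. */
    static final class Context {
        final String componentId;
        final Map<String, String> data;

        Context(String componentId, Map<String, String> data) {
            this.componentId = componentId;
            this.data = data;
        }
    }

    // Groups records as: key -> component id -> records sharing that key.
    static Map<String, Map<String, List<Map<String, String>>>> groupByKey(
            List<Context> records, Function<Context, String> keyProvider) {
        Map<String, Map<String, List<Map<String, String>>>> groups = new LinkedHashMap<>();
        for (Context record : records) {
            groups.computeIfAbsent(keyProvider.apply(record), key -> new LinkedHashMap<>())
                  .computeIfAbsent(record.componentId, id -> new ArrayList<>())
                  .add(record.data);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Same rule as the global key provider: "id" for employee, "employee_id" otherwise.
        Function<Context, String> keyProvider = context ->
                "employee".equals(context.componentId)
                        ? context.data.get("id")
                        : context.data.get("employee_id");
        List<Context> records = new ArrayList<>();
        records.add(new Context("employee", Collections.singletonMap("id", "1")));
        records.add(new Context("salary", Collections.singletonMap("employee_id", "1")));
        // prints {1={employee=[{id=1}], salary=[{employee_id=1}]}}
        System.out.println(groupByKey(records, keyProvider));
    }
}
```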
Beam case$
For the Beam case, you need to rely on the Beam pipeline definition and use the component-runtime-beam dependency, which provides Beam bridges.
Inputs and Outputs$
TalendIO provides a way to convert a partition mapper or a processor into a Beam input or output using the read or write methods.
public class Main {
    public static void main(final String[] args) {
        final ComponentManager manager = ComponentManager.instance();
        Pipeline pipeline = Pipeline.create();
        // Create a Beam input from the mapper and apply it to the pipeline
        pipeline.apply(TalendIO.read(manager.findMapper("sample", "reader", 1, new HashMap<String, String>() {{
                    put("fileprefix", "input");
                }}).get()))
                .apply(new ViewsMappingTransform(emptyMap(), "sample")) // prepare it for the output record format (see next part)
                // Create a Beam output from the Talend processor and apply it to the pipeline
                .apply(TalendIO.write(manager.findProcessor("test", "writer", 1, new HashMap<String, String>() {{
                    put("fileprefix", "output");
                }}).get(), emptyMap()));
        //... run pipeline
    }
}
TalendFn provides a way to wrap a processor in a Beam PTransform and to integrate it into the pipeline.
public class Main {
    public static void main(final String[] args) {
        //Component manager and pipeline initialization...
        // Create a Beam PTransform from the processor and apply it to the pipeline
        pipeline.apply(TalendFn.asFn(manager.findProcessor("sample", "mapper", 1, emptyMap()).get(), emptyMap()));
        //... run pipeline
    }
}
In the Beam case, multiple inputs and outputs are represented by a single Map element, which avoids declaring several inputs and outputs on the transform.
You can use ViewsMappingTransform or CoGroupByKeyResultMappingTransform to adapt the input/output format to the record format representing the multiple inputs/outputs, like Map<String, List<?>>, but materialized as a Record. Input data must be of the Record type in this case.
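The shape of that representation can be pictured with plain Java collections: one element that maps each branch name to the records of that branch. This sketch only shows the Map<String, List<?>> layout. It does not use the Record type or the mapping transforms, and the pack helper and sample values are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class BranchMapSketch {
    // Packs the records of two named branches into one element: branch name -> records.
    static Map<String, List<?>> pack(String firstBranch, List<?> first,
                                     String secondBranch, List<?> second) {
        Map<String, List<?>> element = new LinkedHashMap<>();
        element.put(firstBranch, first);
        element.put(secondBranch, second);
        return element;
    }

    public static void main(String[] args) {
        // Branch names taken from the concat example; the values are made up.
        // prints {string1=[Alice], string2=[4200]}
        System.out.println(pack("string1", Arrays.asList("Alice"), "string2", Arrays.asList(4200)));
    }
}
```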
Converting a Beam I/O into a component I/O$
For simple inputs and outputs, you can get an automatic and transparent conversion of the Beam I/O into an I/O component if you decorate your PTransform with @PartitionMapper or @Processor.
However, there are limitations:
Inputs must implement PTransform<PBegin, PCollection<?>> and must be a BoundedSource.
Outputs must implement PTransform<PCollection<?>, PDone> and register a DoFn on the input PCollection.
For more information, see the How to wrap a Beam I/O page.