Job Builder
The Job
builder lets you create a job pipeline programmatically using Talend components
(Producers and Processors).
The job pipeline is an acyclic graph, allowing you to build complex pipelines.
Let’s take a simple use case where two data sources (employee and salary) are formatted to CSV and the result is written to a file.
A job is defined based on components (nodes) and links (edges) to connect their branches together.
Every component is defined by a unique id
and an URI that identify the component.
The URI follows the form [family]://[component][?version][&configuration]
, where:
-
family is the name of the component family.
-
component is the name of the component.
-
version is the version of the component. It is represented in a key=value format. The key is
__version
and the value is a number. -
configuration is component configuration. It is represented in a key=value format. The key is the path of the configuration and the value is a `string' corresponding to the configuration value.
job://csvFileGen?__version=1&path=/temp/result.csv&encoding=utf-8"
configuration parameters must be URI/URL encoded. |
Job.components() (1)
.component("employee","db://input")
.component("salary", "db://input")
.component("concat", "transform://concat?separator=;")
.component("csv", "file://out?__version=2")
.connections() (2)
.from("employee").to("concat", "string1")
.from("salary").to("concat", "string2")
.from("concat").to("csv")
.build() (3)
.run(); (4)
1 | Defining all components used in the job pipeline. |
2 | Defining the connections between the components to construct the job pipeline. The links from /to use the component id and the default input/output branches.You can also connect a specific branch of a component, if it has multiple or named input/output branches, using the methods from(id, branchName) and to(id, branchName) .In the example above, the concat component has two inputs ("string1" and "string2"). |
3 | Validating the job pipeline by asserting that:
|
4 | Running the job pipeline. |
In this version, the execution of the job is linear. Components are not executed in parallel even if some steps may be independents. |
Environment/Runner
Depending on the configuration, you can select the environment which you execute your job in.
To select the environment, the logic is the following one:
-
If an
org.talend.sdk.component.runtime.manager.chain.Job.ExecutorBuilder
class is passed through the job properties, then use it. The supported types are aExecutionBuilder
instance, aClass
or aString
. -
if an
ExecutionBuilder
SPI is present, use it. It is the case ifcomponent-runtime-beam
is present in your classpath. -
else, use a local/standalone execution.
In the case of a Beam execution, you can customize the pipeline options using system properties. They have to be prefixed with talend.beam.job.
. For example, to set the appName
option, you need to use -Dtalend.beam.job.appName=mytest
.
Key Provider
The job builder lets you set a key provider to join your data when a component has multiple inputs. The key provider can be set contextually to a component or globally to the job.
Job.components()
.component("employee","db://input")
.property(GroupKeyProvider.class.getName(),
(GroupKeyProvider) context -> context.getData().getString("id")) (1)
.component("salary", "db://input")
.component("concat", "transform://concat?separator=;")
.connections()
.from("employee").to("concat", "string1")
.from("salary").to("concat", "string2")
.build()
.property(GroupKeyProvider.class.getName(), (2)
(GroupKeyProvider) context -> context.getData().getString("employee_id"))
.run();
1 | Defining a key provider for the data produced by the employee component. |
2 | Defining a key provider for all data manipulated in the job. |
If the incoming data has different IDs, you can provide a complex global key provider that relies on the context given by the component id
and the branch name
.
GroupKeyProvider keyProvider = context -> {
if ("employee".equals(context.getComponentId())) {
return context.getData().getString("id");
}
return context.getData().getString("employee_id");
};
Beam case
For Beam case, you need to rely on Beam pipeline definition and use the component-runtime-beam
dependency, which provides Beam bridges.
Inputs and Outputs
org.talend.sdk.component.runtime.beam.TalendIO
provides a way to convert a partition mapper or a processor to an input or processor using the read
or write
methods.
public class Main {
public static void main(final String[] args) {
final ComponentManager manager = ComponentManager.instance()
Pipeline pipeline = Pipeline.create();
//Create beam input from mapper and apply input to pipeline
pipeline.apply(TalendIO.read(manager.findMapper(manager.findMapper("sample", "reader", 1, new HashMap<String, String>() {{
put("fileprefix", "input");
}}).get()))
.apply(new ViewsMappingTransform(emptyMap(), "sample")) // prepare it for the output record format (see next part)
//Create beam processor from talend processor and apply to pipeline
.apply(TalendIO.write(manager.findProcessor("test", "writer", 1, new HashMap<String, String>() {{
put("fileprefix", "output");
}}).get(), emptyMap()));
//... run pipeline
}
}
Processors
org.talend.sdk.component.runtime.beam.TalendFn
provides the way to wrap a processor in a Beam PTransform
and to integrate it into the pipeline.
public class Main {
public static void main(final String[] args) {
//Component manager and pipeline initialization...
//Create beam PTransform from processor and apply input to pipeline
pipeline.apply(TalendFn.asFn(manager.findProcessor("sample", "mapper", 1, emptyMap())).get())), emptyMap());
//... run pipeline
}
}
The multiple inputs and outputs are represented by a Map
element in Beam case to avoid using multiple inputs and outputs.
You can use ViewsMappingTransform or CoGroupByKeyResultMappingTransform to adapt the input/output format to the record format representing the multiple inputs/output, like Map<String, List<?>> , but materialized as a JsonObject . Input data must be of the JsonObject type in this case.
|
Converting a Beam.io into a component I/O
For simple inputs and outputs, you can get an automatic and transparent conversion of the Beam.io into an I/O component, if you decorated your PTransform
with @PartitionMapper
or @Processor
.
However, there are limitations:
-
Inputs must implement
PTransform<PBegin, PCollection<?>>
and must be aBoundedSource
. -
Outputs must implement
PTransform<PCollection<?>, PDone>
and register aDoFn
on the inputPCollection
.
For more information, see the How to wrap a Beam I/O page.