Job Builder
The Job builder lets you create a job pipeline programmatically using Talend components (Producers and Processors).
The job pipeline is an acyclic graph, so you can build complex pipelines.
Let's take a simple use case with two data sources (employee and salary) that we format to CSV and write to a file.
A job is defined by components (nodes) and links (edges) that connect their branches together.
Every component is defined by a unique id and a URI that identifies the component.
The URI follows the form: [family]://[component][?version][&configuration]
- family: the name of the component family.
- component: the name of the component.
- version: the version of the component, expressed as a key=value pair where the key is __version and the value is a number.
- configuration: the component configuration as key=value pairs where the key is the path of the configuration option and the value is its value in string format.
Example: job://csvFileGen?__version=1&path=/temp/result.csv&encoding=utf-8
Configuration parameters must be URI/URL encoded.
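Since configuration values can contain reserved URI characters, encoding them before building the URI keeps the component URI valid. Here is a minimal sketch using the standard java.net.URLEncoder (Java 10+ overload); the path and encoding parameters simply mirror the example above:
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class ComponentUriExample {
    public static void main(final String[] args) {
        // Encode each configuration value so reserved characters (/, ?, &, =) survive URI parsing
        final String path = URLEncoder.encode("/temp/result.csv", StandardCharsets.UTF_8);
        final String encoding = URLEncoder.encode("utf-8", StandardCharsets.UTF_8);
        final String uri = "job://csvFileGen?__version=1&path=" + path + "&encoding=" + encoding;
        System.out.println(uri); // job://csvFileGen?__version=1&path=%2Ftemp%2Fresult.csv&encoding=utf-8
    }
}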
Here is a more concrete job example:
Job.components() (1)
.component("employee","db://input")
.component("salary", "db://input")
.component("concat", "transform://concat?separator=;")
.component("csv", "file://out?__version=2")
.connections() (2)
.from("employee").to("concat", "string1")
.from("salary").to("concat", "string2")
.from("concat").to("csv")
.build() (3)
.run(); (4)
1 | We define all the components that will be used in the job pipeline. |
2 | Then, we define the connections between the components to construct the job pipeline. The from → to links use the component id and the default input/output branches. You can also connect a specific branch of a component, if it has multiple or named input/output branches, using the methods from(id, branchName) and to(id, branchName). In the example above, the concat component has two inputs (string1 and string2). |
3 | In this step, we validate the job pipeline, asserting in particular that it forms an acyclic graph and that every component referenced in the connections has been declared. |
4 | We run the job pipeline. |
In this version, the execution of the job is linear: the components are not executed in parallel even if some steps may be independent.
Environment/Runner
Depending on the configuration, you can select the environment in which your job is executed.
The environment is selected using the following logic:
- if an org.talend.sdk.component.runtime.manager.chain.Job.ExecutorBuilder is passed through the job properties, it is used (supported types are an ExecutionBuilder instance, a Class or a String), as shown in the sketch after this list,
- if an ExecutionBuilder SPI is present, it is used (this is the case when component-runtime-beam is present in your classpath),
- otherwise a local/standalone execution is used.
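For illustration, here is a minimal sketch of the first case: the executor builder is passed through the job properties using the property method shown later on this page. It assumes the property key is the fully qualified ExecutorBuilder class name and that a String holding a builder class name is accepted; com.acme.MyExecutionBuilder is a hypothetical implementation.
// A hedged sketch, not the canonical API usage: the property key is assumed to be
// the fully qualified ExecutorBuilder name and com.acme.MyExecutionBuilder is a
// hypothetical ExecutionBuilder implementation present on the classpath.
Job.components()
    .component("employee", "db://input")
    .component("csv", "file://out?__version=2")
    .connections()
    .from("employee").to("csv")
    .build()
    .property("org.talend.sdk.component.runtime.manager.chain.Job.ExecutorBuilder",
              "com.acme.MyExecutionBuilder")
    .run();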
In the case of a Beam execution, you can customize the pipeline options using system properties. They have to be prefixed by talend.beam.job. For instance, to set the appName option you can use -Dtalend.beam.job.appName=mytest.
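If you launch the job from your own code rather than from the command line, the same option can be set programmatically before running the job, for example:
// Equivalent to passing -Dtalend.beam.job.appName=mytest on the command line;
// set it before running the job so the Beam pipeline options pick it up.
System.setProperty("talend.beam.job.appName", "mytest");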
Key Provider
The job builder lets you set a key provider to join your data when a component has multiple inputs. The key provider can be set contextually to a component or globally to the job.
Job.components()
.component("employee","db://input")
.property(GroupKeyProvider.class.getName(),
(GroupKeyProvider) context -> context.getData().getString("id")) (1)
.component("salary", "db://input")
.component("concat", "transform://concat?separator=;")
.connections()
.from("employee").to("concat", "string1")
.from("salary").to("concat", "string2")
.build()
.property(GroupKeyProvider.class.getName(), (2)
(GroupKeyProvider) context -> context.getData().getString("employee_id"))
.run();
1 | Here we define a key provider for the data produced by the employee component. |
2 | Here we define a key provider for all the data manipulated in this job. |
If the incoming data has different ids, you can provide a more elaborate global key provider relying on the context, which gives you the component id and the branch name.
GroupKeyProvider keyProvider = context -> {
if ("employee".equals(context.getComponentId())) {
return context.getData().getString("id");
}
return context.getData().getString("employee_id");
};
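Such a provider can then be registered globally on the job, reusing the property call from the previous example (keyProvider being the provider defined just above):
// Register the context-aware key provider globally, exactly as the
// GroupKeyProvider property was set on the job in the previous example.
Job.components()
    .component("employee", "db://input")
    .component("salary", "db://input")
    .component("concat", "transform://concat?separator=;")
    .connections()
    .from("employee").to("concat", "string1")
    .from("salary").to("concat", "string2")
    .build()
    .property(GroupKeyProvider.class.getName(), keyProvider)
    .run();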
Beam case
For the Beam case, you need to rely on the Beam pipeline definition and use the component-runtime-beam dependency, which provides the Beam bridges.
I/O
org.talend.sdk.component.runtime.beam.TalendIO provides a way to convert a partition mapper or a processor to a Beam input or output using the read or write methods respectively.
public class Main {
    public static void main(final String[] args) {
        final ComponentManager manager = ComponentManager.instance();
        Pipeline pipeline = Pipeline.create();
        // Create a Beam input from the mapper and apply it to the pipeline
        pipeline.apply(TalendIO.read(manager.findMapper("sample", "reader", 1, new HashMap<String, String>() {{
                    put("fileprefix", "input");
                }}).get()))
                .apply(new ViewsMappingTransform(emptyMap(), "sample")) // prepare it for the output record format (see next part)
                // Create a Beam output from the Talend processor and apply it to the pipeline
                .apply(TalendIO.write(manager.findProcessor("test", "writer", 1, new HashMap<String, String>() {{
                    put("fileprefix", "output");
                }}).get(), emptyMap()));
        // ... run pipeline
    }
}
Processors
org.talend.sdk.component.runtime.beam.TalendFn provides a way to wrap a processor in a Beam PTransform and integrate it into the pipeline.
public class Main {
    public static void main(final String[] args) {
        // Component manager and pipeline initialization...
        // Create a Beam PTransform from the processor and apply it to the pipeline
        pipeline.apply(TalendFn.asFn(manager.findProcessor("sample", "mapper", 1, emptyMap()).get(), emptyMap()));
        // ... run pipeline
    }
}
In the Beam case, multiple inputs and outputs are represented by a single Map element to avoid relying on Beam multiple inputs/outputs. You can use ViewsMappingTransform or CoGroupByKeyResultMappingTransform to adapt the input/output format to the record format representing the multiple inputs/outputs, i.e. a kind of Map<String, List<?>>, but materialized as a JsonObject. Input data must be of type JsonObject in this case.
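To make that shape concrete, here is an illustrative sketch (not taken from the library) of such a record built with JSON-P types (javax.json): keys are branch names and values are arrays of records; the branch names reuse the concat example above.
// Illustrative only: a multi-branch record materialized as a JsonObject,
// i.e. a Map<String, List<?>> shape where each key is a branch name
// holding an array of records (javax.json / JSON-P API).
JsonObject multiBranchRecord = Json.createObjectBuilder()
        .add("string1", Json.createArrayBuilder()
                .add(Json.createObjectBuilder().add("value", "employee data")))
        .add("string2", Json.createArrayBuilder()
                .add(Json.createObjectBuilder().add("value", "salary data")))
        .build();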
Deployment
Since Beam serializes the components, it is crucial to add the component-runtime-standalone dependency to the project. It takes care of providing an implicit and lazy ComponentManager managing the components in a fat jar case.
Convert a Beam.io into a component I/O
For simple I/O, you can get automatic and transparent conversion of the Beam.io to a component I/O if you decorate your PTransform with @PartitionMapper or @Processor.
The limitations are:
- Inputs must implement PTransform<PBegin, PCollection<?>> and must be a BoundedSource.
- Outputs must implement PTransform<PCollection<?>, PDone> and just register a DoFn on the input PCollection (see the sketch after this list).
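As an illustration of the output constraint, here is a minimal hedged sketch (not taken from the official documentation) of a PTransform<PCollection<?>, PDone> decorated with @Processor that only registers a DoFn on the input PCollection; the family/name values and the logging DoFn are hypothetical.
// Hypothetical example of an output wrapper eligible for automatic conversion.
// The family/name values and the DoFn body are illustrative assumptions.
@Processor(family = "sample", name = "beamOutput")
public class BeamOutput extends PTransform<PCollection<String>, PDone> {

    @Override
    public PDone expand(final PCollection<String> input) {
        // only register a DoFn on the input PCollection, as required above
        input.apply(ParDo.of(new LogFn()));
        return PDone.in(input.getPipeline());
    }

    static class LogFn extends DoFn<String, Void> {
        @ProcessElement
        public void onElement(final ProcessContext context) {
            System.out.println(context.element()); // illustrative side effect
        }
    }
}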
More information on that topic is available on the How to wrap a Beam I/O page.