Job Builder
The Job builder lets you create a job pipeline programmatically using Talend components
(Producers and Processors).
The job pipeline is an acyclic graph, so you can build complex pipelines.
Let's take a simple use case with two data sources (employee and salary) that we format to CSV and write to a file.
Job.components() (1)
.component("employee","db://input")
.component("salary", "db://input")
.component("concat", "transform://concat?separator=;")
.component("csv", "file://out?__version=2")
.connections() (2)
.from("employee").to("concat", "string1")
.from("salary").to("concat", "string2")
.from("concat").to("csv")
.build() (3)
.run(); (4)
1 | We define all the components that will be used in the job pipeline.
Every component is defined by a unique id and a URI that identifies the component. |
The URI follows the form: [family]://[component][?version][&configuration]
- family: the name of the component family.
- component: the name of the component.
- version: the version of the component, represented in a key=value format, where the key is __version and the value is a number.
- configuration: the component configuration as key=value tuples, where the key is the path of the configuration option and the value is its value in string format.
EXAMPLE: "job://csvFileGen?__version=1&path=/temp/result.csv&encoding=utf-8"
Configuration parameters must be URI/URL encoded.
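To illustrate the encoding requirement, here is a minimal, self-contained sketch (the raw path value is purely illustrative) showing how a configuration value containing reserved characters can be URL-encoded before being appended to a component URI such as file://out:
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class UriEncodingSketch {
    public static void main(final String[] args) {
        // Illustrative value containing characters that are reserved in a URI
        final String rawPath = "/tmp/result dir/result.csv";
        // Java 10+ overload of URLEncoder.encode
        final String encodedPath = URLEncoder.encode(rawPath, StandardCharsets.UTF_8);
        // Append the encoded value as a configuration parameter of the component URI
        final String uri = "file://out?__version=2&path=" + encodedPath;
        System.out.println(uri); // file://out?__version=2&path=%2Ftmp%2Fresult+dir%2Fresult.csv
    }
}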
2 | Then, we define the connections between the components to construct the job pipeline.
The from → to links use the component id and the default input/output branches. |
You can also connect a specific branch of a component, if it has multiple or named input/output branches,
using the methods from(id, branchName) and to(id, branchName).
In the example above, the concat component has two inputs (string1 and string2); a sketch of named branch connections follows below.
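For illustration only, here is a minimal sketch of connecting a named output branch; the logger component, its log://warn URI and the rejected branch name are hypothetical and not part of the components shown above.
// Sketch: connecting named branches with from(id, branchName) / to(id, branchName).
Job.components()
        .component("employee", "db://input")
        .component("salary", "db://input")
        .component("concat", "transform://concat?separator=;")
        .component("logger", "log://warn")                 // hypothetical component
    .connections()
        .from("employee").to("concat", "string1")          // connect to the named input "string1"
        .from("salary").to("concat", "string2")
        .from("concat", "rejected").to("logger")           // hypothetical named output branch "rejected"
    .build()
    .run();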
3 | In this step, we validate the job pipeline, for example by asserting that the connections form an acyclic graph and that every connection references a component declared in the components builder. |
4 | We run the job pipeline. |
In this version, the execution of the job is linear: the components are not executed in parallel even if some steps may be independent.
Environment/Runner
Depending on the configuration, you can select the environment in which your job is executed.
The environment is selected using the following logic:
- If an org.talend.sdk.component.runtime.manager.chain.Job.ExecutorBuilder is passed through the job properties, it is used (supported types are an ExecutionBuilder instance, a Class or a String); see the sketch after this list.
- If an ExecutionBuilder SPI is present, it is used (this is the case if component-runtime-beam is present in your classpath).
- Otherwise, a local/standalone execution is used.
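As a rough sketch of the first option, assuming the fully qualified name of Job.ExecutorBuilder is accepted as the property key (an assumption this page does not confirm) and using a hypothetical builder class name, passing the builder through the job properties could look like this:
// Sketch only: the property key (the ExecutorBuilder class name) is an assumption,
// and com.acme.MyExecutionBuilder is a hypothetical builder passed as a String.
Job.components()
        .component("employee", "db://input")
        .component("csv", "file://out?__version=2")
    .connections()
        .from("employee").to("csv")
    .build()
    .property(Job.ExecutorBuilder.class.getName(), "com.acme.MyExecutionBuilder")
    .run();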
In the case of a Beam execution, you can customize the pipeline options using system properties. They have to be prefixed by talend.beam.job. (note the trailing dot). For instance, to set the appName option you would set -Dtalend.beam.job.appName=mytest.
Key Provider
The job builder lets you set a key provider to join your data when a component has multiple inputs. The key provider can be set contextually to a component or globally to the job.
Job.components()
.component("employee","db://input")
.property(GroupKeyProvider.class.getName(),
(GroupKeyProvider) context -> context.getData().getString("id")) (1)
.component("salary", "db://input")
.component("concat", "transform://concat?separator=;")
.connections()
.from("employee").to("concat", "string1")
.from("salary").to("concat", "string2")
.build()
.property(GroupKeyProvider.class.getName(), (2)
(GroupKeyProvider) context -> context.getData().getString("employee_id"))
.run();
1 | Here we have defined a key provider for the data produced by the employee component. |
2 | Here we have defined a key provider for all the data manipulated in this job. |
If the incoming data has different ids, you can provide a more elaborate global key provider relying on the context, which gives you the component id and the branch name.
GroupKeyProvider keyProvider = context -> {
if ("employee".equals(context.getComponentId())) {
return context.getData().getString("id");
}
return context.getData().getString("employee_id");
};
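To put such a provider in place, a plausible sketch (reusing the .property() call and the components from the previous example) is to register it globally on the built job:
// Sketch: register the context-aware provider defined above globally on the job.
Job.components()
        .component("employee", "db://input")
        .component("salary", "db://input")
        .component("concat", "transform://concat?separator=;")
    .connections()
        .from("employee").to("concat", "string1")
        .from("salary").to("concat", "string2")
    .build()
    .property(GroupKeyProvider.class.getName(), keyProvider) // the provider defined above
    .run();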
Beam case
For the Beam case, you need to rely on the Beam pipeline definition and use the component-runtime-beam
dependency, which provides Beam bridges.
I/O
org.talend.sdk.component.runtime.beam.TalendIO
provides a way to convert a partition mapper or a processor to a Beam input or output using the read or write methods.
public class Main {
    public static void main(final String[] args) {
        final ComponentManager manager = ComponentManager.instance();
        Pipeline pipeline = Pipeline.create();
        // Create a Beam input from the mapper and apply it to the pipeline
        pipeline.apply(TalendIO.read(manager.findMapper("sample", "reader", 1, new HashMap<String, String>() {{
                    put("fileprefix", "input");
                }}).get()))
                .apply(new ViewsMappingTransform(emptyMap(), "sample")) // prepare it for the output record format (see next part)
                // Create a Beam processor from the Talend processor and apply it to the pipeline
                .apply(TalendIO.write(manager.findProcessor("test", "writer", 1, new HashMap<String, String>() {{
                    put("fileprefix", "output");
                }}).get(), emptyMap()));
        // ... run pipeline
    }
}
Processors
org.talend.sdk.component.runtime.beam.TalendFn
provides a way to wrap a processor in a Beam PTransform and integrate it into the pipeline.
public class Main {
    public static void main(final String[] args) {
        // Component manager and pipeline initialization...

        // Create a Beam PTransform from the processor and apply it to the pipeline
        pipeline.apply(TalendFn.asFn(manager.findProcessor("sample", "mapper", 1, emptyMap()).get(), emptyMap()));

        // ... run pipeline
    }
}
In the Beam case, the multiple inputs/outputs are represented by a single Map element to avoid relying on several Beam inputs/outputs.
You can use ViewsMappingTransform or CoGroupByKeyResultMappingTransform to adapt the input/output
format to the record format representing the multiple inputs/outputs, i.e. a kind of Map<String, List<?>>
but materialized as a JsonObject. Input data must be of type JsonObject in this case.
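Purely as an illustration of that record shape, and assuming a javax.json provider is on the classpath and that branch names such as string1 are used as keys (an assumption based on the branch names above), such a record could be built like this:
import javax.json.Json;
import javax.json.JsonObject;

public class MultiBranchRecordSketch {
    public static void main(final String[] args) {
        // Sketch of the "Map<String, List<?>> materialized as a JsonObject" shape described above.
        // The branch name "string1" and the record field are illustrative assumptions.
        final JsonObject record = Json.createObjectBuilder()
                .add("string1", Json.createArrayBuilder()
                        .add(Json.createObjectBuilder().add("name", "John").build())
                        .build())
                .build();
        System.out.println(record); // {"string1":[{"name":"John"}]}
    }
}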
Deployment
Since Beam serializes components, it is crucial to add the component-runtime-standalone dependency to the project. It takes
care of providing an implicit and lazy ComponentManager managing the components in a fat-jar case.
Convert a Beam I/O into a component I/O
For simple I/O you can get automatic and transparent conversion of the Beam I/O to a component I/O
if you decorate your PTransform with @PartitionMapper or @Processor.
The limitations are (see the sketch after this list):
- Inputs must implement PTransform<PBegin, PCollection<?>> and must be backed by a BoundedSource.
- Outputs must implement PTransform<PCollection<?>, PDone> and just register a DoFn on the input PCollection.
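To make the input pattern concrete, here is a minimal sketch of a PTransform decorated with @PartitionMapper; the family/name values are illustrative and MyBoundedSource is a hypothetical BoundedSource implementation, not something defined by this page:
import javax.json.JsonObject;

import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.talend.sdk.component.api.input.PartitionMapper;

// Sketch only: "beamsample"/"Input" and MyBoundedSource are hypothetical.
@PartitionMapper(family = "beamsample", name = "Input")
public class MyBeamInput extends PTransform<PBegin, PCollection<JsonObject>> {

    @Override
    public PCollection<JsonObject> expand(final PBegin input) {
        // The transform is backed by a BoundedSource, as required by the first limitation above.
        return input.apply(Read.from(new MyBoundedSource()));
    }
}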
More information on this topic is available on the How to wrap a Beam I/O page.