Job Builder
The Job builder lets you create a job pipeline programmatically using Talend components
(Producers and Processors).
The job pipeline is an acyclic graph, allowing you to build complex pipelines.
Let’s take a simple use case with two data sources (employee and salary) that we will format to CSV and write to a file.
A job is defined based on components (nodes) and links (edges) to connect their branches together.
Every component is defined by a unique id and a URI that identifies the component.
The URI follows the form: [family]://[component][?version][&configuration]
- family: the name of the component family.
- component: the name of the component.
- version: the version of the component, expressed as a key=value pair where the key is __version and the value is a number.
- configuration: the component configuration as key=value tuples where the key is the path of the configuration option and the value is its value in string format.
job://csvFileGen?__version=1&path=/temp/result.csv&encoding=utf-8
| Configuration parameters must be URI/URL encoded. | 
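For instance, building such a URI in code and encoding a configuration value can be done with the JDK URLEncoder. This is a minimal sketch; the transform://concat URI mirrors the example below.
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class ComponentUriExample {
    public static void main(final String[] args) throws Exception {
        // encode the configuration value so reserved characters survive the URI parsing
        final String separator = URLEncoder.encode(";", StandardCharsets.UTF_8.name());
        final String uri = "transform://concat?__version=1&separator=" + separator;
        System.out.println(uri); // transform://concat?__version=1&separator=%3B
    }
}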
Here is a more concrete job example:
Job.components()   (1)
        .component("employee","db://input")
        .component("salary", "db://input")
        .component("concat", "transform://concat?separator=;")
        .component("csv", "file://out?__version=2")
    .connections()  (2)
        .from("employee").to("concat", "string1")
        .from("salary").to("concat", "string2")
        .from("concat").to("csv")
    .build()    (3)
    .run(); (4)
| 1 | We define all the components that will be used in the job pipeline. | 
| 2 | Then, we define the connections between the components to construct the job pipeline.
The from → to links use the component id and the default input/output branches.
You can also connect a specific branch of a component, if it has multiple or named input/output branches,
using the from(id, branchName) → to(id, branchName) methods.
In the example above, the concat component has two inputs (string1 and string2). | 
| 3 | In this step, we validate the job pipeline, notably asserting that the connections only reference declared components and that the resulting graph is acyclic. | 
| 4 | We run the job pipeline. | 
| In this version, the execution of the job is linear: the components are not executed in parallel even if some steps may be independent. | 
Environment/Runner
Depending on the configuration, you can select the environment in which your job is executed.
The environment is selected using the following logic:
- if an org.talend.sdk.component.runtime.manager.chain.Job.ExecutorBuilder is passed through the job properties, then it is used (supported types are an ExecutionBuilder instance, a Class or a String),
- if an ExecutionBuilder SPI is present, then it is used (this is the case if component-runtime-beam is present in your classpath),
- otherwise a local/standalone execution is used.
In the case of a Beam execution, you can customize the pipeline options using system properties prefixed
with talend.beam.job.. For instance, to set the appName option, use -Dtalend.beam.job.appName=mytest.
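The same option can also be set programmatically before the job is built; the property name is simply the Beam option name prefixed with talend.beam.job. as described above (sketch):
// Equivalent to passing -Dtalend.beam.job.appName=mytest on the command line;
// set it before the job is built so the Beam runner picks it up.
System.setProperty("talend.beam.job.appName", "mytest");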
Key Provider
The job builder lets you set a key provider to join your data when a component has multiple inputs. The key provider can be set contextually on a component or globally on the job.
Job.components()
        .component("employee","db://input")
            .property(GroupKeyProvider.class.getName(),
                 (GroupKeyProvider) context -> context.getData().getString("id")) (1)
        .component("salary", "db://input")
        .component("concat", "transform://concat?separator=;")
    .connections()
        .from("employee").to("concat", "string1")
        .from("salary").to("concat", "string2")
    .build()
    .property(GroupKeyProvider.class.getName(), (2)
                 (GroupKeyProvider) context -> context.getData().getString("employee_id"))
    .run();
| 1 | Here, we define a key provider for the data produced by the employee component. | 
| 2 | Here, we define a key provider for all the data manipulated in this job. | 
If the incoming data has different ids, you can provide a more complex global key provider relying on the context, which gives you the component id
and the branch name.
GroupKeyProvider keyProvider = context -> {
    if ("employee".equals(context.getComponentId())) {
        return context.getData().getString("id");
    }
    return context.getData().getString("employee_id");
};
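Such a provider can then be registered like any other key provider, for example globally on the built job. This sketch reuses the components declared earlier and the keyProvider variable defined above:
Job.components()
        .component("employee", "db://input")
        .component("salary", "db://input")
        .component("concat", "transform://concat?separator=;")
    .connections()
        .from("employee").to("concat", "string1")
        .from("salary").to("concat", "string2")
    .build()
    .property(GroupKeyProvider.class.getName(), keyProvider) // the provider defined above
    .run();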
Beam case
For the Beam case, you need to rely on the Beam pipeline definition and use the component-runtime-beam dependency, which provides the Beam bridges.
I/O
org.talend.sdk.component.runtime.beam.TalendIO provides a way to convert a partition mapper or a processor to a Beam input or output
using the read or write methods.
public class Main {
    public static void main(final String[] args) {
        final ComponentManager manager = ComponentManager.instance();
        Pipeline pipeline = Pipeline.create();
        // Create a Beam input from the mapper and apply it to the pipeline
        pipeline.apply(TalendIO.read(manager.findMapper("sample", "reader", 1, new HashMap<String, String>() {{
                    put("fileprefix", "input");
                }}).get()))
                .apply(new ViewsMappingTransform(emptyMap(), "sample")) // prepare it for the output record format (see next part)
        // Create a Beam processor from the Talend processor and apply it to the pipeline
                .apply(TalendIO.write(manager.findProcessor("test", "writer", 1, new HashMap<String, String>() {{
                    put("fileprefix", "output");
                }}).get(), emptyMap()));
        // ... run the pipeline
    }
}
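The final step is plain Beam: you run the pipeline with the runner of your choice. A minimal sketch (PipelineOptionsFactory and waitUntilFinish are standard Beam APIs, not part of the component framework):
// Standard Beam: build the pipeline options from the CLI arguments (runner, appName, ...)
final PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
final Pipeline pipeline = Pipeline.create(options);
// ... apply the TalendIO read/write transforms as shown above, then:
pipeline.run().waitUntilFinish();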
Processors
org.talend.sdk.component.runtime.beam.TalendFn provides a way to wrap a processor in a Beam PTransform and integrate it into the pipeline.
public class Main {
    public static void main(final String[] args) {
        //Component manager and pipeline initialization...
        //Create beam PTransform from processor and apply input to pipeline
        pipeline.apply(TalendFn.asFn(manager.findProcessor("sample", "mapper", 1, emptyMap()).get()));
        //... run pipeline
    }
}
In the Beam case, the multiple inputs/outputs are represented by a single Map element to avoid relying on Beam multiple inputs/outputs.
You can use ViewsMappingTransform or CoGroupByKeyResultMappingTransform to adapt the input/output
format to the record format representing the multiple inputs/outputs, i.e. a kind of Map<String, List<?>>
materialized as a JsonObject. Input data must be of type JsonObject in this case.
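As a rough sketch of what consuming that record format can look like in a custom DoFn: the branch name string1 is only illustrative (taken from the concat example above), and the JsonArray-per-branch layout follows the Map<String, List<?>> description; use the actual branch names of your component.
// Sketch: a DoFn reading the JsonObject-based record format described above.
class BranchLoggingFn extends DoFn<JsonObject, Void> {
    @ProcessElement
    public void onElement(@Element final JsonObject record) {
        // each branch holds the list of values emitted for that element
        final JsonArray values = record.getJsonArray("string1");
        System.out.println(values);
    }
}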
Convert a Beam I/O into a component I/O
For simple I/O, you can get automatic, transparent conversion of the Beam I/O to a component I/O if you decorate your PTransform
with @PartitionMapper or @Processor.
The limitations are:
- Inputs must implement PTransform<PBegin, PCollection<?>> and must be a BoundedSource.
- Outputs must implement PTransform<PCollection<?>, PDone> and just register a DoFn on the input PCollection.
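As a rough illustration of the shape such a wrapped input can take: the family/name values and the TextIO-based source are assumptions for this sketch, and the actual configuration wiring is omitted.
// Sketch: a Beam input PTransform decorated so it can be exposed as a component input.
@PartitionMapper(family = "beamsample", name = "fileInput")
public class FileInput extends PTransform<PBegin, PCollection<String>> {
    private final String path;

    public FileInput(final String path) {
        this.path = path;
    }

    @Override
    public PCollection<String> expand(final PBegin input) {
        // TextIO.read() relies on a bounded source, matching the limitation above
        return input.apply(TextIO.read().from(path));
    }
}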
More information on that topic is available on the How to wrap a Beam I/O page.