Job Builder
The Job builder lets you create a job pipeline programmatically using Talend components (Producers and Processors). The job pipeline is an acyclic graph, allowing you to build complex pipelines.
Let’s take a simple use case where two data sources (employee and salary) are formatted to CSV and the result is written to a file.
A job is defined based on components (nodes) and links (edges) to connect their branches together.
Every component is defined by a unique id and a URI that identifies it.
The URI follows the form [family]://[component][?version][&configuration], where:
- family is the name of the component family.
- component is the name of the component.
- version is the version of the component. It is represented in a key=value format. The key is __version and the value is a number.
- configuration is the component configuration. It is represented in a key=value format. The key is the path of the configuration and the value is a string corresponding to the configuration value.
URI example:
job://csvFileGen?__version=1&path=/temp/result.csv&encoding=utf-8
| configuration parameters must be URI/URL encoded. | 
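For example, here is a minimal sketch (reusing the path from the URI example above) of encoding a configuration value before composing the URI:
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// URL-encode the configuration value, then append it to the component URI
String encodedPath = URLEncoder.encode("/temp/result.csv", StandardCharsets.UTF_8);
String uri = "job://csvFileGen?__version=1&path=" + encodedPath + "&encoding=utf-8";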
Job example:
Job.components()   (1)
        .component("employee","db://input")
        .component("salary", "db://input")
        .component("concat", "transform://concat?separator=;")
        .component("csv", "file://out?__version=2")
    .connections()  (2)
        .from("employee").to("concat", "string1")
        .from("salary").to("concat", "string2")
        .from("concat").to("csv")
    .build()    (3)
    .run(); (4)
| 1 | Defining all components used in the job pipeline. | 
| 2 | Defining the connections between the components to construct the job pipeline. The links from/to use the component id and the default input/output branches. You can also connect a specific branch of a component, if it has multiple or named input/output branches, using the methods from(id, branchName) and to(id, branchName). In the example above, the concat component has two inputs ("string1" and "string2"). | 
| 3 | Validating the job pipeline by asserting that every connection references a declared component and that the resulting graph is acyclic. | 
| 4 | Running the job pipeline. | 
| In this version, the execution of the job is linear. Components are not executed in parallel even if some steps may be independent. | 
Environment/Runner
Depending on the configuration, you can select the environment in which your job is executed.
The environment is selected using the following logic:
- If an org.talend.sdk.component.runtime.manager.chain.Job.ExecutorBuilder class is passed through the job properties, use it. The supported types are an ExecutionBuilder instance, a Class or a String.
- If an ExecutionBuilder SPI is present, use it. It is the case if component-runtime-beam is present in your classpath.
- Otherwise, use a local/standalone execution.
In the case of a Beam execution, you can customize the pipeline options using system properties. They have to be prefixed with talend.beam.job.. For example, to set the appName option, you need to use -Dtalend.beam.job.appName=mytest.
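For example, here is a minimal sketch of setting that option programmatically instead of passing the -D flag on the command line (the option name is the one from the example above):
// Equivalent to -Dtalend.beam.job.appName=mytest; set it before running the job.
System.setProperty("talend.beam.job.appName", "mytest");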
Key Provider
The job builder lets you set a key provider to join your data when a component has multiple inputs. The key provider can be set contextually to a component or globally to the job.
Job.components()
        .component("employee","db://input")
            .property(GroupKeyProvider.class.getName(),
                 (GroupKeyProvider) context -> context.getData().getString("id")) (1)
        .component("salary", "db://input")
        .component("concat", "transform://concat?separator=;")
    .connections()
        .from("employee").to("concat", "string1")
        .from("salary").to("concat", "string2")
    .build()
    .property(GroupKeyProvider.class.getName(), (2)
                 (GroupKeyProvider) context -> context.getData().getString("employee_id"))
    .run();
| 1 | Defining a key provider for the data produced by the employee component. | 
| 2 | Defining a key provider for all data manipulated in the job. | 
If the incoming data has different IDs, you can provide a complex global key provider that relies on the context given by the component id and the branch name.
GroupKeyProvider keyProvider = context -> {
    if ("employee".equals(context.getComponentId())) {
        return context.getData().getString("id");
    }
    return context.getData().getString("employee_id");
};
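The provider above can then be registered globally on the built job, reusing the pipeline from the previous example:
Job.components()
        .component("employee", "db://input")
        .component("salary", "db://input")
        .component("concat", "transform://concat?separator=;")
    .connections()
        .from("employee").to("concat", "string1")
        .from("salary").to("concat", "string2")
    .build()
    .property(GroupKeyProvider.class.getName(), keyProvider) // the complex provider defined above
    .run();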
Beam case
For the Beam case, you need to rely on the Beam pipeline definition and use the component-runtime-beam dependency, which provides the Beam bridges.
Inputs and Outputs
org.talend.sdk.component.runtime.beam.TalendIO provides a way to convert a partition mapper or a processor to a Beam input or output using its read or write methods.
public class Main {
    public static void main(final String[] args) {
        final ComponentManager manager = ComponentManager.instance();
        Pipeline pipeline = Pipeline.create();
        //Create beam input from mapper and apply input to pipeline
        pipeline.apply(TalendIO.read(manager.findMapper("sample", "reader", 1, new HashMap<String, String>() {{
                    put("fileprefix", "input");
                }}).get()))
                .apply(new ViewsMappingTransform(emptyMap(), "sample")) // prepare it for the output record format (see next part)
        //Create beam processor from talend processor and apply to pipeline
                .apply(TalendIO.write(manager.findProcessor("test", "writer", 1, new HashMap<String, String>() {{
                    put("fileprefix", "output");
                }}).get(), emptyMap()));
        //... run pipeline
    }
}
Processors
org.talend.sdk.component.runtime.beam.TalendFn provides the way to wrap a processor in a Beam PTransform and to integrate it into the pipeline.
public class Main {
    public static void main(final String[] args) {
        //Component manager and pipeline initialization...
        //Create beam PTransform from processor and apply input to pipeline
        pipeline.apply(TalendFn.asFn(manager.findProcessor("sample", "mapper", 1, emptyMap()).get(), emptyMap()));
        //... run pipeline
    }
}
In the Beam case, multiple inputs and outputs are represented by a Map element, which avoids having to handle several Beam inputs and outputs.
You can use ViewsMappingTransform or CoGroupByKeyResultMappingTransform to adapt the input/output format to the record format representing the multiple inputs/outputs, i.e. a Map<String, List<?>> materialized as a Record. Input data must be of the Record type in this case.
Converting a Beam.io into a component I/O
For simple inputs and outputs, you can get an automatic and transparent conversion of the Beam.io into an I/O component if you decorate your PTransform with @PartitionMapper or @Processor.
However, there are limitations:
- Inputs must implement PTransform<PBegin, PCollection<?>> and must be a BoundedSource.
- Outputs must implement PTransform<PCollection<?>, PDone> and register a DoFn on the input PCollection.
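As an illustration, here is a hedged sketch of an output respecting these constraints; the family, name and DoFn body are hypothetical and only stand in for real write logic:
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.talend.sdk.component.api.processor.Processor;
import org.talend.sdk.component.api.record.Record;

// Hypothetical output: the PTransform is decorated with @Processor, registers a DoFn
// on the input PCollection and returns PDone, as required above.
@Processor(family = "beamsample", name = "output")
public class SampleOutput extends PTransform<PCollection<Record>, PDone> {

    @Override
    public PDone expand(final PCollection<Record> input) {
        input.apply(ParDo.of(new DoFn<Record, Void>() {

            @ProcessElement
            public void onElement(final ProcessContext context) {
                System.out.println(context.element()); // placeholder for the actual write
            }
        }));
        return PDone.in(input.getPipeline());
    }
}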
For more information, see the How to wrap a Beam I/O page.