Creating a job pipeline

Job Builder

The Job builder lets you create a job pipeline programmatically using Talend components (Producers and Processors). The job pipeline is a directed acyclic graph, which lets you build complex pipelines.

Let’s take a simple use case where two data sources (employee and salary) are formatted to CSV and the result is written to a file.

A job is defined based on components (nodes) and links (edges) to connect their branches together.

Every component is defined by a unique id and a URI that identifies the component.

The URI follows the form [family]://[component][?version][&configuration], where:

  • family is the name of the component family.

  • component is the name of the component.

  • version is the version of the component. It is represented in a key=value format. The key is __version and the value is a number.

  • configuration is the component configuration. It is represented in a key=value format. The key is the path of the configuration and the value is a string corresponding to the configuration value.

URI example
job://csvFileGen?__version=1&path=/temp/result.csv&encoding=utf-8
Configuration parameters must be URI/URL encoded.
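
The encoding rule can be illustrated with a small helper that assembles a component URI from its parts. This is only a sketch: ComponentUris and its build method are hypothetical names, not part of the Talend API, and java.net.URLEncoder applies form encoding, so it also escapes characters such as / that the example above leaves as-is.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the Talend API.
final class ComponentUris {
    private ComponentUris() {
    }

    // Builds family://component?__version=N&key=value... with encoded keys and values.
    static String build(String family, String component, int version, Map<String, String> configuration) {
        String query = configuration.entrySet().stream()
                .map(e -> encode(e.getKey()) + "=" + encode(e.getValue()))
                .collect(Collectors.joining("&"));
        String base = family + "://" + component + "?__version=" + version;
        return query.isEmpty() ? base : base + "&" + query;
    }

    private static String encode(String value) {
        // Form encoding: spaces become '+', reserved characters become %XX.
        return URLEncoder.encode(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Map<String, String> configuration = new LinkedHashMap<>();
        configuration.put("separator", ";");
        System.out.println(build("transform", "concat", 1, configuration));
    }
}
```

With this helper, the separator value ; from the concat component would be sent as %3B.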
Job example
Job.components()   (1)
        .component("employee","db://input")
        .component("salary", "db://input")
        .component("concat", "transform://concat?separator=;")
        .component("csv", "file://out?__version=2")
    .connections()  (2)
        .from("employee").to("concat", "string1")
        .from("salary").to("concat", "string2")
        .from("concat").to("csv")
    .build()    (3)
    .run(); (4)
1 Defining all components used in the job pipeline.
2 Defining the connections between the components to construct the job pipeline. The links from/to use the component id and the default input/output branches.
You can also connect a specific branch of a component, if it has multiple or named input/output branches, using the methods from(id, branchName) and to(id, branchName).
In the example above, the concat component has two inputs ("string1" and "string2").
3 Validating the job pipeline by asserting that:
  • It has some starting components (components that don’t have a from connection and that need to be of the producer type).

  • There are no cyclic connections. The job pipeline needs to be an acyclic graph.

  • All components used in the connections are already declared.

  • Each connection is used only once. You cannot connect a component input/output branch twice.

4 Running the job pipeline.
In this version, the execution of the job is linear. Components are not executed in parallel, even if some steps are independent.
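
To make the validation rules of step 3 and the linear execution of step 4 more concrete, here is a standalone sketch that checks a small graph description and returns one possible linear order. It is not the Talend implementation: PipelineValidator, its validate method and the edge layout are assumptions chosen for illustration, and the cycle check uses Kahn's topological sort.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the checks performed at build time.
final class PipelineValidator {
    private PipelineValidator() {
    }

    // Each edge is {fromId, fromBranch, toId, toBranch}.
    // Returns one linear execution order, or throws if a rule is violated.
    static List<String> validate(Set<String> producers, Set<String> processors, List<String[]> edges) {
        Map<String, Integer> incoming = new LinkedHashMap<>();
        producers.forEach(id -> incoming.put(id, 0));
        processors.forEach(id -> incoming.put(id, 0));
        Map<String, List<String>> next = new LinkedHashMap<>();
        Set<String> usedBranches = new HashSet<>();
        for (String[] e : edges) {
            // Rule: all components used in the connections are declared.
            if (!incoming.containsKey(e[0]) || !incoming.containsKey(e[2])) {
                throw new IllegalStateException("Undeclared component in " + e[0] + " -> " + e[2]);
            }
            // Rule: an input/output branch cannot be connected twice.
            if (!usedBranches.add("out:" + e[0] + "/" + e[1]) || !usedBranches.add("in:" + e[2] + "/" + e[3])) {
                throw new IllegalStateException("Branch connected twice in " + e[0] + " -> " + e[2]);
            }
            incoming.merge(e[2], 1, Integer::sum);
            next.computeIfAbsent(e[0], k -> new ArrayList<>()).add(e[2]);
        }
        // Rule: starting components (no incoming connection) exist and are producers.
        Deque<String> ready = new ArrayDeque<>();
        incoming.forEach((id, count) -> {
            if (count == 0) {
                if (!producers.contains(id)) {
                    throw new IllegalStateException("Starting component is not a producer: " + id);
                }
                ready.add(id);
            }
        });
        if (ready.isEmpty()) {
            throw new IllegalStateException("No starting component");
        }
        // Rule: no cycles. Kahn's algorithm visits every node only if the graph is acyclic.
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String id = ready.poll();
            order.add(id);
            for (String target : next.getOrDefault(id, List.of())) {
                if (incoming.merge(target, -1, Integer::sum) == 0) {
                    ready.add(target);
                }
            }
        }
        if (order.size() != incoming.size()) {
            throw new IllegalStateException("Cyclic connection detected");
        }
        return order;
    }
}
```

Applied to the job example above, with employee and salary declared as producers, this sketch would accept the graph and visit the two producers before concat and csv.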

Environment/Runner

Depending on the configuration, you can select the environment in which your job is executed.

The environment is selected using the following logic:

  1. If an org.talend.sdk.component.runtime.manager.chain.Job.ExecutorBuilder is passed through the job properties, it is used. The supported types are an ExecutorBuilder instance, a Class or a String.

  2. Otherwise, if an ExecutorBuilder SPI is present, it is used. This is the case if component-runtime-beam is present in your classpath.

  3. Otherwise, a local/standalone execution is used.
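
The three-step lookup above can be sketched in plain Java as follows. The Runner interface, the LocalRunner class and the property key are hypothetical stand-ins for the real ExecutorBuilder contract; only the order of the checks is taken from the list above.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.ServiceLoader;

// Hypothetical stand-ins; the real contract is Job.ExecutorBuilder.
final class ExecutorSelection {
    interface Runner {
        String name();
    }

    static final class LocalRunner implements Runner {
        @Override
        public String name() {
            return "local";
        }
    }

    private ExecutorSelection() {
    }

    static Runner select(Map<String, ?> jobProperties) {
        // 1. An explicit property wins; it may be an instance, a Class or a class name.
        Object configured = jobProperties.get(Runner.class.getName());
        if (configured instanceof Runner) {
            return (Runner) configured;
        }
        if (configured instanceof Class || configured instanceof String) {
            return instantiate(configured);
        }
        // 2. Otherwise, use an implementation registered through the SPI, if any.
        Iterator<Runner> registered = ServiceLoader.load(Runner.class).iterator();
        if (registered.hasNext()) {
            return registered.next();
        }
        // 3. Otherwise, fall back to local execution.
        return new LocalRunner();
    }

    private static Runner instantiate(Object classOrName) {
        try {
            Class<?> type = classOrName instanceof Class
                    ? (Class<?>) classOrName
                    : Class.forName((String) classOrName);
            return (Runner) type.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot create runner from " + classOrName, e);
        }
    }
}
```

In this sketch, an empty property map with no registered service falls through to the local runner, which mirrors the third rule.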

In the case of a Beam execution, you can customize the pipeline options using system properties. They must be prefixed with talend.beam.job. (including the trailing dot). For example, to set the appName option, you need to use -Dtalend.beam.job.appName=mytest.
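
The prefix convention can be illustrated with a short helper that keeps only the prefixed system properties and strips the prefix from their names. BeamJobOptions is a hypothetical class written for this page; it shows the naming convention only and makes no claim about how the options are read internally.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper; shows only the prefix convention.
final class BeamJobOptions {
    static final String PREFIX = "talend.beam.job.";

    private BeamJobOptions() {
    }

    // Keeps the properties that start with the prefix and strips it from their names.
    static Map<String, String> extract(Properties properties) {
        Map<String, String> options = new TreeMap<>();
        for (String name : properties.stringPropertyNames()) {
            if (name.startsWith(PREFIX)) {
                options.put(name.substring(PREFIX.length()), properties.getProperty(name));
            }
        }
        return options;
    }

    public static void main(String[] args) {
        // Run with -Dtalend.beam.job.appName=mytest to see the appName entry.
        System.out.println(extract(System.getProperties()));
    }
}
```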

Key Provider

The job builder lets you set a key provider to join your data when a component has multiple inputs. The key provider can be set contextually to a component or globally to the job.

Job.components()
        .component("employee","db://input")
            .property(GroupKeyProvider.class.getName(),
                 (GroupKeyProvider) context -> context.getData().getString("id")) (1)
        .component("salary", "db://input")
        .component("concat", "transform://concat?separator=;")
    .connections()
        .from("employee").to("concat", "string1")
        .from("salary").to("concat", "string2")
    .build()
    .property(GroupKeyProvider.class.getName(), (2)
                 (GroupKeyProvider) context -> context.getData().getString("employee_id"))
    .run();
1 Defining a key provider for the data produced by the employee component.
2 Defining a key provider for all data manipulated in the job.

If the incoming data has different IDs, you can provide a complex global key provider that relies on the context given by the component id and the branch name.

GroupKeyProvider keyProvider = context -> {
    if ("employee".equals(context.getComponentId())) {
        return context.getData().getString("id");
    }
    return context.getData().getString("employee_id");
};
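
To show why the key matters when a component has several inputs, the following standalone sketch groups the records of two inputs by the key computed for each record. It is a simplified model, not the Talend runtime: KeyedJoin is a hypothetical class, records are plain maps, and the key provider is modeled as a BiFunction over the component id and the record.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical model: the real contract is GroupKeyProvider.
final class KeyedJoin {
    private KeyedJoin() {
    }

    // Groups the records of every input by key: key -> (componentId -> record).
    static Map<String, Map<String, Map<String, String>>> join(
            Map<String, List<Map<String, String>>> recordsByComponent,
            BiFunction<String, Map<String, String>, String> keyProvider) {
        Map<String, Map<String, Map<String, String>>> joined = new LinkedHashMap<>();
        recordsByComponent.forEach((componentId, records) -> {
            for (Map<String, String> record : records) {
                String key = keyProvider.apply(componentId, record);
                joined.computeIfAbsent(key, k -> new LinkedHashMap<>()).put(componentId, record);
            }
        });
        return joined;
    }

    public static void main(String[] args) {
        Map<String, List<Map<String, String>>> inputs = new LinkedHashMap<>();
        inputs.put("employee", List.of(Map.of("id", "1", "name", "Ada")));
        inputs.put("salary", List.of(Map.of("employee_id", "1", "amount", "1000")));
        // Same rule as the key provider above: "id" for employee, "employee_id" otherwise.
        System.out.println(join(inputs, (componentId, record) ->
                "employee".equals(componentId) ? record.get("id") : record.get("employee_id")));
    }
}
```

In this model, the employee record and the salary record end up under the same key only because the provider reads a different field for each component.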

Beam case

For the Beam case, you need to rely on the Beam pipeline definition and use the component-runtime-beam dependency, which provides Beam bridges.

Inputs and Outputs

org.talend.sdk.component.runtime.beam.TalendIO provides a way to convert a partition mapper or a processor to a Beam input or output using the read or write methods.

public class Main {
    public static void main(final String[] args) {
        final ComponentManager manager = ComponentManager.instance();
        Pipeline pipeline = Pipeline.create();
        // Create a Beam input from the mapper and apply it to the pipeline
        pipeline.apply(TalendIO.read(manager.findMapper("sample", "reader", 1, new HashMap<String, String>() {{
                    put("fileprefix", "input");
                }}).get()))
                .apply(new ViewsMappingTransform(emptyMap(), "sample")) // prepare it for the output record format (see next part)
        // Create a Beam output from the Talend processor and apply it to the pipeline
                .apply(TalendIO.write(manager.findProcessor("test", "writer", 1, new HashMap<String, String>() {{
                    put("fileprefix", "output");
                }}).get(), emptyMap()));

        //... run pipeline
    }
}

Processors

org.talend.sdk.component.runtime.beam.TalendFn provides a way to wrap a processor in a Beam PTransform and to integrate it into the pipeline.

public class Main {
    public static void main(final String[] args) {
        //Component manager and pipeline initialization...

        // Create a Beam PTransform from the processor and apply it to the pipeline
        pipeline.apply(TalendFn.asFn(manager.findProcessor("sample", "mapper", 1, emptyMap()).get(), emptyMap()));

        //... run pipeline
    }
}

In the Beam case, multiple inputs and outputs are represented by a single Map element, which avoids declaring several Beam inputs and outputs.

You can use ViewsMappingTransform or CoGroupByKeyResultMappingTransform to adapt the input/output format to the record format representing the multiple inputs/outputs, like Map<String, List<?>>, but materialized as a JsonObject. Input data must be of the JsonObject type in this case.

Converting a Beam.io into a component I/O

For simple inputs and outputs, you can get an automatic and transparent conversion of the Beam.io into an I/O component, if you decorate your PTransform with @PartitionMapper or @Processor.

However, there are limitations:

  • Inputs must implement PTransform<PBegin, PCollection<?>> and must be a BoundedSource.

  • Outputs must implement PTransform<PCollection<?>, PDone> and register a DoFn on the input PCollection.

For more information, see the How to wrap a Beam I/O page.
