Creating a job pipeline

Job Builder

The Job builder lets you create a job pipeline programmatically using Talend components (Producers and Processors). The job pipeline is an acyclic graph, so you can build complex pipelines.

Let’s take a simple use case where we have two data sources (employee and salary) that we format to CSV and write the result to a file.

A job is defined based on components (nodes) and links (edges) to connect their branches together.

Every component is defined by a unique id and a URI that identifies the component.

The URI follows the form: [family]://[component][?version][&configuration]

  • family: the name of the component family

  • component: the name of the component

  • version: the version of the component, represented in a key=value format, where the key is __version and the value is a number.

  • configuration: here you can provide the component configuration as key=value tuples, where the key is the path of the configuration and the value is the configuration value in string format.

URI Example
job://csvFileGen?__version=1&path=/temp/result.csv&encoding=utf-8
Configuration parameters must be URI/URL encoded.
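
For instance, here is a minimal sketch of building such a URI in plain Java; it reuses the csvFileGen example above and URLEncoder is simply one way to encode the query parameters (not something mandated by this page):

import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class UriExample {
    public static void main(final String[] args) throws Exception {
        // encode each configuration value before appending it to the URI
        final String path = URLEncoder.encode("/temp/result.csv", StandardCharsets.UTF_8.name());
        final String encoding = URLEncoder.encode("utf-8", StandardCharsets.UTF_8.name());
        final String uri = "job://csvFileGen?__version=1&path=" + path + "&encoding=" + encoding;
        System.out.println(uri); // job://csvFileGen?__version=1&path=%2Ftemp%2Fresult.csv&encoding=utf-8
    }
}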

Here is a more concrete job example:

Job.components()   (1)
        .component("employee","db://input")
        .component("salary", "db://input")
        .component("concat", "transform://concat?separator=;")
        .component("csv", "file://out?__version=2")
    .connections()  (2)
        .from("employee").to("concat", "string1")
        .from("salary").to("concat", "string2")
        .from("concat").to("csv")
    .build()    (3)
    .run(); (4)
1 We define all the components that will be used in the job pipeline.
2 Then, we define the connections between the components to construct the job pipeline. The from/to links use the component id and the default input/output branches. You can also connect a specific branch of a component, if it has multiple or named input/output branches, using the methods from(id, branchName) and to(id, branchName). In the example above, the concat component has two inputs (string1 and string2).
3 In this step, we validate the job pipeline (a failing example is sketched after these callouts) by asserting that:
  • It has some starting components (components that don’t have a from connection; they need to be of type producer).

  • There are no cyclic connections, since the job pipeline needs to be an acyclic graph.

  • All the components used in connections are already declared.

  • A connection is used only once; you can’t connect a component input/output branch twice.

4 We run the job pipeline.
In this version, the execution of the job is linear: the components are not executed in parallel, even if some steps may be independent.
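
To illustrate the validation performed at build() time, here is a hedged sketch of a pipeline that should be rejected: it reuses the transform URI from the example above, the cycle makes the graph invalid and leaves no producer to start from, and the exact exception type is an assumption rather than something documented on this page.

try {
    Job.components()
            .component("a", "transform://concat?separator=;")
            .component("b", "transform://concat?separator=;")
        .connections()
            .from("a").to("b", "string1")
            .from("b").to("a", "string1") // cycle: a -> b -> a
        .build();
} catch (final RuntimeException expected) {
    // the builder rejects the graph: it is cyclic and has no starting producer
}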

Environment/Runner

Depending on the configuration, you can select the environment in which you execute your job.

To select the environment, the logic is the following:

  1. if an org.talend.sdk.component.runtime.manager.chain.Job.ExecutorBuilder is passed through the job properties, then use it (supported types are an ExecutionBuilder instance, a Class or a String).

  2. if an ExecutionBuilder SPI is present then use it (it is the case if component-runtime-beam is present in your classpath).

  3. else just use a local/standalone execution.

In the case of a Beam execution, you can customize the pipeline options using system properties. They have to be prefixed with talend.beam.job.. For instance, to set the appName option you set -Dtalend.beam.job.appName=mytest.
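
As a hedged illustration, since these are plain system properties, the same option can also be set programmatically before the job runs, reusing the prefix documented above:

// set the Beam appName pipeline option through the documented system property prefix
System.setProperty("talend.beam.job.appName", "mytest");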

Key Provider

The job builder lets you set a key provider to join your data when a component has multiple inputs. The key provider can be set contextually to a component or globally to the job.

Job.components()
        .component("employee","db://input")
            .property(GroupKeyProvider.class.getName(),
                 (GroupKeyProvider) context -> context.getData().getString("id")) (1)
        .component("salary", "db://input")
        .component("concat", "transform://concat?separator=;")
    .connections()
        .from("employee").to("concat", "string1")
        .from("salary").to("concat", "string2")
    .build()
    .property(GroupKeyProvider.class.getName(), (2)
                 (GroupKeyProvider) context -> context.getData().getString("employee_id"))
    .run();
1 Here we have defined a key provider for the data produced by the employee component.
2 Here we have defined a key provider for all the data manipulated in this job.

If the incoming data has different ids, you can provide a complex global key provider relying on the context, which gives you the component id and the branch name.

GroupKeyProvider keyProvider = context -> {
    if ("employee".equals(context.getComponentId())) {
        return context.getData().getString("id");
    }
    return context.getData().getString("employee_id");
};
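
Such a provider can then be registered globally on the built job, reusing the property mechanism shown above (the job layout here is only a sketch based on the previous example):

Job.components()
        .component("employee", "db://input")
        .component("salary", "db://input")
        .component("concat", "transform://concat?separator=;")
    .connections()
        .from("employee").to("concat", "string1")
        .from("salary").to("concat", "string2")
    .build()
    .property(GroupKeyProvider.class.getName(), keyProvider) // the provider defined above
    .run();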

Beam case

For the Beam case, you need to rely on the Beam pipeline definition and use the component-runtime-beam dependency, which provides Beam bridges.

I/O

org.talend.sdk.component.runtime.beam.TalendIO provides a way to convert a partition mapper to a Beam input and a processor to a Beam output, using the read and write methods respectively.

public class Main {
    public static void main(final String[] args) {
        final ComponentManager manager = ComponentManager.instance();
        Pipeline pipeline = Pipeline.create();
        //Create beam input from mapper and apply input to pipeline
        pipeline.apply(TalendIO.read(manager.findMapper("sample", "reader", 1, new HashMap<String, String>() {{
                    put("fileprefix", "input");
                }}).get()))
                .apply(new ViewsMappingTransform(emptyMap(), "sample")) // prepare it for the output record format (see next part)
        //Create beam processor from talend processor and apply to pipeline
                .apply(TalendIO.write(manager.findProcessor("test", "writer", 1, new HashMap<String, String>() {{
                    put("fileprefix", "output");
                }}).get(), emptyMap()));

        //... run pipeline
    }
}
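
The commented run step can be, for instance, the standard Beam way of executing and waiting for the pipeline; this is plain Beam API rather than something specific to the Talend bridge:

// execute the pipeline with its configured runner and block until it finishes
pipeline.run().waitUntilFinish();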

Processors

org.talend.sdk.component.runtime.beam.TalendFn provides a way to wrap a processor in a Beam PTransform and integrate it into the pipeline.

public class Main {
    public static void main(final String[] args) {
        //Component manager and pipeline initialization...

        //Create beam PTransform from processor and apply input to pipeline
        pipeline.apply(TalendFn.asFn(manager.findProcessor("sample", "mapper", 1, emptyMap()).get(), emptyMap()));

        //... run pipeline
    }
}

In the Beam case, the multiple inputs/outputs are represented by a single Map element, to avoid having to use Beam’s native multiple inputs/outputs.

You can use ViewsMappingTransform or CoGroupByKeyResultMappingTransform to adapt the input/output format to the record format representing the multiple inputs/outputs, i.e. a kind of Map<String, List<?>> materialized as a JsonObject. Input data must be of type JsonObject in this case.
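
As a hedged illustration of that record format, here is roughly what such a JsonObject can look like; the default branch name __default__ and the field values are assumptions for the example, not taken from this page:

// one "multi-branch" record: each branch name maps to a list of JsonObject records
final JsonObject record = Json.createObjectBuilder()
        .add("__default__", Json.createArrayBuilder()
                .add(Json.createObjectBuilder()
                        .add("id", "1")
                        .add("name", "John")))
        .build();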

Deployment

Since Beam serializes components, it is crucial to add the component-runtime-standalone dependency to the project. It takes care of providing an implicit and lazy ComponentManager that manages the components in a fat jar case.

Convert a Beam I/O into a component I/O

For simple I/O, you can get automatic conversion of the Beam I/O to a component I/O transparently, if you decorate your PTransform with @PartitionMapper or @Processor.

The limitations are:

  • Inputs must implement PTransform<PBegin, PCollection<?>> and must be a BoundedSource.

  • Outputs must implement PTransform<PCollection<?>, PDone> and just register a DoFn on the input PCollection.

More information on this topic is available on the How to wrap a Beam I/O page.
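
As a hedged sketch of what such a decoration can look like (the family, name and the wrapped TextIO read are assumptions for the example, not taken from this page):

@PartitionMapper(family = "beamsample", name = "fileInput")
public class FileInput extends PTransform<PBegin, PCollection<String>> {
    private final String path; // would typically be an @Option-driven configuration in a real component

    public FileInput(final String path) {
        this.path = path;
    }

    @Override
    public PCollection<String> expand(final PBegin begin) {
        // TextIO is backed by a BoundedSource, which matches the input limitation above
        return begin.apply(TextIO.read().from(path));
    }
}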
