Talend Component Getting Started

Introducing Talend Component

Talend Component intends to simplify the development of connectors at two main levels:

Runtime

how to inject the specific component code into a job or pipeline. It should unify as much as possible the code required to run in DI and BEAM environments.

Graphical interfaces

unify the code required to render the component in a browser (web) or in the Eclipse-based Studio (SWT).

Talend Component System Requirement

Talend Component requires Java 8. You can download it from the Oracle website.

To develop a component, or the project itself, it is recommended to use Apache Maven 3.5.0. You can download it from the Apache Maven website.

Quick Start

For our quick start, we’ll develop three components:

  • one reading from a file and building Person records

  • one converting the Person into a User based on the incoming data and some conventions

  • one writing the User records to a file

Before we start checking how to implement it, let's have a quick look at our data structures (records):

public class Person implements Serializable {

    private String name;

    private int age;

    // constructor and getters omitted for brevity
}

public class User implements Serializable {

    private String id;

    private String name;

    // constructor and getters omitted for brevity
}
You can note that these classes are Serializable. This is mainly for the BEAM case, but we'll get back to this point later.

Dependencies

To code a component, the only required dependency is the API:

<dependency>
  <groupId>org.talend.sdk.component</groupId>
  <artifactId>component-api</artifactId>
  <version>${component-api.version}</version>
</dependency>

Inputs

Inputs often need to open a connection, work on that connection, and then close it.

Opening the connection can be done in a method decorated with @PostConstruct:

@PostConstruct
public void open() throws FileNotFoundException {
    reader = service.createInput(file);
}
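
The snippets in this section assume the component holds a few fields. The quick start does not list the full class, so this is only a sketch and the FileService type is hypothetical:

private final FileService service; // hypothetical service creating the I/O streams
private final File file;           // the injected configuration (see "Registering our components")
private transient BufferedReader reader; // the open connection, transient since the component is Serializable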

Now that we have a connection, we can read records. For our simple case we'll just read a CSV-like file containing, on each row, the name and age of a Person. This is done in a method dedicated to reading the next element, decorated with @Producer:

@Producer
public Person readNext() throws IOException {
    final String line = reader.readLine();
    if (line == null) { // end of the data is marked with null
        return null;
    }
    final String[] info = line.split(";"); // poor csv parser
    return new Person(info[0], Integer.parseInt(info[1].trim()));
}
If you need to rely on a buffer because you load multiple records at once, you can reuse the org.talend.sdk.component.api.base.BufferizedProducerSupport class to implement your producer method, as sketched below.
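
Here is a minimal sketch of a producer backed by BufferizedProducerSupport; the loadNextChunk() helper is hypothetical. The supplier passed to the constructor is queried each time the current buffer is exhausted, and returning null marks the end of the data:

private BufferizedProducerSupport<Person> support;

@PostConstruct
public void open() {
    support = new BufferizedProducerSupport<>(() -> {
        // loadNextChunk() is a hypothetical helper loading a batch of rows
        final List<Person> chunk = loadNextChunk();
        return chunk == null || chunk.isEmpty() ? null : chunk.iterator();
    });
}

@Producer
public Person readNext() {
    return support.next();
}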

Finally, closing the connection once the work is done (the dataset has been completely read) can be done in a method decorated with @PreDestroy:

@PreDestroy
public void close() throws IOException {
    if (reader != null) {
        reader.close();
    }
}
There is another type of input called PartitionMapper, which allows you to map (in the map/reduce sense) an input, but it will be tackled later.

A Processor

A processor is simply a task taking an input and converting it to another one. In our case it will just take a Person and convert it to a User, whose name is copied from the person and whose id is generated from the person's name and age.

Exactly as with the input we just wrote, you can use @PostConstruct and @PreDestroy if you need to do some initialization, but a processor also has a particular pair of methods to handle the flow lifecycle: @BeforeGroup and @AfterGroup. The idea is the same, but they mark smaller chunks of data. It is close to the usual notion of a batch, but without any guarantee on the associated size (see the sketch after the note below).

It is highly inspired by BEAM bundles.
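
As an illustration, a processor buffering its outputs per chunk could use these hooks as in the following sketch; the buffer field and the flush() helper are illustrative, not part of the quick start:

private transient List<User> buffer;

@BeforeGroup
public void beforeGroup() {
    buffer = new ArrayList<>(); // a new, framework-sized chunk starts
}

@AfterGroup
public void afterGroup() {
    flush(buffer); // hypothetical helper committing the whole chunk at once
}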

To get started we don't need anything that advanced: we just need a method taking a Person and returning a User. This can be done by decorating a method with @ElementListener:

@ElementListener
public User map(final Person person) throws IOException {
    if (log.isDebugEnabled()) {
        log.debug("Handling {}", person);
    }
    return new User(generateId(person), person.getName());
}
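
The generateId helper is not listed in this quick start; judging from the expected output at the end of this page (a6normal, a36marilyn), a plausible implementation would be:

private String generateId(final Person person) {
    // matches the sample output: "a" + age + name
    return "a" + person.getAge() + person.getName();
}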

An output

Now that we have read our Person and mapped it to a User, we just need to write the User back to another file.

To keep it simple, we'll implement the operation symmetric to our input, i.e. writing a CSV-like file representing the users.

How do we implement an output, then? An output is just a processor that does not return any data.

As with our reader, we'll define open and close methods to handle the interaction with the output stream:

@PostConstruct
public void open() throws FileNotFoundException {
    writer = service.createOutput(file);
}

@PreDestroy
public void close() throws IOException {
    if (writer != null) {
        writer.close();
    }
}

And the method handling the write will just be an @ElementListener method returning void:

@ElementListener
public void write(final User user) throws IOException {
    writer.println(user.getId() + ";" + user.getName());
}

Registering our components

You probably noticed that our components have some configuration. Typically, the file-related components take a File as configuration. To make this work, we need a factory for these instances, linking the user configuration to the component instances.

This is simply done through the component constructors themselves. To instantiate a component, the framework selects the constructor with the most @Option parameters, or, failing that, the one with the most parameters.

If your component needs to be serialized (like inputs and processors), don't forget to comply with the Java serialization rules, for instance by having a protected no-arg constructor if you have a parent class which is not Object.
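
Concretely, the serialization rule mentioned in this note looks like the following sketch; the class names are hypothetical:

public class BaseComponent { // a parent which is neither Object nor Serializable

    protected BaseComponent() {
        // a no-arg constructor accessible to subclasses is required
        // for their deserialization to work
    }
}

public class MyInput extends BaseComponent implements Serializable {
    // ...
}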

Each type of component has a specific marker annotation taking the name of the component as a parameter to declare the component, and a family parameter to declare the name of the component family:

  • @Emitter for producers (inputs)

  • @Processor for processors (output or not)

The family is a way to group components together logically; for instance, FlatFile can be used as the family and reader/writer as the names, giving a group of two components related to flat files.

The constructor is responsible for linking the configuration to the component instance. The configuration is injected through its parameters. Each parameter is marked with @Option, which lets you define a name for the parameter (if you don't, the framework uses the reflection parameter name, i.e. arg0, arg1, arg2, …​ if you didn't activate the -parameters flag at compile time, or the actual parameter name otherwise).

The instantiation mechanism will be explained in detail later, but for this sample we just need to know that a File is a valid parameter type.
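
Putting the pieces together, the reader of this quick start could be declared as in the following sketch; the class name and the FileService type are illustrative, and the lifecycle methods are the ones shown in the Inputs section:

@Emitter(family = "sample", name = "reader") // matches the sample://reader URI used below
public class PersonReader implements Serializable {

    private final File file;

    private final FileService service; // hypothetical service, injected since it carries no @Option

    public PersonReader(@Option("file") final File file, final FileService service) {
        this.file = file;
        this.service = service;
    }

    // the @PostConstruct open(), @Producer readNext() and @PreDestroy close()
    // methods shown earlier complete the class
}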

Run the quick start example

To run the code we just created, you will need the runtime dependency of the framework:

<dependency>
  <groupId>org.talend.sdk.component</groupId>
  <artifactId>component-runtime-manager</artifactId>
  <version>${talend-component-runtime.version}</version>
</dependency>

Then you have access to the ComponentManager, which handles the discovery of the components and their instantiation. We'll dig into the way it finds the plugins later, but for our sample we just need to know that, using the default instance, the calling jar (the jar containing the Main) will be considered a plugin if it contains components.

Now that we have a component manager, we just need to build our execution chain. For that purpose we can use the Job DSL shown below: declare each component of the job with an identifier and a URI of the form family://name?key=value carrying its configuration, connect them from the input through the processor to the output, and finally call build() then run():

try (final ComponentManager manager = ComponentManager.instance()) {
    components() // from Job DSL
        .component("reader", "sample://reader?file=/tmp/input.csv")
        .component("mapper", "sample://mapper")
        .component("writer", "sample://writer?file=/tmp/output.csv")
    .connections()
        .from("reader").to("mapper")
        .from("mapper").to("writer")
    .build()
    .run();
}

Now if you create a /tmp/input.csv containing:

normal;6
marilyn;36

Then, if you run the previous Main, you can check that there is now a /tmp/output.csv containing, as expected:

a6normal;normal
a36marilyn;marilyn