Implementing an Output component for Hazelcast

This tutorial continues the Talend Input component for Hazelcast tutorial. It does not walk through project creation again, so complete that tutorial before starting this one.

This tutorial shows how to create a complete, working output component for Hazelcast.

Defining the configurable part and the layout of the component

As seen before, Hazelcast offers multiple data source types: queues, topics, caches, maps, and more.

This tutorial sticks with the Map dataset; everything shown here is applicable to the other types.

Let’s assume that our Hazelcast output component is responsible for inserting data into a distributed Map. For that, we need to know which attribute of the incoming data is to be used as the key in the map. The value will be the whole record encoded as JSON.
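
As a rough illustration of that key/value mapping, the sketch below uses plain java.util.Map objects to stand in for both the incoming record and the distributed map. The class name KeyValueSketch and the toJson helper are hypothetical and deliberately naive (flat, string-only records, no escaping); the real component relies on the framework services introduced later in this tutorial.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Illustration only: plain maps stand in for the TCK record and the Hazelcast map.
class KeyValueSketch {

    // Hypothetical helper: naive JSON encoding for flat, string-only records (no escaping).
    static String toJson(Map<String, String> record) {
        return record.entrySet().stream()
                .map(e -> "\"" + e.getKey() + "\":\"" + e.getValue() + "\"")
                .collect(Collectors.joining(",", "{", "}"));
    }

    // Stores the whole record, encoded as JSON, under the value of the key attribute.
    static void insert(Map<String, String> target, Map<String, String> record, String keyAttribute) {
        target.put(record.get(keyAttribute), toJson(record));
    }

    public static void main(String[] args) {
        Map<String, String> record = new LinkedHashMap<>();
        record.put("id", "42");
        record.put("name", "alice");

        Map<String, String> distributedMap = new LinkedHashMap<>();
        insert(distributedMap, record, "id");
        System.out.println(distributedMap);
    }
}
```

The only input the component needs beyond the connection details is the name of the key attribute ("id" in this sketch), which is what the additional configuration option captures.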

With that in mind, we can design our output configuration as follows: the same Datastore and Dataset as the input component, plus an additional option that defines the key attribute.

Let’s create our Output configuration class.

package org.talend.components.hazelcast;

import org.talend.sdk.component.api.configuration.Option;
import org.talend.sdk.component.api.configuration.ui.layout.GridLayout;
import org.talend.sdk.component.api.meta.Documentation;

import java.io.Serializable;

@GridLayout({
                @GridLayout.Row("dataset"),
                @GridLayout.Row("key")
})
@Documentation("Hazelcast output configuration")
public class HazelcastOutputConfig implements Serializable {

    @Option
    @Documentation("the hazelcast dataset")
    private HazelcastDataset dataset;

    @Option
    @Documentation("The key attribute")
    private String key;


    // Getters & Setters omitted for simplicity
    // You need to generate them
}

Let’s add the i18n properties of our configuration to the Messages.properties file:

# Output config
HazelcastOutputConfig.dataset._displayName=Hazelcast dataset
HazelcastOutputConfig.key._displayName=Key attribute

Output Implementation

The skeleton of the output component looks as follows:

package org.talend.components.hazelcast;

import org.talend.sdk.component.api.component.Icon;
import org.talend.sdk.component.api.component.Version;
import org.talend.sdk.component.api.configuration.Option;
import org.talend.sdk.component.api.processor.ElementListener;
import org.talend.sdk.component.api.meta.Documentation;
import org.talend.sdk.component.api.processor.Processor;
import org.talend.sdk.component.api.record.Record;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.Serializable;

import static org.talend.sdk.component.api.component.Icon.IconType.CUSTOM;

@Version
@Icon(custom = "Hazelcast", value = CUSTOM)
@Processor(name = "Output")
@Documentation("Hazelcast output component")
public class HazelcastOutput implements Serializable {

    public HazelcastOutput(@Option("configuration") final HazelcastOutputConfig configuration) {
    }

    @PostConstruct
    public void init() {
    }

    @PreDestroy
    public void release() {
    }

    @ElementListener
    public void onElement(final Record record) {
    }

}
  • @Version annotation indicates the version of the component. It is used to migrate the component configuration if needed.

  • @Icon annotation indicates the icon of the component. Here, the icon is a custom icon that needs to be bundled in the component JAR under resources/icons.

  • @Processor annotation indicates that this class is the processor (output) and defines the name of the component.

  • The constructor of the processor is responsible for injecting the component configuration and services. Configuration parameters are annotated with @Option. The other parameters are considered services and are injected by the component framework. Services can be local (classes annotated with @Service) or provided by the component framework.

  • The method annotated with @PostConstruct is executed once per instance and can be used for initialization.

  • The method annotated with @PreDestroy is used to clean resources at the end of the execution of the output.

  • Data is passed to the method annotated with @ElementListener. That method is responsible for handling the data output. You can define all the related logic in this method.

If you need to bulk write the updates according to groups, see Processors and batch processing.
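
To give an idea of what writing by groups can look like, here is a minimal sketch that buffers entries and writes them in a single putAll call when the group ends. It is written in plain Java so it runs without the framework: the class BufferedWriterSketch and its method names are hypothetical, and a java.util.Map stands in for the Hazelcast map. In Talend Component Kit, the two group hooks would be methods annotated with @BeforeGroup and @AfterGroup; refer to the batch processing documentation for the exact contract.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: a plain map stands in for the Hazelcast map.
class BufferedWriterSketch {

    private final Map<String, String> target;
    private final Map<String, String> buffer = new HashMap<>();

    BufferedWriterSketch(Map<String, String> target) {
        this.target = target;
    }

    // Start of a group: begin with an empty buffer.
    void beforeGroup() {
        buffer.clear();
    }

    // Each record is only buffered here, nothing is written yet.
    void onElement(String key, String json) {
        buffer.put(key, json);
    }

    // End of a group: write the whole buffer in one call, then reset it.
    void afterGroup() {
        target.putAll(buffer);
        buffer.clear();
    }

    public static void main(String[] args) {
        Map<String, String> target = new HashMap<>();
        BufferedWriterSketch writer = new BufferedWriterSketch(target);
        writer.beforeGroup();
        writer.onElement("1", "{\"id\":\"1\"}");
        writer.onElement("2", "{\"id\":\"2\"}");
        System.out.println("before flush: " + target.size());
        writer.afterGroup();
        System.out.println("after flush: " + target.size());
    }
}
```

The trade-off is the usual one for batching: fewer round trips to the cluster, at the cost of holding one group of records in memory until the flush.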

Now, we need to add the display name of the Output to the i18n resources file Messages.properties:

#Output
Hazelcast.Output._displayName=Output

Let’s implement all of those methods.

Defining the constructor method

We will create the output constructor to inject the component configuration and some additional local and built-in services.

Built-in services are services provided by TCK.

package org.talend.components.hazelcast;

import org.talend.sdk.component.api.component.Icon;
import org.talend.sdk.component.api.component.Version;
import org.talend.sdk.component.api.configuration.Option;
import org.talend.sdk.component.api.processor.ElementListener;
import org.talend.sdk.component.api.meta.Documentation;
import org.talend.sdk.component.api.processor.Processor;
import org.talend.sdk.component.api.record.Record;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.json.bind.Jsonb;
import java.io.Serializable;

import static org.talend.sdk.component.api.component.Icon.IconType.CUSTOM;

@Version
@Icon(custom = "Hazelcast", value = CUSTOM)
@Processor(name = "Output")
@Documentation("Hazelcast output component")
public class HazelcastOutput implements Serializable {

    private final HazelcastOutputConfig configuration;

    private final HazelcastService hazelcastService;

    private final Jsonb jsonb;

    public HazelcastOutput(@Option("configuration") final HazelcastOutputConfig configuration,
            final HazelcastService hazelcastService, final Jsonb jsonb) {
        this.configuration = configuration;
        this.hazelcastService = hazelcastService;
        this.jsonb = jsonb;
    }

    @PostConstruct
    public void init() {
    }

    @PreDestroy
    public void release() {
    }

    @ElementListener
    public void onElement(final Record record) {
    }

}

Here we find:

  • configuration is the component configuration class

  • hazelcastService is the service that we implemented in the input component tutorial. It is responsible for creating a Hazelcast client instance.

  • jsonb is a built-in service provided by TCK to handle JSON object serialization and deserialization. We will use it to convert the incoming records to JSON format before inserting them into the map.

Defining the PostConstruct method

There is nothing to do in the PostConstruct method. We could, for example, initialize a Hazelcast instance there, but we will instead do it lazily, on the first call to the @ElementListener method.
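
For readers unfamiliar with the lazy approach, the sketch below shows the general pattern in plain Java: the expensive object is created by a factory only when it is first requested, and reused afterwards. The Lazy and LazyDemo classes are hypothetical names used for illustration; in this tutorial, the same role is played by the service method from the input component tutorial that returns an existing instance or creates one.

```java
import java.util.function.Supplier;

// Illustration only: a generic holder that creates its value on first use.
class Lazy<T> {

    private final Supplier<T> factory;
    private T value;

    Lazy(Supplier<T> factory) {
        this.factory = factory;
    }

    // synchronized so the factory runs only once even if several threads call get().
    synchronized T get() {
        if (value == null) {
            value = factory.get();
        }
        return value;
    }
}

class LazyDemo {
    public static void main(String[] args) {
        int[] created = {0};
        // The string "client" stands in for an expensive client instance.
        Lazy<String> client = new Lazy<>(() -> {
            created[0]++;
            return "client";
        });
        client.get();
        client.get();
        System.out.println("factory calls: " + created[0]);
    }
}
```

With this pattern, an output instance that never receives a record never opens a connection.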

Defining the PreDestroy method

package org.talend.components.hazelcast;

import org.talend.sdk.component.api.component.Icon;
import org.talend.sdk.component.api.component.Version;
import org.talend.sdk.component.api.configuration.Option;
import org.talend.sdk.component.api.processor.ElementListener;
import org.talend.sdk.component.api.meta.Documentation;
import org.talend.sdk.component.api.processor.Processor;
import org.talend.sdk.component.api.record.Record;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.json.bind.Jsonb;
import java.io.Serializable;

import static org.talend.sdk.component.api.component.Icon.IconType.CUSTOM;

@Version
@Icon(custom = "Hazelcast", value = CUSTOM)
@Processor(name = "Output")
@Documentation("Hazelcast output component")
public class HazelcastOutput implements Serializable {

    private final HazelcastOutputConfig configuration;

    private final HazelcastService hazelcastService;

    private final Jsonb jsonb;

    public HazelcastOutput(@Option("configuration") final HazelcastOutputConfig configuration,
            final HazelcastService hazelcastService, final Jsonb jsonb) {
        this.configuration = configuration;
        this.hazelcastService = hazelcastService;
        this.jsonb = jsonb;
    }

    @PostConstruct
    public void init() {
        //no-op
    }

    @PreDestroy
    public void release() {
        this.hazelcastService.shutdownInstance();
    }

    @ElementListener
    public void onElement(final Record record) {
    }

}

Here we shut down the Hazelcast client instance, which also frees the Hazelcast map reference.

Defining the ElementListener method

package org.talend.components.hazelcast;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import org.talend.sdk.component.api.component.Icon;
import org.talend.sdk.component.api.component.Version;
import org.talend.sdk.component.api.configuration.Option;
import org.talend.sdk.component.api.meta.Documentation;
import org.talend.sdk.component.api.processor.ElementListener;
import org.talend.sdk.component.api.processor.Processor;
import org.talend.sdk.component.api.record.Record;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.json.bind.Jsonb;
import java.io.Serializable;

import static org.talend.sdk.component.api.component.Icon.IconType.CUSTOM;

@Version
@Icon(custom = "Hazelcast", value = CUSTOM)
@Processor(name = "Output")
@Documentation("Hazelcast output component")
public class HazelcastOutput implements Serializable {

    private final HazelcastOutputConfig configuration;

    private final HazelcastService hazelcastService;

    private final Jsonb jsonb;

    public HazelcastOutput(@Option("configuration") final HazelcastOutputConfig configuration,
            final HazelcastService hazelcastService, final Jsonb jsonb) {
        this.configuration = configuration;
        this.hazelcastService = hazelcastService;
        this.jsonb = jsonb;
    }

    @PostConstruct
    public void init() {
        //no-op
    }

    @PreDestroy
    public void release() {
        this.hazelcastService.shutdownInstance();
    }

    @ElementListener
    public void onElement(final Record record) {
        final String key = record.getString(configuration.getKey());
        final String value = jsonb.toJson(record);

        final HazelcastInstance hz = hazelcastService.getOrCreateIntance(configuration.getDataset().getConnection());
        final IMap<String, String> map = hz.getMap(configuration.getDataset().getMapName());
        map.put(key, value);
    }

}

We get the key attribute from the incoming record and then convert the whole record to a JSON string. Then we insert the key/value pair into the Hazelcast map.

Testing the output component

Let’s create a unit test for our output component. The idea will be to create a job that will insert the data using this output implementation.

So, let’s create our test class.

package org.talend.components.hazelcast;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.talend.sdk.component.junit.BaseComponentsHandler;
import org.talend.sdk.component.junit5.Injected;
import org.talend.sdk.component.junit5.WithComponents;

@WithComponents("org.talend.components.hazelcast")
class HazelcastOuputTest {

    private static final String MAP_NAME = "MY-DISTRIBUTED-MAP";

    private static HazelcastInstance hazelcastInstance;

    @Injected
    protected BaseComponentsHandler componentsHandler;

    @BeforeAll
    static void init() {
        hazelcastInstance = Hazelcast.newHazelcastInstance();
        //init the map
        final IMap<String, String> map = hazelcastInstance.getMap(MAP_NAME);

    }

    @AfterAll
    static void shutdown() {
        hazelcastInstance.shutdown();
    }

}

Here we start by creating a Hazelcast test instance and initializing the map. We also shut down the instance after all the tests are executed.

Now let’s create our output test.

package org.talend.components.hazelcast;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.talend.sdk.component.api.record.Record;
import org.talend.sdk.component.api.service.Service;
import org.talend.sdk.component.api.service.record.RecordBuilderFactory;
import org.talend.sdk.component.junit.BaseComponentsHandler;
import org.talend.sdk.component.junit5.Injected;
import org.talend.sdk.component.junit5.WithComponents;
import org.talend.sdk.component.runtime.manager.chain.Job;

import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.talend.sdk.component.junit.SimpleFactory.configurationByExample;

@WithComponents("org.talend.components.hazelcast")
class HazelcastOuputTest {

    private static final String MAP_NAME = "MY-DISTRIBUTED-MAP";

    private static HazelcastInstance hazelcastInstance;

    @Injected
    protected BaseComponentsHandler componentsHandler;

    @Service
    protected RecordBuilderFactory recordBuilderFactory;

    @BeforeAll
    static void init() {
        hazelcastInstance = Hazelcast.newHazelcastInstance();
        //init the map
        final IMap<String, String> map = hazelcastInstance.getMap(MAP_NAME);

    }

    @Test
    void outputTest() {
        final HazelcastDatastore connection = new HazelcastDatastore();
        connection.setClusterIpAddress(
                hazelcastInstance.getCluster().getMembers().iterator().next().getAddress().getHost());
        connection.setGroupName(hazelcastInstance.getConfig().getGroupConfig().getName());
        connection.setPassword(hazelcastInstance.getConfig().getGroupConfig().getPassword());
        final HazelcastDataset dataset = new HazelcastDataset();
        dataset.setConnection(connection);
        dataset.setMapName(MAP_NAME);

        HazelcastOutputConfig config = new HazelcastOutputConfig();
        config.setDataset(dataset);
        config.setKey("id");

        final String configUri = configurationByExample().forInstance(config).configured().toQueryString();

        componentsHandler.setInputData(generateTestData(10));
        Job.components()
                .component("Input", "test://emitter")
                .component("Output", "Hazelcast://Output?" + configUri)
                .connections()
                .from("Input")
                .to("Output")
                .build()
                .run();

        final IMap<String, String> map = hazelcastInstance.getMap(MAP_NAME);
        assertEquals(10, map.size());
    }

    private List<Record> generateTestData(int count) {
        return IntStream.range(0, count)
                .mapToObj(i -> recordBuilderFactory.newRecordBuilder()
                        .withString("id", UUID.randomUUID().toString())
                        .withString("val1", UUID.randomUUID().toString())
                        .withString("val2", UUID.randomUUID().toString())
                        .build())
                .collect(Collectors.toList());
    }

    @AfterAll
    static void shutdown() {
        hazelcastInstance.shutdown();
    }
}

Here we start by preparing the emitter test component provided by TCK, which we use in our test job to generate random data for our output. Then, we use the output component to fill the Hazelcast map.

At the end, we check that the map contains exactly the number of entries inserted by the job.

Run the test and check that it’s working.

$ mvn clean test

Congratulations, you just finished your output component.
