This tutorial is the continuation of the Talend Input component for Hazelcast tutorial. We will not walk through the project creation again, so please start there before taking this one.
This tutorial shows how to create a complete, working output component for Hazelcast.
Defining the configurable part and the layout of the component
As seen before, Hazelcast offers multiple data source types, such as queues, topics, caches and maps.
In this tutorial, we will stick with the Map dataset; everything covered here also applies to the other types.
Let’s assume that our Hazelcast output component is responsible for inserting data into a distributed Map. For that, we need to know which attribute of the incoming data to use as the key in the map. The value will be the whole record encoded in JSON format.
With that in mind, we can design our output configuration as the same Datastore and Dataset as the input component, plus an additional option that defines the key attribute.
Let’s create our Output configuration class.
package org.talend.components.hazelcast;
import org.talend.sdk.component.api.configuration.Option;
import org.talend.sdk.component.api.configuration.ui.layout.GridLayout;
import org.talend.sdk.component.api.meta.Documentation;
import java.io.Serializable;
@GridLayout({
@GridLayout.Row("dataset"),
@GridLayout.Row("key")
})
@Documentation("Hazelcast output configuration")
public class HazelcastOutputConfig implements Serializable {
@Option
@Documentation("the hazelcast dataset")
private HazelcastDataset dataset;
@Option
@Documentation("The key attribute")
private String key;
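// Getters & setters used by the output component and by the test
public HazelcastDataset getDataset() {
return dataset;
}
public void setDataset(final HazelcastDataset dataset) {
this.dataset = dataset;
}
public String getKey() {
return key;
}
public void setKey(final String key) {
this.key = key;
}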
}
Let’s add the i18n display names of our configuration to the Messages.properties file:
# Output config
HazelcastOutputConfig.dataset._displayName=Hazelcast dataset
HazelcastOutputConfig.key._displayName=Key attribute
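These entries follow a <ClassName>.<fieldName>._displayName naming pattern. As a rough illustration of that pattern only, the sketch below loads the same lines with java.util.Properties and resolves the label of a configuration field. DisplayNames and its fall-back-to-the-field-name behaviour are hypothetical; the component framework performs its own lookup of Messages.properties.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper: mimics the "<Class>.<field>._displayName" key pattern
// used in Messages.properties. It is NOT the TCK resolution code.
class DisplayNames {
    private final Properties messages = new Properties();

    DisplayNames(String propertiesContent) {
        try {
            // Properties.load understands '#' comments and key=value lines
            messages.load(new StringReader(propertiesContent));
        } catch (IOException e) {
            // StringReader does no real I/O, but load() declares IOException
            throw new UncheckedIOException(e);
        }
    }

    // Builds the key for a field; falls back to the raw field name (sketch choice)
    String displayName(String className, String field) {
        return messages.getProperty(className + "." + field + "._displayName", field);
    }

    public static void main(String[] args) {
        DisplayNames names = new DisplayNames(String.join("\n",
                "# Output config",
                "HazelcastOutputConfig.dataset._displayName=Hazelcast dataset",
                "HazelcastOutputConfig.key._displayName=Key attribute"));
        System.out.println(names.displayName("HazelcastOutputConfig", "key"));     // Key attribute
        System.out.println(names.displayName("HazelcastOutputConfig", "missing")); // missing
    }
}
```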
Output Implementation
The skeleton of the output component looks as follows:
package org.talend.components.hazelcast;
import org.talend.sdk.component.api.component.Icon;
import org.talend.sdk.component.api.component.Version;
import org.talend.sdk.component.api.configuration.Option;
import org.talend.sdk.component.api.processor.ElementListener;
import org.talend.sdk.component.api.meta.Documentation;
import org.talend.sdk.component.api.processor.Processor;
import org.talend.sdk.component.api.record.Record;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.Serializable;
import static org.talend.sdk.component.api.component.Icon.IconType.CUSTOM;
@Version
@Icon(custom = "Hazelcast", value = CUSTOM)
@Processor(name = "Output")
@Documentation("Hazelcast output component")
public class HazelcastOutput implements Serializable {
public HazelcastOutput(@Option("configuration") final HazelcastOutputConfig configuration) {
}
@PostConstruct
public void init() {
}
@PreDestroy
public void release() {
}
@ElementListener
public void onElement(final Record record) {
}
}
- The @Version annotation indicates the version of the component. It is used to migrate the component configuration if needed.
- The @Icon annotation indicates the icon of the component. Here, the icon is a custom icon that needs to be bundled in the component JAR under resources/icons.
- The @Processor annotation indicates that this class is the processor (output) and defines the name of the component.
- The constructor of the processor is responsible for injecting the component configuration and services. Configuration parameters are annotated with @Option. The other parameters are considered as services and are injected by the component framework. Services can be local (classes annotated with @Service) or provided by the component framework.
- The method annotated with @PostConstruct is executed once per instance and can be used for initialization.
- The method annotated with @PreDestroy is used to clean up resources at the end of the output's execution.
- Data is passed to the method annotated with @ElementListener. That method is responsible for handling the data output. You can define all the related logic in this method.
If you need to write updates in bulk, grouped into batches, see Processors and batch processing.
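The calling order described in the list above can be modelled in a few lines of plain Java. SimpleProcessor and runLifecycle are hypothetical stand-ins for the annotated methods and for the framework, not TCK APIs; in the real component the framework discovers and calls the annotated methods itself. The try/finally that guarantees release() runs even when an element fails is a choice made in this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for an annotated processor: each method mirrors
// @PostConstruct, @ElementListener and @PreDestroy respectively.
interface SimpleProcessor<T> {
    void init();
    void onElement(T element);
    void release();
}

class LifecycleDemo {
    // In this sketch: init once, one onElement call per element,
    // then release, even if an element throws.
    static <T> void runLifecycle(SimpleProcessor<T> processor, List<T> input) {
        processor.init();
        try {
            for (T element : input) {
                processor.onElement(element);
            }
        } finally {
            processor.release();
        }
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        SimpleProcessor<String> recorder = new SimpleProcessor<String>() {
            public void init() { calls.add("init"); }
            public void onElement(String e) { calls.add("element:" + e); }
            public void release() { calls.add("release"); }
        };
        runLifecycle(recorder, Arrays.asList("a", "b"));
        System.out.println(calls); // [init, element:a, element:b, release]
    }
}
```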
Now, we need to add the display name of the Output to the i18n resources file Messages.properties:
#Output
Hazelcast.Output._displayName=Output
Let’s implement each of those methods.
Defining the constructor method
We will create the output constructor to inject the component configuration and some additional local and built-in services.
Built-in services are services provided by the Talend Component Kit (TCK).
package org.talend.components.hazelcast;
import org.talend.sdk.component.api.component.Icon;
import org.talend.sdk.component.api.component.Version;
import org.talend.sdk.component.api.configuration.Option;
import org.talend.sdk.component.api.processor.ElementListener;
import org.talend.sdk.component.api.meta.Documentation;
import org.talend.sdk.component.api.processor.Processor;
import org.talend.sdk.component.api.record.Record;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.json.bind.Jsonb;
import java.io.Serializable;
import static org.talend.sdk.component.api.component.Icon.IconType.CUSTOM;
@Version
@Icon(custom = "Hazelcast", value = CUSTOM)
@Processor(name = "Output")
@Documentation("Hazelcast output component")
public class HazelcastOutput implements Serializable {
private final HazelcastOutputConfig configuration;
private final HazelcastService hazelcastService;
private final Jsonb jsonb;
public HazelcastOutput(@Option("configuration") final HazelcastOutputConfig configuration,
final HazelcastService hazelcastService, final Jsonb jsonb) {
this.configuration = configuration;
this.hazelcastService = hazelcastService;
this.jsonb = jsonb;
}
@PostConstruct
public void init() {
}
@PreDestroy
public void release() {
}
@ElementListener
public void onElement(final Record record) {
}
}
Here we find:
- configuration is the component configuration class.
- hazelcastService is the service that we implemented in the input component tutorial. It is responsible for creating a Hazelcast client instance.
- jsonb is a built-in service provided by TCK to handle JSON serialization and deserialization. We will use it to convert each incoming record to JSON before inserting it into the map.
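To get a feel for what the stored value might look like, here is a plain-Java stand-in for jsonb.toJson(record). It assumes a flat record whose fields are all strings (like the test data generated later in this tutorial) and that the serialized form is a simple JSON object of those fields. FlatJson is a hypothetical helper, not part of TCK; the component itself should keep using the injected Jsonb service.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical stand-in for jsonb.toJson(record): encodes a flat record of
// string fields as a JSON object. For illustration only.
class FlatJson {
    static String toJson(Map<String, String> record) {
        StringJoiner fields = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, String> field : record.entrySet()) {
            fields.add(quote(field.getKey()) + ":" + quote(field.getValue()));
        }
        return fields.toString();
    }

    // Wraps a value in quotes, escaping quotes, backslashes and control chars
    static String quote(String value) {
        if (value == null) {
            return "null";
        }
        StringBuilder out = new StringBuilder("\"");
        for (char c : value.toCharArray()) {
            switch (c) {
                case '"': out.append("\\\""); break;
                case '\\': out.append("\\\\"); break;
                case '\n': out.append("\\n"); break;
                case '\r': out.append("\\r"); break;
                case '\t': out.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        out.append(String.format("\\u%04x", (int) c));
                    } else {
                        out.append(c);
                    }
            }
        }
        return out.append('"').toString();
    }

    public static void main(String[] args) {
        Map<String, String> record = new LinkedHashMap<>();
        record.put("id", "42");
        record.put("val1", "say \"hi\"");
        System.out.println(toJson(record)); // {"id":"42","val1":"say \"hi\""}
    }
}
```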
Defining the PostConstruct method
There is nothing to do in the @PostConstruct method. We could, for example, initialize a Hazelcast instance there, but we will instead create it lazily on the first call to the @ElementListener method.
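Lazy initialization boils down to a get-or-create holder. The sketch below makes no assumption about the internals of the HazelcastService written in the input tutorial: LazyClient is hypothetical, with a Supplier standing in for the code that builds the Hazelcast client and a Consumer for the code that shuts it down.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical get-or-create holder: the client is only built on the first
// get(), and shutdown() is safe to call even if get() was never invoked.
class LazyClient<T> {
    private final Supplier<T> factory;
    private final Consumer<T> closer;
    private T instance;

    LazyClient(Supplier<T> factory, Consumer<T> closer) {
        this.factory = factory;
        this.closer = closer;
    }

    // Would be called from the element listener: builds the client on first use only
    synchronized T get() {
        if (instance == null) {
            instance = factory.get();
        }
        return instance;
    }

    // Would be called from the @PreDestroy method: closes the client if one exists
    synchronized void shutdown() {
        if (instance != null) {
            closer.accept(instance);
            instance = null;
        }
    }

    public static void main(String[] args) {
        int[] created = {0};
        LazyClient<String> client = new LazyClient<>(
                () -> "client-" + (++created[0]),
                c -> System.out.println("closing " + c));
        System.out.println(created[0]); // 0: nothing built yet
        client.get();
        client.get();
        System.out.println(created[0]); // 1: built once, then reused
        client.shutdown();              // prints "closing client-1"
        client.shutdown();              // no-op
    }
}
```

Deferring creation this way means a job that receives no records never opens a connection to the cluster.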
Defining the PreDestroy method
package org.talend.components.hazelcast;
import org.talend.sdk.component.api.component.Icon;
import org.talend.sdk.component.api.component.Version;
import org.talend.sdk.component.api.configuration.Option;
import org.talend.sdk.component.api.processor.ElementListener;
import org.talend.sdk.component.api.meta.Documentation;
import org.talend.sdk.component.api.processor.Processor;
import org.talend.sdk.component.api.record.Record;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.json.bind.Jsonb;
import java.io.Serializable;
import static org.talend.sdk.component.api.component.Icon.IconType.CUSTOM;
@Version
@Icon(custom = "Hazelcast", value = CUSTOM)
@Processor(name = "Output")
@Documentation("Hazelcast output component")
public class HazelcastOutput implements Serializable {
private final HazelcastOutputConfig configuration;
private final HazelcastService hazelcastService;
private final Jsonb jsonb;
public HazelcastOutput(@Option("configuration") final HazelcastOutputConfig configuration,
final HazelcastService hazelcastService, final Jsonb jsonb) {
this.configuration = configuration;
this.hazelcastService = hazelcastService;
this.jsonb = jsonb;
}
@PostConstruct
public void init() {
//no-op
}
@PreDestroy
public void release() {
this.hazelcastService.shutdownInstance();
}
@ElementListener
public void onElement(final Record record) {
}
}
The @PreDestroy method shuts down the Hazelcast client instance and thus frees the Hazelcast map reference.
Defining the ElementListener method
package org.talend.components.hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import org.talend.sdk.component.api.component.Icon;
import org.talend.sdk.component.api.component.Version;
import org.talend.sdk.component.api.configuration.Option;
import org.talend.sdk.component.api.meta.Documentation;
import org.talend.sdk.component.api.processor.ElementListener;
import org.talend.sdk.component.api.processor.Processor;
import org.talend.sdk.component.api.record.Record;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.json.bind.Jsonb;
import java.io.Serializable;
import static org.talend.sdk.component.api.component.Icon.IconType.CUSTOM;
@Version
@Icon(custom = "Hazelcast", value = CUSTOM)
@Processor(name = "Output")
@Documentation("Hazelcast output component")
public class HazelcastOutput implements Serializable {
private final HazelcastOutputConfig configuration;
private final HazelcastService hazelcastService;
private final Jsonb jsonb;
public HazelcastOutput(@Option("configuration") final HazelcastOutputConfig configuration,
final HazelcastService hazelcastService, final Jsonb jsonb) {
this.configuration = configuration;
this.hazelcastService = hazelcastService;
this.jsonb = jsonb;
}
@PostConstruct
public void init() {
//no-op
}
@PreDestroy
public void release() {
this.hazelcastService.shutdownInstance();
}
@ElementListener
public void onElement(final Record record) {
final String key = record.getString(configuration.getKey());
final String value = jsonb.toJson(record);
final HazelcastInstance hz = hazelcastService.getOrCreateIntance(configuration.getDataset().getConnection());
final IMap<String, String> map = hz.getMap(configuration.getDataset().getMapName());
map.put(key, value);
}
}
We get the value of the key attribute from the incoming record, convert the whole record to a JSON string, and then insert the key/value pair into the Hazelcast map.
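The same three steps can be exercised without a Hazelcast cluster. In this illustration (not the component code), a Map<String, String> plays the role of the Record, a ConcurrentMap stands in for the IMap, and a Function stands in for Jsonb. KeyValueWriter and its fail-fast check on a missing key are assumptions added for the example.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical plain-Java model of onElement: record = Map of fields,
// ConcurrentMap = stand-in for IMap, toJson = stand-in for Jsonb.
class KeyValueWriter {
    private final String keyAttribute;
    private final Function<Map<String, String>, String> toJson;
    private final ConcurrentMap<String, String> target;

    KeyValueWriter(String keyAttribute, Function<Map<String, String>, String> toJson,
                   ConcurrentMap<String, String> target) {
        this.keyAttribute = keyAttribute;
        this.toJson = toJson;
        this.target = target;
    }

    void onElement(Map<String, String> record) {
        // 1. read the configured key attribute
        String key = record.get(keyAttribute);
        if (key == null) {
            // Sketch choice: fail with a clear message instead of a bare NPE
            throw new IllegalArgumentException(
                    "Record has no value for key attribute '" + keyAttribute + "'");
        }
        // 2. serialize the whole record, 3. insert the key/value pair
        target.put(key, toJson.apply(record));
    }

    public static void main(String[] args) {
        ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
        // Demo serializer only: TreeMap.toString() gives a stable value
        KeyValueWriter writer = new KeyValueWriter("id", r -> new TreeMap<>(r).toString(), map);
        Map<String, String> record = new TreeMap<>();
        record.put("id", "1");
        record.put("val1", "a");
        writer.onElement(record);
        System.out.println(map.get("1")); // {id=1, val1=a}
    }
}
```

The explicit check matters because ConcurrentHashMap throws a NullPointerException on a null key; Hazelcast's IMap is also expected to reject null keys, so a record missing the key attribute would otherwise fail with a less descriptive error.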
Testing the output component
Let’s create a unit test for our output component. The idea is to create a job that inserts data using this output implementation.
So, let’s create our test class.
package org.talend.components.hazelcast;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.talend.sdk.component.junit.BaseComponentsHandler;
import org.talend.sdk.component.junit5.Injected;
import org.talend.sdk.component.junit5.WithComponents;
@WithComponents("org.talend.components.hazelcast")
class HazelcastOutputTest {
private static final String MAP_NAME = "MY-DISTRIBUTED-MAP";
private static HazelcastInstance hazelcastInstance;
@Injected
protected BaseComponentsHandler componentsHandler;
@BeforeAll
static void init() {
hazelcastInstance = Hazelcast.newHazelcastInstance();
//init the map
final IMap<String, String> map = hazelcastInstance.getMap(MAP_NAME);
}
@AfterAll
static void shutdown() {
hazelcastInstance.shutdown();
}
}
Here we start by creating a Hazelcast test instance and initializing the map. We also shut down the instance after all the tests are executed.
Now let’s create our output test.
package org.talend.components.hazelcast;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.talend.sdk.component.api.record.Record;
import org.talend.sdk.component.api.service.Service;
import org.talend.sdk.component.api.service.record.RecordBuilderFactory;
import org.talend.sdk.component.junit.BaseComponentsHandler;
import org.talend.sdk.component.junit5.Injected;
import org.talend.sdk.component.junit5.WithComponents;
import org.talend.sdk.component.runtime.manager.chain.Job;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.talend.sdk.component.junit.SimpleFactory.configurationByExample;
@WithComponents("org.talend.components.hazelcast")
class HazelcastOutputTest {
private static final String MAP_NAME = "MY-DISTRIBUTED-MAP";
private static HazelcastInstance hazelcastInstance;
@Injected
protected BaseComponentsHandler componentsHandler;
@Service
protected RecordBuilderFactory recordBuilderFactory;
@BeforeAll
static void init() {
hazelcastInstance = Hazelcast.newHazelcastInstance();
//init the map
final IMap<String, String> map = hazelcastInstance.getMap(MAP_NAME);
}
@Test
void outputTest() {
final HazelcastDatastore connection = new HazelcastDatastore();
connection.setClusterIpAddress(
hazelcastInstance.getCluster().getMembers().iterator().next().getAddress().getHost());
connection.setGroupName(hazelcastInstance.getConfig().getGroupConfig().getName());
connection.setPassword(hazelcastInstance.getConfig().getGroupConfig().getPassword());
final HazelcastDataset dataset = new HazelcastDataset();
dataset.setConnection(connection);
dataset.setMapName(MAP_NAME);
HazelcastOutputConfig config = new HazelcastOutputConfig();
config.setDataset(dataset);
config.setKey("id");
final String configUri = configurationByExample().forInstance(config).configured().toQueryString();
componentsHandler.setInputData(generateTestData(10));
Job.components()
.component("Input", "test://emitter")
.component("Output", "Hazelcast://Output?" + configUri)
.connections()
.from("Input")
.to("Output")
.build()
.run();
final IMap<String, String> map = hazelcastInstance.getMap(MAP_NAME);
assertEquals(10, map.size());
}
private List<Record> generateTestData(int count) {
return IntStream.range(0, count)
.mapToObj(i -> recordBuilderFactory.newRecordBuilder()
.withString("id", UUID.randomUUID().toString())
.withString("val1", UUID.randomUUID().toString())
.withString("val2", UUID.randomUUID().toString())
.build())
.collect(Collectors.toList());
}
@AfterAll
static void shutdown() {
hazelcastInstance.shutdown();
}
}
Here we start by preparing the data for the emitter test component provided by TCK, which our test job uses to generate random records for our output. Then, we use the output component to fill the Hazelcast map.
Finally, we check that the map contains exactly the number of records inserted by the job.
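The assertion relies on every generated record having a distinct id: because the output calls put, a record whose key is already in the map replaces the existing entry instead of adding a new one. The hypothetical helper below reproduces that effect with a plain HashMap, comparing random UUID keys (as in generateTestData) with a constant key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.IntFunction;

// Hypothetical demo: shows why assertEquals(10, map.size()) holds only
// when every record written to the map has a distinct key.
class KeyUniquenessDemo {
    // Writes `count` entries keyed by keyFor(i) and returns the resulting size
    static int sizeAfterWrites(int count, IntFunction<String> keyFor) {
        Map<String, String> map = new HashMap<>();
        for (int i = 0; i < count; i++) {
            map.put(keyFor.apply(i), "record-" + i); // same key => value replaced
        }
        return map.size();
    }

    public static void main(String[] args) {
        System.out.println(sizeAfterWrites(10, i -> UUID.randomUUID().toString())); // 10
        System.out.println(sizeAfterWrites(10, i -> "same-id"));                      // 1
    }
}
```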
Run the test and check that it’s working.
$ mvn clean test
Congratulations, you have just finished your output component!