Implementing an Output component

This tutorial shows how to create a complete working output component for Hazelcast, including:

  1. Defining the configurable part and the layout of the component.

  2. Defining the output that is responsible for connecting and writing data to the data source.

Defining the configurable part and the layout of the component

The methodology to define the component configuration is described in the Implementing an input component tutorial.

In this case, the input component configuration defined in the tutorial mentioned above can be used as a starting point.

Two additional fields are required for the output component:

@Option
private String keyAttribute;

@Option
private String valueAttribute;

These fields are used to determine the key and value attributes for the Hazelcast map.

Configuring the Output

The output component needs to work in distributed environments and should implement a serializable interface.

The skeleton of the output component looks as follows:

@Version (1)
@Icon(custom = "hazelcastOutput", value = CUSTOM) (2)
@Processor(name = "Output") (3)
public class HazelcastOutput implements Serializable {
    private final HazelcastConfiguration configuration;
    private final JsonBuilderFactory jsonFactory;
    private final Jsonb jsonb;
    private final HazelcastService service;


    public HazelcastOutput(@Option("configuration") final HazelcastConfiguration configuration,
                           final JsonBuilderFactory jsonFactory,
                           final Jsonb jsonb,
                           final HazelcastService service) {} (4)

    @PostConstruct
    public void init() {} (5)

    @PreDestroy
    public void release() {} (6)

    @ElementListener
    public void onElement(final JsonObject defaultInput) {} (7)

}
1 The @Version annotation indicates the version of the component. It is used to migrate the component configuration if needed.
2 The @Icon annotation indicates the icon of the component. Here, the icon is a custom icon that needs to be bundled in the component JAR under resources/icons.
3 The @Processor annotation indicates that this class is the processor (output) and defines the name of the component.
4 This constructor of the processor is responsible for injecting the component configuration and services. Configuration parameters are annotated with @Option.
The other parameters are considered as services and are injected by the component framework. Services can be local (class annotated with @Service) or provided by the component framework.
5 The method annotated with @PostConstruct is executed once by instance and can be used for initialization. Here, the goal is to get the Hazelcast instance according to the provided configuration.
6 The method annotated with @PreDestroy is used to clean resources at the end of the execution of the output.
In the context of this tutorial, the Hazelcast instance loaded in the post Construct method is shut down.
7 Data is passed to the method annotated with @ElementListener. That method is responsible for handling the data output. You can define all the related logic in this method.
If you need to bulk write the updates accordingly to groups, see Processors and batch processing.

Each of the methods mentioned above now needs to be created.

Defining the constructor method

public HazelcastOutput(@Option("configuration") final HazelcastConfiguration configuration,
                       final JsonBuilderFactory jsonFactory,
                       final Jsonb jsonb,
                       final HazelcastService service) {
    this.configuration = configuration;
    this.jsonFactory = jsonFactory;
    this.jsonb = jsonb;
    this.service = service;
}

Defining the PostConstruct method

private transient HazelcastInstance instance;
private transient IMap<Object, Object> map;

@PostConstruct
public void init() {
    instance = service.findInstance(configuration.newConfig()); (1)
    map = instance.getMap(configuration.getMapName()); (2)
}

To make it work, a Hazelcast instance and a Hazelcast map are required. The corresponding attributes need to be added to the output.

1 Create an instance of Hazelcast according to the provided configuration. In this case, the injected HazelcastService instance is used. This service is implemented in the project. See the implementation in the Implementing an input component tutorial.
2 Get the Hazelcast map according to the map name provided in the configuration, using the Hazelcast instance.
In production, avoid creating one instance per thread/worker.

Defining the PreDestroy method

@PreDestroy
public void close() {
    instance.getLifecycleService().shutdown();
    map = null;
}

Shut down the Hazelcast client instance created by the method annotated with @PostConstruct and thus free the Hazelcast map reference.

Defining the ElementListener method

@ElementListener
public void onElement(final JsonObject defaultInput) { (1)
    final Object key = toValue(defaultInput.get(configuration.getKeyAttribute()));
    final Object value = toValue(defaultInput.get(configuration.getValueAttribute()));
    map.put(key, value);
}

private Object toValue(final JsonValue jsonValue) { (2)
    if (jsonValue == null) {
        return null;
    }
    if (jsonValue.getValueType() == STRING) {
        return JsonString.class.cast(jsonValue).getString();
    }
    if (jsonValue.getValueType() == NUMBER) {
        return JsonNumber.class.cast(jsonValue).doubleValue();
    }
    return jsonValue.asJsonObject();
}
1 This method is used to pass the incoming data to the output. Every object passed should be a JsonObject instance. This method can include any logic required to write data to the data source.
In this tutorial, the data is passed to the Hazelcast map.
2 This is the inner method used to transform incoming values into the format required to pass data to the Hazelcast map.

Full implementation of the Output

Once implemented, the Output configuration is as follows. For more information about outputs, refer to this document.

@Version (1)
@Icon(custom = "hazelcastOutput", value = CUSTOM) (2)
@Processor(name = "Output") (3)
public class HazelcastOutput implements Serializable {
    private final HazelcastConfiguration configuration;
    private final JsonBuilderFactory jsonFactory;
    private final Jsonb jsonb;
    private final HazelcastService service;

    private transient HazelcastInstance instance;
    private transient IMap<Object, Object> map;

    public HazelcastOutput(@Option("configuration") final HazelcastConfiguration configuration,
                           final JsonBuilderFactory jsonFactory,
                           final Jsonb jsonb,
                           final HazelcastService service) {
        this.configuration = configuration;
        this.jsonFactory = jsonFactory;
        this.jsonb = jsonb;
        this.service = service;
    }

    @PostConstruct
    public void init() {
        instance = service.findInstance(configuration.newConfig());
        map = instance.getMap(configuration.getMapName());
    }

    @ElementListener
    public void onElement(final JsonObject defaultInput) {
        final Object key = toValue(defaultInput.get(configuration.getKeyAttribute()));
        final Object value = toValue(defaultInput.get(configuration.getValueAttribute()));
        map.put(key, value);
    }

    @PreDestroy
    public void release() {
        instance.getLifecycleService().shutdown();
        map = null;
    }

    private Object toValue(final JsonValue jsonValue) {
        if (jsonValue == null) {
            return null;
        }
        if (jsonValue.getValueType() == STRING) {
            return JsonString.class.cast(jsonValue).getString();
        }
        if (jsonValue.getValueType() == NUMBER) {
            return JsonNumber.class.cast(jsonValue).doubleValue();
        }
        return jsonValue.asJsonObject();
    }
}

You have now created a complete working output component. You can follow this tutorial to learn how to create unit tests for your new component.

Scroll to top