Implementing an input component

This tutorial shows how to create a complete working input component for Hazelcast, including:

  1. Registering the component family.

  2. Defining the configurable part and the layout of the component.

  3. Configuring the partition mapper, to let the input split itself to work in a distributed environment.

  4. Configuring the source that is responsible for connecting and reading data from the data source.

Getter and setter methods are omitted for simplicity in this tutorial.

The following procedures assume that you have already generated the component package from the Talend Component Kit starter. See this tutorial to learn how to generate this package.

Registering the component family

Register the component family via the package-info.java file in the package of the component.

@Icon(value = Icon.IconType.CUSTOM, custom = "hazelcast") (1)
@Components(family = "Hazelcast", categories = "IMDG") (2)
package org.talend.hazelcast;
1 This defines the family icon.
2 This line defines the component family and the component categories. This information is used in the Web and Studio applications to group the components.

Defining the configurable part and the layout of the component

Configuring the component means defining which parts of the component can be configured once it is integrated into Talend solutions, along with the configuration type and the UI layout.

The configuration is a POJO class decorated with annotations from the component framework. In this case, the component is configured as follows:

@GridLayout({ (1)
        @GridLayout.Row({ "hazelcastXml", "mapName" }),
        @GridLayout.Row({ "executorService" }),
})
public class HazelcastConfiguration implements Serializable {

    @Option (2)
    private String hazelcastXml; (3)

    @Option
    private String mapName; (4)

    @Option
    private String executorService = "default"; (5)

    ClientConfig newConfig() throws IOException { (6)
        final ClientConfig newconfig = hazelcastXml == null ? new XmlClientConfigBuilder().build() :
                new XmlClientConfigBuilder(hazelcastXml).build();

        newconfig.setInstanceName(getClass().getSimpleName() + "_" + UUID.randomUUID().toString());
        newconfig.setClassLoader(Thread.currentThread().getContextClassLoader());
        return newconfig;
    }
}
1 UI layout of the configurable part of the component. This layout is used to show and organize the configuration in Talend applications.
In the example above, hazelcastXml and mapName are displayed on the same row and all rows are displayed in the default basic configuration tab of the component.
2 All the attributes annotated with @Option are recognized as configuration elements and are bound to a default widget according to their type, unless a specific widget is explicitly declared. For more details, see the widget gallery.
3 Path of the Hazelcast XML configuration file.
4 Name of the map to be read.
5 Executor service name, with a default value set to "default" in this case.
6 Utility method that converts the configuration into a Hazelcast client configuration object.

Read more about component configuration in this document.

Configuring the partition mapper

The Hazelcast component created in this tutorial must be able to work in distributed environments.

Every input component has to define a partition mapper class.

At runtime, this class heuristically estimates the size of the data to be handled. Based on this estimation and on the capacity of the execution engine of your component, it divides the work of your component to reduce the overall execution time.

For more information about the partition mapper class, see Partition mapper.

The skeleton of the partition mapper looks as follows:

@Version(1) (1)
@Icon(value = Icon.IconType.CUSTOM, custom = "hazelcastInput") (2)
@PartitionMapper(name = "Input") (3)
public class HazelcastMapper implements Serializable {
    private final HazelcastConfiguration configuration;
    private final JsonBuilderFactory jsonFactory;
    private final Jsonb jsonb;
    private final HazelcastService service;

    public HazelcastMapper(@Option("configuration") final HazelcastConfiguration configuration,
            final JsonBuilderFactory jsonFactory,
            final Jsonb jsonb,
            final HazelcastService service) {} (4)

    @PostConstruct
    public void init() throws IOException {}  (5)

    @PreDestroy
    public void close() {} (6)

    @Assessor
    public long estimateSize() {} (7)

    @Split
    public List<HazelcastMapper> split(@PartitionSize final long bundleSize) {} (8)

    @Emitter
    public HazelcastSource createSource() {}  (9)
}
1 The @Version annotation specifies the version of the component. It is used to migrate the component configuration if needed.
2 The @Icon annotation specifies the icon of the component. In this case, a custom icon is used and needs to be bundled in the component JAR under resources/icons.
3 The @PartitionMapper annotation indicates that this class is the partition mapper and defines its name.
4 This constructor of the mapper is responsible for injecting the component configuration and services.
  • Configuration parameters are annotated with @Option.

  • Other parameters are considered as services and are injected by the component framework. Services can be local (classes annotated with @Service) or provided by the component framework.

5 The method annotated with @PostConstruct is executed once on the driver node in a distributed environment and can be used for initialization. Here, the goal is to get the Hazelcast instance according to the provided configuration.
6 The method annotated with @PreDestroy is used to clean resources at the end of the execution of the partition mapper.
In the context of this tutorial, the Hazelcast instance loaded in the @PostConstruct method is shut down.
7 The method annotated with @Assessor is responsible for calculating the size of the dataset containing the Hazelcast members.
8 The method annotated with @Split is responsible for splitting this mapper according to the bundle size requested by the runner and to the whole dataset size.
9 The method annotated with @Emitter is responsible for creating the producer instance that reads the data from the data source (Hazelcast in this case).

Each of the methods mentioned above now needs to be created.

Defining the constructor method

private final Collection<String> members; (1)

(2)
public HazelcastMapper(@Option("configuration") final HazelcastConfiguration configuration,
        final JsonBuilderFactory jsonFactory,
        final Jsonb jsonb,
        final HazelcastService service) {
    this(configuration, jsonFactory, jsonb, service, emptyList());
}

// internal (3)
protected HazelcastMapper(final HazelcastConfiguration configuration,
        final JsonBuilderFactory jsonFactory,
        final Jsonb jsonb,
        final HazelcastService service,
        final Collection<String> members) {
    this.configuration = configuration;
    this.jsonFactory = jsonFactory;
    this.jsonb = jsonb;
    this.service = service;
    this.members = members;
}
1 Add a collection attribute to the mapper. The list of Hazelcast members is needed later in the process.
2 Define the component public constructor, which is responsible for injecting configuration and services.
3 Add an internal constructor to get the collection of the Hazelcast cluster members.

Defining the PostConstruct method

The @PostConstruct method is used to initialize a Hazelcast client instance.

private transient HazelcastInstance instance; (1)

@PostConstruct
public void init() throws IOException {
    instance = service.findInstance(configuration.newConfig()); (2)
}
1 Declare the Hazelcast instance as an attribute for the mapper.
2 Create an instance of Hazelcast according to the provided configuration. In this case, the injected HazelcastService instance is used. This service is implemented in the project.

The HazelcastService is implemented as follows. Every class annotated with @Service can be injected to the component via its constructor.

import org.talend.sdk.component.api.service.Service;

@Service
public class HazelcastService {
    public HazelcastInstance findInstance(final ClientConfig config) {
        return HazelcastClient.newHazelcastClient(config); (1)
    }
}
1 Create a new instance of the Hazelcast client.

Defining the PreDestroy method

The method annotated with @PreDestroy cleans up the resources at the end of the execution of the partition mapper.

private transient IExecutorService executorService; (1)

@PreDestroy
public void close() { (2)
    instance.getLifecycleService().shutdown();
    executorService = null;
}
1 Declare the execution service as an attribute for the mapper.
2 Shut down the Hazelcast client instance created by the method annotated with @PostConstruct and thus free the executorService reference.

Defining the Assessor method

The method annotated with @Assessor is used to estimate the volume of the dataset to be handled.

@Assessor
public long estimateSize() {
    return getSizeByMembers() (1)
                    .values().stream()
                    .mapToLong(this::getFutureValue) (2)
                    .sum(); (3)
}
1 The getSizeByMembers() method submits a task to each Hazelcast member to calculate the member size locally and asynchronously.
2 The mapToLong() method gets the size calculated by that task.
3 The sum() method calculates the total size of all the members.

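Then, implement the helper methods used by the method annotated with @Assessor. getSizeByMembers() creates a map of the volumes of the Hazelcast cluster members, and getExecutorService() lazily resolves the executor service whose reference is freed by the method annotated with @PreDestroy.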

private Map<Member, Future<Long>> getSizeByMembers() {
    final IExecutorService executorService = getExecutorService();
    final SerializableTask<Long> sizeComputation = new SerializableTask<Long>() {

        @Override
        public Long call() throws Exception {

            return localInstance.getMap(configuration.getMapName()).getLocalMapStats().getHeapCost();
        }
    };
    if (members.isEmpty()) { // == if no specific members defined, apply on all the cluster
        return executorService.submitToAllMembers(sizeComputation);
    }
    final Set<Member> members = instance.getCluster().getMembers().stream()
            .filter(m -> this.members.contains(m.getUuid()))
            .collect(toSet());
    return executorService.submitToMembers(sizeComputation, members);
}

private IExecutorService getExecutorService() {
    return executorService == null ?
            executorService = instance.getExecutorService(configuration.getExecutorService()) :
            executorService;
}
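
The estimation pattern used here (submit one task per member, wait for each result with a timeout, then sum the results) can be tried without a Hazelcast cluster. The following is a minimal sketch under stated assumptions: local threads from a standard java.util.concurrent.ExecutorService stand in for cluster members, and the SizeEstimationSketch class and its estimateSize method are hypothetical names used only for illustration. It is not the component's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in: local threads play the role of Hazelcast members.
final class SizeEstimationSketch {

    // Waits for one partial result, but never longer than the given timeout.
    static long getFutureValue(final Future<Long> future, final long timeoutSeconds) {
        try {
            return future.get(timeoutSeconds, TimeUnit.SECONDS);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interruption flag
            throw new IllegalStateException(e);
        } catch (final ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
    }

    // Submits one size computation per "member" and sums the results.
    static long estimateSize(final List<Long> memberSizes, final long timeoutSeconds) {
        final ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, memberSizes.size()));
        try {
            final List<Future<Long>> futures = new ArrayList<>();
            for (final Long size : memberSizes) {
                futures.add(pool.submit(() -> size)); // each task "computes" its local size
            }
            return futures.stream().mapToLong(f -> getFutureValue(f, timeoutSeconds)).sum();
        } finally {
            pool.shutdownNow(); // release the threads once all results are collected
        }
    }

    public static void main(final String[] args) {
        System.out.println(estimateSize(Arrays.asList(10L, 20L, 30L), 5L));
    }
}
```

All tasks are submitted before any result is awaited, so the per-member computations run concurrently and the timeout bounds the wait for each individual result.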

Defining the Split method

The method annotated with @Split dynamically splits the partition mapper into a collection of sub-mappers according to:

  • the bundle size required by the execution engine to be used.

  • the volume of the dataset to handle.

@Split
public List<HazelcastMapper> split(@PartitionSize final long bundleSize) {
    final List<HazelcastMapper> partitions = new ArrayList<>();
    final Collection<Member> members = new ArrayList<>();
    long current = 0;
    for (final Map.Entry<Member, Future<Long>> entries : getSizeByMembers().entrySet()) {
        final long memberSize = getFutureValue(entries.getValue());
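        // close the current bundle when adding this member would exceed the requested size
        if (!members.isEmpty() && current + memberSize > bundleSize) {
            partitions.add(
                    new HazelcastMapper(configuration, jsonFactory, jsonb, service, toIdentifiers(members)));
            // reset current iteration
            members.clear();
            current = 0;
        }
        // always assign the member to the current bundle so that no member is skipped
        members.add(entries.getKey());
        current += memberSize;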
    }
    if (!members.isEmpty()) {
        partitions.add(new HazelcastMapper(configuration, jsonFactory, jsonb, service, toIdentifiers(members)));
    }

    if (partitions.isEmpty()) { // just execute this if no plan (= no distribution)
        partitions.add(this);
    }
    return partitions;
}
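
The grouping idea behind the split can be studied in isolation from Hazelcast. The following is a minimal sketch, assuming the member sizes are already known and given as a plain map from a member identifier to a size. The BundleSplitSketch class and its groupBySize method are hypothetical names used only for illustration and are not part of the component framework. Members are packed greedily, in iteration order, so that every member ends up in exactly one group.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Talend Component Kit or Hazelcast.
final class BundleSplitSketch {

    // Greedily packs members into groups whose cumulated size stays within
    // bundleSize when possible. A member larger than bundleSize gets its own group.
    static List<List<String>> groupBySize(final Map<String, Long> sizeByMember, final long bundleSize) {
        final List<List<String>> groups = new ArrayList<>();
        List<String> current = new ArrayList<>();
        long currentSize = 0;
        for (final Map.Entry<String, Long> entry : sizeByMember.entrySet()) {
            // Close the current group if adding this member would exceed the bundle size.
            if (!current.isEmpty() && currentSize + entry.getValue() > bundleSize) {
                groups.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(entry.getKey());
            currentSize += entry.getValue();
        }
        if (!current.isEmpty()) {
            groups.add(current); // flush the last, possibly partial, group
        }
        return groups;
    }

    public static void main(final String[] args) {
        final Map<String, Long> sizes = new LinkedHashMap<>();
        sizes.put("member-a", 40L);
        sizes.put("member-b", 50L);
        sizes.put("member-c", 30L);
        sizes.put("member-d", 200L);
        // prints [[member-a, member-b], [member-c], [member-d]]
        System.out.println(groupBySize(sizes, 100L));
    }
}
```

In the mapper, each resulting group would correspond to one sub-mapper that only reads from the members of that group.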

Defining the Emitter method

The method annotated with @Emitter generates producers using the component configuration. It can be used to load data to the processing flow of a Talend Job. Each sub-mapper generates one producer.

@Emitter
public HazelcastSource createSource() {
    return new HazelcastSource(configuration, jsonFactory, jsonb, service, members); (1)
}
1 After splitting the mapper, every mapper creates a producer that reads the records according to the provided configuration.

Full implementation of the Partition Mapper

Once implemented, the Partition Mapper configuration is as follows. For more information about Partition Mappers, refer to this document.

@Version(1)
@Icon(Icon.IconType.DB_INPUT)
@PartitionMapper(name = "Input")
public class HazelcastMapper implements Serializable {
    private final HazelcastConfiguration configuration;
    private final JsonBuilderFactory jsonFactory;
    private final Jsonb jsonb;
    private final HazelcastService service;

    private final Collection<String> members;
    private transient HazelcastInstance instance;
    private transient IExecutorService executorService;

    // framework API
    public HazelcastMapper(@Option("configuration") final HazelcastConfiguration configuration,
            final JsonBuilderFactory jsonFactory,
            final Jsonb jsonb,
            final HazelcastService service) {
        this(configuration, jsonFactory, jsonb, service, emptyList());
    }

    // internal
    protected HazelcastMapper(final HazelcastConfiguration configuration,
            final JsonBuilderFactory jsonFactory,
            final Jsonb jsonb,
            final HazelcastService service,
            final Collection<String> members) {
        this.configuration = configuration;
        this.jsonFactory = jsonFactory;
        this.jsonb = jsonb;
        this.service = service;
        this.members = members;
    }

    @PostConstruct
    public void init() throws IOException {
        // Creates an instance of Hazelcast according to the provided configuration.
        // Uses the injected HazelcastService instance to perform that.
        // This service is implemented in the project (see HazelcastService).
        instance = service.findInstance(configuration.newConfig());
    }

    @PreDestroy
    public void close() {
        // Shuts down the instance created in the PostConstruct and frees the executorService reference.
        instance.getLifecycleService().shutdown();
        executorService = null;
    }

    @Assessor
    public long estimateSize() {
        // Calculates the whole size of all members.
        return getSizeByMembers().values().stream()
                .mapToLong(this::getFutureValue)
                .sum();
    }

    // This method returns a map of size by member of Hazelcast cluster
    private Map<Member, Future<Long>> getSizeByMembers() {
        final IExecutorService executorService = getExecutorService();
        final SerializableTask<Long> sizeComputation = new SerializableTask<Long>() {

            @Override
            public Long call() throws Exception {

                return localInstance.getMap(configuration.getMapName()).getLocalMapStats().getHeapCost();
            }
        };
        if (members.isEmpty()) { // == if no specific members defined, applies to all the cluster
            return executorService.submitToAllMembers(sizeComputation);
        }
        final Set<Member> members = instance.getCluster().getMembers().stream()
                .filter(m -> this.members.contains(m.getUuid()))
                .collect(toSet());
        return executorService.submitToMembers(sizeComputation, members);
    }

    // This method creates a collection of mappers according to the requested bundleSize and the dataset size.
    @Split
    public List<HazelcastMapper> split(@PartitionSize final long bundleSize) {
        final List<HazelcastMapper> partitions = new ArrayList<>();
        final Collection<Member> members = new ArrayList<>();
        long current = 0;
        for (final Map.Entry<Member, Future<Long>> entries : getSizeByMembers().entrySet()) {
            final long memberSize = getFutureValue(entries.getValue());
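            // Closes the current bundle when adding this member would exceed the requested size
            if (!members.isEmpty() && current + memberSize > bundleSize) {
                partitions.add(
                        new HazelcastMapper(configuration, jsonFactory, jsonb, service, toIdentifiers(members)));
                // Resets current iteration
                members.clear();
                current = 0;
            }
            // Always assigns the member to the current bundle so that no member is skipped
            members.add(entries.getKey());
            current += memberSize;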
        }
        if (!members.isEmpty()) {
            partitions.add(new HazelcastMapper(configuration, jsonFactory, jsonb, service, toIdentifiers(members)));
        }

        if (partitions.isEmpty()) { // just executes this if no plan (= no distribution)
            partitions.add(this);
        }
        return partitions;
    }

    // After splitting the mapper, every mapper creates an emitter that
    // reads the records according to the provided configuration.
    @Emitter
    public HazelcastSource createSource() {
        return new HazelcastSource(configuration, jsonFactory, jsonb, service, members);
    }

    private Set<String> toIdentifiers(final Collection<Member> members) {
        return members.stream().map(Member::getUuid).collect(toSet());
    }

    private long getFutureValue(final Future<Long> future) {
        try {
            return future.get(configuration.getTimeout(), SECONDS);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (final ExecutionException | TimeoutException e) {
            throw new IllegalArgumentException(e);
        }
    }

    private IExecutorService getExecutorService() {
        return executorService == null ?
                executorService = instance.getExecutorService(configuration.getExecutorService()) :
                executorService;
    }
}

Configuring the Producer (Source)

The component configuration and the partition mapper in charge of creating the producers are now defined. At this point, you can implement the source logic that uses the configuration provided by the mapper to read the records from the data source.

To implement a source, you need to define the producer method that creates a record every time it is invoked.

public class HazelcastSource implements Serializable {
    private final HazelcastConfiguration configuration;
    private final JsonBuilderFactory jsonFactory;
    private final Jsonb jsonb;
    private final HazelcastService service;
    private final Collection<String> members;
    private transient HazelcastInstance instance;
    private transient BufferizedProducerSupport<JsonObject> buffer; (1)

    // The constructor is omitted on purpose to reduce the length of this sample code

    @PostConstruct (2)
    public void createInstance() throws IOException {
        instance = service.findInstance(configuration.newConfig());
        final Iterator<Member> memberIterators = instance.getCluster().getMembers().stream()
                .filter(m -> members.isEmpty() || members.contains(m.getUuid()))
                .collect(toSet())
                .iterator();

        buffer = new BufferizedProducerSupport<>(() -> {
            if (!memberIterators.hasNext()) {
                return null;
            }
            final Member member = memberIterators.next();
            // Note: this works if this JAR is deployed on the Hazelcast cluster
            try {
                return instance.getExecutorService(configuration.getExecutorService())
                        .submitToMember(new SerializableTask<Map<String, String>>() {

                            @Override
                            public Map<String, String> call() throws Exception {
                                final IMap<Object, Object> map = localInstance.getMap(configuration.getMapName());
                                final Set<?> keys = map.localKeySet();
                                return keys.stream().collect(toMap(jsonb::toJson, e -> jsonb.toJson(map.get(e))));
                            }
                        }, member).get(configuration.getTimeout(), SECONDS).entrySet().stream()
                        .map(entry -> {
                            final JsonObjectBuilder builder = jsonFactory.createObjectBuilder();
                            if (entry.getKey().startsWith("{")) {
                                builder.add("key", jsonb.fromJson(entry.getKey(), JsonObject.class));
                            } else { // plain string
                                builder.add("key", entry.getKey());
                            }
                            if (entry.getValue().startsWith("{")) {
                                builder.add("value", jsonb.fromJson(entry.getValue(), JsonObject.class));
                            } else { // plain string
                                builder.add("value", entry.getValue());
                            }
                            return builder.build();
                        })
                        .collect(toList())
                        .iterator();
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            } catch (final ExecutionException | TimeoutException e) {
                throw new IllegalArgumentException(e);
            }
        });
    }

    @Producer (3)
    public JsonObject next() {
        return buffer.next();
    }

    @PreDestroy (4)
    public void destroyInstance() {
        // Shutdown of the Hazelcast instance
        instance.getLifecycleService().shutdown();
    }
}
1 BufferizedProducerSupport is a utility class that encapsulates the buffering logic. You only provide the instructions for loading the data, not the logic to iterate over it. In this case, the buffer is created in the @PostConstruct method and loaded once, and is then used to produce records one by one.
2 The method annotated with @PostConstruct is invoked once on the node. It can be used to create connections and to initialize the buffering. In this case, the BufferizedProducerSupport class is used to create a buffer of records in this method.
3 The method annotated with @Producer is responsible for producing records. This method returns null when there are no more records to read.
4 The method annotated with @PreDestroy is called before the source is destroyed and is used to clean up all the resources used by the source. In this case, the Hazelcast instance created in the @PostConstruct method is shut down.
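
To make the buffering idea more concrete, the following is a simplified stand-in written with standard Java only. It is not the framework's BufferizedProducerSupport implementation: the BufferedProducerSketch class is a hypothetical name, and the sketch only assumes the contract visible in the source above, where a supplier returns an iterator over the next chunk of records, or null when nothing is left.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Simplified stand-in, NOT the framework's BufferizedProducerSupport implementation.
final class BufferedProducerSketch<T> {
    private final Supplier<Iterator<T>> loader; // returns the next chunk, or null when exhausted
    private Iterator<T> current;
    private boolean exhausted;

    BufferedProducerSketch(final Supplier<Iterator<T>> loader) {
        this.loader = loader;
    }

    // Returns the next record, or null when no chunk is left.
    T next() {
        // Load chunks until one has a record to offer; empty chunks are skipped.
        while (!exhausted && (current == null || !current.hasNext())) {
            current = loader.get();
            if (current == null) {
                exhausted = true;
            }
        }
        return exhausted ? null : current.next();
    }

    public static void main(final String[] args) {
        // Three chunks, the second one being empty (for example a member without local keys).
        final Iterator<List<String>> chunks = Arrays.asList(
                Arrays.asList("a", "b"),
                Collections.<String>emptyList(),
                Arrays.asList("c")).iterator();
        final BufferedProducerSketch<String> buffer = new BufferedProducerSketch<>(
                () -> chunks.hasNext() ? chunks.next().iterator() : null);
        String record;
        while ((record = buffer.next()) != null) {
            System.out.println(record);
        }
    }
}
```

The caller only ever invokes next(), which mirrors how the method annotated with @Producer delegates to the buffer and signals the end of the data by returning null.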

To learn more about producers, refer to this document.

You have now created a complete working input component. You can follow this tutorial to learn how to create unit tests for your new component.
