Create an input component

In this tutorial, we create a complete, working input component for Hazelcast.

This includes:

  1. The component family registration.

  2. The component configuration and the UI layout.

  3. The partition mapper that lets the input split itself to work in a distributed environment.

  4. The source that is responsible for connecting and reading data from the data source.

Getter and setter methods are omitted for simplicity in this tutorial.

The component family registration

We register the component family via the package-info.java file in the package of the component.

@Icon(value = Icon.IconType.CUSTOM, custom = "hazelcast") (1)
@Components(family = "Hazelcast", categories = "IMDG") (2)
package org.talend.hazelcast;
1 This defines the family icon.
2 In this line we define the component family and the component categories. This information is used in the web and Studio applications to group the components.

The component configuration

The component configuration defines the configurable part of the component, as well as the configuration types and the UI layout. The configuration is a simple POJO class decorated with annotations from the component framework. Here is the configuration of our component, which we explain in detail below.

@GridLayout({ (1)
        @GridLayout.Row({ "hazelcastXml", "mapName" }),
        @GridLayout.Row({ "executorService" }),
})
public class HazelcastConfiguration implements Serializable {

    @Option (2)
    private String hazelcastXml; (3)

    @Option
    private String mapName; (4)

    @Option
    private String executorService = "default"; (5)

    ClientConfig newConfig() throws IOException { (6)
        final ClientConfig newconfig = hazelcastXml == null ? new XmlClientConfigBuilder().build() :
                new XmlClientConfigBuilder(hazelcastXml).build();

        newconfig.setInstanceName(getClass().getSimpleName() + "_" + UUID.randomUUID().toString());
        newconfig.setClassLoader(Thread.currentThread().getContextClassLoader());
        return newconfig;
    }
}
1 In this part we define the UI layout of the configuration. This layout is used to display and organize the configuration in the web and Talend Studio applications.
2 All the attributes annotated with @Option are treated as configuration and are bound to a default widget according to their type, unless a specific widget is explicitly declared. See the widgets gallery for more details.
3 The path to the Hazelcast XML configuration file.
4 The name of the map to read.
5 The name of the executor service, which defaults to "default".
6 A simple utility method that converts our configuration into a Hazelcast client configuration object.

The Partition Mapper

Our component needs to work in distributed environments first. Every input component therefore has to define a partition mapper, which is responsible for calculating the number of sources to create according to the whole dataset size and the bundle size requested by the targeted runner.
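To make the relationship between dataset size and bundle size concrete, here is a minimal sketch in plain Java. It only shows the arithmetic of an even split. The names `BundleMath` and `expectedSources` are hypothetical and not part of the component framework, and a real runner may choose bundle sizes differently.

```java
final class BundleMath {

    private BundleMath() {
    }

    // Hypothetical helper: ceiling division of the estimated dataset size by the bundle size.
    static long expectedSources(final long datasetSize, final long bundleSize) {
        if (bundleSize <= 0) {
            throw new IllegalArgumentException("bundleSize must be positive");
        }
        if (datasetSize <= 0) {
            return 1; // nothing to split: a single source is enough
        }
        return (datasetSize + bundleSize - 1) / bundleSize;
    }

    public static void main(final String[] args) {
        // A dataset estimated at 1000 units with bundles of 256 units needs 4 sources.
        System.out.println(expectedSources(1_000, 256)); // prints 4
    }
}
```

In the Hazelcast mapper of this tutorial the split is coarser than this, because a member is never divided: members are grouped until the requested bundle size is reached.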

Let’s start by examining the skeleton of our partition mapper. Then we will implement each method one by one.

@Version(1) (1)
@Icon(value = Icon.IconType.CUSTOM, custom = "hazelcastInput") (2)
@PartitionMapper(name = "Input") (3)
public class HazelcastMapper implements Serializable {
    private final HazelcastConfiguration configuration;
    private final JsonBuilderFactory jsonFactory;
    private final Jsonb jsonb;
    private final HazelcastService service;

    public HazelcastMapper(@Option("configuration") final HazelcastConfiguration configuration,
            final JsonBuilderFactory jsonFactory,
            final Jsonb jsonb,
            final HazelcastService service) {} (4)

    @PostConstruct
    public void init() throws IOException {}  (5)

    @PreDestroy
    public void close() {} (6)

    @Assessor
    public long estimateSize() {} (7)

    @Split
    public List<HazelcastMapper> split(@PartitionSize final long bundleSize) {} (8)

    @Emitter
    public HazelcastSource createSource() {}  (9)
}
1 The @Version annotation indicates the version of the component. It is used to migrate the component configuration if needed.
2 The @Icon annotation indicates the icon of the component. Here we define a custom icon that needs to be bundled in the component JAR under resources/icons.
3 The @PartitionMapper annotation indicates that this class is the partition mapper and gives its name.
4 The constructor of the mapper is responsible for injecting the component configuration and services. Configuration parameters are annotated with @Option. The other parameters are considered services and are injected by the component framework. Services can be local services (classes annotated with @Service) or services provided by the component framework.
5 The method annotated with @PostConstruct is executed once on the driver node in a distributed environment and can be used for initialization. Here we get the Hazelcast instance according to the provided configuration.
6 The method annotated with @PreDestroy is used to clean up resources at the end of the execution of the partition mapper. Here we shut down the Hazelcast instance loaded in the PostConstruct method.
7 The method annotated with @Assessor is responsible for calculating the dataset size. Here we sum the sizes reported by all the Hazelcast members.
8 The method annotated with @Split is responsible for splitting this mapper according to the bundle size requested by the runner and the whole dataset size.
9 The method annotated with @Emitter is responsible for creating the producer instance that reads the data from the data source (Hazelcast in this case).

Now that we know what we need to implement and why, let’s code those methods one by one.

The constructor

private final Collection<String> members; (1)

(2)
public HazelcastMapper(@Option("configuration") final HazelcastConfiguration configuration,
        final JsonBuilderFactory jsonFactory,
        final Jsonb jsonb,
        final HazelcastService service) {
    this(configuration, jsonFactory, jsonb, service, emptyList());
}

// internal (3)
protected HazelcastMapper(final HazelcastConfiguration configuration,
        final JsonBuilderFactory jsonFactory,
        final Jsonb jsonb,
        final HazelcastService service,
        final Collection<String> members) {
    this.configuration = configuration;
    this.jsonFactory = jsonFactory;
    this.jsonb = jsonb;
    this.service = service;
    this.members = members;
}
1 We will need the list of Hazelcast members later, so we add a collection attribute to the mapper.
2 The public constructor of the component, responsible for injecting the configuration and services.
3 An internal constructor that takes a collection of members in addition to the previous parameters. This will be useful later in this tutorial.
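The two-constructor pattern can be sketched in isolation. In the minimal example below, an empty collection means "no restriction", which mirrors how the mapper treats an empty members collection later in this tutorial. The class `MemberScope` and its `accepts` method are hypothetical names used only for illustration.

```java
import java.util.Collection;
import java.util.Collections;

final class MemberScope {

    private final Collection<String> members;

    // Counterpart of the public constructor: no explicit members, so the whole cluster is targeted.
    MemberScope() {
        this(Collections.emptyList());
    }

    // Counterpart of the internal constructor: restricts the scope to the given member identifiers.
    MemberScope(final Collection<String> members) {
        this.members = members;
    }

    // An empty collection means that every member is accepted.
    boolean accepts(final String memberId) {
        return members.isEmpty() || members.contains(memberId);
    }

    public static void main(final String[] args) {
        System.out.println(new MemberScope().accepts("member-1")); // prints true
        System.out.println(new MemberScope(Collections.singletonList("member-2")).accepts("member-1")); // prints false
    }
}
```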

The PostConstruct method

private transient HazelcastInstance instance; (1)

@PostConstruct
public void init() throws IOException {
    instance = service.findInstance(configuration.newConfig()); (2)
}
1 We need a Hazelcast instance, so we add it as an attribute of the mapper.
2 Here we create a Hazelcast instance according to the provided configuration. Notice that we use the injected HazelcastService instance to do so. This service is implemented in the project.

Here is the HazelcastService implementation. Every class annotated with @Service can be injected into the component via its constructor.

import org.talend.sdk.component.api.service.Service;

@Service
public class HazelcastService {
    public HazelcastInstance findInstance(final ClientConfig config) {
        return HazelcastClient.newHazelcastClient(config); (1)
    }
}
1 We create a new Hazelcast client instance.
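The instance attribute of the mapper is declared transient while the mapper itself is Serializable. The tutorial does not spell out why, but a plausible reason is that a live client connection cannot travel with a serialized mapper, so it has to be rebuilt in the PostConstruct method. The following sketch, which uses only the JDK and hypothetical names (`TransientDemo`, `roundTrip`), shows that a transient field is null after a serialization round trip.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class TransientDemo implements Serializable {

    private static final long serialVersionUID = 1L;

    private final String mapName;        // serialized with the object
    private transient Object connection; // skipped by Java serialization

    TransientDemo(final String mapName) {
        this.mapName = mapName;
    }

    // Stand-in for the PostConstruct step that opens a live connection.
    void init() {
        connection = new Object();
    }

    boolean isConnected() {
        return connection != null;
    }

    String mapName() {
        return mapName;
    }

    // Serializes the object to a byte array and reads it back, as a remote worker would.
    static TransientDemo roundTrip(final TransientDemo original) {
        try {
            final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (TransientDemo) in.readObject();
            }
        } catch (final IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(final String[] args) {
        final TransientDemo original = new TransientDemo("clients");
        original.init();
        final TransientDemo copy = roundTrip(original);
        System.out.println(copy.mapName() + " connected=" + copy.isConnected()); // prints clients connected=false
    }
}
```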

The PreDestroy method

private transient IExecutorService executorService; (1)

@PreDestroy
public void close() { (2)
    instance.getLifecycleService().shutdown();
    executorService = null;
}
1 This executor service is used by our mapper, so we add it as an attribute.
2 Here we shut down the instance that we created in the PostConstruct method and release the executorService reference.

The Assessor method

@Assessor
public long estimateSize() {
    return getSizeByMembers() (1)
                    .values().stream()
                    .mapToLong(this::getFutureValue) (2)
                    .sum(); (3)
}
1 We get the size of all members by calling the method getSizeByMembers. This method submits a task to each cluster member; the task calculates the member size locally and asynchronously.
2 We get the size of the member from the callable task that we submitted.
3 We sum the sizes of all the members.

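Here is the implementation of getSizeByMembers and of the getExecutorService helper it relies on. The getFutureValue helper is shown in the full implementation of the partition mapper.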

private Map<Member, Future<Long>> getSizeByMembers() {
    final IExecutorService executorService = getExecutorService();
    final SerializableTask<Long> sizeComputation = new SerializableTask<Long>() {

        @Override
        public Long call() throws Exception {

            return localInstance.getMap(configuration.getMapName()).getLocalMapStats().getHeapCost();
        }
    };
    if (members.isEmpty()) { // == if no specific members defined, apply on all the cluster
        return executorService.submitToAllMembers(sizeComputation);
    }
    final Set<Member> members = instance.getCluster().getMembers().stream()
            .filter(m -> this.members.contains(m.getUuid()))
            .collect(toSet());
    return executorService.submitToMembers(sizeComputation, members);
}

private IExecutorService getExecutorService() {
    return executorService == null ?
            executorService = instance.getExecutorService(configuration.getExecutorService()) :
            executorService;
}
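The assessor boils down to waiting for one future per member and summing the results. The sketch below reproduces that idea with JDK classes only, using already completed futures in place of tasks submitted to a cluster. The names `SizeAggregation`, `totalSize` and `await` are hypothetical, and the timeout is passed as a parameter here, whereas the tutorial reads it from configuration.getTimeout().

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class SizeAggregation {

    private SizeAggregation() {
    }

    // Sums the sizes reported by every member, waiting at most timeoutSeconds for each one.
    static long totalSize(final Map<String, Future<Long>> sizeByMember, final long timeoutSeconds) {
        return sizeByMember.values().stream()
                .mapToLong(future -> await(future, timeoutSeconds))
                .sum();
    }

    // Unwraps a future and converts checked exceptions into unchecked ones.
    private static long await(final Future<Long> future, final long timeoutSeconds) {
        try {
            return future.get(timeoutSeconds, TimeUnit.SECONDS);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interruption flag
            throw new IllegalStateException(e);
        } catch (final ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(final String[] args) {
        final Map<String, Future<Long>> sizes = new LinkedHashMap<>();
        sizes.put("member-1", CompletableFuture.completedFuture(1_024L));
        sizes.put("member-2", CompletableFuture.completedFuture(2_048L));
        System.out.println(totalSize(sizes, 5)); // prints 3072
    }
}
```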

The Split method

@Split
public List<HazelcastMapper> split(@PartitionSize final long bundleSize) { (1)
    final List<HazelcastMapper> partitions = new ArrayList<>();
    final Collection<Member> members = new ArrayList<>();
    long current = 0;
    for (final Map.Entry<Member, Future<Long>> entries : getSizeByMembers().entrySet()) {
        final long memberSize = getFutureValue(entries.getValue());
        if (!members.isEmpty() && current + memberSize > bundleSize) {
            // the current bundle is full: emit it and start a new one
            partitions.add(
                    new HazelcastMapper(configuration, jsonFactory, jsonb, service, toIdentifiers(members)));
            members.clear();
            current = 0;
        }
        // always assign the member being visited to the current bundle
        members.add(entries.getKey());
        current += memberSize;
    }
    if (!members.isEmpty()) {
        partitions.add(new HazelcastMapper(configuration, jsonFactory, jsonb, service, toIdentifiers(members)));
    }

    if (partitions.isEmpty()) { // just execute this if no plan (= no distribution)
        partitions.add(this);
    }
    return partitions;
}
1 This method creates a collection of mappers according to the requested bundleSize and the dataset size.
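The grouping idea behind the split can be tried out without a cluster. The following minimal sketch groups member identifiers greedily by size, closing a bundle as soon as adding the next member would exceed the requested bundle size. It is an illustration under simplifying assumptions, not the framework's algorithm: the names `GreedyBundles` and `group` are hypothetical, and sizes are given directly instead of being computed by the members.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class GreedyBundles {

    private GreedyBundles() {
    }

    // Groups member identifiers so that each group stays at or below bundleSize when possible.
    // A single member larger than bundleSize still gets its own group.
    static List<List<String>> group(final Map<String, Long> sizeByMember, final long bundleSize) {
        final List<List<String>> bundles = new ArrayList<>();
        List<String> current = new ArrayList<>();
        long currentSize = 0;
        for (final Map.Entry<String, Long> entry : sizeByMember.entrySet()) {
            // Close the current bundle when adding this member would exceed the requested size.
            if (!current.isEmpty() && currentSize + entry.getValue() > bundleSize) {
                bundles.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(entry.getKey());
            currentSize += entry.getValue();
        }
        if (!current.isEmpty()) {
            bundles.add(current);
        }
        return bundles;
    }

    public static void main(final String[] args) {
        final Map<String, Long> sizes = new LinkedHashMap<>();
        sizes.put("a", 40L);
        sizes.put("b", 50L);
        sizes.put("c", 30L);
        sizes.put("d", 200L);
        System.out.println(group(sizes, 100)); // prints [[a, b], [c], [d]]
    }
}
```

Each resulting group corresponds to one mapper created with the identifiers of its members.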

The Emitter method

@Emitter
public HazelcastSource createSource() {
    return new HazelcastSource(configuration, jsonFactory, jsonb, service, members); (1)
}
1 Once the mapper has been split, every resulting mapper creates a producer that reads the records according to the provided configuration.

The full implementation of the Partition Mapper

Here is the full source code of the partition mapper, to give a global view of it. Read more about partition mapper…

@Version(1)
@Icon(value = Icon.IconType.CUSTOM, custom = "hazelcastInput")
@PartitionMapper(name = "Input")
public class HazelcastMapper implements Serializable {
    private final HazelcastConfiguration configuration;
    private final JsonBuilderFactory jsonFactory;
    private final Jsonb jsonb;
    private final HazelcastService service;

    private final Collection<String> members;
    private transient HazelcastInstance instance;
    private transient IExecutorService executorService;

    // framework API
    public HazelcastMapper(@Option("configuration") final HazelcastConfiguration configuration,
            final JsonBuilderFactory jsonFactory,
            final Jsonb jsonb,
            final HazelcastService service) {
        this(configuration, jsonFactory, jsonb, service, emptyList());
    }

    // internal
    protected HazelcastMapper(final HazelcastConfiguration configuration,
            final JsonBuilderFactory jsonFactory,
            final Jsonb jsonb,
            final HazelcastService service,
            final Collection<String> members) {
        this.configuration = configuration;
        this.jsonFactory = jsonFactory;
        this.jsonb = jsonb;
        this.service = service;
        this.members = members;
    }

    @PostConstruct
    public void init() throws IOException {
        // Create a Hazelcast instance from the provided configuration, using the injected
        // HazelcastService, which is implemented in this project.
        instance = service.findInstance(configuration.newConfig());
    }

    @PreDestroy
    public void close() {
        // Shut down the instance created in the PostConstruct method and release the executorService reference
        instance.getLifecycleService().shutdown();
        executorService = null;
    }

    @Assessor
    public long estimateSize() {
        // Here we calculate the whole size of all members
        return getSizeByMembers().values().stream()
                .mapToLong(this::getFutureValue)
                .sum();
    }

    // This method returns a map of sizes by member of the Hazelcast cluster
    private Map<Member, Future<Long>> getSizeByMembers() {
        final IExecutorService executorService = getExecutorService();
        final SerializableTask<Long> sizeComputation = new SerializableTask<Long>() {

            @Override
            public Long call() throws Exception {

                return localInstance.getMap(configuration.getMapName()).getLocalMapStats().getHeapCost();
            }
        };
        if (members.isEmpty()) { // == if no specific members defined, apply on all the cluster
            return executorService.submitToAllMembers(sizeComputation);
        }
        final Set<Member> members = instance.getCluster().getMembers().stream()
                .filter(m -> this.members.contains(m.getUuid()))
                .collect(toSet());
        return executorService.submitToMembers(sizeComputation, members);
    }

    // This method creates a collection of mappers according to the requested bundleSize and the dataset size
    @Split
    public List<HazelcastMapper> split(@PartitionSize final long bundleSize) {
        final List<HazelcastMapper> partitions = new ArrayList<>();
        final Collection<Member> members = new ArrayList<>();
        long current = 0;
        for (final Map.Entry<Member, Future<Long>> entries : getSizeByMembers().entrySet()) {
            final long memberSize = getFutureValue(entries.getValue());
            if (!members.isEmpty() && current + memberSize > bundleSize) {
                // the current bundle is full: emit it and start a new one
                partitions.add(
                        new HazelcastMapper(configuration, jsonFactory, jsonb, service, toIdentifiers(members)));
                members.clear();
                current = 0;
            }
            // always assign the member being visited to the current bundle
            members.add(entries.getKey());
            current += memberSize;
        }
        if (!members.isEmpty()) {
            partitions.add(new HazelcastMapper(configuration, jsonFactory, jsonb, service, toIdentifiers(members)));
        }

        if (partitions.isEmpty()) { // just execute this if no plan (= no distribution)
            partitions.add(this);
        }
        return partitions;
    }

    // Once the mapper has been split, every resulting mapper creates an emitter that
    // reads the records according to the provided configuration
    @Emitter
    public HazelcastSource createSource() {
        return new HazelcastSource(configuration, jsonFactory, jsonb, service, members);
    }

    private Set<String> toIdentifiers(final Collection<Member> members) {
        return members.stream().map(Member::getUuid).collect(toSet());
    }

    private long getFutureValue(final Future<Long> future) {
        try {
            return future.get(configuration.getTimeout(), SECONDS);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (final ExecutionException | TimeoutException e) {
            throw new IllegalArgumentException(e);
        }
    }

    private IExecutorService getExecutorService() {
        return executorService == null ?
                executorService = instance.getExecutorService(configuration.getExecutorService()) :
                executorService;
    }
}

The Producer (Source)

Now that we have set up our component configuration and written our partition mapper, which creates our producers, let’s implement the source logic that uses the configuration provided by the mapper to read the records from the data source. To implement a source, we need to implement the producer method, which produces a record every time it’s invoked.

public class HazelcastSource implements Serializable {
    private final HazelcastConfiguration configuration;
    private final JsonBuilderFactory jsonFactory;
    private final Jsonb jsonb;
    private final HazelcastService service;
    private final Collection<String> members;
    private transient HazelcastInstance instance;
    private transient BufferizedProducerSupport<JsonObject> buffer; (1)

    // The constructor is omitted to keep the code short

    @PostConstruct (2)
    public void createInstance() throws IOException {
        instance = service.findInstance(configuration.newConfig());
        final Iterator<Member> memberIterators = instance.getCluster().getMembers().stream()
                .filter(m -> members.isEmpty() || members.contains(m.getUuid()))
                .collect(toSet())
                .iterator();

        buffer = new BufferizedProducerSupport<>(() -> {
            if (!memberIterators.hasNext()) {
                return null;
            }
            final Member member = memberIterators.next();
            // note: this works if this jar is deployed on the hz cluster
            try {
                return instance.getExecutorService(configuration.getExecutorService())
                        .submitToMember(new SerializableTask<Map<String, String>>() {

                            @Override
                            public Map<String, String> call() throws Exception {
                                final IMap<Object, Object> map = localInstance.getMap(configuration.getMapName());
                                final Set<?> keys = map.localKeySet();
                                return keys.stream().collect(toMap(jsonb::toJson, e -> jsonb.toJson(map.get(e))));
                            }
                        }, member).get(configuration.getTimeout(), SECONDS).entrySet().stream()
                        .map(entry -> {
                            final JsonObjectBuilder builder = jsonFactory.createObjectBuilder();
                            if (entry.getKey().startsWith("{")) {
                                builder.add("key", jsonb.fromJson(entry.getKey(), JsonObject.class));
                            } else { // plain string
                                builder.add("key", entry.getKey());
                            }
                            if (entry.getValue().startsWith("{")) {
                                builder.add("value", jsonb.fromJson(entry.getValue(), JsonObject.class));
                            } else { // plain string
                                builder.add("value", entry.getValue());
                            }
                            return builder.build();
                        })
                        .collect(toList())
                        .iterator();
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            } catch (final ExecutionException | TimeoutException e) {
                throw new IllegalArgumentException(e);
            }
        });
    }

    @Producer (3)
    public JsonObject next() {
        return buffer.next();
    }

    @PreDestroy (4)
    public void destroyInstance() {
        // Shut down the Hazelcast instance
        instance.getLifecycleService().shutdown();
    }
}
1 BufferizedProducerSupport is a utility class that encapsulates the buffering logic, so that you only need to provide how to load the data and not the logic to iterate over it. In this case, the buffer is created in the PostConstruct method; its supplier loads the entries of one member at a time, and the buffered records are then produced one by one.
2 The method annotated with @PostConstruct is invoked once on the node, so here we can create connections and initialize buffering. In our case, we create a buffer of records in this method using the BufferizedProducerSupport class.
3 The method annotated with @Producer is responsible for producing records. It returns null when there are no more records to read.
4 The method annotated with @PreDestroy is called before the source is destroyed and is used to clean up all the resources used by the source. In our case, we shut down the Hazelcast instance that we created in the PostConstruct method.
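The buffering contract used by the source can be illustrated with a small stand-alone class. The sketch below is a hypothetical re-implementation of the idea, not the framework's BufferizedProducerSupport: the name `ChunkedProducer` is invented, and the real class may differ in details such as how it handles empty chunks. A loader supplies one iterator per chunk and returns null when nothing is left, and next() hands out records one by one until it returns null.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

final class ChunkedProducer<T> {

    private final Supplier<Iterator<T>> loader;
    private Iterator<T> current;

    ChunkedProducer(final Supplier<Iterator<T>> loader) {
        this.loader = loader;
    }

    // Returns the next record, loading a new chunk when the current one is exhausted.
    // Returns null once the loader itself returns null, which signals the end of the data.
    T next() {
        while (current == null || !current.hasNext()) {
            current = loader.get();
            if (current == null) {
                return null;
            }
        }
        return current.next();
    }

    public static void main(final String[] args) {
        // Three chunks, standing in for the entries held by three cluster members.
        final Iterator<List<String>> chunks = Arrays.asList(
                Arrays.asList("r1", "r2"),
                Collections.<String>emptyList(),
                Arrays.asList("r3")).iterator();
        final ChunkedProducer<String> producer =
                new ChunkedProducer<>(() -> chunks.hasNext() ? chunks.next().iterator() : null);
        String record;
        while ((record = producer.next()) != null) {
            System.out.println(record); // prints r1, r2 and r3 on separate lines
        }
    }
}
```

The caller never sees chunk boundaries, which is why the @Producer method of the source can be reduced to a single call to buffer.next().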

We have seen how to create a complete working input in this tutorial. In the next one we will explain how to create some unit tests for it.
