Iterating on component development with Talend Studio

How to install and configure components developed with Talend Component Kit in Talend Open Studio.

Integrate components you developed using Talend Component Kit into Talend Studio in a few steps. Also learn how to enable the developer and debugging modes to iterate on your component development.
The version of Talend Component Kit you need to use to develop new components depends on the version of Talend Studio in which components will be integrated.
Refer to this document to learn about compatibility between Talend Component Kit and the different versions of Talend applications.
Learn how to build and deploy components to Talend Studio using Maven or Gradle Talend Component Kit plugins.
This can be done using the deploy-in-studio goal from your development environment.
If you are unfamiliar with component development, you can also follow this example to go through the entire process, from creating a project to using your new component in Talend Studio.
The Studio integration relies on the Component Server, which the Studio uses to gather data about components created using Talend Component Kit.
You can change the default configuration of the component server by modifying the $STUDIO_HOME/configuration/config.ini file.
The following parameters are available:

|===
|Name |Description |Default

|component.environment
|Enables the developer mode when set to dev
|-

|component.debounce.timeout
|Specifies the timeout (in milliseconds) before calling listeners in component text fields
|750

|component.kit.skip
|If set to true, the plugin is not enabled. Useful if you do not have any component developed with the framework.
|false

|component.java.arguments
|Additional options for the component server
|-

|component.java.m2
|Maven repository that the server uses to resolve components
|Global Studio configuration

|component.java.coordinates
|A comma-separated list of GAVs (groupId:artifactId:version) of components to register
|-

|component.java.registry
|A properties file whose values are component GAVs (groupId:artifactId:version) to register at startup. Only use slashes (even on Windows) in the path.
|-

|component.java.port
|Sets the port the server uses
|random

|components.server.beam.active
|Activates Beam support when set to true (experimental). Requires the Beam SDK Java core dependencies to be available.
|false

|component.server.jul.forceConsole
|Adds a console handler to JUL to see logs in the console. This can be helpful in development because the formatting is clearer than the OSGi one in workspace/.metadata/.log. It uses the java.util.logging.SimpleFormatter.format property to define its format. By default, it is %1$tb %1$td, %1$tY %1$tl:%1$tM:%1$tS %1$Tp %2$s%n%4$s: %5$s%6$s%n, but for development purposes [%4$s] %5$s%6$s%n is simpler and more readable.
|false
|===
Here is an example of a common developer config.ini file:
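The snippet below is a minimal sketch built from the parameters above; the port and coordinates values are illustrative assumptions, adjust them to your environment.

[source,properties]
----
# $STUDIO_HOME/configuration/config.ini
component.environment = dev
component.debounce.timeout = 750
# illustrative: fix the port instead of relying on a random one
component.java.port = 11000
# illustrative GAV of a component under development
component.java.coordinates = com.mycompany:my-component:1.0.0-SNAPSHOT
component.server.jul.forceConsole = true
java.util.logging.SimpleFormatter.format = [%4$s] %5$s%6$s%n
----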
The developer mode is especially useful to iterate on your component development and to avoid closing and restarting Talend Studio every time you make a change to a component. It adds a Talend Component Kit button in the main toolbar:
When clicking this button, all components developed with the Talend Component Kit framework are reloaded. The cache is invalidated and the components refreshed.
You still need to remove and re-add the components in your Job to see the changes.
To enable it, simply set the component.environment parameter to dev in the config.ini configuration file of the component server.
Several methods allow you to debug custom components created with Talend Component Kit in Talend Studio.
From your development tool, create a new Remote configuration, and copy the Command line arguments for running remote JVM field. For example, -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005, where:
the suspend parameter of the -agentlib argument specifies whether you want to suspend the debugged JVM until the debugger attaches to it. Possible values are n (no, default value) or y (yes).
the address parameter of the -agentlib argument is the port used for the remote configuration. Make sure this port is available.
Open Talend Studio.
Create a new Job that uses the component you want to debug or open an existing one that already uses it.
Go to the Run tab of the Job and select Use specific JVM arguments.
Click New to add an argument.
In the popup window, paste the arguments copied from the IDE.
Enter the corresponding debug mode:
To debug the runtime, run the Job and access the remote host configured in the IDE.
To debug the Guess schema option, click the Guess schema action button of the component and access the remote host configured in the IDE.
From your development tool, create a new Remote configuration, and copy the Command line arguments for running remote JVM field. For example, -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005, where:
the suspend parameter of the -agentlib argument specifies whether you want to suspend the debugged JVM until the debugger attaches to it. Possible values are n (no, default value) or y (yes).
address is the port used for the remote configuration. Make sure this port is available.
Access the installation directory of your Talend Studio.
Open the .ini file corresponding to your Operating System. For example, TOS_DI-win-x86_64.ini.
Paste the arguments copied from the IDE in a new line of the file.
Go to Talend Studio to use the component, and access the remote host configured in the IDE.
If you run multiple Studio instances in parallel (for example, on a CI platform), you can run into issues with the random port computation. For that purpose, you can create the $HOME/.talend/locks/org.talend.sdk.component.studio-integration.lock file.
Then, when a server starts, it acquires a lock on that file, preventing another server from getting a port until it is fully started. This ensures that two concurrent processes cannot get the same port allocated.
However, this situation is highly unlikely to happen on a desktop, where forcing a different value through component.java.port in your config.ini file is a better solution for local installations.
Component server and HTTP API

Learn about the Talend Component Kit HTTP API and the component server.

The HTTP API intends to expose most Talend Component Kit features over HTTP. It is a standalone Java HTTP server.
The WebSocket protocol is activated for the endpoints. Endpoints then use /websocket/v1 as base instead of /api/v1. See WebSocket for more details.
Browse the API description using the OpenAPI interface exposed by the server.
To make sure that the migration can be enabled, you need to set the version the component was created with in the execution configuration that you send to the server (the component version is available through the component detail endpoint). To do that, use the tcomp::component::version key.
Endpoints that are intended to disappear are deprecated. An X-Talend-Warning header is returned with a message as value.
You can connect to any endpoint by:
Replacing /api with /websocket
Appending /<HTTP method> to the URL
Formatting the request as:
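The frame below is a sketch of the STOMP-like format the server expects; treat the exact header names as indicative and verify them against your server version (^@ denotes the null byte terminating the frame):

[source]
----
SEND
destinationMethod: <HTTP method>
exchangeId: <unique identifier for the request>
destination: <endpoint path after /v1>
<other headers>

<payload>^@
----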
For example:
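The identifier and headers below are illustrative:

[source]
----
SEND
destinationMethod: GET
exchangeId: 1234-5678-9012
destination: /component/index
Accept: application/json

^@
----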
The response is formatted as follows:
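Under the same assumptions, the response frame looks like:

[source]
----
MESSAGE
status: 200
exchangeId: 1234-5678-9012
Content-Type: application/json

<payload>^@
----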
All endpoints are logged at startup. You can then find them in the logs if you have a doubt about which one to use.
If you don’t want to create a pool of connections per endpoint/verb, you can use the bus endpoint: /websocket/v1/bus. This endpoint requires that you add the destinationMethod header to each request with the verb value (GET by default):
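For instance, a hedged sketch of a GET sent through the bus endpoint (same frame format as above, with the verb carried by the destinationMethod header instead of the URL):

[source]
----
SEND
destinationMethod: GET
destination: /component/index
Accept: application/json

^@
----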
The configuration is read from system properties, environment variables, and so on.
Default value: 1000. Maximum items a cache can store, used for index endpoints.
A comma-separated list of GAVs (groupId:artifactId:version) used to locate the components.
Default value: ${home}/documentations. A component translation repository. This is where you put your documentation translations. Their name must follow the pattern documentation_${container-id}_language.adoc where ${container-id} is the component jar name (without the extension and version, generally the artifactId).
Default value: true. Should the component extensions add required dependencies.
If you deploy some extensions, this is where they can create their dependencies if needed.
Default value: 180000. Timeout for extension initialization at startup. Since startup waits for extensions to be ready and loaded, this parameter lets you control the latency that this wait implies.
A property file (or multiple, comma-separated) where the value is a GAV of a component to register (complementary with the coordinates option). Note that the path can end with a wildcard or .properties to take into account all properties files in a folder.
Default value: true. Should the /documentation endpoint be activated. Note that when called on localhost the doc is always available.
Default value: true. Should the /api/v1/environment endpoint be activated. It shows some internal versions and the Git commit, which are not always desirable over the wire.
Default value: false. Should the components using a @GridLayout support tab translation. The Studio does not support that feature yet, so it is not enabled by default.
Default value: icons/%s.svg,icons/svg/%s.svg,icons/%s_icon32.png,icons/png/%s_icon32.png. These patterns are used to find the icons in the classpath(s).
Default value: false. If set, its value replaces any exception message. Set to false to use the actual exception message.
Default value: false. Should the lastUpdated timestamp value of the /environment endpoint be updated with the server start time.
Default value: en*=en fr*=fr zh*=zh_CN ja*=ja de*=de. For caching reasons, the goal is to reduce the locales to the minimum required number. For instance, we avoid fr and fr_FR, which would lead to the same entries twice in memory. This mapping enables that by whitelisting allowed locales, the default being en. If a key ends with *, all strings starting with the prefix will match. For instance, fr* matches fr_FR but also fr_CA.
The local maven repository used to locate components and their dependencies
Default value: false. Should the plugins be un-deployed and re-deployed.
Default value: 600. Interval in seconds between each check when plugin reloading is enabled.
Specify a file whose filesystem timestamp is checked. This file takes precedence over the default ones provided by the talend.component.server.component.registry property (used for the timestamp method).
Default value: timestamp. Re-deploy method on a timestamp or connectors version change. By default, the timestamp is checked on the file pointed by talend.component.server.component.registry or talend.component.server.plugins.reloading.marker variable, otherwise we inspect the content of the CONNECTORS_VERSION file. Accepted values: timestamp, anything else defaults to connectors.
Default value: false. Should all requests/responses be logged (debug purposes; only works when running with CXF).
Default value: securityNoopHandler. How to validate a command/request. Accepted values: securityNoopHandler.
Default value: securityNoopHandler. How to validate a connection. Accepted values: securityNoopHandler.
A folder available for the server - don’t forget to mount it in docker if you are using the image - which accepts subfolders named as component plugin id (generally the artifactId or jar name without the version, ex: jdbc). Each family folder can contain:
a user-configuration.properties file which will be merged with component configuration system (see services). This properties file enables the function userJar(xxxx) to replace the jar named xxxx by its virtual gav (groupId:artifactId:version),
a list of jars which will be merged with component family classpath
Default value: auto. Should the implicit artifacts be provisioned to an m2. If set to auto, it tries to detect whether there is an m2 to provision (recommended). If set to skip, it is ignored. Otherwise, the value is used as an m2 path.
The configuration uses Microprofile Config for most entries. It means it can be passed through system properties and environment variables (by replacing dots with underscores and making the keys uppercase).
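For instance, the local Maven repository entry mentioned above can be passed either way (a sketch):

[source,properties]
----
# as a system property
-Dtalend.component.server.maven.repository=/path/to/m2

# as an environment variable (dots replaced with underscores, key uppercased)
TALEND_COMPONENT_SERVER_MAVEN_REPOSITORY=/path/to/m2
----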
To configure a Docker image rather than a standalone instance, Docker Config and secrets integration allows you to read the configuration from files. You can customize the configuration of these integrations through system properties.
The Docker integration provides a secure: prefix support to encrypt values and system properties, when required.
It is fully implemented using the Apache Geronimo Microprofile Config extensions.
Using the server ZIP (or Docker image), you can configure HTTPS by adding properties to _JAVA_OPTIONS. Assuming that you have a certificate in /opt/certificates/component.p12 (don’t forget to add/mount it in the Docker image if you use it), you can activate it as follows:
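The sketch below only illustrates the shape of such a setup; the SSL property names are hypothetical assumptions, check the server reference for the exact keys supported by your version:

[source,bash]
----
# property names below are hypothetical, for illustration only
export _JAVA_OPTIONS="\
  -Dtalend.component.server.ssl.active=true \
  -Dtalend.component.server.ssl.keystore.location=/opt/certificates/component.p12 \
  -Dtalend.component.server.ssl.keystore.type=PKCS12"
----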
You can define simple queries on the configuration types and components endpoints. These two endpoints support different parameters.
Queries on the configurationtype/index endpoint support the following parameters:
type
id
name
metadata of the first configuration property as parameters.
Queries on the component/index endpoint support the following parameters:
plugin
name
id
familyId
metadata of the first configuration property as parameters.
In both cases, you can combine several conditions using OR and AND operators. If you combine more than two conditions, note that they are evaluated in the order they are written.
Each supported parameter in a condition can be "equal to" (=) or "not equal to" (!=) a defined value (case-sensitive).
For example:
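The query below reconstructs the example described next; the metadata key syntax is indicative:

[source]
----
(metadata[configurationtype::type] = dataset) AND (plugin = jdbc-component) OR (name = input)
----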
In this example, the query gets components that have a dataset and belong to the jdbc-component plugin, or components that are named input.
The component-form library provides a way to build a component REST API facade that is compatible with React form library.
For example, the Client can be created using ClientFactory.createDefault(System.getProperty("app.components.base", "http://localhost:8080/api/v1")) and the service can be a simple new UiSpecService<>(). The factory uses JAX-RS if the API is available (assuming a JSON-B provider is registered). Otherwise, it tries to use Spring.
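A minimal sketch of that wiring, assuming the UiSpecService constructor accepting a Client and a JSON-B provider on the classpath:

[source,java]
----
import org.talend.sdk.component.form.api.Client;
import org.talend.sdk.component.form.api.ClientFactory;
import org.talend.sdk.component.form.api.UiSpecService;

public class UiSpecSetup {

    public UiSpecService<Object> createService() {
        // HTTP client against the component server REST API
        final Client<Object> client = ClientFactory.createDefault(
                System.getProperty("app.components.base", "http://localhost:8080/api/v1"));
        // converts the component model (REST API) to the UiSpec model
        return new UiSpecService<>(client);
    }
}
----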
The conversion from the component model (REST API) to the uiSpec model is done through UiSpecService. It is based on the object model, which is mapped to a UI model. Having a flat model in the component REST API makes it easy to customize layers.
You can completely control the available components, tune the rendering by switching the uiSchema, and add or remove parts of the form. You can also add custom actions and buttons for specific needs of the application.
The /migrate endpoint was not shown in the previous snippet but if you need it, add it as well.
This Maven dependency provides the UISpec model classes. You can use the Ui API (with or without the builders) to create UiSpec representations.
For example:
The model uses the JSON-B API to define the binding. Make sure to have an implementation in your classpath. To do that, add the following dependencies:
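For instance, Apache Johnzon provides such an implementation; a sketch of the dependency (the version property is an assumption, align it with your stack):

[source,xml]
----
<dependency>
  <groupId>org.apache.johnzon</groupId>
  <artifactId>johnzon-jsonb</artifactId>
  <version>${johnzon.version}</version>
</dependency>
----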
The following module enables you to define through annotations a uispec on your own models:
This cannot be used in components and is only intended for web applications.
org.talend.sdk.component.form.uispec.mapper.api.service.UiSpecMapper lets you create a Ui instance from a custom type annotated with org.talend.sdk.component.form.uispec.mapper.api.model.View and org.talend.sdk.component.form.uispec.mapper.api.model.View.Schema.
UiSpecMapper returns a Supplier and not directly a Ui because the ui-schema is re-evaluated when get() is called. This makes it possible to update the title maps, for example.
Here is an example:
This API directly maps the UiSpec model (JSON schema and UI schema of Talend UIForm).
The default implementation of the mapper is available at org.talend.sdk.component.form.uispec.mapper.impl.UiSpecMapperImpl.
Here is an example:
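The sketch below is a reconstruction based on the surrounding description; the createFormFor method name and the constructor wiring are assumptions to verify against the module:

[source,java]
----
import java.util.Collections;
import java.util.List;

import org.talend.sdk.component.form.uispec.mapper.api.service.UiSpecMapper;
import org.talend.sdk.component.form.uispec.mapper.impl.UiSpecMapperImpl;

public class MyFormService {

    // hypothetical model annotated with @View/@View.Schema elsewhere
    public static class ComponentModel { }

    // in a real application, look up TitleMapProvider instances from your IoC context
    private List<?> getTitleMapProviders() {
        return Collections.emptyList();
    }

    // constructor arguments are an assumption
    private final UiSpecMapper mapper = new UiSpecMapperImpl(getTitleMapProviders());

    public Object getNewComponentModelForm() {
        // get() re-evaluates the ui-schema, keeping title maps up to date
        return mapper.createFormFor(ComponentModel.class).get();
    }
}
----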
The getTitleMapProviders() method generally looks up a set of TitleMapProvider instances in your IoC context. This API is used to fill the titleMap of the form when a reference identifier is set on the @Schema annotation.
component-kit.js is no longer available (previous versions remain on NPM) and is replaced by @talend/react-containers. The previous import can be replaced by import kit from '@talend/react-containers/lib/ComponentForm/kit';.
Default JavaScript integration goes through the Talend UI Forms library and its Containers wrapper.
Documentation is now available on the previous link.
The logging uses Log4j2. You can specify a custom configuration by using the -Dlog4j.configurationFile system property or by adding a log4j2.xml file to the classpath.
Here are some common configurations:
Console logging:
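A standard Log4j2 console configuration looks like the following sketch (the pattern is illustrative):

[source,xml]
----
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
  <Appenders>
    <Console name="Console" target="SYSTEM_OUT">
      <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
    </Console>
  </Appenders>
  <Loggers>
    <Root level="INFO">
      <AppenderRef ref="Console"/>
    </Root>
  </Loggers>
</Configuration>
----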
Output messages look like:
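With the pattern above, an illustrative line would be:

[source]
----
10:28:05.738 [main] INFO  org.example.MyClass - my message
----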
JSON logging:
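A sketch using the built-in JsonLayout (it requires Jackson on the classpath):

[source,xml]
----
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
  <Appenders>
    <Console name="Console" target="SYSTEM_OUT">
      <JsonLayout compact="true" eventEol="true"/>
    </Console>
  </Appenders>
  <Loggers>
    <Root level="INFO">
      <AppenderRef ref="Console"/>
    </Root>
  </Loggers>
</Configuration>
----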
Output messages look like:
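With JsonLayout, each event is rendered roughly as:

[source,json]
----
{"timeMillis":1576162150520,"thread":"main","level":"INFO","loggerName":"org.example.MyClass","message":"my message","endOfBatch":false}
----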
Rolling file appender:
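A sketch of a rolling file appender (file names and policies are illustrative):

[source,xml]
----
<RollingFile name="File" fileName="logs/server.log"
             filePattern="logs/server-%d{yyyy-MM-dd}-%i.log.gz">
  <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
  <Policies>
    <TimeBasedTriggeringPolicy/>
    <SizeBasedTriggeringPolicy size="100 MB"/>
  </Policies>
  <DefaultRolloverStrategy max="10"/>
</RollingFile>
----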
More details are available in the RollingFileAppender documentation.
You can compose the previous layouts (message formats) and appenders (where logs are written).
The server image is deployed on Docker. Its version is suffixed with a timestamp to ensure images are not overridden, which could break your usage. You can check the available versions on Docker Hub.
You can run the Docker image by executing this command:
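A sketch, assuming the tacokit/component-server image name; check Docker Hub for the exact image and tag:

[source,bash]
----
docker run -d -p 8080:8080 --name component-server tacokit/component-server:<tag>
----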
You can set the environment variable _JAVA_OPTIONS to customize the server. By default, it is installed in /opt/talend/component-kit.
The Maven repository is the default one of the machine. You can change it by setting the system property talend.component.server.maven.repository=/path/to/your/m2.
If you want to deploy some components you can configure which ones in _JAVA_OPTIONS (see server doc online) and redirect your local m2:
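A hedged sketch; the image name and the in-container mount path are assumptions, and the maven.repository property must point inside the mount:

[source,bash]
----
docker run -d -p 8080:8080 \
  -v ~/.m2:/opt/talend/.m2 \
  -e _JAVA_OPTIONS="-Dtalend.component.server.maven.repository=/opt/talend/.m2/repository \
-Dtalend.component.server.component.coordinates=com.mycompany:my-component:1.0.0" \
  tacokit/component-server:<tag>
----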
The component server docker image comes with two log4j2 profiles: TEXT (default) and JSON. The logging profile can be changed by setting the environment variable LOGGING_LAYOUT to JSON.
Note that the Component Server adds the KAFKA profile to these default Talend profiles. With this profile, all logs are sent to Kafka.
You can check the exact configuration in the component-runtime/images/component-server-image/src/main/resources folder.
Console logging is on at the INFO level by default. You can customize it by setting the CONSOLE_LOG_LEVEL environment variable to DEBUG, INFO, WARN, or any other log level supported by Log4j2.
Run docker image with console logging:
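For instance (image name as assumed above):

[source,bash]
----
docker run -d -p 8080:8080 -e CONSOLE_LOG_LEVEL=DEBUG tacokit/component-server:<tag>
----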
The JSON profile does the following:
Logs on the console using the CONSOLE_LOG_LEVEL configuration as the default profile. It uses the formatting shown below.
If the TRACING_KAFKA_URL environment variable is set, it logs the opentracing data on the defined Kafka using the topic TRACING_KAFKA_TOPIC. This level can be customized by setting the KAFKA_LOG_LEVEL environment variable (INFO by default).
Events are logged in the following format:
This profile is very close to the JSON profile and also adds the LOG_KAFKA_TOPIC and LOG_KAFKA_URL configuration. The difference is that it logs the default logs on Kafka in addition to the tracing logs.
The component server uses Geronimo OpenTracing to monitor requests.
The tracing can be activated by setting the TRACING_ON environment variable to true.
The tracing rate is configurable by setting the TRACING_SAMPLING_RATE environment variable. It accepts 0 (none) and 1 (all, default) as values to ensure the consistency of the reporting.
You can find all the details on the configuration in org.talend.sdk.component.server.configuration.OpenTracingConfigSource.
Run docker image with tracing on:
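For instance (image name as assumed above):

[source,bash]
----
docker run -d -p 8080:8080 \
  -e TRACING_ON=true \
  -e TRACING_SAMPLING_RATE=1 \
  tacokit/component-server:<tag>
----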
By default, Geronimo OpenTracing logs the spans in a Zipkin format, so you can use the Kafka profile as explained before to wire it to any OpenTracing backend.
You can register component server images in Docker using these instructions in the corresponding image directory:
Docker Compose allows you to deploy the server with components, by mounting the component volume into the server image.
docker-compose.yml example:
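A hedged sketch; the image name, mount paths, and registry location are assumptions:

[source,yaml]
----
version: "3"
services:
  component-server:
    image: tacokit/component-server:<tag>
    ports:
      - "8080:8080"
    environment:
      - _JAVA_OPTIONS=-Dtalend.component.server.component.registry=/opt/talend/connectors/registry.properties
    volumes:
      - ./connectors:/opt/talend/connectors:ro
----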
If you want to mount it from another image, you can use this compose configuration:
To run one of the previous compose examples, you can use docker-compose -f docker-compose.yml up.
Only use the configuration related to port 5005 (in ports and the -agentlib option in _JAVA_OPTIONS) to debug the server on port 5005. Don’t set it in production.
You can mount a volume in /opt/talend/component-kit/custom/; the jars in that folder are deployed with the server. Since the server relies on CDI (Apache OpenWebBeans), you can use that technology to enrich it with JAX-RS endpoints, interceptors, and so on, or simply with libraries that need to be in the JVM.
Talend Input component for Hazelcast

Example of input component implementation with Talend Component Kit.

This tutorial walks you through the creation, from scratch, of a complete Talend input component for Hazelcast using the Talend Component Kit (TCK) framework.
Hazelcast is an in-memory distributed system that can store data, which makes it a good example of input component for distributed systems. This is enough for you to get started with this tutorial, but you can find more information about it here: hazelcast.org/.
A TCK project is a simple Java project with specific configurations and dependencies. You can choose your preferred build tool from Maven or Gradle as TCK supports both. In this tutorial, Maven is used.
The first step consists in generating the project structure using the Talend Starter Toolkit.
Go to starter-toolkit.talend.io/ and fill in the project information as shown in the screenshots below, then click Finish and Download as ZIP.
image::tutorial_hazelcast_generateproject_1.png[]
image::tutorial_hazelcast_generateproject_2.png[]
Extract the ZIP file into your workspace and import it to your preferred IDE. This tutorial uses IntelliJ IDEA, but you can use Eclipse or any other IDE that you are comfortable with.
You can use the Starter Toolkit to define the full configuration of the component, but in this tutorial some parts are configured manually to explain key concepts of TCK.
The generated pom.xml file of the project looks as follows:
Change the name tag to a more relevant value, for example: Component Hazelcast.
The component-api dependency provides the necessary API to develop the components.
talend-component-maven-plugin provides build and validation tools for the component development.
The Java compiler also needs a Talend specific configuration for the components to work correctly. The most important is the -parameters option that preserves the parameter names needed for introspection features that TCK relies on.
Download the mvn dependencies declared in the pom.xml file:
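For instance:

[source,bash]
----
mvn clean compile
----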
You should get a BUILD SUCCESS at this point:
Create the project structure:
Create the component Java packages.
Packages are mandatory in the component model and you cannot use the default one (no package). It is recommended to create a unique package per component to be able to reuse it as dependency in other components, for example to guarantee isolation while writing unit tests.
The project is now correctly set up. The next steps consist in registering the component family and setting up some properties.
Registering every component family allows the component server to properly load the components and to ensure they are available in Talend Studio.
The family registration happens via a package-info.java file that you have to create.
Move to the src/main/java/org/talend/components/hazelcast package and create a package-info.java file:
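A sketch of what this file can contain; the category and icon values are assumptions for this tutorial:

[source,java]
----
// src/main/java/org/talend/components/hazelcast/package-info.java
@Components(family = "Hazelcast", categories = "Databases")
@Icon(value = Icon.IconType.CUSTOM, custom = "Hazelcast")
package org.talend.components.hazelcast;

import org.talend.sdk.component.api.component.Components;
import org.talend.sdk.component.api.component.Icon;
----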
@Components: Declares the family name and the categories to which the component belongs.
@Icon: Defines the component family icon. This icon is visible in the Studio metadata tree.
Talend Component Kit supports internationalization (i18n) via Java properties files. Using these files, you can customize and translate the display name of properties such as the name of a component family or, as shown later in this tutorial, labels displayed in the component configuration.
Go to src/main/resources/org/talend/components/hazelcast and create an i18n Messages.properties file as below:
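A sketch of the family display name entry; the key follows the <family>._displayName pattern:

[source,properties]
----
# display name of the component family in the Studio
Hazelcast._displayName=Hazelcast
----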
You can define the component family icon in the package-info.java file. The icon image must exist in the resources/icons folder.
TCK supports both SVG and PNG formats for the icons.
Create the icons folder and add an icon image for the Hazelcast family.
This tutorial uses the Hazelcast icon from the official GitHub repository that you can get from: avatars3.githubusercontent.com/u/1453152?s=200&v=4
Download the image and rename it to Hazelcast_icon32.png. The name syntax is important and should match <icon-name>_icon32.png.
The component registration is now complete. The next step consists in defining the component configuration.
All Input and Output (I/O) components follow a predefined model of configuration. The configuration requires two parts:
Datastore: Defines all properties that let the component connect to the targeted system.
Dataset: Defines the data to be read or written from/to the targeted system.
Connecting to the Hazelcast cluster requires the IP address, group name and password of the targeted cluster.
In the component, the datastore is represented by a simple POJO.
Create a HazelcastDatastore.java class file in the src/main/java/org/talend/components/hazelcast folder.
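A minimal sketch of the datastore POJO; field names and documentation strings are illustrative assumptions:

[source,java]
----
package org.talend.components.hazelcast;

import java.io.Serializable;

import org.talend.sdk.component.api.configuration.Option;
import org.talend.sdk.component.api.configuration.type.DataStore;
import org.talend.sdk.component.api.configuration.ui.widget.Credential;
import org.talend.sdk.component.api.meta.Documentation;

@DataStore("HazelcastDatastore")
@Documentation("Hazelcast connection")
public class HazelcastDatastore implements Serializable {

    @Option
    @Documentation("The IP address of the cluster")
    private String clusterIpAddress;

    @Option
    @Documentation("The group name of the cluster")
    private String groupName;

    @Option
    @Credential
    @Documentation("The password of the cluster group")
    private String password;

    // getters/setters omitted for brevity
}
----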
Define the i18n properties of the datastore. In the Messages.properties file, add the following lines:
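A sketch following the <ClassSimpleName>.<property>._displayName key pattern (labels are assumptions):

[source,properties]
----
HazelcastDatastore.clusterIpAddress._displayName=Cluster IP address
HazelcastDatastore.groupName._displayName=Group name
HazelcastDatastore.password._displayName=Password
----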
The Hazelcast datastore is now defined.
Hazelcast includes different types of datastores. You can manipulate maps, lists, sets, caches, locks, queues, topics and so on.
This tutorial focuses on maps but still applies to the other data structures.
Reading/writing from a map requires the map name.
Create the dataset class by creating a HazelcastDataset.java file in src/main/java/org/talend/components/hazelcast.
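A sketch; the option names are assumptions consistent with the datastore above:

[source,java]
----
package org.talend.components.hazelcast;

import java.io.Serializable;

import org.talend.sdk.component.api.configuration.Option;
import org.talend.sdk.component.api.configuration.type.DataSet;
import org.talend.sdk.component.api.meta.Documentation;

@DataSet("HazelcastDataset")
@Documentation("Hazelcast dataset")
public class HazelcastDataset implements Serializable {

    @Option
    @Documentation("The Hazelcast connection")
    private HazelcastDatastore connection;

    @Option
    @Documentation("The name of the map to read from")
    private String mapName;

    // getters/setters omitted for brevity
}
----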
The @DataSet annotation marks the class as a dataset. Note that it also references a datastore, as required by the component model.
Just as it was done for the datastore, define the i18n properties of the dataset by adding the following lines to the Messages.properties file.
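Under the same assumptions as the dataset sketch above:

[source,properties]
----
HazelcastDataset.connection._displayName=Connection
HazelcastDataset.mapName._displayName=Map name
----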
The component configuration is now ready. The next step consists in creating the Source that will read the data from the Hazelcast map.
The Source is the class responsible for reading the data from the configured dataset.
A source gets the configuration instance injected by TCK at runtime and uses it to connect to the targeted system and read the data.
Create a new class as follows.
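A sketch of the source skeleton; the component name and icon are assumptions consistent with the earlier steps:

[source,java]
----
package org.talend.components.hazelcast;

import java.io.Serializable;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.talend.sdk.component.api.component.Icon;
import org.talend.sdk.component.api.component.Version;
import org.talend.sdk.component.api.configuration.Option;
import org.talend.sdk.component.api.input.Emitter;
import org.talend.sdk.component.api.input.Producer;
import org.talend.sdk.component.api.meta.Documentation;
import org.talend.sdk.component.api.record.Record;
import org.talend.sdk.component.api.service.record.RecordBuilderFactory;

@Version(1)
@Icon(value = Icon.IconType.CUSTOM, custom = "Hazelcast")
@Emitter(name = "Input")
@Documentation("Hazelcast input source")
public class HazelcastSource implements Serializable {

    private final HazelcastDataset dataset;
    private final RecordBuilderFactory recordBuilderFactory;

    public HazelcastSource(@Option("configuration") final HazelcastDataset dataset,
                           final RecordBuilderFactory recordBuilderFactory) {
        this.dataset = dataset;
        this.recordBuilderFactory = recordBuilderFactory;
    }

    @PostConstruct
    public void init() {
        // connection setup is added in the next steps
    }

    @Producer
    public Record next() {
        // record production is added in the next steps
        return null;
    }

    @PreDestroy
    public void release() {
        // resource cleanup is added in the next steps
    }
}
----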
The source also needs i18n properties to provide a readable display name. Add the following line to the Messages.properties file.
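Assuming the family and component names used above (key pattern: <family>.<component>._displayName):

[source,properties]
----
Hazelcast.Input._displayName=Input
----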
At this point, you can already see the result in the Talend Component Web Tester to check how the configuration looks and to validate the layout visually. To do that, execute the following command in the project folder.
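A sketch, assuming the web goal of the Talend Component Maven plugin:

[source,bash]
----
mvn talend-component:web
----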
This command starts the Component Web Tester and deploys the component there.
Access localhost:8080/.
The source is set up. It is now time to start creating some Hazelcast specific code to connect to a cluster and read values for a map.
Add the hazelcast-client Maven dependency to the pom.xml of the project, in the dependencies node.
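A sketch of the dependency; the version property is an assumption, align it with the Hazelcast release you target:

[source,xml]
----
<dependency>
  <groupId>com.hazelcast</groupId>
  <artifactId>hazelcast-client</artifactId>
  <version>${hazelcast.version}</version>
</dependency>
----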
Add a Hazelcast instance to the @PostConstruct method.
Declare a HazelcastInstance attribute in the source class.
Any non-serializable attribute needs to be marked as transient to avoid serialization issues.
Implement the post construct method.
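A sketch using the Hazelcast 3.x client API; the mapping relies on the (assumed) datastore accessors defined earlier:

[source,java]
----
// imports used below:
// import com.hazelcast.client.HazelcastClient;
// import com.hazelcast.client.config.ClientConfig;
// import com.hazelcast.core.HazelcastInstance;

// declared in the source class: private transient HazelcastInstance instance;

@PostConstruct
public void init() {
    final ClientConfig clientConfig = new ClientConfig();
    // map the component configuration to the Hazelcast client configuration
    clientConfig.getGroupConfig()
            .setName(dataset.getConnection().getGroupName())
            .setPassword(dataset.getConnection().getPassword());
    clientConfig.getNetworkConfig()
            .addAddress(dataset.getConnection().getClusterIpAddress());
    instance = HazelcastClient.newHazelcastClient(clientConfig);
}
----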
The component configuration is mapped to the Hazelcast client configuration to create a Hazelcast instance. This instance will be used later to get the map from its name and read the map data. Only the required configuration in the component is exposed to keep the code as simple as possible.
Implement the code responsible for reading the data from the Hazelcast map through the Producer method.
The Producer implements the following logic (a sketch follows the list):
Check if the map iterator is already initialized. If not, get the map from its name and initialize the map iterator. This is done in the @Producer method to ensure the map is initialized only if the next() method is called (lazy initialization). It also avoids the map initialization in the PostConstruct method as the Hazelcast map is not serializable.
All the objects initialized in the PostConstruct method need to be serializable as the source can be serialized and sent to another worker in a distributed cluster.
From the map, create an iterator on the map keys that will read from the map.
Transform every key/value pair into a Talend Record with a "key, value" object on every call to next().
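A sketch of that logic, under the same assumptions as above (key and value are coerced to strings for simplicity):

[source,java]
----
// declared in the source class:
// private transient Iterator<Map.Entry<Object, Object>> iterator;

@Producer
public Record next() {
    if (iterator == null) {
        // lazy initialization: the map is fetched only when data is first requested
        final IMap<Object, Object> map = instance.getMap(dataset.getMapName());
        iterator = map.entrySet().iterator();
    }
    if (!iterator.hasNext()) {
        return null; // no more data in the map
    }
    final Map.Entry<Object, Object> entry = iterator.next();
    return recordBuilderFactory.newRecordBuilder()
            .withString("key", String.valueOf(entry.getKey()))
            .withString("value", String.valueOf(entry.getValue()))
            .build();
}
----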
The RecordBuilderFactory class used above is a built-in service in TCK injected via the Source constructor. This service is a factory to create Talend Records.
Now, the next() method produces a Record every time it is called, and returns null when there is no more data in the map.
Implement the @PreDestroy annotated method, responsible for releasing all resources used by the Source. The method needs to shut the Hazelcast client instance down to release any connection between the component and the Hazelcast cluster.
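A sketch:

[source,java]
----
@PreDestroy
public void release() {
    if (instance != null) {
        // closes the connection between the component and the Hazelcast cluster
        instance.shutdown();
    }
}
----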
The Hazelcast Source is completed. The next section shows how to write a simple unit test to check that it works properly.
TCK provides a set of APIs and tools that make testing straightforward.
The test of the Hazelcast Source consists in creating an embedded Hazelcast instance with only one member and initializing it with some data, and then in creating a test Job to read the data from it using the implemented Source.
Add the required Maven test dependencies to the project.
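A sketch of typical test dependencies (versions are assumptions, align them with your project):

[source,xml]
----
<dependency>
  <groupId>org.talend.sdk.component</groupId>
  <artifactId>component-runtime-junit</artifactId>
  <version>${talend.component.version}</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.junit.jupiter</groupId>
  <artifactId>junit-jupiter</artifactId>
  <version>${junit.version}</version>
  <scope>test</scope>
</dependency>
----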
Initialize a Hazelcast test instance and create a map with some test data. To do that, create the HazelcastSourceTest.java test class in the src/test/java folder. Create the folder if it does not exist.
The above example creates a Hazelcast instance for the test and creates the MY-DISTRIBUTED-MAP map. The getMap call creates the map if it does not already exist. Some keys and values used in the test are added. Then, a simple test checks that the data is correctly initialized. Finally, the Hazelcast test instance is shut down.
Run the test and check in the logs that a Hazelcast cluster of one member has been created and that the test has passed.
To be able to test components, TCK provides the @WithComponents annotation which enables component testing. Add this annotation to the test. The annotation takes the component Java package as a value parameter.
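For instance:

[source,java]
----
import org.talend.sdk.component.junit5.WithComponents;

@WithComponents("org.talend.components.hazelcast")
class HazelcastSourceTest {
    // tests
}
----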
Create the test Job that configures the Hazelcast instance and link it to an output that collects the data produced by the Source.
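A hedged sketch using the TCK Job DSL; the configuration keys encoded in the URI depend on your option names and are assumptions:

[source,java]
----
import org.talend.sdk.component.runtime.manager.chain.Job;

// reads from the Hazelcast source and collects the records in the test collector
Job.components()
        .component("source", "Hazelcast://Input?configuration.dataset.mapName=MY-DISTRIBUTED-MAP")
        .component("collector", "test://collector")
        .connections()
        .from("source").to("collector")
        .build()
        .run();
----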
Execute the unit test and check that it passes, meaning that the Source is reading the data correctly from Hazelcast.
The Source is now completed and tested. The next section shows how to implement the Partition Mapper for the Source. In this case, the Partition Mapper will split the work (data reading) between the available cluster members to distribute the workload.
The Partition Mapper calculates the number of Sources that can be created and executed in parallel on the available workers of a distributed system. For Hazelcast, it corresponds to the cluster member count.
To fully illustrate this concept, this section also shows how to enhance the test environment to add more Hazelcast cluster members and initialize it with more data.
Instantiate more Hazelcast instances, as every Hazelcast instance corresponds to one member in a cluster. In the test, it is reflected as follows:
The above code sample creates two Hazelcast instances, leading to the creation of two Hazelcast members. Having a cluster of two members (nodes) makes it possible to distribute the data. The above code also adds more data to the test map and updates the shutdown method and the test.
Run the test on the multi-nodes cluster.
The current Source is a simple implementation that reads the data in a classic way, without distributing the read action to different cluster members.
Start implementing the Partition Mapper class by creating a HazelcastPartitionMapper.java class file.
When coupling a Partition Mapper with a Source, the Partition Mapper becomes responsible for injecting parameters and creating source instances. This way, all the attribute initialization part moves from the Source to the Partition Mapper class.
The configuration also sets an instance name to make it easy to find the client instance in the logs or while debugging.
The Partition Mapper class is composed of the following:
constructor: Handles configuration and service injections
Assessor: This annotation indicates that the method is responsible for assessing the dataset size. The underlying runner uses the estimated dataset size to compute the optimal bundle size to distribute the workload efficiently.
Split: This annotation indicates that the method is responsible for creating Partition Mapper instances based on the bundle size requested by the underlying runner and the size of the dataset. It creates as many partitions as possible to parallelize and distribute the workload efficiently on the available workers (known as members in the Hazelcast case).
Emitter: This annotation indicates that the method is responsible for creating the Source instance with an adapted configuration, allowing to handle the amount of records it will produce and the required services. It adapts the configuration to let the Source read only the requested bundle of data.
The Assessor method computes the memory size of every member of the cluster. Implementing it requires submitting a calculation task to the members through a serializable task that is aware of the Hazelcast instance.
Create the serializable task.
The purpose of this class is to submit any task to the Hazelcast cluster.
Use the created task to estimate the dataset size in the Assessor method.
The Assessor method calculates the memory size that the map occupies for all members. In Hazelcast, distributing a task to all members can be achieved using an execution service initialized in the getExecutorService() method. The size of the map is requested on every available member. By summing up the results, the total size of the map in the distributed cluster is computed.
The Split method calculates the heap size of the map on every member of the cluster. Then, it calculates how many members a source can handle.
If a member contains less data than the requested bundle size, the method tries to combine it with another member. That combination can only happen if the combined data size is still less or equal to the requested bundle size.
The following code illustrates the logic described above.
The next step consists in adapting the source to take the Split into account.
The following sample shows how to adapt the Source to the Split carried out previously.
The next method reads the data from the members received from the Partition Mapper.
A Big Data runner like Spark will get multiple Source instances. Every source instance will be responsible for reading data from a specific set of members already calculated by the Partition Mapper.
The data is fetched only when the next method is called. This logic makes it possible to stream the data from members without loading it all into memory.
Implement the method annotated with @Emitter in the HazelcastPartitionMapper class.
The createSource() method creates the source instance and passes the required services and the selected Hazelcast members to the source instance.
Run the test and check that it works as intended.
The component implementation is now done. It is able to read data and to distribute the workload to available members in a Big Data execution environment.
Refactor the component by introducing a service to make some pieces of code reusable and avoid code duplication.
Refactor the Hazelcast instance creation into a service.
Inject this service to the Partition Mapper to reuse it.
Adapt the Source class to reuse the service.
Run the test one last time to ensure everything still works as expected.
Thank you for following this tutorial. Use the logic and approach presented here to create any input component for any system.