Introduction$
Checkpointing is a feature in the Talend Component Kit (TCK) that enables an input connector to resume processing from the last recorded checkpoint. This ensures fault tolerance and better handling of large data streams by avoiding reprocessing from the beginning in case of failures.
This feature mainly targets the upcoming TCK integration in Qlik Data Integration (QDI) platforms. Studio and Talend Cloud are not targeted.
The feature is disabled by default. To enable checkpointing, add the system property talend.checkpoint.enable to the runtime JVM:
-Dtalend.checkpoint.enable=true
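On the runtime side, a boolean flag of this kind can be read with the standard `Boolean.getBoolean` call. The snippet below is only a sketch of such a check: the class name `CheckpointFlag` is hypothetical, and this is not necessarily how TCK reads the property internally.

```java
// Hypothetical helper, not part of TCK: shown only to illustrate the flag.
final class CheckpointFlag {

    static final String PROPERTY = "talend.checkpoint.enable";

    private CheckpointFlag() {
    }

    // Boolean.getBoolean returns true only when the system property exists
    // and equals "true" (ignoring case); in every other case it returns false.
    static boolean isEnabled() {
        return Boolean.getBoolean(PROPERTY);
    }
}
```

With no `-D` option on the command line, `CheckpointFlag.isEnabled()` returns false, which matches the "disabled by default" behavior described above.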
Checkpointing Mechanism$
Checkpointing in TCK is implemented using specific annotations and an interface that defines the required methods. The checkpointing mechanism allows the runtime to save and restore processing states efficiently.
Checkpointing Annotations$
@CheckpointData: This annotation marks a method that returns a checkpoint object, that is, an instance of a class annotated with @Checkpoint.
@CheckpointAvailable: This annotation marks a method that indicates when a new checkpoint is available.
@Checkpoint: This annotation marks a class as a checkpointing configuration and state class. This class lets the user define how the connector should build checkpoints and which data they contain. Instances of this class are passed to the connector to configure it to restart from the expected state.
Connector Implementation$
To support checkpointing, the input component used in the lifecycle implements the following interface. This is an internal implementation; connector developers do not have to implement it:
public interface Input extends Lifecycle {

    ...

    // Starts the input with a callback that receives each new checkpoint state.
    default void start(final Consumer<CheckpointState> checkpointCallback) {
        throw new IllegalArgumentException("Checkpoint feature is not implemented.");
    }

    // Returns the current checkpoint.
    default Object getCheckpoint() {
        throw new IllegalArgumentException("Checkpoint feature is not implemented.");
    }

    // Tells whether a new checkpoint is available.
    default Boolean isCheckpointReady() {
        throw new IllegalArgumentException("Checkpoint feature is not implemented.");
    }
}
The start() method is called with a callback function that retrieves and serializes checkpoint instances. The getCheckpoint() method returns those checkpoint instances, and isCheckpointReady() checks whether a new checkpoint is available.
These methods are front ends for the connector methods annotated with @CheckpointData and @CheckpointAvailable.
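To make the "front end" idea concrete, here is a minimal sketch of how a wrapper could delegate to annotated connector methods through reflection. Everything in it is an assumption made for illustration: the two annotations are local stand-ins declared so the code compiles without TCK, and `DemoConnector` and `CheckpointFrontend` are hypothetical names. It does not show the actual TCK implementation.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Stand-in annotations (assumption): the real ones are provided by TCK.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface CheckpointAvailable {
}

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface CheckpointData {
}

// Hypothetical connector exposing one method per annotation.
class DemoConnector {

    private boolean ready = true;

    @CheckpointAvailable
    public Boolean isReady() {
        return ready;
    }

    @CheckpointData
    public Object state() {
        ready = false; // the checkpoint has been consumed
        return "sinceId=5";
    }
}

// Hypothetical front end: finds the annotated method and invokes it.
final class CheckpointFrontend {

    private final Object delegate;

    CheckpointFrontend(final Object delegate) {
        this.delegate = delegate;
    }

    Boolean isCheckpointReady() {
        return (Boolean) invokeAnnotated(CheckpointAvailable.class);
    }

    Object getCheckpoint() {
        return invokeAnnotated(CheckpointData.class);
    }

    private Object invokeAnnotated(final Class<? extends Annotation> marker) {
        for (final Method method : delegate.getClass().getMethods()) {
            if (method.isAnnotationPresent(marker)) {
                try {
                    return method.invoke(delegate);
                } catch (final ReflectiveOperationException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        // Same message as the default methods of the Input interface.
        throw new IllegalArgumentException("Checkpoint feature is not implemented.");
    }
}
```

A connector without the annotated methods ends up in the final `throw`, which mirrors the default behavior of the interface shown above.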
Example: Checkpointing Producer$
@Emitter(family = "checkpoint", name = "list-input")
public class CheckpointInputDemo implements Serializable {

    private final JsonBuilderFactory factory;
    private final Jsonb jsonb;
    private List<Integer> data;
    private transient ListIterator<Integer> iterator;
    private Integer bookmark;
    private boolean newBookmark;
    private final InputConfig configuration;

    public CheckpointInputDemo(final JsonBuilderFactory factory, final Jsonb jsonb,
            @Option("configuration") final InputConfig config) {
        this.factory = factory;
        this.jsonb = jsonb;
        this.configuration = config;
        log.warn("[CheckpointInput] config: {}.", config);
    }

    @PostConstruct
    public void init() {
        ...
        if (configuration.checkpoint == null) {
            log.info("[resume] No valid checkpoint configuration found, using start of dataset.");
            bookmark = 0;
        } else {
            bookmark = configuration.checkpoint.sinceId;
        }
        ...
    }

    @Producer
    public JsonObject data() {
        ...
        final Integer produced = iterator.hasNext() ? iterator.next() : null;
        if (produced == null) {
            configuration.checkpoint.sinceId = bookmark;
            return null;
        }
        bookmark = produced;
        configuration.checkpoint.sinceId = produced;
        if (isCheckpointCondition(produced)) {
            newBookmark = true;
        }
        return factory.createObjectBuilder().add("data", produced).build();
    }

    @CheckpointData
    public Object getCheckpoint() {
        newBookmark = false;
        return configuration.checkpoint;
    }

    @CheckpointAvailable
    public Boolean isCheckpointReady() {
        return newBookmark;
    }
    ...
Checkpointing Usage Scenarios$
At runtime, checkpointing can be used in two ways:
1. Explicit Runtime Usage$
In this mode, the runtime directly calls the methods of the input connector to manage checkpointing. It is responsible for:
- Determining when to create a checkpoint.
- Checking if a checkpoint is available using isCheckpointReady().
- Retrieving and storing the checkpoint using getCheckpoint().
Simple example of explicit checkpointing usage:
...
input.start();
while ((input.next()) != null) {
    if (input.isCheckpointReady()) {
        serializeCheckpoint(input.getCheckpoint());
    }
}
input.stop();
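The loop above can be tried without TCK by using a stand-in input. The following sketch is only illustrative: `CountingInput` and `ExplicitRuntime` are hypothetical classes, plain integers stand in for both records and checkpoint state, and the rule "a checkpoint becomes available on every Nth value" is an assumption made for the demo.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for an input connector (not a TCK class).
final class CountingInput {

    private final Iterator<Integer> iterator;
    private final int every;
    private Integer bookmark;
    private boolean newBookmark;

    CountingInput(final List<Integer> data, final int every) {
        this.iterator = data.iterator();
        this.every = every;
    }

    // Returns the next record, or null when the data is exhausted.
    Integer next() {
        if (!iterator.hasNext()) {
            return null;
        }
        bookmark = iterator.next();
        if (bookmark % every == 0) {
            newBookmark = true; // demo rule: checkpoint on multiples of `every`
        }
        return bookmark;
    }

    boolean isCheckpointReady() {
        return newBookmark;
    }

    // Returns the current bookmark and resets the "ready" flag.
    Integer getCheckpoint() {
        newBookmark = false;
        return bookmark;
    }
}

// Hypothetical runtime loop: it decides when to ask for checkpoints.
final class ExplicitRuntime {

    private ExplicitRuntime() {
    }

    // Drains the input and returns every checkpoint stored along the way.
    static List<Integer> run(final CountingInput input) {
        final List<Integer> stored = new ArrayList<>();
        while (input.next() != null) {
            if (input.isCheckpointReady()) {
                stored.add(input.getCheckpoint()); // stands in for serializeCheckpoint(...)
            }
        }
        return stored;
    }
}
```

For the values 1 to 7 with a checkpoint on every third value, `ExplicitRuntime.run` stores the checkpoints 3 and 6.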
2. Automatic Mode$
In this mode, checkpointing is handled automatically during the lifecycle:
The start() method is called with a callback function that retrieves and serializes the checkpoint object.
While reading records (next() method), the connector checks whether a checkpoint can be provided.
If necessary, the checkpoint is generated by calling getCheckpoint(), which internally calls the method annotated with @CheckpointData.
final Consumer<CheckpointState> checkpointCallback = bookmark -> {
    serializeCheckpoint(bookmark);
};
...
input.start(checkpointCallback);
Record record;
while ((record = input.next()) != null) {
    // process records
}
input.stop();
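The difference with the explicit mode is that the caller never polls for checkpoints: the input pushes them to the callback while records are read. The sketch below illustrates that flow with a hypothetical stand-in class, `AutoCheckpointInput`. It uses `Consumer<Integer>` instead of `Consumer<CheckpointState>` and an assumed "every Nth value" rule so that it runs without TCK.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for an input running in automatic mode (not a TCK class).
final class AutoCheckpointInput {

    private final Iterator<Integer> iterator;
    private final int every;
    // No-op callback until start(...) is called.
    private Consumer<Integer> checkpointCallback = state -> { };

    AutoCheckpointInput(final List<Integer> data, final int every) {
        this.iterator = data.iterator();
        this.every = every;
    }

    // Registers the callback that receives each new checkpoint.
    void start(final Consumer<Integer> callback) {
        this.checkpointCallback = callback;
    }

    // Returns the next record; pushes a checkpoint to the callback when one is due.
    Integer next() {
        if (!iterator.hasNext()) {
            return null;
        }
        final Integer produced = iterator.next();
        if (produced % every == 0) {
            checkpointCallback.accept(produced); // demo rule, assumption
        }
        return produced;
    }
}
```

A caller only has to pass a callback such as `saved::add` to `start(...)` and then loop on `next()`; with the values 1 to 5 and a checkpoint on every second value, the callback receives 2 and 4.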
How to resume$
As stated previously, checkpoints are configured via the configuration class annotated with @Checkpoint. The @Checkpoint annotation can be used to specify the method type used for checkpointing and the checkpointing frequency.
The checkpointing frequency can be set to options such as RECORD or TIME. For example, selecting the RECORD frequency saves the checkpoint after processing a certain number of records, while selecting the TIME frequency generates checkpoints at specified time intervals.
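The two frequencies can be pictured as a small policy object that is asked, after each record, whether a checkpoint is due. The sketch below is an assumption-heavy illustration: `FrequencyPolicy`, its constructor parameters, and the injected clock are all hypothetical, and the actual attributes of the @Checkpoint annotation are not shown in this document.

```java
import java.util.function.LongSupplier;

// Hypothetical policy (not a TCK class) illustrating RECORD and TIME frequencies.
final class FrequencyPolicy {

    enum Frequency {
        RECORD,
        TIME
    }

    private final Frequency frequency;
    private final long threshold;     // record count for RECORD, milliseconds for TIME
    private final LongSupplier clock; // injected so the sketch can be tested
    private long recordsSinceLast;
    private long lastCheckpointAt;

    FrequencyPolicy(final Frequency frequency, final long threshold, final LongSupplier clock) {
        this.frequency = frequency;
        this.threshold = threshold;
        this.clock = clock;
        this.lastCheckpointAt = clock.getAsLong();
    }

    // Call once per produced record; returns true when a checkpoint is due.
    boolean onRecord() {
        recordsSinceLast++;
        final long now = clock.getAsLong();
        final boolean due = frequency == Frequency.RECORD
                ? recordsSinceLast >= threshold
                : now - lastCheckpointAt >= threshold;
        if (due) {
            // Restart both counters from this checkpoint.
            recordsSinceLast = 0;
            lastCheckpointAt = now;
        }
        return due;
    }
}
```

In production code the clock would typically be `System::currentTimeMillis`; passing it in as a `LongSupplier` is a design choice that keeps the time-based branch deterministic in tests.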
When the component is restarted, the runtime calls the start() method. The connector receives its configuration, merged with the checkpoint object, and can then resume processing from the last saved state.
When implementing checkpointing, it is recommended to use a separate configuration class for the checkpoint data. This class should be nested within the main configuration class and annotated with @Checkpoint. This keeps the checkpointing configuration separate from the main configuration and makes it easy for the runtime to manage.
Sample configuration class with checkpointing annotations:
...
@Checkpoint
@Version(value = 2, migrationHandler = CheckpointMigrationHandler.class)
public static class CheckPointInputConfig implements Serializable {

    public enum Strategy {
        BY_ID,
        BY_DATE
    }

    @Option
    @DefaultValue("BY_ID")
    private Strategy strategy = Strategy.BY_ID;

    @Option
    private String startDate;

    @Option
    private int sinceId;
}

@Data
@GridLayout(value = { @GridLayout.Row("user"), @GridLayout.Row("pass")})
@GridLayout(names = GridLayout.FormType.CHECKPOINT, value = { @GridLayout.Row("checkPointInputConfig") })
public static class InputConfig {

    @Option
    private String user;

    @Option
    private String pass;

    @Option
    private CheckPointInputConfig checkpoint = new CheckPointInputConfig();
}
Calling getCheckpoint() returns a CheckpointState instance, a simple POJO that holds the checkpoint data and version. This class has to be serializable so that the runtime can save and restore the checkpoint object. For that purpose, it provides a helper method, toJson(), that serializes the checkpoint state object.
Here’s a simple example of a CheckpointState instance serialized to JSON:
{
  "$checkpoint": { (1)
    "lastId": 95,
    "lastUpdate": "2023-04-04",
    "strategy": "BY_ID",
    "__version": 2 (2)
  }
}
Note the following important points, both of which toJson() handles automatically:
(1) The configuration and state of the checkpoint must be members of a $checkpoint object.
(2) The version of the checkpoint configuration is stored so that the checkpoint can be migrated later if needed.
The checkpointing configuration class is defined as a nested class within the component configuration class. The runtime has to provide the checkpointing configuration to the component when it is started.
configuration.put("configuration.datastore.user", "usr");
configuration.put("configuration.datastore.pass", "pwd");
...
configuration.put("$checkpoint.strategy", "BY_ID");
configuration.put("$checkpoint.sinceId", "5");
//
final Mapper mapper = mgr.findMapper("checkpoint", "list-input", 1, configuration).get();
...
An important point to remember when providing the configuration to the component: you don’t need to follow the internal configuration path of the checkpoint configuration class in your connector. The runtime automatically maps the configuration to the checkpoint configuration class when you prefix your checkpoint state with $checkpoint.
It’s quite easy to translate configuration and checkpoints in JSON format to a Map<String, String> object (see the helper method jsonToMap() in ComponentManager). This object can be passed to the component manager to create a mapper instance.
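To show what such a translation amounts to, the sketch below flattens a nested structure into dotted keys, which is the shape of the configuration map used earlier. It is not the jsonToMap() helper itself: `ConfigFlattener` is a hypothetical class, it starts from an already parsed `Map` rather than from JSON text, and it does not handle arrays.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not the TCK jsonToMap): flattens nested maps to dotted keys.
final class ConfigFlattener {

    private ConfigFlattener() {
    }

    static Map<String, String> flatten(final Map<String, ?> nested) {
        final Map<String, String> flat = new LinkedHashMap<>();
        collect("", nested, flat);
        return flat;
    }

    private static void collect(final String prefix, final Map<String, ?> node,
            final Map<String, String> out) {
        for (final Map.Entry<String, ?> entry : node.entrySet()) {
            final String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
            final Object value = entry.getValue();
            if (value instanceof Map) {
                // Nested object: recurse with the extended key prefix.
                @SuppressWarnings("unchecked")
                final Map<String, ?> child = (Map<String, ?>) value;
                collect(key, child, out);
            } else {
                // Leaf value: store its string form under the dotted key.
                out.put(key, String.valueOf(value));
            }
        }
    }
}
```

With this sketch, a nested `$checkpoint` object holding `strategy` and `sinceId` becomes the entries `$checkpoint.strategy` and `$checkpoint.sinceId`, matching the keys passed to the component manager in the example above.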
Conclusion$
The checkpointing feature in TCK improves resilience by allowing input connectors to resume from the last saved state. By leveraging annotations and the input interface, developers can integrate checkpointing seamlessly into their components, ensuring efficient and fault-tolerant data processing.