Components are designed to manipulate data (access, read, create). Talend Component Kit can handle several types of data, described in this document.
By design, the framework must run in DI (plain standalone Java program) and in Beam pipelines.
It is out of scope of the framework to handle the way the runtime serializes - if needed - the data.
For that reason, it is critical not to import serialization constraints into the stack. As an example, this is one of the reasons why Record or JsonObject were preferred to Avro IndexedRecord.
Any serialization concern should either be hidden in the framework runtime (outside of the component developer scope) or in the runtime integration with the framework (for example, Beam integration).
Record
Record is the default format. It offers many possibilities and can evolve depending on the Talend platform needs. Its structure is data-driven and exposes a schema that lets you browse it.
Projects generated from the Talend Component Kit Starter are by default designed to handle this format of data.
Record is a Java interface, but never implement it yourself: relying on the provided builders ensures compatibility with the different Talend products. Follow the guidelines below.
Creating a record
You can build records using the newRecordBuilder method of the RecordBuilderFactory (see here).
For example:
public Record createRecord() {
    return factory.newRecordBuilder()
            .withString("name", "Gary")
            .withDateTime("date", ZonedDateTime.of(LocalDateTime.of(2011, 2, 6, 8, 0), ZoneId.of("UTC")))
            .build();
}
In the example above, the schema is dynamically computed from the data. You can also do it using a pre-built schema, as follows:
public Record createRecord() {
    return factory.newRecordBuilder(myAlreadyBuiltSchemaWithSchemaBuilder)
            .withString("name", "Gary")
            .withDateTime("date", ZonedDateTime.of(LocalDateTime.of(2011, 2, 6, 8, 0), ZoneId.of("UTC")))
            .build();
}
The example above uses a schema that was pre-built using factory.newSchemaBuilder(Schema.Type.RECORD).
When using a pre-built schema, the entries passed to the record builder are validated. This means that if you pass a null value or an entry type that does not match the provided schema, the record creation fails. It also fails if you try to add an entry that does not exist in the schema or if you do not set a non-nullable entry.
Using a dynamic schema can be useful on the backend but can cause more issues for users when they create a pipeline to process the data. Using a pre-built schema is more reliable for end-users.
Accessing and reading a record
You can access and read data by relying on the getSchema method, which provides you with the available entries (columns) of a record. The Entry exposes the type of its value, which lets you access the value through the corresponding method. For example, the Schema.Type.STRING type implies using the getString method of the record.
For example:
public void print(final Record record) {
    final Schema schema = record.getSchema();
    // log in the natural type
    schema.getEntries()
            .forEach(entry -> System.out.println(record.get(Object.class, entry.getName())));
    // log only strings
    schema.getEntries().stream()
            .filter(e -> e.getType() == Schema.Type.STRING)
            .forEach(entry -> System.out.println(record.getString(entry.getName())));
}
Supported data types
The Record format supports the following data types:
- String
- Boolean
- Int
- Long
- Float
- Double
- DateTime
- Array
- Bytes
- Record
A map can always be modeled as a list (array of records with key and value entries).
For example:
public Record create() {
    final Record address = factory.newRecordBuilder()
            .withString("street", "Prairie aux Ducs")
            .withString("city", "Nantes")
            .withString("country", "FRANCE")
            .build();
    return factory.newRecordBuilder()
            .withBoolean("active", true)
            .withInt("age", 33)
            .withLong("duration", 123459)
            .withFloat("tolerance", 1.1f)
            .withDouble("balance", 12.58)
            .withString("name", "John Doe")
            .withDateTime("birth", ZonedDateTime.now())
            .withRecord(
                    factory.newEntryBuilder()
                            .withName("address")
                            .withType(Schema.Type.RECORD)
                            .withComment("The user address")
                            .withElementSchema(address.getSchema())
                            .build(),
                    address)
            .withArray(
                    factory.newEntryBuilder()
                            .withName("permissions")
                            .withType(Schema.Type.ARRAY)
                            .withElementSchema(factory.newSchemaBuilder(Schema.Type.STRING).build())
                            .build(),
                    asList("admin", "dev"))
            .build();
}
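The note above about modeling a map as an array of key/value records can be sketched as follows. This is only an illustration of the resulting shape: plain JDK maps stand in for Record instances so that the sketch runs without the framework, and the class and method names (MapAsArraySketch, toKeyValueList) are hypothetical. With the real API, each element would presumably be built with newRecordBuilder() and the list passed to withArray.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Talend Component Kit.
// Plain maps stand in for Record instances to keep the sketch self-contained.
class MapAsArraySketch {

    // Turns each map entry into one element that holds a "key" entry and a "value" entry.
    static List<Map<String, Object>> toKeyValueList(final Map<String, ?> source) {
        final List<Map<String, Object>> elements = new ArrayList<>();
        for (final Map.Entry<String, ?> entry : source.entrySet()) {
            final Map<String, Object> element = new LinkedHashMap<>();
            element.put("key", entry.getKey());
            element.put("value", entry.getValue());
            elements.add(element);
        }
        return elements;
    }

    public static void main(final String[] args) {
        final Map<String, Integer> quotas = new LinkedHashMap<>();
        quotas.put("admin", 10);
        quotas.put("dev", 3);
        // Each printed element has exactly two entries: key and value.
        System.out.println(toKeyValueList(quotas));
    }
}
```

Keeping a fixed key/value element schema means that the array entry can be described once, whatever keys the original map contains.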
Example: discovering a schema
For example, you can use the API to provide the schema. The following method needs to be implemented in a service.
Manually constructing the schema without any data:
@DiscoverSchema
public Schema getSchema(@Option MyDataset dataset) {
    return factory.newSchemaBuilder(Schema.Type.RECORD)
            .withEntry(factory.newEntryBuilder().withName("id").withType(Schema.Type.LONG).build())
            .withEntry(factory.newEntryBuilder().withName("name").withType(Schema.Type.STRING).build())
            .build();
}
Returning the schema from an already built record:
@DiscoverSchema
public Schema guessSchema(@Option MyDataset dataset, final MyDataLoaderService myCustomService) {
    return myCustomService.loadFirstData().getRecord().getSchema();
}
MyDataset is the class that defines the dataset. Learn more about datasets and datastores in this document.
Authorized characters in entry names
Entry names for Record and JsonObject types must comply with the following rules:
- The name must start with a letter or with _. If not, the invalid characters are ignored until the first valid character.
- Following characters of the name must be a letter, a number, or _. If not, the invalid character is replaced with _.
For example:
- 1foo becomes foo.
- f@o becomes f_o.
- 1234f5@o becomes ___f5_o.
- foo123 stays foo123.
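To avoid relying on automatic renaming, a component can check names before building entries. The sketch below is a minimal, hypothetical validator (EntryNameCheck is not part of the framework) that only tests whether a name already satisfies the two rules above. It assumes that "letter" and "number" mean ASCII letters and digits, and it does not try to reproduce the framework's own sanitization.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of Talend Component Kit.
class EntryNameCheck {

    // Assumption: "letter" and "number" are restricted to ASCII letters and digits.
    // First character: letter or underscore. Following characters: letter, digit or underscore.
    private static final Pattern VALID_NAME = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

    // Returns true when the name needs no change under the rules listed above.
    static boolean isValid(final String name) {
        return name != null && VALID_NAME.matcher(name).matches();
    }

    public static void main(final String[] args) {
        for (final String name : new String[] { "foo123", "1foo", "f@o", "_id" }) {
            System.out.println(name + " valid: " + isValid(name));
        }
    }
}
```

Rejecting or fixing a name up front keeps the entry names seen by end-users identical to the ones declared in the component.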
JsonObject
The runtime also supports JsonObject as input and output component type. You can rely on the JSON services (Jsonb, JsonBuilderFactory) to create new instances.
This format is close to the Record format, except that it does not natively support the DateTime type and has a single Number type to represent the Int, Long, Float and Double types. It also does not provide entry metadata such as nullable or comment.
It also inherits the Record format limitations.
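Because JsonObject has no native DateTime type, date-time values have to travel in another form. One possible approach, shown here as an assumption and not as a convention prescribed by the framework, is to store them as ISO-8601 strings. The sketch uses only the JDK, and the DateTimeAsString class is hypothetical.

```java
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of Talend Component Kit.
class DateTimeAsString {

    // ISO-8601 with a UTC offset; the zone region ID (for example Europe/Paris) is not kept.
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ISO_OFFSET_DATE_TIME;

    // Produces the string that would be stored as a JSON string value.
    static String encode(final ZonedDateTime value) {
        return FORMAT.format(value);
    }

    // Rebuilds a date-time from the stored string; the instant is preserved.
    static ZonedDateTime decode(final String value) {
        return ZonedDateTime.parse(value, FORMAT);
    }

    public static void main(final String[] args) {
        final ZonedDateTime date = ZonedDateTime.of(2011, 2, 6, 8, 0, 0, 0, ZoneId.of("UTC"));
        final String encoded = encode(date);
        System.out.println(encoded);
        System.out.println(decode(encoded).toInstant().equals(date.toInstant()));
    }
}
```

The consumer of the JsonObject has to know which string entries hold dates, since that information is not carried by the JSON value itself.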