Dynamic schema
Since the 1.1.25 release, the dynamic column feature is supported in Studio with component-runtime components.
Dynamic column is available with Enterprise versions of Talend Studio only. |
Accessing columns metadata
In Studio, each component can define a schema with associated metadata.
To access this information in your component, you need to do a few things:
Using the @Structure annotation
- API: @org.talend.sdk.component.api.configuration.ui.widget.Structure
Depending on the specified field type, you get access to:
- the list of column names, with List<String>
- a subset or all of the wanted metadata, with List<MySchemaMeta> (see below)
@Data
@GridLayout({ @GridLayout.Row({ "dataset" }),
              @GridLayout.Row({ "incomingSchema" }) }) (5)
public class OutputConfig implements Serializable {

    @Option
    @Documentation("My dataset.")
    private Dataset dataset;

    @Option (1)
    @Documentation("Incoming metadata.")
    @Structure(type = Structure.Type.IN) (2) (3)
    private List<SchemaInfo> incomingSchema; (4)
}
1 | @Option : marks a class attribute as a configuration entry. |
2 | @Structure : marks this configuration entry as a schema container. |
3 | Structure.Type.IN : marks the schema for an incoming flow (Output).
Use Structure.Type.OUT for an outgoing flow (Input). |
4 | List<SchemaInfo> : uses a custom class for holding metadata. |
5 | @GridLayout : the option must be declared in the UI layout. |
The SchemaInfo class itself is defined as described in the next section.
Defining a specific class for holding metadata
If you need more than the column names (List<String>), you have to define a custom class.
@Data
@GridLayout({ @GridLayout.Row({ "label", "key", "talendType", "nullable", "pattern" }) })
@Documentation("Schema definition.")
public class SchemaInfo implements Serializable {

    @Option
    @Documentation("Column name.")
    private String label;

    @Option
    @Documentation("Is it a Key column.")
    private boolean key;

    @Option
    @Documentation("Talend type such as id_String.")
    private String talendType;

    @Option
    @Documentation("Is it a Nullable column.")
    private boolean nullable;

    @Option
    @Documentation("Pattern used for datetime processing.")
    private String pattern = "yyyy-MM-dd HH:mm";
}
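Once Studio has filled the schema list, a connector can use the metadata when writing. The following is only a minimal sketch under stated assumptions: ColumnMeta is a hypothetical plain-Java stand-in for SchemaInfo (no framework annotations), and SchemaMetadata is a hypothetical helper that is not part of the framework. It collects the labels of the columns flagged as keys.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java stand-in for SchemaInfo, without framework annotations.
final class ColumnMeta {

    final String label;

    final boolean key;

    final boolean nullable;

    ColumnMeta(final String label, final boolean key, final boolean nullable) {
        this.label = label;
        this.key = key;
        this.nullable = nullable;
    }
}

// Hypothetical helper, for illustration only.
final class SchemaMetadata {

    private SchemaMetadata() {
    }

    // Returns the labels of the columns flagged as keys, in schema order.
    static List<String> keyColumns(final List<ColumnMeta> schema) {
        final List<String> keys = new ArrayList<>();
        for (final ColumnMeta column : schema) {
            if (column.key) {
                keys.add(column.label);
            }
        }
        return keys;
    }
}
```

An output connector could call such a helper once, for example to build the key part of an update statement, instead of inspecting the metadata for every record.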
Available Studio metadata information
Field name | Type | Name in Studio |
---|---|---|
 | String | Column |
 | String | Db Column |
 | Boolean | Key |
 | String | DB Type |
 | String | Type |
 | Boolean | Nullable |
 | String | Date Pattern |
 | int | Length |
 | int | Precision |
 | String | Default |
 | String | Comment |
Notes when designing an output connector
Available since 1.43.x release |
As the types of the Talend Component Kit Schema do not match all Studio types, those types are wrapped into wider types (for example, Character or char is wrapped into String, Short into Integer, and so on).
The original type coming from Studio’s IPersistableRow is nevertheless stored in the record’s schema properties under the property name talend.studio.type.
Studio managed types are: id_BigDecimal, id_Boolean, id_Byte, id_byte[], id_Character, id_Date, id_Double, id_Document, id_Dynamic, id_Float, id_Integer, id_List, id_Long, id_Object, id_Short, id_String.
When handling an output connector designed for Studio, you should check this property to get an accurate type for the output.
For instance, java.math.BigDecimal is handled in the framework as a Type.STRING, so when an output connector receives a record in the Studio context, you need to check the property and convert the value accordingly.
Here is a simple processor that does so before writing to the backend destination:
@ElementListener
public void process(final Record input) {
    // The value is carried as a String by the framework.
    final String value = input.getString("myBigDecimal");
    // The original Studio type is kept in the entry properties.
    final boolean isBigDec = "id_BigDecimal".equals(input.getSchema().getEntry("myBigDecimal").getProp("talend.studio.type"));
    queueWriter.write(isBigDec ? new BigDecimal(value) : value);
}
Using properties this way is cumbersome, but it works around some potential issues for now. We plan to widen the managed types in Record and Schema in a few iterations (no ETA defined yet). |
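When several widened types must be handled, the check can be grouped in one place. The following is a sketch only, not framework code: StudioTypes and narrow are hypothetical names, and the sketch covers only the three widenings mentioned in this section (BigDecimal and Character carried as String, Short carried as Integer). The studioType argument is assumed to be the value read from the talend.studio.type property.

```java
import java.math.BigDecimal;

// Hypothetical helper: restores the Studio type of a value widened by the framework.
final class StudioTypes {

    private StudioTypes() {
    }

    // studioType is the value of the "talend.studio.type" property; it may be null
    // when the record does not come from Studio.
    static Object narrow(final Object value, final String studioType) {
        if (value == null || studioType == null) {
            return value;
        }
        switch (studioType) {
        case "id_BigDecimal": { // carried as a String
            return new BigDecimal((String) value);
        }
        case "id_Character": { // carried as a String
            final String text = (String) value;
            if (text.isEmpty()) {
                return null;
            }
            return text.charAt(0);
        }
        case "id_Short": { // carried as an Integer
            return ((Integer) value).shortValue();
        }
        default: // no narrowing needed, or type not covered by this sketch
            return value;
        }
    }
}
```

In a processor, the second argument would come from the entry properties, as in the example above; any other Studio type is returned unchanged.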
Discovering schema (Guess schema)
Two annotations allow you to discover a component’s schema:
- @DiscoverSchema (only for input components)
- @DiscoverSchemaExtended (all components)
Using the @DiscoverSchema annotation
@Service
public class UiServices {

    // Injected by the framework, used to build schemas and entries.
    @Service
    private RecordBuilderFactory factory;

    @DiscoverSchema("MyDataSet")
    public Schema guessSchema(@Option final MyDataSet dataset) {
        // some code
        return factory.newSchemaBuilder(Schema.Type.RECORD)
                .withEntry(factory.newEntryBuilder()
                        .withName("DataSetor")
                        .withType(Schema.Type.STRING)
                        .withNullable(true)
                        .build())
                // building some entries
                .withEntry(factory.newEntryBuilder()
                        .withName("effective_date")
                        .withType(Schema.Type.DATETIME)
                        .withNullable(true)
                        .withComment("Effective date of purchase")
                        .build())
                .build();
    }
}
Using the @DiscoverSchemaExtended annotation
The method can take an incoming schema, an outgoing branch, and a configuration as parameters. None of them is mandatory.
Prototype:
/**
 * @param incomingSchema the schema of the input flow
 * @param conf the configuration of the processor (not a @Dataset)
 * @param branch the name of the output flow for which the computed schema is expected (FLOW, MAIN, REJECT, etc.)
 * @return the discovered schema
 */
@DiscoverSchemaExtended("full")
public Schema guessMethodName(final Schema incomingSchema, final @Option("configuration") ConfigClass myConfig, final String branch) {...}

@DiscoverSchemaExtended("incoming_schema")
public Schema guessMethodName(final Schema incomingSchema, final @Option("configuration") ConfigClass myConfig) {...}

@DiscoverSchemaExtended("branch")
public Schema guessMethodName(final @Option("configuration") ConfigClass myConfig, final String branch) {...}

@DiscoverSchemaExtended("minimal")
public Schema guessMethodName(final @Option("configuration") ConfigClass myConfig) {...}
As other parameters may be passed to the method, make sure to use the naming above: incomingSchema for the schema and branch for the outgoing branch. |
The schema element must comply with the following requirements:
- It has to be a string, not a JSON sub-object.
- The JSON has to be flat.
Example:
@DiscoverSchemaExtended("MyProcessorSchema")
public Schema discoverProcessorSchema(final Schema incomingSchema, @Option final MyProcessorConfiguration configuration, final String branch) {
    // Start from the incoming schema and add the entries produced by the processor.
    final Schema.Builder outgoingSchema = factory.newSchemaBuilder(incomingSchema);
    outgoingSchema.withEntry(factory.newEntryBuilder()
            .withName("RejectorProcessorSchema")
            .withType(Type.STRING)
            .build());
    // infos (and code below) are assumed to be defined elsewhere in the service.
    outgoingSchema.withEntry(factory.newEntryBuilder()
            .withName(branch)
            .withType(Type.FLOAT)
            .withComment(infos)
            .withProp(org.talend.sdk.component.api.record.SchemaProperty.SIZE, "10")
            .withProp(org.talend.sdk.component.api.record.SchemaProperty.SCALE, "3")
            .build());
    // equalsIgnoreCase avoids a NullPointerException when no branch is provided.
    if ("REJECT".equalsIgnoreCase(branch)) {
        outgoingSchema.withEntry(factory.newEntryBuilder()
                .withName("ERROR_MESSAGE")
                .withType(Type.STRING)
                .withDefaultValue(code)
                .build());
    }
    return outgoingSchema.build();
}
Guess schema action selection
For inputs
- Try to find an action in the declared Service class:
  - Search for an action of type @DiscoverSchema named like the input dataset.
  - Search for an action of type @DiscoverSchemaExtended named like the input dataset.
  - Search for an action of type @DiscoverSchema.
- Execute a fake job with the component to retrieve the output schema.
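The lookup order above can be sketched in plain Java. This only restates the order of the steps and is not how Studio implements it: SchemaAction and GuessSchemaSelector are hypothetical names, and an action is reduced to its annotation type and its name. An empty result means that no action was found and that the fake job fallback applies.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical description of an action declared in the service class.
final class SchemaAction {

    final String type; // "DiscoverSchema" or "DiscoverSchemaExtended"

    final String name; // the value of the annotation

    SchemaAction(final String type, final String name) {
        this.type = type;
        this.name = name;
    }
}

// Hypothetical selector following the documented lookup order for inputs.
final class GuessSchemaSelector {

    private GuessSchemaSelector() {
    }

    static Optional<SchemaAction> select(final List<SchemaAction> actions, final String dataset) {
        // 1. @DiscoverSchema named like the input dataset
        Optional<SchemaAction> found = find(actions, "DiscoverSchema", dataset);
        if (!found.isPresent()) {
            // 2. @DiscoverSchemaExtended named like the input dataset
            found = find(actions, "DiscoverSchemaExtended", dataset);
        }
        if (!found.isPresent()) {
            // 3. any @DiscoverSchema action
            found = find(actions, "DiscoverSchema", null);
        }
        return found; // empty: fall back to the fake job
    }

    // A null name matches any action of the given type.
    private static Optional<SchemaAction> find(final List<SchemaAction> actions, final String type,
            final String name) {
        for (final SchemaAction action : actions) {
            if (action.type.equals(type) && (name == null || name.equals(action.name))) {
                return Optional.of(action);
            }
        }
        return Optional.empty();
    }
}
```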
Fixed schema
In some cases, a component always produces the same schema.
Declaring a fixed schema prevents Studio from displaying the Guess schema button and the reject part of the UI.
The value of the @FixedSchema annotation should point to a @DiscoverSchema or @DiscoverSchemaExtended action.
Example with an emitter:
@FixedSchema("my_fixed_input_schema")
@Emitter(family = "test", name = "myEmitter")
public class MyEmitter implements Serializable {
...
In the service class:
@Service
public class UiServices {

    // Injected by the framework, used to build schemas and entries.
    @Service
    private RecordBuilderFactory factory;

    @DiscoverSchema("my_fixed_input_schema")
    public Schema guessSchema(@Option final MyDataSet dataset) {
        return factory.newSchemaBuilder(Schema.Type.RECORD)
                .withEntry(factory.newEntryBuilder()
                        .withName("header")
                        .withType(Schema.Type.STRING)
                        .withNullable(false)
                        .build())
                .withEntry(factory.newEntryBuilder()
                        .withName("message")
                        .withType(Schema.Type.DATETIME)
                        .withNullable(true)
                        .build())
                .build();
    }
}
Example for a processor with a possible reject flow:
@Service
public class UiServices {

    // Injected by the framework, used to build schemas and entries.
    @Service
    private RecordBuilderFactory factory;

    @DiscoverSchemaExtended("my_fixed_processor_schema")
    public Schema discoverProcessorSchema(final Schema in,
            @Option final MyConf conf,
            final String branch) {
        final Schema.Builder out = factory.newSchemaBuilder(in);
        // equalsIgnoreCase avoids a NullPointerException when no branch is provided.
        if ("REJECT".equalsIgnoreCase(branch)) {
            out.withEntry(factory.newEntryBuilder()
                    .withName("ERROR_MESSAGE")
                    .withType(Type.STRING)
                    .build());
        }
        return out.build();
    }
}