Wrapping a Beam I/O

Limitations

Wrapping is limited to the following kinds of Beam PTransform:

  • PTransform<PBegin, PCollection<?>> for inputs.

  • PTransform<PCollection<?>, PDone> for outputs. Outputs must use a single (composite or not) DoFn in their apply method.

Wrapping an input

To illustrate the input wrapping, this procedure uses the following input as a starting point (based on existing Beam inputs):

@AutoValue
public abstract [static] class Read extends PTransform<PBegin, PCollection<String>> {

  // config

  @Override
  public PCollection<String> expand(final PBegin input) {
    return input.apply(
        org.apache.beam.sdk.io.Read.from(new BoundedElasticsearchSource(this, null)));
  }

  // ... other transform methods
}

To wrap the Read in a framework component, create a transform delegating to that Read with at least a @PartitionMapper annotation and using @Option constructor injections to configure the component. Also make sure to follow the best practices and to specify @Icon and @Version.

@PartitionMapper(family = "myfamily", name = "myname")
public class WrapRead extends PTransform<PBegin, PCollection<String>> {
  private PTransform<PBegin, PCollection<String>> delegate;

  public WrapRead(@Option("dataset") final WrapReadDataSet dataset) {
    delegate = TheIO.read().withConfiguration(this.createConfigurationFrom(dataset));
  }

  @Override
  public PCollection<String> expand(final PBegin input) {
    return delegate.expand(input);
  }

  // ... other methods like the mapping with the native configuration (createConfigurationFrom)
}

Wrapping an output

To illustrate the output wrapping, this procedure uses the following output as a starting point (based on existing Beam outputs):

@AutoValue
public abstract [static] class Write extends PTransform<PCollection<String>, PDone> {


    // configuration withXXX(...)

    @Override
    public PDone expand(final PCollection<String> input) {
      input.apply(ParDo.of(new WriteFn(this)));
      return PDone.in(input.getPipeline());
    }

    // other methods of the transform
}

You can wrap this output exactly the same way you wrap an input, but using @Processor instead of @PartitionMapper:

@Processor(family = "myfamily", name = "myname")
public class WrapWrite extends PTransform<PCollection<String>, PDone> {
  private PTransform<PCollection<String>, PDone> delegate;

  public WrapWrite(@Option("dataset") final WrapWriteDataSet dataset) {
    delegate = TheIO.write().withConfiguration(this.createConfigurationFrom(dataset));
  }

  @Override
  public PDone expand(final PCollection<String> input) {
    return delegate.expand(input);
  }

  // ... other methods like the mapping with the native configuration (createConfigurationFrom)
}

Tip

Note that the org.talend.sdk.component.runtime.beam.transform.DelegatingTransform class fully delegates the "expansion" to another transform. Therefore, you can extend it and implement the configuration mapping:

@Processor(family = "beam", name = "file")
public class BeamFileOutput extends DelegatingTransform<PCollection<String>, PDone> {

    public BeamFileOutput(@Option("output") final String output) {
        super(TextIO.write()
            .withSuffix("test")
            .to(FileBasedSink.convertToFileResourceIfPossible(output)));
    }
}
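
To make the idea of full delegation concrete, the following minimal sketch models it in plain Java. Transform, Delegating, AppendSuffix, and SuffixComponent are hypothetical stand-ins written for this illustration. They are not the Beam PTransform or the Talend DelegatingTransform classes, whose actual implementation may differ.

```java
import java.util.List;
import java.util.stream.Collectors;

// Simplified model of the delegation pattern.
// None of these types are Beam or Talend classes; they are stand-ins.
class DelegationSketch {

    // Stand-in for a transform: produces an output from an input when expanded.
    interface Transform<I, O> {
        O expand(I input);
    }

    // Stand-in for a delegating base class: expand() is forwarded as-is.
    static class Delegating<I, O> implements Transform<I, O> {
        private final Transform<I, O> delegate;

        Delegating(final Transform<I, O> delegate) {
            this.delegate = delegate;
        }

        @Override
        public O expand(final I input) {
            return delegate.expand(input);
        }
    }

    // Hypothetical "native" transform, configured with a suffix.
    static class AppendSuffix implements Transform<List<String>, List<String>> {
        private final String suffix;

        AppendSuffix(final String suffix) {
            this.suffix = suffix;
        }

        @Override
        public List<String> expand(final List<String> input) {
            return input.stream().map(s -> s + suffix).collect(Collectors.toList());
        }
    }

    // The wrapper only maps its own configuration to the native transform.
    static class SuffixComponent extends Delegating<List<String>, List<String>> {
        SuffixComponent(final String suffix) {
            super(new AppendSuffix(suffix));
        }
    }

    public static void main(final String[] args) {
        System.out.println(new SuffixComponent(".txt").expand(List.of("a", "b")));
    }
}
```

The point of the pattern is that the subclass contains no pipeline logic at all: its constructor is the only place where component configuration is translated into the wrapped transform.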

Advanced

In terms of classloading, when you write an I/O, the Beam SDK Java core stack is assumed to be provided by the Talend Component Kit runtime. You therefore do not need to include it in the compile scope; it would be ignored anyway.

Coder

If you need a JSON coder, you can use the org.talend.sdk.component.runtime.beam.factory.service.PluginCoderFactory service, which gives you access to the JSON-P and JSON-B coders.

There is also an Avro coder, which uses the FileContainer. It is self-contained for IndexedRecord and, unlike the default Apache Beam AvroCoder, does not require you to set the schema when creating a pipeline.
It consumes more space and is therefore slightly slower, but this is fine for DoFn, since it does not rely on serialization in most cases. See org.talend.sdk.component.runtime.beam.transform.avro.IndexedRecordCoder.

JsonObject to IndexedRecord

If your PCollection is made of JsonObject records and you want to convert them to IndexedRecord, or the other way around, you can use the following PTransforms:

IndexedRecordToJson

converts an IndexedRecord to a JsonObject.

JsonToIndexedRecord

converts a JsonObject to an IndexedRecord.

SchemalessJsonToIndexedRecord

converts a JsonObject to an IndexedRecord with AVRO schema inference.
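
Schema inference means that the record structure is derived from the values found in the data rather than from a schema declared up front. The sketch below illustrates the general idea only. It uses a plain Map as a stand-in for JsonObject, and the type names and mapping rules are assumptions made for this example; it does not reproduce the rules that SchemalessJsonToIndexedRecord applies.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration of schema inference on a JSON-like structure.
// The mapping rules below are hypothetical, not those of the Talend transform.
class SchemaInferenceSketch {

    // Derives a type name from a single value.
    static String inferType(final Object value) {
        if (value == null) {
            return "null";
        }
        if (value instanceof String) {
            return "string";
        }
        if (value instanceof Boolean) {
            return "boolean";
        }
        if (value instanceof Integer || value instanceof Long) {
            return "long";
        }
        if (value instanceof Number) {
            return "double";
        }
        if (value instanceof List) {
            return "array";
        }
        if (value instanceof Map) {
            return "record";
        }
        throw new IllegalArgumentException("Unsupported value: " + value.getClass());
    }

    // Builds a field name -> type name view of the object, keeping field order.
    static Map<String, String> inferSchema(final Map<String, ?> json) {
        final Map<String, String> schema = new LinkedHashMap<>();
        json.forEach((name, value) -> schema.put(name, inferType(value)));
        return schema;
    }

    public static void main(final String[] args) {
        final Map<String, Object> json = new LinkedHashMap<>();
        json.put("name", "beam");
        json.put("age", 3);
        json.put("active", true);
        System.out.println(inferSchema(json));
    }
}
```

Because the schema depends on the values actually seen, two records with different fields or value types can yield different schemas, which is worth keeping in mind when choosing between the schema-based and schemaless conversions.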

Record coder

Two main coders are provided for Record:

FullSerializationRecordCoder

It unwraps the record as an Avro IndexedRecord and serializes it with its schema. This can have a performance impact but, because of the way components are structured, it generally does not affect runtime performance (except with the direct runner), since the runners optimize the pipeline.

SchemaRegistryCoder

It also serializes the Avro IndexedRecord, but it ensures that the schema is in the SchemaRegistry so that the record can be deserialized when needed. This implementation is faster, but the default implementation of the registry is "in memory", so it only works with a single worker node. You can extend it using the Java SPI mechanism to plug in a custom distributed implementation.
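
The trade-off between the two coders can be sketched in plain Java. This is a simplified model under stated assumptions: InMemoryRegistry, the string-based payload layout, and the hash-based schema id are all hypothetical and do not reflect how FullSerializationRecordCoder or SchemaRegistryCoder actually encode records.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model: payloads are strings, the first line identifies the schema.
class RecordCoderSketch {

    // Hypothetical in-memory registry: schema id -> schema definition.
    static class InMemoryRegistry {
        private final Map<String, String> schemas = new HashMap<>();

        String register(final String schema) {
            final String id = Integer.toHexString(schema.hashCode());
            schemas.put(id, schema);
            return id;
        }

        String lookup(final String id) {
            final String schema = schemas.get(id);
            if (schema == null) {
                throw new IllegalStateException("Unknown schema id: " + id);
            }
            return schema;
        }
    }

    // Full serialization: every payload carries the whole schema.
    static String encodeFull(final String schema, final String data) {
        return schema + "\n" + data;
    }

    // Registry-based: the payload carries only a short schema id.
    static String encodeWithRegistry(final InMemoryRegistry registry, final String schema, final String data) {
        return registry.register(schema) + "\n" + data;
    }

    static String schemaOfFull(final String payload) {
        return payload.substring(0, payload.indexOf('\n'));
    }

    static String schemaOfRegistry(final InMemoryRegistry registry, final String payload) {
        return registry.lookup(payload.substring(0, payload.indexOf('\n')));
    }

    public static void main(final String[] args) {
        final String schema = "{\"type\":\"record\",\"name\":\"Row\",\"fields\":[{\"name\":\"key\",\"type\":\"bytes\"}]}";
        final InMemoryRegistry workerA = new InMemoryRegistry();
        final InMemoryRegistry workerB = new InMemoryRegistry(); // a second worker with its own memory

        final String full = encodeFull(schema, "k1");
        final String light = encodeWithRegistry(workerA, schema, "k1");

        System.out.println("registry payload is smaller: " + (light.length() < full.length()));
        System.out.println("worker A resolves schema: " + schemaOfRegistry(workerA, light).equals(schema));
        try {
            schemaOfRegistry(workerB, light);
        } catch (final IllegalStateException e) {
            System.out.println("worker B cannot decode: " + e.getMessage());
        }
    }
}
```

In this model, the second registry instance plays the role of another worker node: it never saw the schema, so it cannot resolve the id. A distributed registry implementation removes that limitation by making the lookup shared across workers.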

Sample

Sample input based on Beam Kafka:

@Version
@Icon(Icon.IconType.KAFKA)
@Emitter(name = "Input")
@AllArgsConstructor
@Documentation("Kafka Input")
public class KafkaInput extends PTransform<PBegin, PCollection<Record>> { // (1)

    private final InputConfiguration configuration;

    private final RecordBuilderFactory builder;

    private final PluginCoderFactory coderFactory;

    private KafkaIO.Read<byte[], byte[]> delegate() {
        final KafkaIO.Read<byte[], byte[]> read = KafkaIO.<byte[], byte[]> read()
                .withBootstrapServers(configuration.getBootstrapServers())
                .withTopics(configuration.getTopics().stream().map(InputConfiguration.Topic::getName).collect(toList()))
                .withKeyDeserializer(ByteArrayDeserializer.class).withValueDeserializer(ByteArrayDeserializer.class);
        if (configuration.getMaxResults() > 0) {
            return read.withMaxNumRecords(configuration.getMaxResults());
        }
        return read;
    }

    @Override // (2)
    public PCollection<Record> expand(final PBegin pBegin) {
        final PCollection<KafkaRecord<byte[], byte[]>> kafkaEntries = pBegin.getPipeline().apply(delegate());
        return kafkaEntries.apply(ParDo.of(new BytesToRecord(builder))).setCoder(SchemaRegistryCoder.of()); // (3)
    }

    @AllArgsConstructor
    private static class BytesToRecord extends DoFn<KafkaRecord<byte[], byte[]>, Record> {

        private final RecordBuilderFactory builder;

        @ProcessElement
        public void onElement(final ProcessContext context) {
            context.output(toRecord(context.element()));
        }

        private Record toRecord(final KafkaRecord<byte[], byte[]> element) {
            return builder.newRecordBuilder().add("key", element.getKV().getKey())
                    .add("value", element.getKV().getValue()).build();
        }
    }
}
(1) The PTransform generics define that the component is an input (PBegin marker).
(2) The expand method chains the native I/O with a custom mapper (BytesToRecord).
(3) The mapper uses the SchemaRegistry coder automatically created from the contextual component.

Because the Beam wrapper does not respect the standard Talend Component Kit programming model (for example, there is no @Emitter), you need to set the <talend.validation.component>false</talend.validation.component> property in your pom.xml file (or the equivalent for Gradle) to skip the component programming model validations of the framework.
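
As a sketch, and assuming the property is declared in the standard properties section of the pom.xml (the enclosing element is an assumption; only the property itself comes from the text above), the configuration could look like this:

```xml
<properties>
  <!-- Skips the component programming model validations of the framework -->
  <talend.validation.component>false</talend.validation.component>
</properties>
```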
