Wrapping a Beam I/O

Limitations

This part is limited to particular kinds of Beam PTransform:

  • the PTransform<PBegin, PCollection<?>> for the inputs

  • the PTransform<PCollection<?>, PDone> for the outputs. The outputs must also use a single (composite or not) DoFn in their apply method.

Wrap an input

Assume you want to wrap an input like this one (based on existing Beam ones):

@AutoValue
public abstract [static] class Read extends PTransform<PBegin, PCollection<String>> {

  // config

  @Override
  public PCollection<String> expand(final PBegin input) {
    return input.apply(
        org.apache.beam.sdk.io.Read.from(new BoundedElasticsearchSource(this, null)));
  }

  // ... other transform methods
}

To wrap the Read in a framework component, create a transform that delegates to it, annotated with at least @PartitionMapper (following best practices, you likely also want to add @Icon and @Version), and use @Option constructor injection to configure the component:

@PartitionMapper(family = "myfamily", name = "myname")
public class WrapRead extends PTransform<PBegin, PCollection<String>> {
  private PTransform<PBegin, PCollection<String>> delegate;

  public WrapRead(@Option("dataset") final WrapReadDataSet dataset) {
    delegate = TheIO.read().withConfiguration(this.createConfigurationFrom(dataset));
  }

  @Override
  public PCollection<String> expand(final PBegin input) {
    return delegate.expand(input);
  }

  // ... other methods like the mapping with the native configuration (createConfigurationFrom)
}
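
For illustration, createConfigurationFrom is only the glue that maps the Kit dataset model onto the native I/O configuration. Here is a minimal sketch, assuming a hypothetical WrapReadDataSet exposing getHost() and getIndex() and a hypothetical TheIO.Configuration builder (these names are illustrative, not an actual I/O API):

// Hypothetical mapping from the framework dataset to the native configuration.
// WrapReadDataSet, getHost(), getIndex() and TheIO.Configuration are illustrative names only.
private TheIO.Configuration createConfigurationFrom(final WrapReadDataSet dataset) {
  return TheIO.Configuration.builder()
      .host(dataset.getHost())   // datastore coordinates
      .index(dataset.getIndex()) // dataset specific options
      .build();
}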

Wrap an output

Assume you want to wrap an output like this one (based on existing Beam ones):

@AutoValue
public abstract [static] class Write extends PTransform<PCollection<String>, PDone> {


    // configuration withXXX(...)

    @Override
    public PDone expand(final PCollection<String> input) {
      input.apply(ParDo.of(new WriteFn(this)));
      return PDone.in(input.getPipeline());
    }

    // other methods of the transform
}

You can wrap this output exactly the same way as for the inputs, but using @Processor this time:

@Processor(family = "myfamily", name = "myname")
public class WrapWrite extends PTransform<PCollection<String>, PDone> {
  private PTransform<PCollection<String>, PDone> delegate;

  public WrapWrite(@Option("dataset") final WrapWriteDataSet dataset) {
    delegate = TheIO.write().withConfiguration(this.createConfigurationFrom(dataset));
  }

  @Override
  public PDone expand(final PCollection<String> input) {
    return delegate.expand(input);
  }

  // ... other methods like the mapping with the native configuration (createConfigurationFrom)
}

Tip

Note that the class org.talend.sdk.component.runtime.beam.transform.DelegatingTransform fully delegates the expansion to another transform. Therefore, you can extend it and only implement the configuration mapping:

@Processor(family = "beam", name = "file")
public class BeamFileOutput extends DelegatingTransform<PCollection<String>, PDone> {

    public BeamFileOutput(@Option("output") final String output) {
        super(TextIO.write()
            .withSuffix("test")
            .to(FileBasedSink.convertToFileResourceIfPossible(output)));
    }
}

Advanced

In terms of classloading, when you write an I/O, the whole Beam SDK Java core stack is assumed to be provided by the Talend Component Kit runtime, so never include it in compile scope; it would be ignored anyway.

Coder

If you need a JSON coder, you can use the org.talend.sdk.component.runtime.beam.factory.service.PluginCoderFactory service, which gives you access to the JSON-P and JSON-B coders.
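
For instance, the factory can be injected through the component constructor and the resulting coder set on the PCollection your transform produces. A minimal sketch, assuming records is a PCollection<JsonObject> built in your expand method and coderFactory the injected PluginCoderFactory (the jsonp() accessor is the one used in the Kafka sample below):

// coderFactory is the PluginCoderFactory injected through the component constructor,
// records is a PCollection<JsonObject> produced by your expand(...) implementation
records.setCoder(coderFactory.jsonp()); // attach the JSON-P coder to the collection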

Sample

Here is a sample input based on the Beam KafkaIO:

@Version
@Icon(Icon.IconType.KAFKA)
@Emitter(name = "Input")
@AllArgsConstructor
@Documentation("Kafka Input")
public class KafkaInput extends PTransform<PBegin, PCollection<JsonObject>> { (1)

    private final InputConfiguration configuration;

    private final JsonBuilderFactory builder;

    private final PluginCoderFactory coderFactory;

    private KafkaIO.Read<byte[], byte[]> delegate() {
        final KafkaIO.Read<byte[], byte[]> read = KafkaIO.<byte[], byte[]> read()
                .withBootstrapServers(configuration.getBootstrapServers())
                .withTopics(configuration.getTopics().stream().map(InputConfiguration.Topic::getName).collect(toList()))
                .withKeyDeserializer(ByteArrayDeserializer.class).withValueDeserializer(ByteArrayDeserializer.class);
        if (configuration.getMaxResults() > 0) {
            return read.withMaxNumRecords(configuration.getMaxResults());
        }
        return read;
    }

    @Override (2)
    public PCollection<JsonObject> expand(final PBegin pBegin) {
        final PCollection<KafkaRecord<byte[], byte[]>> kafkaEntries = pBegin.getPipeline().apply(delegate());
        return kafkaEntries.apply(ParDo.of(new RecordToJson(builder))).setCoder(coderFactory.jsonp()); (3)
    }

    @AllArgsConstructor
    private static class RecordToJson extends DoFn<KafkaRecord<byte[], byte[]>, JsonObject> {

        private final JsonBuilderFactory builder;

        @ProcessElement
        public void onElement(final ProcessContext context) {
            context.output(toJson(context.element()));
        }

        // TODO: we shouldn't assume String keys/values here, make it configurable
        private JsonObject toJson(final KafkaRecord<byte[], byte[]> element) {
            return builder.createObjectBuilder().add("key", new String(element.getKV().getKey()))
                    .add("value", new String(element.getKV().getValue())).build();
        }
    }
}
1 the PTransform generics define that it is an input (PBegin marker)
2 the expand method chains the native I/O with a custom mapper (RecordToJson)
3 the mapper uses the JSON-P coder automatically created from the contextual component

Since the Beam wrapper does not respect the standard Kit programming model (for instance, it does not implement the standard @Emitter contract), you need to set the <talend.validation.component>false</talend.validation.component> property in your pom.xml (or the Gradle equivalent) to skip the Kit component programming model validation.
