component-runtime-testing-spark

The following artifact lets you test against a Spark cluster:

<dependency>
  <groupId>org.talend.sdk.component</groupId>
  <artifactId>component-runtime-testing-spark</artifactId>
  <version>${talend-component.version}</version>
  <scope>test</scope>
</dependency>

JUnit 4

The usage relies on a JUnit TestRule. It is recommended to use it as a @ClassRule so that a single Spark cluster instance is built for the whole test class. You can also use it as a plain @Rule, in which case a cluster is created per test method instead of per test class.

It takes as parameters the Scala version, the Spark version and the number of slaves to use. It then forks a master and N slaves. Finally, it exposes submit* methods that let you send jobs either from the test classpath or from a shaded JAR if you run it as an integration test.

Here is a sample:

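import java.io.IOException;

import org.junit.ClassRule;
import org.junit.Test;
import org.talend.sdk.component.runtime.testing.spark.SparkClusterRule;

public class SparkClusterRuleTest {

    // Scala 2.10, Spark 1.6.3 and 1 slave, shared by all the tests of the class
    @ClassRule
    public static final SparkClusterRule SPARK = new SparkClusterRule("2.10", "1.6.3", 1);

    @Test
    public void classpathSubmit() throws IOException {
        // SubmittableMain is your job class (a main(String[]) entry point)
        // and getMainArgs() stands for the arguments you pass to it
        SPARK.submitClasspath(SubmittableMain.class, getMainArgs());

        // then wait for the job to complete, for instance by polling its output
    }
}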
This also works with @Parameterized, so you can submit several jobs with different arguments, and you can even combine it with Beam's TestPipeline if you make it transient.

JUnit 5

The JUnit 5 integration of this Spark cluster logic uses the @WithSpark marker to enable the extension. Optionally, it lets you inject, through @SparkInject, the BaseSpark<?> handler, which gives access to the Spark cluster metadata, such as its host and port.

Here is a basic test using it:

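import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.stream.Collectors.joining;
import static org.apache.ziplock.JarLocation.jarLocation;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

import org.junit.jupiter.api.Test;
import org.talend.sdk.component.runtime.testing.spark.internal.BaseSpark;
import org.talend.sdk.component.runtime.testing.spark.junit5.SparkInject;
import org.talend.sdk.component.runtime.testing.spark.junit5.WithSpark;

@WithSpark
class SparkExtensionTest {

    // injected by the extension: gives access to the cluster (master URL, submit methods)
    @SparkInject
    private BaseSpark<?> spark;

    @Test
    void classpathSubmit() throws IOException {
        // start from a clean output file located next to the test classes
        final File out = new File(jarLocation(SparkClusterRuleTest.class).getParentFile(), "classpathSubmitJunit5.out");
        if (out.exists()) {
            out.delete();
        }
        // the job receives the master URL and the output path as main() arguments
        spark.submitClasspath(SparkClusterRuleTest.SubmittableMain.class, spark.getSparkMaster(), out.getAbsolutePath());

        // poll until the job has written the expected output
        await().atMost(5, MINUTES).until(
                () -> out.exists() ? Files.readAllLines(out.toPath()).stream().collect(joining("\n")).trim() : null,
                equalTo("b -> 1\na -> 1"));
    }
}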

How to know the job is done

In its current state, SparkClusterRule does not let you know when a job execution is done, even though it exposes the web UI URL, which you can poll to check. For now, the best approach is to check that the output of your job exists and contains the expected value.

Awaitility or an equivalent library can help you write such logic.

Here are the coordinates of the artifact:

<dependency>
  <groupId>org.awaitility</groupId>
  <artifactId>awaitility</artifactId>
  <version>3.0.0</version>
  <scope>test</scope>
</dependency>

And here is how to wait until a file exists and its content is, for instance, the expected one:

await()
    .atMost(5, MINUTES)
    .until(
        () -> out.exists() ? Files.readAllLines(out.toPath()).stream().collect(joining("\n")).trim() : null,
        equalTo("the expected content of the file"));
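If you prefer not to add a dependency, the same polling can be sketched with the JDK only. OutputWaiter and waitForContent are hypothetical names, not part of component-runtime-testing-spark; the content normalization (lines joined with "\n", then trimmed) mirrors the Awaitility example above.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.concurrent.TimeoutException;

// Hypothetical JDK-only helper, not part of the Talend testing artifact.
public final class OutputWaiter {

    private OutputWaiter() {
    }

    /**
     * Polls {@code out} until it exists and its normalized content equals {@code expected},
     * or throws a TimeoutException once {@code timeout} has elapsed.
     */
    public static void waitForContent(final Path out, final String expected,
                                      final Duration timeout, final Duration pollInterval)
            throws InterruptedException, TimeoutException {
        final long deadline = System.nanoTime() + timeout.toNanos();
        String last = null;
        while (true) {
            if (Files.exists(out)) {
                try {
                    // same normalization as the Awaitility example: lines joined with \n, then trimmed
                    last = String.join("\n", Files.readAllLines(out, StandardCharsets.UTF_8)).trim();
                    if (expected.equals(last)) {
                        return;
                    }
                } catch (final IOException e) {
                    // the file may be deleted or still being written: retry on the next poll
                }
            }
            // overflow-safe comparison of nanoTime values
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException("Expected <" + expected + "> in " + out + " but got <" + last + ">");
            }
            Thread.sleep(pollInterval.toMillis());
        }
    }
}
```

In a test, a call such as `OutputWaiter.waitForContent(out.toPath(), "b -> 1\na -> 1", Duration.ofMinutes(5), Duration.ofSeconds(1))` plays the same role as the Awaitility call, at the cost of a less expressive failure message.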