component-runtime-testing-spark
The following artifact allows you to test against a Spark cluster:
<dependency>
  <groupId>org.talend.sdk.component</groupId>
  <artifactId>component-runtime-testing-spark</artifactId>
  <version>${talend-component.version}</version>
  <scope>test</scope>
</dependency>
JUnit 4
The usage relies on a JUnit TestRule. It is recommended to use it as a @ClassRule to ensure a single instance of the Spark cluster is built for the whole test class, but you can also use it as a simple @Rule, which means it is created per test method instead of per test class.
It takes as parameters the Scala version, the Spark version and the number of slaves to use; it then forks a master and N slaves. Finally, it provides submit* methods allowing you to send jobs either from the test classpath or from a shade if you run it as an integration test.
Here is a sample:
public class SparkClusterRuleTest {

    @ClassRule
    public static final SparkClusterRule SPARK = new SparkClusterRule("2.10", "1.6.3", 1);

    @Test
    public void classpathSubmit() throws IOException {
        SPARK.submitClasspath(SubmittableMain.class, getMainArgs());

        // wait for the job to complete and check it produced the expected output
    }
}
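If you prefer the per-method behavior described above, declare the rule as a plain @Rule instead of a @ClassRule. Here is a minimal sketch, reusing the SubmittableMain job and getMainArgs() helper from the sample above (imports omitted as in the other samples):

public class SparkPerMethodTest {

    // non-static @Rule: a fresh cluster is forked for every test method
    @Rule
    public final SparkClusterRule spark = new SparkClusterRule("2.10", "1.6.3", 1);

    @Test
    public void classpathSubmit() throws IOException {
        spark.submitClasspath(SubmittableMain.class, getMainArgs());
        // then wait for the job output as described in the last section
    }
}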
This also works with @Parameterized, so you can submit a set of jobs with different arguments, and you can even combine it with a Beam TestPipeline if you make it transient (see the sketch below).
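Here is a minimal sketch of such a parameterized setup; the argument values are made up for the illustration, while SparkClusterRule, SubmittableMain and submitClasspath come from the samples above:

@RunWith(Parameterized.class)
public class SparkClusterParameterizedTest {

    @ClassRule
    public static final SparkClusterRule SPARK = new SparkClusterRule("2.10", "1.6.3", 1);

    @Parameterized.Parameters(name = "{0}")
    public static Collection<String> args() {
        // hypothetical job arguments, one submission per value
        return asList("input-1.txt", "input-2.txt");
    }

    @Parameterized.Parameter
    public String arg;

    @Test
    public void classpathSubmit() throws IOException {
        // same job, a different argument for each run of the parameterized test
        SPARK.submitClasspath(SubmittableMain.class, arg);
        // then wait for the job output as described in the last section
    }
}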
JUnit 5
The JUnit 5 integration of this Spark cluster logic uses the @WithSpark marker for the extension and lets you, optionally, inject the BaseSpark<?> handler through @SparkInject to access the Spark cluster meta information, such as its host and port.
Here is a basic test using it:
@WithSpark
class SparkExtensionTest {

    @SparkInject
    private BaseSpark<?> spark;

    @Test
    void classpathSubmit() throws IOException {
        // the job writes its result to this file; remove any output from a previous run
        final File out = new File(jarLocation(SparkClusterRuleTest.class).getParentFile(), "classpathSubmitJunit5.out");
        if (out.exists()) {
            out.delete();
        }

        // submit the job from the test classpath, then wait until it produced the expected output
        spark.submitClasspath(SparkClusterRuleTest.SubmittableMain.class, spark.getSparkMaster(), out.getAbsolutePath());

        await().atMost(5, MINUTES).until(
                () -> out.exists() ? Files.readAllLines(out.toPath()).stream().collect(joining("\n")).trim() : null,
                equalTo("b -> 1\na -> 1"));
    }
}
How to know when the job is done
In its current state, SparkClusterRule does not let you know when a job execution is done, even though it exposes the Web UI URL that you can poll to check. The best approach at the moment is to ensure that the output of your job exists and contains the expected value.
Awaitility, or an equivalent library, can help you write such logic.
Here are the coordinates of the artifact:
<dependency>
  <groupId>org.awaitility</groupId>
  <artifactId>awaitility</artifactId>
  <version>3.0.0</version>
  <scope>test</scope>
</dependency>
And here is how to wait until a file exists and its content (for instance) is the expected one:
await()
    .atMost(5, MINUTES)
    .until(
        () -> out.exists() ? Files.readAllLines(out.toPath()).stream().collect(joining("\n")).trim() : null,
        equalTo("the expected content of the file"));