001/**
002 * Copyright (C) 2006-2018 Talend Inc. - www.talend.com
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.talend.sdk.component.junit.environment.builtin.beam;
017
018import static java.util.Optional.ofNullable;
019
020import java.lang.annotation.Annotation;
021import java.lang.reflect.Field;
022import java.lang.reflect.Method;
023import java.util.Map;
024import java.util.stream.Stream;
025
026import org.jboss.shrinkwrap.resolver.api.maven.ScopeType;
027import org.jboss.shrinkwrap.resolver.api.maven.coordinate.MavenDependencies;
028import org.jboss.shrinkwrap.resolver.api.maven.coordinate.MavenDependency;
029import org.jboss.shrinkwrap.resolver.api.maven.coordinate.MavenDependencyExclusion;
030import org.talend.sdk.component.junit.environment.ClassLoaderEnvironment;
031
032import lombok.extern.slf4j.Slf4j;
033
034@Slf4j
035public abstract class BeamEnvironment extends ClassLoaderEnvironment {
036
037    private boolean skipBeamSdk;
038
039    private String beamVersion;
040
041    @Override
042    protected AutoCloseable doStart(final Class<?> clazz, final Annotation[] annotations) {
043        beamVersion = System.getProperty("talend.junit.beam.version", Versions.BEAM_VERSION);
044        try {
045            BeamEnvironment.class.getClassLoader().loadClass("org.talend.sdk.component.runtime.beam.TalendIO");
046            skipBeamSdk = true;
047        } catch (final NoClassDefFoundError | ClassNotFoundException e) {
048            skipBeamSdk = false;
049        }
050        resetBeamCache();
051        final AutoCloseable delegate = super.doStart(clazz, annotations);
052        return () -> {
053            try {
054                delegate.close();
055            } finally {
056                resetBeamCache();
057            }
058        };
059    }
060
061    private void resetBeamCache() {
062        // if beam 2.4.0 includes it: PipelineOptionsfactory.resetCache()
063
064        try { // until resetCache() is part of beam do it the hard way
065            final ClassLoader loader = Thread.currentThread().getContextClassLoader();
066            final Class<?> pof = loader.loadClass("org.apache.beam.sdk.options.PipelineOptionsFactory");
067
068            Stream.of("COMBINED_CACHE", "INTERFACE_CACHE", "SUPPORTED_PIPELINE_RUNNERS").forEach(mapField -> {
069                try {
070                    final Field field = pof.getDeclaredField(mapField);
071                    field.setAccessible(true);
072                    ofNullable(Map.class.cast(field.get(null))).ifPresent(Map::clear);
073                } catch (final Exception e) {
074                    // no-op: this is a best effort clean until beam supports it correctly
075                }
076            });
077
078            // 2. reinit
079            // todo: SUPPORTED_PIPELINE_RUNNERS reinit but it is final so we just expect the user to set the runner for
080            // now
081
082            final Method initializeRegistry = pof.getDeclaredMethod("resetRegistry");
083            initializeRegistry.setAccessible(true);
084            initializeRegistry.invoke(null);
085        } catch (final NoClassDefFoundError | Exception ex) {
086            log.warn(ex.getMessage());
087        }
088    }
089
090    @Override
091    protected MavenDependency[] rootDependencies() {
092        return new MavenDependency[] { MavenDependencies.createDependency(rootDependencyBase() + ":jar:" + beamVersion,
093                ScopeType.RUNTIME, false,
094                skipBeamSdk
095                        ? new MavenDependencyExclusion[] {
096                                MavenDependencies.createExclusion("org.apache.beam", "beam-sdks-java-core") }
097                        : new MavenDependencyExclusion[0]) };
098    }
099
100    @Override
101    public String getName() {
102        return super.getName().replace("Runner", "");
103    }
104
105    protected abstract String rootDependencyBase();
106}