001/** 002 * Copyright (C) 2006-2018 Talend Inc. - www.talend.com 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.talend.sdk.component.junit.environment.builtin.beam; 017 018import static java.util.Optional.ofNullable; 019 020import java.lang.annotation.Annotation; 021import java.lang.reflect.Field; 022import java.lang.reflect.Method; 023import java.util.Map; 024import java.util.stream.Stream; 025 026import org.jboss.shrinkwrap.resolver.api.maven.ScopeType; 027import org.jboss.shrinkwrap.resolver.api.maven.coordinate.MavenDependencies; 028import org.jboss.shrinkwrap.resolver.api.maven.coordinate.MavenDependency; 029import org.jboss.shrinkwrap.resolver.api.maven.coordinate.MavenDependencyExclusion; 030import org.talend.sdk.component.junit.environment.ClassLoaderEnvironment; 031 032import lombok.extern.slf4j.Slf4j; 033 034@Slf4j 035public abstract class BeamEnvironment extends ClassLoaderEnvironment { 036 037 private boolean skipBeamSdk; 038 039 private String beamVersion; 040 041 @Override 042 protected AutoCloseable doStart(final Class<?> clazz, final Annotation[] annotations) { 043 beamVersion = System.getProperty("talend.junit.beam.version", Versions.BEAM_VERSION); 044 try { 045 BeamEnvironment.class.getClassLoader().loadClass("org.talend.sdk.component.runtime.beam.TalendIO"); 046 skipBeamSdk = true; 047 } catch (final NoClassDefFoundError | ClassNotFoundException e) { 048 skipBeamSdk = false; 049 } 050 resetBeamCache(); 051 final AutoCloseable delegate = super.doStart(clazz, annotations); 052 return () -> { 053 try { 054 delegate.close(); 055 } finally { 056 resetBeamCache(); 057 } 058 }; 059 } 060 061 private void resetBeamCache() { 062 // if beam 2.4.0 includes it: PipelineOptionsfactory.resetCache() 063 064 try { // until resetCache() is part of beam do it the hard way 065 final ClassLoader loader = Thread.currentThread().getContextClassLoader(); 066 final Class<?> pof = loader.loadClass("org.apache.beam.sdk.options.PipelineOptionsFactory"); 067 068 Stream.of("COMBINED_CACHE", "INTERFACE_CACHE", "SUPPORTED_PIPELINE_RUNNERS").forEach(mapField -> { 069 try { 070 final Field field = pof.getDeclaredField(mapField); 071 field.setAccessible(true); 072 ofNullable(Map.class.cast(field.get(null))).ifPresent(Map::clear); 073 } catch (final Exception e) { 074 // no-op: this is a best effort clean until beam supports it correctly 075 } 076 }); 077 078 // 2. reinit 079 // todo: SUPPORTED_PIPELINE_RUNNERS reinit but it is final so we just expect the user to set the runner for 080 // now 081 082 final Method initializeRegistry = pof.getDeclaredMethod("resetRegistry"); 083 initializeRegistry.setAccessible(true); 084 initializeRegistry.invoke(null); 085 } catch (final NoClassDefFoundError | Exception ex) { 086 log.warn(ex.getMessage()); 087 } 088 } 089 090 @Override 091 protected MavenDependency[] rootDependencies() { 092 return new MavenDependency[] { MavenDependencies.createDependency(rootDependencyBase() + ":jar:" + beamVersion, 093 ScopeType.RUNTIME, false, 094 skipBeamSdk 095 ? new MavenDependencyExclusion[] { 096 MavenDependencies.createExclusion("org.apache.beam", "beam-sdks-java-core") } 097 : new MavenDependencyExclusion[0]) }; 098 } 099 100 @Override 101 public String getName() { 102 return super.getName().replace("Runner", ""); 103 } 104 105 protected abstract String rootDependencyBase(); 106}