001/** 002 * Copyright (C) 2006-2021 Talend Inc. - www.talend.com 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.talend.sdk.component.junit.lang; 017 018import static org.talend.sdk.component.runtime.base.lang.exception.InvocationExceptionWrapper.toRuntimeException; 019 020import java.lang.reflect.InvocationHandler; 021import java.lang.reflect.InvocationTargetException; 022import java.lang.reflect.Method; 023import java.lang.reflect.Proxy; 024import java.util.concurrent.atomic.AtomicReference; 025import java.util.function.Consumer; 026import java.util.function.Supplier; 027import java.util.stream.BaseStream; 028import java.util.stream.DoubleStream; 029import java.util.stream.IntStream; 030import java.util.stream.LongStream; 031import java.util.stream.Stream; 032 033import lombok.Data; 034 035@Data 036public class StreamDecorator implements InvocationHandler { 037 038 private final BaseStream delegate; 039 040 private final Consumer<Runnable> leafDecorator; 041 042 // if method is iterator() or splitIterator() the behavior is likely not the hoped exact one but ok for us 043 @Override 044 public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable { 045 try { 046 final boolean stream = BaseStream.class.isAssignableFrom(method.getReturnType()); 047 final Object result = stream ? method.invoke(delegate, args) : wrap(() -> { 048 try { 049 return method.invoke(delegate, args); 050 } catch (final IllegalAccessException e) { 051 throw new IllegalArgumentException(e); 052 } catch (final InvocationTargetException e) { 053 throw toRuntimeException(e); 054 } 055 }); 056 if (stream) { 057 if (Stream.class.isInstance(result)) { 058 return decorate(Stream.class.cast(result), Stream.class, leafDecorator); 059 } 060 if (IntStream.class.isInstance(result)) { 061 return decorate(IntStream.class.cast(result), IntStream.class, leafDecorator); 062 } 063 if (LongStream.class.isInstance(result)) { 064 return decorate(LongStream.class.cast(result), LongStream.class, leafDecorator); 065 } 066 if (DoubleStream.class.isInstance(result)) { 067 return decorate(DoubleStream.class.cast(result), DoubleStream.class, leafDecorator); 068 } 069 } 070 return result; 071 } catch (final InvocationTargetException ite) { 072 throw ite.getTargetException(); 073 } 074 } 075 076 private <V> V wrap(final Supplier<V> supplier) { 077 final AtomicReference<V> ref = new AtomicReference<>(); 078 leafDecorator.accept(() -> ref.set(supplier.get())); 079 return ref.get(); 080 } 081 082 public static <T> Stream<T> decorate(final Stream<T> delegate, final Consumer<Runnable> wrapper) { 083 return decorate(delegate, Stream.class, wrapper); 084 } 085 086 private static <T, S extends BaseStream<T, ?>> S decorate(final S delegate, final Class<S> type, 087 final Consumer<Runnable> wrapper) { 088 return (S) Proxy 089 .newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[] { type }, 090 new StreamDecorator(delegate, wrapper)); 091 } 092}