001/**
002 * Copyright (C) 2006-2021 Talend Inc. - www.talend.com
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.talend.sdk.component.junit.lang;
017
018import static org.talend.sdk.component.runtime.base.lang.exception.InvocationExceptionWrapper.toRuntimeException;
019
020import java.lang.reflect.InvocationHandler;
021import java.lang.reflect.InvocationTargetException;
022import java.lang.reflect.Method;
023import java.lang.reflect.Proxy;
024import java.util.concurrent.atomic.AtomicReference;
025import java.util.function.Consumer;
026import java.util.function.Supplier;
027import java.util.stream.BaseStream;
028import java.util.stream.DoubleStream;
029import java.util.stream.IntStream;
030import java.util.stream.LongStream;
031import java.util.stream.Stream;
032
033import lombok.Data;
034
035@Data
036public class StreamDecorator implements InvocationHandler {
037
038    private final BaseStream delegate;
039
040    private final Consumer<Runnable> leafDecorator;
041
042    // if method is iterator() or splitIterator() the behavior is likely not the hoped exact one but ok for us
043    @Override
044    public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
045        try {
046            final boolean stream = BaseStream.class.isAssignableFrom(method.getReturnType());
047            final Object result = stream ? method.invoke(delegate, args) : wrap(() -> {
048                try {
049                    return method.invoke(delegate, args);
050                } catch (final IllegalAccessException e) {
051                    throw new IllegalArgumentException(e);
052                } catch (final InvocationTargetException e) {
053                    throw toRuntimeException(e);
054                }
055            });
056            if (stream) {
057                if (Stream.class.isInstance(result)) {
058                    return decorate(Stream.class.cast(result), Stream.class, leafDecorator);
059                }
060                if (IntStream.class.isInstance(result)) {
061                    return decorate(IntStream.class.cast(result), IntStream.class, leafDecorator);
062                }
063                if (LongStream.class.isInstance(result)) {
064                    return decorate(LongStream.class.cast(result), LongStream.class, leafDecorator);
065                }
066                if (DoubleStream.class.isInstance(result)) {
067                    return decorate(DoubleStream.class.cast(result), DoubleStream.class, leafDecorator);
068                }
069            }
070            return result;
071        } catch (final InvocationTargetException ite) {
072            throw ite.getTargetException();
073        }
074    }
075
076    private <V> V wrap(final Supplier<V> supplier) {
077        final AtomicReference<V> ref = new AtomicReference<>();
078        leafDecorator.accept(() -> ref.set(supplier.get()));
079        return ref.get();
080    }
081
082    public static <T> Stream<T> decorate(final Stream<T> delegate, final Consumer<Runnable> wrapper) {
083        return decorate(delegate, Stream.class, wrapper);
084    }
085
086    private static <T, S extends BaseStream<T, ?>> S decorate(final S delegate, final Class<S> type,
087            final Consumer<Runnable> wrapper) {
088        return (S) Proxy
089                .newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[] { type },
090                        new StreamDecorator(delegate, wrapper));
091    }
092}