001/**
002 * Copyright (C) 2006-2021 Talend Inc. - www.talend.com
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.talend.sdk.component.junit;
017
018import static java.lang.Math.abs;
019import static java.util.Collections.emptyIterator;
020import static java.util.Collections.emptyMap;
021import static java.util.Locale.ROOT;
022import static java.util.concurrent.TimeUnit.MINUTES;
023import static java.util.concurrent.TimeUnit.SECONDS;
024import static java.util.stream.Collectors.joining;
025import static java.util.stream.Collectors.toList;
026import static org.apache.ziplock.JarLocation.jarLocation;
027import static org.junit.Assert.fail;
028import static org.talend.sdk.component.junit.SimpleFactory.configurationByExample;
029
030import java.util.ArrayList;
031import java.util.Collection;
032import java.util.HashMap;
033import java.util.HashSet;
034import java.util.Iterator;
035import java.util.List;
036import java.util.Map;
037import java.util.Objects;
038import java.util.Optional;
039import java.util.Queue;
040import java.util.Set;
041import java.util.Spliterator;
042import java.util.Spliterators;
043import java.util.concurrent.ConcurrentLinkedQueue;
044import java.util.concurrent.CopyOnWriteArrayList;
045import java.util.concurrent.CountDownLatch;
046import java.util.concurrent.ExecutionException;
047import java.util.concurrent.ExecutorService;
048import java.util.concurrent.Executors;
049import java.util.concurrent.Future;
050import java.util.concurrent.Semaphore;
051import java.util.concurrent.TimeoutException;
052import java.util.concurrent.atomic.AtomicInteger;
053import java.util.concurrent.atomic.AtomicReference;
054import java.util.stream.Stream;
055import java.util.stream.StreamSupport;
056
057import javax.json.JsonBuilderFactory;
058import javax.json.JsonObject;
059import javax.json.bind.Jsonb;
060import javax.json.bind.JsonbConfig;
061import javax.json.spi.JsonProvider;
062
063import org.apache.xbean.finder.filter.Filter;
064import org.talend.sdk.component.api.record.Record;
065import org.talend.sdk.component.api.service.injector.Injector;
066import org.talend.sdk.component.api.service.record.RecordBuilderFactory;
067import org.talend.sdk.component.junit.lang.StreamDecorator;
068import org.talend.sdk.component.runtime.base.Lifecycle;
069import org.talend.sdk.component.runtime.input.Input;
070import org.talend.sdk.component.runtime.input.Mapper;
071import org.talend.sdk.component.runtime.manager.ComponentFamilyMeta;
072import org.talend.sdk.component.runtime.manager.ComponentManager;
073import org.talend.sdk.component.runtime.manager.ContainerComponentRegistry;
074import org.talend.sdk.component.runtime.manager.chain.AutoChunkProcessor;
075import org.talend.sdk.component.runtime.manager.chain.Job;
076import org.talend.sdk.component.runtime.manager.json.PreComputedJsonpProvider;
077import org.talend.sdk.component.runtime.output.OutputFactory;
078import org.talend.sdk.component.runtime.output.Processor;
079import org.talend.sdk.component.runtime.record.RecordConverters;
080
081import lombok.AllArgsConstructor;
082import lombok.extern.slf4j.Slf4j;
083
084@Slf4j
085public class BaseComponentsHandler implements ComponentsHandler {
086
087    protected static final Local<State> STATE = loadStateHolder();
088
089    private static Local<State> loadStateHolder() {
090        switch (System.getProperty("talend.component.junit.handler.state", "thread").toLowerCase(ROOT)) {
091        case "static":
092            return new Local.StaticImpl<>();
093        default:
094            return new Local.ThreadLocalImpl<>();
095        }
096    }
097
098    private final ThreadLocal<PreState> initState = ThreadLocal.withInitial(PreState::new);
099
100    protected String packageName;
101
102    protected Collection<String> isolatedPackages;
103
104    public <T> T injectServices(final T instance) {
105        if (instance == null) {
106            return null;
107        }
108        final String plugin = getSinglePlugin();
109        final Map<Class<?>, Object> services = asManager()
110                .findPlugin(plugin)
111                .orElseThrow(() -> new IllegalArgumentException("cant find plugin '" + plugin + "'"))
112                .get(ComponentManager.AllServices.class)
113                .getServices();
114        Injector.class.cast(services.get(Injector.class)).inject(instance);
115        return instance;
116    }
117
118    public BaseComponentsHandler withIsolatedPackage(final String packageName, final String... packages) {
119        isolatedPackages =
120                Stream.concat(Stream.of(packageName), Stream.of(packages)).filter(Objects::nonNull).collect(toList());
121        if (isolatedPackages.isEmpty()) {
122            isolatedPackages = null;
123        }
124        return this;
125    }
126
127    public EmbeddedComponentManager start() {
128        final EmbeddedComponentManager embeddedComponentManager = new EmbeddedComponentManager(packageName) {
129
130            @Override
131            protected boolean isContainerClass(final Filter filter, final String name) {
132                if (name == null) {
133                    return super.isContainerClass(filter, null);
134                }
135                return (isolatedPackages == null || isolatedPackages.stream().noneMatch(name::startsWith))
136                        && super.isContainerClass(filter, name);
137            }
138
139            @Override
140            public void close() {
141                try {
142                    final State state = STATE.get();
143                    if (state.jsonb != null) {
144                        try {
145                            state.jsonb.close();
146                        } catch (final Exception e) {
147                            // no-op: not important
148                        }
149                    }
150                    STATE.remove();
151                    initState.remove();
152                } finally {
153                    super.close();
154                }
155            }
156        };
157
158        STATE
159                .set(new State(embeddedComponentManager, new CopyOnWriteArrayList<>(), initState.get().emitter, null,
160                        null, null, null));
161        return embeddedComponentManager;
162    }
163
164    @Override
165    public Outputs collect(final Processor processor, final ControllableInputFactory inputs) {
166        return collect(processor, inputs, 10);
167    }
168
169    /**
170     * Collects all outputs of a processor.
171     *
172     * @param processor the processor to run while there are inputs.
173     * @param inputs the input factory, when an input will return null it will stop the
174     * processing.
175     * @param bundleSize the bundle size to use.
176     * @return a map where the key is the output name and the value a stream of the
177     * output values.
178     */
179    @Override
180    public Outputs collect(final Processor processor, final ControllableInputFactory inputs, final int bundleSize) {
181        final AutoChunkProcessor autoChunkProcessor = new AutoChunkProcessor(bundleSize, processor);
182        autoChunkProcessor.start();
183        final Outputs outputs = new Outputs();
184        final OutputFactory outputFactory = name -> value -> {
185            final List aggregator = outputs.data.computeIfAbsent(name, n -> new ArrayList<>());
186            aggregator.add(value);
187        };
188        try {
189            while (inputs.hasMoreData()) {
190                autoChunkProcessor.onElement(inputs, outputFactory);
191            }
192            autoChunkProcessor.flush(outputFactory);
193        } finally {
194            autoChunkProcessor.stop();
195        }
196        return outputs;
197    }
198
199    @Override
200    public <T> Stream<T> collect(final Class<T> recordType, final Mapper mapper, final int maxRecords) {
201        return collect(recordType, mapper, maxRecords, Runtime.getRuntime().availableProcessors());
202    }
203
204    /**
205     * Collects data emitted from this mapper. If the split creates more than one
206     * mapper, it will create as much threads as mappers otherwise it will use the
207     * caller thread.
208     *
209     * IMPORTANT: don't forget to consume all the stream to ensure the underlying
210     * { @see org.talend.sdk.component.runtime.input.Input} is closed.
211     *
212     * @param recordType the record type to use to type the returned type.
213     * @param mapper the mapper to go through.
214     * @param maxRecords maximum number of records, allows to stop the source when
215     * infinite.
216     * @param concurrency requested (1 can be used instead if &lt;= 0) concurrency for the reader execution.
217     * @param <T> the returned type of the records of the mapper.
218     * @return all the records emitted by the mapper.
219     */
220    @Override
221    public <T> Stream<T> collect(final Class<T> recordType, final Mapper mapper, final int maxRecords,
222            final int concurrency) {
223        mapper.start();
224
225        final State state = STATE.get();
226        final long assess = mapper.assess();
227        final int proc = Math.max(1, concurrency);
228        final List<Mapper> mappers = mapper.split(Math.max(assess / proc, 1));
229        switch (mappers.size()) {
230        case 0:
231            return Stream.empty();
232        case 1:
233            return StreamDecorator
234                    .decorate(asStream(asIterator(mappers.iterator().next().create(), new AtomicInteger(maxRecords))),
235                            collect -> {
236                                try {
237                                    collect.run();
238                                } finally {
239                                    mapper.stop();
240                                }
241                            });
242        default: // N producers-1 consumer pattern
243            final AtomicInteger threadCounter = new AtomicInteger(0);
244            final ExecutorService es = Executors.newFixedThreadPool(mappers.size(), r -> new Thread(r) {
245
246                {
247                    setName(BaseComponentsHandler.this.getClass().getSimpleName() + "-pool-" + abs(mapper.hashCode())
248                            + "-" + threadCounter.incrementAndGet());
249                }
250            });
251            final AtomicInteger recordCounter = new AtomicInteger(maxRecords);
252            final Semaphore permissions = new Semaphore(0);
253            final Queue<T> records = new ConcurrentLinkedQueue<>();
254            final CountDownLatch latch = new CountDownLatch(mappers.size());
255            final List<? extends Future<?>> tasks = mappers
256                    .stream()
257                    .map(Mapper::create)
258                    .map(input -> (Iterator<T>) asIterator(input, recordCounter))
259                    .map(it -> es.submit(() -> {
260                        try {
261                            while (it.hasNext()) {
262                                final T next = it.next();
263                                records.add(next);
264                                permissions.release();
265                            }
266                        } finally {
267                            latch.countDown();
268                        }
269                    }))
270                    .collect(toList());
271            es.shutdown();
272
273            final int timeout = Integer.getInteger("talend.component.junit.timeout", 5);
274            new Thread() {
275
276                {
277                    setName(BaseComponentsHandler.class.getSimpleName() + "-monitor_" + abs(mapper.hashCode()));
278                }
279
280                @Override
281                public void run() {
282                    try {
283                        latch.await(timeout, MINUTES);
284                    } catch (final InterruptedException e) {
285                        Thread.currentThread().interrupt();
286                    } finally {
287                        permissions.release();
288                    }
289                }
290            }.start();
291            return StreamDecorator.decorate(asStream(new Iterator<T>() {
292
293                @Override
294                public boolean hasNext() {
295                    try {
296                        permissions.acquire();
297                    } catch (final InterruptedException e) {
298                        Thread.currentThread().interrupt();
299                        fail(e.getMessage());
300                    }
301                    return !records.isEmpty();
302                }
303
304                @Override
305                public T next() {
306                    final T poll = records.poll();
307                    if (poll != null) {
308                        return mapRecord(state, recordType, poll);
309                    }
310                    return null;
311                }
312            }), task -> {
313                try {
314                    task.run();
315                } finally {
316                    tasks.forEach(f -> {
317                        try {
318                            f.get(5, SECONDS);
319                        } catch (final InterruptedException e) {
320                            Thread.currentThread().interrupt();
321                        } catch (final ExecutionException | TimeoutException e) {
322                            // no-op
323                        } finally {
324                            if (!f.isDone() && !f.isCancelled()) {
325                                f.cancel(true);
326                            }
327                        }
328                    });
329                }
330            });
331        }
332    }
333
334    private <T> Stream<T> asStream(final Iterator<T> iterator) {
335        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.IMMUTABLE), false);
336    }
337
338    private <T> Iterator<T> asIterator(final Input input, final AtomicInteger counter) {
339        input.start();
340        return new Iterator<T>() {
341
342            private boolean closed;
343
344            private Object next;
345
346            @Override
347            public boolean hasNext() {
348                final int remaining = counter.get();
349                if (remaining <= 0) {
350                    return false;
351                }
352
353                final boolean hasNext = (next = input.next()) != null;
354                if (!hasNext && !closed) {
355                    closed = true;
356                    input.stop();
357                }
358                if (hasNext) {
359                    counter.decrementAndGet();
360                }
361                return hasNext;
362            }
363
364            @Override
365            public T next() {
366                return (T) next;
367            }
368        };
369    }
370
371    @Override
372    public <T> List<T> collectAsList(final Class<T> recordType, final Mapper mapper) {
373        return collectAsList(recordType, mapper, 1000);
374    }
375
376    @Override
377    public <T> List<T> collectAsList(final Class<T> recordType, final Mapper mapper, final int maxRecords) {
378        return collect(recordType, mapper, maxRecords).collect(toList());
379    }
380
381    @Override
382    public Mapper createMapper(final Class<?> componentType, final Object configuration) {
383        return create(Mapper.class, componentType, configuration);
384    }
385
386    @Override
387    public Processor createProcessor(final Class<?> componentType, final Object configuration) {
388        return create(Processor.class, componentType, configuration);
389    }
390
391    private <C, T, A> A create(final Class<A> api, final Class<T> componentType, final C configuration) {
392        final ComponentFamilyMeta.BaseMeta<? extends Lifecycle> meta = findMeta(componentType);
393        return api
394                .cast(meta
395                        .getInstantiator()
396                        .apply(configuration == null || meta.getParameterMetas().get().isEmpty() ? emptyMap()
397                                : configurationByExample(configuration, meta
398                                        .getParameterMetas()
399                                        .get()
400                                        .stream()
401                                        .filter(p -> p.getName().equals(p.getPath()))
402                                        .findFirst()
403                                        .map(p -> p.getName() + '.')
404                                        .orElseThrow(() -> new IllegalArgumentException(
405                                                "Didn't find any option and therefore "
406                                                        + "can't convert the configuration instance to a configuration")))));
407    }
408
409    private <T> ComponentFamilyMeta.BaseMeta<? extends Lifecycle> findMeta(final Class<T> componentType) {
410        return asManager()
411                .find(c -> c.get(ContainerComponentRegistry.class).getComponents().values().stream())
412                .flatMap(f -> Stream
413                        .of(f.getProcessors().values().stream(), f.getPartitionMappers().values().stream(),
414                                f.getDriverRunners().values().stream())
415                        .flatMap(t -> t))
416                .filter(m -> m.getType().getName().equals(componentType.getName()))
417                .findFirst()
418                .orElseThrow(() -> new IllegalArgumentException("No component " + componentType));
419    }
420
421    @Override
422    public <T> List<T> collect(final Class<T> recordType, final String family, final String component,
423            final int version, final Map<String, String> configuration) {
424        Job
425                .components()
426                .component("in",
427                        family + "://" + component + "?__version=" + version
428                                + configuration
429                                        .entrySet()
430                                        .stream()
431                                        .map(entry -> entry.getKey() + "=" + entry.getValue())
432                                        .collect(joining("&", "&", "")))
433                .component("collector", "test://collector")
434                .connections()
435                .from("in")
436                .to("collector")
437                .build()
438                .run();
439
440        return getCollectedData(recordType);
441    }
442
443    @Override
444    public <T> void process(final Iterable<T> inputs, final String family, final String component, final int version,
445            final Map<String, String> configuration) {
446        setInputData(inputs);
447
448        Job
449                .components()
450                .component("emitter", "test://emitter")
451                .component("out",
452                        family + "://" + component + "?__version=" + version
453                                + configuration
454                                        .entrySet()
455                                        .stream()
456                                        .map(entry -> entry.getKey() + "=" + entry.getValue())
457                                        .collect(joining("&", "&", "")))
458                .connections()
459                .from("emitter")
460                .to("out")
461                .build()
462                .run();
463
464    }
465
466    @Override
467    public ComponentManager asManager() {
468        return STATE.get().manager;
469    }
470
471    @Override
472    public <T> T findService(final String plugin, final Class<T> serviceClass) {
473        return serviceClass
474                .cast(asManager()
475                        .findPlugin(plugin)
476                        .orElseThrow(() -> new IllegalArgumentException("cant find plugin '" + plugin + "'"))
477                        .get(ComponentManager.AllServices.class)
478                        .getServices()
479                        .get(serviceClass));
480    }
481
482    @Override
483    public <T> T findService(final Class<T> serviceClass) {
484        return findService(getSinglePlugin(), serviceClass);
485    }
486
487    public Set<String> getTestPlugins() {
488        return new HashSet<>(EmbeddedComponentManager.class.cast(asManager()).testPlugins);
489    }
490
491    @Override
492    public <T> void setInputData(final Iterable<T> data) {
493        final State state = STATE.get();
494        if (state == null) {
495            initState.get().emitter = data.iterator();
496        } else {
497            state.emitter = data.iterator();
498        }
499    }
500
501    @Override
502    public <T> List<T> getCollectedData(final Class<T> recordType) {
503        final State state = STATE.get();
504        return state.collector
505                .stream()
506                .filter(r -> recordType.isInstance(r) || JsonObject.class.isInstance(r) || Record.class.isInstance(r))
507                .map(r -> mapRecord(state, recordType, r))
508                .collect(toList());
509    }
510
511    public void resetState() {
512        final State state = STATE.get();
513        if (state == null) {
514            STATE.remove();
515        } else {
516            state.collector.clear();
517            state.emitter = emptyIterator();
518        }
519    }
520
521    private String getSinglePlugin() {
522        return Optional
523                .of(EmbeddedComponentManager.class.cast(asManager()).testPlugins/* sorted */)
524                .filter(c -> !c.isEmpty())
525                .map(c -> c.iterator().next())
526                .orElseThrow(() -> new IllegalStateException("No component plugin found"));
527    }
528
529    private <T> T mapRecord(final State state, final Class<T> recordType, final Object r) {
530        if (recordType.isInstance(r)) {
531            return recordType.cast(r);
532        }
533        if (Record.class == recordType) {
534            return recordType
535                    .cast(new RecordConverters()
536                            .toRecord(state.registry, r, state::jsonb, state::recordBuilderFactory));
537        }
538        return recordType
539                .cast(new RecordConverters()
540                        .toType(state.registry, r, recordType, state::jsonBuilderFactory, state::jsonProvider,
541                                state::jsonb, state::recordBuilderFactory));
542    }
543
544    static class PreState {
545
546        Iterator<?> emitter;
547    }
548
549    @AllArgsConstructor
550    protected static class State {
551
552        final ComponentManager manager;
553
554        final Collection<Object> collector;
555
556        final RecordConverters.MappingMetaRegistry registry = new RecordConverters.MappingMetaRegistry();
557
558        Iterator<?> emitter;
559
560        volatile Jsonb jsonb;
561
562        volatile JsonProvider jsonProvider;
563
564        volatile JsonBuilderFactory jsonBuilderFactory;
565
566        volatile RecordBuilderFactory recordBuilderFactory;
567
568        synchronized Jsonb jsonb() {
569            if (jsonb == null) {
570                jsonb = manager
571                        .getJsonbProvider()
572                        .create()
573                        .withProvider(new PreComputedJsonpProvider("test", manager.getJsonpProvider(),
574                                manager.getJsonpParserFactory(), manager.getJsonpWriterFactory(),
575                                manager.getJsonpBuilderFactory(), manager.getJsonpGeneratorFactory(),
576                                manager.getJsonpReaderFactory())) // reuses the same memory buffers
577                        .withConfig(new JsonbConfig().setProperty("johnzon.cdi.activated", false))
578                        .build();
579            }
580            return jsonb;
581        }
582
583        synchronized JsonProvider jsonProvider() {
584            if (jsonProvider == null) {
585                jsonProvider = manager.getJsonpProvider();
586            }
587            return jsonProvider;
588        }
589
590        synchronized JsonBuilderFactory jsonBuilderFactory() {
591            if (jsonBuilderFactory == null) {
592                jsonBuilderFactory = manager.getJsonpBuilderFactory();
593            }
594            return jsonBuilderFactory;
595        }
596
597        synchronized RecordBuilderFactory recordBuilderFactory() {
598            if (recordBuilderFactory == null) {
599                recordBuilderFactory = manager.getRecordBuilderFactoryProvider().apply("test");
600            }
601            return recordBuilderFactory;
602        }
603    }
604
605    public static class EmbeddedComponentManager extends ComponentManager {
606
607        private final ComponentManager oldInstance;
608
609        private final List<String> testPlugins;
610
611        private EmbeddedComponentManager(final String componentPackage) {
612            super(findM2(), "TALEND-INF/dependencies.txt", "org.talend.sdk.component:type=component,value=%s");
613            testPlugins = addJarContaining(Thread.currentThread().getContextClassLoader(),
614                    componentPackage.replace('.', '/'));
615            container
616                    .builder("component-runtime-junit.jar", jarLocation(SimpleCollector.class).getAbsolutePath())
617                    .create();
618            oldInstance = ComponentManager.contextualInstance().get();
619            ComponentManager.contextualInstance().set(this);
620        }
621
622        @Override
623        public void close() {
624            try {
625                super.close();
626            } finally {
627                ComponentManager.contextualInstance().compareAndSet(this, oldInstance);
628            }
629        }
630
631        @Override
632        protected boolean isContainerClass(final Filter filter, final String name) {
633            // embedded mode (no plugin structure) so just run with all classes in parent classloader
634            return true;
635        }
636    }
637
638    public static class Outputs {
639
640        private final Map<String, List<?>> data = new HashMap<>();
641
642        public int size() {
643            return data.size();
644        }
645
646        public Set<String> keys() {
647            return data.keySet();
648        }
649
650        public <T> List<T> get(final Class<T> type, final String name) {
651            return (List<T>) data.get(name);
652        }
653    }
654
655    interface Local<T> {
656
657        void set(T value);
658
659        T get();
660
661        void remove();
662
663        class StaticImpl<T> implements Local<T> {
664
665            private final AtomicReference<T> state = new AtomicReference<>();
666
667            @Override
668            public void set(final T value) {
669                state.set(value);
670            }
671
672            @Override
673            public T get() {
674                return state.get();
675            }
676
677            @Override
678            public void remove() {
679                state.set(null);
680            }
681        }
682
683        class ThreadLocalImpl<T> implements Local<T> {
684
685            private final ThreadLocal<T> threadLocal = new ThreadLocal<>();
686
687            @Override
688            public void set(final T value) {
689                threadLocal.set(value);
690            }
691
692            @Override
693            public T get() {
694                return threadLocal.get();
695            }
696
697            @Override
698            public void remove() {
699                threadLocal.remove();
700            }
701        }
702    }
703}