001/**
002 * Copyright (C) 2006-2018 Talend Inc. - www.talend.com
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.talend.sdk.component.junit;
017
018import static java.lang.Math.abs;
019import static java.util.Collections.emptyIterator;
020import static java.util.Collections.emptyMap;
021import static java.util.concurrent.TimeUnit.MINUTES;
022import static java.util.concurrent.TimeUnit.SECONDS;
023import static java.util.stream.Collectors.joining;
024import static java.util.stream.Collectors.toList;
025import static org.apache.ziplock.JarLocation.jarLocation;
026import static org.junit.Assert.fail;
027import static org.talend.sdk.component.junit.SimpleFactory.configurationByExample;
028
029import java.util.ArrayList;
030import java.util.Collection;
031import java.util.HashMap;
032import java.util.Iterator;
033import java.util.List;
034import java.util.Map;
035import java.util.Objects;
036import java.util.Optional;
037import java.util.Queue;
038import java.util.Set;
039import java.util.Spliterator;
040import java.util.Spliterators;
041import java.util.concurrent.ConcurrentLinkedQueue;
042import java.util.concurrent.CountDownLatch;
043import java.util.concurrent.ExecutionException;
044import java.util.concurrent.ExecutorService;
045import java.util.concurrent.Executors;
046import java.util.concurrent.Future;
047import java.util.concurrent.Semaphore;
048import java.util.concurrent.TimeoutException;
049import java.util.concurrent.atomic.AtomicInteger;
050import java.util.stream.Stream;
051import java.util.stream.StreamSupport;
052
053import javax.json.JsonObject;
054import javax.json.bind.Jsonb;
055import javax.json.bind.JsonbConfig;
056
057import org.apache.xbean.finder.filter.Filter;
058import org.talend.sdk.component.api.service.injector.Injector;
059import org.talend.sdk.component.junit.lang.StreamDecorator;
060import org.talend.sdk.component.runtime.base.Lifecycle;
061import org.talend.sdk.component.runtime.input.Input;
062import org.talend.sdk.component.runtime.input.Mapper;
063import org.talend.sdk.component.runtime.manager.ComponentFamilyMeta;
064import org.talend.sdk.component.runtime.manager.ComponentManager;
065import org.talend.sdk.component.runtime.manager.ContainerComponentRegistry;
066import org.talend.sdk.component.runtime.manager.chain.Job;
067import org.talend.sdk.component.runtime.manager.json.PreComputedJsonpProvider;
068import org.talend.sdk.component.runtime.output.OutputFactory;
069import org.talend.sdk.component.runtime.output.Processor;
070
071import lombok.AllArgsConstructor;
072import lombok.extern.slf4j.Slf4j;
073
074@Slf4j
075public class BaseComponentsHandler implements ComponentsHandler {
076
077    protected static final ThreadLocal<State> STATE = new ThreadLocal<>();
078
079    private final ThreadLocal<PreState> initState = ThreadLocal.withInitial(PreState::new);
080
081    protected String packageName;
082
083    protected Collection<String> isolatedPackages;
084
085    public <T> T injectServices(final T instance) {
086        if (instance == null) {
087            return null;
088        }
089        final String plugin = getSinglePlugin();
090        final Map<Class<?>, Object> services = asManager()
091                .findPlugin(plugin)
092                .orElseThrow(() -> new IllegalArgumentException("cant find plugin '" + plugin + "'"))
093                .get(ComponentManager.AllServices.class)
094                .getServices();
095        Injector.class.cast(services.get(Injector.class)).inject(instance);
096        return instance;
097    }
098
099    public BaseComponentsHandler withIsolatedPackage(final String packageName, final String... packages) {
100        isolatedPackages =
101                Stream.concat(Stream.of(packageName), Stream.of(packages)).filter(Objects::nonNull).collect(toList());
102        if (isolatedPackages.isEmpty()) {
103            isolatedPackages = null;
104        }
105        return this;
106    }
107
108    public EmbeddedComponentManager start() {
109        final EmbeddedComponentManager embeddedComponentManager = new EmbeddedComponentManager(packageName) {
110
111            @Override
112            protected boolean isContainerClass(final Filter filter, final String name) {
113                if (name == null) {
114                    return super.isContainerClass(filter, null);
115                }
116                return (isolatedPackages == null || isolatedPackages.stream().noneMatch(name::startsWith))
117                        && super.isContainerClass(filter, name);
118            }
119
120            @Override
121            public void close() {
122                try {
123                    final State state = STATE.get();
124                    if (state.jsonb != null) {
125                        try {
126                            state.jsonb.close();
127                        } catch (final Exception e) {
128                            // no-op: not important
129                        }
130                    }
131                    STATE.remove();
132                    initState.remove();
133                } finally {
134                    super.close();
135                }
136            }
137        };
138
139        STATE.set(new State(embeddedComponentManager, new ArrayList<>(), initState.get().emitter, null));
140        return embeddedComponentManager;
141    }
142
143    @Override
144    public Outputs collect(final Processor processor, final ControllableInputFactory inputs) {
145        return collect(processor, inputs, 10);
146    }
147
148    /**
149     * Collects all outputs of a processor.
150     *
151     * @param processor the processor to run while there are inputs.
152     * @param inputs the input factory, when an input will return null it will stop the
153     * processing.
154     * @param bundleSize the bundle size to use.
155     * @return a map where the key is the output name and the value a stream of the
156     * output values.
157     */
158    @Override
159    public Outputs collect(final Processor processor, final ControllableInputFactory inputs, final int bundleSize) {
160        final AutoChunkProcessor autoChunkProcessor = new AutoChunkProcessor(bundleSize, processor);
161        autoChunkProcessor.start();
162        final Outputs outputs = new Outputs();
163        final OutputFactory outputFactory = name -> value -> {
164            final List aggregator = outputs.data.computeIfAbsent(name, n -> new ArrayList<>());
165            aggregator.add(value);
166        };
167        try {
168            while (inputs.hasMoreData()) {
169                autoChunkProcessor.onElement(inputs, outputFactory);
170            }
171            autoChunkProcessor.flush(outputFactory);
172        } finally {
173            autoChunkProcessor.stop();
174        }
175        return outputs;
176    }
177
178    @Override
179    public <T> Stream<T> collect(final Class<T> recordType, final Mapper mapper, final int maxRecords) {
180        return collect(recordType, mapper, maxRecords, Runtime.getRuntime().availableProcessors());
181    }
182
183    /**
184     * Collects data emitted from this mapper. If the split creates more than one
185     * mapper, it will create as much threads as mappers otherwise it will use the
186     * caller thread.
187     *
188     * IMPORTANT: don't forget to consume all the stream to ensure the underlying
189     * { @see org.talend.sdk.component.runtime.input.Input} is closed.
190     *
191     * @param recordType the record type to use to type the returned type.
192     * @param mapper the mapper to go through.
193     * @param maxRecords maximum number of records, allows to stop the source when
194     * infinite.
195     * @param concurrency requested (1 can be used instead if &lt;= 0) concurrency for the reader execution.
196     * @param <T> the returned type of the records of the mapper.
197     * @return all the records emitted by the mapper.
198     */
199    @Override
200    public <T> Stream<T> collect(final Class<T> recordType, final Mapper mapper, final int maxRecords,
201            final int concurrency) {
202        mapper.start();
203
204        final State state = STATE.get();
205        final long assess = mapper.assess();
206        final int proc = Math.max(1, concurrency);
207        final List<Mapper> mappers = mapper.split(Math.max(assess / proc, 1));
208        switch (mappers.size()) {
209        case 0:
210            return Stream.empty();
211        case 1:
212            return StreamDecorator.decorate(
213                    asStream(asIterator(mappers.iterator().next().create(), new AtomicInteger(maxRecords))),
214                    collect -> {
215                        try {
216                            collect.run();
217                        } finally {
218                            mapper.stop();
219                        }
220                    });
221        default: // N producers-1 consumer pattern
222            final AtomicInteger threadCounter = new AtomicInteger(0);
223            final ExecutorService es = Executors.newFixedThreadPool(mappers.size(), r -> new Thread(r) {
224
225                {
226                    setName(BaseComponentsHandler.this.getClass().getSimpleName() + "-pool-" + abs(mapper.hashCode())
227                            + "-" + threadCounter.incrementAndGet());
228                }
229            });
230            final AtomicInteger recordCounter = new AtomicInteger(maxRecords);
231            final Semaphore permissions = new Semaphore(0);
232            final Queue<T> records = new ConcurrentLinkedQueue<>();
233            final CountDownLatch latch = new CountDownLatch(mappers.size());
234            final List<? extends Future<?>> tasks = mappers
235                    .stream()
236                    .map(Mapper::create)
237                    .map(input -> (Iterator<T>) asIterator(input, recordCounter))
238                    .map(it -> es.submit(() -> {
239                        try {
240                            while (it.hasNext()) {
241                                final T next = it.next();
242                                records.add(next);
243                                permissions.release();
244                            }
245                        } finally {
246                            latch.countDown();
247                        }
248                    }))
249                    .collect(toList());
250            es.shutdown();
251
252            final int timeout = Integer.getInteger("talend.component.junit.timeout", 5);
253            new Thread() {
254
255                {
256                    setName(BaseComponentsHandler.class.getSimpleName() + "-monitor_" + abs(mapper.hashCode()));
257                }
258
259                @Override
260                public void run() {
261                    try {
262                        latch.await(timeout, MINUTES);
263                    } catch (final InterruptedException e) {
264                        Thread.interrupted();
265                    } finally {
266                        permissions.release();
267                    }
268                }
269            }.start();
270            return StreamDecorator.decorate(asStream(new Iterator<T>() {
271
272                @Override
273                public boolean hasNext() {
274                    try {
275                        permissions.acquire();
276                    } catch (final InterruptedException e) {
277                        Thread.interrupted();
278                        fail(e.getMessage());
279                    }
280                    return !records.isEmpty();
281                }
282
283                @Override
284                public T next() {
285                    T poll = records.poll();
286                    if (poll != null) {
287                        return mapRecord(state, recordType, poll);
288                    }
289                    return null;
290                }
291            }), task -> {
292                try {
293                    task.run();
294                } finally {
295                    tasks.forEach(f -> {
296                        try {
297                            f.get(5, SECONDS);
298                        } catch (final InterruptedException e) {
299                            Thread.interrupted();
300                        } catch (final ExecutionException | TimeoutException e) {
301                            // no-op
302                        } finally {
303                            if (!f.isDone() && !f.isCancelled()) {
304                                f.cancel(true);
305                            }
306                        }
307                    });
308                }
309            });
310        }
311    }
312
313    private <T> Stream<T> asStream(final Iterator<T> iterator) {
314        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.IMMUTABLE), false);
315    }
316
317    private <T> Iterator<T> asIterator(final Input input, final AtomicInteger counter) {
318        input.start();
319        return new Iterator<T>() {
320
321            private boolean closed;
322
323            private Object next;
324
325            @Override
326            public boolean hasNext() {
327                final int remaining = counter.get();
328                if (remaining <= 0) {
329                    return false;
330                }
331
332                final boolean hasNext = (next = input.next()) != null;
333                if (!hasNext && !closed) {
334                    closed = true;
335                    input.stop();
336                }
337                if (hasNext) {
338                    counter.decrementAndGet();
339                }
340                return hasNext;
341            }
342
343            @Override
344            public T next() {
345                return (T) next;
346            }
347        };
348    }
349
350    @Override
351    public <T> List<T> collectAsList(final Class<T> recordType, final Mapper mapper) {
352        return collectAsList(recordType, mapper, 1000);
353    }
354
355    @Override
356    public <T> List<T> collectAsList(final Class<T> recordType, final Mapper mapper, final int maxRecords) {
357        return collect(recordType, mapper, maxRecords).collect(toList());
358    }
359
360    @Override
361    public Mapper createMapper(final Class<?> componentType, final Object configuration) {
362        return create(Mapper.class, componentType, configuration);
363    }
364
365    @Override
366    public Processor createProcessor(final Class<?> componentType, final Object configuration) {
367        return create(Processor.class, componentType, configuration);
368    }
369
370    private <C, T, A> A create(final Class<A> api, final Class<T> componentType, final C configuration) {
371        final ComponentFamilyMeta.BaseMeta<? extends Lifecycle> meta = findMeta(componentType);
372        return api.cast(meta
373                .getInstantiator()
374                .apply(configuration == null || meta.getParameterMetas().isEmpty() ? emptyMap()
375                        : configurationByExample(configuration, meta
376                                .getParameterMetas()
377                                .stream()
378                                .filter(p -> p.getName().equals(p.getPath()))
379                                .findFirst()
380                                .map(p -> p.getName() + '.')
381                                .orElseThrow(() -> new IllegalArgumentException("Didn't find any option and therefore "
382                                        + "can't convert the configuration instance to a configuration")))));
383    }
384
385    private <T> ComponentFamilyMeta.BaseMeta<? extends Lifecycle> findMeta(final Class<T> componentType) {
386        return asManager()
387                .find(c -> c.get(ContainerComponentRegistry.class).getComponents().values().stream())
388                .flatMap(f -> Stream.concat(f.getProcessors().values().stream(),
389                        f.getPartitionMappers().values().stream()))
390                .filter(m -> m.getType().getName().equals(componentType.getName()))
391                .findFirst()
392                .orElseThrow(() -> new IllegalArgumentException("No component " + componentType));
393    }
394
395    @Override
396    public <T> List<T> collect(final Class<T> recordType, final String family, final String component,
397            final int version, final Map<String, String> configuration) {
398        Job
399                .components()
400                .component("in",
401                        family + "://" + component + "?__version=" + version
402                                + configuration
403                                        .entrySet()
404                                        .stream()
405                                        .map(entry -> entry.getKey() + "=" + entry.getValue())
406                                        .collect(joining("&", "&", "")))
407                .component("collector", "test://collector")
408                .connections()
409                .from("in")
410                .to("collector")
411                .build()
412                .run();
413
414        return getCollectedData(recordType);
415    }
416
417    @Override
418    public <T> void process(final Iterable<T> inputs, final String family, final String component, final int version,
419            final Map<String, String> configuration) {
420        setInputData(inputs);
421
422        Job
423                .components()
424                .component("emitter", "test://emitter")
425                .component("out",
426                        family + "://" + component + "?__version=" + version
427                                + configuration
428                                        .entrySet()
429                                        .stream()
430                                        .map(entry -> entry.getKey() + "=" + entry.getValue())
431                                        .collect(joining("&", "&", "")))
432                .connections()
433                .from("emitter")
434                .to("out")
435                .build()
436                .run();
437
438    }
439
440    @Override
441    public ComponentManager asManager() {
442        return STATE.get().manager;
443    }
444
445    @Override
446    public <T> T findService(final String plugin, final Class<T> serviceClass) {
447        return serviceClass.cast(asManager()
448                .findPlugin(plugin)
449                .orElseThrow(() -> new IllegalArgumentException("cant find plugin '" + plugin + "'"))
450                .get(ComponentManager.AllServices.class)
451                .getServices()
452                .get(serviceClass));
453    }
454
455    @Override
456    public <T> T findService(final Class<T> serviceClass) {
457        return findService(getSinglePlugin(), serviceClass);
458    }
459
460    public Set<String> getTestPlugins() {
461        return EmbeddedComponentManager.class.cast(asManager()).testPlugins;
462    }
463
464    @Override
465    public <T> void setInputData(final Iterable<T> data) {
466        final State state = STATE.get();
467        if (state == null) {
468            initState.get().emitter = data.iterator();
469        } else {
470            state.emitter = data.iterator();
471        }
472    }
473
474    @Override
475    public <T> List<T> getCollectedData(final Class<T> recordType) {
476        final State state = STATE.get();
477        return state.collector
478                .stream()
479                .filter(r -> recordType.isInstance(r) || JsonObject.class.isInstance(r))
480                .map(r -> mapRecord(state, recordType, r))
481                .collect(toList());
482    }
483
484    public void resetState() {
485        final State state = STATE.get();
486        if (state == null) {
487            STATE.remove();
488        } else {
489            state.collector.clear();
490            state.emitter = emptyIterator();
491        }
492    }
493
494    private String getSinglePlugin() {
495        return Optional.of(getTestPlugins()).filter(c -> !c.isEmpty()).map(c -> c.iterator().next()).orElseThrow(
496                () -> new IllegalStateException("No component plugin found"));
497    }
498
499    private <T> T mapRecord(final State state, final Class<T> recordType, final Object r) {
500        if (recordType.isInstance(r)) {
501            return recordType.cast(r);
502        }
503        if (JsonObject.class.isInstance(r)) {
504            final Jsonb jsonb = state.jsonb();
505            return jsonb.fromJson(jsonb.toJson(r), recordType);
506        }
507        throw new IllegalArgumentException("Unsupported record: " + r);
508    }
509
510    static class PreState {
511
512        Iterator<?> emitter;
513    }
514
515    @AllArgsConstructor
516    protected static class State {
517
518        final ComponentManager manager;
519
520        final Collection<Object> collector;
521
522        Iterator<?> emitter;
523
524        volatile Jsonb jsonb;
525
526        synchronized Jsonb jsonb() {
527            if (jsonb == null) {
528                jsonb = manager
529                        .getJsonbProvider()
530                        .create()
531                        .withProvider(new PreComputedJsonpProvider("test", manager.getJsonpProvider(),
532                                manager.getJsonpParserFactory(), manager.getJsonpWriterFactory(),
533                                manager.getJsonpBuilderFactory(), manager.getJsonpGeneratorFactory(),
534                                manager.getJsonpReaderFactory())) // reuses
535                        // the
536                        // same
537                        // memory
538                        // buffering
539                        .withConfig(new JsonbConfig().setProperty("johnzon.cdi.activated", false))
540                        .build();
541            }
542            return jsonb;
543        }
544    }
545
546    public static class EmbeddedComponentManager extends ComponentManager {
547
548        private final ComponentManager oldInstance;
549
550        private final Set<String> testPlugins;
551
552        private EmbeddedComponentManager(final String componentPackage) {
553            super(findM2(), "TALEND-INF/dependencies.txt", "org.talend.sdk.component:type=component,value=%s");
554            testPlugins = addJarContaining(Thread.currentThread().getContextClassLoader(),
555                    componentPackage.replace('.', '/'));
556            container
557                    .builder("component-runtime-junit.jar", jarLocation(SimpleCollector.class).getAbsolutePath())
558                    .create();
559            oldInstance = CONTEXTUAL_INSTANCE.get();
560            CONTEXTUAL_INSTANCE.set(this);
561        }
562
563        @Override
564        public void close() {
565            try {
566                super.close();
567            } finally {
568                CONTEXTUAL_INSTANCE.compareAndSet(this, oldInstance);
569            }
570        }
571
572        @Override
573        protected boolean isContainerClass(final Filter filter, final String name) {
574            // embedded mode (no plugin structure) so just run with all classes in parent classloader
575            return true;
576        }
577    }
578
579    public static class Outputs {
580
581        private final Map<String, List<?>> data = new HashMap<>();
582
583        public int size() {
584            return data.size();
585        }
586
587        public Set<String> keys() {
588            return data.keySet();
589        }
590
591        public <T> List<T> get(final Class<T> type, final String name) {
592            return (List<T>) data.get(name);
593        }
594    }
595}