001/** 002 * Copyright (C) 2006-2018 Talend Inc. - www.talend.com 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.talend.sdk.component.junit; 017 018import static java.lang.Math.abs; 019import static java.util.Collections.emptyIterator; 020import static java.util.Collections.emptyMap; 021import static java.util.concurrent.TimeUnit.MINUTES; 022import static java.util.concurrent.TimeUnit.SECONDS; 023import static java.util.stream.Collectors.joining; 024import static java.util.stream.Collectors.toList; 025import static org.apache.ziplock.JarLocation.jarLocation; 026import static org.junit.Assert.fail; 027import static org.talend.sdk.component.junit.SimpleFactory.configurationByExample; 028 029import java.util.ArrayList; 030import java.util.Collection; 031import java.util.HashMap; 032import java.util.Iterator; 033import java.util.List; 034import java.util.Map; 035import java.util.Objects; 036import java.util.Optional; 037import java.util.Queue; 038import java.util.Set; 039import java.util.Spliterator; 040import java.util.Spliterators; 041import java.util.concurrent.ConcurrentLinkedQueue; 042import java.util.concurrent.CountDownLatch; 043import java.util.concurrent.ExecutionException; 044import java.util.concurrent.ExecutorService; 045import java.util.concurrent.Executors; 046import java.util.concurrent.Future; 047import java.util.concurrent.Semaphore; 048import java.util.concurrent.TimeoutException; 049import java.util.concurrent.atomic.AtomicInteger; 050import java.util.stream.Stream; 051import java.util.stream.StreamSupport; 052 053import javax.json.JsonObject; 054import javax.json.bind.Jsonb; 055import javax.json.bind.JsonbConfig; 056 057import org.apache.xbean.finder.filter.Filter; 058import org.talend.sdk.component.api.service.injector.Injector; 059import org.talend.sdk.component.junit.lang.StreamDecorator; 060import org.talend.sdk.component.runtime.base.Lifecycle; 061import org.talend.sdk.component.runtime.input.Input; 062import org.talend.sdk.component.runtime.input.Mapper; 063import org.talend.sdk.component.runtime.manager.ComponentFamilyMeta; 064import org.talend.sdk.component.runtime.manager.ComponentManager; 065import org.talend.sdk.component.runtime.manager.ContainerComponentRegistry; 066import org.talend.sdk.component.runtime.manager.chain.Job; 067import org.talend.sdk.component.runtime.manager.json.PreComputedJsonpProvider; 068import org.talend.sdk.component.runtime.output.OutputFactory; 069import org.talend.sdk.component.runtime.output.Processor; 070 071import lombok.AllArgsConstructor; 072import lombok.extern.slf4j.Slf4j; 073 074@Slf4j 075public class BaseComponentsHandler implements ComponentsHandler { 076 077 protected static final ThreadLocal<State> STATE = new ThreadLocal<>(); 078 079 private final ThreadLocal<PreState> initState = ThreadLocal.withInitial(PreState::new); 080 081 protected String packageName; 082 083 protected Collection<String> isolatedPackages; 084 085 public <T> T injectServices(final T instance) { 086 if (instance == null) { 087 return null; 088 } 089 final String plugin = getSinglePlugin(); 090 final Map<Class<?>, Object> services = asManager() 091 .findPlugin(plugin) 092 .orElseThrow(() -> new IllegalArgumentException("cant find plugin '" + plugin + "'")) 093 .get(ComponentManager.AllServices.class) 094 .getServices(); 095 Injector.class.cast(services.get(Injector.class)).inject(instance); 096 return instance; 097 } 098 099 public BaseComponentsHandler withIsolatedPackage(final String packageName, final String... packages) { 100 isolatedPackages = 101 Stream.concat(Stream.of(packageName), Stream.of(packages)).filter(Objects::nonNull).collect(toList()); 102 if (isolatedPackages.isEmpty()) { 103 isolatedPackages = null; 104 } 105 return this; 106 } 107 108 public EmbeddedComponentManager start() { 109 final EmbeddedComponentManager embeddedComponentManager = new EmbeddedComponentManager(packageName) { 110 111 @Override 112 protected boolean isContainerClass(final Filter filter, final String name) { 113 if (name == null) { 114 return super.isContainerClass(filter, null); 115 } 116 return (isolatedPackages == null || isolatedPackages.stream().noneMatch(name::startsWith)) 117 && super.isContainerClass(filter, name); 118 } 119 120 @Override 121 public void close() { 122 try { 123 final State state = STATE.get(); 124 if (state.jsonb != null) { 125 try { 126 state.jsonb.close(); 127 } catch (final Exception e) { 128 // no-op: not important 129 } 130 } 131 STATE.remove(); 132 initState.remove(); 133 } finally { 134 super.close(); 135 } 136 } 137 }; 138 139 STATE.set(new State(embeddedComponentManager, new ArrayList<>(), initState.get().emitter, null)); 140 return embeddedComponentManager; 141 } 142 143 @Override 144 public Outputs collect(final Processor processor, final ControllableInputFactory inputs) { 145 return collect(processor, inputs, 10); 146 } 147 148 /** 149 * Collects all outputs of a processor. 150 * 151 * @param processor the processor to run while there are inputs. 152 * @param inputs the input factory, when an input will return null it will stop the 153 * processing. 154 * @param bundleSize the bundle size to use. 155 * @return a map where the key is the output name and the value a stream of the 156 * output values. 157 */ 158 @Override 159 public Outputs collect(final Processor processor, final ControllableInputFactory inputs, final int bundleSize) { 160 final AutoChunkProcessor autoChunkProcessor = new AutoChunkProcessor(bundleSize, processor); 161 autoChunkProcessor.start(); 162 final Outputs outputs = new Outputs(); 163 final OutputFactory outputFactory = name -> value -> { 164 final List aggregator = outputs.data.computeIfAbsent(name, n -> new ArrayList<>()); 165 aggregator.add(value); 166 }; 167 try { 168 while (inputs.hasMoreData()) { 169 autoChunkProcessor.onElement(inputs, outputFactory); 170 } 171 autoChunkProcessor.flush(outputFactory); 172 } finally { 173 autoChunkProcessor.stop(); 174 } 175 return outputs; 176 } 177 178 @Override 179 public <T> Stream<T> collect(final Class<T> recordType, final Mapper mapper, final int maxRecords) { 180 return collect(recordType, mapper, maxRecords, Runtime.getRuntime().availableProcessors()); 181 } 182 183 /** 184 * Collects data emitted from this mapper. If the split creates more than one 185 * mapper, it will create as much threads as mappers otherwise it will use the 186 * caller thread. 187 * 188 * IMPORTANT: don't forget to consume all the stream to ensure the underlying 189 * { @see org.talend.sdk.component.runtime.input.Input} is closed. 190 * 191 * @param recordType the record type to use to type the returned type. 192 * @param mapper the mapper to go through. 193 * @param maxRecords maximum number of records, allows to stop the source when 194 * infinite. 195 * @param concurrency requested (1 can be used instead if <= 0) concurrency for the reader execution. 196 * @param <T> the returned type of the records of the mapper. 197 * @return all the records emitted by the mapper. 198 */ 199 @Override 200 public <T> Stream<T> collect(final Class<T> recordType, final Mapper mapper, final int maxRecords, 201 final int concurrency) { 202 mapper.start(); 203 204 final State state = STATE.get(); 205 final long assess = mapper.assess(); 206 final int proc = Math.max(1, concurrency); 207 final List<Mapper> mappers = mapper.split(Math.max(assess / proc, 1)); 208 switch (mappers.size()) { 209 case 0: 210 return Stream.empty(); 211 case 1: 212 return StreamDecorator.decorate( 213 asStream(asIterator(mappers.iterator().next().create(), new AtomicInteger(maxRecords))), 214 collect -> { 215 try { 216 collect.run(); 217 } finally { 218 mapper.stop(); 219 } 220 }); 221 default: // N producers-1 consumer pattern 222 final AtomicInteger threadCounter = new AtomicInteger(0); 223 final ExecutorService es = Executors.newFixedThreadPool(mappers.size(), r -> new Thread(r) { 224 225 { 226 setName(BaseComponentsHandler.this.getClass().getSimpleName() + "-pool-" + abs(mapper.hashCode()) 227 + "-" + threadCounter.incrementAndGet()); 228 } 229 }); 230 final AtomicInteger recordCounter = new AtomicInteger(maxRecords); 231 final Semaphore permissions = new Semaphore(0); 232 final Queue<T> records = new ConcurrentLinkedQueue<>(); 233 final CountDownLatch latch = new CountDownLatch(mappers.size()); 234 final List<? extends Future<?>> tasks = mappers 235 .stream() 236 .map(Mapper::create) 237 .map(input -> (Iterator<T>) asIterator(input, recordCounter)) 238 .map(it -> es.submit(() -> { 239 try { 240 while (it.hasNext()) { 241 final T next = it.next(); 242 records.add(next); 243 permissions.release(); 244 } 245 } finally { 246 latch.countDown(); 247 } 248 })) 249 .collect(toList()); 250 es.shutdown(); 251 252 final int timeout = Integer.getInteger("talend.component.junit.timeout", 5); 253 new Thread() { 254 255 { 256 setName(BaseComponentsHandler.class.getSimpleName() + "-monitor_" + abs(mapper.hashCode())); 257 } 258 259 @Override 260 public void run() { 261 try { 262 latch.await(timeout, MINUTES); 263 } catch (final InterruptedException e) { 264 Thread.interrupted(); 265 } finally { 266 permissions.release(); 267 } 268 } 269 }.start(); 270 return StreamDecorator.decorate(asStream(new Iterator<T>() { 271 272 @Override 273 public boolean hasNext() { 274 try { 275 permissions.acquire(); 276 } catch (final InterruptedException e) { 277 Thread.interrupted(); 278 fail(e.getMessage()); 279 } 280 return !records.isEmpty(); 281 } 282 283 @Override 284 public T next() { 285 T poll = records.poll(); 286 if (poll != null) { 287 return mapRecord(state, recordType, poll); 288 } 289 return null; 290 } 291 }), task -> { 292 try { 293 task.run(); 294 } finally { 295 tasks.forEach(f -> { 296 try { 297 f.get(5, SECONDS); 298 } catch (final InterruptedException e) { 299 Thread.interrupted(); 300 } catch (final ExecutionException | TimeoutException e) { 301 // no-op 302 } finally { 303 if (!f.isDone() && !f.isCancelled()) { 304 f.cancel(true); 305 } 306 } 307 }); 308 } 309 }); 310 } 311 } 312 313 private <T> Stream<T> asStream(final Iterator<T> iterator) { 314 return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.IMMUTABLE), false); 315 } 316 317 private <T> Iterator<T> asIterator(final Input input, final AtomicInteger counter) { 318 input.start(); 319 return new Iterator<T>() { 320 321 private boolean closed; 322 323 private Object next; 324 325 @Override 326 public boolean hasNext() { 327 final int remaining = counter.get(); 328 if (remaining <= 0) { 329 return false; 330 } 331 332 final boolean hasNext = (next = input.next()) != null; 333 if (!hasNext && !closed) { 334 closed = true; 335 input.stop(); 336 } 337 if (hasNext) { 338 counter.decrementAndGet(); 339 } 340 return hasNext; 341 } 342 343 @Override 344 public T next() { 345 return (T) next; 346 } 347 }; 348 } 349 350 @Override 351 public <T> List<T> collectAsList(final Class<T> recordType, final Mapper mapper) { 352 return collectAsList(recordType, mapper, 1000); 353 } 354 355 @Override 356 public <T> List<T> collectAsList(final Class<T> recordType, final Mapper mapper, final int maxRecords) { 357 return collect(recordType, mapper, maxRecords).collect(toList()); 358 } 359 360 @Override 361 public Mapper createMapper(final Class<?> componentType, final Object configuration) { 362 return create(Mapper.class, componentType, configuration); 363 } 364 365 @Override 366 public Processor createProcessor(final Class<?> componentType, final Object configuration) { 367 return create(Processor.class, componentType, configuration); 368 } 369 370 private <C, T, A> A create(final Class<A> api, final Class<T> componentType, final C configuration) { 371 final ComponentFamilyMeta.BaseMeta<? extends Lifecycle> meta = findMeta(componentType); 372 return api.cast(meta 373 .getInstantiator() 374 .apply(configuration == null || meta.getParameterMetas().isEmpty() ? emptyMap() 375 : configurationByExample(configuration, meta 376 .getParameterMetas() 377 .stream() 378 .filter(p -> p.getName().equals(p.getPath())) 379 .findFirst() 380 .map(p -> p.getName() + '.') 381 .orElseThrow(() -> new IllegalArgumentException("Didn't find any option and therefore " 382 + "can't convert the configuration instance to a configuration"))))); 383 } 384 385 private <T> ComponentFamilyMeta.BaseMeta<? extends Lifecycle> findMeta(final Class<T> componentType) { 386 return asManager() 387 .find(c -> c.get(ContainerComponentRegistry.class).getComponents().values().stream()) 388 .flatMap(f -> Stream.concat(f.getProcessors().values().stream(), 389 f.getPartitionMappers().values().stream())) 390 .filter(m -> m.getType().getName().equals(componentType.getName())) 391 .findFirst() 392 .orElseThrow(() -> new IllegalArgumentException("No component " + componentType)); 393 } 394 395 @Override 396 public <T> List<T> collect(final Class<T> recordType, final String family, final String component, 397 final int version, final Map<String, String> configuration) { 398 Job 399 .components() 400 .component("in", 401 family + "://" + component + "?__version=" + version 402 + configuration 403 .entrySet() 404 .stream() 405 .map(entry -> entry.getKey() + "=" + entry.getValue()) 406 .collect(joining("&", "&", ""))) 407 .component("collector", "test://collector") 408 .connections() 409 .from("in") 410 .to("collector") 411 .build() 412 .run(); 413 414 return getCollectedData(recordType); 415 } 416 417 @Override 418 public <T> void process(final Iterable<T> inputs, final String family, final String component, final int version, 419 final Map<String, String> configuration) { 420 setInputData(inputs); 421 422 Job 423 .components() 424 .component("emitter", "test://emitter") 425 .component("out", 426 family + "://" + component + "?__version=" + version 427 + configuration 428 .entrySet() 429 .stream() 430 .map(entry -> entry.getKey() + "=" + entry.getValue()) 431 .collect(joining("&", "&", ""))) 432 .connections() 433 .from("emitter") 434 .to("out") 435 .build() 436 .run(); 437 438 } 439 440 @Override 441 public ComponentManager asManager() { 442 return STATE.get().manager; 443 } 444 445 @Override 446 public <T> T findService(final String plugin, final Class<T> serviceClass) { 447 return serviceClass.cast(asManager() 448 .findPlugin(plugin) 449 .orElseThrow(() -> new IllegalArgumentException("cant find plugin '" + plugin + "'")) 450 .get(ComponentManager.AllServices.class) 451 .getServices() 452 .get(serviceClass)); 453 } 454 455 @Override 456 public <T> T findService(final Class<T> serviceClass) { 457 return findService(getSinglePlugin(), serviceClass); 458 } 459 460 public Set<String> getTestPlugins() { 461 return EmbeddedComponentManager.class.cast(asManager()).testPlugins; 462 } 463 464 @Override 465 public <T> void setInputData(final Iterable<T> data) { 466 final State state = STATE.get(); 467 if (state == null) { 468 initState.get().emitter = data.iterator(); 469 } else { 470 state.emitter = data.iterator(); 471 } 472 } 473 474 @Override 475 public <T> List<T> getCollectedData(final Class<T> recordType) { 476 final State state = STATE.get(); 477 return state.collector 478 .stream() 479 .filter(r -> recordType.isInstance(r) || JsonObject.class.isInstance(r)) 480 .map(r -> mapRecord(state, recordType, r)) 481 .collect(toList()); 482 } 483 484 public void resetState() { 485 final State state = STATE.get(); 486 if (state == null) { 487 STATE.remove(); 488 } else { 489 state.collector.clear(); 490 state.emitter = emptyIterator(); 491 } 492 } 493 494 private String getSinglePlugin() { 495 return Optional.of(getTestPlugins()).filter(c -> !c.isEmpty()).map(c -> c.iterator().next()).orElseThrow( 496 () -> new IllegalStateException("No component plugin found")); 497 } 498 499 private <T> T mapRecord(final State state, final Class<T> recordType, final Object r) { 500 if (recordType.isInstance(r)) { 501 return recordType.cast(r); 502 } 503 if (JsonObject.class.isInstance(r)) { 504 final Jsonb jsonb = state.jsonb(); 505 return jsonb.fromJson(jsonb.toJson(r), recordType); 506 } 507 throw new IllegalArgumentException("Unsupported record: " + r); 508 } 509 510 static class PreState { 511 512 Iterator<?> emitter; 513 } 514 515 @AllArgsConstructor 516 protected static class State { 517 518 final ComponentManager manager; 519 520 final Collection<Object> collector; 521 522 Iterator<?> emitter; 523 524 volatile Jsonb jsonb; 525 526 synchronized Jsonb jsonb() { 527 if (jsonb == null) { 528 jsonb = manager 529 .getJsonbProvider() 530 .create() 531 .withProvider(new PreComputedJsonpProvider("test", manager.getJsonpProvider(), 532 manager.getJsonpParserFactory(), manager.getJsonpWriterFactory(), 533 manager.getJsonpBuilderFactory(), manager.getJsonpGeneratorFactory(), 534 manager.getJsonpReaderFactory())) // reuses 535 // the 536 // same 537 // memory 538 // buffering 539 .withConfig(new JsonbConfig().setProperty("johnzon.cdi.activated", false)) 540 .build(); 541 } 542 return jsonb; 543 } 544 } 545 546 public static class EmbeddedComponentManager extends ComponentManager { 547 548 private final ComponentManager oldInstance; 549 550 private final Set<String> testPlugins; 551 552 private EmbeddedComponentManager(final String componentPackage) { 553 super(findM2(), "TALEND-INF/dependencies.txt", "org.talend.sdk.component:type=component,value=%s"); 554 testPlugins = addJarContaining(Thread.currentThread().getContextClassLoader(), 555 componentPackage.replace('.', '/')); 556 container 557 .builder("component-runtime-junit.jar", jarLocation(SimpleCollector.class).getAbsolutePath()) 558 .create(); 559 oldInstance = CONTEXTUAL_INSTANCE.get(); 560 CONTEXTUAL_INSTANCE.set(this); 561 } 562 563 @Override 564 public void close() { 565 try { 566 super.close(); 567 } finally { 568 CONTEXTUAL_INSTANCE.compareAndSet(this, oldInstance); 569 } 570 } 571 572 @Override 573 protected boolean isContainerClass(final Filter filter, final String name) { 574 // embedded mode (no plugin structure) so just run with all classes in parent classloader 575 return true; 576 } 577 } 578 579 public static class Outputs { 580 581 private final Map<String, List<?>> data = new HashMap<>(); 582 583 public int size() { 584 return data.size(); 585 } 586 587 public Set<String> keys() { 588 return data.keySet(); 589 } 590 591 public <T> List<T> get(final Class<T> type, final String name) { 592 return (List<T>) data.get(name); 593 } 594 } 595}