001/** 002 * Copyright (C) 2006-2021 Talend Inc. - www.talend.com 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.talend.sdk.component.junit; 017 018import java.util.Collection; 019import java.util.HashMap; 020import java.util.Iterator; 021import java.util.Map; 022 023import javax.json.bind.Jsonb; 024import javax.json.bind.JsonbBuilder; 025import javax.json.bind.JsonbConfig; 026 027import org.talend.sdk.component.api.record.Record; 028import org.talend.sdk.component.runtime.manager.ComponentManager; 029import org.talend.sdk.component.runtime.record.RecordConverters; 030 031/** 032 * An input factory which joins multiple distinct sources reading them in "parallel". 033 * 034 * IMPORTANT: all entries of the map but have the same "size". 035 */ 036public class JoinInputFactory implements ControllableInputFactory { 037 038 private final Map<String, Iterator<?>> data = new HashMap<>(); 039 040 private volatile Jsonb jsonb; 041 042 private volatile RecordConverters.MappingMetaRegistry registry; 043 044 public JoinInputFactory withInput(final String branch, final Collection<?> branchData) { 045 data.put(branch, branchData.iterator()); 046 return this; 047 } 048 049 @Override 050 public Object read(final String name) { 051 final Iterator<?> iterator = data.get(name); 052 if (iterator != null && iterator.hasNext()) { 053 return map(iterator.next()); 054 } 055 return null; 056 } 057 058 @Override 059 public boolean hasMoreData() { 060 final boolean hasMore = !data.isEmpty() && data.entrySet().stream().allMatch(e -> e.getValue().hasNext()); 061 if (!hasMore && jsonb != null) { 062 synchronized (this) { 063 if (jsonb != null) { 064 try { 065 jsonb.close(); 066 } catch (final Exception e) { 067 // no-op: not important here 068 } 069 } 070 } 071 } 072 return hasMore; 073 } 074 075 @Override 076 public InputFactoryIterable asInputRecords() { 077 return new InputFactoryIterable(this, data); 078 } 079 080 private Object map(final Object next) { 081 if (next == null || Record.class.isInstance(next)) { // directly ok 082 return next; 083 } 084 085 if (String.class.isInstance(next) || next.getClass().isPrimitive()) { // primitives 086 return next; 087 } 088 089 if (jsonb == null) { 090 synchronized (this) { 091 if (jsonb == null) { 092 jsonb = JsonbBuilder.create(new JsonbConfig().setProperty("johnzon.cdi.activated", false)); 093 registry = new RecordConverters.MappingMetaRegistry(); 094 } 095 } 096 } 097 098 return new RecordConverters() 099 .toRecord(registry, next, () -> jsonb, 100 () -> ComponentManager.instance().getRecordBuilderFactoryProvider().apply(null)); 101 } 102}