001/** 002 * Copyright (C) 2006-2019 Talend Inc. - www.talend.com 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.talend.sdk.component.junit; 017 018import java.util.Collection; 019import java.util.HashMap; 020import java.util.Iterator; 021import java.util.Map; 022 023import javax.json.bind.Jsonb; 024import javax.json.bind.JsonbBuilder; 025import javax.json.bind.JsonbConfig; 026 027import org.talend.sdk.component.api.record.Record; 028import org.talend.sdk.component.runtime.jsonb.MultipleFormatDateAdapter; 029import org.talend.sdk.component.runtime.manager.ComponentManager; 030import org.talend.sdk.component.runtime.record.RecordConverters; 031 032/** 033 * An input factory which joins multiple distinct sources reading them in "parallel". 034 * 035 * IMPORTANT: all entries of the map but have the same "size". 036 */ 037public class JoinInputFactory implements ControllableInputFactory { 038 039 private final Map<String, Iterator<?>> data = new HashMap<>(); 040 041 private volatile Jsonb jsonb; 042 043 private volatile RecordConverters.MappingMetaRegistry registry; 044 045 public JoinInputFactory withInput(final String branch, final Collection<?> branchData) { 046 data.put(branch, branchData.iterator()); 047 return this; 048 } 049 050 @Override 051 public Object read(final String name) { 052 final Iterator<?> iterator = data.get(name); 053 if (iterator != null && iterator.hasNext()) { 054 return map(iterator.next()); 055 } 056 return null; 057 } 058 059 @Override 060 public boolean hasMoreData() { 061 final boolean hasMore = !data.isEmpty() && data.entrySet().stream().allMatch(e -> e.getValue().hasNext()); 062 if (!hasMore && jsonb != null) { 063 synchronized (this) { 064 if (jsonb != null) { 065 try { 066 jsonb.close(); 067 } catch (final Exception e) { 068 // no-op: not important here 069 } 070 } 071 } 072 } 073 return hasMore; 074 } 075 076 @Override 077 public InputFactoryIterable asInputRecords() { 078 return new InputFactoryIterable(this, data); 079 } 080 081 private Object map(final Object next) { 082 if (next == null || Record.class.isInstance(next)) { // directly ok 083 return next; 084 } 085 086 if (String.class.isInstance(next) || next.getClass().isPrimitive()) { // primitives 087 return next; 088 } 089 090 if (jsonb == null) { 091 synchronized (this) { 092 if (jsonb == null) { 093 jsonb = JsonbBuilder 094 .create(new JsonbConfig() 095 .withAdapters(new MultipleFormatDateAdapter()) 096 .setProperty("johnzon.cdi.activated", false)); 097 registry = new RecordConverters.MappingMetaRegistry(); 098 } 099 } 100 } 101 102 return new RecordConverters() 103 .toRecord(registry, next, () -> jsonb, 104 () -> ComponentManager.instance().getRecordBuilderFactoryProvider().apply(null)); 105 } 106}