001/**
002 * Copyright (C) 2006-2018 Talend Inc. - www.talend.com
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.talend.sdk.component.junit;
017
018import org.talend.sdk.component.runtime.base.Lifecycle;
019import org.talend.sdk.component.runtime.output.InputFactory;
020import org.talend.sdk.component.runtime.output.OutputFactory;
021import org.talend.sdk.component.runtime.output.Processor;
022
023import lombok.RequiredArgsConstructor;
024
025/**
026 * Processor wrapper allowing to "auto" manage the chunking/grouping.
027 */
028@RequiredArgsConstructor
029public class AutoChunkProcessor implements Lifecycle {
030
031    /**
032     * The size of the chunks.
033     */
034    private final int chunkSize;
035
036    /**
037     * The delegate processor.
038     */
039    private final Processor processor;
040
041    /**
042     * Internal counter to handle the chunking.
043     */
044    private int processedItemCount = 0;
045
046    public void onElement(final InputFactory ins, final OutputFactory outs) {
047        if (processedItemCount == 0) {
048            processor.beforeGroup();
049        }
050        try {
051            processor.onNext(ins, outs);
052            processedItemCount++;
053        } finally {
054            if (processedItemCount == chunkSize) {
055                processor.afterGroup(outs);
056                processedItemCount = 0;
057            }
058        }
059    }
060
061    public void flush(final OutputFactory outs) {
062        if (processedItemCount > 0) {
063            processor.afterGroup(outs);
064            processedItemCount = 0;
065        }
066    }
067
068    @Override
069    public void stop() {
070        processor.stop();
071    }
072
073    @Override
074    public String plugin() {
075        return processor.plugin();
076    }
077
078    @Override
079    public String rootName() {
080        return processor.rootName();
081    }
082
083    @Override
084    public String name() {
085        return processor.name();
086    }
087
088    @Override
089    public void start() {
090        processor.start();
091    }
092}