001/**
002 * Copyright (C) 2006-2019 Talend Inc. - www.talend.com
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.talend.sdk.component.junit.http.internal.impl;
017
018import static java.util.Optional.ofNullable;
019import static java.util.concurrent.TimeUnit.SECONDS;
020
021import java.io.IOException;
022import java.net.ServerSocket;
023import java.security.NoSuchAlgorithmException;
024import java.util.concurrent.CountDownLatch;
025import java.util.concurrent.ExecutorService;
026import java.util.concurrent.Executors;
027import java.util.concurrent.TimeUnit;
028import java.util.stream.Stream;
029
030import javax.net.ssl.HostnameVerifier;
031import javax.net.ssl.HttpsURLConnection;
032import javax.net.ssl.SSLContext;
033
034import org.talend.sdk.component.junit.http.api.HttpApiHandler;
035
036import io.netty.bootstrap.ServerBootstrap;
037import io.netty.channel.ChannelFutureListener;
038import io.netty.channel.ChannelOption;
039import io.netty.channel.EventLoopGroup;
040import io.netty.channel.nio.NioEventLoopGroup;
041import io.netty.channel.socket.nio.NioServerSocketChannel;
042import io.netty.util.concurrent.DefaultThreadFactory;
043
044import lombok.AllArgsConstructor;
045import lombok.extern.slf4j.Slf4j;
046
047@Slf4j
048@AllArgsConstructor
049public class HandlerImpl<T extends HttpApiHandler<?>> implements AutoCloseable {
050
051    private final HttpApiHandler<T> handler;
052
053    private Thread instance;
054
055    private Runnable shutdown;
056
057    public synchronized HandlerImpl<T> start() {
058        if (instance != null) {
059            throw new IllegalStateException("Instance already started");
060        }
061
062        if (handler.getPort() <= 0) {
063            handler.setPort(newRandomPort());
064        }
065
066        final CountDownLatch startingPistol = new CountDownLatch(1);
067        final int nProcessors = Math.max(1, Runtime.getRuntime().availableProcessors());
068        final ExecutorService boosExecutor =
069                Executors.newFixedThreadPool(1, new DefaultThreadFactory("talend-api-boss"));
070        final ExecutorService workerExecutor =
071                Executors.newFixedThreadPool(nProcessors, new DefaultThreadFactory("talend-api-worker"));
072        instance = new Thread(() -> {
073            // todo: config
074            final EventLoopGroup bossGroup = new NioEventLoopGroup(1, boosExecutor);
075            final EventLoopGroup workerGroup = new NioEventLoopGroup(nProcessors, workerExecutor);
076            try {
077                final ServerBootstrap b = new ServerBootstrap();
078                b
079                        .option(ChannelOption.SO_REUSEADDR, true)
080                        .group(bossGroup, workerGroup)
081                        .channel(NioServerSocketChannel.class)
082                        .childHandler(new ProxyInitializer(handler))
083                        .bind("localhost", handler.getPort())
084                        .sync()
085                        .addListener((ChannelFutureListener) f -> {
086                            if (f.isSuccess()) {
087                                shutdown = () -> {
088                                    bossGroup.shutdownGracefully();
089                                    workerGroup.shutdownGracefully();
090                                };
091                            } else {
092                                log.error("Can't start API server");
093                            }
094                            startingPistol.countDown();
095                        })
096                        .channel()
097                        .closeFuture()
098                        .sync();
099            } catch (final InterruptedException e) {
100                close();
101                Thread.currentThread().interrupt();
102            }
103        }) {
104
105            {
106                setName("Talend-API-monitor_" + HandlerImpl.this.getClass().getSimpleName() + "_"
107                        + HandlerImpl.this.hashCode());
108            }
109        };
110        log.info("Starting Talend API server on port {}", handler.getPort());
111        instance.start();
112        try {
113            if (!startingPistol.await(Integer.getInteger("talend.junit.http.starting.timeout", 60), SECONDS)) {
114                log
115                        .warn("API server took more than the expected timeout to start, you can tune it "
116                                + "setting talend.junit.http.starting.timeout system property");
117            }
118        } catch (final InterruptedException e) {
119            log.warn(e.getMessage());
120            Thread.currentThread().interrupt();
121        }
122
123        if (shutdown != null && handler.isGlobalProxyConfiguration()) {
124            final String pt = Integer.toString(handler.getPort());
125
126            Stream.of("", "s").forEach(s -> {
127                shutdown = decorate(setProperty("http" + s + ".proxyHost", "localhost"), shutdown);
128                shutdown = decorate(setProperty("http" + s + ".proxyPort", pt), shutdown);
129                shutdown = decorate(setProperty("http" + s + ".nonProxyHosts", "local|*.local"), shutdown);
130            });
131
132            if (handler.getSslContext() != null) {
133                try {
134                    final SSLContext defaultSslContext = SSLContext.getDefault();
135                    final HostnameVerifier defaultHostnameVerifier = HttpsURLConnection.getDefaultHostnameVerifier();
136                    shutdown = decorate(() -> SSLContext.setDefault(defaultSslContext), shutdown);
137                    shutdown = decorate(() -> {
138                        HttpsURLConnection.setDefaultSSLSocketFactory(defaultSslContext.getSocketFactory());
139                        HttpsURLConnection.setDefaultHostnameVerifier(defaultHostnameVerifier);
140                    }, shutdown);
141
142                    SSLContext.setDefault(handler.getSslContext());
143                    HttpsURLConnection.setDefaultSSLSocketFactory(handler.getSslContext().getSocketFactory());
144                    HttpsURLConnection.setDefaultHostnameVerifier((host, sslSession) -> true);
145                } catch (final NoSuchAlgorithmException e) {
146                    throw new IllegalStateException(e);
147                }
148            }
149            log
150                    .info("Configured the JVM to use the {} API proxy localhost:{}",
151                            handler.getSslContext() != null ? "SSL" : "plain", handler.getPort());
152        }
153        return this;
154    }
155
156    @Override
157    public synchronized void close() {
158        ofNullable(shutdown).ifPresent(Runnable::run);
159        if (instance != null) {
160            log.info("Stopping Talend API server (port {})", handler.getPort());
161            try {
162                instance.join(TimeUnit.MINUTES.toMillis(5));
163            } catch (final InterruptedException e) {
164                log.warn(e.getMessage(), e);
165                Thread.currentThread().interrupt();
166            } finally {
167                instance = null;
168                shutdown = null;
169            }
170        }
171        Stream
172                .of(handler.getResponseLocator(), handler.getExecutor())
173                .filter(AutoCloseable.class::isInstance)
174                .map(AutoCloseable.class::cast)
175                .forEach(c -> {
176                    try {
177                        c.close();
178                    } catch (final Exception e) {
179                        log.error(e.getMessage(), e);
180                    }
181                });
182        if (!AutoCloseable.class.isInstance(handler.getExecutor())
183                && ExecutorService.class.isInstance(handler.getExecutor())) {
184            final ExecutorService executorService = ExecutorService.class.cast(handler.getExecutor());
185            executorService.shutdownNow(); // we don't need to wait here
186        }
187    }
188
189    private Runnable decorate(final Runnable last, final Runnable first) {
190        return () -> {
191            first.run();
192            last.run();
193        };
194    }
195
196    private Runnable setProperty(final String name, final String value) {
197        final String val = System.getProperty(name);
198        System.setProperty(name, value);
199        return () -> {
200            if (val == null) {
201                System.clearProperty(name);
202            } else {
203                System.setProperty(name, val);
204            }
205        };
206    }
207
208    private int newRandomPort() {
209        try (final ServerSocket socket = new ServerSocket(0)) {
210            return socket.getLocalPort();
211        } catch (final IOException e) {
212            throw new IllegalStateException(e);
213        }
214    }
215}