001/**
002 * Copyright (C) 2006-2021 Talend Inc. - www.talend.com
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.talend.sdk.component.junit.http.internal.impl;
017
018import static java.util.Optional.ofNullable;
019import static java.util.concurrent.TimeUnit.SECONDS;
020
021import java.io.IOException;
022import java.net.ServerSocket;
023import java.security.NoSuchAlgorithmException;
024import java.util.concurrent.CountDownLatch;
025import java.util.concurrent.ExecutorService;
026import java.util.concurrent.Executors;
027import java.util.concurrent.TimeUnit;
028import java.util.stream.Stream;
029
030import javax.net.ssl.HostnameVerifier;
031import javax.net.ssl.HttpsURLConnection;
032import javax.net.ssl.SSLContext;
033
034import org.talend.sdk.component.junit.http.api.HttpApiHandler;
035
036import io.netty.bootstrap.ServerBootstrap;
037import io.netty.channel.ChannelFutureListener;
038import io.netty.channel.ChannelOption;
039import io.netty.channel.EventLoopGroup;
040import io.netty.channel.nio.NioEventLoopGroup;
041import io.netty.channel.socket.nio.NioServerSocketChannel;
042import io.netty.util.concurrent.DefaultThreadFactory;
043
044import lombok.AllArgsConstructor;
045import lombok.extern.slf4j.Slf4j;
046
047@Slf4j
048@AllArgsConstructor
049public class HandlerImpl<T extends HttpApiHandler<?>> implements AutoCloseable {
050
051    private final HttpApiHandler<T> handler;
052
053    private Thread instance;
054
055    private Runnable shutdown;
056
057    public synchronized HandlerImpl<T> start() {
058        if (instance != null) {
059            throw new IllegalStateException("Instance already started");
060        }
061
062        if (handler.getPort() <= 0) {
063            handler.setPort(newRandomPort());
064        }
065
066        final CountDownLatch startingPistol = new CountDownLatch(1);
067        final int nProcessors = Math.max(1, Runtime.getRuntime().availableProcessors());
068        final ExecutorService boosExecutor =
069                Executors.newFixedThreadPool(1, new DefaultThreadFactory("talend-api-boss"));
070        final ExecutorService workerExecutor =
071                Executors.newFixedThreadPool(nProcessors, new DefaultThreadFactory("talend-api-worker"));
072        instance = new Thread(() -> {
073            // todo: config
074            final EventLoopGroup bossGroup = new NioEventLoopGroup(1, boosExecutor);
075            final EventLoopGroup workerGroup = new NioEventLoopGroup(nProcessors, workerExecutor);
076            try {
077                final ServerBootstrap b = new ServerBootstrap();
078                b
079                        .option(ChannelOption.SO_REUSEADDR, true)
080                        .group(bossGroup, workerGroup)
081                        .channel(NioServerSocketChannel.class)
082                        .childHandler(new ProxyInitializer(handler))
083                        .bind("localhost", handler.getPort())
084                        .sync()
085                        .addListener((ChannelFutureListener) f -> {
086                            if (f.isSuccess()) {
087                                shutdown = () -> {
088                                    bossGroup.shutdownGracefully();
089                                    workerGroup.shutdownGracefully();
090                                };
091                            } else {
092                                log.error("Can't start API server");
093                            }
094                            startingPistol.countDown();
095                        })
096                        .channel()
097                        .closeFuture()
098                        .sync();
099            } catch (final InterruptedException e) {
100                close();
101                Thread.currentThread().interrupt();
102            }
103        }) {
104
105            {
106                setName("Talend-API-monitor_" + HandlerImpl.this.getClass().getSimpleName() + "_"
107                        + HandlerImpl.this.hashCode());
108            }
109        };
110        log.info("Starting Talend API server on port {}", handler.getPort());
111        instance.start();
112        try {
113            if (!startingPistol.await(Integer.getInteger("talend.junit.http.starting.timeout", 60), SECONDS)) {
114                log
115                        .warn("API server took more than the expected timeout to start, you can tune it "
116                                + "setting talend.junit.http.starting.timeout system property");
117            }
118        } catch (final InterruptedException e) {
119            log.warn(e.getMessage());
120            Thread.currentThread().interrupt();
121        }
122
123        if (shutdown != null && handler.isGlobalProxyConfiguration()) {
124            final String pt = Integer.toString(handler.getPort());
125
126            Stream.of("", "s").forEach(s -> {
127                shutdown = decorate(setProperty("http" + s + ".proxyHost", "localhost"), shutdown);
128                shutdown = decorate(setProperty("http" + s + ".proxyPort", pt), shutdown);
129                shutdown = decorate(setProperty("http" + s + ".nonProxyHosts", "local|*.local"), shutdown);
130            });
131
132            if (handler.getSslContext() != null) {
133                try {
134                    final SSLContext defaultSslContext = SSLContext.getDefault();
135                    final HostnameVerifier defaultHostnameVerifier = HttpsURLConnection.getDefaultHostnameVerifier();
136                    shutdown = decorate(() -> SSLContext.setDefault(defaultSslContext), shutdown);
137                    shutdown = decorate(() -> {
138                        HttpsURLConnection.setDefaultSSLSocketFactory(defaultSslContext.getSocketFactory());
139                        HttpsURLConnection.setDefaultHostnameVerifier(defaultHostnameVerifier);
140                    }, shutdown);
141                    shutdown = decorate(
142                            () -> setProperty("jdk.internal.httpclient.disableHostnameVerification", "true"), shutdown);
143
144                    SSLContext.setDefault(handler.getSslContext());
145                    HttpsURLConnection.setDefaultSSLSocketFactory(handler.getSslContext().getSocketFactory());
146                    HttpsURLConnection.setDefaultHostnameVerifier((host, sslSession) -> true);
147                } catch (final NoSuchAlgorithmException e) {
148                    throw new IllegalStateException(e);
149                }
150            }
151            log
152                    .info("Configured the JVM to use the {} API proxy localhost:{}",
153                            handler.getSslContext() != null ? "SSL" : "plain", handler.getPort());
154        }
155        return this;
156    }
157
158    @Override
159    public synchronized void close() {
160        ofNullable(shutdown).ifPresent(Runnable::run);
161        if (instance != null) {
162            log.info("Stopping Talend API server (port {})", handler.getPort());
163            try {
164                instance.join(TimeUnit.MINUTES.toMillis(5));
165            } catch (final InterruptedException e) {
166                log.warn(e.getMessage(), e);
167                Thread.currentThread().interrupt();
168            } finally {
169                instance = null;
170                shutdown = null;
171            }
172        }
173        Stream
174                .of(handler.getResponseLocator(), handler.getExecutor())
175                .filter(AutoCloseable.class::isInstance)
176                .map(AutoCloseable.class::cast)
177                .forEach(c -> {
178                    try {
179                        c.close();
180                    } catch (final Exception e) {
181                        log.error(e.getMessage(), e);
182                    }
183                });
184        if (!AutoCloseable.class.isInstance(handler.getExecutor())
185                && ExecutorService.class.isInstance(handler.getExecutor())) {
186            final ExecutorService executorService = ExecutorService.class.cast(handler.getExecutor());
187            executorService.shutdownNow(); // we don't need to wait here
188        }
189    }
190
191    private Runnable decorate(final Runnable last, final Runnable first) {
192        return () -> {
193            first.run();
194            last.run();
195        };
196    }
197
198    private Runnable setProperty(final String name, final String value) {
199        final String val = System.getProperty(name);
200        System.setProperty(name, value);
201        return () -> {
202            if (val == null) {
203                System.clearProperty(name);
204            } else {
205                System.setProperty(name, val);
206            }
207        };
208    }
209
210    private int newRandomPort() {
211        try (final ServerSocket socket = new ServerSocket(0)) {
212            return socket.getLocalPort();
213        } catch (final IOException e) {
214            throw new IllegalStateException(e);
215        }
216    }
217}