001/** 002 * Copyright (C) 2006-2021 Talend Inc. - www.talend.com 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.talend.sdk.component.junit.http.internal.impl; 017 018import static java.util.Optional.ofNullable; 019import static java.util.concurrent.TimeUnit.SECONDS; 020 021import java.io.IOException; 022import java.net.ServerSocket; 023import java.security.NoSuchAlgorithmException; 024import java.util.concurrent.CountDownLatch; 025import java.util.concurrent.ExecutorService; 026import java.util.concurrent.Executors; 027import java.util.concurrent.TimeUnit; 028import java.util.stream.Stream; 029 030import javax.net.ssl.HostnameVerifier; 031import javax.net.ssl.HttpsURLConnection; 032import javax.net.ssl.SSLContext; 033 034import org.talend.sdk.component.junit.http.api.HttpApiHandler; 035 036import io.netty.bootstrap.ServerBootstrap; 037import io.netty.channel.ChannelFutureListener; 038import io.netty.channel.ChannelOption; 039import io.netty.channel.EventLoopGroup; 040import io.netty.channel.nio.NioEventLoopGroup; 041import io.netty.channel.socket.nio.NioServerSocketChannel; 042import io.netty.util.concurrent.DefaultThreadFactory; 043 044import lombok.AllArgsConstructor; 045import lombok.extern.slf4j.Slf4j; 046 047@Slf4j 048@AllArgsConstructor 049public class HandlerImpl<T extends HttpApiHandler<?>> implements AutoCloseable { 050 051 private final HttpApiHandler<T> handler; 052 053 private Thread instance; 054 055 private Runnable shutdown; 056 057 public synchronized HandlerImpl<T> start() { 058 if (instance != null) { 059 throw new IllegalStateException("Instance already started"); 060 } 061 062 if (handler.getPort() <= 0) { 063 handler.setPort(newRandomPort()); 064 } 065 066 final CountDownLatch startingPistol = new CountDownLatch(1); 067 final int nProcessors = Math.max(1, Runtime.getRuntime().availableProcessors()); 068 final ExecutorService boosExecutor = 069 Executors.newFixedThreadPool(1, new DefaultThreadFactory("talend-api-boss")); 070 final ExecutorService workerExecutor = 071 Executors.newFixedThreadPool(nProcessors, new DefaultThreadFactory("talend-api-worker")); 072 instance = new Thread(() -> { 073 // todo: config 074 final EventLoopGroup bossGroup = new NioEventLoopGroup(1, boosExecutor); 075 final EventLoopGroup workerGroup = new NioEventLoopGroup(nProcessors, workerExecutor); 076 try { 077 final ServerBootstrap b = new ServerBootstrap(); 078 b 079 .option(ChannelOption.SO_REUSEADDR, true) 080 .group(bossGroup, workerGroup) 081 .channel(NioServerSocketChannel.class) 082 .childHandler(new ProxyInitializer(handler)) 083 .bind("localhost", handler.getPort()) 084 .sync() 085 .addListener((ChannelFutureListener) f -> { 086 if (f.isSuccess()) { 087 shutdown = () -> { 088 bossGroup.shutdownGracefully(); 089 workerGroup.shutdownGracefully(); 090 }; 091 } else { 092 log.error("Can't start API server"); 093 } 094 startingPistol.countDown(); 095 }) 096 .channel() 097 .closeFuture() 098 .sync(); 099 } catch (final InterruptedException e) { 100 close(); 101 Thread.currentThread().interrupt(); 102 } 103 }) { 104 105 { 106 setName("Talend-API-monitor_" + HandlerImpl.this.getClass().getSimpleName() + "_" 107 + HandlerImpl.this.hashCode()); 108 } 109 }; 110 log.info("Starting Talend API server on port {}", handler.getPort()); 111 instance.start(); 112 try { 113 if (!startingPistol.await(Integer.getInteger("talend.junit.http.starting.timeout", 60), SECONDS)) { 114 log 115 .warn("API server took more than the expected timeout to start, you can tune it " 116 + "setting talend.junit.http.starting.timeout system property"); 117 } 118 } catch (final InterruptedException e) { 119 log.warn(e.getMessage()); 120 Thread.currentThread().interrupt(); 121 } 122 123 if (shutdown != null && handler.isGlobalProxyConfiguration()) { 124 final String pt = Integer.toString(handler.getPort()); 125 126 Stream.of("", "s").forEach(s -> { 127 shutdown = decorate(setProperty("http" + s + ".proxyHost", "localhost"), shutdown); 128 shutdown = decorate(setProperty("http" + s + ".proxyPort", pt), shutdown); 129 shutdown = decorate(setProperty("http" + s + ".nonProxyHosts", "local|*.local"), shutdown); 130 }); 131 132 if (handler.getSslContext() != null) { 133 try { 134 final SSLContext defaultSslContext = SSLContext.getDefault(); 135 final HostnameVerifier defaultHostnameVerifier = HttpsURLConnection.getDefaultHostnameVerifier(); 136 shutdown = decorate(() -> SSLContext.setDefault(defaultSslContext), shutdown); 137 shutdown = decorate(() -> { 138 HttpsURLConnection.setDefaultSSLSocketFactory(defaultSslContext.getSocketFactory()); 139 HttpsURLConnection.setDefaultHostnameVerifier(defaultHostnameVerifier); 140 }, shutdown); 141 shutdown = decorate( 142 () -> setProperty("jdk.internal.httpclient.disableHostnameVerification", "true"), shutdown); 143 144 SSLContext.setDefault(handler.getSslContext()); 145 HttpsURLConnection.setDefaultSSLSocketFactory(handler.getSslContext().getSocketFactory()); 146 HttpsURLConnection.setDefaultHostnameVerifier((host, sslSession) -> true); 147 } catch (final NoSuchAlgorithmException e) { 148 throw new IllegalStateException(e); 149 } 150 } 151 log 152 .info("Configured the JVM to use the {} API proxy localhost:{}", 153 handler.getSslContext() != null ? "SSL" : "plain", handler.getPort()); 154 } 155 return this; 156 } 157 158 @Override 159 public synchronized void close() { 160 ofNullable(shutdown).ifPresent(Runnable::run); 161 if (instance != null) { 162 log.info("Stopping Talend API server (port {})", handler.getPort()); 163 try { 164 instance.join(TimeUnit.MINUTES.toMillis(5)); 165 } catch (final InterruptedException e) { 166 log.warn(e.getMessage(), e); 167 Thread.currentThread().interrupt(); 168 } finally { 169 instance = null; 170 shutdown = null; 171 } 172 } 173 Stream 174 .of(handler.getResponseLocator(), handler.getExecutor()) 175 .filter(AutoCloseable.class::isInstance) 176 .map(AutoCloseable.class::cast) 177 .forEach(c -> { 178 try { 179 c.close(); 180 } catch (final Exception e) { 181 log.error(e.getMessage(), e); 182 } 183 }); 184 if (!AutoCloseable.class.isInstance(handler.getExecutor()) 185 && ExecutorService.class.isInstance(handler.getExecutor())) { 186 final ExecutorService executorService = ExecutorService.class.cast(handler.getExecutor()); 187 executorService.shutdownNow(); // we don't need to wait here 188 } 189 } 190 191 private Runnable decorate(final Runnable last, final Runnable first) { 192 return () -> { 193 first.run(); 194 last.run(); 195 }; 196 } 197 198 private Runnable setProperty(final String name, final String value) { 199 final String val = System.getProperty(name); 200 System.setProperty(name, value); 201 return () -> { 202 if (val == null) { 203 System.clearProperty(name); 204 } else { 205 System.setProperty(name, val); 206 } 207 }; 208 } 209 210 private int newRandomPort() { 211 try (final ServerSocket socket = new ServerSocket(0)) { 212 return socket.getLocalPort(); 213 } catch (final IOException e) { 214 throw new IllegalStateException(e); 215 } 216 } 217}