001/** 002 * Copyright (C) 2006-2020 Talend Inc. - www.talend.com 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.talend.sdk.component.junit.http.internal.impl; 017 018import static java.util.Optional.ofNullable; 019import static java.util.concurrent.TimeUnit.SECONDS; 020 021import java.io.IOException; 022import java.net.ServerSocket; 023import java.security.NoSuchAlgorithmException; 024import java.util.concurrent.CountDownLatch; 025import java.util.concurrent.ExecutorService; 026import java.util.concurrent.Executors; 027import java.util.concurrent.TimeUnit; 028import java.util.stream.Stream; 029 030import javax.net.ssl.HostnameVerifier; 031import javax.net.ssl.HttpsURLConnection; 032import javax.net.ssl.SSLContext; 033 034import org.talend.sdk.component.junit.http.api.HttpApiHandler; 035 036import io.netty.bootstrap.ServerBootstrap; 037import io.netty.channel.ChannelFutureListener; 038import io.netty.channel.ChannelOption; 039import io.netty.channel.EventLoopGroup; 040import io.netty.channel.nio.NioEventLoopGroup; 041import io.netty.channel.socket.nio.NioServerSocketChannel; 042import io.netty.util.concurrent.DefaultThreadFactory; 043 044import lombok.AllArgsConstructor; 045import lombok.extern.slf4j.Slf4j; 046 047@Slf4j 048@AllArgsConstructor 049public class HandlerImpl<T extends HttpApiHandler<?>> implements AutoCloseable { 050 051 private final HttpApiHandler<T> handler; 052 053 private Thread instance; 054 055 private Runnable shutdown; 056 057 public synchronized HandlerImpl<T> start() { 058 if (instance != null) { 059 throw new IllegalStateException("Instance already started"); 060 } 061 062 if (handler.getPort() <= 0) { 063 handler.setPort(newRandomPort()); 064 } 065 066 final CountDownLatch startingPistol = new CountDownLatch(1); 067 final int nProcessors = Math.max(1, Runtime.getRuntime().availableProcessors()); 068 final ExecutorService boosExecutor = 069 Executors.newFixedThreadPool(1, new DefaultThreadFactory("talend-api-boss")); 070 final ExecutorService workerExecutor = 071 Executors.newFixedThreadPool(nProcessors, new DefaultThreadFactory("talend-api-worker")); 072 instance = new Thread(() -> { 073 // todo: config 074 final EventLoopGroup bossGroup = new NioEventLoopGroup(1, boosExecutor); 075 final EventLoopGroup workerGroup = new NioEventLoopGroup(nProcessors, workerExecutor); 076 try { 077 final ServerBootstrap b = new ServerBootstrap(); 078 b 079 .option(ChannelOption.SO_REUSEADDR, true) 080 .group(bossGroup, workerGroup) 081 .channel(NioServerSocketChannel.class) 082 .childHandler(new ProxyInitializer(handler)) 083 .bind("localhost", handler.getPort()) 084 .sync() 085 .addListener((ChannelFutureListener) f -> { 086 if (f.isSuccess()) { 087 shutdown = () -> { 088 bossGroup.shutdownGracefully(); 089 workerGroup.shutdownGracefully(); 090 }; 091 } else { 092 log.error("Can't start API server"); 093 } 094 startingPistol.countDown(); 095 }) 096 .channel() 097 .closeFuture() 098 .sync(); 099 } catch (final InterruptedException e) { 100 close(); 101 Thread.currentThread().interrupt(); 102 } 103 }) { 104 105 { 106 setName("Talend-API-monitor_" + HandlerImpl.this.getClass().getSimpleName() + "_" 107 + HandlerImpl.this.hashCode()); 108 } 109 }; 110 log.info("Starting Talend API server on port {}", handler.getPort()); 111 instance.start(); 112 try { 113 if (!startingPistol.await(Integer.getInteger("talend.junit.http.starting.timeout", 60), SECONDS)) { 114 log 115 .warn("API server took more than the expected timeout to start, you can tune it " 116 + "setting talend.junit.http.starting.timeout system property"); 117 } 118 } catch (final InterruptedException e) { 119 log.warn(e.getMessage()); 120 Thread.currentThread().interrupt(); 121 } 122 123 if (shutdown != null && handler.isGlobalProxyConfiguration()) { 124 final String pt = Integer.toString(handler.getPort()); 125 126 Stream.of("", "s").forEach(s -> { 127 shutdown = decorate(setProperty("http" + s + ".proxyHost", "localhost"), shutdown); 128 shutdown = decorate(setProperty("http" + s + ".proxyPort", pt), shutdown); 129 shutdown = decorate(setProperty("http" + s + ".nonProxyHosts", "local|*.local"), shutdown); 130 }); 131 132 if (handler.getSslContext() != null) { 133 try { 134 final SSLContext defaultSslContext = SSLContext.getDefault(); 135 final HostnameVerifier defaultHostnameVerifier = HttpsURLConnection.getDefaultHostnameVerifier(); 136 shutdown = decorate(() -> SSLContext.setDefault(defaultSslContext), shutdown); 137 shutdown = decorate(() -> { 138 HttpsURLConnection.setDefaultSSLSocketFactory(defaultSslContext.getSocketFactory()); 139 HttpsURLConnection.setDefaultHostnameVerifier(defaultHostnameVerifier); 140 }, shutdown); 141 142 SSLContext.setDefault(handler.getSslContext()); 143 HttpsURLConnection.setDefaultSSLSocketFactory(handler.getSslContext().getSocketFactory()); 144 HttpsURLConnection.setDefaultHostnameVerifier((host, sslSession) -> true); 145 } catch (final NoSuchAlgorithmException e) { 146 throw new IllegalStateException(e); 147 } 148 } 149 log 150 .info("Configured the JVM to use the {} API proxy localhost:{}", 151 handler.getSslContext() != null ? "SSL" : "plain", handler.getPort()); 152 } 153 return this; 154 } 155 156 @Override 157 public synchronized void close() { 158 ofNullable(shutdown).ifPresent(Runnable::run); 159 if (instance != null) { 160 log.info("Stopping Talend API server (port {})", handler.getPort()); 161 try { 162 instance.join(TimeUnit.MINUTES.toMillis(5)); 163 } catch (final InterruptedException e) { 164 log.warn(e.getMessage(), e); 165 Thread.currentThread().interrupt(); 166 } finally { 167 instance = null; 168 shutdown = null; 169 } 170 } 171 Stream 172 .of(handler.getResponseLocator(), handler.getExecutor()) 173 .filter(AutoCloseable.class::isInstance) 174 .map(AutoCloseable.class::cast) 175 .forEach(c -> { 176 try { 177 c.close(); 178 } catch (final Exception e) { 179 log.error(e.getMessage(), e); 180 } 181 }); 182 if (!AutoCloseable.class.isInstance(handler.getExecutor()) 183 && ExecutorService.class.isInstance(handler.getExecutor())) { 184 final ExecutorService executorService = ExecutorService.class.cast(handler.getExecutor()); 185 executorService.shutdownNow(); // we don't need to wait here 186 } 187 } 188 189 private Runnable decorate(final Runnable last, final Runnable first) { 190 return () -> { 191 first.run(); 192 last.run(); 193 }; 194 } 195 196 private Runnable setProperty(final String name, final String value) { 197 final String val = System.getProperty(name); 198 System.setProperty(name, value); 199 return () -> { 200 if (val == null) { 201 System.clearProperty(name); 202 } else { 203 System.setProperty(name, val); 204 } 205 }; 206 } 207 208 private int newRandomPort() { 209 try (final ServerSocket socket = new ServerSocket(0)) { 210 return socket.getLocalPort(); 211 } catch (final IOException e) { 212 throw new IllegalStateException(e); 213 } 214 } 215}