001/** 002 * Copyright (C) 2006-2019 Talend Inc. - www.talend.com 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.talend.sdk.component.junit.http.internal.impl; 017 018import static java.util.Optional.ofNullable; 019import static java.util.concurrent.TimeUnit.SECONDS; 020 021import java.io.IOException; 022import java.net.ServerSocket; 023import java.security.NoSuchAlgorithmException; 024import java.util.concurrent.CountDownLatch; 025import java.util.concurrent.ExecutorService; 026import java.util.concurrent.TimeUnit; 027import java.util.stream.Stream; 028 029import javax.net.ssl.HostnameVerifier; 030import javax.net.ssl.HttpsURLConnection; 031import javax.net.ssl.SSLContext; 032 033import org.talend.sdk.component.junit.http.api.HttpApiHandler; 034 035import io.netty.bootstrap.ServerBootstrap; 036import io.netty.channel.ChannelFutureListener; 037import io.netty.channel.ChannelOption; 038import io.netty.channel.EventLoopGroup; 039import io.netty.channel.nio.NioEventLoopGroup; 040import io.netty.channel.socket.nio.NioServerSocketChannel; 041import io.netty.util.concurrent.DefaultThreadFactory; 042 043import lombok.AllArgsConstructor; 044import lombok.extern.slf4j.Slf4j; 045 046@Slf4j 047@AllArgsConstructor 048public class HandlerImpl<T extends HttpApiHandler<?>> implements AutoCloseable { 049 050 private final HttpApiHandler<T> handler; 051 052 private Thread instance; 053 054 private Runnable shutdown; 055 056 public synchronized HandlerImpl<T> start() { 057 if (instance != null) { 058 throw new IllegalStateException("Instance already started"); 059 } 060 061 if (handler.getPort() <= 0) { 062 handler.setPort(newRandomPort()); 063 } 064 065 final CountDownLatch startingPistol = new CountDownLatch(1); 066 instance = new Thread(() -> { 067 // todo: config 068 final EventLoopGroup bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("talend-api-boss")); 069 final EventLoopGroup workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors(), 070 new DefaultThreadFactory("talend-api-worker")); 071 try { 072 final ServerBootstrap b = new ServerBootstrap(); 073 b 074 .option(ChannelOption.SO_REUSEADDR, true) 075 .group(bossGroup, workerGroup) 076 .channel(NioServerSocketChannel.class) 077 .childHandler(new ProxyInitializer(handler)) 078 .bind("localhost", handler.getPort()) 079 .sync() 080 .addListener((ChannelFutureListener) f -> { 081 if (f.isSuccess()) { 082 shutdown = () -> { 083 bossGroup.shutdownGracefully(); 084 workerGroup.shutdownGracefully(); 085 }; 086 } else { 087 log.error("Can't start API server"); 088 } 089 startingPistol.countDown(); 090 }) 091 .channel() 092 .closeFuture() 093 .sync(); 094 } catch (final InterruptedException e) { 095 close(); 096 Thread.currentThread().interrupt(); 097 } 098 }) { 099 100 { 101 setName("Talend-API-monitor_" + HandlerImpl.this.getClass().getSimpleName() + "_" 102 + HandlerImpl.this.hashCode()); 103 } 104 }; 105 log.info("Starting Talend API server on port {}", handler.getPort()); 106 instance.start(); 107 try { 108 if (!startingPistol.await(Integer.getInteger("talend.junit.http.starting.timeout", 60), SECONDS)) { 109 log 110 .warn("API server took more than the expected timeout to start, you can tune it " 111 + "setting talend.junit.http.starting.timeout system property"); 112 } 113 } catch (final InterruptedException e) { 114 log.warn(e.getMessage()); 115 Thread.currentThread().interrupt(); 116 } 117 118 if (shutdown != null && handler.isGlobalProxyConfiguration()) { 119 final String pt = Integer.toString(handler.getPort()); 120 121 Stream.of("", "s").forEach(s -> { 122 shutdown = decorate(setProperty("http" + s + ".proxyHost", "localhost"), shutdown); 123 shutdown = decorate(setProperty("http" + s + ".proxyPort", pt), shutdown); 124 shutdown = decorate(setProperty("http" + s + ".nonProxyHosts", "local|*.local"), shutdown); 125 }); 126 127 if (handler.getSslContext() != null) { 128 try { 129 final SSLContext defaultSslContext = SSLContext.getDefault(); 130 final HostnameVerifier defaultHostnameVerifier = HttpsURLConnection.getDefaultHostnameVerifier(); 131 shutdown = decorate(() -> SSLContext.setDefault(defaultSslContext), shutdown); 132 shutdown = decorate(() -> { 133 HttpsURLConnection.setDefaultSSLSocketFactory(defaultSslContext.getSocketFactory()); 134 HttpsURLConnection.setDefaultHostnameVerifier(defaultHostnameVerifier); 135 }, shutdown); 136 137 SSLContext.setDefault(handler.getSslContext()); 138 HttpsURLConnection.setDefaultSSLSocketFactory(handler.getSslContext().getSocketFactory()); 139 HttpsURLConnection.setDefaultHostnameVerifier((host, sslSession) -> true); 140 } catch (final NoSuchAlgorithmException e) { 141 throw new IllegalStateException(e); 142 } 143 } 144 log 145 .info("Configured the JVM to use the {} API proxy localhost:{}", 146 handler.getSslContext() != null ? "SSL" : "plain", handler.getPort()); 147 } 148 return this; 149 } 150 151 @Override 152 public synchronized void close() { 153 ofNullable(shutdown).ifPresent(Runnable::run); 154 if (instance != null) { 155 log.info("Stopping Talend API server (port {})", handler.getPort()); 156 try { 157 instance.join(TimeUnit.MINUTES.toMillis(5)); 158 } catch (final InterruptedException e) { 159 log.warn(e.getMessage(), e); 160 Thread.currentThread().interrupt(); 161 } finally { 162 instance = null; 163 shutdown = null; 164 } 165 } 166 Stream 167 .of(handler.getResponseLocator(), handler.getExecutor()) 168 .filter(AutoCloseable.class::isInstance) 169 .map(AutoCloseable.class::cast) 170 .forEach(c -> { 171 try { 172 c.close(); 173 } catch (final Exception e) { 174 log.error(e.getMessage(), e); 175 } 176 }); 177 if (!AutoCloseable.class.isInstance(handler.getExecutor()) 178 && ExecutorService.class.isInstance(handler.getExecutor())) { 179 final ExecutorService executorService = ExecutorService.class.cast(handler.getExecutor()); 180 executorService.shutdownNow(); // we don't need to wait here 181 } 182 } 183 184 private Runnable decorate(final Runnable last, final Runnable first) { 185 return () -> { 186 first.run(); 187 last.run(); 188 }; 189 } 190 191 private Runnable setProperty(final String name, final String value) { 192 final String val = System.getProperty(name); 193 System.setProperty(name, value); 194 return () -> { 195 if (val == null) { 196 System.clearProperty(name); 197 } else { 198 System.setProperty(name, val); 199 } 200 }; 201 } 202 203 private int newRandomPort() { 204 try (final ServerSocket socket = new ServerSocket(0)) { 205 return socket.getLocalPort(); 206 } catch (final IOException e) { 207 throw new IllegalStateException(e); 208 } 209 } 210}