/**
 * Copyright (C) 2006-2018 Talend Inc. - www.talend.com
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.talend.sdk.component.junit.http.internal.impl;

import static java.util.Optional.ofNullable;
import static java.util.concurrent.TimeUnit.SECONDS;

import java.io.IOException;
import java.net.ServerSocket;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;

import org.talend.sdk.component.junit.http.api.HttpApiHandler;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

/**
 * Runs a local Netty server bound to {@code localhost} on the handler's port and, when the handler asks for
 * a global proxy configuration, points the JVM-wide HTTP(S) proxy settings at it.
 *
 * <p>
 * {@link #start()} launches the server on a dedicated monitor thread; {@link #close()} stops the server,
 * restores every JVM-wide setting that {@link #start()} changed and releases the handler's response locator
 * and executor.
 * </p>
 *
 * @param <T> the concrete handler type.
 */
@Slf4j
@AllArgsConstructor
public class HandlerImpl<T extends HttpApiHandler<?>> implements AutoCloseable {

    private final HttpApiHandler<T> handler;

    /** Monitor thread hosting the server; {@code null} while the server is not started. */
    private Thread instance;

    /** Chain of actions undoing what {@link #start()} did; {@code null} when there is nothing to undo. */
    private Runnable shutdown;

    /**
     * Starts the server and waits until the bind attempt has completed (successfully or not) or until the
     * starting timeout elapses. The timeout is read from the {@code talend.junit.http.starting.timeout}
     * system property, in seconds, and defaults to 60.
     *
     * <p>
     * If the handler's port is not strictly positive a free port is picked and stored on the handler first.
     * </p>
     *
     * @return this instance.
     * @throws IllegalStateException if the server was already started, if no free port can be allocated or
     * if the default SSL context can't be read while installing the global SSL configuration.
     */
    public synchronized HandlerImpl<T> start() {
        if (instance != null) {
            throw new IllegalStateException("Instance already started");
        }

        if (handler.getPort() <= 0) {
            handler.setPort(newRandomPort());
        }

        final CountDownLatch startingPistol = new CountDownLatch(1);
        instance = new Thread(() -> runServer(startingPistol),
                "Talend-API-monitor_" + getClass().getSimpleName() + "_" + hashCode());
        log.info("Starting Talend API server on port {}", handler.getPort());
        instance.start();
        try {
            if (!startingPistol.await(Integer.getInteger("talend.junit.http.starting.timeout", 60), SECONDS)) {
                log.warn("API server took more than the expected timeout to start, you can tune it "
                        + "setting talend.junit.http.starting.timeout system property");
            }
        } catch (final InterruptedException e) {
            log.warn(e.getMessage(), e);
            Thread.currentThread().interrupt();
        }

        // shutdown is only set once the bind succeeded, so a failed start never touches the JVM settings
        if (shutdown != null && handler.isGlobalProxyConfiguration()) {
            configureGlobalProxy();
        }
        return this;
    }

    /**
     * Stops the server if it is running, runs the registered shutdown actions and releases the handler's
     * response locator and executor. Exceptions thrown while closing those resources are logged, not
     * propagated.
     */
    @Override
    public synchronized void close() {
        // detach the hook before running it so that a second close() can never replay it
        final Runnable stop = shutdown;
        shutdown = null;
        ofNullable(stop).ifPresent(Runnable::run);

        if (instance != null) {
            log.info("Stopping Talend API server (port {})", handler.getPort());
            try {
                // the monitor thread calls close() itself when interrupted: joining itself would only
                // stall for the whole timeout
                if (Thread.currentThread() != instance) {
                    instance.join(TimeUnit.MINUTES.toMillis(5));
                }
            } catch (final InterruptedException e) {
                log.warn(e.getMessage(), e);
                Thread.currentThread().interrupt();
            } finally {
                instance = null;
            }
        }

        Stream
                .of(handler.getResponseLocator(), handler.getExecutor())
                .filter(AutoCloseable.class::isInstance)
                .map(AutoCloseable.class::cast)
                .forEach(c -> {
                    try {
                        c.close();
                    } catch (final Exception e) {
                        log.error(e.getMessage(), e);
                    }
                });
        if (!AutoCloseable.class.isInstance(handler.getExecutor())
                && ExecutorService.class.isInstance(handler.getExecutor())) {
            final ExecutorService executorService = ExecutorService.class.cast(handler.getExecutor());
            executorService.shutdownNow(); // we don't need to wait here
        }
    }

    /**
     * Body of the monitor thread: binds the server, releases {@code startingPistol} as soon as the bind
     * attempt completes and then blocks until the server channel is closed.
     *
     * @param startingPistol latch released once the outcome of the bind attempt is known.
     */
    private void runServer(final CountDownLatch startingPistol) {
        // todo: config
        final EventLoopGroup bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("talend-api-boss"));
        final EventLoopGroup workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors(),
                new DefaultThreadFactory("talend-api-worker"));
        try {
            new ServerBootstrap()
                    .option(ChannelOption.SO_REUSEADDR, true)
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ProxyInitializer(handler))
                    .bind("localhost", handler.getPort())
                    // registered before sync(): sync() rethrows a bind failure, so a listener added after
                    // it would never be reached on failure and start() would wait for the whole timeout
                    .addListener((ChannelFutureListener) f -> {
                        if (f.isSuccess()) {
                            shutdown = () -> {
                                bossGroup.shutdownGracefully();
                                workerGroup.shutdownGracefully();
                            };
                        }
                        startingPistol.countDown();
                    })
                    .sync()
                    .channel()
                    .closeFuture()
                    .sync();
        } catch (final InterruptedException e) {
            // release start() first: it may still hold the monitor close() needs
            startingPistol.countDown();
            close();
            Thread.currentThread().interrupt();
        } catch (final Exception e) {
            // sync() rethrows the failure cause as is, whatever its type, hence the broad catch
            log.error("Can't start API server", e);
        } finally {
            // both calls are safe to repeat: never leave start() waiting nor event loop threads running
            startingPistol.countDown();
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * Points the JVM-wide {@code http(s).proxyHost/proxyPort/nonProxyHosts} system properties at this server
     * and, when the handler provides an SSL context, installs it as the JVM default together with a hostname
     * verifier accepting every host. Each change registers its own restore action in {@link #shutdown}.
     */
    private void configureGlobalProxy() {
        final String pt = Integer.toString(handler.getPort());
        for (final String s : new String[] { "", "s" }) {
            shutdown = decorate(setProperty("http" + s + ".proxyHost", "localhost"), shutdown);
            shutdown = decorate(setProperty("http" + s + ".proxyPort", pt), shutdown);
            shutdown = decorate(setProperty("http" + s + ".nonProxyHosts", "local|*.local"), shutdown);
        }

        final SSLContext sslContext = handler.getSslContext();
        if (sslContext != null) {
            try {
                // capture the current defaults before overriding them so close() can put them back
                final SSLContext defaultSslContext = SSLContext.getDefault();
                final HostnameVerifier defaultHostnameVerifier = HttpsURLConnection.getDefaultHostnameVerifier();
                shutdown = decorate(() -> SSLContext.setDefault(defaultSslContext), shutdown);
                shutdown = decorate(() -> {
                    HttpsURLConnection.setDefaultSSLSocketFactory(defaultSslContext.getSocketFactory());
                    HttpsURLConnection.setDefaultHostnameVerifier(defaultHostnameVerifier);
                }, shutdown);

                SSLContext.setDefault(sslContext);
                HttpsURLConnection.setDefaultSSLSocketFactory(sslContext.getSocketFactory());
                HttpsURLConnection.setDefaultHostnameVerifier((host, sslSession) -> true);
            } catch (final NoSuchAlgorithmException e) {
                throw new IllegalStateException(e);
            }
        }
        log.info("Configured the JVM to use the {} API proxy localhost:{}", sslContext != null ? "SSL" : "plain",
                handler.getPort());
    }

    /**
     * Chains two actions.
     *
     * @param last the action executed second.
     * @param first the action executed first.
     * @return an action running {@code first} then {@code last}.
     */
    private Runnable decorate(final Runnable last, final Runnable first) {
        return () -> {
            first.run();
            last.run();
        };
    }

    /**
     * Sets a system property immediately.
     *
     * @param name the system property name.
     * @param value the value to set.
     * @return an action restoring the previous value, or clearing the property if it was not set before.
     */
    private Runnable setProperty(final String name, final String value) {
        final String val = System.getProperty(name);
        System.setProperty(name, value);
        return () -> {
            if (val == null) {
                System.clearProperty(name);
            } else {
                System.setProperty(name, val);
            }
        };
    }

    /**
     * Asks the operating system for a free port by briefly opening a server socket on port 0. The socket is
     * closed before returning, so the port is not reserved.
     *
     * @return the port which was allocated to the temporary socket.
     * @throws IllegalStateException if the socket can't be opened.
     */
    private int newRandomPort() {
        try (final ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        } catch (final IOException e) {
            throw new IllegalStateException(e);
        }
    }
}