001/**
002 * Copyright (C) 2006-2019 Talend Inc. - www.talend.com
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.talend.sdk.component.junit.http.internal.impl;
017
018import static java.util.Optional.of;
019import static java.util.Optional.ofNullable;
020import static java.util.stream.Collectors.toMap;
021import static org.talend.sdk.component.junit.http.internal.impl.Handlers.closeOnFlush;
022import static org.talend.sdk.component.junit.http.internal.impl.Handlers.sendError;
023
024import java.net.HttpURLConnection;
025import java.nio.charset.StandardCharsets;
026import java.util.HashMap;
027import java.util.Map;
028import java.util.Optional;
029import java.util.Spliterator;
030import java.util.Spliterators;
031import java.util.stream.StreamSupport;
032
033import javax.net.ssl.SSLEngine;
034
035import org.talend.sdk.component.junit.http.api.HttpApiHandler;
036import org.talend.sdk.component.junit.http.api.Response;
037
038import io.netty.buffer.ByteBuf;
039import io.netty.buffer.Unpooled;
040import io.netty.channel.ChannelHandler;
041import io.netty.channel.ChannelHandlerContext;
042import io.netty.channel.SimpleChannelInboundHandler;
043import io.netty.handler.codec.http.DefaultFullHttpResponse;
044import io.netty.handler.codec.http.FullHttpRequest;
045import io.netty.handler.codec.http.HttpHeaderNames;
046import io.netty.handler.codec.http.HttpHeaderValues;
047import io.netty.handler.codec.http.HttpMethod;
048import io.netty.handler.codec.http.HttpResponse;
049import io.netty.handler.codec.http.HttpResponseStatus;
050import io.netty.handler.codec.http.HttpUtil;
051import io.netty.handler.codec.http.HttpVersion;
052import io.netty.handler.ssl.SslHandler;
053import io.netty.util.Attribute;
054
055import lombok.AllArgsConstructor;
056import lombok.extern.slf4j.Slf4j;
057
058@Slf4j
059@AllArgsConstructor
060@ChannelHandler.Sharable
061public class ServingProxyHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
062
063    private final HttpApiHandler api;
064
065    @Override
066    protected void channelRead0(final ChannelHandlerContext ctx, final FullHttpRequest request) {
067        if (!request.decoderResult().isSuccess()) {
068            sendError(ctx, HttpResponseStatus.BAD_REQUEST);
069            return;
070        }
071
072        final String payload = request.content().toString(StandardCharsets.UTF_8);
073
074        api.getExecutor().execute(() -> {
075            final Map<String, String> headers = StreamSupport
076                    .stream(Spliterators
077                            .spliteratorUnknownSize(request.headers().iteratorAsString(), Spliterator.IMMUTABLE), false)
078                    .collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
079            final Attribute<String> baseAttr = ctx.channel().attr(Handlers.BASE);
080            Optional<Response> matching = api
081                    .getResponseLocator()
082                    .findMatching(new RequestImpl(
083                            (baseAttr == null || baseAttr.get() == null ? "" : baseAttr.get()) + request.uri(),
084                            request.method().name(), payload, headers), api.getHeaderFilter());
085            if (!matching.isPresent()) {
086                if (HttpMethod.CONNECT.name().equalsIgnoreCase(request.method().name())) {
087                    final Map<String, String> responseHeaders = new HashMap<>();
088                    responseHeaders.put(HttpHeaderNames.CONNECTION.toString(), HttpHeaderValues.KEEP_ALIVE.toString());
089                    responseHeaders.put(HttpHeaderNames.CONTENT_LENGTH.toString(), "0");
090                    matching = of(new ResponseImpl(responseHeaders, HttpResponseStatus.OK.code(),
091                            Unpooled.EMPTY_BUFFER.array()));
092                    if (api.getSslContext() != null) {
093                        final SSLEngine sslEngine = api.getSslContext().createSSLEngine();
094                        sslEngine.setUseClientMode(false);
095                        ctx.channel().pipeline().addFirst("ssl", new SslHandler(sslEngine, true));
096
097                        final String uri = request.uri();
098                        final String[] parts = uri.split(":");
099                        ctx
100                                .channel()
101                                .attr(Handlers.BASE)
102                                .set("https://" + parts[0]
103                                        + (parts.length > 1 && !"443".equals(parts[1]) ? ":" + parts[1] : ""));
104                    }
105                } else {
106                    sendError(ctx, new HttpResponseStatus(HttpURLConnection.HTTP_BAD_REQUEST,
107                            "You are in proxy mode. No response was found for the simulated request. Please ensure to capture it for next executions. "
108                                    + request.method().name() + " " + request.uri()));
109                    return;
110                }
111            }
112
113            final Response resp = matching.get();
114            final ByteBuf bytes = ofNullable(resp.payload()).map(Unpooled::copiedBuffer).orElse(Unpooled.EMPTY_BUFFER);
115            final HttpResponse response =
116                    new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.valueOf(resp.status()), bytes);
117            HttpUtil.setContentLength(response, bytes.array().length);
118
119            if (!api.isSkipProxyHeaders()) {
120                response.headers().set("X-Talend-Proxy-JUnit", "true");
121            }
122
123            ofNullable(resp.headers()).ifPresent(h -> h.forEach((k, v) -> response.headers().set(k, v)));
124            ctx.writeAndFlush(response);
125        });
126    }
127
128    @Override
129    public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
130        log.error(cause.getMessage(), cause);
131        closeOnFlush(ctx.channel());
132    }
133}