Skip to content

DataBufferUtils.join and Netty Leak LEAK: ByteBuf.release() #34113

Closed as not planned
@carl-HelloWorld

Description

@carl-HelloWorld

Source code:

package com.yl.platform.gateway.filter;

import com.yl.platform.gateway.constant.Constants;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
import org.springframework.core.Ordered;
import org.springframework.http.MediaType;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.List;

import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR;

/**
 * 请求体日志组件
 */
@Component
public class RequestBodyLogGlobalFilter implements GlobalFilter, Ordered {

    private static final Log log = LogFactory.getLog(RequestBodyLogGlobalFilter.class);

    private final List<HttpMessageReader<?>> messageReaders;

    @Value("${yl.platform.gateway.logging.request-body-log-max-byte-length:512}")
    private int requestBodyLogMaxByteLength;

    /**
     * 响应体中日志
     */
    @Value("${gateway.logging.request-body-log-enabled:true}")
    private boolean requestBodyLogEnabled;

    /**
     * 请求体中最大的的可读字节数,如果大于配置则不打印响应体日志
     */
    @Value("${gateway.logging.request-body-max-readable-byte-count:1024}")
    private int requestBodyMaxReadableByteLength;

    public RequestBodyLogGlobalFilter(final ServerCodecConfigurer codecConfigurer) {
        this.messageReaders = codecConfigurer.getReaders();
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {

        if (!shouldRequestBody(exchange)) {
            return chain.filter(exchange);
        }

        if (exchange.getAttribute(Constants.REQUEST_BODY_LOG_ATTR) != null) {
            return chain.filter(exchange);
        }

        ServerHttpRequest cachedRequest = exchange.getAttribute(CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR);
        if (cachedRequest == null) {
            return ServerWebExchangeUtils.cacheRequestBody(exchange, (serverHttpRequest -> {
                if (exchange.getRequest() == serverHttpRequest) {
                    return chain.filter(exchange);
                }
                return requestBodyStoreToExchange(exchange, chain, serverHttpRequest);
            }));
        }else {
            return requestBodyStoreToExchange(exchange, chain, cachedRequest);
        }
    }

    private Mono<Void> requestBodyStoreToExchange(ServerWebExchange exchange, GatewayFilterChain chain, ServerHttpRequest cachedRequest) {
        final ServerRequest serverRequest = ServerRequest
                .create(exchange.mutate().request(cachedRequest).build(), messageReaders);
        // todo serverRequest.bodyToMono(byte[].class) maybe cause a memory leak when throw DataBufferLimitException
        // -Dspring.codec.max-in-memory-size = 1
        return serverRequest.bodyToMono(byte[].class).doOnNext(requestBody -> {
            String requestBodyLog = getRequestBodyLog(requestBody);
            if (log.isDebugEnabled()) {
                log.debug(exchange.getLogPrefix() + "requestBodyLog is:" + requestBodyLog);
            }
            if (StringUtils.isNotEmpty(requestBodyLog)) {
                exchange.getAttributes().put(Constants.REQUEST_BODY_LOG_ATTR, requestBodyLog);
            }
        }).then(Mono.defer(() -> {
            return chain.filter(exchange.mutate().request(cachedRequest).build());
        }));
    }

    @Override
    public int getOrder() {
        // AuthorizationGlobalFilter之前
        return Ordered.HIGHEST_PRECEDENCE;
    }

    private boolean shouldRequestBody(ServerWebExchange exchange) {
        final MediaType contentType = exchange.getRequest().getHeaders().getContentType();
        if (!requestBodyLogEnabled) {
            return false;
        }

        if (contentType == null) {
            return false;
        }

        if (MediaType.APPLICATION_JSON.isCompatibleWith(contentType)) {
            return true;
        }

        long contentLength = exchange.getRequest().getHeaders().getContentLength();
        if (contentLength < requestBodyMaxReadableByteLength) {
            return true;
        }

        return false;
    }

    public int getRequestBodyLogMaxByteLength() {
        return requestBodyLogMaxByteLength;
    }

    private String getRequestBodyLog(byte[] buf) {
        if (buf != null && buf.length > 0) {
            int length = Math.min(buf.length,getRequestBodyLogMaxByteLength());
            try {
                String requestBodyToUse = new String(buf, 0, length, StandardCharsets.UTF_8.name());
                if (buf.length > getRequestBodyLogMaxByteLength()) {
                    requestBodyToUse = requestBodyToUse + "......";
                }
                return requestBodyToUse;
            } catch (UnsupportedEncodingException ex) {
                return "[unknown]";
            } catch (Throwable ex) {
                log.error("ex", ex);
                return "[unknown]";
            }
        }
        return null;
    }

}

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions