package org.springframework.messaging.rsocket.annotation.support;

import io.rsocket.Payload;
import java.util.List;
import org.springframework.core.MethodParameter;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.core.codec.Encoder;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.invocation.reactive.AbstractEncoderMethodReturnValueHandler;
import org.springframework.messaging.rsocket.PayloadUtils;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

/* loaded from: input_file:BOOT-INF/lib/spring-messaging-5.2.12.RELEASE.jar:org/springframework/messaging/rsocket/annotation/support/RSocketPayloadReturnValueHandler.class */
/**
 * Return value handler that hands encoded handler output back to the caller
 * as a {@code Flux<Payload>}.
 *
 * <p>The reply is delivered through a {@link MonoProcessor} that is looked up
 * in the headers of the input message under {@link #RESPONSE_HEADER}. Encoding
 * itself is presumably driven by the superclass, which calls back into
 * {@link #handleEncodedContent} or {@link #handleNoContent}.
 */
public class RSocketPayloadReturnValueHandler extends AbstractEncoderMethodReturnValueHandler {

    /** Name of the message header under which the reply {@link MonoProcessor} is looked up. */
    public static final String RESPONSE_HEADER = "rsocketResponse";

    /**
     * Create a handler that passes the given arguments straight to the superclass.
     *
     * @param list the encoders handed to the superclass
     * @param reactiveAdapterRegistry the adapter registry handed to the superclass
     */
    public RSocketPayloadReturnValueHandler(List<Encoder<?>> list, ReactiveAdapterRegistry reactiveAdapterRegistry) {
        super(list, reactiveAdapterRegistry);
    }

    /**
     * Publish the encoded content as the reply for the given message.
     *
     * <p>Each {@link DataBuffer} is mapped to a {@link Payload} via
     * {@link PayloadUtils#createPayload}; the resulting {@code Flux} is emitted
     * on the reply processor, which is then completed.
     *
     * @param flux the encoded content
     * @param methodParameter the return type of the handler method (not used here)
     * @param message the input message whose headers carry the reply processor
     * @return an empty {@code Mono}
     * @throws IllegalArgumentException if the {@value #RESPONSE_HEADER} header is absent
     * @throws IllegalStateException if the header value is not a {@code MonoProcessor}
     */
    @Override
    protected Mono<Void> handleEncodedContent(Flux<DataBuffer> flux, MethodParameter methodParameter, Message<?> message) {
        MonoProcessor<Flux<Payload>> replyMono = getReplyMono(message);
        // Encoded content without a reply channel cannot be delivered, so fail fast.
        Assert.notNull(replyMono, "Missing 'rsocketResponse'");
        Flux<Payload> payloads = flux.map(PayloadUtils::createPayload);
        replyMono.onNext(payloads);
        replyMono.onComplete();
        return Mono.empty();
    }

    /**
     * Complete the reply processor, if one is present, without emitting a value.
     *
     * <p>Unlike {@link #handleEncodedContent}, a missing header is tolerated here.
     *
     * @param methodParameter the return type of the handler method (not used here)
     * @param message the input message whose headers may carry the reply processor
     * @return an empty {@code Mono}
     * @throws IllegalStateException if the header value is present but is not a {@code MonoProcessor}
     */
    @Override
    protected Mono<Void> handleNoContent(MethodParameter methodParameter, Message<?> message) {
        MonoProcessor<Flux<Payload>> replyMono = getReplyMono(message);
        if (replyMono != null) {
            replyMono.onComplete();
        }
        return Mono.empty();
    }

    /**
     * Look up the reply processor in the headers of the given message.
     *
     * @param message the message to inspect
     * @return the value of the {@value #RESPONSE_HEADER} header, or {@code null} if absent
     * @throws IllegalStateException if the header value is not a {@code MonoProcessor}
     */
    @Nullable
    private MonoProcessor<Flux<Payload>> getReplyMono(Message<?> message) {
        Object headerValue = message.getHeaders().get(RESPONSE_HEADER);
        Assert.state(headerValue == null || headerValue instanceof MonoProcessor, "Expected MonoProcessor");
        // Only the raw type can be verified at runtime; the element type is erased,
        // so the parameterized cast is unavoidably unchecked.
        @SuppressWarnings("unchecked")
        MonoProcessor<Flux<Payload>> replyMono = (MonoProcessor<Flux<Payload>>) headerValue;
        return replyMono;
    }
}
