package reactor.core;

import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import reactor.event.Event;
import reactor.function.Consumer;
import reactor.function.Predicate;
import reactor.queue.BlockingQueueFactory;
import reactor.util.Assert;

/* loaded from: input_file:BOOT-INF/lib/reactor-core-1.0.1.RELEASE.jar:reactor/core/EventBatcher.class */
public class EventBatcher<T> implements Consumer<Event<T>> {
    private final AtomicLong count = new AtomicLong();
    private final AtomicLong flushCount = new AtomicLong();
    private final Observable observable;
    private final Object key;
    private final Queue<Event<T>> queue;
    private final Predicate<Queue<Event<T>>> queueWhile;
    private final Predicate<Queue<Event<T>>> flushWhen;

    public EventBatcher(@Nonnull Observable observable, @Nonnull Object obj, @Nullable Queue<Event<T>> queue, @Nullable Predicate<Queue<Event<T>>> predicate, @Nullable Predicate<Queue<Event<T>>> predicate2) {
        Assert.notNull(observable, "Reactor cannot be null.");
        Assert.notNull(obj, "Event key cannot be null.");
        this.observable = observable;
        this.key = obj;
        this.queue = null == queue ? BlockingQueueFactory.createQueue() : queue;
        this.queueWhile = predicate;
        this.flushWhen = predicate2;
    }

    public void flush() {
        Event<T> poll;
        this.flushCount.set(this.count.get());
        while (this.flushCount.getAndDecrement() > 0 && null != (poll = this.queue.poll())) {
            this.observable.notify(this.key, poll);
        }
    }

    @Override // reactor.function.Consumer
    public final void accept(Event<T> event) {
        if (null == this.queueWhile || this.queueWhile.test(this.queue)) {
            this.queue.add(event);
            this.count.incrementAndGet();
        }
        if (null == this.flushWhen || !this.flushWhen.test(this.queue)) {
            return;
        }
        flush();
    }
}
