package oracle.core.ojdl.query;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;
import oracle.core.ojdl.reader.BaseLogReader;
import oracle.core.ojdl.reader.LogReader;
import oracle.core.ojdl.reader.LogRecord;
import oracle.core.ojdl.reader.LogRecordImpl;

/* loaded from: input_file:oracle/core/ojdl/query/ConcurrentMergeLogReader.class */
/**
 * Log reader that merges the output of several underlying {@link LogReader}s.
 *
 * <p>After {@link #start()} each non-null underlying reader is drained by its own
 * {@link Worker} task, submitted to the supplied {@link ExecutorService}, into a
 * dedicated blocking queue of capacity one. {@link #read()} keeps at most one
 * pending record per reader and returns the pending record with the smallest
 * {@code TSTZ_ORIGINATING} value. The merged output is therefore time-ordered only
 * if every underlying reader already yields its records in ascending timestamp
 * order (an assumption; this class does not verify it).
 *
 * <p>{@link #read()} and {@link #close()} are synchronized on this instance. Because
 * {@code read()} may block while waiting for a worker, a concurrent {@code close()}
 * call waits until that {@code read()} returns.
 */
class ConcurrentMergeLogReader extends BaseLogReader {
    /** Name of the record field used as the merge key. */
    private static final String TIMESTAMP_FIELD = "TSTZ_ORIGINATING";

    private static final Logger s_logger = Logger.getLogger("oracle.odl.query");

    /** Sentinel compared by identity: a slot in {@code m_records} holding it is skipped by {@link #read()}. */
    private static final LogRecord DISABLED = new LogRecordImpl();

    /** Sentinel compared by identity: placed on a queue to signal that its reader has no more records. */
    private static final LogRecord ENDOFDATA = new LogRecordImpl();

    private int m_nreaders;
    private LogReader[] m_readers;
    /** Per-reader pending record; {@code null} means the next record must be fetched from the queue. */
    private LogRecord[] m_records;
    private transient int m_lastIndex = -1;
    private transient long m_lastTime = -1;
    /** Per-reader hand-off queue; an entry is set to {@code null} once its reader reached end of data. */
    private transient ArrayBlockingQueue<LogRecord>[] m_queues;
    /** Workers started by {@link #start()}, kept so that {@link #close()} can cancel them. */
    private transient Worker[] m_workers;
    private ExecutorService m_executor;

    /**
     * Task that pumps every record of one reader into one queue and terminates the
     * stream with {@link ConcurrentMergeLogReader#ENDOFDATA}.
     */
    private static class Worker implements Runnable {
        final LogReader m_reader;
        final ArrayBlockingQueue<LogRecord> m_queue;
        long count = 0;
        /** Set by {@link #cancel()}; tells the task that nobody will consume its queue any more. */
        private volatile boolean m_cancelled;
        /** Handle returned by the executor, used to interrupt the task on cancellation. */
        private volatile Future<?> m_future;

        public Worker(LogReader logReader, ArrayBlockingQueue<LogRecord> arrayBlockingQueue) {
            this.m_reader = logReader;
            this.m_queue = arrayBlockingQueue;
        }

        /** Records the executor handle for this task so that {@link #cancel()} can interrupt it. */
        void setFuture(Future<?> future) {
            this.m_future = future;
        }

        /**
         * Asks the task to stop. The flag is raised before the interrupt so that the
         * task can tell a cancellation apart from any other interruption.
         */
        void cancel() {
            this.m_cancelled = true;
            Future<?> future = this.m_future;
            if (future != null) {
                future.cancel(true);
            }
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                try {
                    LogRecord record;
                    while (!this.m_cancelled && (record = this.m_reader.read()) != null) {
                        this.count++;
                        this.m_queue.put(record);
                    }
                } catch (Exception e) {
                    if (this.m_cancelled) {
                        // The consumer is gone: posting an end marker could block forever
                        // on the full queue, so just stop.
                        if (e instanceof InterruptedException) {
                            Thread.currentThread().interrupt();
                        }
                        return;
                    }
                    if (ConcurrentMergeLogReader.s_logger.isLoggable(Level.FINE)) {
                        ConcurrentMergeLogReader.s_logger.log(Level.FINE, "Unexpected exception in ConcurrentMergeLogReader Worker thread: " + e, (Throwable) e);
                    }
                }
                if (!this.m_cancelled) {
                    // Normal end of data, or a failure while not cancelled: let the
                    // consumer know that this stream is finished.
                    this.m_queue.put(ConcurrentMergeLogReader.ENDOFDATA);
                }
            } catch (InterruptedException e) {
                // Interrupted while delivering the end marker; give up but keep the flag.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Creates a merger with room for {@code i} underlying readers.
     *
     * @param i               number of reader slots
     * @param executorService executor on which the per-reader workers are run
     */
    @SuppressWarnings("unchecked") // generic array creation; only ArrayBlockingQueue<LogRecord> instances are ever stored
    public ConcurrentMergeLogReader(int i, ExecutorService executorService) {
        this.m_nreaders = i;
        this.m_readers = new LogReader[i];
        this.m_records = new LogRecord[i];
        this.m_queues = new ArrayBlockingQueue[i];
        this.m_workers = new Worker[i];
        this.m_executor = executorService;
    }

    /**
     * Assigns the reader for slot {@code i}. A slot left {@code null} is treated as
     * an empty stream by {@link #start()}.
     */
    public void setReader(int i, LogReader logReader) {
        this.m_readers[i] = logReader;
    }

    /**
     * Creates the hand-off queue for every slot and submits one worker per non-null
     * reader. Slots without a reader get an immediate end-of-data marker.
     *
     * @throws Exception if a worker cannot be submitted or the marker cannot be queued
     */
    public void start() throws Exception {
        for (int i = 0; i < this.m_nreaders; i++) {
            this.m_queues[i] = new ArrayBlockingQueue<>(1);
            if (this.m_readers[i] != null) {
                Worker worker = new Worker(this.m_readers[i], this.m_queues[i]);
                this.m_workers[i] = worker;
                worker.setFuture(this.m_executor.submit(worker));
            } else {
                this.m_queues[i].put(ENDOFDATA);
            }
        }
    }

    /**
     * Returns the pending record with the smallest timestamp, or {@code null} when
     * every reader is exhausted, when {@link #start()} has not been called, or after
     * {@link #close()}.
     *
     * <p>A record without a timestamp field is treated as having timestamp 0. Ties
     * are resolved in favour of the lowest slot index. If the calling thread is
     * interrupted while waiting for a worker, the affected reader is treated as
     * exhausted (as before) and the thread's interrupt status is restored on return.
     */
    @Override // oracle.core.ojdl.reader.BaseLogReader, oracle.core.ojdl.reader.LogReader
    public synchronized LogRecord read() {
        if (this.m_records == null) {
            // Already closed.
            return null;
        }
        long minTime = Long.MAX_VALUE;
        int minIndex = -1;
        boolean interrupted = false;
        try {
            for (int idx = 0; idx < this.m_records.length; idx++) {
                LogRecord record = this.m_records[idx];
                if (record == DISABLED) {
                    continue;
                }
                ArrayBlockingQueue<LogRecord> queue = this.m_queues[idx];
                if (record == null && queue != null) {
                    try {
                        record = queue.take();
                    } catch (InterruptedException e) {
                        interrupted = true;
                        if (s_logger.isLoggable(Level.FINE)) {
                            s_logger.log(Level.FINE, "Caught exception reading from reader " + idx + ". No more data will be read from this reader.", (Throwable) e);
                        }
                        record = ENDOFDATA;
                    }
                    if (record != ENDOFDATA) {
                        this.m_records[idx] = record;
                    } else {
                        this.m_queues[idx] = null;
                        record = null;
                    }
                }
                if (record != null) {
                    Long stamp = (Long) record.getField(TIMESTAMP_FIELD);
                    long time = stamp != null ? stamp.longValue() : 0L;
                    // "minIndex < 0" makes sure a record is selected even when its
                    // timestamp equals Long.MAX_VALUE.
                    if (minIndex < 0 || time < minTime) {
                        minTime = time;
                        minIndex = idx;
                    }
                }
            }
        } finally {
            if (interrupted) {
                // Restored only after the scan so that the remaining queues can still be polled.
                Thread.currentThread().interrupt();
            }
        }
        if (minIndex < 0) {
            this.m_lastIndex = -1;
            this.m_lastTime = -1L;
            return null;
        }
        LogRecord result = this.m_records[minIndex];
        this.m_records[minIndex] = null;
        this.m_lastIndex = minIndex;
        this.m_lastTime = minTime;
        return result;
    }

    /**
     * Cancels all workers, closes every underlying reader and releases the internal
     * arrays. Failures while closing a reader are logged at {@code FINE} and do not
     * prevent the remaining readers from being closed. Calling this method more than
     * once is harmless.
     */
    @Override // oracle.core.ojdl.reader.BaseLogReader, oracle.core.ojdl.reader.LogReader
    public synchronized void close() {
        if (this.m_workers != null) {
            // Stop the workers first so none of them stays blocked on a queue nobody drains.
            for (Worker worker : this.m_workers) {
                if (worker != null) {
                    worker.cancel();
                }
            }
        }
        if (this.m_readers != null) {
            for (int i = 0; i < this.m_readers.length; i++) {
                LogReader reader = this.m_readers[i];
                if (reader == null) {
                    continue;
                }
                try {
                    reader.close();
                } catch (Exception e) {
                    if (s_logger.isLoggable(Level.FINE)) {
                        s_logger.log(Level.FINE, "Caught exception closing reader " + i + ": " + e, (Throwable) e);
                    }
                }
            }
        }
        this.m_nreaders = 0;
        this.m_readers = null;
        this.m_records = null;
        this.m_workers = null;
    }
}
