package org.apache.openjpa.event;

import com.bea.core.jatmi.common.ntrace;
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.pool.PoolableObjectFactory;
import org.apache.commons.pool.impl.GenericObjectPool;
import org.apache.openjpa.lib.conf.Configurable;
import org.apache.openjpa.lib.log.Log;
import org.apache.openjpa.lib.util.J2DoPrivHelper;
import org.apache.openjpa.lib.util.Localizer;
import org.apache.openjpa.util.GeneralException;
import org.apache.openjpa.util.InternalException;
import org.apache.openjpa.util.Serialization;
import serp.util.Strings;

/* loaded from: input_file:org/apache/openjpa/event/TCPRemoteCommitProvider.class */
public class TCPRemoteCommitProvider extends AbstractRemoteCommitProvider implements Configurable {
    /** Port used when none is configured and when an address entry carries no explicit port. */
    private static final int DEFAULT_PORT = 5636;
    private static final Localizer s_loc = Localizer.forPackage(TCPRemoteCommitProvider.class);
    /** Source of provider ids; seeded from the clock and incremented under the class lock by the constructor. */
    private static long s_idSequence = System.currentTimeMillis();
    /** Listeners keyed by their port rendered as a decimal string; guarded by synchronizing on the map. */
    private static final Map s_portListenerMap = new HashMap();
    /** Id written into every packet so a receiver can recognise packets sent by this provider. */
    private long _id;
    /** Raw address bytes of the local host, written into every packet. */
    private byte[] _localhost;
    private TCPPortListener _listener;
    /** Guards every read and replacement of {@link #_addresses}. */
    private ReentrantLock _addressesLock;
    /** Written as the first long of every broadcast packet and compared on the receiving side. */
    private static final long PROTOCOL_VERSION = 338210047;
    private int _port = DEFAULT_PORT;
    private int _maxActive = 2;
    private int _maxIdle = 2;
    // Plain literal instead of the unrelated WebLogic tracing constant
    // (ntrace.TBRIDGE_EX) that the decompiler substituted for it.
    /** Minimum time in milliseconds between send attempts to a peer that has been marked unavailable. */
    private int _recoveryTimeMillis = 15000;
    private BroadcastQueue _broadcastQueue = new BroadcastQueue();
    /** Live broadcast worker threads; each worker removes itself when it terminates. */
    private final List _broadcastThreads = Collections.synchronizedList(new LinkedList());
    private ArrayList _addresses = new ArrayList();

    /* loaded from: input_file:org/apache/openjpa/event/TCPRemoteCommitProvider$BroadcastQueue.class */
    /**
     * FIFO hand-off of serialized packets between the thread calling
     * {@code broadcast} and the broadcast worker threads. Every method
     * synchronizes on the queue instance.
     */
    private static class BroadcastQueue {
        private LinkedList _packetQueue = new LinkedList();
        private boolean _closed = false;

        private BroadcastQueue() {
        }

        /** Marks the queue as closed and wakes every thread waiting in {@link #removePacket()}. */
        public synchronized void close() {
            _closed = true;
            notifyAll();
        }

        /** @return whether {@link #close()} has been called */
        public synchronized boolean isClosed() {
            return _closed;
        }

        /**
         * Appends a packet to the tail of the queue and wakes one waiting consumer.
         *
         * @param bArr the serialized packet
         */
        public synchronized void addPacket(byte[] bArr) {
            _packetQueue.addLast(bArr);
            notify();
        }

        /**
         * Blocks until a packet is available or the queue has been closed.
         *
         * @return the oldest queued packet, or {@code null} when the queue is
         *         closed and empty
         * @throws InterruptedException if the waiting thread is interrupted
         */
        public synchronized byte[] removePacket() throws InterruptedException {
            while (_packetQueue.isEmpty() && !_closed) {
                wait();
            }
            return _packetQueue.isEmpty() ? null : (byte[]) _packetQueue.removeFirst();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/openjpa/event/TCPRemoteCommitProvider$BroadcastWorkerThread.class */
    /**
     * Worker that takes packets off the provider's broadcast queue and sends
     * each one through the provider's {@code sendUpdatePacket}. It stops when
     * told to via {@link #setRunning(boolean)} or when the queue is closed and
     * drained, and always deregisters itself from the provider's thread list
     * on exit.
     */
    public class BroadcastWorkerThread extends Thread {
        // volatile: written by whichever thread calls setRunning(), read by this worker.
        private volatile boolean _keepRunning;

        private BroadcastWorkerThread() {
            this._keepRunning = true;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                while (this._keepRunning) {
                    try {
                        byte[] packet = TCPRemoteCommitProvider.this._broadcastQueue.removePacket();
                        if (packet != null) {
                            TCPRemoteCommitProvider.this.sendUpdatePacket(packet);
                        } else if (TCPRemoteCommitProvider.this._broadcastQueue.isClosed()) {
                            this._keepRunning = false;
                        }
                    } catch (InterruptedException ignored) {
                        // Deliberately ignored: an interrupt only wakes the worker so
                        // that it re-checks _keepRunning at the top of the loop.
                    }
                }
            } finally {
                // Deregister even when the loop dies with an unchecked exception or
                // error; the provider's close() waits for the thread list to empty.
                remove();
            }
        }

        /**
         * Requests that the worker keep running or stop. The flag is only
         * examined between packets; a worker blocked waiting for a packet
         * sees it once it wakes up.
         *
         * @param z {@code false} to make the worker exit its loop
         */
        public void setRunning(boolean z) {
            this._keepRunning = z;
        }

        /** Removes this worker from the provider's list of broadcast threads. */
        private void remove() {
            TCPRemoteCommitProvider.this._broadcastThreads.remove(this);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/openjpa/event/TCPRemoteCommitProvider$HostAddress.class */
    public class HostAddress {
        private InetAddress _address;
        private int _port;
        private long _timeLastError;
        private boolean _isAvailable;
        private int _infosIssued;
        private GenericObjectPool _socketPool;

        /* loaded from: input_file:org/apache/openjpa/event/TCPRemoteCommitProvider$HostAddress$SocketPoolableObjectFactory.class */
        private class SocketPoolableObjectFactory implements PoolableObjectFactory {
            private SocketPoolableObjectFactory() {
            }

            @Override // org.apache.commons.pool.PoolableObjectFactory
            public Object makeObject() throws IOException {
                try {
                    Socket socket = (Socket) AccessController.doPrivileged(J2DoPrivHelper.newSocketAction(HostAddress.this._address, HostAddress.this._port));
                    if (TCPRemoteCommitProvider.this.log.isTraceEnabled()) {
                        TCPRemoteCommitProvider.this.log.trace(TCPRemoteCommitProvider.s_loc.get("tcp-open-connection", HostAddress.this._address + ":" + HostAddress.this._port, "" + socket.getLocalPort()));
                    }
                    return socket;
                } catch (PrivilegedActionException e) {
                    throw ((IOException) e.getException());
                }
            }

            @Override // org.apache.commons.pool.PoolableObjectFactory
            public void destroyObject(Object obj) {
                try {
                    Socket socket = (Socket) obj;
                    if (TCPRemoteCommitProvider.this.log.isTraceEnabled()) {
                        TCPRemoteCommitProvider.this.log.trace(TCPRemoteCommitProvider.s_loc.get("tcp-close-sending-socket", HostAddress.this._address + ":" + HostAddress.this._port, "" + socket.getLocalPort()));
                    }
                    socket.close();
                } catch (Exception e) {
                    TCPRemoteCommitProvider.this.log.warn(TCPRemoteCommitProvider.s_loc.get("tcp-close-socket-error", HostAddress.this._address.getHostAddress() + ":" + HostAddress.this._port), e);
                }
            }

            @Override // org.apache.commons.pool.PoolableObjectFactory
            public boolean validateObject(Object obj) {
                return true;
            }

            @Override // org.apache.commons.pool.PoolableObjectFactory
            public void activateObject(Object obj) {
            }

            @Override // org.apache.commons.pool.PoolableObjectFactory
            public void passivateObject(Object obj) {
            }
        }

        private HostAddress(String str) throws UnknownHostException {
            this._infosIssued = 0;
            int indexOf = str.indexOf(58);
            try {
                if (indexOf != -1) {
                    this._address = (InetAddress) AccessController.doPrivileged(J2DoPrivHelper.getByNameAction(str.substring(0, indexOf)));
                    this._port = Integer.parseInt(str.substring(indexOf + 1));
                } else {
                    this._address = (InetAddress) AccessController.doPrivileged(J2DoPrivHelper.getByNameAction(str));
                    this._port = TCPRemoteCommitProvider.DEFAULT_PORT;
                }
                this._socketPool = new GenericObjectPool(new SocketPoolableObjectFactory(), TCPRemoteCommitProvider.this._maxActive, (byte) 1, -1L);
                this._isAvailable = true;
            } catch (PrivilegedActionException e) {
                throw ((UnknownHostException) e.getException());
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void setMaxActive(int i) {
            this._socketPool.setMaxActive(i);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void setMaxIdle(int i) {
            this._socketPool.setMaxIdle(i);
        }

        public void close() {
            try {
                this._socketPool.close();
            } catch (Exception e) {
                if (TCPRemoteCommitProvider.this.log.isWarnEnabled()) {
                    TCPRemoteCommitProvider.this.log.warn(TCPRemoteCommitProvider.s_loc.get("tcp-close-pool-error"), e);
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void sendUpdatePacket(byte[] bArr) {
            if (this._isAvailable || System.currentTimeMillis() - this._timeLastError >= TCPRemoteCommitProvider.this._recoveryTimeMillis) {
                Socket socket = null;
                try {
                    socket = getSocket();
                    OutputStream outputStream = socket.getOutputStream();
                    outputStream.write(bArr);
                    outputStream.flush();
                    if (TCPRemoteCommitProvider.this.log.isTraceEnabled()) {
                        TCPRemoteCommitProvider.this.log.trace(TCPRemoteCommitProvider.s_loc.get("tcp-sent-update", this._address.getHostAddress() + ":" + this._port, String.valueOf(socket.getLocalPort())));
                    }
                    this._isAvailable = true;
                    this._infosIssued = 0;
                    returnSocket(socket);
                } catch (Exception e) {
                    if (socket != null) {
                        closeSocket(socket);
                    }
                    clearAllSockets();
                    if (this._isAvailable) {
                        if (TCPRemoteCommitProvider.this.log.isWarnEnabled()) {
                            TCPRemoteCommitProvider.this.log.warn(TCPRemoteCommitProvider.s_loc.get("tcp-send-error", this._address.getHostAddress() + ":" + this._port), e);
                        }
                        this._isAvailable = false;
                        this._timeLastError = System.currentTimeMillis();
                        return;
                    }
                    if (System.currentTimeMillis() - this._timeLastError <= TCPRemoteCommitProvider.this._recoveryTimeMillis || this._infosIssued >= 5) {
                        return;
                    }
                    this._timeLastError = System.currentTimeMillis();
                    if (TCPRemoteCommitProvider.this.log.isInfoEnabled()) {
                        TCPRemoteCommitProvider.this.log.info(TCPRemoteCommitProvider.s_loc.get("tcp-send-still-error", this._address.getHostAddress() + ":" + this._port), e);
                    }
                    this._infosIssued++;
                }
            }
        }

        private Socket getSocket() throws Exception {
            return (Socket) this._socketPool.borrowObject();
        }

        private void returnSocket(Socket socket) throws Exception {
            this._socketPool.returnObject(socket);
        }

        private void clearAllSockets() {
            this._socketPool.clear();
        }

        private void closeSocket(Socket socket) {
            try {
                this._socketPool.invalidateObject(socket);
            } catch (Exception e) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/openjpa/event/TCPRemoteCommitProvider$TCPPortListener.class */
    public static class TCPPortListener implements Runnable {
        private final Log _log;
        private ServerSocket _receiveSocket;
        private Thread _acceptThread;
        private Set _receiverThreads;
        private final Set _providers;
        private byte[] _localhost;
        private int _port;
        private boolean _isRunning;

        /* loaded from: input_file:org/apache/openjpa/event/TCPRemoteCommitProvider$TCPPortListener$ReceiveSocketHandler.class */
        private class ReceiveSocketHandler implements Runnable {
            private InputStream _in;
            private Socket _s;

            private ReceiveSocketHandler(Socket socket) {
                this._s = socket;
                try {
                    this._s.setTcpNoDelay(true);
                    this._in = new BufferedInputStream(socket.getInputStream());
                } catch (IOException e) {
                    if (TCPPortListener.this._log.isInfoEnabled()) {
                        TCPPortListener.this._log.info(TCPRemoteCommitProvider.s_loc.get("tcp-socket-option-error"), e);
                    }
                    this._s = null;
                } catch (Exception e2) {
                    if (TCPPortListener.this._log.isWarnEnabled()) {
                        TCPPortListener.this._log.warn(TCPRemoteCommitProvider.s_loc.get("tcp-receive-error"), e2);
                    }
                    this._s = null;
                }
            }

            @Override // java.lang.Runnable
            public void run() {
                if (this._s == null) {
                    return;
                }
                while (TCPPortListener.this._isRunning && this._s != null) {
                    try {
                        handle(this._in);
                    } catch (EOFException e) {
                        if (TCPPortListener.this._log.isTraceEnabled()) {
                            TCPPortListener.this._log.trace(TCPRemoteCommitProvider.s_loc.get("tcp-close-socket", this._s.getInetAddress().getHostAddress() + ":" + this._s.getPort()));
                        }
                    } catch (Throwable th) {
                        if (TCPPortListener.this._log.isWarnEnabled()) {
                            TCPPortListener.this._log.warn(TCPRemoteCommitProvider.s_loc.get("tcp-receive-error"), th);
                        }
                    }
                }
                try {
                    this._in.close();
                    if (this._s != null) {
                        this._s.close();
                    }
                } catch (IOException e2) {
                    TCPPortListener.this._log.warn(TCPRemoteCommitProvider.s_loc.get("tcp-close-socket-error", this._s.getInetAddress().getHostAddress() + ":" + this._s.getPort()), e2);
                }
            }

            private void handle(InputStream inputStream) throws IOException, ClassNotFoundException {
                Serialization.ClassResolvingObjectInputStream classResolvingObjectInputStream = new Serialization.ClassResolvingObjectInputStream(inputStream);
                if (classResolvingObjectInputStream.readLong() != TCPRemoteCommitProvider.PROTOCOL_VERSION && TCPPortListener.this._log.isWarnEnabled()) {
                    TCPPortListener.this._log.warn(TCPRemoteCommitProvider.s_loc.get("tcp-wrong-version-error", this._s.getInetAddress().getHostAddress() + ":" + this._s.getPort()));
                    return;
                }
                long readLong = classResolvingObjectInputStream.readLong();
                int readInt = classResolvingObjectInputStream.readInt();
                byte[] bArr = (byte[]) classResolvingObjectInputStream.readObject();
                RemoteCommitEvent remoteCommitEvent = (RemoteCommitEvent) classResolvingObjectInputStream.readObject();
                if (TCPPortListener.this._log.isTraceEnabled()) {
                    TCPPortListener.this._log.trace(TCPRemoteCommitProvider.s_loc.get("tcp-received-event", this._s.getInetAddress().getHostAddress() + ":" + this._s.getPort()));
                }
                boolean z = readInt == TCPPortListener.this._port && Arrays.equals(bArr, TCPPortListener.this._localhost);
                synchronized (TCPPortListener.this._providers) {
                    for (TCPRemoteCommitProvider tCPRemoteCommitProvider : TCPPortListener.this._providers) {
                        if (readLong != tCPRemoteCommitProvider._id || !z) {
                            tCPRemoteCommitProvider.eventManager.fireEvent(remoteCommitEvent);
                        }
                    }
                }
            }
        }

        private TCPPortListener(int i, Log log) throws IOException {
            this._receiverThreads = new HashSet();
            this._providers = new HashSet();
            this._isRunning = false;
            this._port = i;
            this._log = log;
            try {
                this._receiveSocket = (ServerSocket) AccessController.doPrivileged(J2DoPrivHelper.newServerSocketAction(this._port));
                this._localhost = InetAddress.getLocalHost().getAddress();
                if (this._log.isTraceEnabled()) {
                    this._log.info(TCPRemoteCommitProvider.s_loc.get("tcp-start-listener", String.valueOf(this._port)));
                }
            } catch (PrivilegedActionException e) {
                throw ((IOException) e.getException());
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void listen() {
            this._acceptThread = new Thread(this);
            this._acceptThread.setDaemon(true);
            this._acceptThread.start();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void addProvider(TCPRemoteCommitProvider tCPRemoteCommitProvider) {
            synchronized (this._providers) {
                this._providers.add(tCPRemoteCommitProvider);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public synchronized void removeProvider(TCPRemoteCommitProvider tCPRemoteCommitProvider) {
            synchronized (this._providers) {
                this._providers.remove(tCPRemoteCommitProvider);
                if (this._providers.size() == 0) {
                    this._isRunning = false;
                    try {
                        this._receiveSocket.close();
                    } catch (IOException e) {
                        if (this._log.isWarnEnabled()) {
                            this._log.warn(TCPRemoteCommitProvider.s_loc.get("tcp-close-error"), e);
                        }
                    }
                    this._acceptThread.interrupt();
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean isRunning() {
            boolean z;
            synchronized (this._providers) {
                z = this._isRunning;
            }
            return z;
        }

        @Override // java.lang.Runnable
        public void run() {
            synchronized (this._providers) {
                this._isRunning = true;
            }
            Socket socket = null;
            while (this._isRunning) {
                try {
                    socket = (Socket) AccessController.doPrivileged(J2DoPrivHelper.acceptAction(this._receiveSocket));
                    if (this._log.isTraceEnabled()) {
                        this._log.trace(TCPRemoteCommitProvider.s_loc.get("tcp-received-connection", socket.getInetAddress().getHostAddress() + ":" + socket.getPort()));
                    }
                    Thread thread = new Thread(new ReceiveSocketHandler(socket));
                    thread.setDaemon(true);
                    thread.start();
                    this._receiverThreads.add(thread);
                } catch (Exception e) {
                    e = e;
                    if (e instanceof PrivilegedActionException) {
                        e = ((PrivilegedActionException) e).getException();
                    }
                    if ((!(e instanceof SocketException) || this._isRunning) && this._log.isWarnEnabled()) {
                        this._log.warn(TCPRemoteCommitProvider.s_loc.get("tcp-accept-error"), e);
                    }
                    if (socket != null) {
                        try {
                            socket.close();
                        } catch (Exception e2) {
                            if (this._log.isWarnEnabled()) {
                                this._log.warn(TCPRemoteCommitProvider.s_loc.get("tcp-close-error"), e);
                            }
                        }
                    }
                }
            }
            Iterator it = this._receiverThreads.iterator();
            while (it.hasNext()) {
                ((Thread) it.next()).interrupt();
            }
            synchronized (this._providers) {
                try {
                    if (this._isRunning) {
                        this._receiveSocket.close();
                    }
                } catch (Exception e3) {
                    if (this._log.isWarnEnabled()) {
                        this._log.warn(TCPRemoteCommitProvider.s_loc.get("tcp-close-error"), e3);
                    }
                }
                this._isRunning = false;
                if (this._log.isTraceEnabled()) {
                    this._log.trace(TCPRemoteCommitProvider.s_loc.get("tcp-close-listener", this._port + ""));
                }
            }
        }
    }

    /**
     * Creates a provider with the next id of the class-wide sequence, records
     * the local host address and starts two broadcast worker threads.
     *
     * @throws UnknownHostException if the local host address cannot be determined
     */
    public TCPRemoteCommitProvider() throws UnknownHostException {
        synchronized (TCPRemoteCommitProvider.class) {
            _id = s_idSequence++;
        }
        _localhost = InetAddress.getLocalHost().getAddress();
        _addressesLock = new ReentrantLock();
        setNumBroadcastThreads(2);
    }

    /** @return the port this provider listens on and writes into the packets it sends */
    public int getPort() {
        return _port;
    }

    /**
     * Sets the listening port. The value is read when {@code endConfiguration}
     * looks up or creates the listener.
     *
     * @param i the port number
     */
    public void setPort(int i) {
        _port = i;
    }

    /**
     * Sets how long, in milliseconds, sends to a peer marked unavailable are
     * skipped after its last recorded failure.
     *
     * @param i the recovery time in milliseconds
     */
    public void setRecoveryTimeMillis(int i) {
        _recoveryTimeMillis = i;
    }

    /** @return the recovery time in milliseconds */
    public int getRecoveryTimeMillis() {
        return _recoveryTimeMillis;
    }

    /**
     * Sets the maximum number of active sockets per peer. The value is used
     * for socket pools created afterwards and is pushed to the existing pools
     * by {@code endConfiguration}.
     *
     * @param i the maximum number of active sockets
     */
    public void setMaxActive(int i) {
        _maxActive = i;
    }

    /** @return the configured maximum number of active sockets per peer */
    public int getMaxActive() {
        return _maxActive;
    }

    /**
     * Sets the maximum number of idle sockets per peer. The value is pushed
     * to the existing socket pools by {@code endConfiguration}.
     *
     * @param i the maximum number of idle sockets
     */
    public void setMaxIdle(int i) {
        _maxIdle = i;
    }

    /** @return the configured maximum number of idle sockets per peer */
    public int getMaxIdle() {
        return _maxIdle;
    }

    /**
     * Grows or shrinks the set of broadcast worker threads to the requested
     * size. New workers are started as daemon threads. Surplus workers are
     * taken off the list and told to stop; a worker only notices the request
     * between packets, so one that is blocked waiting for a packet exits
     * after its next wake-up.
     *
     * @param i the desired number of worker threads; a negative value is
     *          treated as zero
     */
    public void setNumBroadcastThreads(int i) {
        // A negative target would otherwise remove more workers than exist and
        // fail with an IndexOutOfBoundsException after stopping all of them.
        int target = Math.max(0, i);
        synchronized (this._broadcastThreads) {
            int size = this._broadcastThreads.size();
            for (int surplus = size - target; surplus > 0; surplus--) {
                ((BroadcastWorkerThread) this._broadcastThreads.remove(0)).setRunning(false);
            }
            for (int missing = target - size; missing > 0; missing--) {
                BroadcastWorkerThread worker = new BroadcastWorkerThread();
                worker.setDaemon(true);
                worker.start();
                this._broadcastThreads.add(worker);
            }
        }
    }

    /** @return the number of broadcast worker threads currently registered */
    public int getNumBroadcastThreads() {
        return _broadcastThreads.size();
    }

    /**
     * Replaces the list of peers. The current peers are closed first; then
     * each {@code ;}-separated entry ({@code host} or {@code host:port}) is
     * resolved and, unless its host part equals the local host name, added
     * as a new peer.
     *
     * @param str the semicolon-separated address list
     * @throws UnknownHostException if the local host or any entry cannot be resolved
     */
    public void setAddresses(String str) throws UnknownHostException {
        _addressesLock.lock();
        try {
            for (Object previous : _addresses) {
                ((HostAddress) previous).close();
            }
            String[] entries = Strings.split(str, ";", 0);
            _addresses = new ArrayList(entries.length);
            String localName = InetAddress.getLocalHost().getHostName();
            for (int idx = 0; idx < entries.length; idx++) {
                String entry = entries[idx];
                int colon = entry.indexOf(':');
                String host;
                int port;
                if (colon == -1) {
                    host = entry;
                    port = DEFAULT_PORT;
                } else {
                    host = entry.substring(0, colon);
                    port = Integer.parseInt(entry.substring(colon + 1));
                }
                // Every entry is resolved, including the one that names this host.
                InetAddress resolved = (InetAddress) AccessController.doPrivileged(J2DoPrivHelper.getByNameAction(host));
                if (localName.equals(host)) {
                    if (log.isTraceEnabled()) {
                        log.trace(s_loc.get("tcp-address-asself", resolved.getHostName() + ":" + port));
                    }
                    continue;
                }
                HostAddress peer = new HostAddress(entry);
                _addresses.add(peer);
                if (log.isTraceEnabled()) {
                    log.trace(s_loc.get("tcp-address-set", peer._address.getHostName() + ":" + peer._port));
                }
            }
        } catch (PrivilegedActionException pae) {
            throw (UnknownHostException) pae.getException();
        } finally {
            _addressesLock.unlock();
        }
    }

    /**
     * Completes configuration: attaches this provider to the listener shared
     * by all providers on the same port, creating and starting the listener
     * when none is registered or the registered one is no longer running,
     * and then applies the pool limits to every configured peer.
     */
    @Override // org.apache.openjpa.event.AbstractRemoteCommitProvider, org.apache.openjpa.lib.conf.Configurable
    public void endConfiguration() {
        super.endConfiguration();
        final String portKey = String.valueOf(_port);
        synchronized (s_portListenerMap) {
            _listener = (TCPPortListener) s_portListenerMap.get(portKey);
            boolean mustCreate = _listener == null
                || (!_listener.isRunning() && _listener._port == _port);
            if (mustCreate) {
                try {
                    _listener = new TCPPortListener(_port, log);
                    _listener.listen();
                    s_portListenerMap.put(portKey, _listener);
                } catch (Exception e) {
                    throw new GeneralException(s_loc.get("tcp-init-exception", portKey), e).setFatal(true);
                }
            } else if (!_listener.isRunning()) {
                throw new InternalException(s_loc.get("tcp-listener-broken"));
            } else if (_listener._port != _port) {
                throw new GeneralException(s_loc.get("tcp-not-equal", portKey)).setFatal(true);
            }
            _listener.addProvider(this);
        }
        _addressesLock.lock();
        try {
            for (Object entry : _addresses) {
                HostAddress peer = (HostAddress) entry;
                peer.setMaxActive(_maxActive);
                peer.setMaxIdle(_maxIdle);
            }
        } finally {
            _addressesLock.unlock();
        }
    }

    /**
     * Serializes the event into a packet (protocol version, provider id,
     * port, local address, event) and either queues it for the broadcast
     * worker threads or, when there are none, sends it on the calling
     * thread. A serialization failure is logged as a warning and the event
     * is dropped.
     *
     * @param remoteCommitEvent the event to send to the peers
     */
    @Override // org.apache.openjpa.event.RemoteCommitProvider
    public void broadcast(RemoteCommitEvent remoteCommitEvent) {
        byte[] payload;
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(buffer);
            out.writeLong(PROTOCOL_VERSION);
            out.writeLong(_id);
            out.writeInt(_port);
            out.writeObject(_localhost);
            out.writeObject(remoteCommitEvent);
            out.flush();
            payload = buffer.toByteArray();
            buffer.close();
        } catch (IOException e) {
            if (log.isWarnEnabled()) {
                log.warn(s_loc.get("tcp-payload-create-error"), e);
            }
            return;
        }
        if (_broadcastThreads.isEmpty()) {
            sendUpdatePacket(payload);
        } else {
            _broadcastQueue.addPacket(payload);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Sends the packet to every configured peer in turn while holding the
     * addresses lock.
     *
     * @param bArr the serialized packet
     */
    public void sendUpdatePacket(byte[] bArr) {
        _addressesLock.lock();
        try {
            for (Object peer : _addresses) {
                ((HostAddress) peer).sendUpdatePacket(bArr);
            }
        } finally {
            _addressesLock.unlock();
        }
    }

    /**
     * Shuts the provider down: detaches it from its listener, closes the
     * broadcast queue, waits for every broadcast worker thread to deregister
     * and finally closes the socket pool of every peer. An interrupt received
     * while waiting does not cut the wait short, but the caller's interrupt
     * status is restored before the method returns.
     */
    @Override // org.apache.openjpa.event.RemoteCommitProvider, org.apache.openjpa.lib.util.Closeable
    public void close() {
        if (this._listener != null) {
            this._listener.removeProvider(this);
        }
        this._broadcastQueue.close();
        boolean interrupted = false;
        try {
            // Workers take themselves off the list as they terminate.
            while (!this._broadcastThreads.isEmpty()) {
                try {
                    Thread.sleep(500L);
                } catch (InterruptedException e) {
                    // Remember the interrupt; re-asserting it here would make
                    // every further sleep() throw immediately.
                    interrupted = true;
                }
            }
            this._addressesLock.lock();
            try {
                Iterator it = this._addresses.iterator();
                while (it.hasNext()) {
                    ((HostAddress) it.next()).close();
                }
            } finally {
                this._addressesLock.unlock();
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
