/*
 * Decompiled with CFR 0.152.
 */
package hep.io.root.daemon.xrootd;

import hep.io.root.daemon.xrootd.Destination;
import hep.io.root.daemon.xrootd.Message;
import hep.io.root.daemon.xrootd.MultiplexorMBean;
import hep.io.root.daemon.xrootd.Response;
import hep.io.root.daemon.xrootd.ResponseListener;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.BitSet;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

class Multiplexor
implements MultiplexorMBean {
    private static final int MAX_IDLE = Integer.getInteger("hep.io.root.daemon.xrootd.ConnectionTimeout", 60000);
    private static final int SEND_BUFFER_SIZE = Integer.getInteger("hep.io.root.daemon.xrootd.SendBufferSize", 65536);
    private static final int RECEIVE_BUFFER_SIZE = Integer.getInteger("hep.io.root.daemon.xrootd.ReceivedBufferSize", 65536);
    private static Logger logger = Logger.getLogger(Multiplexor.class.getName());
    private Destination descriptor;
    private SocketChannel channel;
    private Response response;
    private BitSet handles = new BitSet();
    private Thread thread;
    private Map<Short, ResponseListener> responseMap = new HashMap<Short, ResponseListener>();
    private boolean socketClosed = false;
    private long bytesSent;
    private long bytesReceived;
    private Date createDate = new Date();
    private Date lastActive = new Date();
    private int pval;
    private int flag;

    Multiplexor(Destination desc) throws IOException {
        logger.fine(desc + " Creating multiplexor");
        this.descriptor = desc;
        this.channel = SocketChannel.open();
        this.channel.socket().setReceiveBufferSize(RECEIVE_BUFFER_SIZE);
        this.channel.socket().setSendBufferSize(SEND_BUFFER_SIZE);
        this.thread = new Thread((Runnable)new SocketReader(), "XrootdReader-" + this);
        this.thread.setDaemon(true);
        this.response = new Response(this, this.channel);
    }

    void connect(ResponseListener listener) {
        this.addListener(listener);
        this.thread.start();
    }

    void handleInitialHandshakeResponse(Response response) throws IOException {
        if (response.getLength() != 8) {
            throw new IOException("Unexpected initial handshake length");
        }
        this.pval = response.readInt();
        this.flag = response.readInt();
    }

    private void sendInitialHandshake() throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(20);
        buffer.putInt(12, 4);
        buffer.putInt(16, 2012);
        this.bytesSent += (long)this.channel.write(buffer);
    }

    boolean isSocketClosed() {
        return this.socketClosed;
    }

    public long getBytesReceived() {
        return this.bytesReceived;
    }

    public long getBytesSent() {
        return this.bytesSent;
    }

    public Date getCreateDate() {
        return this.createDate;
    }

    public String getUserName() {
        return this.descriptor.getUserName();
    }

    public String getHostAndPort() {
        return this.descriptor.getAddressAndPort();
    }

    public Date getLastActive() {
        return this.lastActive;
    }

    public int getOutstandingResponseCount() {
        return this.handles.cardinality();
    }

    public long getIdleTime() {
        return System.currentTimeMillis() - this.lastActive.getTime();
    }

    public int getProtocolVersion() {
        return this.pval;
    }

    public int getServerFlag() {
        return this.flag;
    }

    boolean isIdle() {
        return this.getOutstandingResponseCount() == 0 && this.getIdleTime() > (long)MAX_IDLE;
    }

    Destination getDestination() {
        return this.descriptor;
    }

    void sendMessage(Message message, ResponseListener listener) throws IOException {
        short id = this.addListener(listener);
        try {
            this.sendMessage(id, message);
        }
        catch (IOException x) {
            this.removeListener(id);
            throw x;
        }
    }

    void close() {
        this.socketClosed = true;
        try {
            if (this.channel.isConnected()) {
                this.channel.close();
            }
        }
        catch (IOException x) {
            logger.log(Level.WARNING, "Error during socket close", x);
        }
    }

    public String toString() {
        return this.descriptor.toString() + ";" + this.channel.socket().getLocalPort();
    }

    private synchronized short addListener(ResponseListener listener) {
        short handle = (short)this.handles.nextClearBit(0);
        this.handles.set(handle);
        this.responseMap.put(handle, listener);
        return handle;
    }

    private synchronized void removeListener(short id) {
        this.responseMap.remove(id);
        this.handles.clear(id);
    }

    private void sendMessage(short id, Message message) throws IOException {
        this.bytesSent += (long)message.send(id, this.channel);
        this.lastActive.setTime(System.currentTimeMillis());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleResponse() throws IOException {
        ResponseListener handler;
        int status = this.response.getStatus();
        Short handle = this.response.getHandle();
        this.lastActive.setTime(System.currentTimeMillis());
        Multiplexor multiplexor = this;
        synchronized (multiplexor) {
            handler = this.responseMap.get(handle);
        }
        if (handler == null && status != 4001) {
            if (status == 4003) {
                int rc = this.response.readInt();
                String message = this.response.getDataAsString();
                logger.log(Level.SEVERE, this + " Out-of-band error " + rc + ": " + message);
                return;
            }
            throw new IOException(this + " No handler found for handle " + handle + " (status=" + status + ")");
        }
        switch (status) {
            case 4003: {
                int rc = this.response.readInt();
                String message = this.response.getDataAsString();
                handler.handleError(new IOException("Xrootd error " + rc + ": " + message));
                this.removeListener(handle);
                break;
            }
            case 4005: {
                int seconds = this.response.readInt();
                String message = this.response.getDataAsString();
                logger.info(this + " wait: " + message + " seconds=" + seconds);
                handler.reschedule(seconds, TimeUnit.SECONDS);
                this.removeListener(handle);
                break;
            }
            case 4006: {
                int seconds = this.response.readInt();
                String message = this.response.getDataAsString();
                logger.fine(this + " waitresp: " + message + " seconds=" + seconds);
                break;
            }
            case 4004: {
                int port = this.response.readInt();
                String host = this.response.getDataAsString();
                logger.fine(this + " redirect: " + host + " " + port);
                handler.handleRedirect(host, port);
                this.removeListener(handle);
                break;
            }
            case 4001: {
                int code = this.response.readInt();
                if (code == 5008) {
                    this.response.readInt();
                    this.response.regurgitate();
                    this.handleResponse();
                    return;
                }
                throw new IOException("Xrootd: Unimplemented asycn message received: " + code);
            }
            case 0: 
            case 4000: {
                handler.handleResponse(this.response);
                if (!this.response.isComplete()) break;
                this.removeListener(handle);
                break;
            }
            default: {
                throw new IOException("Xrootd: Unimplemented status received: " + status);
            }
        }
    }

    private void handleSocketException(IOException x) {
        if (!this.socketClosed) {
            logger.log(Level.WARNING, this + " Unexpected IO exception on socket", x);
            this.close();
            for (ResponseListener listener : this.responseMap.values()) {
                logger.fine(this + " sending handleSocketError to " + listener);
                listener.handleSocketError(x);
            }
            this.responseMap.clear();
        }
    }

    private class SocketReader
    implements Runnable {
        private SocketReader() {
        }

        public void run() {
            try {
                Multiplexor.this.channel.connect(Multiplexor.this.descriptor.getSocketAddress());
                Multiplexor.this.sendInitialHandshake();
                while (!Multiplexor.this.thread.isInterrupted()) {
                    Multiplexor.this.bytesReceived = Multiplexor.this.bytesReceived + (long)Multiplexor.this.response.read();
                    Multiplexor.this.handleResponse();
                }
                logger.log(Level.FINE, this + " multiplexor thread exiting due to interrupt!");
            }
            catch (IOException x) {
                Multiplexor.this.handleSocketException(x);
            }
            catch (Throwable x) {
                logger.log(Level.SEVERE, this + " multiplexor thread dead!", x);
            }
        }
    }
}

