/*
 * Decompiled with CFR 0.152.
 */
package io.aether.net.fastMeta.nio;

import io.aether.logger.LNode;
import io.aether.logger.Log;
import io.aether.net.fastMeta.DeserializerPackNumber;
import io.aether.net.fastMeta.FastApiContext;
import io.aether.net.fastMeta.FastFutureContext;
import io.aether.net.fastMeta.FastMetaApi;
import io.aether.net.fastMeta.FastMetaNet;
import io.aether.net.fastMeta.RemoteApi;
import io.aether.net.fastMeta.SerializerPackNumber;
import io.aether.net.fastMeta.nio.NioEventHandler;
import io.aether.net.fastMeta.nio.NioReactor;
import io.aether.utils.dataio.DataIn;
import io.aether.utils.dataio.DataInOut;
import io.aether.utils.dataio.DataInOutStatic;
import io.aether.utils.dataio.DataOut;
import io.aether.utils.futures.AFuture;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

class NioTcpConnection<LT, RT extends RemoteApi>
implements FastMetaNet.Connection<LT, RT>,
NioEventHandler {
    private final NioReactor reactor;
    private final SocketChannel channel;
    private final FastMetaApi<LT, ?> localApiMeta;
    private final Consumer<NioTcpConnection<LT, RT>> closeCallback;
    private final LNode logContext;
    private final AtomicBoolean isWritableFlag = new AtomicBoolean(true);
    private final AtomicBoolean closing = new AtomicBoolean(false);
    private final FastApiContext context;
    private final RT remoteApi;
    private LT localApi;
    private SelectionKey selectionKey;
    private final ByteBuffer readBuffer = ByteBuffer.allocate(65536);
    private final DataInOut sizeBuffer = new DataInOut(16);
    private ReadState readState = ReadState.READ_SIZE;
    private int payloadSize = -1;
    private final Queue<WriteRequest> writeQueue = new ConcurrentLinkedQueue<WriteRequest>();

    NioTcpConnection(NioReactor reactor, SocketChannel channel, FastMetaApi<LT, ?> localApiMeta, FastMetaApi<?, RT> remoteApiMeta, Consumer<NioTcpConnection<LT, RT>> closeCallback) {
        this.reactor = reactor;
        this.channel = channel;
        this.localApiMeta = localApiMeta;
        this.closeCallback = closeCallback;
        String remoteAddr = "unknown";
        try {
            remoteAddr = channel.getRemoteAddress().toString();
        }
        catch (IOException iOException) {
            // empty catch block
        }
        this.logContext = Log.of((Object[])new Object[]{"remote", remoteAddr, "socket", "server nio"});
        this.context = new FastApiContext(){

            public void flush(AFuture sendFuture) {
                NioTcpConnection.this.handleFlush(sendFuture);
            }
        };
        this.remoteApi = remoteApiMeta.makeRemote((FastFutureContext)this.context);
    }

    void setLocalApi(LT localApi) {
        this.localApi = localApi;
    }

    public LNode getLogContext() {
        return this.logContext;
    }

    @Override
    public void handleEvent(SelectionKey key) {
        this.selectionKey = key;
        try (LNode.AutoCloseable _l = this.logContext.context();){
            if (!key.isValid()) {
                this.closeConnection(new IOException("Invalid selection key during event handling"));
                return;
            }
            if (key.isWritable()) {
                this.handleWrite();
            }
            if (key.isValid() && key.isReadable()) {
                this.handleRead();
            }
        }
        catch (Exception e) {
            Log.warn((String)"Error during event handling, closing connection", (Throwable)e, (Object[])new Object[0]);
            this.closeConnection(e);
        }
    }

    private void handleRead() {
        int bytesRead;
        try {
            bytesRead = this.channel.read(this.readBuffer);
        }
        catch (IOException e) {
            Log.warn((String)"Read error", (Throwable)e, (Object[])new Object[0]);
            this.closeConnection(e);
            return;
        }
        if (bytesRead == -1) {
            Log.info((String)"Remote closed connection (End of Stream)", (LNode[])new LNode[0]);
            this.closeConnection(new IOException("End of Stream"));
            return;
        }
        if (bytesRead > 0) {
            this.readBuffer.flip();
            this.processReadBuffer();
            this.readBuffer.compact();
        }
    }

    private void processReadBuffer() {
        while (this.readBuffer.hasRemaining() && (this.readState != ReadState.READ_SIZE || this.tryReadPacketSize())) {
            if (this.readState != ReadState.READ_PAYLOAD) continue;
            if (this.readBuffer.remaining() < this.payloadSize) break;
            byte[] payload = new byte[this.payloadSize];
            this.readBuffer.get(payload);
            this.dispatchPacket(payload);
            this.readState = ReadState.READ_SIZE;
            this.payloadSize = -1;
            this.sizeBuffer.clear();
        }
    }

    private boolean tryReadPacketSize() {
        int initialReadPos = this.readBuffer.position();
        while (this.readBuffer.hasRemaining()) {
            byte b = this.readBuffer.get();
            this.sizeBuffer.writeByte((int)b);
            DataInOutStatic sizeView = new DataInOutStatic(this.sizeBuffer.getData(), 0, this.sizeBuffer.getWritePos());
            try {
                Number sizeNum = DeserializerPackNumber.INSTANCE.put((DataIn)sizeView);
                this.payloadSize = sizeNum.intValue();
                this.readState = ReadState.READ_PAYLOAD;
                this.sizeBuffer.clear();
                return true;
            }
            catch (IndexOutOfBoundsException sizeNum) {
            }
            catch (Exception e) {
                Log.error((String)"Fatal error parsing packet size", (Throwable)e, (Object[])new Object[0]);
                this.closeConnection(e);
                return false;
            }
        }
        this.readBuffer.position(initialReadPos);
        return false;
    }

    private void dispatchPacket(byte[] payload) {
        try {
            DataInOutStatic in = new DataInOutStatic(payload);
            this.localApiMeta.makeLocal((FastFutureContext)this.context, (DataIn)in, this.localApi);
        }
        catch (Exception e) {
            Log.error((String)"Error during makeLocal (packet processing). Closing connection.", (Throwable)e, (Object[])new Object[0]);
            this.closeConnection(e);
        }
    }

    private void handleWrite() {
        WriteRequest writeRequest = this.writeQueue.peek();
        while (writeRequest != null) {
            try {
                this.channel.write(writeRequest.buffer);
            }
            catch (IOException e) {
                Log.warn((String)"Write error, closing connection", (Throwable)e, (Object[])new Object[0]);
                writeRequest.future.tryCancel();
                this.closeConnection(e);
                return;
            }
            if (writeRequest.buffer.hasRemaining()) {
                this.isWritableFlag.set(false);
                return;
            }
            this.writeQueue.poll();
            writeRequest.future.tryDone();
            writeRequest = this.writeQueue.peek();
        }
        this.isWritableFlag.set(true);
        if (this.selectionKey.isValid() && this.writeQueue.isEmpty()) {
            this.selectionKey.interestOps(this.selectionKey.interestOps() & 0xFFFFFFFB);
        }
    }

    private void handleFlush(AFuture sendFuture) {
        try (LNode.AutoCloseable _l = this.logContext.context();){
            if (this.closing.get()) {
                Log.trace((String)"Flush called on closing connection, cancelling future.", (LNode[])new LNode[0]);
                sendFuture.tryCancel();
                return;
            }
            DataInOut combinedData = new DataInOut();
            this.context.remoteDataToArray((DataOut)combinedData);
            if (combinedData.getSizeForRead() == 0) {
                if (!this.writeQueue.isEmpty()) {
                    Log.trace((String)"Flush called with empty context data but non-empty write queue. Linking future.", (LNode[])new LNode[0]);
                    WriteRequest lastRequest = null;
                    Iterator iterator = this.writeQueue.iterator();
                    while (iterator.hasNext()) {
                        WriteRequest req;
                        lastRequest = req = (WriteRequest)iterator.next();
                    }
                    if (lastRequest != null) {
                        lastRequest.future.to(sendFuture);
                    } else {
                        sendFuture.tryDone();
                    }
                } else {
                    sendFuture.tryDone();
                }
                return;
            }
            byte[] payload = combinedData.toArray();
            DataInOut packet = new DataInOut(payload.length + 10);
            SerializerPackNumber.INSTANCE.put((DataOut)packet, payload.length);
            packet.write(payload);
            ByteBuffer bufferToSend = ByteBuffer.wrap(packet.toArray());
            WriteRequest writeRequest = new WriteRequest(bufferToSend, sendFuture);
            this.writeQueue.add(writeRequest);
            if (this.selectionKey != null && this.selectionKey.isValid()) {
                this.reactor.addTask(() -> {
                    try (LNode.AutoCloseable _l_task = this.logContext.context();){
                        int currentOps;
                        if (this.selectionKey.isValid() && ((currentOps = this.selectionKey.interestOps()) & 4) == 0) {
                            this.selectionKey.interestOps(currentOps | 4);
                        }
                    }
                });
            } else {
                Log.warn((String)"SelectionKey invalid during flush scheduling, cancelling future.", (LNode[])new LNode[0]);
                sendFuture.tryCancel();
                this.clearWriteQueueAndCancelFutures();
            }
        }
    }

    @Override
    public void closeConnection(Exception e) {
        if (!this.closing.compareAndSet(false, true)) {
            return;
        }
        LNode reasonNode = LNode.of((Object[])new Object[]{"reason", e != null ? e.getMessage() : "shutdown"});
        Log.info((String)"Closing connection", (LNode[])new LNode[]{this.logContext, reasonNode});
        this.clearWriteQueueAndCancelFutures();
        try {
            this.closeCallback.accept(this);
        }
        catch (Exception cbEx) {
            Log.error((String)"Exception in close callback", (Throwable)cbEx, (Object[])new Object[]{this.logContext});
        }
        this.reactor.addTask(() -> {
            if (this.selectionKey != null) {
                this.selectionKey.cancel();
            }
            try {
                this.channel.close();
            }
            catch (IOException ex) {
                Log.warn((String)"Error closing socket channel", (Throwable)ex, (Object[])new Object[]{this.logContext});
            }
        });
    }

    private void clearWriteQueueAndCancelFutures() {
        WriteRequest req;
        Log.trace((String)"Clearing write queue and cancelling pending futures.", (LNode)this.logContext);
        while ((req = this.writeQueue.poll()) != null) {
            req.future.tryCancel();
        }
        this.isWritableFlag.set(false);
    }

    public void read() {
    }

    public void stopRead() {
    }

    public AFuture write(byte[] data) {
        try (LNode.AutoCloseable _l = this.logContext.context();){
            if (this.closing.get()) {
                Log.warn((String)"Attempting write on a closing connection.", (LNode)this.logContext);
                AFuture aFuture = AFuture.canceled();
                return aFuture;
            }
            Log.warn((String)"Direct write() called on connection, framing data...", (LNode)this.logContext);
            DataInOut packet = new DataInOut(data.length + 10);
            SerializerPackNumber.INSTANCE.put((DataOut)packet, data.length);
            packet.write(data);
            AFuture writeFuture = AFuture.make();
            ByteBuffer bufferToSend = ByteBuffer.wrap(packet.toArray());
            WriteRequest writeRequest = new WriteRequest(bufferToSend, writeFuture);
            this.writeQueue.add(writeRequest);
            if (this.selectionKey != null && this.selectionKey.isValid()) {
                this.reactor.addTask(() -> {
                    try (LNode.AutoCloseable _l_task = this.logContext.context();){
                        int currentOps;
                        if (this.selectionKey.isValid() && ((currentOps = this.selectionKey.interestOps()) & 4) == 0) {
                            this.selectionKey.interestOps(currentOps | 4);
                        }
                    }
                });
            } else {
                Log.warn((String)"SelectionKey invalid during direct write scheduling, cancelling future.", (LNode[])new LNode[0]);
                writeFuture.tryCancel();
            }
            AFuture aFuture = writeFuture;
            return aFuture;
        }
    }

    public LT getLocalApi() {
        return this.localApi;
    }

    public RT getRemoteApi() {
        return this.remoteApi;
    }

    public boolean isWritable() {
        return this.isWritableFlag.get() && !this.closing.get();
    }

    public FastFutureContext getMetaContext() {
        return this.context;
    }

    public AFuture destroy(boolean force) {
        this.closeConnection(new IOException("Destroy called" + (force ? " (forced)" : "")));
        return AFuture.completed();
    }

    private static enum ReadState {
        READ_SIZE,
        READ_PAYLOAD;

    }

    private static class WriteRequest {
        final ByteBuffer buffer;
        final AFuture future;

        WriteRequest(ByteBuffer buffer, AFuture future) {
            this.buffer = buffer;
            this.future = future;
        }
    }
}

