import {
    wsconnect,
    NatsConnection,
    Subscription,
    QueuedIterator,
    RequestOptions,
    MsgHdrsImpl,
    PublishOptions,
    Msg,
} from '@nats-io/nats-core'
import {jetstream, jetstreamManager, DeliverPolicy, AckPolicy, ConsumerMessages} from '@nats-io/jetstream'
import {Kvm, KV, KvEntry, KvWatchInclude} from '@nats-io/kv'
import {BucketNames, WatchKeys, TBucketUserInfo, TConsume} from '@component/nats/type'
import {NatsJetstreamPublishError, NatsJetstreamResCode} from '@component/nats/error'

import {getCookie} from '@util/cookie'
import {TChartIndicators} from '@api/chart/chart'

import {useNatsConnectionStore} from './connection'

const JETSTREAM_SERVERS = [
    'wss://stream1.leagueoftraders.io:8888',
    'wss://stream2.leagueoftraders.io:8888',
    'wss://stream3.leagueoftraders.io:8888',
]

type NatsState = 'connecting' | 'connected' | 'disconnected'
const BATCH_SIZE = 200

class NatsClient {
    private static instance: NatsClient
    private conn: NatsConnection | null = null
    private subscriptions: Map<string, Subscription> = new Map()
    private watchers: Map<string, QueuedIterator<KvEntry>> = new Map()
    private consumers: Map<string, ConsumerMessages> = new Map()
    private natsState: NatsState = 'disconnected'

    private constructor() {}

    public static getInstance(): NatsClient {
        if (!NatsClient.instance) {
            NatsClient.instance = new NatsClient()
        }
        return NatsClient.instance
    }

    // ------- NATS -------

    // Connect to NATS
    public async connect() {
        if (this.conn || this.natsState !== 'disconnected') {
            return
        }
        this.natsState = 'connecting'
        try {
            this.conn = await wsconnect({
                reconnect: true,
                pingInterval: 10_000, // 10 seconds
                reconnectTimeWait: 3000, // every 3 seconds
                maxReconnectAttempts: 100, // try 100 times
                servers: JETSTREAM_SERVERS,
                token: getCookie('userToken') || '',
                // debug: process.env.NODE_ENV !== 'production',
            })

            this.natsState = 'connected'
            useNatsConnectionStore.getState().setIsConnected(true)
        } catch (e) {
            console.log(e)
            await this.disconnect()
            useNatsConnectionStore.getState().setIsConnected(false)
        }
    }

    // Subscribe to NATS
    public async subscribe(id: string, subject: string, callback: (msg: Msg) => void) {
        if (!this.conn) return
        try {
            const sub = this.conn.subscribe(subject)
            this.subscriptions.set(id, sub)
            for await (const msg of sub) {
                callback(msg)
            }
        } catch (e) {
            console.log('nats subscribe error: ', e)
        }
    }

    // Unsubscribe to NATS
    public async unsubscribe(id: string) {
        const sub = this.subscriptions.get(id)
        if (!sub) {
            return
        }
        sub.unsubscribe()
        this.subscriptions.delete(id)
    }

    // Publish to NATS
    public async publish(subject: string, data: Uint8Array | string, options?: PublishOptions) {
        if (!this.conn) return
        try {
            this.conn.publish(subject, data, options)
        } catch (e) {
            console.log('nats publish error: ', e)
        }
    }

    // ------- JetStream -------

    // Connect to JetStream
    public async consume(param: TConsume) {
        if (!this.conn) return

        const {start_seq, batch_size = BATCH_SIZE} = param.options || {}

        const jsm = await jetstreamManager(this.conn)
        const stream = await jsm.streams.find(param.subject)
        const streamInfo = await jsm.streams.info(stream)

        const consumerInfo = await jsm.consumers.add(stream, {
            ack_policy: AckPolicy.Explicit,
            deliver_policy: DeliverPolicy.StartSequence,
            opt_start_seq: start_seq || streamInfo.state.last_seq - batch_size,
            filter_subject: param.subject,
        })

        const js = jetstream(this.conn, {timeout: 30_000})
        const fetcher = await js.consumers.get(stream, consumerInfo.name)

        const consumer = await fetcher.consume({
            idle_heartbeat: 10_000,
        })
        this.consumers.set(param.id, consumer)

        const jetstreamStatusMonitor = new Promise<void>(async (_, reject) => {
            for await (const s of consumer.status()) {
                switch (s.type) {
                    case 'heartbeats_missed':
                    case 'consumer_not_found':
                        console.log(`Consumer status error: ${s.type}`)
                        await this.close(param.id)
                        reject(new Error(`Consumer status error: ${s.type}`))
                        break
                }
            }
        })

        const messageConsumer = new Promise<any[]>(async resolve => {
            for await (const msg of consumer) {
                param.callback(msg)
                await msg.ack()
            }
            return resolve([])
        })

        return Promise.race([jetstreamStatusMonitor, messageConsumer])
    }

    public async close(id: string) {
        const consumer = this.consumers.get(id)
        if (!consumer) {
            return
        }
        if (typeof consumer === 'boolean') return
        void consumer.close()
        this.consumers.delete(id)
    }

    // Publish to JetStream
    public async publishMsgRequest(
        subject: string,
        data: Uint8Array | string,
        options?: Partial<RequestOptions>,
        onError?: (error: NatsJetstreamPublishError) => void,
    ) {
        if (!this.conn) return
        try {
            if (!options) {
                options = {}
                options.headers = new MsgHdrsImpl()
                options.headers.append('lan', getCookie('language') || 'en')
            }
            const res = await this.conn.request(subject, data, options as RequestOptions)
            if (res.data.length <= 5) {
                return //success it's just +ack returned
            }

            const msgReply = JSON.parse(Buffer.from(res.data).toString('utf-8'))
            if (msgReply.code && msgReply.code !== NatsJetstreamResCode.SUCCESS) {
                throw new NatsJetstreamPublishError(msgReply.code, msgReply.message)
            }
        } catch (error) {
            if (onError && error instanceof NatsJetstreamPublishError) {
                onError(error)
            } else {
                console.error('Jetstream publish error: ', error.error)
            }
        }
    }

    // ------- KV -------

    // Get JetStream bucket
    public async getBucket(name: string): Promise<KV> {
        if (!this.conn) return
        const kvm = new Kvm(this.conn)
        return await kvm.create(name)
    }

    // Watch JetStream bucket
    public async watch(id: string, bucketName: string, key: string, callback: (value: Uint8Array) => void) {
        try {
            const bucket = await this.getBucket(bucketName)
            if (!bucket) return
            const watcher = await bucket.watch({key, include: KvWatchInclude.UpdatesOnly})
            this.watchers.set(id, watcher)
            for await (const msg of watcher) {
                callback(msg.value)
            }
        } catch (e) {
            console.log('JetStream watch error: ', e)
        }
    }

    // Stop watching JetStream bucket
    public async stop(id: string) {
        const watcher = this.watchers.get(id)
        if (!watcher) return
        watcher.stop()
        this.watchers.delete(id)
    }

    // Get User Info from JetStream bucket
    public async getUserInfo(userID: string): Promise<TBucketUserInfo> {
        const bucket = await this.getBucket(BucketNames.UserInfo)
        const info = await bucket?.get(userID)
        return info?.json()
    }

    // Get Chart Indicator Info from JetStream bucket
    public async getChartIndicatorInfo(): Promise<TChartIndicators> {
        const bucket = await this.getBucket(BucketNames.LiveChart)
        const info = await bucket?.get(WatchKeys.Indicator)
        return info?.json()
    }

    public async disconnect() {
        this.natsState = 'disconnected'
        void this.conn?.drain()
        void this.conn?.close()
        this.conn = null
        this.subscriptions.clear()
        this.watchers.clear()
        this.consumers.clear()
        useNatsConnectionStore.getState().setIsConnected(false)
    }
}

export const natsClient = NatsClient.getInstance()
