import config from '../config'
import {
  DefaultHttpClient,
  HubConnection,
  HubConnectionBuilder,
  ISubscription,
  LogLevel
} from '@microsoft/signalr'
import { Store } from 'redux'
import {
  BehaviorSubject,
  EMPTY,
  from,
  Observable,
  of,
  OperatorFunction,
  ReplaySubject,
  Subject,
  throwError
} from 'rxjs'
import * as Sentry from '@sentry/react'
import {
  catchError,
  filter,
  finalize,
  map,
  mergeMap,
  take,
  takeUntil
} from 'rxjs/operators'
import { getAuthToken } from '../store/auth/selectors'
import { addLogItem } from '../store/log/actions'
import {
  connectFailure,
  connectSuccess,
  forceReload
} from '../store/ws/actions'
import { hasError } from '../store/ws/selectors'

enum ResponseType {
  SUCCESS = 0,
  ERROR = 1,
  KEEPALIVE_TYPE = 2
}

const KEEPALIVE_FREQUENCY = 6000
const LATENCY_MAX_DELAY = 6000
const INVOKE_MAX_DELAY = 10_000

interface StreamMetadata {
  messageId: number
  timeoutOccurred: number
  asyncResponse: Subject<InSequenceResult>
}

const streamMetadata: Record<number, StreamMetadata> = {}

interface InSequenceResult {
  inSequence: boolean
  error: string | null
}

const isInSequence = (
  store: Store,
  connectionId: number,
  type: number,
  messageId: number,
  methodName: string
): Observable<InSequenceResult> => {
  const asyncResponse = new ReplaySubject<InSequenceResult>()
  let syncResponse: InSequenceResult = {
    inSequence: true,
    error: null
  }
  const safeMethods = ['GetNewOrderAlerts']
  if (streamMetadata[connectionId]) {
    const previousMetadata = streamMetadata[connectionId]
    clearTimeout(previousMetadata.timeoutOccurred)
    previousMetadata.asyncResponse.complete()

    const expectedId = previousMetadata.messageId + 1

    if (messageId > expectedId && !safeMethods.includes(methodName)) {
      syncResponse = {
        inSequence: false,
        error: `${methodName} responded with an invalid message ID. Type = ${type}, expected less than ${expectedId}, got ${messageId}.`
      }
    }
  }

  let timeoutOccurred: number | undefined
  if (syncResponse.inSequence) {
    timeoutOccurred = window.setTimeout(() => {
      handleTimeout(store, connectionId, methodName, timeoutOccurred!)
    }, KEEPALIVE_FREQUENCY + LATENCY_MAX_DELAY)

    streamMetadata[connectionId] = {
      messageId,
      timeoutOccurred,
      asyncResponse
    }
  }

  asyncResponse.next(syncResponse)
  return asyncResponse.pipe(
    finalize(() => {
      if (timeoutOccurred !== undefined) {
        clearTimeout(timeoutOccurred)
      }
    })
  )
}

const streamSequenceAndTime = (
  store: Store,
  methodName: string,
  connectionId: number,
  type: number,
  messageId: number,
  result: any,
  error: any
) => {
  return isInSequence(store, connectionId, type, messageId, methodName).pipe(
    map((inSequence) => {
      // Something went wrong with the messageId or the timing
      if (inSequence.error) {
        return {
          type: ResponseType.ERROR,
          result: null,
          error: inSequence.error
        }
      }

      // Keep alive messages.
      if (type === ResponseType.KEEPALIVE_TYPE) {
        return { type, result: null, error: null }
      }

      // If we have either, pass it on and let the error check in the mergeMap handle it
      return { type, result, error }
    })
  )
}

const fromHubStream =
  <T>(
    getConnection: () => HubConnection,
    methodName: string,
    ...params: any[]
  ): OperatorFunction<any, Response<T>> =>
  (source: Observable<any>): Observable<Response<T>> => {
    let hubSubscription: ISubscription<any> | undefined
    const streamDisposed = new Subject<void>()

    return source.pipe(
      mergeMap(() => {
        const subject$ = new Subject<Response<T>>()
        hubSubscription = getConnection()
          .stream<Response<T>>(methodName, ...params)
          .subscribe({
            next: (response) => subject$.next(response),
            error: (err) => subject$.error(err),
            complete: () => {
              // When the connection is closed by the server, complete the observable.
              subject$.complete()
              streamDisposed.next()
              streamDisposed.complete()

              hubSubscription = undefined
            }
          })
        return subject$.asObservable()
      }),
      takeUntil(streamDisposed),
      finalize(() => {
        if (hubSubscription) {
          // When the observable is completed, close the stream.
          hubSubscription.dispose()
        }
      })
    )
  }

interface InvokeMetadata {
  messageId: number
  timeoutOccurred: number
  asyncResponse: Subject<Response<any>>
}

let nextMessageId: number = 0
const invokeMetadata: Record<number, InvokeMetadata> = {}

interface SuccessResponse<T> {
  type: ResponseType.SUCCESS
  messageId: number
  result: T
  error: null
}
interface ErrorResponse {
  type: ResponseType.ERROR
  messageId: number
  result: null
  error: ErrorMessageContents
}
interface ErrorMessageContents {
  errorCode: string
  fatal: boolean
  message: string
}
interface KeepaliveResponse {
  type: ResponseType.KEEPALIVE_TYPE
  messageId: number
  result: null
  error: null
}

type Response<T> = SuccessResponse<T> | ErrorResponse | KeepaliveResponse

const handleError = (store: Store, returnResult: ErrorResponse) => {
  // @ts-ignore
  if (returnResult.error.fatal) {
    try {
      store.dispatch(
        addLogItem(
          'Skipping forced reload after fatal error: ' +
            JSON.stringify(returnResult)
        )
      )
    } catch (e: any) {
      // intentionally left blank
    }
    // store.dispatch(forceReload())
  }
  return throwError(returnResult.error)
}

const handleKeepalive = (store: Store, returnResult: KeepaliveResponse) => {
  if (hasError(store.getState())) {
    store.dispatch(connectSuccess())
  }
  return EMPTY
}

const handleSuccess = <T>(store: Store, returnResult: SuccessResponse<T>) => {
  if (hasError(store.getState())) {
    store.dispatch(connectSuccess())
  }
  return of(returnResult.result)
}

const handleTimeout = (
  store: Store,
  connectionId: number,
  methodName: string,
  timeoutOccurred: number
) => {
  const error = `Timeout occurred: connection ${connectionId}, method ${methodName}, timer ${timeoutOccurred}`
  // tslint:disable-next-line: no-console
  console.warn(`[${new Date().toLocaleTimeString()}] ${error}`)
  const currentState = store.getState()
  // make sure we flush the buffer to Sentry as soon as we start getting timeouts
  const replay = Sentry.getReplay()
  replay?.flush()
  if (!hasError(currentState)) {
    store.dispatch(connectFailure(error))
  }
}

const handleResult = <T>(store: Store) =>
  mergeMap((returnResult: Response<T>) => {
    switch (returnResult.type) {
      case ResponseType.SUCCESS:
        return handleSuccess(store, returnResult)
      case ResponseType.ERROR:
        return handleError(store, returnResult)
      case ResponseType.KEEPALIVE_TYPE:
        return handleKeepalive(store, returnResult)
    }
  })

const createHub = (store: Store) => {
  let connection: HubConnection
  let nextConnectionId: number = 0

  const connectedSubject$ = new BehaviorSubject<boolean>(false)
  const onConnected$ = connectedSubject$.pipe(
    filter((connected) => connected),
    take(1)
  )
  return {
    start: (url: string, accessToken: string) => {
      /*
        This class is used in doing the initial negotiation with the back end.
        The back end is set to read the request body for the access token, then
        move that to a cookie.
        It's not clear if this is needed under current SignalR version, but
        since we'd have to make both back end and front end line up and
        this isn't well-documented, leaving in place for now.
       */
      class CustomHttpClient extends DefaultHttpClient {
        constructor() {
          super(console) // the base class wants an signalR.ILogger, I'm not sure if you're supposed to put *the console* into it, but I did and it seemed to work
        }
        public async send(
          request: signalR.HttpRequest
        ): Promise<signalR.HttpResponse> {
          request.content = getAuthToken(store.getState()) ?? accessToken
          // Now we have manipulated the request how we want we can just call the base class method
          return super.send(request)
        }
      }
      connection = new HubConnectionBuilder()
        .configureLogging(LogLevel.Information)
        .withUrl(url, {
          httpClient: new CustomHttpClient(),
          accessTokenFactory: (): string | Promise<string> =>
            getAuthToken(store.getState()) ?? accessToken
        })
        .build()
      if (config.api.serverTimeoutInMilliseconds) {
        connection.serverTimeoutInMilliseconds =
          config.api.serverTimeoutInMilliseconds
      }
      const subject$ = new Subject<void>()
      connection.onclose((err) => {
        connectedSubject$.next(false)
        subject$.error(err)
        store.dispatch(
          addLogItem(
            'Forced reload because ' + err?.toString() ||
              '(no error message available)'
          )
        )
        store.dispatch(forceReload())
      })

      connection
        .start()
        .then(() => {
          subject$.next()
          connectedSubject$.next(true)
        })
        .catch((err) => {
          subject$.error(err)
          connectedSubject$.next(false)
        })

      return subject$.asObservable()
    },

    stream: <T>(methodName: string, ...params: any[]) => {
      const connectionId = nextConnectionId++
      const finalizeSubject = new Subject<void>()
      return onConnected$.pipe(
        // @ts-ignore
        fromHubStream<T>(() => connection, methodName, ...params),
        mergeMap(({ type, messageId, result, error }: Response<T>) =>
          streamSequenceAndTime(
            store,
            methodName,
            connectionId,
            type,
            messageId,
            result,
            error
          ).pipe(takeUntil(finalizeSubject))
        ),
        handleResult(store),
        catchError((err) => {
          if (
            err.message?.includes('Server timeout elapsed') ||
            err.message?.includes('WebSocket closed') ||
            err.message?.includes('Cannot send data')
          ) {
            store.dispatch(addLogItem('Forced reload because ' + err.message))
            store.dispatch(forceReload())
          } else {
            store.dispatch(
              addLogItem(
                'Error occurred but did not force reload (will cause timeout message)' +
                  err.message
              )
            )
          }
          return throwError(err)
        }),
        takeUntil(finalizeSubject),
        finalize(() => {
          finalizeSubject.next()
          finalizeSubject.complete()
        })
      )
    },

    invoke: <T>(methodName: string, ...params: any[]) => {
      const connectionId = nextConnectionId++
      const messageId = nextMessageId++
      const streamDisposed = new Subject<void>()
      const unsafeMethods = ['CreateOrder', 'HitOrLiftOrder']

      const asyncResponse = new ReplaySubject<Response<T>>()
      if (unsafeMethods.includes(methodName)) {
        const timeoutOccurred = window.setTimeout(() => {
          handleTimeout(store, connectionId, methodName, timeoutOccurred)
        }, INVOKE_MAX_DELAY)

        invokeMetadata[connectionId] = {
          messageId,
          timeoutOccurred,
          asyncResponse
        }
      }
      return onConnected$.pipe(
        mergeMap(() => {
          return from(
            connection.invoke<Response<T>>(methodName, ...params)
          ).pipe(
            catchError((err) => {
              if (unsafeMethods.includes(methodName)) {
                const metadata = invokeMetadata[connectionId]
                clearTimeout(metadata.timeoutOccurred)
                delete invokeMetadata[connectionId]
              }
              // TODO: fix TS so we can dispatch logError here
              console.error(`ERROR IN UNSAFE METHOD ${methodName}: ${err}`)
              return throwError(err)
            }),
            finalize(() => {
              streamDisposed.next()
              streamDisposed.complete()
            })
          )
        }),
        mergeMap((response) => {
          if (unsafeMethods.includes(methodName)) {
            const metadata = invokeMetadata[connectionId]
            clearTimeout(metadata.timeoutOccurred)
            delete invokeMetadata[connectionId]

            if (messageId !== metadata.messageId) {
              asyncResponse.next({
                type: ResponseType.ERROR,
                messageId,
                result: null,
                // error: `${methodName} responded with an invalid message ID. Expected ${messageId}, got ${metadata.messageId}.`
                error: {
                  errorCode: methodName,
                  fatal: true,
                  message: `${methodName} responded with an invalid message ID. Expected ${messageId}, got ${metadata.messageId}.`
                }
              })
              return asyncResponse
            }
          }
          asyncResponse.next(response)
          return asyncResponse
        }),
        handleResult(store),
        takeUntil(streamDisposed)
      )
    }
  }
}

let defaultHub: ReturnType<typeof createHub>

export const initHub = (store: Store) => {
  defaultHub = createHub(store)
  if (import.meta.env.MODE === 'development') {
    ;(window as any).__HUB__ = defaultHub
  }
}

export const getHub = () => {
  if (!defaultHub) {
    throw new Error('Hub was not initialized.')
  }
  return defaultHub
}
