import type { ISessionStreamEvent } from 'Workers/streamingTypes';
import { ConnectableObservable, from } from 'rxjs';
import { filter, share, publish, mergeMap, map, startWith, withLatestFrom } from 'rxjs/operators';
import { datastreamIsRFSTimer, datastreamIsDataTimer } from 'utils/Timer/Timers';
import {
  isDisconnectionNotification,
  isBlotterLiveNotification,
  isRequestForStreamData,
} from './typeGuards';
import { fromWebworker$, toWebworker$ } from './streamingWorker';
import { SubjectFactoryByProperty } from 'utils/RX';
import { hidden } from './visibility';
import { measuredDelay } from 'utils/RX/operators/measuredDelay';
import { type StreamingEvent, isStreamingData, type Streams } from './typings';

const pollingDelayMs = 50;

const hidden$ = hidden(document);

const messageType = new SubjectFactoryByProperty('type', fromWebworker$);
const legacy$ = messageType.create('DATASTREAM');

const heartbeatMissed$ = messageType.create('ESP_HEARTBEAT_MISSED').pipe(
  map(({ streamId }) => streamId),
  share(),
);

const flush$ = messageType.create('FLUSH');

/**
 * @todo type session message ASAP
 */
const session$ = legacy$.pipe(
  map(({ message }) => message),
  publish(),
) as ConnectableObservable<ISessionStreamEvent>;

const dataStream$ = (session$ as unknown as ConnectableObservable<StreamingEvent>).pipe(
  filter(datastreamIsDataTimer(isStreamingData)),
  share(),
);

const disconnectionNotification$ = dataStream$.pipe(filter(isDisconnectionNotification), share());

const blotterNotification$ = dataStream$.pipe(filter(isBlotterLiveNotification), share());

// Return polling loop
flush$
  .pipe(
    startWith(null),
    withLatestFrom(hidden$),
    map(([_, hiddenBefore]) => ({ hiddenBefore })),
    measuredDelay(() => performance.now(), pollingDelayMs),
    map(([item, observedDelay]) => ({ ...item, observedDelay })),
    withLatestFrom(hidden$),
    map(([item, hiddenAfter]) => ({ ...item, hiddenAfter })),
    map(delayInfo => ({ type: 'FLUSH' as 'FLUSH', delayInfo })),
  )
  .subscribe(toWebworker$);

const esp$ = flush$.pipe(
  map(({ esp }) => esp),
  mergeMap(array => from(array)),
  share(),
);

const rfs$ = dataStream$.pipe(filter(datastreamIsRFSTimer(isRequestForStreamData)), share());

export const getStreams = (): Streams => {
  if (session$ === undefined) {
    throw new Error(
      'Streams are missing, ensure correct call to initStreams() before calling getStreams',
    );
  }
  return {
    session$,
    dataStream$,
    disconnectionNotification$,
    esp$,
    rfs$,
    blotterNotification$,
    heartbeatMissed$,
  };
};
