import { FSA } from 'flux-standard-action';
import {
  filter,
  map,
  retry,
  shareReplay,
  switchMap,
  takeUntil,
} from 'rxjs/operators';
import { Subject, Observable } from 'rxjs';
import { webSocket } from 'rxjs/webSocket';
import settings from '../../settings';
import { LOGOUT, SESSION_STARTED } from '../session/action-types';
import { SSE_SUBSCRIBE, SSE_UNSUBSCRIBE } from './action-types';
import HeartbeatWebsocket from '../../websocket';

/**
 * Epic that bridges the redux store and the backend websocket.
 *
 * For every SESSION_STARTED action a new HeartbeatWebsocket is opened. Once
 * the socket reports open, a `login` message carrying the current session
 * token is sent, followed by every SSE_SUBSCRIBE / SSE_UNSUBSCRIBE action seen
 * so far (replayed from a buffer) and then any that arrive later. Every
 * non-heartbeat message received from the socket is emitted by the epic.
 *
 * The connection is torn down on LOGOUT, or when a newer SESSION_STARTED
 * replaces it (switchMap).
 *
 * @param action$ Stream of dispatched actions.
 * @param state$ State observable; `state$.value.session.token` is read each
 *   time the socket opens.
 * @returns Stream of the messages received from the websocket.
 *   NOTE(review): presumably these are already action-shaped, since epic
 *   output is dispatched — confirm against the server protocol.
 */
const websocketEpic = (action$, state$) => {
  // Wire-level message type for each forwarded redux action type.
  const wireTypes = {
    [SSE_SUBSCRIBE]: 'subscribe',
    [SSE_UNSUBSCRIBE]: 'unsubscribe',
  };

  // All (un)subscribe actions, buffered so they can be replayed to a socket
  // that opens after they were dispatched.
  // TODO: the replay buffer is unbounded, so this could get big
  const subscribeStream = action$.pipe(
    filter((action: FSA<any, any>) =>
      [SSE_SUBSCRIBE, SSE_UNSUBSCRIBE].includes(action.type)
    ),
    shareReplay()
  );

  // We need one subscriber, otherwise actions won't start buffering
  subscribeStream.subscribe();

  return action$.pipe(
    filter((action: FSA<any, any>) => action.type === SESSION_STARTED),
    switchMap(() => {
      const ob = new Observable((subscriber) => {
        const ws = new HeartbeatWebsocket({
          url: `${settings.websocketUrl}`,
          heartbeatTimeout: 10000,
        });

        // Subscription that forwards (un)subscribe actions to this socket.
        // Tracked so it can be released on teardown instead of leaking and
        // sending on a closed socket.
        let forwarding: { unsubscribe(): void } | undefined;

        ws.onmessage = (event) => {
          // Heartbeats are transport noise; only real messages are emitted.
          if (event.type !== 'heartbeat') {
            subscriber.next(event);
          }
        };

        ws.onopen = () => {
          ws.send({
            type: 'login',
            payload: { token: state$.value.session.token },
          });

          // Drop any earlier forwarding subscription first, so that a repeated
          // `onopen` (e.g. if HeartbeatWebsocket reconnects — not visible
          // here) replays the buffered actions once rather than stacking
          // duplicate senders.
          forwarding?.unsubscribe();
          forwarding = subscribeStream.subscribe((action: FSA<any, any>) => {
            ws.send({
              // Spread first: the mapped wire type must win over the redux
              // action type carried by `action` itself.
              ...action,
              type: wireTypes[action.type],
            });
          });
        };

        return () => {
          forwarding?.unsubscribe();
          ws.close();
        };
      });

      return ob.pipe(
        // retry({ delay: 5000 }),
        takeUntil(
          action$.pipe(
            filter((action: FSA<any, any>) => action.type === LOGOUT)
          )
        )
      );
    }),
    map((data: FSA<any, any>) => data)
  );
};

export default websocketEpic;
