import { isPlatformBrowser } from '@angular/common';
import { Inject, Injectable, NgZone, OnDestroy, PLATFORM_ID } from '@angular/core';
import { asyncScheduler, BehaviorSubject, interval, Observable, Subject, Subscription } from 'rxjs';
import { takeWhile } from 'rxjs/operators';
import { WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket';

import { SOCKET_CONFIG } from './socket.config';
import { WebSocketConfig } from './socket.type';

interface SocketConnectOptions {
  runOutsideAngular?: boolean | undefined;
  runWithDebounce?: boolean | undefined;
  keepConnection?: number | undefined;
}

@Injectable()
export class SocketService implements OnDestroy {
  get connected(): boolean {
    return this.isConnected;
  }

  private statusSubj = new BehaviorSubject(false);
  statusChanged: Observable<boolean> = this.statusSubj.asObservable();

  private reconnectedSubj = new Subject<void>();
  reconnected = this.reconnectedSubj.asObservable();

  private websocketSubj: WebSocketSubject<unknown> | null = null;
  private messageSubj: Subject<unknown> = new Subject<unknown>();
  private keepConnectionSub = new Subscription();

  private options: SocketConnectOptions = {};
  private reconnectionInterval: Observable<number> | null = null;
  private reconnectInterval = 5000;
  private reconnectAttempts = 15;
  private subscriptions = new Map<string, unknown>();
  private isConnected = false;
  private closed = true;
  private config: WebSocketSubjectConfig<unknown> = { url: '/' };

  constructor(
    @Inject(SOCKET_CONFIG) wsConfig: WebSocketConfig[],
    private zone: NgZone,
    @Inject(PLATFORM_ID) private platformId: number
  ) {
    const config = wsConfig.filter(c => !c.destination)[0];
    if (config) {
      this.setup(config);
    }
    // run reconnect if not connection
    this.statusSubj.subscribe(isConnected => {
      this.isConnected = isConnected;
      this.tryReconnect();
    });
    this.reconnectedSubj.subscribe(() => {
      const tmpSubs = this.subscriptions;
      this.subscriptions = new Map<string, unknown>();
      for (const operation of tmpSubs.keys()) {
        this.subscribe(operation, tmpSubs.get(operation));
      }
    });
  }

  ngOnDestroy(): void {
    this.close();
  }

  protected setup(wsConfig: WebSocketConfig): void {
    this.reconnectInterval = wsConfig.reconnectInterval || this.reconnectInterval;
    this.reconnectAttempts = wsConfig.reconnectAttempts || this.reconnectAttempts;
    let disconnected = false;
    this.config = {
      url: wsConfig.url,
      closeObserver: {
        next: (e: CloseEvent) => {
          console.log('[WebSocket] closed!');
          console.log(e);
          if (!this.closed) {
            disconnected = true;
          }
          this.statusSubj.next(false);
        },
      },
      openObserver: {
        next: (_: Event) => {
          console.log('[WebSocket] connected!');
          this.statusSubj.next(true);
          if (disconnected) {
            disconnected = false;
            this.reconnectedSubj.next();
          }
        },
      },
    };
  }

  /*
   * connect to WebSocked
   * TODO: move this options to provider
   * */
  connect(options: SocketConnectOptions = {}): void {
    if (!isPlatformBrowser(this.platformId)) {
      return;
    }
    this.closed = false;
    this.options = options;
    this.websocketSubj?.complete();
    this.websocketSubj = new WebSocketSubject(this.config);
    if (options.runOutsideAngular || options.runWithDebounce) {
      let debounceSub = new Subscription();
      this.zone.runOutsideAngular(() => {
        this.websocketSubj?.subscribe(message => {
          if (options.runWithDebounce) {
            this.messageSubj.next(message);
            debounceSub.unsubscribe();
            debounceSub = asyncScheduler.schedule(() => {
              this.zone.run(() => {});
            }, 150);
          } else {
            this.messageSubj.next(message);
          }
        });
      });
    } else {
      this.websocketSubj.subscribe(message => this.messageSubj.next(message));
    }
    if (options.keepConnection) {
      this.keepConnectionSub.unsubscribe();
      this.keepConnectionSub = interval(options.keepConnection).subscribe(() => {
        if (!this.websocketSubj) {
          return;
        }
        this.websocketSubj.next('ping');
      });
    }
  }

  /**
   * close websocket connection and remove subscriptions
   */
  close(): void {
    console.log('[WebSocket] closing connection');
    this.closed = true;
    this.keepConnectionSub.unsubscribe();
    this.websocketSubj?.complete();
    this.subscriptions = new Map<string, unknown>();
  }

  /*
   * reconnect if not connecting or errors
   * */
  private tryReconnect(): void {
    if (this.reconnectionInterval || this.isConnected || this.closed) {
      return;
    }
    this.reconnectionInterval = interval(this.reconnectInterval).pipe(
      takeWhile((_, index) => index < this.reconnectAttempts && !this.isConnected)
    );
    this.reconnectionInterval.subscribe({
      next: () => this.connect(this.options),
      complete: () => {
        // Subject complete if reconnect attempts ending or reconnect is successful
        this.reconnectionInterval = null;
      },
    });
  }

  /*
   * on message event
   * */
  on<T>(): Observable<T> {
    return this.messageSubj as Observable<T>;
  }

  /*
   * send message to server
   * */
  send<T>(data: T): void {
    if (this.websocketSubj) {
      this.websocketSubj.next(data);
    } else {
      console.error('[WebSocket] Seems that you did not connected to socket.');
    }
  }

  /*
   * send subscribe message to server, save it, and resend on reconnect
   * ONLY one operation can be observer at the same time e.g. subscribe('USER_UPDATE')
   * IF needed to observe same operation but with different params,
   * you can pass some id after operation name with ':' e.g. subscribe('USER_UPDATE:U1')
   * ':U1' will be removed before sending to server but it will cached to correctly handling
   * socket reconnection and unsubscribing from operations. Note that unsubscribe() must be
   * called with same operation name e.g unsubscribe('USER_UPDATE:U1')
   * */
  subscribe<T>(operation: string, params: T): void {
    if (this.websocketSubj) {
      if (!this.subscriptions.has(operation)) {
        this.websocketSubj.next(
          Object.assign(params, {
            operation: operation.split(':')[0],
          })
        );
        this.subscriptions.set(operation, params);
        console.log(`[WebSocket] Subscribing to operation ${operation}.`);
      } else {
        console.warn(`[WebSocket] Operation ${operation} already under observing.`);
      }
    } else {
      console.error('[WebSocket] Seems that you did not connected to socket.');
    }
  }

  unsubscribe(operation: string): void {
    if (this.websocketSubj) {
      let params = this.subscriptions.get(operation);
      if (params) {
        this.subscriptions.delete(operation);
        params = Object.assign(params as Record<string, unknown>, {
          operation: `REMOVE_${operation.split(':')[0]}`,
        });
        this.websocketSubj.next(params);
        console.log(`[WebSocket] Unsubscribing form ${operation}.`);
      } else {
        console.warn(
          `[WebSocket] Operation ${operation} not found, make sure it was subscribe previously.`
        );
      }
    } else {
      console.error('[WebSocket] Seems that you did not connected to socket.');
    }
  }
}
