import { DestroyRef, inject, Injectable } from '@angular/core';
import { WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket';
import { catchError, EMPTY, filter, map, Observable, Subject, Subscription, takeWhile, tap, timer } from 'rxjs';
import { ApiService } from '../api/api.service';
const RETRY_SECONDS = 5;
const MAX_RETRIES = 30;
const DEBUG_MODE = true;
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { NotificationEventType, NotificationMessage } from 'src/app/store/models/notification.model';
import _get from 'lodash/get';

@Injectable({
  providedIn: 'root'
})
export class WebsocketService {
  destroyRef = inject(DestroyRef);

  private messages = new Subject<NotificationMessage>();
	
  private readonly connected = new Subject<void>();

	readonly connected$ = this.connected.asObservable();

	private wsSubjectConfig: WebSocketSubjectConfig<NotificationMessage>;

	private isConnected: boolean = false;
  private connectError: string;
  private reconnectionTries: number = 0;
  private reconnectSubscription: Subscription;
  /**
	 * A stream of messages received
	 */
	private messages$ = this.messages.asObservable();

	private socket:  WebSocketSubject<NotificationMessage>;
  private socketSubscription: Subscription;

  constructor(private api: ApiService) {}

  init() {
    this.disconnect();

    this.api.get(`notifications/url`)
      .pipe(takeUntilDestroyed(this.destroyRef))
      .subscribe({
        next: payload => {
            const url = _get(payload, 'payload');
            this.setUpWebSocketSubjectConfig(url);
            this.connect(this.wsSubjectConfig);
        }
      });
  }

  /**
	 * Constructs the WebSocketSubjectConfig object, with open and close observers
	 * to handle connection status, and trying to re-connect when disconnected.
	 */
	private readonly setUpWebSocketSubjectConfig = (url: string) => {
    const config: WebSocketSubjectConfig<NotificationMessage> = {
      url,
      closeObserver: {
        next: (event) => {
          DEBUG_MODE && console.log('closeObserver', event);
          this.isConnected = false;
          this.tryReconnect();
        },
      },
      openObserver: {
        next: (event) => {
          DEBUG_MODE && console.log('openObserver', event);
          this.connectError = undefined;
          this.isConnected = true;
          this.reconnectionTries = 0;
          this.connected.next();
        },
      },
    };

    this.wsSubjectConfig = config;
  };

  /**
	 * Attempts to connect to the websocket.
	 */
	private readonly connect = (config: WebSocketSubjectConfig<NotificationMessage>) => {
    // Create a new socket and listen for messages, pushing them into the messages Subject.
    this.socket = new WebSocketSubject(config);

    if (this.socketSubscription && !this.socketSubscription.closed) {
      this.socketSubscription.unsubscribe();
      this.socket.complete();
    }

    this.socketSubscription = this.socket.pipe(
      takeUntilDestroyed(this.destroyRef),
      tap((msg) => {
        this.messages.next(msg);
      }),
      catchError((err) => {
        this.connectError = err;

        DEBUG_MODE && console.log('error in connect', err);

        this.tryReconnect();
        
        return EMPTY;
      }),
    ).subscribe();
  };

  /**
	 * Disconnects the socket. For simulation purposes. The service will automatically try to reconnect.
	 */
	readonly disconnect = () => {
    if (this.isConnected && this.socket) {
      this.socket.complete();
    }
  };

  /**
	 * Handles attempting to reconnect to the websocket until connected or
	 * the max retries have been reached.
	 */
	private readonly tryReconnect = () => {
    if (this.reconnectSubscription && !this.reconnectSubscription.closed) {
      this.reconnectSubscription.unsubscribe();
    }

    this.reconnectSubscription = timer(RETRY_SECONDS * 1000)
      .pipe(
        takeUntilDestroyed(this.destroyRef),
        takeWhile(() => {
          if (!this.isConnected) {
            this.reconnectionTries++;
          }

          return !this.isConnected && this.reconnectionTries < MAX_RETRIES;
        }),
        tap(() => {
          this.init();
        }),
      )
      .subscribe();
  };

  /**
	 * Begins listening to a type of events or events.
	 *
	 * Sets up the subscription with the server, sending a subscribe message, and returning a stream
	 * of filtered messages.
	 *
	 * When the client closes the stream, sends an unsubscribe message to the server.
	 *
	 * @param eventType
	 * @returns A stream of messages of the specified type.
	 */
	listen<T extends NotificationEventType = NotificationEventType>(eventType: NotificationEventType): Observable<NotificationMessage<T>> {
		return this.messages$
      .pipe(
        filter((msg) =>  msg.notificationTypeId === eventType),
        map((msg) =>  msg as NotificationMessage<T>),
      );
	}
}
