import { inject, Injectable } from '@angular/core';
import { AngularFireAuth } from '@angular/fire/compat/auth';
import {
  PolarCollectionPointer,
  PolarCollectionQuery,
  PolarCollectionQueryOptions,
  PolarQueryConstraint,
} from './interfaces';
import { Observable, asyncScheduler, Subject } from 'rxjs';
import { AutoId } from './utils';
import { PolarDocumentData } from './PolarDocumentData';
import { ENVIRONMENT } from '../../applicationEnvironment';

interface PolarFireBridgeSendingPacket {
  action: string;
  token?: string;
  collectionName?: string;
  constraints?: PolarQueryConstraint[];
  subscriptionId?: string;
}

interface PolarFireBridgeReceivingPacket {
  action: string;
  subscriptionId?: string;
  docs?: any[];
}

interface PolarFireBridgeCollectionListenerFunction {
  id: string;
  fn: (message: string) => void;
}

interface PolarFireBridgeCollectionListener {
  subscriptionId: string;
  query: PolarCollectionPointer | PolarCollectionQuery;
  fns: PolarFireBridgeCollectionListenerFunction[];
}

@Injectable({
  providedIn: 'root',
})
export class PolarFirebridgeSocketService {
  host: string = '';
  applicationEnvironment = inject(ENVIRONMENT);

  ws: WebSocket | undefined = undefined;
  authorized: boolean = false;
  int: any;
  authResultAwaiterFn?: (result: boolean) => void = undefined;
  messageListeners: { [index: string]: PolarFireBridgeCollectionListener } = {};
  lastFetchedDataBySubscriptionId: { [index: string]: any } = {};

  constructor(private auth: AngularFireAuth) {
    this.host = `${this.applicationEnvironment.environment.fireBridgeWebSocketUrl}`;
  }

  async connect(): Promise<boolean> {
    if (this.ws != undefined) {
      return false;
    }

    this.authorized = false;

    this.ws = new WebSocket(this.host);

    this.ws.addEventListener('open', () => {
      this.onOpen();
    });

    this.ws.addEventListener('message', (e) => {
      this.onMessage(e.data);
    });

    this.ws.addEventListener('close', () => {
      this.onClose();
    });

    this.ws.addEventListener('error', (e) => {
      this.onError(e);
    });

    if (this.int != undefined) {
      clearInterval(this.int);
      this.int = undefined;
    }

    this.int = setInterval(() => {
      if (this.authorized) {
        this.send({
          action: 'ping',
        });
      }
    }, 5000);

    return new Promise<boolean>((resolve) => {
      this.authResultAwaiterFn = resolve;
    });
  }

  async send(packet: PolarFireBridgeSendingPacket): Promise<boolean> {
    if (this.ws == undefined) {
      return false;
    }

    try {
      this.ws.send(JSON.stringify(packet));
      return true;
    } catch {
      return false;
    }
  }

  async onOpen() {
    let user = await this.auth.currentUser;
    let token = await user?.getIdToken()!;

    this.send({
      action: 'auth',
      token: token,
    });
  }

  collectionData<T = any>(
    query: PolarCollectionPointer | PolarCollectionQuery,
    options?: PolarCollectionQueryOptions,
  ): Observable<T[]> {
    if (!this.authorized) {
      throw new Error('Not authorized');
    }

    const createResultSet = (_message: string) => {
      const message: PolarFireBridgeReceivingPacket = JSON.parse(_message);
      const mapped = message.docs!.map<PolarDocumentData>(
        (f) => new PolarDocumentData(f['raw'], f['id'], f['exists'], options?.idField),
      );
      return mapped.map<T>((f) => f.data());
    };

    // create observable creator
    const createObservable = (subscriptionId: string, useExisting: boolean) => {
      const currentObservableId = AutoId.newId();

      const source$ = new Observable<T[]>((subscriber) => {
        const listener = (_message: string) => {
          const message: PolarFireBridgeReceivingPacket = JSON.parse(_message);
          if (message.action == 'collectionDataUpdated') {
            if (message.subscriptionId == subscriptionId) {
              this.lastFetchedDataBySubscriptionId[subscriptionId] = _message;
              subscriber.next(createResultSet(_message));
            }
          }
        };

        if (useExisting) {
          // append fn to fns
          const listenerObject = this.messageListeners[subscriptionId];
          listenerObject.fns.push({
            fn: listener,
            id: currentObservableId,
          });

          // need to send data for the first time
          const lastFetchedData = this.lastFetchedDataBySubscriptionId[subscriptionId];
          if (lastFetchedData != undefined) {
            // send data
            // schedule for next tick to avoid error
            asyncScheduler.schedule(() => {
              subscriber.next(createResultSet(lastFetchedData));
            });
          }
        } else {
          this.messageListeners[subscriptionId] = {
            subscriptionId: subscriptionId,
            query: query,
            fns: [
              {
                fn: listener,
                id: currentObservableId,
              },
            ],
          };
        }
      });

      const subject$ = new Subject<T[]>();
      const subscription = source$.subscribe(subject$);

      const unsubscribe = () => {
        try {
          // delete from messageListeners
          console.log('unsubscribing...: ' + subscriptionId);
          this.messageListeners[subscriptionId].fns = this.messageListeners[
            subscriptionId
          ].fns.filter((f) => f.id != currentObservableId);

          // if no more listener, delete this subscriptionId
          let stillSubscribedInOtherListener = true;
          if (this.messageListeners[subscriptionId].fns.length == 0) {
            stillSubscribedInOtherListener = false;
            delete this.messageListeners[subscriptionId];
          }

          if (stillSubscribedInOtherListener) {
            // still subscribed in another listener. do nothing
            console.log("still subscribed in another listener. don't unsubscribe");
          } else {
            // no one is listening to this subscriptionId. unsubscribe
            console.log('no one is listening to this subscriptionId. unsubscribe');
            // erase last fetched data
            delete this.lastFetchedDataBySubscriptionId[subscriptionId];
            this.send({
              action: 'unsubscribeWithId',
              subscriptionId: subscriptionId,
            });
          }
        } catch {}
      };

      return subject$.asObservable();
    };

    // find existing subscription that matches query
    const existing = Object.keys(this.messageListeners).find((key) => {
      const listenerObject = this.messageListeners[key];
      if (JSON.stringify(listenerObject.query) == JSON.stringify(query)) {
        return true;
      }
      return false;
    });

    // use it for efficiency
    if (existing != undefined) {
      console.log('use existing subscription: ' + existing + 'for ' + query.collectionName);
      return createObservable(this.messageListeners[existing].subscriptionId, true);
    }

    const subscriptionId = AutoId.newId();
    console.log('subscribing...: ' + subscriptionId + 'for ' + query.collectionName);

    if (
      !this.send({
        action: 'collectionData',
        collectionName: query.collectionName,
        constraints: query.constraints,
        subscriptionId: subscriptionId,
      })
    ) {
      throw new Error('Failed to send');
    }

    // now collectionData will be sent to us with subscriptionId
    // so we need to catch it and return it by following Observable
    return createObservable(subscriptionId, false);
  }

  async onMessage(_message: string) {
    const message: PolarFireBridgeReceivingPacket = JSON.parse(_message);

    if (message.action == 'authorized') {
      console.log('authorized');
      this.authResultAwaiterFn?.(true);
      this.authResultAwaiterFn = undefined;
      this.authorized = true;

      // succeeded to connect. try to resubscribe if any listener exists
      Object.keys(this.messageListeners).forEach((key) => {
        console.log('attempting to resubscribe');
        const listenerObject = this.messageListeners[key];
        this.send({
          action: 'collectionData',
          collectionName: listenerObject.query.collectionName,
          constraints: listenerObject.query.constraints,
          subscriptionId: listenerObject.subscriptionId,
        });
      });
    }

    Object.keys(this.messageListeners).forEach((key) => {
      const listenerObject = this.messageListeners[key];
      listenerObject.fns.forEach((fn) => {
        fn.fn(_message);
      });
    });
  }

  attemptingToReconnect: boolean = false;

  async onClose() {
    console.log('disconnected');
    this.authorized = false;
    this.ws = undefined;
    this.authResultAwaiterFn?.(false);
    this.authResultAwaiterFn = undefined;

    if (this.attemptingToReconnect) {
      return;
    }

    this.attemptingToReconnect = true;
    // attempt to reconnect
    const retryer = async () => {
      console.log('reconnecting...');
      const result = await this.connect();
      if (!result) {
        setTimeout(retryer, 5000);
      } else {
        this.attemptingToReconnect = false;
      }
    };
    setTimeout(retryer, 3000);
  }

  async onError(e: Event) {
    this.authResultAwaiterFn?.(false);
    this.authResultAwaiterFn = undefined;
  }
}
