import { EMPTY, from, Subject } from 'rxjs';
import { filter, map, mergeMap, share, switchMap, takeUntil, tap } from 'rxjs/operators';
import { WebWorker } from './web-worker';
import { TaskResult, WorkerDefinition, WorkerTask } from './worker-models';
import {
  AsyncWorker,
  AsyncWorkerFunction,
  AsyncWorkerLiteral,
  isAsyncWorkerFunction,
  UnknownWorker,
  WorkerFactory,
  WorkerFunction,
} from './worker-types';

export class WorkerPool {
  readonly name: string;
  private factory: WorkerFactory;

  private taskQueue: WorkerTask[] = [];
  private idleQueue: AsyncWorker[] = [];
  private workerPromises: Promise<AsyncWorker>[] = [];

  private work$ = new Subject<void>();
  private result$ = new Subject();
  private taskResult$ = new Subject<TaskResult>();
  private error$ = new Subject();
  private terminate$ = new Subject();

  private taskResultObs$ = this.taskResult$.asObservable().pipe(share());

  private id = 1;
  private finalizing = false;
  private terminated = false;

  private constructor(
    workerDef: WorkerDefinition,
    private size: number,
  ) {
    this.name = workerDef.name;
    this.factory = workerDef.factory;
    this.orchestrate();
  }

  static async create<T extends UnknownWorker>(
    workerDef: WorkerDefinition,
    size: number,
  ): Promise<AsyncWorker<T>> {
    const pool = new WorkerPool(workerDef, size);
    const worker = await pool.createWorker();

    if (!worker) {
      throw new Error(`Could not create worker pool for ${workerDef.name}`);
    }

    pool.idleQueue.push(worker);

    const asyncPool = isAsyncWorkerFunction(worker)
      ? pool.createAsyncPoolFunction()
      : pool.createAsyncPoolLiteral(worker);

    return asyncPool as AsyncWorker<T>;
  }

  private createAsyncPoolFunction(): AsyncWorkerFunction {
    const workerFunction: AsyncWorkerFunction = {
      name: this.name,
      isTerminated: this.isTerminated.bind(this),
      terminate: this.terminate.bind(this),
      taskResult$: this.taskResultObs$,
      postMessage: this.createWorkerFunction('postMessage'),
      result$: this.taskResult$.pipe(
        filter((x) => !x.func),
        map((x) => x.value),
      ),
    };
    return workerFunction;
  }

  private createAsyncPoolLiteral(worker: AsyncWorker): AsyncWorkerLiteral {
    const workerLiteral: Record<string, unknown> = {
      name: this.name,
      isTerminated: this.isTerminated.bind(this),
      terminate: this.terminate.bind(this),
      taskResult$: this.taskResultObs$,
    };

    const workerKeys = Object.keys(workerLiteral);

    Object.entries(worker)
      .filter(([key]) => workerKeys.indexOf(key) === -1)
      .forEach(([key, value]) => {
        if (typeof value === 'function') {
          workerLiteral[key] = this.createWorkerFunction(key);
          workerLiteral[`${key}$`] = this.taskResultObs$.pipe(
            filter((x) => x.func === key),
            map((x) => x.value),
          );
        }
      });

    return workerLiteral as AsyncWorkerLiteral;
  }

  private createWorkerFunction(func?: string): WorkerFunction {
    const workerFn = (...parameters: unknown[]) => {
      const task = new WorkerTask(this.nextId(), func, ...parameters);
      this.taskQueue.push(task);
      this.work$.next();
      return task.promise as Promise<unknown>;
    };
    return workerFn as WorkerFunction;
  }

  private nextId(): string {
    return `${this.name}:${this.id++}`;
  }

  private orchestrate() {
    this.work$
      .pipe(takeUntil(this.terminate$))
      .pipe(
        mergeMap(() => from(this.getNextWorker())),
        filter((worker) => worker !== undefined && worker !== null),
        mergeMap((worker: AsyncWorker) => {
          const task = this.taskQueue.shift();
          if (task) {
            return from(this.execute(worker, task)).pipe(
              switchMap(() => from(task.promise)),
              map((result) => [worker, result] as [AsyncWorker, unknown]),
            );
          } else {
            this.idleQueue.push(worker);
            return EMPTY;
          }
        }),
        map(([worker, result]) => {
          this.idleQueue.push(worker);
          return result;
        }),
        tap(() => this.work$.next()),
      )
      .subscribe({
        next: (result) => this.result$.next(result),
        error: (error) => this.error$.next(error),
      });
  }

  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private async getNextWorker(): Promise<any> {
    return this.idleQueue.shift() ?? (await this.createWorker());
  }

  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private async createWorker(): Promise<any> {
    if (this.workerPromises.length < this.size) {
      const next = this.workerPromises.length + 1;
      const name = `${this.name} (${next} of ${this.size})`;

      const workerDef: WorkerDefinition = {
        name: name,
        factory: () => this.factory(name),
      };

      const promise = WebWorker.create(workerDef);
      this.workerPromises.push(promise);
      const worker = await promise;
      worker.taskResult$.subscribe(this.taskResult$);
      return worker;
    }
  }

  private async execute(worker: AsyncWorker, task: WorkerTask) {
    const invoke =
      typeof worker === 'function' ? worker : (<AsyncWorkerLiteral>worker)[task.params.func];
    invoke(...task.params.args).then(
      (result: unknown) => {
        task.resolve(result);
      },
      (error: unknown) => {
        task.reject(error);
      },
    );
  }

  private isTerminated(): boolean {
    return this.terminated;
  }

  private async terminate(force = false): Promise<void> {
    this.terminate$.complete();

    const workers = await Promise.all(this.workerPromises);
    const promises = workers.map((worker) => {
      return worker.terminate(force);
    });

    await Promise.all(promises).then(() => this.finalize());
  }

  private finalize() {
    if (this.finalizing) return;

    this.finalizing = true;
    this.result$.complete();
    this.terminated = true;
  }
}
