import { Injectable } from '@angular/core';
import { Observable, from, Subject, ReplaySubject } from 'rxjs';

/** An operation to be sequenced: called with no arguments, it returns an Observable. */
type SequencerCallback = () => Observable<any>;

/** One queued operation together with the subject its caller observes. */
interface SequencerRecord {
  /** Handed back to the caller of performOperation(); fed from the callback's Observable. */
  subject: Subject<any>;
  /** The operation itself; invoked when this record is triggered. */
  callback: SequencerCallback;
}

/** The records sharing one queue key, in the order they were pushed. */
type SequencerQueue = SequencerRecord[];

/**
 * Runs Observable-returning operations one at a time per queue key.
 *
 * Operations that share a key run strictly in submission order; operations
 * under different keys do not wait for each other.
 */
@Injectable({
  providedIn: 'root',
})
export class SequencerService {
  /**
   * Pending operations by queue key. Index 0 of a queue is the operation
   * currently running. The object has no prototype, so keys such as
   * "constructor", "toString" or "__proto__" are ordinary entries instead of
   * resolving to inherited members. Queues are kept after they drain.
   */
  queues: Record<string, SequencerQueue> = Object.create(null);

  constructor() {}

  /**
   * Sentinel stored in a record's `callback` slot once the record has been
   * started. Always throws, so re-triggering a record fails loudly.
   */
  private static _alreadyTriggered(): never {
    throw new Error('SequencerService: operation was triggered more than once');
  }

  /**
   * Queues an operation behind all prior operations with the same key.
   *
   * The operation is considered finished when the Observable returned by
   * `callback` fails or completes. If `callback` throws synchronously, or
   * returns something that cannot be subscribed to, that failure is delivered
   * as an error on the returned Observable and the queue moves on.
   *
   * @param queueKey A product ID, or any other string that groups operations.
   * @param callback The operation; it must return an Observable.
   * @returns An Observable mirroring the callback's notifications. It is not
   *   the same object the callback returns.
   */
  performOperation(
    queueKey: string,
    callback: SequencerCallback
  ): Observable<any> {
    const queue = this._getQueue(queueKey, true);

    // ReplaySubject rather than a plain Subject: the operation may emit before
    // the caller has had a chance to subscribe, and that emission must not be lost.
    const record: SequencerRecord = {
      subject: new ReplaySubject(1),
      callback,
    };
    queue.push(record);

    // Only the sole entry starts immediately; otherwise the running
    // operation starts its successor when it finishes.
    if (queue.length === 1) {
      this._triggerFirstInQueue(queue);
    }

    return record.subject;
  }

  /**
   * Looks up the queue for a key.
   *
   * @param queueKey Key to look up.
   * @param create When true, a missing queue is registered under the key.
   *   When false, a missing queue yields a fresh, unregistered empty array.
   * @returns The registered queue, or a new empty one.
   */
  private _getQueue(queueKey: string, create: boolean): SequencerQueue {
    const existing: SequencerQueue | undefined = this.queues[queueKey];
    if (existing) {
      return existing;
    }
    const fresh: SequencerQueue = [];
    if (create) {
      this.queues[queueKey] = fresh;
    }
    return fresh;
  }

  /**
   * Starts the record at the head of the queue. When that operation fails or
   * completes, the record is removed and the next head is started.
   *
   * @param queue The queue whose first record should run.
   * @returns The head record's subject, or an empty Observable when the queue
   *   is empty. The return value is not used inside this class.
   */
  private _triggerFirstInQueue(queue: SequencerQueue): Observable<any> {
    if (queue.length < 1) {
      return from([]);
    }

    const record = queue[0];
    const callback = record.callback;
    if (callback === SequencerService._alreadyTriggered) {
      return SequencerService._alreadyTriggered(); // always throws
    }
    record.callback = SequencerService._alreadyTriggered;

    let settled = false;
    // Finishes the record exactly once: notify its subject first, then dequeue
    // and start the successor. The order matters: an operation queued from
    // inside the notification must still see this record at the head, so that
    // it is not started twice.
    const settle = (notify: () => void): void => {
      if (settled) {
        return;
      }
      settled = true;
      try {
        notify();
      } finally {
        const index = queue.indexOf(record);
        if (index >= 0) {
          queue.splice(index, 1);
        }
        this._triggerFirstInQueue(queue);
      }
    };

    try {
      callback().subscribe({
        next: value => record.subject.next(value),
        error: error => settle(() => record.subject.error(error)),
        complete: () => settle(() => record.subject.complete()),
      });
    } catch (error) {
      if (settled) {
        // The record already finished, so this came from later in the chain.
        throw error;
      }
      // The callback threw or returned a non-Observable. Report it as this
      // operation's failure so the rest of the queue is not blocked forever.
      settle(() => record.subject.error(error));
    }

    return record.subject;
  }
}
