import { Observable, Subject, filter, map, shareReplay, take } from 'rxjs';

/**
 * Message envelope carried on a service stream.
 *
 * @typeParam Actions Type of the action identifiers used by the service.
 */
export interface IStream<Actions> {
  /** Identifier of the action this message belongs to. */
  action: Actions;
  /** Payload accompanying the action; deliberately untyped (`any`). */
  params: any;
}
/**
 * Abstract publisher base class for a service.
 *
 * Holds the subject handed to the constructor and lets subclasses push
 * `{ action, params }` messages onto it.
 */
export abstract class Pub<A> {
  /** Subject that receives every message published through `emit`. */
  protected stream$: Subject<IStream<A>>;

  /**
   * @param stream$ Subject the publisher writes its messages to.
   */
  constructor(stream$: Subject<IStream<A>>) {
    this.stream$ = stream$;
  }

  /**
   * Pushes a single message onto the underlying subject.
   *
   * @param action Identifier of the action being published.
   * @param params Optional payload; `undefined` when omitted.
   */
  protected emit(action: IStream<A>['action'], params?: IStream<A>['params']): void {
    const message: IStream<A> = { action, params };
    this.stream$.next(message);
  }
}
/**
 * Abstract subscriber base class for a service.
 *
 * Exposes the subject handed to the constructor as an observable piped
 * through `shareReplay(1)` and lets subclasses listen for a single action.
 */
export abstract class Sub<A> {
  /** The source subject piped through `shareReplay(1)`. */
  protected stream$: Observable<IStream<A>>;

  /**
   * @param stream$ Subject whose messages this subscriber observes.
   */
  constructor(stream$: Subject<IStream<A>>) {
    const replayLatest = shareReplay<IStream<A>>(1);
    this.stream$ = stream$.pipe(replayLatest);
  }

  /**
   * Builds an observable restricted to one action.
   *
   * @param name Action to listen for; compared with strict equality.
   * @returns Observable of the matching messages. Each emitted object keeps
   *   the original `params` but has its `action` replaced by `undefined`.
   */
  protected actionListener(name: A): Observable<IStream<A>> {
    const matchesName = (message: IStream<A>): boolean => message.action === name;
    // The action is dropped on purpose: listeners only receive the payload.
    const withoutAction = (message: IStream<A>): IStream<A> =>
      ({ params: message.params, action: undefined }) as IStream<A>;

    return this.stream$.pipe(filter(matchesName), map(withoutAction));
  }
}

/**
 * Helper methods for the service layer.
 */
class GlobHelpers {
  /**
   * Subscribes to `stream`, then runs `pub`, and resolves with the first
   * value the stream emits.
   *
   * The subscription is created before `pub` is invoked so that a value
   * emitted synchronously by `pub` is not missed.
   *
   * @param stream Observable to take the first value from.
   * @param pub Callback run right after subscribing, typically the publish
   *   call that triggers the awaited emission.
   * @returns Promise resolved with the first emitted value. It is rejected
   *   if the stream errors before emitting or if `pub` throws. If the stream
   *   completes without emitting, the promise stays pending.
   */
  streamToPromise<T>(stream: Observable<T>, pub: () => void): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const subscription = stream.pipe(take(1)).subscribe({
        next: (value) => resolve(value),
        // Without this handler a stream error would leave the promise pending.
        error: (err) => reject(err),
      });

      try {
        pub();
      } catch (err) {
        // Nothing will be published, so release the listener before rejecting.
        subscription.unsubscribe();
        reject(err);
      }
    });
  }
}
/**
 * Abstract base class that concrete services extend.
 *
 * Owns the subject shared by the service and requires subclasses to supply
 * the `pub` and `sub` members.
 */
export abstract class Service<A> {
  /** Subject created once per service instance. */
  protected stream$: Subject<IStream<A>>;
  /** Publisher side of the service; provided by the subclass. */
  abstract pub: Pub<A>;
  /** Subscriber side of the service; provided by the subclass. */
  abstract sub: Sub<A>;
  /** Shared helper methods for the service layer. */
  public globHelpers: GlobHelpers;

  constructor() {
    this.stream$ = new Subject<IStream<A>>();
    this.globHelpers = new GlobHelpers();
  }
}
