import { NgZone, Type } from "@angular/core";
import { Observable, ObservableInput, OperatorFunction, from, of } from "rxjs";
import { catchError, filter, map, shareReplay, startWith, switchMap } from "rxjs/operators";

export function cacheLatest<T>() {
    return shareReplay<T>({
        refCount: true,
        bufferSize: 1,
    });
}

export function filterForDefinedValue<T>() {
    return filter((e: T | undefined | null): e is T => e !== undefined && e !== null);
}

export function undefinedWhileLoading<T, O>(
    project: (value: T, index: number) => ObservableInput<O>,
) {
    return switchMap<T, ObservableInput<O | undefined>>((v, i) =>
        from(project(v, i)).pipe(startWith(undefined)),
    );
}

export function propagateMap<T, O>(project: (value: T) => O) {
    return map<T | undefined, O | undefined>((value) => {
        return typeof value === "undefined" ? undefined : project(value);
    });
}

export function propagateSwitchMap<T, O>(project: (value: T) => ObservableInput<O | undefined>) {
    return switchMap<T | undefined, ObservableInput<O | undefined>>((value) => {
        return typeof value === "undefined" ? of(undefined) : project(value);
    });
}

export function catchErrorOfType<TError, T, O extends ObservableInput<any>>(
    type: Type<TError>,
    handler: (t: TError) => O,
) {
    return catchError<T, O>((e) => {
        if (e instanceof type) {
            return handler(e);
        } else {
            throw e;
        }
    });
}

export function runInZone<T>(zone: NgZone): OperatorFunction<T, T> {
    return (source) => {
        return new Observable((observer) => {
            return source.subscribe({
                next: (value: T) => zone.run(() => observer.next(value)),
                error: (e: any) => zone.run(() => observer.error(e)),
                complete: () => zone.run(() => observer.complete()),
            });
        });
    };
}
