import { defer, first, isObservable, Observable, of } from 'rxjs';
import { catchError, finalize, mergeMap, publishReplay, refCount, shareReplay } from 'rxjs/operators';

/**
 * Base class for services that want several near-simultaneous requests for
 * the same data to share a single underlying observable subscription.
 *
 * Subclasses wrap a source observable with `_eventSharedCall`; callers that
 * ask for the same `eventId`/`callId` pair receive the same cached observable.
 */
export class SharedCallServiceBase {

    /** Replay window, in milliseconds, applied to every shared call. */
    private static readonly SHARED_CALL_WINDOW_MS = 5000;

    /**
     * Cached shared observables, keyed first by event id and then by call id.
     * At most one event id is present at a time (see `_eventSharedCall`).
     */
    private _sharedCalls: { [key: string]: { [key: string]: Observable<any> } } = {};

    /**
     * Returns the shared observable registered for `eventId`/`callId`,
     * creating it from `method` on first use.
     *
     * Asking for an event id that is not currently cached replaces the whole
     * cache, so the entries of any previously used event id are dropped.
     *
     * @param eventId Scope the call belongs to.
     * @param callId Identifier of the call within that scope.
     * @param method Source observable to share. It is only used when no entry
     *               exists yet for this `eventId`/`callId` pair.
     * @returns An observable that emits the first value of the shared source.
     */
    protected _eventSharedCall<T>(eventId: string, callId: string, method: Observable<T>): Observable<T> {
        let callsForEvent = this._sharedCalls[eventId];
        if (!callsForEvent) {
            callsForEvent = {};
            // Deliberately a reassignment, not an insertion: entries of other events are discarded.
            this._sharedCalls = {[eventId]: callsForEvent};
        }

        let sharedCall = callsForEvent[callId];
        if (!sharedCall) {
            sharedCall = this.renewAfterTimer(callId, method, SharedCallServiceBase.SHARED_CALL_WINDOW_MS);
            callsForEvent[callId] = sharedCall;
        }
        return sharedCall;
    }

    /**
     * Wraps `obs` so that its emissions are multicast and replayed.
     *
     * @param obs Source observable.
     * @param time Replay window in milliseconds.
     * @param bufferReplays Maximum number of emissions kept for replay.
     */
    private createReturnObs(obs: Observable<any>, time: number, bufferReplays: number) {
        return obs.pipe(
            shareReplay(bufferReplays, time),
        );
    }

    /**
     * Builds an observable that emits the first value of a replayed `obs` and
     * transparently re-subscribes to `obs` once the replay has nothing left
     * to offer.
     *
     * The current replay is kept in a closure variable and swapped when it
     * turns out to be empty. Subscribers arriving after a renewal therefore
     * share the renewed replay instead of each re-subscribing to `obs`.
     *
     * @param callId Identifier used in log messages only.
     * @param obs Source observable.
     * @param time Replay window in milliseconds.
     * @param bufferReplays Maximum number of emissions kept for replay.
     */
    private renewAfterTimer(callId: string, obs: Observable<any>, time: number, bufferReplays: number = 1) {
        // Mutable on purpose: replaced by `renewal` below when the replay has run dry.
        let currentShared = this.createReturnObs(obs, time, bufferReplays);

        // Subscribing to this swaps in a fresh replay and forwards its emissions.
        const renewal = defer(() => {
            console.log(`shared call ${callId} expired.`);
            currentShared = this.createReturnObs(obs, time, bufferReplays);
            return currentShared;
        });

        // `defer` makes every subscription read the *current* replay rather than
        // the one that existed when this pipeline was built.
        return defer(() => currentShared).pipe(
            // When the replay completes without emitting (its buffered value has
            // aged out, or the source itself was empty) `first` emits `renewal`
            // as its default value instead of raising EmptyError.
            first(
                null,
                renewal,
            ),
            catchError(err => {
                console.log(`shared call error: ${err}.`);
                throw err;
            }),
            // A plain value is passed through; the `renewal` observable is subscribed to.
            mergeMap((d) => (isObservable(d) ? d : of(d))),
        );
    }

}
