import { Injectable } from '@angular/core';
import { Logger } from '@myia/ngx-core';
import { JWTService } from '@myia/ngx-http';
import { HubConnectionStatus, SignalRCoreService } from '@myia/ngx-signal-r';

import { Subject, Observable, Observer, Subscription, BehaviorSubject, of } from 'rxjs';


export interface IServerNotificationsConnection {
    status: BehaviorSubject<HubConnectionStatus>;
    notificationReceived: Observable<any>;

    subscribeTopic(topic: string): Observable<boolean>;

    unsubscribeTopic(topic: string): Observable<boolean>;

    close(): void;
}

/**
 * Service for using of server notifications
 */
@Injectable({providedIn: 'root'})
export class ServerNotificationsService {

    constructor(private _signalRCoreService: SignalRCoreService, private _logger: Logger, private _jwtService: JWTService) {
    }

    /**
     * Redirect to login view.
     */
    connect(): Observable<IServerNotificationsConnection> {

        let notifySignalRMethodSubscription: Subscription | null;
        const hubConnection = this._signalRCoreService.connect(null, () => `bearer ${this._jwtService.getAccessToken()}`);
        const notificationReceivedSubject = new Subject<any>();
        notifySignalRMethodSubscription = hubConnection.on('notify').subscribe((args: Array<any>) => {
            const [msg] = args; // message is first argument of notify method called on server
            notificationReceivedSubject.next(msg);
        });
        return of({
            notificationReceived: notificationReceivedSubject,
            status: hubConnection.status,

            subscribeTopic(topic: string): Observable<boolean> {
                return new Observable((observer: Observer<boolean>) => {
                    // subscribe to topic
                    hubConnection.invoke('SubscribeTopic', [topic]).subscribe(
                        () => {
                            observer.next(true);
                            observer.complete();
                        },
                        (err) => {
                            if (hubConnection.status.value !== HubConnectionStatus.disconnected) {
                                console.error(err);
                                observer.next(false);
                                observer.complete();
                            }
                        }
                    );
                });
            },
            unsubscribeTopic(topic: string): Observable<boolean> {
                return new Observable((observer: Observer<boolean>) => {
                    // subscribe to topic
                    hubConnection.invoke('UnsubscribeTopic', [topic]).subscribe(
                        () => {
                            observer.next(true);
                            observer.complete();
                        },
                        () => {
                            observer.next(false);
                            observer.complete();
                        },
                    );
                });
            },
            close: () => {
                console.log('Notification push service closed.');
                if (notifySignalRMethodSubscription) {
                    notifySignalRMethodSubscription.unsubscribe();
                    notifySignalRMethodSubscription = null;
                }
                hubConnection.close();
            }
        });
    }
}

