import {Injectable} from '@angular/core';
import {RxStompService} from '@tsm/framework/websocket';
import {Observable} from 'rxjs';
import {map} from 'rxjs/operators';
import {IFrame, Message} from '@stomp/stompjs';
import {ActionsSubject} from '@ngrx/store';
import {LoginFinishedSuccess, Logout} from '@tsm/core';
import {ofType} from '@ngrx/effects';
import {RxStompState} from '@stomp/rx-stomp';
import {webSocket, WebSocketSubject} from 'rxjs/webSocket';

/**
 * Application-wide websocket facade.
 *
 * Wraps the shared {@link RxStompService} for topic-based publish/watch and
 * additionally manages one raw {@link WebSocketSubject} that is created on
 * demand through {@link DmsWebsocketService.initSpelMessage}.
 *
 * The STOMP client is activated when a `LoginFinishedSuccess` action is seen
 * and deactivated when a `Logout` action is seen.
 */
@Injectable({
  providedIn: 'root',
})
export class DmsWebsocketService {
  /** Name of the current STOMP connection state (the `RxStompState` enum key). */
  public state$: Observable<string>;

  /** Raw websocket subject; stays undefined until `initSpelMessage` is called. */
  private socket$: WebSocketSubject<any> | undefined;

  constructor(
    private rxStompService: RxStompService,
    private actionsSubj: ActionsSubject,
  ) {
    // Tear the STOMP connection down whenever the user logs out.
    actionsSubj.pipe(ofType(Logout)).subscribe((_) => {
      this.rxStompService.deactivate();
    });

    // After a successful login, connect with the fresh access token unless
    // the client is already active.
    actionsSubj.pipe(ofType(LoginFinishedSuccess)).subscribe(({response}) => {
      if (!this.isActive()) {
        this.rxStompService.stompClient.connectHeaders = {
          Authorization: response.access_token,
        };
        this.rxStompService.activate();
      }
    });

    // RxStompState is a numeric enum, so indexing it by value yields its name.
    this.state$ = this.rxStompService.connectionState$.pipe(
      map((state: number) => RxStompState[state]),
    );
  }

  /**
   * Publishes to `/topic/{topic}/{key}/{event}`.
   *
   * @param record topic and key identifying the destination
   * @param event last segment of the destination
   * @param body optional payload; a falsy value (including `''`) is not sent
   */
  onSend(
    record: {topic: string; key: string},
    event: string,
    body?: string,
  ): void {
    const destination = this.eventDestination(record, event);
    if (body) {
      this.rxStompService.publish({destination, body});
    } else {
      this.rxStompService.publish({destination});
    }
  }

  /**
   * Publishes to `/topic/{topic}`.
   *
   * @param record topic identifying the destination
   * @param body optional payload, passed through as given
   */
  onSend2(record: {topic: string}, body?: string): void {
    this.rxStompService.publish({
      destination: `/topic/${record.topic}`,
      body: body,
    });
  }

  /**
   * Watches `/topic/{topic}/{key}/{event}` and emits each message body parsed
   * as JSON. A body that is not valid JSON makes `JSON.parse` throw inside the
   * `map` operator.
   *
   * @param record topic and key identifying the destination
   * @param event last segment of the destination
   * @returns stream of parsed message bodies
   */
  onWatch(
    record: {topic: string; key: string},
    event: string,
  ): Observable<any> {
    return this.rxStompService
      .watch(this.eventDestination(record, event))
      .pipe(map((message) => JSON.parse(message.body)));
  }

  /**
   * Watches `/topic/{topic}/{key}/{event}` and emits the unmodified STOMP
   * messages.
   *
   * @param record topic and key identifying the destination
   * @param event last segment of the destination
   * @returns stream of raw STOMP messages
   */
  onWatchRaw(
    record: {topic: string; key: string},
    event: string,
  ): Observable<Message> {
    return this.rxStompService.watch(this.eventDestination(record, event));
  }

  /** @returns the STOMP client's error frames as a plain Observable. */
  onStompErrors(): Observable<IFrame> {
    return this.rxStompService.stompErrors$.asObservable();
  }

  /** @returns the `active` flag of the underlying STOMP service. */
  isActive(): boolean {
    return this.rxStompService.active;
  }

  /**
   * Creates the raw websocket subject for `url`.
   *
   * A subject left over from an earlier call is completed first so that its
   * connection is not leaked when it gets replaced.
   *
   * @param url websocket endpoint handed to RxJS `webSocket`
   */
  initSpelMessage(url: string): void {
    this.closeConnection();
    this.socket$ = webSocket(url);
  }

  /**
   * Sends `msg` through the raw websocket subject; does nothing when
   * `initSpelMessage` has not been called yet.
   *
   * @param msg value pushed into the subject
   */
  sendMessage(msg: any): void {
    if (this.socket$) {
      this.socket$.next(msg);
    }
  }

  /**
   * @returns the raw websocket's message stream, or an Observable that
   * completes immediately when `initSpelMessage` has not been called yet
   * (mirrors the no-op behaviour of `sendMessage` and `closeConnection`).
   */
  getMessages(): Observable<any> {
    if (!this.socket$) {
      return new Observable<any>((subscriber) => subscriber.complete());
    }
    return this.socket$.asObservable();
  }

  /**
   * Completes the raw websocket subject; does nothing when `initSpelMessage`
   * has not been called yet.
   */
  closeConnection(): void {
    if (this.socket$) {
      this.socket$.complete();
    }
  }

  /** Builds the `/topic/{topic}/{key}/{event}` destination string. */
  private eventDestination(
    record: {topic: string; key: string},
    event: string,
  ): string {
    return `/topic/${record.topic}/${record.key}/${event}`;
  }
}
