import { Injectable } from '@angular/core';
import { HttpClient, HttpHeaders } from '@angular/common/http';
import { AppConfigService } from '@core/services/app-config.service';
import { take } from 'rxjs/operators';
import { StringUtils } from '@shared/utils/string.utils';

@Injectable({
  providedIn: 'root',
})
export class KafkaAdapter {
  constructor(
    private _http: HttpClient,
    private _appConfigService: AppConfigService
  ) {}

  publish(messages: string[], sessionId: string): void {
    const headers = new HttpHeaders()
      .set('Accept', 'application/json')
      .set('Content-Type', 'application/json')
      .set('client_id', this._appConfigService.config.apiKey)
      .set('X-NW-Message-ID', sessionId);

    const url = `${this._appConfigService.config.multiproductApi}/kafka/publish`;

    this._http
      .post(
        url,
        messages
          .filter(msg => StringUtils.isJSON(msg))
          .map(msg => JSON.parse(msg)),
        {
          headers,
        }
      )
      .pipe(take(1))
      .subscribe(
        () => {},
        error => {
          if (this._appConfigService.isTest()) {
            // eslint-disable-next-line no-console
            console.log('Kafka log error', error);
          }
        }
      );
  }
}
