import { ApolloLink, FetchResult, Observable } from '@apollo/client';
import { DebouncedFunc, throttle } from 'lodash';

import { handleError } from '@anchorage/sentry';

// DEFER_INCREMENTAL_RESPONSE_MAX_FREQUENCY is the maximum frequency in milliseconds at which the server-sent events will be sent to the client.
// This is used to throttle the responses to prevent the client from having to rerender too frequently
const DEFER_INCREMENTAL_RESPONSE_MAX_FREQUENCY = 100;

// EventStreamLink is a custom terminal ApolloLink that allows for server-sent events to be streamed to the client.
// This is used when a query contains the @defer or @stream directive.
// If you are experiencing issues with SSE queries, try disabling appolo cache before modifying this code
export const EventStreamLink = new ApolloLink((operation) => {
  operation.setContext({
    headers: {
      Accept: 'text/event-stream',
    },
    method: 'POST',
  });

  const observable: Observable<
    FetchResult<Record<string, any>, Record<string, any>, Record<string, any>>
  > = new Observable((observer) => {
    const { headers, uri, method } = operation.getContext();
    const body = JSON.stringify({
      query: operation.query.loc?.source.body,
      variables: operation.variables,
      operationName: operation.operationName,
    });

    const controller = new AbortController();
    const signal = controller.signal;

    fetch(uri, {
      method: method || 'POST',
      headers: {
        ...headers,
        'Content-Type': 'application/json',
      },
      body: body,
      signal,
    })
      .then((response) => handleFetchResponse(response, observer))
      .catch((error) => {
        observer.error(error);
      });

    return () => {
      unsubscribeHandler(observer, controller);
    };
  });

  return observable;
});

function handleFetchResponse(response: Response, observer: any) {
  if (!response.ok || response.status !== 200) {
    const networkError = new Error(
      `Failed to fetch: ${response.statusText || response.status}`,
    );
    return observer.error(networkError);
  }
  const body = response.body;
  if (!body) {
    handleError(new Error('Response body is empty'));
    return;
  }

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let streamBuffer = '';

  let cumulativeResponse: any = null;

  const throttledNext = throttle(
    observer.next.bind(observer),
    DEFER_INCREMENTAL_RESPONSE_MAX_FREQUENCY,
    {
      leading: true,
      trailing: true,
    },
  );

  readStream(
    reader,
    decoder,
    streamBuffer,
    cumulativeResponse,
    observer,
    throttledNext,
  );

  throttledNext.flush();
}

function unsubscribeHandler(observer: any, controller: AbortController) {
  // Abort the fetch request
  controller.abort();

  // Notify the observer that the stream is closed
  observer.complete();
}

// readStream is a recursive function that reads the stream of data from the server until the stream is closed.
// Each call to readStream reads the next chunk of data from the stream and processes it
// If not the first chunk, the chunk is merged into the cumulative response, which is then sent to subscribers
function readStream(
  reader: ReadableStreamDefaultReader,
  decoder: TextDecoder,
  streamBuffer: string,
  cumulativeResponse: any,
  observer: any,
  throttledNext: DebouncedFunc<any>,
) {
  reader
    .read()
    .then(({ done, value }) => {
      if (done) {
        if (cumulativeResponse) {
          observer.next(cumulativeResponse);
        }
        observer.complete();
        return;
      }

      streamBuffer += decoder.decode(value, { stream: true });
      let boundary = streamBuffer.indexOf('\n');
      while (boundary !== -1) {
        const chunk = streamBuffer.slice(0, boundary).trim();
        streamBuffer = streamBuffer.slice(boundary + 1);

        if (chunk.startsWith('data:')) {
          let data = JSON.parse(chunk.substring(5).trim());
          if (!cumulativeResponse) {
            cumulativeResponse = data;
            throttledNext(data);
          } else {
            try {
              const mergedResponse = mergeDeferredResponses(
                cumulativeResponse,
                data,
                data.path,
              );
              (cumulativeResponse = mergedResponse),
                throttledNext(JSON.parse(JSON.stringify(mergedResponse)));
            } catch (error) {
              observer.error('Failed to merge deferred response in');
            }
          }
        }

        boundary = streamBuffer.indexOf('\n');
      }

      readStream(
        reader,
        decoder,
        streamBuffer,
        cumulativeResponse,
        observer,
        throttledNext,
      );
    })
    .catch((error) => {
      observer.error(error);
    });
}

// mergeDeferredResponses is a helper function that merges deferred data into the existing response according to the path provided
function mergeDeferredResponses(existing: any, incoming: any, path: any) {
  const mergedData = { ...existing.data };

  let cursor = mergedData;
  for (let i = 0; i < path.length - 1; i++) {
    cursor = cursor[path[i]];
  }

  const lastPath = path[path.length - 1];
  cursor[lastPath] = {
    ...cursor[lastPath],
    ...incoming.data,
  };

  return {
    ...existing,
    data: mergedData,
  };
}
