import { Exchange, makeOperation, Operation } from "urql";

import {
  Source,
  fromValue,
  delay,
  mergeMap,
  merge,
  share,
  filter,
  pipe,
  fromArray,
  take,
  onPush,
  takeWhile,
} from "wonka";

function race<T>(streams: Array<Source<T>>) {
  let selected$: Source<T> | null = null;

  const makeRace = (stream$: Source<T>) =>
    pipe(
      stream$,
      takeWhile(() => {
        return selected$ === null || selected$ === stream$;
      }),
      onPush(() => {
        selected$ = stream$;
      })
    );

  return merge(
    streams.map((stream$) => {
      return makeRace(stream$);
    })
  );
}

interface DebounceExchangeOptions {
  debounceDuration?: number; // Custom debounce duration
  filterFn?: (op: Operation) => boolean; // Custom filter function
}

export const debounceExchange = (
  options: DebounceExchangeOptions = {}
): Exchange => {
  const { debounceDuration = 1000, filterFn = (op) => op.kind === "mutation" } =
    options;

  return ({ client, forward }) => {
    const debounceRegistry = new Map<number, Operation[]>();

    return (ops$) => {
      const sharedOps$ = share(ops$);

      const filteredOps$ = pipe(sharedOps$, filter(filterFn), share);

      const rest$ = pipe(
        sharedOps$,
        filter((op) => !filterFn(op))
      );

      const debouncedOps$ = pipe(
        filteredOps$,
        onPush((op) => {
          debounceRegistry.set(op.key, []);
        }),
        mergeMap((op) =>
          pipe(
            race([
              pipe(filteredOps$, take(1)),
              pipe(fromValue(null), delay(debounceDuration)),
            ]),
            mergeMap((debouncedBy: Operation | null) => {
              if (!debouncedBy || debouncedBy.key == null) return fromValue(op);

              // Since this request is debounced, anything that got debounced due
              // to this request must be "reparented" to the new primary mutation
              const debounced = debounceRegistry.get(op.key) ?? [];
              debounceRegistry.get(debouncedBy.key)?.push(...debounced, op);

              // Don't forget to tell the client that noone cares about this query
              // anymore; otherwise urql seems to keep it inside a sort of cache
              // that prevents similar mutations from properly firing.
              client.reexecuteOperation(makeOperation(`teardown`, op));

              return fromArray([]);
            })
          )
        )
      );

      const results$ = pipe(merge([debouncedOps$, rest$]), forward);

      return pipe(
        results$,
        mergeMap((result) => {
          const debounced = debounceRegistry.get(result.operation.key);
          if (typeof debounced === "undefined") return fromValue(result);

          debounceRegistry.delete(result.operation.key);

          // For each query that got debounced, we now return the primary
          // result. This way all queries will be marked as fulfilled.
          return fromArray([
            ...debounced.map((operation) => ({
              ...result,
              operation,
            })),
            result,
          ]);
        })
      );
    };
  };
};
