// Copyright 2024 Luminary Cloud, Inc. All Rights Reserved.
import { Timestamp } from '@bufbuild/protobuf';
import { ConnectError } from '@connectrpc/connect';
import { SetStateAction, atom } from 'jotai';
import { atomFamily } from 'jotai/utils';

import { getValueFromSetStateAction } from '../../lib/jotai';
import { Logger } from '../../lib/observability/logs';
import { setState } from '../../lib/persist';
import { isProjectAccessDeniedError, isProjectDeletedError } from '../../lib/projectUtilsErrors';
import { getSessionStoreId } from '../../lib/recoilSync';
import * as rpc from '../../lib/rpc';
import SessionStateRpcQueue from '../../lib/rpcQueue/SessionStateRpcQueue';
import { isStorybookEnv, isTestingEnv } from '../../lib/testing/utils';
import { addRpcError } from '../../lib/transientNotification';
import * as frontendpb from '../../proto/frontend/frontend_pb';
import { selectedGeometryKey } from '../../recoil/selectedGeometryKey';
import { routeParamsState } from '../../recoil/useRouteParams';

// #region: Constants

/** Module-scoped logger, tagged with this file's name. */
const logger = new Logger('sessionState.ts');

/**
 * Cache of session-state RPC queues. Entries are stored under `rpcQueue/${projectId}` keys so that
 * atoms created for the same project share one queue. Nothing in this file removes entries.
 */
const rpcQueueMap: { [key: string]: SessionStateRpcQueue } = {};

/**
 * Pool used to start the `SessionStateStream` streaming RPC. Streams are started with the session
 * store ID as their first argument (presumably the pooling key; confirm in StreamingRpcPool).
 */
const rpcPool = new rpc.StreamingRpcPool<
  frontendpb.SessionStateRequest,
  frontendpb.SessionStateStreamReply
>('SessionStateStream', rpc.client.sessionStateStream);

// #region: Helper Functions

/**
 * Converts a protobuf timestamp into a plain number so two timestamps can be compared with `<`.
 *
 * @param timestamp the timestamp to convert
 * @returns the `getTime()` value (milliseconds since the Unix epoch) of the timestamp's `toDate()`
 * conversion
 */
function timestampToTime(timestamp: Timestamp): number {
  const asDate = timestamp.toDate();
  return asDate.getTime();
}

// #region: Interfaces

/** A single session-state value together with the time it was last written. */
export interface ProjectStateEntry {
  /** The serialized value of this state; the field is optional, so it may be absent */
  value?: Uint8Array;
  /** The time when this value was last updated */
  timestamp: Timestamp;
}

/**
 * A mapping from state keys to their latest known entry (serialized value plus last-update
 * timestamp).
 */
export interface SyncedAtomData {
  [key: string]: ProjectStateEntry;
}

// #region: Internal Atoms

/**
 * Creates a base atom whose value is loaded from the backend session state the first time the
 * atom is mounted.
 *
 * In a testing or storybook environment no fetch is wired up and the atom simply holds
 * `defaultValue`.
 *
 * @param projectId the project ID associated with the session store
 * @param keys the keys to fetch. The fetched values are scanned in order and the first non-empty
 * one is used; if all are empty, the last one is used.
 * @param defaultValue the value the atom holds until fetched data arrives (and permanently in
 * testing/storybook environments)
 * @param deserialize converts the chosen Uint8Array into the atom's value; it also receives the
 * index of the chosen entry within the fetched values (presumably aligned with `keys`; confirm
 * against SessionStateRpcQueue)
 * @returns the base atom
 */
function atomWithGetSessionState<T>(
  projectId: string,
  keys: string[],
  defaultValue: T,
  deserialize: (val: Uint8Array, index: number) => T,
) {
  const baseAtom = atom(defaultValue);
  if (isTestingEnv() || isStorybookEnv()) {
    return baseAtom;
  }

  const queueKey = `rpcQueue/${projectId}`;
  // jotai invokes onMount whenever the atom goes from zero subscribers to one, not just when it is
  // created. This flag makes the fetch a one-time operation so that a later re-mount cannot
  // overwrite state that has been set in the meantime.
  let hasFetched = false;

  baseAtom.onMount = (setAtom) => {
    if (hasFetched) {
      return;
    }
    hasFetched = true;

    // Reuse the project's queue when one exists; otherwise create it and remember it.
    const rpcQueue =
      rpcQueueMap[queueKey] ||
      new SessionStateRpcQueue(new frontendpb.SessionStateRequest({ projectId }));
    rpcQueueMap[queueKey] = rpcQueue;

    rpcQueue.start(
      keys,
      (values) => {
        const lastIndex = values.length - 1;
        let index = 0;
        for (;;) {
          const candidate = values[index];
          if (!(candidate instanceof Uint8Array)) {
            throw Error(`cannot parse values=${values} i=${index} ${candidate} as Uint8Array`);
          }
          // Take the first non-empty value; fall back to the final one even if it is empty.
          if (candidate.length > 0 || index >= lastIndex) {
            setAtom(deserialize(candidate, index));
            return;
          }
          index += 1;
        }
      },
      keys.join(','),
    );
  };
  return baseAtom;
}

/**
 * Creates an atom that accumulates the entries pushed by the `SessionStateStream` RPC.
 *
 * On mount the atom starts a stream through `rpcPool`; the function returned by `rpcPool.start` is
 * handed back from `onMount` so that it runs when the atom is unmounted. In a testing or storybook
 * environment no stream is wired up and the atom stays an empty map unless written to directly.
 *
 * An incoming entry replaces the stored one when there is no stored entry for its key, or when the
 * entry carries a timestamp, its value is a different object than the stored value, and the stored
 * timestamp is strictly older. Anything else is logged and ignored.
 *
 * @param projectId the project whose session state is streamed
 * @param geometryId when truthy, every stream reply is ignored
 * @returns the atom holding the synced entries
 */
function atomWithSessionStream(projectId: string, geometryId?: string) {
  const sessionStoreId = getSessionStoreId(projectId);
  const baseSyncAtom = atom<SyncedAtomData>({});
  if (isTestingEnv() || isStorybookEnv()) {
    return baseSyncAtom;
  }
  baseSyncAtom.onMount = (setAtom) => rpcPool.start(
    sessionStoreId,
    () => new frontendpb.SessionStateRequest({ projectId }),
    (reply) => {
      // The geometry tab does not use the kvstore, so kvstore updates are not listened to there.
      if (geometryId) {
        return;
      }
      reply.entry.forEach((entry) => {
        setAtom((prev) => {
          const accept = () => ({
            ...prev,
            [entry.key]: { value: entry.value!, timestamp: entry.timestamp! },
          });
          const prevEntry = prev[entry.key];
          if (!prevEntry) {
            return accept();
          }
          // NOTE(review): `!==` compares the Uint8Array objects by identity, not by content.
          if (
            entry.timestamp &&
            prevEntry.value !== entry.value &&
            timestampToTime(prevEntry.timestamp) < timestampToTime(entry.timestamp)
          ) {
            return accept();
          }
          logger.info(`debugging LC-15045 ignoring ${entry.key}`);
          return prev;
        });
      });
    },
    (err: ConnectError) => {
      // Access-denied and project-deleted errors are deliberately not reported here.
      if (isProjectAccessDeniedError(err.message) || isProjectDeletedError(err.message)) {
        return;
      }
      addRpcError('Failed to get session state', err);
    },
  );
  return baseSyncAtom;
}

/**
 * A family of Session State stream atoms, keyed by project ID.
 *
 * Note that no `geometryId` is passed to `atomWithSessionStream` here.
 */
const syncedProjectAtomFamily = atomFamily((projectId: string) => {
  const streamAtom = atomWithSessionStream(projectId);
  return streamAtom;
});
// TODO: Harry - add a way to garbage collect old streams

// #region: Session State Atoms

/**
 * Byte-wise equality for two optional serialized values.
 *
 * @returns true when both are the same object (including both undefined) or when both are present
 * with identical length and contents
 */
function sessionBytesEqual(a: Uint8Array | undefined, b: Uint8Array | undefined): boolean {
  if (a === b) {
    return true;
  }
  if (!a || !b || a.length !== b.length) {
    return false;
  }
  return a.every((byte, idx) => byte === b[idx]);
}

/**
 * An atom that syncs with the session state kv store.
 *
 * When the atom is mounted, it fetches the initial data from the backend. When the atom is
 * written, the new value is serialized, recorded in the project's synced atom with the current
 * time, and persisted with `setState` under `key`, unless the write is skipped (see below). The
 * local base atom is then updated with the new value.
 *
 * The sync/persist step is skipped when the synced atom already holds an entry for `key` whose
 * bytes equal the newly serialized bytes, or whose timestamp is not older than the current time.
 * While a `geometryId` route param is set, writes to any key other than `selectedGeometryKey` are
 * dropped entirely, including the local update.
 *
 * If in a testing or storybook environment, this atom will not sync the session state with the
 * backend and instead use the provided default value.
 *
 * @param projectId the project ID associated with the session store
 * @param key the key for this atom
 * @param defaultValue the default value for this atom for testing
 * @param serialize serialize the value to a Uint8Array
 * @param deserialize deserialize the value from a Uint8Array
 * @returns a read/write atom exposing the value
 */
export function atomWithInitialSessionValue<T>(
  projectId: string,
  key: string,
  defaultValue: T,
  serialize: (val: T) => Uint8Array,
  deserialize: (val: Uint8Array, index: number) => T,
) {
  const baseAtom = atomWithGetSessionState(projectId, [key], defaultValue, deserialize);
  const valueAtom = atom(
    (get) => get(baseAtom),
    (get, set, arg: SetStateAction<T>) => {
      const newValue = getValueFromSetStateAction(get, arg, baseAtom);
      const data = serialize(newValue);
      const geometryId = get(routeParamsState).geometryId;
      // When geometryId is defined we only allow selectedGeometryKey to be set. This is because
      // that's the kvstore pair that allows us to perform the liaison between the setup and
      // geometry tabs.
      if (geometryId && key !== selectedGeometryKey) {
        return;
      }
      const syncedAtom = syncedProjectAtomFamily(projectId);
      const oldProjectState = get(syncedAtom)[key];
      const now = new Date(Date.now());
      // Compare the serialized bytes by content. `serialize` is called just above, so an identity
      // comparison against its result would make this guard always pass and re-persist unchanged
      // values on every write.
      const shouldPersist =
        !oldProjectState ||
        (!sessionBytesEqual(oldProjectState.value, data) &&
          timestampToTime(oldProjectState.timestamp) < now.getTime());
      if (shouldPersist) {
        set(syncedAtom, (prev) => ({
          ...prev,
          [key]: {
            value: data,
            timestamp: Timestamp.fromDate(now),
          },
        }));
        if (!isTestingEnv() && !isStorybookEnv()) {
          setState(projectId, key, data);
        }
      }
      set(baseAtom, newValue);
    },
  );
  return valueAtom;
}

/**
 * An atom that syncs with the session state kv store and also follows changes pushed by the
 * Session State stream.
 *
 * Reads prefer the project's synced (streamed) entry for `key`: when that entry has a value, it is
 * deserialized and returned. Otherwise the read falls back to the atom created by
 * `atomWithInitialSessionValue`, which fetches the initial data from the backend on mount. Writes
 * are always delegated to that atom.
 *
 * If in a testing or storybook environment, this atom will not sync the session state with the
 * backend and instead use the provided default value.
 *
 * @param projectId the project ID associated with the session store
 * @param key the key for this atom
 * @param defaultValue the default value for this atom for testing
 * @param serialize serialize the value to a Uint8Array
 * @param deserialize deserialize the value from a Uint8Array
 * @returns a read/write atom exposing the synced value
 */
export function atomWithProjectSync<T>(
  projectId: string,
  key: string,
  defaultValue: T,
  serialize: (val: T) => Uint8Array,
  deserialize: (val: Uint8Array) => T,
) {
  const syncedValueAtom = syncedProjectAtomFamily(projectId);
  const initialAtom = atomWithInitialSessionValue(
    projectId,
    key,
    defaultValue,
    serialize,
    deserialize,
  );
  return atom(
    (get) => {
      const streamedBytes = get(syncedValueAtom)[key]?.value;
      if (streamedBytes === undefined) {
        // Nothing usable has been synced for this key yet; use the initially fetched value.
        return get(initialAtom);
      }
      return deserialize(streamedBytes);
    },
    (get, set, arg: SetStateAction<T>) => {
      const nextValue = getValueFromSetStateAction(get, arg, initialAtom);
      set(initialAtom, nextValue);
    },
  );
}
