// Copyright 2023-2024 Luminary Cloud, Inc. All Rights Reserved.

import { isProjectAccessDeniedError, isProjectDeletedError } from '../projectUtilsErrors';
import { addRpcError } from '../transientNotification';

/**
 * Max number of requests that can be queued before they are
 * force-flushed.
 */
const MAX_QUEUED_REQUESTS = 50;
/** Amount of time to wait before flushing queued requests. */
const BATCH_DELAY_MS = 50;

/** A queue entry. Contains data and handling function for one request. */
interface RpcQueueEntry<ReqChunkType, ReplyChunkType> {
  /**  Request data chunks for this rpc queue entry */
  reqChunks: ReqChunkType[];
  /**  Functions to be called when the reply for this rpc queue entry is ready. This is a list
   * because the same data can possible be request during the batching window and while the request
   * is in flight.
   */
  onReplies: ((replyChunks: ReplyChunkType[]) => void)[];
  /**
   * The range this request occupies within the batched RPC. Initially [-1, -1],
   * and will be filled just before the batched RPC starts.
   */
  requestStartIndex: number; /** inclusive */
  requestLimitIndex: number; /** exclusive */
}

interface RequestType {
  clone(): RequestType
}

/** Manages a list of SetSessionState calls that can be batched into one RPC. */
export abstract class RpcQueue<ReqChunkType, Request extends RequestType, ReplyChunkType, Reply> {
  /** Debounce timer. */
  private timerId: ReturnType<typeof setTimeout> | null = null;
  /** Queue of requests stored in map keyed by an identifier */
  private queue: { [key: string]: RpcQueueEntry<ReqChunkType, ReplyChunkType> } = {};
  /** Total requests in the queue. */
  private nQueuedRequests: number = 0;

  /** Packs a list of chunks into the request
   * @param reqChunks chunks to insert into the request
   * @param req request where the requests should be added to
  */
  abstract packData(reqChunks: ReqChunkType[], req: Request): void;
  /** Returns the current number of chunks in the request
   * @param req request of this queue
   * @returns the number of chunks in the request
  */
  abstract getDataSize(req: Request): number;
  /** Send the rpc request
   * @param req request of this queue
   * @returns promise for the reply
  */
  abstract sendRequest(req: Request): Promise<Reply>;
  /** Unpack a chunk from the reply
   * @param index global index of the chunk in the request
   * @returns the reply chunk
  */
  abstract unpackData(index: number, reply: Reply): ReplyChunkType;
  /** Clear the data in the request
   * @param req request of this queue
  */
  abstract clearData(req: Request): void;

  /**
  * @param request template request that must contain all data common to requests in this queue
  * @param batchDelayMs amount of time to wait before flushing queued requests.
  */
  constructor(

    private request: Request,
    private batchDelayMs = BATCH_DELAY_MS,
  ) { }

  /**
   * Start a session state request. The request will be queued for a while, with
   * a hope that it can be batched with similar requests that arrive in the future.
   * @param reqChunks chunks for this request
   * @param onReply function called when a reply is received
   */
  public start(
    reqChunks: ReqChunkType[],
    onReply: (replyChunk: ReplyChunkType[]) => void,
    reqChunksKey: string,
  ) {
    const queueEntry = this.queue[reqChunksKey];
    // If the request was already queued, add the onReply function to the list of
    // functions to be called when the reply is ready.
    if (queueEntry) {
      queueEntry.onReplies.push(onReply);
      return;
    }

    // Otherwise add a new entry to the queue and start the timer.
    this.queue[reqChunksKey] = {
      reqChunks,
      onReplies: [onReply],
      requestStartIndex: -1,
      requestLimitIndex: -1,
    };
    this.nQueuedRequests += reqChunks.length;

    // Delay starting a request until either:
    //
    // (1) >= MAX_QUEUED_REQUESTS requests are queued. We bound the queue size,
    // since grpc has a limit on the request & response payload size (16MiB ?).
    //
    // (2) no new request arrives for BATCH_DELAY_MS.
    if (this.timerId) {
      clearTimeout(this.timerId);
      this.timerId = null;
    }
    if (this.nQueuedRequests > MAX_QUEUED_REQUESTS) {
      this.flush();
    } else {
      this.timerId = setTimeout(() => {
        this.timerId = null;
        this.flush();
      }, this.batchDelayMs);
    }
  }

  /**
    * Start a batched RPC.
    *
    * REQUIRES: |this.queue| > 0.
    *
    */
  private flush() {
    this.timerId = null;
    const req = this.request.clone() as Request;

    const queue = Object.values(this.queue);
    queue.forEach((entry) => {
      entry.requestStartIndex = this.getDataSize(req);
      this.packData(entry.reqChunks, req);
      entry.requestLimitIndex = this.getDataSize(req);
    });
    this.queue = {};
    this.nQueuedRequests = 0;

    this.sendRequest(req).then(
      (reply: Reply) => {
        queue.forEach((entry) => {
          const replyDataChunks: ReplyChunkType[] = [];
          for (let i = entry.requestStartIndex; i < entry.requestLimitIndex; i += 1) {
            replyDataChunks.push(this.unpackData(i, reply));
          }
          entry.onReplies.forEach((onReply) => onReply(replyDataChunks));
        });
      },
    ).catch((err: Error) => {
      if (!isProjectAccessDeniedError(err.message) && !isProjectDeletedError(err.message)) {
        addRpcError('Failed to retrieve session state', err);
      }
      console.warn(`Session state error: ${err}`);
    });
    this.clearData(this.request);
  }
}
