// Copyright 2020-2024 Luminary Cloud, Inc. All Rights Reserved.
import { Buffer } from 'buffer';

import { Code } from '@connectrpc/connect';
import { Attributes, SpanStatusCode, trace } from '@opentelemetry/api';
import * as hashwasm from 'hash-wasm';

import * as basepb from '../../proto/base/base_pb';
import * as filepb from '../../proto/file/file_pb';
import * as uploadpb from '../../proto/upload/upload_pb';
import { UploadProgress } from '../UploadProgress';
import { getLcUserId } from '../jwt';
import { formatNumber } from '../number';
import { Logger } from '../observability/logs';
import * as random from '../random';
import * as rpc from '../rpc';
import * as status from '../status';

import * as upload from './uploadUtils';
import { createZipFileBlob } from './uploadUtils';

const logger = new Logger('uploadFile');

const tracer = trace.getTracer('UploadFile');

export class FileUploader {
  private isUploading = false;

  constructor() { }

  start() {
    this.isUploading = true;
  }

  cancel() {
    this.isUploading = false;
  }

  // Uploads a list of files/paths by converting it into a zip file.
  async uploadFiles(
    projectID: string,
    files: File[],
    paths: string[],
    scaling: number,
    disconnect: boolean,
    onProgress: (progress: UploadProgress) => void,
    /**
     * The mesh type being uploaded.
     * If the type is unknown or it is not a mesh, then this is undefined
     */
    meshType?: uploadpb.MeshType,
    /** True if the mesh converter should use read zones for OpenFOAM mesh */
    readZones: boolean = true,
  ): Promise<upload.UploadResult> {
    this.isUploading = true;

    // Extract dir name if possible.
    let dirName: string;
    try {
      const result = upload.splitPath(paths[0]);
      dirName = result.dirName;
    } catch (err) {
      // This happens when uploading a list of files without a directory.
      dirName = `upload-${random.string(32)}`;
    }

    let dirSize = 0;
    files.forEach((file) => {
      dirSize += file.size;
    });

    // Span for file upload time metrics
    const uploadFileSpan = tracer.startSpan('upload dir time');
    const userId = getLcUserId();
    const uploadMethod = 'resumable';
    const attributes: Attributes = {
      projectID,
      userId,
      fileSize: dirSize,
      fileName: dirName,
      method: uploadMethod,
    };
    uploadFileSpan.setAttributes(attributes);
    logger.info(`Upload start for dir: ${dirName},
      fileSize: ${dirSize}, method: ${uploadMethod}, projectID: ${projectID}`);

    logger.info(`Upload dir start: ${files.length} files`);

    const fileHashProgressStr = 'Preparing directory';
    onProgress({ done: false, progress: 0, message: fileHashProgressStr });
    const checksum = await this.filesChecksum(files, paths, dirSize, (fraction: number) => {
      const progressStr = formatNumber(fraction * 100, { numDecimals: 0 });
      onProgress({ done: false, progress: 0, message: `${fileHashProgressStr} (${progressStr}%)` });
    });
    try {
      if (!checksum) {
        throw Error('canceled');
      }

      const zipFileSize = checksum.fileSize;

      onProgress({ done: false, progress: 0.1 * upload.GCS_CHUNK_SIZE / zipFileSize });

      const meshParams = new uploadpb.MeshParams({
        scaling,
        disconnect,
        meshType,
        doNotReadZonesOpenfoam: !readZones,
      });
      const resourceParams = new uploadpb.ResourceParams({
        params: {
          case: 'meshParams',
          value: meshParams,
        },
      });

      const fileMeta = new filepb.FileMetadata({
        ext: 'zip',
        name: dirName,
        size: BigInt(zipFileSize),
        sha256Checksum: checksum.sha256Digest.algorithm.value as Uint8Array,
        crc32cChecksum: checksum.crc32cDigest,
      });
      const createUploadReq = new uploadpb.CreateUploadRequest({
        fileMeta,
        projectId: projectID,
        resourceParams,
      });

      // Create upload.
      const createUploadReply = await rpc.callRetry(
        'CreateUpload',
        rpc.client.createUpload,
        createUploadReq,
      );
      const uploadId = createUploadReply.upload!.id;

      // Begin uploading zip file in chunks
      const opts: upload.resumableUploaderStreamOpts = {
        uploadId,
        files,
        paths,
        zipFileSize,
        onProgress,
        isUploading: () => this.isUploading,
        logger,
      };

      try {
        await upload.resumableUploaderStream(opts);
      } catch (error) {
        if (status.getCode(error) !== Code.Unimplemented) {
          throw error;
        }

        // Use the simple method by creating a blob on the browser and uploading that blob.
        const blob = await createZipFileBlob(files, paths);
        const zipFile = await new File([blob], 'archive.zip', { type: 'application/zip' });
        const optsSimple: upload.uploaderOpts = {
          uploadId,
          file: zipFile,
          onProgress,
          isUploading: () => this.isUploading,
          logger,
        };
        await upload.simpleUploader(optsSimple);
      }

      // Finish upload.
      const finishUploadReq = new uploadpb.FinishUploadRequest({
        uploadId,
      });
      const finishUploadReply = await rpc.callRetry(
        'FinishUpload',
        rpc.client.finishUpload,
        finishUploadReq,
      );

      logger.info(`Upload complete for dir: ${dirName}, fileSize: ${dirSize},
      method: ${uploadMethod}, projectID: ${projectID}, zipFileSize: ${zipFileSize}`);
      return {
        url: finishUploadReply.url,
        conversion: upload.convertMeshConversion(finishUploadReply.conversion),
      };
    } catch (err) {
      uploadFileSpan.setStatus({ code: SpanStatusCode.ERROR });
      logger.error('Directory upload failed: ', err);
      throw err;
    } finally {
      uploadFileSpan.end();
      onProgress({ done: true, progress: 0 });
    }
  }

  /**
   * uploadFile uploads a file via the RPC path:
   * CreateUpload -> StartUpload -> (UploadData) -> FinishUpload
   * Attempts to use METHOD_GCS_RESUMABLE, but if it is not
   * supported, falls back to METHOD_SIMPLE.
   * It returns the uploaded file URL and conversion status.
  */
  async uploadFile(
    projectID: string,
    file: File,
    scaling: number,
    disconnect: boolean,
    onProgress: (status: UploadProgress) => void,
    /**
     * The mesh type being uploaded.
     * If the type is unknown or it is not a mesh, then this is undefined
     */
    meshType?: uploadpb.MeshType,
    /** True if the mesh converter should use read zones for OpenFOAM mesh */
    readZones: boolean = true,
  ): Promise<upload.UploadResult> {
    this.start();
    // Span for file upload time metrics
    const uploadFileSpan = tracer.startSpan('upload file time');
    const userId = getLcUserId();
    const uploadMethod = 'resumable';
    const attributes: Attributes = {
      projectID,
      userId,
      fileSize: file.size,
      fileName: file.name,
      method: uploadMethod,
    };
    uploadFileSpan.setAttributes(attributes);

    logger.info(`Upload start for file: ${file.name},
    fileSize: ${file.size}, method: ${uploadMethod}, projectID: ${projectID} `);
    const fileSize = file.size;
    const fileHashProgressStr = 'Preparing file';
    // NOTE: sha256Sum can potentially be a long operation depending on the file size. Make sure
    // that we report some progress to the users.
    onProgress({ done: false, progress: 0, message: fileHashProgressStr });
    const checksum = await this.fileChecksum(file, (fraction: number) => {
      const progressStr = formatNumber(fraction * 100, { numDecimals: 0 });
      onProgress({ done: false, progress: 0, message: `${fileHashProgressStr} (${progressStr}%)` });
    });
    try {
      if (!checksum) {
        throw Error('canceled');
      }

      onProgress({
        done: false, progress: 0.1 * upload.GCS_CHUNK_SIZE / fileSize,
      });

      const resourceParams = new uploadpb.ResourceParams({
        params: upload.isMeshFile(file.name) ?
          {
            case: 'meshParams',
            value: { scaling, disconnect, meshType, doNotReadZonesOpenfoam: !readZones },
          } :
          {
            // geometry or other file
            case: 'geometryParams',
            value: {},
          },
      });

      const createUploadReq = new uploadpb.CreateUploadRequest({
        fileMeta: upload.fileToFileMeta(
          file,
          checksum.sha256Digest.algorithm.value as Uint8Array,
          checksum.crc32cDigest,
        ),
        projectId: projectID,
        resourceParams,
      });

      // Create upload.
      const createUploadReply = await rpc.callRetry(
        'CreateUpload',
        rpc.client.createUpload,
        createUploadReq,
      );
      const uploadId = createUploadReply.upload!.id;

      // Initialize resumable upload and upload file chunks
      const opts: upload.uploaderOpts = {
        uploadId,
        file,
        onProgress,
        isUploading: () => this.isUploading,
        logger,
      };

      try {
        await upload.resumableUploader(opts);
      } catch (error) {
        if (status.getCode(error) !== Code.Unimplemented) {
          throw error;
        }
        // If resumable upload is unimplemented, should mean we need to use
        // simple upload.
        await upload.simpleUploader(opts);
      }

      // Finish upload.
      const finishUploadReq = new uploadpb.FinishUploadRequest({
        uploadId,
      });
      const finishUploadReply = await rpc.callRetry(
        'FinishUpload',
        rpc.client.finishUpload,
        finishUploadReq,
      );
      logger.info(`Upload complete for file: ${file.name}, fileSize: ${file.size},
    method: ${uploadMethod}, projectID: ${projectID}`);
      return {
        url: finishUploadReply.url,
        conversion: upload.convertMeshConversion(finishUploadReply.conversion),
      };
    } catch (err) {
      uploadFileSpan.setStatus({ code: SpanStatusCode.ERROR });
      logger.error('Upload failed: ', err);
      throw err;
    } finally {
      uploadFileSpan.end();
      onProgress({ done: true, progress: 0 });
    }
  }

  /**
     Compute the sha256 checksum of a file. Returns a basepb.Checksum object. Arg onProgress is used
    to inform the caller about the progress made in the hash computation. The arg fraction must be
    within 0 and 1.
  */
  async sha256File(
    file: File,
    onProgress?: (fraction: number) => void,
  ): Promise<basepb.Checksum | null> {
    const hasher = await hashwasm.createSHA256();
    const fileSize = file.size;
    for (let off = 0; off < fileSize; off += upload.HASH_CHUNK_SIZE) {
      const endOff = Math.min(off + upload.HASH_CHUNK_SIZE, fileSize);
      const data = await upload.readFileAsync(file, off, endOff);
      if (!this.isUploading) {
        return null;
      }
      hasher.update(new Uint8Array(data));
      if (onProgress) {
        onProgress(endOff / file.size);
      }
    }
    const proto = new basepb.Checksum({
      algorithm: {
        case: 'sha256',
        value: hasher.digest('binary'),
      },
    });
    return proto;
  }

  // Compute the sha256 and crc32c checksum of a file.
  async fileChecksum(
    file: File,
    onProgress?: (fraction: number) => void,
  ): Promise<upload.ChecksumResult | null> {
    const hasher = await hashwasm.createSHA256();
    const crc32c = await hashwasm.createCRC32C();
    const fileSize = file.size;
    for (let off = 0; off < fileSize; off += upload.HASH_CHUNK_SIZE) {
      const endOff = Math.min(off + upload.HASH_CHUNK_SIZE, fileSize);
      const data = await upload.readFileAsync(file, off, endOff);
      const dataArr = new Uint8Array(data);
      if (!this.isUploading) {
        return null;
      }
      hasher.update(dataArr);
      crc32c.update(dataArr);
      if (onProgress) {
        onProgress(endOff / file.size);
      }
    }
    const proto = new basepb.Checksum({
      algorithm: {
        case: 'sha256',
        value: hasher.digest('binary'),
      },
    });
    return {
      sha256Digest: proto,
      crc32cDigest: Buffer.from(crc32c.digest('hex'), 'hex').toString('base64'),
    };
  }

  // Compute the sha256 and crc32c checksum of a zip file created by a list of files.
  // Returns the size of the zip file.
  async filesChecksum(
    files: File[],
    paths: string[],
    totalFilesSize: number,
    onProgress?: (fraction: number) => void,
  ): Promise<upload.ChecksumResultStream | null> {
    const hasher = await hashwasm.createSHA256();
    const crc32c = await hashwasm.createCRC32C();
    const transformStream = new TransformStream<Uint8Array, Uint8Array>(
      undefined,
      new ByteLengthQueuingStrategy({ highWaterMark: upload.GCS_CHUNK_SIZE * 2 }),
      new ByteLengthQueuingStrategy({ highWaterMark: upload.GCS_CHUNK_SIZE * 2 }),
    );

    // Write the zip chunks to transformStream.writable
    // eslint-disable-next-line @typescript-eslint/no-floating-promises
    upload.zipChunks(files, paths, transformStream);

    // Read from transformStream.readable as chunks are available

    // Num combined chunks read from reader.
    let i = 0;
    const reader = transformStream.readable.getReader();
    // Size of output zip file
    let zipFileSize = 0;
    // Buffer to store chunks from reader
    let buffer: Uint8Array[] = [];
    // Size of all chunks in buffer in bytes
    let bufferSize = 0;

    for (; ;) {
      if (!this.isUploading) {
        await reader.cancel();
        return null;
      }

      const { done, value } = await reader.read();
      if (!done) {
        bufferSize += value.byteLength;
        buffer.push(value);
      }

      // Process chunks if chunkBufSize is large enough,
      // or reading is done and buffer is not empty.
      if (done || bufferSize >= upload.GCS_CHUNK_SIZE) {
        if (bufferSize) {
          const chunk = new Uint8Array(bufferSize);

          // Combine buffer into a single chunk
          let size = 0;
          buffer.forEach((arr) => {
            chunk.set(arr, size);
            size += arr.length;
          });

          i += 1;
          logger.info(`read chunk #${i}`);

          hasher.update(chunk);
          crc32c.update(chunk);
          zipFileSize += bufferSize;
          if (onProgress) {
            onProgress(zipFileSize / totalFilesSize);
          }
          // reset buffer
          buffer = [];
          bufferSize = 0;
        }
        if (done) {
          logger.info('Finished reading zip chunks');
          break;
        }
      }
    }

    const proto = new basepb.Checksum({
      algorithm: {
        case: 'sha256',
        value: hasher.digest('binary'),
      },
    });
    return {
      sha256Digest: proto,
      crc32cDigest: Buffer.from(crc32c.digest('hex'), 'hex').toString('base64'),
      fileSize: zipFileSize,
    };
  }
}
