(* Last commit: "Use OxCaml" by Tudor Brindus (11c9f605), created 2025-12-12. *)
open! Core

let direct_file_destination ?(buffer_size = 4096 * 16) ~filename () =
  (* Creates a [Writer_intf.Destination] that writes raw (uncompressed) bytes to
     [filename], truncating any existing file. Bytes are staged in a single [Iobuf] of
     [buffer_size] bytes. They are written to the file whenever the writer asks for a new
     buffer, and on [close].

     [next_buf] raises if a single request needs more than [buffer_size] bytes. *)
  let buf = Iobuf.create ~len:buffer_size in
  (* [O_CLOEXEC] matches the compressed destinations, so the descriptor is not inherited
     by exec'd child processes. *)
  let file = Core_unix.openfile ~mode:[ O_CREAT; O_TRUNC; O_CLOEXEC; O_RDWR ] filename in
  (* Number of bytes the writer has reported via [wrote_bytes] since the last flush. *)
  let written = ref 0 in
  let flush () =
    (* Nothing pending (e.g. on the very first [next_buf]): skip the write syscall. *)
    if !written > 0
    then (
      (* Narrow the window to exactly the [written] bytes at the start of [buf],
         regardless of where the writer left [lo]. *)
      Iobuf.rewind buf;
      Iobuf.advance buf !written;
      Iobuf.flip_lo buf;
      (* NOTE(review): if [Iobuf_unix.write] can perform a short write, the remainder
         would be discarded by the [reset] below -- confirm it writes the whole window. *)
      Iobuf_unix.write buf file);
    written := 0;
    Iobuf.reset buf
  in
  let module Dest = struct
    let next_buf ~ensure_capacity =
      flush ();
      if ensure_capacity > Iobuf.length buf
      then failwith "Not enough buffer space in [direct_file_destination]";
      buf
    ;;

    let wrote_bytes count = written := !written + count

    let close () =
      (* Release the descriptor even if the final flush raises. *)
      Exn.protect ~f:flush ~finally:(fun () -> Core_unix.close file)
    ;;
  end
  in
  (module Dest : Writer_intf.Destination)
;;

(* While Zstandard has the best compression, perfetto does not yet understand the format. *)
let zstd_file_destination ?(buffer_size = 64 * 1024) ~filename () =
  (* Creates a [Writer_intf.Destination] that writes Zstandard-compressed data to
     [filename], truncating any existing file. Bytes are staged in an [Iobuf] of
     [buffer_size] bytes. Each flush compresses the staged bytes with
     [Zstandard.With_explicit_context.compress] and writes the result to the file
     (presumably one complete zstd frame per flush).

     [next_buf] raises if a single request needs more than [buffer_size] bytes. *)
  let buf = Iobuf.create ~len:buffer_size in
  let compression_level = 5 in
  (* Ensure the compression buffer is large enough for the worst case of an input of
     [buffer_size]. That way a single [compress] call never runs out of output space. *)
  let compressed_buf =
    let len =
      buffer_size
      |> Int64.of_int
      |> Zstandard.compression_output_size_bound
      |> Int64.to_int_exn
    in
    Iobuf.create ~len
  in
  let file = Core_unix.openfile ~mode:[ O_CREAT; O_TRUNC; O_CLOEXEC; O_RDWR ] filename in
  (* Number of bytes the writer has reported via [wrote_bytes] since the last flush. *)
  let written = ref 0 in
  let compression_context = Zstandard.Compression_context.create () in
  (* Compresses the [written] pending bytes at the start of [buf] into [compressed_buf]
     and writes them to [file].

     NOTE(review): this also runs when [!written = 0] (e.g. on the first [next_buf]),
     which presumably emits an empty frame. It is left as-is so the on-disk output does
     not change. *)
  let flush () =
    Iobuf.rewind buf;
    Iobuf.advance buf !written;
    Iobuf.flip_lo buf;
    let input =
      Zstandard.Input.from_bigstring
        ~pos:(Iobuf.Expert.lo buf)
        ~len:(Iobuf.length buf)
        (Iobuf.Expert.buf buf)
    in
    let output =
      Zstandard.Output.in_buffer
        ~pos:(Iobuf.Expert.lo compressed_buf)
        ~len:(Iobuf.length compressed_buf)
        (Iobuf.Expert.buf compressed_buf)
    in
    let compressed_length =
      Zstandard.With_explicit_context.compress
        compression_context
        ~compression_level
        ~input
        ~output
    in
    (* Narrow [compressed_buf]'s window to exactly the compressed bytes. *)
    Iobuf.advance compressed_buf compressed_length;
    Iobuf.flip_lo compressed_buf;
    (* NOTE(review): if [Iobuf_unix.write] can perform a short write, the remainder
       would be discarded by the [reset] below -- confirm it writes the whole window. *)
    Iobuf_unix.write compressed_buf file;
    written := 0;
    Iobuf.reset buf;
    Iobuf.reset compressed_buf
  in
  let module Dest = struct
    let next_buf ~ensure_capacity =
      flush ();
      if ensure_capacity > Iobuf.length buf
      then failwith "Not enough buffer space in [zstd_file_destination]";
      buf
    ;;

    let wrote_bytes count = written := !written + count

    let close () =
      (* Free the context and close the descriptor even if the final flush (or the free
         itself) raises. *)
      Exn.protect ~f:flush ~finally:(fun () ->
        Exn.protect
          ~f:(fun () -> Zstandard.Compression_context.free compression_context)
          ~finally:(fun () -> Core_unix.close file))
    ;;
  end
  in
  (module Dest : Writer_intf.Destination)
;;

let gzip_file_destination ?(buffer_size = 64 * 1024) ~filename () =
  (* Creates a [Writer_intf.Destination] that writes gzip-compressed data to
     [filename], truncating any existing file. Bytes are staged in an [Iobuf] of
     [buffer_size] bytes. They are handed to a [Gzip] channel whenever the writer asks
     for a new buffer, and on [close].

     [next_buf] raises if a single request needs more than [buffer_size] bytes. *)
  let buf = Iobuf.create ~len:buffer_size in
  (* [Gzip.output] takes [bytes], so pending data is copied here before compressing. *)
  let bytes = Bytes.create buffer_size in
  let file = Core_unix.openfile ~mode:[ O_CREAT; O_TRUNC; O_CLOEXEC; O_RDWR ] filename in
  let out_channel =
    let oc = Core_unix.out_channel_of_descr file in
    (* Consider making the compression level an environment variable for
       experimentation. *)
    Gzip.open_out_chan ~level:6 oc
  in
  (* Number of bytes the writer has reported via [wrote_bytes] since the last flush. *)
  let written = ref 0 in
  let flush () =
    (* With nothing pending there is nothing to copy or compress. *)
    if !written > 0
    then (
      (* Narrow the window to exactly the [written] bytes at the start of [buf]. *)
      Iobuf.rewind buf;
      Iobuf.advance buf !written;
      Iobuf.flip_lo buf;
      Iobuf.Peek.To_bytes.blit
        ~src:(Iobuf.read_only buf)
        ~src_pos:0
        ~dst:bytes
        ~dst_pos:0
        ~len:!written;
      Gzip.output out_channel bytes 0 !written);
    written := 0;
    Iobuf.reset buf
  in
  let module Dest = struct
    let next_buf ~ensure_capacity =
      flush ();
      if ensure_capacity > Iobuf.length buf
      then failwith "Not enough buffer space in [gzip_file_destination]";
      buf
    ;;

    let wrote_bytes count = written := !written + count

    let close () =
      (* [close_out] also closes the underlying file descr. It runs even if the final
         flush raises, so the descriptor is not leaked. *)
      Exn.protect ~f:flush ~finally:(fun () -> Gzip.close_out out_channel)
    ;;
  end
  in
  (module Dest : Writer_intf.Destination)
;;

let file_destination ?(file_format = Writer_intf.File_format.Uncompressed) ~filename () =
  (* Picks the backend matching the requested on-disk encoding. Each backend is opened
     with its own default buffer size. *)
  let open_backend =
    match file_format with
    | Uncompressed -> fun () -> direct_file_destination ~filename ()
    | Gzip -> fun () -> gzip_file_destination ~filename ()
    | Zstandard -> fun () -> zstd_file_destination ~filename ()
  in
  open_backend ()
;;

let iobuf_destination buf =
  (* The writer is handed [view], an [Iobuf] sharing [buf]'s underlying [Bigstring] but
     with its own pointers. Closing therefore leaves [buf]'s window alone, which lets
     tests exercise the [Buffer_until_initialized] feature of ignoring writes after
     close.

     [sub_shared] produces a buffer whose limits equal its window. That also covers the
     case where [buf]'s window is narrower than its limits. *)
  let view = Iobuf.sub_shared buf in
  let bytes_written = ref 0 in
  (* [view]'s [lo] is driven purely by [wrote_bytes]. This checks that callers report
     their writes correctly. *)
  let move_lo_past_written () =
    Iobuf.rewind view;
    Iobuf.advance view !bytes_written
  in
  let module Dest = struct
    (* Callers may ask for a buffer repeatedly without filling the current one (e.g.
       [Writer.Expert.force_switch_buffers]). The same [view] is returned every time,
       for as long as it still has room. *)
    let next_buf ~ensure_capacity =
      match ensure_capacity <= Iobuf.length view with
      | true -> view
      | false -> failwith "No more room in [iobuf_destination]"
    ;;

    let wrote_bytes count =
      bytes_written := count + !bytes_written;
      move_lo_past_written ()
    ;;

    let close () =
      move_lo_past_written ();
      (* Resize the caller's original buffer to cover exactly what was written. *)
      Iobuf.resize buf ~len:!bytes_written
    ;;
  end
  in
  (module Dest : Writer_intf.Destination)
;;

let black_hole_destination ~len ~touch_memory =
  (* A destination that discards everything. The one [len]-byte buffer is reset and
     handed out again on every [next_buf]. When [touch_memory] is set, the buffer is
     zeroed once up front (presumably to fault its pages in before use). *)
  let scratch = Iobuf.create ~len in
  (match touch_memory with
   | true -> Iobuf.zero scratch
   | false -> ());
  let module Dest = struct
    let next_buf ~ensure_capacity =
      Iobuf.reset scratch;
      let fits = ensure_capacity <= Iobuf.length scratch in
      if not fits then failwith "Record too large for [black_hole_destination]";
      scratch
    ;;

    let wrote_bytes (_ : int) = ()
    let close () = ()
  end
  in
  (module Dest : Writer_intf.Destination)
;;

(* A [Destination] which keeps buffers it gives out in a list and is able to write the
   contents of those buffers to another [Destination]. *)
module Temp_buffer : sig
  type t =
    { copy_to : (module Writer_intf.Destination) -> unit
    ; dest : (module Writer_intf.Destination)
    }

  val create : unit -> t
end = struct
  type t =
    { copy_to : (module Writer_intf.Destination) -> unit
    ; dest : (module Writer_intf.Destination)
    }

  (* Bookkeeping shared by [dest] and [copy_to]. [chunks] holds every buffer handed out
     so far, newest first. [bytes_in_newest] counts the bytes reported for the head of
     [chunks]. *)
  type state =
    { mutable chunks : (read_write, Iobuf.seek, Iobuf.global) Iobuf.t list
    ; mutable bytes_in_newest : int
    }

  (* Lower bound on the size of each freshly allocated chunk. *)
  let min_chunk_len = 1_000

  let create () =
    let state = { chunks = []; bytes_in_newest = 0 } in
    let module Dest = struct
      (* Always allocates a brand-new chunk; earlier chunks are kept for [copy_to]. *)
      let next_buf ~ensure_capacity =
        let chunk = Iobuf.create ~len:(Int.max ensure_capacity min_chunk_len) in
        state.chunks <- chunk :: state.chunks;
        state.bytes_in_newest <- 0;
        chunk
      ;;

      let wrote_bytes count =
        state.bytes_in_newest <- state.bytes_in_newest + count;
        (* Force the newest chunk's [lo] to match the reported byte count. The writer
           probably did this already, but that is not relied upon. *)
        let newest = List.hd_exn state.chunks in
        Iobuf.rewind newest;
        Iobuf.advance newest state.bytes_in_newest
      ;;

      (* Nothing to flush to: data only leaves via [copy_to]. *)
      let close () = ()
    end
    in
    let copy_to (module D : Writer_intf.Destination) =
      let oldest_first = List.rev state.chunks in
      let target = ref (D.next_buf ~ensure_capacity:0) in
      (* Appends one chunk's written bytes to [D]. A new target buffer is requested only
         when the current one is too small. *)
      let append chunk =
        Iobuf.flip_lo chunk;
        let chunk_len = Iobuf.length chunk in
        if Iobuf.length !target < chunk_len
        then target := D.next_buf ~ensure_capacity:chunk_len;
        Iobuf.Blit_fill.blito ~src:chunk ~dst:!target ();
        D.wrote_bytes chunk_len
      in
      List.iter oldest_first ~f:append;
      state.chunks <- [];
      state.bytes_in_newest <- 0
    in
    { copy_to; dest = (module Dest : Writer_intf.Destination) }
  ;;
end

module Buffer_until_initialized = struct
  (* Lifecycle:
     - [Buffering_to]: no real destination yet; writes accumulate in a [Temp_buffer].
     - [Needs_transfer]: a destination was supplied, but buffered data has not been
       moved into it yet.
     - [Set]: writes go straight to the real destination. *)
  type state =
    | Buffering_to of Temp_buffer.t
    | Needs_transfer of
        { src : Temp_buffer.t
        ; dst : (module Writer_intf.Destination)
        }
    | Set of (module Writer_intf.Destination)

  type t = { mutable state : state }

  let create () = { state = Buffering_to (Temp_buffer.create ()) }

  (* Raises if a destination has already been supplied. *)
  let set_destination t destination =
    match t.state with
    | Needs_transfer _ | Set _ ->
      failwith "Tried to set Buffer_until_initialized which already had destination"
    | Buffering_to src ->
      (* The transfer cannot happen yet: the writer still holds the last temporary
         buffer. It is deferred until the writer asks for a new buffer. *)
      t.state <- Needs_transfer { src; dst = destination }
  ;;

  let to_destination t =
    (* The destination currently receiving the writer's bytes. While a transfer is
       pending, that is still the temporary buffer. *)
    let write_target () : (module Writer_intf.Destination) =
      match t.state with
      | Set dst -> dst
      | Buffering_to { Temp_buffer.dest; _ }
      | Needs_transfer { src = { Temp_buffer.dest; _ }; dst = _ } -> dest
    in
    let module Dest = struct
      let next_buf ~ensure_capacity =
        (match t.state with
         | Needs_transfer { src; dst } ->
           (* The writer is done with its last temporary buffer, so move everything
              over now. *)
           src.copy_to dst;
           t.state <- Set dst
         | Buffering_to _ | Set _ -> ());
        let (module D : Writer_intf.Destination) = write_target () in
        D.next_buf ~ensure_capacity
      ;;

      let wrote_bytes count =
        let (module D : Writer_intf.Destination) = write_target () in
        D.wrote_bytes count
      ;;

      let close () =
        let (module D : Writer_intf.Destination) =
          match t.state with
          | Needs_transfer { src; dst } ->
            src.copy_to dst;
            dst
          | Buffering_to _ | Set _ -> write_target ()
        in
        D.close ();
        (* Make it so writes after closing will be gracefully ignored. *)
        t.state <- Set (black_hole_destination ~len:1024 ~touch_memory:false)
      ;;
    end
    in
    (module Dest : Writer_intf.Destination)
  ;;
end