
compressable:Zstd comp support #4657

Draft
wants to merge 6 commits into base: master from zstd_comp_support

Conversation

Athishpranav2003
Contributor

Which issue(s) this PR fixes:
Fixes #4162

What this PR does / why we need it:
Adds Zstd compression support for handling forwarded messages
Docs Changes:
TODO
Release Note:
N/A

@Athishpranav2003
Contributor Author

@daipom @ashie I need some comments on this since it's a very new compression method. I tried to make sure the existing support is not broken and added this feature on top. I still need to do some more work, but I wanted to get some comments from you first.

@Athishpranav2003 Athishpranav2003 marked this pull request as draft October 3, 2024 18:16
@Athishpranav2003 Athishpranav2003 force-pushed the zstd_comp_support branch 2 times, most recently from 6bf74ae to 0b79317 Compare October 3, 2024 19:04
@daipom daipom self-requested a review October 4, 2024 01:29
@daipom
Contributor

daipom commented Oct 4, 2024

Thanks! I will look at this this weekend!

@Athishpranav2003
Contributor Author

@daipom did you get a chance to look at this?

@daipom
Contributor

daipom commented Oct 17, 2024

Sorry, I haven't made time for this. 😢
I'll make time for it this week.

@Athishpranav2003
Contributor Author

It's fine, I guess.
I was also very busy during the last 2 weeks 👍

@daipom
Contributor

daipom commented Oct 21, 2024

@Athishpranav2003
I'm sorry for my late response. 😢

I have confirmed the entire direction!
It's great! Thanks for starting this enhancement!

Sorry, I haven't made time to look at the detailed implementation, such as the Compressable module,
but the overall design of Fluentd is the essential thing for us now.
So, for now, I will comment on the overall direction of further modifications.

It basically looks good as in_forward support!
All that is left is to support the Output/Buffer/Chunk logic for output plugins. (Currently, this logic assumes gzip only.)
To do so, it would be a good idea to support out_forward first.

The core logic would be the following.

  • EventStream#to_compressed_msgpack_stream

      def to_compressed_msgpack_stream(time_int: false, packer: nil)
        packed = to_msgpack_stream(time_int: time_int, packer: packer)
        compress(packed)
      end

  • Output#generate_format_proc

      FORMAT_MSGPACK_STREAM = ->(e){ e.to_msgpack_stream(packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) }
      FORMAT_COMPRESSED_MSGPACK_STREAM = ->(e){ e.to_compressed_msgpack_stream(packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) }
      FORMAT_MSGPACK_STREAM_TIME_INT = ->(e){ e.to_msgpack_stream(time_int: true, packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) }
      FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT = ->(e){ e.to_compressed_msgpack_stream(time_int: true, packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) }

      def generate_format_proc
        if @buffer && @buffer.compress == :gzip
          @time_as_integer ? FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT : FORMAT_COMPRESSED_MSGPACK_STREAM
        else
          @time_as_integer ? FORMAT_MSGPACK_STREAM_TIME_INT : FORMAT_MSGPACK_STREAM
        end
      end

  • Chunk::Decompressable

      module Decompressable
        include Fluent::Plugin::Compressable

        def append(data, **kwargs)
          if kwargs[:compress] == :gzip
            io = StringIO.new
            Zlib::GzipWriter.wrap(io) do |gz|
              data.each do |d|
                gz.write d
              end
            end
            concat(io.string, data.size)
          else
            super
          end
        end

        def open(**kwargs, &block)
          if kwargs[:compressed] == :gzip
            super
          else
            super(**kwargs) do |chunk_io|
              output_io = if chunk_io.is_a?(StringIO)
                            StringIO.new
                          else
                            Tempfile.new('decompressed-data')
                          end
              output_io.binmode if output_io.is_a?(Tempfile)
              decompress(input_io: chunk_io, output_io: output_io)
              output_io.seek(0, IO::SEEK_SET)
              yield output_io
            end
          end
        end

        def read(**kwargs)
          if kwargs[:compressed] == :gzip
            super
          else
            decompress(super)
          end
        end

        def write_to(io, **kwargs)
          open(compressed: :gzip) do |chunk_io|
            if kwargs[:compressed] == :gzip
              IO.copy_stream(chunk_io, io)
            else
              decompress(input_io: chunk_io, output_io: io)
            end
          end
        end
      end

It is complicated, but the compression of Buffer and Chunk and the flush process of output plugins are closely related.
The following existing implementations may be helpful for understanding how output plugins behave:

  • unless @as_secondary
      if @compress == :gzip && @buffer.compress == :text
        @buffer.compress = :gzip
      elsif @compress == :text && @buffer.compress == :gzip
        log.info "buffer is compressed. If you also want to save the bandwidth of a network, Add `compress` configuration in <match>"
      end
    end

  • chunk.open(compressed: @compress) do |chunk_io|
      entries = [0xdb, chunk_io.size].pack('CN')
      sock.write entries.force_encoding(Encoding::UTF_8) # 2. entries: String (str32)
      IO.copy_stream(chunk_io, sock)                     # writeRawBody(packed_es)
    end

  • writer = case
      when @compress_method.nil?
        method(:write_without_compression)
      when @compress_method == :gzip
        if @buffer.compress != :gzip || @recompress
          method(:write_gzip_with_compression)
        else
          method(:write_gzip_from_gzipped_chunk)
        end
      else
        raise "BUG: unknown compression method #{@compress_method}"
      end

  • def write_without_compression(path, chunk)
      File.open(path, "ab", @file_perm) do |f|
        if @need_ruby_on_macos_workaround
          content = chunk.read()
          f.puts content
        else
          chunk.write_to(f)
        end
      end
    end

    def write_gzip_with_compression(path, chunk)
      File.open(path, "ab", @file_perm) do |f|
        gz = Zlib::GzipWriter.new(f)
        chunk.write_to(gz, compressed: :text)
        gz.close
      end
    end

    def write_gzip_from_gzipped_chunk(path, chunk)
      File.open(path, "ab", @file_perm) do |f|
        chunk.write_to(f, compressed: :gzip)
      end
    end
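
By analogy with the gzip helpers above, a zstd branch could look roughly like this (a hedged sketch only: the write_zstd_* method names are hypothetical, and the compression call assumes the zstd-ruby gem's one-shot Zstd.compress):

      # Hypothetical zstd counterparts to the gzip helpers above (sketch only).
      def write_zstd_with_compression(path, chunk)
        File.open(path, "ab", @file_perm) do |f|
          # assumes the buffer holds plain (uncompressed) chunk content here
          f.write Zstd.compress(chunk.read)
        end
      end

      def write_zstd_from_zstd_chunk(path, chunk)
        File.open(path, "ab", @file_perm) do |f|
          # the chunk is already zstd-compressed in the buffer; copy it through as-is
          chunk.write_to(f, compressed: :zstd)
        end
      end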

@Athishpranav2003
Contributor Author

@daipom thanks for the review.

Yeah, this will kick off a lot more changes in the overall event pipeline.
For now I have just added the compression support as a module in Compressable and used it in in_forward. I will try to look at Buffer as well in the meantime (it might need more time since I got busy with a project). I guess splitting this into multiple PRs would be easier, since I can finish small chunks one by one (separate functionality) and they would be easier to review.

@Athishpranav2003
Contributor Author

@daipom can you please check the change in the Compressable module as well? Once we merge this, we can proceed with the other development around it.

@daipom
Contributor

daipom commented Oct 25, 2024

@Athishpranav2003

can you please check the change in the Compressable module as well

Sure! I will review the Compressable module!

Once we merge this, we can proceed with the other development around it

To judge whether we can merge this with only the support of the module and in_forward, we need to reach an agreement on updating the Forward Protocol Specification.

It would be necessary to update the CompressedPackedForward Mode.
By allowing zstd as a value for the compressed key, it would be possible to add Zstd support while keeping compatibility, but I'm not sure yet.
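
For illustration (a hedged sketch, not an agreed spec change): a CompressedPackedForward entry with zstd would keep the same shape as today and only change the value of the compressed option, roughly:

      # Hypothetical payload shape if 'zstd' were allowed for the compressed key.
      require 'msgpack'
      require 'zstd-ruby'

      entries = [[Time.now.to_i, { 'message' => 'hello' }]].map(&:to_msgpack).join
      forward_message = [
        'my.tag',
        Zstd.compress(entries),                  # compressed MessagePackEventStream
        { 'compressed' => 'zstd', 'size' => 1 }  # option section
      ].to_msgpack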

@Athishpranav2003
Contributor Author

Sure @daipom

If it's needed, I can split the PR into two, where this one will only focus on zstd and the other one on in_forward

@Athishpranav2003
Contributor Author

@daipom how should we proceed with this PR?

@daipom
Contributor

daipom commented Oct 30, 2024

Sorry, please give me a few more days 😢

@daipom daipom left a comment

@Athishpranav2003
Sorry for my late response 😢
I have commented on some points, but basically it looks good! Thanks!

About our future policy, I think it is better not to separate the PRs, as these changes need to be considered in combination with updating the Forward Protocol.

How about supporting out_forward by implementing the Output/Buffer/Chunk logic in this PR?

My concern is whether we can support Buffer/Chunk zstd compression.
To support it, the compressed data must be able to be concatenated.
gzip allows it.
It appears that we can concat zstd files without any problems, but I wonder if this is officially supported by the zstd specification and the zstd-ruby gem.

$ echo "Hello world" > 1
$ echo "Hello Fluentd" > 2
$ zstd -f 1
$ zstd -f 2
$ cat 1.zst 2.zst > 3.zst
$ zstd -d 3.zst
$ cat 3
Hello world
Hello Fluentd
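
The same concatenation question can be checked from Ruby with the zstd-ruby gem (a hedged sketch; whether the streaming decompressor is guaranteed to walk across multiple concatenated frames is exactly the open question above):

      require 'zstd-ruby'

      frame1 = Zstd.compress("Hello world\n")
      frame2 = Zstd.compress("Hello Fluentd\n")

      # If concatenated frames are supported, this prints both lines.
      stream = Zstd::StreamingDecompress.new
      puts stream.decompress(frame1 + frame2)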

@@ -203,7 +203,7 @@ class MessagePackEventStream < EventStream
# https://github.com/msgpack/msgpack-ruby/issues/119

# Keep cached_unpacker argument for existing plugins
def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil)
def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil, compress: nil)

Suggested change
def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil, compress: nil)
def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil)

Comment on lines 271 to 272
def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil, compress: :gzip)
super

Suggested change
def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil, compress: :gzip)
super
def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil, compress: :gzip)
super(data, cached_unpacker, size, unpacked_times: unpacked_times, unpacked_records: unpacked_records)

output_io = kwargs[:output_io]
writer = nil

This line would be unnecessary.

Suggested change
writer = nil

end
end

private

def string_decompress(compressed_data)
def string_decompress(compressed_data, type = :gzip)

How about splitting the logic for simplicity?

      def string_decompress(compressed_data, type = :gzip)
        if type == :gzip
          string_decompress_gzip(compressed_data)
        elsif type == :zstd
          string_decompress_zstd(compressed_data)
        else
          raise ArgumentError, "Unknown compression type: #{type}"
        end
      end
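
A rough sketch of what the split-out helpers could look like (hedged: the gzip branch mirrors the existing loop in this file, and the zstd branch assumes the zstd-ruby gem's streaming API for concatenated frames):

      def string_decompress_gzip(compressed_data)
        io = StringIO.new(compressed_data)
        out = ''
        loop do
          gz = Zlib::GzipReader.new(io)
          out << gz.read
          unused = gz.unused
          gz.finish
          # step back over read-ahead so the next gzip member starts at io.pos
          io.pos -= unused.length unless unused.nil?
          break if io.eof?
        end
        out
      end

      def string_decompress_zstd(compressed_data)
        # a streaming decompressor can walk across concatenated zstd frames
        reader = Zstd::StreamingDecompress.new
        reader.decompress(compressed_data)
      end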

end
break if io.eof?
end

out
end

def io_decompress(input, output)
def io_decompress(input, output, type = :gzip)

ditto

@@ -309,8 +309,7 @@ def on_message(msg, chunk_size, conn)
# PackedForward
option = msg[2]
size = (option && option['size']) || 0
es_class = (option && option['compressed'] == 'gzip') ? Fluent::CompressedMessagePackEventStream : Fluent::MessagePackEventStream
es = es_class.new(entries, nil, size.to_i)
es = (option && option['compressed']!=nil && option['compressed']!="text") ? Fluent::CompressedMessagePackEventStream.new(entries, nil, size.to_i, compress: option['compressed'].to_sym) : Fluent::MessagePackEventStream.new(entries, nil, size.to_i)

How about this for simplicity?

        option = msg[2] || {}

        size = option['size'] || 0
        if option['compressed'] == 'gzip' || option['compressed'] == 'zstd'
          es = Fluent::CompressedMessagePackEventStream.new(entries, nil, size.to_i, compress: option['compressed'].to_sym)
        else
          es = Fluent::MessagePackEventStream.new(entries, nil, size.to_i)
        end

@Athishpranav2003
Contributor Author

bundle exec rake test TEST=test/plugin/test_compressable.rb                      2856ms  Sun 03 Nov 2024 11:29:54 AM IST
/usr/bin/ruby -w -I"lib:test" -Eascii-8bit:ascii-8bit /usr/share/gems/gems/rake-13.2.1/lib/rake/rake_test_loader.rb "test/plugin/test_compressable.rb" 
/usr/share/gems/gems/bundler-2.5.16/lib/bundler/rubygems_ext.rb:250: warning: method redefined; discarding old encode_with
/usr/local/share/ruby/site_ruby/rubygems/dependency.rb:341: warning: previous definition of encode_with was here
/usr/share/ruby/bundled_gems.rb:75:in `require': libruby.so.3.2: cannot open shared object file: No such file or directory - /usr/lib64/gems/ruby/yajl-ruby-1.4.3/yajl/yajl.so (LoadError)
	from /usr/share/ruby/bundled_gems.rb:75:in `block (2 levels) in replace_require'
	from /usr/share/gems/gems/yajl-ruby-1.4.3/lib/yajl.rb:1:in `<top (required)>'
	from /usr/share/ruby/bundled_gems.rb:75:in `require'
	from /usr/share/ruby/bundled_gems.rb:75:in `block (2 levels) in replace_require'
	from /home/aggressive_racer1/projects/fluentd/lib/fluent/config/literal_parser.rb:20:in `<top (required)>'
	from /usr/share/ruby/bundled_gems.rb:75:in `require'
	from /usr/share/ruby/bundled_gems.rb:75:in `block (2 levels) in replace_require'
	from /home/aggressive_racer1/projects/fluentd/lib/fluent/config/element.rb:18:in `<top (required)>'
	from /usr/share/ruby/bundled_gems.rb:75:in `require'
	from /usr/share/ruby/bundled_gems.rb:75:in `block (2 levels) in replace_require'
	from /home/aggressive_racer1/projects/fluentd/test/helper.rb:42:in `<top (required)>'
	from /home/aggressive_racer1/projects/fluentd/test/plugin/test_compressable.rb:1:in `require_relative'
	from /home/aggressive_racer1/projects/fluentd/test/plugin/test_compressable.rb:1:in `<top (required)>'
	from /usr/share/ruby/bundled_gems.rb:75:in `require'
	from /usr/share/ruby/bundled_gems.rb:75:in `block (2 levels) in replace_require'
	from /usr/share/gems/gems/rake-13.2.1/lib/rake/rake_test_loader.rb:21:in `block in <main>'
	from /usr/share/gems/gems/rake-13.2.1/lib/rake/rake_test_loader.rb:6:in `select'
	from /usr/share/gems/gems/rake-13.2.1/lib/rake/rake_test_loader.rb:6:in `<main>'
rake aborted!
Command failed with status (1): [ruby -w -I"lib:test" -Eascii-8bit:ascii-8bit /usr/share/gems/gems/rake-13.2.1/lib/rake/rake_test_loader.rb "test/plugin/test_compressable.rb" ]
/usr/share/gems/gems/rake-13.2.1/exe/rake:27:in `<top (required)>'
/usr/share/gems/gems/bundler-2.5.16/lib/bundler/cli/exec.rb:58:in `load'
/usr/share/gems/gems/bundler-2.5.16/lib/bundler/cli/exec.rb:58:in `kernel_load'
/usr/share/gems/gems/bundler-2.5.16/lib/bundler/cli/exec.rb:23:in `run'
/usr/share/gems/gems/bundler-2.5.16/lib/bundler/cli.rb:455:in `exec'
/usr/share/gems/gems/bundler-2.5.16/lib/bundler/vendor/thor/lib/thor/command.rb:28:in `run'
/usr/share/gems/gems/bundler-2.5.16/lib/bundler/vendor/thor/lib/thor/invocation.rb:127:in `invoke_command'
/usr/share/gems/gems/bundler-2.5.16/lib/bundler/vendor/thor/lib/thor.rb:527:in `dispatch'
/usr/share/gems/gems/bundler-2.5.16/lib/bundler/cli.rb:35:in `dispatch'
/usr/share/gems/gems/bundler-2.5.16/lib/bundler/vendor/thor/lib/thor/base.rb:584:in `start'
/usr/share/gems/gems/bundler-2.5.16/lib/bundler/cli.rb:29:in `start'
/usr/share/gems/gems/bundler-2.5.16/exe/bundle:28:in `block in <top (required)>'
/usr/share/gems/gems/bundler-2.5.16/lib/bundler/friendly_errors.rb:117:in `with_friendly_errors'
/usr/share/gems/gems/bundler-2.5.16/exe/bundle:20:in `<top (required)>'
/usr/bin/bundle:25:in `load'
/usr/bin/bundle:25:in `<main>'
Tasks: TOP => test => base_test
(See full trace by running task with --trace)

@daipom I addressed your comments but couldn't test it locally due to some issue.
yajl-ruby is already installed on my system, but there seems to be some other issue.

@daipom daipom left a comment

Didn't this code work?

#4657 (comment)

lib/fluent/plugin/in_forward.rb: 2 outdated review comments (resolved)
@Athishpranav2003
Contributor Author

I guess there was a small careless mistake.
The problem is that local testing is not working at all. Could you help me with that later?

I will check after the 18th.
I'm packed with exams now :(

@daipom
Contributor

daipom commented Nov 5, 2024

Hmm, these changes would be necessary, but it looks like the error in your environment has another cause.

#4657 (comment)

/usr/share/ruby/bundled_gems.rb:75:in `require': libruby.so.3.2: cannot open shared object file: No such file or directory - /usr/lib64/gems/ruby/yajl-ruby-1.4.3/yajl/yajl.so (LoadError)

@daipom
Contributor

daipom commented Nov 5, 2024

I will check after 18th
Got packed with exams now :(

OK! I'm sorry my response was so slow, and it took so long.
Good luck with the exams!

@Athishpranav2003
Contributor Author

@daipom the gem issue got resolved after an update, probably some mismatched versions.

All tests are passing now

Signed-off-by: Athish Pranav D <[email protected]>
Signed-off-by: Athish Pranav D <[email protected]>
@Athishpranav2003
Contributor Author

Athishpranav2003 commented Nov 18, 2024

@daipom some doubts here:

In the chunk open method, we don't have any type identification for decompression.
How do we identify the chunk's compression type when the open/read method is called?

def open(**kwargs, &block)
  if kwargs[:compressed] == :gzip
    super
  else
    super(**kwargs) do |chunk_io|
      output_io = if chunk_io.is_a?(StringIO)
                    StringIO.new
                  else
                    Tempfile.new('decompressed-data')
                  end
      output_io.binmode if output_io.is_a?(Tempfile)
      decompress(input_io: chunk_io, output_io: output_io)
      output_io.seek(0, IO::SEEK_SET)
      yield output_io
    end
  end
end

def read(**kwargs)
  if kwargs[:compressed] == :gzip
    super
  else
    decompress(super)
  end
end

@daipom
Contributor

daipom commented Nov 20, 2024

@daipom some doubts here:

In the chunk open method, we don't have any type identification for decompression. How do we identify the chunk's compression type when the open/read method is called?

I see!
The current implementation is based on the following assumption.

  • A chunk in gzip format = it extends the Decompressable module

def initialize(metadata, compress: :text)
  super()
  @unique_id = generate_unique_id
  @metadata = metadata

  # state: unstaged/staged/queued/closed
  @state = :unstaged

  @size = 0
  @created_at = Fluent::Clock.real_now
  @modified_at = Fluent::Clock.real_now

  extend Decompressable if compress == :gzip
end

If we add a compression format, we need to change something here.
I discussed the direction with @Watson1978 and @kenhys.
We believe the following direction seems to be a good one.

  • Create Decompressable module for each compression format
    • GzipDecompressable (Rename the current Decompressable)
    • ZstdDecompressable
  • Switch modules to extend according to the compression format of the chunk

The current implementation of Decompressable looks a little weird to me.
I don't see the point of specifying the format in the compressed argument.
I don't want to change the signature, for compatibility, but I believe a parameter like raw would be more correct...

Anyway, we only need to consider the possibility that gzip is specified for compressed for GzipDecompressable, and zstd for ZstdDecompressable.
(If a different compression format is specified to compressed, the chunk should ignore it and decompress itself.)
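
A minimal, standalone sketch of that direction: one decompression module per format, selected with extend when the chunk is created (names here are illustrative, not the final implementation; the zstd path assumes the zstd-ruby gem):

      require 'zlib'
      require 'stringio'
      require 'zstd-ruby'

      module GzipDecompressable
        def decompress_content(raw)
          Zlib::GzipReader.new(StringIO.new(raw)).read
        end
      end

      module ZstdDecompressable
        def decompress_content(raw)
          Zstd.decompress(raw)
        end
      end

      class Chunk
        def initialize(compress: :text)
          # extend only the module matching this chunk's compression format
          case compress
          when :gzip then extend GzipDecompressable
          when :zstd then extend ZstdDecompressable
          end
        end
      end

      # usage: each chunk instance picks up the right decompression behavior
      Chunk.new(compress: :zstd)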

@Athishpranav2003
Contributor Author

Oh, OK.
Let me go through it again and check.

But what is that compress parameter referring to?

Signed-off-by: Athish Pranav D <[email protected]>
@Athishpranav2003
Contributor Author

@daipom seems like the buffer is working :)
I will go ahead and do the other stuff.

Signed-off-by: Athish Pranav D <[email protected]>
@Athishpranav2003 Athishpranav2003 force-pushed the zstd_comp_support branch 2 times, most recently from cbe3b58 to f5f8361 Compare November 21, 2024 10:32
@Athishpranav2003
Contributor Author

@daipom Can you check the out_file change once? Also, how should we go about UTs for it? Which UTs should we write for zstd?

@Athishpranav2003
Contributor Author

@daipom can you check the PR now?

Signed-off-by: Athish Pranav D <[email protected]>
@Athishpranav2003
Contributor Author

@daipom I added UTs as well.
The PR seems to be complete now.
Let me know if I missed anything.

Successfully merging this pull request may close these issues.

Add ZSTD compression support to forward plugin