-
Notifications
You must be signed in to change notification settings - Fork 594
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(sink): support force_append_only for es sink #19919
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this PR? Isn't force_append_only already been handled in the sink executor before reaching the specific sink implementation:
risingwave/src/stream/src/executor/sink.rs
Line 466 in bdee34e
yield Message::Chunk(force_append_only(chunk)) |
@@ -79,13 +81,14 @@ impl StreamChunkConverter { | |||
|
|||
pub fn convert_chunk(&self, chunk: StreamChunk) -> Result<StreamChunk> { | |||
match self { | |||
StreamChunkConverter::Es(es) => es.convert_chunk(chunk), | |||
StreamChunkConverter::Es(es) => es.convert_chunk(chunk,es.is_append_only), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nits: fmt
@@ -117,7 +121,7 @@ impl EsStreamChunkConverter { | |||
<Utf8ArrayBuilder as risingwave_common::array::ArrayBuilder>::new(chunk.capacity()); | |||
let mut routing_builder = | |||
<Utf8ArrayBuilder as risingwave_common::array::ArrayBuilder>::new(chunk.capacity()); | |||
for build_bulk_para in self.formatter.convert_chunk(chunk)? { | |||
for build_bulk_para in self.formatter.convert_chunk(chunk,is_append_only)? { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
@@ -202,7 +205,7 @@ impl AsyncTruncateSinkWriter for ElasticSearchOpenSearchSinkWriter { | |||
let mut bulks: Vec<ElasticSearchOpenSearchBulk> = Vec::with_capacity(chunk_capacity); | |||
|
|||
let mut bulks_size = 0; | |||
for build_bulk_para in self.formatter.convert_chunk(chunk)? { | |||
for build_bulk_para in self.formatter.convert_chunk(chunk,self.is_append_only)? { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
@@ -113,7 +113,7 @@ impl ElasticSearchOpenSearchFormatter { | |||
}) | |||
} | |||
|
|||
pub fn convert_chunk(&self, chunk: StreamChunk) -> Result<Vec<BuildBulkPara>> { | |||
pub fn convert_chunk(&self, chunk: StreamChunk,is_append_only: bool) -> Result<Vec<BuildBulkPara>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
We didn't support type for es sink, and this pr support it like other sink
Checklist
Documentation
Release note