feat(sink): support force_append_only for es sink #19919

Open · wants to merge 3 commits into `main`
3 changes: 3 additions & 0 deletions e2e_test/sink/elasticsearch/elasticsearch_sink.slt

```diff
@@ -22,6 +22,7 @@ CREATE TABLE test_route (

 statement ok
 CREATE SINK test_route_sink from test_route WITH (
+    type = 'upsert',
     connector = 'elasticsearch',
     index = 'test_route',
     url = 'http://elasticsearch:9200',
@@ -32,6 +33,7 @@ CREATE SINK test_route_sink from test_route WITH (

 statement ok
 CREATE SINK s7 from t7 WITH (
+    type = 'upsert',
     connector = 'elasticsearch',
     index = 'test',
     url = 'http://elasticsearch:9200',
@@ -41,6 +43,7 @@ CREATE SINK s7 from t7 WITH (

 statement ok
 CREATE SINK s8 from t7 WITH (
+    type = 'upsert',
     connector = 'elasticsearch',
     index = 'test1',
     primary_key = 'v1,v3',
```
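
These tests now set `type = 'upsert'` explicitly because `type` becomes a required option of the Elasticsearch sink configuration (see the `r#type` field added to `ElasticSearchOpenSearchConfig` further down).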
3 changes: 3 additions & 0 deletions

```diff
@@ -28,6 +28,7 @@ pub struct ElasticSearchSink {
     config: ElasticSearchOpenSearchConfig,
     schema: Schema,
     pk_indices: Vec<usize>,
+    is_append_only: bool,
 }

 #[async_trait]
@@ -41,6 +42,7 @@ impl TryFrom<SinkParam> for ElasticSearchSink {
             config,
             schema,
             pk_indices: param.downstream_pk,
+            is_append_only: param.sink_type.is_append_only(),
         })
     }
 }
@@ -64,6 +66,7 @@ impl Sink for ElasticSearchSink {
             self.schema.clone(),
             self.pk_indices.clone(),
             Self::SINK_NAME,
+            self.is_append_only,
         )?
         .into_log_sinker(self.config.concurrent_requests))
     }
```
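
Both `ElasticSearchSink` here and `OpenSearchSink` below derive the flag from the sink type instead of taking a separate option, so `force_append_only` sinks are treated as append-only downstream. A minimal sketch of that mapping, using a simplified `SinkType` stand-in (not RisingWave's actual definition):

```rust
// Simplified stand-in for the sink type; the real enum lives elsewhere in
// RisingWave and may have different variants. This only illustrates why
// `param.sink_type.is_append_only()` also covers force_append_only.
#[derive(Clone, Copy, Debug, PartialEq)]
enum SinkType {
    AppendOnly,
    ForceAppendOnly,
    Upsert,
}

impl SinkType {
    // Both append-only flavors report true, so the writer rejects
    // delete-side ops for force_append_only sinks as well.
    fn is_append_only(self) -> bool {
        matches!(self, SinkType::AppendOnly | SinkType::ForceAppendOnly)
    }
}

fn main() {
    assert!(SinkType::ForceAppendOnly.is_append_only());
    assert!(!SinkType::Upsert.is_append_only());
}
```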
15 changes: 11 additions & 4 deletions

```diff
@@ -39,6 +39,7 @@ impl StreamChunkConverter {
         schema: Schema,
         pk_indices: &Vec<usize>,
         properties: &BTreeMap<String, String>,
+        is_append_only: bool,
     ) -> Result<Self> {
         if is_remote_es_sink(sink_name) {
             let index_column = properties
@@ -71,6 +72,7 @@
                 index_column,
                 index,
                 routing_column,
+                is_append_only,
             )?))
         } else {
             Ok(StreamChunkConverter::Other)
@@ -79,13 +81,14 @@

     pub fn convert_chunk(&self, chunk: StreamChunk) -> Result<StreamChunk> {
         match self {
-            StreamChunkConverter::Es(es) => es.convert_chunk(chunk),
+            StreamChunkConverter::Es(es) => es.convert_chunk(chunk, es.is_append_only),
             StreamChunkConverter::Other => Ok(chunk),
         }
     }
 }
 pub struct EsStreamChunkConverter {
     formatter: ElasticSearchOpenSearchFormatter,
+    is_append_only: bool,
 }
 impl EsStreamChunkConverter {
     pub fn new(
@@ -95,6 +98,7 @@
         index_column: Option<usize>,
         index: Option<String>,
         routing_column: Option<usize>,
+        is_append_only: bool,
     ) -> Result<Self> {
         let formatter = ElasticSearchOpenSearchFormatter::new(
             pk_indices,
@@ -104,10 +108,13 @@
             index,
             routing_column,
         )?;
-        Ok(Self { formatter })
+        Ok(Self {
+            formatter,
+            is_append_only,
+        })
     }

-    fn convert_chunk(&self, chunk: StreamChunk) -> Result<StreamChunk> {
+    fn convert_chunk(&self, chunk: StreamChunk, is_append_only: bool) -> Result<StreamChunk> {
         let mut ops = Vec::with_capacity(chunk.capacity());
         let mut id_string_builder =
             <Utf8ArrayBuilder as risingwave_common::array::ArrayBuilder>::new(chunk.capacity());
@@ -117,7 +124,7 @@
             <Utf8ArrayBuilder as risingwave_common::array::ArrayBuilder>::new(chunk.capacity());
         let mut routing_builder =
             <Utf8ArrayBuilder as risingwave_common::array::ArrayBuilder>::new(chunk.capacity());
-        for build_bulk_para in self.formatter.convert_chunk(chunk)? {
+        for build_bulk_para in self.formatter.convert_chunk(chunk, is_append_only)? {
             let BuildBulkPara {
                 key,
                 value,
```
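
`EsStreamChunkConverter` stores the flag and `StreamChunkConverter::convert_chunk` forwards it (as `es.is_append_only`) into the formatter, where the actual append-only check lives — a condensed sketch of that check follows the formatter diff below.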
5 changes: 4 additions & 1 deletion

```diff
@@ -160,6 +160,7 @@ pub struct ElasticSearchOpenSearchSinkWriter {
     client: Arc<ElasticSearchOpenSearchClient>,
     formatter: ElasticSearchOpenSearchFormatter,
     config: ElasticSearchOpenSearchConfig,
+    is_append_only: bool,
 }

 impl ElasticSearchOpenSearchSinkWriter {
@@ -168,6 +169,7 @@
         schema: Schema,
         pk_indices: Vec<usize>,
         connector: &str,
+        is_append_only: bool,
     ) -> Result<Self> {
         let client = Arc::new(config.build_client(connector)?);
         let formatter = ElasticSearchOpenSearchFormatter::new(
@@ -182,6 +184,7 @@
             client,
             formatter,
             config,
+            is_append_only,
         })
     }
 }
@@ -202,7 +205,7 @@ impl AsyncTruncateSinkWriter for ElasticSearchOpenSearchSinkWriter {
         let mut bulks: Vec<ElasticSearchOpenSearchBulk> = Vec::with_capacity(chunk_capacity);

         let mut bulks_size = 0;
-        for build_bulk_para in self.formatter.convert_chunk(chunk)? {
+        for build_bulk_para in self.formatter.convert_chunk(chunk, self.is_append_only)? {
             let BuildBulkPara {
                 key,
                 value,
```
2 changes: 2 additions & 0 deletions

```diff
@@ -77,6 +77,8 @@ pub struct ElasticSearchOpenSearchConfig {
     #[serde_as(as = "DisplayFromStr")]
     #[serde(default = "default_concurrent_requests")]
     pub concurrent_requests: usize,
+
+    pub r#type: String,
 }

 fn default_retry_on_conflict() -> i32 {
```
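
The new field is spelled `r#type` because `type` is a Rust keyword; with serde, the raw identifier still maps to the plain `type` key users write in `WITH (...)`. A minimal sketch of that behavior, assuming the `serde` and `serde_json` crates (the `SketchConfig` struct is hypothetical):

```rust
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct SketchConfig {
    // `type` is reserved in Rust, so the field needs the raw identifier;
    // serde derives the serialized name "type" from it automatically.
    r#type: String,
}

fn main() {
    let cfg: SketchConfig = serde_json::from_str(r#"{"type": "upsert"}"#).unwrap();
    assert_eq!(cfg.r#type, "upsert");
}
```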
21 changes: 19 additions & 2 deletions

```diff
@@ -113,7 +113,11 @@ impl ElasticSearchOpenSearchFormatter {
         })
     }

-    pub fn convert_chunk(&self, chunk: StreamChunk) -> Result<Vec<BuildBulkPara>> {
+    pub fn convert_chunk(
+        &self,
+        chunk: StreamChunk,
+        is_append_only: bool,
+    ) -> Result<Vec<BuildBulkPara>> {
         let mut result_vec = Vec::with_capacity(chunk.capacity());
         for (op, rows) in chunk.rows() {
             let index = if let Some(index_column) = self.index_column {
@@ -157,6 +161,11 @@
                     });
                 }
                 Op::Delete => {
+                    if is_append_only {
+                        return Err(SinkError::ElasticSearchOpenSearch(anyhow!(
+                            "`Delete` operation is not supported in `append_only` mode"
+                        )));
+                    }
                     let key = self.key_encoder.encode(rows)?;
                     let mem_size_b = std::mem::size_of_val(&key);
                     result_vec.push(BuildBulkPara {
@@ -167,7 +176,15 @@
                         routing_column,
                     });
                 }
-                Op::UpdateDelete => continue,
+                Op::UpdateDelete => {
+                    if is_append_only {
+                        return Err(SinkError::ElasticSearchOpenSearch(anyhow!(
+                            "`UpdateDelete` operation is not supported in `append_only` mode"
+                        )));
+                    } else {
+                        continue;
+                    }
+                }
             }
         }
         Ok(result_vec)
```
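
This guard is the core of the change: in append-only mode a delete-side op becomes a hard error instead of being encoded into the bulk request (or, for `UpdateDelete`, silently skipped). A condensed, self-contained sketch of the same decision logic, assuming the `anyhow` crate; `Op` and `check_op` here are simplified stand-ins for the connector's types:

```rust
use anyhow::{anyhow, Result};

// Simplified stand-in for the stream-chunk op kinds.
#[derive(Clone, Copy, Debug)]
enum Op {
    Insert,
    UpdateInsert,
    Delete,
    UpdateDelete,
}

// Mirrors the guard added in the diff: delete-side ops are rejected when
// the sink is append-only; otherwise they proceed (the real code encodes a
// delete bulk entry for Delete and skips UpdateDelete).
fn check_op(op: Op, is_append_only: bool) -> Result<()> {
    match op {
        Op::Insert | Op::UpdateInsert => Ok(()),
        Op::Delete if is_append_only => Err(anyhow!(
            "`Delete` operation is not supported in `append_only` mode"
        )),
        Op::UpdateDelete if is_append_only => Err(anyhow!(
            "`UpdateDelete` operation is not supported in `append_only` mode"
        )),
        Op::Delete | Op::UpdateDelete => Ok(()),
    }
}

fn main() {
    assert!(check_op(Op::Insert, true).is_ok());
    assert!(check_op(Op::Delete, true).is_err());
    assert!(check_op(Op::Delete, false).is_ok());
}
```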
3 changes: 3 additions & 0 deletions src/connector/src/sink/elasticsearch_opensearch/opensearch.rs

```diff
@@ -30,6 +30,7 @@ pub struct OpenSearchSink {
     config: ElasticSearchOpenSearchConfig,
     schema: Schema,
     pk_indices: Vec<usize>,
+    is_append_only: bool,
 }

 #[async_trait]
@@ -43,6 +44,7 @@ impl TryFrom<SinkParam> for OpenSearchSink {
             config,
             schema,
             pk_indices: param.downstream_pk,
+            is_append_only: param.sink_type.is_append_only(),
         })
     }
 }
@@ -69,6 +71,7 @@ impl Sink for OpenSearchSink {
             self.schema.clone(),
             self.pk_indices.clone(),
             Self::SINK_NAME,
+            self.is_append_only,
         )?
         .into_log_sinker(self.config.concurrent_requests))
     }
```
1 change: 1 addition & 0 deletions src/connector/src/sink/remote.rs

```diff
@@ -301,6 +301,7 @@ impl RemoteLogSinker {
                 sink_param.schema(),
                 &sink_param.downstream_pk,
                 &sink_param.properties,
+                sink_param.sink_type.is_append_only(),
             )?,
         })
     }
```
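
The remote (JVM-connector) Elasticsearch path reaches the same code: `RemoteLogSinker` threads `sink_param.sink_type.is_append_only()` into `StreamChunkConverter::new`, so chunks converted for the remote sink hit the identical formatter check.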
3 changes: 3 additions & 0 deletions src/connector/with_options_sink.yaml

```diff
@@ -331,6 +331,9 @@ ElasticSearchOpenSearchConfig:
   - name: concurrent_requests
     field_type: usize
     required: true
+  - name: r#type
+    field_type: String
+    required: true
 FsConfig:
   fields:
   - name: fs.path
```
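
`with_options_sink.yaml` catalogs the `WITH` options each sink accepts, so the new option shows up here as required, matching the `r#type` field on `ElasticSearchOpenSearchConfig` and the explicit `type = 'upsert'` in the e2e tests above.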