
Commit

[Feature] Add native format reader to access StarRocks data bypass BE Server.

Signed-off-by: plotor <[email protected]>
plotor committed Jan 9, 2025
1 parent 48b9d6e commit 33c196d
Showing 43 changed files with 4,787 additions and 31 deletions.
8 changes: 7 additions & 1 deletion be/CMakeLists.txt
@@ -912,7 +912,6 @@ set(STARROCKS_DEPENDENCIES
arrow_flight_sql
gRPC::grpc
gRPC::grpc++
jemalloc # required by arrow
parquet
orc
cctz
@@ -943,6 +942,13 @@ set(STARROCKS_DEPENDENCIES
${WL_END_GROUP}
)

if (NOT BUILD_FORMAT_LIB)
set(STARROCKS_DEPENDENCIES
${STARROCKS_DEPENDENCIES}
jemalloc # required by arrow
)
endif()

if (${WITH_STARCACHE} STREQUAL "ON")
set(STARROCKS_DEPENDENCIES
${STARROCKS_DEPENDENCIES}
4 changes: 4 additions & 0 deletions be/src/exprs/dictmapping_expr.cpp
@@ -27,7 +27,11 @@ Status DictMappingExpr::open(RuntimeState* state, ExprContext* context, Function
return Status::OK();
}

#if defined(BUILD_FORMAT_LIB)
return Status::OK();
#else
return state->mutable_dict_optimize_parser()->rewrite_expr(context, this, _output_id);
#endif
}

StatusOr<ColumnPtr> DictMappingExpr::evaluate_checked(ExprContext* context, Chunk* ptr) {
1 change: 0 additions & 1 deletion be/src/starrocks_format/CMakeLists.txt
@@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.


set(CMAKE_VERBOSE_MAKEFILE ON)

set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/starrocks_format")
5 changes: 5 additions & 0 deletions be/src/starrocks_format/starrocks_lib.cpp
@@ -22,10 +22,12 @@

#include "common/config.h"
#include "fs/fs_s3.h"
#include "runtime/exec_env.h"
#include "runtime/time_types.h"
#include "storage/lake/fixed_location_provider.h"
#include "storage/lake/tablet_manager.h"
#include "storage/olap_define.h"
#include "util/mem_info.h"
#include "util/timezone_utils.h"

namespace starrocks::lake {
@@ -56,10 +58,13 @@ void starrocks_format_initialize(void) {

Aws::InitAPI(aws_sdk_options);

MemInfo::init();
date::init_date_cache();

TimezoneUtils::init_time_zones();

CHECK(GlobalEnv::GetInstance()->init().ok());

auto lake_location_provider = std::make_shared<FixedLocationProvider>("");
_lake_tablet_manager = new lake::TabletManager(lake_location_provider, config::lake_metadata_cache_limit);
LOG(INFO) << "starrocks format module has been initialized successfully";
11 changes: 9 additions & 2 deletions be/src/storage/lake/lake_delvec_loader.cpp
@@ -37,8 +37,15 @@ Status LakeDelvecLoader::load(const TabletSegmentId& tsid, int64_t version, DelV
Status LakeDelvecLoader::load_from_file(const TabletSegmentId& tsid, int64_t version, DelVectorPtr* pdelvec) {
(*pdelvec).reset(new DelVector());
// 2. find in delvec file
ASSIGN_OR_RETURN(auto metadata,
_tablet_manager->get_tablet_metadata(tsid.tablet_id, version, _fill_cache, 0, _lake_io_opts.fs));
TabletMetadataPtr metadata;
if (_lake_io_opts.location_provider) {
const std::string filepath = _lake_io_opts.location_provider->tablet_metadata_location(tsid.tablet_id, version);
ASSIGN_OR_RETURN(metadata, _tablet_manager->get_tablet_metadata(filepath, false, 0, _lake_io_opts.fs));
} else {
ASSIGN_OR_RETURN(metadata, _tablet_manager->get_tablet_metadata(tsid.tablet_id, version, _fill_cache, 0,
_lake_io_opts.fs));
}

RETURN_IF_ERROR(
lake::get_del_vec(_tablet_manager, *metadata, tsid.segment_id, _fill_cache, _lake_io_opts, pdelvec->get()));
return Status::OK();
232 changes: 231 additions & 1 deletion format-sdk/README.md
@@ -1 +1,231 @@
# StarRocks Format Library

StarRocks Format Library is an SDK for reading and writing StarRocks data files, providing C++ and Java versions of the tablet data file reader and writer. With the Format SDK, you can bypass the StarRocks BE node and directly read and write tablet data files stored in a remote file system (e.g. S3, HDFS) in shared-data mode, so that the same data can be accessed (read and written) by multiple computing engines (e.g. Spark, StarRocks) at the same time.

## Why do we do it?

StarRocks is a next-generation, high-performance analytical data warehouse. However, the limitations of its MPP architecture still cause some problems in large-data ETL scenarios. For example:

- __Low resource utilization__: StarRocks clusters mostly adopt a resource reservation mode. Large-data ETL scenarios require a large amount of resources within a relatively short period of time, so supporting them means planning redundant resources in advance, which may reduce the overall resource utilization of the cluster.
- __Poor resource isolation__: StarRocks isolates resources at the resource-group level rather than the query level. In large-data ETL scenarios, there is a risk that a query with a large resource overhead exhausts the available resources and starves smaller queries.
- __Lack of failure tolerance__: Because there is no task-level failure tolerance mechanism, a failed ETL job usually still fails even when it is rerun manually.

In order to solve the above problems, we have proposed the idea of __bypassing the StarRocks BE node to directly read and write tablet data files in shared-data mode__. Taking Apache Spark as an example (_in fact, the design is not bound to Spark, and more computing engines can be supported_), the overall architecture is as follows:

![starrocks-bypass-load](https://github.com/user-attachments/assets/00d37763-5b6c-42ac-a980-40a604746c59)

For complex query scenarios, users can submit jobs through the `spark-sql` client. Spark interacts with StarRocks to obtain the table's metadata, manages the data write transaction, and performs read/write operations on shared storage directly through the Format SDK. As a result, the same StarRocks data files can be read and written by multiple computing engines.

## How to build?

The build process is divided into two steps. __First__, execute the following commands to compile the `be` and `format-lib` modules and generate the `libstarrocks_format.so` file:

```bash
# build be
./build.sh --clean --be --enable-shared-data

# build format-lib
./build.sh --format-lib --enable-shared-data --without-tenann
```

__Then__ execute the following command to compile the `format-sdk` module:

```bash
cd ./format-sdk
mvn clean package -DskipTests
```

After that, you will get two files: `libstarrocks_format_wrapper.so` and `format-sdk.jar`.

## How to use?

Assume there is a product sales table with the following schema:

```sql
CREATE TABLE `tb_sales`
(
    `product_id` BIGINT NOT NULL COMMENT "Product ID",
    `date` DATE NOT NULL COMMENT "Sale Date",
    `quantity` INT NOT NULL COMMENT "Sale Quantity",
    `amount` BIGINT NOT NULL COMMENT "Sale Amount"
) ENGINE = OLAP PRIMARY KEY(`product_id`, `date`)
DISTRIBUTED BY HASH(`product_id`)
PROPERTIES (
    "enable_persistent_index" = "true"
);
```

Next, we will take a Java program as an example to show how to use the Format SDK to read and write data files of this table.

> Note: you need to add the `format-sdk.jar` dependency to your project.

- __Write by StarRocksWriter__

You can write a program that uses `StarRocksWriter` to perform write operations according to the following steps:

1. Request the metadata and partition information of the table by `RestClient`;
2. Begin a transaction;
3. Create and open a `StarRocksWriter` bound to the target tablet;
4. Create an Arrow `VectorSchemaRoot` object, and fill it with custom data according to the table schema definition;
5. Call the `StarRocksWriter#write` method to write the filled `VectorSchemaRoot` object to StarRocks;
6. Batch as needed, and call the `StarRocksWriter#flush` method to flush the data to the file system;
7. Repeat steps 4 to 6 above until all data is written;
8. Close and release `StarRocksWriter`;
9. Commit or rollback the transaction.

Example code is as follows:

```java
final String DEFAULT_CATALOG = "default_catalog";
final String DB_NAME = "bypass";
final String TB_NAME = "tb_sales";

Config config = Config.newBuilder()
        .feHttpUrl("http://127.0.0.1:8030")
        .feJdbcUrl("jdbc:mysql://127.0.0.1:9030")
        .database(DB_NAME)
        .username("root")
        .password("******")
        .s3Endpoint("https://tos-s3-cn-beijing.volces.com")
        .s3EndpointRegion("cn-beijing")
        .s3ConnectionSslEnabled(false)
        .s3PathStyleAccess(false)
        .s3AccessKey("******")
        .s3SecretKey("******")
        .build();

String label = String.format(
        "bypass_%s_%s_%s", DB_NAME, TB_NAME, RandomStringUtils.randomAlphabetic(8)
);
try (RestClient restClient = RestClient.newBuilder()
        .feHttpEndpoints("http://127.0.0.1:8030")
        .username("root")
        .password("******")
        .build()) {

    // Request the metadata and partition information of the table by RestClient
    TableSchema tableSchema = restClient.getTableSchema(DEFAULT_CATALOG, DB_NAME, TB_NAME);
    List<TablePartition> tablePartitions = restClient.listTablePartitions(DEFAULT_CATALOG, DB_NAME, TB_NAME, false);
    Schema arrowSchema = ArrowUtils.toArrowSchema(tableSchema, ZoneId.systemDefault());

    // Begin a transaction
    TransactionResult beginTxnResult = restClient.beginTransaction(DEFAULT_CATALOG, DB_NAME, TB_NAME, label);
    assertTrue(beginTxnResult.isOk());

    List<TabletCommitInfo> commitTablets = new ArrayList<>();
    TablePartition tablePartition = tablePartitions.get(0);
    for (int i = 0; i < tablePartition.getTablets().size(); i++) {
        TablePartition.Tablet tablet = tablePartition.getTablets().get(i);
        Long tabletId = tablet.getId();

        // Create and open a StarRocksWriter bound to the target tablet
        StarRocksWriter tabletWriter = new StarRocksWriter(
                tabletId, tablePartition.getStoragePath(), beginTxnResult.getTxnId(), arrowSchema, config);
        try {
            tabletWriter.open();

            try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, tabletWriter.getAllocator())) {
                // Fill the VectorSchemaRoot object with custom data according to the table schema
                // definition (a sketch of fillSampleData follows this example)
                fillSampleData(i, root);

                // Write the filled VectorSchemaRoot object to StarRocks
                tabletWriter.write(root);

                // Flush the data to the file system
                tabletWriter.flush();
            }

        } finally {
            // Close and release StarRocksWriter
            tabletWriter.finish();
            tabletWriter.close();
            tabletWriter.release();
        }

        commitTablets.add(new TabletCommitInfo(tabletId, tablet.getPrimaryComputeNodeId()));
    }

    // Commit or rollback the transaction
    TransactionResult prepareTxnResult = restClient.prepareTransaction(
            DEFAULT_CATALOG, DB_NAME, beginTxnResult.getLabel(), commitTablets, null);
    if (prepareTxnResult.isOk()) {
        TransactionResult commitTxnResult = restClient.commitTransaction(
                DEFAULT_CATALOG, DB_NAME, prepareTxnResult.getLabel());
        assertTrue(commitTxnResult.isOk());
    } else {
        TransactionResult rollbackTxnResult = restClient.rollbackTransaction(
                DEFAULT_CATALOG, DB_NAME, prepareTxnResult.getLabel(), null);
        assertTrue(rollbackTxnResult.isOk());
    }
}
```
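
The `fillSampleData` helper used above is not part of the Format SDK; it simply populates one Arrow batch with sample rows. Below is a minimal sketch of what such a helper might look like for the `tb_sales` schema, assuming that `ArrowUtils.toArrowSchema` maps `BIGINT` to `BigIntVector`, `DATE` to `DateDayVector`, and `INT` to `IntVector` (verify the actual mapping in your environment):

```java
import java.time.LocalDate;

import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.DateDayVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;

// Hypothetical helper, not provided by the SDK: fills one batch of sample rows
// for the tb_sales table (product_id, date, quantity, amount).
private static void fillSampleData(int tabletIndex, VectorSchemaRoot root) {
    int rowCount = 10;
    BigIntVector productId = (BigIntVector) root.getVector("product_id");
    DateDayVector saleDate = (DateDayVector) root.getVector("date"); // assumes DATE -> DateDayVector
    IntVector quantity = (IntVector) root.getVector("quantity");
    BigIntVector amount = (BigIntVector) root.getVector("amount");

    for (int row = 0; row < rowCount; row++) {
        productId.setSafe(row, tabletIndex * 1000L + row);
        // DateDayVector stores the number of days since the Unix epoch
        saleDate.setSafe(row, (int) LocalDate.of(2025, 1, 9).toEpochDay());
        quantity.setSafe(row, row + 1);
        amount.setSafe(row, (row + 1) * 100L);
    }
    root.setRowCount(rowCount);
}
```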

- __Read by StarRocksReader__

You can write a program that uses `StarRocksReader` to perform read operations according to the following steps:

1. Request the metadata and partition information of the table by `RestClient`;
2. Create and open a `StarRocksReader` bound to the target tablet;
3. `StarRocksReader` supports iterator access, so you can iterate and read the data;
4. Close and release `StarRocksReader`.

Example code is as follows:

```java
final String DEFAULT_CATALOG = "default_catalog";
final String DB_NAME = "bypass";
final String TB_NAME = "tb_sales";

Config config = Config.newBuilder()
        .feHttpUrl("http://127.0.0.1:8030")
        .feJdbcUrl("jdbc:mysql://127.0.0.1:9030")
        .database(DB_NAME)
        .username("root")
        .password("******")
        .s3Endpoint("https://tos-s3-cn-beijing.volces.com")
        .s3EndpointRegion("cn-beijing")
        .s3ConnectionSslEnabled(false)
        .s3PathStyleAccess(false)
        .s3AccessKey("******")
        .s3SecretKey("******")
        .build();

try (RestClient restClient = RestClient.newBuilder()
        .feHttpEndpoints("http://127.0.0.1:8030")
        .username("root")
        .password("******")
        .build()) {

    // Request the metadata and partition information of the table by RestClient
    TableSchema tableSchema = restClient.getTableSchema(DEFAULT_CATALOG, DB_NAME, TB_NAME);
    List<TablePartition> tablePartitions = restClient.listTablePartitions(DEFAULT_CATALOG, DB_NAME, TB_NAME, false);
    Schema arrowSchema = ArrowUtils.toArrowSchema(tableSchema, ZoneId.systemDefault());

    TablePartition partition = tablePartitions.get(0);
    for (TablePartition.Tablet tablet : partition.getTablets()) {
        Long tabletId = tablet.getId();
        // Create and open a StarRocksReader bound to the target tablet
        StarRocksReader reader = new StarRocksReader(
                tabletId,
                partition.getStoragePath(),
                partition.getVisibleVersion(),
                arrowSchema,
                arrowSchema,
                config);
        try {
            reader.open();

            // StarRocksReader supports iterator access, so you can iterate and read the data
            while (reader.hasNext()) {
                try (VectorSchemaRoot root = reader.next()) {
                    System.out.println(root.contentToTSVString());
                }
            }
        } finally {
            // Close and release StarRocksReader
            reader.close();
            reader.release();
        }
    }
}
```
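
The read example above dumps each batch as TSV via `contentToTSVString`. If you need typed access to individual columns instead, a minimal sketch of the read loop (under the same assumed Arrow type mapping as in the write example) might look like this:

```java
// A sketch only: typed access to one batch returned by StarRocksReader#next.
// Assumes product_id maps to BigIntVector and quantity to IntVector
// (imports: org.apache.arrow.vector.BigIntVector, org.apache.arrow.vector.IntVector).
while (reader.hasNext()) {
    try (VectorSchemaRoot root = reader.next()) {
        BigIntVector productId = (BigIntVector) root.getVector("product_id");
        IntVector quantity = (IntVector) root.getVector("quantity");
        for (int row = 0; row < root.getRowCount(); row++) {
            System.out.printf("product_id=%d, quantity=%d%n", productId.get(row), quantity.get(row));
        }
    }
}
```
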
48 changes: 48 additions & 0 deletions format-sdk/pom.xml
@@ -56,6 +56,12 @@
<version>${arrow.version}</version>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.14</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
@@ -70,6 +76,13 @@
<scope>compile</scope>
</dependency>

<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>8.0.33</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
@@ -162,6 +175,41 @@
</execution>
</executions>
</plugin>
<!--<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.5.2</version>
<configuration>
<shadeSourcesContent>true</shadeSourcesContent>
<createSourcesJar>true</createSourcesJar>
<minimizeJar>true</minimizeJar>
<artifactSet>
<includes>
<include>com.starrocks.format:format-sdk</include>
<include>org.apache.arrow:*</include>
<include>org.apache.commons:*</include>
<include>commons-codec:*</include>
</includes>
</artifactSet>
<relocations>
<relocation>
<pattern>org.apache.arrow</pattern>
<shadedPattern>com.starrocks.format.shaded.arrow</shadedPattern>
<pattern>org.apache.commons</pattern>
<shadedPattern>com.starrocks.format.shaded.commons</shadedPattern>
</relocation>
</relocations>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>-->

<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
