Replacement of Value and Bins with Writable/Readable derive macros #138

Open

wants to merge 13 commits into base: async
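
This PR swaps the fixed `&[Bin]` / `Value` record plumbing for two traits, `WritableBins` (write path) and `ReadableBins` (read path), which the `aerospike-macro` crate can derive for user structs. A minimal sketch of the intended call pattern, based only on the signatures visible in this diff; the derive names `WritableBins`/`ReadableBins` and their re-export path are assumptions, since the macro crate itself is not shown here:

```rust
use aerospike::{Bins, Client, Key, ReadPolicy, Record, Value, WritePolicy};
// Assumed derive names and import path; the aerospike-macro side of this PR is not in the diff.
use aerospike_macro::{ReadableBins, WritableBins};

#[derive(ReadableBins, WritableBins, Clone, Debug)]
struct UserBins {
    name: String,
    visits: i64,
}

async fn roundtrip(client: &Client) -> aerospike::errors::Result<()> {
    let key = Key::new("test", "users", Value::from("user-1"))?;
    let bins = UserBins { name: "alice".into(), visits: 1 };

    // `put` now accepts any `&T where T: WritableBins` instead of `&[Bin]`.
    client.put(&WritePolicy::default(), &key, &bins).await?;

    // `get` now returns `Record<S>` for a caller-chosen `S: ReadableBins`.
    let record: Record<UserBins> = client.get(&ReadPolicy::default(), &key, Bins::All).await?;
    println!("read back: {:?}", record.bins);
    Ok(())
}
```
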
1 change: 1 addition & 0 deletions .github/workflows/build.yml
@@ -8,6 +8,7 @@ on:
pull_request:
branches:
- master
workflow_dispatch:

env:
CARGO_TERM_COLOR: always
6 changes: 3 additions & 3 deletions Cargo.toml
@@ -44,11 +44,11 @@ members = ["tools/benchmark", "aerospike-core", "aerospike-rt", "aerospike-sync"

[dev-dependencies]
log = "0.4"
env_logger = "0.9.3"
env_logger = "0.10.0"
hex = "0.4"
bencher = "0.1"
criterion = { version = "0.5.1", features = ["async_tokio", "async_futures", "async"]}
serde_json = "1.0"
rand = "0.7"
rand = "0.8.5"
lazy_static = "1.4"
aerospike-macro = {path = "./aerospike-macro"}
aerospike-rt = {path = "./aerospike-rt"}
2 changes: 2 additions & 0 deletions aerospike-core/Cargo.toml
@@ -17,8 +17,10 @@ error-chain = "0.12"
pwhash = "0.3"
serde = { version = "1.0", features = ["derive"], optional = true }
aerospike-rt = {path = "../aerospike-rt"}
aerospike-macro = {path = "../aerospike-macro"}
futures = {version = "0.3.16" }
async-trait = "0.1.51"
num = "0.4.0"

[features]
serialization = ["serde"]
23 changes: 12 additions & 11 deletions aerospike-core/src/batch/batch_executor.rs
@@ -21,6 +21,7 @@ use crate::batch::BatchRead;
use crate::cluster::partition::Partition;
use crate::cluster::{Cluster, Node};
use crate::commands::BatchReadCommand;
use crate::derive::readable::ReadableBins;
use crate::errors::{Error, Result};
use crate::policy::{BatchPolicy, Concurrency};
use crate::Key;
@@ -35,29 +36,29 @@ impl BatchExecutor {
BatchExecutor { cluster }
}

pub async fn execute_batch_read(
pub async fn execute_batch_read<T: ReadableBins + 'static>(
&self,
policy: &BatchPolicy,
batch_reads: Vec<BatchRead>,
) -> Result<Vec<BatchRead>> {
batch_reads: Vec<BatchRead<T>>,
) -> Result<Vec<BatchRead<T>>> {
let mut batch_nodes = self.get_batch_nodes(&batch_reads).await?;
let jobs = batch_nodes
.drain()
.map(|(node, reads)| BatchReadCommand::new(policy, node, reads))
.collect();
let reads = self.execute_batch_jobs(jobs, &policy.concurrency).await?;
let mut res: Vec<BatchRead> = vec![];
let mut res: Vec<BatchRead<T>> = vec![];
for mut read in reads {
res.append(&mut read.batch_reads);
}
Ok(res)
}

async fn execute_batch_jobs(
async fn execute_batch_jobs<T: ReadableBins + 'static>(
&self,
jobs: Vec<BatchReadCommand>,
jobs: Vec<BatchReadCommand<T>>,
concurrency: &Concurrency,
) -> Result<Vec<BatchReadCommand>> {
) -> Result<Vec<BatchReadCommand<T>>> {
let threads = match *concurrency {
Concurrency::Sequential => 1,
Concurrency::Parallel => jobs.len(),
@@ -97,12 +98,12 @@ }
}
}

async fn get_batch_nodes(
async fn get_batch_nodes<T: ReadableBins>(
&self,
batch_reads: &[BatchRead],
) -> Result<HashMap<Arc<Node>, Vec<BatchRead>>> {
batch_reads: &[BatchRead<T>],
) -> Result<HashMap<Arc<Node>, Vec<BatchRead<T>>>> {
let mut map = HashMap::new();
for (_, batch_read) in batch_reads.iter().enumerate() {
for batch_read in batch_reads {
let node = self.node_for_key(&batch_read.key).await?;
map.entry(node)
.or_insert_with(Vec::new)
9 changes: 5 additions & 4 deletions aerospike-core/src/batch/batch_read.rs
@@ -13,6 +13,7 @@
// License for the specific language governing permissions and limitations under
// the License.

use crate::derive::readable::ReadableBins;
use crate::Bins;
use crate::Key;
use crate::Record;
@@ -22,18 +23,18 @@ use serde::Serialize;
/// Key and bin names used in batch read commands where variable bins are needed for each key.
#[cfg_attr(feature = "serialization", derive(Serialize))]
#[derive(Debug, Clone)]
pub struct BatchRead {
pub struct BatchRead<T: ReadableBins> {
/// Key.
pub key: Key,

/// Bins to retrieve for this key.
pub bins: Bins,

/// Will contain the record after the batch read operation.
pub record: Option<Record>,
pub record: Option<Record<T>>,
}

impl BatchRead {
impl<T: ReadableBins> BatchRead<T> {
/// Create a new `BatchRead` instance for the given key and bin selector.
pub const fn new(key: Key, bins: Bins) -> Self {
BatchRead {
@@ -44,7 +45,7 @@ impl BatchRead {
}

#[doc(hidden)]
pub fn match_header(&self, other: &BatchRead, match_set: bool) -> bool {
pub fn match_header(&self, other: &BatchRead<T>, match_set: bool) -> bool {
let key = &self.key;
let other_key = &other.key;
(key.namespace == other_key.namespace)
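
With `BatchRead<T: ReadableBins>`, the caller fixes the bin type when building the batch. A hedged sketch using `HashMap<String, Value>` as the catch-all bin map, on the assumption that it implements `ReadableBins` (the `ExecuteUDFCommand<HashMap<String, Value>>` change further down in this diff suggests it does):

```rust
use std::collections::HashMap;
use aerospike::{BatchPolicy, BatchRead, Bins, Client, Key, Value};

async fn read_two(client: &Client) -> aerospike::errors::Result<()> {
    // Assumption: HashMap<String, Value> implements ReadableBins and serves as
    // the untyped fallback, mirroring the old Record bins representation.
    let batch: Vec<BatchRead<HashMap<String, Value>>> = vec![
        BatchRead::new(Key::new("test", "users", Value::from("user-1"))?, Bins::All),
        BatchRead::new(Key::new("test", "users", Value::from("user-2"))?, Bins::All),
    ];

    for read in client.batch_get(&BatchPolicy::default(), batch).await? {
        // `record` is None when the key was not found.
        if let Some(record) = read.record {
            println!("{:?} -> {:?}", read.key, record.bins);
        }
    }
    Ok(())
}
```
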
78 changes: 53 additions & 25 deletions aerospike-core/src/client.rs
@@ -13,6 +13,7 @@
// License for the specific language governing permissions and limitations under
// the License.

use std::collections::HashMap;
use std::path::Path;
use std::str;
use std::sync::Arc;
@@ -24,14 +25,16 @@ use crate::commands::{
DeleteCommand, ExecuteUDFCommand, ExistsCommand, OperateCommand, QueryCommand, ReadCommand,
ScanCommand, TouchCommand, WriteCommand,
};
use crate::derive::readable::ReadableBins;
use crate::derive::writable::WritableBins;
use crate::errors::{ErrorKind, Result, ResultExt};
use crate::net::ToHosts;
use crate::operations::{Operation, OperationType};
use crate::policy::{BatchPolicy, ClientPolicy, QueryPolicy, ReadPolicy, ScanPolicy, WritePolicy};
use crate::task::{IndexTask, RegisterTask};
use crate::{
BatchRead, Bin, Bins, CollectionIndexType, IndexType, Key, Record, Recordset, ResultCode,
Statement, UDFLang, Value,
BatchRead, Bins, CollectionIndexType, IndexType, Key, Record, Recordset, ResultCode, Statement,
UDFLang, Value,
};
use aerospike_rt::fs::File;
#[cfg(all(any(feature = "rt-tokio"), not(feature = "rt-async-std")))]
@@ -179,7 +182,12 @@ impl Client {
///
/// # Panics
/// Panics if the return is invalid
pub async fn get<T>(&self, policy: &ReadPolicy, key: &Key, bins: T) -> Result<Record>
pub async fn get<S: ReadableBins, T>(
&self,
policy: &ReadPolicy,
key: &Key,
bins: T,
) -> Result<Record<S>>
where
T: Into<Bins> + Send + Sync + 'static,
{
@@ -223,11 +231,11 @@ impl Client {
/// => println!("Error executing batch request: {}", err),
/// }
/// ```
pub async fn batch_get(
pub async fn batch_get<T: ReadableBins + 'static>(
&self,
policy: &BatchPolicy,
batch_reads: Vec<BatchRead>,
) -> Result<Vec<BatchRead>> {
batch_reads: Vec<BatchRead<T>>,
) -> Result<Vec<BatchRead<T>>> {
let executor = BatchExecutor::new(self.cluster.clone());
executor.execute_batch_read(policy, batch_reads).await
}
@@ -268,7 +276,12 @@ impl Client {
/// Err(err) => println!("Error writing record: {}", err),
/// }
/// ```
pub async fn put(&self, policy: &WritePolicy, key: &Key, bins: &[Bin]) -> Result<()> {
pub async fn put<T: WritableBins>(
&self,
policy: &WritePolicy,
key: &Key,
bins: &T,
) -> Result<()> {
let mut command = WriteCommand::new(
policy,
self.cluster.clone(),
@@ -301,7 +314,12 @@ impl Client {
/// Err(err) => println!("Error writing record: {}", err),
/// }
/// ```
pub async fn add(&self, policy: &WritePolicy, key: &Key, bins: &[Bin]) -> Result<()> {
pub async fn add<T: WritableBins>(
&self,
policy: &WritePolicy,
key: &Key,
bins: &T,
) -> Result<()> {
let mut command =
WriteCommand::new(policy, self.cluster.clone(), key, bins, OperationType::Incr);
command.execute().await
@@ -310,7 +328,12 @@ impl Client {
/// Append bin string values to existing record bin values. The policy specifies the
/// transaction timeout, record expiration and how the transaction is handled when the record
/// already exists. This call only works for string values.
pub async fn append(&self, policy: &WritePolicy, key: &Key, bins: &[Bin]) -> Result<()> {
pub async fn append<T: WritableBins>(
&self,
policy: &WritePolicy,
key: &Key,
bins: &T,
) -> Result<()> {
let mut command = WriteCommand::new(
policy,
self.cluster.clone(),
@@ -324,7 +347,12 @@ impl Client {
/// Prepend bin string values to existing record bin values. The policy specifies the
/// transaction timeout, record expiration and how the transaction is handled when the record
/// already exists. This call only works for string values.
pub async fn prepend(&self, policy: &WritePolicy, key: &Key, bins: &[Bin]) -> Result<()> {
pub async fn prepend<T: WritableBins>(
&self,
policy: &WritePolicy,
key: &Key,
bins: &T,
) -> Result<()> {
let mut command = WriteCommand::new(
policy,
self.cluster.clone(),
@@ -421,12 +449,12 @@ impl Client {
/// ```
/// # Panics
/// Panics if the return is invalid
pub async fn operate(
pub async fn operate<T: ReadableBins>(
&self,
policy: &WritePolicy,
key: &Key,
ops: &[Operation<'_>],
) -> Result<Record> {
) -> Result<Record<T>> {
let mut command = OperateCommand::new(policy, self.cluster.clone(), key, ops);
command.execute().await?;
Ok(command.read_command.record.unwrap())
@@ -556,7 +584,7 @@ impl Client {
function_name: &str,
args: Option<&[Value]>,
) -> Result<Option<Value>> {
let mut command = ExecuteUDFCommand::new(
let mut command: ExecuteUDFCommand<HashMap<String, Value>> = ExecuteUDFCommand::new(
policy,
self.cluster.clone(),
key,
@@ -616,13 +644,13 @@ impl Client {
///
/// # Panics
/// Panics if the async block fails
pub async fn scan<T>(
pub async fn scan<S: ReadableBins + 'static, T>(
&self,
policy: &ScanPolicy,
namespace: &str,
set_name: &str,
bins: T,
) -> Result<Arc<Recordset>>
) -> Result<Arc<Recordset<S>>>
where
T: Into<Bins> + Send + Sync + 'static,
{
@@ -638,7 +666,7 @@ impl Client {
let set_name = set_name.to_owned();
let bins = bins.clone();

aerospike_rt::spawn(async move {
let _ = aerospike_rt::spawn(async move {
let mut command = ScanCommand::new(
&policy, node, &namespace, &set_name, bins, recordset, partitions,
);
@@ -657,14 +685,14 @@ impl Client {
///
/// # Panics
/// panics if the async block fails
pub async fn scan_node<T>(
pub async fn scan_node<S: ReadableBins + 'static, T>(
&self,
policy: &ScanPolicy,
node: Arc<Node>,
namespace: &str,
set_name: &str,
bins: T,
) -> Result<Arc<Recordset>>
) -> Result<Arc<Recordset<S>>>
where
T: Into<Bins> + Send + Sync + 'static,
{
@@ -676,7 +704,7 @@ impl Client {
let namespace = namespace.to_owned();
let set_name = set_name.to_owned();

aerospike_rt::spawn(async move {
let _ = aerospike_rt::spawn(async move {
let mut command = ScanCommand::new(
&policy,
node,
@@ -718,11 +746,11 @@ impl Client {
///
/// # Panics
/// Panics if the async block fails
pub async fn query(
pub async fn query<T: ReadableBins + 'static>(
&self,
policy: &QueryPolicy,
statement: Statement,
) -> Result<Arc<Recordset>> {
) -> Result<Arc<Recordset<T>>> {
statement.validate()?;
let statement = Arc::new(statement);

@@ -737,7 +765,7 @@ impl Client {
let t_recordset = recordset.clone();
let policy = policy.clone();
let statement = statement.clone();
aerospike_rt::spawn(async move {
let _ = aerospike_rt::spawn(async move {
let mut command =
QueryCommand::new(&policy, node, statement, t_recordset, partitions);
command.execute().await.unwrap();
@@ -753,12 +781,12 @@ impl Client {
///
/// # Panics
/// Panics when the async block fails
pub async fn query_node(
pub async fn query_node<T: ReadableBins + 'static>(
&self,
policy: &QueryPolicy,
node: Arc<Node>,
statement: Statement,
) -> Result<Arc<Recordset>> {
) -> Result<Arc<Recordset<T>>> {
statement.validate()?;

let recordset = Arc::new(Recordset::new(policy.record_queue_size, 1));
@@ -770,7 +798,7 @@ impl Client {
.node_partitions(node.as_ref(), &statement.namespace)
.await;

aerospike_rt::spawn(async move {
let _ = aerospike_rt::spawn(async move {
let mut command = QueryCommand::new(&policy, node, statement, t_recordset, partitions);
command.execute().await.unwrap();
})
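
The scan/query path is generic in the same way: `query`, `query_node`, `scan`, and `scan_node` now return `Arc<Recordset<T>>` for a caller-chosen `T: ReadableBins`. A sketch, again assuming `HashMap<String, Value>` implements `ReadableBins`; the recordset iteration API itself is not changed by this diff:

```rust
use std::collections::HashMap;
use std::sync::Arc;
use aerospike::{Bins, Client, QueryPolicy, Recordset, Statement, Value};

async fn query_users(client: &Client) -> aerospike::errors::Result<()> {
    let stmt = Statement::new("test", "users", Bins::All);

    // `query` now returns Arc<Recordset<T>>, where T picks the type each
    // record's bins are decoded into (assumed HashMap<String, Value> here).
    let rs: Arc<Recordset<HashMap<String, Value>>> =
        client.query(&QueryPolicy::default(), stmt).await?;

    // Consume `rs` with the existing Recordset iteration; each yielded item
    // is now a Record<HashMap<String, Value>> rather than a fixed Record type.
    Ok(())
}
```
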