Skip to content

Commit

Permalink
DuckDB as memory storage (#1074)
Browse files Browse the repository at this point in the history
### Motivation and Context
<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
  1. Why is this change required?
  2. What problem does it solve?
  3. What scenario does it contribute to?
  4. If it fixes an open issue, please link to the issue here.
-->
DuckDB is becoming a popular option for in-memory OLAP storage. This PR
contributes a MemoryStorage implementation on DuckDB that supports both
file-based and in-memory setups.

### Description
<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->
DuckDB memory storage implementation
  • Loading branch information
colombod authored May 31, 2023
1 parent d095e5a commit 69d835c
Show file tree
Hide file tree
Showing 8 changed files with 1,239 additions and 1 deletion.
2 changes: 2 additions & 0 deletions dotnet/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
<!-- Skills -->
<PackageVersion Include="DocumentFormat.OpenXml" Version="2.20.0" />
<PackageVersion Include="Microsoft.Data.Sqlite" Version="[7.0, )" />
<PackageVersion Include="DuckDB.NET.Data.Full" Version="[0.8, )" />
<PackageVersion Include="DuckDB.NET.Data" Version="[0.8, )" />
<PackageVersion Include="Microsoft.Graph" Version="[4.51.0, 5)" />
<PackageVersion Include="Microsoft.Azure.Cosmos" Version="[3.32.3, )" />
<PackageVersion Include="Microsoft.Identity.Client.Extensions.Msal" Version="[2.28.0, )" />
Expand Down
9 changes: 9 additions & 0 deletions dotnet/SK-dotnet.sln
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "InternalUtilities", "Intern
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenApiSkillsExample", "..\samples\dotnet\openapi-skills\OpenApiSkillsExample.csproj", "{4D91A3E0-C404-495B-AD4A-411C4E83CF54}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Connectors.Memory.DuckDB", "src\Connectors\Connectors.Memory.DuckDB\Connectors.Memory.DuckDB.csproj", "{50FAE231-6F24-4779-9D02-12ABBC9A49E2}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -294,6 +296,12 @@ Global
{4D91A3E0-C404-495B-AD4A-411C4E83CF54}.Publish|Any CPU.Build.0 = Release|Any CPU
{4D91A3E0-C404-495B-AD4A-411C4E83CF54}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4D91A3E0-C404-495B-AD4A-411C4E83CF54}.Release|Any CPU.Build.0 = Release|Any CPU
{50FAE231-6F24-4779-9D02-12ABBC9A49E2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{50FAE231-6F24-4779-9D02-12ABBC9A49E2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{50FAE231-6F24-4779-9D02-12ABBC9A49E2}.Publish|Any CPU.ActiveCfg = Publish|Any CPU
{50FAE231-6F24-4779-9D02-12ABBC9A49E2}.Publish|Any CPU.Build.0 = Publish|Any CPU
{50FAE231-6F24-4779-9D02-12ABBC9A49E2}.Release|Any CPU.ActiveCfg = Release|Any CPU
{50FAE231-6F24-4779-9D02-12ABBC9A49E2}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -336,6 +344,7 @@ Global
{136823BE-8665-4D57-87E0-EF41535539E2} = {0247C2C9-86C3-45BA-8873-28B0948EDC0C}
{4D3DAE63-41C6-4E1C-A35A-E77BDFC40675} = {831DDCA2-7D2C-4C31-80DB-6BDB3E1F7AE0}
{4D91A3E0-C404-495B-AD4A-411C4E83CF54} = {FA3720F1-C99A-49B2-9577-A940257098BF}
{50FAE231-6F24-4779-9D02-12ABBC9A49E2} = {0247C2C9-86C3-45BA-8873-28B0948EDC0C}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {FBDC56A3-86AD-4323-AA0F-201E59123B83}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<!-- THIS PROPERTY GROUP MUST COME FIRST -->
<!-- RootNamespace is derived from AssemblyName, so both resolve to the same value. -->
<AssemblyName>Microsoft.SemanticKernel.Connectors.Memory.DuckDB</AssemblyName>
<RootNamespace>$(AssemblyName)</RootNamespace>
<TargetFramework>netstandard2.0</TargetFramework>
</PropertyGroup>

<!-- IMPORT NUGET PACKAGE SHARED PROPERTIES -->
<Import Project="$(RepoRoot)/dotnet/nuget/nuget-package.props" />

<PropertyGroup>
<!-- NuGet Package Settings -->
<Title>Semantic Kernel - DuckDB Connector</Title>
<Description>DuckDB connector for Semantic Kernel skills and semantic memory</Description>
</PropertyGroup>

<!-- DuckDB client packages. No Version attribute is given here; the versions are declared in dotnet/Directory.Packages.props. -->
<ItemGroup>
<PackageReference Include="DuckDB.NET.Data.Full" />
<PackageReference Include="DuckDB.NET.Data" />
</ItemGroup>

<!-- Reference to the SemanticKernel project, resolved relative to this project's folder. -->
<ItemGroup>
<ProjectReference Include="..\..\SemanticKernel\SemanticKernel.csproj" />
</ItemGroup>

</Project>
189 changes: 189 additions & 0 deletions dotnet/src/Connectors/Connectors.Memory.DuckDB/Database.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using DuckDB.NET.Data;

namespace Microsoft.SemanticKernel.Connectors.Memory.DuckDB;

/// <summary>
/// One row read back from the memory table, with every column carried as raw text.
/// </summary>
internal struct DatabaseEntry
{
    /// <summary>Value of the <c>key</c> column.</summary>
    public string Key { get; set; }

    /// <summary>Value of the <c>metadata</c> column, kept as the stored string.</summary>
    public string MetadataString { get; set; }

    /// <summary>Value of the <c>embedding</c> column, kept as the stored string.</summary>
    public string EmbeddingString { get; set; }

    /// <summary>Value of the <c>timestamp</c> column; may be <c>null</c>.</summary>
    public string? Timestamp { get; set; }
}

/// <summary>
/// Executes the SQL statements used against the single <c>SKMemoryTable</c> table.
/// Every method runs on the connection supplied by the caller; this class holds no state.
/// </summary>
/// <remarks>
/// A collection is represented by the rows sharing a <c>collection</c> value. An otherwise
/// empty collection is kept alive by a placeholder row whose <c>key</c> is the empty string
/// (see <see cref="CreateCollectionAsync"/>).
/// </remarks>
internal sealed class Database
{
    private const string TableName = "SKMemoryTable";

    public Database() { }

    /// <summary>
    /// Creates the memory table if it does not exist yet.
    /// </summary>
    /// <param name="conn">Connection to run the statement on.</param>
    /// <param name="cancellationToken">Token forwarded to the command execution.</param>
    public async Task CreateTableAsync(DuckDBConnection conn, CancellationToken cancellationToken = default)
    {
        using var cmd = conn.CreateCommand();
        cmd.CommandText = $@"
            CREATE TABLE IF NOT EXISTS {TableName}(
                collection TEXT,
                key TEXT,
                metadata TEXT,
                embedding TEXT,
                timestamp TEXT,
                PRIMARY KEY(collection, key))";

        // Awaited (rather than returned) so the command is disposed only after execution has finished.
        await cmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Registers a collection by inserting a placeholder row (empty key, metadata, embedding and timestamp).
    /// Does nothing when the collection already exists.
    /// </summary>
    /// <param name="conn">Connection to run the statements on.</param>
    /// <param name="collectionName">Name of the collection to create.</param>
    /// <param name="cancellationToken">Token forwarded to the command execution.</param>
    public async Task CreateCollectionAsync(DuckDBConnection conn, string collectionName, CancellationToken cancellationToken = default)
    {
        if (await this.DoesCollectionExistsAsync(conn, collectionName, cancellationToken).ConfigureAwait(false))
        {
            // Collection already exists
            return;
        }

        using var cmd = conn.CreateCommand();
        cmd.CommandText = $@"
            INSERT INTO {TableName} VALUES (?1,?2,?3,?4,?5 ); ";
        cmd.Parameters.Add(new DuckDBParameter(collectionName));
        cmd.Parameters.Add(new DuckDBParameter(string.Empty));
        cmd.Parameters.Add(new DuckDBParameter(string.Empty));
        cmd.Parameters.Add(new DuckDBParameter(string.Empty));
        cmd.Parameters.Add(new DuckDBParameter(string.Empty));

        await cmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Inserts a row, or overwrites metadata, embedding and timestamp when a row with the same
    /// (collection, key) pair already exists.
    /// </summary>
    /// <param name="conn">Connection to run the statement on.</param>
    /// <param name="collection">Collection the row belongs to.</param>
    /// <param name="key">Key of the row within the collection.</param>
    /// <param name="metadata">Metadata text; <c>null</c> is stored as an empty string.</param>
    /// <param name="embedding">Embedding text; <c>null</c> is stored as an empty string.</param>
    /// <param name="timestamp">Timestamp text; <c>null</c> is stored as an empty string.</param>
    /// <param name="cancellationToken">Token forwarded to the command execution.</param>
    public async Task UpdateOrInsertAsync(DuckDBConnection conn,
        string collection, string key, string? metadata, string? embedding, string? timestamp, CancellationToken cancellationToken = default)
    {
        using var cmd = conn.CreateCommand();
        cmd.CommandText = $@"
            INSERT INTO {TableName} VALUES(?1, ?2, ?3, ?4, ?5)
            ON CONFLICT (collection, key) DO UPDATE SET metadata=?3, embedding=?4, timestamp=?5; ";
        cmd.Parameters.Add(new DuckDBParameter(collection));
        cmd.Parameters.Add(new DuckDBParameter(key));
        cmd.Parameters.Add(new DuckDBParameter(metadata ?? string.Empty));
        cmd.Parameters.Add(new DuckDBParameter(embedding ?? string.Empty));
        cmd.Parameters.Add(new DuckDBParameter(timestamp ?? string.Empty));
        await cmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Checks whether any row carries the given collection name.
    /// </summary>
    /// <param name="conn">Connection to run the query on.</param>
    /// <param name="collectionName">Collection name to look for.</param>
    /// <param name="cancellationToken">Token forwarded to the query.</param>
    /// <returns><c>true</c> when the name is among the distinct collection values in the table.</returns>
    public async Task<bool> DoesCollectionExistsAsync(DuckDBConnection conn,
        string collectionName,
        CancellationToken cancellationToken = default)
    {
        var collections = await this.GetCollectionsAsync(conn, cancellationToken).ToListAsync(cancellationToken).ConfigureAwait(false);
        return collections.Contains(collectionName);
    }

    /// <summary>
    /// Streams the distinct collection names present in the table.
    /// </summary>
    /// <param name="conn">Connection to run the query on.</param>
    /// <param name="cancellationToken">Token forwarded to the query and to each row read.</param>
    public async IAsyncEnumerable<string> GetCollectionsAsync(DuckDBConnection conn,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        using var cmd = conn.CreateCommand();
        cmd.CommandText = $@"
            SELECT DISTINCT collection
            FROM {TableName};";

        using var dataReader = await cmd.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
        while (await dataReader.ReadAsync(cancellationToken).ConfigureAwait(false))
        {
            yield return dataReader.GetString("collection");
        }
    }

    /// <summary>
    /// Streams every row of a collection, skipping rows whose key is null, empty or whitespace
    /// (which excludes the placeholder row written by <see cref="CreateCollectionAsync"/>).
    /// </summary>
    /// <param name="conn">Connection to run the query on.</param>
    /// <param name="collectionName">Collection whose rows are read.</param>
    /// <param name="cancellationToken">Token forwarded to the query and to each row read.</param>
    public async IAsyncEnumerable<DatabaseEntry> ReadAllAsync(DuckDBConnection conn,
        string collectionName,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        using var cmd = conn.CreateCommand();
        cmd.CommandText = $@"
            SELECT * FROM {TableName}
            WHERE collection=?1;";
        cmd.Parameters.Add(new DuckDBParameter(collectionName));

        using var dataReader = await cmd.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
        while (await dataReader.ReadAsync(cancellationToken).ConfigureAwait(false))
        {
            string key = dataReader.GetString("key");
            if (string.IsNullOrWhiteSpace(key))
            {
                continue;
            }

            string metadata = dataReader.GetString("metadata");
            string embedding = dataReader.GetString("embedding");
            string timestamp = dataReader.GetString("timestamp");
            yield return new DatabaseEntry() { Key = key, MetadataString = metadata, EmbeddingString = embedding, Timestamp = timestamp };
        }
    }

    /// <summary>
    /// Reads the row identified by collection and key.
    /// </summary>
    /// <param name="conn">Connection to run the query on.</param>
    /// <param name="collectionName">Collection the row belongs to.</param>
    /// <param name="key">Key of the row within the collection.</param>
    /// <param name="cancellationToken">Token forwarded to the query and to the row read.</param>
    /// <returns>The matching entry, or <c>null</c> when the query returns no row.</returns>
    public async Task<DatabaseEntry?> ReadAsync(DuckDBConnection conn,
        string collectionName,
        string key,
        CancellationToken cancellationToken = default)
    {
        using var cmd = conn.CreateCommand();
        cmd.CommandText = $@"
            SELECT * FROM {TableName}
            WHERE collection=?1
            AND key=?2; ";
        cmd.Parameters.Add(new DuckDBParameter(collectionName));
        cmd.Parameters.Add(new DuckDBParameter(key));

        using var dataReader = await cmd.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
        if (await dataReader.ReadAsync(cancellationToken).ConfigureAwait(false))
        {
            // Same by-name accessor the other readers in this class use.
            return new DatabaseEntry()
            {
                Key = key,
                MetadataString = dataReader.GetString("metadata"),
                EmbeddingString = dataReader.GetString("embedding"),
                Timestamp = dataReader.GetString("timestamp")
            };
        }

        return null;
    }

    /// <summary>
    /// Deletes every row of a collection, including its placeholder row.
    /// </summary>
    /// <param name="conn">Connection to run the statement on.</param>
    /// <param name="collectionName">Collection to delete.</param>
    /// <param name="cancellationToken">Token forwarded to the command execution.</param>
    public async Task DeleteCollectionAsync(DuckDBConnection conn, string collectionName, CancellationToken cancellationToken = default)
    {
        using var cmd = conn.CreateCommand();
        cmd.CommandText = $@"
            DELETE FROM {TableName}
            WHERE collection=?1;";
        cmd.Parameters.Add(new DuckDBParameter(collectionName));

        // Awaited (rather than returned) so the command is disposed only after execution has finished.
        await cmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Deletes the row identified by collection and key.
    /// </summary>
    /// <param name="conn">Connection to run the statement on.</param>
    /// <param name="collectionName">Collection the row belongs to.</param>
    /// <param name="key">Key of the row to delete.</param>
    /// <param name="cancellationToken">Token forwarded to the command execution.</param>
    public async Task DeleteAsync(DuckDBConnection conn, string collectionName, string key, CancellationToken cancellationToken = default)
    {
        using var cmd = conn.CreateCommand();
        cmd.CommandText = $@"
            DELETE FROM {TableName}
            WHERE collection=?1
            AND key=?2; ";
        cmd.Parameters.Add(new DuckDBParameter(collectionName));
        cmd.Parameters.Add(new DuckDBParameter(key));

        // Awaited (rather than returned) so the command is disposed only after execution has finished.
        await cmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Deletes the key-less placeholder row of a collection.
    /// </summary>
    /// <param name="conn">Connection to run the statement on.</param>
    /// <param name="collectionName">Collection whose placeholder row is removed.</param>
    /// <param name="cancellationToken">Token forwarded to the command execution.</param>
    public async Task DeleteEmptyAsync(DuckDBConnection conn, string collectionName, CancellationToken cancellationToken = default)
    {
        using var cmd = conn.CreateCommand();

        // CreateCollectionAsync stores the placeholder with key = '' (key is part of the
        // primary key), so matching on NULL alone would never find it.
        cmd.CommandText = $@"
            DELETE FROM {TableName}
            WHERE collection=?1
            AND (key IS NULL OR key='')";
        cmd.Parameters.Add(new DuckDBParameter(collectionName));

        // Awaited (rather than returned) so the command is disposed only after execution has finished.
        await cmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
    }
}
14 changes: 14 additions & 0 deletions dotnet/src/Connectors/Connectors.Memory.DuckDB/DuckDBExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Data.Common;

namespace Microsoft.SemanticKernel.Connectors.Memory.DuckDB;

/// <summary>
/// Convenience accessors for <see cref="DbDataReader"/>.
/// </summary>
internal static class DuckDBExtensions
{
    /// <summary>
    /// Reads a string column by name from the reader's current row.
    /// </summary>
    /// <param name="reader">Reader positioned on a row.</param>
    /// <param name="fieldName">Name of the column to read.</param>
    /// <returns>The column value returned by <see cref="DbDataReader.GetString(int)"/>.</returns>
    public static string GetString(this DbDataReader reader, string fieldName)
        => reader.GetString(reader.GetOrdinal(fieldName));
}
Loading

0 comments on commit 69d835c

Please sign in to comment.