Skip to content

Commit

Permalink
remove IStreamDispatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
kreeben committed Dec 4, 2023
1 parent 964e557 commit 2dd9ccc
Show file tree
Hide file tree
Showing 81 changed files with 460 additions and 647 deletions.
7 changes: 4 additions & 3 deletions src/Sir.Cmd/AnalyzeDocumentCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using Microsoft.Extensions.Logging;
using Sir.Documents;
using Sir.IO;
using Sir.KeyValue;
using Sir.Strings;

namespace Sir.Cmd
Expand All @@ -19,9 +20,9 @@ public void Run(IDictionary<string, string> args, ILogger logger)
var model = new BagOfCharsModel();
var embedding = new SortedList<int, float>();

using (var sessionFactory = new SessionFactory(logger))
using (var documents = new DocumentStreamSession(dataDirectory, sessionFactory))
using (var documentReader = new DocumentReader(dataDirectory, collectionId, sessionFactory))
using (var kvwriter = new KeyValueWriter(dataDirectory, collectionId))
using (var documents = new DocumentStreamSession(dataDirectory, kvwriter))
using (var documentReader = new DocumentReader(dataDirectory, collectionId))
{
var doc = documents.ReadDocument((collectionId, documentId), select, documentReader);

Expand Down
30 changes: 0 additions & 30 deletions src/Sir.Cmd/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,6 @@ static void Main(string[] args)
{
TruncateIndex(flags["directory"], flags["collection"], logger);
}
else if (command == "optimize")
{
Optimize(flags, logger);
}
else if (command == "rename")
{
Rename(flags["directory"], flags["collection"], flags["newCollection"], logger);
Expand Down Expand Up @@ -122,32 +118,6 @@ private static IDictionary<string, string> ParseArgs(string[] args)
return dic;
}

private static void Optimize(IDictionary<string, string> args, ILogger logger)
{
var dataDirectory = args["directory"];
var collection = args["collection"];
var skip = int.Parse(args["skip"]);
var take = int.Parse(args["take"]);
var reportFrequency = int.Parse(args["reportFrequency"]);
var pageSize = int.Parse(args["pageSize"]);
var fields = new HashSet<string>(args["fields"].Split(','));
var model = new BagOfCharsModel();

using (var sessionFactory = new SessionFactory(logger))
{
sessionFactory.Optimize(
dataDirectory,
collection,
fields,
model,
new LogStructuredIndexingStrategy(model),
skip,
take,
reportFrequency,
pageSize);
}
}

private static void Slice(IDictionary<string, string> args)
{
var file = args["sourceFileName"];
Expand Down
1 change: 1 addition & 0 deletions src/Sir.Cmd/Sir.Cmd.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

<ItemGroup>
<ProjectReference Include="..\Sir.CommonCrawl\Sir.CommonCrawl.csproj" />
<ProjectReference Include="..\Sir.InformationRetreival\Sir.InformationRetreival.csproj" />
<ProjectReference Include="..\Sir.Mnist\Sir.Mnist.csproj" />
<ProjectReference Include="..\Sir.Wikipedia\Sir.Wikipedia.csproj" />
</ItemGroup>
Expand Down
7 changes: 4 additions & 3 deletions src/Sir.Cmd/ValidateCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ public void Run(IDictionary<string, string> args, ILogger logger)
var count = 0;
var embedding = new SortedList<int, float>();

using(var kvwriter = new KeyValue.KeyValueWriter(dir, collectionId))
using (var sessionFactory = new SessionFactory(logger))
using (var validateSession = new ValidateSession<string>(
collectionId,
new SearchSession(dir, sessionFactory, model, new LogStructuredIndexingStrategy(model), logger),
new QueryParser<string>(dir, sessionFactory, model, embedding: embedding, logger: logger)))
new SearchSession(dir, model, new LogStructuredIndexingStrategy(model), kvwriter, logger),
new QueryParser<string>(dir, kvwriter, model, embedding: embedding, logger: logger)))
{
using (var documents = new DocumentStreamSession(dir, sessionFactory))
using (var documents = new DocumentStreamSession(dir, kvwriter))
{
foreach (var doc in documents.ReadDocuments(collectionId, selectFields, skip, take))
{
Expand Down
4 changes: 2 additions & 2 deletions src/Sir.CommonCrawl/CCHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ public static void WriteWatSegment(
};

using (var sessionFactory = new SessionFactory(logger))
using (var writeSession = new WriteSession(new DocumentWriter(sessionFactory, dataDirectory, collectionId)))
using (var indexSession = new IndexSession<string>(model, new LogStructuredIndexingStrategy(model), sessionFactory, dataDirectory, collectionId, logger))
using (var writeSession = new WriteSession(new DocumentWriter(dataDirectory, collectionId)))
using (var indexSession = new IndexSession<string>(model, new LogStructuredIndexingStrategy(model), dataDirectory, collectionId, logger))
{
using (var queue = new ProducerConsumerQueue<Document>(document =>
{
Expand Down
2 changes: 1 addition & 1 deletion src/Sir.CommonCrawl/Sir.CommonCrawl.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Sir.Search\Sir.Session.csproj" />
<ProjectReference Include="..\Sir.InformationRetreival\Sir.InformationRetreival.csproj" />
<ProjectReference Include="..\Sir.Strings\Sir.Strings.csproj" />
</ItemGroup>

Expand Down
15 changes: 0 additions & 15 deletions src/Sir.Core/IStreamDispatcher.cs

This file was deleted.

19 changes: 12 additions & 7 deletions src/Sir.Document/DocumentReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ public class DocumentReader : IDisposable

public ulong CollectionId { get; }

public DocumentReader(string directory, ulong collectionId, IStreamDispatcher database)
public DocumentReader(string directory, ulong collectionId)
{
var valueStream = database.CreateReadStream(Path.Combine(directory, string.Format("{0}.val", collectionId)));
var keyStream = database.CreateReadStream(Path.Combine(directory, string.Format("{0}.key", collectionId)));
var docStream = database.CreateReadStream(Path.Combine(directory, string.Format("{0}.docs", collectionId)));
var valueIndexStream = database.CreateReadStream(Path.Combine(directory, string.Format("{0}.vix", collectionId)));
var keyIndexStream = database.CreateReadStream(Path.Combine(directory, string.Format("{0}.kix", collectionId)));
var docIndexStream = database.CreateReadStream(Path.Combine(directory, string.Format("{0}.dix", collectionId)));
var valueStream = CreateReadStream(Path.Combine(directory, string.Format("{0}.val", collectionId)));
var keyStream = CreateReadStream(Path.Combine(directory, string.Format("{0}.key", collectionId)));
var docStream = CreateReadStream(Path.Combine(directory, string.Format("{0}.docs", collectionId)));
var valueIndexStream = CreateReadStream(Path.Combine(directory, string.Format("{0}.vix", collectionId)));
var keyIndexStream = CreateReadStream(Path.Combine(directory, string.Format("{0}.kix", collectionId)));
var docIndexStream = CreateReadStream(Path.Combine(directory, string.Format("{0}.dix", collectionId)));

_vals = new ValueReader(valueStream);
_keys = new ValueReader(keyStream);
Expand All @@ -38,6 +38,11 @@ public DocumentReader(string directory, ulong collectionId, IStreamDispatcher da
CollectionId = collectionId;
}

public static Stream CreateReadStream(string fileName)
{
return new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096, FileOptions.SequentialScan);
}

public (long offset, int length) GetDocumentAddress(long docId)
{
return _docIx.Get(docId);
Expand Down
6 changes: 3 additions & 3 deletions src/Sir.Document/DocumentWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ public class DocumentWriter : KeyValueWriter, IDisposable
private readonly DocMapWriter _docs;
private readonly DocIndexWriter _docIx;

public DocumentWriter(IStreamDispatcher streamDispatcher, string directory, ulong collectionId) : base(directory, collectionId, streamDispatcher)
public DocumentWriter(string directory, ulong collectionId) : base(directory, collectionId)
{
var docStream = streamDispatcher.CreateAppendStream(directory, collectionId, "docs");
var docIndexStream = streamDispatcher.CreateAppendStream(directory, collectionId, "dix");
var docStream = CreateAppendStream(directory, collectionId, "docs");
var docIndexStream = CreateAppendStream(directory, collectionId, "dix");

_docs = new DocMapWriter(docStream);
_docIx = new DocIndexWriter(docIndexStream);
Expand Down
3 changes: 2 additions & 1 deletion src/Sir.HttpServer/HttpReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Sir.KeyValue;

namespace Sir.HttpServer
{
Expand Down Expand Up @@ -58,7 +59,7 @@ public async Task<SearchResult> Read(HttpRequest request, IModel<string> model)
_logger.LogDebug($"parsed query: {queryLog}");
#endif

using (var readSession = new SearchSession(_config.Get("data_dir"), _sessionFactory, model, new LogStructuredIndexingStrategy(model), _logger))
using (var readSession = new SearchSession(_config.Get("data_dir"), model, new LogStructuredIndexingStrategy(model), new KeyValueWriter(_config.Get("data_dir"), _config.Get("default_collection").ToHash()), _logger))
{
return readSession.Search(query, skip, take);
}
Expand Down
4 changes: 2 additions & 2 deletions src/Sir.HttpServer/ServiceConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ public static IServiceProvider Configure(IServiceCollection services)
var model = new BagOfCharsModel();
var sessionFactory = new SessionFactory(logger);
var directory = config.Get("data_dir");
var qp = new QueryParser<string>(directory, sessionFactory, model, logger: logger);
var defaultCollection = config.Get("default_collection");
var qp = new QueryParser<string>(directory, new KeyValue.KeyValueWriter(directory, defaultCollection.ToHash()), model, logger: logger);
var httpParser = new HttpQueryParser(qp);

services.AddSingleton(typeof(IModel<string>), model);
services.AddSingleton(typeof(IStreamDispatcher), sessionFactory);
services.AddSingleton(typeof(SessionFactory), sessionFactory);
services.AddSingleton(typeof(QueryParser<string>), qp);
services.AddSingleton(typeof(HttpQueryParser), httpParser);
Expand Down
3 changes: 2 additions & 1 deletion src/Sir.HttpServer/Sir.HttpServer.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Sir.Search\Sir.Session.csproj" />
<ProjectReference Include="..\Sir.Core\Sir.InformationRetreival.csproj" />
<ProjectReference Include="..\Sir.InformationRetreival\Sir.InformationRetreival.csproj" />
<ProjectReference Include="..\Sir.Strings\Sir.Strings.csproj" />
</ItemGroup>

Expand Down
33 changes: 0 additions & 33 deletions src/Sir.HttpServer/StringQueryFormatter.cs

This file was deleted.

1 change: 1 addition & 0 deletions src/Sir.HttpServer/sir.ini
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
data_dir=appdata\database
default_collection=wikipedia
admin_password=SuperWiseInformationRetrieval123!
3 changes: 2 additions & 1 deletion src/Sir.ImageTests/Sir.ImageTests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Sir.Core\Sir.Core.csproj" />
<ProjectReference Include="..\Sir.Core\Sir.InformationRetreival.csproj" />
<ProjectReference Include="..\Sir.Images\Sir.Images.csproj" />
<ProjectReference Include="..\Sir.InformationRetreival\Sir.InformationRetreival.csproj" />
<ProjectReference Include="..\Sir.Mnist\Sir.Mnist.csproj" />
</ItemGroup>

Expand Down
2 changes: 1 addition & 1 deletion src/Sir.Images/LinearClassifierImageModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

namespace Sir.Images
{
public class LinearClassifierImageModel : DistanceCalculator, IModel<IImage>
public class LinearClassifierImageModel : Sir.DistanceCalculator, IModel<IImage>
{
public double IdenticalAngle => 0.95d;
public double FoldAngle => 0.75d;
Expand Down
2 changes: 1 addition & 1 deletion src/Sir.Images/Sir.Images.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\Sir.Core\Sir.Core.csproj" />
<ProjectReference Include="..\Sir.InformationRetreival\Sir.InformationRetreival.csproj" />
</ItemGroup>

</Project>
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ public interface IIndexReadWriteStrategy
{
void Put<T>(VectorNode column, VectorNode node);
Hit GetMatchOrNull(ISerializableVector vector, IModel model, ColumnReader reader);
void Commit(string directory, ulong collectionId, long keyId, VectorNode tree, IStreamDispatcher streamDispatcher, ILogger logger = null);
void Commit(string directory, ulong collectionId, long keyId, VectorNode tree, ILogger logger = null);
}
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Microsoft.Extensions.Logging;
using Sir.Documents;
using System;
using System.Buffers;
using System.Collections.Generic;
Expand All @@ -17,9 +18,9 @@ public class PostingsReader : IDisposable
private readonly ILogger _logger;
private readonly ulong _collectionId;

public PostingsReader(string directory, ulong collectionId, long keyId, IStreamDispatcher streamDispatcher, ILogger logger = null)
public PostingsReader(string directory, ulong collectionId, long keyId, ILogger logger = null)
{
_stream = streamDispatcher.CreateReadStream(Path.Combine(directory, $"{collectionId}.{keyId}.pos"));
_stream = DocumentReader.CreateReadStream(Path.Combine(directory, $"{collectionId}.{keyId}.pos"));
_logger = logger;
_collectionId = collectionId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class PostingsResolver : IDisposable
/// <summary>
/// Read posting list document IDs into memory.
/// </summary>
public void Resolve(IQuery query, IStreamDispatcher sessionFactory, ILogger logger = null)
public void Resolve(IQuery query, ILogger logger = null)
{
foreach (var term in query.AllTerms())
{
Expand All @@ -23,7 +23,7 @@ public void Resolve(IQuery query, IStreamDispatcher sessionFactory, ILogger logg

if (!_readers.TryGetValue(key, out reader))
{
reader = new PostingsReader(term.Directory, term.CollectionId, term.KeyId, sessionFactory, logger);
reader = new PostingsReader(term.Directory, term.CollectionId, term.KeyId, logger);

if (reader != null)
{
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Microsoft.Extensions.Logging;
using Sir.IO;
using Sir.KeyValue;
using System.Diagnostics;

namespace Sir
Expand All @@ -23,14 +24,14 @@ public void Put<T>(VectorNode column, VectorNode node)
column.AddOrAppend(node, _model);
}

public void Commit(string directory, ulong collectionId, long keyId, VectorNode tree, IStreamDispatcher streamDispatcher, ILogger logger = null)
public void Commit(string directory, ulong collectionId, long keyId, VectorNode tree, ILogger logger = null)
{
var time = Stopwatch.StartNew();

using (var vectorStream = streamDispatcher.CreateAppendStream(directory, collectionId, keyId, "vec"))
using (var postingsStream = streamDispatcher.CreateAppendStream(directory, collectionId, keyId, "pos"))
using (var columnWriter = new ColumnWriter(streamDispatcher.CreateAppendStream(directory, collectionId, keyId, "ix")))
using (var pageIndexWriter = new PageIndexWriter(streamDispatcher.CreateAppendStream(directory, collectionId, keyId, "ixtp")))
using (var vectorStream = KeyValueWriter.CreateAppendStream(directory, collectionId, keyId, "vec"))
using (var postingsStream = KeyValueWriter.CreateAppendStream(directory, collectionId, keyId, "pos"))
using (var columnWriter = new ColumnWriter(KeyValueWriter.CreateAppendStream(directory, collectionId, keyId, "ix")))
using (var pageIndexWriter = new PageIndexWriter(KeyValueWriter.CreateAppendStream(directory, collectionId, keyId, "ixtp")))
{
var size = columnWriter.CreatePage(tree, vectorStream, postingsStream, pageIndexWriter);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Microsoft.Extensions.Logging;
using Sir.IO;
using Sir.KeyValue;
using System.Diagnostics;

namespace Sir
Expand All @@ -23,14 +24,14 @@ public void Put<T>(VectorNode column, VectorNode node)
column.AddOrAppendSupervised(node, _model);
}

public void Commit(string directory, ulong collectionId, long keyId, VectorNode tree, IStreamDispatcher streamDispatcher, ILogger logger = null)
public void Commit(string directory, ulong collectionId, long keyId, VectorNode tree, ILogger logger = null)
{
var time = Stopwatch.StartNew();

using (var vectorStream = streamDispatcher.CreateAppendStream(directory, collectionId, keyId, "vec"))
using (var postingsStream = streamDispatcher.CreateAppendStream(directory, collectionId, keyId, "pos"))
using (var columnWriter = new ColumnWriter(streamDispatcher.CreateAppendStream(directory, collectionId, keyId, "ix")))
using (var pageIndexWriter = new PageIndexWriter(streamDispatcher.CreateAppendStream(directory, collectionId, keyId, "ixtp")))
using (var vectorStream = KeyValueWriter.CreateAppendStream(directory, collectionId, keyId, "vec"))
using (var postingsStream = KeyValueWriter.CreateAppendStream(directory, collectionId, keyId, "pos"))
using (var columnWriter = new ColumnWriter(KeyValueWriter.CreateAppendStream(directory, collectionId, keyId, "ix")))
using (var pageIndexWriter = new PageIndexWriter(KeyValueWriter.CreateAppendStream(directory, collectionId, keyId, "ixtp")))
{
var size = columnWriter.CreatePage(tree, vectorStream, postingsStream, pageIndexWriter);

Expand Down
File renamed without changes.
File renamed without changes.
Loading

0 comments on commit 2dd9ccc

Please sign in to comment.