introduce PostingsWriter

kreeben committed May 9, 2024
1 parent c9fdb89 commit 8d8d1a0

Showing 12 changed files with 138 additions and 110 deletions.

8 changes: 4 additions & 4 deletions src/Sir.Document/DocumentRegistryWriter.cs
@@ -19,8 +19,8 @@ public class DocumentRegistryWriter : IDisposable

        public DocumentRegistryWriter(string directory, ulong collectionId)
        {
-           _documentMapWriter = new DocumentMapWriter(KeyValueWriter.CreateAppendStream(directory, collectionId, "docs"));
-           _documentIndexWriter = new DocumentIndexWriter(KeyValueWriter.CreateAppendStream(directory, collectionId, "dix"));
+           _documentMapWriter = new DocumentMapWriter(StreamFactory.CreateAppendStream(directory, collectionId, "docs"));
+           _documentIndexWriter = new DocumentIndexWriter(StreamFactory.CreateAppendStream(directory, collectionId, "dix"));
            _kvWriter = new KeyValueWriter(directory, collectionId);
            _directory = directory;
            _collectionId = collectionId;
@@ -52,8 +52,8 @@ public void Commit()
            _documentIndexWriter.Dispose();
            _kvWriter.Dispose();

-           _documentMapWriter = new DocumentMapWriter(KeyValueWriter.CreateAppendStream(_directory, _collectionId, "docs"));
-           _documentIndexWriter = new DocumentIndexWriter(KeyValueWriter.CreateAppendStream(_directory, _collectionId, "dix"));
+           _documentMapWriter = new DocumentMapWriter(StreamFactory.CreateAppendStream(_directory, _collectionId, "docs"));
+           _documentIndexWriter = new DocumentIndexWriter(StreamFactory.CreateAppendStream(_directory, _collectionId, "dix"));
            _kvWriter = new KeyValueWriter(_directory, _collectionId);
        }

2 changes: 1 addition & 1 deletion src/Sir.InformationRetreival/IO/ColumnReader.cs
@@ -84,7 +84,7 @@ public Hit ClosestMatchOrNullStoppingAtFirstIdenticalPage(ISerializableVector ve
                }
            }

-           if (hit.Score >= model.IdenticalAngle)
+           if (hit.Score.Approximates(model.IdenticalAngle))
                break;
        }

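The `>=` comparison above becomes an `Approximates` call, which reads like an epsilon-tolerant double comparison. Its implementation is not part of this extract; a plausible sketch, where the extension-class name and epsilon are purely assumptions:

using System;

namespace Sir
{
    public static class ApproximationExtensions
    {
        // Hypothetical: treats two scores as identical when they differ by
        // less than a small epsilon, avoiding exact floating-point equality.
        public static bool Approximates(this double value, double other, double epsilon = 1e-9)
        {
            return Math.Abs(value - other) < epsilon;
        }
    }
}
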
4 changes: 2 additions & 2 deletions src/Sir.InformationRetreival/IO/ColumnWriter.cs
@@ -14,9 +14,9 @@ public ColumnWriter(Stream indexStream, bool keepStreamOpen = false)
            _keepIndexStreamOpen = keepStreamOpen;
        }

-       public (int depth, int width) CreatePage(VectorNode column, Stream vectorStream, Stream postingsStream, PageIndexWriter pageIndexWriter)
+       public (int depth, int width) CreatePage(VectorNode column, Stream vectorStream, PostingsWriter postingsWriter, PageIndexWriter pageIndexWriter)
        {
-           var page = column.SerializeTree(_ixStream, vectorStream, postingsStream);
+           var page = column.SerializeTree(_ixStream, vectorStream, postingsWriter);

            pageIndexWriter.Put(page.offset, page.length);

24 changes: 3 additions & 21 deletions src/Sir.InformationRetreival/IO/GraphBuilder.cs
@@ -282,7 +282,7 @@ public static void SerializeNode(this VectorNode node, Stream stream)
    /// <param name="vectorStream">stream to persist vectors into</param>
    /// <param name="postingsStream">stream to persist postings into</param>
    /// <returns></returns>
-   public static (long offset, long length) SerializeTree(this VectorNode node, Stream indexStream = null, Stream vectorStream = null, Stream postingsStream = null)
+   public static (long offset, long length) SerializeTree(this VectorNode node, Stream indexStream = null, Stream vectorStream = null, PostingsWriter postingsWriter = null)
    {
        var stack = new Stack<VectorNode>();
        var offset = indexStream.Position;
@@ -295,9 +295,9 @@ public static (long offset, long length) SerializeTree(this VectorNode node, Str

        while (node != null)
        {
-           if (node.PostingsOffset == -1 && postingsStream != null)
+           if (node.PostingsOffset == -1 && postingsWriter != null)
            {
-               SerializePostings(node, postingsStream);
+               postingsWriter.SerializePostings(node);
            }

            if (vectorStream != null)
@@ -327,23 +327,5 @@ public static (long offset, long length) SerializeTree(this VectorNode node, Str

        return (offset, length);
    }
-
-   public static void SerializePostings(VectorNode node, Stream postingsStream)
-   {
-       if (node.DocIds.Count == 0) throw new ArgumentException("can't be empty", nameof(node.DocIds));
-
-       node.PostingsOffset = postingsStream.Position;
-
-       // serialize item count
-       postingsStream.Write(BitConverter.GetBytes((long)node.DocIds.Count));
-
-       // serialize address of next page (unknown at this time)
-       postingsStream.Write(BitConverter.GetBytes((long)0));
-
-       foreach (var docId in node.DocIds)
-       {
-           postingsStream.Write(BitConverter.GetBytes(docId));
-       }
-   }
}
}

(file path not shown in this extract)

@@ -4,14 +4,20 @@

namespace Sir.IO
{
-   public class PostingsResolver : IDisposable
+   /// <summary>
+   /// Read postings lists from storage and map them to query terms
+   /// </summary>
+   public class PostingsReadOrchestrator : IDisposable
    {
        private readonly Dictionary<(string, ulong, long), PostingsReader> _readers = new Dictionary<(string, ulong, long), PostingsReader>();
+       private readonly ILogger _logger;

-       /// <summary>
-       /// Read posting list document IDs into memory.
-       /// </summary>
-       public void Resolve(IQuery query, ILogger logger = null)
+       public PostingsReadOrchestrator(ILogger logger = null)
+       {
+           _logger = logger;
+       }
+
+       public void ReadAndMapPostings(IQuery query)
        {
            foreach (var term in query.AllTerms())
            {
@@ -23,7 +29,7 @@ public void Resolve(IQuery query, ILogger logger = null)

                if (!_readers.TryGetValue(key, out reader))
                {
-                   reader = new PostingsReader(term.Directory, term.CollectionId, term.KeyId, logger);
+                   reader = new PostingsReader(term.Directory, term.CollectionId, term.KeyId, _logger);

                    if (reader != null)
                    {
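Beyond the rename, the logger moves from a per-call parameter to a constructor dependency, and the orchestrator caches one PostingsReader per (directory, collectionId, keyId) key so it can be reused across query terms. A minimal usage sketch, assuming an ILogger instance and an already-built IQuery are in hand:

// Sketch only: building the IQuery is outside the scope of this commit.
using (var orchestrator = new PostingsReadOrchestrator(logger))
{
    // Reads each term's postings list from disk and attaches the
    // document IDs to the term, ready for scoring.
    orchestrator.ReadAndMapPostings(query);
}
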
41 changes: 41 additions & 0 deletions src/Sir.InformationRetreival/IO/PostingsWriter.cs
@@ -0,0 +1,41 @@
+using System;
+using System.IO;
+
+namespace Sir.IO
+{
+    public class PostingsWriter : IDisposable
+    {
+        private readonly Stream _postingsStream;
+
+        public PostingsWriter(Stream postingsStream)
+        {
+            _postingsStream = postingsStream;
+        }
+
+        public void SerializePostings(VectorNode node)
+        {
+            if (node.DocIds.Count == 0) throw new ArgumentException("can't be empty", nameof(node.DocIds));
+
+            node.PostingsOffset = _postingsStream.Position;
+
+            // serialize item count
+            _postingsStream.Write(BitConverter.GetBytes((long)node.DocIds.Count));
+
+            // serialize address of next page (unknown at this time)
+            _postingsStream.Write(BitConverter.GetBytes((long)0));
+
+            foreach (var docId in node.DocIds)
+            {
+                _postingsStream.Write(BitConverter.GetBytes(docId));
+            }
+        }
+
+        public void Dispose()
+        {
+            if (_postingsStream != null)
+            {
+                _postingsStream.Dispose();
+            }
+        }
+    }
+}
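
For reference, the on-disk page PostingsWriter emits is: an 8-byte document count, an 8-byte next-page address (always written as 0, since the address of any next page is unknown at write time), then one 8-byte document ID per posting. A sketch of reading one such page back, assuming that layout and .NET 7+ for Stream.ReadExactly; the real PostingsReader is not shown in this commit:

using System;
using System.Collections.Generic;
using System.IO;

public static class PostingsPage
{
    // Hypothetical reader for a single postings page written by PostingsWriter.
    public static (List<long> DocIds, long NextPageOffset) Read(Stream stream, long postingsOffset)
    {
        stream.Seek(postingsOffset, SeekOrigin.Begin);

        Span<byte> buf = stackalloc byte[sizeof(long)];

        stream.ReadExactly(buf);                     // 8 bytes: item count
        var count = BitConverter.ToInt64(buf);

        stream.ReadExactly(buf);                     // 8 bytes: next page address, 0 if none
        var nextPageOffset = BitConverter.ToInt64(buf);

        var docIds = new List<long>((int)count);

        for (long i = 0; i < count; i++)
        {
            stream.ReadExactly(buf);                 // 8 bytes per document ID
            docIds.Add(BitConverter.ToInt64(buf));
        }

        return (docIds, nextPageOffset);
    }
}
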
(file path not shown in this extract)

@@ -28,12 +28,12 @@ public void Commit(string directory, ulong collectionId, long keyId, VectorNode
        {
            var time = Stopwatch.StartNew();

-           using (var vectorStream = KeyValueWriter.CreateAppendStream(directory, collectionId, keyId, "vec"))
-           using (var postingsStream = KeyValueWriter.CreateAppendStream(directory, collectionId, keyId, "pos"))
-           using (var columnWriter = new ColumnWriter(KeyValueWriter.CreateAppendStream(directory, collectionId, keyId, "ix")))
-           using (var pageIndexWriter = new PageIndexWriter(KeyValueWriter.CreateAppendStream(directory, collectionId, keyId, "ixtp")))
+           using (var vectorStream = StreamFactory.CreateAppendStream(directory, collectionId, keyId, "vec"))
+           using (var postingsWriter = new PostingsWriter(StreamFactory.CreateAppendStream(directory, collectionId, keyId, "pos")))
+           using (var columnWriter = new ColumnWriter(StreamFactory.CreateAppendStream(directory, collectionId, keyId, "ix")))
+           using (var pageIndexWriter = new PageIndexWriter(StreamFactory.CreateAppendStream(directory, collectionId, keyId, "ixtp")))
            {
-               var size = columnWriter.CreatePage(tree, vectorStream, postingsStream, pageIndexWriter);
+               var size = columnWriter.CreatePage(tree, vectorStream, postingsWriter, pageIndexWriter);

                if (logger != null)
                    logger.LogInformation($"serialized column {keyId}, weight {tree.Weight} {size} in {time.Elapsed}");

(file path not shown in this extract)

@@ -28,12 +28,12 @@ public void Commit(string directory, ulong collectionId, long keyId, VectorNode
        {
            var time = Stopwatch.StartNew();

-           using (var vectorStream = KeyValueWriter.CreateAppendStream(directory, collectionId, keyId, "vec"))
-           using (var postingsStream = KeyValueWriter.CreateAppendStream(directory, collectionId, keyId, "pos"))
-           using (var columnWriter = new ColumnWriter(KeyValueWriter.CreateAppendStream(directory, collectionId, keyId, "ix")))
-           using (var pageIndexWriter = new PageIndexWriter(KeyValueWriter.CreateAppendStream(directory, collectionId, keyId, "ixtp")))
+           using (var vectorStream = StreamFactory.CreateAppendStream(directory, collectionId, keyId, "vec"))
+           using (var postingsWriter = new PostingsWriter(StreamFactory.CreateAppendStream(directory, collectionId, keyId, "pos")))
+           using (var columnWriter = new ColumnWriter(StreamFactory.CreateAppendStream(directory, collectionId, keyId, "ix")))
+           using (var pageIndexWriter = new PageIndexWriter(StreamFactory.CreateAppendStream(directory, collectionId, keyId, "ixtp")))
            {
-               var size = columnWriter.CreatePage(tree, vectorStream, postingsStream, pageIndexWriter);
+               var size = columnWriter.CreatePage(tree, vectorStream, postingsWriter, pageIndexWriter);

                if (logger != null)
                    logger.LogInformation($"serialized column {keyId}, weight {tree.Weight} {size} in {time.Elapsed}");

10 changes: 0 additions & 10 deletions src/Sir.InformationRetreival/Parsers/IQueryFormatter.cs

This file was deleted.

34 changes: 19 additions & 15 deletions src/Sir.InformationRetreival/Session/SearchSession.cs
@@ -15,7 +15,7 @@ public class SearchSession<T> : DocumentStreamSession, IDisposable, ISearchSessi
    {
        private readonly IModel<T> _model;
        private readonly IIndexReadWriteStrategy _indexStrategy;
-       private readonly PostingsResolver _postingsResolver;
+       private readonly PostingsReadOrchestrator _postingsReadOrchestrator;
        private readonly Scorer _scorer;
        private readonly ILogger _logger;
        private readonly Dictionary<(string, ulong, long), ColumnReader> _readers;
@@ -25,12 +25,12 @@ public SearchSession(
            IModel<T> model,
            IIndexReadWriteStrategy indexStrategy,
            ILogger logger = null,
-           PostingsResolver postingsResolver = null,
+           PostingsReadOrchestrator postingsResolver = null,
            Scorer scorer = null) : base(directory)
        {
            _model = model;
            _indexStrategy = indexStrategy;
-           _postingsResolver = postingsResolver ?? new PostingsResolver();
+           _postingsReadOrchestrator = postingsResolver ?? new PostingsReadOrchestrator(logger);
            _scorer = scorer ?? new Scorer();
            _logger = logger;
            _readers = new Dictionary<(string, ulong, long), ColumnReader>();
@@ -50,7 +50,7 @@ public override void ClearCachedReaders()

        public SearchResult Search(IQuery query, int skip, int take)
        {
-           var result = Execute(query, skip, take, false);
+           var result = OrchestrateSearch(query, skip, take, false);

            if (result != null)
            {
@@ -66,7 +66,7 @@ public SearchResult Search(IQuery query, int skip, int take)

        public Document SearchScalar(IQuery query)
        {
-           var result = Execute(query, 0, 1, true);
+           var result = OrchestrateSearch(query, 0, 1, true);

            if (result != null)
            {
@@ -82,7 +82,7 @@ public Document SearchScalar(IQuery query)

        public SearchResult SearchIdentical(IQuery query, int take)
        {
-           var result = Execute(query, 0, take, true);
+           var result = OrchestrateSearch(query, 0, take, true);

            if (result != null)
            {
@@ -96,28 +96,32 @@ public SearchResult SearchIdentical(IQuery query, int take)
            return new SearchResult(query, 0, 0, System.Linq.Enumerable.Empty<Document>());
        }

-       private ScoredResult Execute(IQuery query, int skip, int take, bool identicalMatchesOnly)
+       private ScoredResult OrchestrateSearch(IQuery query, int skip, int take, bool identicalMatchesOnly)
        {
            var timer = Stopwatch.StartNew();

-           // Scan index
+           // Scan index to find posting addresses for each query term.
            Scan(query, identicalMatchesOnly);

            LogDebug($"scanning took {timer.Elapsed}");
            timer.Restart();

-           // Read postings lists
-           _postingsResolver.Resolve(query, _logger);
+           // Read postings.
+           _postingsReadOrchestrator.ReadAndMapPostings(query);

            LogDebug($"reading postings took {timer.Elapsed}");
            timer.Restart();

-           // Score
+           // Score postings.
            IDictionary<(ulong CollectionId, long DocumentId), double> scoredResult = new Dictionary<(ulong, long), double>();
            _scorer.Score(query, ref scoredResult);

            LogDebug($"scoring took {timer.Elapsed}");
            timer.Restart();

-           // Sort
+           // Sort postings by score.
            var sorted = Sort(scoredResult, skip, take);

            LogDebug($"sorting took {timer.Elapsed}");

            return sorted;
@@ -152,7 +156,7 @@ private void Scan(IQuery query, bool identicalMatchesOnly)

                if (hit != null)
                {
-                   if ((identicalMatchesOnly && hit.Score >= _model.IdenticalAngle) || !identicalMatchesOnly)
+                   if (!identicalMatchesOnly || (hit.Score >= _model.IdenticalAngle))
                    {
                        term.Score = hit.Score;
                        term.PostingsOffsets = hit.PostingsOffsets;
@@ -253,8 +257,8 @@ private void LogError(Exception ex, string message)

        public override void Dispose()
        {
-           if (_postingsResolver != null)
-               _postingsResolver.Dispose();
+           if (_postingsReadOrchestrator != null)
+               _postingsReadOrchestrator.Dispose();

            foreach (var reader in _readers.Values)
            {

46 changes: 5 additions & 41 deletions src/Sir.KeyValue/KeyValueWriter.cs
@@ -1,6 +1,4 @@
using System;
-using System.Collections.Concurrent;
-using System.IO;

namespace Sir.KeyValue
{
@@ -20,10 +18,10 @@ public class KeyValueWriter : IDisposable

        public KeyValueWriter(string directory, ulong collectionId)
            : this(
-               new ValueWriter(CreateAppendStream(directory, collectionId, "val")),
-               new ValueWriter(CreateAppendStream(directory, collectionId, "key")),
-               new ValueIndexWriter(CreateAppendStream(directory, collectionId, "vix")),
-               new ValueIndexWriter(CreateAppendStream(directory, collectionId, "kix"))
+               new ValueWriter(StreamFactory.CreateAppendStream(directory, collectionId, "val")),
+               new ValueWriter(StreamFactory.CreateAppendStream(directory, collectionId, "key")),
+               new ValueIndexWriter(StreamFactory.CreateAppendStream(directory, collectionId, "vix")),
+               new ValueIndexWriter(StreamFactory.CreateAppendStream(directory, collectionId, "kix"))
            )
        {
            _collectionId = collectionId;
@@ -39,40 +37,6 @@ public KeyValueWriter(ValueWriter values, ValueWriter keys, ValueIndexWriter val
            _keyIx = keyIx;
        }

-       public static Stream CreateAppendStream(string directory, ulong collectionId, string fileExtension)
-       {
-           if (!System.IO.Directory.Exists(directory))
-           {
-               System.IO.Directory.CreateDirectory(directory);
-           }
-
-           var fileName = Path.Combine(directory, $"{collectionId}.{fileExtension}");
-
-           if (!File.Exists(fileName))
-           {
-               using (var fs = new FileStream(fileName, FileMode.Append, FileAccess.Write, FileShare.ReadWrite)) { }
-           }
-
-           return new FileStream(fileName, FileMode.Append, FileAccess.Write, FileShare.ReadWrite);
-       }
-
-       public static Stream CreateAppendStream(string directory, ulong collectionId, long keyId, string fileExtension)
-       {
-           if (!System.IO.Directory.Exists(directory))
-           {
-               System.IO.Directory.CreateDirectory(directory);
-           }
-
-           var fileName = Path.Combine(directory, $"{collectionId}.{keyId}.{fileExtension}");
-
-           if (!File.Exists(fileName))
-           {
-               using (var fs = new FileStream(fileName, FileMode.Append, FileAccess.Write, FileShare.ReadWrite)) { }
-           }
-
-           return new FileStream(fileName, FileMode.Append, FileAccess.Write, FileShare.ReadWrite);
-       }
-
        public long EnsureKeyExists(string keyStr)
        {
            var keyHash = keyStr.ToHash();
@@ -92,7 +56,7 @@ public long EnsureKeyExists(string keyStr)
            keyId = PutKeyInfo(keyInfo.offset, keyInfo.len, keyInfo.dataType);

            // store key mapping
-           using (var stream = CreateAppendStream(_directory, _collectionId, "kmap"))
+           using (var stream = StreamFactory.CreateAppendStream(_directory, _collectionId, "kmap"))
            {
                stream.Write(BitConverter.GetBytes(keyHash), 0, sizeof(ulong));
            }
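
Call sites throughout this commit now reference StreamFactory.CreateAppendStream, but the StreamFactory file itself appears to be the one changed file not loaded in this extract. Judging by the methods deleted above, it presumably carries them over more or less verbatim; a sketch, with the namespace a guess and the private helper a consolidation of the two deleted bodies:

using System.IO;

namespace Sir
{
    public static class StreamFactory
    {
        public static Stream CreateAppendStream(string directory, ulong collectionId, string fileExtension)
        {
            return OpenAppend(directory, Path.Combine(directory, $"{collectionId}.{fileExtension}"));
        }

        public static Stream CreateAppendStream(string directory, ulong collectionId, long keyId, string fileExtension)
        {
            return OpenAppend(directory, Path.Combine(directory, $"{collectionId}.{keyId}.{fileExtension}"));
        }

        private static Stream OpenAppend(string directory, string fileName)
        {
            if (!Directory.Exists(directory))
            {
                Directory.CreateDirectory(directory);
            }

            // Touch the file up front so readers holding FileShare.ReadWrite
            // handles can open it even before the first write.
            if (!File.Exists(fileName))
            {
                using (new FileStream(fileName, FileMode.Append, FileAccess.Write, FileShare.ReadWrite)) { }
            }

            return new FileStream(fileName, FileMode.Append, FileAccess.Write, FileShare.ReadWrite);
        }
    }
}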
(one remaining changed file not shown)