-
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Deduplication.cs
101 lines (78 loc) · 2.29 KB
/
Deduplication.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
// ReSharper disable ReplaceAsyncWithTaskReturn
public class Deduplication
{
SqlConnection sqlConnection = null!;
string connectionString = null!;
async Task CreateTable()
{
#region CreateDeduplicationTable
var manager = new DedupeManager(sqlConnection, "DeduplicationTable");
await manager.Create();
#endregion
}
async Task DeleteTable()
{
#region DeleteDeduplicationTable
var manager = new DedupeManager(sqlConnection, "DeduplicationTable");
await manager.Drop();
#endregion
}
async Task Send()
{
string headers = null!;
byte[] body = null!;
#region SendWithDeduplication
var manager = new QueueManager(
"endpointTable",
sqlConnection,
"DeduplicationTable");
var message = new OutgoingMessage(
id: Guid.NewGuid(),
headers: headers,
bodyBytes: body);
await manager.Send(message);
#endregion
}
async Task DeduplicationCleanerJob()
{
#region DeduplicationCleanerJobStart
var cleaner = new DedupeCleanerJob(
table: "Deduplication",
connectionBuilder: cancel =>
ConnectionHelpers.OpenConnection(connectionString, cancel),
criticalError: _ => { },
expireWindow: TimeSpan.FromHours(1),
frequencyToRunCleanup: TimeSpan.FromMinutes(10));
cleaner.Start();
#endregion
#region DeduplicationCleanerJobStop
await cleaner.Stop();
#endregion
}
async Task SendBatch()
{
string headers1 = null!;
byte[] body1 = null!;
string headers2 = null!;
byte[] body2 = null!;
#region SendBatchWithDeduplication
var manager = new QueueManager(
"endpointTable",
sqlConnection,
"DeduplicationTable");
var messages = new List<OutgoingMessage>
{
new(
id: Guid.NewGuid(),
headers: headers1,
bodyBytes: body1),
new(
id: Guid.NewGuid(),
headers: headers2,
bodyBytes: body2),
};
await manager.Send(messages);
#endregion
}
}