Skip to content

Commit

Permalink
add donations worker, polling new donations on an interval
Browse files Browse the repository at this point in the history
  • Loading branch information
Felk committed Dec 18, 2024
1 parent 7b8532b commit 4981909
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 6 deletions.
2 changes: 2 additions & 0 deletions TPP.Core/Configuration/BaseConfig.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System;
using System.Collections.Immutable;
using System.ComponentModel;
using NodaTime;
Expand Down Expand Up @@ -89,4 +90,5 @@ public sealed class StreamlabsConfig : ConfigBase
{
public bool Enabled { get; init; } = false;
public string AccessToken { get; init; } = "";
public TimeSpan PollingInterval { get; init; } = TimeSpan.FromMinutes(1);
}
13 changes: 12 additions & 1 deletion TPP.Core/DonationHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using TPP.Core.Chat;
using TPP.Core.Overlay;
using TPP.Core.Overlay.Events;
using TPP.Core.Streamlabs;
using TPP.Model;
using TPP.Persistence;

Expand All @@ -29,7 +30,17 @@ public record NewDonation(
string Username,
decimal Amount,
string Currency,
string? Message);
string? Message)
{
public static NewDonation FromStreamlabs(StreamlabsClient.Donation donation) => new(
Id: donation.DonationId,
CreatedAt: donation.CreatedAt,
Username: donation.Name,
Amount: donation.Amount,
Currency: donation.Currency,
Message: donation.Message
);
}

public async Task Process(NewDonation donation)
{
Expand Down
48 changes: 48 additions & 0 deletions TPP.Core/DonationsWorker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using TPP.Core.Streamlabs;
using TPP.Model;
using TPP.Persistence;

namespace TPP.Core;

public sealed class DonationsWorker(
ILoggerFactory loggerFactory,
TimeSpan pollingInterval,
StreamlabsClient streamlabsClient,
IDonationRepo donationRepo,
DonationHandler donationHandler
) : IWithLifecycle
{
private readonly ILogger<ChattersWorker> _logger = loggerFactory.CreateLogger<ChattersWorker>();

public async Task Start(CancellationToken cancellationToken)
{
try { await Task.Delay(pollingInterval, cancellationToken); }
catch (OperationCanceledException) { return; }
while (!cancellationToken.IsCancellationRequested)
{
try
{
Donation? mostRecentDonation = await donationRepo.GetMostRecentDonation();
_logger.LogDebug("Polling for new donations... most recent one is {Donation}", mostRecentDonation);
List<StreamlabsClient.Donation> donations =
await streamlabsClient.GetDonations(after: mostRecentDonation?.DonationId, currency: "USD");
_logger.LogDebug("Received new donations: {Donations}", string.Join(", ", donations));
foreach (var donation in donations.OrderBy(d => d.CreatedAt)) // process in chronological order
await donationHandler.Process(DonationHandler.NewDonation.FromStreamlabs(donation));
}
catch (Exception e)
{
_logger.LogError(e, "Failed polling for new donations");
}

try { await Task.Delay(pollingInterval, cancellationToken); }
catch (OperationCanceledException) { break; }
}
}
}
14 changes: 9 additions & 5 deletions TPP.Core/Modes/ModeBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public sealed class ModeBase : IWithLifecycle, ICommandHandler
private readonly IClock _clock;
private readonly ProcessMessage _processMessage;
private readonly ChattersWorker? _chattersWorker;
private readonly DonationsWorker? _donationsWorker;

/// Processes a message that wasn't already processed by the mode base,
/// and returns whether the message was actively processed.
Expand Down Expand Up @@ -154,16 +155,17 @@ public ModeBase(
: new ChattersWorker(loggerFactory, clock,
((TwitchChat)_chats[primaryChat.Name]).TwitchApi, repos.ChattersSnapshotsRepo, primaryChat);

if (baseConfig.StreamlabsConfig.Enabled)
StreamlabsConfig streamlabsConfig = baseConfig.StreamlabsConfig;
if (streamlabsConfig.Enabled)
{
IChat chat = _chats.Values.First(); // TODO
DonationHandler donationHandler = new(loggerFactory.CreateLogger<DonationHandler>(),
repos.DonationRepo, repos.UserRepo, repos.TokensBank, chat, overlayConnection,
baseConfig.DonorBadgeCents);
// TODO use the handler

StreamlabsClient streamlabsClient = new(loggerFactory.CreateLogger<StreamlabsClient>(), baseConfig.StreamlabsConfig.AccessToken);
// TODO do something with the client, e.g. open Websocket and add donations refresh worker
StreamlabsClient streamlabsClient = new(loggerFactory.CreateLogger<StreamlabsClient>(),
streamlabsConfig.AccessToken);
_donationsWorker = new DonationsWorker(loggerFactory, streamlabsConfig.PollingInterval,
streamlabsClient, repos.DonationRepo, donationHandler);
}
}

Expand Down Expand Up @@ -262,6 +264,8 @@ public async Task Start(CancellationToken cancellationToken)
tasks.Add(_sendOutQueuedMessagesWorker.Start(cancellationToken));
if (_chattersWorker != null)
tasks.Add(_chattersWorker.Start(cancellationToken));
if (_donationsWorker != null)
tasks.Add(_donationsWorker.Start(cancellationToken));
await TaskUtils.WhenAllFastExit(tasks);

foreach (IChat chat in _chats.Values)
Expand Down
3 changes: 3 additions & 0 deletions TPP.Core/config.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@
"string",
"null"
]
},
"PollingInterval": {
"type": "string"
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions TPP.Persistence.MongoDB/Repos/DonationRepo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ private void InitIndexes()
public async Task<Donation?> FindDonation(long donationId) =>
await Collection.Find(p => p.DonationId == donationId).FirstOrDefaultAsync();

public async Task<Donation?> GetMostRecentDonation() =>
await Collection.AsQueryable().OrderByDescending(u => u.CreatedAt).FirstOrDefaultAsync();

public async Task<Donation> InsertDonation(
long donationId, Instant createdAt, string userName, string? userId, int cents, string? message = null)
{
Expand Down
1 change: 1 addition & 0 deletions TPP.Persistence/IDonationRepo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public interface IDonationRepo
{
public Task<Donation?> FindDonation(long donationId);

public Task<Donation?> GetMostRecentDonation();
public Task<Donation> InsertDonation(
long donationId,
Instant createdAt,
Expand Down

0 comments on commit 4981909

Please sign in to comment.