diff --git a/.gitignore b/.gitignore index 499fe60..3e91d5a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,8 @@ .cabal-sandbox .stack-work cabal.sandbox.config -dist-stack \ No newline at end of file +dist-stack +stack.yaml.lock +dist-newstyle +cabal.project.local +.DS_Store diff --git a/CHANGELOG.md b/CHANGELOG.md index dc7c099..f06068f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ +* 0.4.0 Libby Horacek 2024-12-30 + + Bump version due to breaking changes + +* 0.3.0 Remeike Forbes 2022-11-23 + + Introduce batched jobs + * 0.2.0 Remeike Forbes Coerce jobs to string (to work with Redis 5) diff --git a/example/hworker-example.cabal b/example/hworker-example.cabal index c86c8ef..519024d 100644 --- a/example/hworker-example.cabal +++ b/example/hworker-example.cabal @@ -23,4 +23,5 @@ executable hworker-example , attoparsec , uuid >= 1.2.6 , hworker - default-language: Haskell2010 \ No newline at end of file + , saturn + default-language: Haskell2010 diff --git a/example/src/Main.hs b/example/src/Main.hs index de6c3e9..c1f7dba 100644 --- a/example/src/Main.hs +++ b/example/src/Main.hs @@ -1,37 +1,77 @@ {-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE OverloadedStrings #-} -import Control.Concurrent (forkIO, threadDelay) -import Control.Concurrent.MVar (MVar, newMVar, putMVar, takeMVar) -import Control.Monad (forever) -import Data.Aeson (FromJSON, ToJSON) -import qualified Data.Text as T -import GHC.Generics (Generic) + + +-------------------------------------------------------------------------------- +import Control.Concurrent ( forkIO, threadDelay ) +import Control.Concurrent.MVar ( MVar, newMVar, putMVar, takeMVar ) +import Control.Monad ( forever ) +import Data.Aeson ( FromJSON, ToJSON ) +import Data.Time.Clock ( getCurrentTime ) +import GHC.Generics ( Generic ) +import qualified Saturn as Schedule +-------------------------------------------------------------------------------- import System.Hworker +-------------------------------------------------------------------------------- + + +data PrintJob + = PrintA + | PrintB + | PrintC + deriving (Generic, Show) + + +newtype State = + State (MVar Int) + -data PrintJob = PrintA | PrintB deriving (Generic, Show) -data State = State (MVar Int) instance ToJSON PrintJob instance FromJSON PrintJob -loopForever :: a -loopForever = loopForever instance Job State PrintJob where - job (State mvar) PrintA = - do v <- takeMVar mvar - if v == 0 - then do putMVar mvar 0 - putStrLn "A" >> return Success - else do putMVar mvar (v - 1) - error $ "Dying: " ++ show v - - job _ PrintB = putStrLn "B" >> return Success - -main = do mvar <- newMVar 3 - hworker <- create "printer" (State mvar) - forkIO (worker hworker) - forkIO (monitor hworker) - forkIO (forever $ queue hworker PrintA >> threadDelay 1000000) - forkIO (forever $ queue hworker PrintB >> threadDelay 500000) - forever (threadDelay 1000000) + job hw PrintA = + let + State mvar = hworkerState hw + in do + v <- takeMVar mvar + if v == 0 + then do + putMVar mvar 0 + putStrLn "A" >> return Success + else do + putMVar mvar (v - 1) + error $ "Dying: " ++ show v + + job _ PrintB = + putStrLn "B" >> return Success + + job _ PrintC = + putStrLn "C" >> getCurrentTime >>= print >> return Success + + +main :: IO () +main = do + mvar <- newMVar 3 + hworker <- create "printer" (State mvar) + _ <- forkIO (worker hworker) + _ <- forkIO (monitor hworker) + _ <- forkIO (forever $ queue hworker PrintA >> threadDelay 1000000) + _ <- forkIO (forever $ queue hworker PrintB 
>> threadDelay 500000) + forever (threadDelay 1000000) + + +runCron :: IO () +runCron = do + print ("Starting" :: String) + mvar <- newMVar 3 + hworker <- + createWith + (defaultHworkerConfig "printer" (State mvar)) + { hwconfigCronJobs = [CronJob "per-minute" PrintC Schedule.everyMinute] } + _ <- forkIO (worker hworker) + _ <- forkIO (monitor hworker) + _ <- forkIO (scheduler hworker) + forever (threadDelay 1000000) diff --git a/hworker.cabal b/hworker.cabal index a96105d..66d1f19 100644 --- a/hworker.cabal +++ b/hworker.cabal @@ -1,5 +1,6 @@ +cabal-version: 3.0 name: hworker -version: 0.2.0 +version: 0.4.0 synopsis: A reliable at-least-once job queue built on top of redis. description: See README. homepage: http://github.com/positiondev/hworker @@ -9,11 +10,9 @@ author: Daniel Patterson maintainer: dbp@dbpmail.net build-type: Simple extra-source-files: README.md CHANGELOG.md -cabal-version: >=1.10 library exposed-modules: System.Hworker - other-modules: Data.Aeson.Helpers build-depends: base >= 4.7 && < 5 , aeson , hedis >= 0.6.5 @@ -22,16 +21,20 @@ library , time >= 1.5 , attoparsec , uuid >= 1.2.6 + , mtl + , conduit + , saturn hs-source-dirs: src default-language: Haskell2010 ghc-options: -Wall Test-Suite hworker-test - type: exitcode-stdio-1.0 - hs-source-dirs: src test - main-is: Spec.hs - other-modules: Data.Aeson.Helpers - , System.Hworker + type: exitcode-stdio-1.0 + hs-source-dirs: src test + main-is: Spec.hs + other-modules: System.Hworker + ghc-options: -Wall + -fno-warn-unused-do-bind build-depends: base >= 4.7 && < 5 , aeson , hedis >= 0.6.5 @@ -43,3 +46,6 @@ Test-Suite hworker-test , hspec >= 2 , hspec-contrib , HUnit + , mtl + , conduit + , saturn diff --git a/src/Data/Aeson/Helpers.hs b/src/Data/Aeson/Helpers.hs deleted file mode 100644 index f6a3a3b..0000000 --- a/src/Data/Aeson/Helpers.hs +++ /dev/null @@ -1,20 +0,0 @@ -module Data.Aeson.Helpers where - -import Data.Aeson -import Data.Aeson.Parser (value) -import Data.Attoparsec.Lazy (Parser) -import qualified Data.Attoparsec.Lazy as L -import qualified Data.ByteString.Lazy as L - --- NOTE(dbp 2015-06-14): Taken from Data.Aeson.Parser.Internal -decodeWith :: Parser Value -> (Value -> Result a) -> L.ByteString -> Maybe a -decodeWith p to s = - case L.parse p s of - L.Done _ v -> case to v of - Success a -> Just a - _ -> Nothing - _ -> Nothing -{-# INLINE decodeWith #-} - -decodeValue :: FromJSON t => L.ByteString -> Maybe t -decodeValue = decodeWith value fromJSON diff --git a/src/System/Hworker.hs b/src/System/Hworker.hs index 7af19a7..83c26cd 100644 --- a/src/System/Hworker.hs +++ b/src/System/Hworker.hs @@ -1,11 +1,13 @@ -{-# LANGUAGE DeriveGeneric #-} -{-# LANGUAGE ExistentialQuantification #-} -{-# LANGUAGE FunctionalDependencies #-} -{-# LANGUAGE MultiParamTypeClasses #-} -{-# LANGUAGE OverloadedStrings #-} -{-# LANGUAGE RankNTypes #-} -{-# LANGUAGE RecordWildCards #-} -{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE ExistentialQuantification #-} +{-# LANGUAGE FunctionalDependencies #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE GeneralisedNewtypeDeriving #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE StrictData #-} {-| @@ -27,7 +29,7 @@ also good examples): > instance FromJSON PrintJob > > instance Job State PrintJob where -> job (State mvar) Print = +> job Hworker { hworkerState = State mvar } Print = > do v <- takeMVar mvar > putMVar mvar (v + 1) > putStrLn $ "A(" ++ show 
v ++ ")" @@ -44,66 +46,116 @@ also good examples): -} module System.Hworker - ( -- * Types - Result(..) - , Job(..) - , Hworker - , HworkerConfig(..) - , ExceptionBehavior(..) - , RedisConnection(..) - , defaultHworkerConfig - -- * Managing Workers - , create - , createWith - , destroy - , worker - , monitor - -- * Queuing Jobs - , queue - -- * Inspecting Workers - , jobs - , failed - , broken - -- * Debugging Utilities - , debugger - ) - where - -import Control.Arrow (second) -import Control.Concurrent (forkIO, threadDelay) -import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar) -import Control.Exception (SomeException, catch) -import Control.Monad (forM, forever, void, when) -import Data.Aeson (FromJSON, ToJSON) -import qualified Data.Aeson as A -import Data.Aeson.Helpers -import Data.ByteString (ByteString) -import qualified Data.ByteString.Char8 as B8 -import qualified Data.ByteString.Lazy as LB -import Data.Either (isRight) -import Data.Maybe (fromJust, mapMaybe) -import Data.Monoid ((<>)) -import Data.Text (Text) -import qualified Data.Text as T -import qualified Data.Text.Encoding as T -import Data.Time.Calendar (Day (..)) -import Data.Time.Clock (NominalDiffTime, UTCTime (..), - diffUTCTime, getCurrentTime) -import qualified Data.UUID as UUID -import qualified Data.UUID.V4 as UUID -import qualified Database.Redis as R -import GHC.Generics (Generic) + ( -- * Types + Result(..) + , Job(..) + , Hworker(..) + , HworkerConfig(..) + , defaultHworkerConfig + , ExceptionBehavior(..) + , RedisConnection(..) + , BatchId(..) + , BatchStatus(..) + , BatchSummary(..) + , CronJob(..) + , QueueingResult(..) + , StreamingResult(..) + -- * Managing Workers + , create + , createWith + , destroy + , worker + , scheduler + , monitor + -- * Queuing Jobs + , queue + , queueScheduled + , queueBatch + , streamBatch + , streamBatchTx + , initBatch + , stopBatchQueueing + -- * Cron Jobs + , initCron + , queueCron + , requeueCron + , checkCron + -- * Inspecting Workers + , listJobs + , listFailed + , listScheduled + , getCronProcessing + , broken + , batchSummary + -- * Debugging Utilities + , debugger + , batchCounter + ) where + +-------------------------------------------------------------------------------- +import Control.Arrow ( second) +import Control.Concurrent ( threadDelay) +import Control.Exception ( SomeException + , Exception + , throw + , catch + , catchJust + , asyncExceptionFromException + , AsyncException + ) +import Control.Monad ( forM_, forever, void, when ) +import Control.Monad.Trans ( liftIO, lift ) +import Data.Aeson ( FromJSON, ToJSON, (.=), (.:), (.:?) ) +import qualified Data.Aeson as A +import Data.ByteString ( ByteString ) +import qualified Data.ByteString.Char8 as B8 +import qualified Data.ByteString.Lazy as LB +import Data.Conduit ( ConduitT ) +import qualified Data.Conduit as Conduit +import Data.Either ( isRight ) +import Data.Maybe ( isJust, mapMaybe, listToMaybe ) +import Data.Text ( Text ) +import qualified Data.Text as T +import qualified Data.Text.Encoding as T +import Data.Time.Clock ( NominalDiffTime + , UTCTime(..) + , diffUTCTime + , getCurrentTime + ) +import qualified Data.Time.Clock.POSIX as Posix +import Data.UUID ( UUID ) +import qualified Data.UUID as UUID +import qualified Data.UUID.V4 as UUID +import Database.Redis ( Redis + , RedisTx + , TxResult(..) 
+ , Connection + , ConnectInfo + , runRedis + ) +import qualified Database.Redis as R +import GHC.Generics ( Generic ) +import Saturn ( Schedule ) +import qualified Saturn as Schedule +-------------------------------------------------------------------------------- + + -- | Jobs can return 'Success', 'Retry' (with a message), or 'Failure' -- (with a message). Jobs that return 'Failure' are stored in the -- 'failed' queue and are not re-run. Jobs that return 'Retry' are re-run. -data Result = Success - | Retry Text - | Failure Text - deriving (Generic, Show) + +data Result + = Success + | Retry Text + | Failure Text + deriving (Generic, Show) + + instance ToJSON Result instance FromJSON Result + -- | Each Worker that you create will be responsible for one type of -- job, defined by a 'Job' instance. -- @@ -116,7 +168,7 @@ instance FromJSON Result -- structure. The data structure (the `t` parameter) will be stored -- and copied a few times in Redis while in the lifecycle, so -- generally it is a good idea for it to be relatively small (and have --- it be able to look up data that it needs while the job in running). +-- it be able to look up data that it needs while the job is running). -- -- Finally, while deriving FromJSON and ToJSON instances automatically -- might seem like a good idea, you will most likely be better off @@ -124,42 +176,141 @@ instance FromJSON Result -- compatible if you change them, as any jobs that can't be -- deserialized will not be run (and will end up in the 'broken' -- queue). This will only happen if the queue is non-empty when you --- replce the running application version, but this is obviously +-- replace the running application version, but this is obviously -- possible and could be likely depending on your use. + class (FromJSON t, ToJSON t, Show t) => Job s t | s -> t where - job :: s -> t -> IO Result + job :: Hworker s t -> t -> IO Result -data JobData t = JobData UTCTime t -- | What should happen when an unexpected exception is thrown in a -- job - it can be treated as either a 'Failure' (the default) or a -- 'Retry' (if you know the only exceptions are triggered by -- intermittent problems). -data ExceptionBehavior = RetryOnException | FailOnException + +data ExceptionBehavior + = RetryOnException + | FailOnException + + +type JobId = Text + + +-- | A unique identifier for grouping jobs together. + +newtype BatchId = + BatchId UUID + deriving (ToJSON, FromJSON, Eq, Show) + + +-- | The result of a batch of jobs queued atomically. + +data QueueingResult + = BatchNotFound BatchId + | TransactionAborted BatchId Int + | QueueingSuccess BatchSummary + | QueueingFailed BatchId Int Text + | AlreadyQueued BatchSummary + deriving (Eq, Show) + + +-- | The return value of a batch of jobs that are streamed in. + +data StreamingResult + = StreamingOk -- End the stream successfully + | StreamingAborted Text -- Close the stream with the given error message, + -- reverting all previously added jobs + +-- | Represents a recurring job that executes on a particular schedule. + +data CronJob t = + CronJob Text t Schedule + + +-- | Represents the current status of a batch. A batch is considered to be +-- "queueing" if jobs can still be added to the batch. While jobs are +-- queueing it is possible for them to be "processing" during that time. +-- The status only changes to "processing" once jobs can no longer be queued +-- but are still being processed. The batch is then finished once all jobs +-- are processed (they have either failed or succeeded). 
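To make the lifecycle described above concrete, here is a minimal sketch (not part of this patch) of the usual flow, reusing the `State` and `PrintJob` types from the bundled example program: the batch is created with `initBatch`, filled with `queueBatch`, closed with `stopBatchQueueing`, and inspected with `batchSummary`.

-- Sketch only: `State`, `PrintA`, `PrintB` and `PrintC` come from example/src/Main.hs;
-- `runOneBatch` and the one-hour expiry are illustrative choices.
runOneBatch :: Hworker State PrintJob -> IO ()
runOneBatch hw = do
  mbatch <- initBatch hw (Just 3600)          -- keep the batch counters around for an hour
  case mbatch of
    Nothing ->
      putStrLn "could not create batch counters in Redis"
    Just batch -> do
      -- the batch stays in BatchQueueing until we close it
      _ <- queueBatch hw batch False [PrintA, PrintB]
      _ <- queueBatch hw batch False [PrintC]
      stopBatchQueueing hw batch              -- moves to BatchProcessing (or straight to BatchFinished)
      batchSummary hw batch >>= print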
+ +data BatchStatus + = BatchQueueing + | BatchFailed + | BatchProcessing + | BatchFinished + deriving (Eq, Show) + + +-- | A summary of a particular batch, including figures on the total number +-- of jobs queued, the number of jobs that have completed (i.e. +-- failed or succeeded), the number of jobs succeeded, the number of jobs +-- failed, the number of jobs retried, and the current status of the +-- batch overall. + +data BatchSummary = + BatchSummary + { batchSummaryID :: BatchId + , batchSummaryQueued :: Int + , batchSummaryCompleted :: Int + , batchSummarySuccesses :: Int + , batchSummaryFailures :: Int + , batchSummaryRetries :: Int + , batchSummaryStatus :: BatchStatus + } deriving (Eq, Show) + + +data JobRef = + JobRef JobId (Maybe BatchId) (Maybe Text) + deriving (Eq, Show) + + +instance ToJSON JobRef where + toJSON (JobRef j b s) = + A.object ["j" .= j, "b" .= b, "s" .= s] + + +instance FromJSON JobRef where + -- NOTE(rjbf 2022-11-19): This is just here for the sake of migration and + -- can be removed eventually. Before `JobRef`, which is encoded as + -- a JSON object, there was a just a `String` representing the job ID. + + parseJSON (A.String j) = pure (JobRef j Nothing Nothing) + parseJSON val = A.withObject "JobRef" (\o -> JobRef <$> o .: "j" <*> o .: "b" <*> o .:? "s") val + hwlog :: Show a => Hworker s t -> a -> IO () -hwlog hw a = hworkerLogger hw (hworkerName hw, a) +hwlog hw a = + hworkerLogger hw (hworkerName hw, a) + -- | The worker data type - it is parametrized be the worker -- state (the `s`) and the job type (the `t`). + data Hworker s t = - Hworker { hworkerName :: ByteString - , hworkerState :: s - , hworkerConnection :: R.Connection - , hworkerExceptionBehavior :: ExceptionBehavior - , hworkerLogger :: forall a. Show a => a -> IO () - , hworkerJobTimeout :: NominalDiffTime - , hworkerFailedQueueSize :: Int - , hworkerDebug :: Bool - } + Hworker + { hworkerName :: ByteString + , hworkerState :: s + , hworkerConnection :: Connection + , hworkerExceptionBehavior :: ExceptionBehavior + , hworkerLogger :: forall a. Show a => a -> IO () + , hworkerJobTimeout :: NominalDiffTime + , hworkerFailedQueueSize :: Int + , hworkerDebug :: Bool + , hworkerBatchCompleted :: BatchSummary -> IO () + } + -- | When configuring a worker, you can tell it to use an existing -- redis connection pool (which you may have for the rest of your -- application). Otherwise, you can specify connection info. By -- default, hworker tries to connect to localhost, which may not be -- true for your production application. -data RedisConnection = RedisConnectInfo R.ConnectInfo - | RedisConnection R.Connection + +data RedisConnection + = RedisConnectInfo ConnectInfo + | RedisConnection Connection + -- | The main configuration for workers. -- @@ -185,173 +336,685 @@ data RedisConnection = RedisConnectInfo R.ConnectInfo -- -- 'hwconfigFailedQueueSize' controls how many 'failed' jobs will be -- kept. It defaults to 1000. -data HworkerConfig s = - HworkerConfig { - hwconfigName :: Text - , hwconfigState :: s - , hwconfigRedisConnectInfo :: RedisConnection - , hwconfigExceptionBehavior :: ExceptionBehavior - , hwconfigLogger :: forall a. Show a => a -> IO () - , hwconfigTimeout :: NominalDiffTime - , hwconfigFailedQueueSize :: Int - , hwconfigDebug :: Bool - } + +data HworkerConfig s t = + HworkerConfig + { hwconfigName :: Text + , hwconfigState :: s + , hwconfigRedisConnectInfo :: RedisConnection + , hwconfigExceptionBehavior :: ExceptionBehavior + , hwconfigLogger :: forall a. 
Show a => a -> IO () + , hwconfigTimeout :: NominalDiffTime + , hwconfigFailedQueueSize :: Int + , hwconfigDebug :: Bool + , hwconfigBatchCompleted :: BatchSummary -> IO () + , hwconfigCronJobs :: [CronJob t] + } + -- | The default worker config - it needs a name and a state (as those -- will always be unique). -defaultHworkerConfig :: Text -> s -> HworkerConfig s + +defaultHworkerConfig :: Text -> s -> HworkerConfig s t defaultHworkerConfig name state = - HworkerConfig name - state - (RedisConnectInfo R.defaultConnectInfo) - FailOnException - print - 120 - 1000 - False + HworkerConfig + { hwconfigName = name + , hwconfigState = state + , hwconfigRedisConnectInfo = RedisConnectInfo R.defaultConnectInfo + , hwconfigExceptionBehavior = FailOnException + , hwconfigLogger = print + , hwconfigTimeout = 120 + , hwconfigFailedQueueSize = 1000 + , hwconfigDebug = False + , hwconfigBatchCompleted = const (return ()) + , hwconfigCronJobs = [] + } + -- | Create a new worker with the default 'HworkerConfig'. -- -- Note that you must create at least one 'worker' and 'monitor' for -- the queue to actually process jobs (and for it to retry ones that -- time-out). + create :: Job s t => Text -> s -> IO (Hworker s t) -create name state = createWith (defaultHworkerConfig name state) +create name state = + createWith (defaultHworkerConfig name state) + -- | Create a new worker with a specified 'HworkerConfig'. -- -- Note that you must create at least one 'worker' and 'monitor' for -- the queue to actually process jobs (and for it to retry ones that -- time-out). -createWith :: Job s t => HworkerConfig s -> IO (Hworker s t) -createWith HworkerConfig{..} = - do conn <- case hwconfigRedisConnectInfo of - RedisConnectInfo c -> R.connect c - RedisConnection c -> return c - return $ Hworker (T.encodeUtf8 hwconfigName) - hwconfigState - conn - hwconfigExceptionBehavior - hwconfigLogger - hwconfigTimeout - hwconfigFailedQueueSize - hwconfigDebug + +createWith :: Job s t => HworkerConfig s t -> IO (Hworker s t) +createWith HworkerConfig{..} = do + conn <- + case hwconfigRedisConnectInfo of + RedisConnectInfo c -> R.connect c + RedisConnection c -> return c + + let + hworker = + Hworker + { hworkerName = T.encodeUtf8 hwconfigName + , hworkerState = hwconfigState + , hworkerConnection = conn + , hworkerExceptionBehavior = hwconfigExceptionBehavior + , hworkerLogger = hwconfigLogger + , hworkerJobTimeout = hwconfigTimeout + , hworkerFailedQueueSize = hwconfigFailedQueueSize + , hworkerDebug = hwconfigDebug + , hworkerBatchCompleted = hwconfigBatchCompleted + } + + time <- getCurrentTime + initCron hworker time hwconfigCronJobs + return hworker + -- | Destroy a worker. This will delete all the queues, clearing out --- all existing 'jobs', the 'broken' and 'failed' queues. There is no need --- to do this in normal applications (and most likely, you won't want to). +-- all existing 'jobs', the 'broken' and 'failed' queues, and the hashes for +-- batched jobs. There is no need to do this in normal applications +-- (and most likely, you won't want to). 
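Because 'destroy' wipes every Redis key for the queue name, it mostly shows up in test teardown (the test suite below calls it after each case). A hypothetical helper, as a sketch:

-- Sketch only; `withFreshQueue` is not part of the library.
withFreshQueue :: Job s t => HworkerConfig s t -> (Hworker s t -> IO a) -> IO a
withFreshQueue config action = do
  hw <- createWith config
  a  <- action hw
  destroy hw     -- deletes the job, progress, broken, failed, schedule, cron and batch keys
  return a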
+ destroy :: Job s t => Hworker s t -> IO () -destroy hw = void $ R.runRedis (hworkerConnection hw) $ - R.del [ jobQueue hw - , progressQueue hw - , brokenQueue hw - , failedQueue hw - ] +destroy hw = + let + batchKeys = + "hworker-batch-" <> hworkerName hw <> "*" + in + void $ runRedis (hworkerConnection hw) $ do + R.keys batchKeys >>= + \case + Left err -> liftIO $ hwlog hw err + Right keys -> void $ R.del keys + + R.del + [ jobQueue hw + , progressQueue hw + , brokenQueue hw + , failedQueue hw + , scheduleQueue hw + , cronSchedule hw + , cronProcessing hw + ] + jobQueue :: Hworker s t -> ByteString -jobQueue hw = "hworker-jobs-" <> hworkerName hw +jobQueue hw = + "hworker-jobs-" <> hworkerName hw + progressQueue :: Hworker s t -> ByteString -progressQueue hw = "hworker-progress-" <> hworkerName hw +progressQueue hw = + "hworker-progress-" <> hworkerName hw + brokenQueue :: Hworker s t -> ByteString -brokenQueue hw = "hworker-broken-" <> hworkerName hw +brokenQueue hw = + "hworker-broken-" <> hworkerName hw + failedQueue :: Hworker s t -> ByteString -failedQueue hw = "hworker-failed-" <> hworkerName hw +failedQueue hw = + "hworker-failed-" <> hworkerName hw + + +scheduleQueue :: Hworker s t -> ByteString +scheduleQueue hw = + "hworker-scheduled-" <> hworkerName hw + + +batchCounter :: Hworker s t -> BatchId -> ByteString +batchCounter hw (BatchId batch) = + "hworker-batch-" <> hworkerName hw <> ":" <> UUID.toASCIIBytes batch + + +cronSchedule :: Hworker s t -> ByteString +cronSchedule hw = + "hworker-cron-schedule-" <> hworkerName hw + + +cronProcessing :: Hworker s t -> ByteString +cronProcessing hw = + "hworker-cron-processing-" <> hworkerName hw + + +cronId :: Text -> Text +cronId cron = + "cron:" <> cron + -- | Adds a job to the queue. Returns whether the operation succeeded. + queue :: Job s t => Hworker s t -> t -> IO Bool -queue hw j = - do job_id <- UUID.toString <$> UUID.nextRandom - isRight <$> R.runRedis (hworkerConnection hw) - (R.lpush (jobQueue hw) [LB.toStrict $ A.encode (job_id, j)]) +queue hw j = do + jobId <- UUID.toText <$> UUID.nextRandom + result <- + runRedis (hworkerConnection hw) + $ R.lpush (jobQueue hw) + $ [LB.toStrict $ A.encode (JobRef jobId Nothing Nothing, j)] + return $ isRight result + + +-- | Initializes all cron jobs. This is will add all of the cron schedules +-- if not already present or update the schedules if they are. + +initCron :: Job s t => Hworker s t -> UTCTime -> [CronJob t] -> IO () +initCron hw time cronJobs = do + void + $ runRedis (hworkerConnection hw) + $ R.hmset (cronSchedule hw) + $ fmap + ( \(CronJob cron _ schedule) -> + (T.encodeUtf8 cron, T.encodeUtf8 $ Schedule.toText schedule) + ) + cronJobs + + mapM_ (queueCron hw time) cronJobs + + +-- | Queues a cron job for the first time, adding it to the schedule queue +-- at its next scheduled time. + +queueCron :: Job s t => Hworker s t -> UTCTime -> CronJob t -> IO Bool +queueCron hw time (CronJob cron j schedule) = + case Schedule.nextMatch time schedule of + Nothing -> + return False + + Just utc -> do + result <- + runRedis (hworkerConnection hw) $ R.zadd (scheduleQueue hw) $ + [ ( utcToDouble utc + , LB.toStrict $ A.encode (JobRef (cronId cron) Nothing (Just cron), j) + ) + ] + return $ isRight result + + +-- | Re-enqueues cron job, removing the record from the cron processing hash +-- and adding it back to the schedule queue at its next scheduled time. 
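For reference, a cron entry (sketch only, reusing 'PrintC' from the example program) pairs a stable name, a job payload, and a Saturn 'Schedule'. Passing entries via 'hwconfigCronJobs' lets 'createWith' run 'initCron' for you, and a 'scheduler' thread must be running for the jobs to actually be pushed onto the queue at their scheduled times.

-- Sketch only: `PrintC` comes from the example program; the name is arbitrary.
printEveryMinute :: CronJob PrintJob
printEveryMinute =
  CronJob "print-every-minute" PrintC Schedule.everyMinute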
+ +requeueCron :: Job s t => Hworker s t -> Text -> t -> IO () +requeueCron hw cron j = + runRedis (hworkerConnection hw) $ do + void $ withInt hw $ R.hdel (cronProcessing hw) [T.encodeUtf8 cron] + R.hget (cronSchedule hw) (T.encodeUtf8 cron) >>= + \case + Left err -> + liftIO $ hwlog hw err + + Right Nothing -> + -- This can happen if the scheduled changed between the job + -- being queued for execution and then being requeued + liftIO $ hwlog hw $ "CRON NOT FOUND: " <> cron + + Right (Just field) -> + case Schedule.fromText (T.decodeUtf8 field) of + Left err -> + liftIO $ hwlog hw err + + Right schedule -> do + time <- liftIO getCurrentTime + + case Schedule.nextMatch time schedule of + Nothing -> + liftIO $ hwlog hw $ "CRON SCHEDULE NOT FOUND: " <> field + + Just utc -> + void $ withInt hw $ R.zadd (scheduleQueue hw) $ + [ ( utcToDouble utc + , LB.toStrict $ A.encode (JobRef (cronId cron) Nothing (Just cron), j) + ) + ] + + +-- | Checks if the there is a already a job for a particular cron process +-- which is either scheduled or currently being processed. + +checkCron :: forall s t. Job s t => Hworker s t -> Text -> IO Bool +checkCron hw cron = + runRedis (hworkerConnection hw) $ do + R.hget (cronProcessing hw) (T.encodeUtf8 cron) >>= + \case + Right (Just _) -> + return True + + _ -> + R.zrange (scheduleQueue hw) 0 (-1) >>= + \case + Left err -> do + liftIO $ hwlog hw err + return False + + Right ls -> + case traverse A.decodeStrict ls :: Maybe [(JobRef, t)] of + Just scheduledJobs -> + return + $ any (\(JobRef _ _ c, _) -> c == Just cron) + $ scheduledJobs + + Nothing -> + return False + + +-- | Adds a job to be added to the queue at the specified time. +-- Returns whether the operation succeeded. + +queueScheduled :: Job s t => Hworker s t -> t -> UTCTime -> IO Bool +queueScheduled hw j utc = do + jobId <- UUID.toText <$> UUID.nextRandom + result <- + runRedis (hworkerConnection hw) + $ R.zadd (scheduleQueue hw) + $ [(utcToDouble utc, LB.toStrict $ A.encode (JobRef jobId Nothing Nothing, j))] + return $ isRight result + + +-- | Adds jobs to the queue, but as part of a particular batch of jobs. +-- It takes the `BatchId` of the specified job, a `Bool` that when `True` +-- closes the batch to further queueing, and a list of jobs to be queued, and +-- returns whether the operation succeeded. The process is atomic +-- so that if a single job fails to queue then then none of the jobs +-- in the list will queue. + +queueBatch :: Job s t => Hworker s t -> BatchId -> Bool -> [t] -> IO QueueingResult +queueBatch hw batch close js = + withBatchQueue hw batch $ runRedis (hworkerConnection hw) $ + R.multiExec $ do + mapM_ + ( \j -> do + jobId <- UUID.toText <$> liftIO UUID.nextRandom + let ref = JobRef jobId (Just batch) Nothing + _ <- R.lpush (jobQueue hw) [LB.toStrict $ A.encode (ref, j)] + + -- Do the counting outside of the transaction, hence runRedis here. + liftIO + $ runRedis (hworkerConnection hw) + $ R.hincrby (batchCounter hw batch) "queued" 1 + ) + js + + when close + $ void + $ R.hset (batchCounter hw batch) "status" "processing" + + return (pure ()) + + +data AbortException = + AbortException Text + deriving Show + + +instance Exception AbortException + + +-- | Like 'queueBatch', but instead of a list of jobs, it takes a conduit +-- that streams jobs within IO. 
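For example (sketch only, again using the example program's 'State' and 'PrintJob'), a producer can yield jobs from any IO source and then either finish with 'StreamingOk' or return 'StreamingAborted' to roll the whole batch back:

-- Sketch only; the job values come from the example program.
streamPrintJobs :: Hworker State PrintJob -> BatchId -> IO QueueingResult
streamPrintJobs hw batch =
  streamBatch hw batch True $ do       -- True: close the batch to queueing when the stream ends
    mapM_ Conduit.yield [PrintA, PrintB, PrintC]
    return StreamingOk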
+ +streamBatch :: + Job s t => + Hworker s t -> BatchId -> Bool -> ConduitT () t IO StreamingResult -> + IO QueueingResult +streamBatch hw batch close producer = + streamBatchTx hw batch close $ Conduit.transPipe liftIO producer + + +-- | Like 'streamBatch', but instead of IO, jobs are streamed directly within +-- a Redis transaction. + +streamBatchTx :: + Job s t => + Hworker s t -> BatchId -> Bool -> ConduitT () t RedisTx StreamingResult -> + IO QueueingResult +streamBatchTx hw batch close producer = + let + sink = + Conduit.await >>= + \case + Nothing -> + liftIO (batchSummary hw batch) >>= + \case + Just summary | batchSummaryQueued summary == 0 -> + void . lift $ R.hset (batchCounter hw batch) "status" "finished" + + _ -> + when close + $ void . lift + $ R.hset (batchCounter hw batch) "status" "processing" + + Just j -> do + jobId <- UUID.toText <$> liftIO UUID.nextRandom + let ref = JobRef jobId (Just batch) Nothing + _ <- lift $ R.lpush (jobQueue hw) [LB.toStrict $ A.encode (ref, j)] + + -- Do the counting outside of the transaction, hence runRedis here. + _ <- + liftIO + $ runRedis (hworkerConnection hw) + $ R.hincrby (batchCounter hw batch) "queued" 1 + + sink + + run = + Conduit.runConduit (Conduit.fuseUpstream producer sink) >>= + \case + StreamingOk -> return (pure ()) + StreamingAborted err -> throw (AbortException err) + in + withBatchQueue hw batch + $ runRedis (hworkerConnection hw) + $ R.multiExec run + + +withBatchQueue :: + Job s t => Hworker s t -> BatchId -> IO (TxResult ()) -> IO QueueingResult +withBatchQueue hw batch process = + runRedis (hworkerConnection hw) (batchSummary' hw batch) >>= + \case + Nothing -> + return $ BatchNotFound batch + + Just summary | batchSummaryStatus summary == BatchQueueing -> + catch + ( catch + ( process >>= + \case + TxSuccess () -> + return $ QueueingSuccess summary + + TxAborted -> do + n <- runRedis (hworkerConnection hw) $ failBatchSummary hw batch + return $ TransactionAborted batch n + + TxError err -> do + n <- runRedis (hworkerConnection hw) $ failBatchSummary hw batch + return $ QueueingFailed batch n (T.pack err) + ) + ( \(AbortException msg :: AbortException) -> do + n <- runRedis (hworkerConnection hw) (failBatchSummary hw batch) + return $ QueueingFailed batch n msg + ) + ) + ( \(e :: SomeException) -> do + n <- runRedis (hworkerConnection hw) (failBatchSummary hw batch) + return $ QueueingFailed batch n (T.pack (show e)) + ) + + Just summary -> + return $ AlreadyQueued summary + + +-- | Prevents queueing new jobs to a batch. If the number of jobs completed equals +-- the number of jobs queued, then the status of the batch is immediately set +-- to `BatchFinished`, otherwise it's set to `BatchProcessing`. + +stopBatchQueueing :: Hworker s t -> BatchId -> IO () +stopBatchQueueing hw batch = + runRedis (hworkerConnection hw) $ do + batchSummary' hw batch >>= + \case + Nothing -> + liftIO $ hwlog hw $ "Batch not found: " <> show batch + + Just summary | batchSummaryCompleted summary >= batchSummaryQueued summary -> + void + $ R.hset (batchCounter hw batch) "status" + $ encodeBatchStatus BatchFinished + + Just _-> + void + $ R.hset (batchCounter hw batch) "status" + $ encodeBatchStatus BatchProcessing + -- | Creates a new worker thread. This is blocking, so you will want to -- 'forkIO' this into a thread. You can have any number of these (and -- on any number of servers); the more there are, the faster jobs will -- be processed. 
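As in the example program, a typical setup (sketch only; 'forkIO' from Control.Concurrent is assumed) forks any number of workers plus one monitor and, when scheduled or cron jobs are used, one scheduler:

-- Sketch only; not part of this patch.
startHworker :: Job s t => Hworker s t -> IO ()
startHworker hw = do
  _ <- forkIO (worker hw)      -- more workers means jobs are processed faster
  _ <- forkIO (worker hw)
  _ <- forkIO (monitor hw)     -- requeues jobs that exceed hwconfigTimeout
  _ <- forkIO (scheduler hw)   -- pushes scheduled and cron jobs onto the queue
  return ()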
+ worker :: Job s t => Hworker s t -> IO () worker hw = - do now <- getCurrentTime - r <- R.runRedis (hworkerConnection hw) $ - R.eval "local job = redis.call('rpop',KEYS[1])\n\ - \if job ~= nil then\n\ - \ redis.call('hset', KEYS[2], tostring(job), ARGV[1])\n\ - \ return job\n\ - \else\n\ - \ return nil\n\ - \end" - [jobQueue hw, progressQueue hw] - [LB.toStrict $ A.encode now] - case r of - Left err -> hwlog hw err >> delayAndRun - Right Nothing -> delayAndRun - Right (Just t) -> - do when (hworkerDebug hw) $ hwlog hw ("WORKER RUNNING", t) - case decodeValue (LB.fromStrict t) of - Nothing -> do hwlog hw ("BROKEN JOB", t) - now <- getCurrentTime - withNil hw (R.eval "local del = redis.call('hdel', KEYS[1], ARGV[1])\n\ - \if del == 1 then\n\ - \ redis.call('hset', KEYS[2], ARGV[1], ARGV[2])\n\ - \end\n\ - \return nil" - [progressQueue hw, brokenQueue hw] - [t, LB.toStrict $ A.encode now]) - delayAndRun - Just (_ :: String, j) -> do - result <- runJob (job (hworkerState hw) j) - case result of - Success -> - do when (hworkerDebug hw) $ hwlog hw ("JOB COMPLETE", t) - delete_res <- R.runRedis (hworkerConnection hw) - (R.hdel (progressQueue hw) [t]) - case delete_res of - Left err -> hwlog hw err >> delayAndRun - Right 1 -> justRun - Right n -> do hwlog hw ("Job done: did not delete 1, deleted " <> show n) - delayAndRun - Retry msg -> - do hwlog hw ("Retry: " <> msg) - withNil hw - (R.eval "local del = redis.call('hdel', KEYS[1], ARGV[1])\n\ - \if del == 1 then\n\ - \ redis.call('lpush', KEYS[2], ARGV[1])\n\ - \end\n\ - \return nil" - [progressQueue hw, jobQueue hw] - [t]) - delayAndRun - Failure msg -> - do hwlog hw ("Failure: " <> msg) - withNil hw - (R.eval "local del = redis.call('hdel', KEYS[1], ARGV[1])\n\ - \if del == 1 then\n\ - \ redis.call('lpush', KEYS[2], ARGV[1])\n\ - \ redis.call('ltrim', KEYS[2], 0, ARGV[2])\n\ - \end\n\ - \return nil" - [progressQueue hw, failedQueue hw] - [t, B8.pack (show (hworkerFailedQueueSize hw - 1))]) - void $ R.runRedis (hworkerConnection hw) - (R.hdel (progressQueue hw) [t]) - delayAndRun - where delayAndRun = threadDelay 10000 >> worker hw - justRun = worker hw - runJob v = - do x <- newEmptyMVar - jt <- forkIO (catch (v >>= putMVar x . Right) - (\(e::SomeException) -> - putMVar x (Left e))) - res <- takeMVar x - case res of - Left e -> - let b = case hworkerExceptionBehavior hw of - RetryOnException -> Retry - FailOnException -> Failure in - return (b ("Exception raised: " <> (T.pack . show) e)) - Right r -> return r + let + delayAndRun = + threadDelay 10000 >> worker hw + + justRun = + worker hw + + runJob action = do + eitherResult <- + catchJust + ( \(e :: SomeException) -> + if isJust (asyncExceptionFromException e :: Maybe AsyncException) + then Nothing + else Just e + ) + ( Right <$> action ) + ( return . Left ) + + case eitherResult of + Left exception -> + let + resultMessage = + case hworkerExceptionBehavior hw of + RetryOnException -> Retry + FailOnException -> Failure + in + return + $ resultMessage + $ "Exception raised: " <> (T.pack . 
show) exception + + Right result -> + return result + in do + now <- getCurrentTime + + eitherReply <- + runRedis (hworkerConnection hw) $ + R.eval + "local job = redis.call('rpop',KEYS[1])\n\ + \if job ~= nil then\n\ + \ redis.call('hset', KEYS[2], tostring(job), ARGV[1])\n\ + \ return job\n\ + \else\n\ + \ return nil\n\ + \end" + [jobQueue hw, progressQueue hw] + [LB.toStrict $ A.encode now] + + case eitherReply of + Left err -> + hwlog hw err >> delayAndRun + + Right Nothing -> + delayAndRun + + Right (Just t) -> do + when (hworkerDebug hw) $ hwlog hw ("WORKER RUNNING" :: Text, t) + + case A.decodeStrict t of + Nothing -> do + hwlog hw ("BROKEN JOB" :: Text, t) + now' <- getCurrentTime + + runWithNil hw $ + R.eval + "local del = redis.call('hdel', KEYS[1], ARGV[1])\n\ + \if del == 1 then\n\ + \ redis.call('hset', KEYS[2], ARGV[1], ARGV[2])\n\ + \end\n\ + \return nil" + [progressQueue hw, brokenQueue hw] + [t, LB.toStrict $ A.encode now'] + + delayAndRun + + Just (JobRef _ maybeBatch maybeCron, j) -> + let + nextCron = + case maybeCron of + Just cron -> requeueCron hw cron j + Nothing -> return () + in do + runJob (job hw j) >>= + \case + Success -> do + when (hworkerDebug hw) $ hwlog hw ("JOB COMPLETE" :: Text, t) + case maybeBatch of + Nothing -> do + deletionResult <- + runRedis (hworkerConnection hw) + $ R.hdel (progressQueue hw) [t] + nextCron + case deletionResult of + Left err -> hwlog hw err >> delayAndRun + Right 1 -> justRun + Right n -> do + hwlog hw ("Job done: did not delete 1, deleted " <> show n) + delayAndRun + + Just batch -> + runWithMaybe hw + ( R.eval + "local del = redis.call('hdel', KEYS[1], ARGV[1])\n\ + \if del == 1 then\n\ + \ local batch = KEYS[2]\n\ + \ redis.call('hincrby', batch, 'successes', '1')\n\ + \ local completed = redis.call('hincrby', batch, 'completed', '1')\n\ + \ local queued = redis.call('hincrby', batch, 'queued', '0')\n\ + \ local status = redis.call('hget', batch, 'status')\n\ + \ if tonumber(completed) >= tonumber(queued) and status == 'processing' then\n\ + \ redis.call('hset', batch, 'status', 'finished')\n\ + \ end\n\ + \ return redis.call('hgetall', batch)\ + \end\n\ + \return nil" + [progressQueue hw, batchCounter hw batch] + [t] + ) + ( \hm -> do + nextCron + case decodeBatchSummary batch hm of + Nothing -> do + hwlog hw ("Job done: did not delete 1" :: Text) + delayAndRun + + Just summary -> do + when (batchSummaryStatus summary == BatchFinished) + $ hworkerBatchCompleted hw summary + justRun + ) + + Retry msg -> do + hwlog hw ("RETRY: " <> msg) + + case maybeBatch of + Nothing -> + runWithNil hw $ + R.eval + "local del = redis.call('hdel', KEYS[1], ARGV[1])\n\ + \if del == 1 then\n\ + \ redis.call('lpush', KEYS[2], ARGV[1])\n\ + \end\n\ + \return nil" + [progressQueue hw, jobQueue hw] + [t] + + Just batch -> + runWithNil hw $ + R.eval + "local del = redis.call('hdel', KEYS[1], ARGV[1])\n\ + \if del == 1 then\n\ + \ redis.call('lpush', KEYS[2], ARGV[1])\n\ + \ redis.call('hincrby', KEYS[3], 'retries', '1')\n\ + \end\n\ + \return nil" + [progressQueue hw, jobQueue hw, batchCounter hw batch] + [t] + + delayAndRun + + Failure msg -> do + hwlog hw ("Failure: " <> msg) + + case maybeBatch of + Nothing -> + runWithNil hw $ + R.eval + "local del = redis.call('hdel', KEYS[1], ARGV[1])\n\ + \if del == 1 then\n\ + \ redis.call('lpush', KEYS[2], ARGV[1])\n\ + \ redis.call('ltrim', KEYS[2], 0, ARGV[2])\n\ + \end\n\ + \return nil" + [progressQueue hw, failedQueue hw] + [t, B8.pack (show (hworkerFailedQueueSize hw - 1))] + + Just batch -> + 
runWithMaybe hw + ( R.eval + "local del = redis.call('hdel', KEYS[1], ARGV[1])\n\ + \if del == 1 then\n\ + \ redis.call('lpush', KEYS[2], ARGV[1])\n\ + \ redis.call('ltrim', KEYS[2], 0, ARGV[2])\n\ + \ local batch = KEYS[3]\n\ + \ redis.call('hincrby', batch, 'failures', '1')\n\ + \ local completed = redis.call('hincrby', batch, 'completed', '1')\n\ + \ local queued = redis.call('hincrby', batch, 'queued', '0')\n\ + \ local status = redis.call('hget', batch, 'status')\n\ + \ if tonumber(completed) >= tonumber(queued) and status == 'processing' then\n\ + \ redis.call('hset', batch, 'status', 'finished')\n\ + \ return redis.call('hgetall', batch)\ + \ end\n\ + \end\n\ + \return nil" + [progressQueue hw, failedQueue hw, batchCounter hw batch] + [t, B8.pack (show (hworkerFailedQueueSize hw - 1))] + ) + ( \hm -> + forM_ (decodeBatchSummary batch hm) + $ hworkerBatchCompleted hw + ) + + nextCron + delayAndRun + + +-- | Start a scheduler. Like 'worker', this is blocking, so should be +-- started in a thread. This is responsible for pushing scheduled jobs +-- to the queue at the expected time. + +scheduler :: forall s t . Job s t => Hworker s t -> IO () +scheduler hw = + forever $ do + now <- getCurrentTime + + runRedis (hworkerConnection hw) $ + withNil hw $ + R.eval + "local job = redis.call('zrangebyscore', KEYS[1], 0, ARGV[1], 'limit', 0, 1)[1]\n\ + \if job ~= nil then\n\ + \ redis.call('lpush', KEYS[2], tostring(job))\n\ + \ redis.call('zrem', KEYS[1], tostring(job))\n\ + \ local cron = cjson.decode(job)[1]['s']\n\ + \ if cron ~= cjson.null then\n\ + \ redis.call('hset', KEYS[3], tostring(cron), ARGV[1])\n\ + \ return tostring(cron) \n\ + \ end\n\ + \end\n\ + \return nil" + [ scheduleQueue hw + , jobQueue hw + , cronProcessing hw + ] + [ B8.pack $ show (utcToDouble now) ] + + threadDelay 500000 >> scheduler hw -- | Start a monitor. Like 'worker', this is blocking, so should be @@ -359,106 +1022,294 @@ worker hw = -- time out (which can happen if the processing thread is killed, for -- example). You need to have at least one of these running to have -- the retry happen, but it is safe to have any number running. + monitor :: Job s t => Hworker s t -> IO () monitor hw = - forever $ - do now <- getCurrentTime - withList hw (R.hkeys (progressQueue hw)) - (\jobs -> - void $ forM jobs $ \job -> - withMaybe hw (R.hget (progressQueue hw) job) - (\start -> - when (diffUTCTime now (fromJust $ decodeValue (LB.fromStrict start)) > hworkerJobTimeout hw) $ - do n <- - withInt hw - (R.eval "local del = redis.call('hdel', KEYS[2], ARGV[1])\n\ - \if del == 1 then\ - \ redis.call('rpush', KEYS[1], ARGV[1])\n\ \end\n\ - \return del" - [jobQueue hw, progressQueue hw] - [job]) - when (hworkerDebug hw) $ hwlog hw ("MONITOR RV", n) - when (hworkerDebug hw && n == 1) $ hwlog hw ("MONITOR REQUEUED", job))) - -- NOTE(dbp 2015-07-25): We check every 1/10th of timeout. 
- threadDelay (floor $ 100000 * hworkerJobTimeout hw) + forever $ do + now <- getCurrentTime + + runWithList hw (R.hkeys (progressQueue hw)) $ \js -> + forM_ js $ \j -> + runWithMaybe hw (R.hget (progressQueue hw) j) $ + \start -> + let + duration = + diffUTCTime now (parseTime start) + + in + when (duration > hworkerJobTimeout hw) $ do + n <- + runWithInt hw $ + R.eval + "local del = redis.call('hdel', KEYS[2], ARGV[1])\n\ + \if del == 1 then\ + \ redis.call('rpush', KEYS[1], ARGV[1])\n\ + \end\n\ + \return del" + [jobQueue hw, progressQueue hw] + [j] + when (hworkerDebug hw) + $ hwlog hw ("MONITOR RV" :: Text, n) + when (hworkerDebug hw && n == 1) + $ hwlog hw ("MONITOR REQUEUED" :: Text, j) + + -- NOTE(dbp 2015-07-25): We check every 1/10th of timeout. + threadDelay (floor $ 100000 * hworkerJobTimeout hw) + -- | Returns the jobs that could not be deserialized, most likely -- because you changed the 'ToJSON'/'FromJSON' instances for you job -- in a way that resulted in old jobs not being able to be converted -- back from json. Another reason for jobs to end up here (and much -- worse) is if you point two instances of 'Hworker', with different --- job types, at the same queue (ie, you re-use the name). Then +-- job types, at the same queue (i.e., you re-use the name). Then -- anytime a worker from one queue gets a job from the other it would -- think it is broken. + broken :: Hworker s t -> IO [(ByteString, UTCTime)] -broken hw = do r <- R.runRedis (hworkerConnection hw) (R.hgetall (brokenQueue hw)) - case r of - Left err -> hwlog hw err >> return [] - Right xs -> return (map (second parseTime) xs) - where parseTime = fromJust . decodeValue . LB.fromStrict - -jobsFromQueue :: Job s t => Hworker s t -> ByteString -> IO [t] -jobsFromQueue hw queue = - do r <- R.runRedis (hworkerConnection hw) (R.lrange queue 0 (-1)) - case r of - Left err -> hwlog hw err >> return [] - Right [] -> return [] - Right xs -> return $ mapMaybe (fmap (\(_::String, x) -> x) . decodeValue . LB.fromStrict) xs - --- | Returns all pending jobs. -jobs :: Job s t => Hworker s t -> IO [t] -jobs hw = jobsFromQueue hw (jobQueue hw) - --- | Returns all failed jobs. This is capped at the most recent +broken hw = + runRedis (hworkerConnection hw) (R.hgetall (brokenQueue hw)) >>= + \case + Left err -> hwlog hw err >> return [] + Right xs -> return (map (second parseTime) xs) + + +-- | Returns pending jobs. + +listJobs :: Job s t => Hworker s t -> Integer -> Integer -> IO [t] +listJobs hw offset limit = + listJobsFromQueue hw (jobQueue hw) offset limit + + +listJobsFromQueue :: + Job s t => Hworker s t -> ByteString -> Integer -> Integer -> IO [t] +listJobsFromQueue hw q offset limit = + let + a = offset * limit + b = (offset + 1) * limit - 1 + in + runRedis (hworkerConnection hw) (R.lrange q a b) >>= + \case + Left err -> + hwlog hw err >> return [] + + Right [] -> + return [] + + Right xs -> + return $ mapMaybe (fmap (\(JobRef _ _ _, x) -> x) . 
A.decodeStrict) xs + + +-- | Returns all scheduled jobs + +listScheduled :: + Job s t => Hworker s t -> Integer -> Integer -> IO [(t, UTCTime)] +listScheduled hw offset limit = + let + a = offset * limit + b = (offset + 1) * limit - 1 + in + runRedis (hworkerConnection hw) (R.zrangeWithscores (scheduleQueue hw) a b) >>= + \case + Left err -> + hwlog hw err >> return [] + + Right [] -> + return [] + + Right xs -> + return $ + mapMaybe + ( \(bytes, s) -> + case A.decodeStrict bytes of + Just (JobRef _ _ _, j) -> Just (j, doubleToUtc s) + Nothing -> Nothing + ) + xs + + +-- | Returns timestamp of active cron job. + +getCronProcessing :: Job s t => Hworker s t -> Text -> IO (Maybe Double) +getCronProcessing hw cron = + runRedis (hworkerConnection hw) (R.hget (cronProcessing hw) (T.encodeUtf8 cron)) >>= + \case + Right mbytes -> return $ mbytes >>= A.decodeStrict + Left _ -> return Nothing + + +-- | Returns failed jobs. This is capped at the most recent -- 'hworkerconfigFailedQueueSize' jobs that returned 'Failure' (or -- threw an exception when 'hworkerconfigExceptionBehavior' is -- 'FailOnException'). -failed :: Job s t => Hworker s t -> IO [t] -failed hw = jobsFromQueue hw (failedQueue hw) + +listFailed :: Job s t => Hworker s t -> Integer -> Integer -> IO [t] +listFailed hw offset limit = + listJobsFromQueue hw (failedQueue hw) offset limit + -- | Logs the contents of the jobqueue and the inprogress queue at -- `microseconds` intervals. + debugger :: Job s t => Int -> Hworker s t -> IO () debugger microseconds hw = - forever $ - do withList hw (R.hkeys (progressQueue hw)) - (\running -> - withList hw (R.lrange (jobQueue hw) 0 (-1)) - (\queued -> hwlog hw ("DEBUG", queued, running))) - threadDelay microseconds + forever $ do + runWithList hw (R.hkeys (progressQueue hw)) $ + \running -> + runWithList hw (R.lrange (jobQueue hw) 0 (-1)) + $ \queued -> hwlog hw ("DEBUG" :: Text, queued, running) + + threadDelay microseconds + + +-- | Initializes a batch of jobs. By default the information for tracking a +-- batch of jobs, created by this function, will expires a week from +-- its creation. The optional `seconds` argument can be used to override this. + +initBatch :: Hworker s t -> Maybe Integer -> IO (Maybe BatchId) +initBatch hw mseconds = do + batch <- BatchId <$> UUID.nextRandom + runRedis (hworkerConnection hw) $ do + r <- + R.hmset (batchCounter hw batch) + [ ("queued", "0") + , ("completed", "0") + , ("successes", "0") + , ("failures", "0") + , ("retries", "0") + , ("status", "queueing") + ] + case r of + Left err -> + liftIO (hwlog hw err) >> return Nothing + + Right _ -> do + case mseconds of + Nothing -> return () + Just s -> void $ R.expire (batchCounter hw batch) s + + return (Just batch) + + +-- | Return a summary of the batch. 
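A sketch (not part of this patch) of how calling code might poll the summary until a batch settles, assuming 'threadDelay' from Control.Concurrent:

-- Sketch only; polls twice a second until the batch is finished or failed.
waitForBatch :: Hworker s t -> BatchId -> IO (Maybe BatchSummary)
waitForBatch hw batch = do
  msummary <- batchSummary hw batch
  case batchSummaryStatus <$> msummary of
    Just BatchQueueing   -> threadDelay 500000 >> waitForBatch hw batch
    Just BatchProcessing -> threadDelay 500000 >> waitForBatch hw batch
    _                    -> return msummary    -- BatchFinished, BatchFailed, or no such batch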
+ +batchSummary :: Hworker s t -> BatchId -> IO (Maybe BatchSummary) +batchSummary hw batch = + runRedis (hworkerConnection hw) (batchSummary' hw batch) + + +batchSummary' :: Hworker s t -> BatchId -> Redis (Maybe BatchSummary) +batchSummary' hw batch = do + R.hgetall (batchCounter hw batch) >>= + \case + Left err -> liftIO (hwlog hw err) >> return Nothing + Right hm -> return $ decodeBatchSummary batch hm + -- Redis helpers follow -withList hw a f = - do r <- R.runRedis (hworkerConnection hw) a - case r of - Left err -> hwlog hw err - Right [] -> return () - Right xs -> f xs - -withMaybe hw a f = - do r <- R.runRedis (hworkerConnection hw) a - case r of - Left err -> hwlog hw err - Right Nothing -> return () - Right (Just v) -> f v +runWithList :: + Show a => Hworker s t -> Redis (Either a [b]) -> ([b] -> IO ()) -> IO () +runWithList hw a f = + runRedis (hworkerConnection hw) a >>= + \case + Left err -> hwlog hw err + Right [] -> return () + Right xs -> f xs + + +runWithMaybe :: + Show a => Hworker s t -> Redis (Either a (Maybe b)) -> (b -> IO ()) -> IO () +runWithMaybe hw a f = do + runRedis (hworkerConnection hw) a >>= + \case + Left err -> hwlog hw err + Right Nothing -> return () + Right (Just v) -> f v + + +runWithNil :: Show a => Hworker s t -> Redis (Either a (Maybe ByteString)) -> IO () +runWithNil hw a = + runRedis (hworkerConnection hw) $ withNil hw a + + +withNil :: + Show a => Hworker s t -> Redis (Either a (Maybe ByteString)) -> Redis () withNil hw a = - do r <- R.runRedis (hworkerConnection hw) a - case r of - Left err -> hwlog hw err - Right (Just ("" :: ByteString)) -> return () - Right _ -> return () + a >>= + \case + Left err -> liftIO (hwlog hw err) + Right _ -> return () + -withInt :: Hworker s t -> R.Redis (Either R.Reply Integer) -> IO Integer +runWithInt :: Hworker s t -> Redis (Either R.Reply Integer) -> IO Integer +runWithInt hw a = + runRedis (hworkerConnection hw) $ withInt hw a + + +withInt :: Hworker s t -> Redis (Either R.Reply Integer) -> Redis Integer withInt hw a = - do r <- R.runRedis (hworkerConnection hw) a - case r of - Left err -> hwlog hw err >> return (-1) - Right n -> return n - -withIgnore :: Hworker s t -> R.Redis (Either R.Reply a) -> IO () -withIgnore hw a = - do r <- R.runRedis (hworkerConnection hw) a - case r of - Left err -> hwlog hw err - Right _ -> return () + a >>= + \case + Left err -> liftIO (hwlog hw err) >> return (-1) + Right n -> return n + + +-- Parsing Helpers + +encodeBatchStatus :: BatchStatus -> ByteString +encodeBatchStatus BatchQueueing = "queueing" +encodeBatchStatus BatchFailed = "failed" +encodeBatchStatus BatchProcessing = "processing" +encodeBatchStatus BatchFinished = "finished" + + +decodeBatchStatus :: ByteString -> Maybe BatchStatus +decodeBatchStatus "queueing" = Just BatchQueueing +decodeBatchStatus "failed" = Just BatchFailed +decodeBatchStatus "processing" = Just BatchProcessing +decodeBatchStatus "finished" = Just BatchFinished +decodeBatchStatus _ = Nothing + + +decodeBatchSummary :: BatchId -> [(ByteString, ByteString)] -> Maybe BatchSummary +decodeBatchSummary batch hm = + BatchSummary batch + <$> (lookup "queued" hm >>= readMaybe) + <*> (lookup "completed" hm >>= readMaybe) + <*> (lookup "successes" hm >>= readMaybe) + <*> (lookup "failures" hm >>= readMaybe) + <*> (lookup "retries" hm >>= readMaybe) + <*> (lookup "status" hm >>= decodeBatchStatus) + + +parseTime :: ByteString -> UTCTime +parseTime t = + case A.decodeStrict t of + Nothing -> error ("FAILED TO PARSE TIMESTAMP: " <> B8.unpack t) + Just time 
-> time + + +readMaybe :: Read a => ByteString -> Maybe a +readMaybe = + fmap fst . listToMaybe . reads . B8.unpack + + +failBatchSummary :: Hworker s t -> BatchId -> Redis Int +failBatchSummary hw batch = do + void + $ R.hset (batchCounter hw batch) "status" + $ encodeBatchStatus BatchFailed + + batchSummary' hw batch >>= + \case + Just summary -> return $ batchSummaryQueued summary + _ -> return 0 + + +utcToDouble :: UTCTime -> Double +utcToDouble = realToFrac . Posix.utcTimeToPOSIXSeconds + + +doubleToUtc :: Double -> UTCTime +doubleToUtc = Posix.posixSecondsToUTCTime . realToFrac diff --git a/stack.yaml b/stack.yaml index e0f6367..6219f0c 100644 --- a/stack.yaml +++ b/stack.yaml @@ -3,4 +3,4 @@ packages: - '.' - 'example' extra-deps: [] -resolver: lts-3.1 +resolver: lts-18.18 diff --git a/test/Spec.hs b/test/Spec.hs index 5feabc6..89d1de6 100644 --- a/test/Spec.hs +++ b/test/Spec.hs @@ -1,389 +1,803 @@ {-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE LambdaCase #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE OverloadedStrings #-} -import Control.Concurrent (forkIO, killThread, threadDelay) -import Control.Concurrent.MVar (MVar, modifyMVarMasked_, newMVar, - readMVar, takeMVar) -import Control.Monad (replicateM_) -import Data.Aeson (FromJSON, ToJSON) -import qualified Data.Text as T -import GHC.Generics (Generic) -import System.Hworker -import System.IO + +-------------------------------------------------------------------------------- +import Control.Concurrent ( forkIO, killThread, threadDelay ) +import Control.Concurrent.MVar ( MVar, modifyMVarMasked_, newMVar + , readMVar, takeMVar + ) +import Control.Monad ( replicateM_, void ) +import Control.Monad.Trans ( lift, liftIO ) +import Data.Aeson ( FromJSON(..), ToJSON(..) ) +import qualified Data.Conduit as Conduit +import Data.Text ( Text ) +import qualified Data.Text as T +import Data.Time +import qualified Database.Redis as Redis +import GHC.Generics ( Generic) +import Saturn ( everyMinute ) import Test.Hspec -import Test.Hspec.Contrib.HUnit -import Test.HUnit +import Test.HUnit ( assertEqual ) +-------------------------------------------------------------------------------- +import System.Hworker +-------------------------------------------------------------------------------- + + + +main :: IO () +main = hspec $ do + describe "Simple" $ do + it "should run and increment counter" $ do + mvar <- newMVar 0 + hworker <- createWith (conf "simpleworker-1" (SimpleState mvar)) + wthread <- forkIO (worker hworker) + queue hworker SimpleJob + threadDelay 30000 + killThread wthread + destroy hworker + v <- takeMVar mvar + assertEqual "State should be 1 after job runs" 1 v + + it "queueing 2 jobs should increment twice" $ do + mvar <- newMVar 0 + hworker <- createWith (conf "simpleworker-2" (SimpleState mvar)) + wthread <- forkIO (worker hworker) + queue hworker SimpleJob + queue hworker SimpleJob + threadDelay 40000 + killThread wthread + destroy hworker + v <- takeMVar mvar + assertEqual "State should be 2 after 2 jobs run" 2 v + + it "queueing 1000 jobs should increment 1000" $ do + mvar <- newMVar 0 + hworker <- createWith (conf "simpleworker-3" (SimpleState mvar)) + wthread <- forkIO (worker hworker) + replicateM_ 1000 (queue hworker SimpleJob) + threadDelay 2000000 + killThread wthread + destroy hworker + v <- takeMVar mvar + assertEqual "State should be 1000 after 1000 job runs" 1000 v + + it "should work with multiple workers" $ do + -- NOTE(dbp 2015-07-12): This probably won't run faster, because + -- they are all blocking on the 
MVar, but that's not the point. + mvar <- newMVar 0 + hworker <- createWith (conf "simpleworker-4" (SimpleState mvar)) + wthread1 <- forkIO (worker hworker) + wthread2 <- forkIO (worker hworker) + wthread3 <- forkIO (worker hworker) + wthread4 <- forkIO (worker hworker) + replicateM_ 1000 (queue hworker SimpleJob) + threadDelay 1000000 + killThread wthread1 + killThread wthread2 + killThread wthread3 + killThread wthread4 + destroy hworker + v <- takeMVar mvar + assertEqual "State should be 1000 after 1000 job runs" 1000 v + + describe "Exceptions" $ do + it "should be able to have exceptions thrown in jobs and retry the job" $ do + mvar <- newMVar 0 + hworker <- + createWith + (conf "exworker-1" (ExState mvar)) + { hwconfigExceptionBehavior = RetryOnException } + wthread <- forkIO (worker hworker) + queue hworker ExJob + threadDelay 40000 + killThread wthread + destroy hworker + v <- takeMVar mvar + assertEqual "State should be 2, since the first run failed" 2 v + + it "should not retry if mode is FailOnException" $ do + mvar <- newMVar 0 + hworker <- createWith (conf "exworker-2" (ExState mvar)) + wthread <- forkIO (worker hworker) + queue hworker ExJob + threadDelay 30000 + killThread wthread + destroy hworker + v <- takeMVar mvar + assertEqual "State should be 1, since failing run wasn't retried" 1 v + + describe "Retry" $ do + it "should be able to return Retry and get run again" $ do + mvar <- newMVar 0 + hworker <- createWith (conf "retryworker-1" (RetryState mvar)) + wthread <- forkIO (worker hworker) + queue hworker RetryJob + threadDelay 50000 + killThread wthread + destroy hworker + v <- takeMVar mvar + assertEqual "State should be 2, since it got retried" 2 v + + describe "Fail" $ do + it "should not retry a job that Fails" $ do + mvar <- newMVar 0 + hworker <- createWith (conf "failworker-1" (FailState mvar)) + wthread <- forkIO (worker hworker) + queue hworker FailJob + threadDelay 30000 + killThread wthread + destroy hworker + v <- takeMVar mvar + assertEqual "State should be 1, since failing run wasn't retried" 1 v + + it "should put a failed job into the failed queue" $ do + mvar <- newMVar 0 + hworker <- createWith (conf "failworker-2" (FailState mvar)) + wthread <- forkIO (worker hworker) + queue hworker FailJob + threadDelay 30000 + killThread wthread + failedJobs <- listFailed hworker 0 100 + destroy hworker + assertEqual "Should have failed job" [FailJob] failedJobs + + it "should only store failedQueueSize failed jobs" $ do + mvar <- newMVar 0 + hworker <- + createWith + (conf "failworker-3" (AlwaysFailState mvar)) + { hwconfigFailedQueueSize = 2 } + wthread <- forkIO (worker hworker) + queue hworker AlwaysFailJob + queue hworker AlwaysFailJob + queue hworker AlwaysFailJob + queue hworker AlwaysFailJob + threadDelay 100000 + killThread wthread + failedJobs <- listFailed hworker 0 100 + destroy hworker + v <- takeMVar mvar + assertEqual "State should be 4, since all jobs were run" 4 v + assertEqual "Should only have stored 2" [AlwaysFailJob,AlwaysFailJob] failedJobs + + describe "Batch" $ do + it "should set up a batch job" $ do + mvar <- newMVar 0 + hworker <- createWith (conf "simpleworker-1" (SimpleState mvar)) + summary <- startBatch hworker Nothing >>= expectBatchSummary hworker + batchSummaryQueued summary `shouldBe` 0 + batchSummaryCompleted summary `shouldBe` 0 + batchSummarySuccesses summary `shouldBe` 0 + batchSummaryFailures summary `shouldBe` 0 + batchSummaryRetries summary `shouldBe` 0 + batchSummaryStatus summary `shouldBe` BatchQueueing + destroy 
hworker + + it "should expire batch job" $ do + mvar <- newMVar 0 + hworker <- createWith (conf "simpleworker-1" (SimpleState mvar)) + batch <- startBatch hworker (Just 1) + batchSummary hworker batch >>= shouldNotBe Nothing + threadDelay 2000000 + batchSummary hworker batch >>= shouldBe Nothing + destroy hworker + + it "should increment batch total after queueing a batch job" $ do + mvar <- newMVar 0 + hworker <- createWith (conf "simpleworker-1" (SimpleState mvar)) + batch <- startBatch hworker Nothing + queueBatch hworker batch False [SimpleJob] + summary <- expectBatchSummary hworker batch + batchSummaryQueued summary `shouldBe` 1 + destroy hworker + + it "should not enqueue job for completed batch" $ do + mvar <- newMVar 0 + hworker <- createWith (conf "simpleworker-1" (SimpleState mvar)) + wthread <- forkIO (worker hworker) + batch <- startBatch hworker Nothing + queueBatch hworker batch False [SimpleJob] + threadDelay 30000 + stopBatchQueueing hworker batch + summary <- expectBatchSummary hworker batch + queueBatch hworker batch False [SimpleJob] + >>= shouldBe (AlreadyQueued summary) + threadDelay 30000 + summary' <- expectBatchSummary hworker batch + batchSummaryQueued summary' `shouldBe` 1 + killThread wthread + destroy hworker + + it "should increment success and completed after completing a successful batch job" $ do + mvar <- newMVar 0 + hworker <- createWith (conf "simpleworker-1" (SimpleState mvar)) + wthread <- forkIO (worker hworker) + batch <- startBatch hworker Nothing + queueBatch hworker batch False [SimpleJob] + threadDelay 30000 + summary <- expectBatchSummary hworker batch + batchSummaryQueued summary `shouldBe` 1 + batchSummaryFailures summary `shouldBe` 0 + batchSummarySuccesses summary `shouldBe` 1 + batchSummaryCompleted summary `shouldBe` 1 + batchSummaryStatus summary `shouldBe` BatchQueueing + killThread wthread + destroy hworker + + it "should increment failure and completed after completing a failed batch job" $ do + mvar <- newMVar 0 + hworker <- createWith (conf "failworker-1" (FailState mvar)) + wthread <- forkIO (worker hworker) + batch <- startBatch hworker Nothing + queueBatch hworker batch False [FailJob] + threadDelay 30000 + summary <- expectBatchSummary hworker batch + batchSummaryQueued summary `shouldBe` 1 + batchSummaryFailures summary `shouldBe` 1 + batchSummarySuccesses summary `shouldBe` 0 + batchSummaryCompleted summary `shouldBe` 1 + batchSummaryStatus summary `shouldBe` BatchQueueing + killThread wthread + destroy hworker + + it "should change job status to processing when batch is set to stop queueing" $ do + mvar <- newMVar 0 + hworker <- createWith (conf "simpleworker-1" (SimpleState mvar)) + batch <- startBatch hworker Nothing + queueBatch hworker batch False [SimpleJob] + stopBatchQueueing hworker batch + summary <- expectBatchSummary hworker batch + batchSummaryQueued summary `shouldBe` 1 + batchSummaryStatus summary `shouldBe` BatchProcessing + destroy hworker + + it "should change job status to finished when batch is set to stop queueing and jobs are already run" $ do + mvar <- newMVar 0 + hworker <- createWith (conf "simpleworker-1" (SimpleState mvar)) + wthread <- forkIO (worker hworker) + batch <- startBatch hworker Nothing + queueBatch hworker batch False [SimpleJob] + threadDelay 30000 + stopBatchQueueing hworker batch + Just batch' <- batchSummary hworker batch + batchSummaryQueued batch' `shouldBe` 1 + batchSummaryStatus batch' `shouldBe` BatchFinished + killThread wthread + destroy hworker + + it "should change job status 
to finished when last processed" $ do + mvar <- newMVar 0 + hworker <- createWith (conf "simpleworker-1" (SimpleState mvar)) + wthread <- forkIO (worker hworker) + batch <- startBatch hworker Nothing + queueBatch hworker batch False [SimpleJob] + stopBatchQueueing hworker batch + threadDelay 30000 + summary <- expectBatchSummary hworker batch + batchSummaryQueued summary `shouldBe` 1 + batchSummaryStatus summary `shouldBe` BatchFinished + killThread wthread + destroy hworker + + it "queueing 1000 jobs should increment 1000" $ do + mvar <- newMVar 0 + hworker <- createWith (conf "simpleworker-3" (SimpleState mvar)) + wthread <- forkIO (worker hworker) + batch <- startBatch hworker Nothing + queueBatch hworker batch False (replicate 1000 SimpleJob) + stopBatchQueueing hworker batch + threadDelay 2000000 + v <- takeMVar mvar + v `shouldBe` 1000 + summary <- expectBatchSummary hworker batch + batchSummaryQueued summary `shouldBe` 1000 + batchSummaryFailures summary `shouldBe` 0 + batchSummarySuccesses summary `shouldBe` 1000 + batchSummaryCompleted summary `shouldBe` 1000 + batchSummaryStatus summary `shouldBe` BatchFinished + killThread wthread + destroy hworker + + describe "Atomicity Tests" $ do + it "should queue all jobs" $ do + mvar <- newMVar 0 + hworker <- createWith (conf "simpleworker-1" (SimpleState mvar)) + batch <- startBatch hworker Nothing + streamBatch hworker batch True $ do + replicateM_ 50 $ Conduit.yield SimpleJob + return StreamingOk + ls <- listJobs hworker 0 100 + length ls `shouldBe` 50 + summary <- expectBatchSummary hworker batch + batchSummaryQueued summary `shouldBe` 50 + batchSummaryStatus summary `shouldBe` BatchProcessing + destroy hworker + + it "should not queue jobs when producer throws error" $ do + mvar <- newMVar 0 + hworker <- createWith (conf "simpleworker-1" (SimpleState mvar)) + batch <- startBatch hworker Nothing + streamBatch hworker batch True $ do + replicateM_ 20 $ Conduit.yield SimpleJob + return (StreamingAborted "abort") + ls <- listJobs hworker 0 100 + expectBatchSummary hworker batch + destroy hworker + length ls `shouldBe` 0 + + it "should not queue jobs on transaction error" $ do + mvar <- newMVar 0 + hworker <- createWith (conf "simpleworker-1" (SimpleState mvar)) + batch <- startBatch hworker Nothing + streamBatchTx hworker batch True $ do + replicateM_ 20 $ Conduit.yield SimpleJob + _ <- lift $ Redis.lpush "" [] + replicateM_ 20 $ Conduit.yield SimpleJob + return StreamingOk + ls <- listJobs hworker 0 100 + destroy hworker + length ls `shouldBe` 0 + + it "should not queue jobs when transaction is aborted" $ do + mvar <- newMVar 0 + hworker <- createWith (conf "simpleworker-1" (SimpleState mvar)) + batch <- startBatch hworker Nothing + _ <- Redis.runRedis (hworkerConnection hworker) $ Redis.watch [batchCounter hworker batch] + streamBatch hworker batch True $ do + replicateM_ 20 $ Conduit.yield SimpleJob + return StreamingOk + ls <- listJobs hworker 0 100 + destroy hworker + length ls `shouldBe` 0 + + it "should increment summary up until failure" $ do + mvar <- newMVar 0 + hworker <- createWith (conf "simpleworker-1" (SimpleState mvar)) + batch <- startBatch hworker Nothing + + thread <- + forkIO . void . streamBatch hworker batch True $ do + replicateM_ 5 $ + Conduit.yield SimpleJob >> liftIO (threadDelay 50000) + error "BLOW UP!" 
+          replicateM_ 5 $
+            Conduit.yield SimpleJob >> liftIO (threadDelay 50000)
+          return StreamingOk
+
+      threadDelay 190000
+      summary1 <- expectBatchSummary hworker batch
+      batchSummaryQueued summary1 `shouldBe` 4
+      ls <- listJobs hworker 0 100
+      length ls `shouldBe` 0
+      threadDelay 100000
+      summary2 <- expectBatchSummary hworker batch
+      batchSummaryQueued summary2 `shouldBe` 5
+      batchSummaryStatus summary2 `shouldBe` BatchFailed
+      killThread thread
+      destroy hworker
+
+
+  describe "Monitor" $ do
+    it "should add job back after timeout" $ do
+      -- NOTE(dbp 2015-07-12): The timing on this test is somewhat
+      -- tricky. We want to get the job started with one worker,
+      -- then kill the worker, then start a new worker, and have
+      -- the monitor put the job back in the queue and have the
+      -- second worker finish it. It's important that the job
+      -- takes less time to complete than the timeout for the
+      -- monitor, or else it'll queue it forever.
+      --
+      -- The timeout is 5 seconds. The job takes 1 second to run.
+      -- The worker is killed after 0.5 seconds, which should be
+      -- plenty of time for it to have started the job. Then after
+      -- the second worker is started, we wait 10 seconds, which
+      -- should be plenty; we expect the total run to take around 11.
+      mvar <- newMVar 0
+      hworker <-
+        createWith
+          (conf "timedworker-1" (TimedState mvar)) { hwconfigTimeout = 5 }
+      wthread1 <- forkIO (worker hworker)
+      mthread <- forkIO (monitor hworker)
+      queue hworker (TimedJob 1000000)
+      threadDelay 500000
+      killThread wthread1
+      wthread2 <- forkIO (worker hworker)
+      threadDelay 10000000
+      v <- takeMVar mvar
+      killThread wthread2
+      killThread mthread
+      destroy hworker
+      assertEqual "State should be 1, since first failed" 1 v
+
+    it "should add back multiple jobs after timeout" $ do
+      -- NOTE(dbp 2015-07-23): Similar to the above test, but we
+      -- have multiple jobs started and multiple workers killed;
+      -- then one worker will finish both interrupted jobs.
+      mvar <- newMVar 0
+      hworker <-
+        createWith
+          (conf "timedworker-2" (TimedState mvar)) { hwconfigTimeout = 5 }
+      wthread1 <- forkIO (worker hworker)
+      wthread2 <- forkIO (worker hworker)
+      mthread <- forkIO (monitor hworker)
+      queue hworker (TimedJob 1000000)
+      queue hworker (TimedJob 1000000)
+      threadDelay 500000
+      killThread wthread1
+      killThread wthread2
+      wthread3 <- forkIO (worker hworker)
+      threadDelay 10000000
+      destroy hworker
+      v <- takeMVar mvar
+      killThread wthread3
+      killThread mthread
+      assertEqual "State should be 2, since first 2 failed" 2 v
+
+    it "should work with multiple monitors" $ do
+      mvar <- newMVar 0
+      hworker <-
+        createWith
+          (conf "timedworker-3" (TimedState mvar)) { hwconfigTimeout = 5 }
+      wthread1 <- forkIO (worker hworker)
+      wthread2 <- forkIO (worker hworker)
+      -- NOTE(dbp 2015-07-24): This might seem silly, but it
+      -- was actually sufficient to expose a race condition.
+ mthread1 <- forkIO (monitor hworker) + mthread2 <- forkIO (monitor hworker) + mthread3 <- forkIO (monitor hworker) + mthread4 <- forkIO (monitor hworker) + mthread5 <- forkIO (monitor hworker) + mthread6 <- forkIO (monitor hworker) + queue hworker (TimedJob 1000000) + queue hworker (TimedJob 1000000) + threadDelay 500000 + killThread wthread1 + killThread wthread2 + wthread3 <- forkIO (worker hworker) + threadDelay 30000000 + destroy hworker + v <- takeMVar mvar + killThread wthread3 + mapM_ killThread [mthread1, mthread2, mthread3, mthread4, mthread5, mthread6] + assertEqual "State should be 2, since first 2 failed" 2 v + -- NOTE(dbp 2015-07-24): It would be really great to have a + -- test that went after a race between the retry logic and + -- the monitors (ie, assume that the job completed with + -- Retry, and it happened to complete right at the timeout + -- period). I'm not sure if I could get that sort of + -- precision without adding other delay mechanisms, or + -- something to make it more deterministic. + + describe "Scheduled and Recurring Jobs" $ do + it "should execute job at scheduled time" $ do + mvar <- newMVar 0 + hworker <- createWith (conf "simpleworker-1" (SimpleState mvar)) + wthread <- forkIO (worker hworker) + sthread <- forkIO (scheduler hworker) + time <- getCurrentTime + queueScheduled hworker SimpleJob (addUTCTime 1 time) + queueScheduled hworker SimpleJob (addUTCTime 2 time) + queueScheduled hworker SimpleJob (addUTCTime 4 time) + threadDelay 1500000 >> readMVar mvar >>= shouldBe 1 + threadDelay 1000000 >> readMVar mvar >>= shouldBe 2 + threadDelay 1000000 >> readMVar mvar >>= shouldBe 2 + threadDelay 1000000 >> readMVar mvar >>= shouldBe 3 + killThread wthread + killThread sthread + destroy hworker + + it "should execute a recurring job" $ do + mvar <- newMVar 0 + hworker <- createWith (conf "recurringworker-1" (RecurringState mvar)) + wthread <- forkIO (worker hworker) + sthread <- forkIO (scheduler hworker) + time <- getCurrentTime + queueScheduled hworker RecurringJob (addUTCTime 2 time) + threadDelay 3000000 >> readMVar mvar >>= shouldBe 1 + threadDelay 2000000 >> readMVar mvar >>= shouldBe 2 + threadDelay 2000000 >> readMVar mvar >>= shouldBe 3 + threadDelay 2000000 >> readMVar mvar >>= shouldBe 4 + destroy hworker + killThread wthread + killThread sthread + + it "should queue cron on start up" $ do + mvar <- newMVar 0 + hworker <- + createWith + (conf "simpleworker-1" (SimpleState mvar)) + { hwconfigCronJobs = [CronJob "cron-test" SimpleJob everyMinute] } + + checkCron hworker "cron-test" >>= shouldBe True + ls <- listScheduled hworker 0 100 + length ls `shouldBe` 1 + destroy hworker + + it "should not enqueue the same job multiple times" $ do + mvar <- newMVar 0 + hworker <- + createWith + (conf "simpleworker-1" (SimpleState mvar)) + { hwconfigCronJobs = [CronJob "cron-test" SimpleJob everyMinute] } + + time <- getCurrentTime + initCron hworker time [CronJob "cron-test" SimpleJob everyMinute] + initCron hworker time [CronJob "cron-test" SimpleJob everyMinute] + initCron hworker time [CronJob "cron-test" SimpleJob everyMinute] + ls <- listScheduled hworker 0 100 + length ls `shouldBe` 1 + destroy hworker + + it "should add to processing hash once a cron job is pushed to the jobs queue" $ do + mvar <- newMVar 0 + hworker <- createWith (conf "simpleworker-1" (SimpleState mvar)) + destroy hworker + sthread <- forkIO (scheduler hworker) + time <- getCurrentTime + initCron hworker (addUTCTime (-60) time) [CronJob "cron-test" SimpleJob everyMinute] + 
threadDelay 1000000 + s <- listScheduled hworker 0 100 + length s `shouldBe` 0 + j <- listJobs hworker 0 100 + length j `shouldBe` 1 + checkCron hworker "cron-test" >>= shouldBe True + liftIO (getCronProcessing hworker "cron-test") >>= shouldNotBe Nothing + destroy hworker + killThread sthread + + it "should remove from processing hash and re-enqueue once a cron job is executed" $ do + mvar <- newMVar 0 + hworker <- createWith (conf "simpleworker-1" (SimpleState mvar)) + destroy hworker + time <- getCurrentTime + initCron hworker (addUTCTime (-60) time) [CronJob "cron-test" SimpleJob everyMinute] + wthread <- forkIO (worker hworker) + sthread <- forkIO (scheduler hworker) + threadDelay 1000000 + j <- listJobs hworker 0 100 + length j `shouldBe` 0 + s <- listScheduled hworker 0 100 + length s `shouldBe` 1 + liftIO (getCronProcessing hworker "cron-test") >>= shouldBe Nothing + destroy hworker + killThread wthread + killThread sthread + + describe "Listing jobs" $ do + it "should list pending jobs" $ do + mvar <- newMVar 0 + hworker <- createWith (conf "simpleworker-1" (SimpleState mvar)) + replicateM_ 45 (queue hworker SimpleJob) + listJobs hworker 0 10 >>= shouldBe 10 . length + listJobs hworker 1 10 >>= shouldBe 10 . length + listJobs hworker 2 10 >>= shouldBe 10 . length + listJobs hworker 3 10 >>= shouldBe 10 . length + listJobs hworker 4 10 >>= shouldBe 5 . length + destroy hworker + + it "should list scheduled jobs" $ do + mvar <- newMVar 0 + hworker <- createWith (conf "simpleworker-1" (SimpleState mvar)) + time <- getCurrentTime + replicateM_ 45 (queueScheduled hworker SimpleJob (addUTCTime 1 time)) + listScheduled hworker 0 10 >>= shouldBe 10 . length + listScheduled hworker 1 10 >>= shouldBe 10 . length + listScheduled hworker 2 10 >>= shouldBe 10 . length + listScheduled hworker 3 10 >>= shouldBe 10 . length + listScheduled hworker 4 10 >>= shouldBe 5 . length + destroy hworker + + describe "Broken jobs" $ + it "should store broken jobs" $ do + -- NOTE(dbp 2015-08-09): The more common way this could + -- happen is that you change your serialization format. But + -- we can abuse this by creating two different workers + -- pointing to the same queue, and submit jobs in one, try + -- to run them in another, where the types are different. 
+      mvar <- newMVar 0
+      hworker1 <- createWith (conf "broken-1" (TimedState mvar)) { hwconfigTimeout = 5 }
+      hworker2 <- createWith (conf "broken-1" (SimpleState mvar)) { hwconfigTimeout = 5 }
+      wthread <- forkIO (worker hworker1)
+      queue hworker2 SimpleJob
+      threadDelay 100000
+      brokenJobs <- broken hworker2
+      killThread wthread
+      destroy hworker1
+      v <- takeMVar mvar
+      assertEqual "State should be 0, as nothing should have happened" 0 v
+      assertEqual "Should be one broken job, as serialization is wrong" 1 (length brokenJobs)
+
+  describe "Dump jobs" $ do
+    it "should return the job that was queued" $ do
+      mvar <- newMVar 0
+      hworker <- createWith (conf "dump-1" (SimpleState mvar)) { hwconfigTimeout = 5 }
+      queue hworker SimpleJob
+      res <- listJobs hworker 0 100
+      destroy hworker
+      assertEqual "Should be [SimpleJob]" [SimpleJob] res
+
+    it "should return jobs in order (most recently added at front; worker pulls from back)" $ do
+      mvar <- newMVar 0
+      hworker <- createWith (conf "dump-2" (TimedState mvar)) { hwconfigTimeout = 5 }
+      queue hworker (TimedJob 1)
+      queue hworker (TimedJob 2)
+      res <- listJobs hworker 0 100
+      destroy hworker
+      assertEqual "Should be [TimedJob 2, TimedJob 1]" [TimedJob 2, TimedJob 1] res
+
+  describe "Large jobs" $ do
+    it "should be able to deal with lots of large jobs" $ do
+      mvar <- newMVar 0
+      hworker <- createWith (conf "big-1" (BigState mvar))
+      wthread1 <- forkIO (worker hworker)
+      wthread2 <- forkIO (worker hworker)
+      wthread3 <- forkIO (worker hworker)
+      wthread4 <- forkIO (worker hworker)
+      let content = T.intercalate "\n" (take 1000 (repeat "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"))
+      replicateM_ 5000 (queue hworker (BigJob content))
+      threadDelay 10000000
+      killThread wthread1
+      killThread wthread2
+      killThread wthread3
+      killThread wthread4
+      destroy hworker
+      v <- takeMVar mvar
+      assertEqual "Should have processed 5000" 5000 v
+
+
+data SimpleJob =
+  SimpleJob deriving (Generic, Show, Eq)
-data SimpleJob = SimpleJob deriving (Generic, Show, Eq)
-data SimpleState = SimpleState { unSimpleState :: MVar Int }
 instance ToJSON SimpleJob
 instance FromJSON SimpleJob
+
+newtype SimpleState =
+  SimpleState (MVar Int)
+
 instance Job SimpleState SimpleJob where
-  job (SimpleState mvar) SimpleJob =
-    do modifyMVarMasked_ mvar (return . (+1))
-       return Success
+  job Hworker { hworkerState = SimpleState mvar } SimpleJob =
+    modifyMVarMasked_ mvar (return . (+1)) >> return Success
+
+
+data ExJob =
+  ExJob deriving (Generic, Show)
-data ExJob = ExJob deriving (Generic, Show)
-data ExState = ExState { unExState :: MVar Int }
 instance ToJSON ExJob
 instance FromJSON ExJob
+
+newtype ExState =
+  ExState (MVar Int)
+
 instance Job ExState ExJob where
-  job (ExState mvar) ExJob =
-    do modifyMVarMasked_ mvar (return . (+1))
-       v <- readMVar mvar
-       if v > 1
-          then return Success
-          else error "ExJob: failing badly!"
-
-data RetryJob = RetryJob deriving (Generic, Show)
-data RetryState = RetryState { unRetryState :: MVar Int }
+  job Hworker { hworkerState = ExState mvar } ExJob = do
+    modifyMVarMasked_ mvar (return . (+1))
+    v <- readMVar mvar
+    if v > 1
+      then return Success
+      else error "ExJob: failing badly!"
+
+
+data RetryJob =
+  RetryJob deriving (Generic, Show)
+
 instance ToJSON RetryJob
 instance FromJSON RetryJob
+
+newtype RetryState =
+  RetryState (MVar Int)
+
 instance Job RetryState RetryJob where
-  job (RetryState mvar) RetryJob =
-    do modifyMVarMasked_ mvar (return .
(+1)) - v <- readMVar mvar - if v > 1 - then return Success - else return (Retry "RetryJob retries") - -data FailJob = FailJob deriving (Eq, Generic, Show) -data FailState = FailState { unFailState :: MVar Int } + job Hworker { hworkerState = RetryState mvar } RetryJob = do + modifyMVarMasked_ mvar (return . (+1)) + v <- readMVar mvar + if v > 1 + then return Success + else return (Retry "RetryJob retries") + + +data FailJob = + FailJob deriving (Eq, Generic, Show) + instance ToJSON FailJob instance FromJSON FailJob + +newtype FailState = + FailState (MVar Int) + instance Job FailState FailJob where - job (FailState mvar) FailJob = - do modifyMVarMasked_ mvar (return . (+1)) - v <- readMVar mvar - if v > 1 - then return Success - else return (Failure "FailJob fails") - -data AlwaysFailJob = AlwaysFailJob deriving (Eq, Generic, Show) -data AlwaysFailState = AlwaysFailState { unAlwaysFailState :: MVar Int } + job Hworker { hworkerState = FailState mvar } FailJob = do + modifyMVarMasked_ mvar (return . (+1)) + v <- readMVar mvar + if v > 1 + then return Success + else return (Failure "FailJob fails") + + +data AlwaysFailJob = + AlwaysFailJob deriving (Eq, Generic, Show) + instance ToJSON AlwaysFailJob instance FromJSON AlwaysFailJob + +newtype AlwaysFailState = + AlwaysFailState (MVar Int) + instance Job AlwaysFailState AlwaysFailJob where - job (AlwaysFailState mvar) AlwaysFailJob = - do modifyMVarMasked_ mvar (return . (+1)) - return (Failure "AlwaysFailJob fails") + job Hworker { hworkerState = AlwaysFailState mvar} AlwaysFailJob = do + modifyMVarMasked_ mvar (return . (+1)) + return (Failure "AlwaysFailJob fails") + + +data TimedJob = + TimedJob Int deriving (Generic, Show, Eq) -data TimedJob = TimedJob Int deriving (Generic, Show, Eq) -data TimedState = TimedState { unTimedState :: MVar Int } instance ToJSON TimedJob instance FromJSON TimedJob + +newtype TimedState = + TimedState (MVar Int) + instance Job TimedState TimedJob where - job (TimedState mvar) (TimedJob delay) = - do threadDelay delay - modifyMVarMasked_ mvar (return . (+1)) - return Success + job Hworker { hworkerState = TimedState mvar } (TimedJob delay) = do + threadDelay delay + modifyMVarMasked_ mvar (return . (+1)) + return Success + + +data BigJob = + BigJob Text deriving (Generic, Show, Eq) -data BigJob = BigJob T.Text deriving (Generic, Show, Eq) -data BigState = BigState { unBigState :: MVar Int } instance ToJSON BigJob instance FromJSON BigJob + +newtype BigState = + BigState (MVar Int) + instance Job BigState BigJob where - job (BigState mvar) (BigJob _) = - do modifyMVarMasked_ mvar (return . (+1)) - return Success + job Hworker { hworkerState = BigState mvar } (BigJob _) = + modifyMVarMasked_ mvar (return . 
(+1)) >> return Success -nullLogger :: Show a => a -> IO () -nullLogger = const (return ()) -print' :: Show a => a -> IO () -print' a = do print a - hFlush stdout +data RecurringJob = + RecurringJob deriving (Generic, Show, Eq) -conf n s = (defaultHworkerConfig n s) { - hwconfigLogger = nullLogger - , hwconfigExceptionBehavior = FailOnException - , hwconfigTimeout = 4 - } +instance ToJSON RecurringJob +instance FromJSON RecurringJob -main :: IO () -main = hspec $ - do describe "Simple" $ - do it "should run and increment counter" $ - do mvar <- newMVar 0 - hworker <- createWith (conf "simpleworker-1" - (SimpleState mvar)) - wthread <- forkIO (worker hworker) - queue hworker SimpleJob - threadDelay 30000 - killThread wthread - destroy hworker - v <- takeMVar mvar - assertEqual "State should be 1 after job runs" 1 v - it "queueing 2 jobs should increment twice" $ - do mvar <- newMVar 0 - hworker <- createWith (conf "simpleworker-2" - (SimpleState mvar)) - wthread <- forkIO (worker hworker) - queue hworker SimpleJob - queue hworker SimpleJob - threadDelay 40000 - killThread wthread - destroy hworker - v <- takeMVar mvar - assertEqual "State should be 2 after 2 jobs run" 2 v - it "queueing 1000 jobs should increment 1000" $ - do mvar <- newMVar 0 - hworker <- createWith (conf "simpleworker-3" - (SimpleState mvar)) - wthread <- forkIO (worker hworker) - replicateM_ 1000 (queue hworker SimpleJob) - threadDelay 2000000 - killThread wthread - destroy hworker - v <- takeMVar mvar - assertEqual "State should be 1000 after 1000 job runs" 1000 v - it "should work with multiple workers" $ - -- NOTE(dbp 2015-07-12): This probably won't run faster, because - -- they are all blocking on the MVar, but that's not the point. - do mvar <- newMVar 0 - hworker <- createWith (conf "simpleworker-4" - (SimpleState mvar)) - wthread1 <- forkIO (worker hworker) - wthread2 <- forkIO (worker hworker) - wthread3 <- forkIO (worker hworker) - wthread4 <- forkIO (worker hworker) - replicateM_ 1000 (queue hworker SimpleJob) - threadDelay 1000000 - killThread wthread1 - killThread wthread2 - killThread wthread3 - killThread wthread4 - destroy hworker - v <- takeMVar mvar - assertEqual "State should be 1000 after 1000 job runs" 1000 v - - describe "Exceptions" $ - do it "should be able to have exceptions thrown in jobs and retry the job" $ - do mvar <- newMVar 0 - hworker <- createWith (conf "exworker-1" - (ExState mvar)) { - hwconfigExceptionBehavior = - RetryOnException - } - wthread <- forkIO (worker hworker) - queue hworker ExJob - threadDelay 40000 - killThread wthread - destroy hworker - v <- takeMVar mvar - assertEqual "State should be 2, since the first run failed" 2 v - it "should not retry if mode is FailOnException" $ - do mvar <- newMVar 0 - hworker <- createWith (conf "exworker-2" - (ExState mvar)) - wthread <- forkIO (worker hworker) - queue hworker ExJob - threadDelay 30000 - killThread wthread - destroy hworker - v <- takeMVar mvar - assertEqual "State should be 1, since failing run wasn't retried" 1 v - - describe "Retry" $ - do it "should be able to return Retry and get run again" $ - do mvar <- newMVar 0 - hworker <- createWith (conf "retryworker-1" - (RetryState mvar)) - wthread <- forkIO (worker hworker) - queue hworker RetryJob - threadDelay 50000 - destroy hworker - v <- takeMVar mvar - assertEqual "State should be 2, since it got retried" 2 v - - describe "Fail" $ - do it "should not retry a job that Fails" $ - do mvar <- newMVar 0 - hworker <- createWith (conf "failworker-1" - (FailState mvar)) - 
wthread <- forkIO (worker hworker) - queue hworker FailJob - threadDelay 30000 - destroy hworker - v <- takeMVar mvar - assertEqual "State should be 1, since failing run wasn't retried" 1 v - it "should put a failed job into the failed queue" $ - do mvar <- newMVar 0 - hworker <- createWith (conf "failworker-2" - (FailState mvar)) - wthread <- forkIO (worker hworker) - queue hworker FailJob - threadDelay 30000 - jobs <- failed hworker - destroy hworker - assertEqual "Should have failed job" [FailJob] jobs - it "should only store failedQueueSize failed jobs" $ - do mvar <- newMVar 0 - hworker <- createWith (conf "failworker-3" - (AlwaysFailState mvar)) { - hwconfigFailedQueueSize = 2 - } - wthread <- forkIO (worker hworker) - queue hworker AlwaysFailJob - queue hworker AlwaysFailJob - queue hworker AlwaysFailJob - queue hworker AlwaysFailJob - threadDelay 100000 - jobs <- failed hworker - destroy hworker - v <- takeMVar mvar - assertEqual "State should be 4, since all jobs were run" 4 v - assertEqual "Should only have stored 2" - [AlwaysFailJob,AlwaysFailJob] jobs - describe "Monitor" $ - do it "should add job back after timeout" $ - -- NOTE(dbp 2015-07-12): The timing on this test is somewhat - -- tricky. We want to get the job started with one worker, - -- then kill the worker, then start a new worker, and have - -- the monitor put the job back in the queue and have the - -- second worker finish it. It's important that the job - -- takes less time to complete than the timeout for the - -- monitor, or else it'll queue it forever. - -- - -- The timeout is 5 seconds. The job takes 1 seconds to run. - -- The worker is killed after 0.5 seconds, which should be - -- plenty of time for it to have started the job. Then after - -- the second worker is started, we wait 10 seconds, which - -- should be plenty; we expect the total run to take around 11. - do mvar <- newMVar 0 - hworker <- createWith (conf "timedworker-1" - (TimedState mvar)) { - hwconfigTimeout = 5 - } - wthread1 <- forkIO (worker hworker) - mthread <- forkIO (monitor hworker) - queue hworker (TimedJob 1000000) - threadDelay 500000 - killThread wthread1 - wthread2 <- forkIO (worker hworker) - threadDelay 10000000 - destroy hworker - v <- takeMVar mvar - assertEqual "State should be 2, since monitor thinks it failed" 2 v - it "should add back multiple jobs after timeout" $ - -- NOTE(dbp 2015-07-23): Similar to the above test, but we - -- have multiple jobs started, multiple workers killed. - -- then one worker will finish both interrupted jobs. - do mvar <- newMVar 0 - hworker <- createWith (conf "timedworker-2" - (TimedState mvar)) { - hwconfigTimeout = 5 - } - wthread1 <- forkIO (worker hworker) - wthread2 <- forkIO (worker hworker) - mthread <- forkIO (monitor hworker) - queue hworker (TimedJob 1000000) - queue hworker (TimedJob 1000000) - threadDelay 500000 - killThread wthread1 - killThread wthread2 - wthread3 <- forkIO (worker hworker) - threadDelay 10000000 - destroy hworker - v <- takeMVar mvar - assertEqual "State should be 4, since monitor thinks first 2 failed" 4 v - it "should work with multiple monitors" $ - do mvar <- newMVar 0 - hworker <- createWith (conf "timedworker-3" - (TimedState mvar)) { - hwconfigTimeout = 5 - } - wthread1 <- forkIO (worker hworker) - wthread2 <- forkIO (worker hworker) - -- NOTE(dbp 2015-07-24): This might seem silly, but it - -- was actually sufficient to expose a race condition. 
- mthread1 <- forkIO (monitor hworker) - mthread2 <- forkIO (monitor hworker) - mthread3 <- forkIO (monitor hworker) - mthread4 <- forkIO (monitor hworker) - mthread5 <- forkIO (monitor hworker) - mthread6 <- forkIO (monitor hworker) - queue hworker (TimedJob 1000000) - queue hworker (TimedJob 1000000) - threadDelay 500000 - killThread wthread1 - killThread wthread2 - wthread3 <- forkIO (worker hworker) - threadDelay 30000000 - destroy hworker - v <- takeMVar mvar - assertEqual "State should be 4, since monitor thinks first 2 failed" 4 v - -- NOTE(dbp 2015-07-24): It would be really great to have a - -- test that went after a race between the retry logic and - -- the monitors (ie, assume that the job completed with - -- Retry, and it happened to complete right at the timeout - -- period). I'm not sure if I could get that sort of - -- precision without adding other delay mechanisms, or - -- something to make it more deterministic. - describe "Broken jobs" $ - it "should store broken jobs" $ - do -- NOTE(dbp 2015-08-09): The more common way this could - -- happen is that you change your serialization format. But - -- we can abuse this by creating two different workers - -- pointing to the same queue, and submit jobs in one, try - -- to run them in another, where the types are different. - mvar <- newMVar 0 - hworker1 <- createWith (conf "broken-1" - (TimedState mvar)) { - hwconfigTimeout = 5 - } - hworker2 <- createWith (conf "broken-1" - (SimpleState mvar)) { - hwconfigTimeout = 5 - } - wthread <- forkIO (worker hworker1) - queue hworker2 SimpleJob - threadDelay 100000 - jobs <- broken hworker2 - killThread wthread - destroy hworker1 - v <- takeMVar mvar - assertEqual "State should be 0, as nothing should have happened" 0 v - assertEqual "Should be one broken job, as serialization is wrong" 1 (length jobs) - describe "Dump jobs" $ do - it "should return the job that was queued" $ - do mvar <- newMVar 0 - hworker <- createWith (conf "dump-1" - (SimpleState mvar)) { - hwconfigTimeout = 5 - } - queue hworker SimpleJob - res <- jobs hworker - destroy hworker - assertEqual "Should be [SimpleJob]" [SimpleJob] res - it "should return jobs in order (most recently added at front; worker pulls from back)" $ - do mvar <- newMVar 0 - hworker <- createWith (conf "dump-2" - (TimedState mvar)) { - hwconfigTimeout = 5 - } - queue hworker (TimedJob 1) - queue hworker (TimedJob 2) - res <- jobs hworker - destroy hworker - assertEqual "Should by [TimedJob 2, TimedJob 1]" [TimedJob 2, TimedJob 1] res - describe "Large jobs" $ do - it "should be able to deal with lots of large jobs" $ - do mvar <- newMVar 0 - hworker <- createWith (conf "big-1" - (BigState mvar)) - wthread1 <- forkIO (worker hworker) - wthread2 <- forkIO (worker hworker) - wthread3 <- forkIO (worker hworker) - wthread4 <- forkIO (worker hworker) - let content = T.intercalate "\n" (take 1000 (repeat "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz")) - replicateM_ 5000 (queue hworker (BigJob content)) - threadDelay 10000000 - killThread wthread1 - killThread wthread2 - killThread wthread3 - killThread wthread4 - destroy hworker - v <- takeMVar mvar - assertEqual "Should have processed 5000" 5000 v +newtype RecurringState = + RecurringState (MVar Int) + +instance Job RecurringState RecurringJob where + job hw@Hworker{ hworkerState = RecurringState mvar} RecurringJob = do + modifyMVarMasked_ mvar (return . 
(+1))
+    time <- getCurrentTime
+    queueScheduled hw RecurringJob (addUTCTime 1.99 time)
+    return Success
+
+
+conf :: Text -> s -> HworkerConfig s t
+conf n s =
+  (defaultHworkerConfig n s)
+    { hwconfigLogger = const (return ())
+    , hwconfigExceptionBehavior = FailOnException
+    , hwconfigTimeout = 4
+    }
+
+
+startBatch :: Hworker s t -> Maybe Integer -> IO BatchId
+startBatch hw expiration =
+  initBatch hw expiration >>=
+    \case
+      Just batch -> return batch
+      Nothing -> fail "Failed to create batch"
+
+
+expectBatchSummary :: Hworker s t -> BatchId -> IO BatchSummary
+expectBatchSummary hw batch =
+  batchSummary hw batch >>=
+    \case
+      Just summary -> return summary
+      Nothing -> fail "Failed to fetch batch summary"
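
The sketch below is not part of the patch; it is a minimal illustration, for reviewers, of how the batch API exercised by these tests might be used from application code. It assumes only names that appear above (createWith, defaultHworkerConfig, worker, monitor, initBatch, queueBatch, stopBatchQueueing, batchSummary, batchSummaryStatus, batchSummaryCompleted, destroy, Success, BatchFinished); the NotifyJob type, the "notifier" queue name, and the polling interval are hypothetical.

{-# LANGUAGE DeriveGeneric         #-}
{-# LANGUAGE LambdaCase            #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings     #-}

import           Control.Concurrent ( forkIO, threadDelay )
import           Data.Aeson         ( FromJSON, ToJSON )
import           GHC.Generics       ( Generic )
import           System.Hworker

-- Hypothetical job type; any payload with ToJSON/FromJSON instances works.
data NotifyJob = NotifyJob Int deriving (Generic, Show)

instance ToJSON NotifyJob
instance FromJSON NotifyJob

-- No shared state is needed for this sketch, so the state type is ().
instance Job () NotifyJob where
  job _ (NotifyJob n) = do
    putStrLn ("notifying user " ++ show n)
    return Success

main :: IO ()
main = do
  hw <- createWith (defaultHworkerConfig "notifier" ())
  _  <- forkIO (worker hw)
  _  <- forkIO (monitor hw)

  -- Start a batch with no expiration, enqueue some jobs, then close it.
  initBatch hw Nothing >>= \case
    Nothing ->
      putStrLn "could not create batch"

    Just batch -> do
      -- The Bool flag mirrors how the tests above call queueBatch.
      _ <- queueBatch hw batch False [ NotifyJob n | n <- [1 .. 10 :: Int] ]
      stopBatchQueueing hw batch

      -- Poll the summary until the batch reports BatchFinished.
      let wait =
            batchSummary hw batch >>= \case
              Just summary | batchSummaryStatus summary == BatchFinished ->
                putStrLn ("completed: " ++ show (batchSummaryCompleted summary))
              _ ->
                threadDelay 100000 >> wait
      wait

  destroy hw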