cljobq.core
db-migrate!
(db-migrate!)
(db-migrate! {:keys [context], :or {context (clojure.core/deref global-ctx*)}})
Run cljobq database migrations to ensure the database schema is set up and up-to-date.
Options:
:context
cljobq context created viacljobq.core/make-context
. If not specified, the implicitcljobq.core/global-ctx*
atom set usingcljobq.core/set-context!
is used instead.
Example:
(cljobq.core/db-migrate!)
delete-job
(delete-job {:keys [context queue job-name], :or {context (clojure.core/deref global-ctx*)}})
Delete a specific job job-name
from the specified queue. Deleting a job by name only really makes sense for recurring jobs. Returns the number of jobs deleted.
Options:
:context
cljobq context created viacljobq.core/make-context
. If not specified, the implicitcljobq.core/global-ctx*
atom set usingcljobq.core/set-context!
is used instead.:job-name
The name of the job to delete. Required.:queue
The name of the queue to delete the job from. Required.
Example:
(cljobq.core/delete-job {:queue "math", :job-name "algebra-on-wednesdays"})
delete-queue-jobs
(delete-queue-jobs {:keys [context queue], :or {context (clojure.core/deref global-ctx*)}})
Delete all jobs from the specified queue. Returns the number of jobs deleted.
Options:
:context
cljobq context created viacljobq.core/make-context
. If not specified, the implicitcljobq.core/global-ctx*
atom set usingcljobq.core/set-context!
is used instead.:queue
The name of the queue. Required.
Example:
(cljobq.core/delete-queue-jobs {:queue "math"})
enqueue
(enqueue {:keys [context queue job-name interval actor args], :or {context (clojure.core/deref global-ctx*), job-name (uuid/v1), interval nil, args []}})
Enqueue a job based on a job description with an optionally specfied explicit context
. If a job with the given job-name
for the specified queue
already exists, the existing job will be updated instead. If no job-name
is given, a uuid-based name will be assigned automatically. An interval
string (unix cron format) can be specified to create a recurring job. Errors in the interval specification cause an exception to be thrown. Returns the newly inserted job.
Options:
:context
cljobq context created viacljobq.core/make-context
. If not specified, the implicitcljobq.core/global-ctx*
atom set usingcljobq.core/set-context!
is used instead.:job-name
A name for the job. A job name is unique within a queue. If not specified, a UUID-based random name will be used instead.:queue
A queue name to enqueue the job to. Required.:actor
A var-quoted function to run as part of this job. Required.:args
A vector of arguments to pass to the job function.:interval
A string-based UNIX cron expression defining the interval at which to run a job. If specified, a recurring job is created. If omitted or nil, the job will only run once.:run-at
The date/time at which to run the job. If omitted, the job will be scheduled to run immediately if it is a one-off job, or at the next recurrence if aninterval
was specified.
Example:
(cljobq.core/enqueue
{:actor #'*-and-log
:args [9 6]
:queue "math"})
Example return value:
{:args [9 6]
:job-name "10189f80-81ab-11e8-9e30-698897b5b41d"
:last-error nil
:queue "math"
:status "pending"
:id 18
:picked-at nil
:interval nil
:created-at #inst "2018-07-07T06:00:34.430781000-00:00"
:run-at #inst "2018-07-07T06:00:34.424000000-00:00"
:attempt 0
:related-job-id nil
:actor "user/*-and-log"}
global-ctx*
list-failed-jobs
(list-failed-jobs)
(list-failed-jobs {:keys [context queue], :or {context (clojure.core/deref global-ctx*), queue nil}})
List all failed jobs. If queue
is specified, only jobs for that queue will be shown.
Options:
:context
cljobq context created viacljobq.core/make-context
. If not specified, the implicitcljobq.core/global-ctx*
atom set usingcljobq.core/set-context!
is used instead.:queue
The name of the queue. If omitted, jobs from all queues will be shown.
Example:
(cljobq.core/list-failed-jobs {:queue "queue-c"})
Example return value:
({:args [14 :e]
:job-name "929238f0-7e2b-11e8-9580-698897b5b41d"
:last-error "Error from actor `user/*-and-log` payload `[14 :e]`: clojure.lang.Keyword cannot be cast to java.lang.Number\nclojure.lang.Numbers.multiply(Numbers.java:148)\nuser$_STAR__and_log.invokeStatic(form-init6140559550272751977.clj:2)\nuser$_STAR__and_log.invoke(form-init6140559550272751977.clj:1) ..."
:queue "queue-c"
:status nil
:id 12
:picked-at #inst "2018-07-04T20:43:12.000747000-00:00"
:interval nil
:created-at #inst "2018-07-02T19:10:24.264601000-00:00"
:run-at nil
:attempt 5
:related-job-id nil
:actor "user/*-and-log"})
list-jobs
(list-jobs)
(list-jobs {:keys [context queue], :or {context (clojure.core/deref global-ctx*), queue nil}})
List all non-failed jobs. If queue
is specified, only jobs for that queue will be shown.
Options:
:context
cljobq context created viacljobq.core/make-context
. If not specified, the implicitcljobq.core/global-ctx*
atom set usingcljobq.core/set-context!
is used instead.:queue
The name of the queue. If omitted, jobs from all queues will be shown.
Example:
(cljobq.core/list-jobs)
Example return value:
({:args [9 6]
:job-name "10189f80-81ab-11e8-9e30-698897b5b41d"
:last-error nil
:queue "math"
:status "pending"
:id 18
:picked-at nil
:interval nil
:created-at #inst "2018-07-07T06:00:34.430781000-00:00"
:run-at #inst "2018-07-07T06:00:34.424000000-00:00"
:attempt 0
:related-job-id nil
:actor "user/*-and-log"}
{:args [:a 81]
:job-name "regular-math-err"
:last-error nil
:queue "queue-c"
:status "scheduled"
:id 22
:picked-at nil
:interval "15 23 * * *"
:created-at #inst "2018-07-07T06:56:35.294888000-00:00"
:run-at #inst "2018-07-07T22:15:00.000000000-00:00"
:attempt 0
:related-job-id nil
:actor "user/*-and-log"})
list-recurring-jobs
(list-recurring-jobs)
(list-recurring-jobs {:keys [context queue], :or {context (clojure.core/deref global-ctx*), queue nil}})
List all recurring jobs. If queue
is specified, only jobs for that queue will be shown.
Options:
:context
cljobq context created viacljobq.core/make-context
. If not specified, the implicitcljobq.core/global-ctx*
atom set usingcljobq.core/set-context!
is used instead.:queue
The name of the queue. If omitted, jobs from all queues will be shown.
Example:
(cljobq.core/list-recurring-jobs {:queue "math"})
Example return value:
({:args [:a 81]
:job-name "regular-math-err"
:last-error nil
:queue "queue-c"
:status "scheduled"
:id 22
:picked-at nil
:interval "15 23 * * *"
:created-at #inst "2018-07-07T06:56:35.294888000-00:00"
:run-at #inst "2018-07-07T22:15:00.000000000-00:00"
:attempt 0
:related-job-id nil
:actor "user/*-and-log"})
make-context
(make-context {:keys [db default-timeout default-max-timeout default-max-attempts default-num-threads], :or {default-timeout 1800, default-max-timeout (* default-timeout 10), default-num-threads 1, default-max-attempts 3}, :as context})
Return a new cljobq context based on the config hash passed in. cljobq does not use connection pooling internally, however, connection pooling can be enabled by using a db-spec with a data source set using hikari-cp or c3p0.
All options except for :db
can be overriden per queue in the queue-defs passed to cljobq.core/start
.
Options:
:db
Aclojure.java.jdbc
db-spec map. Required.:default-timeout
The default job timeout, in seconds. Defaults to 1800 seconds.:default-max-timeout
The default maximum job timeout, including any backoff. Defaults to 10 times the default timeout.:default-max-attempts
The default maximum number of attempts/retries to try and run a job for before marking it as failed. Defaults to 3.:default-num-threads
The default number of worker threads for a queue. Defaults to 1.:app/context
An optional application-specific context passed as first argument to all job function invocations. If not provided, the parameter will not omitted from the job function invocation.
Example:
(def jobq-context
(cljobq.core/make-context
{:db {:connection-uri "jdbc:postgresql://localhost:5432/jobqtest?user=test&password=test"}
:default-timeout 180}))
set-context!
(set-context! v)
Set the global context cljobq.core/global-ctx*
based on the passed in config v
. See cljobq.core/make-context
for more information on the configuration options.
The global context serves as an implicit context to all other functions in the absence of an explicit context.
Example:
(cljobq.core/set-context!
{:db {:connection-uri "jdbc:postgresql://localhost:5432/jobqtest?user=test&password=test"}
:default-timeout 180})
start
(start {:keys [context queues], :or {context (clojure.core/deref global-ctx*)}})
Given a set of queue definitions in queues
, start the worker threads for each of the defined queues and start processing jobs. Returns a run-info
which can be used to stop the threads (see cljobq.core/stop
for more information.
Options:
:context
cljobq context created viacljobq.core/make-context
. If not specified, the implicitcljobq.core/global-ctx*
atom set usingcljobq.core/set-context!
is used instead.:queues
A map of queue-name -> queue-def. A queue def can an empty map, in which case the defaults of thecontext
will be used for all options.
Each queue-def is a map with the following options (defaults are taken from the context - see cljobq.core/make-context
):
:num-threads
The number of worker threads to start for this queue.:max-attempts
The maximum number of attempts/retries to try and run a job on this queue for before marking it as failed.:timeout
The job timeout, in seconds.:max-timeout
The maximum job timeout, including any backoff.:backoff-factor
The backoff factor to use when calculating when to retry a job. Calculated as:random() * timeout + min(max-timeout, timeout * backoff-factor ^ attempt-number)
. Abackoff-factor
of 1 defines a constant backoff, whilst a factor greater than 1 defines an exponential backoff.:poll-interval
The number of seconds the queue’s workers will sleep for if no job is ready to run.
Example:
(def jobq-runinfo
(cljobq.core/start
{:queues
{:default
{}
:math
{:num-threads 4
:max-attempts 1
:poll-interval 5}
:email
{:num-threads 2
:timeout 600
:max-timeout 14400
:backoff-factor 1.9}}}))
stop
(stop)
(stop run-info-or-ctx)
Stop all worker threads. Can be passed either a run-info
returned by cljobq.core/start
or a context. If passed a context, only the threads started by the last call to cljobq.core/start
with that context will be stopped.
Example:
(cljobq.core/stop jobq-runinfo)