cljobq.core

db-migrate!

(db-migrate!)(db-migrate! {:keys [context], :or {context (clojure.core/deref global-ctx*)}})

Run cljobq database migrations to ensure the database schema is set up and up-to-date.

Options:

  • :context cljobq context created via cljobq.core/make-context. If not specified, the implicit cljobq.core/global-ctx* atom set using cljobq.core/set-context! is used instead.

Example:

(cljobq.core/db-migrate!)

delete-job

(delete-job {:keys [context queue job-name], :or {context (clojure.core/deref global-ctx*)}})

Delete a specific job job-name from the specified queue. Deleting a job by name only really makes sense for recurring jobs. Returns the number of jobs deleted.

Options:

  • :context cljobq context created via cljobq.core/make-context. If not specified, the implicit cljobq.core/global-ctx* atom set using cljobq.core/set-context! is used instead.
  • :job-name The name of the job to delete. Required.
  • :queue The name of the queue to delete the job from. Required.

Example:

(cljobq.core/delete-job {:queue "math", :job-name "algebra-on-wednesdays"})

delete-queue-jobs

(delete-queue-jobs {:keys [context queue], :or {context (clojure.core/deref global-ctx*)}})

Delete all jobs from the specified queue. Returns the number of jobs deleted.

Options:

  • :context cljobq context created via cljobq.core/make-context. If not specified, the implicit cljobq.core/global-ctx* atom set using cljobq.core/set-context! is used instead.
  • :queue The name of the queue. Required.

Example:

(cljobq.core/delete-queue-jobs {:queue "math"})

enqueue

(enqueue {:keys [context queue job-name interval actor args], :or {context (clojure.core/deref global-ctx*), job-name (uuid/v1), interval nil, args []}})

Enqueue a job based on a job description with an optionally specfied explicit context. If a job with the given job-name for the specified queue already exists, the existing job will be updated instead. If no job-name is given, a uuid-based name will be assigned automatically. An interval string (unix cron format) can be specified to create a recurring job. Errors in the interval specification cause an exception to be thrown. Returns the newly inserted job.

Options:

  • :context cljobq context created via cljobq.core/make-context. If not specified, the implicit cljobq.core/global-ctx* atom set using cljobq.core/set-context! is used instead.
  • :job-name A name for the job. A job name is unique within a queue. If not specified, a UUID-based random name will be used instead.
  • :queue A queue name to enqueue the job to. Required.
  • :actor A var-quoted function to run as part of this job. Required.
  • :args A vector of arguments to pass to the job function.
  • :interval A string-based UNIX cron expression defining the interval at which to run a job. If specified, a recurring job is created. If omitted or nil, the job will only run once.
  • :run-at The date/time at which to run the job. If omitted, the job will be scheduled to run immediately if it is a one-off job, or at the next recurrence if an interval was specified.

Example:

(cljobq.core/enqueue
  {:actor #'*-and-log
   :args [9 6]
   :queue "math"})

Example return value:

{:args [9 6]
 :job-name "10189f80-81ab-11e8-9e30-698897b5b41d"
 :last-error nil
 :queue "math"
 :status "pending"
 :id 18
 :picked-at nil
 :interval nil
 :created-at #inst "2018-07-07T06:00:34.430781000-00:00"
 :run-at #inst "2018-07-07T06:00:34.424000000-00:00"
 :attempt 0
 :related-job-id nil
 :actor "user/*-and-log"}

global-ctx*

list-failed-jobs

(list-failed-jobs)(list-failed-jobs {:keys [context queue], :or {context (clojure.core/deref global-ctx*), queue nil}})

List all failed jobs. If queue is specified, only jobs for that queue will be shown.

Options:

  • :context cljobq context created via cljobq.core/make-context. If not specified, the implicit cljobq.core/global-ctx* atom set using cljobq.core/set-context! is used instead.
  • :queue The name of the queue. If omitted, jobs from all queues will be shown.

Example:

(cljobq.core/list-failed-jobs {:queue "queue-c"})

Example return value:

({:args [14 :e]
  :job-name "929238f0-7e2b-11e8-9580-698897b5b41d"
  :last-error "Error from actor `user/*-and-log` payload `[14 :e]`: clojure.lang.Keyword cannot be cast to java.lang.Number\nclojure.lang.Numbers.multiply(Numbers.java:148)\nuser$_STAR__and_log.invokeStatic(form-init6140559550272751977.clj:2)\nuser$_STAR__and_log.invoke(form-init6140559550272751977.clj:1) ..."
  :queue "queue-c"
  :status nil
  :id 12
  :picked-at #inst "2018-07-04T20:43:12.000747000-00:00"
  :interval nil
  :created-at #inst "2018-07-02T19:10:24.264601000-00:00"
  :run-at nil
  :attempt 5
  :related-job-id nil
  :actor "user/*-and-log"})

list-jobs

(list-jobs)(list-jobs {:keys [context queue], :or {context (clojure.core/deref global-ctx*), queue nil}})

List all non-failed jobs. If queue is specified, only jobs for that queue will be shown.

Options:

  • :context cljobq context created via cljobq.core/make-context. If not specified, the implicit cljobq.core/global-ctx* atom set using cljobq.core/set-context! is used instead.
  • :queue The name of the queue. If omitted, jobs from all queues will be shown.

Example:

(cljobq.core/list-jobs)

Example return value:

({:args [9 6]
  :job-name "10189f80-81ab-11e8-9e30-698897b5b41d"
  :last-error nil
  :queue "math"
  :status "pending"
  :id 18
  :picked-at nil
  :interval nil
  :created-at #inst "2018-07-07T06:00:34.430781000-00:00"
  :run-at #inst "2018-07-07T06:00:34.424000000-00:00"
  :attempt 0
  :related-job-id nil
  :actor "user/*-and-log"}
 {:args [:a 81]
  :job-name "regular-math-err"
  :last-error nil
  :queue "queue-c"
  :status "scheduled"
  :id 22
  :picked-at nil
  :interval "15 23 * * *"
  :created-at #inst "2018-07-07T06:56:35.294888000-00:00"
  :run-at #inst "2018-07-07T22:15:00.000000000-00:00"
  :attempt 0
  :related-job-id nil
  :actor "user/*-and-log"})

list-recurring-jobs

(list-recurring-jobs)(list-recurring-jobs {:keys [context queue], :or {context (clojure.core/deref global-ctx*), queue nil}})

List all recurring jobs. If queue is specified, only jobs for that queue will be shown.

Options:

  • :context cljobq context created via cljobq.core/make-context. If not specified, the implicit cljobq.core/global-ctx* atom set using cljobq.core/set-context! is used instead.
  • :queue The name of the queue. If omitted, jobs from all queues will be shown.

Example:

(cljobq.core/list-recurring-jobs {:queue "math"})

Example return value:

({:args [:a 81]
  :job-name "regular-math-err"
  :last-error nil
  :queue "queue-c"
  :status "scheduled"
  :id 22
  :picked-at nil
  :interval "15 23 * * *"
  :created-at #inst "2018-07-07T06:56:35.294888000-00:00"
  :run-at #inst "2018-07-07T22:15:00.000000000-00:00"
  :attempt 0
  :related-job-id nil
  :actor "user/*-and-log"})

make-context

(make-context {:keys [db default-timeout default-max-timeout default-max-attempts default-num-threads], :or {default-timeout 1800, default-max-timeout (* default-timeout 10), default-num-threads 1, default-max-attempts 3}, :as context})

Return a new cljobq context based on the config hash passed in. cljobq does not use connection pooling internally, however, connection pooling can be enabled by using a db-spec with a data source set using hikari-cp or c3p0.

All options except for :db can be overriden per queue in the queue-defs passed to cljobq.core/start.

Options:

  • :db A clojure.java.jdbc db-spec map. Required.
  • :default-timeout The default job timeout, in seconds. Defaults to 1800 seconds.
  • :default-max-timeout The default maximum job timeout, including any backoff. Defaults to 10 times the default timeout.
  • :default-max-attempts The default maximum number of attempts/retries to try and run a job for before marking it as failed. Defaults to 3.
  • :default-num-threads The default number of worker threads for a queue. Defaults to 1.
  • :app/context An optional application-specific context passed as first argument to all job function invocations. If not provided, the parameter will not omitted from the job function invocation.

Example:

(def jobq-context
  (cljobq.core/make-context
    {:db {:connection-uri "jdbc:postgresql://localhost:5432/jobqtest?user=test&password=test"}
     :default-timeout 180}))

set-context!

(set-context! v)

Set the global context cljobq.core/global-ctx* based on the passed in config v. See cljobq.core/make-context for more information on the configuration options.

The global context serves as an implicit context to all other functions in the absence of an explicit context.

Example:

(cljobq.core/set-context!
  {:db {:connection-uri "jdbc:postgresql://localhost:5432/jobqtest?user=test&password=test"}
   :default-timeout 180})

start

(start {:keys [context queues], :or {context (clojure.core/deref global-ctx*)}})

Given a set of queue definitions in queues, start the worker threads for each of the defined queues and start processing jobs. Returns a run-info which can be used to stop the threads (see cljobq.core/stop for more information.

Options:

  • :context cljobq context created via cljobq.core/make-context. If not specified, the implicit cljobq.core/global-ctx* atom set using cljobq.core/set-context! is used instead.
  • :queues A map of queue-name -> queue-def. A queue def can an empty map, in which case the defaults of the context will be used for all options.

Each queue-def is a map with the following options (defaults are taken from the context - see cljobq.core/make-context):

  • :num-threads The number of worker threads to start for this queue.
  • :max-attempts The maximum number of attempts/retries to try and run a job on this queue for before marking it as failed.
  • :timeout The job timeout, in seconds.
  • :max-timeout The maximum job timeout, including any backoff.
  • :backoff-factor The backoff factor to use when calculating when to retry a job. Calculated as: random() * timeout + min(max-timeout, timeout * backoff-factor ^ attempt-number). A backoff-factor of 1 defines a constant backoff, whilst a factor greater than 1 defines an exponential backoff.
  • :poll-interval The number of seconds the queue’s workers will sleep for if no job is ready to run.

Example:

(def jobq-runinfo
  (cljobq.core/start
    {:queues
     {:default
      {}
      :math
      {:num-threads 4
       :max-attempts 1
       :poll-interval 5}
      :email
      {:num-threads 2
       :timeout 600
       :max-timeout 14400
       :backoff-factor 1.9}}}))

stop

(stop)(stop run-info-or-ctx)

Stop all worker threads. Can be passed either a run-info returned by cljobq.core/start or a context. If passed a context, only the threads started by the last call to cljobq.core/start with that context will be stopped.

Example:

(cljobq.core/stop jobq-runinfo)