mirabelle.action

above-dt

(above-dt config & children)

Takes a numeric threshold (:threshold) and a time period in seconds (:duration). If the condition “the event metric is greater than the threshold” holds for all events received during at least duration seconds, matching events received after that period are passed on until a non-matching event arrives. :metric should not be nil (a nil metric produces exceptions).

(above-dt {:threshold 100 :duration 10}
  (debug))

In this example, if the :metric field of the events stays greater than 100 for more than 10 seconds, events are passed downstream.

action->fn

aggr-sum

(aggr-sum config & children)

aggregation*

(aggregation* _ {:keys [duration aggr-fn init]} & children)

async-queue!

(async-queue! queue-name & children)

Execute children into the specific async queue. The async queue should be defined in the I/O configuration file.

(async-queue! :my-queue
  (info))

async-queue!*

(async-queue!* context queue-name & children)

below-dt

(below-dt config & children)

Takes a numeric threshold (:threshold) and a time period in seconds (:duration). If the condition “the event metric is lower than the threshold” holds for all events received during at least duration seconds, matching events received after that period are passed on until a non-matching event arrives. :metric should not be nil (a nil metric produces exceptions).

(below-dt {:threshold 100 :duration 10}
  (debug))

In this example, if the :metric field of the events stays lower than 100 for more than 10 seconds, events are passed downstream.

between-dt

(between-dt config & children)

Takes two numbers (:low and :high) and a time period in seconds (:duration). If the condition “the event metric is greater than low and lower than high” holds for all events received during at least duration seconds, matching events received after that period are passed on until a non-matching event arrives. :metric should not be nil (a nil metric produces exceptions).

(between-dt {:low 50 :high 100 :duration 10}
  (debug))

In this example, if the :metric field of the events stays between 50 and 100 for more than 10 seconds, events are passed downstream.

by

(by fields & children)

Splits the stream by field. Every time an event arrives with a new value of field, this action invokes its child forms to return a new, distinct set of streams for that particular value.

(by [:host :service]
  (fixed-time-window {:duration 60}))

This example generates a fixed time window for each host/service combination.
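To make the splitting concrete, here is a minimal plain Clojure sketch that groups a finished batch of events by the same field vector. It is only an illustration: the real action forks child streams as new values arrive, and `split-by-fields` is a hypothetical helper name, not part of Mirabelle.

```clojure
(defn split-by-fields
  "Groups events by the values of `fields`, e.g. [:host :service]."
  [fields events]
  (group-by (apply juxt fields) events))

(def by-example
  (split-by-fields
   [:host :service]
   [{:host "a" :service "cpu" :metric 1}
    {:host "b" :service "cpu" :metric 2}
    {:host "a" :service "cpu" :metric 3}]))
;; by-example has two groups, keyed by ["a" "cpu"] and ["b" "cpu"]
```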

by-fn

(by-fn fields new-fork)

call-rescue

(call-rescue event children)

changed

(changed config & children)

Passes on events only if the value of the given field (:field) differs from the previous one. The :init parameter is the initial value for the stream.

(changed {:field :state :init "ok"})

For example, this action will let events pass when the :state field changes, the initial value being “ok”.

This stream is useful to get only events making a transition.
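The transition logic can be sketched over a finite sequence of events in plain Clojure. This is an illustration under the assumption that only the previous field value is remembered; `changed-events` is a hypothetical helper, not Mirabelle's implementation.

```clojure
(defn changed-events
  "Returns the events whose `field` value differs from the previous value,
  starting from `init`."
  [field init events]
  (second
   (reduce (fn [[prev out] event]
             (let [v (get event field)]
               (if (= v prev)
                 [prev out]
                 [v (conj out event)])))
           [init []]
           events)))

(def changed-example
  (changed-events :state "ok"
                  [{:state "ok" :time 1}
                   {:state "critical" :time 2}
                   {:state "critical" :time 3}
                   {:state "ok" :time 4}]))
;; changed-example keeps the events at :time 2 and :time 4
```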

changed*

(changed* _ {:keys [field init]} & children)

coalesce

(coalesce config & children)

Returns a list of the latest non-expired events (by fields) every duration seconds (at best).

(coalesce {:duration 10 :fields [:host :service]}
  (debug))

In this example, the latest event for each host/service combination will be kept and forwarded downstream. The debug action will then receive this list of events. Expired events will be removed from the list.
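The "latest event per combination" bookkeeping can be sketched in plain Clojure as follows. The sketch assumes events arrive in order and ignores expiration and the periodic flush; `latest-by` is a hypothetical name.

```clojure
(defn latest-by
  "Keeps the most recently seen event for each combination of `fields`."
  [fields events]
  (vals
   (reduce (fn [acc event]
             (assoc acc (mapv #(get event %) fields) event))
           {}
           events)))

(def coalesce-example
  (latest-by [:host :service]
             [{:host "a" :service "cpu" :metric 1}
              {:host "b" :service "cpu" :metric 2}
              {:host "a" :service "cpu" :metric 3}]))
;; coalesce-example holds two events: metric 3 for host "a", metric 2 for host "b"
```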

coalesce*

(coalesce* _ {:keys [duration fields]} & children)

coll-bottom

(coll-bottom nb-events & children)

Receives a list of events, returns the N events with the lowest metrics.

(fixed-time-window {:duration 60}
  (coll-bottom 5
    (info)))

coll-bottom*

(coll-bottom* _ nb-events & children)

coll-count

(coll-count & children)

Count the number of events. Should receive a list of events from the previous stream. The most recent event is used as a base to create the new event, and its :metric field is set to the number of events received as input.

(fixed-time-window {:duration 60}
  (coll-count
    (debug)))

coll-count*

(coll-count* _ & children)

coll-max

(coll-max & children)

Returns the event with the biggest metric. Should receive a list of events from the previous stream.

(fixed-event-window {:size 10}
  (coll-max
    (debug)))

Gets the event with the biggest metric on windows of 10 events.

coll-max*

(coll-max* _ & children)

coll-mean

(coll-mean & children)

Computes the mean of the events (on :metric). Should receive a list of events from the previous stream. The most recent event is used as a base to create the new event.

(fixed-event-window {:size 10}
  (coll-mean
    (debug)))

Computes the mean on windows of 10 events

coll-mean*

(coll-mean* _ & children)

coll-min

(coll-min & children)

Returns the event with the smallest metric. Should receive a list of events from the previous stream.

(fixed-event-window {:size 10}
  (coll-min
    (debug)))

Gets the event with the smallest metric on windows of 10 events.

coll-min*

(coll-min* _ & children)

coll-percentiles

(coll-percentiles points & children)

Receives a list of events and selects one event from that period for each point. If point is 0, takes the lowest metric event. If point is 1, takes the highest metric event. 0.5 is the median event, and so forth. Forwards each of these events to children. Each forwarded event has its point stored under the :quantile key. Useful for extracting histograms and percentiles.

(fixed-event-window {:size 10}
  (coll-percentiles [0.5 0.75 0.98 0.99]))
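A rough sketch of how one event could be picked per point is shown below. The index rule (`n * point`, truncated and capped at the last index) is an assumption borrowed from common sorted-sample approaches and may differ from what Mirabelle actually does; `percentile-events` is a hypothetical helper.

```clojure
(defn percentile-events
  "For each point in `points` (between 0 and 1), picks one event from
  `events` sorted by :metric and tags it with :quantile."
  [points events]
  (let [sorted (vec (sort-by :metric events))
        n      (count sorted)]
    (for [p points]
      ;; assumed index rule: truncate n*p, never past the last event
      (assoc (nth sorted (min (dec n) (long (* n p))))
             :quantile p))))

(def percentiles-example
  (percentile-events
   [0 0.5 1]
   (map (fn [m] {:metric m}) [3 1 2 10 9 8 4 5 7 6])))
;; metrics picked for points 0, 0.5 and 1: 1, 6 and 10
```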

coll-percentiles*

(coll-percentiles* _ points & children)

coll-quotient

(coll-quotient & children)

Divides the first event :metric field by the :metric of all subsequent events. Returns a new event containing the new :metric.

Should receive a list of events from the previous stream.
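The division itself amounts to a left fold over the metrics. The sketch below only computes the resulting number and does not build the new event; `quotient` is a hypothetical name.

```clojure
(defn quotient
  "Divides the first :metric by each subsequent :metric in turn."
  [events]
  (reduce / (map :metric events)))

(def quotient-example
  (quotient [{:metric 10} {:metric 2} {:metric 5}]))
;; quotient-example is 1, that is (10 / 2) / 5
```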

coll-quotient*

(coll-quotient* _ & children)

coll-rate

(coll-rate & children)

Computes the rate on a list of events. Should receive a list of events from the previous stream. The latest event is used as a base to build the new event.

(fixed-event-window {:size 3}
  (coll-rate
    (debug)))

If this example receives the events:

{:metric 1 :time 1} {:metric 2 :time 2} {:metric 1 :time 3}

The stream will return {:metric 2 :time 3}.

Indeed, (1+2+1)/2 = 2 (we divide by 2 because there are 2 seconds between the oldest and the most recent event times).
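The same arithmetic can be reproduced in plain Clojure. This sketch assumes every event has a :time and that the time span is not zero; `rate` is a hypothetical helper and returns only the number, not the new event.

```clojure
(defn rate
  "Sum of :metric divided by the time span between the oldest and latest events."
  [events]
  (let [times (map :time events)
        span  (- (apply max times) (apply min times))]
    (/ (reduce + (map :metric events)) span)))

(def rate-example
  (rate [{:metric 1 :time 1} {:metric 2 :time 2} {:metric 1 :time 3}]))
;; rate-example is 2
```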

coll-rate*

(coll-rate* _ & children)

coll-sum

(coll-sum & children)

Sums the :metric fields of all the events. Should receive a list of events from the previous stream.

(fixed-event-window {:size 10}
  (coll-sum
    (debug)))

Sum all :metric fields for windows of 10 events

coll-sum*

(coll-sum* _ & children)

coll-top

(coll-top nb-events & children)

Receives a list of events, returns the N events with the highest metrics.

(fixed-time-window {:duration 60}
  (coll-top 5
    (info)))

coll-top*

(coll-top* _ nb-events & children)

cond-dt*

(cond-dt* _ conditions dt & children)

A stream which detects if a condition (f event) is true during dt seconds. Takes conditions (like in the where action) and a time period dt in seconds. If the condition is valid for all events received during at least the period dt, valid events received after the dt period will be passed on until an invalid event arrives. Skips events that are too old or that do not have a timestamp.
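The "condition held for at least dt seconds" rule, shared by the `*-dt` actions, can be sketched over a finite, time-ordered sequence. The boundary handling (`>=` rather than `>`) is an assumption, the sketch expects every event to have a :time, and `cond-dt-seq` is a hypothetical helper.

```clojure
(defn cond-dt-seq
  "Returns the events for which `pred` has held continuously for at
  least `dt` seconds."
  [pred dt events]
  (:out
   (reduce (fn [{:keys [since out]} event]
             (if (pred event)
               ;; remember when the condition started to hold
               (let [since (or since (:time event))]
                 {:since since
                  :out   (if (>= (- (:time event) since) dt)
                           (conj out event)
                           out)})
               ;; a non-matching event resets the timer
               {:since nil :out out}))
           {:since nil :out []}
           events)))

(def cond-dt-example
  (cond-dt-seq #(> (:metric %) 100) 10
               [{:time 0 :metric 200} {:time 5 :metric 200}
                {:time 10 :metric 200} {:time 12 :metric 50}
                {:time 13 :metric 200} {:time 25 :metric 200}]))
;; cond-dt-example keeps the events at :time 10 and :time 25
```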

critical

(critical & children)

Keep all events in state critical.

(critical
  (error))

In this example, all events with :state “critical” will be logged.

critical*

(critical* _ & children)

critical-dt

(critical-dt config & children)

Takes a time period in seconds (:duration). If all events received during at least duration seconds have :state “critical”, new critical events received after that period are passed on until a non-critical event arrives.

(critical-dt {:duration 10}
  (debug))

In this example, if the :state of the events stays “critical” for more than 10 seconds, events are passed downstream.

custom

(custom action-name params & children)

Executes a custom action. Custom actions are defined in the Mirabelle configuration file. The action can then be called (by name) using this custom action.

(custom :my-custom-action ["parameters"]
  (info))

ddt

(ddt & children)

Differentiate metrics with respect to time. Takes an optional number followed by child streams. Emits an event for each event received, but with metric equal to the difference between the current event and the previous one, divided by the difference in their times. Skips events without metrics.

(ddt
  (info))

If ddt receives {:metric 1 :time 1} and {:metric 10 :time 4}, it will produce {:metric (/ 9 3) :time 4}.
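The computation can be checked with a short plain Clojure sketch over consecutive pairs of events. It assumes every event has a :metric and a :time; `ddt-seq` is a hypothetical helper.

```clojure
(defn ddt-seq
  "For each event after the first, replaces :metric with the metric
  difference divided by the time difference to the previous event."
  [events]
  (map (fn [prev curr]
         (assoc curr :metric (/ (- (:metric curr) (:metric prev))
                                (- (:time curr) (:time prev)))))
       events
       (rest events)))

(def ddt-example
  (ddt-seq [{:metric 1 :time 1} {:metric 10 :time 4}]))
;; ddt-example is ({:metric 3 :time 4})
```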

ddt*

(ddt* _ remove-neg? & children)

ddt-pos

(ddt-pos & children)

Like ddt, but does not forward events with negative metrics. This can be used, for example, for counters which may be reset to zero.

debug

(debug)

Print the event in the logs using the debug level

(increment
  (debug))

debug*

(debug* _)

decrement

(decrement & children)

Decrement the event :metric field.

(decrement
  (index [:host]))

decrement*

(decrement* _ & children)

default

(default field value & children)

Sets a default value for an event field.

(default :state "ok"
  (info))

In this example, all events where :state is not set will be updated with :state to “ok”.

default*

(default* _ field value & children)

discard-fn

(discard-fn e)

disk-queue!

(disk-queue!)

Write events into the on-disk queue.

disk-queue!*

(disk-queue!* context)

error

(error)

Prints the event in the logs using the error level.

(increment
  (error))

error*

(error* _)

ewma-timeless

(ewma-timeless r & children)

Exponential weighted moving average. Constant space and time overhead. Passes on each event received, but with metric adjusted to the moving average. Does not take the time between events into account. r is the ratio between successive events: r=1 means always return the most recent metric; r=1/2 means the current event counts for half, the previous event for 1/4, the one before that for 1/8, and so on.
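The weighting can be written as avg' = r * metric + (1 - r) * avg. The sketch below applies that update to a list of metrics; starting the average at 0 is an assumption, and `ewma-timeless-seq` is a hypothetical helper.

```clojure
(defn ewma-timeless-seq
  "Returns the successive moving averages for `metrics` with ratio `r`."
  [r metrics]
  (rest
   (reductions (fn [avg m] (+ (* r m) (* (- 1 r) avg)))
               0 ; assumed initial average
               metrics)))

(def ewma-example (ewma-timeless-seq 0.5 [8 8 8]))
;; ewma-example is (4.0 6.0 7.0)
```

With r = 1 the output equals the input metrics, which matches the "always return the most recent metric" case.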

ewma-timeless*

(ewma-timeless* _ r & children)

exception->event

(exception->event e base-event)

Build a new event from an Exception and from the event which caused it.

exception-stream

(exception-stream & children)

Takes two actions. If an exception is thrown in the first action, an event representing this exception is emitted in the second action.

(exception-stream
  (bad-action)
  (error))

Here, if bad-action throws, an event will be built (using the exception->event function) and sent to the error action (which will log it).

exception-stream*

(exception-stream* _ success-child failure-child)

expired

(expired & children)

Keep expired events.

(expired
  (increment))

In this example, all expired events will be forwarded to the increment stream.

expired*

(expired* _ & children)

Keep expired events.

fixed-event-window

(fixed-event-window config & children)

Returns a fixed-sized window of events.

(fixed-event-window {:size 5}
  (debug))

This example will emit vectors of events, partitioned 5 by 5.

fixed-event-window*

(fixed-event-window* _ {:keys [size]} & children)

fixed-time-window

(fixed-time-window config & children)

A fixed window over the event stream in time. Emits vectors of events, such that each vector has events from a distinct n-second interval. Windows do not overlap; each event appears at most once in the output stream. Once an event is emitted, all events older or equal to that emitted event are silently dropped.

Events without times accrue in the current window.

(fixed-time-window {:duration 60}
  (coll-max
    (info)))

fixed-time-window*

(fixed-time-window* _ {:keys [duration]} & children)

from-base64

(from-base64 field & children)

Converts a field or multiple fields from base64 to string. Field values should be strings.

(sdo
  ;; you can pass one field
  (from-base64 :host)
  ;; or a list of fields
  (from-base64 [:host :service]))

from-base64*

(from-base64* _ fields & children)

get-env-profile

(get-env-profile)

include

(include path config)

Includes a configuration file, by path, into the configuration. The file will be read using the aero (https://github.com/juxt/aero/) library. The config variable supports these optional options:

  • :profile: the aero profile to use. By default, Mirabelle will read (and convert to a Clojure keyword) the PROFILE environment variable during compilation. You can override this value by setting :profile.
  • :variables: variables to pass to the configuration file. You can use the #mirabelle/var reader in order to define variables in your EDN file.

This allows you to use the same configuration snippet (possibly templated) from multiple streams (or multiple parts of the same stream).

(include "/etc/mirabelle/includes/my-actions.clj"
         {:profile :dev
          :variables {:foo "bar"}})

increment

(increment & children)

Increment the event :metric field.

(increment
  (index [:host]))

increment*

(increment* _ & children)

index

(index labels)

Insert events into the index. Events are indexed using the keys passed as parameter.

(index [:host :service])

This example will index events by host and service.

index*

(index* context labels)

info

(info)

Print the event in the logs using the info level

(increment
  (info))

info*

(info* _)

io

(io & children)

Discard all events in test mode. Else, forward to children. You can use this stream to avoid side effects in test mode.

io*

(io* context & children)

json-fields

(json-fields fields & children)

Takes a field or a list of fields, and converts the values associated to these fields from json to edn.

(with :my-field "{\"foo\": \"bar\"}"
  (json-fields [:my-field]))

In this example, we associate to :my-field a json string and then we call json-fields on it. :my-field will now contain an edn map built from the json data, with keywords as keys.

json-fields*

(json-fields* _ fields & children)

keep-keys

(keep-keys keys-to-keep & children)

Keep only the specified keys for events.

(keep-keys [:host :metric :time :environment :description]
  (info))

keep-keys*

(keep-keys* _ keys-to-keep & children)

keep-non-discarded-events

(keep-non-discarded-events events)

Takes an event or a list of events. Returns an event (or a list of events, depending on the input) with all events tagged “discard” filtered out. Returns nil if all events are filtered.

keyword->aggr-f

log-action

(log-action level)

Generic logger

moving-event-window

(moving-event-window config & children)

A sliding window of the last few events. Every time an event arrives, calls children with a vector of the last n events, from oldest to newest. Ignores event times. Example:

(moving-event-window {:size 5}
  (coll-mean (info)))
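The sliding behaviour can be sketched in plain Clojure over a finite sequence. The sketch assumes that windows smaller than the configured size are emitted while the stream is still filling up; `moving-windows` is a hypothetical helper.

```clojure
(defn moving-windows
  "For each arriving event, returns the vector of the last `size`
  events seen so far, from oldest to newest."
  [size events]
  (rest
   (reductions (fn [window event]
                 (vec (take-last size (conj window event))))
               []
               events)))

(def moving-example
  (moving-windows 2 [{:metric 1} {:metric 2} {:metric 3}]))
;; windows, by metric: [1], [1 2], [2 3]
```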

moving-event-window*

(moving-event-window* _ config & children)

not-expired

(not-expired & children)

Keep non-expired events.

(not-expired
  (increment))

In this example, all non-expired events will be forwarded to the increment stream.

not-expired*

(not-expired* _ & children)

Keep non-expired events.

outside-dt

(outside-dt config & children)

Takes two numbers (:low and :high) and a time period in seconds (:duration). If the condition “the event metric is lower than low or greater than high” holds for all events received during at least duration seconds, matching events received after that period are passed on until a non-matching event arrives. :metric should not be nil (a nil metric produces exceptions).

(outside-dt {:low 50 :high 100 :duration 10}
  (debug))

In this example, if the :metric field of the events stays outside the 50-100 range for more than 10 seconds, events are passed downstream.

over

(over n & children)

Passes on events only when their metric is greater than n.

(over 10
  (info))

over*

(over* _ n & children)

project

(project conditions & children)

Takes a list of conditions. Like coalesce, project will return the most recent events matching the conditions.

(project [[:= :service "enqueues"]
          [:= :service "dequeues"]]
  (coll-quotient
    (with :service "enqueues per dequeue"
      (info))))

We divide here the latest event for the “enqueues” :service by the latest event from the “dequeues” one.

project*

(project* _ conditions & children)

publish!

(publish! channel)

Publish events in the given channel.

(publish! :my-channel)

Users can then subscribe to channels using the websocket engine.

publish!*

(publish!* context channel)

push-io!

(push-io! io-name)

Push events to an external system.

I/O are defined in a dedicated file. If you create a new I/O named :influxdb for example, you can use push-io! to push all events into this I/O:

(push-io! :influxdb)

I/O are automatically discarded in test mode.

push-io!*

(push-io!* context io-name)

reaper

(reaper interval)
(reaper interval destination-stream)

Every time this action receives an event, it expires events from the index (at most every interval seconds) and reinjects them into a stream (the current stream by default).

(reaper 5)
(reaper 5 :custom-stream)

reaper*

(reaper* context interval destination-stream)

reinject!

(reinject!)
(reinject! destination-stream)

Reinjects an event into the streaming system. By default, events are reinjected into the real time engine. You can reinject events into a specific stream by passing the destination stream as parameter.

(reinject!)

This example reinjects events into the real time engine.

(reinject! :foo)

This example reinjects events into the stream named :foo.

reinject!*

(reinject!* context destination-stream)

rename-keys

(rename-keys replacement & children)

Rename events keys.

(rename-keys {:host :service
              :environment :env})

In this example, the :host key will be renamed :service and the :environment key will be renamed :env. Existing values will be overridden.
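The effect on a single event can be previewed with `clojure.set/rename-keys` from the Clojure standard library. Whether Mirabelle relies on this function internally is an assumption; the sketch only illustrates the renaming and overriding behaviour described above.

```clojure
(require '[clojure.set :as cset])

(def rename-example
  (cset/rename-keys {:host "a" :service "b" :environment "prod"}
                    {:host :service :environment :env}))
;; rename-example is {:service "a" :env "prod"}
;; the previous :service value "b" has been overridden
```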

rename-keys*

(rename-keys* _ replacement & children)

scale

(scale factor & children)

Multiplies the event :metric field by the factor passed as parameter.

(scale 1000
  (info))

This example will multiply the :metric field for all events by 1000.

scale*

(scale* _ factor & children)

sdissoc

(sdissoc fields & children)

Removes a key (or a list of keys) from the events.

(sdissoc :host (info))

(sdissoc [:environment :host] (info))

sdissoc*

(sdissoc* _ fields & children)

sdo

(sdo & children)

Sends events to children. Useful when you want to send the same events to multiple downstream actions.

(sdo
  (increment)
  (decrement))

Here, events arriving in sdo will be forwarded to both increment and decrement.

sdo*

(sdo* _ & children)

sflatten

(sflatten & children)

Streaming flatten. Calls children with each event in events. Events should be a sequence.

This stream can be used to flatten a sequence of events (emitted by a time window stream for example).

(fixed-event-window {:size 5}
  (sflatten
    (info)))

sflatten*

(sflatten* _ & children)

sformat

(sformat template target-field fields & children)

Takes the content of multiple event keys, uses them to build a string value, and assigns it to a given key.

(sformat "%s-foo-%s" :format-test [:host :service])

If the event {:host "machine" :service "bar"} is passed to this action the event will become {:host "machine" :service "bar" :format-test "machine-foo-bar"}.

More information about the available formatters can be found in the Clojure documentation: https://clojuredocs.org/clojure.core/format
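The transformation can be reproduced on a single event with `clojure.core/format`. This is a minimal sketch of the behaviour described above; `sformat-event` is a hypothetical helper, not the action itself.

```clojure
(defn sformat-event
  "Formats the values of `fields` with `template` and stores the
  result under `target-field`."
  [template target-field fields event]
  (assoc event target-field
         (apply format template (map #(get event %) fields))))

(def sformat-example
  (sformat-event "%s-foo-%s" :format-test [:host :service]
                 {:host "machine" :service "bar"}))
;; sformat-example is
;; {:host "machine" :service "bar" :format-test "machine-foo-bar"}
```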

sformat*

(sformat* _ template target-field fields & children)

split

(split & clauses)

Split by conditions.

(split
  [:> :metric 10] (debug)
  [:> :metric 5] (info)
  (error))

In this example, all events with :metric > 10 will go into the debug stream, all events with :metric > 5 into the info stream, and other events will go to the default stream, which is “error”.

The default stream is optional. If it is not set, all events not matching a condition will be discarded.
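The routing can be pictured as a first-match dispatch. The sketch below assumes that an event goes only to the first clause it matches (so an event with :metric 12 reaches debug but not info), which is an assumption about the semantics; `route`, the predicates, and the keyword labels are hypothetical stand-ins for the conditions and child actions.

```clojure
(defn route
  "Returns the label of the first clause whose predicate matches
  `event`, or `default` when none does."
  [clauses default event]
  (or (some (fn [[pred label]] (when (pred event) label)) clauses)
      default))

(def split-clauses
  [[(fn [e] (> (:metric e) 10)) :debug]
   [(fn [e] (> (:metric e) 5)) :info]])

(def split-example
  (map #(route split-clauses :error %)
       [{:metric 12} {:metric 7} {:metric 1}]))
;; split-example is (:debug :info :error)
```

Passing nil as the default models the case where no default stream is set: unmatched events get no destination.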

split*

(split* _ clauses & children)

stable

(stable dt field & children)

Takes a duration (dt) in seconds and a field name as parameters. Returns events where the value of the field specified as second argument is equal to the value of the field for the last event, for at least dt seconds. Events can be buffered for dt seconds before being forwarded in order to see if they are stable or not.

Events should arrive in order (old events will be dropped).

You can use this stream to remove flapping states for example.

(stable 10 :state
  (info))

In this example, events will be forwarded if the value of the :state key is the same for at least 10 seconds.

stable*

(stable* _ dt field & children)

stream

(stream config & children)

Creates a new stream. This action takes a map where the :name key, which will be the name of the stream, is mandatory.

streams

(streams & streams)

Entrypoint for all streams.

(streams
  (stream {:name :fobar}
    (info))
  (stream {:name :foo}
    (info)))

tag

(tag tags & children)

Adds a new tag, or set of tags, to events which flow through.

(tag "foo"
  (info))

This example adds the tag “foo” to events.

(tag ["foo" "bar"] (info))

This example adds the tags “foo” and “bar” to events.

tag*

(tag* _ tags & children)

tagged-all

(tagged-all tags & children)

Passes on events where all tags are present.

(tagged-all "foo"
  (info))

This example keeps only events tagged “foo”.

(tagged-all ["foo" "bar"] (info))

This example keeps only events tagged “foo” and “bar”.

tagged-all*

(tagged-all* _ tags & children)

tap

(tap tap-name)

Save events into the tap. Noop outside tests.

(where [:= :service "foo"]
  (tap :foo))

In test mode, all events with :service “foo” will be saved in a tap named :foo.

tap*

(tap* context tape-name)

test-action

(test-action state & children)

Buffers all received events in the state (an atom) passed as parameter.

test-action*

(test-action* _ state)

throttle

(throttle config & children)

Lets at most N events (:count) pass every duration seconds. Can be used, for example, to limit the number of alerts sent to an external system.

(throttle {:count 3 :duration 10}
  (error))

In this example, throttle will let 3 events pass at most every 10 seconds. Other events, or events with no time, are filtered.
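A possible reading of this behaviour, sketched over a finite, time-ordered sequence: a window opens with the first event, lets `n` events through, and a new window opens with the first event arriving at least `duration` seconds later. How windows are actually aligned is an assumption, and `throttle-seq` is a hypothetical helper.

```clojure
(defn throttle-seq
  "Keeps at most `n` events per window of `duration` seconds.
  Events without :time are dropped."
  [n duration events]
  (:out
   (reduce (fn [{:keys [start seen out] :as acc} event]
             (let [t (:time event)]
               (cond
                 (nil? t) acc
                 ;; first event, or previous window is over: open a new one
                 (or (nil? start) (>= t (+ start duration)))
                 {:start t :seen 1 :out (conj out event)}
                 ;; still room in the current window
                 (< seen n)
                 (assoc acc :seen (inc seen) :out (conj out event))
                 :else acc)))
           {:start nil :seen 0 :out []}
           events)))

(def throttle-example
  (throttle-seq 3 10
                [{:time 1} {:time 2} {:time 3} {:time 4}
                 {:service "no-time"} {:time 5} {:time 12}]))
;; throttle-example keeps the events at :time 1, 2, 3 and 12
```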

throttle*

(throttle* _ config & children)

to-base64

(to-base64 field & children)

Converts a field or multiple fields to base64. Field values should be strings.

(sdo
  ;; you can pass one field
  (to-base64 :host)
  ;; or a list of fields
  (to-base64 [:host :service]))
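What the conversion does to one field can be shown with `java.util.Base64` through Java interop. The choice of the standard (non URL-safe) encoder and of UTF-8 is an assumption, and both helper names are hypothetical; the decoding direction corresponds to from-base64.

```clojure
(import 'java.util.Base64)

(defn to-base64-field
  "Encodes the string stored under `field` to base64."
  [event field]
  (update event field
          (fn [^String s]
            (.encodeToString (Base64/getEncoder) (.getBytes s "UTF-8")))))

(defn from-base64-field
  "Decodes the base64 string stored under `field`."
  [event field]
  (update event field
          (fn [^String s]
            (String. (.decode (Base64/getDecoder) s) "UTF-8"))))

(def base64-example (to-base64-field {:host "foo"} :host))
;; base64-example is {:host "Zm9v"}
```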

to-base64*

(to-base64* _ fields & children)

under

(under n & children)

Passes on events only when their metric is lower than n.

(under 10
  (info))

under*

(under* _ n & children)

untag

(untag tags & children)

Removes a tag, or set of tags, from events which flow through.

(untag "foo" (index [:host]))

This example removes the tag “foo” from events.

(untag ["foo" "bar"] (index [:host]))

This example removes the tags “foo” and “bar” from events.

untag*

(untag* _ tags & children)

warning

(warning & children)

Keep all events in state warning.

(warning
  (info))

In this example, all events with :state “warning” will be logged.

warning*

(warning* _ & children)

where

(where conditions & children)

Filters events based on conditions. Each condition is a vector composed of the function to apply to the field, the field to extract from the event, and the value to compare it with. Multiple conditions can be combined by using :or or :and.

(where [:= :metric 4])

Here, we keep only events where the :metric field is equal to 4.

(where [:and [:= :host "foo"]
             [:> :metric 10]])

Here, we keep only events with :host equal to “foo” and with :metric > 10.
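To show how such condition vectors can be read, here is a small plain Clojure evaluator. It is a sketch, not Mirabelle's condition compiler: `ops` covers only a handful of comparison operators chosen for the illustration, and `matches?` is a hypothetical name.

```clojure
(def ops
  "Assumed subset of comparison operators, for illustration only."
  {:= = :> > :< < :>= >= :<= <=})

(defn matches?
  "Returns true when `event` satisfies `condition`, a vector such as
  [:> :metric 10] or [:and cond1 cond2]."
  [condition event]
  (let [[op & args] condition]
    (case op
      :and (every? #(matches? % event) args)
      :or  (boolean (some #(matches? % event) args))
      (let [[field value] args]
        ((ops op) (get event field) value)))))

(def where-example
  (filter #(matches? [:and [:= :host "foo"] [:> :metric 10]] %)
          [{:host "foo" :metric 11}
           {:host "foo" :metric 5}
           {:host "bar" :metric 20}]))
;; where-example is ({:host "foo" :metric 11})
```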

where*

(where* _ conditions & children)

with

(with & args)

Set an event field to the given value.

(with :state "critical"
  (debug))

This example sets the field :state to “critical” for events.

A map can also be provided:

(with {:service "foo" :state "critical"}
  (debug))

This example sets the field :service to “foo” and the field :state to “critical” for events.

with*

(with* _ fields & children)