Bloblang Functions

Functions can be placed anywhere and allow you to extract information from your environment, generate values, or access data from the underlying message being mapped:

root.doc.id = uuid_v4()
root.doc.received_at = now()
root.doc.host = hostname()

Functions support both named and nameless style arguments:

root.values_one = range(start: 0, stop: this.max, step: 2)
root.values_two = range(0, this.max, 2)

# In:  {"max":10}
# Out: {"values_one":[0,2,4,6,8],"values_two":[0,2,4,6,8]}

batch_index

Returns the zero-based index of the current message within its batch. Use this to conditionally process messages based on their position, or to create sequential identifiers within a batch.

Examples

root = if batch_index() > 0 { deleted() }

Create a unique identifier combining batch position with timestamp:

root.id = "%v-%v".format(timestamp_unix(), batch_index())

batch_size

Returns the total number of messages in the current batch. Use this to determine batch boundaries or compute relative positions.

Examples

root.total = batch_size()

Check if processing the last message in a batch:

root.is_last = batch_index() == batch_size() - 1
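
The two functions can also be combined into a human-readable position label. This is a minimal sketch; the field name position is an illustrative assumption:

```bloblang
# Label each message with its one-based position in the batch, e.g. "3/10".
root.position = "%v/%v".format(batch_index() + 1, batch_size())
```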

bytes

Creates a zero-initialized byte array of specified length. Use this to allocate fixed-size byte buffers for binary data manipulation or to generate padding.

Parameters

Name | Type | Description
length | integer | The size of the resulting byte array.

Examples

root.data = bytes(5)

Create a buffer for binary operations:

root.header = bytes(16)
root.payload = content()

content

Returns the raw message payload as bytes, regardless of the current mapping context. Use this to access the original message when working within nested contexts, or to store the entire message as a field.

Examples

root.doc = content().string()

# In:  {"foo":"bar"}
# Out: {"doc":"{\"foo\":\"bar\"}"}

Preserve original message while adding metadata:

root.original = content().string()
root.processed_by = "ai"

# In:  {"foo":"bar"}
# Out: {"original":"{\"foo\":\"bar\"}","processed_by":"ai"}

count

This function is deprecated and will be removed in a future version.

The count function is a counter starting at 1 that increments each time it is called. It takes an identifier for the counter as its argument, allowing you to define multiple independent counters in your configuration.

Parameters

Name | Type | Description
name | string | An identifier for the counter.

Examples

root = this
root.id = count("bloblang_function_example")

# In:  {"message":"foo"}
# Out: {"id":1,"message":"foo"}

# In:  {"message":"bar"}
# Out: {"id":2,"message":"bar"}

counter

Generates an incrementing sequence of integers starting from a minimum value (default 1). Each counter instance maintains its own independent state across message processing. When the maximum value is reached, the counter automatically resets to the minimum.

Parameters

Name | Type | Description
min | query expression | The starting value of the counter. This is the first value yielded. Evaluated once when the mapping is initialized.
max | query expression | The maximum value before the counter resets to min. Evaluated once when the mapping is initialized.
set (optional) | query expression | An optional query that controls counter behavior: when it resolves to a non-negative integer, the counter is set to that value; when it resolves to null, the counter is read without incrementing; when it resolves to a deletion, the counter resets to min; otherwise the counter increments normally.

Examples

Generate sequential IDs for each message:

root.id = counter()

# In:  {}
# Out: {"id":1}

# In:  {}
# Out: {"id":2}

Use a custom range for the counter:

root.batch_num = counter(min: 100, max: 200)

# In:  {}
# Out: {"batch_num":100}

# In:  {}
# Out: {"batch_num":101}

Increment a counter multiple times within a single mapping using a named map:

map increment {
  root = counter()
}

root.first_id = null.apply("increment")
root.second_id = null.apply("increment")

# In:  {}
# Out: {"first_id":1,"second_id":2}

# In:  {}
# Out: {"first_id":3,"second_id":4}

Conditionally reset a counter based on input data:

root.streak = counter(set: if this.status != "success" { 0 })

# In:  {"status":"success"}
# Out: {"streak":1}

# In:  {"status":"success"}
# Out: {"streak":2}

# In:  {"status":"failure"}
# Out: {"streak":0}

# In:  {"status":"success"}
# Out: {"streak":1}

Peek at the current counter value without incrementing by using null in the set parameter:

root.count = counter(set: if this.peek { null })

# In:  {"peek":false}
# Out: {"count":1}

# In:  {"peek":false}
# Out: {"count":2}

# In:  {"peek":true}
# Out: {"count":2}

# In:  {"peek":false}
# Out: {"count":3}
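
Use a small range to cycle through a fixed set of slots. This is a sketch only: the values 1 and 3 and the field name slot are illustrative assumptions. Going by the description above, the counter returns to min once max has been reached, so successive messages would be expected to cycle through the range:

```bloblang
# Assign each message to one of three slots in turn.
root.slot = counter(min: 1, max: 3)
```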

deleted

Returns a deletion marker that removes the target field or message. When applied to root, the entire message is dropped while still being acknowledged as successfully processed. Use this to filter data or conditionally remove fields.

Examples

root = this
root.bar = deleted()

# In:  {"bar":"bar_value","baz":"baz_value","foo":"foo value"}
# Out: {"baz":"baz_value","foo":"foo value"}

Filter array elements by returning deleted for unwanted items:

root.new_nums = this.nums.map_each(num -> if num < 10 { deleted() } else { num - 10 })

# In:  {"nums":[3,11,4,17]}
# Out: {"new_nums":[1,7]}
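
Drop an entire message by assigning a deletion to root, as described above. A minimal sketch, assuming a hypothetical type field that marks heartbeat messages:

```bloblang
# Messages flagged as heartbeats are dropped; root is not assigned for
# other messages, so they are left as they were.
root = if this.type == "heartbeat" { deleted() }
```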

env

Reads an environment variable and returns its value as a string. Returns null if the variable is not set. By default, values are cached for performance.

Parameters

Name | Type | Description
name | string | The name of the environment variable to read.
no_cache | bool | Disable caching to read the latest value on each invocation.

Examples

root.api_key = env("API_KEY")
root.database_url = env("DB_URL").or("localhost:5432")

Use no_cache to read updated environment variables during runtime, useful for dynamic configuration changes:

root.config = env(name: "DYNAMIC_CONFIG", no_cache: true)

error

Returns the error message string if the message has failed processing, otherwise null. Use this in error handling pipelines to log or route failed messages based on their error details.

Examples

root.doc.error = error()

Route messages to different outputs based on error presence:

root = this
root.error_msg = error()
root.has_error = error() != null

error_source_label

Returns the user-defined label of the component that caused the error, empty string if no label is set, or null if the message has no error. Use this for more human-readable error tracking when components have custom labels.

Examples

root.doc.error_source_label = error_source_label()

Route errors based on component labels:

root.error_category = error_source_label().or("unknown")

error_source_name

Returns the component name that caused the error, or null if the message has no error or the error has no associated component. Use this to identify which processor or component in your pipeline caused a failure.

Examples

root.doc.error_source_name = error_source_name()

Create detailed error logs with component information:

root.error_details = if errored() {
  {
    "message": error(),
    "component": error_source_name(),
    "timestamp": now()
  }
}

error_source_path

Returns the dot-separated path to the component that caused the error, or null if the message has no error. Use this to identify the exact location of a failed component in nested pipeline configurations.

Examples

root.doc.error_source_path = error_source_path()

Build comprehensive error context for debugging:

root.error_info = {
  "path": error_source_path(),
  "component": error_source_name(),
  "message": error()
}

errored

Returns true if the message has failed processing, false otherwise. Use this for conditional logic in error handling workflows or to route failed messages to dead letter queues.

Examples

root.doc.status = if errored() { 400 } else { 200 }

Send only failed messages to a separate stream:

root = if errored() { this } else { deleted() }

fake

Generates realistic fake data for testing and development purposes. Supports a wide variety of data types including personal information, network addresses, dates/times, financial data, and UUIDs. Useful for creating mock data, populating test databases, or anonymizing sensitive information.

Supported functions: latitude, longitude, unix_time, date, time_string, month_name, year_string, day_of_week, day_of_month, timestamp, century, timezone, time_period, email, mac_address, domain_name, url, username, ipv4, ipv6, password, jwt, word, sentence, paragraph, cc_type, cc_number, currency, amount_with_currency, title_male, title_female, first_name, first_name_male, first_name_female, last_name, name, gender, chinese_first_name, chinese_last_name, chinese_name, phone_number, toll_free_phone_number, e164_phone_number, uuid_hyphenated, uuid_digit.

Parameters

Name | Type | Description
function | string | The name of the faker function to use. See description for full list of supported functions.

Examples

Generate fake user profile data for testing:

root.user = {
  "id": fake("uuid_hyphenated"),
  "name": fake("name"),
  "email": fake("email"),
  "created_at": fake("timestamp")
}

Create realistic test data for network monitoring:

root.event = {
  "source_ip": fake("ipv4"),
  "mac_address": fake("mac_address"),
  "url": fake("url")
}

file

Reads a file and returns its contents as bytes. Paths are resolved from the process working directory. For paths relative to the mapping file, use file_rel. By default, files are cached after first read.

Parameters

Name | Type | Description
path | string | The absolute or relative path to the file.
no_cache | bool | Disable caching to read the latest file contents on each invocation.

Examples

root.config = file("/etc/config.json").parse_json()
root.template = file("./templates/email.html").string()

Use no_cache to read updated file contents during runtime, useful for hot-reloading configuration:

root.rules = file(path: "/etc/rules.yaml", no_cache: true).parse_yaml()

file_rel

Reads a file and returns its contents as bytes. Paths are resolved relative to the mapping file’s directory, making it portable across different environments. By default, files are cached after first read.

Parameters

Name | Type | Description
path | string | The path to the file, relative to the mapping file’s directory.
no_cache | bool | Disable caching to read the latest file contents on each invocation.

Examples

root.schema = file_rel("./schemas/user.json").parse_json()
root.lookup = file_rel("../data/lookup.csv").parse_csv()

Use no_cache to read updated file contents during runtime, useful for reloading data files without restarting:

root.translations = file_rel(path: "./i18n/en.yaml", no_cache: true).parse_yaml()

hostname

Returns the hostname of the machine running Benthos. Useful for identifying which instance processed a message in distributed deployments.

Examples

root.processed_by = hostname()

json

Returns a field from the original JSON message by dot path, always accessing the root document regardless of mapping context. Use this to reference the source message when working in nested contexts or to extract specific fields.

Parameters

Name | Type | Description
path | string | An optional dot path identifying a field to obtain.

Examples

root.mapped = json("foo.bar")

# In:  {"foo":{"bar":"hello world"}}
# Out: {"mapped":"hello world"}

Access the original message from within nested mapping contexts:

root.doc = json()

# In:  {"foo":{"bar":"hello world"}}
# Out: {"doc":{"foo":{"bar":"hello world"}}}
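
The following sketch shows the nested case more explicitly. The field names items, name, and order_id are hypothetical, and it assumes map_each is given a plain query so that this refers to the current element:

```bloblang
# Inside map_each, this is the current array element, while json()
# still reads from the root of the original message.
root.items = this.items.map_each({
  "name": this.name,
  "order_id": json("order_id")
})
```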

ksuid

Generates a K-Sortable Unique Identifier with built-in timestamp ordering. Use this for distributed unique IDs that sort chronologically and remain collision-resistant without coordination between generators.

Examples

root.id = ksuid()

Create sortable event IDs for logging:

root.event = {
  "id": ksuid(),
  "type": this.event_type,
  "data": this.payload
}

meta

This method is deprecated and will be removed in a future version.

Returns the value of a metadata key from the input message as a string, or null if the key does not exist. Since values are extracted from the read-only input message they do NOT reflect changes made from within the map. In order to query metadata mutations made within a mapping use the root_meta function. This function supports extracting metadata from other messages of a batch with the from method.

Parameters

Name | Type | Description
key | string | An optional key of a metadata value to obtain.

Examples

root.topic = meta("kafka_topic")

The key parameter is optional and if omitted the entire metadata contents are returned as an object:

root.all_metadata = meta()

metadata

Returns metadata from the input message by key, or null if the key doesn’t exist. This reads the original metadata; to access modified metadata during mapping, use the @ operator instead. Use this to extract message properties like topics, headers, or timestamps.

Parameters

Name | Type | Description
key | string | An optional key of a metadata value to obtain.

Examples

root.topic = metadata("kafka_topic")

Retrieve all metadata as an object by omitting the key parameter:

root.all_metadata = metadata()

Copy specific metadata fields to the message body:

root.source = {
  "topic": metadata("kafka_topic"),
  "partition": metadata("kafka_partition"),
  "timestamp": metadata("kafka_timestamp_unix")
}
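
Compare the original metadata with metadata modified during the mapping. This is a minimal sketch of the distinction described above; the key stage and the output field names are illustrative assumptions:

```bloblang
# Set a metadata key on the message being created.
meta stage = "enriched"

# metadata() reads the input message, so this is null unless the
# input already carried a "stage" key.
root.stage_before = metadata("stage")

# The @ operator reads the metadata as modified by this mapping.
root.stage_after = @stage
```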

nanoid

Generates a URL-safe unique identifier using Nano ID. Use this for compact, URL-friendly IDs with good collision resistance. Customize the length (default 21) or provide a custom alphabet for specific character requirements.

Parameters

Name | Type | Description
length (optional) | integer | An optional length.
alphabet (optional) | string | An optional custom alphabet to use for generating IDs. When specified the field length must also be present.

Examples

root.id = nanoid()

Generate a longer ID for additional uniqueness:

root.id = nanoid(54)

Use a custom alphabet for domain-specific IDs:

root.id = nanoid(54, "abcde")

nothing

now

Returns the current timestamp as an RFC 3339 formatted string with nanosecond precision. Use this to add processing timestamps to messages or measure time between events. Chain with ts_format to customize the format or timezone.

Examples

root.received_at = now()

Format the timestamp in a custom format and timezone:

root.received_at = now().ts_format("Mon Jan 2 15:04:05 -0700 MST 2006", "UTC")

pi

Returns the value of the mathematical constant Pi.

Examples

root.radians = this.degrees * (pi() / 180)

# In:  {"degrees":45}
# Out: {"radians":0.7853981633974483}

root.degrees = this.radians * (180 / pi())

# In:  {"radians":0.78540}
# Out: {"degrees":45.00010522957486}

random_int

Generates a pseudo-random non-negative 64-bit integer. Use this for creating random IDs, sampling data, or generating test values. Provide a seed for reproducible randomness, or use a dynamic seed like timestamp_unix_nano() for unique values per mapping instance.

Optional min and max parameters constrain the output range (both inclusive). For dynamic ranges based on message data, use the modulo operator instead: random_int() % dynamic_max + dynamic_min.

Parameters

Name | Type | Description
seed | query expression | A seed to use. If a query is provided, it is resolved only once during the lifetime of the mapping.
min | integer | The minimum value the randomly generated number will have. The default value is 0.
max | integer | The maximum value the randomly generated number will have. The default value is 9223372036854775806 (math.MaxInt64 - 1).

Examples

root.first = random_int()
root.second = random_int(1)
root.third = random_int(max:20)
root.fourth = random_int(min:10, max:20)
root.fifth = random_int(timestamp_unix_nano(), 5, 20)
root.sixth = random_int(seed:timestamp_unix_nano(), max:20)

Use a dynamic seed for unique random values per mapping instance:

root.random_id = random_int(timestamp_unix_nano())
root.sample_percent = random_int(seed: timestamp_unix_nano(), min: 0, max: 100)
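
Derive the range from message data with the modulo operator, as suggested above. A minimal sketch, assuming a hypothetical sides field holding a positive integer:

```bloblang
# The modulo keeps the value in [0, sides - 1]; adding 1 shifts it
# to [1, sides].
root.roll = random_int(seed: timestamp_unix_nano()) % this.sides + 1
```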

range

Creates an array of integers from start (inclusive) to stop (exclusive) with an optional step. Use this to generate sequences for iteration, indexing, or creating numbered lists.

Parameters

Name | Type | Description
start | integer | The start value.
stop | integer | The stop value.
step | integer | The step value.

Examples

root.a = range(0, 10)
root.b = range(start: 0, stop: this.max, step: 2) # Using named params
root.c = range(0, -this.max, -2)

# In:  {"max":10}
# Out: {"a":[0,1,2,3,4,5,6,7,8,9],"b":[0,2,4,6,8],"c":[0,-2,-4,-6,-8]}

Generate a sequence for batch processing:

root.pages = range(0, this.total_items, 100).map_each(offset -> {
  "offset": offset,
  "limit": 100
})

# In:  {"total_items":250}
# Out: {"pages":[{"limit":100,"offset":0},{"limit":100,"offset":100},{"limit":100,"offset":200}]}

root_meta

This function is deprecated and will be removed in a future version.

Returns the value of a metadata key from the new message being created as a string, or null if the key does not exist. Changes made to metadata during a mapping will be reflected by this function.

Parameters

Name | Type | Description
key | string | An optional key of a metadata value to obtain.

Examples

root.topic = root_meta("kafka_topic")

The key parameter is optional and if omitted the entire metadata contents are returned as an object:

root.all_metadata = root_meta()

snowflake_id

Generates a unique, time-ordered Snowflake ID. Snowflake IDs are 64-bit integers that encode timestamp, node ID, and sequence information, making them ideal for distributed systems where sortable unique identifiers are needed. Returns a string representation of the ID.

Parameters

Name | Type | Description
node_id | integer | Optional node identifier (0-1023) to distinguish IDs generated by different machines in a distributed system. Defaults to 1.

Examples

Generate a unique Snowflake ID for each message:

root.id = snowflake_id()
root.payload = this

Generate Snowflake IDs with different node IDs for multi-datacenter deployments:

root.id = snowflake_id(42)
root.data = this

throw

Immediately fails the mapping with a custom error message. Use this to halt processing when data validation fails or required fields are missing, causing the message to be routed to error handlers.

Parameters

Name | Type | Description
why | string | A string explaining why the error was thrown. It is added to the resulting error message.

Examples

root.doc.type = match {
  this.exists("header.id") => "foo"
  this.exists("body.data") => "bar"
  _ => throw("unknown type")
}
root.doc.contents = (this.body.content | this.thing.body)

# In:  {"header":{"id":"first"},"thing":{"body":"hello world"}}
# Out: {"doc":{"contents":"hello world","type":"foo"}}

# In:  {"nothing":"matches"}
# Out: Error("failed assignment (line 1): unknown type")

Validate required fields before processing:

root = if this.exists("user_id") {
  this
} else {
  throw("missing required field: user_id")
}

# In:  {"user_id":123,"name":"alice"}
# Out: {"name":"alice","user_id":123}

# In:  {"name":"bob"}
# Out: Error("failed assignment (line 1): missing required field: user_id")

timestamp_unix

Returns the current Unix timestamp in seconds since epoch. Use this for numeric timestamps compatible with most systems, or as a seed for random number generation.

Examples

root.received_at = timestamp_unix()

Create a sortable ID combining the timestamp with the batch index:

root.id = "%v-%v".format(timestamp_unix(), batch_index())

timestamp_unix_micro

Returns the current Unix timestamp in microseconds since epoch. Use this for high-precision timing measurements or when microsecond resolution is required.

Examples

root.received_at = timestamp_unix_micro()

Measure elapsed time between events:

root.processing_duration_us = timestamp_unix_micro() - this.start_time_us

timestamp_unix_milli

Returns the current Unix timestamp in milliseconds since epoch. Use this for millisecond-precision timestamps common in web APIs and JavaScript systems.

Examples

root.received_at = timestamp_unix_milli()

Add processing time metadata:

meta processing_time_ms = timestamp_unix_milli()

timestamp_unix_nano

Returns the current Unix timestamp in nanoseconds since epoch. Use this for the highest precision timing or as a unique seed value that changes on every invocation.

Examples

root.received_at = timestamp_unix_nano()

Generate unique random values on each mapping:

root.random_value = random_int(timestamp_unix_nano())

tracing_id

Returns the OpenTelemetry trace ID for the message, or an empty string if no tracing span exists. Use this to correlate logs and events with distributed traces.

Examples

meta trace_id = tracing_id()

Add trace ID to structured logs:

root.log_entry = this
root.log_entry.trace_id = tracing_id()

tracing_span

Returns the OpenTelemetry tracing span attached to the message as a text map object, or null if no span exists. Use this to propagate trace context to downstream systems via headers or metadata.

Examples

root.headers.traceparent = tracing_span().traceparent

# In:  {"some_stuff":"just can't be explained by science"}
# Out: {"headers":{"traceparent":"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"}}

Forward all tracing fields to output metadata:

meta = tracing_span()

ulid

Generates a Universally Unique Lexicographically Sortable Identifier (ULID). ULIDs are 128-bit identifiers that are sortable by creation time, URL-safe, and case-insensitive. They consist of a 48-bit timestamp (millisecond precision) and 80 bits of randomness, making them ideal for distributed systems that need time-ordered unique IDs without coordination.

Parameters

Name | Type | Description
encoding | string | Encoding format for the ULID. "crockford" produces 26-character Base32 strings (recommended). "hex" produces 32-character hexadecimal strings.
random_source | string | Randomness source: "secure_random" uses cryptographically secure random (recommended for production), "fast_random" uses faster but non-secure random (only for non-sensitive testing).

Examples

Generate time-sortable IDs for distributed message ordering:

root.message_id = ulid()
root.timestamp = now()
root.data = this

Generate hex-encoded ULIDs for systems that prefer hexadecimal format:

root.id = ulid("hex")

uuid_v4

Generates a random RFC-4122 version 4 UUID. Use this for creating unique identifiers that don’t reveal timing information or require ordering. Each invocation produces a new globally unique ID.

Examples

root.id = uuid_v4()

Add unique request IDs for tracing:

root = this
root.request_id = uuid_v4()

uuid_v7

Generates a time-ordered UUID version 7 with millisecond timestamp precision. Use this for sortable unique identifiers that maintain chronological ordering, ideal for database keys or event IDs. Optionally specify a custom timestamp.

Parameters

Name | Type | Description
time (optional) | timestamp | An optional timestamp to use for the time ordered portion of the UUID.

Examples

root.id = uuid_v7()

Generate a UUID with a specific timestamp for backdating events:

root.id = uuid_v7(now().ts_sub_iso8601("PT1M"))

var

Parameters

Name | Type | Description
name | string | The name of the target variable.
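
Examples

A minimal sketch, assuming var("name") reads a variable declared with let in the same way as the $name shorthand. The variable and field names are hypothetical:

```bloblang
# Declare a variable with a fallback when the field is missing.
let region = this.region.or("unknown")

# Both assignments read the same variable.
root.region = var("region")
root.region_short = $region
```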

with_schema_registry_header

Prepends a Confluent Schema Registry wire format header to message bytes. The header is 5 bytes: a magic byte (0x00) followed by a 4-byte big-endian schema ID. This format is required when producing messages to Kafka topics that use Confluent Schema Registry for schema validation and evolution.

Parameters

Name | Type | Description
schema_id | unknown | The schema ID from your Schema Registry (0 to 4294967295). This ID references the schema version used to encode the message.
message | unknown | The serialized message bytes (e.g., Avro, Protobuf, or JSON Schema encoded data) to prepend the header to.

Examples

Add Schema Registry header to Avro-encoded message:

root = with_schema_registry_header(123, content())

Use schema ID from metadata to add header dynamically:

root = with_schema_registry_header(metadata("schema_id").number(), content())