# Bloblang Functions

Functions can be placed anywhere and allow you to extract information from your environment, generate values, or access data from the underlying message being mapped:

    root.doc.id = uuid_v4()
    root.doc.received_at = now()
    root.doc.host = hostname()

Functions support both named and nameless style arguments:

    root.values_one = range(start: 0, stop: this.max, step: 2)
    root.values_two = range(0, this.max, 2)
    # In: {"max":10}

## batch_index

Returns the zero-based index of the current message within its batch. Use this to conditionally process messages based on their position, or to create sequential identifiers within a batch.

### Examples

Drop every message in a batch except the first:

    root = if batch_index() > 0 { deleted() }

Create an identifier that combines the current Unix timestamp with the batch position:

    root.id = "%v-%v".format(timestamp_unix(), batch_index())

## batch_size

Returns the total number of messages in the current batch. Use this to determine batch boundaries or compute relative positions.

### Examples

    root.total = batch_size()

Check whether the current message is the last one in its batch:

    root.is_last = batch_index() == batch_size() - 1

## bytes

Creates a zero-initialized byte array of the specified length. Use this to allocate fixed-size byte buffers for binary data manipulation or to generate padding.

### Parameters

| Name | Type | Description |
|---|---|---|
| length | integer | The size of the resulting byte array. |

### Examples

    root.data = bytes(5)

Create a buffer for binary operations:

    root.header = bytes(16)
    root.payload = content()

## content

Returns the raw message payload as bytes, regardless of the current mapping context. Use this to access the original message when working within nested contexts, or to store the entire message as a field.
### Examples

    root.doc = content().string()
    # In: {"foo":"bar"}
    # Out: {"doc":"{\"foo\":\"bar\"}"}

Preserve the original message while adding a new field:

    root.original = content().string()
    root.processed_by = "ai"
    # In: {"foo":"bar"}
    # Out: {"original":"{\"foo\":\"bar\"}","processed_by":"ai"}

## count

This method is deprecated and will be removed in a future version.

The count function is a counter starting at 1 which increments each time it is called. Count takes an argument which is an identifier for the counter, allowing you to specify multiple unique counters in your configuration.

### Parameters

| Name | Type | Description |
|---|---|---|
| name | string | An identifier for the counter. |

### Examples

    root = this
    root.id = count("bloblang_function_example")
    # In: {"message":"foo"}
    # Out: {"id":1,"message":"foo"}
    # In: {"message":"bar"}
    # Out: {"id":2,"message":"bar"}

## counter

Generates an incrementing sequence of integers starting from a minimum value (default 1). Each counter instance maintains its own independent state across message processing. When the maximum value is reached, the counter automatically resets to the minimum.

### Parameters

| Name | Type | Description |
|---|---|---|
| min | query expression | The starting value of the counter. This is the first value yielded. Evaluated once when the mapping is initialized. |
| max | query expression | The maximum value before the counter resets to min. Evaluated once when the mapping is initialized. |
| set (optional) | query expression | An optional query that controls counter behavior: when it resolves to a non-negative integer, the counter is set to that value; when it resolves to null, the counter is read without incrementing; when it resolves to a deletion, the counter resets to min; otherwise the counter increments normally. |
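The following Go sketch is a simplified model of the counter semantics described above. Its only purpose is to make the interaction of min, max, and set concrete. It is not the real implementation, and the `counter` type and its methods are hypothetical. The following behaviors are assumptions rather than documented facts:

- the exact wrap point (the counter returns to min only after max has been yielded);
- the value returned after a reset;
- the max of 1000 used in `main`.

The `main` function replays the reset and peek examples from this section.

```go
package main

import "fmt"

// counter is a hypothetical, simplified model of the documented counter()
// behavior. It is not the Redpanda Connect implementation.
type counter struct {
	lo, hi int64 // correspond to the min and max parameters
	cur    int64 // last value yielded
}

// newCounter starts one below lo so that the first increment yields lo.
func newCounter(lo, hi int64) *counter {
	return &counter{lo: lo, hi: hi, cur: lo - 1}
}

// next models the default behavior: increment, wrapping back to lo once hi
// has been yielded (the exact wrap point is an assumption).
func (c *counter) next() int64 {
	if c.cur >= c.hi {
		c.cur = c.lo
	} else {
		c.cur++
	}
	return c.cur
}

// set models `set` resolving to a non-negative integer.
func (c *counter) set(v int64) int64 {
	c.cur = v
	return c.cur
}

// peek models `set` resolving to null: read the value without incrementing.
func (c *counter) peek() int64 {
	return c.cur
}

// reset models `set` resolving to a deletion: return to lo.
func (c *counter) reset() int64 {
	c.cur = c.lo
	return c.cur
}

func main() {
	// Reset example: success, success, failure (set: 0), success.
	streak := newCounter(1, 1000)
	fmt.Println(streak.next(), streak.next(), streak.set(0), streak.next()) // 1 2 0 1

	// Peek example: increment, increment, peek (set: null), increment.
	count := newCounter(1, 1000)
	fmt.Println(count.next(), count.next(), count.peek(), count.next()) // 1 2 2 3

	// Wrap-around with a small range.
	small := newCounter(1, 3)
	fmt.Println(small.next(), small.next(), small.next(), small.next()) // 1 2 3 1
}
```

Keeping the "last value yielded" as the only state makes peek trivial: reading without incrementing just returns that value.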
### Examples

Generate sequential IDs for each message:

    root.id = counter()
    # In: {}
    # Out: {"id":1}
    # In: {}
    # Out: {"id":2}

Use a custom range for the counter:

    root.batch_num = counter(min: 100, max: 200)
    # In: {}
    # Out: {"batch_num":100}
    # In: {}
    # Out: {"batch_num":101}

Increment a counter multiple times within a single mapping by using a named map:

    map increment {
      root = counter()
    }
    root.first_id = null.apply("increment")
    root.second_id = null.apply("increment")
    # In: {}
    # Out: {"first_id":1,"second_id":2}
    # In: {}
    # Out: {"first_id":3,"second_id":4}

Conditionally reset a counter based on input data:

    root.streak = counter(set: if this.status != "success" { 0 })
    # In: {"status":"success"}
    # Out: {"streak":1}
    # In: {"status":"success"}
    # Out: {"streak":2}
    # In: {"status":"failure"}
    # Out: {"streak":0}
    # In: {"status":"success"}
    # Out: {"streak":1}

Peek at the current counter value without incrementing it by passing null to the set parameter:

    root.count = counter(set: if this.peek { null })
    # In: {"peek":false}
    # Out: {"count":1}
    # In: {"peek":false}
    # Out: {"count":2}
    # In: {"peek":true}
    # Out: {"count":2}
    # In: {"peek":false}
    # Out: {"count":3}

## deleted

Returns a deletion marker that removes the target field or message. When applied to root, the entire message is dropped while still being acknowledged as successfully processed. Use this to filter data or conditionally remove fields.

### Examples

    root = this
    root.bar = deleted()
    # In: {"bar":"bar_value","baz":"baz_value","foo":"foo value"}
    # Out: {"baz":"baz_value","foo":"foo value"}

Filter array elements by returning deleted() for unwanted items:

    root.new_nums = this.nums.map_each(num -> if num < 10 { deleted() } else { num - 10 })
    # In: {"nums":[3,11,4,17]}
    # Out: {"new_nums":[1,7]}

## env

Reads an environment variable and returns its value as a string. Returns null if the variable is not set. By default, values are cached for performance.

### Parameters

| Name | Type | Description |
|---|---|---|
| name | string | The name of the environment variable to read. |
| no_cache | bool | Disable caching to read the latest value on each invocation. |

### Examples

    root.api_key = env("API_KEY")
    root.database_url = env("DB_URL").or("localhost:5432")

Use no_cache to pick up environment variables that change at runtime, which is useful for dynamic configuration:

    root.config = env(name: "DYNAMIC_CONFIG", no_cache: true)

## error

Returns the error message string if the message has failed processing, otherwise null. Use this in error handling pipelines to log or route failed messages based on their error details.

### Examples

    root.doc.error = error()

Annotate messages with their error state so that downstream components can route on it:

    root = this
    root.error_msg = error()
    root.has_error = error() != null

## error_source_label

Returns the user-defined label of the component that caused the error, an empty string if no label is set, or null if the message has no error. Use this for more human-readable error tracking when components have custom labels.

### Examples

    root.doc.error_source_label = error_source_label()

Categorize errors by component label:

    root.error_category = error_source_label().or("unknown")

## error_source_name

Returns the name of the component that caused the error, or null if the message has no error or the error has no associated component. Use this to identify which processor or component in your pipeline caused a failure.

### Examples

    root.doc.error_source_name = error_source_name()

Create detailed error logs with component information:

    root.error_details = if errored() {
      {
        "message": error(),
        "component": error_source_name(),
        "timestamp": now()
      }
    }

## error_source_path

Returns the dot-separated path to the component that caused the error, or null if the message has no error. Use this to identify the exact location of a failed component in nested pipeline configurations.
### Examples

    root.doc.error_source_path = error_source_path()

Build comprehensive error context for debugging:

    root.error_info = {
      "path": error_source_path(),
      "component": error_source_name(),
      "message": error()
    }

## errored

Returns true if the message has failed processing, false otherwise. Use this for conditional logic in error handling workflows or to route failed messages to dead letter queues.

### Examples

    root.doc.status = if errored() { 400 } else { 200 }

Keep only failed messages and drop all others, for example in a stream dedicated to failures:

    root = if errored() { this } else { deleted() }

## fake

Generates realistic fake data for testing and development. Supports a wide variety of data types, including personal information, network addresses, dates and times, financial data, and UUIDs. Useful for creating mock data, populating test databases, or anonymizing sensitive information.

Supported functions: latitude, longitude, unix_time, date, time_string, month_name, year_string, day_of_week, day_of_month, timestamp, century, timezone, time_period, email, mac_address, domain_name, url, username, ipv4, ipv6, password, jwt, word, sentence, paragraph, cc_type, cc_number, currency, amount_with_currency, title_male, title_female, first_name, first_name_male, first_name_female, last_name, name, gender, chinese_first_name, chinese_last_name, chinese_name, phone_number, toll_free_phone_number, e164_phone_number, uuid_hyphenated, uuid_digit.

### Parameters

| Name | Type | Description |
|---|---|---|
| function | string | The name of the faker function to use. See the description for the full list of supported functions. |

### Examples

Generate fake user profile data for testing:

    root.user = {
      "id": fake("uuid_hyphenated"),
      "name": fake("name"),
      "email": fake("email"),
      "created_at": fake("timestamp")
    }

Create realistic test data for network monitoring:

    root.event = {
      "source_ip": fake("ipv4"),
      "mac_address": fake("mac_address"),
      "url": fake("url")
    }

## file

Reads a file and returns its contents as bytes. Paths are resolved from the process working directory.
For paths relative to the mapping file, use file_rel. By default, files are cached after the first read.

### Parameters

| Name | Type | Description |
|---|---|---|
| path | string | The absolute or relative path to the file. |
| no_cache | bool | Disable caching to read the latest file contents on each invocation. |

### Examples

    root.config = file("/etc/config.json").parse_json()
    root.template = file("./templates/email.html").string()

Use no_cache to read updated file contents at runtime, which is useful for hot-reloading configuration:

    root.rules = file(path: "/etc/rules.yaml", no_cache: true).parse_yaml()

## file_rel

Reads a file and returns its contents as bytes. Paths are resolved relative to the mapping file’s directory, which keeps mappings portable across environments. By default, files are cached after the first read.

### Parameters

| Name | Type | Description |
|---|---|---|
| path | string | The path to the file, relative to the mapping file’s directory. |
| no_cache | bool | Disable caching to read the latest file contents on each invocation. |

### Examples

    root.schema = file_rel("./schemas/user.json").parse_json()
    root.lookup = file_rel("../data/lookup.csv").parse_csv()

Use no_cache to read updated file contents at runtime, which is useful for reloading data files without restarting:

    root.translations = file_rel(path: "./i18n/en.yaml", no_cache: true).parse_yaml()

## hostname

Returns the hostname of the machine running Benthos. Useful for identifying which instance processed a message in distributed deployments.

### Examples

    root.processed_by = hostname()

## json

Returns a field from the original JSON message by dot path, always accessing the root document regardless of mapping context. Use this to reference the source message when working in nested contexts or to extract specific fields.

### Parameters

| Name | Type | Description |
|---|---|---|
| path | string | An optional dot path identifying a field to obtain. If omitted, the entire document is returned. |
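As a rough illustration of what resolving a dot path means, this Go sketch walks a decoded JSON document one key at a time. The `getPath` helper is hypothetical and deliberately simplified: it handles only plain object keys separated by dots, with no array indexes or escaped keys. It is not how Bloblang parses paths internally.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// getPath is a hypothetical helper that walks a decoded JSON document by a
// simple dot path such as "foo.bar". An empty path returns the whole document.
// Simplified: only plain object keys are supported.
func getPath(doc any, path string) (any, bool) {
	if path == "" {
		return doc, true
	}
	cur := doc
	for _, key := range strings.Split(path, ".") {
		obj, ok := cur.(map[string]any)
		if !ok {
			return nil, false // tried to descend into a non-object value
		}
		if cur, ok = obj[key]; !ok {
			return nil, false // key not present
		}
	}
	return cur, true
}

func main() {
	var doc any
	if err := json.Unmarshal([]byte(`{"foo":{"bar":"hello world"}}`), &doc); err != nil {
		panic(err)
	}
	v, _ := getPath(doc, "foo.bar")
	fmt.Println(v) // hello world

	_, found := getPath(doc, "foo.missing")
	fmt.Println(found) // false
}
```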
### Examples

    root.mapped = json("foo.bar")
    # In: {"foo":{"bar":"hello world"}}
    # Out: {"mapped":"hello world"}

Access the original message from within nested mapping contexts:

    root.doc = json()
    # In: {"foo":{"bar":"hello world"}}
    # Out: {"doc":{"foo":{"bar":"hello world"}}}

## ksuid

Generates a K-Sortable Unique Identifier (KSUID) with built-in timestamp ordering. Use this for distributed unique IDs that sort chronologically and remain collision-resistant without coordination between generators.

### Examples

    root.id = ksuid()

Create sortable event IDs for logging:

    root.event = {
      "id": ksuid(),
      "type": this.event_type,
      "data": this.payload
    }

## meta

This method is deprecated and will be removed in a future version.

Returns the value of a metadata key from the input message as a string, or null if the key does not exist. Because values are extracted from the read-only input message, they do NOT reflect changes made from within the map. To query metadata mutations made within a mapping, use the root_meta function. This function supports extracting metadata from other messages of a batch with the from method.

### Parameters

| Name | Type | Description |
|---|---|---|
| key | string | An optional key of a metadata value to obtain. |

### Examples

    root.topic = meta("kafka_topic")

The key parameter is optional; if omitted, the entire metadata contents are returned as an object:

    root.all_metadata = meta()

## metadata

Returns metadata from the input message by key, or null if the key doesn’t exist. This reads the original metadata; to access metadata modified during the mapping, use the @ operator instead. Use this to extract message properties such as topics, headers, or timestamps.

### Parameters

| Name | Type | Description |
|---|---|---|
| key | string | An optional key of a metadata value to obtain. |
### Examples

    root.topic = metadata("kafka_topic")

Retrieve all metadata as an object by omitting the key parameter:

    root.all_metadata = metadata()

Copy specific metadata fields to the message body:

    root.source = {
      "topic": metadata("kafka_topic"),
      "partition": metadata("kafka_partition"),
      "timestamp": metadata("kafka_timestamp_unix")
    }

## nanoid

Generates a URL-safe unique identifier using Nano ID. Use this for compact, URL-friendly IDs with good collision resistance. Customize the length (default 21) or provide a custom alphabet for specific character requirements.

### Parameters

| Name | Type | Description |
|---|---|---|
| length (optional) | integer | An optional length. |
| alphabet (optional) | string | An optional custom alphabet to use for generating IDs. When specified, the length parameter must also be present. |

### Examples

    root.id = nanoid()

Generate a longer ID for additional uniqueness:

    root.id = nanoid(54)

Use a custom alphabet for domain-specific IDs:

    root.id = nanoid(54, "abcde")

## nothing

## now

Returns the current timestamp as an RFC 3339 formatted string with nanosecond precision. Use this to add processing timestamps to messages or measure time between events. Chain with ts_format to customize the format or timezone.

### Examples

    root.received_at = now()

Format the timestamp in a custom format and timezone:

    root.received_at = now().ts_format("Mon Jan 2 15:04:05 -0700 MST 2006", "UTC")

## pi

Returns the value of the mathematical constant Pi.

### Examples

    root.radians = this.degrees * (pi() / 180)
    # In: {"degrees":45}
    # Out: {"radians":0.7853981633974483}

    root.degrees = this.radians * (180 / pi())
    # In: {"radians":0.78540}
    # Out: {"degrees":45.00010522957486}

## random_int

Generates a pseudo-random non-negative 64-bit integer. Use this for creating random IDs, sampling data, or generating test values. Provide a seed for reproducible randomness, or use a dynamic seed such as timestamp_unix_nano() to get a different sequence for each mapping instance. The optional min and max parameters constrain the output range (both bounds are inclusive).
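This Go sketch uses Go's `math/rand` to show what inclusive min and max bounds and a fixed seed mean in practice. It is only an analogy: `boundedRandom` is a hypothetical helper, and its values will not match those of random_int for the same seed.

```go
package main

import (
	"fmt"
	"math/rand"
)

// boundedRandom is a hypothetical helper returning a generator of integers in
// [lo, hi], with both bounds inclusive. The explicit seed makes the sequence
// reproducible: two generators built with the same seed yield the same values.
func boundedRandom(seed, lo, hi int64) func() int64 {
	r := rand.New(rand.NewSource(seed))
	span := hi - lo + 1 // count of possible values, both bounds included; must be > 0
	return func() int64 {
		// Int63n returns a value in [0, span), so the result lies in [lo, hi].
		return lo + r.Int63n(span)
	}
}

func main() {
	a := boundedRandom(42, 10, 20)
	b := boundedRandom(42, 10, 20)
	for i := 0; i < 5; i++ {
		x, y := a(), b()
		// Same seed, same sequence; every value is between 10 and 20 inclusive.
		fmt.Println(x, y, x == y)
	}
}
```

The `+ 1` in `span` is what makes the upper bound inclusive. Without it, hi could never be produced.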
For dynamic ranges based on message data, use the modulo operator instead: random_int() % (dynamic_max - dynamic_min + 1) + dynamic_min yields a value between dynamic_min and dynamic_max, inclusive.

### Parameters

| Name | Type | Description |
|---|---|---|
| seed | query expression | A seed to use. If a query is provided, it is only resolved once during the lifetime of the mapping. |
| min | integer | The minimum value the generated number can have. Defaults to 0. |
| max | integer | The maximum value the generated number can have. Defaults to 9223372036854775806 (math.MaxInt64 - 1). |

### Examples

    root.first = random_int()
    root.second = random_int(1)
    root.third = random_int(max:20)
    root.fourth = random_int(min:10, max:20)
    root.fifth = random_int(timestamp_unix_nano(), 5, 20)
    root.sixth = random_int(seed:timestamp_unix_nano(), max:20)

Use a dynamic seed to get different random values for each mapping instance:

    root.random_id = random_int(timestamp_unix_nano())
    root.sample_percent = random_int(seed: timestamp_unix_nano(), min: 0, max: 100)

## range

Creates an array of integers from start (inclusive) to stop (exclusive) with an optional step. Use this to generate sequences for iteration, indexing, or numbered lists.

### Parameters

| Name | Type | Description |
|---|---|---|
| start | integer | The start value (inclusive). |
| stop | integer | The stop value (exclusive). |
| step | integer | The step value. |

### Examples

    root.a = range(0, 10)
    root.b = range(start: 0, stop: this.max, step: 2) # Using named params
    root.c = range(0, -this.max, -2)
    # In: {"max":10}
    # Out: {"a":[0,1,2,3,4,5,6,7,8,9],"b":[0,2,4,6,8],"c":[0,-2,-4,-6,-8]}

Generate pagination offsets for batch processing:

    root.pages = range(0, this.total_items, 100).map_each(offset -> {
      "offset": offset,
      "limit": 100
    })
    # In: {"total_items":250}
    # Out: {"pages":[{"limit":100,"offset":0},{"limit":100,"offset":100},{"limit":100,"offset":200}]}

## root_meta

This method is deprecated and will be removed in a future version.

Returns the value of a metadata key from the new message being created as a string, or null if the key does not exist. Changes made to metadata during a mapping will be reflected by this function.
### Parameters

| Name | Type | Description |
|---|---|---|
| key | string | An optional key of a metadata value to obtain. |

### Examples

    root.topic = root_meta("kafka_topic")

The key parameter is optional; if omitted, the entire metadata contents are returned as an object:

    root.all_metadata = root_meta()

## snowflake_id

Generates a unique, time-ordered Snowflake ID. Snowflake IDs are 64-bit integers that encode a timestamp, a node ID, and sequence information, making them well suited to distributed systems that need sortable unique identifiers. Returns a string representation of the ID.

### Parameters

| Name | Type | Description |
|---|---|---|
| node_id | integer | Optional node identifier (0-1023) used to distinguish IDs generated by different machines in a distributed system. Defaults to 1. |

### Examples

Generate a unique Snowflake ID for each message:

    root.id = snowflake_id()
    root.payload = this

Use a distinct node ID per deployment, for example in multi-datacenter setups:

    root.id = snowflake_id(42)
    root.data = this

## throw

Immediately fails the mapping with a custom error message. Use this to halt processing when data validation fails or required fields are missing, causing the message to be routed to error handlers.

### Parameters

| Name | Type | Description |
|---|---|---|
| why | string | A string explanation for why an error was thrown. This is added to the resulting error message. |
### Examples

    root.doc.type = match {
      this.exists("header.id") => "foo"
      this.exists("body.data") => "bar"
      _ => throw("unknown type")
    }
    root.doc.contents = (this.body.content | this.thing.body)
    # In: {"header":{"id":"first"},"thing":{"body":"hello world"}}
    # Out: {"doc":{"contents":"hello world","type":"foo"}}
    # In: {"nothing":"matches"}
    # Out: Error("failed assignment (line 1): unknown type")

Validate required fields before processing:

    root = if this.exists("user_id") {
      this
    } else {
      throw("missing required field: user_id")
    }
    # In: {"user_id":123,"name":"alice"}
    # Out: {"name":"alice","user_id":123}
    # In: {"name":"bob"}
    # Out: Error("failed assignment (line 1): missing required field: user_id")

## timestamp_unix

Returns the current Unix timestamp in seconds since the epoch. Use this for numeric timestamps compatible with most systems, or as a seed for random number generation.

### Examples

    root.received_at = timestamp_unix()

Create an ID that combines the timestamp with the message's batch index:

    root.id = "%v-%v".format(timestamp_unix(), batch_index())

## timestamp_unix_micro

Returns the current Unix timestamp in microseconds since the epoch. Use this for high-precision timing measurements or when microsecond resolution is required.

### Examples

    root.received_at = timestamp_unix_micro()

Measure elapsed time since an earlier event, assuming the input carries its start time in microseconds:

    root.processing_duration_us = timestamp_unix_micro() - this.start_time_us

## timestamp_unix_milli

Returns the current Unix timestamp in milliseconds since the epoch. Use this for millisecond-precision timestamps, which are common in web APIs and JavaScript systems.

### Examples

    root.received_at = timestamp_unix_milli()

Record the processing time as metadata:

    meta processing_time_ms = timestamp_unix_milli()

## timestamp_unix_nano

Returns the current Unix timestamp in nanoseconds since the epoch. Use this for the highest-precision timing, or as a seed value that changes on every invocation.
### Examples

    root.received_at = timestamp_unix_nano()

Seed random_int so that each mapping instance produces a different sequence:

    root.random_value = random_int(timestamp_unix_nano())

## tracing_id

Returns the OpenTelemetry trace ID for the message, or an empty string if no tracing span exists. Use this to correlate logs and events with distributed traces.

### Examples

    meta trace_id = tracing_id()

Add the trace ID to structured logs:

    root.log_entry = this
    root.log_entry.trace_id = tracing_id()

## tracing_span

Returns the OpenTelemetry tracing span attached to the message as a text map object, or null if no span exists. Use this to propagate trace context to downstream systems via headers or metadata.

### Examples

    root.headers.traceparent = tracing_span().traceparent
    # In: {"some_stuff":"just can't be explained by science"}
    # Out: {"headers":{"traceparent":"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"}}

Forward all tracing fields to output metadata:

    meta = tracing_span()

## ulid

Generates a Universally Unique Lexicographically Sortable Identifier (ULID). ULIDs are 128-bit identifiers that are sortable by creation time, URL-safe, and case-insensitive. They consist of a 48-bit timestamp (millisecond precision) and 80 bits of randomness, making them well suited to distributed systems that need time-ordered unique IDs without coordination.

### Parameters

| Name | Type | Description |
|---|---|---|
| encoding | string | Encoding format for the ULID. "crockford" produces 26-character Base32 strings (recommended). "hex" produces 32-character hexadecimal strings. |
| random_source | string | Randomness source: "secure_random" uses cryptographically secure randomness (recommended for production); "fast_random" uses faster but non-secure randomness (only for non-sensitive testing). |

### Examples

Generate time-sortable IDs for distributed message ordering:

    root.message_id = ulid()
    root.timestamp = now()
    root.data = this

Generate hex-encoded ULIDs for systems that prefer hexadecimal format:

    root.id = ulid("hex")

## uuid_v4

Generates a random RFC 4122 version 4 UUID.
Use this for unique identifiers that must not reveal timing information and do not need to be ordered. Each invocation produces a new random ID.

### Examples

    root.id = uuid_v4()

Add unique request IDs for tracing:

    root = this
    root.request_id = uuid_v4()

## uuid_v7

Generates a time-ordered UUID version 7 with millisecond timestamp precision. Use this for sortable unique identifiers that maintain chronological ordering, which makes them well suited to database keys or event IDs. Optionally specify a custom timestamp.

### Parameters

| Name | Type | Description |
|---|---|---|
| time (optional) | timestamp | An optional timestamp to use for the time-ordered portion of the UUID. |

### Examples

    root.id = uuid_v7()

Generate a UUID with a specific timestamp, for example to backdate an event by one minute:

    root.id = uuid_v7(now().ts_sub_iso8601("PT1M"))

## var

### Parameters

| Name | Type | Description |
|---|---|---|
| name | string | The name of the target variable. |

## with_schema_registry_header

Prepends a Confluent Schema Registry wire format header to message bytes. The header is 5 bytes: a magic byte (0x00) followed by a 4-byte big-endian schema ID. This format is required when producing messages to Kafka topics that use Confluent Schema Registry for schema validation and evolution.

### Parameters

| Name | Type | Description |
|---|---|---|
| schema_id | unknown | The schema ID from your Schema Registry (0 to 4294967295). This ID references the schema version used to encode the message. |
| message | unknown | The serialized message bytes (e.g., Avro, Protobuf, or JSON Schema encoded data) to prepend the header to. |

### Examples

Add a Schema Registry header to an Avro-encoded message:

    root = with_schema_registry_header(123, content())

Use a schema ID from metadata to add the header dynamically:

    root = with_schema_registry_header(metadata("schema_id").number(), content())
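To make the 5-byte header concrete, here is a Go sketch that frames and unframes a payload using `encoding/binary`. It assumes the header is exactly as described above: the magic byte 0x00 followed by the schema ID as a big-endian unsigned 32-bit integer. The helper names are hypothetical, and the payload is a placeholder rather than real Avro data.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// withSchemaRegistryHeader is a hypothetical helper that prepends the 5-byte
// Confluent wire format header: magic byte 0x00, then the schema ID as a
// 4-byte big-endian unsigned integer.
func withSchemaRegistryHeader(schemaID uint32, payload []byte) []byte {
	out := make([]byte, 5, 5+len(payload))
	out[0] = 0x00 // magic byte
	binary.BigEndian.PutUint32(out[1:5], schemaID)
	return append(out, payload...)
}

// parseSchemaRegistryHeader reverses the framing, returning the schema ID and
// the original payload bytes.
func parseSchemaRegistryHeader(msg []byte) (uint32, []byte, error) {
	if len(msg) < 5 || msg[0] != 0x00 {
		return 0, nil, errors.New("not a Schema Registry framed message")
	}
	return binary.BigEndian.Uint32(msg[1:5]), msg[5:], nil
}

func main() {
	// "avro-bytes" stands in for real serialized data.
	framed := withSchemaRegistryHeader(123, []byte("avro-bytes"))
	fmt.Printf("% x\n", framed[:5]) // 00 00 00 00 7b

	id, payload, err := parseSchemaRegistryHeader(framed)
	fmt.Println(id, string(payload), err) // 123 avro-bytes <nil>
}
```

Using `uint32` for the schema ID mirrors the documented range of 0 to 4294967295, so out-of-range IDs cannot be represented at all.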