Processor Functions

Processor functions can be composed into a pipeline. Each function in a pipeline must have a unique name.
If an error occurs during function execution or a validation fails, the unchanged pipeline input is sent to the error topic. The message sent to the error topic carries two headers:

  • x-exception-message: The message text of the exception

  • x-exception-fqcn: The fully qualified class name of the exception
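
The error handling described above can be illustrated with a minimal Python sketch. It is not the processor's implementation: run_pipeline, require_id and the "error"/"output" labels are hypothetical names, and the Python module-qualified class name only stands in for the fully qualified class name of the real exception.

```python
import copy


def run_pipeline(functions, message):
    """Run named functions in order; route the unchanged input to "error" on failure.

    `functions` is a list of (name, callable) pairs. All names are hypothetical.
    """
    names = [name for name, _ in functions]
    if len(names) != len(set(names)):
        raise ValueError("function names in a pipeline must be unique")

    original = copy.deepcopy(message)  # snapshot of the unchanged pipeline input
    current = message
    try:
        for _, fn in functions:
            current = fn(current)
    except Exception as exc:
        headers = {
            "x-exception-message": str(exc),
            # Python stand-in for a fully qualified class name
            "x-exception-fqcn": f"{type(exc).__module__}.{type(exc).__qualname__}",
        }
        return "error", original, headers
    return "output", current, {}


def require_id(msg):
    """Hypothetical validation step that fails when "id" has no value."""
    if not msg.get("id"):
        raise ValueError("id has no value")
    return msg


topic, payload, headers = run_pipeline([("checkId", require_id)], {"name": "Without id"})
```

In this sketch the failed message comes back exactly as it entered the pipeline, together with the two headers.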

hasValueValidator

If the Json element at the given path does not have a value, an error is emitted.
Having a value means:

  • the element exists

  • the element value is not null

  • for a textual single value: the string is not empty

  • for a multivalue: the array contains at least one textual element which is not empty
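
The rules listed above can be written down as a short Python sketch. The helper names resolve and has_value are hypothetical, and the treatment of other value types (numbers, booleans, objects) is an assumption, since the rules above only cover textual values and arrays.

```python
_MISSING = object()  # sentinel for "the path does not exist"


def resolve(doc, pointer):
    """Resolve an RFC 6901 JSON Pointer; return _MISSING if the path is absent."""
    node = doc
    for raw in pointer.split("/")[1:]:
        token = raw.replace("~1", "/").replace("~0", "~")
        if isinstance(node, dict) and token in node:
            node = node[token]
        elif isinstance(node, list) and token.isdigit() and int(token) < len(node):
            node = node[int(token)]
        else:
            return _MISSING
    return node


def has_value(doc, element_path):
    """Return True if the element at element_path has a value as defined above."""
    value = resolve(doc, element_path)
    if value is _MISSING or value is None:
        return False  # element missing or null
    if isinstance(value, str):
        return value != ""  # textual single value must not be empty
    if isinstance(value, list):
        # multivalue: at least one non-empty textual element
        return any(isinstance(item, str) and item != "" for item in value)
    return True  # assumption: any other type counts as having a value
```

A validator built on this check would raise an error whenever has_value returns False.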

Parameters

Parameter Name | Type | Required | Description
name | String | Yes | The name of the function instance
elementPath | Json Pointer | Yes | The Pointer to the element to be checked

Example

Configuration

hasValueValidator

Valid Input (single value)

{
  "id": "ID_1"
}

Valid Input (multivalue)

{
  "id": [null, "ID_X"]
}

Invalid Input (element does not exist)

{
  "name": "Without id"
}

Invalid Input (null value)

{
  "id": null
}

Invalid Input (empty string)

{
  "id": ""
}

Invalid Input (multivalue with no nonempty string)

{
  "id": [null, ""]
}

trimNormalizer

If the Json element at the given path is textual or an array containing textual elements, the trimNormalizer function trims the textual content and returns the Json content with the trimmed values. In arrays that mix textual and non-textual elements, only the textual elements are trimmed; the non-textual elements are left untouched.

Parameters

Parameter Name | Type | Required | Description
name | String | Yes | The name of the function instance
elementPath | Json Pointer | Yes | The Pointer to the element to be trimmed
mode | Enum | Yes | Determines the trim action. Valid values are RIGHT, LEFT, and BOTH.

  • RIGHT: trims only the right side

  • LEFT: trims only the left side

  • BOTH: trims both sides
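
The three modes can be sketched in Python as follows. The names trim_text and trim_normalize are hypothetical, and the sketch assumes that trimming means removing leading or trailing whitespace as Python's strip methods define it; which characters the actual function trims is not stated here.

```python
def trim_text(text, mode):
    """Trim one string according to mode (LEFT, RIGHT or BOTH)."""
    if mode == "LEFT":
        return text.lstrip()
    if mode == "RIGHT":
        return text.rstrip()
    if mode == "BOTH":
        return text.strip()
    raise ValueError(f"unsupported mode: {mode}")


def trim_normalize(value, mode):
    """Trim a textual value, or the textual elements of an array."""
    if isinstance(value, str):
        return trim_text(value, mode)
    if isinstance(value, list):
        # non-textual elements are passed through untouched
        return [trim_text(v, mode) if isinstance(v, str) else v for v in value]
    return value  # neither text nor array: nothing to trim
```

For example, trim_normalize(["  12345  ", 42], "BOTH") trims the string and leaves the number as it is.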

Example

Configuration

trimNormalizer

Function configuration:
    mode: LEFT
    elementPath: /ean
Sample input (Single textual content)
{
  "ean": "  12345  "
}
Sample output of trimNormalizer function
{
  "ean": "12345  "
}
Sample input (Multiple textual content)
{
  "ean": ["  12345  ", "56789"]
}
Sample output of trimNormalizer function
{
  "ean": ["12345  ", "56789"]
}

Configuration

trimNormalizer RIGHT

Function configuration:
    mode: RIGHT
    elementPath: /ean
Sample input (Single textual content)
{
  "ean": "  12345  "
}
Sample output of trimNormalizer function
{
  "ean": "  12345"
}
Sample input (Multiple textual content)
{
  "ean": ["  12345  ", "56789"]
}
Sample output of trimNormalizer function
{
  "ean": ["  12345", "56789"]
}

Configuration

trimNormalizer BOTH

Function configuration:
    mode: BOTH
    elementPath: /ean
Sample input (Single textual content)
{
  "ean": "  12345  "
}
Sample output of trimNormalizer function
{
  "ean": "12345"
}
Sample input (Multiple textual content)
{
  "ean": ["  12345  ", "56789"]
}
Sample output of trimNormalizer function
{
  "ean": ["12345", "56789"]
}

padNormalizer

If the Json element at the given path is textual or an array containing textual elements, the padNormalizer function pads the content with the configured filler character until it reaches the configured length and returns the normalized Json. In arrays that mix textual and non-textual elements, only the textual elements are padded; the non-textual elements are left untouched.

Parameters

Parameter Name | Type | Required | Description
name | String | Yes | The name of the function instance
elementPath | Json Pointer | Yes | The Pointer to the element to be padded
length | Integer | Yes | The minimum length of textual elements
fillerCharacter | Character | Yes | The character the padNormalizer function uses to increase the length of the textual elements
pad | Enum | Yes | Determines the side on which the filler character is added. Valid values are LEFT and RIGHT.

Notes on length:

  • The value of length should be greater than or equal to 1

  • If a textual element is shorter than the given length, the padNormalizer function adds the filler character until the element reaches the given length.

Values of pad:

  • LEFT: adds the filler character only on the left side

  • RIGHT: adds the filler character only on the right side
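
The padding rules can be sketched in Python as follows. The names pad_text and pad_normalize are hypothetical; the sketch relies on str.rjust and str.ljust, which leave a string unchanged when it already has the requested length or more.

```python
def pad_text(text, length, filler, pad):
    """Pad one string with filler up to length on the given side."""
    if length < 1:
        raise ValueError("length should be greater than or equal to 1")
    if len(filler) != 1:
        raise ValueError("fillerCharacter must be a single character")
    if pad == "LEFT":
        return text.rjust(length, filler)  # filler is added on the left
    if pad == "RIGHT":
        return text.ljust(length, filler)  # filler is added on the right
    raise ValueError(f"unsupported pad side: {pad}")


def pad_normalize(value, length, filler, pad):
    """Pad a textual value, or the textual elements of an array."""
    if isinstance(value, str):
        return pad_text(value, length, filler, pad)
    if isinstance(value, list):
        # non-textual elements are passed through untouched
        return [pad_text(v, length, filler, pad) if isinstance(v, str) else v
                for v in value]
    return value
```

Because length is a minimum, values that are already long enough pass through unchanged.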

Example

Configuration

padNormalizer

Function configuration:
    pad: LEFT
    elementPath: /ean
    length: 8
    fillerCharacter: 0
Sample input (Single textual content)
{
  "ean": "12345"
}
Sample output of padNormalizer function
{
  "ean": "00012345"
}
Sample input (Multiple textual content)
{
  "ean": ["12345", "123456789"]
}
Sample output of padNormalizer function
{
  "ean": ["00012345", "123456789"]
}

Configuration

padNormalizer RIGHT

Function configuration:
    pad: RIGHT
    elementPath: /ean
    length: 8
    fillerCharacter: 0
Sample input (Single textual content)
{
  "ean": "12345"
}
Sample output of padNormalizer function
{
  "ean": "12345000"
}
Sample input (Multiple textual content)
{
  "ean": ["12345", "123456789"]
}
Sample output of padNormalizer function
{
  "ean": ["12345000", "123456789"]
}

Match

The Match function executes a search on an OpenSearch index to find entities in the index that match the function input.
The search uses an OpenSearch search template. The search template call can receive named parameters, each of which is either a literal string or a value extracted from the function's input.
The function result consists of the input used and a list of matched entities. The result follows this schema:

{
  "input": { /* Input Json content */ },
  "matches": [ /* List of Matched entities */ ]
}

Parameters

Parameter Name | Type | Required | Description
name | String | Yes | The name of the function instance
index | String | Yes | The name of the OpenSearch index
template | String | Yes | The name of the OpenSearch search template
paramsFromInput | Map | No | Map from search template parameter name to a Json Pointer into the input (details below)
literalParams | Map | No | Map from search template parameter name to a literal String value (details below)

paramsFromInput: Each entry maps a parameter name to a value taken from the input. The value to take is specified by a Json Pointer. The Match function extracts the value at the given path of the input and creates a search template parameter with the given name and the extracted value.

Note: The search template should expect these parameters and their values.

A sample:

  • name: ids

  • elementPath: /id

  • That means the Match function extracts the value of the Json element at path /id and creates a search template parameter with the name ids and that value.

literalParams: Each entry maps a String key to a String value. The Match function creates a search template parameter from the key and the value.

Note: The search template should expect these parameters and their values.

A sample:

  • name: field

  • value: id

  • That means the Match function creates a search template parameter with the name field and the value id
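
How the two parameter maps combine can be sketched in Python. The helper names resolve and build_template_request are hypothetical. The returned body (id plus params) mirrors the request shape of the OpenSearch search template API; how the Match function actually issues the call is not described here, and raising an error for a missing input path is an assumption of this sketch.

```python
def resolve(doc, pointer):
    """Resolve an RFC 6901 JSON Pointer; raise KeyError if the path is absent."""
    node = doc
    for raw in pointer.split("/")[1:]:
        token = raw.replace("~1", "/").replace("~0", "~")
        if isinstance(node, dict) and token in node:
            node = node[token]
        elif isinstance(node, list) and token.isdigit() and int(token) < len(node):
            node = node[int(token)]
        else:
            raise KeyError(pointer)
    return node


def build_template_request(message, template, params_from_input, literal_params):
    """Collect search template parameters from literals and from the input."""
    params = dict(literal_params)  # literal key/value pairs are used as given
    for name, pointer in params_from_input.items():
        # assumption: a value from the input overrides a literal of the same name
        params[name] = resolve(message, pointer)
    return {"id": template, "params": params}


request = build_template_request(
    {"id": "1235", "name": "sample-info"},
    template="testtemplate",
    params_from_input={"ids": "/id"},
    literal_params={"field": "id"},
)
```

With the sample configuration, request carries the parameters field and ids for the template named testtemplate.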

Example

Configuration

Match Configured

Function configuration:
    index: testindex
    template: testtemplate
    paramsFromInput: [
        Key: ids, Value: /id
    ]
    literalParams: [
        Key: field, Value: id
    ]
Search Template Sample:
{
  "script": {
    "lang": "mustache",
    "source": "{\"query\":{\"terms\":{\"{{field}}\":{{#toJson}}ids{{/toJson}}}}}"
  }
}
Sample output of Match function
{
  "input": {
    "id": "1235",
    "name": "sample-info"
  },
  "matches": [
    {
      "source": {
        "id": "1235",
        "name": "claas"
      },
      "id": "lghKxZYBh3Hgc5n0WdDk",
      "score": 0.18232156
    },
    {
      "source": {
        "id": "1235",
        "name": "sabine"
      },
      "id": "lwhNxZYBh3Hgc5n0mdA7",
      "score": 0.18232156
    }
  ]
}

Reduce2One

reduce2One

The Reduce2One function usually gets its input from the Match function. It resolves ambiguous matches by reducing the matched entities to a single entity, according to these rules:

  • If the matches element contains exactly one item, it does nothing and passes the input on as the result.

  • If the matches element contains no item, it adds a matched entity with an empty source element to the matches.

  • If the matches element contains more than one item, it returns the matched entity with the highest score. (If several items share the highest score, it returns the first of them.)

Note:

  • If the matches element does not exist, or is not an array, the Reduce2One raises an exception.

  • If the matches element contains more than one item and one of these items has no score element, or its score element is not numeric, the score of that item is replaced with the default value 1.0
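
The reduction rules can be sketched in Python as follows. The name reduce_to_one is hypothetical. The sketch applies the default score of 1.0 only while comparing items and does not write it back into the result; whether the actual function also changes the stored score is not stated here.

```python
def reduce_to_one(message):
    """Reduce the matches of a Match result to a single entity."""
    matches = message.get("matches") if isinstance(message, dict) else None
    if not isinstance(matches, list):
        raise ValueError("matches element does not exist or is not an array")

    if len(matches) == 1:
        return message  # nothing to do
    if not matches:
        # no match: add an entity with an empty source element
        return {**message, "matches": [{"source": {}}]}

    def score(item):
        value = item.get("score") if isinstance(item, dict) else None
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            return 1.0  # default for a missing or non-numeric score
        return value

    # max() returns the first item among those sharing the highest score
    return {**message, "matches": [max(matches, key=score)]}
```

Python's max keeps the first of several equal maxima, which matches the tie-breaking rule described above.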

Parameters

Parameter Name | Type | Required | Description
name | String | Yes | The name of the function instance

Example

One item in the matches element

Input sample
{
    "input": {"id": "ID1"},
    "matches": [
        {"id": "match1", "score": 1.0, "source": {"foo": "bar"}}
    ]
}
Output sample
{
    "input": {"id": "ID1"},
    "matches": [
        {"id": "match1", "score": 1.0, "source": {"foo": "bar"}}
    ]
}

No item in the matches element

Input sample
{
    "input": {"id": "ID1"},
    "matches": []
}
Output sample
{
    "input": {"id": "ID1"},
    "matches": [
        {"source":{}}
    ]
}

More than one item in the matches element

Input sample
{
    "input": {"id": "ID1"},
    "matches": [
        {"id": "match1", "score": 0.98, "source": {"foo": "bar"}},
        {"id": "match2", "score": 0.99, "source": {"foo": "baz"}},
        {"id": "match3", "score": 0.99, "source": {"foo": "bad"}}
    ]
}
Output sample
{
    "input": {"id": "ID1"},
    "matches": [
        {"id": "match2", "score": 0.99, "source": {"foo": "baz"}}
    ]
}

MergeCreate

The MergeCreate function usually gets its input from the Match function. It creates elements in the source-matched entity from values of the input entity, according to these rules:

  • These changes only affect the first item in the matches element.

  • The MergeCreate configurations determine which element of the input should be mapped into the source-matched entity.

  • If the source-matched entity contains the mapped element that was determined in the function configuration, the MergeCreate function does not touch it.

  • If the source-matched entity does not contain the mapped element that was determined in the function configuration, the MergeCreate function creates the element and fills it from the mapped element value of the input entity.

Parameters

Parameter Name | Type | Required | Description
name | String | Yes | The name of the function instance
mappings | List | Yes | The list of mapping elements from input to the source-matched entity.

Each item of the mappings list contains 2 elements:

  • from: Json Pointer determining the Json element inside the input.

  • to: Json Pointer determining the Json element inside the source-matched entity.

Example

Configuration

MergeCreate Configured

Function configuration:
    mappings: [
        from: /id, to: /identifier/id
        from: /id, to: /ID
        from: /doesNotExist, to: /foo
    ]
Input sample
{
  "input": {
    "id": "ID1"
  },
  "matches": [
    {
      "id": "match!",
      "score": 1.0,
      "source": {
        "id": "TARGET_ID",
        "identifier": {
          "foo": 42
        }
      }
    }
  ]
}
Output sample
{
  "input": {
    "id": "ID1"
  },
  "matches": [
    {
      "id": "match!",
      "score": 1.0,
      "source": {
        "id": "TARGET_ID",
        "identifier": {
          "foo": 42,
          "id": "ID1"
        },
        "ID": "ID1"
      }
    }
  ]
}

Note: As the input and output samples show, if the input entity does not contain the element configured in from, the MergeCreate function does nothing for that mapping.
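
The create-if-absent behaviour can be sketched in Python. The names merge_create, _get and _create_if_absent are hypothetical. As a simplification, the sketch only walks Json objects (not arrays) and assumes that missing intermediate objects on the to path are created.

```python
import copy

_MISSING = object()  # sentinel for "the path does not exist"


def _tokens(pointer):
    """Split a JSON Pointer into unescaped reference tokens (RFC 6901)."""
    return [t.replace("~1", "/").replace("~0", "~") for t in pointer.split("/")[1:]]


def _get(doc, pointer):
    node = doc
    for token in _tokens(pointer):
        if not isinstance(node, dict) or token not in node:
            return _MISSING
        node = node[token]
    return node


def _create_if_absent(target, pointer, value):
    tokens = _tokens(pointer)
    if not tokens:
        return  # the empty pointer addresses the whole entity
    node = target
    for token in tokens[:-1]:
        node = node.setdefault(token, {})  # assumption: create missing parents
        if not isinstance(node, dict):
            return  # cannot descend into a non-object
    node.setdefault(tokens[-1], copy.deepcopy(value))  # existing values stay


def merge_create(message, mappings):
    """Copy mapped input elements into the source of the first match if absent."""
    result = copy.deepcopy(message)
    matches = result.get("matches")
    if not isinstance(matches, list) or not matches or not isinstance(matches[0], dict):
        return result
    source = matches[0].setdefault("source", {})
    if not isinstance(source, dict):
        return result
    for mapping in mappings:
        value = _get(result.get("input", {}), mapping["from"])
        if value is not _MISSING:  # nothing to do when the input lacks the element
            _create_if_absent(source, mapping["to"], value)
    return result
```

Applied to the input sample with the three configured mappings, this sketch yields the output sample shown above.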

ChangeEventEmit

The ChangeEventEmit function extracts the content and the message key from the Json content of the message and emits them either to the normal output topic of the pipeline or to a specifically configured topic.

Parameters

Parameter Name | Type | Required | Description
name | String | Yes | The name of the function instance
eventContent | Json Pointer | Yes | The ChangeEventEmit function extracts the content of the emitted message by using this Json Pointer.
eventKey | Json Pointer | No | The ChangeEventEmit function extracts the message key of the emitted message by using this Json Pointer.
topic | String | No | The destination topic for emitted messages.
cleanUpMode | Enum | No | Determines the cleanup mode of the topic. Valid values are COMPACT and DELETE.
cleanUpTimeHours | Integer | No | Determines the cleanup time of the topic, in hours.

Notes on topic:

  • The topic is an optional config. If it is set, the ChangeEventEmit function emits the messages to this topic.
    If it is not set, the ChangeEventEmit function emits messages to the regular output topic as configured on the processor.

Notes on cleanUpMode:

  • If the topic is set, the default value for cleanUpMode is COMPACT

  • For more information, please check these links: Log Compaction (related to the COMPACT config), Log Retention (related to the DELETE config)

Notes on cleanUpTimeHours:

  • If the topic is set, the default value for cleanUpTimeHours is 336

Example

Configuration

ChangeEventEmit Configured1

Function configuration:
    eventContent: /input
    topic: source
    cleanUpMode: DELETE
    cleanUpTimeHours: 12
Sample input
{
    "input": {"foo": "bar"},
    "matches": [
      {"id": "0815", "source":{"foo": "baz"}}
    ]
}
Sample output to the source topic
{
  "foo": "bar"
}

Configuration

ChangeEventEmit Configured2

Function configuration:
    eventContent: /matches/0/source
    eventKey: /matches/0/id
Sample input
{
    "input": {"foo": "bar"},
    "matches": [
      {"id": "0815", "source":{"foo": "baz"}}
    ]
}
Sample output to the normal pipeline output topic
{
  "foo": "baz"
}

And the message key for the output message is 0815
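
The extraction step shown in the two examples can be sketched in Python. The names change_event_emit and resolve, and the default_topic argument standing in for the processor's regular output topic, are hypothetical. The sketch only builds the record to emit; the topic cleanup settings are not modelled.

```python
def resolve(doc, pointer):
    """Resolve an RFC 6901 JSON Pointer; raise KeyError if the path is absent."""
    node = doc
    for raw in pointer.split("/")[1:]:
        token = raw.replace("~1", "/").replace("~0", "~")
        if isinstance(node, dict) and token in node:
            node = node[token]
        elif isinstance(node, list) and token.isdigit() and int(token) < len(node):
            node = node[int(token)]
        else:
            raise KeyError(pointer)
    return node


def change_event_emit(message, event_content, event_key=None, topic=None,
                      default_topic="output"):
    """Build the record to emit: destination topic, message key and content."""
    return {
        # a configured topic wins, otherwise the regular output topic is used
        "topic": topic if topic is not None else default_topic,
        "key": resolve(message, event_key) if event_key is not None else None,
        "value": resolve(message, event_content),
    }


sample = {"input": {"foo": "bar"}, "matches": [{"id": "0815", "source": {"foo": "baz"}}]}
first = change_event_emit(sample, "/input", topic="source")
second = change_event_emit(sample, "/matches/0/source", event_key="/matches/0/id")
```

Here first corresponds to the first configuration and second to the second one, including the message key 0815.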

Multiple Functions

multipleFns

Multiple Functions is actually an operator, not a real function. It takes an input message and broadcasts it to each function it contains. The result is similar to creating a fork in the pipeline.

Normally, Multiple Functions is combined with ChangeEventEmit at the end of the pipeline to emit different parts of the input message to different topics (see the first example). However, Multiple Functions can also be used in more complicated scenarios at any point of the pipeline (see the second example).
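
The broadcast behaviour can be sketched in Python. The names multiple_functions, emit_input and emit_first_match are hypothetical, and the two emitters only imitate ChangeEventEmit instances writing to a source-topic and an output-topic. Giving each branch its own copy of the message is an assumption of this sketch.

```python
import copy


def multiple_functions(branches, message):
    """Hand every branch its own copy of the message and collect what each emits."""
    return [branch(copy.deepcopy(message)) for branch in branches]


def emit_input(msg):
    """Hypothetical emitter: sends the input part to the source-topic."""
    return {"topic": "source-topic", "key": None, "value": msg["input"]}


def emit_first_match(msg):
    """Hypothetical emitter: sends the first matched source to the output-topic."""
    first = msg["matches"][0]
    return {"topic": "output-topic", "key": first["id"], "value": first["source"]}


message = {
    "input": {"id": "1235", "name": "sabine"},
    "matches": [{"source": {"id": "1235", "name": "claas"},
                 "id": "lghKxZYBh3Hgc5n0WdDk", "score": 0.18232156}],
}
emitted = multiple_functions([emit_input, emit_first_match], message)
```

Each branch sees the same message, so one incoming message produces one record per branch.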

Example

Using a single Multiple Functions at the end of a pipeline

multipleFns Configured1

Sample input of Multiple Functions
{
  "input": {
    "id": "1235",
    "name": "sabine"
  },
  "matches": [
    {
      "source": {
        "id": "1235",
        "name": "claas"
      },
      "id": "lghKxZYBh3Hgc5n0WdDk",
      "score": 0.18232156
    }
  ]
}
Sample output messages to the source-topic
{
  "id": "1235",
  "name": "sabine"
}
Sample output messages to the output-topic
{
  "id": "1235",
  "name": "claas"
}

And the message key will be equal to lghKxZYBh3Hgc5n0WdDk

Using several Multiple Functions in different parts of a pipeline

multipleFns Configured

Sample input messages in the input-topic
1: {"id":[], "name":"name0"}

2: {"id":["  ID1  "], "name":"  name1  "}

3: {"id":["ID2"], "name":"name2"}
Sample output messages to the output-topic
1: {"id":["ID1"],"name":"name1"}

2: {"id":["ID2"],"name":"name2"}
Sample output messages to the error-topic
1: {"id":[], "name":"name0"}
Sample output messages to the validate-topic
1: {"id":[],"name":"name0"}

2: {"id":["ID1"],"name":"name1"}

3: {"id":["ID2"],"name":"name2"}
Sample output messages to the trim-topic
1: {"id":[],"name":"name0"}

2: {"id":["  ID1  "],"name":"name1"}

3: {"id":["ID2"],"name":"name2"}

4: {"id":["  ID1  "],"name":"name1"}

5: {"id":["ID2"],"name":"name2"}

And in the diagram below, you can see the state of messages in these three paths:

  • Main Route: Between input-topic and output-topic, and error pushed to the error-topic

  • Validate Route: created by validateEmitter and messages pushed to the validate-topic

  • Trim Route: created by trimEmitter and messages pushed to the trim-topic

pipeline fork