Python Runtime

In addition to TensorFlow and ONNX model runtimes, Wallaroo supports Python runtimes. A Python model is treated like other model types and can be a step in a pipeline.

Uploading

The Python code should be saved to a .py file. Available imports include the full Python 3.8 standard library, pandas, and NumPy.

[1]:
! cat ./test_resources/pre_process.py
"""Pre process adds some numbers into a tensor and specifies the field"""
# Arbitrary standard library import. The json module is imported by the generated wrapper.
import math

def wallaroo_json(data):
    """wallaroo_json takes a json string as input"""
    obj = json.loads(data)
    #>>> math.exp(1)
    #2.718281828459045
    #>>> math.exp(2)
    #7.38905609893065
    if "query" in obj:
        return {
            "tensor_fields": ["foo"],
            "query": obj["query"],
            "foo": [[math.exp(1), math.exp(2)]]
            }
    else:
        return {
            "tensor_fields": ["passthrough"],
            "passthrough": obj
        }

Entrypoint

The Python file must contain the global entrypoint function wallaroo_json().

The function will be passed a string containing valid JSON and must return a Python dict. Example input string:

{ "tensor_fields": ["foo"],
  "otherstuff": "charge",
  "state": "spin-up",
  "foo": [1, 2, 3, 4, 5] }
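
The JSON-string-in, dict-out contract can be sketched minimally as follows (a pass-through step; a real step would transform the payload):

```python
import json

def wallaroo_json(data):
    """Parse the incoming JSON string and return a plain Python dict."""
    obj = json.loads(data)
    # Return the payload unchanged; any transformation would happen here.
    return obj

result = wallaroo_json('{"tensor_fields": ["foo"], "foo": [1, 2, 3, 4, 5]}')
```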

Tensor fields

The tensor_fields key is optional; its value is an array of field names identifying which fields in the structure are input tensors for the step. The default tensor field name is tensor.
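
To illustrate, these two return values are equivalent ways of handing the same tensor to the next step (a sketch of the payload shapes, not run against a deployed pipeline):

```python
# Default: the next step reads its input tensor from the "tensor" field.
default_style = {"tensor": [[1.0, 2.0]]}

# Explicit: tensor_fields points the next step at the "foo" field instead.
explicit_style = {"tensor_fields": ["foo"], "foo": [[1.0, 2.0]]}
```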

State

The JSON may also contain a field with key state, which will be passed to all steps in the pipeline. If a step modifies the value of this field, the new value will be propagated to later steps in the pipeline.
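
A step can read and update state like any other field. A minimal sketch (the step name appended here is illustrative):

```python
import json

def wallaroo_json(data):
    """Append this step's name to the pipeline state before passing data on."""
    obj = json.loads(data)
    # Modify state; the new value is propagated to later steps in the pipeline.
    obj["state"] = obj.get("state", "") + " -> step_a"
    return obj

out = wallaroo_json('{"tensor": [1.0], "state": "start"}')
```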

Example Pipeline

[2]:
import json
import wallaroo
import pandas as pd

wl = wallaroo.Client()
[3]:
# Upload models
model1 = wl.upload_model("preprocess", "./test_resources/pre_process.py").configure('python')
model2 = wl.upload_model("noopfloats", "./test_resources/no-op-floats.onnx")
model3 = wl.upload_model("postprocess", "./test_resources/post_process.py").configure('python')
[4]:
python_pipeline = (wl.build_pipeline("pythonpipeline")
        .add_model_step(model1)
        .add_model_step(model2)
        .add_model_step(model3)
        )
[5]:
python_pipeline.deploy()
Waiting for deployment - this will take up to 45s ..... ok
[5]:
{'name': 'pythonpipeline', 'create_time': datetime.datetime(2022, 3, 3, 23, 21, 4, 117316, tzinfo=tzutc()), 'definition': "[{'ModelInference': {'models': [{'name': 'preprocess', 'version': 'd761cad1-d740-4fb2-9b74-2fff62741cfc', 'sha': '5553423b0d1f5863d76e74a5e852ee9a99a1e327762fb1fca78e6fa034b61257'}]}}, {'ModelInference': {'models': [{'name': 'noopfloats', 'version': '30359068-ddf0-48fc-9f5f-60000d93e949', 'sha': '4dc88d159249ccce83942ada69b919cb91455d5fd0e4bfc287de3f21d1aafb1b'}]}}, {'ModelInference': {'models': [{'name': 'postprocess', 'version': '9112aace-c7f2-4008-8602-03a5933c64ea', 'sha': '50202d07672dad4b9a4d9e07dc83d9c8a9730ff6b5f2c491522dc13ebfd6fb7c'}]}}]"}

Running inference on the pipeline

[6]:
python_pipeline.infer({"query": "this will give us the first conditional branch"})
[6]:
[InferenceResult({'check_failures': [],
  'elapsed': 1201278,
  'model_name': 'postprocess',
  'model_version': '27f63c7a-4761-4076-b67f-3120ee784b3d',
  'original_data': {'query': 'this will give us the first conditional branch'},
  'outputs': [{'Json': {'data': [{'original': {'outputs': [{'Float': {'data': [2.7182817459106445,
                                                                               7.389056205749512],
                                                                      'dim': [1,
                                                                              2],
                                                                      'v': 1}}]},
                                  'product': 20.085536603596665}],
                        'dim': [1],
                        'v': 1}}],
  'pipeline_name': 'pythonpipeline',
  'time': 1644011878036})]

Under the hood

First Step: Preprocessing Python

This section walks through what happens at each step of the pipeline.

[7]:
"""Pre process adds some numbers into a tensor and specifies the field"""
# Arbitrary standard library import. The json module is imported by the generated
# wrapper; it is imported explicitly here so this cell can run in the notebook.
import math
import json

def wallaroo_json(data):
    """wallaroo_json takes a json string as input"""
    obj = json.loads(data)
    if "query" in obj:
        return {
            "tensor_fields": ["foo"],
            "query": obj["query"],
            "foo": [[math.exp(1), math.exp(2)]],
            "state": "First branch"
            }
    else:
        return {
            "tensor_fields": ["foo"],
            "foo": [[math.exp(2), math.exp(3)]],
            "state": "Second branch"
        }
[8]:
wallaroo_json('{"query": "this will give us the first conditional branch"}')
[8]:
{'tensor_fields': ['foo'],
 'query': 'this will give us the first conditional branch',
 'foo': [[2.718281828459045, 7.38905609893065]],
 'state': 'First branch'}

Second Step: Model Inference

In this example, we run the tensor through a no-op ONNX model so the output stays easy to follow.

The input that the next Python step receives will be:

{
    "outputs" :[{"Float":{"v":1,"dim":[1,2],"data":[2.7182817459106445,7.389056205749512]}}],
    "state": "First branch"
}

Third Step: Post Processing

This step calculates the product of the output tensor from the previous step.

[9]:
"""Post process gets the product of the final output"""
import math
import json
def wallaroo_json(data):
    """wallaroo_json takes a json string as input"""
    obj = json.loads(data)
    result = {
        "original": obj
    }
    result["product"] = math.prod(obj["outputs"][0]['Float']['data'])
    return result
[10]:
wallaroo_json('{"outputs" :[{"Float":{"v":1,"dim":[1,2],"data":[2.7182817459106445,7.389056205749512]}}],"state": "First branch"}')
[10]:
{'original': {'outputs': [{'Float': {'v': 1,
     'dim': [1, 2],
     'data': [2.7182817459106445, 7.389056205749512]}}],
  'state': 'First branch'},
 'product': 20.085536603596665}

The JSON returned from the pipeline inference call will be:

{
  "model_id": "post_process",
  "model_version": "version",
  "pipeline_id": "intermediate",
  "outputs": [
    {
      "Json": {
        "v": 1,
        "dim": [
          1
        ],
        "data": [
          {
            "original": {
              "outputs": [
                {
                  "Float": {
                    "v": 1,
                    "dim": [
                      1,
                      2
                    ],
                    "data": [
                      2.7182817459106445,
                      7.389056205749512
                    ]
                  }
                }
              ],
              "state": "First branch"
            },
            "product": 20.085536603596665
          }
        ]
      }
    }
  ],
  "elapsed": 379505,
  "time": 2,
  "original_data": {
    "query": "The preprocessor operates conditionally on this key"
  },
  "check_failures": []
}
[11]:
python_pipeline.undeploy()
[11]:
{'name': 'pythonpipeline', 'create_time': datetime.datetime(2022, 3, 3, 23, 21, 4, 117316, tzinfo=tzutc()), 'definition': "[{'ModelInference': {'models': [{'name': 'preprocess', 'version': 'd761cad1-d740-4fb2-9b74-2fff62741cfc', 'sha': '5553423b0d1f5863d76e74a5e852ee9a99a1e327762fb1fca78e6fa034b61257'}]}}, {'ModelInference': {'models': [{'name': 'noopfloats', 'version': '30359068-ddf0-48fc-9f5f-60000d93e949', 'sha': '4dc88d159249ccce83942ada69b919cb91455d5fd0e4bfc287de3f21d1aafb1b'}]}}, {'ModelInference': {'models': [{'name': 'postprocess', 'version': '9112aace-c7f2-4008-8602-03a5933c64ea', 'sha': '50202d07672dad4b9a4d9e07dc83d9c8a9730ff6b5f2c491522dc13ebfd6fb7c'}]}}]"}
[ ]: