Python Runtime
In addition to TensorFlow and ONNX model runtimes, Wallaroo supports Python runtimes. A Python model is treated like other model types and can be a step in a pipeline.
Uploading
The Python code should be saved to a .py file. Available imports include the Python 3.8 standard library, pandas, and NumPy.
[1]:
! cat ./test_resources/pre_process.py
"""Pre process adds some numbers into a tensor and specifies the field"""
# Arbitrary standard library import. Json is loaded by generated wrapper.
import math
def wallaroo_json(data):
    """wallaroo_json takes a json string as input"""
    obj = json.loads(data)
    #>>> math.exp(1)
    #2.718281828459045
    #>>> math.exp(2)
    #7.38905609893065
    if "query" in obj:
        return {
            "tensor_fields": ["foo"],
            "query": obj["query"],
            "foo": [[math.exp(1), math.exp(2)]]
        }
    else:
        return {
            "tensor_fields": ["passthrough"],
            "passthrough": obj
        }
Entrypoint
The Python file must define the global entrypoint function wallaroo_json().
The function is passed a string containing valid JSON and must return a Python dict. Example input string:
{ "tensor_fields": ["foo"],
"otherstuff": "charge",
"state": "spin-up",
"foo": [1, 2, 3, 4, 5] }
Tensor fields
The tensor_fields key is optional; its value is an array of field names identifying which fields in the structure are input tensors to the step. If the key is absent, the default tensor field is tensor.
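As a rough illustration of this convention, the sketch below picks out the fields named by tensor_fields and falls back to the default name tensor when the key is missing. The helper select_tensor_fields is hypothetical and is not part of Wallaroo; it only mirrors the rule described above, not the platform's actual implementation.

```python
import json

def select_tensor_fields(data):
    """Return {field_name: value} for the fields named as input tensors.

    Hypothetical helper for illustration only; not a Wallaroo API.
    """
    obj = json.loads(data)
    # Fall back to the default field name "tensor" when the key is absent.
    names = obj.get("tensor_fields", ["tensor"])
    return {name: obj[name] for name in names if name in obj}

# Explicit tensor_fields: only "foo" is treated as an input tensor.
explicit = select_tensor_fields(
    '{"tensor_fields": ["foo"], "foo": [1, 2, 3], "state": "spin-up"}'
)

# No tensor_fields key: the default field "tensor" is used.
default = select_tensor_fields('{"tensor": [[1.0, 2.0]]}')
```

Fields that are not listed, such as state in the first call, are left out of the result.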
State
The JSON may also contain a field with the key state, which is passed to all steps in the pipeline. If a step modifies the value of this field, the new value is propagated to later steps in the pipeline.
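A minimal sketch of a step that updates the state field might look like the following. The convention of appending the step name to a string is an assumption made for illustration; the source does not prescribe what state should contain.

```python
import json

def wallaroo_json(data):
    """Record this step's name in the state field (illustrative only)."""
    obj = json.loads(data)
    # Assumed convention: state is a string that logs the steps visited so far.
    previous = obj.get("state", "")
    obj["state"] = (previous + " -> " if previous else "") + "preprocess"
    return obj

with_state = wallaroo_json('{"tensor": [[1.0]], "state": "start"}')
without_state = wallaroo_json('{"tensor": [[1.0]]}')
```

Because the returned dict carries the modified state, any later step that reads the field would see the updated value.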
Example Pipeline
[2]:
import json
import wallaroo
import pandas as pd
wl = wallaroo.Client()
[3]:
# Upload models
model1 = wl.upload_model("preprocess", "./test_resources/pre_process.py").configure('python')
model2 = wl.upload_model("noopfloats", "./test_resources/no-op-floats.onnx")
model3 = wl.upload_model("postprocess", "./test_resources/post_process.py").configure('python')
[4]:
python_pipeline = (wl.build_pipeline("pythonpipeline")
    .add_model_step(model1)
    .add_model_step(model2)
    .add_model_step(model3)
)
[5]:
python_pipeline.deploy()
Waiting for deployment - this will take up to 45s ..... ok
[5]:
{'name': 'pythonpipeline', 'create_time': datetime.datetime(2022, 3, 3, 23, 21, 4, 117316, tzinfo=tzutc()), 'definition': "[{'ModelInference': {'models': [{'name': 'preprocess', 'version': 'd761cad1-d740-4fb2-9b74-2fff62741cfc', 'sha': '5553423b0d1f5863d76e74a5e852ee9a99a1e327762fb1fca78e6fa034b61257'}]}}, {'ModelInference': {'models': [{'name': 'noopfloats', 'version': '30359068-ddf0-48fc-9f5f-60000d93e949', 'sha': '4dc88d159249ccce83942ada69b919cb91455d5fd0e4bfc287de3f21d1aafb1b'}]}}, {'ModelInference': {'models': [{'name': 'postprocess', 'version': '9112aace-c7f2-4008-8602-03a5933c64ea', 'sha': '50202d07672dad4b9a4d9e07dc83d9c8a9730ff6b5f2c491522dc13ebfd6fb7c'}]}}]"}
Running inference on the pipeline
[6]:
python_pipeline.infer({"query": "this will give us the first conditional branch"})
[6]:
[InferenceResult({'check_failures': [],
'elapsed': 1201278,
'model_name': 'postprocess',
'model_version': '27f63c7a-4761-4076-b67f-3120ee784b3d',
'original_data': {'query': 'this will give us the first conditional branch'},
'outputs': [{'Json': {'data': [{'original': {'outputs': [{'Float': {'data': [2.7182817459106445,
7.389056205749512],
'dim': [1,
2],
'v': 1}}]},
'product': 20.085536603596665}],
'dim': [1],
'v': 1}}],
'pipeline_name': 'pythonpipeline',
'time': 1644011878036})]
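Under the hood
First Step: Preprocessing Python
The cells below reproduce each pipeline step locally in the notebook to show what happens during inference. This version of the preprocessing step also sets the state field.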
[7]:
"""Pre process adds some numbers into a tensor and specifies the field"""
# Arbitrary standard library import. json is loaded by the generated wrapper;
# it is imported here so the cell can run in the notebook.
import math
import json
def wallaroo_json(data):
    """wallaroo_json takes a json string as input"""
    obj = json.loads(data)
    if "query" in obj:
        return {
            "tensor_fields": ["foo"],
            "query": obj["query"],
            "foo": [[math.exp(1), math.exp(2)]],
            "state": "First branch"
        }
    else:
        return {
            "tensor_fields": ["foo"],
            # "query" is absent on this branch, so use .get() to avoid a KeyError
            "query": obj.get("query"),
            "foo": [[math.exp(2), math.exp(3)]],
            "state": "Second branch"
        }
[8]:
wallaroo_json('{"query": "this will give us the first conditional branch"}')
[8]:
{'tensor_fields': ['foo'],
'query': 'this will give us the first conditional branch',
'foo': [[2.718281828459045, 7.38905609893065]],
'state': 'First branch'}
Second Step: Model Inference
In this example, the tensor is passed through a no-op ONNX model so that the output is easy to follow.
The next Python step receives the following input:
{
    "outputs": [{"Float": {"v": 1, "dim": [1, 2], "data": [2.7182817459106445, 7.389056205749512]}}],
    "state": "First branch"
}
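The values in data differ slightly from the math.exp(1) and math.exp(2) results produced by the first step. One plausible explanation, which is an assumption and not stated in the source, is that the ONNX model handles the tensor as 32-bit floats. The sketch below rounds the original values to 32-bit precision using only the standard library; to_float32 is a hypothetical helper introduced for this illustration.

```python
import math
import struct

def to_float32(value):
    """Round a Python float (64-bit) to the nearest 32-bit float."""
    # Packing as "f" stores the value as a 32-bit float; unpacking widens it again.
    return struct.unpack("f", struct.pack("f", value))[0]

rounded = [to_float32(math.exp(1)), to_float32(math.exp(2))]
print(rounded)
```

If the assumption holds, the rounded values should match the data array received by the next step.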
Third Step: Post Processing
This step calculates the product of the output tensor from the previous step.
[9]:
"""Post process gets the product of the final output"""
import math
import json
def wallaroo_json(data):
    """wallaroo_json takes a json string as input"""
    obj = json.loads(data)
    result = {
        "original": obj
    }
    result["product"] = math.prod(obj["outputs"][0]['Float']['data'])
    return result
[10]:
wallaroo_json('{"outputs" :[{"Float":{"v":1,"dim":[1,2],"data":[2.7182817459106445,7.389056205749512]}}],"state": "First branch"}')
[10]:
{'original': {'outputs': [{'Float': {'v': 1,
'dim': [1, 2],
'data': [2.7182817459106445, 7.389056205749512]}}],
'state': 'First branch'},
'product': 20.085536603596665}
The JSON returned by the pipeline inference call has the following shape:
{
  "model_id": "post_process",
  "model_version": "version",
  "pipeline_id": "intermediate",
  "outputs": [
    {
      "Json": {
        "v": 1,
        "dim": [
          1
        ],
        "data": [
          {
            "original": {
              "outputs": [
                {
                  "Float": {
                    "v": 1,
                    "dim": [
                      1,
                      2
                    ],
                    "data": [
                      2.7182817459106445,
                      7.389056205749512
                    ]
                  }
                }
              ],
              "state": "First branch"
            },
            "product": 20.085536603596665
          }
        ]
      }
    }
  ],
  "elapsed": 379505,
  "time": 2,
  "original_data": {
    "query": "The preprocessor operates conditionally on this key"
  },
  "check_failures": []
}
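To read the post-processing result out of a response with this shape, the nested outputs structure has to be unwrapped. The sketch below works on a trimmed copy of the JSON above parsed into a plain dict; it does not use the Wallaroo SDK, and the variable names are chosen for illustration.

```python
import json

# Trimmed copy of the response shown above, keeping only the fields used here.
response_text = """
{
  "outputs": [
    {"Json": {"v": 1, "dim": [1], "data": [
      {"original": {"outputs": [{"Float": {"v": 1, "dim": [1, 2],
                     "data": [2.7182817459106445, 7.389056205749512]}}],
                    "state": "First branch"},
       "product": 20.085536603596665}
    ]}}
  ],
  "check_failures": []
}
"""

response = json.loads(response_text)

# The dict returned by the last Python step is the first element of the Json data array.
payload = response["outputs"][0]["Json"]["data"][0]
product = payload["product"]
state = payload["original"]["state"]
```

Here payload is the dict returned by the post-processing step, so product and state can be read from it directly.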
[11]:
python_pipeline.undeploy()
[11]:
{'name': 'pythonpipeline', 'create_time': datetime.datetime(2022, 3, 3, 23, 21, 4, 117316, tzinfo=tzutc()), 'definition': "[{'ModelInference': {'models': [{'name': 'preprocess', 'version': 'd761cad1-d740-4fb2-9b74-2fff62741cfc', 'sha': '5553423b0d1f5863d76e74a5e852ee9a99a1e327762fb1fca78e6fa034b61257'}]}}, {'ModelInference': {'models': [{'name': 'noopfloats', 'version': '30359068-ddf0-48fc-9f5f-60000d93e949', 'sha': '4dc88d159249ccce83942ada69b919cb91455d5fd0e4bfc287de3f21d1aafb1b'}]}}, {'ModelInference': {'models': [{'name': 'postprocess', 'version': '9112aace-c7f2-4008-8602-03a5933c64ea', 'sha': '50202d07672dad4b9a4d9e07dc83d9c8a9730ff6b5f2c491522dc13ebfd6fb7c'}]}}]"}