
Read file operator on Datahub does not pass message to python script in docker container

marcus_schiffer
Active Participant
0 Kudos

Hi,

using the Read File operator on a WASB connection for a large file (15 MB) sends a message to a Terminal operator's output just fine. However, when the same message is used as input for a Python operator running in a Docker group (not sure whether it would make a difference when running without Docker), the message never reaches the Python on_input callback.

Any help appreciated.

Accepted Solutions (0)

Answers (6)

henrique_pinto
Active Contributor
0 Kudos

Another possibility is to stream the data between the operators/pods instead of sending the whole file at once. The receiving pod can accumulate data until an EOF has been reached, and then continue the operation.
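As a sketch of that idea (chunk size, the empty-chunk EOF convention, and all names here are illustrative assumptions, not the actual Data Hub API): the sender splits the payload into pieces below the message limit and finishes with an EOF marker, and the receiver accumulates chunks until it sees that marker.

```python
# Sketch: stream a large payload in chunks below the cross-pod message
# limit, terminated by an explicit EOF marker, and reassemble on the
# receiving side. Chunk size and the EOF convention are assumptions.

CHUNK_SIZE = 5 * 1024 * 1024  # stay well below the 10 MB payload limit
EOF_MARKER = b""              # an empty chunk signals end-of-file

def split_into_chunks(payload: bytes, chunk_size: int = CHUNK_SIZE):
    """Yield consecutive chunks of the payload, then the EOF marker."""
    for offset in range(0, len(payload), chunk_size):
        yield payload[offset:offset + chunk_size]
    yield EOF_MARKER

class ChunkAccumulator:
    """Receiver side: collect chunks until the EOF marker arrives."""
    def __init__(self):
        self._parts = []
        self.result = None  # set once the full payload is restored

    def on_chunk(self, chunk: bytes):
        if chunk == EOF_MARKER:
            self.result = b"".join(self._parts)
        else:
            self._parts.append(chunk)

if __name__ == "__main__":
    data = b"x" * (12 * 1024 * 1024)  # 12 MB, larger than the limit
    acc = ChunkAccumulator()
    for chunk in split_into_chunks(data):
        acc.on_chunk(chunk)
    assert acc.result == data
```

In a real graph the sender's loop body would be an api.send call per chunk and the accumulator would live inside the receiving operator's input callback.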

henrique_pinto
Active Contributor
0 Kudos

Hi Marcus,

try the following: increase your grouping to include not just the Python operator but all operators.
A pod can run more than one container; it is a logical unit of work that can include all containers required for a single task/activity. When you put all operators in the same group, the engine tries to execute all of them in the same pod.
Can you try that?

Best regards,
Henrique.
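Based on the graph JSON posted below in this thread, a minimal sketch of that change would extend the existing group1 to include the Read File operator as well (operator names and tags taken from Marcus's graph; assuming the tags can stay unchanged):

```json
"groups": [
    {
        "name": "group1",
        "nodes": [
            "readfile1",
            "python3operator1"
        ],
        "metadata": {
            "description": "Group"
        },
        "tags": {
            "tesseract": "",
            "python36": "",
            "tornado": "5.0.2"
        }
    }
]
```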

marcus_schiffer
Active Participant
0 Kudos

Hi Thorsten,

thanks for explaining.

The Python operator runs in a group with a Docker container. The message limit seems reasonable, but for use in e.g. ML scenarios with huge data or video analysis this will still leave issues, I guess. (Maybe Data Hub is not the right tool here anyway.) So with the limit, one may be forced to store to / read from the local file system, with the other restrictions you pointed out in another answer.

schneidertho
Advisor
0 Kudos

Hi Marcus,

we need to improve our error handling a bit here, sorry. In the log of the pipeline you will find an error along the lines of "failed to publish a message... maximum payload exceeded".

The maximum size for data exchange across different pods of a pipeline is currently 10 MB. That is a restriction. We might increase the limit in the future, but currently that is how it is.
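To make this failure visible earlier, the operator script could check the payload size before sending; this is a hedged sketch (the 10 MB figure is the limit described here, and check_payload is a hypothetical helper, not a Data Hub API):

```python
# Sketch: fail fast with a clear error instead of a silently dropped
# message when a payload exceeds the cross-pod limit. The 10 MB value
# is the current limit described in this thread; adjust if it changes.

MAX_PAYLOAD = 10 * 1024 * 1024  # 10 MB cross-pod message limit

def check_payload(data: bytes) -> bytes:
    """Return the payload unchanged, or raise if it is too large to send."""
    if len(data) > MAX_PAYLOAD:
        raise ValueError(
            f"payload is {len(data)} bytes, exceeds the "
            f"{MAX_PAYLOAD} byte cross-pod limit"
        )
    return data
```

Inside the Python operator this could wrap each api.send call, so an oversized message raises an error that shows up in the operator's log instead of disappearing.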

I will try to explain a bit what happens:

  • When you run a pipeline, this, simply put, leads to N pods on Kubernetes.
  • If your pipeline is simple, it is one pod. If you use groups in the pipeline, then each group is a pod, and the rest of the operators (those outside of groups) together form one pod.
  • And if a group has a multiplicity of M, this leads to M pods for that group.
  • We try to optimize communication between operators to avoid copying data around in memory, simply because that is expensive. Say you chain three operators: we will try to ensure that all three have access to the "same" data (I oversimplify a bit, sorry).
  • Now let's look back at what I said about groups. Different groups are different pods, and different pods might run on different cluster nodes. So we need to communicate differently between the same three operators when each of them has its own group. What we do is: the sending operator puts the data into a messaging system (currently we use NATS) and the receiving operator picks the data up from the messaging system.

The above mechanism of different pods and a messaging system in between is also a nice way to optimize the data processing, scale different parts of a pipeline differently etc.

At the same time it also means: try to avoid groups when they are not necessary or of no value for the data processing, because (leaving the current limit aside) sending data through a messaging system can have a negative impact on performance compared to not doing so. The bigger the data exchange between two operators, the more expensive it gets when you go via messaging.

That all is independent of whether the port of the operator is of type "message" or something else. It's the same mechanism for all port types.

I hope that helps. If there is a reason why you put the Python operator in a group, let's check why, or consider putting the Read File operator in the same group as the Python operator.

Cheers
Thorsten

marcus_schiffer
Active Participant
0 Kudos

Hi,

here is the JSON:

It never sends "test" to the terminal.

Also, no error is thrown.

The file to be read is about 11.2 MB.

{
	"properties": {},
	"description": "test2",
	"processes": {
		"python3operator1": {
			"component": "com.sap.system.python3Operator",
			"metadata": {
				"label": "Python3 Operator",
				"x": 277.99999809265137,
				"y": 40,
				"height": 82,
				"width": 120,
				"extensible": true,
				"config": {
					"script": "\ndef on_message(data):\n    \n     result = \"test\"\n  \n     api.send(\"res\", result)\napi.set_port_callback(\"message\", on_message)\n\n"
				},
				"additionalinports": [
					{
						"name": "message",
						"type": "message"
					}
				],
				"additionaloutports": [
					{
						"name": "out",
						"type": "string"
					},
					{
						"name": "image",
						"type": "blob"
					},
					{
						"name": "imageout",
						"type": "blob"
					},
					{
						"name": "res",
						"type": "string"
					}
				]
			}
		},
		"readfile1": {
			"component": "com.sap.storage.read",
			"metadata": {
				"label": "Read File",
				"x": 17,
				"y": 49.49999976158142,
				"height": 80,
				"width": 120,
				"config": {
					"service": "wasb",
					"wasbConnection": {
						"connectionProperties": {
							"accountKey": "",
							"accountName": "myaccount",
							"endpointSuffix": "core.windows.net",
							"protocol": "HTTPS",
							"rootPath": ""
						},
						"configurationType": "Configuration Manager",
						"connectionID": "WASB"
					},
					"wasbContainerName": "evonik1",
					"path": "/cells/video/test_Trim.mp4",
					"recursive": false
				}
			}
		},
		"terminal2": {
			"component": "com.sap.util.terminal",
			"metadata": {
				"label": "Terminal",
				"x": 587.9999961853027,
				"y": 19.49999976158142,
				"height": 80,
				"width": 120,
				"ui": "dynpath",
				"config": {}
			}
		}
	},
	"groups": [
		{
			"name": "group1",
			"nodes": [
				"python3operator1"
			],
			"metadata": {
				"description": "Group"
			},
			"tags": {
				"tesseract": "",
				"python36": "",
				"tornado": "5.0.2"
			}
		}
	],
	"connections": [
		{
			"metadata": {
				"points": "141,98.49999976158142 168.99999952316284,98.49999976158142 168.99999952316284,97.49999976158142 244.99999856948853,97.49999976158142 244.99999856948853,81 272.99999809265137,81"
			},
			"src": {
				"port": "outFile",
				"process": "readfile1"
			},
			"tgt": {
				"port": "message",
				"process": "python3operator1"
			}
		},
		{
			"metadata": {
				"points": "382.99999809265137,113 490,113 490,59.5 582.9999961853027,59.49999976158142"
			},
			"src": {
				"port": "res",
				"process": "python3operator1"
			},
			"tgt": {
				"port": "in1",
				"process": "terminal2"
			}
		}
	],
	"inports": {},
	"outports": {}
}

henrique_pinto
Active Contributor
0 Kudos

Can you post your graph JSON?