Mitosis and table-returning UDFs
Similar to #7085, but this is about table-returning UDFs.
The function used below (`tokenize`) is safe to execute in parallel, but I have no way to tell MonetDB that. As happened in #7085, MonetDB chops up the input BATs, then immediately re-packs them and executes the UDF on the re-packed BATs. The result is worse performance than plain single-threaded execution.
sql>explain select * from tokenize( (select subject,value,'SP',1,prob from tr_0_obj_string) );
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| mal |
+=====================================================================================================================================================================+
| function user.main():void; |
| X_1:void := querylog.define("explain select * from tokenize( (select subject,value,\\'SP\\',1,prob from tr_0_obj_string) );":str, "default_pipe":str, 21:int); |
| barrier X_223:bit := language.dataflow(); |
| X_29:bat[:str] := bat.pack(".%3":str, ".%3":str, ".%3":str); |
| X_30:bat[:str] := bat.pack("id":str, "token":str, "prob":str); |
| X_31:bat[:str] := bat.pack("int":str, "clob":str, "double":str); |
| X_32:bat[:int] := bat.pack(32:int, 0:int, 53:int); |
| X_33:bat[:int] := bat.pack(0:int, 0:int, 0:int); |
| X_4:int := sql.mvc(); |
| C_83:bat[:oid] := sql.tid(X_4:int, "spinque":str, "tr_0_obj_string":str, 0:int, 8:int); |
| X_98:bat[:int] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "subject":str, 0:int, 0:int, 8:int); |
| X_132:bat[:int] := algebra.projection(C_83:bat[:oid], X_98:bat[:int]); |
| C_85:bat[:oid] := sql.tid(X_4:int, "spinque":str, "tr_0_obj_string":str, 1:int, 8:int); |
| X_99:bat[:int] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "subject":str, 0:int, 1:int, 8:int); |
| X_133:bat[:int] := algebra.projection(C_85:bat[:oid], X_99:bat[:int]); |
| C_87:bat[:oid] := sql.tid(X_4:int, "spinque":str, "tr_0_obj_string":str, 2:int, 8:int); |
| X_100:bat[:int] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "subject":str, 0:int, 2:int, 8:int); |
| X_134:bat[:int] := algebra.projection(C_87:bat[:oid], X_100:bat[:int]); |
| C_89:bat[:oid] := sql.tid(X_4:int, "spinque":str, "tr_0_obj_string":str, 3:int, 8:int); |
| X_101:bat[:int] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "subject":str, 0:int, 3:int, 8:int); |
| X_135:bat[:int] := algebra.projection(C_89:bat[:oid], X_101:bat[:int]); |
| C_91:bat[:oid] := sql.tid(X_4:int, "spinque":str, "tr_0_obj_string":str, 4:int, 8:int); |
| X_102:bat[:int] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "subject":str, 0:int, 4:int, 8:int); |
| X_136:bat[:int] := algebra.projection(C_91:bat[:oid], X_102:bat[:int]); |
| C_93:bat[:oid] := sql.tid(X_4:int, "spinque":str, "tr_0_obj_string":str, 5:int, 8:int); |
| X_103:bat[:int] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "subject":str, 0:int, 5:int, 8:int); |
| X_137:bat[:int] := algebra.projection(C_93:bat[:oid], X_103:bat[:int]); |
| C_95:bat[:oid] := sql.tid(X_4:int, "spinque":str, "tr_0_obj_string":str, 6:int, 8:int); |
| X_104:bat[:int] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "subject":str, 0:int, 6:int, 8:int); |
| X_138:bat[:int] := algebra.projection(C_95:bat[:oid], X_104:bat[:int]); |
| C_97:bat[:oid] := sql.tid(X_4:int, "spinque":str, "tr_0_obj_string":str, 7:int, 8:int); |
| X_105:bat[:int] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "subject":str, 0:int, 7:int, 8:int); |
| X_139:bat[:int] := algebra.projection(C_97:bat[:oid], X_105:bat[:int]); |
| X_180:bat[:int] := mat.packIncrement(X_132:bat[:int], 8:int); |
| X_182:bat[:int] := mat.packIncrement(X_180:bat[:int], X_133:bat[:int]); |
| X_183:bat[:int] := mat.packIncrement(X_182:bat[:int], X_134:bat[:int]); |
| X_184:bat[:int] := mat.packIncrement(X_183:bat[:int], X_135:bat[:int]); |
| X_185:bat[:int] := mat.packIncrement(X_184:bat[:int], X_136:bat[:int]); |
| X_186:bat[:int] := mat.packIncrement(X_185:bat[:int], X_137:bat[:int]); |
| X_187:bat[:int] := mat.packIncrement(X_186:bat[:int], X_138:bat[:int]); |
| X_15:bat[:int] := mat.packIncrement(X_187:bat[:int], X_139:bat[:int]); |
| X_106:bat[:str] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "value":str, 0:int, 0:int, 8:int); |
| X_140:bat[:str] := algebra.projection(C_83:bat[:oid], X_106:bat[:str]); |
| X_107:bat[:str] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "value":str, 0:int, 1:int, 8:int); |
| X_141:bat[:str] := algebra.projection(C_85:bat[:oid], X_107:bat[:str]); |
| X_108:bat[:str] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "value":str, 0:int, 2:int, 8:int); |
| X_142:bat[:str] := algebra.projection(C_87:bat[:oid], X_108:bat[:str]); |
| X_109:bat[:str] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "value":str, 0:int, 3:int, 8:int); |
| X_143:bat[:str] := algebra.projection(C_89:bat[:oid], X_109:bat[:str]); |
| X_110:bat[:str] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "value":str, 0:int, 4:int, 8:int); |
| X_144:bat[:str] := algebra.projection(C_91:bat[:oid], X_110:bat[:str]); |
| X_111:bat[:str] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "value":str, 0:int, 5:int, 8:int); |
| X_145:bat[:str] := algebra.projection(C_93:bat[:oid], X_111:bat[:str]); |
| X_112:bat[:str] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "value":str, 0:int, 6:int, 8:int); |
| X_146:bat[:str] := algebra.projection(C_95:bat[:oid], X_112:bat[:str]); |
| X_113:bat[:str] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "value":str, 0:int, 7:int, 8:int); |
| X_147:bat[:str] := algebra.projection(C_97:bat[:oid], X_113:bat[:str]); |
| X_189:bat[:str] := mat.packIncrement(X_140:bat[:str], 8:int); |
| X_190:bat[:str] := mat.packIncrement(X_189:bat[:str], X_141:bat[:str]); |
| X_191:bat[:str] := mat.packIncrement(X_190:bat[:str], X_142:bat[:str]); |
| X_192:bat[:str] := mat.packIncrement(X_191:bat[:str], X_143:bat[:str]); |
| X_193:bat[:str] := mat.packIncrement(X_192:bat[:str], X_144:bat[:str]); |
| X_194:bat[:str] := mat.packIncrement(X_193:bat[:str], X_145:bat[:str]); |
| X_195:bat[:str] := mat.packIncrement(X_194:bat[:str], X_146:bat[:str]); |
| X_16:bat[:str] := mat.packIncrement(X_195:bat[:str], X_147:bat[:str]); |
| X_156:bat[:str] := algebra.project(X_132:bat[:int], "SP":str); |
| X_157:bat[:str] := algebra.project(X_133:bat[:int], "SP":str); |
| X_158:bat[:str] := algebra.project(X_134:bat[:int], "SP":str); |
| X_159:bat[:str] := algebra.project(X_135:bat[:int], "SP":str); |
| X_160:bat[:str] := algebra.project(X_136:bat[:int], "SP":str); |
| X_161:bat[:str] := algebra.project(X_137:bat[:int], "SP":str); |
| X_162:bat[:str] := algebra.project(X_138:bat[:int], "SP":str); |
| X_163:bat[:str] := algebra.project(X_139:bat[:int], "SP":str); |
| X_197:bat[:str] := mat.packIncrement(X_156:bat[:str], 8:int); |
| X_198:bat[:str] := mat.packIncrement(X_197:bat[:str], X_157:bat[:str]); |
| X_199:bat[:str] := mat.packIncrement(X_198:bat[:str], X_158:bat[:str]); |
| X_200:bat[:str] := mat.packIncrement(X_199:bat[:str], X_159:bat[:str]); |
| X_201:bat[:str] := mat.packIncrement(X_200:bat[:str], X_160:bat[:str]); |
| X_202:bat[:str] := mat.packIncrement(X_201:bat[:str], X_161:bat[:str]); |
| X_203:bat[:str] := mat.packIncrement(X_202:bat[:str], X_162:bat[:str]); |
| X_20:bat[:str] := mat.packIncrement(X_203:bat[:str], X_163:bat[:str]); |
| X_164:bat[:bte] := algebra.project(X_132:bat[:int], 1:bte); |
| X_165:bat[:bte] := algebra.project(X_133:bat[:int], 1:bte); |
| X_166:bat[:bte] := algebra.project(X_134:bat[:int], 1:bte); |
| X_167:bat[:bte] := algebra.project(X_135:bat[:int], 1:bte); |
| X_168:bat[:bte] := algebra.project(X_136:bat[:int], 1:bte); |
| X_169:bat[:bte] := algebra.project(X_137:bat[:int], 1:bte); |
| X_170:bat[:bte] := algebra.project(X_138:bat[:int], 1:bte); |
| X_171:bat[:bte] := algebra.project(X_139:bat[:int], 1:bte); |
| X_205:bat[:bte] := mat.packIncrement(X_164:bat[:bte], 8:int); |
| X_206:bat[:bte] := mat.packIncrement(X_205:bat[:bte], X_165:bat[:bte]); |
| X_207:bat[:bte] := mat.packIncrement(X_206:bat[:bte], X_166:bat[:bte]); |
| X_208:bat[:bte] := mat.packIncrement(X_207:bat[:bte], X_167:bat[:bte]); |
| X_209:bat[:bte] := mat.packIncrement(X_208:bat[:bte], X_168:bat[:bte]); |
| X_210:bat[:bte] := mat.packIncrement(X_209:bat[:bte], X_169:bat[:bte]); |
| X_211:bat[:bte] := mat.packIncrement(X_210:bat[:bte], X_170:bat[:bte]); |
| X_23:bat[:bte] := mat.packIncrement(X_211:bat[:bte], X_171:bat[:bte]); |
| X_116:bat[:dbl] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "prob":str, 0:int, 0:int, 8:int); |
| X_148:bat[:dbl] := algebra.projection(C_83:bat[:oid], X_116:bat[:dbl]); |
| X_118:bat[:dbl] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "prob":str, 0:int, 1:int, 8:int); |
| X_149:bat[:dbl] := algebra.projection(C_85:bat[:oid], X_118:bat[:dbl]); |
| X_120:bat[:dbl] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "prob":str, 0:int, 2:int, 8:int); |
| X_150:bat[:dbl] := algebra.projection(C_87:bat[:oid], X_120:bat[:dbl]); |
| X_122:bat[:dbl] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "prob":str, 0:int, 3:int, 8:int); |
| X_151:bat[:dbl] := algebra.projection(C_89:bat[:oid], X_122:bat[:dbl]); |
| X_124:bat[:dbl] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "prob":str, 0:int, 4:int, 8:int); |
| X_152:bat[:dbl] := algebra.projection(C_91:bat[:oid], X_124:bat[:dbl]); |
| X_126:bat[:dbl] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "prob":str, 0:int, 5:int, 8:int); |
| X_153:bat[:dbl] := algebra.projection(C_93:bat[:oid], X_126:bat[:dbl]); |
| X_128:bat[:dbl] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "prob":str, 0:int, 6:int, 8:int); |
| X_154:bat[:dbl] := algebra.projection(C_95:bat[:oid], X_128:bat[:dbl]); |
| X_130:bat[:dbl] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "prob":str, 0:int, 7:int, 8:int); |
| X_155:bat[:dbl] := algebra.projection(C_97:bat[:oid], X_130:bat[:dbl]); |
| X_213:bat[:dbl] := mat.packIncrement(X_148:bat[:dbl], 8:int); |
| X_215:bat[:dbl] := mat.packIncrement(X_213:bat[:dbl], X_149:bat[:dbl]); |
| X_216:bat[:dbl] := mat.packIncrement(X_215:bat[:dbl], X_150:bat[:dbl]); |
| X_217:bat[:dbl] := mat.packIncrement(X_216:bat[:dbl], X_151:bat[:dbl]); |
| X_218:bat[:dbl] := mat.packIncrement(X_217:bat[:dbl], X_152:bat[:dbl]); |
| X_219:bat[:dbl] := mat.packIncrement(X_218:bat[:dbl], X_153:bat[:dbl]); |
| X_220:bat[:dbl] := mat.packIncrement(X_219:bat[:dbl], X_154:bat[:dbl]); |
| X_17:bat[:dbl] := mat.packIncrement(X_220:bat[:dbl], X_155:bat[:dbl]); |
| (X_24:bat[:int], X_26:bat[:str], X_27:bat[:dbl]) := batspinque.UTF8tokenize_v0(X_15:bat[:int], X_16:bat[:str], X_20:bat[:str], X_23:bat[:bte], X_17:bat[:dbl]); |
| language.pass(X_132:bat[:int]); |
| language.pass(X_133:bat[:int]); |
| language.pass(X_134:bat[:int]); |
| language.pass(X_135:bat[:int]); |
| language.pass(X_136:bat[:int]); |
| language.pass(X_137:bat[:int]); |
| language.pass(X_138:bat[:int]); |
| language.pass(X_139:bat[:int]); |
| language.pass(C_83:bat[:oid]); |
| language.pass(C_85:bat[:oid]); |
| language.pass(C_87:bat[:oid]); |
| language.pass(C_89:bat[:oid]); |
| language.pass(C_91:bat[:oid]); |
| language.pass(C_93:bat[:oid]); |
| language.pass(C_95:bat[:oid]); |
| language.pass(C_97:bat[:oid]); |
| exit X_223:bit; |
| sql.resultSet(X_29:bat[:str], X_30:bat[:str], X_31:bat[:str], X_32:bat[:int], X_33:bat[:int], X_24:bat[:int], X_26:bat[:str], X_27:bat[:dbl]); |
| end user.main; |
Running table-returning UDFs in parallel seems tricky, which is why the plan is not optimal yet. Maybe we could add a mergetable bailout for these cases.
Can I hope to actually get it to run in parallel? That would be awesome. I understand the concerns about thread-safety, but anyone who uses a UDF is also supposed to know whether it is thread-safe or not. So if it isn't safe and returns wrong results, that is simply the user's fault; I don't see a strong need to protect them from this. I could just as easily write and use a UDF that crashes the entire server, and nothing protects me from doing that.