MonetDB icon indicating copy to clipboard operation
MonetDB copied to clipboard

Mitosis and table-returning UDFs

Open swingbit opened this issue 4 years ago • 2 comments

Similar to #7085, but this is about table-returning UDFs.

This function is safe to be executed in parallel, but I can't let MonetDB know about that. Like it happened in #7085 , it chops up the input bats, then immediately re-packs them and executes the UDF on the re-packed bats. So the result is worse performance than plain single-thread execution.

sql>explain select * from tokenize( (select subject,value,'SP',1,prob from tr_0_obj_string) );
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| mal                                                                                                                                                                 |
+=====================================================================================================================================================================+
| function user.main():void;                                                                                                                                          |
|     X_1:void := querylog.define("explain select * from tokenize( (select subject,value,\\'SP\\',1,prob from tr_0_obj_string) );":str, "default_pipe":str, 21:int);  |
| barrier X_223:bit := language.dataflow();                                                                                                                           |
|     X_29:bat[:str] := bat.pack(".%3":str, ".%3":str, ".%3":str);                                                                                                    |
|     X_30:bat[:str] := bat.pack("id":str, "token":str, "prob":str);                                                                                                  |
|     X_31:bat[:str] := bat.pack("int":str, "clob":str, "double":str);                                                                                                |
|     X_32:bat[:int] := bat.pack(32:int, 0:int, 53:int);                                                                                                              |
|     X_33:bat[:int] := bat.pack(0:int, 0:int, 0:int);                                                                                                                |
|     X_4:int := sql.mvc();                                                                                                                                           |
|     C_83:bat[:oid] := sql.tid(X_4:int, "spinque":str, "tr_0_obj_string":str, 0:int, 8:int);                                                                         |
|     X_98:bat[:int] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "subject":str, 0:int, 0:int, 8:int);                                                  |
|     X_132:bat[:int] := algebra.projection(C_83:bat[:oid], X_98:bat[:int]);                                                                                          |
|     C_85:bat[:oid] := sql.tid(X_4:int, "spinque":str, "tr_0_obj_string":str, 1:int, 8:int);                                                                         |
|     X_99:bat[:int] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "subject":str, 0:int, 1:int, 8:int);                                                  |
|     X_133:bat[:int] := algebra.projection(C_85:bat[:oid], X_99:bat[:int]);                                                                                          |
|     C_87:bat[:oid] := sql.tid(X_4:int, "spinque":str, "tr_0_obj_string":str, 2:int, 8:int);                                                                         |
|     X_100:bat[:int] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "subject":str, 0:int, 2:int, 8:int);                                                 |
|     X_134:bat[:int] := algebra.projection(C_87:bat[:oid], X_100:bat[:int]);                                                                                         |
|     C_89:bat[:oid] := sql.tid(X_4:int, "spinque":str, "tr_0_obj_string":str, 3:int, 8:int);                                                                         |
|     X_101:bat[:int] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "subject":str, 0:int, 3:int, 8:int);                                                 |
|     X_135:bat[:int] := algebra.projection(C_89:bat[:oid], X_101:bat[:int]);                                                                                         |
|     C_91:bat[:oid] := sql.tid(X_4:int, "spinque":str, "tr_0_obj_string":str, 4:int, 8:int);                                                                         |
|     X_102:bat[:int] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "subject":str, 0:int, 4:int, 8:int);                                                 |
|     X_136:bat[:int] := algebra.projection(C_91:bat[:oid], X_102:bat[:int]);                                                                                         |
|     C_93:bat[:oid] := sql.tid(X_4:int, "spinque":str, "tr_0_obj_string":str, 5:int, 8:int);                                                                         |
|     X_103:bat[:int] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "subject":str, 0:int, 5:int, 8:int);                                                 |
|     X_137:bat[:int] := algebra.projection(C_93:bat[:oid], X_103:bat[:int]);                                                                                         |
|     C_95:bat[:oid] := sql.tid(X_4:int, "spinque":str, "tr_0_obj_string":str, 6:int, 8:int);                                                                         |
|     X_104:bat[:int] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "subject":str, 0:int, 6:int, 8:int);                                                 |
|     X_138:bat[:int] := algebra.projection(C_95:bat[:oid], X_104:bat[:int]);                                                                                         |
|     C_97:bat[:oid] := sql.tid(X_4:int, "spinque":str, "tr_0_obj_string":str, 7:int, 8:int);                                                                         |
|     X_105:bat[:int] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "subject":str, 0:int, 7:int, 8:int);                                                 |
|     X_139:bat[:int] := algebra.projection(C_97:bat[:oid], X_105:bat[:int]);                                                                                         |
|     X_180:bat[:int] := mat.packIncrement(X_132:bat[:int], 8:int);                                                                                                   |
|     X_182:bat[:int] := mat.packIncrement(X_180:bat[:int], X_133:bat[:int]);                                                                                         |
|     X_183:bat[:int] := mat.packIncrement(X_182:bat[:int], X_134:bat[:int]);                                                                                         |
|     X_184:bat[:int] := mat.packIncrement(X_183:bat[:int], X_135:bat[:int]);                                                                                         |
|     X_185:bat[:int] := mat.packIncrement(X_184:bat[:int], X_136:bat[:int]);                                                                                         |
|     X_186:bat[:int] := mat.packIncrement(X_185:bat[:int], X_137:bat[:int]);                                                                                         |
|     X_187:bat[:int] := mat.packIncrement(X_186:bat[:int], X_138:bat[:int]);                                                                                         |
|     X_15:bat[:int] := mat.packIncrement(X_187:bat[:int], X_139:bat[:int]);                                                                                          |
|     X_106:bat[:str] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "value":str, 0:int, 0:int, 8:int);                                                   |
|     X_140:bat[:str] := algebra.projection(C_83:bat[:oid], X_106:bat[:str]);                                                                                         |
|     X_107:bat[:str] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "value":str, 0:int, 1:int, 8:int);                                                   |
|     X_141:bat[:str] := algebra.projection(C_85:bat[:oid], X_107:bat[:str]);                                                                                         |
|     X_108:bat[:str] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "value":str, 0:int, 2:int, 8:int);                                                   |
|     X_142:bat[:str] := algebra.projection(C_87:bat[:oid], X_108:bat[:str]);                                                                                         |
|     X_109:bat[:str] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "value":str, 0:int, 3:int, 8:int);                                                   |
|     X_143:bat[:str] := algebra.projection(C_89:bat[:oid], X_109:bat[:str]);                                                                                         |
|     X_110:bat[:str] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "value":str, 0:int, 4:int, 8:int);                                                   |
|     X_144:bat[:str] := algebra.projection(C_91:bat[:oid], X_110:bat[:str]);                                                                                         |
|     X_111:bat[:str] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "value":str, 0:int, 5:int, 8:int);                                                   |
|     X_145:bat[:str] := algebra.projection(C_93:bat[:oid], X_111:bat[:str]);                                                                                         |
|     X_112:bat[:str] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "value":str, 0:int, 6:int, 8:int);                                                   |
|     X_146:bat[:str] := algebra.projection(C_95:bat[:oid], X_112:bat[:str]);                                                                                         |
|     X_113:bat[:str] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "value":str, 0:int, 7:int, 8:int);                                                   |
|     X_147:bat[:str] := algebra.projection(C_97:bat[:oid], X_113:bat[:str]);                                                                                         |
|     X_189:bat[:str] := mat.packIncrement(X_140:bat[:str], 8:int);                                                                                                   |
|     X_190:bat[:str] := mat.packIncrement(X_189:bat[:str], X_141:bat[:str]);                                                                                         |
|     X_191:bat[:str] := mat.packIncrement(X_190:bat[:str], X_142:bat[:str]);                                                                                         |
|     X_192:bat[:str] := mat.packIncrement(X_191:bat[:str], X_143:bat[:str]);                                                                                         |
|     X_193:bat[:str] := mat.packIncrement(X_192:bat[:str], X_144:bat[:str]);                                                                                         |
|     X_194:bat[:str] := mat.packIncrement(X_193:bat[:str], X_145:bat[:str]);                                                                                         |
|     X_195:bat[:str] := mat.packIncrement(X_194:bat[:str], X_146:bat[:str]);                                                                                         |
|     X_16:bat[:str] := mat.packIncrement(X_195:bat[:str], X_147:bat[:str]);                                                                                          |
|     X_156:bat[:str] := algebra.project(X_132:bat[:int], "SP":str);                                                                                                  |
|     X_157:bat[:str] := algebra.project(X_133:bat[:int], "SP":str);                                                                                                  |
|     X_158:bat[:str] := algebra.project(X_134:bat[:int], "SP":str);                                                                                                  |
|     X_159:bat[:str] := algebra.project(X_135:bat[:int], "SP":str);                                                                                                  |
|     X_160:bat[:str] := algebra.project(X_136:bat[:int], "SP":str);                                                                                                  |
|     X_161:bat[:str] := algebra.project(X_137:bat[:int], "SP":str);                                                                                                  |
|     X_162:bat[:str] := algebra.project(X_138:bat[:int], "SP":str);                                                                                                  |
|     X_163:bat[:str] := algebra.project(X_139:bat[:int], "SP":str);                                                                                                  |
|     X_197:bat[:str] := mat.packIncrement(X_156:bat[:str], 8:int);                                                                                                   |
|     X_198:bat[:str] := mat.packIncrement(X_197:bat[:str], X_157:bat[:str]);                                                                                         |
|     X_199:bat[:str] := mat.packIncrement(X_198:bat[:str], X_158:bat[:str]);                                                                                         |
|     X_200:bat[:str] := mat.packIncrement(X_199:bat[:str], X_159:bat[:str]);                                                                                         |
|     X_201:bat[:str] := mat.packIncrement(X_200:bat[:str], X_160:bat[:str]);                                                                                         |
|     X_202:bat[:str] := mat.packIncrement(X_201:bat[:str], X_161:bat[:str]);                                                                                         |
|     X_203:bat[:str] := mat.packIncrement(X_202:bat[:str], X_162:bat[:str]);                                                                                         |
|     X_20:bat[:str] := mat.packIncrement(X_203:bat[:str], X_163:bat[:str]);                                                                                          |
|     X_164:bat[:bte] := algebra.project(X_132:bat[:int], 1:bte);                                                                                                     |
|     X_165:bat[:bte] := algebra.project(X_133:bat[:int], 1:bte);                                                                                                     |
|     X_166:bat[:bte] := algebra.project(X_134:bat[:int], 1:bte);                                                                                                     |
|     X_167:bat[:bte] := algebra.project(X_135:bat[:int], 1:bte);                                                                                                     |
|     X_168:bat[:bte] := algebra.project(X_136:bat[:int], 1:bte);                                                                                                     |
|     X_169:bat[:bte] := algebra.project(X_137:bat[:int], 1:bte);                                                                                                     |
|     X_170:bat[:bte] := algebra.project(X_138:bat[:int], 1:bte);                                                                                                     |
|     X_171:bat[:bte] := algebra.project(X_139:bat[:int], 1:bte);                                                                                                     |
|     X_205:bat[:bte] := mat.packIncrement(X_164:bat[:bte], 8:int);                                                                                                   |
|     X_206:bat[:bte] := mat.packIncrement(X_205:bat[:bte], X_165:bat[:bte]);                                                                                         |
|     X_207:bat[:bte] := mat.packIncrement(X_206:bat[:bte], X_166:bat[:bte]);                                                                                         |
|     X_208:bat[:bte] := mat.packIncrement(X_207:bat[:bte], X_167:bat[:bte]);                                                                                         |
|     X_209:bat[:bte] := mat.packIncrement(X_208:bat[:bte], X_168:bat[:bte]);                                                                                         |
|     X_210:bat[:bte] := mat.packIncrement(X_209:bat[:bte], X_169:bat[:bte]);                                                                                         |
|     X_211:bat[:bte] := mat.packIncrement(X_210:bat[:bte], X_170:bat[:bte]);                                                                                         |
|     X_23:bat[:bte] := mat.packIncrement(X_211:bat[:bte], X_171:bat[:bte]);                                                                                          |
|     X_116:bat[:dbl] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "prob":str, 0:int, 0:int, 8:int);                                                    |
|     X_148:bat[:dbl] := algebra.projection(C_83:bat[:oid], X_116:bat[:dbl]);                                                                                         |
|     X_118:bat[:dbl] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "prob":str, 0:int, 1:int, 8:int);                                                    |
|     X_149:bat[:dbl] := algebra.projection(C_85:bat[:oid], X_118:bat[:dbl]);                                                                                         |
|     X_120:bat[:dbl] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "prob":str, 0:int, 2:int, 8:int);                                                    |
|     X_150:bat[:dbl] := algebra.projection(C_87:bat[:oid], X_120:bat[:dbl]);                                                                                         |
|     X_122:bat[:dbl] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "prob":str, 0:int, 3:int, 8:int);                                                    |
|     X_151:bat[:dbl] := algebra.projection(C_89:bat[:oid], X_122:bat[:dbl]);                                                                                         |
|     X_124:bat[:dbl] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "prob":str, 0:int, 4:int, 8:int);                                                    |
|     X_152:bat[:dbl] := algebra.projection(C_91:bat[:oid], X_124:bat[:dbl]);                                                                                         |
|     X_126:bat[:dbl] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "prob":str, 0:int, 5:int, 8:int);                                                    |
|     X_153:bat[:dbl] := algebra.projection(C_93:bat[:oid], X_126:bat[:dbl]);                                                                                         |
|     X_128:bat[:dbl] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "prob":str, 0:int, 6:int, 8:int);                                                    |
|     X_154:bat[:dbl] := algebra.projection(C_95:bat[:oid], X_128:bat[:dbl]);                                                                                         |
|     X_130:bat[:dbl] := sql.bind(X_4:int, "spinque":str, "tr_0_obj_string":str, "prob":str, 0:int, 7:int, 8:int);                                                    |
|     X_155:bat[:dbl] := algebra.projection(C_97:bat[:oid], X_130:bat[:dbl]);                                                                                         |
|     X_213:bat[:dbl] := mat.packIncrement(X_148:bat[:dbl], 8:int);                                                                                                   |
|     X_215:bat[:dbl] := mat.packIncrement(X_213:bat[:dbl], X_149:bat[:dbl]);                                                                                         |
|     X_216:bat[:dbl] := mat.packIncrement(X_215:bat[:dbl], X_150:bat[:dbl]);                                                                                         |
|     X_217:bat[:dbl] := mat.packIncrement(X_216:bat[:dbl], X_151:bat[:dbl]);                                                                                         |
|     X_218:bat[:dbl] := mat.packIncrement(X_217:bat[:dbl], X_152:bat[:dbl]);                                                                                         |
|     X_219:bat[:dbl] := mat.packIncrement(X_218:bat[:dbl], X_153:bat[:dbl]);                                                                                         |
|     X_220:bat[:dbl] := mat.packIncrement(X_219:bat[:dbl], X_154:bat[:dbl]);                                                                                         |
|     X_17:bat[:dbl] := mat.packIncrement(X_220:bat[:dbl], X_155:bat[:dbl]);                                                                                          |
|     (X_24:bat[:int], X_26:bat[:str], X_27:bat[:dbl]) := batspinque.UTF8tokenize_v0(X_15:bat[:int], X_16:bat[:str], X_20:bat[:str], X_23:bat[:bte], X_17:bat[:dbl]); |
|     language.pass(X_132:bat[:int]);                                                                                                                                 |
|     language.pass(X_133:bat[:int]);                                                                                                                                 |
|     language.pass(X_134:bat[:int]);                                                                                                                                 |
|     language.pass(X_135:bat[:int]);                                                                                                                                 |
|     language.pass(X_136:bat[:int]);                                                                                                                                 |
|     language.pass(X_137:bat[:int]);                                                                                                                                 |
|     language.pass(X_138:bat[:int]);                                                                                                                                 |
|     language.pass(X_139:bat[:int]);                                                                                                                                 |
|     language.pass(C_83:bat[:oid]);                                                                                                                                  |
|     language.pass(C_85:bat[:oid]);                                                                                                                                  |
|     language.pass(C_87:bat[:oid]);                                                                                                                                  |
|     language.pass(C_89:bat[:oid]);                                                                                                                                  |
|     language.pass(C_91:bat[:oid]);                                                                                                                                  |
|     language.pass(C_93:bat[:oid]);                                                                                                                                  |
|     language.pass(C_95:bat[:oid]);                                                                                                                                  |
|     language.pass(C_97:bat[:oid]);                                                                                                                                  |
| exit X_223:bit;                                                                                                                                                     |
|     sql.resultSet(X_29:bat[:str], X_30:bat[:str], X_31:bat[:str], X_32:bat[:int], X_33:bat[:int], X_24:bat[:int], X_26:bat[:str], X_27:bat[:dbl]);                  |
| end user.main;                                                                                                                                                      |

swingbit avatar Mar 24 '21 11:03 swingbit

Running table-returning UDFs in parallel seems tricky, that's why is not optimal yet. Maybe we could add a mergetable bailout on these cases.

PedroTadim avatar Mar 24 '21 16:03 PedroTadim

Can I hope for actually getting it to run in parallel? That would be awesome. I mean, I understand the concerns about thread-safety, but after all if one uses a UDF he's also supposed to know whether it's thread-safe or not. So if it wasn't safe and returns wrong results, it's just the user's fault, I don't see an extreme need for protecting him from this. I could easily write and use a UDF that crashes the entire server, and nothing protects me from doing that.

swingbit avatar Mar 24 '21 16:03 swingbit