bayeslite
bayeslite copied to clipboard
engine.compose_cgpm with multiprocessing fails with vscgpm
Using A Causal Probabilistic Program & Non-Parametric Bayes To Model Kepler's Law
In this notebook, we are going to write a custom probabilistic program in VentureScript to implement a CGPM which models the conditional distribution of period_minutes
given apogee_km
and perigee_km
. In particular, we are going to (non-parametrically) learn a clustering of satellites, based on the magnitude of deviation of their actual period_minutes
(given their apogee_km
and perigee_km
) from their theoretical period_minutes
implied by Kepler's Law.
Our population of interest is the satellites dataset from the Union of Concerned Scientists.
Prepare the notebook and .bdb
file.
%load_ext iventure.magics
%matplotlib inline
The iventure.magics extension is already loaded. To reload it, use:
%reload_ext iventure.magics
import os

# Delete any existing database file so that the %bayesdb magic that follows
# loads a fresh bdbs/satellites_kepler.bdb.
if os.path.exists('bdbs/satellites_kepler.bdb'):
    os.remove('bdbs/satellites_kepler.bdb')
%bayesdb -j bdbs/satellites_kepler.bdb
u'Loaded: bdbs/satellites_kepler.bdb'
Create a table satellites_ucs
containing satellite data records from "satellites.csv".
%%mml
CREATE TABLE satellites_ucs FROM '../../resources/satellites.csv'
.nullify satellites_ucs 'NaN'
%%mml
CREATE POPULATION satellites FOR satellites_ucs WITH SCHEMA { GUESS STATTYPES FOR (*) };
Write a VentureScript program to represent the CGPM for period | apogee, perigee. Also expose the latent variables of the program, namely the cluster identity of each satellite and its deviation from the "true" Keplerian period.
%%venturescript
// Kepler CGPM.
// Model program for period | apogee, perigee. The simulated period is the
// value given by keplers_law plus an error term; the error term is drawn
// from a sampler that is specific to the cluster assigned to the row.
define kepler = () -> {
// Kepler's law.
// Computes 2 * pi * sqrt(a^3 / GM) and divides by 60.
// NOTE(review): GM appears to be Earth's standard gravitational parameter
// in km^3/s^2 and earth_radius to be in km, which would make the result a
// period in minutes -- confirm the units against the dataset columns.
assume keplers_law = (apogee, perigee) -> {
let GM = 398600.4418;
let earth_radius = 6378;
// a: the mean of |apogee| and |perigee|, offset by earth_radius.
let a = (abs(apogee) + abs(perigee)) *
0.5 + earth_radius;
// 3.1415 is a truncated approximation of pi.
2 * 3.1415 * sqrt(a**3 / GM) / 60
};
// Internal samplers.
// cluster_sampler is built by make_crp with concentration crp_alpha.
// error_sampler is memoized on the cluster, so each cluster gets exactly
// one make_nig_normal(1, 1, 1, 1) sampler that all of its rows share.
assume crp_alpha = .5;
assume cluster_sampler = make_crp(crp_alpha);
assume error_sampler = mem((cluster) ->
make_nig_normal(1, 1, 1, 1));
// Output simulators.
// All three take (rowid, apogee, perigee) and are memoized on those
// arguments, so repeated calls for the same row return the same value.
// sim_cluster_id draws the row's cluster from cluster_sampler and tags the
// draw with atom(rowid) and atom(1).
// NOTE(review): presumably the tag arguments are the inference scope and
// block -- confirm against the Venture reference for tag.
assume sim_cluster_id =
mem((rowid, apogee, perigee) ~> {
tag(atom(rowid), atom(1), cluster_sampler())
});
// sim_error looks up the row's cluster and draws from that cluster's error
// sampler; the draw is tagged with atom(rowid) and atom(2).
assume sim_error =
mem((rowid, apogee, perigee) ~> {
let cluster_id = sim_cluster_id(
rowid, apogee, perigee);
tag(atom(rowid), atom(2),
error_sampler(cluster_id)())
});
// sim_period is the Keplerian period plus the row's error term.
assume sim_period =
mem((rowid, apogee, perigee) ~> {
keplers_law(apogee, perigee) +
sim_error(rowid, apogee, perigee)
});
// List of simulators.
// Order is: period, cluster id, error.
// NOTE(review): presumably this order must line up with the CGPM's declared
// outputs and with the list of observers -- confirm against the caller.
assume simulators = [
sim_period, sim_cluster_id, sim_error];
};
// Output observers.
// obs_cluster_id: issues an observe directive, under the given label, that
// constrains sim_cluster_id for (rowid, apogee, perigee) to atom(value).
// NOTE(review): rowid, apogee, perigee and label are spliced in with `$`,
// while value appears without it -- confirm this is intended.
define obs_cluster_id =
(rowid, apogee, perigee, value, label) -> {
$label: observe sim_cluster_id(
$rowid, $apogee, $perigee) = atom(value);
};
// obs_error: issues an observe directive, under the given label, that
// constrains sim_error for (rowid, apogee, perigee) to value.
define obs_error =
(rowid, apogee, perigee, value, label) -> {
$label: observe sim_error(
$rowid, $apogee, $perigee) = value;
};
// obs_period: observes a period by observing the error term instead.
// sim_period is keplers_law(...) + sim_error(...), so the period is
// constrained indirectly: evaluate keplers_law for the row's inputs, then
// observe sim_error to be (value - theoretical_period) via obs_error.
define obs_period =
(rowid, apogee, perigee, value, label) -> {
// Evaluate keplers_law in the model for these inputs.
let theoretical_period = run(
sample keplers_law($apogee, $perigee));
obs_error(
rowid, apogee, perigee,
value - theoretical_period, label);
};
// List of observers.
// Order is: period, cluster id, error.
// NOTE(review): presumably this must match the order of the simulators list
// defined inside kepler -- confirm against the caller.
define observers = [
obs_period, obs_cluster_id, obs_error];
// List of inputs.
// Names of the two inputs, in the same order as the (apogee, perigee)
// arguments taken by the simulators and observers.
define inputs = ["apogee", "perigee"];
// Transition operator.
// Calls mh with arguments (default, one, N).
// NOTE(review): presumably this runs N Metropolis-Hastings steps over the
// default scope, each on one chosen block -- confirm against the Venture
// inference reference.
define transition = (N) -> {mh(default, one, N)};
MML program to create a hybrid CGPM, which composes crosscat with the kepler
VentureScript defined in the cell above, as well as other CGPMs available in the cgpm
library.
%%mml
CREATE GENERATOR satellites_hybrid FOR satellites WITH BASELINE crosscat(
OVERRIDE GENERATIVE MODEL FOR period_minutes
GIVEN apogee_km, perigee_km
AND EXPOSE
kepler_cluster CATEGORICAL,
kepler_residual NUMERICAL
USING
venturescript(mode=venture_script, sp=kepler);
SUBSAMPLE 10
);
Initialize a model and run inference transitions.
%mml INITIALIZE 1 MODELS FOR satellites_hybrid;
9
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
<ipython-input-32-4136f672b2c0> in <module>()
----> 1 get_ipython().magic(u'mml INITIALIZE 1 MODELS FOR satellites_hybrid;')
/scratch/fs/.pyenv2.7.6/local/lib/python2.7/site-packages/IPython/core/interactiveshell.pyc in magic(self, arg_s)
2144 magic_name, _, magic_arg_s = arg_s.partition(' ')
2145 magic_name = magic_name.lstrip(prefilter.ESC_MAGIC)
-> 2146 return self.run_line_magic(magic_name, magic_arg_s)
2147
2148 #-------------------------------------------------------------------------
/scratch/fs/.pyenv2.7.6/local/lib/python2.7/site-packages/IPython/core/interactiveshell.pyc in run_line_magic(self, magic_name, line)
2065 kwargs['local_ns'] = sys._getframe(stack_depth).f_locals
2066 with self.builtin_trap:
-> 2067 result = fn(*args,**kwargs)
2068 return result
2069
/scratch/fs/iventure/iventure/magics.py in logged_cell_wrapper(self, line, cell)
145 raw = self._retrieve_raw(line, cell)
146 try:
--> 147 output = func(self, line, cell)
148 except:
149 exception = traceback.format_exc()
<decorator-gen-127> in mml(self, line, cell)
/scratch/fs/.pyenv2.7.6/local/lib/python2.7/site-packages/IPython/core/magic.pyc in <lambda>(f, *a, **k)
186 # but it's overkill for just that one bit of state.
187 def magic_deco(arg):
--> 188 call = lambda f, *a, **k: f(*a, **k)
189
190 if callable(arg):
/scratch/fs/iventure/iventure/magics.py in mml(self, line, cell)
289 return self._cmd(cmd_q)
290 if bql_q:
--> 291 return self._bql(bql_q)
292
293 @logged_cell
/scratch/fs/iventure/iventure/magics.py in _bql(self, lines)
320 if out.getvalue() and bql_string_complete_p(out.getvalue()):
321 ok = True
--> 322 cursor = self._bdb.execute(out.getvalue())
323 return bqu.cursor_to_df(cursor)
324
/scratch/fs/bayeslite/build/lib.linux-x86_64-2.7/bayeslite/bayesdb.py in execute(self, string, bindings)
213 bindings = ()
214 return self._maybe_trace(
--> 215 self.tracer, self._do_execute, string, bindings)
216
217 def _maybe_trace(self, tracer, meth, string, bindings):
/scratch/fs/bayeslite/build/lib.linux-x86_64-2.7/bayeslite/bayesdb.py in _maybe_trace(self, tracer, meth, string, bindings)
221 if tracer:
222 tracer(string, bindings)
--> 223 return meth(string, bindings)
224
225 def _qid(self):
/scratch/fs/bayeslite/build/lib.linux-x86_64-2.7/bayeslite/bayesdb.py in _do_execute(self, string, bindings)
262 else:
263 raise ValueError('>1 phrase in string')
--> 264 cursor = bql.execute_phrase(self, phrase, bindings)
265 return self._empty_cursor if cursor is None else cursor
266
/scratch/fs/bayeslite/build/lib.linux-x86_64-2.7/bayeslite/bql.py in execute_phrase(bdb, phrase, bindings)
490 # Do metamodel-specific initialization.
491 metamodel = core.bayesdb_generator_metamodel(bdb, generator_id)
--> 492 metamodel.initialize_models(bdb, generator_id, modelnos)
493 return empty_cursor(bdb)
494
/scratch/fs/bayeslite/build/lib.linux-x86_64-2.7/bayeslite/metamodels/cgpm_metamodel.py in initialize_models(self, bdb, generator_id, modelnos)
200 cgpms = [self._initialize_cgpm(bdb, generator_id, cgpm_ext)
201 for _ in xrange(n)]
--> 202 engine.compose_cgpm(cgpms, multiprocess=self._multiprocess)
203
204 # Store the newly initialized engine.
/scratch/fs/cgpm/cgpm/crosscat/engine.py in compose_cgpm(self, cgpms, multiprocess)
132 ())
133 for i in xrange(self.num_states())]
--> 134 self.states = mapper(_compose, args)
135
136 def logpdf(self, rowid, query, evidence=None, accuracy=None, multiprocess=1):
/scratch/fs/cgpm/cgpm/utils/parallel_map.py in parallel_map(f, l, parallelism)
97 while 0 < ctr[0]:
98 j = le32dec(os.read(retq_rd, 4))
---> 99 process_output(fl, ctr, outq[j][0].recv())
100
101 # Cancel all the worker processes.
/scratch/fs/cgpm/cgpm/utils/parallel_map.py in process_output(fl, ctr, output)
60 (i, ok, fx) = output
61 if not ok:
---> 62 raise RuntimeError('Subprocess failed: %s' % (fx,))
63 fl[i] = fx
64 ctr[0] -= 1
RuntimeError: Subprocess failed: Traceback (most recent call last):
File "/scratch/fs/cgpm/cgpm/utils/parallel_map.py", line 55, in process_input
outq_wr.send((i, ok, fx))
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed