summaryrefslogtreecommitdiffstats
path: root/flow/gsl/gslopmaster.c
diff options
context:
space:
mode:
Diffstat (limited to 'flow/gsl/gslopmaster.c')
-rw-r--r--flow/gsl/gslopmaster.c783
1 files changed, 783 insertions, 0 deletions
diff --git a/flow/gsl/gslopmaster.c b/flow/gsl/gslopmaster.c
new file mode 100644
index 0000000..f71ec3d
--- /dev/null
+++ b/flow/gsl/gslopmaster.c
@@ -0,0 +1,783 @@
+/* GSL Engine - Flow module operation engine
+ * Copyright (C) 2001 Tim Janik
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General
+ * Public License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+#include <string.h>
+#include <sys/poll.h>
+#include <sys/time.h>
+#include <errno.h>
+
+#include "gslopmaster.h"
+
+#include "gslcommon.h"
+#include "gslopnode.h"
+#include "gsloputil.h"
+#include "gslopschedule.h"
+#include "gslieee754.h"
+
+
+
+#define NODE_FLAG_RECONNECT(node) G_STMT_START { (node)->reconnected = (node)->module.klass->reconnect != NULL; } G_STMT_END
+
+
+/* --- time stamping (debugging) --- */
+#define ToyprofStamp struct timeval
+#define toyprof_clock_name() ("Glibc gettimeofday(2)")
+#define toyprof_stampinit() /* nothing */
+#define toyprof_stamp(st) gettimeofday (&(st), 0)
+#define toyprof_stamp_ticks() (1000000)
+static inline guint64
+toyprof_elapsed (ToyprofStamp fstamp,
+ ToyprofStamp lstamp)
+{
+ guint64 first = fstamp.tv_sec * toyprof_stamp_ticks () + fstamp.tv_usec;
+ guint64 last = lstamp.tv_sec * toyprof_stamp_ticks () + lstamp.tv_usec;
+ return last - first;
+}
+
+
+/* --- typedefs & structures --- */
+typedef struct _Poll Poll;
+struct _Poll
+{
+ Poll *next;
+ GslPollFunc poll_func;
+ gpointer data;
+ guint n_fds;
+ GPollFD *fds;
+ GslFreeFunc free_func;
+};
+
+
+/* --- prototypes --- */
+static void master_schedule_discard (void);
+
+
+/* --- variables --- */
+static gboolean master_need_reflow = FALSE;
+static gboolean master_need_process = FALSE;
+static EngineNode *master_consumer_list = NULL;
+const gfloat gsl_engine_master_zero_block[GSL_STREAM_MAX_VALUES] = { 0, }; /* FIXME */
+static Poll *master_poll_list = NULL;
+static guint master_n_pollfds = 0;
+static guint master_pollfds_changed = FALSE;
+static GPollFD master_pollfds[GSL_ENGINE_MAX_POLLFDS];
+static EngineSchedule *master_schedule = NULL;
+
+
+/* --- functions --- */
+static void
+add_consumer (EngineNode *node)
+{
+ g_return_if_fail (ENGINE_NODE_IS_CONSUMER (node) && node->toplevel_next == NULL && node->integrated);
+
+ node->toplevel_next = master_consumer_list;
+ master_consumer_list = node;
+}
+
+static void
+remove_consumer (EngineNode *node)
+{
+ EngineNode *tmp, *last = NULL;
+
+ g_return_if_fail (!ENGINE_NODE_IS_CONSUMER (node) || !node->integrated);
+
+ for (tmp = master_consumer_list; tmp; last = tmp, tmp = last->toplevel_next)
+ if (tmp == node)
+ break;
+ g_return_if_fail (tmp != NULL);
+ if (last)
+ last->toplevel_next = node->toplevel_next;
+ else
+ master_consumer_list = node->toplevel_next;
+ node->toplevel_next = NULL;
+}
+
+static void
+master_idisconnect_node (EngineNode *node,
+ guint istream)
+{
+ EngineNode *src_node = node->inputs[istream].src_node;
+ guint ostream = node->inputs[istream].src_stream;
+ gboolean was_consumer;
+
+ g_assert (ostream < ENGINE_NODE_N_OSTREAMS (src_node) &&
+ src_node->outputs[ostream].n_outputs > 0); /* these checks better pass */
+
+ node->inputs[istream].src_node = NULL;
+ node->inputs[istream].src_stream = ~0;
+ node->module.istreams[istream].connected = FALSE;
+ was_consumer = ENGINE_NODE_IS_CONSUMER (src_node);
+ src_node->outputs[ostream].n_outputs -= 1;
+ src_node->module.ostreams[ostream].connected = src_node->outputs[ostream].n_outputs > 0;
+ src_node->output_nodes = gsl_ring_remove (src_node->output_nodes, node);
+ NODE_FLAG_RECONNECT (node);
+ NODE_FLAG_RECONNECT (src_node);
+ /* add to consumer list */
+ if (!was_consumer && ENGINE_NODE_IS_CONSUMER (src_node))
+ add_consumer (src_node);
+}
+
+static void
+master_jdisconnect_node (EngineNode *node,
+ guint jstream,
+ guint con)
+{
+ EngineNode *src_node = node->jinputs[jstream][con].src_node;
+ guint i, ostream = node->jinputs[jstream][con].src_stream;
+ gboolean was_consumer;
+
+ g_assert (ostream < ENGINE_NODE_N_OSTREAMS (src_node) &&
+ node->module.jstreams[jstream].n_connections > 0 &&
+ src_node->outputs[ostream].n_outputs > 0); /* these checks better pass */
+
+ i = --node->module.jstreams[jstream].n_connections;
+ node->jinputs[jstream][con] = node->jinputs[jstream][i];
+ node->module.jstreams[jstream].values[i] = NULL; /* float**values 0-termination */
+ was_consumer = ENGINE_NODE_IS_CONSUMER (src_node);
+ src_node->outputs[ostream].n_outputs -= 1;
+ src_node->module.ostreams[ostream].connected = src_node->outputs[ostream].n_outputs > 0;
+ src_node->output_nodes = gsl_ring_remove (src_node->output_nodes, node);
+ NODE_FLAG_RECONNECT (node);
+ NODE_FLAG_RECONNECT (src_node);
+ /* add to consumer list */
+ if (!was_consumer && ENGINE_NODE_IS_CONSUMER (src_node))
+ add_consumer (src_node);
+}
+
+static void
+master_disconnect_node_outputs (EngineNode *src_node,
+ EngineNode *dest_node)
+{
+ gint i, j;
+
+ for (i = 0; i < ENGINE_NODE_N_ISTREAMS (dest_node); i++)
+ if (dest_node->inputs[i].src_node == src_node)
+ master_idisconnect_node (dest_node, i);
+ for (j = 0; j < ENGINE_NODE_N_JSTREAMS (dest_node); j++)
+ for (i = 0; i < dest_node->module.jstreams[j].n_connections; i++)
+ if (dest_node->jinputs[j][i].src_node == src_node)
+ master_jdisconnect_node (dest_node, j, i--);
+}
+
+static void
+master_process_job (GslJob *job)
+{
+ switch (job->job_id)
+ {
+ EngineNode *node, *src_node;
+ Poll *poll, *poll_last;
+ guint istream, jstream, ostream, con;
+ EngineFlowJob *fjob;
+ gboolean was_consumer;
+ case ENGINE_JOB_INTEGRATE:
+ node = job->data.node;
+ JOB_DEBUG ("integrate(%p)", node);
+ g_return_if_fail (node->integrated == FALSE);
+ g_return_if_fail (node->sched_tag == FALSE);
+ _engine_mnl_integrate (node);
+ if (ENGINE_NODE_IS_CONSUMER (node))
+ add_consumer (node);
+ node->counter = 0;
+ NODE_FLAG_RECONNECT (node);
+ master_need_reflow |= TRUE;
+ break;
+ case ENGINE_JOB_DISCARD:
+ /* FIXME: free pending flow jobs */
+ node = job->data.node;
+ JOB_DEBUG ("discard(%p)", node);
+ g_return_if_fail (node->integrated == TRUE);
+ /* disconnect inputs */
+ for (istream = 0; istream < ENGINE_NODE_N_ISTREAMS (node); istream++)
+ if (node->inputs[istream].src_node)
+ master_idisconnect_node (node, istream);
+ for (jstream = 0; jstream < ENGINE_NODE_N_JSTREAMS (node); jstream++)
+ while (node->module.jstreams[jstream].n_connections)
+ master_jdisconnect_node (node, jstream, node->module.jstreams[jstream].n_connections - 1);
+ /* disconnect outputs */
+ while (node->output_nodes)
+ master_disconnect_node_outputs (node, node->output_nodes->data);
+ /* remove from consumer list */
+ if (ENGINE_NODE_IS_CONSUMER (node))
+ {
+ _engine_mnl_remove (node);
+ remove_consumer (node);
+ }
+ else
+ _engine_mnl_remove (node);
+ node->counter = 0;
+ master_need_reflow |= TRUE;
+ master_schedule_discard (); /* discard schedule so node may be freed */
+ break;
+ case ENGINE_JOB_SET_CONSUMER:
+ case ENGINE_JOB_UNSET_CONSUMER:
+ node = job->data.node;
+ JOB_DEBUG ("toggle_consumer(%p)", node);
+ was_consumer = ENGINE_NODE_IS_CONSUMER (node);
+ node->is_consumer = job->job_id == ENGINE_JOB_SET_CONSUMER;
+ if (was_consumer != ENGINE_NODE_IS_CONSUMER (node))
+ {
+ if (ENGINE_NODE_IS_CONSUMER (node))
+ add_consumer (node);
+ else
+ remove_consumer (node);
+ master_need_reflow |= TRUE;
+ }
+ break;
+ case ENGINE_JOB_ICONNECT:
+ node = job->data.connection.dest_node;
+ src_node = job->data.connection.src_node;
+ istream = job->data.connection.dest_ijstream;
+ ostream = job->data.connection.src_ostream;
+ JOB_DEBUG ("connect(%p,%u,%p,%u)", node, istream, src_node, ostream);
+ g_return_if_fail (node->integrated == TRUE);
+ g_return_if_fail (src_node->integrated == TRUE);
+ g_return_if_fail (node->inputs[istream].src_node == NULL);
+ node->inputs[istream].src_node = src_node;
+ node->inputs[istream].src_stream = ostream;
+ node->module.istreams[istream].connected = TRUE;
+ /* remove from consumer list */
+ was_consumer = ENGINE_NODE_IS_CONSUMER (src_node);
+ src_node->outputs[ostream].n_outputs += 1;
+ src_node->module.ostreams[ostream].connected = TRUE;
+ src_node->output_nodes = gsl_ring_append (src_node->output_nodes, node);
+ NODE_FLAG_RECONNECT (node);
+ NODE_FLAG_RECONNECT (src_node);
+ src_node->counter = 0; /* FIXME: counter reset? */
+ if (was_consumer && !ENGINE_NODE_IS_CONSUMER (src_node))
+ remove_consumer (src_node);
+ master_need_reflow |= TRUE;
+ break;
+ case ENGINE_JOB_JCONNECT:
+ node = job->data.connection.dest_node;
+ src_node = job->data.connection.src_node;
+ jstream = job->data.connection.dest_ijstream;
+ ostream = job->data.connection.src_ostream;
+ JOB_DEBUG ("jconnect(%p,%u,%p,%u)", node, jstream, src_node, ostream);
+ g_return_if_fail (node->integrated == TRUE);
+ g_return_if_fail (src_node->integrated == TRUE);
+ con = node->module.jstreams[jstream].n_connections++;
+ node->jinputs[jstream] = g_renew (EngineJInput, node->jinputs[jstream], node->module.jstreams[jstream].n_connections);
+ node->module.jstreams[jstream].values = g_renew (const gfloat*, node->module.jstreams[jstream].values, node->module.jstreams[jstream].n_connections + 1);
+ node->module.jstreams[jstream].values[node->module.jstreams[jstream].n_connections] = NULL; /* float**values 0-termination */
+ node->jinputs[jstream][con].src_node = src_node;
+ node->jinputs[jstream][con].src_stream = ostream;
+ /* remove from consumer list */
+ was_consumer = ENGINE_NODE_IS_CONSUMER (src_node);
+ src_node->outputs[ostream].n_outputs += 1;
+ src_node->module.ostreams[ostream].connected = TRUE;
+ src_node->output_nodes = gsl_ring_append (src_node->output_nodes, node);
+ NODE_FLAG_RECONNECT (node);
+ NODE_FLAG_RECONNECT (src_node);
+ src_node->counter = 0; /* FIXME: counter reset? */
+ if (was_consumer && !ENGINE_NODE_IS_CONSUMER (src_node))
+ remove_consumer (src_node);
+ master_need_reflow |= TRUE;
+ break;
+ case ENGINE_JOB_IDISCONNECT:
+ node = job->data.connection.dest_node;
+ JOB_DEBUG ("idisconnect(%p,%u)", node, job->data.connection.dest_ijstream);
+ g_return_if_fail (node->integrated == TRUE);
+ g_return_if_fail (node->inputs[job->data.connection.dest_ijstream].src_node != NULL);
+ master_idisconnect_node (node, job->data.connection.dest_ijstream);
+ master_need_reflow |= TRUE;
+ break;
+ case ENGINE_JOB_JDISCONNECT:
+ node = job->data.connection.dest_node;
+ jstream = job->data.connection.dest_ijstream;
+ src_node = job->data.connection.src_node;
+ ostream = job->data.connection.src_ostream;
+ JOB_DEBUG ("jdisconnect(%p,%u,%p,%u)", node, jstream, src_node, ostream);
+ g_return_if_fail (node->integrated == TRUE);
+ g_return_if_fail (node->module.jstreams[jstream].n_connections > 0);
+ for (con = 0; con < node->module.jstreams[jstream].n_connections; con++)
+ if (node->jinputs[jstream][con].src_node == src_node &&
+ node->jinputs[jstream][con].src_stream == ostream)
+ break;
+ if (con < node->module.jstreams[jstream].n_connections)
+ {
+ master_jdisconnect_node (node, jstream, con);
+ master_need_reflow |= TRUE;
+ }
+ else
+ g_warning ("jdisconnect(dest:%p,%u,src:%p,%u): no such connection", node, jstream, src_node, ostream);
+ break;
+ case ENGINE_JOB_ACCESS:
+ node = job->data.access.node;
+ JOB_DEBUG ("access node(%p): %p(%p)", node, job->data.access.access_func, job->data.access.data);
+ g_return_if_fail (node->integrated == TRUE);
+ job->data.access.access_func (&node->module, job->data.access.data);
+ break;
+ case ENGINE_JOB_FLOW_JOB:
+ node = job->data.flow_job.node;
+ fjob = job->data.flow_job.fjob;
+ JOB_DEBUG ("add flow_job(%p,%p)", node, fjob);
+ g_return_if_fail (node->integrated == TRUE);
+ job->data.flow_job.fjob = NULL; /* ownership taken over */
+ _engine_node_insert_flow_job (node, fjob);
+ _engine_mnl_reorder (node);
+ break;
+ case ENGINE_JOB_DEBUG:
+ JOB_DEBUG ("debug");
+ g_printerr ("JOB-DEBUG: %s\n", job->data.debug);
+ break;
+ case ENGINE_JOB_ADD_POLL:
+ JOB_DEBUG ("add poll %p(%p,%u)", job->data.poll.poll_func, job->data.poll.data, job->data.poll.n_fds);
+ if (job->data.poll.n_fds + master_n_pollfds > GSL_ENGINE_MAX_POLLFDS)
+ g_error ("adding poll job exceeds maximum number of poll-fds (%u > %u)",
+ job->data.poll.n_fds + master_n_pollfds, GSL_ENGINE_MAX_POLLFDS);
+ poll = gsl_new_struct0 (Poll, 1);
+ poll->poll_func = job->data.poll.poll_func;
+ poll->data = job->data.poll.data;
+ poll->free_func = job->data.poll.free_func;
+ job->data.poll.free_func = NULL; /* don't free data this round */
+ poll->n_fds = job->data.poll.n_fds;
+ poll->fds = poll->n_fds ? master_pollfds + master_n_pollfds : master_pollfds;
+ master_n_pollfds += poll->n_fds;
+ if (poll->n_fds)
+ master_pollfds_changed = TRUE;
+ memcpy (poll->fds, job->data.poll.fds, sizeof (poll->fds[0]) * poll->n_fds);
+ poll->next = master_poll_list;
+ master_poll_list = poll;
+ break;
+ case ENGINE_JOB_REMOVE_POLL:
+ JOB_DEBUG ("remove poll %p(%p)", job->data.poll.poll_func, job->data.poll.data);
+ for (poll = master_poll_list, poll_last = NULL; poll; poll_last = poll, poll = poll_last->next)
+ if (poll->poll_func == job->data.poll.poll_func && poll->data == job->data.poll.data)
+ {
+ if (poll_last)
+ poll_last->next = poll->next;
+ else
+ master_poll_list = poll->next;
+ break;
+ }
+ if (poll)
+ {
+ job->data.poll.free_func = poll->free_func; /* free data with job */
+ poll_last = poll;
+ if (poll_last->n_fds)
+ {
+ for (poll = master_poll_list; poll; poll = poll->next)
+ if (poll->fds > poll_last->fds)
+ poll->fds -= poll_last->n_fds;
+ g_memmove (poll_last->fds, poll_last->fds + poll_last->n_fds,
+ ((guint8*) (master_pollfds + master_n_pollfds)) -
+ ((guint8*) (poll_last->fds + poll_last->n_fds)));
+ master_n_pollfds -= poll_last->n_fds;
+ master_pollfds_changed = TRUE;
+ }
+ gsl_delete_struct (Poll, poll_last);
+ }
+ else
+ g_warning (G_STRLOC ": failed to remove unknown poll function %p(%p)",
+ job->data.poll.poll_func, job->data.poll.data);
+ break;
+ default:
+ g_assert_not_reached ();
+ }
+ JOB_DEBUG ("done");
+}
+
+static void
+master_poll_check (glong *timeout_p,
+ gboolean check_with_revents)
+{
+ gboolean need_processing = FALSE;
+ Poll *poll;
+
+ if (master_need_process || *timeout_p == 0)
+ {
+ master_need_process = TRUE;
+ return;
+ }
+ for (poll = master_poll_list; poll; poll = poll->next)
+ {
+ glong timeout = -1;
+
+ if (poll->poll_func (poll->data, gsl_engine_block_size (), &timeout,
+ poll->n_fds, poll->n_fds ? poll->fds : NULL, check_with_revents)
+ || timeout == 0)
+ {
+ need_processing |= TRUE;
+ *timeout_p = 0;
+ break;
+ }
+ else if (timeout > 0)
+ *timeout_p = *timeout_p < 0 ? timeout : MIN (*timeout_p, timeout);
+ }
+ master_need_process = need_processing;
+}
+
+static inline guint64
+master_handle_flow_jobs (EngineNode *node,
+ guint64 max_tick)
+{
+ EngineFlowJob *fjob = _engine_node_pop_flow_job (node, max_tick);
+
+ if_reject (fjob)
+ do
+ {
+ g_printerr ("FJob: at:%lld from:%lld \n", node->counter, fjob->any.tick_stamp);
+ switch (fjob->fjob_id)
+ {
+ case ENGINE_FLOW_JOB_ACCESS:
+ fjob->access.access_func (&node->module, fjob->access.data);
+ break;
+ default:
+ g_assert_not_reached (); /* FIXME */
+ }
+ fjob = _engine_node_pop_flow_job (node, max_tick);
+ }
+ while (fjob);
+
+ return _engine_node_peek_flow_job_stamp (node);
+}
+
+static void
+master_process_locked_node (EngineNode *node,
+ guint n_values)
+{
+ guint64 final_counter = GSL_TICK_STAMP + n_values;
+
+ while (node->counter < final_counter)
+ {
+ guint64 next_counter = master_handle_flow_jobs (node, node->counter);
+ guint64 new_counter = MIN (next_counter, final_counter);
+ guint i, j, diff = node->counter - GSL_TICK_STAMP;
+
+ for (i = 0; i < ENGINE_NODE_N_ISTREAMS (node); i++)
+ {
+ EngineNode *inode = node->inputs[i].src_node;
+
+ if (inode)
+ {
+ ENGINE_NODE_LOCK (inode);
+ if (inode->counter < final_counter)
+ master_process_locked_node (inode, final_counter - node->counter);
+ node->module.istreams[i].values = inode->outputs[node->inputs[i].src_stream].buffer;
+ node->module.istreams[i].values += diff;
+ ENGINE_NODE_UNLOCK (inode);
+ }
+ else
+ node->module.istreams[i].values = gsl_engine_master_zero_block;
+ }
+ for (j = 0; j < ENGINE_NODE_N_JSTREAMS (node); j++)
+ for (i = 0; i < node->module.jstreams[j].n_connections; i++)
+ {
+ EngineNode *inode = node->jinputs[j][i].src_node;
+
+ ENGINE_NODE_LOCK (inode);
+ if (inode->counter < final_counter)
+ master_process_locked_node (inode, final_counter - node->counter);
+ node->module.jstreams[j].values[i] = inode->outputs[node->jinputs[j][i].src_stream].buffer;
+ node->module.jstreams[j].values[i] += diff;
+ ENGINE_NODE_UNLOCK (inode);
+ }
+ for (i = 0; i < ENGINE_NODE_N_OSTREAMS (node); i++)
+ node->module.ostreams[i].values = node->outputs[i].buffer + diff;
+ if_reject (node->reconnected)
+ {
+ node->module.klass->reconnect (&node->module);
+ node->reconnected = FALSE;
+ }
+ node->module.klass->process (&node->module, new_counter - node->counter);
+ for (i = 0; i < ENGINE_NODE_N_OSTREAMS (node); i++)
+ {
+ /* FIXME: this takes the worst possible performance hit to support virtualization */
+ if (node->module.ostreams[i].values != node->outputs[i].buffer + diff)
+ memcpy (node->outputs[i].buffer + diff, node->module.ostreams[i].values,
+ (new_counter - node->counter) * sizeof (gfloat));
+ }
+ node->counter = new_counter;
+ }
+}
+
+static GslLong gsl_profile_modules = 0; /* set to 1 in gdb to get profile output */
+
+static void
+master_process_flow (void)
+{
+ guint64 new_counter = GSL_TICK_STAMP + gsl_engine_block_size ();
+ GslLong profile_maxtime = 0;
+ GslLong profile_modules = gsl_profile_modules;
+ EngineNode *profile_node = NULL;
+
+ g_return_if_fail (master_need_process == TRUE);
+
+ g_assert (gsl_fpu_okround () == TRUE);
+
+ MAS_DEBUG ("process_flow");
+ if (master_schedule)
+ {
+ EngineNode *node;
+
+ _engine_schedule_restart (master_schedule);
+ _engine_set_schedule (master_schedule);
+
+ node = _engine_pop_unprocessed_node ();
+ while (node)
+ {
+ ToyprofStamp profile_stamp1, profile_stamp2;
+
+ if_reject (profile_modules)
+ toyprof_stamp (profile_stamp1);
+
+ master_process_locked_node (node, gsl_engine_block_size ());
+
+ if_reject (profile_modules)
+ {
+ GslLong duration;
+
+ toyprof_stamp (profile_stamp2);
+ duration = toyprof_elapsed (profile_stamp1, profile_stamp2);
+ if (duration > profile_maxtime)
+ {
+ profile_maxtime = duration;
+ profile_node = node;
+ }
+ }
+
+ _engine_push_processed_node (node);
+ node = _engine_pop_unprocessed_node ();
+ }
+
+ if_reject (profile_modules)
+ {
+ if (profile_node)
+ {
+ if (profile_maxtime > profile_modules)
+ g_print ("Excess Node: %p Duration: %lu usecs ((void(*)())%p) \n",
+ profile_node, profile_maxtime, profile_node->module.klass->process);
+ else
+ g_print ("Slowest Node: %p Duration: %lu usecs ((void(*)())%p) \r",
+ profile_node, profile_maxtime, profile_node->module.klass->process);
+ }
+ }
+
+ /* walk unscheduled nodes which have flow jobs */
+ node = _engine_mnl_head ();
+ while (node && GSL_MNL_HEAD_NODE (node))
+ {
+ EngineNode *tmp = node->mnl_next;
+ EngineFlowJob *fjob = _engine_node_pop_flow_job (node, new_counter);
+
+ if (fjob)
+ {
+ while (fjob)
+ {
+ g_printerr ("ignoring flow_job %p\n", fjob);
+ fjob = _engine_node_pop_flow_job (node, new_counter);
+ }
+ _engine_mnl_reorder (node);
+ }
+ node = tmp;
+ }
+
+ /* nothing new to process, wait on slaves */
+ _engine_wait_on_unprocessed ();
+
+ _engine_unset_schedule (master_schedule);
+ _gsl_tick_stamp_inc ();
+ _engine_recycle_const_values ();
+ }
+ master_need_process = FALSE;
+}
+
+static void
+master_reschedule_flow (void)
+{
+ EngineNode *node;
+
+ g_return_if_fail (master_need_reflow == TRUE);
+
+ MAS_DEBUG ("flow_reschedule");
+ if (!master_schedule)
+ master_schedule = _engine_schedule_new ();
+ else
+ {
+ _engine_schedule_unsecure (master_schedule);
+ _engine_schedule_clear (master_schedule);
+ }
+ for (node = master_consumer_list; node; node = node->toplevel_next)
+ _engine_schedule_consumer_node (master_schedule, node);
+ _engine_schedule_secure (master_schedule);
+ master_need_reflow = FALSE;
+}
+
+static void
+master_schedule_discard (void)
+{
+ g_return_if_fail (master_need_reflow == TRUE);
+
+ if (master_schedule)
+ {
+ _engine_schedule_unsecure (master_schedule);
+ _engine_schedule_destroy (master_schedule);
+ master_schedule = NULL;
+ }
+}
+
+
+/* --- MasterThread main loop --- */
+gboolean
+_engine_master_prepare (GslEngineLoop *loop)
+{
+ gboolean need_dispatch;
+ guint i;
+
+ g_return_val_if_fail (loop != NULL, FALSE);
+
+ /* setup and clear pollfds here already, so master_poll_check() gets no junk (and IRIX can't handle non-0 revents) */
+ loop->fds_changed = master_pollfds_changed;
+ master_pollfds_changed = FALSE;
+ loop->n_fds = master_n_pollfds;
+ loop->fds = master_pollfds;
+ for (i = 0; i < loop->n_fds; i++)
+ loop->fds[i].revents = 0;
+ loop->revents_filled = FALSE;
+
+ loop->timeout = -1;
+ /* cached checks first */
+ need_dispatch = master_need_reflow || master_need_process;
+ /* lengthy query */
+ if (!need_dispatch)
+ need_dispatch = _engine_job_pending ();
+ /* invoke custom poll checks */
+ if (!need_dispatch)
+ {
+ master_poll_check (&loop->timeout, FALSE);
+ need_dispatch = master_need_process;
+ }
+ if (need_dispatch)
+ loop->timeout = 0;
+
+ MAS_DEBUG ("PREPARE: need_dispatch=%u timeout=%6ld n_fds=%u",
+ need_dispatch,
+ loop->timeout, loop->n_fds);
+
+ return need_dispatch;
+}
+
+gboolean
+_engine_master_check (const GslEngineLoop *loop)
+{
+ gboolean need_dispatch;
+
+ g_return_val_if_fail (loop != NULL, FALSE);
+ g_return_val_if_fail (loop->n_fds == master_n_pollfds, FALSE);
+ g_return_val_if_fail (loop->fds == master_pollfds, FALSE);
+ if (loop->n_fds)
+ g_return_val_if_fail (loop->revents_filled == TRUE, FALSE);
+
+ /* cached checks first */
+ need_dispatch = master_need_reflow || master_need_process;
+ /* lengthy query */
+ if (!need_dispatch)
+ need_dispatch = _engine_job_pending ();
+ /* invoke custom poll checks */
+ if (!need_dispatch)
+ {
+ glong dummy = -1;
+
+ master_poll_check (&dummy, TRUE);
+ need_dispatch = master_need_process;
+ }
+
+ MAS_DEBUG ("CHECK: need_dispatch=%u", need_dispatch);
+
+ return need_dispatch;
+}
+
+void
+_engine_master_dispatch_jobs (void)
+{
+ GslJob *job;
+
+ job = _engine_pop_job ();
+ while (job)
+ {
+ master_process_job (job);
+ job = _engine_pop_job (); /* have to process _all_ jobs */
+ }
+}
+
+void
+_engine_master_dispatch (void)
+{
+ /* processing has prime priority, but we can't process the
+ * network, until all jobs have been handled and if necessary
+ * rescheduled the network.
+ * that's why we have to handle everything at once and can't
+ * preliminarily return after just handling jobs or rescheduling.
+ */
+ _engine_master_dispatch_jobs ();
+ if (master_need_reflow)
+ master_reschedule_flow ();
+ if (master_need_process)
+ master_process_flow ();
+}
+
+void
+_engine_master_thread (gpointer data)
+{
+ gboolean run = TRUE;
+
+ /* assert sane configuration checks, since we're simply casting structures */
+ g_assert (sizeof (struct pollfd) == sizeof (GPollFD) &&
+ G_STRUCT_OFFSET (GPollFD, fd) == G_STRUCT_OFFSET (struct pollfd, fd) &&
+ G_STRUCT_OFFSET (GPollFD, events) == G_STRUCT_OFFSET (struct pollfd, events) &&
+ G_STRUCT_OFFSET (GPollFD, revents) == G_STRUCT_OFFSET (struct pollfd, revents));
+
+ /* add the thread wakeup pipe to master pollfds, so we get woken
+ * up in time (even though we evaluate the pipe contents later)
+ */
+ gsl_thread_get_pollfd (master_pollfds);
+ master_n_pollfds += 1;
+ master_pollfds_changed = TRUE;
+
+ toyprof_stampinit ();
+
+ while (run)
+ {
+ GslEngineLoop loop;
+ gboolean need_dispatch;
+
+ need_dispatch = _engine_master_prepare (&loop);
+
+ if (!need_dispatch)
+ {
+ gint err;
+
+ err = poll ((struct pollfd*) loop.fds, loop.n_fds, loop.timeout);
+
+ if (err >= 0)
+ loop.revents_filled = TRUE;
+ else
+ g_printerr (G_STRLOC ": poll() error: %s\n", g_strerror (errno));
+
+ if (loop.revents_filled)
+ need_dispatch = _engine_master_check (&loop);
+ }
+
+ if (need_dispatch)
+ _engine_master_dispatch ();
+
+ /* handle thread pollfd messages */
+ run = gsl_thread_sleep (0);
+ }
+}
+/* vim:set ts=8 sts=2 sw=2: */