summaryrefslogtreecommitdiffstats
path: root/libtdepim/weaver.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'libtdepim/weaver.cpp')
-rw-r--r--libtdepim/weaver.cpp550
1 files changed, 550 insertions, 0 deletions
diff --git a/libtdepim/weaver.cpp b/libtdepim/weaver.cpp
new file mode 100644
index 00000000..b1dba8de
--- /dev/null
+++ b/libtdepim/weaver.cpp
@@ -0,0 +1,550 @@
+/* -*- C++ -*-
+
+ This file implements the Weaver, Job and Thread classes.
+
+ $ Author: Mirko Boehm $
+ $ Copyright: (C) 2004, Mirko Boehm $
+ $ Contact: mirko@kde.org
+ http://www.kde.org
+ http://www.hackerbuero.org $
+ $ License: LGPL with the following explicit clarification:
+ This code may be linked against any version of the TQt toolkit
+ from Troll Tech, Norway. $
+
+*/
+
+extern "C" {
+#include <signal.h>
+}
+
+#include <tqevent.h>
+#include <tqapplication.h>
+
+#include "weaver.h"
+
+namespace KPIM {
+namespace ThreadWeaver {
+
+ bool Debug = true;
+ int DebugLevel = 2;
+
+ Job::Job (TQObject* parent, const char* name)
+ : TQObject (parent, name),
+ m_finished (false),
+ m_mutex (new TQMutex (true) ),
+ m_thread (0)
+ {
+ }
+
+ Job::~Job()
+ {
+ }
+
+ void Job::lock()
+ {
+ m_mutex->lock();
+ }
+
+ void Job::unlock()
+ {
+ m_mutex->unlock();
+ }
+
+ void Job::execute(Thread *th)
+ {
+ m_mutex->lock();
+ m_thread = th;
+ m_mutex->unlock();
+
+ run ();
+
+ m_mutex->lock();
+ setFinished (true);
+ m_thread = 0;
+ m_mutex->unlock();
+ }
+
+ Thread *Job::thread ()
+ {
+ TQMutexLocker l (m_mutex);
+ return m_thread;
+ }
+
+ bool Job::isFinished() const
+ {
+ TQMutexLocker l (m_mutex);
+ return m_finished;
+ }
+
+ void Job::setFinished(bool status)
+ {
+ TQMutexLocker l (m_mutex);
+ m_finished = status;
+ }
+
+ void Job::processEvent (Event *e)
+ {
+ switch ( e->action() )
+ {
+ case Event::JobStarted:
+ emit ( started() );
+ break;
+ case Event::JobFinished:
+ emit ( done() );
+ break;
+ case Event::JobSPR:
+ emit ( SPR () );
+ m_wc->wakeOne ();
+ break;
+ case Event::JobAPR:
+ emit ( APR () );
+ // no wake here !
+ break;
+ default:
+ break;
+ }
+ }
+
+ void Job::triggerSPR ()
+ {
+ m_mutex->lock ();
+ m_wc = new TQWaitCondition;
+ m_mutex->unlock ();
+
+ thread()->post (KPIM::ThreadWeaver::Event::JobSPR, this);
+ m_wc->wait ();
+
+ m_mutex->lock ();
+ delete m_wc;
+ m_wc = 0;
+ m_mutex->unlock ();
+ }
+
+ void Job::triggerAPR ()
+ {
+ m_mutex->lock ();
+ m_wc = new TQWaitCondition;
+ m_mutex->unlock ();
+
+ thread()->post (KPIM::ThreadWeaver::Event::JobAPR, this);
+ m_wc->wait ();
+ }
+
+ void Job::wakeAPR ()
+ {
+ TQMutexLocker l(m_mutex);
+ if ( m_wc!=0 )
+ {
+ m_wc->wakeOne ();
+ delete m_wc;
+ m_wc = 0;
+ }
+ }
+
+ const int Event::Type = TQEvent::User + 1000;
+
+ Event::Event ( Action action, Thread *thread, Job *job)
+ : TQCustomEvent ( type () ),
+ m_action (action),
+ m_thread (thread),
+ m_job (job)
+ {
+ }
+
+ int Event::type ()
+ {
+ return Type;
+ }
+
+ Thread* Event::thread () const
+ {
+ if ( m_thread != 0)
+ {
+ return m_thread;
+ } else {
+ return 0;
+ }
+ }
+
+ Job* Event::job () const
+ {
+ return m_job;
+ }
+
+ Event::Action Event::action () const
+ {
+ return m_action;
+ }
+
+ unsigned int Thread::sm_Id;
+
+ Thread::Thread (Weaver *parent)
+ : TQThread (),
+ m_parent ( parent ),
+ m_id ( makeId() )
+ {
+ }
+
+ Thread::~Thread()
+ {
+ }
+
+ unsigned int Thread::makeId()
+ {
+ static TQMutex mutex;
+ TQMutexLocker l (&mutex);
+
+ return ++sm_Id;
+ }
+
+ unsigned int Thread::id() const
+ {
+ return m_id;
+ }
+
+ void Thread::run()
+ {
+ Job *job = 0;
+
+ post ( Event::ThreadStarted );
+
+ while (true)
+ {
+ debug ( 3, "Thread::run [%u]: trying to execute the next job.\n", id() );
+
+ job = m_parent->applyForWork ( this, job );
+
+ if (job == 0)
+ {
+ break;
+ } else {
+ post ( Event::JobStarted, job );
+ job->execute (this);
+ post ( Event::JobFinished, job );
+ }
+ }
+
+ post ( Event::ThreadExiting );
+ }
+
+ void Thread::post (Event::Action a, Job *j)
+ {
+ m_parent->post ( a, this, j);
+ }
+
+ void Thread::msleep(unsigned long msec)
+ {
+ TQThread::msleep(msec);
+ }
+
+ Weaver::Weaver(TQObject* parent, const char* name,
+ int inventoryMin, int inventoryMax)
+ : TQObject(parent, name),
+ m_active(0),
+ m_inventoryMin(inventoryMin),
+ m_inventoryMax(inventoryMax),
+ m_shuttingDown(false),
+ m_running (false),
+ m_suspend (false),
+ m_mutex ( new TQMutex(true) )
+ {
+ lock();
+
+ for ( int count = 0; count < m_inventoryMin; ++count)
+ {
+ Thread *th = new Thread(this);
+ m_inventory.append(th);
+ // this will idle the thread, waiting for a job
+ th->start();
+
+ emit (threadCreated (th) );
+ }
+
+ unlock();
+ }
+
+ Weaver::~Weaver()
+ {
+ lock();
+
+ debug ( 1, "Weaver dtor: destroying inventory.\n" );
+
+ m_shuttingDown = true;
+
+ unlock();
+
+ m_jobAvailable.wakeAll();
+
+ // problem: Some threads might not be asleep yet, just finding
+ // out if a job is available. Those threads will suspend
+ // waiting for their next job (a rare case, but not impossible).
+ // Therefore, if we encounter a thread that has not exited, we
+ // have to wake it again (which we do in the following for
+ // loop).
+
+ for ( Thread *th = m_inventory.first(); th; th = m_inventory.next() )
+ {
+ if ( !th->finished() )
+ {
+ m_jobAvailable.wakeAll();
+ th->wait();
+ }
+
+ emit (threadDestroyed (th) );
+ delete th;
+
+ }
+
+ m_inventory.clear();
+
+ delete m_mutex;
+
+ debug ( 1, "Weaver dtor: done\n" );
+
+ }
+
+ void Weaver::lock()
+ {
+ debug ( 3 , "Weaver::lock: lock (mutex is %s).\n",
+ ( m_mutex->locked() ? "locked" : "not locked" ) );
+ m_mutex->lock();
+ }
+
+ void Weaver::unlock()
+ {
+ m_mutex->unlock();
+
+ debug ( 3 , "Weaver::unlock: unlock (mutex is %s).\n",
+ ( m_mutex->locked() ? "locked" : "not locked" ) );
+ }
+
+ int Weaver::threads () const
+ {
+ TQMutexLocker l (m_mutex);
+ return m_inventory.count ();
+ }
+
+ void Weaver::enqueue(Job* job)
+ {
+ lock();
+
+ m_assignments.append(job);
+ m_running = true;
+
+ unlock();
+
+ assignJobs();
+ }
+
+ void Weaver::enqueue (TQPtrList <Job> jobs)
+ {
+ lock();
+
+ for ( Job * job = jobs.first(); job; job = jobs.next() )
+ {
+ m_assignments.append (job);
+ }
+
+ unlock();
+
+ assignJobs();
+ }
+
+ bool Weaver::dequeue ( Job* job )
+ {
+ TQMutexLocker l (m_mutex);
+ return m_assignments.remove (job);
+ }
+
+ void Weaver::dequeue ()
+ {
+ TQMutexLocker l (m_mutex);
+ m_assignments.clear();
+ }
+
+ void Weaver::suspend (bool state)
+ {
+ lock();
+
+ if (state)
+ {
+ // no need to wake any threads here
+ m_suspend = true;
+ if ( m_active == 0 && isEmpty() )
+ { // instead of waking up threads:
+ post (Event::Suspended);
+ }
+ } else {
+ m_suspend = false;
+ // make sure we emit suspended () even if all threads are sleeping:
+ assignJobs ();
+ debug (2, "Weaver::suspend: queueing resumed.\n" );
+ }
+
+ unlock();
+ }
+
+ void Weaver::assignJobs()
+ {
+ m_jobAvailable.wakeAll();
+ }
+
+ bool Weaver::event (TQEvent *e )
+ {
+ if ( e->type() >= TQEvent::User )
+ {
+
+ if ( e->type() == Event::type() )
+ {
+ Event *event = (Event*) e;
+
+ switch (event->action() )
+ {
+ case Event::JobFinished:
+ if ( event->job() !=0 )
+ {
+ emit (jobDone (event->job() ) );
+ }
+ break;
+ case Event::Finished:
+ emit ( finished() );
+ break;
+ case Event::Suspended:
+ emit ( suspended() );
+ break;
+ case Event::ThreadSuspended:
+ if (!m_shuttingDown )
+ {
+ emit (threadSuspended ( event->thread() ) );
+ }
+ break;
+ case Event::ThreadBusy:
+ if (!m_shuttingDown )
+ {
+ emit (threadBusy (event->thread() ) );
+ }
+ break;
+ default:
+ break;
+ }
+
+ if ( event->job() !=0 )
+ {
+ event->job()->processEvent (event);
+ }
+ } else {
+ debug ( 0, "Weaver::event: Strange: received unknown user event.\n" );
+ }
+ return true;
+ } else {
+ // others - please make sure we are a TQObject!
+ return TQObject::event ( e );
+ }
+ }
+
+ void Weaver::post (Event::Action a, Thread* t, Job* j)
+ {
+ Event *e = new Event ( a, t, j);
+ TQApplication::postEvent (this, e);
+ }
+
+ bool Weaver::isEmpty() const
+ {
+ TQMutexLocker l (m_mutex);
+ return m_assignments.count()==0;
+ }
+
+ Job* Weaver::applyForWork(Thread *th, Job* previous)
+ {
+ Job *rc = 0;
+ bool lastjob = false;
+ bool suspended = false;
+
+ while (true)
+ {
+ lock();
+
+ if (previous != 0)
+ { // cleanup and send events:
+ --m_active;
+
+ debug ( 3, "Weaver::applyForWork: job done, %i jobs left, "
+ "%i active jobs left.\n",
+ queueLength(), m_active );
+
+ if ( m_active == 0 && isEmpty() )
+ {
+ lastjob = true;
+ m_running = false;
+ post (Event::Finished);
+ debug ( 3, "Weaver::applyForWork: last job.\n" );
+ }
+
+ if (m_active == 0 && m_suspend == true)
+ {
+ suspended = true;
+ post (Event::Suspended);
+ debug ( 2, "Weaver::applyForWork: queueing suspended.\n" );
+ }
+
+ m_jobFinished.wakeOne();
+ }
+
+ previous = 0;
+
+ if (m_shuttingDown == true)
+ {
+ unlock();
+
+ return 0;
+ } else {
+ if ( !isEmpty() && m_suspend == false )
+ {
+ rc = m_assignments.getFirst();
+ m_assignments.removeFirst ();
+ ++m_active;
+
+ debug ( 3, "Weaver::applyForWork: job assigned, "
+ "%i jobs in queue (%i active).\n",
+ m_assignments.count(), m_active );
+ unlock();
+
+ post (Event::ThreadBusy, th);
+
+ return rc;
+ } else {
+ unlock();
+
+ post (Event::ThreadSuspended, th);
+ m_jobAvailable.wait();
+ }
+ }
+ }
+ }
+
+ int Weaver::queueLength()
+ {
+ TQMutexLocker l (m_mutex);
+ return m_assignments.count();
+ }
+
+ bool Weaver::isIdle () const
+ {
+ TQMutexLocker l (m_mutex);
+ return isEmpty() && m_active == 0;
+ }
+
+ void Weaver::finish()
+ {
+ while ( !isIdle() )
+ {
+ debug (2, "Weaver::finish: not done, waiting.\n" );
+ m_jobFinished.wait();
+ }
+ debug (1, "Weaver::finish: done.\n\n\n" );
+ }
+
+}
+}
+
+#include "weaver.moc"