summaryrefslogtreecommitdiffstats
path: root/mpeglib/lib/util/abstract/threadQueue.cpp
blob: 1b130ba969f4525e4c1a53fb66084eb085e7309d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
/*
  fifo waitqueue for threads.(Multi-in, single out)
  Copyright (C) 2000  Martin Vogt

  This program is free software; you can redistribute it and/or modify
  it under the terms of the GNU Library General Public License as published by
  the Free Software Foundation.

  For more information look at the file COPYRIGHT in this package

 */

#include "threadQueue.h"

#define _MAX_THREAD_IN_QUEUE 5

#include <iostream>

using namespace std;

//
// WaitThreadEntry class [START]
//

WaitThreadEntry::WaitThreadEntry() {
  abs_thread_cond_init(&waitCond);
}


WaitThreadEntry::~WaitThreadEntry() {
  abs_thread_cond_destroy(&waitCond);
}

//
// WaitThreadEntry class [END]
//


ThreadQueue::ThreadQueue() {
  waitThreadEntries=new WaitThreadEntry* [_MAX_THREAD_IN_QUEUE];
  int i;
  for(i=0;i<_MAX_THREAD_IN_QUEUE;i++) {
    waitThreadEntries[i]=new WaitThreadEntry();
  }
  abs_thread_mutex_init(&queueMut);
  insertPos=0;
  removePos=0;
  size=0;
}


ThreadQueue::~ThreadQueue() {
  abs_thread_mutex_lock(&queueMut);
  if (size != 0) {
    cout << "Aieee! Make sure that all threads are out of ThreadQueue"<<endl;
    exit(0);
  }
  int i;
  for(i=0;i<_MAX_THREAD_IN_QUEUE;i++) {
    delete waitThreadEntries[i];
  }
  delete [] waitThreadEntries;
  abs_thread_mutex_unlock(&queueMut);
  abs_thread_mutex_destroy(&queueMut);
}


void ThreadQueue::waitForExclusiveAccess() {
  abs_thread_mutex_lock(&queueMut);
  if (size == 0) {
    abs_thread_mutex_unlock(&queueMut);
    return;
  }
  // wait
  size++;
  if (size == _MAX_THREAD_IN_QUEUE) {
    cout << "Aieee! ThreadQueue can only buffer:"<<_MAX_THREAD_IN_QUEUE<<endl;
    exit(0);
  }
  abs_thread_cond_t* waitCond=&(waitThreadEntries[insertPos]->waitCond);
  insertPos++;
  // wrap counter
  if (insertPos == _MAX_THREAD_IN_QUEUE) {
    insertPos=0;
  }
  abs_thread_cond_wait(waitCond,&queueMut);
  abs_thread_mutex_unlock(&queueMut);
}


void ThreadQueue::releaseExclusiveAccess() {
  abs_thread_mutex_lock(&queueMut);
  if (size == 0) {
    abs_thread_mutex_unlock(&queueMut);
    return;
  } 
  // wake up next thread
  abs_thread_cond_t* waitCond=&(waitThreadEntries[removePos]->waitCond);
  removePos++;
  // wrap counter
  if (removePos == _MAX_THREAD_IN_QUEUE) {
    removePos=0;
  }
  size--;
  abs_thread_cond_signal(waitCond);
  abs_thread_mutex_unlock(&queueMut);
}