summaryrefslogtreecommitdiffstats
path: root/kopete/protocols/jabber/jingle/libjingle/talk/third_party/ortp/scheduler.c
blob: 2ea733acbbcc32ac3e55e65d90744f3d81d487e6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
/*
  The oRTP library is an RTP (Realtime Transport Protocol - rfc1889) stack.
  Copyright (C) 2001  Simon MORLAT simon.morlat@linphone.org

  This library is free software; you can redistribute it and/or
  modify it under the terms of the GNU Lesser General Public
  License as published by the Free Software Foundation; either
  version 2.1 of the License, or (at your option) any later version.

  This library is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  Lesser General Public License for more details.

  You should have received a copy of the GNU Lesser General Public
  License along with this library; if not, write to the Free Software
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
*/

#ifndef _WIN32	/* do not include ortp-config.h when we are on win32 */
	#include <rtpport.h>
	#include <sched.h>
	#include <unistd.h>
	#include <errno.h>
#else
	#include "ortp-config-win32.h"
#endif



#include "scheduler.h"

// Declared here to avoid an implicit-declaration warning at compile time
extern void rtp_session_process (RtpSession * session, guint32 time, RtpScheduler *sched);


/*
 * Initialise a scheduler structure in place.
 *
 * Resets the session list and the scheduler time, installs the posix timer
 * as the default timer, creates the lock and the condition variable, and
 * empties the four session sets (all / read / write / error).
 *
 * sched: the scheduler to initialise; it does not need to be zeroed
 *        beforehand.
 */
void rtp_scheduler_init(RtpScheduler *sched)
{
	/* rtp_scheduler_set_timer() refuses to install a timer when
	   thread_running is non-zero, so this field must hold a defined value
	   before the call below; otherwise a caller passing a non-zeroed
	   structure could end up with no timer installed. */
	sched->thread_running=0;
	sched->list=0;
	sched->time_=0;
	/* default to the posix timer */
	rtp_scheduler_set_timer(sched,&posix_timer);
	sched->lock=g_mutex_new();
	sched->unblock_select_cond=g_cond_new();
	/* one bit per session in a SessionSet */
	sched->max_sessions=sizeof(SessionSet)*8;
	session_set_init(&sched->all_sessions);
	sched->all_max=0;
	session_set_init(&sched->r_sessions);
	sched->r_max=0;
	session_set_init(&sched->w_sessions);
	sched->w_max=0;
	session_set_init(&sched->e_sessions);
	sched->e_max=0;
}

/*
 * Allocate a scheduler with g_malloc(), clear it to zero and run
 * rtp_scheduler_init() on it.
 *
 * Returns the newly allocated scheduler.
 */
RtpScheduler * rtp_scheduler_new()
{
	RtpScheduler *fresh;

	fresh=g_malloc(sizeof(*fresh));
	memset(fresh,0,sizeof(*fresh));
	rtp_scheduler_init(fresh);
	return fresh;
}

/*
 * Select the timer driving the scheduler.
 *
 * The change is rejected with a warning while the scheduler thread is
 * running. Otherwise the timer is stored and its interval is converted to
 * milliseconds and recorded as the per-tick time increment.
 *
 * sched: the scheduler to configure.
 * timer: the timer to use; its interval field is read here.
 */
void rtp_scheduler_set_timer(RtpScheduler *sched,RtpTimer *timer)
{
	if (!sched->thread_running){
		sched->timer=timer;
		/* tick length in milliseconds: seconds part plus microseconds part */
		sched->timer_inc=(timer->interval.tv_usec/1000) + (timer->interval.tv_sec*1000);
		return;
	}
	g_warning("Cannot change timer while the scheduler is running !!");
}

/*
 * Start the scheduler thread.
 *
 * Does nothing but warn when the scheduler is already marked as running.
 * Otherwise the running flag is raised, the thread executing
 * rtp_scheduler_schedule() is created, and the caller waits on
 * unblock_select_cond until that thread signals it.
 *
 * If the thread cannot be created, the running flag is cleared again and a
 * warning is emitted instead of waiting for a signal that would never come.
 *
 * sched: the scheduler to start.
 */
void rtp_scheduler_start(RtpScheduler *sched)
{
	if (sched->thread_running!=0){
		g_warning("Scheduler thread already running.");
		return;
	}
	sched->thread_running=1;
	/* hold the lock across thread creation: the new thread takes the same
	   lock before signalling, so sched->thread is assigned and we are
	   already waiting by the time the signal is sent */
	g_mutex_lock(sched->lock);
	sched->thread=g_thread_create((GThreadFunc)rtp_scheduler_schedule,(gpointer)sched,TRUE,NULL);
	if (sched->thread==NULL){
		/* no thread will ever signal the condition: undo and bail out */
		sched->thread_running=0;
		g_mutex_unlock(sched->lock);
		g_warning("Could not create the scheduler thread.");
		return;
	}
	g_cond_wait(sched->unblock_select_cond,sched->lock);
	g_mutex_unlock(sched->lock);
}
/*
 * Stop the scheduler thread.
 *
 * Clears the running flag, which the scheduling loop tests on each
 * iteration, then joins the thread. Emits a warning when the scheduler is
 * not marked as running.
 *
 * sched: the scheduler to stop.
 */
void rtp_scheduler_stop(RtpScheduler *sched)
{
	if (sched->thread_running!=1){
		g_warning("Scheduler thread is not running.");
		return;
	}
	sched->thread_running=0;
	g_thread_join(sched->thread);
}

/*
 * Release a scheduler: stop its thread if it is still running, free the
 * condition variable and the lock, then free the structure itself.
 *
 * sched: the scheduler to destroy; it must not be used afterwards.
 */
void rtp_scheduler_destroy(RtpScheduler *sched)
{
	if (sched->thread_running){
		rtp_scheduler_stop(sched);
	}
	g_cond_free(sched->unblock_select_cond);
	g_mutex_free(sched->lock);
	g_free(sched);
}

/*
 * Body of the scheduler thread.
 *
 * On non-win32 builds it first tries to obtain root privileges, then
 * signals the thread blocked in rtp_scheduler_start(), raises its own
 * priority and initialises the timer. It then loops while thread_running is
 * set: under the scheduler lock every session of the list is passed to
 * rtp_session_process(), waiters on unblock_select_cond are woken up, and
 * the thread sleeps in the timer before advancing the scheduler time by
 * timer_inc.
 *
 * psched: the RtpScheduler to run, passed as an untyped pointer.
 * Returns NULL once the loop has ended and the timer has been uninitialised.
 */
gpointer rtp_scheduler_schedule(gpointer psched)
{
	RtpScheduler *self=(RtpScheduler*) psched;
	RtpTimer *tick_timer=self->timer;
	RtpSession *sess;
	int rc;

	/* attempt to become root so that real time priority can be obtained */
#ifndef _WIN32
#ifdef HAVE_SETEUID
	rc=seteuid(0);
#else
	rc=setuid(0);
#endif
	if (rc<0) g_message("Could not get root euid: %s",strerror(errno));
#endif
	g_message("scheduler: trying to reach real time kernel scheduling...");

	/* rtp_scheduler_start() holds this lock while g_thread_create() runs,
	   so acquiring it here guarantees self->thread has been assigned before
	   we go any further */
	g_mutex_lock(self->lock);
	/* release the thread waiting in rtp_scheduler_start() */
	g_cond_signal(self->unblock_select_cond);
	g_mutex_unlock(self->lock);
	g_thread_set_priority(self->thread,G_THREAD_PRIORITY_HIGH);
	tick_timer->timer_init();
	while (self->thread_running)
	{
		g_mutex_lock(self->lock);

		/* run every scheduled rtp session for the current tick */
		for (sess=self->list; sess!=NULL; sess=sess->next)
		{
			ortp_debug("scheduler: processing session=%p.\n",sess);
			rtp_session_process(sess,self->time_,self);
		}
		/* wake up all the threads waiting on the condition */
		g_cond_broadcast(self->unblock_select_cond);
		g_mutex_unlock(self->lock);

		/* while the scheduler sleeps, the woken threads can compute their
		   result mask and decide whether to leave or wait for the next tick */
		tick_timer->timer_do();
		self->time_+=self->timer_inc;
	}
	/* the loop has ended: shut the timer down before leaving the thread */
	tick_timer->timer_uninit();
	return NULL;
}

/*
 * Register a session with the scheduler.
 *
 * Returns silently when the session is already flagged as scheduled.
 * Otherwise, under the scheduler lock, the first free position of the
 * all_sessions set is reserved for the session, the session is pushed at
 * the head of the scheduler list and flagged RTP_SESSION_IN_SCHEDULER.
 * A session whose receive (resp. send) side has not started yet is also
 * set in r_sessions (resp. w_sessions) so that it is not blockable.
 *
 * When every position is already taken, the session is NOT added: a warning
 * is emitted and the session is left untouched. Linking it anyway would
 * leave its tqmask_pos unassigned.
 *
 * sched:   the scheduler.
 * session: the session to schedule.
 */
void rtp_scheduler_add_session(RtpScheduler *sched, RtpSession *session)
{
	int i;
	int slot=-1;
	if (session->flags & RTP_SESSION_IN_SCHEDULER){
		/* the rtp session is already scheduled, so return silently */
		return;
	}
	rtp_scheduler_lock(sched);
	if (sched->max_sessions==0){
		g_error("rtp_scheduler_add_session: max_session=0 !");
	}
	/* find a free pos in the session tqmask */
	for (i=0;i<sched->max_sessions;i++){
		if (!ORTP_FD_ISSET(i,&sched->all_sessions.rtpset)){
			slot=i;
			break;
		}
	}
	if (slot<0){
		/* every position is in use: refuse rather than schedule a session
		   that has no position of its own */
		rtp_scheduler_unlock(sched);
		g_warning("rtp_scheduler_add_session: no free position left, session not scheduled.");
		return;
	}
	session->tqmask_pos=slot;
	session_set_set(&sched->all_sessions,session);
	/* make a new session scheduled not blockable if it has not started */
	if (session->flags & RTP_SESSION_RECV_NOT_STARTED)
		session_set_set(&sched->r_sessions,session);
	if (session->flags & RTP_SESSION_SEND_NOT_STARTED)
		session_set_set(&sched->w_sessions,session);
	if (slot>sched->all_max){
		sched->all_max=slot;
	}
	/* enqueue the session at the head of the list of scheduled sessions */
	session->next=sched->list;
	sched->list=session;

	rtp_session_set_flag(session,RTP_SESSION_IN_SCHEDULER);
	rtp_scheduler_unlock(sched);
}

/*
 * Unregister a session from the scheduler.
 *
 * Returns immediately when session is NULL (via g_return_if_fail) or when
 * it is not flagged as scheduled. Otherwise, under the scheduler lock, the
 * session is unlinked from the scheduler list; a warning is emitted if it
 * is not in the list. In both cases the RTP_SESSION_IN_SCHEDULER flag is
 * cleared and the session is removed from the all_sessions set.
 *
 * sched:   the scheduler.
 * session: the session to remove.
 */
void rtp_scheduler_remove_session(RtpScheduler *sched, RtpSession *session)
{
	RtpSession **link;
	int found=0;
	g_return_if_fail(session!=NULL);
	if ((session->flags & RTP_SESSION_IN_SCHEDULER)==0){
		/* the rtp session is not scheduled, so return silently */
		return;
	}

	rtp_scheduler_lock(sched);
	/* walk the pointers that lead to each element, so that the head of the
	   list and the inner elements are unlinked the same way */
	for (link=&sched->list; *link!=NULL; link=&(*link)->next){
		if (*link==session){
			*link=session->next;
			found=1;
			break;
		}
	}
	if (!found){
		/* the session was not found ! */
		g_warning("rtp_scheduler_remove_session: the session was not found in the scheduler list!");
	}
	rtp_session_unset_flag(session,RTP_SESSION_IN_SCHEDULER);
	/* delete the bit in the tqmask */
	session_set_clr(&sched->all_sessions,session);
	rtp_scheduler_unlock(sched);
}