summaryrefslogtreecommitdiffstats
path: root/flow/asyncschedule.cc
blob: b3251a7108b7d3340562f47785cd60c351b6cbe3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
    /*

    Copyright (C) 2000,2001 Stefan Westerfeld
                            stefan@space.twc.de

    This library is free software; you can redistribute it and/or
    modify it under the terms of the GNU Library General Public
    License as published by the Free Software Foundation; either
    version 2 of the License, or (at your option) any later version.
  
    This library is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
    Library General Public License for more details.
   
    You should have received a copy of the GNU Library General Public License
    along with this library; see the file COPYING.LIB.  If not, write to
    the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
    Boston, MA 02111-1307, USA.

    */

#include <iostream>
#include "asyncschedule.h"

using namespace std;
using namespace Arts;

#include "debug.h"
#include <stdio.h>

/* Since this file is a tad bit more complex, here is some basic documentation:

1) ASyncPort: There are asynchronous ports which are parts of the standard-
   flowsystem schedule nodes. Their lifetime starts whenever an asynchronous
   stream gets created by the flow system, and ends when the schedule node
   gets destroyed. Basically, an ASyncPort has two functions:

   * it is a "Port", which means that it gets connect(), disconnect() and
     other calls from the flowsystem

   * it is a "GenericDataChannel", which means that DataPackets can interact
     with it

   Although there will be ASyncPorts which only send data and ASyncPorts which
   only receive data (there are none that do both), there are no distinct
   classes for this.

2) Standard case: a DataPacket that gets transported over a datachannel locally:

   1. the user allocates himself a datapacket "packet"
   2. the user calls "packet->send()", which in turn calls
      ASyncPort::sendPacket(packet)
   3. the ASyncPort sends the DataPacket to every subscriber (incrementing the
      useCount) over the NotificationManager
   4. the NotificationManager delivers the DataPackets to the receiver
   5. eventually, the receiver confirms using "packet->processed()"
   6. the packet informs the ASyncPort::processedPacket()
   7. the packet is freed

variant (pulling):

   1. the user gets told by the ASyncPort: produce some data, here is a "packet"
   2. the user calls "packet->send()", which in turn calls
      ASyncPort::sendPacket(packet)
   3. the ASyncPort sends the DataPacket to every subscriber (incrementing the
      useCount) over the NotificationManager
   4. the NotificationManager delivers the DataPackets to the receiver
   5. eventually, the receiver confirms using "packet->processed()"
   6. the packet informs the ASyncPort::processedPacket()
   7. the ASyncPort restarts with 1.

3) Remote case: the remote case follows from the local case by adding two extra
things: one object that converts packets from their packet form to a message
(ASyncNetSend), and one object that converts packets from the message form
to a packet again. Effectively, the sending of a single packet looks like
this, then:

   1-S. the user allocates himself a datapacket "packet"
   2-S. the user calls "packet->send()", which in turn calls
        ASyncPort::sendPacket(packet)
   3-S. the ASyncPort sends the DataPacket to every subscriber (incrementing the
        useCount) over the NotificationManager
   4-S. the NotificationManager delivers the DataPackets to the ASyncNetSend
   5-S. the ASyncNetSend::notify method gets called, which in turn converts
        the packet to a network message
    
	... network transfer ...
   
   6-R. the ASyncNetReceive::receive method gets called - the method creates
        a new data packet, and sends it using the NotificationManager again
   7-R. the NotificationManager delivers the DataPacket to the receiver
   8-R. eventually, the receiver confirms using "packet->processed()"
   9-R. the packet informs the ASyncNetReceive::processedPacket() which
        frees the packet and tells the (remote) sender that it went all right

	... network transfer ...
 
   10-S. eventually, ASyncNetSend::processed() gets called, and confirms
         the packet using "packet->processed()"
   11-S. the packet informs the ASyncPort::processedPacket()
   12-S. the packet is freed

variant(pulling):

   works the same as in the local case by exchanging steps 1-S and 12-S

4) ownership:

   * ASyncPort: is owned by the Object which it is a part of, if the object
   dies, ASyncPort dies unconditionally

   * DataPacket: is owned by the GenericDataChannel they are propagated over,
   that is, the ASyncPort normally - however if the DataPacket is still in
   use (i.e. in state 5 of the local case), it will take responsibility to
   free itself once all processed() calls have been collected

   * ASyncNetSend, ASyncNetReceive: own each other, so that if the sender dies,
   the connection will die as well, and if the receiver dies, the same happens

*/

#undef DEBUG_ASYNC_TRANSFER

ASyncPort::ASyncPort(const std::string& name, void *ptr, long flags,
		StdScheduleNode* parent) : Port(name, ptr, flags, parent), pull(false)
{
	stream = (GenericAsyncStream *)ptr;
	stream->channel = this;
	stream->_notifyID = notifyID = parent->object()->_mkNotifyID();
}

ASyncPort::~ASyncPort()
{
	/* 
	 * tell all outstanding packets that we don't exist any longer, so that
	 * if they feel like they need to confirm they have been processed now,
	 * they don't talk to an no longer existing object about it
	 */
	while(!sent.empty())
	{
		sent.front()->channel = 0;
		sent.pop_front();
	}

	/* disconnect remote connections (if present): the following things will
	 * need to be ensured here, since we are being deleted:
     *
	 *  - the senders should not talk to us after our destructor
	 *  - all of our connections need to be disconnected
	 *  - every connection needs to be closed exactly once
     *
	 * (closing a connection can cause reentrancy due to mcop communication)
	 */
	while(!netSenders.empty())
		netSenders.front()->disconnect();

	FlowSystemReceiver receiver = netReceiver;
	if(!receiver.isNull())
		receiver.disconnect();
}

//-------------------- GenericDataChannel interface -------------------------

void ASyncPort::setPull(int packets, int capacity)
{
	pullNotification.receiver = parent->object();
	pullNotification.ID = notifyID;
	pullNotification.internal = 0;
	pull = true;

	for(int i=0;i<packets;i++)
	{
		GenericDataPacket *packet = stream->createPacket(capacity);
		packet->useCount = 0;
		pullNotification.data = packet;
		NotificationManager::the()->send(pullNotification);
	}
}

void ASyncPort::endPull()
{
	pull = false;
	// TODO: maybe remove all pending pull packets here
}

void ASyncPort::processedPacket(GenericDataPacket *packet)
{
	int count = 0;
	list<GenericDataPacket *>::iterator i = sent.begin();
	while(i != sent.end())
	{
		if(*i == packet)
		{
			count++;
			i = sent.erase(i);
		}
		else i++;
	}
	assert(count == 1);

#ifdef DEBUG_ASYNC_TRANSFER
	cout << "port::processedPacket" << endl;
#endif
	assert(packet->useCount == 0);
	if(pull)
	{
		pullNotification.data = packet;
		NotificationManager::the()->send(pullNotification);
	}
	else
	{
		stream->freePacket(packet);
	}
}

void ASyncPort::sendPacket(GenericDataPacket *packet)
{
	bool sendOk = false;

#ifdef DEBUG_ASYNC_TRANSFER
	cout << "port::sendPacket" << endl;
#endif

	if(packet->size > 0)
	{
		vector<Notification>::iterator i;
		for(i=subscribers.begin(); i != subscribers.end(); i++)
		{
			Notification n = *i;
			n.data = packet;
			packet->useCount++;
#ifdef DEBUG_ASYNC_TRANSFER
			cout << "sending notification " << n.ID << endl;
#endif
			NotificationManager::the()->send(n);
			sendOk = true;
		}
	}

	if(sendOk)
		sent.push_back(packet);
	else
		stream->freePacket(packet);
}

//----------------------- Port interface ------------------------------------

void ASyncPort::connect(Port *xsource)
{
	arts_debug("port(%s)::connect",_name.c_str());

	ASyncPort *source = xsource->asyncPort();
	assert(source);
	addAutoDisconnect(xsource);

	Notification n;
	n.receiver = parent->object();
	n.ID = notifyID;
	n.internal = 0;
	source->subscribers.push_back(n);
}

void ASyncPort::disconnect(Port *xsource)
{
	arts_debug("port::disconnect");

	ASyncPort *source = xsource->asyncPort();
	assert(source);
	removeAutoDisconnect(xsource);

	// remove our subscription from the source object
	vector<Notification>::iterator si;
	for(si = source->subscribers.begin(); si != source->subscribers.end(); si++)
	{
		if(si->receiver == parent->object())
		{
			source->subscribers.erase(si);
			return;
		}
	}

	// there should have been exactly one, so this shouldn't be reached
	assert(false);
}

ASyncPort *ASyncPort::asyncPort()
{
	return this;
}

GenericAsyncStream *ASyncPort::receiveNetCreateStream()
{
	return stream->createNewStream();
}

NotificationClient *ASyncPort::receiveNetObject()
{
	return parent->object();
}

long ASyncPort::receiveNetNotifyID()
{
	return notifyID;
}

// Network transparency
void ASyncPort::addSendNet(ASyncNetSend *netsend)
{
	Notification n;
	n.receiver = netsend;
	n.ID = netsend->notifyID();
	n.internal = 0;
	subscribers.push_back(n);
	netSenders.push_back(netsend);
}

void ASyncPort::removeSendNet(ASyncNetSend *netsend)
{
	arts_return_if_fail(netsend != 0);
	netSenders.remove(netsend);

	vector<Notification>::iterator si;
	for(si = subscribers.begin(); si != subscribers.end(); si++)
	{
		if(si->receiver == netsend)
		{
			subscribers.erase(si);
			return;
		}
	}
	arts_warning("Failed to remove ASyncNetSend (%p) from ASyncPort", netsend);
}

void ASyncPort::setNetReceiver(ASyncNetReceive *receiver)
{
	arts_return_if_fail(receiver != 0);

	FlowSystemReceiver r = FlowSystemReceiver::_from_base(receiver->_copy());
	netReceiver = r;
}

void ASyncPort::disconnectRemote(const string& dest)
{
	list<ASyncNetSend *>::iterator i;

	for(i = netSenders.begin(); i != netSenders.end(); i++)
	{
		if((*i)->dest() == dest)
		{
			(*i)->disconnect();
			return;
		}
	}
	arts_warning("failed to disconnect %s in ASyncPort", dest.c_str());
}

ASyncNetSend::ASyncNetSend(ASyncPort *ap, const std::string& dest) : ap(ap)
{
	_dest = dest;
	ap->addSendNet(this);
}

ASyncNetSend::~ASyncNetSend()
{
	while(!pqueue.empty())
	{
		pqueue.front()->processed();
		pqueue.pop();
	}
	if(ap)
	{
		ap->removeSendNet(this);
		ap = 0;
	}
}

long ASyncNetSend::notifyID()
{
	return 1;
}

void ASyncNetSend::notify(const Notification& notification)
{
	// got a packet?
	assert(notification.ID == notifyID());
	GenericDataPacket *dp = (GenericDataPacket *)notification.data;
	pqueue.push(dp);

	/*
	 * since packets are delivered asynchronously, and since disconnection
	 * involves communication, it might happen that we get a packet without
	 * actually being connected any longer - in that case, silently forget it
	 */
	if(!receiver.isNull())
	{
		// put it into a custom data message and send it to the receiver
		Buffer *buffer = receiver._allocCustomMessage(receiveHandlerID);
		dp->write(*buffer);
		receiver._sendCustomMessage(buffer);
	}
}

void ASyncNetSend::processed()
{
	assert(!pqueue.empty());
	pqueue.front()->processed();
	pqueue.pop();
}

void ASyncNetSend::setReceiver(FlowSystemReceiver newReceiver)
{
	receiver = newReceiver;
	receiveHandlerID = newReceiver.receiveHandlerID();
}

void ASyncNetSend::disconnect()
{
	/* since disconnection will cause destruction (most likely immediate),
	 * we'll reference ourselves ...  */
	_copy();

	if(!receiver.isNull())
	{
		FlowSystemReceiver r = receiver;
		receiver = FlowSystemReceiver::null();
		r.disconnect();
	}
	if(ap)
	{
		ap->removeSendNet(this);
		ap = 0;
	}
	
	_release();
}

string ASyncNetSend::dest()
{
	return _dest;
}

/* dispatching function for custom message */

static void _dispatch_ASyncNetReceive_receive(void *object, Buffer *buffer)
{
	((ASyncNetReceive *)object)->receive(buffer);
}

ASyncNetReceive::ASyncNetReceive(ASyncPort *port, FlowSystemSender sender)
{
	port->setNetReceiver(this);
	stream = port->receiveNetCreateStream();
	stream->channel = this;
	this->sender = sender;
	/* stream->_notifyID = _mkNotifyID(); */

	gotPacketNotification.ID = port->receiveNetNotifyID();
	gotPacketNotification.receiver = port->receiveNetObject();
	gotPacketNotification.internal = 0;
	_receiveHandlerID =
		_addCustomMessageHandler(_dispatch_ASyncNetReceive_receive,this);
}

ASyncNetReceive::~ASyncNetReceive()
{
	/* tell outstanding packets that we don't exist any longer */
	while(!sent.empty())
	{
		sent.front()->channel = 0;
		sent.pop_front();
	}
	delete stream;
}

long ASyncNetReceive::receiveHandlerID()
{
	return _receiveHandlerID;
}

void ASyncNetReceive::receive(Buffer *buffer)
{
	GenericDataPacket *dp = stream->createPacket(512);
	dp->read(*buffer);
	dp->useCount = 1;
	gotPacketNotification.data = dp;
	NotificationManager::the()->send(gotPacketNotification);
	sent.push_back(dp);
}

/*
 * It will happen that this routine is called in time critical situations,
 * such as: while audio calculation is running, and must be finished in
 * time. The routine is mostly harmless, because sender->processed() is
 * a oneway function, which just queues the buffer for sending and returns
 * back, so it should return at once.
 *
 * However there is an exception upon first call: when sender->processed()
 * is called for the first time, the method processed has still to be looked
 * up. Thus, a synchronous call to _lookupMethod is made. That means, upon
 * first call, the method will send out an MCOP request and block until the
 * remote process tells that id.
 */
void ASyncNetReceive::processedPacket(GenericDataPacket *packet)
{
	/*
	 * HACK! Upon disconnect, strange things will happen. One of them is
	 * that we might, for the reason of not being referenced any longer,
	 * cease to exist without warning. Another is that our nice "sender"
	 * reference will get a null reference without warning, see disconnect
	 * code (which will cause the attached stub to also disappear). As
	 * those objects (especially the stub) are not prepared for not
	 * being there any more in the middle of whatever they do, we here
	 * explicitly reference us, and them, *again*, so that no evil things
	 * will happen. A general solution for this would be garbage collection
	 * in a timer, but until this is implemented (if it ever will become
	 * implemented), we'll live with this hack.
	 */
	_copy();
	sent.remove(packet);
	stream->freePacket(packet);
	if(!sender.isNull())
	{
		FlowSystemSender xsender = sender;
		xsender.processed();
	}
	_release();
}

void ASyncNetReceive::disconnect()
{
	if(!sender.isNull())
	{
		FlowSystemSender s = sender;
		sender = FlowSystemSender::null();
		s.disconnect();
	}
}

void ASyncNetReceive::sendPacket(GenericDataPacket *)
{
	assert(false);
}

void ASyncNetReceive::setPull(int, int)
{
	assert(false);
}

void ASyncNetReceive::endPull()
{
	assert(false);
}