summaryrefslogtreecommitdiffstats
path: root/plugins/streaming/streaming-job.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/streaming/streaming-job.cpp')
-rw-r--r--plugins/streaming/streaming-job.cpp279
1 files changed, 279 insertions, 0 deletions
diff --git a/plugins/streaming/streaming-job.cpp b/plugins/streaming/streaming-job.cpp
new file mode 100644
index 0000000..3318567
--- /dev/null
+++ b/plugins/streaming/streaming-job.cpp
@@ -0,0 +1,279 @@
+/***************************************************************************
+ streaming-job.cpp - description
+ -------------------
+ begin : Sun Sept 3 2006
+ copyright : (C) 2006 by Martin Witte
+ email : witte@kawo1.rwth-aachen.de
+ ***************************************************************************/
+
+/***************************************************************************
+ * *
+ * This program is free software; you can redistribute it and/or modify *
+ * it under the terms of the GNU General Public License as published by *
+ * the Free Software Foundation; either version 2 of the License, or *
+ * (at your option) any later version. *
+ * *
+ ***************************************************************************/
+
+#include "streaming-job.h"
+
+#include "../../src/include/utils.h"
+#include <kurl.h>
+
+#include <tdeio/job.h>
+
+
+StreamingJob::StreamingJob()
+ : TQObject(),
+ m_URL(TQString()),
+ m_SoundFormat(),
+ m_BufferSize(65536),
+ m_Buffer(m_BufferSize),
+ m_OpenCounter(0),
+ m_StreamPos(0),
+ m_StartTime(0),
+ m_SkipCount(0),
+ m_TDEIO_Job(NULL),
+ m_capturing(false)
+{
+}
+
+StreamingJob::StreamingJob(const TQString &_URL, const SoundFormat &_SoundFormat, size_t _bufferSize)
+ : TQObject(),
+ m_URL(_URL),
+ m_SoundFormat(_SoundFormat),
+ m_BufferSize(_bufferSize),
+ m_Buffer(m_BufferSize),
+ m_OpenCounter(0),
+ m_StreamPos(0),
+ m_StartTime(0),
+ m_SkipCount(0),
+ m_TDEIO_Job(NULL),
+ m_capturing(false)
+{
+}
+
+StreamingJob::StreamingJob(const StreamingJob &c)
+ : TQObject(),
+ m_URL(c.m_URL),
+ m_SoundFormat(c.m_SoundFormat),
+ m_BufferSize(c.m_BufferSize),
+ m_Buffer(m_BufferSize),
+ m_OpenCounter(0),
+ m_StreamPos(0),
+ m_StartTime(0),
+ m_SkipCount(0),
+ m_TDEIO_Job(NULL),
+ m_capturing(c.m_capturing)
+{
+}
+
+StreamingJob::~StreamingJob()
+{
+}
+
+
+void StreamingJob::setURL(const TQString &url)
+{
+ if (m_URL != url) {
+ m_URL = url;
+ delete m_TDEIO_Job;
+ m_TDEIO_Job = NULL;
+ if (!m_capturing) {
+ startPutJob();
+ } else {
+ startGetJob();
+ }
+ }
+}
+
+
+void StreamingJob::setSoundFormat(const SoundFormat &sf)
+{
+ m_SoundFormat = sf;
+}
+
+
+void StreamingJob::setBufferSize(size_t buffer_size)
+{
+ if (m_BufferSize != buffer_size) {
+ m_Buffer.clear();
+ m_Buffer.resize(m_BufferSize = buffer_size);
+ }
+}
+
+
+bool StreamingJob::startPutJob()
+{
+ m_TDEIO_Job = TDEIO::put(m_URL, -1, true, false, false);
+ if (!m_TDEIO_Job)
+ return false;
+ m_TDEIO_Job->setAsyncDataEnabled(true);
+ connect (m_TDEIO_Job, TQT_SIGNAL(dataReq(TDEIO::Job *job, TQByteArray &data)),
+ this, TQT_SLOT(slotWriteData (TDEIO::Job *job, TQByteArray &data)));
+ connect (m_TDEIO_Job, TQT_SIGNAL(result(TDEIO::Job *)),
+ this, TQT_SLOT(slotIOJobResult(TDEIO::Job *)));
+ return true;
+}
+
+
+bool StreamingJob::startPlayback()
+{
+ if (!m_OpenCounter) {
+ m_Buffer.clear();
+ m_OpenCounter = 1;
+ if (!startPutJob())
+ return false;
+ m_StartTime = time(NULL);
+ m_StreamPos = 0;
+ if (m_TDEIO_Job->error()) {
+ emit logStreamError(m_URL, m_TDEIO_Job->errorString());
+ }
+ return m_TDEIO_Job->error() == 0;
+ }
+ else {
+ return true;
+ }
+}
+
+bool StreamingJob::stopPlayback()
+{
+ if (m_OpenCounter) {
+ if (!--m_OpenCounter) {
+ delete m_TDEIO_Job;
+ m_TDEIO_Job = NULL;
+ }
+ }
+ return true;
+}
+
+
+bool StreamingJob::startGetJob()
+{
+ m_TDEIO_Job = TDEIO::get(m_URL, false, false);
+ if (!m_TDEIO_Job)
+ return false;
+ m_TDEIO_Job->setAsyncDataEnabled(true);
+ connect (m_TDEIO_Job, TQT_SIGNAL(data(TDEIO::Job *, const TQByteArray &)),
+ this, TQT_SLOT(slotReadData(TDEIO::Job *, const TQByteArray &)));
+ connect (m_TDEIO_Job, TQT_SIGNAL(result(TDEIO::Job *)),
+ this, TQT_SLOT(slotIOJobResult(TDEIO::Job *)));
+ return true;
+}
+
+
+bool StreamingJob::startCapture(const SoundFormat &/*proposed_format*/,
+ SoundFormat &real_format,
+ bool /*force_format*/)
+{
+ if (!m_OpenCounter) {
+ m_capturing = true;
+ m_Buffer.clear();
+ if (!startGetJob())
+ return false;
+ m_StartTime = time(NULL);
+ m_StreamPos = 0;
+ if (m_TDEIO_Job->error()) {
+ emit logStreamError(m_URL, m_TDEIO_Job->errorString());
+ }
+ return m_TDEIO_Job->error() == 0;
+ }
+ ++m_OpenCounter;
+ real_format = m_SoundFormat;
+ return true;
+}
+
+
+bool StreamingJob::stopCapture()
+{
+ if (m_OpenCounter) {
+ if (!--m_OpenCounter) {
+ delete m_TDEIO_Job;
+ m_TDEIO_Job = NULL;
+ }
+ }
+ return true;
+}
+
+
+void StreamingJob::slotReadData (TDEIO::Job */*job*/, const TQByteArray &data)
+{
+ size_t free = m_Buffer.getFreeSize();
+ if (free < data.size()) {
+ m_SkipCount += data.size() - free;
+ emit logStreamWarning(m_URL, i18n("skipped %1 bytes").arg(data.size() - free));
+ }
+ else {
+ free = data.size();
+ }
+
+ m_Buffer.addData(data.data(), free);
+ m_StreamPos += free;
+
+ if (m_Buffer.getFreeSize() < data.size()) {
+ m_TDEIO_Job->suspend();
+ }
+}
+
+
+void StreamingJob::slotWriteData (TDEIO::Job */*job*/, TQByteArray &)
+{
+ size_t size = m_Buffer.getFillSize();
+ if (size) {
+ char *buf = new char [size];
+ size = m_Buffer.takeData(buf, size);
+ TQByteArray data;
+ data.assign(buf, size);
+ m_TDEIO_Job->sendAsyncData(data);
+ m_StreamPos += size;
+ }
+ else {
+ // does a warning really make sense here?
+ //emit logStreamWarning(m_URL, i18n("buffer underrun"));
+ m_SkipCount++;
+ }
+}
+
+
+void StreamingJob::playData(const char *data, size_t size, size_t &consumed_size)
+{
+ size_t free = m_Buffer.getFreeSize();
+ consumed_size = (consumed_size == SIZE_T_DONT_CARE) ? free : min(consumed_size, free);
+ if (free > size) {
+ free = size;
+ }
+ m_Buffer.addData(data, free);
+}
+
+
+bool StreamingJob::hasRecordedData() const
+{
+ return m_Buffer.getFillSize() > m_Buffer.getSize() / 3;
+}
+
+
+void StreamingJob::lockData(const char *&data, size_t &size, SoundMetaData &meta_data)
+{
+ data = m_Buffer.getData(size);
+ time_t cur_time = time(NULL);
+ meta_data = SoundMetaData(m_StreamPos, cur_time - m_StartTime, cur_time, m_URL);
+}
+
+
+void StreamingJob::removeData(size_t size)
+{
+ m_Buffer.removeData(size);
+ if (m_Buffer.getFreeSize() > m_Buffer.getSize() / 2) {
+ m_TDEIO_Job->resume();
+ }
+}
+
+void StreamingJob::slotIOJobResult (TDEIO::Job *job)
+{
+ if (job && job->error()) {
+ emit logStreamError(m_URL, job->errorString());
+ }
+}
+
+#include "streaming-job.moc"
+