/* * * $Id: k3bpipebuffer.cpp 619556 2007-01-03 17:38:12Z trueg $ * Copyright (C) 2003 Sebastian Trueg * * This file is part of the K3b project. * Copyright (C) 1998-2007 Sebastian Trueg * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * See the file "COPYING" for the exact licensing terms. */ #include "k3bpipebuffer.h" #include #include #include #include #include #include #include #include #include #include // // This one is based on the little pipebuf2 program by Peter Osterlund // class K3bPipeBuffer::WorkThread : public K3bThread { public: WorkThread() : K3bThread(), buffer(0), bufSize(4*1024*1024), canceled(false) { outFd = inFd = -1; inFdPair[0] = inFdPair[1] = -1; } ~WorkThread() { delete [] buffer; } bool initFds() { if( inFd == -1 ) { if( ::socketpair(AF_UNIX, SOCK_STREAM, 0, inFdPair) ) { // if( ::pipe( inFdPair ) ) { kdDebug() << "(K3bPipeBuffer::WorkThread) unable to create socketpair" << endl; inFdPair[0] = inFdPair[1] = -1; return false; } else { ::fcntl(inFdPair[0], F_SETFL, O_NONBLOCK); ::fcntl(outFd, F_SETFL, O_NONBLOCK); } } else { ::fcntl(inFd, F_SETFL, O_NONBLOCK); } delete [] buffer; buffer = new char[bufSize]; return (buffer != 0); } void run() { emitStarted(); int usedInFd = -1; if( inFd > 0 ) usedInFd = inFd; else usedInFd = inFdPair[0]; kdDebug() << "(K3bPipeBuffer::WorkThread) reading from " << usedInFd << " and writing to " << outFd << endl; kdDebug() << "(K3bPipeBuffer::WorkThread) using buffer size of " << bufSize << endl; // start the buffering unsigned int bufPos = 0; unsigned int dataLen = 0; bool eof = false; bool error = false; canceled = false; int oldPercent = 0; static const unsigned int MAX_BUFFER_READ = 2048*3; while( !canceled && !error && (!eof || dataLen > 0) ) { // // create two fd sets // fd_set readFds, writeFds; FD_ZERO(&readFds); FD_ZERO(&writeFds); // // fill the fd sets // if( !eof && dataLen < bufSize ) FD_SET(usedInFd, &readFds); if( dataLen > 0 ) FD_SET(outFd, &writeFds); // // wait for data // int ret = select( TQMAX(usedInFd, outFd) + 1, &readFds, &writeFds, NULL, NULL); // // Do the buffering // if( !canceled && ret > 0 ) { int percent = -1; // // Read from the buffer and write to the output // if( FD_ISSET(outFd, &writeFds) ) { unsigned int maxLen = TQMIN(bufSize - bufPos, dataLen); ret = ::write( outFd, &buffer[bufPos], maxLen ); if( ret < 0 ) { if( (errno != EINTR) && (errno != EAGAIN) ) { kdDebug() << "(K3bPipeBuffer::WorkThread) error while writing to " << outFd << endl; error = true; } } else { // // we always emit before the reading from the buffer since // it makes way more sense to show the buffer before the reading. // percent = (int)((double)dataLen*100.0/(double)bufSize); bufPos = (bufPos + ret) % bufSize; dataLen -= ret; } } // // Read into the buffer // else if( FD_ISSET(usedInFd, &readFds) ) { unsigned int readPos = (bufPos + dataLen) % bufSize; unsigned int maxLen = TQMIN(bufSize - readPos, bufSize - dataLen); // // never read more than xxx bytes // This is some tuning to prevent the reading from blocking the whole thread // if( maxLen > MAX_BUFFER_READ ) // some dummy value below 1 MB maxLen = MAX_BUFFER_READ; ret = ::read( usedInFd, &buffer[readPos], maxLen ); if( ret < 0 ) { if( (errno != EINTR) && (errno != EAGAIN) ) { kdDebug() << "(K3bPipeBuffer::WorkThread) error while reading from " << usedInFd << endl; error = true; } } else if( ret == 0 ) { kdDebug() << "(K3bPipeBuffer::WorkThread) end of input." << endl; eof = true; } else { dataLen += ret; percent = (int)((double)dataLen*100.0/(double)bufSize); } } // A little hack to keep the buffer display from flickering if( percent == 99 ) percent = 100; if( percent != -1 && percent != oldPercent ) { emitPercent( percent ); oldPercent = percent; } } else if( !canceled ) { error = true; kdDebug() << "(K3bPipeBuffer::WorkThread) select: " << ::strerror(errno) << endl; } } if( inFd == -1 ) { ::close( inFdPair[0] ); ::close( inFdPair[1] ); inFdPair[0] = inFdPair[1] = -1; } // // close the fd we are writing to (this is need to make growisofs happy // TODO: perhaps make this configurable // ::close( outFd ); if( canceled ) emitCanceled(); emitFinished( !error && !canceled ); } char* buffer; size_t bufSize; int outFd; int inFd; int inFdPair[2]; bool canceled; }; K3bPipeBuffer::K3bPipeBuffer( K3bJobHandler* jh, TQObject* parent, const char* name ) : K3bThreadJob( jh, parent, name ) { m_thread = new WorkThread(); setThread( m_thread ); } K3bPipeBuffer::~K3bPipeBuffer() { delete m_thread; } void K3bPipeBuffer::start() { // // Create the socketpair in the gui thread to be sure it's available after // this method returns. // if( !m_thread->initFds() ) jobFinished(false); else K3bThreadJob::start(); } void K3bPipeBuffer::cancel() { m_thread->canceled = true; } void K3bPipeBuffer::setBufferSize( int mb ) { m_thread->bufSize = mb * 1024 * 1024; } void K3bPipeBuffer::readFromFd( int fd ) { m_thread->inFd = fd; } void K3bPipeBuffer::writeToFd( int fd ) { m_thread->outFd = fd; } int K3bPipeBuffer::inFd() const { if( m_thread->inFd == -1 ) return m_thread->inFdPair[1]; else return m_thread->inFd; }