Changeset 83


Ignore:
Timestamp:
Jul 24, 2009, 12:31:38 AM (11 years ago)
Author:
Dmitry A. Kuminov
Message:

corelib: Implemented socket notifiers (#24).

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/corelib/kernel/qeventdispatcher_pm.cpp

    r67 r83  
    4646#include "qcoreapplication.h"
    4747#include "qhash.h"
    48 #include "qlibrary.h"
    49 #include "qpair.h"
    50 #include "qset.h"
    5148#include "qsocketnotifier.h"
    52 #include "qvarlengtharray.h"
     49#include "qmutex.h"
     50#include "qwaitcondition.h"
    5351
    5452#include "qabstracteventdispatcher_p.h"
     
    5957
    6058#include <Qt/qwindowdefs_pm.h> // for QPMObjectWindow declaration
     59
     60#include <sys/socket.h>
    6161
    6262QT_BEGIN_NAMESPACE
     
    152152*/
    153153
    154 /// @todo remove?
    155 // returns HMQ of the current thread or NULL if no event queue has been created
    156 static HMQ qt_WinQueryQueue(HAB hab)
    157 {
    158     PTIB ptib;
    159     PPIB ppib;
    160     DosGetInfoBlocks(&ptib, &ppib);
    161     return WinQueueFromID(hab, ppib->pib_ulpid, ptib->tib_ptib2->tib2_ultid);
    162 }
    163 
    164154/*!
    165155    Constructs a new object window for the current thread.
     
    263253
    264254/*!
    265   \fn QPMObjectWindow::send(ULONG msg, MPARAM mp1, MPARAM mp2) const
    266 
    267   Synchronously sends a message \a msg with the given parameters \a mp1 and
    268   \a mp2 to this window handle and returns a reply from the message() function.
    269 
    270   \note Must be called on the same thread that cosnstructed this instance.
     255    \fn QPMObjectWindow::send(ULONG msg, MPARAM mp1, MPARAM mp2) const
     256
     257    Synchronously sends a message \a msg with the given parameters \a mp1 and
     258    \a mp2 to this window handle and returns a reply from the message() function.
     259
     260    \note Must be called on the same thread that cosnstructed this instance.
    271261*/
    272262
    273263/*!
    274   \fn QPMObjectWindow::post(ULONG msg, MPARAM mp1, MPARAM mp2) const
    275 
    276   Asynchronously posts a message \a msg with the given parameters \a mp1 and
    277   \a mp2 to this window handle. Returns \c TRUE on success and \c FALSE
    278   otherwise.
    279 
    280   \note Can be called on any thread.
     264    \fn QPMObjectWindow::post(ULONG msg, MPARAM mp1, MPARAM mp2) const
     265
     266    Asynchronously posts a message \a msg with the given parameters \a mp1 and
     267    \a mp2 to this window handle. Returns \c true on success and \c false
     268    otherwise.
     269
     270    \note Can be called on any thread.
    281271*/
     272
    282273
    283274// socket select notification (highest priority)
     
    285276// zero timer notification (lowest priority)
    286277#define WM_U_SEM_ZEROTIMER  WM_SEM4
     278
     279/*****************************************************************************
     280 socket select() thread
     281 *****************************************************************************/
     282
     283#if !defined(QT_NO_THREAD)
     284
     285class QSockSelectThread : public QThread
     286{
     287public:
     288    typedef QSocketNotifier::Type Type;
     289
     290    static void addSelect(QSocketNotifier *notifier, HWND hwnd);
     291    static void removeSelect(QSocketNotifier *notifier);
     292    static QSocketNotifier *getSocketNotifier(int key);
     293
     294    static void attachThread();
     295    static void detachThread();
     296
     297private:
     298    QSockSelectThread() : finish(false), refcnt(0), maxSockfd(-1) {};
     299
     300    void run();
     301    void cancelSelectOrIdle();
     302
     303    bool finish;
     304    int refcnt;
     305    QWaitCondition cond;
     306
     307    typedef QHash<int, QSocketNotifier*> Sockets;
     308    Sockets sockets;
     309    int maxSockfd;
     310
     311    enum Op { Add, Remove };
     312    struct PendingOp {
     313        Op op;
     314        int sockfd;
     315        Type type;
     316        HWND hwnd;
     317    };
     318    typedef QList<PendingOp> Pending;
     319    Pending pending;
     320
     321    static int toKey(int sockfd, Type type) {
     322        // as a hash key, we use first two bits of int for the type and the rest
     323        // for the sockfd which should be enough in the real world
     324        Q_ASSERT(((sockfd << 2) >> 2) == sockfd);
     325        return (sockfd << 2) + type;
     326    }
     327    // opposite to toKey()
     328    static int toSocket(int key) { return key >> 2; }
     329    static Type toType(int key) { return Type(key & 0x3); }
     330
     331    static QSockSelectThread *instance;
     332    static QMutex mutex;
     333};
     334
     335// static
     336QSockSelectThread *QSockSelectThread::instance = 0;
     337QMutex QSockSelectThread::mutex;
     338
     339// static
     340void QSockSelectThread::addSelect(QSocketNotifier *notifier, HWND hwnd)
     341{
     342    Q_ASSERT(hwnd != NULLHANDLE);
     343    if (hwnd == NULLHANDLE)
     344        return;
     345
     346    int sockfd = notifier->socket();
     347    Type type = notifier->type();
     348    int key = toKey(sockfd, type);
     349
     350    QMutexLocker locker(&mutex);
     351    Q_ASSERT(instance);
     352
     353    instance->start();
     354
     355    if (instance->sockets.contains(key)) {
     356        static const char *t[] = { "Read", "Write", "Exception" };
     357        qWarning("QSocketNotifier: Multiple socket notifiers for "
     358                 "same socket %d and type %s", sockfd, t[type]);
     359
     360    }
     361    instance->sockets.insert(key, notifier);
     362    PendingOp op = {Add, sockfd, type, hwnd};
     363    instance->pending.append(op);
     364    instance->cancelSelectOrIdle();
     365}
     366
     367// static
     368void QSockSelectThread::removeSelect(QSocketNotifier *notifier)
     369{
     370    QMutexLocker locker(&mutex);
     371    Q_ASSERT(instance);
     372
     373    int sockfd = notifier->socket();
     374    Type type = notifier->type();
     375    int key = toKey(sockfd, notifier->type());
     376
     377    if (instance->sockets.contains(key)) {
     378        instance->sockets.remove(key);
     379        PendingOp op = {Remove, sockfd, type};
     380        instance->pending.append(op);
     381        instance->cancelSelectOrIdle();
     382    }
     383}
     384
     385/*!
     386    Returns the socket notifier object corresponding to the given key from the
     387    WM_U_SEM_SELECT message. The return value is only correct for the thread
     388    that owns QSocketNotifier (creates/registers/unregisters it).
     389
     390    May return 0 if the socket notifier object is disabled/deleted after
     391    WM_U_SEM_SELECT is issued for it but before this message gets processed by
     392    the owning thread.
     393*/
     394// static
     395QSocketNotifier *QSockSelectThread::getSocketNotifier(int key)
     396{
     397    QMutexLocker locker(&mutex);
     398    Q_ASSERT(instance);
     399
     400    if (instance->sockets.contains(key) &&
     401        instance->sockets[key]->thread() == QThread::currentThread())
     402        return instance->sockets[key];
     403
     404    return 0;
     405}
     406
     407/*!
     408    Incrreases the usage count of the QSockSelectThread instance by one. If no
     409    QSockSelectThread instance exists yet, creates it. Note that the thread is
     410    started on demand by addSelect(), not by this method.
     411
     412    Must be called once per thread before any other call and must be completed
     413    by the detachThread() call when socket select functionality is no more
     414    necassary.
     415*/
     416// static
     417void QSockSelectThread::attachThread()
     418{
     419    QMutexLocker locker(&mutex);
     420
     421    if (instance == 0) {
     422        instance = new QSockSelectThread();
     423        instance->start();
     424    }
     425
     426    ++instance->refcnt;
     427}
     428
     429/*!
     430    Removes all socket notifiers owned by the current thread and decreases the
     431    usage count of the QSockSelectThread instance by one. When the usage count
     432    goes to zero, the socket select thread is stopped and the instance is
     433    deleted.
     434
     435    May only be called once per thread and only if attachThread() was called on
     436    this thread before.
     437*/
     438// static
     439void QSockSelectThread::detachThread()
     440{
     441    QMutexLocker locker(&mutex);
     442    Q_ASSERT(instance);
     443
     444    for (Sockets::iterator it = instance->sockets.begin();
     445          it != instance->sockets.end();) {
     446        QSocketNotifier *notifier = it.value();
     447        if (notifier->thread() == QThread::currentThread()) {
     448            PendingOp op = {Remove, notifier->socket(), notifier->type()};
     449            instance->pending.append(op);
     450            it = instance->sockets.erase(it);
     451        } else {
     452            ++it;
     453        }
     454    }
     455
     456    if (--instance->refcnt == 0) {
     457        instance->finish = true;
     458        instance->cancelSelectOrIdle();
     459        instance->wait();
     460        delete instance;
     461        instance = 0;
     462    } else {
     463        instance->cancelSelectOrIdle();
     464    }
     465}
     466void QSockSelectThread::run()
     467{
     468    // maintain a separate hash for HWNDs to avoid mutex locking every time
     469    // select() returns an event that we want to post
     470    typedef QHash<int, HWND> Hwnds;
     471    Hwnds hwnds;
     472
     473    fd_set readS, writeS, exS;
     474    FD_ZERO(&readS);
     475    FD_ZERO(&writeS);
     476    FD_ZERO(&exS);
     477
     478    do {
     479        mutex.lock();
     480
     481        while (!finish && sockets.isEmpty()) {
     482            cond.wait(&mutex);
     483        }
     484
     485        if (finish) {
     486            mutex.unlock();
     487            break;
     488        }
     489
     490        while (!pending.isEmpty()) {
     491            PendingOp p = pending.takeFirst();
     492            switch (p.op) {
     493                case Add:
     494                    switch (p.type) {
     495                        case QSocketNotifier::Read:
     496                            FD_SET(p.sockfd, &readS); break;
     497                        case QSocketNotifier::Write:
     498                            FD_SET(p.sockfd, &writeS); break;
     499                        case QSocketNotifier::Exception:
     500                            FD_SET(p.sockfd, &exS); break;
     501                    }
     502                    hwnds.insert(toKey(p.sockfd, p.type), p.hwnd);
     503                    maxSockfd = qMax(maxSockfd, p.sockfd);
     504                    break;
     505                case Remove:
     506                    switch (p.type) {
     507                        case QSocketNotifier::Read:
     508                            FD_CLR(p.sockfd, &readS); break;
     509                        case QSocketNotifier::Write:
     510                            FD_CLR(p.sockfd, &writeS); break;
     511                        case QSocketNotifier::Exception:
     512                            FD_CLR(p.sockfd, &exS); break;
     513                    }
     514                    hwnds.remove(toKey(p.sockfd, p.type));
     515                    if (maxSockfd == p.sockfd) {
     516                        // find the new hignest socket
     517                        if (hwnds.isEmpty()) {
     518                            maxSockfd = -1;
     519                        } else {
     520                            for (Hwnds::const_iterator it = hwnds.constBegin();
     521                                  it != hwnds.constEnd(); ++it) {
     522                                maxSockfd = qMax(toSocket(it.key()), maxSockfd);
     523                            }
     524                        }
     525                    }
     526                    break;
     527            }
     528        }
     529
     530        // do select
     531        mutex.unlock();
     532        int nsel = ::select(maxSockfd + 1, &readS, &writeS, &exS, NULL);
     533        if (nsel > 0) {
     534            for (Hwnds::const_iterator it = hwnds.constBegin();
     535                  it != hwnds.constEnd(); ++it) {
     536                int sockfd = toSocket(it.key());
     537                bool isSet = false;
     538                switch (toType(it.key())) {
     539                    case QSocketNotifier::Read:
     540                        isSet = FD_ISSET(sockfd, &readS); break;
     541                    case QSocketNotifier::Write:
     542                        isSet = FD_ISSET(sockfd, &writeS); break;
     543                    case QSocketNotifier::Exception:
     544                        isSet = FD_ISSET(sockfd, &exS); break;
     545                }
     546                if (isSet)
     547                    WinPostMsg(it.value(), WM_U_SEM_SELECT, MPFROMLONG(it.key()), 0);
     548            }
     549        }
     550    } while(true);
     551}
     552
     553// Must be called from under QSockSelectThread::mutex
     554void QSockSelectThread::cancelSelectOrIdle()
     555{
     556    if (maxSockfd >= 0) {
     557        // terminate select() execution
     558        ::so_cancel(maxSockfd);
     559    } else {
     560        // terminate the idle state
     561        cond.wakeOne();
     562    }
     563}
     564
     565#else
     566
     567class QSockSelectThread
     568{
     569public:
     570    static void addSelect(QSocketNotifier *notifier, HWND hwnd) {
     571#ifndef QT_NO_DEBUG
     572        qWarning("QSocketNotifier: socket notifiers require thread support but"
     573                 "QT_NO_THREAD was defined");
     574#endif
     575    }
     576    static void removeSelect(QSocketNotifier *notifier) {}
     577    static QSocketNotifier *getSocketNotifier(int key); { return 0; }
     578
     579    static void attachThread() {}
     580    static void detachThread() {}
     581};
     582
     583#endif
    287584
    288585class QEventDispatcherPMPrivate : public QAbstractEventDispatcherPrivate
     
    318615
    319616// @todo later
    320 //  DWORD threadId;
    321 //
    322 //  bool interrupt;
    323 //
    324 //  // internal window handle used for socketnotifiers/timers/etc
    325 //  HWND internalHwnd;
    326617//
    327618//  // timers
     
    331622//  void unregisterTimer(WinTimerInfo *t);
    332623//  void sendTimerEvent(int timerId);
    333 //
    334 //  // socket notifiers
    335 //  QSNDict sn_read;
    336 //  QSNDict sn_write;
    337 //  QSNDict sn_except;
    338 //  void doWsaAsyncSelect(int socket);
    339624//
    340625//  QList<QMSG> queuedUserInputEvents;
     
    399684    switch (msg) {
    400685        case WM_U_SEM_SELECT: {
    401 // @todo later
    402 //          if (eventLoop)
    403 //              eventLoop->activateSocketNotifiers();
     686            QSocketNotifier *notifier =
     687                QSockSelectThread::getSocketNotifier(LONGFROMMP(mp1));
     688                if (notifier) {
     689                    QEvent event(QEvent::SockAct);
     690                    QCoreApplication::sendEvent(notifier, &event);
     691                }
    404692            break;
    405693        }
     
    575863}
    576864
    577 void QEventDispatcherPMPrivate::doWsaAsyncSelect(int socket)
    578 {
    579     Q_ASSERT(internalHwnd);
    580     int sn_event = 0;
    581     if (sn_read.contains(socket))
    582         sn_event |= FD_READ | FD_CLOSE | FD_ACCEPT;
    583     if (sn_write.contains(socket))
    584         sn_event |= FD_WRITE | FD_CONNECT;
    585     if (sn_except.contains(socket))
    586         sn_event |= FD_OOB;
    587     // BoundsChecker may emit a warning for WSAAsyncSelect when sn_event == 0
    588     // This is a BoundsChecker bug and not a Qt bug
    589     WSAAsyncSelect(socket, internalHwnd, sn_event ? WM_USER : 0, sn_event);
    590 }
    591 
    592865void QEventDispatcherPM::createInternalHwnd()
    593866{
     
    624897bool QEventDispatcherPM::processEvents(QEventLoop::ProcessEventsFlags flags)
    625898{
    626     Q_D(QEventDispatcherPM);
    627 
    628899// @todo later
    629900#if 0
     901    Q_D(QEventDispatcherPM);
     902
    630903    if (!d->internalHwnd)
    631904        createInternalHwnd();
     
    7321005void QEventDispatcherPM::registerSocketNotifier(QSocketNotifier *notifier)
    7331006{
    734 // @todo later
    735 #if 0
    7361007    Q_ASSERT(notifier);
     1008#ifndef QT_NO_DEBUG
    7371009    int sockfd = notifier->socket();
    738     int type = notifier->type();
    739 #ifndef QT_NO_DEBUG
    740     if (sockfd < 0) {
     1010    if (sockfd < 0
     1011        || unsigned(sockfd) >= FD_SETSIZE) {
    7411012        qWarning("QSocketNotifier: Internal error");
    7421013        return;
    743     } else if (notifier->thread() != thread() || thread() != QThread::currentThread()) {
     1014    } else if (notifier->thread() != thread()
     1015               || thread() != QThread::currentThread()) {
    7441016        qWarning("QSocketNotifier: socket notifiers cannot be enabled from another thread");
    7451017        return;
     
    7481020
    7491021    Q_D(QEventDispatcherPM);
    750     QSNDict *sn_vec[3] = { &d->sn_read, &d->sn_write, &d->sn_except };
    751     QSNDict *dict = sn_vec[type];
    752 
    753     if (QCoreApplication::closingDown()) // ### d->exitloop?
    754         return; // after sn_cleanup, don't reinitialize.
    755 
    756     if (dict->contains(sockfd)) {
    757         const char *t[] = { "Read", "Write", "Exception" };
    758     /* Variable "socket" below is a function pointer. */
    759         qWarning("QSocketNotifier: Multiple socket notifiers for "
    760                  "same socket %d and type %s", sockfd, t[type]);
    761     }
    762 
    763     QSockNot *sn = new QSockNot;
    764     sn->obj = notifier;
    765     sn->fd  = sockfd;
    766     dict->insert(sn->fd, sn);
    767 
    768     if (d->internalHwnd)
    769         d->doWsaAsyncSelect(sockfd);
    770 #endif
     1022    d->createAuxWnd();
     1023
     1024    QSockSelectThread::addSelect(notifier, d->auxWnd.hwnd());
    7711025}
    7721026
    7731027void QEventDispatcherPM::unregisterSocketNotifier(QSocketNotifier *notifier)
    7741028{
    775 // @todo later
    776 #if 0
    7771029    Q_ASSERT(notifier);
     1030#ifndef QT_NO_DEBUG
    7781031    int sockfd = notifier->socket();
    779     int type = notifier->type();
    780 #ifndef QT_NO_DEBUG
    781     if (sockfd < 0) {
     1032    if (sockfd < 0
     1033        || unsigned(sockfd) >= FD_SETSIZE) {
    7821034        qWarning("QSocketNotifier: Internal error");
    7831035        return;
    784     } else if (notifier->thread() != thread() || thread() != QThread::currentThread()) {
     1036    } else if (notifier->thread() != thread()
     1037               || thread() != QThread::currentThread()) {
    7851038        qWarning("QSocketNotifier: socket notifiers cannot be disabled from another thread");
    7861039        return;
     
    7881041#endif
    7891042
    790     Q_D(QEventDispatcherPM);
    791     QSNDict *sn_vec[3] = { &d->sn_read, &d->sn_write, &d->sn_except };
    792     QSNDict *dict = sn_vec[type];
    793     QSockNot *sn = dict->value(sockfd);
    794     if (!sn)
    795         return;
    796 
    797     dict->remove(sockfd);
    798     delete sn;
    799 
    800     if (d->internalHwnd)
    801         d->doWsaAsyncSelect(sockfd);
    802 #endif
     1043    QSockSelectThread::removeSelect(notifier);
    8031044}
    8041045
     
    9441185void QEventDispatcherPM::startingUp()
    9451186{
     1187    QSockSelectThread::attachThread();
    9461188}
    9471189
    9481190void QEventDispatcherPM::closingDown()
    9491191{
    950     Q_D(QEventDispatcherPM);
     1192    QSockSelectThread::detachThread();
    9511193
    9521194// @todo remove later
    9531195#if 0
    954     // clean up any socketnotifiers
    955     while (!d->sn_read.isEmpty())
    956         unregisterSocketNotifier((*(d->sn_read.begin()))->obj);
    957     while (!d->sn_write.isEmpty())
    958         unregisterSocketNotifier((*(d->sn_write.begin()))->obj);
    959     while (!d->sn_except.isEmpty())
    960         unregisterSocketNotifier((*(d->sn_except.begin()))->obj);
     1196    Q_D(QEventDispatcherPM);
    9611197
    9621198    // clean up any timers
Note: See TracChangeset for help on using the changeset viewer.