/** * @file BReactor_glib.c * @author Ambroz Bizjak * * @section LICENSE * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * 3. Neither the name of the author nor the * names of its contributors may be used to endorse or promote products * derived from this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include "BReactor_glib.h" #include struct fd_source { GSource source; BFileDescriptor *bfd; }; static void assert_timer (BSmallTimer *bt) { ASSERT(bt->is_small == 0 || bt->is_small == 1) ASSERT(bt->active == 0 || bt->active == 1) ASSERT(!bt->active || bt->reactor) ASSERT(!bt->active || bt->source) } static void dispatch_pending (BReactor *o) { while (!o->exiting && BPendingGroup_HasJobs(&o->pending_jobs)) { BPendingGroup_ExecuteJob(&o->pending_jobs); } } static void reset_limits (BReactor *o) { LinkedList1Node *list_node; while (list_node = LinkedList1_GetFirst(&o->active_limits_list)) { BReactorLimit *limit = UPPER_OBJECT(list_node, BReactorLimit, active_limits_list_node); ASSERT(limit->count > 0) limit->count = 0; LinkedList1_Remove(&o->active_limits_list, &limit->active_limits_list_node); } } static gushort get_glib_wait_events (int ev) { gushort gev = G_IO_ERR | G_IO_HUP; if (ev & BREACTOR_READ) { gev |= G_IO_IN; } if (ev & BREACTOR_WRITE) { gev |= G_IO_OUT; } return gev; } static int get_fd_dispatchable_events (BFileDescriptor *bfd) { ASSERT(bfd->active) int ev = 0; if ((bfd->waitEvents & BREACTOR_READ) && (bfd->pollfd.revents & G_IO_IN)) { ev |= BREACTOR_READ; } if ((bfd->waitEvents & BREACTOR_WRITE) && (bfd->pollfd.revents & G_IO_OUT)) { ev |= BREACTOR_WRITE; } if ((bfd->pollfd.revents & G_IO_ERR)) { ev |= BREACTOR_ERROR; } if ((bfd->pollfd.revents & G_IO_HUP)) { ev |= BREACTOR_HUP; } return ev; } static gboolean timer_source_handler (gpointer data) { BSmallTimer *bt = (void *)data; assert_timer(bt); ASSERT(bt->active) BReactor *reactor = bt->reactor; if (reactor->exiting) { return FALSE; } g_source_destroy(bt->source); g_source_unref(bt->source); bt->active = 0; DebugCounter_Decrement(&reactor->d_timers_ctr); if (bt->is_small) { bt->handler.smalll(bt); } else { BTimer *btimer = UPPER_OBJECT(bt, BTimer, base); bt->handler.heavy(btimer->user); } dispatch_pending(reactor); reset_limits(reactor); return FALSE; } static gboolean fd_source_func_prepare (GSource *source, gint *timeout) { BFileDescriptor *bfd = ((struct fd_source *)source)->bfd; ASSERT(bfd->active) ASSERT(bfd->source == source) *timeout = -1; return FALSE; } static gboolean fd_source_func_check (GSource *source) { BFileDescriptor *bfd = ((struct fd_source *)source)->bfd; ASSERT(bfd->active) ASSERT(bfd->source == source) return (get_fd_dispatchable_events(bfd) ? TRUE : FALSE); } static gboolean fd_source_func_dispatch (GSource *source, GSourceFunc callback, gpointer user_data) { BFileDescriptor *bfd = ((struct fd_source *)source)->bfd; BReactor *reactor = bfd->reactor; ASSERT(bfd->active) ASSERT(bfd->source == source) if (reactor->exiting) { return TRUE; } int events = get_fd_dispatchable_events(bfd); if (!events) { return TRUE; } bfd->handler(bfd->user, events); dispatch_pending(reactor); reset_limits(reactor); return TRUE; } void BSmallTimer_Init (BSmallTimer *bt, BSmallTimer_handler handler) { bt->handler.smalll = handler; bt->active = 0; bt->is_small = 1; } int BSmallTimer_IsRunning (BSmallTimer *bt) { assert_timer(bt); return bt->active; } void BTimer_Init (BTimer *bt, btime_t msTime, BTimer_handler handler, void *user) { bt->base.handler.heavy = handler; bt->base.active = 0; bt->base.is_small = 0; bt->user = user; bt->msTime = msTime; } int BTimer_IsRunning (BTimer *bt) { return BSmallTimer_IsRunning(&bt->base); } void BFileDescriptor_Init (BFileDescriptor *bs, int fd, BFileDescriptor_handler handler, void *user) { bs->fd = fd; bs->handler = handler; bs->user = user; bs->active = 0; } int BReactor_Init (BReactor *bsys) { return BReactor_InitFromExistingGMainLoop(bsys, g_main_loop_new(NULL, FALSE), 1); } void BReactor_Free (BReactor *bsys) { DebugObject_Free(&bsys->d_obj); DebugCounter_Free(&bsys->d_timers_ctr); DebugCounter_Free(&bsys->d_limits_ctr); DebugCounter_Free(&bsys->d_fds_counter); ASSERT(!BPendingGroup_HasJobs(&bsys->pending_jobs)) ASSERT(LinkedList1_IsEmpty(&bsys->active_limits_list)) // free job queue BPendingGroup_Free(&bsys->pending_jobs); // unref main loop if needed if (bsys->unref_gloop_on_free) { g_main_loop_unref(bsys->gloop); } } int BReactor_Exec (BReactor *bsys) { DebugObject_Access(&bsys->d_obj); // dispatch pending jobs (until exiting) and reset limits dispatch_pending(bsys); reset_limits(bsys); // if exiting, do not enter glib loop if (bsys->exiting) { return bsys->exit_code; } // enter glib loop g_main_loop_run(bsys->gloop); ASSERT(bsys->exiting) return bsys->exit_code; } void BReactor_Quit (BReactor *bsys, int code) { DebugObject_Access(&bsys->d_obj); // remember exiting bsys->exiting = 1; bsys->exit_code = code; // request termination of glib loop g_main_loop_quit(bsys->gloop); } void BReactor_SetSmallTimer (BReactor *bsys, BSmallTimer *bt, int mode, btime_t time) { DebugObject_Access(&bsys->d_obj); assert_timer(bt); // remove timer if it's already set BReactor_RemoveSmallTimer(bsys, bt); // if mode is absolute, subtract current time if (mode == BTIMER_SET_ABSOLUTE) { btime_t now = btime_gettime(); time = (time < now ? 0 : time - now); } // set active and reactor bt->active = 1; bt->reactor = bsys; // init source bt->source = g_timeout_source_new(time); g_source_set_callback(bt->source, timer_source_handler, bt, NULL); g_source_attach(bt->source, g_main_loop_get_context(bsys->gloop)); DebugCounter_Increment(&bsys->d_timers_ctr); } void BReactor_RemoveSmallTimer (BReactor *bsys, BSmallTimer *bt) { DebugObject_Access(&bsys->d_obj); assert_timer(bt); // do nothing if timer is not active if (!bt->active) { return; } // free source g_source_destroy(bt->source); g_source_unref(bt->source); // set not active bt->active = 0; DebugCounter_Decrement(&bsys->d_timers_ctr); } void BReactor_SetTimer (BReactor *bsys, BTimer *bt) { BReactor_SetSmallTimer(bsys, &bt->base, BTIMER_SET_RELATIVE, bt->msTime); } void BReactor_SetTimerAfter (BReactor *bsys, BTimer *bt, btime_t after) { BReactor_SetSmallTimer(bsys, &bt->base, BTIMER_SET_RELATIVE, after); } void BReactor_SetTimerAbsolute (BReactor *bsys, BTimer *bt, btime_t time) { BReactor_SetSmallTimer(bsys, &bt->base, BTIMER_SET_ABSOLUTE, time); } void BReactor_RemoveTimer (BReactor *bsys, BTimer *bt) { return BReactor_RemoveSmallTimer(bsys, &bt->base); } BPendingGroup * BReactor_PendingGroup (BReactor *bsys) { DebugObject_Access(&bsys->d_obj); return &bsys->pending_jobs; } int BReactor_Synchronize (BReactor *bsys, BSmallPending *ref) { DebugObject_Access(&bsys->d_obj); ASSERT(ref) while (!bsys->exiting) { ASSERT(BPendingGroup_HasJobs(&bsys->pending_jobs)) if (BPendingGroup_PeekJob(&bsys->pending_jobs) == ref) { return 1; } BPendingGroup_ExecuteJob(&bsys->pending_jobs); } return 0; } int BReactor_AddFileDescriptor (BReactor *bsys, BFileDescriptor *bs) { DebugObject_Access(&bsys->d_obj); ASSERT(!bs->active) // set active, no wait events, and set reactor bs->active = 1; bs->waitEvents = 0; bs->reactor = bsys; // create source bs->source = g_source_new(&bsys->fd_source_funcs, sizeof(struct fd_source)); ((struct fd_source *)bs->source)->bfd = bs; // init pollfd bs->pollfd.fd = bs->fd; bs->pollfd.events = get_glib_wait_events(bs->waitEvents); bs->pollfd.revents = 0; // start source g_source_add_poll(bs->source, &bs->pollfd); g_source_attach(bs->source, g_main_loop_get_context(bsys->gloop)); DebugCounter_Increment(&bsys->d_fds_counter); return 1; } void BReactor_RemoveFileDescriptor (BReactor *bsys, BFileDescriptor *bs) { DebugObject_Access(&bsys->d_obj); DebugCounter_Decrement(&bsys->d_fds_counter); ASSERT(bs->active) // free source g_source_destroy(bs->source); g_source_unref(bs->source); // set not active bs->active = 0; } void BReactor_SetFileDescriptorEvents (BReactor *bsys, BFileDescriptor *bs, int events) { DebugObject_Access(&bsys->d_obj); ASSERT(bs->active) ASSERT(!(events&~(BREACTOR_READ|BREACTOR_WRITE))) // set new wait events bs->waitEvents = events; // update pollfd wait events bs->pollfd.events = get_glib_wait_events(bs->waitEvents); } int BReactor_InitFromExistingGMainLoop (BReactor *bsys, GMainLoop *gloop, int unref_gloop_on_free) { ASSERT(gloop) ASSERT(unref_gloop_on_free == !!unref_gloop_on_free) // set not exiting bsys->exiting = 0; // set gloop and unref on free flag bsys->gloop = gloop; bsys->unref_gloop_on_free = unref_gloop_on_free; // init fd source functions table memset(&bsys->fd_source_funcs, 0, sizeof(bsys->fd_source_funcs)); bsys->fd_source_funcs.prepare = fd_source_func_prepare; bsys->fd_source_funcs.check = fd_source_func_check; bsys->fd_source_funcs.dispatch = fd_source_func_dispatch; bsys->fd_source_funcs.finalize = NULL; // init job queue BPendingGroup_Init(&bsys->pending_jobs); // init active limits list LinkedList1_Init(&bsys->active_limits_list); DebugCounter_Init(&bsys->d_fds_counter); DebugCounter_Init(&bsys->d_limits_ctr); DebugCounter_Init(&bsys->d_timers_ctr); DebugObject_Init(&bsys->d_obj); return 1; } GMainLoop * BReactor_GetGMainLoop (BReactor *bsys) { DebugObject_Access(&bsys->d_obj); return bsys->gloop; } int BReactor_SynchronizeAll (BReactor *bsys) { DebugObject_Access(&bsys->d_obj); dispatch_pending(bsys); return !bsys->exiting; } void BReactorLimit_Init (BReactorLimit *o, BReactor *reactor, int limit) { DebugObject_Access(&reactor->d_obj); ASSERT(limit > 0) // init arguments o->reactor = reactor; o->limit = limit; // set count zero o->count = 0; DebugCounter_Increment(&reactor->d_limits_ctr); DebugObject_Init(&o->d_obj); } void BReactorLimit_Free (BReactorLimit *o) { BReactor *reactor = o->reactor; DebugObject_Free(&o->d_obj); DebugCounter_Decrement(&reactor->d_limits_ctr); // remove from active limits list if (o->count > 0) { LinkedList1_Remove(&reactor->active_limits_list, &o->active_limits_list_node); } } int BReactorLimit_Increment (BReactorLimit *o) { BReactor *reactor = o->reactor; DebugObject_Access(&o->d_obj); // check count against limit if (o->count >= o->limit) { return 0; } // increment count o->count++; // if limit was zero, add to active limits list if (o->count == 1) { LinkedList1_Append(&reactor->active_limits_list, &o->active_limits_list_node); } return 1; } void BReactorLimit_SetLimit (BReactorLimit *o, int limit) { DebugObject_Access(&o->d_obj); ASSERT(limit > 0) // set limit o->limit = limit; }