/** * @file DataProto.c * @author Ambroz Bizjak * * @section LICENSE * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * 3. Neither the name of the author nor the * names of its contributors may be used to endorse or promote products * derived from this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include #include #include #include static void monitor_handler (DataProtoSink *o); static void refresh_up_job (DataProtoSink *o); static void receive_timer_handler (DataProtoSink *o); static void notifier_handler (DataProtoSink *o, uint8_t *data, int data_len); static void up_job_handler (DataProtoSink *o); static void flow_buffer_free (struct DataProtoFlow_buffer *b); static void flow_buffer_attach (struct DataProtoFlow_buffer *b, DataProtoSink *sink); static void flow_buffer_detach (struct DataProtoFlow_buffer *b); static void flow_buffer_schedule_detach (struct DataProtoFlow_buffer *b); static void flow_buffer_finish_detach (struct DataProtoFlow_buffer *b); static void flow_buffer_qflow_handler_busy (struct DataProtoFlow_buffer *b); void monitor_handler (DataProtoSink *o) { DebugObject_Access(&o->d_obj); // send keep-alive PacketRecvBlocker_AllowBlockedPacket(&o->ka_blocker); } void refresh_up_job (DataProtoSink *o) { if (o->up != o->up_report) { BPending_Set(&o->up_job); } else { BPending_Unset(&o->up_job); } } void receive_timer_handler (DataProtoSink *o) { DebugObject_Access(&o->d_obj); // consider down o->up = 0; refresh_up_job(o); } void notifier_handler (DataProtoSink *o, uint8_t *data, int data_len) { DebugObject_Access(&o->d_obj); ASSERT(data_len >= sizeof(struct dataproto_header)) int flags = 0; // if we are receiving keepalives, set the flag if (BTimer_IsRunning(&o->receive_timer)) { flags |= DATAPROTO_FLAGS_RECEIVING_KEEPALIVES; } // modify existing packet here struct dataproto_header header; memcpy(&header, data, sizeof(header)); header.flags = hton8(flags); memcpy(data, &header, sizeof(header)); } void up_job_handler (DataProtoSink *o) { DebugObject_Access(&o->d_obj); ASSERT(o->up != o->up_report) o->up_report = o->up; o->handler(o->user, o->up); return; } void source_router_handler (DataProtoSource *o, uint8_t *buf, int recv_len) { DebugObject_Access(&o->d_obj); ASSERT(buf) ASSERT(recv_len >= 0) ASSERT(recv_len <= o->frame_mtu) // remember packet o->current_buf = buf; o->current_recv_len = recv_len; // call handler o->handler(o->user, buf + DATAPROTO_MAX_OVERHEAD, recv_len); return; } void flow_buffer_free (struct DataProtoFlow_buffer *b) { ASSERT(!b->sink) // free route buffer RouteBuffer_Free(&b->rbuf); // free inactivity monitor if (b->inactivity_time >= 0) { PacketPassInactivityMonitor_Free(&b->monitor); } // free connector PacketPassConnector_Free(&b->connector); // free buffer structure free(b); } void flow_buffer_attach (struct DataProtoFlow_buffer *b, DataProtoSink *sink) { ASSERT(!b->sink) // init queue flow PacketPassFairQueueFlow_Init(&b->sink_qflow, &sink->queue); // connect to queue flow PacketPassConnector_ConnectOutput(&b->connector, PacketPassFairQueueFlow_GetInput(&b->sink_qflow)); // set DataProto b->sink = sink; } void flow_buffer_detach (struct DataProtoFlow_buffer *b) { ASSERT(b->sink) PacketPassFairQueueFlow_AssertFree(&b->sink_qflow); // disconnect from queue flow PacketPassConnector_DisconnectOutput(&b->connector); // free queue flow PacketPassFairQueueFlow_Free(&b->sink_qflow); // clear reference to this buffer in the sink if (b->sink->detaching_buffer == b) { b->sink->detaching_buffer = NULL; } // set no DataProto b->sink = NULL; } void flow_buffer_schedule_detach (struct DataProtoFlow_buffer *b) { ASSERT(b->sink) ASSERT(PacketPassFairQueueFlow_IsBusy(&b->sink_qflow)) ASSERT(!b->sink->detaching_buffer || b->sink->detaching_buffer == b) if (b->sink->detaching_buffer == b) { return; } // request cancel PacketPassFairQueueFlow_RequestCancel(&b->sink_qflow); // set busy handler PacketPassFairQueueFlow_SetBusyHandler(&b->sink_qflow, (PacketPassFairQueue_handler_busy)flow_buffer_qflow_handler_busy, b); // remember this buffer in the sink so it can handle us if it goes away b->sink->detaching_buffer = b; } void flow_buffer_finish_detach (struct DataProtoFlow_buffer *b) { ASSERT(b->sink) ASSERT(b->sink->detaching_buffer == b) PacketPassFairQueueFlow_AssertFree(&b->sink_qflow); // detach flow_buffer_detach(b); if (!b->flow) { // free flow_buffer_free(b); } else if (b->flow->sink_desired) { // attach flow_buffer_attach(b, b->flow->sink_desired); } } void flow_buffer_qflow_handler_busy (struct DataProtoFlow_buffer *b) { ASSERT(b->sink) ASSERT(b->sink->detaching_buffer == b) PacketPassFairQueueFlow_AssertFree(&b->sink_qflow); flow_buffer_finish_detach(b); } int DataProtoSink_Init (DataProtoSink *o, BReactor *reactor, PacketPassInterface *output, btime_t keepalive_time, btime_t tolerance_time, DataProtoSink_handler handler, void *user) { ASSERT(PacketPassInterface_HasCancel(output)) ASSERT(PacketPassInterface_GetMTU(output) >= DATAPROTO_MAX_OVERHEAD) // init arguments o->reactor = reactor; o->handler = handler; o->user = user; // set frame MTU o->frame_mtu = PacketPassInterface_GetMTU(output) - DATAPROTO_MAX_OVERHEAD; // init notifier PacketPassNotifier_Init(&o->notifier, output, BReactor_PendingGroup(o->reactor)); PacketPassNotifier_SetHandler(&o->notifier, (PacketPassNotifier_handler_notify)notifier_handler, o); // init monitor PacketPassInactivityMonitor_Init(&o->monitor, PacketPassNotifier_GetInput(&o->notifier), o->reactor, keepalive_time, (PacketPassInactivityMonitor_handler)monitor_handler, o); PacketPassInactivityMonitor_Force(&o->monitor); // init queue if (!PacketPassFairQueue_Init(&o->queue, PacketPassInactivityMonitor_GetInput(&o->monitor), BReactor_PendingGroup(o->reactor), 1, 1)) { BLog(BLOG_ERROR, "PacketPassFairQueue_Init failed"); goto fail1; } // init keepalive queue flow PacketPassFairQueueFlow_Init(&o->ka_qflow, &o->queue); // init keepalive source DataProtoKeepaliveSource_Init(&o->ka_source, BReactor_PendingGroup(o->reactor)); // init keepalive blocker PacketRecvBlocker_Init(&o->ka_blocker, DataProtoKeepaliveSource_GetOutput(&o->ka_source), BReactor_PendingGroup(o->reactor)); // init keepalive buffer if (!SinglePacketBuffer_Init(&o->ka_buffer, PacketRecvBlocker_GetOutput(&o->ka_blocker), PacketPassFairQueueFlow_GetInput(&o->ka_qflow), BReactor_PendingGroup(o->reactor))) { BLog(BLOG_ERROR, "SinglePacketBuffer_Init failed"); goto fail2; } // init receive timer BTimer_Init(&o->receive_timer, tolerance_time, (BTimer_handler)receive_timer_handler, o); // init handler job BPending_Init(&o->up_job, BReactor_PendingGroup(o->reactor), (BPending_handler)up_job_handler, o); // set not up o->up = 0; o->up_report = 0; // set no detaching buffer o->detaching_buffer = NULL; DebugCounter_Init(&o->d_ctr); DebugObject_Init(&o->d_obj); return 1; fail2: PacketRecvBlocker_Free(&o->ka_blocker); DataProtoKeepaliveSource_Free(&o->ka_source); PacketPassFairQueueFlow_Free(&o->ka_qflow); PacketPassFairQueue_Free(&o->queue); fail1: PacketPassInactivityMonitor_Free(&o->monitor); PacketPassNotifier_Free(&o->notifier); return 0; } void DataProtoSink_Free (DataProtoSink *o) { DebugObject_Free(&o->d_obj); DebugCounter_Free(&o->d_ctr); // allow freeing queue flows PacketPassFairQueue_PrepareFree(&o->queue); // release detaching buffer if (o->detaching_buffer) { ASSERT(!o->detaching_buffer->flow || o->detaching_buffer->flow->sink_desired != o) flow_buffer_finish_detach(o->detaching_buffer); } // free handler job BPending_Free(&o->up_job); // free receive timer BReactor_RemoveTimer(o->reactor, &o->receive_timer); // free keepalive buffer SinglePacketBuffer_Free(&o->ka_buffer); // free keepalive blocker PacketRecvBlocker_Free(&o->ka_blocker); // free keepalive source DataProtoKeepaliveSource_Free(&o->ka_source); // free keepalive queue flow PacketPassFairQueueFlow_Free(&o->ka_qflow); // free queue PacketPassFairQueue_Free(&o->queue); // free monitor PacketPassInactivityMonitor_Free(&o->monitor); // free notifier PacketPassNotifier_Free(&o->notifier); } void DataProtoSink_Received (DataProtoSink *o, int peer_receiving) { ASSERT(peer_receiving == 0 || peer_receiving == 1) DebugObject_Access(&o->d_obj); // reset receive timer BReactor_SetTimer(o->reactor, &o->receive_timer); if (!peer_receiving) { // peer reports not receiving, consider down o->up = 0; // send keep-alive to converge faster PacketRecvBlocker_AllowBlockedPacket(&o->ka_blocker); } else { // consider up o->up = 1; } refresh_up_job(o); } int DataProtoSource_Init (DataProtoSource *o, PacketRecvInterface *input, DataProtoSource_handler handler, void *user, BReactor *reactor) { ASSERT(PacketRecvInterface_GetMTU(input) <= INT_MAX - DATAPROTO_MAX_OVERHEAD) ASSERT(handler) // init arguments o->handler = handler; o->user = user; o->reactor = reactor; // remember frame MTU o->frame_mtu = PacketRecvInterface_GetMTU(input); // init router if (!PacketRouter_Init(&o->router, DATAPROTO_MAX_OVERHEAD + o->frame_mtu, DATAPROTO_MAX_OVERHEAD, input, (PacketRouter_handler)source_router_handler, o, BReactor_PendingGroup(reactor))) { BLog(BLOG_ERROR, "PacketRouter_Init failed"); goto fail0; } DebugCounter_Init(&o->d_ctr); DebugObject_Init(&o->d_obj); return 1; fail0: return 0; } void DataProtoSource_Free (DataProtoSource *o) { DebugObject_Free(&o->d_obj); DebugCounter_Free(&o->d_ctr); // free router PacketRouter_Free(&o->router); } int DataProtoFlow_Init (DataProtoFlow *o, DataProtoSource *source, peerid_t source_id, peerid_t dest_id, int num_packets, int inactivity_time, void *user, DataProtoFlow_handler_inactivity handler_inactivity) { DebugObject_Access(&source->d_obj); ASSERT(num_packets > 0) ASSERT(!(inactivity_time >= 0) || handler_inactivity) // init arguments o->source = source; o->source_id = source_id; o->dest_id = dest_id; // set no desired sink o->sink_desired = NULL; // allocate buffer structure struct DataProtoFlow_buffer *b = (struct DataProtoFlow_buffer *)malloc(sizeof(*b)); if (!b) { BLog(BLOG_ERROR, "malloc failed"); goto fail0; } o->b = b; // set parent b->flow = o; // remember inactivity time b->inactivity_time = inactivity_time; // init connector PacketPassConnector_Init(&b->connector, DATAPROTO_MAX_OVERHEAD + source->frame_mtu, BReactor_PendingGroup(source->reactor)); // init inactivity monitor PacketPassInterface *buf_out = PacketPassConnector_GetInput(&b->connector); if (b->inactivity_time >= 0) { PacketPassInactivityMonitor_Init(&b->monitor, buf_out, source->reactor, b->inactivity_time, handler_inactivity, user); buf_out = PacketPassInactivityMonitor_GetInput(&b->monitor); } // init route buffer if (!RouteBuffer_Init(&b->rbuf, DATAPROTO_MAX_OVERHEAD + source->frame_mtu, buf_out, num_packets)) { BLog(BLOG_ERROR, "RouteBuffer_Init failed"); goto fail1; } // set no sink b->sink = NULL; DebugCounter_Increment(&source->d_ctr); DebugObject_Init(&o->d_obj); return 1; fail1: if (b->inactivity_time >= 0) { PacketPassInactivityMonitor_Free(&b->monitor); } PacketPassConnector_Free(&b->connector); free(b); fail0: return 0; } void DataProtoFlow_Free (DataProtoFlow *o) { DebugObject_Free(&o->d_obj); DebugCounter_Decrement(&o->source->d_ctr); ASSERT(!o->sink_desired) struct DataProtoFlow_buffer *b = o->b; if (b->sink) { if (PacketPassFairQueueFlow_IsBusy(&b->sink_qflow)) { // schedule detach, free buffer after detach flow_buffer_schedule_detach(b); b->flow = NULL; // remove inactivity handler if (b->inactivity_time >= 0) { PacketPassInactivityMonitor_SetHandler(&b->monitor, NULL, NULL); } } else { // detach and free buffer now flow_buffer_detach(b); flow_buffer_free(b); } } else { // free buffer flow_buffer_free(b); } } void DataProtoFlow_Route (DataProtoFlow *o, int more) { DebugObject_Access(&o->d_obj); PacketRouter_AssertRoute(&o->source->router); ASSERT(o->source->current_buf) ASSERT(more == 0 || more == 1) struct DataProtoFlow_buffer *b = o->b; // write header. Don't set flags, it will be set in notifier_handler. struct dataproto_header header; struct dataproto_peer_id id; header.from_id = htol16(o->source_id); header.num_peer_ids = htol16(1); id.id = htol16(o->dest_id); memcpy(o->source->current_buf, &header, sizeof(header)); memcpy(o->source->current_buf + sizeof(header), &id, sizeof(id)); // route uint8_t *next_buf; if (!PacketRouter_Route(&o->source->router, DATAPROTO_MAX_OVERHEAD + o->source->current_recv_len, &b->rbuf, &next_buf, DATAPROTO_MAX_OVERHEAD, (more ? o->source->current_recv_len : 0) )) { BLog(BLOG_NOTICE, "buffer full: %d->%d", (int)o->source_id, (int)o->dest_id); return; } // remember next buffer, or don't allow further routing if more==0 o->source->current_buf = (more ? next_buf : NULL); } void DataProtoFlow_Attach (DataProtoFlow *o, DataProtoSink *sink) { DebugObject_Access(&o->d_obj); DebugObject_Access(&sink->d_obj); ASSERT(!o->sink_desired) ASSERT(sink) ASSERT(o->source->frame_mtu <= sink->frame_mtu) struct DataProtoFlow_buffer *b = o->b; if (b->sink) { if (PacketPassFairQueueFlow_IsBusy(&b->sink_qflow)) { // schedule detach and reattach flow_buffer_schedule_detach(b); } else { // detach and reattach now flow_buffer_detach(b); flow_buffer_attach(b, sink); } } else { // attach flow_buffer_attach(b, sink); } // set desired sink o->sink_desired = sink; DebugCounter_Increment(&sink->d_ctr); } void DataProtoFlow_Detach (DataProtoFlow *o) { DebugObject_Access(&o->d_obj); ASSERT(o->sink_desired) struct DataProtoFlow_buffer *b = o->b; ASSERT(b->sink) DataProtoSink *sink = o->sink_desired; if (PacketPassFairQueueFlow_IsBusy(&b->sink_qflow)) { // schedule detach flow_buffer_schedule_detach(b); } else { // detach now flow_buffer_detach(b); } // set no desired sink o->sink_desired = NULL; DebugCounter_Decrement(&sink->d_ctr); }