diff options
Diffstat (limited to 'common/trans.c')
| -rw-r--r-- | common/trans.c | 336 |
1 files changed, 252 insertions, 84 deletions
diff --git a/common/trans.c b/common/trans.c index 3828a174..2c44f28c 100644 --- a/common/trans.c +++ b/common/trans.c @@ -24,6 +24,8 @@ #include "parse.h" #include "ssl_calls.h" +#define MAX_SBYTES 0 + /*****************************************************************************/ int APP_CC trans_tls_recv(struct trans *self, void *ptr, int len) @@ -75,7 +77,7 @@ trans_tcp_send(struct trans *self, const void *data, int len) int APP_CC trans_tcp_can_recv(struct trans *self, int sck, int millis) { - return g_tcp_can_recv(sck, millis); + return g_sck_can_recv(sck, millis); } /*****************************************************************************/ @@ -169,13 +171,29 @@ trans_get_wait_objs(struct trans *self, tbus *objs, int *count) /*****************************************************************************/ int APP_CC trans_get_wait_objs_rw(struct trans *self, tbus *robjs, int *rcount, - tbus *wobjs, int *wcount) + tbus *wobjs, int *wcount, int *timeout) { - if (trans_get_wait_objs(self, robjs, rcount) != 0) + if (self == 0) + { + return 1; + } + + if (self->status != TRANS_STATUS_UP) { return 1; } + if ((self->si != 0) && (self->si->source[self->my_source] > MAX_SBYTES)) + { + } + else + { + if (trans_get_wait_objs(self, robjs, rcount) != 0) + { + return 1; + } + } + if (self->wait_s != 0) { wobjs[*wcount] = self->sck; @@ -187,7 +205,7 @@ trans_get_wait_objs_rw(struct trans *self, tbus *robjs, int *rcount, /*****************************************************************************/ int APP_CC -send_waiting(struct trans *self, int block) +trans_send_waiting(struct trans *self, int block) { struct stream *temp_s; int bytes; @@ -209,9 +227,13 @@ send_waiting(struct trans *self, int block) if (sent > 0) { temp_s->p += sent; + if (temp_s->source != 0) + { + temp_s->source[0] -= sent; + } if (temp_s->p >= temp_s->end) { - self->wait_s = (struct stream *) (temp_s->next_packet); + self->wait_s = temp_s->next; free_stream(temp_s); } } @@ -227,6 +249,18 @@ send_waiting(struct trans *self, int block) } } } + else if (block) + { + /* check for term here */ + if (self->is_term != 0) + { + if (self->is_term()) + { + /* term */ + return 1; + } + } + } } else { @@ -247,6 +281,7 @@ trans_check_wait_objs(struct trans *self) int to_read = 0; int read_so_far = 0; int rv = 0; + int cur_source; if (self == 0) { @@ -262,7 +297,7 @@ trans_check_wait_objs(struct trans *self) if (self->type1 == TRANS_TYPE_LISTENER) /* listening */ { - if (g_tcp_can_recv(self->sck, 0)) + if (g_sck_can_recv(self->sck, 0)) { in_sck = g_sck_accept(self->sck, self->addr, sizeof(self->addr), self->port, sizeof(self->port)); @@ -310,8 +345,17 @@ trans_check_wait_objs(struct trans *self) } else /* connected server or client (2 or 3) */ { - if (self->trans_can_recv(self, self->sck, 0)) + if (self->si != 0 && self->si->source[self->my_source] > MAX_SBYTES) + { + } + else if (self->trans_can_recv(self, self->sck, 0)) { + cur_source = 0; + if (self->si != 0) + { + cur_source = self->si->cur_source; + self->si->cur_source = self->my_source; + } read_so_far = (int) (self->in_s->end - self->in_s->data); to_read = self->header_size - read_so_far; @@ -329,6 +373,10 @@ trans_check_wait_objs(struct trans *self) { /* error */ self->status = TRANS_STATUS_DOWN; + if (self->si != 0) + { + self->si->cur_source = cur_source; + } return 1; } } @@ -336,6 +384,10 @@ trans_check_wait_objs(struct trans *self) { /* error */ self->status = TRANS_STATUS_DOWN; + if (self->si != 0) + { + self->si->cur_source = cur_source; + } return 1; } else @@ -357,8 +409,12 @@ trans_check_wait_objs(struct trans *self) } } } + if (self->si != 0) + { + self->si->cur_source = cur_source; + } } - if (send_waiting(self, 0) != 0) + if (trans_send_waiting(self, 0) != 0) { /* error */ self->status = TRANS_STATUS_DOWN; @@ -368,6 +424,7 @@ trans_check_wait_objs(struct trans *self) return rv; } + /*****************************************************************************/ int APP_CC trans_force_read_s(struct trans *self, struct stream *in_s, int size) @@ -378,7 +435,6 @@ trans_force_read_s(struct trans *self, struct stream *in_s, int size) { return 1; } - while (size > 0) { /* make sure stream has room */ @@ -386,47 +442,47 @@ trans_force_read_s(struct trans *self, struct stream *in_s, int size) { return 1; } - - rcvd = self->trans_recv(self, in_s->end, size); - - if (rcvd == -1) + if (self->trans_can_recv(self, self->sck, 100)) { - if (g_tcp_last_error_would_block(self->sck)) + rcvd = self->trans_recv(self, in_s->end, size); + if (rcvd == -1) { - if (!g_tcp_can_recv(self->sck, 100)) + if (g_tcp_last_error_would_block(self->sck)) { - /* check for term here */ - if (self->is_term != 0) - { - if (self->is_term()) - { - /* term */ - self->status = TRANS_STATUS_DOWN; - return 1; - } - } + } + else + { + /* error */ + self->status = TRANS_STATUS_DOWN; + return 1; } } - else + else if (rcvd == 0) { /* error */ self->status = TRANS_STATUS_DOWN; return 1; } - } - else if (rcvd == 0) - { - /* error */ - self->status = TRANS_STATUS_DOWN; - return 1; + else + { + in_s->end += rcvd; + size -= rcvd; + } } else { - in_s->end += rcvd; - size -= rcvd; + /* check for term here */ + if (self->is_term != 0) + { + if (self->is_term()) + { + /* term */ + self->status = TRANS_STATUS_DOWN; + return 1; + } + } } } - return 0; } @@ -449,57 +505,55 @@ trans_force_write_s(struct trans *self, struct stream *out_s) { return 1; } - size = (int) (out_s->end - out_s->data); total = 0; - - if (send_waiting(self, 1) != 0) + if (trans_send_waiting(self, 1) != 0) { self->status = TRANS_STATUS_DOWN; return 1; } - while (total < size) { - sent = self->trans_send(self, out_s->data + total, size - total); - - if (sent == -1) + if (g_tcp_can_send(self->sck, 100)) { - if (g_tcp_last_error_would_block(self->sck)) + sent = self->trans_send(self, out_s->data + total, size - total); + if (sent == -1) { - if (!g_tcp_can_send(self->sck, 100)) + if (g_tcp_last_error_would_block(self->sck)) { - /* check for term here */ - if (self->is_term != 0) - { - if (self->is_term()) - { - /* term */ - self->status = TRANS_STATUS_DOWN; - return 1; - } - } + } + else + { + /* error */ + self->status = TRANS_STATUS_DOWN; + return 1; } } - else + else if (sent == 0) { /* error */ self->status = TRANS_STATUS_DOWN; return 1; } - } - else if (sent == 0) - { - /* error */ - self->status = TRANS_STATUS_DOWN; - return 1; + else + { + total = total + sent; + } } else { - total = total + sent; + /* check for term here */ + if (self->is_term != 0) + { + if (self->is_term()) + { + /* term */ + self->status = TRANS_STATUS_DOWN; + return 1; + } + } } } - return 0; } @@ -512,23 +566,69 @@ trans_force_write(struct trans *self) /*****************************************************************************/ int APP_CC -trans_write_copy(struct trans *self) +trans_write_copy_s(struct trans *self, struct stream *out_s) { int size; - struct stream *out_s; + int sent; struct stream *wait_s; struct stream *temp_s; + char *out_data; if (self->status != TRANS_STATUS_UP) { return 1; } - - out_s = self->out_s; + /* try to send any left over */ + if (trans_send_waiting(self, 0) != 0) + { + /* error */ + self->status = TRANS_STATUS_DOWN; + return 1; + } + out_data = out_s->data; + sent = 0; size = (int) (out_s->end - out_s->data); + if (self->wait_s == 0) + { + /* if no left over, try to send this new data */ + if (g_tcp_can_send(self->sck, 0)) + { + sent = self->trans_send(self, out_s->data, size); + if (sent > 0) + { + out_data += sent; + size -= sent; + } + else if (sent == 0) + { + return 1; + } + else + { + if (!g_tcp_last_error_would_block(self->sck)) + { + return 1; + } + } + } + } + if (size < 1) + { + return 0; + } + /* did not send right away, have to copy */ make_stream(wait_s); init_stream(wait_s, size); - out_uint8a(wait_s, out_s->data, size); + if (self->si != 0) + { + if ((self->si->cur_source != 0) && + (self->si->cur_source != self->my_source)) + { + self->si->source[self->si->cur_source] += size; + wait_s->source = self->si->source + self->si->cur_source; + } + } + out_uint8a(wait_s, out_data, size); s_mark_end(wait_s); wait_s->p = wait_s->data; if (self->wait_s == 0) @@ -538,53 +638,110 @@ trans_write_copy(struct trans *self) else { temp_s = self->wait_s; - while (temp_s->next_packet != 0) + while (temp_s->next != 0) { - temp_s = (struct stream *) (temp_s->next_packet); + temp_s = temp_s->next; } - temp_s->next_packet = (char *) wait_s; - } - - /* try to send */ - if (send_waiting(self, 0) != 0) - { - /* error */ - self->status = TRANS_STATUS_DOWN; - return 1; + temp_s->next = wait_s; } - return 0; } /*****************************************************************************/ int APP_CC +trans_write_copy(struct trans* self) +{ + return trans_write_copy_s(self, self->out_s); +} + +/*****************************************************************************/ +int APP_CC trans_connect(struct trans *self, const char *server, const char *port, int timeout) { int error; + int now; + int start_time; + + start_time = g_time3(); if (self->sck != 0) { g_tcp_close(self->sck); + self->sck = 0; } if (self->mode == TRANS_MODE_TCP) /* tcp */ { self->sck = g_tcp_socket(); if (self->sck < 0) + { + self->status = TRANS_STATUS_DOWN; return 1; - + } g_tcp_set_non_blocking(self->sck); - error = g_tcp_connect(self->sck, server, port); + while (1) + { + error = g_tcp_connect(self->sck, server, port); + if (error == 0) + { + break; + } + else + { + if (timeout < 1) + { + self->status = TRANS_STATUS_DOWN; + return 1; + } + now = g_time3(); + if (now - start_time < timeout) + { + g_sleep(timeout / 5); + } + else + { + self->status = TRANS_STATUS_DOWN; + return 1; + } + } + } } else if (self->mode == TRANS_MODE_UNIX) /* unix socket */ { self->sck = g_tcp_local_socket(); if (self->sck < 0) + { + self->status = TRANS_STATUS_DOWN; return 1; - + } g_tcp_set_non_blocking(self->sck); - error = g_tcp_local_connect(self->sck, port); + while (1) + { + error = g_tcp_local_connect(self->sck, port); + if (error == 0) + { + break; + } + else + { + if (timeout < 1) + { + self->status = TRANS_STATUS_DOWN; + return 1; + } + now = g_time3(); + if (now - start_time < timeout) + { + g_sleep(timeout / 5); + } + else + { + self->status = TRANS_STATUS_DOWN; + return 1; + } + } + } } else { @@ -596,6 +753,15 @@ trans_connect(struct trans *self, const char *server, const char *port, { if (g_tcp_last_error_would_block(self->sck)) { + now = g_time3(); + if (now - start_time < timeout) + { + timeout = timeout - (now - start_time); + } + else + { + timeout = 0; + } if (g_tcp_can_send(self->sck, timeout)) { self->status = TRANS_STATUS_UP; /* ok */ @@ -717,6 +883,7 @@ trans_get_out_s(struct trans *self, int size) return rv; } + /*****************************************************************************/ /* returns error */ int APP_CC @@ -742,6 +909,7 @@ trans_set_tls_mode(struct trans *self, const char *key, const char *cert) return 0; } + /*****************************************************************************/ /* returns error */ int APP_CC |
