|
@ -107,6 +107,10 @@ ssize_t send_buffers_send(send_buffer_t *sb_array, int count, int fd, size_t *re |
|
|
if (real_wr) *real_wr = twr; |
|
|
if (real_wr) *real_wr = twr; |
|
|
return twr; |
|
|
return twr; |
|
|
} |
|
|
} |
|
|
|
|
|
bool conn_partner_alive(tproxy_conn_t *conn) |
|
|
|
|
|
{ |
|
|
|
|
|
return conn->partner && conn->partner->state!=CONN_CLOSED; |
|
|
|
|
|
} |
|
|
bool conn_buffers_present(tproxy_conn_t *conn) |
|
|
bool conn_buffers_present(tproxy_conn_t *conn) |
|
|
{ |
|
|
{ |
|
|
return send_buffers_present(conn->wr_buf,sizeof(conn->wr_buf)/sizeof(conn->wr_buf[0])); |
|
|
return send_buffers_present(conn->wr_buf,sizeof(conn->wr_buf)/sizeof(conn->wr_buf[0])); |
|
@ -122,9 +126,15 @@ bool conn_has_unsent(tproxy_conn_t *conn) |
|
|
{ |
|
|
{ |
|
|
return !conn->remote && conn->wr_unsent || conn_buffers_present(conn); |
|
|
return !conn->remote && conn->wr_unsent || conn_buffers_present(conn); |
|
|
} |
|
|
} |
|
|
|
|
|
int conn_bytes_unread(tproxy_conn_t *conn) |
|
|
|
|
|
{ |
|
|
|
|
|
int numbytes=-1; |
|
|
|
|
|
ioctl(conn->fd, FIONREAD, &numbytes)!=-1; |
|
|
|
|
|
return numbytes; |
|
|
|
|
|
} |
|
|
bool conn_has_unsent_pair(tproxy_conn_t *conn) |
|
|
bool conn_has_unsent_pair(tproxy_conn_t *conn) |
|
|
{ |
|
|
{ |
|
|
return conn_has_unsent(conn) || (conn->partner && conn_has_unsent(conn->partner)); |
|
|
return conn_has_unsent(conn) || (conn_partner_alive(conn) && conn_has_unsent(conn->partner)); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -365,7 +375,7 @@ bool epoll_update_flow(tproxy_conn_t *conn) |
|
|
uint32_t evtmask = (conn->state==CONN_RDHUP ? 0 : EPOLLRDHUP)|(conn->bFlowIn?EPOLLIN:0)|(conn->bFlowOut?EPOLLOUT:0); |
|
|
uint32_t evtmask = (conn->state==CONN_RDHUP ? 0 : EPOLLRDHUP)|(conn->bFlowIn?EPOLLIN:0)|(conn->bFlowOut?EPOLLOUT:0); |
|
|
if (!epoll_set(conn, evtmask)) |
|
|
if (!epoll_set(conn, evtmask)) |
|
|
return false; |
|
|
return false; |
|
|
//printf("SET FLOW fd=%d to in=%d out=%d state_rdhup=%d\n",conn->fd,conn->bFlowIn,conn->bFlowOut,conn->state==CONN_RDHUP);
|
|
|
DBGPRINT("SET FLOW fd=%d to in=%d out=%d state_rdhup=%d",conn->fd,conn->bFlowIn,conn->bFlowOut,conn->state==CONN_RDHUP); |
|
|
conn->bFlowInPrev = conn->bFlowIn; |
|
|
conn->bFlowInPrev = conn->bFlowIn; |
|
|
conn->bFlowOutPrev = conn->bFlowOut; |
|
|
conn->bFlowOutPrev = conn->bFlowOut; |
|
|
conn->bPrevRdhup = (conn->state==CONN_RDHUP); |
|
|
conn->bPrevRdhup = (conn->state==CONN_RDHUP); |
|
@ -496,16 +506,21 @@ bool check_connection_attempt(tproxy_conn_t *conn, int efd) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
bool epoll_set_flow_pair(tproxy_conn_t *conn) |
|
|
bool epoll_set_flow_pair(tproxy_conn_t *conn) |
|
|
{ |
|
|
{ |
|
|
bool bHasUnsent = conn_has_unsent(conn); |
|
|
bool bHasUnsent = conn_has_unsent(conn); |
|
|
bool bHasUnsentPartner = conn->partner ? conn_has_unsent(conn->partner) : false; |
|
|
bool bHasUnsentPartner = conn_partner_alive(conn) ? conn_has_unsent(conn->partner) : false; |
|
|
|
|
|
|
|
|
if (!epoll_set_flow(conn, !bHasUnsentPartner, bHasUnsent)) |
|
|
DBGPRINT("epoll_set_flow_pair fd=%d partner_fd=%d bHasUnsent=%d bHasUnsentPartner=%d state_rdhup=%d", |
|
|
|
|
|
conn->fd , conn_partner_alive(conn) ? conn->partner->fd : 0, bHasUnsent, bHasUnsentPartner, conn->state==CONN_RDHUP); |
|
|
|
|
|
if (!epoll_set_flow(conn, !bHasUnsentPartner && (conn->state!=CONN_RDHUP), bHasUnsent || conn->state==CONN_RDHUP)) |
|
|
return false; |
|
|
return false; |
|
|
if (conn->partner) |
|
|
if (conn_partner_alive(conn)) |
|
|
if (!epoll_set_flow(conn->partner, !bHasUnsent, bHasUnsentPartner)) |
|
|
{ |
|
|
|
|
|
if (!epoll_set_flow(conn->partner, !bHasUnsent && (conn->partner->state!=CONN_RDHUP), conn->partner->bFlowOut = bHasUnsentPartner || conn->partner->state==CONN_RDHUP)) |
|
|
return false; |
|
|
return false; |
|
|
|
|
|
} |
|
|
return true; |
|
|
return true; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -513,7 +528,7 @@ bool handle_unsent(tproxy_conn_t *conn) |
|
|
{ |
|
|
{ |
|
|
ssize_t wr=0,twr=0; |
|
|
ssize_t wr=0,twr=0; |
|
|
|
|
|
|
|
|
//printf("+handle_unsent, fd=%d has_unsent=%d has_unsent_partner=%d\n",conn->fd,conn_has_unsent(conn),conn->partner ? conn_has_unsent(conn->partner) : false);
|
|
|
DBGPRINT("+handle_unsent, fd=%d has_unsent=%d has_unsent_partner=%d",conn->fd,conn_has_unsent(conn),conn_partner_alive(conn) ? conn_has_unsent(conn->partner) : false); |
|
|
|
|
|
|
|
|
// its possible to have unsent data both in the pipe and in buffers
|
|
|
// its possible to have unsent data both in the pipe and in buffers
|
|
|
// but we initialize pipe only on local leg
|
|
|
// but we initialize pipe only on local leg
|
|
@ -522,7 +537,7 @@ bool handle_unsent(tproxy_conn_t *conn) |
|
|
if (conn->wr_unsent) |
|
|
if (conn->wr_unsent) |
|
|
{ |
|
|
{ |
|
|
wr = splice(conn->splice_pipe[0], NULL, conn->fd, NULL, conn->wr_unsent, SPLICE_F_MOVE | SPLICE_F_NONBLOCK); |
|
|
wr = splice(conn->splice_pipe[0], NULL, conn->fd, NULL, conn->wr_unsent, SPLICE_F_MOVE | SPLICE_F_NONBLOCK); |
|
|
//printf("splice unsent=%zd wr=%zd err=%d\n",conn->wr_unsent,wr,errno);
|
|
|
DBGPRINT("splice unsent=%zd wr=%zd err=%d",conn->wr_unsent,wr,errno); |
|
|
if (wr<0) |
|
|
if (wr<0) |
|
|
{ |
|
|
{ |
|
|
if (errno==EAGAIN) wr=0; |
|
|
if (errno==EAGAIN) wr=0; |
|
@ -536,6 +551,7 @@ bool handle_unsent(tproxy_conn_t *conn) |
|
|
if (!conn->wr_unsent && conn_buffers_present(conn)) |
|
|
if (!conn->wr_unsent && conn_buffers_present(conn)) |
|
|
{ |
|
|
{ |
|
|
wr=conn_buffers_send(conn); |
|
|
wr=conn_buffers_send(conn); |
|
|
|
|
|
DBGPRINT("conn_buffers_send wr=%zd",wr); |
|
|
if (wr<0) return false; |
|
|
if (wr<0) return false; |
|
|
twr += wr; |
|
|
twr += wr; |
|
|
} |
|
|
} |
|
@ -552,17 +568,17 @@ bool handle_epoll(tproxy_conn_t *conn, uint32_t evt) |
|
|
size_t bs; |
|
|
size_t bs; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
//printf("+handle_epoll\n");
|
|
|
DBGPRINT("+handle_epoll"); |
|
|
|
|
|
|
|
|
if (!handle_unsent(conn)) |
|
|
if (!handle_unsent(conn)) |
|
|
return false; // error
|
|
|
return false; // error
|
|
|
if (!conn->partner && !conn_has_unsent(conn)) |
|
|
if (!conn_partner_alive(conn) && !conn_has_unsent(conn)) |
|
|
return false; // when no partner, we only waste read and send unsent
|
|
|
return false; // when no partner, we only waste read and send unsent
|
|
|
|
|
|
|
|
|
if (!(evt & EPOLLIN)) |
|
|
if (!(evt & EPOLLIN)) |
|
|
return true; // nothing to read
|
|
|
return true; // nothing to read
|
|
|
|
|
|
|
|
|
if (!conn->partner) |
|
|
if (!conn_partner_alive(conn)) |
|
|
{ |
|
|
{ |
|
|
// throw it to a black hole
|
|
|
// throw it to a black hole
|
|
|
char waste[1448]; |
|
|
char waste[1448]; |
|
@ -573,18 +589,16 @@ bool handle_epoll(tproxy_conn_t *conn, uint32_t evt) |
|
|
rd+=rrd; |
|
|
rd+=rrd; |
|
|
conn->trd+=rrd; |
|
|
conn->trd+=rrd; |
|
|
} |
|
|
} |
|
|
//printf("wasted recv=%zd all_rd=%zd err=%d\n",rrd,rd,errno);
|
|
|
DBGPRINT("wasted recv=%zd all_rd=%zd err=%d",rrd,rd,errno); |
|
|
return true; |
|
|
return true; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// do not receive new until old is sent
|
|
|
// do not receive new until old is sent
|
|
|
if (conn_has_unsent(conn->partner)) |
|
|
if (conn_has_unsent(conn->partner)) |
|
|
return true; |
|
|
return true; |
|
|
|
|
|
|
|
|
numbytes=-1; |
|
|
numbytes=conn_bytes_unread(conn); |
|
|
ioctl(conn->fd, FIONREAD, &numbytes)!=-1; |
|
|
DBGPRINT("numbytes=%d",numbytes); |
|
|
//printf("numbytes=%d\n",numbytes);
|
|
|
|
|
|
|
|
|
|
|
|
if (numbytes>0) |
|
|
if (numbytes>0) |
|
|
{ |
|
|
{ |
|
|
if (conn->remote) |
|
|
if (conn->remote) |
|
@ -593,14 +607,14 @@ bool handle_epoll(tproxy_conn_t *conn, uint32_t evt) |
|
|
// pipe is in the local leg, so its in conn->partner->splice_pipe
|
|
|
// pipe is in the local leg, so its in conn->partner->splice_pipe
|
|
|
|
|
|
|
|
|
rd = splice(conn->fd, NULL, conn->partner->splice_pipe[1], NULL, SPLICE_LEN, SPLICE_F_MOVE | SPLICE_F_NONBLOCK); |
|
|
rd = splice(conn->fd, NULL, conn->partner->splice_pipe[1], NULL, SPLICE_LEN, SPLICE_F_MOVE | SPLICE_F_NONBLOCK); |
|
|
//printf("splice len=%d rd=%zd err=%d\n",SPLICE_LEN,rd,errno);
|
|
|
DBGPRINT("splice len=%d rd=%zd err=%d",SPLICE_LEN,rd,errno); |
|
|
if (rd<0 && errno==EAGAIN) rd=0; |
|
|
if (rd<0 && errno==EAGAIN) rd=0; |
|
|
if (rd>0) |
|
|
if (rd>0) |
|
|
{ |
|
|
{ |
|
|
conn->trd += rd; |
|
|
conn->trd += rd; |
|
|
conn->partner->wr_unsent += rd; |
|
|
conn->partner->wr_unsent += rd; |
|
|
wr = splice(conn->partner->splice_pipe[0], NULL, conn->partner->fd, NULL, conn->partner->wr_unsent, SPLICE_F_MOVE | SPLICE_F_NONBLOCK); |
|
|
wr = splice(conn->partner->splice_pipe[0], NULL, conn->partner->fd, NULL, conn->partner->wr_unsent, SPLICE_F_MOVE | SPLICE_F_NONBLOCK); |
|
|
//printf("splice wr=%zd err=%d\n",wr,errno);
|
|
|
DBGPRINT("splice wr=%zd err=%d",wr,errno); |
|
|
if (wr<0 && errno==EAGAIN) wr=0; |
|
|
if (wr<0 && errno==EAGAIN) wr=0; |
|
|
if (wr>0) |
|
|
if (wr>0) |
|
|
{ |
|
|
{ |
|
@ -648,7 +662,7 @@ bool handle_epoll(tproxy_conn_t *conn, uint32_t evt) |
|
|
return false; |
|
|
return false; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
//printf("-handle_epoll rd=%zd wr=%zd\n",rd,wr);
|
|
|
DBGPRINT("-handle_epoll rd=%zd wr=%zd",rd,wr); |
|
|
|
|
|
|
|
|
return rd != -1 && wr != -1; |
|
|
return rd != -1 && wr != -1; |
|
|
} |
|
|
} |
|
@ -664,8 +678,8 @@ bool remove_closed_connections(int efd, struct tailhead *close_list) |
|
|
|
|
|
|
|
|
shutdown(conn->fd,SHUT_RDWR); |
|
|
shutdown(conn->fd,SHUT_RDWR); |
|
|
epoll_del(conn); |
|
|
epoll_del(conn); |
|
|
printf("Socket fd=%d (partner_fd=%d, remote=%d) closed, connection removed. total_read=%zu total_write=%zu\n", |
|
|
printf("Socket fd=%d (partner_fd=%d, remote=%d) closed, connection removed. total_read=%zu total_write=%zu event_count=%d\n", |
|
|
conn->fd, conn->partner ? conn->partner->fd : 0, conn->remote, conn->trd, conn->twr); |
|
|
conn->fd, conn->partner ? conn->partner->fd : 0, conn->remote, conn->trd, conn->twr, conn->event_count); |
|
|
free_conn(conn); |
|
|
free_conn(conn); |
|
|
bRemoved = true; |
|
|
bRemoved = true; |
|
|
} |
|
|
} |
|
@ -683,11 +697,10 @@ void close_tcp_conn(tproxy_conn_t *conn, struct tailhead *conn_list, struct tail |
|
|
|
|
|
|
|
|
bool read_all_and_buffer(tproxy_conn_t *conn) |
|
|
bool read_all_and_buffer(tproxy_conn_t *conn) |
|
|
{ |
|
|
{ |
|
|
if (conn->partner) |
|
|
if (conn_partner_alive(conn)) |
|
|
{ |
|
|
{ |
|
|
//printf("read_all_and_buffer\n");
|
|
|
int numbytes=conn_bytes_unread(conn); |
|
|
int numbytes=-1; |
|
|
DBGPRINT("read_all_and_buffer numbytes=%d",numbytes); |
|
|
ioctl(conn->fd, FIONREAD, &numbytes); |
|
|
|
|
|
if (numbytes>0) |
|
|
if (numbytes>0) |
|
|
{ |
|
|
{ |
|
|
if (send_buffer_create(conn->partner->wr_buf+2, NULL, numbytes)) |
|
|
if (send_buffer_create(conn->partner->wr_buf+2, NULL, numbytes)) |
|
@ -740,20 +753,14 @@ void print_legs(struct tailhead *conn_list) |
|
|
if (conn->state!=CONN_CLOSED) close_tcp_conn(conn, &conn_list, &close_list); \ |
|
|
if (conn->state!=CONN_CLOSED) close_tcp_conn(conn, &conn_list, &close_list); \ |
|
|
} |
|
|
} |
|
|
#define CONN_CLOSE_BOTH(conn) { \ |
|
|
#define CONN_CLOSE_BOTH(conn) { \ |
|
|
if (conn->partner) CONN_CLOSE(conn->partner); \ |
|
|
if (conn_partner_alive(conn)) CONN_CLOSE(conn->partner); \ |
|
|
CONN_CLOSE(conn); \ |
|
|
CONN_CLOSE(conn); \ |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
#define CONN_CLOSE_WITH_PARTNER_SHUTDOWN(conn) { \ |
|
|
#define CONN_CLOSE_WITH_PARTNER_CHECK(conn) { \ |
|
|
CONN_CLOSE(conn); \ |
|
|
CONN_CLOSE(conn); \ |
|
|
if (conn->partner) conn_shutdown(conn->partner); \ |
|
|
if (conn_partner_alive(conn) && !conn_has_unsent(conn->partner)) \ |
|
|
} |
|
|
CONN_CLOSE(conn->partner); \ |
|
|
|
|
|
|
|
|
bool conn_shutdown(tproxy_conn_t *conn) |
|
|
|
|
|
{ |
|
|
|
|
|
// after shutdown we must receive data remainder and send unsent
|
|
|
|
|
|
// if we dont receive data, connectin can be stalled because of "TCP window full"
|
|
|
|
|
|
return !shutdown(conn->fd,SHUT_RD) && epoll_set_flow(conn, true, true); |
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
int event_loop(int listen_fd) |
|
|
int event_loop(int listen_fd) |
|
@ -789,7 +796,7 @@ int event_loop(int listen_fd) |
|
|
|
|
|
|
|
|
while (1) |
|
|
while (1) |
|
|
{ |
|
|
{ |
|
|
//printf("\nepoll_wait\n");
|
|
|
DBGPRINT("epoll_wait"); |
|
|
|
|
|
|
|
|
if ((num_events = epoll_wait(efd, events, MAX_EPOLL_EVENTS, -1)) == -1) |
|
|
if ((num_events = epoll_wait(efd, events, MAX_EPOLL_EVENTS, -1)) == -1) |
|
|
{ |
|
|
{ |
|
@ -832,8 +839,9 @@ int event_loop(int listen_fd) |
|
|
else |
|
|
else |
|
|
{ |
|
|
{ |
|
|
conn = (tproxy_conn_t*)events[i].data.ptr; |
|
|
conn = (tproxy_conn_t*)events[i].data.ptr; |
|
|
|
|
|
conn->event_count++; |
|
|
|
|
|
|
|
|
//printf("\nEVENT mask %08X fd=%d fd_partner=%d\n",events[i].events,conn->fd,conn->partner ? conn->partner->fd : 0);
|
|
|
DBGPRINT("\nEVENT mask %08X fd=%d fd_partner=%d",events[i].events,conn->fd,conn_partner_alive(conn) ? conn->partner->fd : 0); |
|
|
|
|
|
|
|
|
if (conn->state != CONN_CLOSED) |
|
|
if (conn->state != CONN_CLOSED) |
|
|
{ |
|
|
{ |
|
@ -847,7 +855,7 @@ int event_loop(int listen_fd) |
|
|
{ |
|
|
{ |
|
|
if (!check_connection_attempt(conn, efd)) |
|
|
if (!check_connection_attempt(conn, efd)) |
|
|
{ |
|
|
{ |
|
|
fprintf(stderr, "Connection attempt failed for %d\n", conn->fd); |
|
|
fprintf(stderr, "Connection attempt failed for fd=%d\n", conn->fd); |
|
|
CONN_CLOSE_BOTH(conn); |
|
|
CONN_CLOSE_BOTH(conn); |
|
|
continue; |
|
|
continue; |
|
|
} |
|
|
} |
|
@ -858,15 +866,14 @@ int event_loop(int listen_fd) |
|
|
|
|
|
|
|
|
if (conn_has_unsent(conn)) |
|
|
if (conn_has_unsent(conn)) |
|
|
{ |
|
|
{ |
|
|
//printf("conn fd=%d has unsent, not closing\n", conn->fd);
|
|
|
DBGPRINT("conn fd=%d has unsent, not closing", conn->fd); |
|
|
conn_shutdown(conn); |
|
|
|
|
|
if (conn->partner) conn_shutdown(conn->partner); |
|
|
|
|
|
conn->state = CONN_RDHUP; // only writes
|
|
|
conn->state = CONN_RDHUP; // only writes
|
|
|
|
|
|
epoll_set_flow(conn,false,true); |
|
|
} |
|
|
} |
|
|
else |
|
|
else |
|
|
{ |
|
|
{ |
|
|
//printf("conn fd=%d has no unsent, closing\n", conn->fd);
|
|
|
DBGPRINT("conn fd=%d has no unsent, closing", conn->fd); |
|
|
CONN_CLOSE_WITH_PARTNER_SHUTDOWN(conn); |
|
|
CONN_CLOSE_WITH_PARTNER_CHECK(conn); |
|
|
} |
|
|
} |
|
|
continue; |
|
|
continue; |
|
|
} |
|
|
} |
|
@ -876,8 +883,8 @@ int event_loop(int listen_fd) |
|
|
// will not receive this until successful check_connection_attempt()
|
|
|
// will not receive this until successful check_connection_attempt()
|
|
|
if (!handle_epoll(conn, events[i].events)) |
|
|
if (!handle_epoll(conn, events[i].events)) |
|
|
{ |
|
|
{ |
|
|
//printf("handle_epoll false\n");
|
|
|
DBGPRINT("handle_epoll false"); |
|
|
CONN_CLOSE_WITH_PARTNER_SHUTDOWN(conn); |
|
|
CONN_CLOSE_WITH_PARTNER_CHECK(conn); |
|
|
continue; |
|
|
continue; |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
@ -896,4 +903,3 @@ int event_loop(int listen_fd) |
|
|
|
|
|
|
|
|
return retval; |
|
|
return retval; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|