summaryrefslogtreecommitdiff
path: root/code/core/rbuf.c
diff options
context:
space:
mode:
authorUros Majstorovic <majstor@majstor.org>2017-08-10 21:02:16 +0200
committerUros Majstorovic <majstor@majstor.org>2017-08-10 21:02:16 +0200
commitcbba099541d27400ad45083a4b1102b86f9e8dea (patch)
treee260d7da3d07b4de0e989726917f08d23c55f70d /code/core/rbuf.c
parente9ced8e60689c6f46ac4fc31013b88f4c3f4fa80 (diff)
rbuffer almost implemented
Diffstat (limited to 'code/core/rbuf.c')
-rw-r--r--code/core/rbuf.c49
1 files changed, 43 insertions, 6 deletions
diff --git a/code/core/rbuf.c b/code/core/rbuf.c
index 0865e67..bb110b9 100644
--- a/code/core/rbuf.c
+++ b/code/core/rbuf.c
@@ -3,14 +3,15 @@
#include <string.h>
int ecp_rbuf_init(ECPRBuffer *rbuf, ECPRBMessage *msg, unsigned int msg_size) {
- memset(msg, 0, sizeof(ECPRBMessage) * msg_size);
rbuf->msg = msg;
if (msg_size) {
+ if (msg == NULL) return ECP_ERR;
rbuf->msg_size = msg_size;
+ memset(rbuf->msg, 0, sizeof(ECPRBMessage) * msg_size);
} else {
rbuf->msg_size = ECP_RBUF_SEQ_HALF;
}
-
+
return ECP_OK;
}
@@ -22,17 +23,53 @@ int ecp_rbuf_msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq) {
return ECP_ERR_RBUF_IDX;
}
-ssize_t ecp_rbuf_msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned char *msg, size_t msg_size, unsigned char test_flags, unsigned char set_flags) {
- int idx = ecp_rbuf_msg_idx(rbuf, seq);
+ssize_t ecp_rbuf_msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, int idx, unsigned char *msg, size_t msg_size, unsigned char test_flags, unsigned char set_flags) {
+ idx = idx < 0 ? ecp_rbuf_msg_idx(rbuf, seq) : idx;
if (idx < 0) return idx;
if (rbuf->msg == NULL) return 0;
if (test_flags && (test_flags & rbuf->msg[idx].flags)) return ECP_ERR_RBUF_FLAG;
- memcpy(rbuf->msg[idx].msg, msg, msg_size);
- rbuf->msg[idx].size = msg_size;
+ if (!(set_flags & ECP_RBUF_FLAG_DELIVERED)) {
+ memcpy(rbuf->msg[idx].msg, msg, msg_size);
+ rbuf->msg[idx].size = msg_size;
+ }
rbuf->msg[idx].flags = set_flags;
return msg_size;
}
+int ecp_conn_rbuf_start(ECPConnection *conn, ecp_seq_t seq) {
+ int rv = ecp_conn_rbuf_send_start(conn);
+ if (rv) return rv;
+
+ if (!conn->out) {
+ rv = ecp_conn_rbuf_recv_start(conn, seq);
+ if (rv) return rv;
+ }
+
+ return ECP_OK;
+}
+
+ssize_t ecp_conn_rbuf_pkt_send(struct ECPConnection *conn, ECPNetAddr *addr, unsigned char *packet, size_t pkt_size, ecp_seq_t seq, int idx) {
+ int do_send;
+ ECPRBSend *buf = conn->rbuf.send;
+
+ ssize_t rv = ecp_rbuf_msg_store(&buf->rbuf, seq, idx, packet, pkt_size, 0, 0);
+ if (rv < 0) return rv;
+
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_lock(&buf->mutex);
+#endif
+ if (buf->in_transit < buf->win_size) {
+ buf->in_transit++;
+ do_send = 1;
+ }
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_unlock(&buf->mutex);
+#endif
+
+ if (do_send) rv = ecp_pkt_send(conn->sock, addr, packet, pkt_size);
+ return rv;
+}
+