From 0502d4bf9219527eb4af327db24d94763e1e49c9 Mon Sep 17 00:00:00 2001 From: Richard Nyberg Date: Wed, 20 Jul 2005 20:11:03 +0000 Subject: [PATCH] Rework the download algorithm. This isn't tested yet, but it compiles so it must be bug free :) --- btpd/Makefile.am | 2 +- btpd/peer.c | 12 +- btpd/peer.h | 6 +- btpd/policy.c | 737 -------------------------------------------- btpd/policy.h | 66 ++-- btpd/policy_choke.c | 90 ++++++ btpd/policy_if.c | 263 ++++++++++++++++ btpd/policy_subr.c | 483 +++++++++++++++++++++++++++++ btpd/torrent.c | 19 ++ btpd/torrent.h | 14 +- 10 files changed, 929 insertions(+), 763 deletions(-) delete mode 100644 btpd/policy.c create mode 100644 btpd/policy_choke.c create mode 100644 btpd/policy_if.c create mode 100644 btpd/policy_subr.c diff --git a/btpd/Makefile.am b/btpd/Makefile.am index c227bea..b18b432 100644 --- a/btpd/Makefile.am +++ b/btpd/Makefile.am @@ -5,7 +5,7 @@ btpd_SOURCES=\ net.c net.h\ queue.h \ peer.c peer.h\ - policy.c policy.h\ + policy_choke.c policy_if.c policy_subr.c policy.h\ torrent.c torrent.h\ tracker_req.c tracker_req.h diff --git a/btpd/peer.c b/btpd/peer.c index ec5de83..d73b79c 100644 --- a/btpd/peer.c +++ b/btpd/peer.c @@ -68,6 +68,7 @@ peer_kill(struct peer *p) void peer_request(struct peer *p, uint32_t index, uint32_t begin, uint32_t len) { + p->nreqs_out++; struct piece_req *req = btpd_calloc(1, sizeof(*req)); req->index = index; req->begin = begin; @@ -179,7 +180,6 @@ peer_create_out(struct torrent *tp, const uint8_t *id, p = peer_create_common(sd); p->tp = tp; - //bcopy(id, p->id, 20); net_handshake(p, 0); } @@ -273,14 +273,18 @@ void peer_on_piece(struct peer *p, uint32_t index, uint32_t begin, uint32_t length, const char *data) { - off_t cbegin = index * p->tp->meta.piece_length + begin; struct piece_req *req = BTPDQ_FIRST(&p->my_reqs); if (req != NULL && req->index == index && req->begin == begin && req->length == length) { - torrent_put_bytes(p->tp, data, cbegin, length); - cm_on_block(p); + + assert(p->nreqs_out > 0); + p->nreqs_out--; + BTPDQ_REMOVE(&p->my_reqs, req, entry); + free(req); + + cm_on_block(p, index, begin, length, data); } } diff --git a/btpd/peer.h b/btpd/peer.h index 4547d2a..725f0a0 100644 --- a/btpd/peer.h +++ b/btpd/peer.h @@ -12,12 +12,14 @@ #define RATEHISTORY 20 +#define MAXPIPEDREQUESTS 5 + struct peer { int sd; uint8_t flags; uint8_t *piece_field; uint32_t npieces; - unsigned nwant; + uint32_t nwant; uint8_t id[20]; @@ -25,6 +27,8 @@ struct peer { struct piece_req_tq p_reqs, my_reqs; + unsigned nreqs_out; + struct io_tq outq; struct event in_ev; diff --git a/btpd/policy.c b/btpd/policy.c deleted file mode 100644 index 5b8ea67..0000000 --- a/btpd/policy.c +++ /dev/null @@ -1,737 +0,0 @@ -#include -#include - -#include -#include -#include -#include -#include -#include - -#include "btpd.h" -#include "stream.h" -#include "tracker_req.h" - -#define BLOCKLEN (1 << 14) - -static void cm_on_piece(struct torrent *tp, struct piece *piece); - -static void -assign_piece_requests_eg(struct piece *piece, struct peer *peer) -{ - for (unsigned i = 0; i < piece->nblocks; i++) { - if (!has_bit(piece->have_field, i)) { - uint32_t start = i * BLOCKLEN; - uint32_t len; - if (i < piece->nblocks - 1) - len = BLOCKLEN; - else if (piece->index < peer->tp->meta.npieces - 1) - len = peer->tp->meta.piece_length - i * BLOCKLEN; - else { - off_t piece_len = - peer->tp->meta.total_length - - peer->tp->meta.piece_length * - (peer->tp->meta.npieces - 1); - len = piece_len - i * BLOCKLEN; - } - peer_request(peer, piece->index, start, len); - } - } -} - -static void -cm_assign_requests_eg(struct peer *peer) -{ - struct piece *piece; - BTPDQ_FOREACH(piece, &peer->tp->getlst, entry) { - if (has_bit(peer->piece_field, piece->index)) { - peer_want(peer, piece->index); - if ((peer->flags & PF_P_CHOKE) == 0) - assign_piece_requests_eg(piece, peer); - } - } -} - -static void -cm_unassign_requests_eg(struct peer *peer) -{ - struct piece_req *req = BTPDQ_FIRST(&peer->my_reqs); - while (req != NULL) { - struct piece_req *next = BTPDQ_NEXT(req, entry); - free(req); - req = next; - } - BTPDQ_INIT(&peer->my_reqs); -} - -static void -cm_enter_endgame(struct torrent *tp) -{ - struct peer *peer; - btpd_log(BTPD_L_POL, "Entering end game\n"); - tp->endgame = 1; - BTPDQ_FOREACH(peer, &tp->peers, cm_entry) - cm_assign_requests_eg(peer); -} - -static int -piece_full(struct piece *p) -{ - return p->ngot + p->nbusy == p->nblocks; -} - -static int -cm_should_schedule(struct torrent *tp) -{ - if (!tp->endgame) { - int should = 1; - struct piece *p = BTPDQ_FIRST(&tp->getlst); - while (p != NULL) { - if (!piece_full(p)) { - should = 0; - break; - } - p = BTPDQ_NEXT(p, entry); - } - return should; - } else - return 0; -} - -static void -cm_on_peerless_piece(struct torrent *tp, struct piece *piece) -{ - if (!tp->endgame) { - assert(tp->piece_count[piece->index] == 0); - btpd_log(BTPD_L_POL, "peerless piece %u\n", piece->index); - msync(tp->imem, tp->isiz, MS_ASYNC); - BTPDQ_REMOVE(&tp->getlst, piece, entry); - free(piece); - if (cm_should_schedule(tp)) - cm_schedule_piece(tp); - } -} - -static int -rate_cmp(unsigned long rate1, unsigned long rate2) -{ - if (rate1 < rate2) - return -1; - else if (rate1 == rate2) - return 0; - else - return 1; -} - -static int -dwnrate_cmp(const void *p1, const void *p2) -{ - unsigned long rate1 = peer_get_rate((*(struct peer **)p1)->rate_to_me); - unsigned long rate2 = peer_get_rate((*(struct peer **)p2)->rate_to_me); - return rate_cmp(rate1, rate2); -} - -static int -uprate_cmp(const void *p1, const void *p2) -{ - unsigned long rate1 = peer_get_rate((*(struct peer **)p1)->rate_from_me); - unsigned long rate2 = peer_get_rate((*(struct peer **)p2)->rate_from_me); - return rate_cmp(rate1, rate2); -} - -static void -choke_alg(struct torrent *tp) -{ - int i; - struct peer *p; - struct peer **psort; - - assert(tp->npeers > 0); - - psort = (struct peer **)btpd_malloc(tp->npeers * sizeof(p)); - i = 0; - BTPDQ_FOREACH(p, &tp->peers, cm_entry) - psort[i++] = p; - - if (tp->have_npieces == tp->meta.npieces) - qsort(psort, tp->npeers, sizeof(p), uprate_cmp); - else - qsort(psort, tp->npeers, sizeof(p), dwnrate_cmp); - - tp->ndown = 0; - if (tp->optimistic != NULL) { - if (tp->optimistic->flags & PF_I_CHOKE) - peer_unchoke(tp->optimistic); - if (tp->optimistic->flags & PF_P_WANT) - tp->ndown = 1; - } - - for (i = tp->npeers - 1; i >= 0; i--) { - if (psort[i] == tp->optimistic) - continue; - if (tp->ndown < 4) { - if (psort[i]->flags & PF_P_WANT) - tp->ndown++; - if (psort[i]->flags & PF_I_CHOKE) - peer_unchoke(psort[i]); - } else { - if ((psort[i]->flags & PF_I_CHOKE) == 0) - peer_choke(psort[i]); - } - } - free(psort); - - tp->choke_time = btpd.seconds + 10; -} - -static void -next_optimistic(struct torrent *tp, struct peer *np) -{ - if (np != NULL) - tp->optimistic = np; - else if (tp->optimistic == NULL) - tp->optimistic = BTPDQ_FIRST(&tp->peers); - else { - np = BTPDQ_NEXT(tp->optimistic, cm_entry); - if (np != NULL) - tp->optimistic = np; - else - tp->optimistic = BTPDQ_FIRST(&tp->peers); - } - assert(tp->optimistic != NULL); - choke_alg(tp); - tp->opt_time = btpd.seconds + 30; -} - -void -cm_on_upload(struct peer *peer) -{ - choke_alg(peer->tp); -} - -void -cm_on_unupload(struct peer *peer) -{ - choke_alg(peer->tp); -} - -void -cm_on_interest(struct peer *peer) -{ - if ((peer->flags & PF_I_CHOKE) == 0) - cm_on_upload(peer); -} - -void -cm_on_uninterest(struct peer *peer) -{ - if ((peer->flags & PF_I_CHOKE) == 0) - cm_on_unupload(peer); -} - -void -cm_by_second(struct torrent *tp) -{ - if (btpd.seconds == tp->tracker_time) - tracker_req(tp, TR_EMPTY); - - if (btpd.seconds == tp->opt_time) - next_optimistic(tp, NULL); - - if (btpd.seconds == tp->choke_time) - choke_alg(tp); -} - -void -cm_on_download(struct peer *peer) -{ - if (!peer->tp->endgame) - assert(cm_assign_requests(peer, 5) != 0); - else - cm_assign_requests_eg(peer); -} - -void -cm_on_undownload(struct peer *peer) -{ - if (!peer->tp->endgame) - cm_unassign_requests(peer); - else - cm_unassign_requests_eg(peer); -} - -void -cm_on_unchoke(struct peer *peer) -{ - if ((peer->flags & PF_I_WANT) != 0) - cm_on_download(peer); -} - -void -cm_on_choke(struct peer *peer) -{ - if ((peer->flags & PF_I_WANT) != 0) - cm_on_undownload(peer); -} - -void -cm_on_piece_ann(struct peer *peer, uint32_t piece) -{ - struct piece *p; - struct torrent *tp = peer->tp; - - tp->piece_count[piece]++; - - if (has_bit(tp->piece_field, piece)) - return; - - p = BTPDQ_FIRST(&tp->getlst); - while (p != NULL && p->index != piece) - p = BTPDQ_NEXT(p, entry); - - if (p != NULL && tp->endgame) { - peer_want(peer, p->index); - if ((peer->flags & PF_P_CHOKE) == 0) - cm_on_download(peer); - } else if (p != NULL && !piece_full(p)) { - peer_want(peer, p->index); - if ((peer->flags & PF_P_CHOKE) == 0 && BTPDQ_EMPTY(&peer->my_reqs)) - cm_on_download(peer); - } else if (p == NULL && cm_should_schedule(tp)) - cm_schedule_piece(tp); -} - -void -cm_on_lost_peer(struct peer *peer) -{ - struct torrent *tp = peer->tp; - struct piece *piece; - - tp->npeers--; - peer->flags &= ~PF_ATTACHED; - if (tp->npeers == 0) { - BTPDQ_REMOVE(&tp->peers, peer, cm_entry); - tp->optimistic = NULL; - tp->choke_time = tp->opt_time = 0; - } else if (tp->optimistic == peer) { - struct peer *next = BTPDQ_NEXT(peer, cm_entry); - BTPDQ_REMOVE(&tp->peers, peer, cm_entry); - next_optimistic(peer->tp, next); - } else if ((peer->flags & (PF_P_WANT|PF_I_CHOKE)) == PF_P_WANT) { - BTPDQ_REMOVE(&tp->peers, peer, cm_entry); - cm_on_unupload(peer); - } else { - BTPDQ_REMOVE(&tp->peers, peer, cm_entry); - } - - for (size_t i = 0; i < peer->tp->meta.npieces; i++) - if (has_bit(peer->piece_field, i)) - tp->piece_count[i]--; - - if ((peer->flags & (PF_I_WANT|PF_P_CHOKE)) == PF_I_WANT) - cm_on_undownload(peer); - - piece = BTPDQ_FIRST(&tp->getlst); - while (piece != NULL) { - struct piece *next = BTPDQ_NEXT(piece, entry); - if (has_bit(peer->piece_field, piece->index) && - tp->piece_count[piece->index] == 0) - cm_on_peerless_piece(tp, piece); - piece = next; - } -} - -void -cm_on_new_peer(struct peer *peer) -{ - struct torrent *tp = peer->tp; - - tp->npeers++; - peer->flags |= PF_ATTACHED; - BTPDQ_REMOVE(&btpd.unattached, peer, cm_entry); - - if (tp->npeers == 1) { - BTPDQ_INSERT_HEAD(&tp->peers, peer, cm_entry); - next_optimistic(peer->tp, peer); - } else { - if (random() > RAND_MAX / 3) - BTPDQ_INSERT_AFTER(&tp->peers, tp->optimistic, peer, cm_entry); - else - BTPDQ_INSERT_TAIL(&tp->peers, peer, cm_entry); - } -} - -static int -missing_piece(struct torrent *tp, uint32_t index) -{ - struct piece *p; - if (has_bit(tp->piece_field, index)) - return 0; - BTPDQ_FOREACH(p, &tp->getlst, entry) - if (p->index == index) - return 0; - return 1; -} - -static struct piece * -alloc_piece(struct torrent *tp, uint32_t piece) -{ - struct piece *res; - size_t mem, field; - unsigned long nblocks; - off_t piece_length = tp->meta.piece_length; - - if (piece == tp->meta.npieces - 1) { - off_t totl = tp->meta.total_length; - off_t npm1 = tp->meta.npieces - 1; - piece_length = totl - npm1 * piece_length; - } - - nblocks = (unsigned)ceil((double)piece_length / BLOCKLEN); - field = (size_t)ceil(nblocks / 8.0); - mem = sizeof(*res) + field; - - res = btpd_calloc(1, mem); - res->down_field = (uint8_t *)res + sizeof(*res); - res->have_field = - tp->block_field + - (size_t)ceil(piece * tp->meta.piece_length / (double)(1 << 17)); - res->nblocks = nblocks; - res->index = piece; - - for (unsigned i = 0; i < nblocks; i++) - if (has_bit(res->have_field, i)) - res->ngot++; - - return res; -} - -static void -activate_piece_peers(struct torrent *tp, struct piece *piece) -{ - struct peer *peer; - assert(!piece_full(piece) && tp->endgame == 0); - BTPDQ_FOREACH(peer, &tp->peers, cm_entry) - if (has_bit(peer->piece_field, piece->index)) - peer_want(peer, piece->index); - peer = BTPDQ_FIRST(&tp->peers); - while (peer != NULL && !piece_full(piece)) { - if ((peer->flags & (PF_P_CHOKE|PF_I_WANT)) == PF_I_WANT && - BTPDQ_EMPTY(&peer->my_reqs)) { - // - cm_on_download(peer); - } - peer = BTPDQ_NEXT(peer, cm_entry); - } -} - -void -cm_schedule_piece(struct torrent *tp) -{ - uint32_t i; - uint32_t min_i; - unsigned min_c; - struct piece *piece; - int enter_end_game = 1; - - assert(tp->endgame == 0); - - for (i = 0; i < tp->meta.npieces; i++) - if (missing_piece(tp, i)) { - enter_end_game = 0; - if (tp->piece_count[i] > 0) - break; - } - - if (i == tp->meta.npieces) { - if (enter_end_game) - cm_enter_endgame(tp); - return; - } - - min_i = i; - min_c = 1; - for(i++; i < tp->meta.npieces; i++) { - if (missing_piece(tp, i) && tp->piece_count[i] > 0) { - if (tp->piece_count[i] == tp->piece_count[min_i]) - min_c++; - else if (tp->piece_count[i] < tp->piece_count[min_i]) { - min_i = i; - min_c = 1; - } - } - } - if (min_c > 1) { - min_c = 1 + rint((double)random() * (min_c - 1) / RAND_MAX); - for (i = min_i; min_c > 0; i++) { - if (missing_piece(tp, i) && - tp->piece_count[i] == tp->piece_count[min_i]) { - // - min_c--; - min_i = i; - } - } - } - - btpd_log(BTPD_L_POL, "scheduled piece: %u.\n", min_i); - piece = alloc_piece(tp, min_i); - BTPDQ_INSERT_HEAD(&tp->getlst, piece, entry); - if (piece->ngot == piece->nblocks) { - cm_on_piece(tp, piece); - if (cm_should_schedule(tp)) - cm_schedule_piece(tp); - } else - activate_piece_peers(tp, piece); -} - -static void -cm_on_piece_unfull(struct torrent *tp, struct piece *piece) -{ - activate_piece_peers(tp, piece); -} - -static void -cm_on_piece_full(struct torrent *tp, struct piece *piece) -{ - struct peer *p; - - if (cm_should_schedule(tp)) - cm_schedule_piece(tp); - BTPDQ_FOREACH(p, &tp->peers, cm_entry) { - if (has_bit(p->piece_field, piece->index)) - peer_unwant(p, piece->index); - } -} - -static int -cm_assign_request(struct peer *peer) -{ - struct piece *piece; - unsigned i; - uint32_t start, len; - - piece = BTPDQ_FIRST(&peer->tp->getlst); - while (piece != NULL) { - if (!piece_full(piece) && has_bit(peer->piece_field, piece->index)) - break; - piece = BTPDQ_NEXT(piece, entry); - } - - if (piece == NULL) - return 0; - - i = 0; - while(has_bit(piece->have_field, i) || has_bit(piece->down_field, i)) - i++; - - start = i * BLOCKLEN; - - if (i < piece->nblocks - 1) - len = BLOCKLEN; - else if (piece->index < peer->tp->meta.npieces - 1) - len = peer->tp->meta.piece_length - i * BLOCKLEN; - else { - off_t piece_len = - peer->tp->meta.total_length - - peer->tp->meta.piece_length * (peer->tp->meta.npieces - 1); - len = piece_len - i * BLOCKLEN; - } - - peer_request(peer, piece->index, start, len); - set_bit(piece->down_field, i); - piece->nbusy++; - - if (piece_full(piece)) - cm_on_piece_full(peer->tp, piece); - - return 1; -} - -int -cm_assign_requests(struct peer *peer, int nreqs) -{ - int onreqs = nreqs; - - while (nreqs > 0 && cm_assign_request(peer)) - nreqs--; - - return onreqs - nreqs; -} - -void -cm_unassign_requests(struct peer *peer) -{ - struct torrent *tp = peer->tp; - struct piece *piece = BTPDQ_FIRST(&tp->getlst); - - while (piece != NULL) { - int was_full = piece_full(piece); - - struct piece_req *req = BTPDQ_FIRST(&peer->my_reqs); - while (req != NULL) { - struct piece_req *next = BTPDQ_NEXT(req, entry); - - if (piece->index == req->index) { - assert(has_bit(piece->down_field, req->begin / BLOCKLEN)); - clear_bit(piece->down_field, req->begin / BLOCKLEN); - piece->nbusy--; - BTPDQ_REMOVE(&peer->my_reqs, req, entry); - free(req); - } - - req = next; - } - - if (was_full && !piece_full(piece)) - cm_on_piece_unfull(tp, piece); - - piece = BTPDQ_NEXT(piece, entry); - } - - assert(BTPDQ_EMPTY(&peer->my_reqs)); -} - -static int -test_hash(struct torrent *tp, uint8_t *hash, unsigned long index) -{ - if (tp->meta.piece_hash != NULL) - return memcmp(hash, tp->meta.piece_hash[index], SHA_DIGEST_LENGTH); - else { - char piece_hash[SHA_DIGEST_LENGTH]; - int fd; - int bufi; - int err; - - err = vopen(&fd, O_RDONLY, "%s", tp->relpath); - if (err != 0) - btpd_err("test_hash: %s\n", strerror(err)); - - err = lseek(fd, tp->meta.pieces_off + index * SHA_DIGEST_LENGTH, - SEEK_SET); - if (err < 0) - btpd_err("test_hash: %s\n", strerror(errno)); - - bufi = 0; - while (bufi < SHA_DIGEST_LENGTH) { - ssize_t nread = - read(fd, piece_hash + bufi, SHA_DIGEST_LENGTH - bufi); - bufi += nread; - } - close(fd); - - return memcmp(hash, piece_hash, SHA_DIGEST_LENGTH); - } -} - -static int -ro_fd_cb(const char *path, int *fd, void *arg) -{ - struct torrent *tp = arg; - return vopen(fd, O_RDONLY, "%s.d/%s", tp->relpath, path); -} - -static void -cm_on_piece(struct torrent *tp, struct piece *piece) -{ - int err; - uint8_t hash[20]; - struct bt_stream_ro *bts; - off_t plen = tp->meta.piece_length; - if (piece->index == tp->meta.npieces - 1) { - plen = - tp->meta.total_length - - tp->meta.piece_length * (tp->meta.npieces - 1); - } - if ((bts = bts_open_ro(&tp->meta, piece->index * tp->meta.piece_length, - ro_fd_cb, tp)) == NULL) - btpd_err("Out of memory.\n"); - - - if ((err = bts_sha(bts, plen, hash)) != 0) - btpd_err("Ouch! %s\n", strerror(err)); - - bts_close_ro(bts); - - if (test_hash(tp, hash, piece->index) == 0) { - btpd_log(BTPD_L_POL, "Got piece: %u.\n", piece->index); - struct peer *p; - set_bit(tp->piece_field, piece->index); - tp->have_npieces++; - if (tp->have_npieces == tp->meta.npieces) { - btpd_log(BTPD_L_BTPD, "Finished: %s.\n", tp->relpath); - tracker_req(tp, TR_COMPLETED); - } - msync(tp->imem, tp->isiz, MS_ASYNC); - BTPDQ_FOREACH(p, &tp->peers, cm_entry) - peer_have(p, piece->index); - if (tp->endgame) - BTPDQ_FOREACH(p, &tp->peers, cm_entry) - peer_unwant(p, piece->index); - BTPDQ_REMOVE(&tp->getlst, piece, entry); - free(piece); - } else if (tp->endgame) { - struct peer *p; - btpd_log(BTPD_L_ERROR, "Bad hash for piece %u of %s.\n", - piece->index, tp->relpath); - for (unsigned i = 0; i < piece->nblocks; i++) - clear_bit(piece->have_field, i); - piece->ngot = 0; - BTPDQ_FOREACH(p, &tp->peers, cm_entry) - if (has_bit(p->piece_field, piece->index) && - (p->flags & PF_P_CHOKE) == 0) { - // - assign_piece_requests_eg(piece, p); - } - } else { - btpd_log(BTPD_L_ERROR, "Bad hash for piece %u of %s.\n", - piece->index, tp->relpath); - for (unsigned i = 0; i < piece->nblocks; i++) { - clear_bit(piece->have_field, i); - assert(!has_bit(piece->down_field, i)); - } - msync(tp->imem, tp->isiz, MS_ASYNC); - BTPDQ_REMOVE(&tp->getlst, piece, entry); - free(piece); - if (cm_should_schedule(tp)) - cm_schedule_piece(tp); - } -} - -void -cm_on_block(struct peer *peer) -{ - struct torrent *tp = peer->tp; - struct piece_req *req = BTPDQ_FIRST(&peer->my_reqs); - struct piece *piece = BTPDQ_FIRST(&tp->getlst); - unsigned block = req->begin / BLOCKLEN; - while (piece != NULL && piece->index != req->index) - piece = BTPDQ_NEXT(piece, entry); - set_bit(piece->have_field, block); - clear_bit(piece->down_field, block); - piece->ngot++; - piece->nbusy--; - if (tp->endgame) { - uint32_t index = req->index; - uint32_t begin = req->begin; - uint32_t length = req->length; - struct peer *p; - - BTPDQ_REMOVE(&peer->my_reqs, req, entry); - free(req); - - BTPDQ_FOREACH(p, &tp->peers, cm_entry) { - if (has_bit(p->piece_field, index) && - (peer->flags & PF_P_CHOKE) == 0) - peer_cancel(p, index, begin, length); - } - if (piece->ngot == piece->nblocks) - cm_on_piece(tp, piece); - } else { - BTPDQ_REMOVE(&peer->my_reqs, req, entry); - free(req); - if (piece->ngot == piece->nblocks) - cm_on_piece(tp, piece); - if ((peer->flags & (PF_I_WANT|PF_P_CHOKE)) == PF_I_WANT) - cm_assign_requests(peer, 1); - } -} diff --git a/btpd/policy.h b/btpd/policy.h index a57bbac..a5f8b11 100644 --- a/btpd/policy.h +++ b/btpd/policy.h @@ -1,25 +1,55 @@ #ifndef BTPD_POLICY_H #define BTPD_POLICY_H +// policy_choke.c + +void choke_alg(struct torrent *tp); +void next_optimistic(struct torrent *tp, struct peer *np); + +// policy_subr.c + +struct piece *torrent_get_piece(struct torrent *tp, uint32_t index); +int piece_full(struct piece *pc); +int peer_chokes(struct peer *p); +int peer_wanted(struct peer *p); +int peer_laden(struct peer *p); +int peer_has(struct peer *p, uint32_t index); +int peer_leech_ok(struct peer *p); + +void piece_free(struct piece *pc); + +void cm_on_piece_unfull(struct piece *pc); +void cm_on_piece(struct piece *pc); + +struct piece *cm_new_piece(struct torrent *tp, uint32_t index); +unsigned cm_piece_assign_requests(struct piece *pc, struct peer *p); +void cm_piece_assign_requests_eg(struct piece *pc, struct peer *p); +unsigned cm_assign_requests(struct peer *p); +void cm_assign_requests_eg(struct peer *p); + +void cm_unassign_requests(struct peer *p); +void cm_unassign_requests_eg(struct peer *p); + +// policy_if.c + void cm_by_second(struct torrent *tp); -void cm_on_new_peer(struct peer *peer); -void cm_on_lost_peer(struct peer *peer); - -void cm_on_choke(struct peer *peer); -void cm_on_unchoke(struct peer *peer); -void cm_on_upload(struct peer *peer); -void cm_on_unupload(struct peer *peer); -void cm_on_interest(struct peer *peer); -void cm_on_uninterest(struct peer *peer); -void cm_on_download(struct peer *peer); -void cm_on_undownload(struct peer *peer); -void cm_on_piece_ann(struct peer *peer, uint32_t piece); -void cm_on_block(struct peer *peer); - -void cm_schedule_piece(struct torrent *tp); -int cm_assign_requests(struct peer *peer, int nreqs); - -void cm_unassign_requests(struct peer *peer); +void cm_on_new_peer(struct peer *p); +void cm_on_lost_peer(struct peer *p); + +void cm_on_choke(struct peer *p); +void cm_on_unchoke(struct peer *p); +void cm_on_upload(struct peer *p); +void cm_on_unupload(struct peer *p); +void cm_on_interest(struct peer *p); +void cm_on_uninterest(struct peer *p); +void cm_on_download(struct peer *p); +void cm_on_undownload(struct peer *p); +void cm_on_piece_ann(struct peer *p, uint32_t index); +void cm_on_block(struct peer *p, uint32_t index, uint32_t begin, + uint32_t length, const char *data); + +void cm_on_ok_piece(struct piece *pc); +void cm_on_bad_piece(struct piece *pc); #endif diff --git a/btpd/policy_choke.c b/btpd/policy_choke.c new file mode 100644 index 0000000..64a2b0d --- /dev/null +++ b/btpd/policy_choke.c @@ -0,0 +1,90 @@ +#include "btpd.h" + +static int +rate_cmp(unsigned long rate1, unsigned long rate2) +{ + if (rate1 < rate2) + return -1; + else if (rate1 == rate2) + return 0; + else + return 1; +} + +static int +dwnrate_cmp(const void *p1, const void *p2) +{ + unsigned long rate1 = peer_get_rate((*(struct peer **)p1)->rate_to_me); + unsigned long rate2 = peer_get_rate((*(struct peer **)p2)->rate_to_me); + return rate_cmp(rate1, rate2); +} + +static int +uprate_cmp(const void *p1, const void *p2) +{ + unsigned long rate1 = peer_get_rate((*(struct peer **)p1)->rate_from_me); + unsigned long rate2 = peer_get_rate((*(struct peer **)p2)->rate_from_me); + return rate_cmp(rate1, rate2); +} + +void +choke_alg(struct torrent *tp) +{ + assert(tp->npeers > 0); + + int i; + struct peer *p; + struct peer *psort[tp->npeers]; + + i = 0; + BTPDQ_FOREACH(p, &tp->peers, cm_entry) + psort[i++] = p; + + if (tp->have_npieces == tp->meta.npieces) + qsort(psort, tp->npeers, sizeof(p), uprate_cmp); + else + qsort(psort, tp->npeers, sizeof(p), dwnrate_cmp); + + tp->ndown = 0; + if (tp->optimistic != NULL) { + if (tp->optimistic->flags & PF_I_CHOKE) + peer_unchoke(tp->optimistic); + if (tp->optimistic->flags & PF_P_WANT) + tp->ndown = 1; + } + + for (i = tp->npeers - 1; i >= 0; i--) { + if (psort[i] == tp->optimistic) + continue; + if (tp->ndown < 4) { + if (psort[i]->flags & PF_P_WANT) + tp->ndown++; + if (psort[i]->flags & PF_I_CHOKE) + peer_unchoke(psort[i]); + } else { + if ((psort[i]->flags & PF_I_CHOKE) == 0) + peer_choke(psort[i]); + } + } + + tp->choke_time = btpd.seconds + 10; +} + +void +next_optimistic(struct torrent *tp, struct peer *np) +{ + if (np != NULL) + tp->optimistic = np; + else if (tp->optimistic == NULL) + tp->optimistic = BTPDQ_FIRST(&tp->peers); + else { + np = BTPDQ_NEXT(tp->optimistic, cm_entry); + if (np != NULL) + tp->optimistic = np; + else + tp->optimistic = BTPDQ_FIRST(&tp->peers); + } + assert(tp->optimistic != NULL); + choke_alg(tp); + tp->opt_time = btpd.seconds + 30; +} diff --git a/btpd/policy_if.c b/btpd/policy_if.c new file mode 100644 index 0000000..b524eca --- /dev/null +++ b/btpd/policy_if.c @@ -0,0 +1,263 @@ +#include +#include + +#include "btpd.h" +#include "tracker_req.h" + +void +cm_by_second(struct torrent *tp) +{ + if (btpd.seconds == tp->tracker_time) + tracker_req(tp, TR_EMPTY); + + if (btpd.seconds == tp->opt_time) + next_optimistic(tp, NULL); + + if (btpd.seconds == tp->choke_time) + choke_alg(tp); +} + +/* + * Called when a peer announces it's got a new piece. + * + * If the piece is missing or unfull we increase the peer's + * wanted level and if possible call cm_on_download. + */ +void +cm_on_piece_ann(struct peer *p, uint32_t index) +{ + struct torrent *tp = p->tp; + tp->piece_count[index]++; + if (has_bit(tp->piece_field, index)) + return; + struct piece *pc = torrent_get_piece(tp, index); + if (tp->endgame) { + if (pc != NULL && !piece_full(pc)) { + peer_want(p, index); + if (!peer_chokes(p)) + cm_piece_assign_requests_eg(pc, p); + } + } else if (pc == NULL) { + peer_want(p, index); + if (!peer_chokes(p)) { + pc = cm_new_piece(tp, index); + if (pc != NULL) + cm_piece_assign_requests(pc, p); + } + } else if (!piece_full(pc)) { + peer_want(p, index); + if (!peer_chokes(p)) + cm_piece_assign_requests(pc, p); + } +} + +void +cm_on_download(struct peer *p) +{ + assert(peer_wanted(p)); + struct torrent *tp = p->tp; + if (tp->endgame) { + cm_assign_requests_eg(p); + } else if (cm_assign_requests(p) == 0) + assert(!peer_wanted(p) || peer_laden(p)); +} + +void +cm_on_unchoke(struct peer *p) +{ + if (peer_wanted(p)) + cm_on_download(p); +} + +void +cm_on_undownload(struct peer *p) +{ + if (!p->tp->endgame) + cm_unassign_requests(p); + else + cm_unassign_requests_eg(p); +} + +void +cm_on_choke(struct peer *p) +{ + if (peer_wanted(p)) + cm_on_undownload(p); +} + +void +cm_on_upload(struct peer *p) +{ + choke_alg(p->tp); +} + +void +cm_on_interest(struct peer *p) +{ + if ((p->flags & PF_I_CHOKE) == 0) + cm_on_upload(p); +} + +void +cm_on_unupload(struct peer *p) +{ + choke_alg(p->tp); +} + +void +cm_on_uninterest(struct peer *p) +{ + if ((p->flags & PF_I_CHOKE) == 0) + cm_on_unupload(p); +} + +/** + * Called when a piece has been tested positively. + */ +void +cm_on_ok_piece(struct piece *pc) +{ + struct peer *p; + struct torrent *tp = pc->tp; + + btpd_log(BTPD_L_POL, "Got piece: %u.\n", pc->index); + + set_bit(tp->piece_field, pc->index); + tp->have_npieces++; + msync(tp->imem, tp->isiz, MS_ASYNC); + + BTPDQ_FOREACH(p, &tp->peers, cm_entry) + peer_have(p, pc->index); + + if (tp->endgame) + BTPDQ_FOREACH(p, &tp->peers, cm_entry) + if (peer_has(p, pc->index)) + peer_unwant(p, pc->index); + + piece_free(pc); + + if (torrent_has_all(tp)) { + btpd_log(BTPD_L_BTPD, "Finished: %s.\n", tp->relpath); + tracker_req(tp, TR_COMPLETED); + } +} + +/* + * Called when a piece has been tested negatively. + */ +void +cm_on_bad_piece(struct piece *pc) +{ + struct torrent *tp = pc->tp; + + btpd_log(BTPD_L_ERROR, "Bad hash for piece %u of %s.\n", + pc->index, tp->relpath); + + for (uint32_t i = 0; i < pc->nblocks; i++) { + clear_bit(pc->down_field, i); + clear_bit(pc->have_field, i); + } + pc->ngot = 0; + pc->nbusy = 0; + msync(tp->imem, tp->isiz, MS_ASYNC); + + if (tp->endgame) { + struct peer *p; + BTPDQ_FOREACH(p, &tp->peers, cm_entry) { + if (peer_has(p, pc->index) && peer_leech_ok(p)) + cm_piece_assign_requests_eg(pc, p); + } + } else + cm_on_piece_unfull(pc); // XXX: May get bad data again. +} + +void +cm_on_new_peer(struct peer *p) +{ + struct torrent *tp = p->tp; + + tp->npeers++; + p->flags |= PF_ATTACHED; + + if (tp->npeers == 1) { + BTPDQ_INSERT_HEAD(&tp->peers, p, cm_entry); + next_optimistic(tp, p); + } else { + if (random() > RAND_MAX / 3) + BTPDQ_INSERT_AFTER(&tp->peers, tp->optimistic, p, cm_entry); + else + BTPDQ_INSERT_TAIL(&tp->peers, p, cm_entry); + } +} + +void +cm_on_lost_peer(struct peer *p) +{ + struct torrent *tp = p->tp; + + tp->npeers--; + p->flags &= ~PF_ATTACHED; + if (tp->npeers == 0) { + BTPDQ_REMOVE(&tp->peers, p, cm_entry); + tp->optimistic = NULL; + tp->choke_time = tp->opt_time = 0; + } else if (tp->optimistic == p) { + struct peer *next = BTPDQ_NEXT(p, cm_entry); + BTPDQ_REMOVE(&tp->peers, p, cm_entry); + next_optimistic(tp, next); + } else if ((p->flags & (PF_P_WANT|PF_I_CHOKE)) == PF_P_WANT) { + BTPDQ_REMOVE(&tp->peers, p, cm_entry); + cm_on_unupload(p); + } else { + BTPDQ_REMOVE(&tp->peers, p, cm_entry); + } + + for (uint32_t i = 0; i < tp->meta.npieces; i++) + if (peer_has(p, i)) + tp->piece_count[i]--; + + if (peer_leech_ok(p)) + cm_on_undownload(p); +#if 0 + struct piece *pc = BTPDQ_FIRST(&tp->getlst); + while (pc != NULL) { + struct piece *next = BTPDQ_NEXT(pc, entry); + if (peer_has(p, pc->index) && tp->piece_count[pc->index] == 0) + cm_on_peerless_piece(pc); + pc = next; + } +#endif +} + +void +cm_on_block(struct peer *p, uint32_t index, uint32_t begin, uint32_t length, + const char *data) +{ + struct torrent *tp = p->tp; + + off_t cbegin = index * p->tp->meta.piece_length + begin; + torrent_put_bytes(p->tp, data, cbegin, length); + + struct piece *pc = BTPDQ_FIRST(&tp->getlst); + while (pc != NULL && pc->index != index) + pc = BTPDQ_NEXT(pc, entry); + uint32_t block = begin / PIECE_BLOCKLEN; + set_bit(pc->have_field, block); + clear_bit(pc->down_field, block); + pc->ngot++; + pc->nbusy--; + + if (tp->endgame) { + BTPDQ_FOREACH(p, &tp->peers, cm_entry) { + if (peer_has(p, index) && peer_leech_ok(p)) + peer_cancel(p, index, begin, length); + } + if (pc->ngot == pc->nblocks) + cm_on_piece(pc); + } else { + if (pc->ngot == pc->nblocks) + cm_on_piece(pc); + if (peer_leech_ok(p)) + cm_assign_requests(p); + } +} diff --git a/btpd/policy_subr.c b/btpd/policy_subr.c new file mode 100644 index 0000000..598f541 --- /dev/null +++ b/btpd/policy_subr.c @@ -0,0 +1,483 @@ +/* + * The commandments: + * + * A peer is wanted except when it only has pieces we've already + * downloaded or fully requested. Thus, a peer's wanted count is + * increased for each missing or unfull piece it announces, or + * when a piece it has becomes unfull. + * + * When a peer we want unchokes us, requests will primarily + * be put on pieces we're already downloading and then on + * possible new pieces. + * + * When choosing between several different new pieces to start + * downloading, the rarest piece will be chosen. + * + * End game mode sets in when all missing blocks are requested. + * In end game mode no piece is counted as full unless it's + * downloaded. + * + */ + +#include +#include +#include +#include + +#include + +#include "btpd.h" +#include "stream.h" + +static int +cm_should_enter_endgame(struct torrent *tp) +{ + int should; + if (tp->have_npieces + tp->npcs_busy == tp->meta.npieces) { + should = 1; + struct piece *pc; + BTPDQ_FOREACH(pc, &tp->getlst, entry) { + if (!piece_full(pc)) { + should = 0; + break; + } + } + } else + should = 0; + return should; +} + +static void +cm_enter_endgame(struct torrent *tp) +{ + struct peer *p; + btpd_log(BTPD_L_POL, "Entering end game\n"); + tp->endgame = 1; + BTPDQ_FOREACH(p, &tp->peers, cm_entry) { + struct piece *pc; + BTPDQ_FOREACH(pc, &tp->getlst, entry) { + if (peer_has(p, pc->index)) { + peer_want(p, pc->index); + if (peer_leech_ok(p)) + cm_piece_assign_requests_eg(pc, p); + } + } + } +} + +int +peer_chokes(struct peer *p) +{ + return p->flags & PF_P_CHOKE; +} + +int +peer_has(struct peer *p, uint32_t index) +{ + return has_bit(p->piece_field, index); +} + +int +peer_laden(struct peer *p) +{ + return p->nreqs_out >= MAXPIPEDREQUESTS; +} + +int +peer_wanted(struct peer *p) +{ + return (p->flags & PF_I_WANT) == PF_I_WANT; +} + +int +peer_leech_ok(struct peer *p) +{ + return (p->flags & (PF_I_WANT|PF_P_CHOKE)) == PF_I_WANT; +} + +int +piece_full(struct piece *pc) +{ + return pc->ngot + pc->nbusy == pc->nblocks; +} + +struct piece * +torrent_get_piece(struct torrent *tp, uint32_t index) +{ + struct piece *pc; + BTPDQ_FOREACH(pc, &tp->getlst, entry) + if (pc->index == index) + break; + return pc; +} + +static struct piece * +piece_alloc(struct torrent *tp, uint32_t index) +{ + assert(!has_bit(tp->busy_field, index) + && tp->npcs_busy < tp->meta.npieces); + struct piece *pc; + size_t mem, field; + unsigned nblocks; + off_t piece_length = torrent_piece_size(tp, index); + + nblocks = (unsigned)ceil((double)piece_length / PIECE_BLOCKLEN); + field = (size_t)ceil(nblocks / 8.0); + mem = sizeof(*pc) + field; + + pc = btpd_calloc(1, mem); + pc->tp = tp; + pc->down_field = (uint8_t *)(pc + 1); + pc->have_field = + tp->block_field + + (size_t)ceil(index * tp->meta.piece_length / (double)(1 << 17)); + pc->nblocks = nblocks; + pc->index = index; + + for (unsigned i = 0; i < nblocks; i++) + if (has_bit(pc->have_field, i)) + pc->ngot++; + + tp->npcs_busy++; + set_bit(tp->busy_field, index); + BTPDQ_INSERT_HEAD(&tp->getlst, pc, entry); + return pc; +} + +void +piece_free(struct piece *pc) +{ + struct torrent *tp = pc->tp; + assert(tp->npcs_busy > 0); + tp->npcs_busy--; + clear_bit(tp->busy_field, pc->index); + BTPDQ_REMOVE(&pc->tp->getlst, pc, entry); + free(pc); +} + +static int +test_hash(struct torrent *tp, uint8_t *hash, unsigned long index) +{ + if (tp->meta.piece_hash != NULL) + return memcmp(hash, tp->meta.piece_hash[index], SHA_DIGEST_LENGTH); + else { + char piece_hash[SHA_DIGEST_LENGTH]; + int fd; + int bufi; + int err; + + err = vopen(&fd, O_RDONLY, "%s", tp->relpath); + if (err != 0) + btpd_err("test_hash: %s\n", strerror(err)); + + err = lseek(fd, tp->meta.pieces_off + index * SHA_DIGEST_LENGTH, + SEEK_SET); + if (err < 0) + btpd_err("test_hash: %s\n", strerror(errno)); + + bufi = 0; + while (bufi < SHA_DIGEST_LENGTH) { + ssize_t nread = + read(fd, piece_hash + bufi, SHA_DIGEST_LENGTH - bufi); + bufi += nread; + } + close(fd); + + return memcmp(hash, piece_hash, SHA_DIGEST_LENGTH); + } +} + +static int +ro_fd_cb(const char *path, int *fd, void *arg) +{ + struct torrent *tp = arg; + return vopen(fd, O_RDONLY, "%s.d/%s", tp->relpath, path); +} + +static void +torrent_test_piece(struct piece *pc) +{ + struct torrent *tp = pc->tp; + int err; + uint8_t hash[20]; + struct bt_stream_ro *bts; + off_t plen = torrent_piece_size(tp, pc->index); + + if ((bts = bts_open_ro(&tp->meta, pc->index * tp->meta.piece_length, + ro_fd_cb, tp)) == NULL) + btpd_err("Out of memory.\n"); + + if ((err = bts_sha(bts, plen, hash)) != 0) + btpd_err("Ouch! %s\n", strerror(err)); + + bts_close_ro(bts); + + if (test_hash(tp, hash, pc->index) == 0) + cm_on_ok_piece(pc); + else + cm_on_bad_piece(pc); +} + +void +cm_on_piece(struct piece *pc) +{ + torrent_test_piece(pc); +} + +static int +cm_piece_startable(struct peer *p, uint32_t index) +{ + return peer_has(p, index) && !has_bit(p->tp->piece_field, index) + && !has_bit(p->tp->busy_field, index); +} + +/* + * Find the rarest piece the peer has, that isn't already allocated + * for download or already downloaded. If no such piece can be found + * return ENOENT. + * + * Return 0 or ENOENT, index in res. + */ +static int +cm_choose_rarest(struct peer *p, uint32_t *res) +{ + uint32_t i; + struct torrent *tp = p->tp; + + assert(tp->endgame == 0); + + for (i = 0; i < tp->meta.npieces && !cm_piece_startable(p, i); i++) + ; + + if (i == tp->meta.npieces) + return ENOENT; + + uint32_t min_i = i; + uint32_t min_c = 1; + for(i++; i < tp->meta.npieces; i++) { + if (cm_piece_startable(p, i)) { + if (tp->piece_count[i] == tp->piece_count[min_i]) + min_c++; + else if (tp->piece_count[i] < tp->piece_count[min_i]) { + min_i = i; + min_c = 1; + } + } + } + if (min_c > 1) { + min_c = 1 + rint((double)random() * (min_c - 1) / RAND_MAX); + for (i = min_i; min_c > 0; i++) { + if (cm_piece_startable(p, i) + && tp->piece_count[i] == tp->piece_count[min_i]) { + min_c--; + min_i = i; + } + } + } + *res = min_i; + return 0; +} + +/* + * Allocate the piece indicated by the index for download. + * There's a small possibility that a piece is fully downloaded + * but haven't been tested. If such is the case the piece will + * be tested and NULL will be returned. Also, we might then enter + * end game. + * + * Return the piece or NULL. + */ +struct piece * +cm_new_piece(struct torrent *tp, uint32_t index) +{ + btpd_log(BTPD_L_POL, "Started on piece %u.\n", index); + struct piece *pc = piece_alloc(tp, index); + if (pc->ngot == pc->nblocks) { + cm_on_piece(pc); + if (cm_should_enter_endgame(tp)) + cm_enter_endgame(tp); + return NULL; + } else + return pc; +} + +/* + * Called from either cm_piece_assign_requests or cm_new_piece, + * when a pice becomes full. The wanted level of the peers + * that has this piece will be decreased. This function is + * the only one that may trigger end game. + */ +static void +cm_on_piece_full(struct piece *pc) +{ + struct peer *p; + BTPDQ_FOREACH(p, &pc->tp->peers, cm_entry) { + if (peer_has(p, pc->index)) + peer_unwant(p, pc->index); + } + if (cm_should_enter_endgame(pc->tp)) + cm_enter_endgame(pc->tp); +} + + +/* + * Called when a previously full piece loses a peer. + * This is needed because we have decreased the wanted + * level for the peers that have this piece when it got + * full. Thus we have to increase the wanted level and + * try to assign requests for this piece. + */ +void +cm_on_piece_unfull(struct piece *pc) +{ + struct torrent *tp = pc->tp; + struct peer *p; + assert(!piece_full(pc) && tp->endgame == 0); + BTPDQ_FOREACH(p, &tp->peers, cm_entry) + if (peer_has(p, pc->index)) + peer_want(p, pc->index); + p = BTPDQ_FIRST(&tp->peers); + while (p != NULL && !piece_full(pc)) { + if (peer_leech_ok(p) && !peer_laden(p)) + cm_piece_assign_requests(pc, p); // Cannot provoke end game here. + p = BTPDQ_NEXT(p, cm_entry); + } +} + +/* + * Request as many blocks as possible on this piece from + * the peer. If the piece becomes full we call cm_on_piece_full. + * + * Return the number of requests sent. + */ +unsigned +cm_piece_assign_requests(struct piece *pc, struct peer *p) +{ + assert(!piece_full(pc) && !peer_laden(p)); + unsigned count = 0; + for (uint32_t i = 0; !piece_full(pc) && !peer_laden(p); i++) { + if (has_bit(pc->have_field, i) || has_bit(pc->down_field, i)) + continue; + set_bit(pc->down_field, i); + pc->nbusy++; + uint32_t start = i * PIECE_BLOCKLEN; + uint32_t len = torrent_block_size(pc, i); + peer_request(p, pc->index, start, len); + count++; + } + if (piece_full(pc)) + cm_on_piece_full(pc); + return count; +} + +/* + * Request as many blocks as possible from the peer. Puts + * requests on already active pieces before starting on new + * ones. Care must be taken since end game mode may be triggered + * by the calls to cm_piece_assign_requests. + * + * Returns number of requests sent. + * + * XXX: should do something smart when deciding on which + * already started piece to put requests on. + */ +unsigned +cm_assign_requests(struct peer *p) +{ + assert(!p->tp->endgame); + struct piece *pc; + struct torrent *tp = p->tp; + unsigned count = 0; + BTPDQ_FOREACH(pc, &tp->getlst, entry) { + if (piece_full(pc) || !peer_has(p, pc->index)) + continue; + count += cm_piece_assign_requests(pc, p); + if (tp->endgame) + break; + if (!piece_full(pc)) + assert(peer_laden(p)); + if (peer_laden(p)) + break; + } + while (!peer_laden(p) && !tp->endgame) { + uint32_t index; + if (cm_choose_rarest(p, &index) == 0) { + pc = cm_new_piece(tp, index); + if (pc != NULL) + count += cm_piece_assign_requests(pc, p); + } else + break; + } + return count; +} + +void +cm_unassign_requests(struct peer *p) +{ + struct torrent *tp = p->tp; + + struct piece *pc = BTPDQ_FIRST(&tp->getlst); + while (pc != NULL) { + int was_full = piece_full(pc); + + struct piece_req *req = BTPDQ_FIRST(&p->my_reqs); + while (req != NULL) { + struct piece_req *next = BTPDQ_NEXT(req, entry); + + if (pc->index == req->index) { + // XXX: Needs to be looked at if we introduce snubbing. + assert(has_bit(pc->down_field, req->begin / PIECE_BLOCKLEN)); + clear_bit(pc->down_field, req->begin / PIECE_BLOCKLEN); + pc->nbusy--; + BTPDQ_REMOVE(&p->my_reqs, req, entry); + free(req); + } + + req = next; + } + + if (was_full && !piece_full(pc)) + cm_on_piece_unfull(pc); + + pc = BTPDQ_NEXT(pc, entry); + } + + assert(BTPDQ_EMPTY(&p->my_reqs)); +} + + +void +cm_piece_assign_requests_eg(struct piece *pc, struct peer *p) +{ + for (uint32_t i = 0; i < pc->nblocks; i++) { + if (!has_bit(pc->have_field, i)) { + uint32_t start = i * PIECE_BLOCKLEN; + uint32_t len = torrent_block_size(pc, i); + peer_request(p, pc->index, start, len); + } + } +} + +void +cm_assign_requests_eg(struct peer *p) +{ + struct torrent *tp = p->tp; + struct piece *pc; + BTPDQ_FOREACH(pc, &tp->getlst, entry) { + if (peer_has(p, pc->index)) + cm_piece_assign_requests_eg(pc, p); + } +} + +void +cm_unassign_requests_eg(struct peer *p) +{ + struct piece_req *req = BTPDQ_FIRST(&p->my_reqs); + while (req != NULL) { + struct piece_req *next = BTPDQ_NEXT(req, entry); + free(req); + req = next; + } + BTPDQ_INIT(&p->my_reqs); + p->nreqs_out = 0; +} diff --git a/btpd/torrent.c b/btpd/torrent.c index 90a463b..88553f8 100644 --- a/btpd/torrent.c +++ b/btpd/torrent.c @@ -43,6 +43,7 @@ torrent_load3(const char *file, struct metainfo *mi, char *mem, size_t memsiz) btpd_err("Out of memory.\n"); tp->piece_count = btpd_calloc(mi->npieces, sizeof(tp->piece_count[0])); + tp->busy_field = btpd_calloc(ceil(mi->npieces / 8.0), 1); BTPDQ_INIT(&tp->peers); BTPDQ_INIT(&tp->getlst); @@ -178,6 +179,7 @@ torrent_unload(struct torrent *tp) } free(tp->piece_count); + free(tp->busy_field); free((void *)tp->relpath); clear_metainfo(&tp->meta); @@ -262,3 +264,20 @@ torrent_piece_size(struct torrent *tp, uint32_t index) return tp->meta.total_length - allbutlast; } } + +uint32_t +torrent_block_size(struct piece *pc, uint32_t index) +{ + if (index < pc->nblocks - 1) + return PIECE_BLOCKLEN; + else { + uint32_t allbutlast = PIECE_BLOCKLEN * (pc->nblocks - 1); + return torrent_piece_size(pc->tp, pc->index) - allbutlast; + } +} + +int +torrent_has_all(struct torrent *tp) +{ + return tp->have_npieces == tp->meta.npieces; +} diff --git a/btpd/torrent.h b/btpd/torrent.h index 022201b..6398eb4 100644 --- a/btpd/torrent.h +++ b/btpd/torrent.h @@ -1,7 +1,11 @@ #ifndef BTPD_TORRENT_H #define BTPD_TORRENT_H +#define PIECE_BLOCKLEN (1 << 14) + struct piece { + struct torrent *tp; + uint32_t index; unsigned nblocks; @@ -28,9 +32,12 @@ struct torrent { uint8_t *piece_field; uint8_t *block_field; + uint8_t *busy_field; + uint32_t npcs_busy; + uint32_t have_npieces; - - unsigned long *piece_count; + + unsigned *piece_count; uint64_t uploaded, downloaded; @@ -65,5 +72,8 @@ int torrent_has_peer(struct torrent *tp, const uint8_t *id); struct torrent *torrent_get_by_hash(const uint8_t *hash); off_t torrent_piece_size(struct torrent *tp, uint32_t index); +uint32_t torrent_block_size(struct piece *pc, uint32_t index); + +int torrent_has_all(struct torrent *tp); #endif