From f31e2d8b897cee6006a1494c4a195caa8c2f2d7e Mon Sep 17 00:00:00 2001 From: Richard Nyberg Date: Tue, 20 Sep 2005 19:24:11 +0000 Subject: [PATCH] * Allocate request messages on piece creation. The request objects can be shared by several peers. At least in end game. * Link blocks with the peers we are loading them from and vice versa. * Limit the number of requests / peer in end game too. * Improve end game by using some sort of round robin for block requests. --- btpd/peer.c | 83 ++++++---------- btpd/peer.h | 19 +++- btpd/policy.h | 6 +- btpd/policy_if.c | 51 ++++++---- btpd/policy_subr.c | 243 ++++++++++++++++++++++++++++++++++----------- btpd/torrent.h | 13 ++- 6 files changed, 278 insertions(+), 137 deletions(-) diff --git a/btpd/peer.c b/btpd/peer.c index fdd79ee..71bcd64 100644 --- a/btpd/peer.c +++ b/btpd/peer.c @@ -44,13 +44,6 @@ peer_kill(struct peer *p) free(nl); nl = next; } - nl = BTPDQ_FIRST(&p->my_reqs); - while (nl != NULL) { - struct nb_link *next = BTPDQ_NEXT(nl, entry); - nb_drop(nl->nb); - free(nl); - nl = next; - } p->reader->kill(p->reader); if (p->piece_field != NULL) @@ -119,49 +112,40 @@ peer_sent(struct peer *p, struct net_buf *nb) } void -peer_request(struct peer *p, uint32_t index, uint32_t begin, uint32_t len) +peer_request(struct peer *p, struct block_request *req) { - if (p->tp->endgame == 0) - assert(p->nreqs_out < MAXPIPEDREQUESTS); + assert(p->nreqs_out < MAXPIPEDREQUESTS); p->nreqs_out++; - struct net_buf *nb = nb_create_request(index, begin, len); - struct nb_link *nl = btpd_calloc(1, sizeof(*nl)); - nl->nb = nb; - nb_hold(nb); - BTPDQ_INSERT_TAIL(&p->my_reqs, nl, entry); - peer_send(p, nb); + BTPDQ_INSERT_TAIL(&p->my_reqs, req, p_entry); + peer_send(p, req->blk->msg); +} + +int +peer_requested(struct peer *p, struct block *blk) +{ + struct block_request *req; + BTPDQ_FOREACH(req, &p->my_reqs, p_entry) + if (req->blk == blk) + return 1; + return 0; } void -peer_cancel(struct peer *p, uint32_t index, uint32_t begin, uint32_t len) +peer_cancel(struct peer *p, struct block_request *req, struct net_buf *nb) { - struct net_buf *nb = NULL; + BTPDQ_REMOVE(&p->my_reqs, req, p_entry); + p->nreqs_out--; + + int removed = 0; struct nb_link *nl; -again: - BTPDQ_FOREACH(nl, &p->my_reqs, entry) { - int match = nb_get_begin(nl->nb) == begin - && nb_get_index(nl->nb) == index - && nb_get_length(nl->nb) == len; - if (match) + BTPDQ_FOREACH(nl, &p->outq, entry) { + if (nl->nb == req->blk->msg) { + removed = peer_unsend(p, nl); break; - } - if (nl != NULL) { - if (nb == NULL) { - nb = nb_create_cancel(index, begin, len); - peer_send(p, nb); } - BTPDQ_REMOVE(&p->my_reqs, nl, entry); - nb_drop(nl->nb); - free(nl); - p->nreqs_out--; - goto again; } -} - -void -peer_have(struct peer *p, uint32_t index) -{ - peer_send(p, nb_create_have(index)); + if (!removed) + peer_send(p, nb); } void @@ -343,19 +327,18 @@ void peer_on_piece(struct peer *p, uint32_t index, uint32_t begin, uint32_t length, const char *data) { - struct nb_link *nl = BTPDQ_FIRST(&p->my_reqs); - if (nl != NULL && - nb_get_begin(nl->nb) == begin && - nb_get_index(nl->nb) == index && - nb_get_length(nl->nb) == length) { + struct block_request *req = BTPDQ_FIRST(&p->my_reqs); + if (req == NULL) + return; + struct net_buf *nb = req->blk->msg; + if (nb_get_begin(nb) == begin && + nb_get_index(nb) == index && + nb_get_length(nb) == length) { assert(p->nreqs_out > 0); p->nreqs_out--; - BTPDQ_REMOVE(&p->my_reqs, nl, entry); - nb_drop(nl->nb); - free(nl); - - cm_on_block(p, index, begin, length, data); + BTPDQ_REMOVE(&p->my_reqs, req, p_entry); + cm_on_block(p, req, index, begin, length, data); } } diff --git a/btpd/peer.h b/btpd/peer.h index 675c9a8..a8eb80d 100644 --- a/btpd/peer.h +++ b/btpd/peer.h @@ -15,6 +15,15 @@ #define MAXPIECEMSGS 128 #define MAXPIPEDREQUESTS 10 +struct block_request { + struct peer *p; + struct block *blk; + BTPDQ_ENTRY(block_request) p_entry; + BTPDQ_ENTRY(block_request) blk_entry; +}; + +BTPDQ_HEAD(block_request_tq, block_request); + struct peer { int sd; uint16_t flags; @@ -26,7 +35,7 @@ struct peer { struct torrent *tp; - struct nb_tq my_reqs; + struct block_request_tq my_reqs; unsigned nreqs_out; unsigned npiece_msgs; @@ -58,11 +67,11 @@ void peer_unchoke(struct peer *p); void peer_choke(struct peer *p); void peer_unwant(struct peer *p, uint32_t index); void peer_want(struct peer *p, uint32_t index); -void peer_request(struct peer *p, uint32_t index, - uint32_t begin, uint32_t len); -void peer_cancel(struct peer *p, uint32_t index, uint32_t begin, uint32_t len); +void peer_request(struct peer *p, struct block_request *req); +void peer_cancel(struct peer *p, struct block_request *req, + struct net_buf *nb); -void peer_have(struct peer *p, uint32_t index); +int peer_requested(struct peer *p, struct block *blk); unsigned long peer_get_rate(unsigned long *rates); diff --git a/btpd/policy.h b/btpd/policy.h index 76369a0..c039644 100644 --- a/btpd/policy.h +++ b/btpd/policy.h @@ -17,11 +17,11 @@ void cm_on_piece(struct piece *pc); struct piece *cm_new_piece(struct torrent *tp, uint32_t index); struct piece *cm_find_piece(struct torrent *tp, uint32_t index); unsigned cm_piece_assign_requests(struct piece *pc, struct peer *p); -void cm_piece_assign_requests_eg(struct piece *pc, struct peer *p); unsigned cm_assign_requests(struct peer *p); void cm_assign_requests_eg(struct peer *p); void cm_unassign_requests(struct peer *p); void cm_unassign_requests_eg(struct peer *p); +void cm_piece_reorder_eg(struct piece *pc); // policy_if.c @@ -39,8 +39,8 @@ void cm_on_uninterest(struct peer *p); void cm_on_download(struct peer *p); void cm_on_undownload(struct peer *p); void cm_on_piece_ann(struct peer *p, uint32_t index); -void cm_on_block(struct peer *p, uint32_t index, uint32_t begin, - uint32_t length, const char *data); +void cm_on_block(struct peer *p, struct block_request *req, + uint32_t index, uint32_t begin, uint32_t length, const char *data); void cm_on_ok_piece(struct piece *pc); void cm_on_bad_piece(struct piece *pc); diff --git a/btpd/policy_if.c b/btpd/policy_if.c index f1b2e44..2b49357 100644 --- a/btpd/policy_if.c +++ b/btpd/policy_if.c @@ -40,11 +40,10 @@ cm_on_piece_ann(struct peer *p, uint32_t index) return; struct piece *pc = cm_find_piece(tp, index); if (tp->endgame) { - if (pc != NULL) { - peer_want(p, index); - if (!peer_chokes(p)) - cm_piece_assign_requests_eg(pc, p); - } + assert(pc != NULL); + peer_want(p, index); + if (!peer_chokes(p) && !peer_laden(p)) + cm_assign_requests_eg(p); } else if (pc == NULL) { peer_want(p, index); if (!peer_chokes(p) && !peer_laden(p)) { @@ -146,6 +145,7 @@ cm_on_ok_piece(struct piece *pc) if (peer_has(p, pc->index)) peer_unwant(p, pc->index); + assert(pc->nreqs == 0); piece_free(pc); if (torrent_has_all(tp)) { @@ -178,8 +178,8 @@ cm_on_bad_piece(struct piece *pc) if (tp->endgame) { struct peer *p; BTPDQ_FOREACH(p, &tp->peers, cm_entry) { - if (peer_has(p, pc->index) && peer_leech_ok(p)) - cm_piece_assign_requests_eg(pc, p); + if (peer_has(p, pc->index) && peer_leech_ok(p) && !peer_laden(p)) + cm_assign_requests_eg(p); } } else cm_on_piece_unfull(pc); // XXX: May get bad data again. @@ -245,31 +245,46 @@ cm_on_lost_peer(struct peer *p) } void -cm_on_block(struct peer *p, uint32_t index, uint32_t begin, uint32_t length, - const char *data) +cm_on_block(struct peer *p, struct block_request *req, + uint32_t index, uint32_t begin, uint32_t length, const char *data) { struct torrent *tp = p->tp; + struct block *blk = req->blk; + struct piece *pc = blk->pc; + + BTPDQ_REMOVE(&blk->reqs, req, blk_entry); + free(req); + pc->nreqs--; off_t cbegin = index * p->tp->meta.piece_length + begin; torrent_put_bytes(p->tp, data, cbegin, length); - struct piece *pc = cm_find_piece(tp, index); - assert(pc != NULL); - - uint32_t block = begin / PIECE_BLOCKLEN; - set_bit(pc->have_field, block); + set_bit(pc->have_field, begin / PIECE_BLOCKLEN); pc->ngot++; if (tp->endgame) { - BTPDQ_FOREACH(p, &tp->peers, cm_entry) { - if (peer_has(p, index) && p->nreqs_out > 0) - peer_cancel(p, index, begin, length); + if (!BTPDQ_EMPTY(&blk->reqs)) { + struct net_buf *nb = nb_create_cancel(index, begin, length); + nb_hold(nb); + struct block_request *req = BTPDQ_FIRST(&blk->reqs); + while (req != NULL) { + struct block_request *next = BTPDQ_NEXT(req, blk_entry); + peer_cancel(req->p, req, nb); + free(req); + pc->nreqs--; + req = next; + } + BTPDQ_INIT(&blk->reqs); + nb_drop(nb); } + cm_piece_reorder_eg(pc); if (pc->ngot == pc->nblocks) cm_on_piece(pc); + if (peer_leech_ok(p) && !peer_laden(p)) + cm_assign_requests_eg(p); } else { // XXX: Needs to be looked at if we introduce snubbing. - clear_bit(pc->down_field, block); + clear_bit(pc->down_field, begin / PIECE_BLOCKLEN); pc->nbusy--; if (pc->ngot == pc->nblocks) cm_on_piece(pc); diff --git a/btpd/policy_subr.c b/btpd/policy_subr.c index 142abf8..5683ed8 100644 --- a/btpd/policy_subr.c +++ b/btpd/policy_subr.c @@ -35,13 +35,14 @@ piece_alloc(struct torrent *tp, uint32_t index) assert(!has_bit(tp->busy_field, index) && tp->npcs_busy < tp->meta.npieces); struct piece *pc; - size_t mem, field; + size_t mem, field, blocks; unsigned nblocks; off_t piece_length = torrent_piece_size(tp, index); nblocks = (unsigned)ceil((double)piece_length / PIECE_BLOCKLEN); + blocks = sizeof(pc->blocks[0]) * nblocks; field = (size_t)ceil(nblocks / 8.0); - mem = sizeof(*pc) + field; + mem = sizeof(*pc) + field + blocks; pc = btpd_calloc(1, mem); pc->tp = tp; @@ -49,13 +50,28 @@ piece_alloc(struct torrent *tp, uint32_t index) pc->have_field = tp->block_field + index * (size_t)ceil(tp->meta.piece_length / (double)(1 << 17)); - pc->nblocks = nblocks; + pc->index = index; + pc->nblocks = nblocks; + + pc->nreqs = 0; + pc->next_block = 0; for (unsigned i = 0; i < nblocks; i++) if (has_bit(pc->have_field, i)) pc->ngot++; + pc->blocks = (struct block *)(pc->down_field + field); + for (unsigned i = 0; i < nblocks; i++) { + uint32_t start = i * PIECE_BLOCKLEN; + uint32_t len = torrent_block_size(pc, i); + struct block *blk = &pc->blocks[i]; + blk->pc = pc; + BTPDQ_INIT(&blk->reqs); + blk->msg = nb_create_request(index, start, len); + nb_hold(blk->msg); + } + tp->npcs_busy++; set_bit(tp->busy_field, index); BTPDQ_INSERT_HEAD(&tp->getlst, pc, entry); @@ -70,6 +86,15 @@ piece_free(struct piece *pc) tp->npcs_busy--; clear_bit(tp->busy_field, pc->index); BTPDQ_REMOVE(&pc->tp->getlst, pc, entry); + for (unsigned i = 0; i < pc->nblocks; i++) { + struct block_request *req = BTPDQ_FIRST(&pc->blocks[i].reqs); + while (req != NULL) { + struct block_request *next = BTPDQ_NEXT(req, blk_entry); + free(req); + req = next; + } + nb_drop(pc->blocks[i].msg); + } free(pc); } @@ -97,27 +122,66 @@ cm_should_enter_endgame(struct torrent *tp) return should; } +static void +cm_piece_insert_eg(struct piece *pc) +{ + struct piece_tq *getlst = &pc->tp->getlst; + if (pc->nblocks == pc->ngot) + BTPDQ_INSERT_TAIL(getlst, pc, entry); + else { + unsigned r = pc->nreqs / (pc->nblocks - pc->ngot); + struct piece *it; + BTPDQ_FOREACH(it, getlst, entry) { + if ((it->nblocks == it->ngot + || r < it->nreqs / (it->nblocks - it->ngot))) { + BTPDQ_INSERT_BEFORE(it, pc, entry); + break; + } + } + if (it == NULL) + BTPDQ_INSERT_TAIL(getlst, pc, entry); + } +} + +void +cm_piece_reorder_eg(struct piece *pc) +{ + BTPDQ_REMOVE(&pc->tp->getlst, pc, entry); + cm_piece_insert_eg(pc); +} + static void cm_enter_endgame(struct torrent *tp) { struct peer *p; struct piece *pc; + struct piece *pcs[tp->npcs_busy]; + unsigned pi; + btpd_log(BTPD_L_POL, "Entering end game\n"); tp->endgame = 1; + + pi = 0; BTPDQ_FOREACH(pc, &tp->getlst, entry) { - for (uint32_t i = 0; i < pc->nblocks; i++) + for (unsigned i = 0; i < pc->nblocks; i++) clear_bit(pc->down_field, i); pc->nbusy = 0; + pcs[pi] = pc; + pi++; + } + BTPDQ_INIT(&tp->getlst); + while (pi > 0) { + pi--; + cm_piece_insert_eg(pcs[pi]); } BTPDQ_FOREACH(p, &tp->peers, cm_entry) { assert(p->nwant == 0); BTPDQ_FOREACH(pc, &tp->getlst, entry) { - if (peer_has(p, pc->index)) { + if (peer_has(p, pc->index)) peer_want(p, pc->index); - if (peer_leech_ok(p)) - cm_piece_assign_requests_eg(pc, p); - } } + if (p->nwant > 0 && peer_leech_ok(p) && !peer_laden(p)) + cm_assign_requests_eg(p); } } @@ -320,6 +384,10 @@ cm_on_piece_unfull(struct piece *pc) } } +#define INCNEXTBLOCK(pc) \ + (pc)->next_block = ((pc)->next_block + 1) % (pc)->nblocks + + /* * Request as many blocks as possible on this piece from * the peer. If the piece becomes full we call cm_on_piece_full. @@ -331,18 +399,29 @@ cm_piece_assign_requests(struct piece *pc, struct peer *p) { assert(!piece_full(pc) && !peer_laden(p)); unsigned count = 0; - for (uint32_t i = 0; !piece_full(pc) && !peer_laden(p); i++) { - if (has_bit(pc->have_field, i) || has_bit(pc->down_field, i)) - continue; - set_bit(pc->down_field, i); + do { + while ((has_bit(pc->have_field, pc->next_block) + || has_bit(pc->down_field, pc->next_block))) + INCNEXTBLOCK(pc); + + struct block *blk = &pc->blocks[pc->next_block]; + struct block_request *req = btpd_malloc(sizeof(*req)); + req->p = p; + req->blk = blk; + BTPDQ_INSERT_TAIL(&blk->reqs, req, blk_entry); + + peer_request(p, req); + + set_bit(pc->down_field, pc->next_block); pc->nbusy++; - uint32_t start = i * PIECE_BLOCKLEN; - uint32_t len = torrent_block_size(pc, i); - peer_request(p, pc->index, start, len); + pc->nreqs++; count++; - } + INCNEXTBLOCK(pc); + } while (!piece_full(pc) && !peer_laden(p)); + if (piece_full(pc)) cm_on_piece_full(pc); + return count; } @@ -390,74 +469,118 @@ cm_assign_requests(struct peer *p) void cm_unassign_requests(struct peer *p) { - struct torrent *tp = p->tp; - - struct piece *pc = BTPDQ_FIRST(&tp->getlst); - while (pc != NULL) { + while (p->nreqs_out > 0) { + struct block_request *req = BTPDQ_FIRST(&p->my_reqs); + struct piece *pc = req->blk->pc; int was_full = piece_full(pc); - struct nb_link *nl = BTPDQ_FIRST(&p->my_reqs); - while (nl != NULL) { - struct nb_link *next = BTPDQ_NEXT(nl, entry); - - if (pc->index == nb_get_index(nl->nb)) { - uint32_t block = nb_get_begin(nl->nb) / PIECE_BLOCKLEN; - // XXX: Needs to be looked at if we introduce snubbing. - assert(has_bit(pc->down_field, block)); - clear_bit(pc->down_field, block); - pc->nbusy--; - BTPDQ_REMOVE(&p->my_reqs, nl, entry); - nb_drop(nl->nb); - free(nl); - } - - nl = next; + while (req != NULL) { + struct block_request *next = BTPDQ_NEXT(req, p_entry); + + uint32_t blki = nb_get_begin(req->blk->msg) / PIECE_BLOCKLEN; + struct block *blk = req->blk; + // XXX: Needs to be looked at if we introduce snubbing. + assert(has_bit(pc->down_field, blki)); + clear_bit(pc->down_field, blki); + pc->nbusy--; + BTPDQ_REMOVE(&p->my_reqs, req, p_entry); + p->nreqs_out--; + BTPDQ_REMOVE(&blk->reqs, req, blk_entry); + free(req); + pc->nreqs--; + + while (next != NULL && next->blk->pc != pc) + next = BTPDQ_NEXT(next, p_entry); + req = next; } - + if (was_full && !piece_full(pc)) cm_on_piece_unfull(pc); - - pc = BTPDQ_NEXT(pc, entry); } - assert(BTPDQ_EMPTY(&p->my_reqs)); - p->nreqs_out = 0; } - -void +static void cm_piece_assign_requests_eg(struct piece *pc, struct peer *p) { - for (uint32_t i = 0; i < pc->nblocks; i++) { - if (!has_bit(pc->have_field, i)) { - uint32_t start = i * PIECE_BLOCKLEN; - uint32_t len = torrent_block_size(pc, i); - peer_request(p, pc->index, start, len); + unsigned first_block = pc->next_block; + do { + if ((has_bit(pc->have_field, pc->next_block) + || peer_requested(p, &pc->blocks[pc->next_block]))) { + INCNEXTBLOCK(pc); + continue; } - } + struct block_request *req = btpd_calloc(1, sizeof(*req)); + req->blk = &pc->blocks[pc->next_block]; + req->p = p; + BTPDQ_INSERT_TAIL(&pc->blocks[pc->next_block].reqs, req, blk_entry); + pc->nreqs++; + INCNEXTBLOCK(pc); + peer_request(p, req); + } while (!peer_laden(p) && pc->next_block != first_block); } void cm_assign_requests_eg(struct peer *p) { + assert(!peer_laden(p)); struct torrent *tp = p->tp; - struct piece *pc; - BTPDQ_FOREACH(pc, &tp->getlst, entry) { - if (peer_has(p, pc->index)) + struct piece_tq tmp; + BTPDQ_INIT(&tmp); + + struct piece *pc = BTPDQ_FIRST(&tp->getlst); + while (!peer_laden(p) && pc != NULL) { + struct piece *next = BTPDQ_NEXT(pc, entry); + if (peer_has(p, pc->index) && pc->nblocks != pc->ngot) { cm_piece_assign_requests_eg(pc, p); + BTPDQ_REMOVE(&tp->getlst, pc, entry); + BTPDQ_INSERT_HEAD(&tmp, pc, entry); + } + pc = next; + } + + pc = BTPDQ_FIRST(&tmp); + while (pc != NULL) { + struct piece *next = BTPDQ_NEXT(pc, entry); + cm_piece_insert_eg(pc); + pc = next; } } void cm_unassign_requests_eg(struct peer *p) { - struct nb_link *nl = BTPDQ_FIRST(&p->my_reqs); - while (nl != NULL) { - struct nb_link *next = BTPDQ_NEXT(nl, entry); - nb_drop(nl->nb); - free(nl); - nl = next; + struct block_request *req; + struct piece *pc; + struct piece_tq tmp; + BTPDQ_INIT(&tmp); + + while (p->nreqs_out > 0) { + req = BTPDQ_FIRST(&p->my_reqs); + + pc = req->blk->pc; + BTPDQ_REMOVE(&pc->tp->getlst, pc, entry); + BTPDQ_INSERT_HEAD(&tmp, pc, entry); + + while (req != NULL) { + struct block_request *next = BTPDQ_NEXT(req, p_entry); + BTPDQ_REMOVE(&p->my_reqs, req, p_entry); + p->nreqs_out--; + BTPDQ_REMOVE(&req->blk->reqs, req, blk_entry); + free(req); + pc->nreqs--; + + while (next != NULL && next->blk->pc != pc) + next = BTPDQ_NEXT(next, p_entry); + req = next; + } + } + assert(BTPDQ_EMPTY(&p->my_reqs)); + + pc = BTPDQ_FIRST(&tmp); + while (pc != NULL) { + struct piece *next = BTPDQ_NEXT(pc, entry); + cm_piece_insert_eg(pc); + pc = next; } - BTPDQ_INIT(&p->my_reqs); - p->nreqs_out = 0; } diff --git a/btpd/torrent.h b/btpd/torrent.h index 6398eb4..e5b7b2c 100644 --- a/btpd/torrent.h +++ b/btpd/torrent.h @@ -3,14 +3,25 @@ #define PIECE_BLOCKLEN (1 << 14) +struct block { + struct piece *pc; + struct net_buf *msg; + struct block_request_tq reqs; +}; + struct piece { struct torrent *tp; uint32_t index; - unsigned nblocks; + unsigned nreqs; + + unsigned nblocks; unsigned ngot; unsigned nbusy; + unsigned next_block; + + struct block *blocks; uint8_t *have_field; uint8_t *down_field;