A clone of btpd with my configuration changes.

692 lines · 17 KiB

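/*
 * net.c -- btpd's network layer: per-torrent network state, the peer
 * read and write paths, the BitTorrent wire-protocol state machine,
 * and the once-per-second bandwidth and rate accounting.
 */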
#include <sys/types.h>
#include <sys/uio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <sys/mman.h>
#include <sys/wait.h>

#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include "btpd.h"

static unsigned long m_bw_bytes_in;
static unsigned long m_bw_bytes_out;

static unsigned long m_rate_up;
static unsigned long m_rate_dwn;

static struct event m_net_incoming;

static unsigned m_ntorrents;
static struct net_tq m_torrents = BTPDQ_HEAD_INITIALIZER(m_torrents);

unsigned net_npeers;

struct peer_tq net_bw_readq = BTPDQ_HEAD_INITIALIZER(net_bw_readq);
struct peer_tq net_bw_writeq = BTPDQ_HEAD_INITIALIZER(net_bw_writeq);
struct peer_tq net_unattached = BTPDQ_HEAD_INITIALIZER(net_unattached);
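
/*
 * Return nonzero if the torrent already has a peer with the given
 * 20-byte peer id.
 */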
int
net_torrent_has_peer(struct net *n, const uint8_t *id)
{
    int has = 0;
    struct peer *p = BTPDQ_FIRST(&n->peers);
    while (p != NULL) {
        if (bcmp(p->id, id, 20) == 0) {
            has = 1;
            break;
        }
        p = BTPDQ_NEXT(p, p_entry);
    }
    return has;
}
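
/*
 * Allocate a torrent's network state. The busy_field bitmap and the
 * piece_count array are carved out of the tail of the same allocation
 * as the struct itself, so a single btpd_calloc covers all three.
 */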
void
net_create(struct torrent *tp)
{
    size_t field_size = ceil(tp->meta.npieces / 8.0);
    size_t mem = sizeof(*(tp->net)) + field_size +
        tp->meta.npieces * sizeof(*(tp->net->piece_count));

    struct net *n = btpd_calloc(1, mem);
    n->tp = tp;
    tp->net = n;
    BTPDQ_INIT(&n->getlst);
    n->busy_field = (uint8_t *)(n + 1);
    n->piece_count = (unsigned *)(n->busy_field + field_size);
}

void
net_kill(struct torrent *tp)
{
    free(tp->net);
    tp->net = NULL;
}

void
net_start(struct torrent *tp)
{
    struct net *n = tp->net;
    BTPDQ_INSERT_HEAD(&m_torrents, n, entry);
    m_ntorrents++;
    n->active = 1;
}
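
/*
 * Take a torrent off the active list: zero its rates, free any pieces
 * still being fetched, and kill both its attached peers and any
 * unattached peers that were destined for it.
 */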
void
net_stop(struct torrent *tp)
{
    struct net *n = tp->net;

    assert(m_ntorrents > 0);
    m_ntorrents--;
    BTPDQ_REMOVE(&m_torrents, n, entry);

    n->active = 0;
    n->rate_up = 0;
    n->rate_dwn = 0;

    ul_on_lost_torrent(n);

    struct piece *pc;
    while ((pc = BTPDQ_FIRST(&n->getlst)) != NULL)
        piece_free(pc);

    struct peer *p = BTPDQ_FIRST(&net_unattached);
    while (p != NULL) {
        struct peer *next = BTPDQ_NEXT(p, p_entry);
        if (p->n == n)
            peer_kill(p);
        p = next;
    }

    p = BTPDQ_FIRST(&n->peers);
    while (p != NULL) {
        struct peer *next = BTPDQ_NEXT(p, p_entry);
        peer_kill(p);
        p = next;
    }
}

int
net_active(struct torrent *tp)
{
    return tp->net->active;
}
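
/* Big-endian (network byte order) 32-bit serialization helpers. */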
void
net_write32(void *buf, uint32_t num)
{
    uint8_t *p = buf;
    *p = (num >> 24) & 0xff;
    *(p + 1) = (num >> 16) & 0xff;
    *(p + 2) = (num >> 8) & 0xff;
    *(p + 3) = num & 0xff;
}

uint32_t
net_read32(const void *buf)
{
    const uint8_t *p = buf;
    return (uint32_t)*p << 24 | (uint32_t)*(p + 1) << 16
        | (uint32_t)*(p + 2) << 8 | *(p + 3);
}
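
/*
 * Flush as much of the peer's output queue as the socket and the
 * bandwidth budget allow; wmax == 0 means unlimited. Fully written
 * buffers are released, and a partial write is remembered through
 * outq_off. Returns the number of bytes written, or 0 on EAGAIN or
 * after the peer has been killed.
 */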
static unsigned long
net_write(struct peer *p, unsigned long wmax)
{
    struct nb_link *nl;
    struct iovec iov[IOV_MAX];
    int niov;
    int limited;
    ssize_t nwritten;
    unsigned long bcount;

    limited = wmax > 0;

    niov = 0;
    assert((nl = BTPDQ_FIRST(&p->outq)) != NULL);
    while (niov < IOV_MAX && nl != NULL && (!limited || wmax > 0)) {
        if (niov > 0) {
            iov[niov].iov_base = nl->nb->buf;
            iov[niov].iov_len = nl->nb->len;
        } else {
            iov[niov].iov_base = nl->nb->buf + p->outq_off;
            iov[niov].iov_len = nl->nb->len - p->outq_off;
        }
        if (limited) {
            if (iov[niov].iov_len > wmax)
                iov[niov].iov_len = wmax;
            wmax -= iov[niov].iov_len;
        }
        niov++;
        nl = BTPDQ_NEXT(nl, entry);
    }

    nwritten = writev(p->sd, iov, niov);
    if (nwritten < 0) {
        if (errno == EAGAIN) {
            btpd_ev_add(&p->out_ev, NULL);
            p->t_wantwrite = btpd_seconds;
            return 0;
        } else {
            btpd_log(BTPD_L_CONN, "write error: %s\n", strerror(errno));
            peer_kill(p);
            return 0;
        }
    } else if (nwritten == 0) {
        btpd_log(BTPD_L_CONN, "connection closed by peer.\n");
        peer_kill(p);
        return 0;
    }

    bcount = nwritten;
    nl = BTPDQ_FIRST(&p->outq);
    while (bcount > 0) {
        unsigned long bufdelta = nl->nb->len - p->outq_off;
        if (bcount >= bufdelta) {
            peer_sent(p, nl->nb);
            if (nl->nb->type == NB_TORRENTDATA) {
                p->n->uploaded += bufdelta;
                p->count_up += bufdelta;
            }
            bcount -= bufdelta;
            BTPDQ_REMOVE(&p->outq, nl, entry);
            nb_drop(nl->nb);
            free(nl);
            p->outq_off = 0;
            nl = BTPDQ_FIRST(&p->outq);
        } else {
            if (nl->nb->type == NB_TORRENTDATA) {
                p->n->uploaded += bcount;
                p->count_up += bcount;
            }
            p->outq_off += bcount;
            bcount = 0;
        }
    }
    if (!BTPDQ_EMPTY(&p->outq)) {
        btpd_ev_add(&p->out_ev, NULL);
        p->t_wantwrite = btpd_seconds;
    }
    return nwritten;
}
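
/*
 * Dispatch one completely received wire message to the matching
 * peer_on_* handler. Returns nonzero on a malformed message, which
 * makes the caller drop the peer.
 */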
static int
net_dispatch_msg(struct peer *p, const char *buf)
{
    uint32_t index, begin, length;
    int res = 0;

    switch (p->in.msg_num) {
    case MSG_CHOKE:
        peer_on_choke(p);
        break;
    case MSG_UNCHOKE:
        peer_on_unchoke(p);
        break;
    case MSG_INTEREST:
        peer_on_interest(p);
        break;
    case MSG_UNINTEREST:
        peer_on_uninterest(p);
        break;
    case MSG_HAVE:
        peer_on_have(p, net_read32(buf));
        break;
    case MSG_BITFIELD:
        if (p->npieces == 0)
            peer_on_bitfield(p, buf);
        else
            res = 1;
        break;
    case MSG_REQUEST:
        if ((p->flags & (PF_P_WANT|PF_I_CHOKE)) == PF_P_WANT) {
            index = net_read32(buf);
            begin = net_read32(buf + 4);
            length = net_read32(buf + 8);
            if ((length > PIECE_BLOCKLEN
                    || index >= p->n->tp->meta.npieces
                    || !cm_has_piece(p->n->tp, index)
                    || begin + length > torrent_piece_size(p->n->tp, index))) {
                btpd_log(BTPD_L_MSG, "bad request: (%u, %u, %u) from %p\n",
                    index, begin, length, p);
                res = 1;
                break;
            }
            peer_on_request(p, index, begin, length);
        }
        break;
    case MSG_CANCEL:
        index = net_read32(buf);
        begin = net_read32(buf + 4);
        length = net_read32(buf + 8);
        peer_on_cancel(p, index, begin, length);
        break;
    case MSG_PIECE:
        length = p->in.msg_len - 9;
        peer_on_piece(p, p->in.pc_index, p->in.pc_begin, length, buf);
        break;
    default:
        abort();
    }
    return res;
}
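
/* Sanity-check a message's declared length against its type. */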
static int
net_mh_ok(struct peer *p)
{
    uint32_t mlen = p->in.msg_len;
    switch (p->in.msg_num) {
    case MSG_CHOKE:
    case MSG_UNCHOKE:
    case MSG_INTEREST:
    case MSG_UNINTEREST:
        return mlen == 1;
    case MSG_HAVE:
        return mlen == 5;
    case MSG_BITFIELD:
        return mlen == (uint32_t)ceil(p->n->tp->meta.npieces / 8.0) + 1;
    case MSG_REQUEST:
    case MSG_CANCEL:
        return mlen == 13;
    case MSG_PIECE:
        return mlen <= PIECE_BLOCKLEN + 9;
    default:
        return 0;
    }
}
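
/* Count piece payload bytes toward the torrent's download totals. */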
static void
net_progress(struct peer *p, size_t length)
{
    if (p->in.state == BTP_MSGBODY && p->in.msg_num == MSG_PIECE) {
        p->n->downloaded += length;
        p->count_dwn += length;
    }
}
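
/*
 * Advance the peer's input state machine by one step; buf holds
 * exactly the bytes the current state asked for (a handshake field,
 * a 4-byte length prefix, a message id, piece metadata, or a message
 * body). Returns -1 and kills the peer on protocol errors.
 */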
static int
net_state(struct peer *p, const char *buf)
{
    switch (p->in.state) {
    case SHAKE_PSTR:
        if (bcmp(buf, "\x13""BitTorrent protocol", 20) != 0)
            goto bad;
        peer_set_in_state(p, SHAKE_INFO, 20);
        break;
    case SHAKE_INFO:
        if (p->flags & PF_INCOMING) {
            struct net *n;
            BTPDQ_FOREACH(n, &m_torrents, entry)
                if (bcmp(buf, n->tp->meta.info_hash, 20) == 0)
                    break;
            if (n == NULL)
                goto bad;
            p->n = n;
            peer_send(p, nb_create_shake(p->n->tp));
        } else if (bcmp(buf, p->n->tp->meta.info_hash, 20) != 0)
            goto bad;
        peer_set_in_state(p, SHAKE_ID, 20);
        break;
    case SHAKE_ID:
        if ((net_torrent_has_peer(p->n, buf)
                || bcmp(buf, btpd_get_peer_id(), 20) == 0))
            goto bad;
        bcopy(buf, p->id, 20);
        peer_on_shake(p);
        peer_set_in_state(p, BTP_MSGSIZE, 4);
        break;
    case BTP_MSGSIZE:
        p->in.msg_len = net_read32(buf);
        if (p->in.msg_len == 0)
            peer_on_keepalive(p);
        else
            peer_set_in_state(p, BTP_MSGHEAD, 1);
        break;
    case BTP_MSGHEAD:
        p->in.msg_num = buf[0];
        if (!net_mh_ok(p))
            goto bad;
        else if (p->in.msg_len == 1) {
            if (net_dispatch_msg(p, buf) != 0)
                goto bad;
            peer_set_in_state(p, BTP_MSGSIZE, 4);
        } else if (p->in.msg_num == MSG_PIECE)
            peer_set_in_state(p, BTP_PIECEMETA, 8);
        else
            peer_set_in_state(p, BTP_MSGBODY, p->in.msg_len - 1);
        break;
    case BTP_PIECEMETA:
        p->in.pc_index = net_read32(buf);
        p->in.pc_begin = net_read32(buf + 4);
        peer_set_in_state(p, BTP_MSGBODY, p->in.msg_len - 9);
        break;
    case BTP_MSGBODY:
        if (net_dispatch_msg(p, buf) != 0)
            goto bad;
        peer_set_in_state(p, BTP_MSGSIZE, 4);
        break;
    default:
        abort();
    }
    return 0;

bad:
    btpd_log(BTPD_L_CONN, "bad data from %p (%u, %u, %u).\n",
        p, p->in.state, p->in.msg_len, p->in.msg_num);
    peer_kill(p);
    return -1;
}
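
/*
 * Read from the peer's socket; rmax == 0 means unlimited. Input is
 * scattered into the partially filled state buffer (if any) plus a
 * stack buffer, then fed to net_state() one state's worth at a time.
 * A trailing fragment is saved in a freshly allocated p->in.buf for
 * the next call.
 */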
#define GRBUFLEN (1 << 15)

static unsigned long
net_read(struct peer *p, unsigned long rmax)
{
    size_t rest = p->in.buf != NULL ? p->in.st_bytes - p->in.off : 0;
    char buf[GRBUFLEN];
    struct iovec iov[2] = {
        {
            p->in.buf + p->in.off,
            rest
        }, {
            buf,
            sizeof(buf)
        }
    };

    if (rmax > 0) {
        if (iov[0].iov_len > rmax)
            iov[0].iov_len = rmax;
        iov[1].iov_len = min(rmax - iov[0].iov_len, iov[1].iov_len);
    }

    ssize_t nread = readv(p->sd, iov, 2);
    if (nread < 0 && errno == EAGAIN)
        goto out;
    else if (nread < 0) {
        btpd_log(BTPD_L_CONN, "Read error (%s) on %p.\n", strerror(errno), p);
        peer_kill(p);
        return 0;
    } else if (nread == 0) {
        btpd_log(BTPD_L_CONN, "Connection closed by %p.\n", p);
        peer_kill(p);
        return 0;
    }

    if (rest > 0) {
        if (nread < rest) {
            p->in.off += nread;
            net_progress(p, nread);
            goto out;
        }
        net_progress(p, rest);
        if (net_state(p, p->in.buf) != 0)
            return nread;
        free(p->in.buf);
        p->in.buf = NULL;
        p->in.off = 0;
    }

    iov[1].iov_len = nread - rest;
    while (p->in.st_bytes <= iov[1].iov_len) {
        size_t consumed = p->in.st_bytes;
        net_progress(p, consumed);
        if (net_state(p, iov[1].iov_base) != 0)
            return nread;
        iov[1].iov_base += consumed;
        iov[1].iov_len -= consumed;
    }

    if (iov[1].iov_len > 0) {
        net_progress(p, iov[1].iov_len);
        p->in.off = iov[1].iov_len;
        p->in.buf = btpd_malloc(p->in.st_bytes);
        bcopy(iov[1].iov_base, p->in.buf, iov[1].iov_len);
    }

out:
    btpd_ev_add(&p->in_ev, NULL);
    return nread > 0 ? nread : 0;
}
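
/*
 * Non-blocking connect helpers: net_connect2 works on a ready
 * sockaddr, while net_connect resolves a numeric host/port first.
 * Both return 0 on success or an errno-style value.
 */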
int
net_connect2(struct sockaddr *sa, socklen_t salen, int *sd)
{
    if ((*sd = socket(PF_INET, SOCK_STREAM, 0)) == -1)
        return errno;
    set_nonblocking(*sd);
    if (connect(*sd, sa, salen) == -1 && errno != EINPROGRESS) {
        int err = errno;
        btpd_log(BTPD_L_CONN, "Botched connection %s.\n", strerror(errno));
        close(*sd);
        return err;
    }
    return 0;
}

int
net_connect(const char *ip, int port, int *sd)
{
    struct addrinfo hints, *res;
    char portstr[6];

    assert(net_npeers < net_max_peers);

    if (snprintf(portstr, sizeof(portstr), "%d", port) >= sizeof(portstr))
        return EINVAL;

    bzero(&hints, sizeof(hints));
    hints.ai_family = AF_UNSPEC;
    hints.ai_flags = AI_NUMERICHOST;
    hints.ai_socktype = SOCK_STREAM;
    if (getaddrinfo(ip, portstr, &hints, &res) != 0)
        return errno;

    int error = net_connect2(res->ai_addr, res->ai_addrlen, sd);

    freeaddrinfo(res);

    return error;
}
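
/*
 * libevent callback for the listening socket: accept a connection,
 * make it non-blocking, and hand it to peer_create_in unless the
 * peer limit has been reached.
 */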
void
net_connection_cb(int sd, short type, void *arg)
{
    int nsd;

    nsd = accept(sd, NULL, NULL);
    if (nsd < 0) {
        if (errno == EWOULDBLOCK || errno == ECONNABORTED)
            return;
        else
            btpd_err("accept: %s\n", strerror(errno));
    }

    if (set_nonblocking(nsd) != 0) {
        close(nsd);
        return;
    }

    assert(net_npeers <= net_max_peers);
    if (net_npeers == net_max_peers) {
        close(nsd);
        return;
    }

    peer_create_in(nsd);

    btpd_log(BTPD_L_CONN, "got connection.\n");
}
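
/*
 * Rate estimation. Each tick a running rate loses
 * max(rate / RATEHISTORY, min(256, rate)) and gains the bytes
 * counted since the previous tick, approximating a sliding average
 * over roughly RATEHISTORY seconds; the min(256, rate) floor drains
 * small rates quickly.
 */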
#define RATEHISTORY 20

static unsigned long
compute_rate_sub(unsigned long rate)
{
    if (rate > 256 * RATEHISTORY)
        return rate / RATEHISTORY;
    else
        return min(256, rate);
}

static void
compute_rates(void)
{
    unsigned long tot_up = 0, tot_dwn = 0;
    struct net *n;
    BTPDQ_FOREACH(n, &m_torrents, entry) {
        unsigned long tp_up = 0, tp_dwn = 0;
        struct peer *p;
        BTPDQ_FOREACH(p, &n->peers, p_entry) {
            if (p->count_up > 0 || peer_active_up(p)) {
                tp_up += p->count_up;
                p->rate_up += p->count_up - compute_rate_sub(p->rate_up);
                p->count_up = 0;
            }
            if (p->count_dwn > 0 || peer_active_down(p)) {
                tp_dwn += p->count_dwn;
                p->rate_dwn += p->count_dwn - compute_rate_sub(p->rate_dwn);
                p->count_dwn = 0;
            }
        }
        n->rate_up += tp_up - compute_rate_sub(n->rate_up);
        n->rate_dwn += tp_dwn - compute_rate_sub(n->rate_dwn);
        tot_up += tp_up;
        tot_dwn += tp_dwn;
    }
    m_rate_up += tot_up - compute_rate_sub(m_rate_up);
    m_rate_dwn += tot_dwn - compute_rate_sub(m_rate_dwn);
}
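
/*
 * Once-per-second bandwidth tick: refill the global byte budgets and
 * service the peers that were parked on the read/write queues while
 * the previous budget was exhausted.
 */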
static void
net_bw_tick(void)
{
    struct peer *p;

    m_bw_bytes_out = net_bw_limit_out;
    m_bw_bytes_in = net_bw_limit_in;

    if (net_bw_limit_in > 0) {
        while ((p = BTPDQ_FIRST(&net_bw_readq)) != NULL && m_bw_bytes_in > 0) {
            BTPDQ_REMOVE(&net_bw_readq, p, rq_entry);
            p->flags &= ~PF_ON_READQ;
            m_bw_bytes_in -= net_read(p, m_bw_bytes_in);
        }
    } else {
        while ((p = BTPDQ_FIRST(&net_bw_readq)) != NULL) {
            BTPDQ_REMOVE(&net_bw_readq, p, rq_entry);
            p->flags &= ~PF_ON_READQ;
            net_read(p, 0);
        }
    }

    if (net_bw_limit_out) {
        while ((p = BTPDQ_FIRST(&net_bw_writeq)) != NULL
               && m_bw_bytes_out > 0) {
            BTPDQ_REMOVE(&net_bw_writeq, p, wq_entry);
            p->flags &= ~PF_ON_WRITEQ;
            m_bw_bytes_out -= net_write(p, m_bw_bytes_out);
        }
    } else {
        while ((p = BTPDQ_FIRST(&net_bw_writeq)) != NULL) {
            BTPDQ_REMOVE(&net_bw_writeq, p, wq_entry);
            p->flags &= ~PF_ON_WRITEQ;
            net_write(p, 0);
        }
    }
}
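
/*
 * Per-second driver: tick every peer (attached or not), update the
 * rate estimates, then run the bandwidth scheduler.
 */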
static void
run_peer_ticks(void)
{
    struct net *n;
    struct peer *p, *next;

    BTPDQ_FOREACH_MUTABLE(p, &net_unattached, p_entry, next)
        peer_on_tick(p);

    BTPDQ_FOREACH(n, &m_torrents, entry)
        BTPDQ_FOREACH_MUTABLE(p, &n->peers, p_entry, next)
            peer_on_tick(p);
}

void
net_on_tick(void)
{
    run_peer_ticks();
    compute_rates();
    net_bw_tick();
}
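
/*
 * Socket readiness callbacks. With no bandwidth limit the peer is
 * serviced immediately; otherwise it spends the remaining budget or,
 * if none is left, parks on the corresponding queue until the next
 * net_bw_tick.
 */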
void
net_read_cb(int sd, short type, void *arg)
{
    struct peer *p = (struct peer *)arg;
    if (net_bw_limit_in == 0)
        net_read(p, 0);
    else if (m_bw_bytes_in > 0)
        m_bw_bytes_in -= net_read(p, m_bw_bytes_in);
    else {
        p->flags |= PF_ON_READQ;
        BTPDQ_INSERT_TAIL(&net_bw_readq, p, rq_entry);
    }
}

void
net_write_cb(int sd, short type, void *arg)
{
    struct peer *p = (struct peer *)arg;
    if (net_bw_limit_out == 0)
        net_write(p, 0);
    else if (m_bw_bytes_out > 0)
        m_bw_bytes_out -= net_write(p, m_bw_bytes_out);
    else {
        p->flags |= PF_ON_WRITEQ;
        BTPDQ_INSERT_TAIL(&net_bw_writeq, p, wq_entry);
    }
}
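
/*
 * Bring up the network: seed the bandwidth budgets, cap net_max_peers
 * to roughly 4/5 of the usable descriptors, and start listening for
 * incoming peers on net_port.
 */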
void
net_init(void)
{
    m_bw_bytes_out = net_bw_limit_out;
    m_bw_bytes_in = net_bw_limit_in;

    int safe_fds = min(getdtablesize(), FD_SETSIZE) * 4 / 5;
    if (net_max_peers == 0 || net_max_peers > safe_fds)
        net_max_peers = safe_fds;

    int sd;
    int flag = 1;
    struct sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(net_port);

    if ((sd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
        btpd_err("socket: %s\n", strerror(errno));
    setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag));
    if (bind(sd, (struct sockaddr *)&addr, sizeof(addr)) == -1)
        btpd_err("bind: %s\n", strerror(errno));
    listen(sd, 10);
    set_nonblocking(sd);

    event_set(&m_net_incoming, sd, EV_READ | EV_PERSIST,
        net_connection_cb, NULL);
    btpd_ev_add(&m_net_incoming, NULL);
}