Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions libp2p-identify/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ libp2p-swarm = { path = "../libp2p-swarm" }
multiaddr = "0.2.0"
protobuf = "1.4.2"
tokio-io = "0.1.0"
varint = { path = "../varint-rs" }

[dev-dependencies]
libp2p-tcp-transport = { path = "../libp2p-tcp-transport" }
Expand Down
6 changes: 3 additions & 3 deletions libp2p-identify/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ extern crate libp2p_peerstore;
extern crate libp2p_swarm;
extern crate protobuf;
extern crate tokio_io;
extern crate varint;

use bytes::{Bytes, BytesMut};
use futures::{Future, Stream, Sink};
Expand All @@ -42,7 +43,7 @@ use protobuf::repeated::RepeatedField;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::iter;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::codec::length_delimited;
use varint::VarintCodec;

mod structs_proto;

Expand Down Expand Up @@ -94,8 +95,7 @@ impl<C> ConnectionUpgrade<C> for IdentifyProtocol
}

fn upgrade(self, socket: C, _: (), ty: Endpoint, remote_addr: &Multiaddr) -> Self::Future {
// TODO: use jack's varint library instead
let socket = length_delimited::Builder::new().length_field_length(1).new_framed(socket);
let socket = socket.framed(VarintCodec::default());

match ty {
Endpoint::Dialer => {
Expand Down
78 changes: 76 additions & 2 deletions varint-rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,16 @@ extern crate futures;
#[macro_use]
extern crate error_chain;

use bytes::BytesMut;
use bytes::{BufMut, BytesMut, IntoBuf};
use futures::{Poll, Async};
use num_bigint::BigUint;
use num_traits::ToPrimitive;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::codec::Decoder;
use tokio_io::codec::{Encoder, Decoder};
use std::io;
use std::io::prelude::*;
use std::marker::PhantomData;
use std::mem;

mod errors {
error_chain! {
Expand Down Expand Up @@ -395,6 +397,78 @@ impl<T: Default + DecoderHelper> Decoder for VarintDecoder<T> {
}
}

#[derive(Debug)]
pub struct VarintCodec<W> {
inner: VarintCodecInner,
marker: PhantomData<W>,
}

impl<T> Default for VarintCodec<T> {
#[inline]
fn default() -> VarintCodec<T> {
VarintCodec {
inner: VarintCodecInner::WaitingForLen(VarintDecoder::default()),
marker: PhantomData,
}
}
}

#[derive(Debug)]
enum VarintCodecInner {
WaitingForLen(VarintDecoder<usize>),
WaitingForData(usize),
Poisonned,
}

impl<T> Decoder for VarintCodec<T> {
type Item = BytesMut;
type Error = io::Error;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
loop {
match mem::replace(&mut self.inner, VarintCodecInner::Poisonned) {
VarintCodecInner::WaitingForData(len) => {
if src.len() >= len {
self.inner = VarintCodecInner::WaitingForLen(VarintDecoder::default());
return Ok(Some(src.split_to(len)));
} else {
self.inner = VarintCodecInner::WaitingForData(len);
return Ok(None);
}
},
VarintCodecInner::WaitingForLen(mut decoder) => {
match decoder.decode(src)? {
None => {
self.inner = VarintCodecInner::WaitingForLen(decoder);
return Ok(None);
},
Some(len) => {
self.inner = VarintCodecInner::WaitingForData(len);
},
}
},
VarintCodecInner::Poisonned => {
panic!("varint codec was poisoned")
},
}
}
}
}

impl<D> Encoder for VarintCodec<D>
where D: IntoBuf + AsRef<[u8]>,
{
type Item = D;
type Error = io::Error;

fn encode(&mut self, item: D, dst: &mut BytesMut) -> Result<(), io::Error> {
let encoded_len = encode(item.as_ref().len()); // TODO: can be optimized by not allocating?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this allocate?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

encode returns a Vec<u8>, while we could directly write the length to dst.

dst.put(encoded_len);
dst.put(item);
Ok(())
}
}

/// Syncronously decode a number from a `Read`
pub fn decode<R: Read, T: Default + DecoderHelper>(mut input: R) -> errors::Result<T> {
let mut decoder = DecoderState::default();
Expand Down