Monday, February 8, 2010

Xus Rsync/Bittorrent/Git approach

Approach

I started working on my file sharing approach. Sharing will not involve deltas or packs, just chunks, identified by Git id. The idea is to share Git commits, with all of their associated tree and blob objects. They'll be shared using a manifest for each commit that exhaustively lists all of the chunks in that commit. A manifest will be a binary XML document (using Fast Infoset, same as Xus uses for its protocol), since that's a pretty efficient binary serialization format (it uses BER encoding for numbers in a lot of places and whatnot). Another XML doc will list the chunks of the manifest itself:
  • A commit manifest contains the serialized commit data, the commit id, and a list of tree object ids/crcs and blob ids with blob chunk ids/crcs
  • A commit manifest chunk list contains a list of chunks for the commit manifest file, since a commit manifest file could get large
I'll probably use an extra branch for each Git branch to store the chunks that a peer currently has for it. When the peer gets new chunks, it can store them in its chunk-branch for that Git branch (probably named [branch]-chunks). Git will properly reuse shared chunks because that's what Git does.
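
To make the shape of a manifest a bit more concrete, here is a minimal sketch of how it might be modeled in Scala. Every name in it (ManifestSketch, ChunkRef, TreeRef, BlobRef, CommitManifest, chunkBranch) is hypothetical, and it says nothing about the Fast Infoset encoding; it only mirrors the fields listed above.

```scala
// Hypothetical model of the manifest described above; all names are illustrative.
object ManifestSketch {
  // A chunk is identified by its Git id, plus a rolling crc for scavenging.
  final case class ChunkRef(id: String, crc: Long)

  // A tree object is listed by id and crc.
  final case class TreeRef(id: String, crc: Long)

  // A blob is listed by id together with the chunks that make it up.
  final case class BlobRef(id: String, chunks: Seq[ChunkRef])

  // The commit manifest: serialized commit data, the commit id, trees and blobs.
  final case class CommitManifest(
      commitId: String,
      commitData: Array[Byte],
      trees: Seq[TreeRef],
      blobs: Seq[BlobRef]) {
    // Every blob chunk id the commit needs, without duplicates.
    def chunkIds: Set[String] = blobs.flatMap(_.chunks.map(_.id)).toSet
  }

  // Name of the branch holding the chunks a peer has for a Git branch.
  def chunkBranch(branch: String): String = branch + "-chunks"
}
```

Collecting the chunk ids into a set reflects the point about reuse: a chunk shared by two blobs only needs to be fetched and stored once.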

A word about chunks. Git stores its data compressed, but to get good chunks, we have to uncompress the data first and then break it into chunks. That way we can effectively scavenge the Git repository for chunks and avoid downloading the ones we already have, à la rsync.
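
Here is a rough sketch of breaking uncompressed data into chunks and naming each one by a Git id. It assumes fixed-size chunks and assumes a chunk's id is the id Git would give it if it were stored as a blob (SHA-1 over the header "blob <length>\0" followed by the content). The chunk size of 4096 and the names ChunkIds, gitBlobId, chunks and chunkIds are made up for illustration.

```scala
import java.security.MessageDigest

object ChunkIds {
  // Git's id for a blob: SHA-1 over "blob <length>\0" followed by the content.
  def gitBlobId(content: Array[Byte]): String = {
    val digest = MessageDigest.getInstance("SHA-1")

    digest.update(("blob " + content.length + "\u0000").getBytes("US-ASCII"))
    digest.digest(content).map(b => "%02x".format(b & 0xff)).mkString
  }

  // Split uncompressed data into fixed-size chunks (the last may be shorter).
  // The chunk size is an assumption; nothing above fixes one.
  def chunks(data: Array[Byte], chunkSize: Int = 4096): Seq[Array[Byte]] =
    data.grouped(chunkSize).toSeq

  // The id each chunk would have if it were stored as a Git blob.
  def chunkIds(data: Array[Byte], chunkSize: Int = 4096): Seq[String] =
    chunks(data, chunkSize).map(gitBlobId)
}
```

Because the ids are computed over uncompressed bytes, two files that share a run of identical bytes on a chunk boundary end up with the same chunk id, no matter how Git compressed them.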

How rsync works

The brilliant idea of rsync is that you can cheaply compute a "rolling crc" as you "slide a window" down a file, byte-by-byte. An example of this type of crc is the standard shift-xor hash function a lot of people use for strings. For each byte in the string, shift the crc up one bit and xor the byte in. Assuming you are using a 32-bit hash, the hash is only "good" for the last 32 bytes of a string, because each byte's bits get shifted off the top after 32 steps. That forgetfulness is what makes it a sliding window.
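
A small sketch of that shift-xor hash, just to show the "only good for 32 bytes" property in action. The names ShiftXor, step, hash and sameAsLast32 are made up for this example.

```scala
object ShiftXor {
  // Standard shift-xor string hash: shift up one bit, xor the next byte in.
  def step(hash: Int, b: Byte): Int = (hash << 1) ^ (b & 0xff)

  def hash(bytes: Array[Byte]): Int = bytes.foldLeft(0)(step)

  // With a 32-bit hash, only the last 32 bytes influence the result, so
  // hashing the whole input gives the same value as hashing its tail.
  def sameAsLast32(bytes: Array[Byte]): Boolean =
    hash(bytes) == hash(bytes.takeRight(32))
}
```

So as you feed bytes in one at a time, the running value is always the hash of the most recent 32-byte window, with no extra work to "remove" the byte that slid out.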

Here's an implementation of the Rsync chunk finder with a very simplistic rolling crc function:
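import java.io.{BufferedInputStream, FileInputStream}
import java.security.MessageDigest
import scala.collection.mutable.ArrayBuffer

object Rsync {
  // Run the block if the arguments look right; otherwise print usage and exit.
  def use(test: Boolean)(block: => Any): Unit = {
    if (test) {
      block
    } else {
      println("Usage: Rsync -hash string | -find file crc sha-1 length")
      System.exit(1)
    }
  }

  // Hex string, left-padded with zeros to the given width.
  def pad(n: BigInt, width: Int): String = {
    val s = n.toString(16)

    ("0" * (width - s.length)) + s
  }

  // Very simplistic rolling crc: shift the hash up one bit and bring in the
  // parity of the next byte, so a 64-bit hash covers the last 64 bytes.
  def next(hash: Long, number: Int): Long = {
    var p = 0
    var n = number

    for (i <- 1 to 8) {
      p ^= n
      n >>= 1
    }
    (hash << 1) | (p & 1)
  }

  // SHA-1 as a non-negative number (signum 1), so it prints as plain hex.
  def sha(bytes: Array[Byte]): BigInt =
    BigInt(1, MessageDigest.getInstance("SHA-1").digest(bytes))

  // Returns the offset of the last byte of the first window of len bytes
  // whose crc and SHA-1 both match, or -1 if there is none.
  def find(file: String, crcIn: Long, shaIn: BigInt, len: Int): Int = {
    val input = new BufferedInputStream(new FileInputStream(file))
    val buf = ArrayBuffer[Byte]()
    // Use a Long shift: (1 << len) overflows an Int once len reaches 32.
    val mask = if (len >= 64) -1L else (1L << len) - 1
    var crc = 0L
    var offset = -1

    try {
      var c = input.read()

      while (c != -1) {
        offset += 1
        crc = next(crc, c)
        buf.append(c.toByte)
        if (buf.length > len) buf.remove(0)
        // Only pay for the SHA-1 when the cheap crc already matches.
        if ((crc & mask) == crcIn && sha(buf.toArray) == shaIn) return offset
        c = input.read()
      }
      -1
    } finally {
      input.close()
    }
  }

  def main(args: Array[String]): Unit = {
    use(args.length > 0) {
      args(0) match {
        case "-hash" => use(args.length == 2) {
          // Hash the raw bytes so the values line up with what find reads from a file.
          val bytes = args(1).getBytes("UTF-8")
          val crc = bytes.foldLeft(0L)((hash, b) => next(hash, b & 0xff))

          println("args: -find file " + "%016x".format(crc) + " " + pad(sha(bytes), 40) + " " + bytes.length)
        }
        case "-find" => use(args.length == 5) {
          println("offset: " + find(args(1), BigInt(args(2), 16).longValue, BigInt(args(3), 16), args(4).toInt))
        }
        case _ => use(false)(())
      }
    }
  }
}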
This is more or less what Xus will use. As you can see, you can efficiently update the crc a byte at a time, so it's quick to locate a chunk. When you have several chunks, just use a set of CRCs. As you scan the file, check each crc to see if it's in the set.
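
Here is a sketch of the "set of CRCs" idea for several chunks at once. It assumes every wanted chunk has the same length (1 to 63 bytes here, so the mask fits in a Long) and that each one is described by a (crc, SHA-1) pair. The names ChunkScanner, describe and scan are hypothetical, and unlike the finder above it reports the offset of each chunk's first byte.

```scala
import java.security.MessageDigest

object ChunkScanner {
  // Simplistic rolling crc: shift up one bit and bring in the byte's parity.
  def next(hash: Long, b: Int): Long =
    (hash << 1) | (Integer.bitCount(b & 0xff) & 1)

  def sha(bytes: Array[Byte]): String =
    MessageDigest.getInstance("SHA-1").digest(bytes)
      .map(b => "%02x".format(b & 0xff)).mkString

  // The (crc, sha) pair that would describe a chunk of len bytes.
  def describe(chunk: Array[Byte], len: Int): (Long, String) =
    (chunk.foldLeft(0L)((h, b) => next(h, b)) & ((1L << len) - 1), sha(chunk))

  // Map from SHA-1 to the offset of the first byte of each wanted chunk found.
  def scan(data: Array[Byte], wanted: Seq[(Long, String)], len: Int): Map[String, Int] = {
    val mask = (1L << len) - 1
    // crc -> SHA-1s of the wanted chunks that have that crc
    val byCrc: Map[Long, Set[String]] =
      wanted.groupBy(_._1).map { case (crc, pairs) => crc -> pairs.map(_._2).toSet }
    var crc = 0L
    var found = Map.empty[String, Int]

    for (end <- data.indices) {
      crc = next(crc, data(end))
      val start = end - len + 1
      if (start >= 0) {
        // Cheap lookup first; compute the SHA-1 only on a crc hit.
        byCrc.get(crc & mask).foreach { shas =>
          val id = sha(data.slice(start, end + 1))
          if (shas(id) && !found.contains(id)) found += (id -> start)
        }
      }
    }
    found
  }
}
```

The parity crc is weak, so false hits are common; the SHA-1 check is what actually confirms a chunk, and the crc only keeps that check from running at every offset.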

So after Xus gets a manifest it has to download all of the chunks it doesn't have. Here's how it will work:
  1. Check the Git repository for the file id. If it's in there, no need to download
  2. Check the Git repository for a previous version of that file (we can eventually include extra information in the manifest if the creator's Git detected a rename). If there is a previous version, scavenge the previous version for chunks using the rsync algorithm
  3. If there are any outstanding chunks we need to download, request them from the DHT
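
The three steps above boil down to a small piece of set logic. This is only a sketch: DownloadPlan and toDownload are made-up names, and the inputs (whether the file id is already in the repository, and which chunk ids were scavenged from a previous version) are assumed to have been computed elsewhere.

```scala
object DownloadPlan {
  // Decide which of a file's chunks still have to come from the DHT.
  //   fileInRepo: step 1, the file id is already in the Git repository
  //   scavenged:  step 2, chunk ids recovered from a previous version
  // Whatever is left over is step 3, the chunks to request.
  def toDownload(fileChunks: Seq[String], fileInRepo: Boolean, scavenged: Set[String]): Seq[String] =
    if (fileInRepo) Seq.empty
    else fileChunks.distinct.filterNot(scavenged)
}
```
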

Requesting from the DHT

When a peer needs to download a chunk, we use the topic as a DHT, backed by the master peer's storage:
  1. Use a DHT message to send a chunk request to the peer with ID nearest the chunk's ID
  2. If the peer does not have the chunk, it requests it from a topic master using a mini topic master DHT to distribute the load
  3. Once the peer has the chunk, it connects to the requesting peer and transfers the chunk to it
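
For step 1, "nearest" needs a distance between ids. The sketch below assumes XOR distance, as Kademlia-style DHTs use; that is an assumption of this example and not something stated above. Ids are treated as BigInts (for example, a Git id parsed as hex), and the names Nearest, distance and nearestPeer are hypothetical.

```scala
object Nearest {
  // XOR distance between two ids (an assumption; the text only says "nearest").
  def distance(a: BigInt, b: BigInt): BigInt = a ^ b

  // The peer whose id is nearest the chunk's id, if there are any peers.
  def nearestPeer(peers: Seq[BigInt], chunkId: BigInt): Option[BigInt] =
    if (peers.isEmpty) None else Some(peers.minBy(distance(_, chunkId)))
}
```

Any metric works as long as every peer computes it the same way, so that requests for a given chunk keep landing on the same peer.
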

Connecting to a Peer

Some peers may not be able to accept connections. This interferes with chunk sending requests. Xus has two ways to deal with this problem:
  1. The preferred way is the connection service: if one peer can receive connections but the other cannot, the peer which can receive connections can delegate a message to the other peer to connect back to it
  2. If neither peer can receive connections, they can delegate all of their messages through the topic master
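
The choice between those options can be sketched as a small decision function. The names Connect, Route, Direct, ConnectBack, ViaTopicMaster and route are all made up, and how a peer learns whether the other side can accept connections is left out.

```scala
object Connect {
  sealed trait Route
  case object Direct extends Route         // sender connects straight to the requester
  case object ConnectBack extends Route    // requester is asked to connect back to the sender
  case object ViaTopicMaster extends Route // both delegate their messages through the topic master

  // Pick how a peer holding a chunk reaches the peer that requested it.
  def route(senderAccepts: Boolean, requesterAccepts: Boolean): Route =
    if (requesterAccepts) Direct
    else if (senderAccepts) ConnectBack
    else ViaTopicMaster
}
```
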
So, there you have a fairly detailed idea of how I'm implementing the Xus file sharing service.