Diagrams

digraph {
  label = "step 1: #4 replies\nquery #3";
  rankdir = LR;
  node [style=filled];
  edge [color=gray];
  nodesep = 1;
  ranksep = 1;
  subgraph hop1 {
      rank = same
      4, 5, 6
  }
  self -> 4 -> 3 [color=red];
  4, 3 [fillcolor=red];

  self -> 5 [color=blue]; 5 -> 2 [style=invis];
  5 [fillcolor=blue];

  self -> 6 [color=green];
  6 [fillcolor=green];

  4 -> 1, 2
  5 -> 1, 3 [style=invis];
  6 -> 4, 2, 3 [style=invis];
}
digraph {
  label = "step 2: #5 replies\nquery #2";
  rankdir = LR;
  node [style=filled];
  edge [color=gray];
  nodesep = 1;
  ranksep = 1;
  subgraph hop1 {
      rank = same
      4, 5, 6
  }
  self -> 4 -> 3 [color=red];
  4, 3 [fillcolor=red];

  self -> 5 -> 2 [color=blue];
  5, 2 [fillcolor=blue];

  self -> 6 [color=green];
  6 [fillcolor=green];

  4 -> 1, 2
  5 -> 1, 3
  6 -> 4, 2, 3 [style=invis];
}
digraph {
  label = "step 3 (bad): #6 replies\nnothing to do, green path is dead";
  rankdir = LR;
  node [style=filled];
  edge [color=gray];
  nodesep = 1;
  ranksep = 1;
  subgraph hop1 {
      rank = same
      4, 5, 6
  }
  self -> 4 -> 3 [color=red];
  4, 3 [fillcolor=red];

  self -> 5 -> 2 [color=blue];
  5, 2 [fillcolor=blue];

  self -> 6 [color=green];
  6 [fillcolor=green];

  4 -> 1, 2
  5 -> 1, 3
  6 -> 4, 2, 3
}
digraph {
  label = "step 3 (good): #6 replies\nre-flow to keep green path alive";
  rankdir = LR;
  node [style=filled];
  edge [color=gray];
  nodesep = 1;
  ranksep = 1;
  subgraph hop1 {
      rank = same
      4, 5, 6
  }
  self -> 4 -> 1 [color=red];
  4, 1 [fillcolor=red];

  self -> 5 -> 3 [color=blue];
  5, 3 [fillcolor=blue];

  self -> 6 -> 2 [color=green];
  6, 2 [fillcolor=green];

  4 -> 2, 3
  5 -> 1, 2
  6 -> 4, 3
  6 -> 5 [style=invis] ## force dot to preserve layout
}
digraph {
  label = "step 1: #5 replies\nquery #1";
  rankdir = LR;
  node [style=filled];
  edge [color=gray];
  nodesep = 1;
  ranksep = 1;
  subgraph hop1 {
      rank = same
      10, 11, 12
  }
  self -> 10 -> 5 -> 1[color=red];
  10, 5, 1 [fillcolor=red];

  self -> 11 -> 6 [color=blue];
  11, 6 [fillcolor=blue];

  self -> 12 -> 8 [color=green];
  12, 8 [fillcolor=green];

  10 -> 6
  11 -> 7
  5 -> 2
}
digraph {
  label = "step 2: #1 fails\nquery #2";
  rankdir = LR;
  node [style=filled];
  edge [color=gray];
  nodesep = 1;
  ranksep = 1;
  subgraph hop1 {
      rank = same
      10, 11, 12
  }
  self -> 10 -> 5 [color=red];
  5 -> 1
  5 -> 2 [color=red];
  10, 5, 2 [fillcolor=red];

  self -> 11 -> 6 [color=blue];
  11, 6 [fillcolor=blue];

  self -> 12 -> 8 [color=green];
  12, 8 [fillcolor=green];

  10 -> 6
  11 -> 7
  1 [shape=diamond]
}
digraph {
  label = "step 3 (bad): #2 fails\nred path is dead";
  rankdir = LR;
  node [style=filled];
  edge [color=gray];
  nodesep = 1;
  ranksep = 1;
  subgraph hop1 {
      rank = same
      10, 11, 12
  }
  self -> 10 -> 5 [color=red];
  5 -> 1
  5 -> 2;
  10, 5 [fillcolor=red];

  self -> 11 -> 6 [color=blue];
  11, 6 [fillcolor=blue];

  self -> 12 -> 8 [color=green];
  12, 8 [fillcolor=green];

  10 -> 6
  11 -> 7
  1 [shape=diamond]
  2 [shape=diamond]
}
digraph {
  label = "step 3 (good): #2 fails\nre-flow to keep red path alive";
  rankdir = LR;
  node [style=filled];
  edge [color=gray];
  nodesep = 1;
  ranksep = 1;
  subgraph hop1 {
      rank = same
      10, 11, 12
  }
  self -> 10 -> 6 [color=red];
  10 -> 5 -> 1;
  5 -> 2;
  10, 6 [fillcolor=red];

  self -> 11 -> 7 [color=blue];
  11, 7 [fillcolor=blue];

  self -> 12 -> 8 [color=green];
  12, 8 [fillcolor=green];

  11 -> 6
  1 [shape=diamond]
  2 [shape=diamond]
}
digraph {
  label = "step 1 (bad): #400 replies\nuse #400 as red final, but they are far & we miss 140";
  rankdir = LR;
  node [style=filled];
  edge [color=gray];
  nodesep = 1;
  ranksep = 1;
  subgraph {
     rank = same
     400, 450
  }
  subgraph {
     rank = same
     300, 220, 200
  }
  self -> 400 [color=red];
  400 [fillcolor=red];

  self -> 300 [color=blue];
  300 [fillcolor=blue];

  self -> 200 -> 50 [color=green];
  200, 50 [fillcolor=green];

  400 -> 450, 300, 200 [color=red,arrowhead=diamond]
  200 -> 140, 220
  50 -> 100, 160 [color=green,arrowhead=diamond]
}
digraph {
  label = "step 1 (good): #400 replies\nuse #200 as red final, we get 140";
  rankdir = LR;
  node [style=filled];
  edge [color=gray];
  nodesep = 1;
  ranksep = 1;
  subgraph {
     rank = same
     400, 450
  }
  subgraph {
     rank = same
     300, 220, 200
  }
  self -> 400 -> 200 [color=red];
  400 [fillcolor=red];

  self -> 300 [color=blue];
  300 [fillcolor=blue];

  self -> 200 -> 50 [color=green];
  200, 50 [fillcolor=green];
  200 [color=red];

  400 -> 450, 300
  200 -> 140, 220 [color=red,arrowhead=diamond]
  50 -> 100, 160 [color=green,arrowhead=diamond]
}
digraph F {
  label = "results (bad):\ncan't distinguish between results";
  rankdir = LR;
  node [style=filled];
  edge [color=gray];
  subgraph cluster_F3 {
    label="x"
    labelloc="b"
     style=invis
    subgraph {
      rank=same
      1,2,3
    }
  }
  self -> 1 [color=red];
  1 [fillcolor=red];

  self -> 2 [color=blue];
  2 [fillcolor=blue];

  self -> 3 [color=green];
  3 [fillcolor=green];

  subgraph cluster_r {
    label="x"
    labelloc="b"
    fontsize=16
    style=invis
    subgraph cluster_duck {
      margin=0
      label="x"
      labelloc="t"
      fontsize=16
      style=invis
      4,5,6,7,8,9
    }
  }

  1 -> 4,5,6 [color=red,arrowhead=diamond]
  1 -> 2,3 [weight=0,color=red,arrowhead=diamond]
  2 -> 5,6,7 [color=blue,arrowhead=diamond]
  2 -> 1,3 [weight=0,color=blue,arrowhead=diamond]
  3 -> 7,8,9 [color=green,arrowhead=diamond]
  3 -> 1,2 [weight=0,color=green,arrowhead=diamond]
}
digraph {
  label = "results (good):\ncan sort nodes on their distance and flow";
  rankdir = LR;
  node [style=filled];
  edge [color=gray];
  subgraph cluster_F3 {
    label="flow=3"
    labelloc="b"
    subgraph {
      rank=same
      1,2,3
    }
  }
  self -> 1 [color=red];
  1 [fillcolor=red];

  self -> 2 [color=blue];
  2 [fillcolor=blue];

  self -> 3 [color=green];
  3 [fillcolor=green];

  subgraph cluster_F2 {
    label="flow=2"
    5,6,7
  }
  subgraph cluster_F1 {
    label="flow=1"
    labelloc="b"
    4,8,9
  }

  1 -> 4,5,6 [color=red,arrowhead=diamond]
  1 -> 2,3 [weight=0,color=red,arrowhead=diamond]
  2 -> 5,6,7 [color=blue,arrowhead=diamond]
  2 -> 1,3 [weight=0,color=blue,arrowhead=diamond]
  3 -> 7,8,9 [color=green,arrowhead=diamond]
  3 -> 1,2 [weight=0,color=green,arrowhead=diamond]
}

Code listings

S-Kademlia max-flow-min-cost algorithm

from collections import defaultdict
from copy import deepcopy
from functools import reduce
from itertools import combinations
from math import gcd
from pprint import pprint

class ddict0(dict):
  """A dict that returns 0 for missing keys BUT DOES NOT INSERT THEM.

  This means we can use [k] instead of .get(k, 0), without adding extra edges
  to the graph which would happen with defaultdict.
  """
  def __getitem__(self, key):
    if key in self:
      return super().__getitem__(key)
    else:
      return 0

def check_keys(d):
  k1 = set(d.keys())
  k2 = set(s for succ in d.values() for s in succ)
  assert k2.issubset(k1)

def ensure_keys_ddict0(d):
  k1 = set(d.keys())
  k2 = set(s for succ in d.values() for s in succ)
  for k in k2.difference(k1):
    d[k] = ddict0()

def pprintd(d):
  pprint({k: {k_: v for (k_, v) in kv.items()} for (k, kv) in d.items()})

def dinic_bfs(Cap, Flow, src):
  queue = []
  queue.append(src)
  level = ddict0()
  level[src] = 1
  while queue:
    k = queue.pop(0)
    for i in Cap.keys(): # important to visit every node, not just successors
      if Flow[k][i] < Cap[k][i] and level[i] == 0: # 0 means not-yet-visited
        level[i] = level[k] + 1
        queue.append(i)
  return level

def dinic_dfs(Cap, Flow, dst, level, k, flow):
  if k == dst:
    return flow
  for i in Cap.keys(): # important to visit every node, not just successors
    if Flow[k][i] < Cap[k][i] and level[i] == level[k] + 1:
      curr_flow = min(flow, Cap[k][i] - Flow[k][i])
      temp_flow = dinic_dfs(Cap, Flow, dst, level, i, curr_flow)
      if temp_flow > 0:
        Flow[k][i] += temp_flow
        Flow[i][k] -= temp_flow
        return temp_flow
  return 0

def max_flow_dinic(Cap, src, dst, max=float("inf")):
  # https://www.geeksforgeeks.org/dinics-algorithm-maximum-flow/
  # the algorithm has to occasionally iterate through all the nodes;
  # check that Cap.keys() does contain all the nodes so we can use it later
  check_keys(Cap)
  Flow = defaultdict(ddict0) # flow graph, successor-map
  flow = 0
  while True:
    level = dinic_bfs(Cap, Flow, src)
    if not level[dst]: break
    flow += dinic_dfs(Cap, Flow, dst, level, src, max)
  # residual graph is not explicitly expressed in this implementation of the
  # algorithm, but it can be calculated as (C - F) or residual_graph(C, F)
  #
  # note that Flow contains reverse edges with negative flows, one for every
  # forward edge in the actual flow. if you want to calculate the cost of the
  # flow against a rate graph, be sure to filter out the negative edges first.
  return (flow, Flow)

def residual_graph(Cap, Flow):
  Res = deepcopy(Cap)
  for k, kc in Cap.items():
    for i, _ in kc.items():
      Res[i][k] = Res[i][k] # ensure any 0 value exists for the opposite edge
  # perform subtraction
  for k, kf in Flow.items():
    for i, f in kf.items():
      Res[k][i] -= f
  return Res

def tests_max_flow_dinic():
  C = defaultdict(ddict0)
  C["S"][1] = 3
  C["S"][2] = 3
  C[1][2] = 2
  C[1][3] = 3
  C[2][4] = 2
  C[3][4] = 4
  C[3]["T"] = 2
  C[4]["T"] = 2
  C["T"] = ddict0()
  ensure_keys_ddict0(C)
  assert (max_flow_dinic(C, "S", "T")[0] == 4)

  D = defaultdict(ddict0)
  D["S"][1] = 1
  D["S"][2] = 1
  D["S"][3] = 1
  D[1][4] = 1
  D[2][5] = 1
  D[3][6] = 1
  D[4][7] = 1
  D[4][8] = 1
  D[4][9] = 1
  D[5][7] = 1
  D[5][8] = 1
  D[5][9] = 1
  D[6][7] = 1
  D[6][8] = 1
  D[6][1] = 1
  D[7]["T"] = 1
  D[8]["T"] = 1
  D[9]["T"] = 1
  ensure_keys_ddict0(D)
  assert (max_flow_dinic(D, "S", "T")[0] == 3)

  E = defaultdict(ddict0)
  E["S"][1] = 11
  E["S"][2] = 12
  E[1][2] = 10
  E[1][3] = 12
  E[2][1] = 4
  E[2][4] = 14
  E[3][2] = 9
  E[3]["T"] = 20
  E[4][3] = 7
  E[4]["T"] = 4
  ensure_keys_ddict0(E)
  assert (max_flow_dinic(E, "S", "T")[0] == 23)

def residual_rates(Cap, Res, Rates):
  # rates in a residual graph
  ResRates = defaultdict(ddict0)
  # be very careful to match the edges in Cap
  for k, kv in Cap.items():
    for i, _ in kv.items():
      # if res is 0 we can't push flow back, so ignore it
      if Res[k][i] != 0:
        ResRates[k][i] = Rates[k][i] - Rates[i][k]
      if Res[i][k] != 0:
        ResRates[i][k] = Rates[i][k] - Rates[k][i]
  return ResRates

def graph_cost(G, Rates):
  cost = 0
  for k, kc in G.items():
    for i, c in kc.items():
      cost += c * Rates[k][i]
  return cost

def get_negcycle(G, max=float("inf")):
  # https://cp-algorithms.com/graph/finding-negative-cycle-in-graph.html
  # based on Bellman-Ford
  #pprint(G)
  D = defaultdict(lambda: 0)
  pred = defaultdict(lambda: None)

  # relaxation
  for _ in list(G.keys()):
    changed = None
    for k, kd in G.items():
      for i, d in kd.items():
        if D[i] > D[k] + d:
          D[i] = D[k] + d
          pred[i] = k
          changed = i

  if changed is None:
    return []
  for _ in G.keys():
    changed = pred[changed]

  # get the cycle. most basic implementations of Bellman-Ford don't have this
  cycle = []
  v = changed
  while True:
    cycle.append(v)
    if v == changed and len(cycle) > 1:
      break
    v = pred[v]
  #print(cycle)
  return list(reversed(cycle))

def max_flow_min_cost(Cap, Rates, src, dst, max=float("inf"), verbose=False):
  # https://courses.csail.mit.edu/6.854/06/scribe/s12-minCostFlowAlg.pdf
  (max_flow, Flow) = max_flow_dinic(Cap, src, dst)

  # to find min-cost, first convert into a circulation problem
  Cap[dst][src] = max_flow
  Rates[dst][src] = -(sum(r for kr in Rates.values() for r in kr.values()) + 1)
  Flow[dst][src] = max_flow
  Flow[src][dst] = -max_flow
  flow_cost = graph_cost(Flow, Rates)

  Res = residual_graph(Cap, Flow)
  # keep adding negative-cost cycles to the flow, until there is none left
  while True:
    # construct residual rates graph
    ResRates = residual_rates(Cap, Res, Rates)
    # note: a lot of resources will talk about finding a "negative-cost" cycle,
    # this is confusing terminology - more precisely it's a "negative-rate"
    # cycle we want, i.e. not multiplied by the residue (which is different for
    # each edge). the reason is, later we need to push flow back through this
    # cycle and of course we have to push an *equal* amount of flow per edge,
    # the cost of this flow being determined by the rate of the cycle, i.e. the
    # sum of rates across the edges in the cycle.
    negcycle = get_negcycle(ResRates, max=max)
    if not negcycle: break

    # add the cycle to F, and update R
    edges = list(zip(negcycle[:-1], negcycle[1:]))
    flow = min(Res[u][v] for (u, v) in edges)
    assert flow > 0 # fails if we didn't ignore 0-residue edges in residual_rates
    for (u, v) in edges:
      Flow[u][v] += flow
      Flow[v][u] -= flow
      Res[u][v] -= flow
      Res[v][u] += flow

    # check that new cost of F is strictly lower than previous cost
    cost = graph_cost(Flow, Rates)
    assert (cost < flow_cost)
    flow_cost = cost

  # clean up F a bit before returning it
  del Flow[src][dst]
  del Flow[dst][src]
  min_cost = graph_cost(Flow, Rates)
  # reverse our changes to Cap & Rates
  del Cap[dst][src]
  del Rates[dst][src]
  #pprintd(Cap)
  #pprintd(Flow)
  return (max_flow, min_cost, Flow)

def tests_max_flow_min_cost():
  C = defaultdict(ddict0)
  C["S"][1] = 1
  C["S"][4] = 1
  C[1][2] = 1
  C[1][3] = 1
  C[2]["T"] = 1
  C[3]["T"] = 1
  C[4]["T"] = 1
  ensure_keys_ddict0(C)
  CR = defaultdict(ddict0)
  CR[2]["T"] = 2
  CR[3]["T"] = 1
  CR[4]["T"] = 3
  assert (max_flow_min_cost(C, CR, "S", "T")[0:2] == (2, 4))


## Kademlia stuff follows

K = 20

def distance(a, b):
  return a ^ b

FLOW_SRC = "self"
FLOW_SINK = "target"

class SKadLookup(object):

  def __init__(self, init_queries, target_key, num_parallel=None, prioritise_unique_results=False):
    self.target_key = target_key

    # parallelism, the main parameter of this algorithm
    if num_parallel is None:
      num_parallel = len(init_queries)
    if len(init_queries) < num_parallel:
      raise ValueError("unsupported: len(init_queries) < num_parallel")
    self.num_parallel = num_parallel

    # query flow graph, capacities
    self.query_succ = { FLOW_SRC: set(init_queries) }

    # nodes queried that have given a reply
    self.query_result = set()
    # nodes queried that have failed, i.e. timed out
    self.query_failed = set()
    # nodes queried that are awaiting a reply
    # this should be size (d - query_gap)
    self.query_expect = set()
    # number of queries missing. this can happen when the initial nodes all
    # return too-few results to sustain a max-flow of d throughout the query
    self.query_gap = 0

    for nodeId in init_queries:
      self.query_succ[nodeId] = {}
    for q in self._get_best_queries(lambda k: True):
      self.launch_query("init %3s" % target_key, q)

  def distance_to(self, n):
    return distance(n, self.target_key)

  def launch_query(self, ctx, nodeId):
    self.query_expect.add(nodeId)
    # fake send, for our testing purposes
    print(ctx, ": sending query to node %s" % nodeId)

  def warn_no_query(self, ctx):
    print(ctx, ": too-few results to select a next peer") # TODO more error detail

  def _get_best_queries(self, matching, verbose=False):
    # construct input for max-flow-min-cost, as per our security model
    C = defaultdict(ddict0)
    for k, succs in self.query_succ.items():
      if succs:
        # restrict all nodes to only being on one "disjoint path" by converting
        # them into two nodes (k) and (k, "out") with capacity 1
        # across the edge between them
        if k == FLOW_SRC:
          # source has num_parallel flow
          C[k][(k, "out")] = self.num_parallel
        else:
          C[k][(k, "out")] = 1
        for succ in sorted(succs, key=self.distance_to):
          C[(k, "out")][succ] = 1
    # every candidate has capacity := num_parallel to the sink
    candidates = list(k for k in self.query_succ.keys() if k != FLOW_SRC and matching(k))
    Rates = defaultdict(ddict0)
    for k in candidates:
      C[k][FLOW_SINK] = 1
      Rates[k][FLOW_SINK] = self.distance_to(k)
    # In summary we have:
    # - capacity num_parallel from the source
    # - capacity |candidates| to the sink
    # In the best case, a max-flow will have num_parallel flow as that is the
    # maximum capacity available from the source. We will achieve less if e.g.
    # the query graph was bottlenecked at one node.
    # The other things above encode the constraints to solve for S-Kademlia:
    # - split-nodes -> disjoint paths
    # - min-cost -> lowest distance
    # Moreover due to the min-cost constraint, it is not possible for a flow to
    # be "split" across two final nodes - all possible flows will fully-use the
    # capacity of a final node, i.e. the one that has the lower cost.

    # run max-flow-min-cost
    ensure_keys_ddict0(C)
    (max_flow, min_cost, F) = max_flow_min_cost(C, Rates, FLOW_SRC, FLOW_SINK, verbose=verbose)

    queries = [k for k in candidates if F[k][FLOW_SINK] > 0]
    # sort by flow (descending) then by distance (ascending)
    queries.sort(key=self.distance_to)
    assert all(F[q][FLOW_SINK] == 1 for q in queries)
    assert len(queries) <= self.num_parallel
    assert max_flow == len(queries)
    return queries

  def select_next_query(self, verbose=False):
    next_to_query = self._get_best_queries(
      lambda k: k not in self.query_result and
                k not in self.query_failed, verbose=verbose)
    expected = self.num_parallel - self.query_gap
    if len(next_to_query) < expected:
        # sanity check: if we got fewer than expected from max-flow-min-distance,
        # this means we've hit a(nother) bottleneck in our query graph and
        # we're actually already querying everything there is to query
        assert all(q in self.query_expect for q in next_to_query)
    # filter out stuff we're already querying. if these leaves us with nothing
    # then it means we've hit a bottleneck in the query graph
    return [q for q in next_to_query if q not in self.query_expect]

  def recv_result(self, peer, reply, verbose=False):
    if verbose:
      print("RECV RESULT:", peer, reply)

    if peer not in self.query_expect:
      raise ValueError("unexpected reply from peer %s: %s" % (peer, reply))

    if peer in self.query_result:
      raise ValueError("duplicate reply from peer %s: %s" % (peer, reply))

    if peer in self.query_failed:
      raise ValueError("ignoring too-late reply from peer %s: %s" % (peer, reply))

    if reply is None:
      # query failed, record it as failed
      self.query_succ[peer] = set()
      self.query_failed.add(peer)

    else:
      # check that all replies are closer to the target, with exemption for
      # peers that are already very close to the target
      old_d = self.distance_to(peer)
      if any(self.distance_to(r) >= self.distance_to(peer) > K for r in reply):
        # probably malicious
        raise ValueError("divergent reply from peer %s: %s" % (peer, reply))

      # everything OK, now store the reply in the query graph
      self.query_succ[peer] = set(reply)
      for r in reply:
        self.query_succ.setdefault(r, {}) # ensure r is in query_succ.keys()

      self.query_result.add(peer)

    assert (len(self.query_expect) + self.query_gap == self.num_parallel)
    self.query_expect.remove(peer)
    candidates = self.select_next_query(verbose=verbose)

    if not candidates:
      # not enough data returned by neighbours to select a next_peer.
      # record this to keep track of it, and emit a warning.
      self.warn_no_query("recv %3s" % peer)
      self.query_gap += 1
      return None
    else:
      next_peer = candidates[0]
      # make the actual query
      self.launch_query("recv %3s" % peer, next_peer)
      assert (len(self.query_expect) + self.query_gap == self.num_parallel)
      return next_peer

  def peek_best_queries(self, verbose=False):
    return self._get_best_queries(lambda k: k not in self.query_failed, verbose=verbose)

  def _get_best_results(self, termini, matching, verbose=False):
    # filter out failed successors, add self as implicit successor
    query_succ = {k: {k} | {s for s in self.query_succ[k] if matching(s)} for k in termini}

    # every terminus is allowed to push an equal amount of flow
    # calculate normalising factors to make this effective
    lcm = reduce(lambda x, y: x*y//gcd(x,y), (len(query_succ[k]) for k in termini))
    norm = {k: lcm // len(query_succ[k]) for k in termini}
    #k not in self.query_failed

    # construct input for max-flow-min-cost
    C = defaultdict(ddict0)
    Rates = defaultdict(ddict0)
    all_succ = {s for k in termini for s in query_succ[k]}
    for k in termini:
      C[FLOW_SRC][(k, "out")] = lcm
      for s in list(query_succ[k]):
        C[(k, "out")][s] = norm[k]
    for s in all_succ:
      C[s][FLOW_SINK] = lcm * len(termini)
      Rates[s][FLOW_SINK] = self.distance_to(s)

    # run max-flow-min-cost
    ensure_keys_ddict0(C)
    (max_flow, min_cost, F) = max_flow_min_cost(C, Rates, FLOW_SRC, FLOW_SINK, verbose=verbose)

    results = [(k, F[k][FLOW_SINK]) for k in C.keys() if F[k][FLOW_SINK] > 0]
    results.sort(key=lambda v: (-v[1], self.distance_to(v[0])))
    assert max_flow == lcm * len(termini)
    return results

  def maybe_get_results(self, include_waiting=True, flows=False, verbose=False):
    """Check if the query is in a state where it can be finished."""
    queries = self.peek_best_queries(verbose=verbose)
    if any(q not in self.query_result for q in queries):
      return None # some queries are still being waited on
    matching = (lambda k:
        (k not in self.query_failed) # includes result nodes, queried nodes, and unqueried nodes
        if include_waiting else
        (k in self.query_result))
    results = self._get_best_results(queries, matching, verbose=verbose)
    return results if flows else [r[0] for r in results]

def tests_skademlia():
  print("----")
  q = SKadLookup([4,5,6], 0)
  assert(q.recv_result(4, {1,2,3}) == 1)
  assert(q.recv_result(5, {1,2,3}) == 2)
  assert(q.recv_result(6, {4,1,2}) == 3)
  # ^ corner case, correctly selects 3 even though 3 not part of 6's reply
  assert(q.peek_best_queries() == [1,2,3])
  # ^ this assert fails if using max-flow without cost considerations

  print("----")
  q = SKadLookup([5,6,7,8], 0)
  assert(q.recv_result(5, {3,4}) == 3)
  assert(q.recv_result(6, {3,4}) == 4)
  assert(q.recv_result(7, {3,4}) == None)
  assert(q.recv_result(8, {3,4}) == None)
  assert(q.recv_result(3, {1}) == 1)
  assert(q.recv_result(4, {1}) == None)
  # ^ selects None since we actually don't have enough results
  assert(q.query_gap == 3)

  print("----")
  q = SKadLookup([5,6,7], 0)
  assert(q.recv_result(5, {4}) == 4)
  assert(q.recv_result(4, {1,2,3}) == 1)
  assert(q.recv_result(6, {4}) == None) # e.g. not 2
  assert(q.recv_result(7, {4}) == None)
  # ^ this test fails if "restrict nodes to only being on 1 flow" is not implemented
  assert(q.query_gap == 2)
  assert(q.peek_best_queries() == [1,4,5])

  print("----")
  q = SKadLookup([10,11,12], 0)
  assert(q.recv_result(10, {5,6}) == 5)
  assert(q.recv_result(11, {6,7}) == 6)
  assert(q.recv_result(12, {8}) == 8)
  assert(q.recv_result(5, {1,2}) == 1)
  assert(q.recv_result(1, None) == 2)
  assert(q.recv_result(2, None) == 7)
  assert(q.peek_best_queries() == [5, 6, 8])
  assert(q.maybe_get_results(True) is None)
  assert(q.recv_result(6, {}) == None)
  assert(q.recv_result(8, {}) == None)
  assert(q.maybe_get_results(True) == [5, 6, 8])
  # ^ corner case involving backtracking and failure - correctly selects 7,
  # even though it's unrelated to the "10 -> 5 -> 1" path, because 6 was
  # already selected previously

  # maybe_get_results at different times, and with overlapping results
  print("----")
  q = SKadLookup([1,2,3], 0)
  assert(q.recv_result(1, {2,3,4,5,6,7}) == 4)
  assert(q.maybe_get_results() == None)
  assert(q.recv_result(2, {1,3,5,6,7,8}) == 5)
  assert(q.maybe_get_results() == None)
  assert(q.recv_result(3, {2,9,10,11,12,13}) == 9)
  assert(q.maybe_get_results(True) == [2, 3, 1, 5, 6, 7, 4, 8, 9, 10, 11, 12, 13])

  # query where len(init_queries) != num_parallel
  print("----")
  q = SKadLookup([5,6,7,8,9], 0, num_parallel=3)
  assert(q.query_expect == set([5,6,7]))
  assert(q.recv_result(5, {1,2}) == 1)
  assert(q.recv_result(7, None) == 8)
  assert(q.recv_result(6, {10}) == 9)

if __name__ == "__main__":
  tests_max_flow_dinic()
  tests_max_flow_min_cost()
  tests_skademlia()