diff --git a/README.md b/README.md index 88c7dbb..d73f007 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,17 @@ # P2pChat -**TODO: Add description** +## Wire-Protocol + +The protocol is separated into two parts. +The lower layer takes care of p2p coordination while the upper chat layer sits on top of it. + +### P2P + +TODO + +### Chat + +TODO ## Installation diff --git a/flake.nix b/flake.nix index 7776d70..3be22b0 100644 --- a/flake.nix +++ b/flake.nix @@ -18,6 +18,7 @@ in { devShells.default = pkgs.mkShell { + ERL_AFLAGS = "-kernel shell_history enabled"; packages = with pkgs; [ elixir elixir-ls diff --git a/lib/p2p_chat.ex b/lib/p2p_chat.ex index 2ba9b26..17aae51 100644 --- a/lib/p2p_chat.ex +++ b/lib/p2p_chat.ex @@ -1,18 +1,27 @@ defmodule P2pChat do - @moduledoc """ - Documentation for `P2pChat`. - """ - - @doc """ - Hello world. - - ## Examples - - iex> P2pChat.hello() - :world - - """ def hello do - :world + {:ok, pid1} = P2pChat.Transport.start_link(port: 12345) + netid1 = P2pChat.Transport.get_netid(pid1) + + {:ok, pid2} = P2pChat.Transport.start_link(port: 12346) + netid2 = P2pChat.Transport.get_netid(pid2) + + # {:ok, addr4} = :inet.parse_address ~c"2600::" + # {:ok, addr6} = :inet.parse_address ~c"9.9.9.9" + {:ok, addr6} = :inet.parse_address ~c"::" + {:ok, addr4} = :inet.parse_address ~c"127.0.0.1" + + P2pChat.Transport.add_neighbor(pid1, netid2, {addr4, 12346}) + P2pChat.Transport.add_neighbor(pid1, netid2, {addr6, 12346}) + + Process.sleep(500) + P2pChat.Transport.inform_about_self(pid1) + + Process.sleep(500) + neighbors2 = P2pChat.Transport.get_neighbors(pid2) + IO.inspect(neighbors2) + + GenServer.stop(pid1) + GenServer.stop(pid2) end end diff --git a/lib/p2p_chat/transport.ex b/lib/p2p_chat/transport.ex new file mode 100644 index 0000000..04bc1c9 --- /dev/null +++ b/lib/p2p_chat/transport.ex @@ -0,0 +1,114 @@ +defmodule P2pChat.Transport do + require Logger + import P2pChat.TransportProto + alias P2pChat.TransportProto.Messages, as: Messages + use GenServer + + @uuid_namespace "8a6a64ee-1628-4fdb-b3fc-574ae5eb5797" + + defstruct [:socket, :netid, :neighbors] + + def start_link(args \\ []) do + GenServer.start_link(__MODULE__, args) + end + + @impl true + def init(args) do + {:ok, socket} = open_listening_socket(args) + {:ok, hostname} = :net.gethostname() + netid = UUID.uuid5(@uuid_namespace, to_string(hostname)) + initial_neighbors = Access.get(args, :initial_neighbors, %{}) + + {:ok, + %__MODULE__{ + socket: socket, + netid: netid, + neighbors: initial_neighbors + }} + end + + def open_listening_socket(args) do + port = Access.get(args, :port, 12345) + listen_options = [:binary, :inet] + Logger.info("Starting transport on port #{port}") + {:ok, socket} = :gen_udp.open(port, listen_options) + {:ok, socket} + end + + @impl true + def handle_info({:udp, _socket, peer_ip, peer_port, payload}, state) do + msg = decode_message(payload) + Logger.debug("Received UDP message from #{:inet.ntoa(peer_ip)}@#{peer_port}: #{inspect(msg)}") + + {_, state} = case msg do + %Messages.Hello{} -> __MODULE__.handle_cast({:add_neighbor, msg.netid, {peer_ip, peer_port}}, state) + _ -> throw(:unhandled_message) + end + + {:noreply, state} + end + + @impl true + def handle_call(:get_netid, _from, state) do + {:reply, state.netid, state} + end + + @impl true + def handle_call(:get_neighbors, _from, state) do + {:reply, state.neighbors, state} + end + + @impl true + def handle_cast({:add_neighbor, netid, connector}, state) do + new_neighbors = + Map.update(state.neighbors, netid, MapSet.new([connector]), fn current_value -> + MapSet.put(current_value, connector) + end) + + new_state = %__MODULE__{ + socket: state.socket, + netid: state.netid, + neighbors: new_neighbors + } + + {:noreply, new_state} + end + + @impl true + def handle_cast(:inform_about_self, state) do + inform_msg = + encode_message(%Messages.Hello{ + netid: state.netid + }) + + Enum.each(Map.keys(state.neighbors), fn i_netid -> + Enum.each(Map.get(state.neighbors, i_netid), fn i_connector -> + {i_addr, i_port} = i_connector + Logger.debug("Informing neighbor #{i_netid} at #{:inet.ntoa(i_addr)}@#{i_port} about self") + + :gen_udp.send(state.socket, i_addr, i_port, inform_msg) + end) + end) + + {:noreply, state} + end + + # + # CLIENT API + # + def get_netid(pid) do + GenServer.call(pid, :get_netid) + end + + def get_neighbors(pid) do + GenServer.call(pid, :get_neighbors) + end + + def add_neighbor(pid, netid, connector) do + GenServer.cast(pid, {:add_neighbor, netid, connector}) + end + + def inform_about_self(pid) do + GenServer.cast(pid, :inform_about_self) + end +end diff --git a/lib/p2p_chat/transport_proto.ex b/lib/p2p_chat/transport_proto.ex new file mode 100644 index 0000000..349a624 --- /dev/null +++ b/lib/p2p_chat/transport_proto.ex @@ -0,0 +1,67 @@ +defmodule P2pChat.TransportProto do + defmodule Messages do + defmodule Hello do + defstruct [:netid] + end + + defmodule InformNeighbor do + defstruct [:netid, :addr, :port] + end + + defmodule Goodbye do + defstruct [] + end + end + + # + # Public API + # + + def encode_message(%Messages.Hello{} = msg) do + [encode_version(), <<0x01::8>>, encode_string(msg.netid)] + end + + def encode_message(%Messages.InformNeighbor{} = msg) do + [encode_version(), <<0x02::8>>, encode_string(msg.netid), encode_ip_addr(msg.addr)] + end + + def encode_message(%Messages.Goodbye{} = _msg) do + [encode_version(), <<0x03::8>>] + end + + def decode_message(<<0x01::8, 0x01::8, payload::binary>>) do + <> = payload + %Messages.Hello{netid: netid} + end + + def decode_message(<<0x01::8, 0x02::8, payload::binary>>) do + <> = payload + %Messages.InformNeighbor{netid: netid, addr: addr} + end + + def decode_message(<<0x01::8, 0x03::8>>) do + %Messages.Goodbye{} + end + + def decode_message(<<0x01::8, _::binary>>), do: :error + def decode_message(<<_::binary>>), do: :error + + # + # Private Utilities + # + + defp encode_version() do + <<0x01::8>> + end + + defp encode_string(str) do + [<>, str] + end + + defp encode_ip_addr({a1, a2, a3, a4, a5, a6, a7, a8}) do + <<6::8, a1::8, a2::8, a3::8, a4::8, a5::8, a6::8, a7::8, a8::8>> + end + defp encode_ip_addr({a1, a2, a3, a4}) do + <<4::8, a1::8, a2::8, a3::8, a4::8>> + end +end diff --git a/mix.exs b/mix.exs index b6dc4ba..78a63e6 100644 --- a/mix.exs +++ b/mix.exs @@ -21,8 +21,9 @@ defmodule P2pChat.MixProject do # Run "mix help deps" to learn about dependencies. defp deps do [ + { :uuid, "~> 1.1" } + # {:quicer, git: "https://github.com/emqx/quic.git", tag: "0.2.4"}, # {:dep_from_hexpm, "~> 0.3.0"}, - # {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"} ] end end diff --git a/mix.lock b/mix.lock new file mode 100644 index 0000000..d39f745 --- /dev/null +++ b/mix.lock @@ -0,0 +1,5 @@ +%{ + "quicer": {:git, "https://github.com/emqx/quic.git", "fc5946302e01148ce4dbd3aebe16ec3446caf31c", [tag: "0.2.4"]}, + "snabbkaffe": {:git, "https://github.com/kafka4beam/snabbkaffe.git", "b59298334ed349556f63405d1353184c63c66534", [tag: "1.0.10"]}, + "uuid": {:hex, :uuid, "1.1.8", "e22fc04499de0de3ed1116b770c7737779f226ceefa0badb3592e64d5cfb4eb9", [:mix], [], "hexpm", "c790593b4c3b601f5dc2378baae7efaf5b3d73c4c6456ba85759905be792f2ac"}, +}