diff --git a/.gitignore b/.gitignore index 7d7df4cfc..15d0f6808 100644 --- a/.gitignore +++ b/.gitignore @@ -36,8 +36,8 @@ lambda_ethereum_consensus-*.tar # Compiled artifacts. *.o *.a -*.h *.so +native/libp2p_nif/main.h # VSCode configuration dir. .vscode/ diff --git a/lib/lambda_ethereum_consensus/libp2p.ex b/lib/lambda_ethereum_consensus/libp2p.ex index 8fd5fdf3a..65799d9a0 100644 --- a/lib/lambda_ethereum_consensus/libp2p.ex +++ b/lib/lambda_ethereum_consensus/libp2p.ex @@ -13,37 +13,37 @@ defmodule Libp2p do @typedoc """ A handle to a Go resource. """ - @type handle :: integer + @opaque handle :: reference @typedoc """ A handle to a host.Host. """ - @type host :: handle + @opaque host :: handle @typedoc """ A handle to a peerstore.Peerstore. """ - @type peerstore :: handle + @opaque peerstore :: handle @typedoc """ A handle to a peer.ID. """ - @type peer_id :: handle + @opaque peer_id :: handle @typedoc """ A handle to a []multiaddr.MultiAddr. """ - @type addrs :: handle + @opaque addrs :: handle @typedoc """ A handle to a stream. """ - @type stream :: handle + @opaque stream :: handle @typedoc """ A handle to an Option. """ - @type option :: handle + @opaque option :: handle @typedoc """ An error returned by this module. diff --git a/native/libp2p_nif/libp2p.c b/native/libp2p_nif/libp2p.c index 8c4196201..36a6a3b37 100644 --- a/native/libp2p_nif/libp2p.c +++ b/native/libp2p_nif/libp2p.c @@ -1,16 +1,17 @@ #include "main.h" #include "utils.h" +#include #include #define ERL_FUNCTION(FUNCTION_NAME) static ERL_NIF_TERM FUNCTION_NAME(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) -#define ERL_FUNCTION_GETTER(NAME, GETTER) \ - ERL_FUNCTION(NAME) \ - { \ - uintptr_t _handle = get_handle_from_term(env, argv[0]); \ - IF_ERROR(_handle == 0, "invalid first argument"); \ - uintptr_t _res = GETTER(_handle); \ - return get_handle_result(env, _res); \ +#define ERL_FUNCTION_GETTER(NAME, RECV_TYPE, ATTR_TYPE, GETTER) \ + ERL_FUNCTION(NAME) \ + { \ + uintptr_t _handle = get_handle_from_term(env, RECV_TYPE, argv[0]); \ + IF_ERROR(_handle == 0, "invalid first argument"); \ + uintptr_t _res = GETTER(_handle); \ + return get_handle_result(env, ATTR_TYPE, _res); \ } #define IF_ERROR(COND, MSG) \ @@ -19,29 +20,72 @@ return make_error_msg(env, (MSG)); \ } -#define GET_HANDLE(TERM, NAME) \ - ({ \ - uintptr_t _handle = get_handle_from_term(env, (TERM)); \ - IF_ERROR(_handle == 0, "invalid " NAME); \ - _handle; \ +#define GET_HANDLE(TERM, TYPE) \ + ({ \ + uintptr_t _handle = get_handle_from_term(env, (TYPE), (TERM)); \ + IF_ERROR(_handle == 0, "invalid " #TYPE); \ + _handle; \ }) -#define NIF_ENTRY(FUNCTION_NAME, ARITY) \ - { \ - #FUNCTION_NAME, ARITY, FUNCTION_NAME \ +#define NIF_ENTRY(FUNCTION_NAME, ARITY, ...) \ + { \ + #FUNCTION_NAME, ARITY, FUNCTION_NAME, __VA_ARGS__ \ } -const uint64_t PID_LENGTH = 1024; const uint64_t BUFFER_SIZE = 4096; +/*************/ +/* NIF Setup */ +/*************/ + +ErlNifResourceType *Option; +ErlNifResourceType *Host; +ErlNifResourceType *Peerstore; +ErlNifResourceType *peer_ID; +ErlNifResourceType *Multiaddr_arr; +ErlNifResourceType *Stream; + +// Resource type helpers +void handle_cleanup(ErlNifEnv *env, void *obj) +{ + uintptr_t *handle = obj; + DeleteHandle(*handle); +} + +#define OPEN_RESOURCE_TYPE(NAME) ((NAME) = enif_open_resource_type(env, NULL, (#NAME), handle_cleanup, flags, NULL)) + +static int open_resource_types(ErlNifEnv *env, ErlNifResourceFlags flags) +{ + int failed = false; + failed |= NULL == OPEN_RESOURCE_TYPE(Option); + failed |= NULL == OPEN_RESOURCE_TYPE(Host); + failed |= NULL == OPEN_RESOURCE_TYPE(Peerstore); + failed |= NULL == OPEN_RESOURCE_TYPE(peer_ID); + failed |= NULL == OPEN_RESOURCE_TYPE(Multiaddr_arr); + failed |= NULL == OPEN_RESOURCE_TYPE(Stream); + return failed; +} + +static int load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) +{ + return open_resource_types(env, ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER); +} + +static int upgrade(ErlNifEnv *env, void **priv_data, void **old_priv_data, + ERL_NIF_TERM load_info) +{ + return open_resource_types(env, ERL_NIF_RT_TAKEOVER); +} + /***********/ /* Helpers */ /***********/ -static uintptr_t get_handle_from_term(ErlNifEnv *env, ERL_NIF_TERM term) +static uintptr_t get_handle_from_term(ErlNifEnv *env, ErlNifResourceType *type, ERL_NIF_TERM term) { - uintptr_t handle; - return enif_get_uint64(env, term, &handle) ? handle : 0; + uintptr_t *obj; + int result = enif_get_resource(env, term, type, (void **)&obj); + return (!result || obj == NULL) ? 0 : *obj; } static ERL_NIF_TERM _make_error_msg(ErlNifEnv *env, uint len, const char *msg) @@ -62,10 +106,32 @@ static ERL_NIF_TERM make_ok_tuple2(ErlNifEnv *env, ERL_NIF_TERM term) return enif_make_tuple2(env, enif_make_atom(env, "ok"), term); } -static ERL_NIF_TERM get_handle_result(ErlNifEnv *env, uintptr_t handle) +static ERL_NIF_TERM get_handle_result(ErlNifEnv *env, ErlNifResourceType *type, uintptr_t handle) { IF_ERROR(handle == 0, "invalid handle returned"); - return make_ok_tuple2(env, enif_make_uint64(env, handle)); + uintptr_t *obj = enif_alloc_resource(type, sizeof(uintptr_t)); + IF_ERROR(obj == NULL, "couldn't create resource"); + *obj = handle; + ERL_NIF_TERM term = enif_make_resource(env, obj); + // NOTE: we need to release our reference, so it can be GC'd + enif_release_resource(obj); + return make_ok_tuple2(env, term); +} + +void send_message(erl_pid_t _pid, uintptr_t stream_handle) +{ + // Passed as void* to avoid including erl_nif.h in the header. + ErlNifPid *pid = (ErlNifPid *)_pid; + ErlNifEnv *env = enif_alloc_env(); + + ERL_NIF_TERM message = get_handle_result(env, Stream, stream_handle); + + int result = enif_send(NULL, pid, env, message); + // On error, the env isn't freed by the function. + if (!result) + { + enif_free_env(env); + } } /*********/ @@ -80,7 +146,7 @@ ERL_FUNCTION(listen_addr_strings) uintptr_t handle = ListenAddrStrings(listen_addr); - return get_handle_result(env, handle); + return get_handle_result(env, Option, handle); } /****************/ @@ -97,23 +163,24 @@ ERL_FUNCTION(host_new) while (!enif_is_empty_list(env, tail) && i < MAX_OPTIONS) { enif_get_list_cell(env, tail, &head, &tail); - options[i++] = GET_HANDLE(head, "option"); + uintptr_t handle = GET_HANDLE(head, Option); + options[i++] = handle; } GoSlice go_options = {options, i, MAX_OPTIONS}; uintptr_t result = HostNew(go_options); - return get_handle_result(env, result); + return get_handle_result(env, Host, result); } ERL_FUNCTION(host_close) { - uintptr_t host = GET_HANDLE(argv[0], "host"); + uintptr_t host = GET_HANDLE(argv[0], Host); HostClose(host); return enif_make_atom(env, "ok"); } ERL_FUNCTION(host_set_stream_handler) { - uintptr_t host = GET_HANDLE(argv[0], "host"); + uintptr_t host = GET_HANDLE(argv[0], Host); ErlNifBinary bin; IF_ERROR(!enif_inspect_binary(env, argv[1], &bin), "invalid protocol ID"); @@ -124,27 +191,27 @@ ERL_FUNCTION(host_set_stream_handler) IF_ERROR(!enif_self(env, pid), "failed to get pid"); - SetStreamHandler(host, proto_id, (void *)pid); + HostSetStreamHandler(host, proto_id, (void *)pid, send_message); return enif_make_atom(env, "ok"); } ERL_FUNCTION(host_new_stream) { - uintptr_t host = GET_HANDLE(argv[0], "host"); - uintptr_t id = GET_HANDLE(argv[1], "peer id"); + uintptr_t host = GET_HANDLE(argv[0], Host); + uintptr_t id = GET_HANDLE(argv[1], peer_ID); ErlNifBinary bin; IF_ERROR(!enif_inspect_binary(env, argv[2], &bin), "invalid protocol ID"); GoString proto_id = {(const char *)bin.data, bin.size}; - int result = NewStream(host, id, proto_id); - return get_handle_result(env, result); + uintptr_t result = HostNewStream(host, id, proto_id); + return get_handle_result(env, Stream, result); } -ERL_FUNCTION_GETTER(host_peerstore, Peerstore) -ERL_FUNCTION_GETTER(host_id, ID) -ERL_FUNCTION_GETTER(host_addrs, Addrs) +ERL_FUNCTION_GETTER(host_peerstore, Host, Peerstore, HostPeerstore) +ERL_FUNCTION_GETTER(host_id, Host, peer_ID, HostID) +ERL_FUNCTION_GETTER(host_addrs, Host, Multiaddr_arr, HostAddrs) /*********************/ /* Peerstore methods */ @@ -152,13 +219,13 @@ ERL_FUNCTION_GETTER(host_addrs, Addrs) ERL_FUNCTION(peerstore_add_addrs) { - uintptr_t ps = GET_HANDLE(argv[0], "peerstore"); - uintptr_t id = GET_HANDLE(argv[1], "peer id"); - uintptr_t addrs = GET_HANDLE(argv[2], "addrs"); + uintptr_t ps = GET_HANDLE(argv[0], Peerstore); + uintptr_t id = GET_HANDLE(argv[1], peer_ID); + uintptr_t addrs = GET_HANDLE(argv[2], Multiaddr_arr); u_long ttl; IF_ERROR(!enif_get_uint64(env, argv[3], &ttl), "invalid TTL"); - AddAddrs(ps, id, addrs, ttl); + PeerstoreAddAddrs(ps, id, addrs, ttl); return enif_make_atom(env, "ok"); } @@ -168,7 +235,7 @@ ERL_FUNCTION(peerstore_add_addrs) ERL_FUNCTION(stream_read) { - uintptr_t stream = GET_HANDLE(argv[0], "stream"); + uintptr_t stream = GET_HANDLE(argv[0], Stream); char buffer[BUFFER_SIZE]; GoSlice go_buffer = {buffer, BUFFER_SIZE, BUFFER_SIZE}; @@ -185,7 +252,7 @@ ERL_FUNCTION(stream_read) ERL_FUNCTION(stream_write) { - uintptr_t stream = GET_HANDLE(argv[0], "stream"); + uintptr_t stream = GET_HANDLE(argv[0], Stream); ErlNifBinary bin; IF_ERROR(!enif_inspect_binary(env, argv[1], &bin), "invalid data"); @@ -199,7 +266,7 @@ ERL_FUNCTION(stream_write) ERL_FUNCTION(stream_close) { - uintptr_t stream = GET_HANDLE(argv[0], "stream"); + uintptr_t stream = GET_HANDLE(argv[0], Stream); StreamClose(stream); return enif_make_atom(env, "ok"); } @@ -209,14 +276,15 @@ static ErlNifFunc nif_funcs[] = { NIF_ENTRY(host_new, 1), NIF_ENTRY(host_close, 1), NIF_ENTRY(host_set_stream_handler, 2), - NIF_ENTRY(host_new_stream, 3), + // TODO: check if host_new_stream is truly dirty + NIF_ENTRY(host_new_stream, 3, ERL_NIF_DIRTY_JOB_IO_BOUND), // blocks negotiating protocol NIF_ENTRY(host_peerstore, 1), NIF_ENTRY(host_id, 1), NIF_ENTRY(host_addrs, 1), NIF_ENTRY(peerstore_add_addrs, 4), - NIF_ENTRY(stream_read, 1), - NIF_ENTRY(stream_write, 2), + NIF_ENTRY(stream_read, 1, ERL_NIF_DIRTY_JOB_IO_BOUND), // blocks until reading + NIF_ENTRY(stream_write, 2, ERL_NIF_DIRTY_JOB_IO_BOUND), // blocks when buffer is full NIF_ENTRY(stream_close, 1), }; -ERL_NIF_INIT(Elixir.Libp2p, nif_funcs, NULL, NULL, NULL, NULL) +ERL_NIF_INIT(Elixir.Libp2p, nif_funcs, load, NULL, upgrade, NULL) diff --git a/native/libp2p_nif/main.go b/native/libp2p_nif/main.go index 33290b3cd..4c68fc0be 100644 --- a/native/libp2p_nif/main.go +++ b/native/libp2p_nif/main.go @@ -41,6 +41,11 @@ func callGetter[T any, R any](h C.uintptr_t, g func(T) R) C.uintptr_t { /* Utils */ /*********/ +//export DeleteHandle +func DeleteHandle(h C.uintptr_t) { + cgo.Handle(h).Delete() +} + //export ListenAddrStrings func ListenAddrStrings(listenAddr string) C.uintptr_t { // TODO: this function is variadic @@ -72,25 +77,23 @@ func HostNew(options []C.uintptr_t) C.uintptr_t { //export HostClose func (h C.uintptr_t) HostClose() { handle := cgo.Handle(h) - defer handle.Delete() handle.Value().(host.Host).Close() } -//export SetStreamHandler -func (h C.uintptr_t) SetStreamHandler(protoId string, procId C.erl_pid_t) { +//export HostSetStreamHandler +func (h C.uintptr_t) HostSetStreamHandler(protoId string, procId C.erl_pid_t, callback C.send_message_t) { handle := cgo.Handle(h) host := handle.Value().(host.Host) // WARN: we clone the string because the underlying buffer is owned by Elixir goProtoId := protocol.ID(strings.Clone(protoId)) handler := func(stream network.Stream) { - // NOTE: the stream handle should be deleted by calling Stream.Close() - C.send_message(procId, C.uintptr_t(cgo.NewHandle(stream))) + C.run_callback(callback, procId, C.uintptr_t(cgo.NewHandle(stream))) } host.SetStreamHandler(protocol.ID(goProtoId), handler) } -//export NewStream -func (h C.uintptr_t) NewStream(pid C.uintptr_t, protoId string) C.uintptr_t { +//export HostNewStream +func (h C.uintptr_t) HostNewStream(pid C.uintptr_t, protoId string) C.uintptr_t { host := cgo.Handle(h).Value().(host.Host) peerId := cgo.Handle(pid).Value().(peer.ID) // WARN: we clone the string because the underlying buffer is owned by Elixir @@ -105,18 +108,18 @@ func (h C.uintptr_t) NewStream(pid C.uintptr_t, protoId string) C.uintptr_t { return C.uintptr_t(cgo.NewHandle(stream)) } -//export Peerstore -func (h C.uintptr_t) Peerstore() C.uintptr_t { +//export HostPeerstore +func (h C.uintptr_t) HostPeerstore() C.uintptr_t { return callGetter(h, host.Host.Peerstore) } -//export ID -func (h C.uintptr_t) ID() C.uintptr_t { +//export HostID +func (h C.uintptr_t) HostID() C.uintptr_t { return callGetter(h, host.Host.ID) } -//export Addrs -func (h C.uintptr_t) Addrs() C.uintptr_t { +//export HostAddrs +func (h C.uintptr_t) HostAddrs() C.uintptr_t { return callGetter(h, host.Host.Addrs) } @@ -124,8 +127,8 @@ func (h C.uintptr_t) Addrs() C.uintptr_t { /* Peerstore methods */ /*********************/ -//export AddAddrs -func (ps C.uintptr_t) AddAddrs(id, addrs C.uintptr_t, ttl uint64) { +//export PeerstoreAddAddrs +func (ps C.uintptr_t) PeerstoreAddAddrs(id, addrs C.uintptr_t, ttl uint64) { psv := cgo.Handle(ps).Value().(peerstore.Peerstore) idv := cgo.Handle(id).Value().(peer.ID) addrsv := cgo.Handle(addrs).Value().([]multiaddr.Multiaddr) @@ -163,7 +166,6 @@ func (s C.uintptr_t) StreamWrite(data []byte) int { //export StreamClose func (s C.uintptr_t) StreamClose() { handle := cgo.Handle(s) - defer handle.Delete() handle.Value().(network.Stream).Close() } diff --git a/native/libp2p_nif/utils.c b/native/libp2p_nif/utils.c index e4a4b9ec1..9be6b9807 100644 --- a/native/libp2p_nif/utils.c +++ b/native/libp2p_nif/utils.c @@ -1,23 +1,6 @@ #include "utils.h" -#include -ErlNifPid *get_pid(erl_pid_t _pid) +void run_callback(send_message_t send_message, erl_pid_t pid, uintptr_t stream) { - return (ErlNifPid *)_pid; -} - -void send_message(erl_pid_t _pid, uintptr_t stream_handle) -{ - // Passed as void* to avoid including erl_nif.h in the header. - ErlNifPid *pid = get_pid(_pid); - ErlNifEnv *env = enif_alloc_env(); - - ERL_NIF_TERM message = enif_make_tuple2(env, enif_make_atom(env, "ok"), enif_make_uint64(env, stream_handle)); - - int result = enif_send(NULL, pid, env, message); - // On error, the env isn't freed by the function. - if (!result) - { - enif_free_env(env); - } + send_message(pid, stream); } diff --git a/native/libp2p_nif/utils.h b/native/libp2p_nif/utils.h index 5285f2b3d..cc48a6b64 100644 --- a/native/libp2p_nif/utils.h +++ b/native/libp2p_nif/utils.h @@ -2,6 +2,11 @@ #include // for uintptr_t +// For better readability. Pointer is casted to opaque, +// to avoid having to include erl_nif.h in Go. typedef void *erl_pid_t; -void send_message(erl_pid_t pid, uintptr_t stream); +// send_message function signature. +typedef void (*send_message_t)(erl_pid_t pid, uintptr_t stream); + +void run_callback(send_message_t send_message, erl_pid_t pid, uintptr_t stream);