Skip to content

feat: wrap cgo Handles with Erlang Resources #17

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ lambda_ethereum_consensus-*.tar
# Compiled artifacts.
*.o
*.a
*.h
*.so
native/libp2p_nif/main.h

# VSCode configuration dir.
.vscode/
Expand Down
14 changes: 7 additions & 7 deletions lib/lambda_ethereum_consensus/libp2p.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,37 +13,37 @@ defmodule Libp2p do
@typedoc """
A handle to a Go resource.
"""
@type handle :: integer
@opaque handle :: reference

@typedoc """
A handle to a host.Host.
"""
@type host :: handle
@opaque host :: handle

@typedoc """
A handle to a peerstore.Peerstore.
"""
@type peerstore :: handle
@opaque peerstore :: handle

@typedoc """
A handle to a peer.ID.
"""
@type peer_id :: handle
@opaque peer_id :: handle

@typedoc """
A handle to a []multiaddr.MultiAddr.
"""
@type addrs :: handle
@opaque addrs :: handle

@typedoc """
A handle to a stream.
"""
@type stream :: handle
@opaque stream :: handle

@typedoc """
A handle to an Option.
"""
@type option :: handle
@opaque option :: handle

@typedoc """
An error returned by this module.
Expand Down
158 changes: 113 additions & 45 deletions native/libp2p_nif/libp2p.c
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
#include "main.h"
#include "utils.h"
#include <stdbool.h>
#include <erl_nif.h>

#define ERL_FUNCTION(FUNCTION_NAME) static ERL_NIF_TERM FUNCTION_NAME(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])

#define ERL_FUNCTION_GETTER(NAME, GETTER) \
ERL_FUNCTION(NAME) \
{ \
uintptr_t _handle = get_handle_from_term(env, argv[0]); \
IF_ERROR(_handle == 0, "invalid first argument"); \
uintptr_t _res = GETTER(_handle); \
return get_handle_result(env, _res); \
#define ERL_FUNCTION_GETTER(NAME, RECV_TYPE, ATTR_TYPE, GETTER) \
ERL_FUNCTION(NAME) \
{ \
uintptr_t _handle = get_handle_from_term(env, RECV_TYPE, argv[0]); \
IF_ERROR(_handle == 0, "invalid first argument"); \
uintptr_t _res = GETTER(_handle); \
return get_handle_result(env, ATTR_TYPE, _res); \
}

#define IF_ERROR(COND, MSG) \
Expand All @@ -19,29 +20,72 @@
return make_error_msg(env, (MSG)); \
}

#define GET_HANDLE(TERM, NAME) \
({ \
uintptr_t _handle = get_handle_from_term(env, (TERM)); \
IF_ERROR(_handle == 0, "invalid " NAME); \
_handle; \
#define GET_HANDLE(TERM, TYPE) \
({ \
uintptr_t _handle = get_handle_from_term(env, (TYPE), (TERM)); \
IF_ERROR(_handle == 0, "invalid " #TYPE); \
_handle; \
})

#define NIF_ENTRY(FUNCTION_NAME, ARITY) \
{ \
#FUNCTION_NAME, ARITY, FUNCTION_NAME \
#define NIF_ENTRY(FUNCTION_NAME, ARITY, ...) \
{ \
#FUNCTION_NAME, ARITY, FUNCTION_NAME, __VA_ARGS__ \
}

const uint64_t PID_LENGTH = 1024;
const uint64_t BUFFER_SIZE = 4096;

/*************/
/* NIF Setup */
/*************/

ErlNifResourceType *Option;
ErlNifResourceType *Host;
ErlNifResourceType *Peerstore;
ErlNifResourceType *peer_ID;
ErlNifResourceType *Multiaddr_arr;
ErlNifResourceType *Stream;

// Resource type helpers
void handle_cleanup(ErlNifEnv *env, void *obj)
{
uintptr_t *handle = obj;
DeleteHandle(*handle);
}

#define OPEN_RESOURCE_TYPE(NAME) ((NAME) = enif_open_resource_type(env, NULL, (#NAME), handle_cleanup, flags, NULL))

static int open_resource_types(ErlNifEnv *env, ErlNifResourceFlags flags)
{
int failed = false;
failed |= NULL == OPEN_RESOURCE_TYPE(Option);
failed |= NULL == OPEN_RESOURCE_TYPE(Host);
failed |= NULL == OPEN_RESOURCE_TYPE(Peerstore);
failed |= NULL == OPEN_RESOURCE_TYPE(peer_ID);
failed |= NULL == OPEN_RESOURCE_TYPE(Multiaddr_arr);
failed |= NULL == OPEN_RESOURCE_TYPE(Stream);
return failed;
}

static int load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
{
return open_resource_types(env, ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER);
}

static int upgrade(ErlNifEnv *env, void **priv_data, void **old_priv_data,
ERL_NIF_TERM load_info)
{
return open_resource_types(env, ERL_NIF_RT_TAKEOVER);
}

/***********/
/* Helpers */
/***********/

static uintptr_t get_handle_from_term(ErlNifEnv *env, ERL_NIF_TERM term)
static uintptr_t get_handle_from_term(ErlNifEnv *env, ErlNifResourceType *type, ERL_NIF_TERM term)
{
uintptr_t handle;
return enif_get_uint64(env, term, &handle) ? handle : 0;
uintptr_t *obj;
int result = enif_get_resource(env, term, type, (void **)&obj);
return (!result || obj == NULL) ? 0 : *obj;
}

static ERL_NIF_TERM _make_error_msg(ErlNifEnv *env, uint len, const char *msg)
Expand All @@ -62,10 +106,32 @@ static ERL_NIF_TERM make_ok_tuple2(ErlNifEnv *env, ERL_NIF_TERM term)
return enif_make_tuple2(env, enif_make_atom(env, "ok"), term);
}

static ERL_NIF_TERM get_handle_result(ErlNifEnv *env, uintptr_t handle)
static ERL_NIF_TERM get_handle_result(ErlNifEnv *env, ErlNifResourceType *type, uintptr_t handle)
{
IF_ERROR(handle == 0, "invalid handle returned");
return make_ok_tuple2(env, enif_make_uint64(env, handle));
uintptr_t *obj = enif_alloc_resource(type, sizeof(uintptr_t));
IF_ERROR(obj == NULL, "couldn't create resource");
*obj = handle;
ERL_NIF_TERM term = enif_make_resource(env, obj);
// NOTE: we need to release our reference, so it can be GC'd
enif_release_resource(obj);
return make_ok_tuple2(env, term);
}

void send_message(erl_pid_t _pid, uintptr_t stream_handle)
{
// Passed as void* to avoid including erl_nif.h in the header.
ErlNifPid *pid = (ErlNifPid *)_pid;
ErlNifEnv *env = enif_alloc_env();

ERL_NIF_TERM message = get_handle_result(env, Stream, stream_handle);

int result = enif_send(NULL, pid, env, message);
// On error, the env isn't freed by the function.
if (!result)
{
enif_free_env(env);
}
}

/*********/
Expand All @@ -80,7 +146,7 @@ ERL_FUNCTION(listen_addr_strings)

uintptr_t handle = ListenAddrStrings(listen_addr);

return get_handle_result(env, handle);
return get_handle_result(env, Option, handle);
}

/****************/
Expand All @@ -97,23 +163,24 @@ ERL_FUNCTION(host_new)
while (!enif_is_empty_list(env, tail) && i < MAX_OPTIONS)
{
enif_get_list_cell(env, tail, &head, &tail);
options[i++] = GET_HANDLE(head, "option");
uintptr_t handle = GET_HANDLE(head, Option);
options[i++] = handle;
}
GoSlice go_options = {options, i, MAX_OPTIONS};
uintptr_t result = HostNew(go_options);
return get_handle_result(env, result);
return get_handle_result(env, Host, result);
}

ERL_FUNCTION(host_close)
{
uintptr_t host = GET_HANDLE(argv[0], "host");
uintptr_t host = GET_HANDLE(argv[0], Host);
HostClose(host);
return enif_make_atom(env, "ok");
}

ERL_FUNCTION(host_set_stream_handler)
{
uintptr_t host = GET_HANDLE(argv[0], "host");
uintptr_t host = GET_HANDLE(argv[0], Host);

ErlNifBinary bin;
IF_ERROR(!enif_inspect_binary(env, argv[1], &bin), "invalid protocol ID");
Expand All @@ -124,41 +191,41 @@ ERL_FUNCTION(host_set_stream_handler)

IF_ERROR(!enif_self(env, pid), "failed to get pid");

SetStreamHandler(host, proto_id, (void *)pid);
HostSetStreamHandler(host, proto_id, (void *)pid, send_message);

return enif_make_atom(env, "ok");
}

ERL_FUNCTION(host_new_stream)
{
uintptr_t host = GET_HANDLE(argv[0], "host");
uintptr_t id = GET_HANDLE(argv[1], "peer id");
uintptr_t host = GET_HANDLE(argv[0], Host);
uintptr_t id = GET_HANDLE(argv[1], peer_ID);

ErlNifBinary bin;
IF_ERROR(!enif_inspect_binary(env, argv[2], &bin), "invalid protocol ID");
GoString proto_id = {(const char *)bin.data, bin.size};

int result = NewStream(host, id, proto_id);
return get_handle_result(env, result);
uintptr_t result = HostNewStream(host, id, proto_id);
return get_handle_result(env, Stream, result);
}

ERL_FUNCTION_GETTER(host_peerstore, Peerstore)
ERL_FUNCTION_GETTER(host_id, ID)
ERL_FUNCTION_GETTER(host_addrs, Addrs)
ERL_FUNCTION_GETTER(host_peerstore, Host, Peerstore, HostPeerstore)
ERL_FUNCTION_GETTER(host_id, Host, peer_ID, HostID)
ERL_FUNCTION_GETTER(host_addrs, Host, Multiaddr_arr, HostAddrs)

/*********************/
/* Peerstore methods */
/*********************/

ERL_FUNCTION(peerstore_add_addrs)
{
uintptr_t ps = GET_HANDLE(argv[0], "peerstore");
uintptr_t id = GET_HANDLE(argv[1], "peer id");
uintptr_t addrs = GET_HANDLE(argv[2], "addrs");
uintptr_t ps = GET_HANDLE(argv[0], Peerstore);
uintptr_t id = GET_HANDLE(argv[1], peer_ID);
uintptr_t addrs = GET_HANDLE(argv[2], Multiaddr_arr);
u_long ttl;
IF_ERROR(!enif_get_uint64(env, argv[3], &ttl), "invalid TTL");

AddAddrs(ps, id, addrs, ttl);
PeerstoreAddAddrs(ps, id, addrs, ttl);
return enif_make_atom(env, "ok");
}

Expand All @@ -168,7 +235,7 @@ ERL_FUNCTION(peerstore_add_addrs)

ERL_FUNCTION(stream_read)
{
uintptr_t stream = GET_HANDLE(argv[0], "stream");
uintptr_t stream = GET_HANDLE(argv[0], Stream);

char buffer[BUFFER_SIZE];
GoSlice go_buffer = {buffer, BUFFER_SIZE, BUFFER_SIZE};
Expand All @@ -185,7 +252,7 @@ ERL_FUNCTION(stream_read)

ERL_FUNCTION(stream_write)
{
uintptr_t stream = GET_HANDLE(argv[0], "stream");
uintptr_t stream = GET_HANDLE(argv[0], Stream);

ErlNifBinary bin;
IF_ERROR(!enif_inspect_binary(env, argv[1], &bin), "invalid data");
Expand All @@ -199,7 +266,7 @@ ERL_FUNCTION(stream_write)

ERL_FUNCTION(stream_close)
{
uintptr_t stream = GET_HANDLE(argv[0], "stream");
uintptr_t stream = GET_HANDLE(argv[0], Stream);
StreamClose(stream);
return enif_make_atom(env, "ok");
}
Expand All @@ -209,14 +276,15 @@ static ErlNifFunc nif_funcs[] = {
NIF_ENTRY(host_new, 1),
NIF_ENTRY(host_close, 1),
NIF_ENTRY(host_set_stream_handler, 2),
NIF_ENTRY(host_new_stream, 3),
// TODO: check if host_new_stream is truly dirty
NIF_ENTRY(host_new_stream, 3, ERL_NIF_DIRTY_JOB_IO_BOUND), // blocks negotiating protocol
NIF_ENTRY(host_peerstore, 1),
NIF_ENTRY(host_id, 1),
NIF_ENTRY(host_addrs, 1),
NIF_ENTRY(peerstore_add_addrs, 4),
NIF_ENTRY(stream_read, 1),
NIF_ENTRY(stream_write, 2),
NIF_ENTRY(stream_read, 1, ERL_NIF_DIRTY_JOB_IO_BOUND), // blocks until reading
NIF_ENTRY(stream_write, 2, ERL_NIF_DIRTY_JOB_IO_BOUND), // blocks when buffer is full
NIF_ENTRY(stream_close, 1),
};

ERL_NIF_INIT(Elixir.Libp2p, nif_funcs, NULL, NULL, NULL, NULL)
ERL_NIF_INIT(Elixir.Libp2p, nif_funcs, load, NULL, upgrade, NULL)
Loading