Skip to content

Naming improvements #23

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions examples/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,7 @@ impl Counter for CounterImpl {
#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
HttpServer::new(
Endpoint::builder()
.with_service(CounterImpl.serve())
.build(),
)
.listen_and_serve("0.0.0.0:9080".parse().unwrap())
.await;
HttpServer::new(Endpoint::builder().bind(CounterImpl.serve()).build())
.listen_and_serve("0.0.0.0:9080".parse().unwrap())
.await;
}
10 changes: 3 additions & 7 deletions examples/failures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,7 @@ impl FailureExample for FailureExampleImpl {
#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
HttpServer::new(
Endpoint::builder()
.with_service(FailureExampleImpl.serve())
.build(),
)
.listen_and_serve("0.0.0.0:9080".parse().unwrap())
.await;
HttpServer::new(Endpoint::builder().bind(FailureExampleImpl.serve()).build())
.listen_and_serve("0.0.0.0:9080".parse().unwrap())
.await;
}
10 changes: 3 additions & 7 deletions examples/greeter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,7 @@ impl Greeter for GreeterImpl {
#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
HttpServer::new(
Endpoint::builder()
.with_service(GreeterImpl.serve())
.build(),
)
.listen_and_serve("0.0.0.0:9080".parse().unwrap())
.await;
HttpServer::new(Endpoint::builder().bind(GreeterImpl.serve()).build())
.listen_and_serve("0.0.0.0:9080".parse().unwrap())
.await;
}
4 changes: 2 additions & 2 deletions examples/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl RunExample for RunExampleImpl {

Ok(Json::from(res))
})
.named("get_ip")
.name("get_ip")
.await?
.into_inner();

Expand All @@ -39,7 +39,7 @@ async fn main() {
tracing_subscriber::fmt::init();
HttpServer::new(
Endpoint::builder()
.with_service(RunExampleImpl(reqwest::Client::new()).serve())
.bind(RunExampleImpl(reqwest::Client::new()).serve())
.build(),
)
.listen_and_serve("0.0.0.0:9080".parse().unwrap())
Expand Down
36 changes: 18 additions & 18 deletions src/context/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,30 +31,30 @@ pub trait RunFuture<O>: Future<Output = O> {
///
/// If unspecified, the `run` will be retried using the [Restate invoker retry policy](https://docs.restate.dev/operate/configuration/server),
/// which by default retries indefinitely.
fn with_retry_policy(self, retry_policy: RunRetryPolicy) -> Self;
fn retry_policy(self, retry_policy: RunRetryPolicy) -> Self;

/// Define a name for this `run` operation.
///
/// This is used mainly for observability.
fn named(self, name: impl Into<String>) -> Self;
fn name(self, name: impl Into<String>) -> Self;
}

/// This struct represents the policy to execute retries for run closures.
#[derive(Debug, Clone)]
pub struct RunRetryPolicy {
pub(crate) initial_interval: Duration,
pub(crate) initial_delay: Duration,
pub(crate) factor: f32,
pub(crate) max_interval: Option<Duration>,
pub(crate) max_delay: Option<Duration>,
pub(crate) max_attempts: Option<u32>,
pub(crate) max_duration: Option<Duration>,
}

impl Default for RunRetryPolicy {
fn default() -> Self {
Self {
initial_interval: Duration::from_millis(100),
initial_delay: Duration::from_millis(100),
factor: 2.0,
max_interval: Some(Duration::from_secs(2)),
max_delay: Some(Duration::from_secs(2)),
max_attempts: None,
max_duration: Some(Duration::from_secs(50)),
}
Expand All @@ -65,29 +65,29 @@ impl RunRetryPolicy {
/// Create a new retry policy.
pub fn new() -> Self {
Self {
initial_interval: Duration::from_millis(100),
initial_delay: Duration::from_millis(100),
factor: 1.0,
max_interval: None,
max_delay: None,
max_attempts: None,
max_duration: None,
}
}

/// Initial interval for the first retry attempt.
pub fn with_initial_interval(mut self, initial_interval: Duration) -> Self {
self.initial_interval = initial_interval;
/// Initial retry delay for the first retry attempt.
pub fn initial_delay(mut self, initial_interval: Duration) -> Self {
self.initial_delay = initial_interval;
self
}

/// Maximum interval between retries.
pub fn with_factor(mut self, factor: f32) -> Self {
/// Exponentiation factor to use when computing the next retry delay.
pub fn exponentiation_factor(mut self, factor: f32) -> Self {
self.factor = factor;
self
}

/// Maximum interval between retries.
pub fn with_max_interval(mut self, max_interval: Duration) -> Self {
self.max_interval = Some(max_interval);
/// Maximum delay between retries.
pub fn max_delay(mut self, max_interval: Duration) -> Self {
self.max_delay = Some(max_interval);
self
}

Expand All @@ -98,7 +98,7 @@ impl RunRetryPolicy {
/// This is due to the nature of the run operation, which executes the closure on the service and sends the result afterward to Restate.
///
/// Infinite retries if this field and `max_duration` are unset.
pub fn with_max_attempts(mut self, max_attempts: u32) -> Self {
pub fn max_attempts(mut self, max_attempts: u32) -> Self {
self.max_attempts = Some(max_attempts);
self
}
Expand All @@ -110,7 +110,7 @@ impl RunRetryPolicy {
/// This is due to the nature of the run operation, which executes the closure on the service and sends the result afterward to Restate.
///
/// Infinite retries if this field and `max_attempts` are unset.
pub fn with_max_duration(mut self, max_duration: Duration) -> Self {
pub fn max_duration(mut self, max_duration: Duration) -> Self {
self.max_duration = Some(max_duration);
self
}
Expand Down
8 changes: 4 additions & 4 deletions src/endpoint/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -638,18 +638,18 @@ where
Fut: Future<Output = HandlerResult<Out>> + Send + Sync,
Out: Serialize + Deserialize,
{
fn with_retry_policy(mut self, retry_policy: RunRetryPolicy) -> Self {
fn retry_policy(mut self, retry_policy: RunRetryPolicy) -> Self {
self.retry_policy = RetryPolicy::Exponential {
initial_interval: retry_policy.initial_interval,
initial_interval: retry_policy.initial_delay,
factor: retry_policy.factor,
max_interval: retry_policy.max_interval,
max_interval: retry_policy.max_delay,
max_attempts: retry_policy.max_attempts,
max_duration: retry_policy.max_duration,
};
self
}

fn named(mut self, name: impl Into<String>) -> Self {
fn name(mut self, name: impl Into<String>) -> Self {
self.name = name.into();
self
}
Expand Down
8 changes: 4 additions & 4 deletions src/endpoint/futures/intercept_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ impl<F, R> RunFuture<R> for InterceptErrorFuture<F>
where
F: RunFuture<Result<R, Error>>,
{
fn with_retry_policy(mut self, retry_policy: RunRetryPolicy) -> Self {
self.fut = self.fut.with_retry_policy(retry_policy);
fn retry_policy(mut self, retry_policy: RunRetryPolicy) -> Self {
self.fut = self.fut.retry_policy(retry_policy);
self
}

fn named(mut self, name: impl Into<String>) -> Self {
self.fut = self.fut.named(name);
fn name(mut self, name: impl Into<String>) -> Self {
self.fut = self.fut.name(name);
self
}
}
6 changes: 3 additions & 3 deletions src/endpoint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,10 @@ impl Builder {
Self::default()
}

/// Add a service.
/// Add a [`Service`] to this endpoint.
///
/// When using the service/object/workflow macros, you need to pass the result of the `serve` method.
pub fn with_service<
pub fn bind<
S: Service<Future = BoxFuture<'static, Result<(), Error>>>
+ Discoverable
+ Send
Expand All @@ -214,7 +214,7 @@ impl Builder {
}

/// Add identity key, e.g. `publickeyv1_ChjENKeMvCtRnqG2mrBK1HmPKufgFUc98K8B3ononQvp`.
pub fn with_identity_key(mut self, key: &str) -> Result<Self, KeyError> {
pub fn identity_key(mut self, key: &str) -> Result<Self, KeyError> {
self.identity_verifier = self.identity_verifier.with_key(key)?;
Ok(self)
}
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
//! async fn main() {
//! HttpServer::new(
//! Endpoint::builder()
//! .with_service(GreeterImpl.serve())
//! .bind(GreeterImpl.serve())
//! .build(),
//! )
//! .listen_and_serve("0.0.0.0:9080".parse().unwrap())
Expand Down
16 changes: 8 additions & 8 deletions test-services/src/failing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,12 @@ impl Failing for FailingImpl {
Err(anyhow!("Failed at attempt {current_attempt}"))?
}
})
.with_retry_policy(
.retry_policy(
RunRetryPolicy::new()
.with_initial_interval(Duration::from_millis(10))
.with_factor(1.0),
.initial_delay(Duration::from_millis(10))
.exponentiation_factor(1.0),
)
.named("failing_side_effect")
.name("failing_side_effect")
.await?;

Ok(success_attempt)
Expand All @@ -117,11 +117,11 @@ impl Failing for FailingImpl {
let current_attempt = cloned_counter.fetch_add(1, Ordering::SeqCst) + 1;
Err::<(), _>(anyhow!("Failed at attempt {current_attempt}").into())
})
.with_retry_policy(
.retry_policy(
RunRetryPolicy::new()
.with_initial_interval(Duration::from_millis(10))
.with_factor(1.0)
.with_max_attempts(retry_policy_max_retry_count as u32),
.initial_delay(Duration::from_millis(10))
.exponentiation_factor(1.0)
.max_attempts(retry_policy_max_retry_count as u32),
)
.await
.is_err()
Expand Down
28 changes: 14 additions & 14 deletions test-services/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,63 +22,63 @@ async fn main() {
let mut builder = Endpoint::builder();

if services == "*" || services.contains("Counter") {
builder = builder.with_service(counter::Counter::serve(counter::CounterImpl))
builder = builder.bind(counter::Counter::serve(counter::CounterImpl))
}
if services == "*" || services.contains("Proxy") {
builder = builder.with_service(proxy::Proxy::serve(proxy::ProxyImpl))
builder = builder.bind(proxy::Proxy::serve(proxy::ProxyImpl))
}
if services == "*" || services.contains("MapObject") {
builder = builder.with_service(map_object::MapObject::serve(map_object::MapObjectImpl))
builder = builder.bind(map_object::MapObject::serve(map_object::MapObjectImpl))
}
if services == "*" || services.contains("ListObject") {
builder = builder.with_service(list_object::ListObject::serve(list_object::ListObjectImpl))
builder = builder.bind(list_object::ListObject::serve(list_object::ListObjectImpl))
}
if services == "*" || services.contains("AwakeableHolder") {
builder = builder.with_service(awakeable_holder::AwakeableHolder::serve(
builder = builder.bind(awakeable_holder::AwakeableHolder::serve(
awakeable_holder::AwakeableHolderImpl,
))
}
if services == "*" || services.contains("BlockAndWaitWorkflow") {
builder = builder.with_service(block_and_wait_workflow::BlockAndWaitWorkflow::serve(
builder = builder.bind(block_and_wait_workflow::BlockAndWaitWorkflow::serve(
block_and_wait_workflow::BlockAndWaitWorkflowImpl,
))
}
if services == "*" || services.contains("CancelTestRunner") {
builder = builder.with_service(cancel_test::CancelTestRunner::serve(
builder = builder.bind(cancel_test::CancelTestRunner::serve(
cancel_test::CancelTestRunnerImpl,
))
}
if services == "*" || services.contains("CancelTestBlockingService") {
builder = builder.with_service(cancel_test::CancelTestBlockingService::serve(
builder = builder.bind(cancel_test::CancelTestBlockingService::serve(
cancel_test::CancelTestBlockingServiceImpl,
))
}
if services == "*" || services.contains("Failing") {
builder = builder.with_service(failing::Failing::serve(failing::FailingImpl::default()))
builder = builder.bind(failing::Failing::serve(failing::FailingImpl::default()))
}
if services == "*" || services.contains("KillTestRunner") {
builder = builder.with_service(kill_test::KillTestRunner::serve(
builder = builder.bind(kill_test::KillTestRunner::serve(
kill_test::KillTestRunnerImpl,
))
}
if services == "*" || services.contains("KillTestSingleton") {
builder = builder.with_service(kill_test::KillTestSingleton::serve(
builder = builder.bind(kill_test::KillTestSingleton::serve(
kill_test::KillTestSingletonImpl,
))
}
if services == "*" || services.contains("NonDeterministic") {
builder = builder.with_service(non_deterministic::NonDeterministic::serve(
builder = builder.bind(non_deterministic::NonDeterministic::serve(
non_deterministic::NonDeterministicImpl::default(),
))
}
if services == "*" || services.contains("TestUtilsService") {
builder = builder.with_service(test_utils_service::TestUtilsService::serve(
builder = builder.bind(test_utils_service::TestUtilsService::serve(
test_utils_service::TestUtilsServiceImpl,
))
}

if let Ok(key) = env::var("E2E_REQUEST_SIGNING_ENV") {
builder = builder.with_identity_key(&key).unwrap()
builder = builder.identity_key(&key).unwrap()
}

HttpServer::new(builder.build())
Expand Down