Skip to content

Commit 0a57982

Browse files
intermediary
1 parent cb72d84 commit 0a57982

File tree

14 files changed

+120
-377
lines changed

14 files changed

+120
-377
lines changed

crates/pg_lsp/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ pg_base_db.workspace = true
3232
pg_schema_cache.workspace = true
3333
pg_workspace.workspace = true
3434
pg_diagnostics.workspace = true
35-
tokio = { version = "1.40.0", features = ["macros", "rt-multi-thread", "sync", "time"] }
35+
tokio = { version = "1.40.0", features = ["io-std", "macros", "rt-multi-thread", "sync", "time"] }
3636
tokio-util = "0.7.12"
3737
tower-lsp = "0.20.0"
3838

crates/pg_lsp/src/client/client_flags.rs

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,6 @@ use tower_lsp::lsp_types::InitializeParams;
66
pub struct ClientFlags {
77
/// If `true`, the server can pull configuration from the client.
88
pub supports_pull_opts: bool,
9-
10-
/// If `true`, the client notifies the server when its configuration changes.
11-
pub supports_dynamic_registration: bool,
129
}
1310

1411
impl ClientFlags {
@@ -20,17 +17,6 @@ impl ClientFlags {
2017
.and_then(|w| w.configuration)
2118
.unwrap_or(false);
2219

23-
let supports_dynamic_registration = params
24-
.capabilities
25-
.workspace
26-
.as_ref()
27-
.and_then(|w| w.did_change_configuration)
28-
.and_then(|c| c.dynamic_registration)
29-
.unwrap_or(false);
30-
31-
Self {
32-
supports_pull_opts,
33-
supports_dynamic_registration,
34-
}
20+
Self { supports_pull_opts }
3521
}
3622
}

crates/pg_lsp/src/db_connection.rs

Lines changed: 47 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -2,66 +2,74 @@ use pg_schema_cache::SchemaCache;
22
use sqlx::{postgres::PgListener, PgPool};
33
use tokio::task::JoinHandle;
44

5-
#[derive(Debug)]
65
pub(crate) struct DbConnection {
76
pool: PgPool,
87
connection_string: String,
9-
schema_update_handle: Option<JoinHandle<()>>,
8+
schema_update_handle: JoinHandle<()>,
9+
close_tx: tokio::sync::oneshot::Sender<()>,
1010
}
1111

1212
impl DbConnection {
13-
pub(crate) async fn new(connection_string: String) -> Result<Self, sqlx::Error> {
14-
let pool = PgPool::connect(&connection_string).await?;
15-
Ok(Self {
16-
pool,
17-
connection_string: connection_string,
18-
schema_update_handle: None,
19-
})
20-
}
21-
22-
pub(crate) fn connected_to(&self, connection_string: &str) -> bool {
23-
connection_string == self.connection_string
24-
}
25-
26-
pub(crate) async fn close(self) {
27-
if self.schema_update_handle.is_some() {
28-
self.schema_update_handle.unwrap().abort();
29-
}
30-
self.pool.close().await;
31-
}
32-
33-
pub(crate) async fn listen_for_schema_updates<F>(
34-
&mut self,
13+
pub(crate) async fn new<F>(
14+
connection_string: String,
3515
on_schema_update: F,
36-
) -> anyhow::Result<()>
16+
) -> Result<Self, sqlx::Error>
3717
where
3818
F: Fn(SchemaCache) -> () + Send + 'static,
3919
{
40-
let mut listener = PgListener::connect_with(&self.pool).await?;
20+
let pool = PgPool::connect(&connection_string).await?;
21+
22+
let mut listener = PgListener::connect_with(&pool).await?;
4123
listener.listen_all(["postgres_lsp", "pgrst"]).await?;
4224

43-
let pool = self.pool.clone();
25+
let (close_tx, close_rx) = tokio::sync::oneshot::channel::<()>();
26+
27+
let cloned_pool = pool.clone();
28+
29+
let schema_update_handle: JoinHandle<()> = tokio::spawn(async move {
30+
let mut moved_rx = close_rx;
4431

45-
let handle: JoinHandle<()> = tokio::spawn(async move {
4632
loop {
47-
match listener.recv().await {
48-
Ok(not) => {
49-
if not.payload().to_string() == "reload schema" {
50-
let schema_cache = SchemaCache::load(&pool).await;
51-
on_schema_update(schema_cache);
52-
};
33+
tokio::select! {
34+
res = listener.recv() => {
35+
match res {
36+
Ok(not) => {
37+
if not.payload().to_string() == "reload schema" {
38+
let schema_cache = SchemaCache::load(&cloned_pool).await;
39+
on_schema_update(schema_cache);
40+
};
41+
}
42+
Err(why) => {
43+
eprintln!("Error receiving notification: {:?}", why);
44+
break;
45+
}
46+
}
5347
}
54-
Err(why) => {
55-
eprintln!("Error receiving notification: {:?}", why);
56-
break;
48+
49+
_ = &mut moved_rx => {
50+
return;
5751
}
5852
}
5953
}
6054
});
6155

62-
self.schema_update_handle = Some(handle);
56+
Ok(Self {
57+
pool,
58+
connection_string: connection_string,
59+
schema_update_handle,
60+
close_tx,
61+
})
62+
}
63+
64+
pub(crate) fn connected_to(&self, connection_string: &str) -> bool {
65+
connection_string == self.connection_string
66+
}
67+
68+
pub(crate) async fn close(self) {
69+
let _ = self.close_tx.send(());
70+
let _ = self.schema_update_handle.await;
6371

64-
Ok(())
72+
self.pool.close().await;
6573
}
6674

6775
pub(crate) fn get_pool(&self) -> PgPool {

crates/pg_lsp/src/debouncer.rs

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,33 @@
1-
use std::{future::Future, pin::Pin};
1+
use std::{
2+
future::Future,
3+
pin::Pin,
4+
sync::{atomic::AtomicBool, Arc},
5+
};
26

37
type AsyncBlock = Pin<Box<dyn Future<Output = ()> + 'static + Send>>;
48

59
pub(crate) struct SimpleTokioDebouncer {
610
handle: tokio::task::JoinHandle<()>,
711
tx: tokio::sync::mpsc::Sender<AsyncBlock>,
12+
shutdown_flag: Arc<AtomicBool>,
813
}
914

1015
impl SimpleTokioDebouncer {
1116
pub fn new(timeout: std::time::Duration) -> Self {
1217
let (tx, mut rx) = tokio::sync::mpsc::channel(100);
1318

19+
let shutdown_flag = Arc::new(AtomicBool::new(false));
20+
let shutdown_flag_clone = shutdown_flag.clone();
21+
1422
let handle = tokio::spawn(async move {
1523
let mut maybe_args: Option<AsyncBlock> = None;
1624
let mut instant = tokio::time::Instant::now() + timeout;
1725

1826
loop {
27+
if shutdown_flag_clone.load(std::sync::atomic::Ordering::Relaxed) {
28+
break;
29+
}
30+
1931
tokio::select! {
2032
// If the timeout is reached, execute and reset the last received action
2133
_ = tokio::time::sleep_until(instant) => {
@@ -42,14 +54,30 @@ impl SimpleTokioDebouncer {
4254
}
4355
});
4456

45-
Self { handle, tx }
57+
Self {
58+
handle,
59+
tx,
60+
shutdown_flag,
61+
}
4662
}
4763

4864
pub async fn debounce(&self, block: AsyncBlock) {
65+
if self
66+
.shutdown_flag
67+
.load(std::sync::atomic::Ordering::Relaxed)
68+
{
69+
println!(
70+
"Trying to debounce tasks, but the Debouncer is in the process of shutting down."
71+
);
72+
return;
73+
}
74+
4975
self.tx.send(block).await.unwrap();
5076
}
5177

52-
pub async fn shutdown(self) {
53-
let _ = self.handle.await.unwrap();
78+
pub async fn shutdown(&self) {
79+
self.shutdown_flag
80+
.store(true, std::sync::atomic::Ordering::Relaxed);
81+
let _ = self.handle.abort(); // we don't care about any errors during shutdown
5482
}
5583
}

crates/pg_lsp/src/main.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1-
use pg_lsp::server::Server;
2-
use tower_lsp::LspService;
1+
use pg_lsp::server::LspServer;
2+
use tower_lsp::{LspService, Server};
33

44
#[tokio::main]
5-
async fn main() -> anyhow::Result<()> {
6-
let (server, client_socket) = LspService::build(|client| Server::new(client)).finish();
5+
async fn main() {
6+
let stdin = tokio::io::stdin();
7+
let stdout = tokio::io::stdout();
78

8-
Ok(())
9+
let (service, socket) = LspService::new(|client| LspServer::new(client));
10+
11+
Server::new(stdin, stdout, socket).serve(service).await;
912
}

crates/pg_lsp/src/server.rs

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::sync::Arc;
2+
13
use notification::ShowMessage;
24
use pg_commands::CommandType;
35
use tokio::sync::RwLock;
@@ -13,18 +15,18 @@ use crate::utils::file_path;
1315
use crate::utils::normalize_uri;
1416
use crate::utils::to_proto;
1517

16-
pub struct Server {
17-
client: Client,
18-
session: Session,
18+
pub struct LspServer {
19+
client: Arc<Client>,
20+
session: Arc<Session>,
1921
client_capabilities: RwLock<Option<ClientFlags>>,
2022
debouncer: SimpleTokioDebouncer,
2123
}
2224

23-
impl Server {
25+
impl LspServer {
2426
pub fn new(client: Client) -> Self {
2527
Self {
26-
client,
27-
session: Session::new(),
28+
client: Arc::new(client),
29+
session: Arc::new(Session::new()),
2830
client_capabilities: RwLock::new(None),
2931
debouncer: SimpleTokioDebouncer::new(std::time::Duration::from_millis(500)),
3032
}
@@ -120,8 +122,8 @@ impl Server {
120122
}
121123

122124
async fn publish_diagnostics_debounced(&self, mut uri: Url) {
123-
let session = self.session.clone();
124-
let client = self.client.clone();
125+
let client = Arc::clone(&self.client);
126+
let session = Arc::clone(&self.session);
125127

126128
self.debouncer
127129
.debounce(Box::pin(async move {
@@ -157,7 +159,7 @@ impl Server {
157159
}
158160

159161
#[tower_lsp::async_trait]
160-
impl LanguageServer for Server {
162+
impl LanguageServer for LspServer {
161163
async fn initialize(&self, params: InitializeParams) -> jsonrpc::Result<InitializeResult> {
162164
let flags = ClientFlags::from_initialize_request_params(&params);
163165
self.client_capabilities.blocking_write().replace(flags);
@@ -199,11 +201,13 @@ impl LanguageServer for Server {
199201
}
200202

201203
async fn shutdown(&self) -> jsonrpc::Result<()> {
202-
// TODO: Shutdown stuff.
204+
self.session.shutdown().await;
205+
self.debouncer.shutdown().await;
203206

204207
self.client
205208
.log_message(MessageType::INFO, "Postgres LSP terminated.")
206209
.await;
210+
207211
Ok(())
208212
}
209213

@@ -247,7 +251,10 @@ impl LanguageServer for Server {
247251
self.client
248252
.show_message(
249253
MessageType::ERROR,
250-
format!("Used Client Options from params but failed to set them: {}", err),
254+
format!(
255+
"Used Client Options from params but failed to set them: {}",
256+
err
257+
),
251258
)
252259
.await
253260
}

0 commit comments

Comments
 (0)