Skip to content

add support for MERGE statement #430

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions src/ast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -956,6 +956,19 @@ pub enum Statement {
/// A SQL query that specifies what to explain
statement: Box<Statement>,
},
// MERGE INTO statement, based on Snowflake. See <https://docs.snowflake.com/en/sql-reference/sql/merge.html>
Merge {
// Specifies the table to merge
table: TableFactor,
// Specifies the table or subquery to join with the target table
source: Box<SetExpr>,
// Specifies alias to the table that is joined with target table
alias: Option<TableAlias>,
// Specifies the expression on which to join the target table and source
on: Box<Expr>,
// Specifies the actions to perform when values match or do not match.
clauses: Vec<MergeClause>,
},
}

impl fmt::Display for Statement {
Expand Down Expand Up @@ -1606,6 +1619,20 @@ impl fmt::Display for Statement {
write!(f, "NULL")
}
}
Statement::Merge {
table,
source,
alias,
on,
clauses,
} => {
write!(f, "MERGE INTO {} USING {} ", table, source)?;
if let Some(a) = alias {
write!(f, "as {} ", a)?;
};
write!(f, "ON {} ", on)?;
write!(f, "{}", display_separated(clauses, " "))
}
}
}
}
Expand Down Expand Up @@ -2137,6 +2164,68 @@ impl fmt::Display for SqliteOnConflict {
}
}

///
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum MergeClause {
MatchedUpdate {
predicate: Option<Expr>,
assignments: Vec<Assignment>,
},
MatchedDelete(Option<Expr>),
NotMatched {
predicate: Option<Expr>,
columns: Vec<Ident>,
values: Values,
},
}

impl fmt::Display for MergeClause {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
use MergeClause::*;
write!(f, "WHEN")?;
match self {
MatchedUpdate {
predicate,
assignments,
} => {
write!(f, " MATCHED")?;
if let Some(pred) = predicate {
write!(f, " AND {}", pred)?;
}
write!(
f,
" THEN UPDATE SET {}",
display_comma_separated(assignments)
)
}
MatchedDelete(predicate) => {
write!(f, " MATCHED")?;
if let Some(pred) = predicate {
write!(f, " AND {}", pred)?;
}
write!(f, " THEN DELETE")
}
NotMatched {
predicate,
columns,
values,
} => {
write!(f, " NOT MATCHED")?;
if let Some(pred) = predicate {
write!(f, " AND {}", pred)?;
}
write!(
f,
" THEN INSERT ({}) {}",
display_comma_separated(columns),
values
)
}
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
1 change: 1 addition & 0 deletions src/keywords.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ define_keywords!(
LOWER,
MANAGEDLOCATION,
MATCH,
MATCHED,
MATERIALIZED,
MAX,
MEMBER,
Expand Down
99 changes: 99 additions & 0 deletions src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ impl<'a> Parser<'a> {
Keyword::DEALLOCATE => Ok(self.parse_deallocate()?),
Keyword::EXECUTE => Ok(self.parse_execute()?),
Keyword::PREPARE => Ok(self.parse_prepare()?),
Keyword::MERGE => Ok(self.parse_merge()?),
Keyword::REPLACE if dialect_of!(self is SQLiteDialect ) => {
self.prev_token();
Ok(self.parse_insert()?)
Expand Down Expand Up @@ -3890,6 +3891,104 @@ impl<'a> Parser<'a> {
comment,
})
}

pub fn parse_merge_clauses(&mut self) -> Result<Vec<MergeClause>, ParserError> {
let mut clauses: Vec<MergeClause> = vec![];
loop {
if self.peek_token() == Token::EOF {
break;
}
self.expect_keyword(Keyword::WHEN)?;

let is_not_matched = self.parse_keyword(Keyword::NOT);
self.expect_keyword(Keyword::MATCHED)?;

let predicate = if self.parse_keyword(Keyword::AND) {
Some(self.parse_expr()?)
} else {
None
};

self.expect_keyword(Keyword::THEN)?;

clauses.push(
match self.parse_one_of_keywords(&[
Keyword::UPDATE,
Keyword::INSERT,
Keyword::DELETE,
]) {
Some(Keyword::UPDATE) => {
if is_not_matched {
return Err(ParserError::ParserError(
"UPDATE in NOT MATCHED merge clause".to_string(),
));
}
self.expect_keyword(Keyword::SET)?;
let assignments = self.parse_comma_separated(Parser::parse_assignment)?;
MergeClause::MatchedUpdate {
predicate,
assignments,
}
}
Some(Keyword::DELETE) => {
if is_not_matched {
return Err(ParserError::ParserError(
"DELETE in NOT MATCHED merge clause".to_string(),
));
}
MergeClause::MatchedDelete(predicate)
}
Some(Keyword::INSERT) => {
if !is_not_matched {
return Err(ParserError::ParserError(
"INSERT in MATCHED merge clause".to_string(),
));
}
let columns = self.parse_parenthesized_column_list(Optional)?;
self.expect_keyword(Keyword::VALUES)?;
let values = self.parse_values()?;
MergeClause::NotMatched {
predicate,
columns,
values,
}
}
Some(_) => {
return Err(ParserError::ParserError(
"expected UPDATE, DELETE or INSERT in merge clause".to_string(),
))
}
None => {
return Err(ParserError::ParserError(
"expected UPDATE, DELETE or INSERT in merge clause".to_string(),
))
}
},
);
}
Ok(clauses)
}

pub fn parse_merge(&mut self) -> Result<Statement, ParserError> {
self.expect_keyword(Keyword::INTO)?;

let table = self.parse_table_factor()?;

self.expect_keyword(Keyword::USING)?;
let source = self.parse_query_body(0)?;
let alias = self.parse_optional_table_alias(keywords::RESERVED_FOR_TABLE_ALIAS)?;
self.expect_keyword(Keyword::ON)?;
let on = self.parse_expr()?;
let clauses = self.parse_merge_clauses()?;

Ok(Statement::Merge {
table,
source: Box::new(source),
alias,
on: Box::new(on),
clauses,
})
}
}

impl Word {
Expand Down
138 changes: 138 additions & 0 deletions tests/sqlparser_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4203,6 +4203,144 @@ fn test_revoke() {
}
}

#[test]
fn parse_merge() {
let sql = "MERGE INTO s.bar AS dest USING (SELECT * FROM s.foo) as stg ON dest.D = stg.D AND dest.E = stg.E WHEN NOT MATCHED THEN INSERT (A, B, C) VALUES (stg.A, stg.B, stg.C) WHEN MATCHED AND dest.A = 'a' THEN UPDATE SET dest.F = stg.F, dest.G = stg.G WHEN MATCHED THEN DELETE";
match verified_stmt(sql) {
Statement::Merge {
table,
source,
alias,
on,
clauses,
} => {
assert_eq!(
table,
TableFactor::Table {
name: ObjectName(vec![Ident::new("s"), Ident::new("bar")]),
alias: Some(TableAlias {
name: Ident::new("dest"),
columns: vec![]
}),
args: vec![],
with_hints: vec![]
}
);
assert_eq!(
source,
Box::new(SetExpr::Query(Box::new(Query {
with: None,
body: SetExpr::Select(Box::new(Select {
distinct: false,
top: None,
projection: vec![SelectItem::Wildcard],
from: vec![TableWithJoins {
relation: TableFactor::Table {
name: ObjectName(vec![Ident::new("s"), Ident::new("foo")]),
alias: None,
args: vec![],
with_hints: vec![],
},
joins: vec![]
}],
lateral_views: vec![],
selection: None,
group_by: vec![],
cluster_by: vec![],
distribute_by: vec![],
sort_by: vec![],
having: None
})),
order_by: vec![],
limit: None,
offset: None,
fetch: None,
lock: None
})))
);
assert_eq!(
alias,
Some(TableAlias {
name: Ident::new("stg"),
columns: vec![]
})
);
assert_eq!(
on,
Box::new(Expr::BinaryOp {
left: Box::new(Expr::BinaryOp {
left: Box::new(Expr::CompoundIdentifier(vec![
Ident::new("dest"),
Ident::new("D")
])),
op: BinaryOperator::Eq,
right: Box::new(Expr::CompoundIdentifier(vec![
Ident::new("stg"),
Ident::new("D")
]))
}),
op: BinaryOperator::And,
right: Box::new(Expr::BinaryOp {
left: Box::new(Expr::CompoundIdentifier(vec![
Ident::new("dest"),
Ident::new("E")
])),
op: BinaryOperator::Eq,
right: Box::new(Expr::CompoundIdentifier(vec![
Ident::new("stg"),
Ident::new("E")
]))
})
})
);
assert_eq!(
clauses,
vec![
MergeClause::NotMatched {
predicate: None,
columns: vec![Ident::new("A"), Ident::new("B"), Ident::new("C")],
values: Values(vec![vec![
Expr::CompoundIdentifier(vec![Ident::new("stg"), Ident::new("A")]),
Expr::CompoundIdentifier(vec![Ident::new("stg"), Ident::new("B")]),
Expr::CompoundIdentifier(vec![Ident::new("stg"), Ident::new("C")]),
]])
},
MergeClause::MatchedUpdate {
predicate: Some(Expr::BinaryOp {
left: Box::new(Expr::CompoundIdentifier(vec![
Ident::new("dest"),
Ident::new("A")
])),
op: BinaryOperator::Eq,
right: Box::new(Expr::Value(Value::SingleQuotedString(
"a".to_string()
)))
}),
assignments: vec![
Assignment {
id: vec![Ident::new("dest"), Ident::new("F")],
value: Expr::CompoundIdentifier(vec![
Ident::new("stg"),
Ident::new("F")
])
},
Assignment {
id: vec![Ident::new("dest"), Ident::new("G")],
value: Expr::CompoundIdentifier(vec![
Ident::new("stg"),
Ident::new("G")
])
}
]
},
MergeClause::MatchedDelete(None)
]
)
}
_ => unreachable!(),
}
}

#[test]
fn test_lock() {
let sql = "SELECT * FROM student WHERE id = '1' FOR UPDATE";
Expand Down