Skip to content

Add pipeline mode API #42

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 17 additions & 11 deletions .github/workflows/haskell-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
#
# For more information, see https://github.com/haskell-CI/haskell-ci
#
# version: 0.15.20221225
# version: 0.15.20230312
#
# REGENDATA ("0.15.20221225",["github","postgresql-libpq.cabal"])
# REGENDATA ("0.15.20230312",["github","postgresql-libpq.cabal"])
#
name: Haskell-CI
on:
Expand All @@ -27,7 +27,7 @@ jobs:
timeout-minutes:
60
container:
image: buildpack-deps:bionic
image: buildpack-deps:jammy
services:
postgres:
image: postgres:14
Expand Down Expand Up @@ -79,15 +79,15 @@ jobs:
curl -sL https://downloads.haskell.org/ghcup/0.1.18.0/x86_64-linux-ghcup-0.1.18.0 > "$HOME/.ghcup/bin/ghcup"
chmod a+x "$HOME/.ghcup/bin/ghcup"
"$HOME/.ghcup/bin/ghcup" install ghc "$HCVER" || (cat "$HOME"/.ghcup/logs/*.* && false)
"$HOME/.ghcup/bin/ghcup" install cabal 3.6.2.0 || (cat "$HOME"/.ghcup/logs/*.* && false)
"$HOME/.ghcup/bin/ghcup" install cabal 3.10.1.0 || (cat "$HOME"/.ghcup/logs/*.* && false)
else
apt-add-repository -y 'ppa:hvr/ghc'
apt-get update
apt-get install -y "$HCNAME"
mkdir -p "$HOME/.ghcup/bin"
curl -sL https://downloads.haskell.org/ghcup/0.1.18.0/x86_64-linux-ghcup-0.1.18.0 > "$HOME/.ghcup/bin/ghcup"
chmod a+x "$HOME/.ghcup/bin/ghcup"
"$HOME/.ghcup/bin/ghcup" install cabal 3.6.2.0 || (cat "$HOME"/.ghcup/logs/*.* && false)
"$HOME/.ghcup/bin/ghcup" install cabal 3.10.1.0 || (cat "$HOME"/.ghcup/logs/*.* && false)
fi
env:
HCKIND: ${{ matrix.compilerKind }}
Expand All @@ -105,13 +105,13 @@ jobs:
echo "HC=$HC" >> "$GITHUB_ENV"
echo "HCPKG=$HOME/.ghcup/bin/$HCKIND-pkg-$HCVER" >> "$GITHUB_ENV"
echo "HADDOCK=$HOME/.ghcup/bin/haddock-$HCVER" >> "$GITHUB_ENV"
echo "CABAL=$HOME/.ghcup/bin/cabal-3.6.2.0 -vnormal+nowrap" >> "$GITHUB_ENV"
echo "CABAL=$HOME/.ghcup/bin/cabal-3.10.1.0 -vnormal+nowrap" >> "$GITHUB_ENV"
else
HC=$HCDIR/bin/$HCKIND
echo "HC=$HC" >> "$GITHUB_ENV"
echo "HCPKG=$HCDIR/bin/$HCKIND-pkg" >> "$GITHUB_ENV"
echo "HADDOCK=$HCDIR/bin/haddock" >> "$GITHUB_ENV"
echo "CABAL=$HOME/.ghcup/bin/cabal-3.6.2.0 -vnormal+nowrap" >> "$GITHUB_ENV"
echo "CABAL=$HOME/.ghcup/bin/cabal-3.10.1.0 -vnormal+nowrap" >> "$GITHUB_ENV"
fi

HCNUMVER=$(${HC} --numeric-version|perl -ne '/^(\d+)\.(\d+)\.(\d+)(\.(\d+))?$/; print(10000 * $1 + 100 * $2 + ($3 == 0 ? $5 != 1 : $3))')
Expand Down Expand Up @@ -170,7 +170,7 @@ jobs:
chmod a+x $HOME/.cabal/bin/cabal-plan
cabal-plan --version
- name: checkout
uses: actions/checkout@v2
uses: actions/checkout@v3
with:
path: source
- name: initial cabal.project for sdist
Expand Down Expand Up @@ -205,8 +205,8 @@ jobs:
run: |
$CABAL v2-build $ARG_COMPILER $ARG_TESTS $ARG_BENCH --dry-run all
cabal-plan
- name: cache
uses: actions/cache@v2
- name: restore cache
uses: actions/cache/restore@v3
with:
key: ${{ runner.os }}-${{ matrix.compiler }}-${{ github.sha }}
path: ~/.cabal/store
Expand All @@ -230,8 +230,14 @@ jobs:
${CABAL} -vnormal check
- name: haddock
run: |
$CABAL v2-haddock $ARG_COMPILER --with-haddock $HADDOCK $ARG_TESTS $ARG_BENCH all
$CABAL v2-haddock --disable-documentation $ARG_COMPILER --with-haddock $HADDOCK $ARG_TESTS $ARG_BENCH all
- name: unconstrained build
run: |
rm -f cabal.project.local
$CABAL v2-build $ARG_COMPILER --disable-tests --disable-benchmarks all
- name: save cache
uses: actions/cache/save@v3
if: always()
with:
key: ${{ runner.os }}-${{ matrix.compiler }}-${{ github.sha }}
path: ~/.cabal/store
39 changes: 39 additions & 0 deletions src/Database/PostgreSQL/LibPQ.hs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,15 @@ module Database.PostgreSQL.LibPQ
, FlushStatus(..)
, flush

-- * Pipeline Mode
-- $pipeline
, PipelineStatus(..)
, pipelineStatus
, enterPipelineMode
, exitPipelineMode
, pipelineSync
, sendFlushRequest

-- * Cancelling Queries in Progress
-- $cancel
, Cancel
Expand Down Expand Up @@ -1624,6 +1633,36 @@ flush connection =
_ -> return FlushFailed


pipelineStatus :: Connection
-> IO PipelineStatus
pipelineStatus connection = do
stat <- withConn connection c_PQpipelineStatus
maybe
(fail $ "Unknown pipeline status " ++ show stat)
return
(fromCInt stat)

enterPipelineMode :: Connection
-> IO Bool
enterPipelineMode connection =
enumFromConn connection c_PQenterPipelineMode

exitPipelineMode :: Connection
-> IO Bool
exitPipelineMode connection =
enumFromConn connection c_PQexitPipelineMode

pipelineSync :: Connection
-> IO Bool
pipelineSync connection =
enumFromConn connection c_PQpipelineSync

sendFlushRequest :: Connection
-> IO Bool
sendFlushRequest connection =
enumFromConn connection c_PQsendFlushRequest


-- $cancel
-- A client application can request cancellation of a command that is
-- still being processed by the server, using the functions described
Expand Down
52 changes: 41 additions & 11 deletions src/Database/PostgreSQL/LibPQ/Enums.hsc
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,36 @@ data ExecStatus
| NonfatalError -- ^ A nonfatal error (a notice or
-- warning) occurred.
| FatalError -- ^ A fatal error occurred.
| SingleTuple -- ^ The PGresult contains a single result tuple
| SingleTuple -- ^ The 'Result' contains a single result tuple
-- from the current command. This status occurs
-- only when single-row mode has been selected
-- for the query.
| PipelineSync -- ^ The 'Result' represents a synchronization
-- point in pipeline mode, requested by
-- 'pipelineSync'. This status occurs only
-- when pipeline mode has been selected.
| PipelineAbort -- ^ The 'Result' represents a pipeline that
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's a name clash between PGRES_PIPELINE_ABORTED and PQ_PIPELINE_ABORTED

-- has received an error from the server.
-- 'getResult' must be called repeatedly,
-- and each time it will return this status
-- code until the end of the current pipeline,
-- at which point it will return 'PipelineSync'
-- and normal processing can resume.
deriving (Eq, Show)

instance FromCInt ExecStatus where
fromCInt (#const PGRES_EMPTY_QUERY) = Just EmptyQuery
fromCInt (#const PGRES_COMMAND_OK) = Just CommandOk
fromCInt (#const PGRES_TUPLES_OK) = Just TuplesOk
fromCInt (#const PGRES_COPY_OUT) = Just CopyOut
fromCInt (#const PGRES_COPY_IN) = Just CopyIn
fromCInt (#const PGRES_COPY_BOTH) = Just CopyBoth
fromCInt (#const PGRES_BAD_RESPONSE) = Just BadResponse
fromCInt (#const PGRES_NONFATAL_ERROR) = Just NonfatalError
fromCInt (#const PGRES_FATAL_ERROR) = Just FatalError
fromCInt (#const PGRES_SINGLE_TUPLE) = Just SingleTuple
fromCInt (#const PGRES_EMPTY_QUERY) = Just EmptyQuery
fromCInt (#const PGRES_COMMAND_OK) = Just CommandOk
fromCInt (#const PGRES_TUPLES_OK) = Just TuplesOk
fromCInt (#const PGRES_COPY_OUT) = Just CopyOut
fromCInt (#const PGRES_COPY_IN) = Just CopyIn
fromCInt (#const PGRES_COPY_BOTH) = Just CopyBoth
fromCInt (#const PGRES_BAD_RESPONSE) = Just BadResponse
fromCInt (#const PGRES_NONFATAL_ERROR) = Just NonfatalError
fromCInt (#const PGRES_FATAL_ERROR) = Just FatalError
fromCInt (#const PGRES_SINGLE_TUPLE) = Just SingleTuple
fromCInt (#const PGRES_PIPELINE_SYNC) = Just PipelineSync
fromCInt (#const PGRES_PIPELINE_ABORTED) = Just PipelineAbort
fromCInt _ = Nothing

instance ToCInt ExecStatus where
Expand All @@ -67,6 +80,8 @@ instance ToCInt ExecStatus where
toCInt NonfatalError = (#const PGRES_NONFATAL_ERROR)
toCInt FatalError = (#const PGRES_FATAL_ERROR)
toCInt SingleTuple = (#const PGRES_SINGLE_TUPLE)
toCInt PipelineSync = (#const PGRES_PIPELINE_SYNC)
toCInt PipelineAbort = (#const PGRES_PIPELINE_ABORTED)


data FieldCode
Expand Down Expand Up @@ -263,6 +278,21 @@ instance FromCInt Format where
fromCInt 1 = Just Binary
fromCInt _ = Nothing

data PipelineStatus
= PipelineOn -- ^ The 'Connection' is in pipeline mode.
| PipelineOff -- ^ The 'Connection' is /not/ in pipeline mode.
| PipelineAborted -- ^ The 'Connection' is in pipeline mode and an error
-- occurred while processing the current pipeline. The
-- aborted flag is cleared when 'getResult' returns a
-- result with status 'PipelineSync'.
deriving (Eq, Show)

instance FromCInt PipelineStatus where
fromCInt (#const PQ_PIPELINE_ON) = return PipelineOn
fromCInt (#const PQ_PIPELINE_OFF) = return PipelineOff
fromCInt (#const PQ_PIPELINE_ABORTED) = return PipelineAborted
fromCInt _ = Nothing

-------------------------------------------------------------------------------
-- System.IO enumerations
-------------------------------------------------------------------------------
Expand Down
15 changes: 15 additions & 0 deletions src/Database/PostgreSQL/LibPQ/FFI.hs
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,21 @@ foreign import capi unsafe "hs-libpq.h &PQfreemem"
foreign import capi unsafe "hs-libpq.h PQfreemem"
c_PQfreemem :: Ptr a -> IO ()

foreign import capi "hs-libpq.h PQpipelineStatus"
c_PQpipelineStatus :: Ptr PGconn -> IO CInt

foreign import capi "hs-libpq.h PQenterPipelineMode"
c_PQenterPipelineMode :: Ptr PGconn -> IO CInt

foreign import capi "hs-libpq.h PQexitPipelineMode"
c_PQexitPipelineMode :: Ptr PGconn -> IO CInt

foreign import capi "hs-libpq.h PQpipelineSync"
c_PQpipelineSync :: Ptr PGconn -> IO CInt

foreign import capi "hs-libpq.h PQsendFlushRequest"
c_PQsendFlushRequest :: Ptr PGconn -> IO CInt

-------------------------------------------------------------------------------
-- FFI imports: noticebuffers
-------------------------------------------------------------------------------
Expand Down
38 changes: 37 additions & 1 deletion test/Smoke.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import qualified Data.ByteString.Char8 as BS8
main :: IO ()
main = do
libpqVersion >>= print
withConnstring smoke
withConnstring $ \connstring -> do
smoke connstring
testPipeline connstring

withConnstring :: (BS8.ByteString -> IO ()) -> IO ()
withConnstring kont = do
Expand Down Expand Up @@ -48,8 +50,42 @@ smoke connstring = do
transactionStatus conn >>= print
protocolVersion conn >>= print
serverVersion conn >>= print
pipelineStatus conn >>= print

s <- status conn
unless (s == ConnectionOk) exitFailure

finish conn

testPipeline :: BS8.ByteString -> IO ()
testPipeline connstring = do
conn <- connectdb connstring

setnonblocking conn True `shouldReturn` True
enterPipelineMode conn `shouldReturn` True
pipelineStatus conn `shouldReturn` PipelineOn
sendQueryParams conn (BS8.pack "select 1") [] Text `shouldReturn` True
sendQueryParams conn (BS8.pack "select 2") [] Text `shouldReturn` True
pipelineSync conn `shouldReturn` True

Just r1 <- getResult conn
resultStatus r1 `shouldReturn` TuplesOk
getvalue r1 0 0 `shouldReturn` Just (BS8.pack "1")
Nothing <- getResult conn

Just r2 <- getResult conn
getvalue r2 0 0 `shouldReturn` Just (BS8.pack "2")
Nothing <- getResult conn

Just r3 <- getResult conn
resultStatus r3 `shouldReturn` PipelineSync

finish conn
where
shouldBe r value =
unless (r == value) $ do
print $ "expected " <> show value <> ", got " <> show r
exitFailure
shouldReturn action value = do
r <- action
r `shouldBe` value