module Network.JsonRpc.Interface
(
JsonRpcT
, runJsonRpcT
, decodeConduit
, encodeConduit
, sendRequest
, sendNotif
, receiveNotif
, dummyRespond
, dummySrv
, jsonRpcTcpClient
, jsonRpcTcpServer
) where
import Control.Applicative
import Control.Concurrent.Async.Lifted
import Control.Concurrent.STM
import Control.Monad
import Control.Monad.Logger
import Control.Monad.Reader
import Control.Monad.Trans.State
import Control.Monad.Trans.Control
import Data.Aeson
import Data.Aeson.Types (parseMaybe)
import Data.Attoparsec.ByteString
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as B8
import qualified Data.ByteString.Lazy.Char8 as L8
import Data.Either
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as M
import Data.Conduit
import qualified Data.Conduit.List as CL
import Data.Conduit.Network
import Data.Conduit.TMChan
import qualified Data.Text as T
import Network.JsonRpc.Data
-- | Requests that have been sent and are still awaiting a reply, keyed by
-- request id. 'sendRequest' inserts an empty 'TMVar' for each request and
-- 'processIncoming' fills it with the matching error or response.
type SentRequests = HashMap Id (TMVar (Either Err Response))
-- | Shared state of one JSON-RPC session.
--
-- * 'inCh': decoded incoming messages (or decoding errors), consumed by
--   'processIncoming'.
-- * 'outCh': messages queued for sending.
-- * 'notifCh': incoming notifications and errors carrying a null id,
--   consumed by 'receiveNotif'.
-- * 'lastId': id given to the most recent request made by 'sendRequest'.
-- * 'sentReqs': requests still waiting for a reply.
-- * 'rpcVer': protocol version used when building outgoing messages.
data Session = Session { inCh :: TBMChan (Either Err Message)
, outCh :: TBMChan Message
, notifCh :: TBMChan (Either Err Notif)
, lastId :: TVar Id
, sentReqs :: TVar SentRequests
, rpcVer :: Ver
}
-- | Monad transformer for JSON-RPC actions: a reader over the 'Session'.
-- Run it with 'runJsonRpcT'.
type JsonRpcT = ReaderT Session
-- | Allocate a fresh session for the given protocol version: three bounded
-- channels of capacity 16, an id counter starting at @IdInt 0@ and an empty
-- table of pending requests.
initSession :: Ver -> STM Session
initSession v = do
    incoming <- newTBMChan 16
    outgoing <- newTBMChan 16
    notifs   <- newTBMChan 16
    counter  <- newTVar (IdInt 0)
    pending  <- newTVar M.empty
    return Session
        { inCh     = incoming
        , outCh    = outgoing
        , notifCh  = notifs
        , lastId   = counter
        , sentReqs = pending
        , rpcVer   = v
        }
-- | Serialise every outgoing 'Message' to a strict 'ByteString' using
-- aeson's 'encode'. A debug line naming the kind of message (and its id,
-- where it has one) is logged before each message is emitted.
encodeConduit :: MonadLogger m => Conduit Message m ByteString
encodeConduit = CL.mapM encodeLogged
  where
    encodeLogged msg = do
        $(logDebug) (T.pack (unwords (describe msg)))
        return (L8.toStrict (encode msg))

    -- Words of the debug line for each kind of message.
    describe (MsgError e)    = [ "Sending error id:", fromId (getErrId e) ]
    describe (MsgRequest q)  = [ "Sending request id:", fromId (getReqId q) ]
    describe (MsgNotif _)    = [ "Sending notification" ]
    describe (MsgResponse p) = [ "Sending response id:", fromId (getResId p) ]
-- | Incrementally parse incoming bytes into JSON-RPC messages.
--
-- Each chunk is fed to attoparsec's @json'@ parser. The continuation of an
-- unfinished parse is kept in the state and resumed with the next chunk.
-- For every parsed JSON value the conduit yields either the decoded
-- 'Message' or, when the value is not a valid message, an invalid-request
-- 'Err'. Unparseable input yields a parse 'Err'. All errors produced here
-- carry 'IdNull' and the supplied protocol version.
decodeConduit :: MonadLogger m
              => Ver -> Conduit ByteString m (Either Err Message)
decodeConduit ver = evalStateT loop Nothing where
    loop = lift await >>= maybe flush process

    -- Upstream is exhausted. If a parse is still pending, signal end of
    -- input by feeding it the empty string. The stored continuation MUST be
    -- cleared first: 'handle' re-enters 'loop', 'await' returns 'Nothing'
    -- again, and a continuation left in place would be re-fed forever,
    -- yielding the same error or message without end.
    flush = get >>= \kM -> case kM of
        Nothing -> return ()
        Just k  -> put Nothing >> handle (k B8.empty)

    process = runParser >=> handle

    -- Resume the pending parse if there is one, otherwise start a new one.
    -- Either way the stored continuation is consumed.
    runParser ck = maybe (parse json' ck) ($ ck) <$> get <* put Nothing

    handle (Fail {}) = do
        $(logWarn) "Error parsing incoming message"
        lift . yield . Left $ Err ver (errorParse Null) IdNull
        loop
    handle (Partial k) = put (Just k) >> loop
    handle (Done rest v) = do
        let msg = decod v
        when (isLeft msg) $ $(logWarn) "Received invalid message"
        lift $ yield msg
        -- Unconsumed input may already hold the start of the next value.
        if B8.null rest then loop else process rest

    decod v = case parseMaybe parseJSON v of
        Just msg -> Right msg
        Nothing  -> Left $ Err ver (errorInvalid v) IdNull
-- | Dispatch loop for incoming messages. It reads one item at a time from
-- 'inCh' and loops until that channel is closed and drained.
--
-- * Decoding errors are sent back to the peer as error messages.
-- * Notifications are forwarded to 'notifCh'.
-- * Requests are answered by running the supplied responder and queueing
--   the resulting response or error on 'outCh'.
-- * Responses and errors with a non-null id are delivered to the
--   'sendRequest' call waiting on that id. If nobody is waiting, an error
--   is sent back to the peer instead.
-- * Errors with a null id are forwarded to 'notifCh'.
--
-- Each STM transaction returns the follow-up action (logging, running the
-- responder, looping), which 'join' then runs outside the transaction.
processIncoming :: (Functor m, MonadLoggerIO m, FromRequest q, ToJSON r)
                => Respond q m r -> JsonRpcT m ()
processIncoming r = do
    i <- reader inCh
    o <- reader outCh
    n <- reader notifCh
    s <- reader sentReqs
    v <- reader rpcVer
    -- Deliver a reply to the request pending under id x and forget that
    -- request. When no such request exists, queue an error for the peer.
    -- The result says whether a pending request was found.
    let settle x reply = do
            m <- readTVar s
            case x `M.lookup` m of
                Nothing -> do
                    writeTBMChan o . MsgError $ Err v (errorId x) IdNull
                    return False
                Just p -> do
                    writeTVar s (x `M.delete` m)
                    putTMVar p reply
                    return True
    join . liftIO . atomically $ readTBMChan i >>= \inc -> case inc of
        Nothing -> return $ do
            $(logDebug) "Closed incoming channel"
            return ()
        Just (Left e) -> do
            writeTBMChan o (MsgError e)
            return $ processIncoming r
        Just (Right (MsgNotif t)) -> do
            writeTBMChan n (Right t)
            return $ do
                $(logDebug) "Received notification"
                processIncoming r
        Just (Right (MsgRequest q)) -> return $ do
            $(logDebug) $ T.pack $ unwords
                [ "Received request id:", fromId (getReqId q) ]
            msg <- lift $ either MsgError MsgResponse <$> buildResponse r q
            liftIO . atomically $ writeTBMChan o msg
            processIncoming r
        Just (Right (MsgResponse res@(Response _ _ x))) -> do
            known <- settle x (Right res)
            return $ do
                if known
                    then $(logDebug) $ T.pack $ unwords
                        [ "Received response id:", fromId x ]
                    else $(logWarn) $ T.pack $ unwords
                        [ "Got response with unknown id:", fromId x ]
                processIncoming r
        -- Must precede the general error case below: an error with a null
        -- id cannot be matched to a request.
        Just (Right (MsgError err@(Err _ _ IdNull))) -> do
            writeTBMChan n $ Left err
            return $ do
                $(logWarn) "Got standalone error message"
                processIncoming r
        Just (Right (MsgError err@(Err _ _ x))) -> do
            known <- settle x (Left err)
            return $ do
                if known
                    then $(logWarn) $ T.pack $ unwords
                        [ "Received error id:", fromId x ]
                    else $(logWarn) $ T.pack $ unwords
                        [ "Got error with unknown id:", fromId x ]
                processIncoming r
-- | Send a request and block until its reply arrives.
--
-- In one transaction the next id is allocated, an empty reply slot is
-- registered in 'sentReqs' and the request is queued on 'outCh'. The call
-- then waits for the slot to be filled. An error reply is returned as
-- @Left@ with its error object. A response is returned as @Right@ with the
-- result of 'fromResponse', which is 'Nothing' when the response could not
-- be converted.
sendRequest :: (MonadLoggerIO m, ToJSON q, ToRequest q, FromResponse r)
            => q -> JsonRpcT m (Either ErrorObj (Maybe r))
sendRequest q = do
    env <- ask
    slot <- liftIO . atomically $ do
        var  <- newEmptyTMVar
        next <- succ <$> readTVar (lastId env)
        modifyTVar (sentReqs env) (M.insert next var)
        writeTBMChan (outCh env) . MsgRequest $ buildRequest (rpcVer env) q next
        writeTVar (lastId env) next
        return var
    reply <- liftIO . atomically $ takeTMVar slot
    case reply of
        Left err  -> return . Left $ getErrObj err
        Right res -> return . Right $! fromResponse (requestMethod q) res
-- | Queue a notification on 'outCh', built with the session's protocol
-- version. No reply is awaited.
sendNotif :: (ToJSON no, ToNotif no, MonadLoggerIO m) => no -> JsonRpcT m ()
sendNotif n = do
    env <- ask
    let msg = MsgNotif (buildNotif (rpcVer env) n)
    liftIO (atomically (writeTBMChan (outCh env) msg))
-- | Wait for the next item on 'notifCh'.
--
-- Returns 'Nothing' when the channel is closed and empty. Otherwise the
-- result is @Just (Left e)@ for an error, carrying its error object, or
-- @Just (Right x)@ for a notification, where @x@ is the result of
-- 'fromNotif' ('Nothing' when the notification could not be converted).
receiveNotif :: (MonadLoggerIO m, FromNotif n)
             => JsonRpcT m (Maybe (Either ErrorObj (Maybe n)))
receiveNotif = do
    chan <- asks notifCh
    next <- liftIO (atomically (readTBMChan chan))
    case next of
        Nothing  -> return Nothing
        Just new -> return . Just $! convert new
  where
    convert (Left err)    = Left (getErrObj err)
    convert (Right notif) = Right $! fromNotif notif
-- | Run a JSON-RPC session over the given message sink and source.
--
-- Three helper threads are started for the duration of the action: one
-- feeds the source into 'inCh', one drains 'outCh' into the sink, and one
-- runs 'processIncoming' with the supplied responder. The helpers are
-- cancelled when the action finishes or throws.
--
-- Once the action returns normally, 'outCh' is closed and the sender thread
-- is given the chance to drain it before being torn down. Without this,
-- messages queued just before the action returned (for example a final
-- notification) could be dropped by the cancellation. This wait lasts as
-- long as the sink takes to accept the remaining messages.
runJsonRpcT :: ( MonadLoggerIO m, MonadBaseControl IO m
               , FromRequest q, ToJSON r
               )
            => Ver
            -> Respond q m r
            -> Sink Message m ()
            -> Source m (Either Err Message)
            -> JsonRpcT m a
            -> m a
runJsonRpcT ver r snk src f = do
    qs <- liftIO . atomically $ initSession ver
    let inSnk  = sinkTBMChan (inCh qs) True
        outSrc = sourceTBMChan (outCh qs)
    withAsync (src $$ inSnk) $ const $
        withAsync (outSrc $$ snk) $ \sender ->
            withAsync (runReaderT (processIncoming r) qs) $ const $ do
                a <- runReaderT f qs
                -- Closing lets the sender finish once the queue is empty;
                -- writes made after this point are discarded.
                liftIO . atomically $ closeTBMChan (outCh qs)
                -- A failure of the sender thread was never surfaced to the
                -- caller before, so it is deliberately not rethrown here.
                _ <- waitCatch sender
                return a
-- | Terminate every outgoing chunk with a newline character.
cr :: Monad m => Conduit ByteString m ByteString
cr = awaitForever $ \chunk -> yield (chunk `B8.snoc` '\n')
-- | Split the incoming byte stream into newline-separated lines.
--
-- Newlines are removed and empty lines are dropped. A line spread over
-- several chunks is glued together before being yielded, and a final line
-- without a trailing newline is yielded when the input ends.
ln :: Monad m => Conduit ByteString m ByteString
ln = await >>= maybe (return ()) split
  where
    split chunk
        -- No newline in this chunk: the line continues in the next one.
        | B8.null rest = await >>= maybe flushLast glue
        -- Newline at the very start: skip the empty line.
        | B8.null line = leftover (B8.tail rest) >> ln
        -- Complete line: push back what follows the newline, then emit.
        | otherwise    = leftover (B8.tail rest) >> yield line >> ln
      where
        (line, rest) = B8.break (== '\n') chunk
        flushLast    = unless (B8.null line) $ yield line
        glue more    = leftover (chunk `B8.append` more) >> ln
-- | Receive and discard notifications (decoded at type @()@) until
-- 'receiveNotif' returns 'Nothing'.
dummySrv :: MonadLoggerIO m => JsonRpcT m ()
dummySrv = do
    next <- receiveNotif
    case next :: Maybe (Either ErrorObj (Maybe ())) of
        Nothing    -> return ()
        Just notif -> notif `seq` dummySrv
-- | Responder that ignores the request and always answers @Right ()@.
dummyRespond :: MonadLoggerIO m => Respond () m ()
dummyRespond _ = return (Right ())
-- | Connect with the given client settings and run a JSON-RPC session over
-- the connection. Outgoing messages are encoded and newline-terminated;
-- incoming bytes are split into lines and decoded.
jsonRpcTcpClient
    :: ( MonadLoggerIO m, MonadBaseControl IO m
       , FromRequest q, ToJSON r
       )
    => Ver
    -> ClientSettings
    -> Respond q m r
    -> JsonRpcT m a
    -> m a
jsonRpcTcpClient ver cs r f = runGeneralTCPClient cs $ \ad ->
    let outgoing = encodeConduit =$ cr =$ appSink ad
        incoming = appSource ad $= ln $= decodeConduit ver
    in  runJsonRpcT ver r outgoing incoming f
-- | Run a TCP server with the given settings. The handler given to
-- 'runGeneralTCPServer' runs a JSON-RPC session over the client connection.
-- Outgoing messages are encoded and newline-terminated; incoming bytes are
-- split into lines and decoded.
jsonRpcTcpServer
    :: ( MonadLoggerIO m, MonadBaseControl IO m
       , FromRequest q, ToJSON r)
    => Ver
    -> ServerSettings
    -> Respond q m r
    -> JsonRpcT m ()
    -> m a
jsonRpcTcpServer ver ss r f = runGeneralTCPServer ss $ \cl ->
    let outgoing = encodeConduit =$ cr =$ appSink cl
        incoming = appSource cl $= ln $= decodeConduit ver
    in  runJsonRpcT ver r outgoing incoming f