flm01/server/api/flukso/deps/mysql/src/mysql.erl
Bart Van Der Meerssche 3a28838b29 change repository layout
2010-07-07 16:37:03 +02:00

860 lines
26 KiB
Erlang

%%% File : mysql.erl
%%% Author : Magnus Ahltorp <ahltorp@nada.kth.se>
%%% Descrip.: MySQL client.
%%%
%%% Created : 4 Aug 2005 by Magnus Ahltorp <ahltorp@nada.kth.se>
%%%
%%% Copyright (c) 2001-2004 Kungliga Tekniska Högskolan
%%% See the file COPYING
%%%
%%% Modified: 9/12/2006 by Yariv Sadan <yarivvv@gmail.com>
%%% Note: Added support for prepared statements,
%%% transactions, better connection pooling, more efficient logging
%%% and made other internal enhancements.
%%%
%%% Modified: 9/23/2006 Rewrote the transaction handling code to
%%% provide a simpler, Mnesia-style transaction interface. Also,
%%% moved much of the prepared statement handling code to mysql_conn.erl
%%% and added versioning to prepared statements.
%%%
%%%
%%% Usage:
%%%
%%%
%%% Call one of the start-functions before any call to fetch/2
%%%
%%% start_link(PoolId, Host, User, Password, Database)
%%% start_link(PoolId, Host, Port, User, Password, Database)
%%% start_link(PoolId, Host, User, Password, Database, LogFun)
%%% start_link(PoolId, Host, Port, User, Password, Database, LogFun)
%%%
%%% (These functions also have non-linking counterparts.)
%%%
%%% PoolId is a connection pool identifier. If you want to have more
%%% than one connection to a server (or a set of MySQL replicas),
%%% add more with
%%%
%%% connect(PoolId, Host, Port, User, Password, Database, Reconnect)
%%%
%%% use 'undefined' as Port to get default MySQL port number (3306).
%%% MySQL queries will be sent in a per-PoolId round-robin fashion.
%%% Set Reconnect to 'true' if you want the dispatcher to try and
%%% open a new connection, should this one die.
%%%
%%% When you have a mysql_dispatcher running, this is how you make a
%%% query :
%%%
%%% fetch(PoolId, "select * from hello") -> Result
%%% Result = {data, MySQLRes} | {updated, MySQLRes} |
%%% {error, MySQLRes}
%%%
%%% Actual data can be extracted from MySQLRes by calling the following API
%%% functions:
%%% - on data received:
%%% FieldInfo = mysql:get_result_field_info(MysqlRes)
%%% AllRows = mysql:get_result_rows(MysqlRes)
%%% with FieldInfo = list() of {Table, Field, Length, Name}
%%% and AllRows = list() of list() representing records
%%% - on update:
%%% Affected = mysql:get_result_affected_rows(MysqlRes)
%%% with Affected = integer()
%%% - on error:
%%% Reason = mysql:get_result_reason(MysqlRes)
%%% with Reason = string()
%%%
%%% If you just want a single MySQL connection, or want to manage your
%%% connections yourself, you can use the mysql_conn module as a
%%% stand-alone single MySQL connection. See the comment at the top of
%%% mysql_conn.erl.
-module(mysql).
-behaviour(gen_server).
%% @type mysql_result() = term()
%% @type query_result = {data, mysql_result()} | {updated, mysql_result()} |
%% {error, mysql_result()}
%% External exports
-export([start_link/5,
start_link/6,
start_link/7,
start_link/8,
start/5,
start/6,
start/7,
start/8,
connect/7,
connect/8,
connect/9,
fetch/1,
fetch/2,
fetch/3,
prepare/2,
execute/1,
execute/2,
execute/3,
execute/4,
unprepare/1,
get_prepared/1,
get_prepared/2,
transaction/2,
transaction/3,
get_result_field_info/1,
get_result_rows/1,
get_result_affected_rows/1,
get_result_reason/1,
encode/1,
encode/2,
asciz_binary/2
]).
%% Internal exports - just for mysql_* modules
-export([log/4
]).
%% Internal exports - gen_server callbacks
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).
%% Records
-include("mysql.hrl").
%% Bookkeeping entry the dispatcher keeps for one pooled connection.
%% new_conn/9 only fills in host .. encoding when reconnect is true; for
%% non-reconnecting connections those fields keep the record default
%% ('undefined').
-record(conn, {
pool_id, %% atom(), the pool's id
pid, %% pid(), mysql_conn process
reconnect, %% true | false, should mysql_dispatcher try
%% to reconnect if this connection dies?
host, %% string()
port, %% integer()
user, %% string()
password, %% string()
database, %% string()
encoding %% encoding handed to mysql_conn; 'undefined' when none was given
}).
%% State of the dispatcher gen_server.
-record(state, {
%% gb_tree mapping connection
%% pool id to a connection pool tuple
%% (the tuple is {Unused, Used}: two lists of #conn{} records that
%% get_next_conn/2 rotates through)
conn_pools = gb_trees:empty(),
%% gb_tree mapping connection Pid
%% to pool id
pids_pools = gb_trees:empty(),
%% function for logging,
%% (called by the ?Log/?Log2 macros with Module, Line, Level and a fun)
log_fun,
%% maps names to {Statement::binary(), Version::integer()} values
prepares = gb_trees:empty()
}).
%% Macros
%% Registered (local) name of the dispatcher gen_server.
-define(SERVER, mysql_dispatcher).
%% Process dictionary key that fetch/execute/transaction read to find out
%% whether the calling process is currently inside a transaction.
-define(STATE_VAR, mysql_connection_state).
%% NOTE(review): CONNECT_TIMEOUT and LOCAL_FILES are not referenced anywhere
%% in the visible part of this module - confirm before relying on them.
-define(CONNECT_TIMEOUT, 5000).
-define(LOCAL_FILES, 128).
%% Port used when the caller passes no port (or 'undefined').
-define(PORT, 3306).
%% used for debugging
-define(L(Msg), io:format("~p:~b ~p ~n", [?MODULE, ?LINE, Msg])).
%% Log messages are designed to be instantiated lazily, only if the logging
%% level permits a log message to be logged: the message and its parameters
%% are wrapped in a fun that the log function may or may not call.
-define(Log(LogFun,Level,Msg),
LogFun(?MODULE,?LINE,Level,fun()-> {Msg,[]} end)).
-define(Log2(LogFun,Level,Msg,Params),
LogFun(?MODULE,?LINE,Level,fun()-> {Msg,Params} end)).
%% Default log function (init/1 installs it when no LogFun is supplied).
%% Evaluates the lazy FormatFun and prints the message to standard output,
%% prefixed with the module and line of the call site. The level is ignored.
log(Module, Line, _Level, FormatFun) ->
    {Format, Arguments} = FormatFun(),
    FullFormat = "~w:~b: " ++ Format ++ "~n",
    io:format(FullFormat, [Module, Line | Arguments]).
%% External functions
%% @doc Start the MySQL dispatcher gen_server, linked to the calling
%% process, with a first connection in the pool PoolId.
%%
%% Port, LogFun and Encoding are optional. A missing Port, or a Port of
%% 'undefined', selects the default port; a missing LogFun or Encoding is
%% passed on as 'undefined'.
%%
%% @spec start_link(PoolId::atom(), Host::string(), Port::integer(),
%%           Username::string(), Password::string(), Database::string(),
%%           LogFun::undefined | function() of arity 4) ->
%%              {ok, Pid} | ignore | {error, Err}
start_link(PoolId, Host, User, Password, Database) ->
    start_link(PoolId, Host, ?PORT, User, Password, Database, undefined,
               undefined).

start_link(PoolId, Host, Port, User, Password, Database) ->
    start_link(PoolId, Host, Port, User, Password, Database, undefined,
               undefined).

start_link(PoolId, Host, Port, User, Password, Database, LogFun) ->
    start_link(PoolId, Host, Port, User, Password, Database, LogFun,
               undefined).

start_link(PoolId, Host, Port, User, Password, Database, LogFun, Encoding) ->
    %% 'undefined' stands for "use the default port"
    ActualPort = case Port of
                     undefined -> ?PORT;
                     _ -> Port
                 end,
    start1(PoolId, Host, ActualPort, User, Password, Database, LogFun,
           Encoding, start_link).
%% @doc Non-linking variants of start_link/5..8: the dispatcher is started
%% with gen_server:start() instead of gen_server:start_link(). The optional
%% arguments are defaulted the same way.
start(PoolId, Host, User, Password, Database) ->
    start(PoolId, Host, ?PORT, User, Password, Database, undefined,
          undefined).

start(PoolId, Host, Port, User, Password, Database) ->
    start(PoolId, Host, Port, User, Password, Database, undefined,
          undefined).

start(PoolId, Host, Port, User, Password, Database, LogFun) ->
    start(PoolId, Host, Port, User, Password, Database, LogFun, undefined).

start(PoolId, Host, Port, User, Password, Database, LogFun, Encoding) ->
    %% 'undefined' stands for "use the default port"
    ActualPort = case Port of
                     undefined -> ?PORT;
                     _ -> Port
                 end,
    start1(PoolId, Host, ActualPort, User, Password, Database, LogFun,
           Encoding, start).
%% Common tail of start/8 and start_link/8. Starts crypto, then registers
%% the dispatcher locally under ?SERVER. StartFunc names the gen_server
%% function to use (the callers pass 'start' or 'start_link').
start1(PoolId, Host, Port, User, Password, Database, LogFun, Encoding,
       StartFunc) ->
    crypto:start(),
    InitArgs = [PoolId, Host, Port, User, Password, Database, LogFun,
                Encoding],
    ServerName = {local, ?SERVER},
    gen_server:StartFunc(ServerName, ?MODULE, InitArgs, []).
%% @doc Same as connect/9 with the new connection process linked to the
%% caller.
%%
%% @equiv connect(PoolId, Host, Port, User, Password, Database, Encoding,
%%           Reconnect, true)
connect(PoolId, Host, Port, User, Password, Database, Encoding, Reconnect) ->
    LinkConnection = true,
    connect(PoolId, Host, Port, User, Password, Database, Encoding,
            Reconnect, LinkConnection).
%% @doc Start a MySQL connection process and, when that succeeds, register
%% it with the dispatcher under PoolId.
%%
%% A Port of 'undefined' selects the default port. LinkConnection chooses
%% between mysql_conn:start_link/8 (true) and mysql_conn:start/8 (anything
%% else). The log function is obtained from the running dispatcher, so the
%% dispatcher must already have been started.
%%
%% @spec connect(PoolId::atom(), Host::string(), Port::integer() | undefined,
%%    User::string(), Password::string(), Database::string(),
%%    Encoding::string(), Reconnect::bool(), LinkConnection::bool()) ->
%%      {ok, ConnPid} | {error, Reason}
connect(PoolId, Host, Port, User, Password, Database, Encoding, Reconnect,
        LinkConnection) ->
    ActualPort = case Port of
                     undefined -> ?PORT;
                     _ -> Port
                 end,
    StartConn = case LinkConnection of
                    true -> fun mysql_conn:start_link/8;
                    _ -> fun mysql_conn:start/8
                end,
    {ok, LogFun} = gen_server:call(?SERVER, get_logfun),
    Started = StartConn(Host, ActualPort, User, Password, Database, LogFun,
                        Encoding, PoolId),
    case Started of
        {ok, ConnPid} ->
            Conn = new_conn(PoolId, ConnPid, Reconnect, Host, ActualPort,
                            User, Password, Database, Encoding),
            %% hand the connection over to the dispatcher's pool
            case gen_server:call(?SERVER, {add_conn, Conn}) of
                ok -> {ok, ConnPid};
                NotOk -> NotOk
            end;
        Failure ->
            Failure
    end.
%% Build the #conn{} record describing a pooled connection. The connection
%% parameters are only recorded when Reconnect is true, since they are only
%% needed to re-establish the connection; otherwise they stay 'undefined'.
new_conn(PoolId, ConnPid, Reconnect, Host, Port, User, Password, Database,
         Encoding) ->
    Base = #conn{pool_id = PoolId,
                 pid = ConnPid,
                 reconnect = Reconnect},
    case Reconnect of
        true ->
            Base#conn{host = Host,
                      port = Port,
                      user = User,
                      password = Password,
                      database = Database,
                      encoding = Encoding};
        false ->
            Base
    end.
%% @doc Run a query on the connection of the enclosing transaction.
%% Returns {error, not_in_transaction} when the calling process is not
%% inside a transaction (no state stored under ?STATE_VAR).
%%
%% @spec fetch(Query::iolist()) -> query_result()
fetch(Query) ->
    TxState = get(?STATE_VAR),
    if
        TxState =:= undefined ->
            {error, not_in_transaction};
        true ->
            mysql_conn:fetch_local(TxState, Query)
    end.
%% @doc Send a query to the next connection of pool PoolId and wait for the
%% result. When called inside a transaction, the query runs on the
%% transaction's own connection and both PoolId and Timeout are ignored.
%% A Timeout of 'undefined' (the fetch/2 default) uses the gen_server:call
%% default timeout.
%%
%% @spec fetch(PoolId::atom(), Query::iolist(), Timeout::integer()) ->
%%   query_result()
fetch(PoolId, Query) ->
    fetch(PoolId, Query, undefined).

fetch(PoolId, Query, Timeout) ->
    TxState = get(?STATE_VAR),
    if
        TxState =:= undefined ->
            call_server({fetch, PoolId, Query}, Timeout);
        true ->
            mysql_conn:fetch_local(TxState, Query)
    end.
%% @doc Register a prepared statement with the dispatcher.
%%
%% Nothing is prepared in any connection at this point; each connection
%% prepares the statement lazily when it is told to execute it. Registering
%% a Name that already exists bumps the statement's version, so connections
%% that prepared an older version will prepare the new one.
%%
%% The request is sent as a cast, so this function returns immediately.
%%
%% @spec prepare(Name::atom(), Query::iolist()) -> ok
prepare(Name, Query) ->
    Request = {prepare, Name, Query},
    gen_server:cast(?SERVER, Request).
%% @doc Unregister a statement that has previously been registered with
%% the dispatcher. Calls to execute() with that statement fail once it has
%% been unprepared. Unpreparing an unknown statement only logs a warning.
%%
%% The request is sent as a cast, so this function returns immediately.
%%
%% @spec unprepare(Name::atom()) -> ok
unprepare(Name) ->
    Request = {unprepare, Name},
    gen_server:cast(?SERVER, Request).
%% @doc Look up the prepared statement registered under Name.
%%
%% Called from mysql_conn when a connection has to execute a statement it
%% has not prepared yet, or when it executes a statement inside a
%% transaction and cannot be sure it holds the latest version.
%%
%% When the registered version equals Version, the reply is {ok, latest},
%% which saves sending the statement to a connection that already has it.
%% get_prepared/1 passes 'undefined' as the Version.
%%
%% @spec get_prepared(Name::atom(), Version::integer()) ->
%%   {ok, latest} | {ok, Statement::binary()} | {error, Err}
get_prepared(Name) ->
    get_prepared(Name, undefined).

get_prepared(Name, Version) ->
    Request = {get_prepared, Name, Version},
    gen_server:call(?SERVER, Request).
%% @doc Execute a prepared statement inside a transaction.
%%
%% Returns {error, not_in_transaction} when the calling process is not
%% inside a transaction. On success the updated connection state is stored
%% back under ?STATE_VAR and only the result is returned, exactly as
%% execute/4 does for the in-transaction case.
%%
%% @spec execute(Name::atom, Params::[term()]) -> mysql_result()
execute(Name) ->
    execute(Name, []).

execute(Name, Params) when is_atom(Name), is_list(Params) ->
    case get(?STATE_VAR) of
        undefined ->
            {error, not_in_transaction};
        State ->
            case mysql_conn:execute_local(State, Name, Params) of
                {ok, Res, NewState} ->
                    %% keep the transaction state current for later calls
                    put(?STATE_VAR, NewState),
                    Res;
                Err ->
                    Err
            end
    end;
%% @doc Execute a query in the connection pool identified by
%% PoolId. This function optionally accepts a list of parameters to pass
%% to the prepared statement and a Timeout parameter.
%% If this function is called inside a transaction, the PoolId parameter is
%% ignored.
%%
%% @spec execute(PoolId::atom(), Name::atom(), Params::[term()],
%%               Timeout::integer()) -> mysql_result()
execute(PoolId, Name) when is_atom(PoolId), is_atom(Name) ->
    execute(PoolId, Name, []).
%% execute/3 is overloaded on its third argument: a list is taken as the
%% statement parameters (no timeout), an integer as the timeout (no
%% parameters).
execute(PoolId, Name, Params) when is_list(Params) ->
    execute(PoolId, Name, Params, undefined);
execute(PoolId, Name, Timeout) when is_integer(Timeout) ->
    execute(PoolId, Name, [], Timeout).
%% Execute the prepared statement Name with Params. Outside a transaction
%% the request goes through the dispatcher to a connection of PoolId.
%% Inside a transaction it runs on the transaction's connection (PoolId and
%% Timeout are then unused) and the updated connection state is stored back
%% under ?STATE_VAR.
execute(PoolId, Name, Params, Timeout) ->
    TxState = get(?STATE_VAR),
    if
        TxState =:= undefined ->
            call_server({execute, PoolId, Name, Params}, Timeout);
        true ->
            case mysql_conn:execute_local(TxState, Name, Params) of
                {ok, Result, UpdatedState} ->
                    put(?STATE_VAR, UpdatedState),
                    Result;
                Failure ->
                    Failure
            end
    end.
%% @doc Execute a transaction in a connection belonging to the connection
%% pool PoolId. Fun is a function containing a sequence of calls to fetch()
%% and/or execute().
%%
%% The transaction is automatically rolled back if an error occurs, or if
%% Fun does any of the following:
%%
%% - throw(error)
%% - throw({error, Err})
%% - return error
%% - return {error, Err}
%% - exit(Reason)
%%
%% When called from inside a transaction on the same pool, Fun is run
%% directly in the calling process: a failure is re-thrown to the enclosing
%% transaction, a success is returned as {atomic, Result}. For a different
%% pool the request is sent to the dispatcher as usual.
%%
%% @spec transaction(PoolId::atom(), Fun::function()) ->
%%   {atomic, Result} | {aborted, {Reason, {rollback_result, Result}}}
transaction(PoolId, Fun) ->
    transaction(PoolId, Fun, undefined).

transaction(PoolId, Fun, Timeout) ->
    case get(?STATE_VAR) of
        undefined ->
            call_server({transaction, PoolId, Fun}, Timeout);
        State ->
            case mysql_conn:get_pool_id(State) =:= PoolId of
                true ->
                    run_nested_transaction(Fun);
                false ->
                    call_server({transaction, PoolId, Fun}, Timeout)
            end
    end.

%% Run Fun as part of the transaction that is already in progress.
%% The old-style catch is intentional: it turns throws, errors and exits
%% into plain values so every failure shape can be re-thrown uniformly.
run_nested_transaction(Fun) ->
    case catch Fun() of
        error ->
            throw(error);
        {error, _} = Failure ->
            throw(Failure);
        {'EXIT', _} = Exit ->
            throw(Exit);
        Result ->
            {atomic, Result}
    end.
%% @doc Return the field information of a MySQL result (data received).
%%
%% @spec get_result_field_info(MySQLRes::mysql_result()) ->
%%   [{Table, Field, Length, Name}]
get_result_field_info(#mysql_result{} = Result) ->
    Result#mysql_result.fieldinfo.
%% @doc Return the rows of a MySQL result (data received).
%%
%% @spec get_result_rows(MySQLRes::mysql_result()) -> [Row::list()]
get_result_rows(#mysql_result{} = Result) ->
    Result#mysql_result.rows.
%% @doc Return the number of affected rows of a MySQL result (update).
%%
%% @spec get_result_affected_rows(MySQLRes::mysql_result()) ->
%%           AffectedRows::integer()
get_result_affected_rows(#mysql_result{} = Result) ->
    Result#mysql_result.affectedrows.
%% @doc Return the error reason of a MySQL result (error).
%%
%% @spec get_result_reason(MySQLRes::mysql_result()) ->
%%    Reason::string()
get_result_reason(#mysql_result{} = Result) ->
    Result#mysql_result.error.
%% @doc Add a connection to pool PoolId without specifying an encoding
%% (the encoding is passed on as 'undefined').
%%
%% Port may be an integer or 'undefined'; connect/9 substitutes the default
%% port for 'undefined'. Previously only 'undefined' was accepted here and
%% any real port number failed with function_clause.
%%
%% @spec connect(PoolId::atom(), Host::string(), Port::integer() | undefined,
%%    User::string(), Password::string(), Database::string(),
%%    Reconnect::bool()) -> {ok, ConnPid} | {error, Reason}
connect(PoolId, Host, Port, User, Password, Database, Reconnect) ->
    connect(PoolId, Host, Port, User, Password, Database, undefined,
            Reconnect).
%% gen_server callbacks
%% gen_server init callback. Opens the first connection of pool PoolId
%% (unlinked, flagged for reconnect) and stores it in the initial state.
%% log/4 is used when no log function was supplied. The server stops with
%% {error, Reason} when that first connection cannot be started.
init([PoolId, Host, Port, User, Password, Database, LogFun, Encoding]) ->
    EffectiveLogFun = case LogFun of
                          undefined -> fun log/4;
                          _ -> LogFun
                      end,
    Started = mysql_conn:start(Host, Port, User, Password, Database,
                               EffectiveLogFun, Encoding, PoolId),
    case Started of
        {error, Reason} ->
            ?Log(EffectiveLogFun, error,
                 "failed starting first MySQL connection handler, "
                 "exiting"),
            {stop, {error, Reason}};
        {ok, ConnPid} ->
            FirstConn = new_conn(PoolId, ConnPid, true, Host, Port, User,
                                 Password, Database, Encoding),
            {ok, add_conn(FirstConn, #state{log_fun = EffectiveLogFun})}
    end.
%% gen_server call callback.
%%
%% fetch, execute and transaction requests are forwarded to the next
%% connection of the requested pool together with From; the callback then
%% returns noreply, so the reply is not sent from here. The remaining
%% requests are answered directly.
handle_call({fetch, PoolId, Query}, From, State) ->
    fetch_queries(PoolId, From, State, [Query]);

handle_call({get_prepared, Name, Version}, _From, State) ->
    Reply =
        case gb_trees:lookup(Name, State#state.prepares) of
            none ->
                {error, {undefined, Name}};
            {value, {_StmtBin, Version1}} when Version1 == Version ->
                %% the caller already holds the newest version
                {ok, latest};
            {value, Stmt} ->
                {ok, Stmt}
        end,
    {reply, Reply, State};

handle_call({execute, PoolId, Name, Params}, From, State) ->
    %% Note: the pool has already been rotated (State1) even when the
    %% statement turns out to be unknown.
    RunOnConn =
        fun(Conn, State1) ->
                case gb_trees:lookup(Name, State1#state.prepares) of
                    {value, {_Stmt, Version}} ->
                        mysql_conn:execute(Conn#conn.pid, Name,
                                           Version, Params, From),
                        {noreply, State1};
                    none ->
                        {reply, {error, {no_such_statement, Name}}, State1}
                end
        end,
    with_next_conn(PoolId, State, RunOnConn);

handle_call({transaction, PoolId, Fun}, From, State) ->
    RunOnConn =
        fun(Conn, State1) ->
                mysql_conn:transaction(Conn#conn.pid, Fun, From),
                {noreply, State1}
        end,
    with_next_conn(PoolId, State, RunOnConn);

handle_call({add_conn, Conn}, _From, State) ->
    NewState = add_conn(Conn, State),
    PoolId = Conn#conn.pool_id,
    ConnPid = Conn#conn.pid,
    LogFun = State#state.log_fun,
    ?Log2(LogFun, normal,
          "added connection with id '~p' (pid ~p) to my list",
          [PoolId, ConnPid]),
    {reply, ok, NewState};

handle_call(get_logfun, _From, State) ->
    {reply, {ok, State#state.log_fun}, State}.
%% gen_server cast callback: maintains the table of registered prepared
%% statements.
%%
%% prepare stores the statement under Name with version 1, or with the
%% previous version plus one when the name is already registered.
%% unprepare removes the statement; an unknown name only logs a warning.
handle_cast({prepare, Name, Stmt}, State) ->
    LogFun = State#state.log_fun,
    Prepares = State#state.prepares,
    NextVersion =
        case gb_trees:lookup(Name, Prepares) of
            none ->
                1;
            {value, {_OldStmt, OldVersion}} ->
                OldVersion + 1
        end,
    ?Log2(LogFun, debug,
          "received prepare/2: ~p (ver ~p) ~p", [Name, NextVersion, Stmt]),
    NewPrepares = gb_trees:enter(Name, {Stmt, NextVersion}, Prepares),
    {noreply, State#state{prepares = NewPrepares}};

handle_cast({unprepare, Name}, State) ->
    LogFun = State#state.log_fun,
    ?Log2(LogFun, debug, "received unprepare/1: ~p", [Name]),
    Prepares = State#state.prepares,
    case gb_trees:is_defined(Name, Prepares) of
        true ->
            {noreply,
             State#state{prepares = gb_trees:delete(Name, Prepares)}};
        false ->
            ?Log2(LogFun, warn, "trying to unprepare a non-existing "
                  "statement: ~p", [Name]),
            {noreply, State}
    end.
%% gen_server info callback.
%%
%% A 'DOWN' message for a process means a monitored connection (see
%% add_conn/2) has died. The connection is removed from its pool and, if
%% its 'reconnect' flag is true, a background process is started that tries
%% to establish a replacement connection. A 'DOWN' for an unknown pid is
%% only logged.
%%
%% Any other message is logged and dropped, so that a stray message cannot
%% crash the dispatcher with function_clause.
handle_info({'DOWN', _MonitorRef, process, Pid, Info}, State) ->
    LogFun = State#state.log_fun,
    case remove_conn(Pid, State) of
        {ok, Conn, NewState} ->
            %% a normal exit is not worth an error-level entry
            LogLevel = case Info of
                           normal -> normal;
                           _ -> error
                       end,
            ?Log2(LogFun, LogLevel,
                  "connection pid ~p exited : ~p", [Pid, Info]),
            case Conn#conn.reconnect of
                true ->
                    start_reconnect(Conn, LogFun);
                false ->
                    ok
            end,
            {noreply, NewState};
        error ->
            ?Log2(LogFun, error,
                  "received 'DOWN' signal from pid ~p not in my list", [Pid]),
            {noreply, State}
    end;
handle_info(Unexpected, State) ->
    LogFun = State#state.log_fun,
    ?Log2(LogFun, warn, "ignoring unexpected message: ~p", [Unexpected]),
    {noreply, State}.
%% gen_server terminate callback: logs the termination reason, at debug
%% level for a normal shutdown and at error level otherwise, and returns
%% the reason.
terminate(Reason, State) ->
    LogFun = State#state.log_fun,
    LogLevel = if
                   Reason =:= normal -> debug;
                   true -> error
               end,
    ?Log2(LogFun, LogLevel, "terminating with reason: ~p", [Reason]),
    Reason.
%% gen_server code_change callback: the state needs no conversion, so it
%% is returned unchanged.
code_change(_PreviousVersion, CurrentState, _ExtraArgs) ->
    {ok, CurrentState}.
%% Internal functions
%% Forward QueryList to the next connection of pool PoolId. The connection
%% is given From along with the queries, and the dispatcher itself returns
%% noreply. If the pool has no connection, the error reply produced by
%% with_next_conn/3 is returned instead.
fetch_queries(PoolId, From, State, QueryList) ->
    SendQueries =
        fun(Conn, State1) ->
                mysql_conn:fetch(Conn#conn.pid, QueryList, From),
                {noreply, State1}
        end,
    with_next_conn(PoolId, State, SendQueries).
%% Pick the next connection of pool PoolId and apply Fun to it and to the
%% state in which the pool has been rotated. When the pool is unknown or
%% empty, reply {error, {no_connection_in_pool, PoolId}} and leave the
%% state untouched.
with_next_conn(PoolId, State, Fun) ->
    Next = get_next_conn(PoolId, State),
    case Next of
        error ->
            %% we have no active connection matching PoolId
            {reply, {error, {no_connection_in_pool, PoolId}}, State};
        {ok, Conn, RotatedState} ->
            Fun(Conn, RotatedState)
    end.
%% Synchronous call to the dispatcher. A Timeout of 'undefined' means "use
%% the gen_server:call/2 default"; any other value is passed on as the
%% call timeout.
call_server(Msg, undefined) ->
    gen_server:call(?SERVER, Msg);
call_server(Msg, Timeout) ->
    gen_server:call(?SERVER, Msg, Timeout).
%% Register Conn with the dispatcher: monitor its process, put it at the
%% head of the unused list of its pool (creating the pool if needed) and
%% remember which pool the pid belongs to. Returns the new state.
add_conn(Conn, State) ->
    Pid = Conn#conn.pid,
    erlang:monitor(process, Pid),
    PoolId = Conn#conn.pool_id,
    Pools = State#state.conn_pools,
    {Unused, Used} =
        case gb_trees:lookup(PoolId, Pools) of
            {value, {_, _} = ExistingPool} ->
                ExistingPool;
            none ->
                {[], []}
        end,
    NewPools = gb_trees:enter(PoolId, {[Conn | Unused], Used}, Pools),
    NewPidsPools = gb_trees:enter(Pid, PoolId, State#state.pids_pools),
    State#state{conn_pools = NewPools,
                pids_pools = NewPidsPools}.
%% Remove the connection whose pid is Pid from Conns.
%% Returns {RemainingConns, RemovedConn}, with RemovedConn = 'undefined'
%% when no connection matches. The order of the remaining connections is
%% preserved. The list is walked from the right, so at most one entry (the
%% right-most match) is removed.
remove_pid_from_list(Pid, Conns) ->
    Step =
        fun(Candidate, {Kept, Found}) ->
                IsMatch = Found =:= undefined
                    andalso Candidate#conn.pid == Pid,
                case IsMatch of
                    true -> {Kept, Candidate};
                    false -> {[Candidate | Kept], Found}
                end
        end,
    lists:foldr(Step, {[], undefined}, Conns).
%% Remove the connection with pid Pid from whichever of the two lists
%% holds it; Conns2 is only searched when Conns1 has no match.
%% Returns {RemovedConn, {NewConns1, NewConns2}}; RemovedConn is
%% 'undefined' when neither list contains the pid.
remove_pid_from_lists(Pid, Conns1, Conns2) ->
    {Rest1, Found1} = remove_pid_from_list(Pid, Conns1),
    case Found1 of
        undefined ->
            {Rest2, Found2} = remove_pid_from_list(Pid, Conns2),
            {Found2, {Rest1, Rest2}};
        _ ->
            {Found1, {Rest1, Conns2}}
    end.
%% Drop the connection with pid Pid from the dispatcher state.
%% Returns {ok, Conn, NewState}, or 'error' when the pid (or its pool) is
%% not known to the dispatcher.
remove_conn(Pid, State) ->
    case gb_trees:lookup(Pid, State#state.pids_pools) of
        {value, PoolId} ->
            remove_conn_from_pool(Pid, PoolId, State);
        none ->
            error
    end.

%% Remove Pid from pool PoolId and from the pid-to-pool index.
remove_conn_from_pool(Pid, PoolId, State) ->
    ConnPools = State#state.conn_pools,
    case gb_trees:lookup(PoolId, ConnPools) of
        {value, {Unused, Used}} ->
            {Conn, NewPool} = remove_pid_from_lists(Pid, Unused, Used),
            NewState =
                State#state{
                  conn_pools = gb_trees:enter(PoolId, NewPool, ConnPools),
                  pids_pools = gb_trees:delete(Pid, State#state.pids_pools)},
            {ok, Conn, NewState};
        none ->
            error
    end.
%% Choose the next connection of pool PoolId in round-robin order.
%% Returns {ok, Conn, NewState}, or 'error' when the pool does not exist or
%% holds no connection.
%%
%% Each pool is a pair of lists: unused and used connections. A connection
%% moves from the unused to the used list when it is handed out. Once the
%% unused list is empty, the used list is reversed (so the connection that
%% has waited longest comes first) and becomes the new unused list.
get_next_conn(PoolId, State) ->
    ConnPools = State#state.conn_pools,
    Choice =
        case gb_trees:lookup(PoolId, ConnPools) of
            {value, {[First | Unused], Used}} ->
                {First, {Unused, [First | Used]}};
            {value, {[], []}} ->
                error;
            {value, {[], Used}} ->
                [Oldest | Recycled] = lists:reverse(Used),
                {Oldest, {Recycled, [Oldest]}};
            none ->
                error
        end,
    case Choice of
        error ->
            error;
        {Next, NewPool} ->
            NewPools = gb_trees:enter(PoolId, NewPool, ConnPools),
            {ok, Next, State#state{conn_pools = NewPools}}
    end.
%% Spawn an (unlinked) process that keeps trying to replace the dead
%% connection Conn, and log that it was started. Returns ok immediately.
start_reconnect(Conn, LogFun) ->
    Retry = fun() ->
                    reconnect_loop(Conn#conn{pid = undefined}, LogFun, 0)
            end,
    Pid = spawn(Retry),
    PoolId = Conn#conn.pool_id,
    Host = Conn#conn.host,
    Port = Conn#conn.port,
    ?Log2(LogFun, debug,
          "started pid ~p to try and reconnect to ~p:~s:~p (replacing "
          "connection with pid ~p)",
          [Pid, PoolId, Host, Port, Conn#conn.pid]),
    ok.
%% Body of the reconnect process: try to open a new connection with the
%% parameters stored in Conn until it succeeds, sleeping five seconds after
%% every failed attempt. N counts failed attempts since the last "still
%% unable" log entry; an entry is written when N has reached 10, after
%% which the counter restarts at 0.
reconnect_loop(Conn, LogFun, N) ->
    #conn{pool_id = PoolId, host = Host, port = Port} = Conn,
    Attempt = connect(PoolId, Host, Port,
                      Conn#conn.user,
                      Conn#conn.password,
                      Conn#conn.database,
                      Conn#conn.encoding,
                      Conn#conn.reconnect),
    case Attempt of
        {ok, ConnPid} ->
            ?Log2(LogFun, debug,
                  "managed to reconnect to ~p:~s:~p "
                  "(connection pid ~p)", [PoolId, Host, Port, ConnPid]),
            ok;
        {error, Reason} ->
            %% log every once in a while
            NextN =
                if
                    N =:= 10 ->
                        ?Log2(LogFun, debug,
                              "reconnect: still unable to connect to "
                              "~p:~s:~p (~p)", [PoolId, Host, Port, Reason]),
                        0;
                    true ->
                        N + 1
                end,
            %% sleep between every unsuccessful attempt
            timer:sleep(5 * 1000),
            reconnect_loop(Conn, LogFun, NextN)
    end.
%% @doc Encode a value so that it can be included safely in a MySQL query.
%%
%% Supported values:
%%   - undefined | null                 -> null
%%   - binaries, strings (lists), atoms -> quoted and escaped
%%   - integers, floats                 -> their textual form
%%   - {{Y, M, D}, {H, Mi, S}} and {A, B, C}, optionally tagged as
%%     {datetime, V}, {date, V} or {time, V} -> the digits of all
%%     components run together, each padded to at least two digits
%%
%% With AsBinary = true the result is a binary, otherwise a string.
%% Anything else yields {error, {unrecognized_value, Val}}; this also holds
%% for AsBinary = true, which previously crashed with badarg while trying
%% to convert the error tuple into a binary.
%%
%% @spec encode(Val::term(), AsBinary::bool()) ->
%%   string() | binary() | {error, Error}
encode(Val) ->
    encode(Val, false).

encode(Val, false) when Val == undefined; Val == null ->
    "null";
encode(Val, true) when Val == undefined; Val == null ->
    <<"null">>;
encode(Val, false) when is_binary(Val) ->
    binary_to_list(quote(Val));
encode(Val, true) when is_binary(Val) ->
    quote(Val);
encode(Val, true) ->
    %% Every remaining binary result is derived from the string encoding;
    %% an error is handed back unchanged.
    case encode(Val, false) of
        {error, _} = Err ->
            Err;
        Str ->
            list_to_binary(Str)
    end;
encode(Val, false) when is_atom(Val) ->
    quote(atom_to_list(Val));
encode(Val, false) when is_list(Val) ->
    quote(Val);
encode(Val, false) when is_integer(Val) ->
    integer_to_list(Val);
encode(Val, false) when is_float(Val) ->
    %% flatten rather than match on the exact nesting io_lib returns
    lists:flatten(io_lib:format("~w", [Val]));
encode({datetime, Val}, AsBinary) ->
    encode(Val, AsBinary);
encode({{Year, Month, Day}, {Hour, Minute, Second}}, false) ->
    Res = two_digits([Year, Month, Day, Hour, Minute, Second]),
    lists:flatten(Res);
encode({TimeType, Val}, AsBinary)
  when TimeType == 'date';
       TimeType == 'time' ->
    encode(Val, AsBinary);
encode({Time1, Time2, Time3}, false) ->
    Res = two_digits([Time1, Time2, Time3]),
    lists:flatten(Res);
encode(Val, _AsBinary) ->
    {error, {unrecognized_value, Val}}.
%% Render an integer in decimal, padding a single-character result with a
%% leading zero; longer results are returned as they are. Given a list,
%% each element is rendered and a list of strings is returned.
two_digits(Nums) when is_list(Nums) ->
    [two_digits(Num) || Num <- Nums];
two_digits(Num) ->
    case integer_to_list(Num) of
        [Digit] -> [$0, Digit];
        Digits -> Digits
    end.
%% Quote a string or binary value so that it can be included safely in a
%% MySQL query: the value is wrapped in single quotes and the characters
%% NUL, newline, carriage return, backslash, single quote, double quote and
%% Ctrl-Z (26) are replaced by backslash escape sequences.
%% A string yields a string, a binary yields a binary.
quote(String) when is_list(String) ->
    [39 | lists:reverse([39 | quote(String, [])])]; %% 39 is $'
quote(Bin) when is_binary(Bin) ->
    list_to_binary(quote(binary_to_list(Bin))).

%% Escape every character of the string and push the result, in reverse
%% order, onto Acc.
quote(String, Acc) ->
    lists:foldl(fun(Char, Reversed) ->
                        lists:reverse(escape_char(Char), Reversed)
                end, Acc, String).

%% Escape sequence (in reading order) for a single character.
escape_char(0) -> [$\\, $0];
escape_char(10) -> [$\\, $n];
escape_char(13) -> [$\\, $r];
escape_char($\\) -> [$\\, $\\];
escape_char(39) -> [$\\, 39]; %% 39 is $'
escape_char(34) -> [$\\, 34]; %% 34 is $"
escape_char(26) -> [$\\, $Z];
escape_char(Char) -> [Char].
%% @doc Split Data at its first zero byte.
%%
%% Acc holds already collected characters in reverse order. The bytes
%% before the zero byte are appended to them and the whole is returned as a
%% string, together with the binary that follows the zero byte. When Data
%% contains no zero byte, all of it goes into the string and the rest is
%% the empty binary.
%%
%% @spec asciz_binary(Data::binary(), Acc::list()) ->
%%   {NewList::list(), Rest::binary()}
asciz_binary(Data, Acc) when is_binary(Data) ->
    {Head, Rest} =
        case binary:split(Data, <<0>>) of
            [Before, After] -> {Before, After};
            [Whole] -> {Whole, <<>>}
        end,
    {lists:reverse(Acc, binary_to_list(Head)), Rest}.