diff --git a/web/api/flukso/deps/mysql/src/mysql.erl b/web/api/flukso/deps/mysql/src/mysql.erl index 7dde778..4ed5edb 100644 --- a/web/api/flukso/deps/mysql/src/mysql.erl +++ b/web/api/flukso/deps/mysql/src/mysql.erl @@ -1,4 +1,3 @@ -%%%------------------------------------------------------------------- %%% File : mysql.erl %%% Author : Magnus Ahltorp %%% Descrip.: MySQL client. @@ -8,31 +7,44 @@ %%% Copyright (c) 2001-2004 Kungliga Tekniska Högskolan %%% See the file COPYING %%% +%%% Modified: 9/12/2006 by Yariv Sadan +%%% Note: Added support for prepared statements, +%%% transactions, better connection pooling, more efficient logging +%%% and made other internal enhancements. +%%% +%%% Modified: 9/23/2006 Rewrote the transaction handling code to +%%% provide a simpler, Mnesia-style transaction interface. Also, +%%% moved much of the prepared statement handling code to mysql_conn.erl +%%% and added versioning to prepared statements. +%%% +%%% %%% Usage: %%% %%% %%% Call one of the start-functions before any call to fetch/2 %%% -%%% start_link(Id, Host, User, Password, Database) -%%% start_link(Id, Host, Port, User, Password, Database) -%%% start_link(Id, Host, User, Password, Database, LogFun) -%%% start_link(Id, Host, Port, User, Password, Database, LogFun) +%%% start_link(PoolId, Host, User, Password, Database) +%%% start_link(PoolId, Host, Port, User, Password, Database) +%%% start_link(PoolId, Host, User, Password, Database, LogFun) +%%% start_link(PoolId, Host, Port, User, Password, Database, LogFun) %%% -%%% Id is a connection group identifier. If you want to have more +%%% (These functions also have non-linking coutnerparts.) +%%% +%%% PoolId is a connection pool identifier. If you want to have more %%% than one connection to a server (or a set of MySQL replicas), %%% add more with %%% -%%% connect(Id, Host, Port, User, Password, Database, Reconnect) +%%% connect(PoolId, Host, Port, User, Password, Database, Reconnect) %%% %%% use 'undefined' as Port to get default MySQL port number (3306). -%%% MySQL querys will be sent in a per-Id round-robin fashion. +%%% MySQL querys will be sent in a per-PoolId round-robin fashion. %%% Set Reconnect to 'true' if you want the dispatcher to try and %%% open a new connection, should this one die. %%% %%% When you have a mysql_dispatcher running, this is how you make a %%% query : %%% -%%% fetch(Id, "select * from hello") -> Result +%%% fetch(PoolId, "select * from hello") -> Result %%% Result = {data, MySQLRes} | {updated, MySQLRes} | %%% {error, MySQLRes} %%% @@ -54,43 +66,62 @@ %%% connections yourself, you can use the mysql_conn module as a %%% stand-alone single MySQL connection. See the comment at the top of %%% mysql_conn.erl. -%%% -%%%------------------------------------------------------------------- --module(mysql). +-module(mysql). -behaviour(gen_server). -%%-------------------------------------------------------------------- + +%% @type mysql_result() = term() +%% @type query_result = {data, mysql_result()} | {updated, mysql_result()} | +%% {error, mysql_result()} + + %% External exports -%%-------------------------------------------------------------------- -export([start_link/5, start_link/6, start_link/7, + start_link/8, + start/5, + start/6, + start/7, + start/8, + + connect/7, + connect/8, + connect/9, + + fetch/1, fetch/2, fetch/3, + + prepare/2, + execute/1, + execute/2, + execute/3, + execute/4, + unprepare/1, + get_prepared/1, + get_prepared/2, + + transaction/2, + transaction/3, get_result_field_info/1, get_result_rows/1, get_result_affected_rows/1, get_result_reason/1, - quote/1, - asciz_binary/2, - - connect/7 + encode/1, + encode/2, + asciz_binary/2 ]). -%%-------------------------------------------------------------------- %% Internal exports - just for mysql_* modules -%%-------------------------------------------------------------------- --export([log/3, - log/4 +-export([log/4 ]). -%%-------------------------------------------------------------------- %% Internal exports - gen_server callbacks -%%-------------------------------------------------------------------- -export([init/1, handle_call/3, handle_cast/2, @@ -99,139 +130,702 @@ code_change/3 ]). -%%-------------------------------------------------------------------- %% Records -%%-------------------------------------------------------------------- -include("mysql.hrl"). + +-record(conn, { + pool_id, %% atom(), the pool's id + pid, %% pid(), mysql_conn process + reconnect, %% true | false, should mysql_dispatcher try + %% to reconnect if this connection dies? + host, %% string() + port, %% integer() + user, %% string() + password, %% string() + database, %% string() + encoding + }). + -record(state, { - conn_list, %% list() of mysql_connection record() - log_fun %% undefined | function for logging, + %% gb_tree mapping connection + %% pool id to a connection pool tuple + conn_pools = gb_trees:empty(), + + + %% gb_tree mapping connection Pid + %% to pool id + pids_pools = gb_trees:empty(), + + %% function for logging, + log_fun, + + + %% maps names to {Statement::binary(), Version::integer()} values + prepares = gb_trees:empty() }). --record(mysql_connection, { - id, %% term(), user of 'mysql' modules id of this socket group - conn_pid, %% pid(), mysql_conn process - reconnect, %% true | false, should mysql_dispatcher try to reconnect if this connection dies? - host, %% undefined | string() - port, %% undefined | integer() - user, %% undefined | string() - password, %% undefined | string() - database %% undefined | string() - }). - -%%-------------------------------------------------------------------- %% Macros -%%-------------------------------------------------------------------- -define(SERVER, mysql_dispatcher). +-define(STATE_VAR, mysql_connection_state). -define(CONNECT_TIMEOUT, 5000). -define(LOCAL_FILES, 128). - -define(PORT, 3306). +%% used for debugging +-define(L(Msg), io:format("~p:~b ~p ~n", [?MODULE, ?LINE, Msg])). + +%% Log messages are designed to instantiated lazily only if the logging level +%% permits a log message to be logged +-define(Log(LogFun,Level,Msg), + LogFun(?MODULE,?LINE,Level,fun()-> {Msg,[]} end)). +-define(Log2(LogFun,Level,Msg,Params), + LogFun(?MODULE,?LINE,Level,fun()-> {Msg,Params} end)). + + +log(Module, Line, _Level, FormatFun) -> + {Format, Arguments} = FormatFun(), + io:format("~w:~b: "++ Format ++ "~n", [Module, Line] ++ Arguments). + -%%==================================================================== %% External functions -%%==================================================================== -%%-------------------------------------------------------------------- -%% Function: start_link(Id, Host, User, Password, Database) -%% start_link(Id, Host, Port, User, Password, Database) -%% start_link(Id, Host, User, Password, Database, LogFun) -%% start_link(Id, Host, Port, User, Password, Database, -%% LogFun) -%% Id = term(), first connection-group Id -%% Host = string() -%% Port = integer() -%% User = string() -%% Password = string() -%% Database = string() -%% LogFun = undefined | function() of arity 3 -%% Descrip.: Starts the MySQL client gen_server process. -%% Returns : {ok, Pid} | ignore | {error, Error} -%%-------------------------------------------------------------------- -start_link(Id, Host, User, Password, Database) when is_list(Host), is_list(User), is_list(Password), - is_list(Database) -> - start_link(Id, Host, ?PORT, User, Password, Database, undefined). +%% @doc Starts the MySQL client gen_server process. +%% +%% The Port and LogFun parameters are optional. +%% +%% @spec start_link(PoolId::atom(), Host::string(), Port::integer(), +%% Username::string(), Password::string(), Database::string(), +%% LogFun::undefined | function() of arity 4) -> +%% {ok, Pid} | ignore | {error, Err} +start_link(PoolId, Host, User, Password, Database) -> + start_link(PoolId, Host, ?PORT, User, Password, Database). -start_link(Id, Host, Port, User, Password, Database) when is_list(Host), is_integer(Port), is_list(User), - is_list(Password), is_list(Database) -> - start_link(Id, Host, Port, User, Password, Database, undefined); +start_link(PoolId, Host, Port, User, Password, Database) -> + start_link(PoolId, Host, Port, User, Password, Database, undefined, + undefined). -start_link(Id, Host, User, Password, Database, LogFun) when is_list(Host), is_list(User), is_list(Password), - is_list(Database) -> - start_link(Id, Host, ?PORT, User, Password, Database, LogFun). +start_link(PoolId, Host, undefined, User, Password, Database, LogFun) -> + start_link(PoolId, Host, ?PORT, User, Password, Database, LogFun, + undefined); +start_link(PoolId, Host, Port, User, Password, Database, LogFun) -> + start_link(PoolId, Host, Port, User, Password, Database, LogFun, + undefined). -start_link(Id, Host, Port, User, Password, Database, LogFun) when is_list(Host), is_integer(Port), is_list(User), - is_list(Password), is_list(Database) -> +start_link(PoolId, Host, undefined, User, Password, Database, LogFun, + Encoding) -> + start1(PoolId, Host, ?PORT, User, Password, Database, LogFun, Encoding, + start_link); +start_link(PoolId, Host, Port, User, Password, Database, LogFun, Encoding) -> + start1(PoolId, Host, Port, User, Password, Database, LogFun, Encoding, + start_link). + +%% @doc These functions are similar to their start_link counterparts, +%% but they call gen_server:start() instead of gen_server:start_link() +start(PoolId, Host, User, Password, Database) -> + start(PoolId, Host, ?PORT, User, Password, Database). + +start(PoolId, Host, Port, User, Password, Database) -> + start(PoolId, Host, Port, User, Password, Database, undefined). + +start(PoolId, Host, undefined, User, Password, Database, LogFun) -> + start(PoolId, Host, ?PORT, User, Password, Database, LogFun); +start(PoolId, Host, Port, User, Password, Database, LogFun) -> + start(PoolId, Host, Port, User, Password, Database, LogFun, undefined). + +start(PoolId, Host, undefined, User, Password, Database, LogFun, Encoding) -> + start1(PoolId, Host, ?PORT, User, Password, Database, LogFun, Encoding, + start); +start(PoolId, Host, Port, User, Password, Database, LogFun, Encoding) -> + start1(PoolId, Host, Port, User, Password, Database, LogFun, Encoding, + start). + +start1(PoolId, Host, Port, User, Password, Database, LogFun, Encoding, + StartFunc) -> crypto:start(), - gen_server:start_link({local, ?SERVER}, ?MODULE, [Id, Host, Port, User, Password, Database, LogFun], []). + gen_server:StartFunc( + {local, ?SERVER}, ?MODULE, + [PoolId, Host, Port, User, Password, Database, LogFun, Encoding], []). -%%-------------------------------------------------------------------- -%% Function: fetch(Id, Query) -%% fetch(Id, Query, Timeout) -%% Id = term(), connection-group Id -%% Query = string(), MySQL query in verbatim -%% Timeout = integer() | infinity, gen_server timeout value -%% Descrip.: Send a query and wait for the result. -%% Returns : {data, MySQLRes} | -%% {updated, MySQLRes} | -%% {error, MySQLRes} -%% MySQLRes = term() -%%-------------------------------------------------------------------- -fetch(Id, Query) when is_list(Query) -> - gen_server:call(?SERVER, {fetch, Id, Query}). -fetch(Id, Query, Timeout) when is_list(Query) -> - gen_server:call(?SERVER, {fetch, Id, Query}, Timeout). -%%-------------------------------------------------------------------- -%% Function: get_result_field_info(MySQLRes) -%% MySQLRes = term(), result of fetch function on "data" -%% Descrip.: Extract the FieldInfo from MySQL Result on data received -%% Returns : FieldInfo -%% FieldInfo = list() of {Table, Field, Length, Name} -%%-------------------------------------------------------------------- +%% @equiv connect(PoolId, Host, Port, User, Password, Database, Encoding, +%% Reconnect, true) +connect(PoolId, Host, Port, User, Password, Database, Encoding, Reconnect) -> + connect(PoolId, Host, Port, User, Password, Database, Encoding, + Reconnect, true). + +%% @doc Starts a MySQL connection and, if successful, add it to the +%% connection pool in the dispatcher. +%% +%% @spec: connect(PoolId::atom(), Host::string(), Port::integer() | undefined, +%% User::string(), Password::string(), Database::string(), +%% Encoding::string(), Reconnect::bool(), LinkConnection::bool()) -> +%% {ok, ConnPid} | {error, Reason} +connect(PoolId, Host, Port, User, Password, Database, Encoding, Reconnect, + LinkConnection) -> + Port1 = if Port == undefined -> ?PORT; true -> Port end, + Fun = if LinkConnection -> + fun mysql_conn:start_link/8; + true -> + fun mysql_conn:start/8 + end, + + {ok, LogFun} = gen_server:call(?SERVER, get_logfun), + case Fun(Host, Port1, User, Password, Database, LogFun, + Encoding, PoolId) of + {ok, ConnPid} -> + Conn = new_conn(PoolId, ConnPid, Reconnect, Host, Port1, User, + Password, Database, Encoding), + case gen_server:call( + ?SERVER, {add_conn, Conn}) of + ok -> + {ok, ConnPid}; + Res -> + Res + end; + Err-> + Err + end. + +new_conn(PoolId, ConnPid, Reconnect, Host, Port, User, Password, Database, + Encoding) -> + case Reconnect of + true -> + #conn{pool_id = PoolId, + pid = ConnPid, + reconnect = true, + host = Host, + port = Port, + user = User, + password = Password, + database = Database, + encoding = Encoding + }; + false -> + #conn{pool_id = PoolId, + pid = ConnPid, + reconnect = false} + end. + +%% @doc Fetch a query inside a transaction. +%% +%% @spec fetch(Query::iolist()) -> query_result() +fetch(Query) -> + case get(?STATE_VAR) of + undefined -> + {error, not_in_transaction}; + State -> + mysql_conn:fetch_local(State, Query) + end. + +%% @doc Send a query to a connection from the connection pool and wait +%% for the result. If this function is called inside a transaction, +%% the PoolId parameter is ignored. +%% +%% @spec fetch(PoolId::atom(), Query::iolist(), Timeout::integer()) -> +%% query_result() +fetch(PoolId, Query) -> + fetch(PoolId, Query, undefined). + +fetch(PoolId, Query, Timeout) -> + case get(?STATE_VAR) of + undefined -> + call_server({fetch, PoolId, Query}, Timeout); + State -> + mysql_conn:fetch_local(State, Query) + end. + + +%% @doc Register a prepared statement with the dispatcher. This call does not +%% prepare the statement in any connections. The statement is prepared +%% lazily in each connection when it is told to execute the statement. +%% If the Name parameter matches the name of a statement that has +%% already been registered, the version of the statement is incremented +%% and all connections that have already prepared the statement will +%% prepare it again with the newest version. +%% +%% @spec prepare(Name::atom(), Query::iolist()) -> ok +prepare(Name, Query) -> + gen_server:cast(?SERVER, {prepare, Name, Query}). + +%% @doc Unregister a statement that has previously been register with +%% the dispatcher. All calls to execute() with the given statement +%% will fail once the statement is unprepared. If the statement hasn't +%% been prepared, nothing happens. +%% +%% @spec unprepare(Name::atom()) -> ok +unprepare(Name) -> + gen_server:cast(?SERVER, {unprepare, Name}). + +%% @doc Get the prepared statement with the given name. +%% +%% This function is called from mysql_conn when the connection is +%% told to execute a prepared statement it has not yet prepared, or +%% when it is told to execute a statement inside a transaction and +%% it's not sure that it has the latest version of the statement. +%% +%% If the latest version of the prepared statement matches the Version +%% parameter, the return value is {ok, latest}. This saves the cost +%% of sending the query when the connection already has the latest version. +%% +%% @spec get_prepared(Name::atom(), Version::integer()) -> +%% {ok, latest} | {ok, Statement::binary()} | {error, Err} +get_prepared(Name) -> + get_prepared(Name, undefined). +get_prepared(Name, Version) -> + gen_server:call(?SERVER, {get_prepared, Name, Version}). + + +%% @doc Execute a query inside a transaction. +%% +%% @spec execute(Name::atom, Params::[term()]) -> mysql_result() +execute(Name) -> + execute(Name, []). + +execute(Name, Params) when is_atom(Name), is_list(Params) -> + case get(?STATE_VAR) of + undefined -> + {error, not_in_transaction}; + State -> + mysql_conn:execute_local(State, Name, Params) + end; + +%% @doc Execute a query in the connection pool identified by +%% PoolId. This function optionally accepts a list of parameters to pass +%% to the prepared statement and a Timeout parameter. +%% If this function is called inside a transaction, the PoolId paramter is +%% ignored. +%% +%% @spec execute(PoolId::atom(), Name::atom(), Params::[term()], +%% Timeout::integer()) -> mysql_result() +execute(PoolId, Name) when is_atom(PoolId), is_atom(Name) -> + execute(PoolId, Name, []). + +execute(PoolId, Name, Timeout) when is_integer(Timeout) -> + execute(PoolId, Name, [], Timeout); + +execute(PoolId, Name, Params) when is_list(Params) -> + execute(PoolId, Name, Params, undefined). + +execute(PoolId, Name, Params, Timeout) -> + case get(?STATE_VAR) of + undefined -> + call_server({execute, PoolId, Name, Params}, Timeout); + State -> + case mysql_conn:execute_local(State, Name, Params) of + {ok, Res, NewState} -> + put(?STATE_VAR, NewState), + Res; + Err -> + Err + end + end. + +%% @doc Execute a transaction in a connection belonging to the connection pool. +%% Fun is a function containing a sequence of calls to fetch() and/or +%% execute(). +%% If an error occurs, or if the function does any of the following: +%% +%% - throw(error) +%% - throw({error, Err}) +%% - return error +%% - return {error, Err} +%% - exit(Reason) +%% +%% the transaction is automatically rolled back. +%% +%% @spec transaction(PoolId::atom(), Fun::function()) -> +%% {atomic, Result} | {aborted, {Reason, {rollback_result, Result}}} +transaction(PoolId, Fun) -> + transaction(PoolId, Fun, undefined). + +transaction(PoolId, Fun, Timeout) -> + case get(?STATE_VAR) of + undefined -> + call_server({transaction, PoolId, Fun}, Timeout); + State -> + case mysql_conn:get_pool_id(State) of + PoolId -> + case catch Fun() of + error = Err -> throw(Err); + {error, _} = Err -> throw(Err); + {'EXIT', _} = Err -> throw(Err); + Other -> {atomic, Other} + end; + _Other -> + call_server({transaction, PoolId, Fun}, Timeout) + end + end. + +%% @doc Extract the FieldInfo from MySQL Result on data received. +%% +%% @spec get_result_field_info(MySQLRes::mysql_result()) -> +%% [{Table, Field, Length, Name}] get_result_field_info(#mysql_result{fieldinfo = FieldInfo}) -> FieldInfo. -%%-------------------------------------------------------------------- -%% Function: get_result_rows(MySQLRes) -%% MySQLRes = term(), result of fetch function on "data" -%% Descrip.: Extract the Rows from MySQL Result on data received -%% Returns : Rows -%% Rows = list() of list() representing records -%%-------------------------------------------------------------------- +%% @doc Extract the Rows from MySQL Result on data received +%% +%% @spec get_result_rows(MySQLRes::mysql_result()) -> [Row::list()] get_result_rows(#mysql_result{rows=AllRows}) -> AllRows. -%%-------------------------------------------------------------------- -%% Function: get_result_affected_rows(MySQLRes) -%% MySQLRes = term(), result of fetch function on "updated" -%% Descrip.: Extract the Rows from MySQL Result on update -%% Returns : AffectedRows -%% AffectedRows = integer() -%%-------------------------------------------------------------------- +%% @doc Extract the Rows from MySQL Result on update +%% +%% @spec get_result_affected_rows(MySQLRes::mysql_result()) -> +%% AffectedRows::integer() get_result_affected_rows(#mysql_result{affectedrows=AffectedRows}) -> AffectedRows. -%%-------------------------------------------------------------------- -%% Function: get_result_reason(MySQLRes) -%% MySQLRes = term(), result of fetch function on "error" -%% Descrip.: Extract the error Reason from MySQL Result on error -%% Returns : Reason -%% Reason = string() -%%-------------------------------------------------------------------- +%% @doc Extract the error Reason from MySQL Result on error +%% +%% @spec get_result_reason(MySQLRes::mysql_result()) -> +%% Reason::string() get_result_reason(#mysql_result{error=Reason}) -> Reason. -%%-------------------------------------------------------------------- -%% Function: quote(String) -%% String = string() -%% Descrip.: Quote a string so that it can be included safely in a -%% MySQL query. -%% Returns : Quoted = string() -%%-------------------------------------------------------------------- +connect(PoolId, Host, undefined, User, Password, Database, Reconnect) -> + connect(PoolId, Host, ?PORT, User, Password, Database, undefined, + Reconnect). + +%% gen_server callbacks + +init([PoolId, Host, Port, User, Password, Database, LogFun, Encoding]) -> + LogFun1 = if LogFun == undefined -> fun log/4; true -> LogFun end, + case mysql_conn:start(Host, Port, User, Password, Database, LogFun1, + Encoding, PoolId) of + {ok, ConnPid} -> + Conn = new_conn(PoolId, ConnPid, true, Host, Port, User, Password, + Database, Encoding), + State = #state{log_fun = LogFun1}, + {ok, add_conn(Conn, State)}; + {error, Reason} -> + ?Log(LogFun1, error, + "failed starting first MySQL connection handler, " + "exiting"), + {stop, {error, Reason}} + end. + +handle_call({fetch, PoolId, Query}, From, State) -> + fetch_queries(PoolId, From, State, [Query]); + +handle_call({get_prepared, Name, Version}, _From, State) -> + case gb_trees:lookup(Name, State#state.prepares) of + none -> + {reply, {error, {undefined, Name}}, State}; + {value, {_StmtBin, Version1}} when Version1 == Version -> + {reply, {ok, latest}, State}; + {value, Stmt} -> + {reply, {ok, Stmt}, State} + end; + +handle_call({execute, PoolId, Name, Params}, From, State) -> + with_next_conn( + PoolId, State, + fun(Conn, State1) -> + case gb_trees:lookup(Name, State1#state.prepares) of + none -> + {reply, {error, {no_such_statement, Name}}, State1}; + {value, {_Stmt, Version}} -> + mysql_conn:execute(Conn#conn.pid, Name, + Version, Params, From), + {noreply, State1} + end + end); + +handle_call({transaction, PoolId, Fun}, From, State) -> + with_next_conn( + PoolId, State, + fun(Conn, State1) -> + mysql_conn:transaction(Conn#conn.pid, Fun, From), + {noreply, State1} + end); + +handle_call({add_conn, Conn}, _From, State) -> + NewState = add_conn(Conn, State), + {PoolId, ConnPid} = {Conn#conn.pool_id, Conn#conn.pid}, + LogFun = State#state.log_fun, + ?Log2(LogFun, normal, + "added connection with id '~p' (pid ~p) to my list", + [PoolId, ConnPid]), + {reply, ok, NewState}; + +handle_call(get_logfun, _From, State) -> + {reply, {ok, State#state.log_fun}, State}. + +handle_cast({prepare, Name, Stmt}, State) -> + LogFun = State#state.log_fun, + Version1 = + case gb_trees:lookup(Name, State#state.prepares) of + {value, {_Stmt, Version}} -> + Version + 1; + none -> + 1 + end, + ?Log2(LogFun, debug, + "received prepare/2: ~p (ver ~p) ~p", [Name, Version1, Stmt]), + {noreply, State#state{prepares = + gb_trees:enter(Name, {Stmt, Version1}, + State#state.prepares)}}; + +handle_cast({unprepare, Name}, State) -> + LogFun = State#state.log_fun, + ?Log2(LogFun, debug, "received unprepare/1: ~p", [Name]), + State1 = + case gb_trees:lookup(Name, State#state.prepares) of + none -> + ?Log2(LogFun, warn, "trying to unprepare a non-existing " + "statement: ~p", [Name]), + State; + {value, _Stmt} -> + State#state{prepares = + gb_trees:delete(Name, State#state.prepares)} + end, + {noreply, State1}. + +%% Called when a connection to the database has been lost. If +%% The 'reconnect' flag was set to true for the connection, we attempt +%% to establish a new connection to the database. +handle_info({'DOWN', _MonitorRef, process, Pid, Info}, State) -> + LogFun = State#state.log_fun, + case remove_conn(Pid, State) of + {ok, Conn, NewState} -> + LogLevel = case Info of + normal -> normal; + _ -> error + end, + ?Log2(LogFun, LogLevel, + "connection pid ~p exited : ~p", [Pid, Info]), + case Conn#conn.reconnect of + true -> + start_reconnect(Conn, LogFun); + false -> + ok + end, + {noreply, NewState}; + error -> + ?Log2(LogFun, error, + "received 'DOWN' signal from pid ~p not in my list", [Pid]), + {noreply, State} + end. + +terminate(Reason, State) -> + LogFun = State#state.log_fun, + LogLevel = case Reason of + normal -> debug; + _ -> error + end, + ?Log2(LogFun, LogLevel, "terminating with reason: ~p", [Reason]), + Reason. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + +%% Internal functions + +fetch_queries(PoolId, From, State, QueryList) -> + with_next_conn( + PoolId, State, + fun(Conn, State1) -> + Pid = Conn#conn.pid, + mysql_conn:fetch(Pid, QueryList, From), + {noreply, State1} + end). + +with_next_conn(PoolId, State, Fun) -> + case get_next_conn(PoolId, State) of + {ok, Conn, NewState} -> + Fun(Conn, NewState); + error -> + %% we have no active connection matching PoolId + {reply, {error, {no_connection_in_pool, PoolId}}, State} + end. + +call_server(Msg, Timeout) -> + if Timeout == undefined -> + gen_server:call(?SERVER, Msg); + true -> + gen_server:call(?SERVER, Msg, Timeout) + end. + +add_conn(Conn, State) -> + Pid = Conn#conn.pid, + erlang:monitor(process, Conn#conn.pid), + PoolId = Conn#conn.pool_id, + ConnPools = State#state.conn_pools, + NewPool = + case gb_trees:lookup(PoolId, ConnPools) of + none -> + {[Conn],[]}; + {value, {Unused, Used}} -> + {[Conn | Unused], Used} + end, + State#state{conn_pools = + gb_trees:enter(PoolId, NewPool, + ConnPools), + pids_pools = gb_trees:enter(Pid, PoolId, + State#state.pids_pools)}. + +remove_pid_from_list(Pid, Conns) -> + lists:foldl( + fun(OtherConn, {NewConns, undefined}) -> + if OtherConn#conn.pid == Pid -> + {NewConns, OtherConn}; + true -> + {[OtherConn | NewConns], undefined} + end; + (OtherConn, {NewConns, FoundConn}) -> + {[OtherConn|NewConns], FoundConn} + end, {[],undefined}, lists:reverse(Conns)). + +remove_pid_from_lists(Pid, Conns1, Conns2) -> + case remove_pid_from_list(Pid, Conns1) of + {NewConns1, undefined} -> + {NewConns2, Conn} = remove_pid_from_list(Pid, Conns2), + {Conn, {NewConns1, NewConns2}}; + {NewConns1, Conn} -> + {Conn, {NewConns1, Conns2}} + end. + +remove_conn(Pid, State) -> + PidsPools = State#state.pids_pools, + case gb_trees:lookup(Pid, PidsPools) of + none -> + error; + {value, PoolId} -> + ConnPools = State#state.conn_pools, + case gb_trees:lookup(PoolId, ConnPools) of + none -> + error; + {value, {Unused, Used}} -> + {Conn, NewPool} = remove_pid_from_lists(Pid, Unused, Used), + NewConnPools = gb_trees:enter(PoolId, NewPool, ConnPools), + {ok, Conn, State#state{conn_pools = NewConnPools, + pids_pools = + gb_trees:delete(Pid, PidsPools)}} + end + end. + +get_next_conn(PoolId, State) -> + ConnPools = State#state.conn_pools, + case gb_trees:lookup(PoolId, ConnPools) of + none -> + error; + {value, {[],[]}} -> + error; + %% We maintain 2 lists: one for unused connections and one for used + %% connections. When we run out of unused connections, we recycle + %% the list of used connections. + {value, {[], Used}} -> + [Conn | Conns] = lists:reverse(Used), + {ok, Conn, + State#state{conn_pools = + gb_trees:enter(PoolId, {Conns, [Conn]}, ConnPools)}}; + {value, {[Conn|Unused], Used}} -> + {ok, Conn, State#state{ + conn_pools = + gb_trees:enter(PoolId, {Unused, [Conn|Used]}, + ConnPools)}} + end. + +start_reconnect(Conn, LogFun) -> + Pid = spawn(fun () -> + reconnect_loop(Conn#conn{pid = undefined}, LogFun, 0) + end), + {PoolId, Host, Port} = {Conn#conn.pool_id, Conn#conn.host, Conn#conn.port}, + ?Log2(LogFun, debug, + "started pid ~p to try and reconnect to ~p:~s:~p (replacing " + "connection with pid ~p)", + [Pid, PoolId, Host, Port, Conn#conn.pid]), + ok. + +reconnect_loop(Conn, LogFun, N) -> + {PoolId, Host, Port} = {Conn#conn.pool_id, Conn#conn.host, Conn#conn.port}, + case connect(PoolId, + Host, + Port, + Conn#conn.user, + Conn#conn.password, + Conn#conn.database, + Conn#conn.encoding, + Conn#conn.reconnect) of + {ok, ConnPid} -> + ?Log2(LogFun, debug, + "managed to reconnect to ~p:~s:~p " + "(connection pid ~p)", [PoolId, Host, Port, ConnPid]), + ok; + {error, Reason} -> + %% log every once in a while + NewN = case N of + 10 -> + ?Log2(LogFun, debug, + "reconnect: still unable to connect to " + "~p:~s:~p (~p)", [PoolId, Host, Port, Reason]), + 0; + _ -> + N + 1 + end, + %% sleep between every unsuccessful attempt + timer:sleep(5 * 1000), + reconnect_loop(Conn, LogFun, NewN) + end. + + +%% @doc Encode a value so that it can be included safely in a MySQL query. +%% +%% @spec encode(Val::term(), AsBinary::bool()) -> +%% string() | binary() | {error, Error} +encode(Val) -> + encode(Val, false). +encode(Val, false) when Val == undefined; Val == null -> + "null"; +encode(Val, true) when Val == undefined; Val == null -> + <<"null">>; +encode(Val, false) when is_binary(Val) -> + binary_to_list(quote(Val)); +encode(Val, true) when is_binary(Val) -> + quote(Val); +encode(Val, true) -> + list_to_binary(encode(Val,false)); +encode(Val, false) when is_atom(Val) -> + quote(atom_to_list(Val)); +encode(Val, false) when is_list(Val) -> + quote(Val); +encode(Val, false) when is_integer(Val) -> + integer_to_list(Val); +encode(Val, false) when is_float(Val) -> + [Res] = io_lib:format("~w", [Val]), + Res; +encode({datetime, Val}, AsBinary) -> + encode(Val, AsBinary); +encode({{Year, Month, Day}, {Hour, Minute, Second}}, false) -> + Res = two_digits([Year, Month, Day, Hour, Minute, Second]), + lists:flatten(Res); +encode({TimeType, Val}, AsBinary) + when TimeType == 'date'; + TimeType == 'time' -> + encode(Val, AsBinary); +encode({Time1, Time2, Time3}, false) -> + Res = two_digits([Time1, Time2, Time3]), + lists:flatten(Res); +encode(Val, _AsBinary) -> + {error, {unrecognized_value, Val}}. + +two_digits(Nums) when is_list(Nums) -> + [two_digits(Num) || Num <- Nums]; +two_digits(Num) -> + [Str] = io_lib:format("~b", [Num]), + case length(Str) of + 1 -> [$0 | Str]; + _ -> Str + end. + +%% Quote a string or binary value so that it can be included safely in a +%% MySQL query. quote(String) when is_list(String) -> - [34 | lists:reverse([34 | quote(String, [])])]. %% 34 is $" + [39 | lists:reverse([39 | quote(String, [])])]; %% 39 is $' +quote(Bin) when is_binary(Bin) -> + list_to_binary(quote(binary_to_list(Bin))). quote([], Acc) -> Acc; @@ -252,405 +846,15 @@ quote([26 | Rest], Acc) -> quote([C | Rest], Acc) -> quote(Rest, [C | Acc]). -%%-------------------------------------------------------------------- -%% Function: asciz_binary(Data, Acc) -%% Data = binary() -%% Acc = list(), input accumulator -%% Descrip.: Find the first zero-byte in Data and add everything -%% before it to Acc, as a string. -%% Returns : {NewList, Rest} -%% NewList = list(), Acc plus what we extracted from Data -%% Rest = binary(), whatever was left of Data, not -%% including the zero-byte -%%-------------------------------------------------------------------- + +%% @doc Find the first zero-byte in Data and add everything before it +%% to Acc, as a string. +%% +%% @spec asciz_binary(Data::binary(), Acc::list()) -> +%% {NewList::list(), Rest::binary()} asciz_binary(<<>>, Acc) -> {lists:reverse(Acc), <<>>}; asciz_binary(<<0:8, Rest/binary>>, Acc) -> {lists:reverse(Acc), Rest}; asciz_binary(<>, Acc) -> asciz_binary(Rest, [C | Acc]). - -%%-------------------------------------------------------------------- -%% Function: connect(Id, Host, Port, User, Password, Database, -%% Reconnect) -%% Id = term(), connection-group Id -%% Host = string() -%% Port = undefined | integer() -%% User = string() -%% Password = string() -%% Database = string() -%% Reconnect = true | false -%% Descrip.: Starts a MySQL connection and, if successfull, registers -%% it with the mysql_dispatcher. -%% Returns : {ok, ConnPid} | {error, Reason} -%%-------------------------------------------------------------------- -connect(Id, Host, undefined, User, Password, Database, Reconnect) -> - connect(Id, Host, ?PORT, User, Password, Database, Reconnect); -connect(Id, Host, Port, User, Password, Database, Reconnect) -> - {ok, LogFun} = gen_server:call(?SERVER, get_logfun), - case mysql_conn:start(Host, Port, User, Password, Database, LogFun) of - {ok, ConnPid} -> - MysqlConn = - case Reconnect of - true -> - #mysql_connection{id = Id, - conn_pid = ConnPid, - reconnect = true, - host = Host, - port = Port, - user = User, - password = Password, - database = Database - }; - false -> - #mysql_connection{id = Id, - conn_pid = ConnPid, - reconnect = false - } - end, - case gen_server:call(?SERVER, {add_mysql_connection, MysqlConn}) of - ok -> - {ok, ConnPid}; - Res -> - Res - end; - {error, Reason} -> - {error, Reason} - end. - -%%-------------------------------------------------------------------- -%% Function: log(LogFun, Level, Format) -%% log(LogFun, Level, Format, Arguments) -%% LogFun = undefined | function() with arity 3 -%% Level = debug | normal | error -%% Format = string() -%% Arguments = list() of term() -%% Descrip.: Either call the function LogFun with the Level, Format -%% and Arguments as parameters or log it to the console if -%% LogFun is undefined. -%% Returns : void() -%% -%% Note : Exported only for use by the mysql_* modules. -%% -%%-------------------------------------------------------------------- -log(LogFun, Level, Format) -> - log(LogFun, Level, Format, []). - -log(LogFun, Level, Format, Arguments) when is_function(LogFun) -> - LogFun(Level, Format, Arguments); -log(undefined, _Level, Format, Arguments) -> - %% default is to log to console - io:format(Format, Arguments), - io:format("~n", []). - - -%%==================================================================== -%% gen_server callbacks -%%==================================================================== - -%%-------------------------------------------------------------------- -%% Function: init(Args) -> {ok, State} | -%% {ok, State, Timeout} | -%% ignore | -%% {stop, Reason} -%% Args = [Id, Host, Port, User, Password, Database, LogFun] -%% Id = term(), connection-group Id -%% Host = string() -%% Port = integer() -%% User = string() -%% Password = string() -%% Database = string() -%% LogFun = undefined | function() with arity 3 -%% Descrip.: Initiates the gen_server (MySQL dispatcher). -%%-------------------------------------------------------------------- -init([Id, Host, Port, User, Password, Database, LogFun]) -> - case mysql_conn:start(Host, Port, User, Password, Database, LogFun) of - {ok, ConnPid} -> - MysqlConn = #mysql_connection{id = Id, - conn_pid = ConnPid, - reconnect = true, - host = Host, - port = Port, - user = User, - password = Password, - database = Database - }, - case add_mysql_conn(MysqlConn, []) of - {ok, ConnList} -> - {ok, #state{log_fun = LogFun, - conn_list = ConnList - }}; - error -> - Msg = "mysql: Failed adding first MySQL connection handler to my list, exiting", - log(LogFun, error, Msg), - {error, Msg} - end; - {error, Reason} -> - log(LogFun, error, "mysql: Failed starting first MySQL connection handler, exiting"), - {stop, {error, Reason}} - end. - -%%-------------------------------------------------------------------- -%% Function: handle_call(Msg, From, State) -%% Descrip.: Handling call messages. -%% Returns : {reply, Reply, State} | -%% {reply, Reply, State, Timeout} | -%% {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, Reply, State} | (terminate/2 is called) -%% {stop, Reason, State} (terminate/2 is called) -%%-------------------------------------------------------------------- - - -%%-------------------------------------------------------------------- -%% Function: handle_call({fetch, Id, Query}, From, State) -%% Id = term(), connection-group id -%% Query = string(), MySQL query -%% Descrip.: Make a MySQL query. Use the first connection matching Id -%% in our connection-list. Don't block the mysql_dispatcher -%% by returning {noreply, ...} here and let the mysql_conn -%% do gen_server:reply(...) when it has an answer. -%% Returns : {noreply, NewState} | -%% {reply, {error, Reason}, State} -%% NewState = state record() -%% Reason = atom() | string() -%%-------------------------------------------------------------------- -handle_call({fetch, Id, Query}, From, State) -> - log(State#state.log_fun, debug, "mysql: fetch ~p (id ~p)", [Query, Id]), - case get_next_mysql_connection_for_id(Id, State#state.conn_list) of - {ok, MysqlConn, RestOfConnList} when is_record(MysqlConn, mysql_connection) -> - mysql_conn:fetch(MysqlConn#mysql_connection.conn_pid, Query, From), - %% move this mysql socket to the back of the list - NewConnList = RestOfConnList ++ [MysqlConn], - %% The ConnPid process does a gen_server:reply() when it has an answer - {noreply, State#state{conn_list = NewConnList}}; - nomatch -> - %% we have no active connection matching Id - {reply, {error, no_connection}, State} - end; - -%%-------------------------------------------------------------------- -%% Function: handle_call({add_mysql_connection, Conn}, From, State) -%% Conn = mysql_connection record() -%% Descrip.: Add Conn to our list of connections. -%% Returns : {reply, Reply, NewState} -%% Reply = ok | {error, Reason} -%% NewState = state record() -%% Reason = string() -%%-------------------------------------------------------------------- -handle_call({add_mysql_connection, Conn}, _From, State) when is_record(Conn, mysql_connection) -> - case add_mysql_conn(Conn, State#state.conn_list) of - {ok, NewConnList} -> - {Id, ConnPid} = {Conn#mysql_connection.id, Conn#mysql_connection.conn_pid}, - log(State#state.log_fun, normal, "mysql: Added connection with id '~p' (pid ~p) to my list", - [Id, ConnPid]), - {reply, ok, State#state{conn_list = NewConnList}}; - error -> - {reply, {error, "failed adding MySQL connection to my list"}, State} - end; - -%%-------------------------------------------------------------------- -%% Function: handle_call(get_logfun, From, State) -%% Descrip.: Fetch our logfun. -%% Returns : {reply, {ok, LogFun}, State} -%% LogFun = undefined | function() with arity 3 -%%-------------------------------------------------------------------- -handle_call(get_logfun, _From, State) -> - {reply, {ok, State#state.log_fun}, State}; - -handle_call(Unknown, _From, State) -> - log(State#state.log_fun, error, "mysql: Received unknown gen_server call : ~p", [Unknown]), - {reply, {error, "unknown gen_server call in mysql client"}, State}. - - -%%-------------------------------------------------------------------- -%% Function: handle_cast(Msg, State) -%% Descrip.: Handling cast messages -%% Returns : {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} (terminate/2 is called) -%%-------------------------------------------------------------------- -handle_cast(Unknown, State) -> - log(State#state.log_fun, error, "mysql: Received unknown gen_server cast : ~p", [Unknown]), - {noreply, State}. - - -%%-------------------------------------------------------------------- -%% Function: handle_info(Msg, State) -%% Descrip.: Handling all non call/cast messages -%% Returns : {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} (terminate/2 is called) -%%-------------------------------------------------------------------- - -%%-------------------------------------------------------------------- -%% Function: handle_info({'DOWN', ...}, State) -%% Descrip.: Handle a message that one of our monitored processes -%% (mysql_conn processes in our connection list) has exited. -%% Remove the entry from our list. -%% Returns : {noreply, NewState} | -%% {stop, normal, State} -%% NewState = state record() -%% -%% Note : For now, we stop if our connection list becomes empty. -%% We should try to reconnect for a while first, to not -%% eventually stop the whole OTP application if the MySQL- -%% server is shut down and the mysql_dispatcher was super- -%% vised by an OTP supervisor. -%%-------------------------------------------------------------------- -handle_info({'DOWN', _MonitorRef, process, Pid, Info}, State) -> - LogFun = State#state.log_fun, - case remove_mysql_connection_using_pid(Pid, State#state.conn_list, []) of - {ok, Conn, NewConnList} -> - LogLevel = case Info of - normal -> normal; - _ -> error - end, - log(LogFun, LogLevel, "mysql: MySQL connection pid ~p exited : ~p", [Pid, Info]), - log(LogFun, normal, "mysql: Removed MySQL connection with pid ~p from list", - [Pid]), - case Conn#mysql_connection.reconnect of - true -> - start_reconnect(Conn, LogFun); - false -> - ok - end, - {noreply, State#state{conn_list = NewConnList}}; - nomatch -> - log(LogFun, error, "mysql: Received 'DOWN' signal from pid ~p not in my list", [Pid]), - {noreply, State} - end; - -handle_info(Info, State) -> - log(State#state.log_fun, error, "mysql: Received unknown signal : ~p", [Info]), - {noreply, State}. - -%%-------------------------------------------------------------------- -%% Function: terminate(Reason, State) -%% Descrip.: Shutdown the server -%% Returns : Reason -%%-------------------------------------------------------------------- -terminate(Reason, State) -> - LogFun = State#state.log_fun, - LogLevel = case Reason of - normal -> debug; - _ -> error - end, - log(LogFun, LogLevel, "mysql: Terminating with reason : ~p", [Reason]), - Reason. - -%%-------------------------------------------------------------------- -%% Function: code_change(_OldVsn, State, _Extra) -%% Descrip.: Convert process state when code is changed -%% Returns : {ok, State} -%%-------------------------------------------------------------------- -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%==================================================================== -%% Internal functions -%%==================================================================== - -%%-------------------------------------------------------------------- -%% Function: add_mysql_conn(Conn, ConnList) -%% Conn = mysql_connection record() -%% ConnList = list() of mysql_connection record() -%% Descrip.: Set up process monitoring of the mysql_conn process and -%% then add it (first) to ConnList. -%% Returns : NewConnList = list() of mysql_connection record() -%%-------------------------------------------------------------------- -add_mysql_conn(Conn, ConnList) when is_record(Conn, mysql_connection), is_list(ConnList) -> - erlang:monitor(process, Conn#mysql_connection.conn_pid), - {ok, [Conn | ConnList]}. - -%%-------------------------------------------------------------------- -%% Function: remove_mysql_connection_using_pid(Pid, ConnList) -%% Pid = pid() -%% ConnList = list() of mysql_connection record() -%% Descrip.: Removes the first mysql_connection in ConnList that has -%% a pid matching Pid. -%% Returns : {ok, Conn, NewConnList} | nomatch -%% Conn = mysql_connection record() -%% NewConnList = list() of mysql_connection record() -%%-------------------------------------------------------------------- -remove_mysql_connection_using_pid(Pid, [#mysql_connection{conn_pid = Pid} = H | T], Res) -> - {ok, H, lists:reverse(Res) ++ T}; -remove_mysql_connection_using_pid(Pid, [H | T], Res) when is_record(H, mysql_connection) -> - remove_mysql_connection_using_pid(Pid, T, [H | Res]); -remove_mysql_connection_using_pid(_Pid, [], _Res) -> - nomatch. - -%%-------------------------------------------------------------------- -%% Function: get_next_mysql_connection_for_id(Id, ConnList) -%% Id = term(), connection-group id -%% ConnList = list() of mysql_connection record() -%% Descrip.: Find the first mysql_connection in ConnList that has an -%% id matching Id. -%% Returns : {ok, Conn, NewConnList} | nomatch -%% Conn = mysql_connection record() -%% NewConnList = list() of mysql_connection record(), same -%% as ConnList but without Conn -%%-------------------------------------------------------------------- -get_next_mysql_connection_for_id(Id, ConnList) -> - get_next_mysql_connection_for_id(Id, ConnList, []). - -get_next_mysql_connection_for_id(Id, [#mysql_connection{id = Id} = H | T], Res) -> - {ok, H, lists:reverse(Res) ++ T}; -get_next_mysql_connection_for_id(Id, [H | T], Res) when is_record(H, mysql_connection) -> - get_next_mysql_connection_for_id(Id, T, [H | Res]); -get_next_mysql_connection_for_id(_Id, [], _Res) -> - nomatch. - -%%-------------------------------------------------------------------- -%% Function: start_reconnect(Conn, LogFun) -%% Conn = mysql_connection record() -%% LogFun = undefined | function() with arity 3 -%% Descrip.: Spawns a process that will try to re-establish a new -%% connection instead of the one in Conn which has just -%% died. -%% Returns : ok -%%-------------------------------------------------------------------- -start_reconnect(Conn, LogFun) when is_record(Conn, mysql_connection) -> - Pid = spawn(fun () -> - reconnect_loop(Conn#mysql_connection{conn_pid = undefined}, LogFun, 0) - end), - {Id, Host, Port} = {Conn#mysql_connection.id, Conn#mysql_connection.host, Conn#mysql_connection.port}, - log(LogFun, debug, "mysql: Started pid ~p to try and reconnect to ~p:~s:~p (replacing " - "connection with pid ~p)", [Pid, Id, Host, Port, Conn#mysql_connection.conn_pid]), - ok. - -%%-------------------------------------------------------------------- -%% Function: reconnect_loop(Conn, LogFun, 0) -%% Conn = mysql_connection record() -%% LogFun = undefined | function() with arity 3 -%% Descrip.: Loop indefinately until we are able to reconnect to the -%% server specified in the now dead connection Conn. -%% Returns : ok -%%-------------------------------------------------------------------- -reconnect_loop(Conn, LogFun, N) when is_record(Conn, mysql_connection) -> - {Id, Host, Port} = {Conn#mysql_connection.id, Conn#mysql_connection.host, Conn#mysql_connection.port}, - case connect(Id, - Host, - Port, - Conn#mysql_connection.user, - Conn#mysql_connection.password, - Conn#mysql_connection.database, - Conn#mysql_connection.reconnect) of - {ok, ConnPid} -> - log(LogFun, debug, "mysql_reconnect: Managed to reconnect to ~p:~s:~p (connection pid ~p)", - [Id, Host, Port, ConnPid]), - ok; - {error, Reason} -> - %% log every once in a while - NewN = case N of - 10 -> - log(LogFun, debug, "mysql_reconnect: Still unable to connect to ~p:~s:~p (~p)", - [Id, Host, Port, Reason]), - 0; - _ -> - N + 1 - end, - %% sleep between every unsuccessfull attempt - timer:sleep(20 * 1000), - reconnect_loop(Conn, LogFun, NewN) - end. diff --git a/web/api/flukso/deps/mysql/src/mysql_auth.erl b/web/api/flukso/deps/mysql/src/mysql_auth.erl index bfa226d..d8b7341 100644 --- a/web/api/flukso/deps/mysql/src/mysql_auth.erl +++ b/web/api/flukso/deps/mysql/src/mysql_auth.erl @@ -172,23 +172,21 @@ bxor_binary(B1, B2) -> E1 bxor E2 end, binary_to_list(B1), binary_to_list(B2))). +password_new([], _Salt) -> + <<>>; password_new(Password, Salt) -> - %% Check for blank password - case length(Password) of - 0 -> - <<>>; - _ -> - Stage1 = crypto:sha(Password), - Stage2 = crypto:sha(Stage1), - Res = crypto:sha_final( - crypto:sha_update( - crypto:sha_update(crypto:sha_init(), Salt), - Stage2) - ), - bxor_binary(Res, Stage1) - end. + Stage1 = crypto:sha(Password), + Stage2 = crypto:sha(Stage1), + Res = crypto:sha_final( + crypto:sha_update( + crypto:sha_update(crypto:sha_init(), Salt), + Stage2) + ), + bxor_binary(Res, Stage1). + do_send(Sock, Packet, Num, LogFun) -> - mysql:log(LogFun, debug, "mysql_auth send packet ~p: ~p", [Num, Packet]), + LogFun(?MODULE, ?LINE, debug, + fun() -> {"mysql_auth send packet ~p: ~p", [Num, Packet]} end), Data = <<(size(Packet)):24/little, Num:8, Packet/binary>>, gen_tcp:send(Sock, Data). diff --git a/web/api/flukso/deps/mysql/src/mysql_conn.erl b/web/api/flukso/deps/mysql/src/mysql_conn.erl index 414890f..db9b920 100644 --- a/web/api/flukso/deps/mysql/src/mysql_conn.erl +++ b/web/api/flukso/deps/mysql/src/mysql_conn.erl @@ -9,6 +9,13 @@ %%% Note : All MySQL code was written by Magnus Ahltorp, originally %%% in the file mysql.erl - I just moved it here. %%% +%%% Modified: 12 Sep 2006 by Yariv Sadan +%%% Added automatic type conversion between MySQL types and Erlang types +%%% and different logging style. +%%% +%%% Modified: 23 Sep 2006 by Yariv Sadan +%%% Added transaction handling and prepared statement execution. +%%% %%% Copyright (c) 2001-2004 Kungliga Tekniska Högskolan %%% See the file COPYING %%% @@ -63,11 +70,20 @@ %%-------------------------------------------------------------------- %% External exports %%-------------------------------------------------------------------- --export([start/6, - start_link/6, +-export([start/8, + start_link/8, fetch/3, fetch/4, - squery/4 + execute/5, + execute/6, + transaction/3, + transaction/4 + ]). + +%% private exports to be called only from the 'mysql' module +-export([fetch_local/2, + execute_local/3, + get_pool_id/1 ]). %%-------------------------------------------------------------------- @@ -82,16 +98,32 @@ log_fun, recv_pid, socket, - data + data, + + %% maps statement names to their versions + prepares = gb_trees:empty(), + + %% the id of the connection pool to which this connection belongs + pool_id }). -define(SECURE_CONNECTION, 32768). -define(MYSQL_QUERY_OP, 3). -define(DEFAULT_STANDALONE_TIMEOUT, 5000). --define(DEFAULT_RESULT_TYPE, list). -define(MYSQL_4_0, 40). %% Support for MySQL 4.0.x -define(MYSQL_4_1, 41). %% Support for MySQL 4.1.x et 5.0.x +%% Used by transactions to get the state variable for this connection +%% when bypassing the dispatcher. +-define(STATE_VAR, mysql_connection_state). + +-define(Log(LogFun,Level,Msg), + LogFun(?MODULE, ?LINE,Level,fun()-> {Msg,[]} end)). +-define(Log2(LogFun,Level,Msg,Params), + LogFun(?MODULE, ?LINE,Level,fun()-> {Msg,Params} end)). +-define(L(Msg), io:format("~p:~b ~p ~n", [?MODULE, ?LINE, Msg])). + + %%==================================================================== %% External functions %%==================================================================== @@ -111,37 +143,41 @@ %% Pid = pid() %% Reason = string() %%-------------------------------------------------------------------- -start(Host, Port, User, Password, Database, LogFun) when is_list(Host), is_integer(Port), is_list(User), - is_list(Password), is_list(Database) -> +start(Host, Port, User, Password, Database, LogFun, Encoding, PoolId) -> ConnPid = self(), Pid = spawn(fun () -> - init(Host, Port, User, Password, Database, LogFun, ConnPid) + init(Host, Port, User, Password, Database, + LogFun, Encoding, PoolId, ConnPid) end), post_start(Pid, LogFun). -start_link(Host, Port, User, Password, Database, LogFun) when is_list(Host), is_integer(Port), is_list(User), - is_list(Password), is_list(Database) -> +start_link(Host, Port, User, Password, Database, LogFun, Encoding, PoolId) -> ConnPid = self(), Pid = spawn_link(fun () -> - init(Host, Port, User, Password, Database, LogFun, ConnPid) - end), + init(Host, Port, User, Password, Database, + LogFun, Encoding, PoolId, ConnPid) + end), post_start(Pid, LogFun). %% part of start/6 or start_link/6: -post_start(Pid, _LogFun) -> - %%Timeout = get_option(timeout, Options, ?DEFAULT_STANDALONE_TIMEOUT), - %%TODO find a way to get configured Options here - Timeout= ?DEFAULT_STANDALONE_TIMEOUT, +post_start(Pid, LogFun) -> receive {mysql_conn, Pid, ok} -> {ok, Pid}; {mysql_conn, Pid, {error, Reason}} -> - {error, Reason} -% Unknown -> -% mysql:log(_LogFun, error, "mysql_conn: Received unknown signal, exiting"), -% mysql:log(_LogFun, debug, "mysql_conn: Unknown signal : ~p", [Unknown]), -% {error, "unknown signal received"} - after Timeout -> + {error, Reason}; + {mysql_conn, OtherPid, {error, Reason}} -> + % Ignore error message from other processes. This handles the case + % when mysql is shutdown and takes more than 5 secs to close the + % listener socket. + ?Log2(LogFun, debug, "Ignoring message from process ~p | Reason: ~p", + [OtherPid, Reason]), + post_start(Pid, LogFun); + Unknown -> + ?Log2(LogFun, error, + "received unknown signal, exiting: ~p", [Unknown]), + {error, "unknown signal received"} + after 5000 -> {error, "timed out"} end. @@ -149,16 +185,21 @@ post_start(Pid, _LogFun) -> %% Function: fetch(Pid, Query, From) %% fetch(Pid, Query, From, Timeout) %% Pid = pid(), mysql_conn to send fetch-request to -%% Query = string(), MySQL query in verbatim +%% Queries = A single binary() query or a list of binary() queries. +%% If a list is provided, the return value is the return +%% of the last query, or the first query that has +%% returned an error. If an error occurs, execution of +%% the following queries is aborted. %% From = pid() or term(), use a From of self() when %% using this module for a single connection, %% or pass the gen_server:call/3 From argument if %% using a gen_server to do the querys (e.g. the %% mysql_dispatcher) %% Timeout = integer() | infinity, gen_server timeout value -%% Descrip.: Send a query and wait for the result if running stand- -%% alone (From = self()), but don't block the caller if we -%% are not running stand-alone (From = gen_server From). +%% Descrip.: Send a query or a list of queries and wait for the result +%% if running stand-alone (From = self()), but don't block +%% the caller if we are not running stand-alone +%% (From = gen_server From). %% Returns : ok | (non-stand-alone mode) %% {data, #mysql_result} | (stand-alone mode) %% {updated, #mysql_result} | (stand-alone mode) @@ -167,28 +208,41 @@ post_start(Pid, _LogFun) -> %% Rows = list() of [string()] %% Reason = term() %%-------------------------------------------------------------------- +fetch(Pid, Queries, From) -> + fetch(Pid, Queries, From, ?DEFAULT_STANDALONE_TIMEOUT). -fetch(Pid, Query, From) -> - squery(Pid, Query, From, []). -fetch(Pid, Query, From, Timeout) -> - squery(Pid, Query, From, [{timeout, Timeout}]). +fetch(Pid, Queries, From, Timeout) -> + do_fetch(Pid, Queries, From, Timeout). -squery(Pid, Query, From, Options) when is_pid(Pid), is_list(Query) -> - Self = self(), - Timeout = get_option(timeout, Options, ?DEFAULT_STANDALONE_TIMEOUT), - Pid ! {fetch, Query, From, Options}, - case From of - Self -> - %% We are not using a mysql_dispatcher, await the response - receive - {fetch_result, Pid, Result} -> - Result - after Timeout -> - {error, "query timed out"} - end; - _ -> - %% From is gen_server From, Pid will do gen_server:reply() when it has an answer - ok +execute(Pid, Name, Version, Params, From) -> + execute(Pid, Name, Version, Params, From, ?DEFAULT_STANDALONE_TIMEOUT). + +execute(Pid, Name, Version, Params, From, Timeout) -> + send_msg(Pid, {execute, Name, Version, Params, From}, From, Timeout). + +transaction(Pid, Fun, From) -> + transaction(Pid, Fun, From, ?DEFAULT_STANDALONE_TIMEOUT). + +transaction(Pid, Fun, From, Timeout) -> + send_msg(Pid, {transaction, Fun, From}, From, Timeout). + +get_pool_id(State) -> + State#state.pool_id. + +%%==================================================================== +%% Internal functions +%%==================================================================== + +fetch_local(State, Query) -> + do_query(State, Query). + +execute_local(State, Name, Params) -> + case do_execute(State, Name, Params, undefined) of + {ok, Res, State1} -> + put(?STATE_VAR, State1), + Res; + Err -> + Err end. %%-------------------------------------------------------------------- @@ -205,28 +259,47 @@ squery(Pid, Query, From, Options) when is_pid(Pid), is_list(Query) -> %% %% Note : Only to be used externally by the 'mysql_auth' module. %%-------------------------------------------------------------------- -do_recv(LogFun, RecvPid, SeqNum) when is_function(LogFun); LogFun == undefined, SeqNum == undefined -> +do_recv(LogFun, RecvPid, SeqNum) when is_function(LogFun); + LogFun == undefined, + SeqNum == undefined -> receive {mysql_recv, RecvPid, data, Packet, Num} -> - %%mysql:log(LogFun, debug, "mysql_conn: recv packet ~p: ~p", [Num, Packet]), {ok, Packet, Num}; {mysql_recv, RecvPid, closed, _E} -> {error, "mysql_recv: socket was closed"} end; -do_recv(LogFun, RecvPid, SeqNum) when is_function(LogFun); LogFun == undefined, is_integer(SeqNum) -> +do_recv(LogFun, RecvPid, SeqNum) when is_function(LogFun); + LogFun == undefined, + is_integer(SeqNum) -> ResponseNum = SeqNum + 1, receive {mysql_recv, RecvPid, data, Packet, ResponseNum} -> - %%mysql:log(LogFun, debug, "mysql_conn: recv packet ~p: ~p", [ResponseNum, Packet]), {ok, Packet, ResponseNum}; {mysql_recv, RecvPid, closed, _E} -> {error, "mysql_recv: socket was closed"} - end. + end. +do_fetch(Pid, Queries, From, Timeout) -> + send_msg(Pid, {fetch, Queries, From}, From, Timeout). + +send_msg(Pid, Msg, From, Timeout) -> + Self = self(), + Pid ! Msg, + case From of + Self -> + %% We are not using a mysql_dispatcher, await the response + receive + {fetch_result, Pid, Result} -> + Result + after Timeout -> + {error, "message timed out"} + end; + _ -> + %% From is gen_server From, + %% Pid will do gen_server:reply() when it has an answer + ok + end. -%%==================================================================== -%% Internal functions -%%==================================================================== %%-------------------------------------------------------------------- %% Function: init(Host, Port, User, Password, Database, LogFun, @@ -236,30 +309,47 @@ do_recv(LogFun, RecvPid, SeqNum) when is_function(LogFun); LogFun == undefined, %% User = string() %% Password = string() %% Database = string() -%% LogFun = undefined | function() of arity 3 +%% LogFun = function() of arity 4 %% Parent = pid() of process starting this mysql_conn %% Descrip.: Connect to a MySQL server, log in and chooses a database. %% Report result of this to Parent, and then enter loop() if %% we were successfull. %% Returns : void() | does not return %%-------------------------------------------------------------------- -init(Host, Port, User, Password, Database, LogFun, Parent) -> +init(Host, Port, User, Password, Database, LogFun, Encoding, PoolId, Parent) -> case mysql_recv:start_link(Host, Port, LogFun, self()) of {ok, RecvPid, Sock} -> case mysql_init(Sock, RecvPid, User, Password, LogFun) of {ok, Version} -> - case do_query(Sock, RecvPid, LogFun, "use " ++ Database, Version, [{result_type, binary}]) of + Db = iolist_to_binary(Database), + case do_query(Sock, RecvPid, LogFun, + <<"use ", Db/binary>>, + Version) of {error, MySQLRes} -> - mysql:log(LogFun, error, "mysql_conn: Failed changing to database ~p : ~p", - [Database, mysql:get_result_reason(MySQLRes)]), - Parent ! {mysql_conn, self(), {error, failed_changing_database}}; + ?Log2(LogFun, error, + "mysql_conn: Failed changing to database " + "~p : ~p", + [Database, + mysql:get_result_reason(MySQLRes)]), + Parent ! {mysql_conn, self(), + {error, failed_changing_database}}; + %% ResultType: data | updated {_ResultType, _MySQLRes} -> Parent ! {mysql_conn, self(), ok}, + case Encoding of + undefined -> undefined; + _ -> + EncodingBinary = list_to_binary(atom_to_list(Encoding)), + do_query(Sock, RecvPid, LogFun, + <<"set names '", EncodingBinary/binary, "'">>, + Version) + end, State = #state{mysql_version=Version, recv_pid = RecvPid, socket = Sock, log_fun = LogFun, + pool_id = PoolId, data = <<>> }, loop(State) @@ -268,8 +358,9 @@ init(Host, Port, User, Password, Database, LogFun, Parent) -> Parent ! {mysql_conn, self(), {error, login_failed}} end; E -> - mysql:log(LogFun, error, "mysql_conn: Failed connecting to ~p:~p : ~p", - [Host, Port, E]), + ?Log2(LogFun, error, + "failed connecting to ~p:~p : ~p", + [Host, Port, E]), Parent ! {mysql_conn, self(), {error, connect_failed}} end. @@ -282,31 +373,201 @@ init(Host, Port, User, Password, Database, LogFun, Parent) -> %%-------------------------------------------------------------------- loop(State) -> RecvPid = State#state.recv_pid, + LogFun = State#state.log_fun, receive - {fetch, Query, GenSrvFrom, Options} -> - %% GenSrvFrom is either a gen_server:call/3 From term(), or a pid if no - %% gen_server was used to make the query - Res = do_query(State, Query, Options), - case is_pid(GenSrvFrom) of - true -> - %% The query was not sent using gen_server mechanisms - GenSrvFrom ! {fetch_result, self(), Res}; - false -> - gen_server:reply(GenSrvFrom, Res) - end, + {fetch, Queries, From} -> + send_reply(From, do_queries(State, Queries)), loop(State); + {transaction, Fun, From} -> + put(?STATE_VAR, State), + + Res = do_transaction(State, Fun), + + %% The transaction may have changed the state of this process + %% if it has executed prepared statements. This would happen in + %% mysql:execute. + State1 = get(?STATE_VAR), + + send_reply(From, Res), + loop(State1); + {execute, Name, Version, Params, From} -> + State1 = + case do_execute(State, Name, Params, Version) of + {error, _} = Err -> + send_reply(From, Err), + State; + {ok, Result, NewState} -> + send_reply(From, Result), + NewState + end, + loop(State1); {mysql_recv, RecvPid, data, Packet, Num} -> - mysql:log(State#state.log_fun, error, "mysql_conn: Received MySQL data when not expecting any " - "(num ~p) - ignoring it", [Num]), - mysql:log(State#state.log_fun, error, "mysql_conn: Unexpected MySQL data (num ~p) :~n~p", - [Num, Packet]), + ?Log2(LogFun, error, + "received data when not expecting any -- " + "ignoring it: {~p, ~p}", [Num, Packet]), loop(State); Unknown -> - mysql:log(State#state.log_fun, error, "mysql_conn: Received unknown signal, exiting"), - mysql:log(State#state.log_fun, debug, "mysql_conn: Unknown signal : ~p", [Unknown]), + ?Log2(LogFun, error, + "received unknown signal, exiting: ~p", [Unknown]), error end. +%% GenSrvFrom is either a gen_server:call/3 From term(), +%% or a pid if no gen_server was used to make the query +send_reply(GenSrvFrom, Res) when is_pid(GenSrvFrom) -> + %% The query was not sent using gen_server mechanisms + GenSrvFrom ! {fetch_result, self(), Res}; +send_reply(GenSrvFrom, Res) -> + gen_server:reply(GenSrvFrom, Res). + +do_query(State, Query) -> + do_query(State#state.socket, + State#state.recv_pid, + State#state.log_fun, + Query, + State#state.mysql_version + ). + +do_query(Sock, RecvPid, LogFun, Query, Version) -> + Query1 = iolist_to_binary(Query), + ?Log2(LogFun, debug, "fetch ~p (id ~p)", [Query1,RecvPid]), + Packet = <>, + case do_send(Sock, Packet, 0, LogFun) of + ok -> + get_query_response(LogFun,RecvPid, + Version); + {error, Reason} -> + Msg = io_lib:format("Failed sending data " + "on socket : ~p", + [Reason]), + {error, Msg} + end. + +do_queries(State, Queries) when not is_list(Queries) -> + do_query(State, Queries); +do_queries(State, Queries) -> + do_queries(State#state.socket, + State#state.recv_pid, + State#state.log_fun, + Queries, + State#state.mysql_version + ). + +%% Execute a list of queries, returning the response for the last query. +%% If a query returns an error before the last query is executed, the +%% loop is aborted and the error is returned. +do_queries(Sock, RecvPid, LogFun, Queries, Version) -> + catch + lists:foldl( + fun(Query, _LastResponse) -> + case do_query(Sock, RecvPid, LogFun, Query, Version) of + {error, _} = Err -> throw(Err); + Res -> Res + end + end, ok, Queries). + +do_transaction(State, Fun) -> + case do_query(State, <<"BEGIN">>) of + {error, _} = Err -> + {aborted, Err}; + _ -> + case catch Fun() of + error = Err -> rollback(State, Err); + {error, _} = Err -> rollback(State, Err); + {'EXIT', _} = Err -> rollback(State, Err); + Res -> + case do_query(State, <<"COMMIT">>) of + {error, _} = Err -> + rollback(State, {commit_error, Err}); + _ -> + case Res of + {atomic, _} -> Res; + _ -> {atomic, Res} + end + end + end + end. + +rollback(State, Err) -> + Res = do_query(State, <<"ROLLBACK">>), + {aborted, {Err, {rollback_result, Res}}}. + +do_execute(State, Name, Params, ExpectedVersion) -> + Res = case gb_trees:lookup(Name, State#state.prepares) of + {value, Version} when Version == ExpectedVersion -> + {ok, latest}; + {value, Version} -> + mysql:get_prepared(Name, Version); + none -> + mysql:get_prepared(Name) + end, + case Res of + {ok, latest} -> + {ok, do_execute1(State, Name, Params), State}; + {ok, {Stmt, NewVersion}} -> + prepare_and_exec(State, Name, NewVersion, Stmt, Params); + {error, _} = Err -> + Err + end. + +prepare_and_exec(State, Name, Version, Stmt, Params) -> + NameBin = atom_to_binary(Name), + StmtBin = <<"PREPARE ", NameBin/binary, " FROM '", + Stmt/binary, "'">>, + case do_query(State, StmtBin) of + {updated, _} -> + State1 = + State#state{ + prepares = gb_trees:enter(Name, Version, + State#state.prepares)}, + {ok, do_execute1(State1, Name, Params), State1}; + {error, _} = Err -> + Err; + Other -> + {error, {unexpected_result, Other}} + end. + +do_execute1(State, Name, Params) -> + Stmts = make_statements_for_execute(Name, Params), + do_queries(State, Stmts). + +make_statements_for_execute(Name, []) -> + NameBin = atom_to_binary(Name), + [<<"EXECUTE ", NameBin/binary>>]; +make_statements_for_execute(Name, Params) -> + NumParams = length(Params), + ParamNums = lists:seq(1, NumParams), + + NameBin = atom_to_binary(Name), + + ParamNames = + lists:foldl( + fun(Num, Acc) -> + ParamName = [$@ | integer_to_list(Num)], + if Num == 1 -> + ParamName ++ Acc; + true -> + [$, | ParamName] ++ Acc + end + end, [], lists:reverse(ParamNums)), + ParamNamesBin = list_to_binary(ParamNames), + + ExecStmt = <<"EXECUTE ", NameBin/binary, " USING ", + ParamNamesBin/binary>>, + + ParamVals = lists:zip(ParamNums, Params), + Stmts = lists:foldl( + fun({Num, Val}, Acc) -> + NumBin = mysql:encode(Num, true), + ValBin = mysql:encode(Val, true), + [<<"SET @", NumBin/binary, "=", ValBin/binary>> | Acc] + end, [ExecStmt], lists:reverse(ParamVals)), + Stmts. + +atom_to_binary(Val) -> + <<_:4/binary, Bin/binary>> = term_to_binary(Val), + Bin. + %%-------------------------------------------------------------------- %% Function: mysql_init(Sock, RecvPid, User, Password, LogFun) %% Sock = term(), gen_tcp socket @@ -325,21 +586,29 @@ mysql_init(Sock, RecvPid, User, Password, LogFun) -> AuthRes = case Caps band ?SECURE_CONNECTION of ?SECURE_CONNECTION -> - mysql_auth:do_new_auth(Sock, RecvPid, InitSeqNum + 1, User, Password, Salt1, Salt2, LogFun); + mysql_auth:do_new_auth( + Sock, RecvPid, InitSeqNum + 1, + User, Password, Salt1, Salt2, LogFun); _ -> - mysql_auth:do_old_auth(Sock, RecvPid, InitSeqNum + 1, User, Password, Salt1, LogFun) + mysql_auth:do_old_auth( + Sock, RecvPid, InitSeqNum + 1, User, Password, + Salt1, LogFun) end, case AuthRes of {ok, <<0:8, _Rest/binary>>, _RecvNum} -> {ok,Version}; {ok, <<255:8, Code:16/little, Message/binary>>, _RecvNum} -> - mysql:log(LogFun, error, "mysql_conn: init error ~p: ~p~n", [Code, binary_to_list(Message)]), + ?Log2(LogFun, error, "init error ~p: ~p", + [Code, binary_to_list(Message)]), {error, binary_to_list(Message)}; {ok, RecvPacket, _RecvNum} -> - mysql:log(LogFun, error, "mysql_conn: init unknown error ~p~n", [binary_to_list(RecvPacket)]), + ?Log2(LogFun, error, + "init unknown error ~p", + [binary_to_list(RecvPacket)]), {error, binary_to_list(RecvPacket)}; {error, Reason} -> - mysql:log(LogFun, error, "mysql_conn: init failed receiving data : ~p~n", [Reason]), + ?Log2(LogFun, error, + "init failed receiving data : ~p", [Reason]), {error, Reason} end; {error, Reason} -> @@ -355,8 +624,10 @@ greeting(Packet, LogFun) -> <> = Rest4, <> = Rest5, {Salt2, _Rest7} = asciz(Rest6), - mysql:log(LogFun, debug, "mysql_conn: greeting version ~p (protocol ~p) salt ~p caps ~p serverchar ~p salt2 ~p", - [Version, Protocol, Salt, Caps, ServerChar, Salt2]), + ?Log2(LogFun, debug, + "greeting version ~p (protocol ~p) salt ~p caps ~p serverchar ~p" + "salt2 ~p", + [Version, Protocol, Salt, Caps, ServerChar, Salt2]), {normalize_version(Version, LogFun), Salt, Salt2, Caps}. %% part of greeting/2 @@ -382,7 +653,7 @@ asciz(Data) when list(Data) -> %% AffectedRows = int() %% Reason = term() %%-------------------------------------------------------------------- -get_query_response(LogFun, RecvPid, Version, Options) -> +get_query_response(LogFun, RecvPid, Version) -> case do_recv(LogFun, RecvPid, undefined) of {ok, <>, _} -> case Fieldcount of @@ -392,15 +663,15 @@ get_query_response(LogFun, RecvPid, Version, Options) -> {updated, #mysql_result{affectedrows=AffectedRows}}; 255 -> <<_Code:16/little, Message/binary>> = Rest, - {error, #mysql_result{error=binary_to_list(Message)}}; + {error, #mysql_result{error=Message}}; _ -> %% Tabular data received case get_fields(LogFun, RecvPid, [], Version) of {ok, Fields} -> - ResultType = get_option(result_type, Options, ?DEFAULT_RESULT_TYPE), - case get_rows(Fieldcount, LogFun, RecvPid, ResultType, []) of + case get_rows(Fields, LogFun, RecvPid, []) of {ok, Rows} -> - {data, #mysql_result{fieldinfo=Fields, rows=Rows}}; + {data, #mysql_result{fieldinfo=Fields, + rows=Rows}}; {error, Reason} -> {error, #mysql_result{error=Reason}} end; @@ -440,13 +711,13 @@ get_fields(LogFun, RecvPid, Res, ?MYSQL_4_0) -> <> = LengthB, {Type, Rest4} = get_with_length(Rest3), {_Flags, _Rest5} = get_with_length(Rest4), - This = {binary_to_list(Table), - binary_to_list(Field), + This = {Table, + Field, Length, %% TODO: Check on MySQL 4.0 if types are specified - %% using the same 4.1 formalism and could + %% using the same 4.1 formalism and could %% be expanded to atoms: - binary_to_list(Type)}, + Type}, get_fields(LogFun, RecvPid, [This | Res], ?MYSQL_4_0) end; {error, Reason} -> @@ -475,9 +746,9 @@ get_fields(LogFun, RecvPid, Res, ?MYSQL_4_1) -> Length:32/little, Type:8/little, _Flags:16/little, _Decimals:8/little, _Rest7/binary>> = Rest6, - - This = {binary_to_list(Table), - binary_to_list(Field), + + This = {Table, + Field, Length, get_field_datatype(Type)}, get_fields(LogFun, RecvPid, [This | Res], ?MYSQL_4_1) @@ -496,38 +767,32 @@ get_fields(LogFun, RecvPid, Res, ?MYSQL_4_1) -> %% {error, Reason} %% Rows = list() of [string()] %%-------------------------------------------------------------------- -get_rows(N, LogFun, RecvPid, ResultType, Res) -> +get_rows(Fields, LogFun, RecvPid, Res) -> case do_recv(LogFun, RecvPid, undefined) of {ok, Packet, _Num} -> case Packet of <<254:8, Rest/binary>> when size(Rest) < 8 -> {ok, lists:reverse(Res)}; _ -> - {ok, This} = get_row(N, Packet, ResultType, []), - get_rows(N, LogFun, RecvPid, ResultType, [This | Res]) + {ok, This} = get_row(Fields, Packet, []), + get_rows(Fields, LogFun, RecvPid, [This | Res]) end; {error, Reason} -> {error, Reason} end. - %% part of get_rows/4 -get_row(0, _Data, _ResultType, Res) -> +get_row([], _Data, Res) -> {ok, lists:reverse(Res)}; -get_row(N, Data, ResultType, Res) -> +get_row([Field | OtherFields], Data, Res) -> {Col, Rest} = get_with_length(Data), This = case Col of null -> - null; + undefined; _ -> - if - ResultType == list -> - binary_to_list(Col); - ResultType == binary -> - Col - end + convert_type(Col, element(4, Field)) end, - get_row(N - 1, Rest, ResultType, [This | Res]). + get_row(OtherFields, Rest, [This | Res]). get_with_length(<<251:8, Rest/binary>>) -> {null, Rest}; @@ -540,35 +805,6 @@ get_with_length(<<254:8, Length:64/little, Rest/binary>>) -> get_with_length(<>) when Length < 251 -> split_binary(Rest, Length). -%%-------------------------------------------------------------------- -%% Function: do_query(State, Query) -%% do_query(Sock, RecvPid, LogFun, Query) -%% Sock = term(), gen_tcp socket -%% RecvPid = pid(), mysql_recv process -%% LogFun = undefined | function() with arity 3 -%% Query = string() -%% Descrip.: Send a MySQL query and block awaiting it's response. -%% Returns : result of get_query_response/2 | {error, Reason} -%%-------------------------------------------------------------------- -do_query(State, Query, Options) when is_record(State, state) -> - do_query(State#state.socket, - State#state.recv_pid, - State#state.log_fun, - Query, - State#state.mysql_version, - Options - ). - -do_query(Sock, RecvPid, LogFun, Query, Version, Options) when is_pid(RecvPid), - is_list(Query) -> - Packet = list_to_binary([?MYSQL_QUERY_OP, Query]), - case do_send(Sock, Packet, 0, LogFun) of - ok -> - get_query_response(LogFun, RecvPid, Version, Options); - {error, Reason} -> - Msg = io_lib:format("Failed sending data on socket : ~p", [Reason]), - {error, Msg} - end. %%-------------------------------------------------------------------- %% Function: do_send(Sock, Packet, SeqNum, LogFun) @@ -581,7 +817,6 @@ do_query(Sock, RecvPid, LogFun, Query, Version, Options) when is_pid(RecvPid), %%-------------------------------------------------------------------- do_send(Sock, Packet, SeqNum, _LogFun) when is_binary(Packet), is_integer(SeqNum) -> Data = <<(size(Packet)):24/little, SeqNum:8, Packet/binary>>, - %%mysql:log(LogFun, debug, "mysql_conn: send packet ~p: ~p", [SeqNum, Data]), gen_tcp:send(Sock, Data). %%-------------------------------------------------------------------- @@ -593,15 +828,16 @@ do_send(Sock, Packet, SeqNum, _LogFun) when is_binary(Packet), is_integer(SeqNum %% Returns : Version = string() %%-------------------------------------------------------------------- normalize_version([$4,$.,$0|_T], LogFun) -> - mysql:log(LogFun, debug, "Switching to MySQL 4.0.x protocol.~n"), + ?Log(LogFun, debug, "switching to MySQL 4.0.x protocol."), ?MYSQL_4_0; normalize_version([$4,$.,$1|_T], _LogFun) -> ?MYSQL_4_1; normalize_version([$5|_T], _LogFun) -> %% MySQL version 5.x protocol is compliant with MySQL 4.1.x: - ?MYSQL_4_1; + ?MYSQL_4_1; normalize_version(_Other, LogFun) -> - mysql:log(LogFun, error, "MySQL version not supported: MySQL Erlang module might not work correctly.~n"), + ?Log(LogFun, error, "MySQL version not supported: MySQL Erlang module " + "might not work correctly."), %% Error, but trying the oldest protocol anyway: ?MYSQL_4_0. @@ -626,8 +862,7 @@ get_field_datatype(11) -> 'TIME'; get_field_datatype(12) -> 'DATETIME'; get_field_datatype(13) -> 'YEAR'; get_field_datatype(14) -> 'NEWDATE'; -get_field_datatype(16) -> 'BIT'; -get_field_datatype(246) -> 'DECIMAL'; +get_field_datatype(246) -> 'NEWDECIMAL'; get_field_datatype(247) -> 'ENUM'; get_field_datatype(248) -> 'SET'; get_field_datatype(249) -> 'TINYBLOB'; @@ -638,19 +873,41 @@ get_field_datatype(253) -> 'VAR_STRING'; get_field_datatype(254) -> 'STRING'; get_field_datatype(255) -> 'GEOMETRY'. -%%-------------------------------------------------------------------- -%% Function: get_option(Key1, Options, Default) -> Value1 -%% Options = [Option] -%% Option = {Key2, Value2} -%% Key1 = Key2 = atom() -%% Value1 = Value2 = Default = term() -%% Descrip.: Return the option associated with Key passed to squery/4 -%%-------------------------------------------------------------------- - -get_option(Key, Options, Default) -> - case lists:keysearch(Key, 1, Options) of - {value, {_, Value}} -> - Value; - false -> - Default +convert_type(Val, ColType) -> + case ColType of + T when T == 'TINY'; + T == 'SHORT'; + T == 'LONG'; + T == 'LONGLONG'; + T == 'INT24'; + T == 'YEAR' -> + list_to_integer(binary_to_list(Val)); + T when T == 'TIMESTAMP'; + T == 'DATETIME' -> + {ok, [Year, Month, Day, Hour, Minute, Second], _Leftovers} = + io_lib:fread("~d-~d-~d ~d:~d:~d", binary_to_list(Val)), + {datetime, {{Year, Month, Day}, {Hour, Minute, Second}}}; + 'TIME' -> + {ok, [Hour, Minute, Second], _Leftovers} = + io_lib:fread("~d:~d:~d", binary_to_list(Val)), + {time, {Hour, Minute, Second}}; + 'DATE' -> + {ok, [Year, Month, Day], _Leftovers} = + io_lib:fread("~d-~d-~d", binary_to_list(Val)), + {date, {Year, Month, Day}}; + T when T == 'DECIMAL'; + T == 'NEWDECIMAL'; + T == 'FLOAT'; + T == 'DOUBLE' -> + {ok, [Num], _Leftovers} = + case io_lib:fread("~f", binary_to_list(Val)) of + {error, _} -> + io_lib:fread("~d", binary_to_list(Val)); + Res -> + Res + end, + Num; + _Other -> + Val end. + diff --git a/web/api/flukso/deps/mysql/src/mysql_recv.erl b/web/api/flukso/deps/mysql/src/mysql_recv.erl index d770d9b..9e4b78a 100644 --- a/web/api/flukso/deps/mysql/src/mysql_recv.erl +++ b/web/api/flukso/deps/mysql/src/mysql_recv.erl @@ -105,8 +105,11 @@ init(Host, Port, LogFun, Parent) -> }, loop(State); E -> - mysql:log(LogFun, error, "mysql_recv: Failed connecting to ~p:~p : ~p", - [Host, Port, E]), + LogFun(?MODULE, ?LINE, error, + fun() -> + {"mysql_recv: Failed connecting to ~p:~p : ~p", + [Host, Port, E]} + end), Msg = lists:flatten(io_lib:format("connect failed : ~p", [E])), Parent ! {mysql_recv, self(), init, {error, Msg}} end. @@ -127,11 +130,20 @@ loop(State) -> Rest = sendpacket(State#state.parent, NewData), loop(State#state{data = Rest}); {tcp_error, Sock, Reason} -> - mysql:log(State#state.log_fun, error, "mysql_recv: Socket ~p closed : ~p", [Sock, Reason]), + LogFun = State#state.log_fun, + LogFun(?MODULE, ?LINE, error, + fun() -> + {"mysql_recv: Socket ~p closed : ~p", + [Sock, Reason]} + end), State#state.parent ! {mysql_recv, self(), closed, {error, Reason}}, error; {tcp_closed, Sock} -> - mysql:log(State#state.log_fun, debug, "mysql_recv: Socket ~p closed", [Sock]), + LogFun = State#state.log_fun, + LogFun(?MODULE, ?LINE, debug, + fun() -> + {"mysql_recv: Socket ~p closed", [Sock]} + end), State#state.parent ! {mysql_recv, self(), closed, normal}, error end. diff --git a/web/api/flukso/deps/mysql/src/mysql_sup.erl b/web/api/flukso/deps/mysql/src/mysql_sup.erl index 06c35a9..3ed936b 100644 --- a/web/api/flukso/deps/mysql/src/mysql_sup.erl +++ b/web/api/flukso/deps/mysql/src/mysql_sup.erl @@ -14,7 +14,7 @@ start_link() -> supervisor:start_link(mysql_sup, []). init([]) -> - MysqlConfig = [pool, "localhost", "flukso", "your_mysql_password_here", "flukso"], + MysqlConfig = [pool, "localhost", "flukso", "xpsCcVsbecJMVCYF", "flukso"], Mysql = {mysql, {mysql, start_link, MysqlConfig}, permanent, 3000, worker, [mysql]},