● 公司核心系统内存泄露排查(后续)
- 公司核心系统内存泄露排查
- 公司核心系统内存泄露排查(后续)
- 公司核心系统"内存泄露"排查(完结)
前言
晚上登录服务器看了一下,发现 api 的内存仍然在持续上涨。查看 api 代码后,确认问题出在 csredis 的 Publish 调用上,于是把 csredis 源码下载下来,逐层排查到底是哪里出的问题。
源码排查
下载地址: https://github.com/2881099/csredis/archive/refs/heads/master.zip
CSRedisCore/RedisHelper.cs
/// <summary>
/// Sends a message to the channel on the designated partition node by forwarding the call to
/// <c>Instance.Publish</c>. Final published message format: 1|message
/// </summary>
/// <param name="channel">Channel name</param>
/// <param name="message">Message text</param>
/// <returns>The value returned by <c>Instance.Publish</c> for this channel and message.</returns>
public static long Publish(string channel, string message)
{
    return Instance.Publish(channel, message);
}
- 跳转到
CSRedisCore/CSRedisClient.cs
Instance.Publish方法:
/// <summary>
/// Sends a message to the channel on the designated partition node.
/// Final published message format: 1|message
/// </summary>
/// <param name="channel">Channel name</param>
/// <param name="message">Message text</param>
/// <returns>The result of <c>ExecuteScalar</c> for the underlying publish call.</returns>
public long Publish(string channel, string message)
{
    // The value returned by HIncrBy becomes the "<id>|" prefix of the published payload.
    var sequence = HIncrBy("csredisclient:Publish:msgid", channel, 1);

    return ExecuteScalar(channel, (conn, key) =>
    {
        return conn.Value.Publish(channel, $"{sequence}|{message}");
    });
}
- 跳转到
CSRedisCore/RedisClient.Sync.cs
c.Value.Publish方法:
/// <summary>
/// Post a message to a channel by building the publish command and passing it to <c>Write</c>.
/// </summary>
/// <param name="channel">Channel to post message</param>
/// <param name="message">Message to send</param>
/// <returns>Number of clients that received the message</returns>
public long Publish(string channel, string message)
    => Write(RedisCommands.Publish(channel, message));
- (1). 跳转到
CSRedisCore/Internal/RedisCommand.cs
RedisCommands.Publish方法:
/// <summary>
/// Builds the command object for "PUBLISH" with the channel and message as its arguments.
/// </summary>
/// <param name="channel">Channel passed as the first argument</param>
/// <param name="message">Message passed as the second argument</param>
/// <returns>A new <c>RedisInt</c> command; nothing is sent by this method.</returns>
public static RedisInt Publish(string channel, string message)
    => new RedisInt("PUBLISH", channel, message);
这里没有问题。
(2). 跳转到CSRedisCore/RedisClient.Sync.cs
Write泛型方法:
/// <summary>
/// Dispatches a command according to the client's current mode: active transaction,
/// listening monitor, streaming, or a plain call on the connector.
/// </summary>
/// <param name="command">Command to dispatch</param>
/// <returns>
/// The transaction's or connector's result; <c>default(T)</c> in the monitor and streaming branches.
/// </returns>
internal T Write<T>(RedisCommand<T> command)
{
    if (_transaction.Active)
    {
        // Suspicious point (flagged while hunting the memory growth)
        return _transaction.Write(command);
    }

    if (_monitor.Listening)
    {
        // Nothing is done with the command in this branch.
        return default(T);
    }

    if (_streaming)
    {
        // Suspicious point (flagged while hunting the memory growth)
        _connector.Write(command);
        return default(T);
    }

    // Suspicious point (flagged while hunting the memory growth)
    return _connector.Call(command);
}
- (1). 跳转到
CSRedisCore/Internal/RedisTransaction.cs
_transaction.Write泛型方法:
/// <summary>
/// Sends the command wrapped by <c>RedisCommands.AsTransaction</c>, reports the reply through
/// <c>OnTransactionQueued</c>, and registers the command's parser on <c>_execCommand</c>.
/// </summary>
/// <param name="command">Command to send as part of the transaction</param>
/// <returns>Always <c>default(T)</c>.</returns>
public T Write<T>(RedisCommand<T> command)
{
    string queuedReply = _connector.Call(RedisCommands.AsTransaction(command));
    OnTransactionQueued(command, queuedReply);

    // The lambda captures `command`, so the registered parser keeps a reference to it.
    _execCommand.AddParser(reader => command.Parse(reader));

    return default(T);
}
(2). 跳转到CSRedisCore/Internal/RedisConnector.cs
_connector.Call泛型方法:
/// <summary>
/// Ensures the connection is open, then either hands the command to the pipeline or
/// writes it and parses the reply from the reader.
/// </summary>
/// <param name="command">Command to execute</param>
/// <returns>The pipeline's result when pipelined, otherwise the parsed reply.</returns>
/// <exception cref="IOException">Rethrown when <c>ReconnectAttempts</c> is 0.</exception>
/// <exception cref="RedisException">Rethrown wrapped, with the command appended to the message.</exception>
public T Call<T>(RedisCommand<T> command)
{
    ConnectIfNotConnected();

    try
    {
        if (IsPipelined)
        {
            // Suspicious point (flagged while hunting the memory growth)
            return _io.Pipeline.Write(command);
        }

        // Suspicious point (flagged while hunting the memory growth)
        var payload = _io.Writer.Prepare(command);
        _io.Write(payload);

        // Suspicious point (flagged while hunting the memory growth)
        return command.Parse(_io.Reader);
    }
    catch (IOException)
    {
        if (ReconnectAttempts != 0)
        {
            // Reconnect, then retry the same command through a recursive call.
            Reconnect();
            return Call(command);
        }

        throw;
    }
    catch (RedisException ex)
    {
        throw new RedisException($"{ex.Message}\r\nCommand: {command}", ex);
    }
}
(3). 跳转到CSRedisCore/Internal/RedisConnector.cs
_connector.Write方法:
/// <summary>
/// Ensures the connection is open and writes the prepared command without reading a reply.
/// </summary>
/// <param name="command">Command to write</param>
/// <exception cref="IOException">Rethrown when <c>ReconnectAttempts</c> is 0.</exception>
public void Write(RedisCommand command)
{
    ConnectIfNotConnected();

    try
    {
        // Suspicious point (flagged while hunting the memory growth)
        var payload = _io.Writer.Prepare(command);
        _io.Write(payload);
    }
    catch (IOException)
    {
        if (ReconnectAttempts != 0)
        {
            // Reconnect, then retry the same command through a recursive call.
            Reconnect();
            Write(command);
            return;
        }

        throw;
    }
}
步骤(2)、(3)都包含_io.Write(_io.Writer.Prepare(command));
- (1). 跳转到
CSRedisCore/Internal/IO/RedisWriter.cs
_io.Writer.Prepare方法:
/// <summary>
/// Serializes a command into the byte sequence that is written to the stream.
/// The output starts with <c>MultiBulk</c> + element count + <c>EOL</c>, followed by one
/// <c>Bulk</c> + byte length + <c>EOL</c> + payload + <c>EOL</c> entry for every
/// space-separated part of the command name and for every argument.
/// </summary>
/// <param name="command">Command whose name and arguments are serialized</param>
/// <returns>A new byte array holding the serialized command.</returns>
public byte[] Prepare(RedisCommand command)
{
    var parts = command.Command.Split(' ');
    int length = parts.Length + command.Arguments.Length;

    // Header and command-name parts are plain text, so they are assembled as a string first.
    StringBuilder sb = new StringBuilder();
    sb.Append(MultiBulk).Append(length).Append(EOL);
    foreach (var part in parts)
    {
        sb.Append(Bulk).Append(_io.Encoding.GetByteCount(part)).Append(EOL).Append(part).Append(EOL);
    }

    // CR LF terminator written after raw byte[] arguments; allocated once instead of per argument.
    byte[] crlf = { 13, 10 };

    // Dispose the buffer deterministically once the result has been copied out.
    using (var ms = new MemoryStream())
    {
        var data = _io.Encoding.GetBytes(sb.ToString());
        ms.Write(data, 0, data.Length);

        foreach (var arg in command.Arguments)
        {
            if (arg != null && arg.GetType() == typeof(byte[]))
            {
                // Raw bytes are copied through untouched; only the length prefix is text-encoded.
                data = (byte[])arg;
                var prefix = _io.Encoding.GetBytes($"{Bulk}{data.Length}{EOL}");
                ms.Write(prefix, 0, prefix.Length);
                ms.Write(data, 0, data.Length);
                ms.Write(crlf, 0, crlf.Length);
            }
            else
            {
                // Everything else (including null) is formatted with the invariant culture.
                string str = String.Format(CultureInfo.InvariantCulture, "{0}", arg);
                data = _io.Encoding.GetBytes($"{Bulk}{_io.Encoding.GetByteCount(str)}{EOL}{str}{EOL}");
                ms.Write(data, 0, data.Length);
            }
        }

        return ms.ToArray();
    }
}
(2). 跳转到CSRedisCore/Internal/IO/RedisIO.cs
_io.Write方法:
/// <summary>
/// Writes the whole buffer to <c>Stream</c> and flushes it while holding <c>_streamLock</c>.
/// </summary>
/// <param name="data">Bytes to write; the full array is written starting at offset 0.</param>
public void Write(byte[] data)
{
    // Write and flush happen under the same lock.
    lock (_streamLock)
    {
        Stream.Write(data, 0, data.Length);
        Stream.Flush();
    }
}
会不会是这里的锁有问题呢?
-- 未完 --
if (IsPipelined)
return _io.Pipeline.Write(command);
return command.Parse(_io.Reader);
5.(3)上述源码还没看,明天早起做核酸,先到这里吧。