大虾居

谈笑有鸿儒,往来无白丁。

0%

gRPC是google提供的一套基于HTTP/2和protobuf的RPC框架,支持多种语言,包括python。
由于基于protobuf的二进制序列化机制和HTTP/2多路复用的特性,gRPC在性能上有很大的优势,适合于高并发低延迟的场景。
gRPC的python实现支持同步和异步两种方式,本文中大虾将介绍如何在python上以异步方式实现客户端和服务端,
并支持下行数据流的订阅。

实现一个异步的Request/Reply调用的服务器和客户端

首先需要安装gRPC库,截止发稿时最新版本为1.58.0

1
pip install grpcio grpcio-tools

再编写一个helloworld的proto文件,gRPC根据proto文件来生成客户端和服务端的代码,下面是一个简单的helloworld.proto文件。
具体proto文件的语法可以参考官方文档

protos/helloworld.proto

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
syntax = "proto3";

package helloworld;

// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
string name = 1;
}

// The response message containing the greetings
message HelloReply {
string message = 1;
}

本示例中定义了两个数据类型HelloRequest和HelloReply,分别对应发送请求和相应的数据结构。
另外还有一个rpc服务Greeter,包含一个SayHello方法,接收HelloRequest类型的参数,返回HelloReply类型的数据。

proto文件编写完成后运行gRPC命令行工具生成相关的类。

1
python -m grpc_tools.protoc -I protos --python_out=. --pyi_out=. --grpc_python_out=. protos/helloworld.proto

生成命令完成后会在项目当前文件夹下生成helloworld_pb2_grpc.py和helloworld_pb2.py两个文件,分别包含了客户端和服务端需要用到的类。

下面编写服务器程序。

async_greeter_server.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import grpc
import helloworld_pb2
import helloworld_pb2_grpc

class Greeter(helloworld_pb2_grpc.GreeterServicer):
async def SayHello(
self,
request: helloworld_pb2.HelloRequest,
context: grpc.aio.ServicerContext,
) -> helloworld_pb2.HelloReply:
return helloworld_pb2.HelloReply(message="Hello, %s!" % request.name)


async def serve() -> None:
server = grpc.aio.server()
helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
listen_addr = "[::]:50051"
server.add_insecure_port(listen_addr)
logging.info("Starting server on %s", listen_addr)
await server.start()
await server.wait_for_termination()


if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
asyncio.run(serve())

客户端程序, async_greeter_client.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio
import logging


import grpc
import helloworld_pb2
import helloworld_pb2_grpc


async def run() -> None:
async with grpc.aio.insecure_channel("localhost:50051") as channel:
stub = helloworld_pb2_grpc.GreeterStub(channel)
response = await stub.SayHello(helloworld_pb2.HelloRequest(name="you"))
print("Greeter client received: " + response.message)


if __name__ == "__main__":
logging.basicConfig()
asyncio.run(run())

运行python async_greeter_server.py启动服务器,再运行python async_greeter_client.py启动客户端,可以看到客户端收到了服务器返回的数据。

在服务器实现一个流请求

protos/helloworld.proto

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
syntax = "proto3";

package helloworld;

service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}

rpc SayHelloStreamReply (HelloRequest) returns (stream HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
string name = 1;
}

// The response message containing the greetings
message HelloReply {
string message = 1;
}

在Greeter服务上增加一个方法SayHelloStreamReply,接受一个HelloRequest参数,返回一个HelloReply类型的流。

服务器程序中实现SayHelloStreamReply方法,每隔1秒向客户端返回一个HelloReply.

async_greeter_server.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import asyncio
import logging

import grpc
import helloworld_pb2
import helloworld_pb2_grpc

class Greeter(helloworld_pb2_grpc.GreeterServicer):
async def SayHello(
self,
request: helloworld_pb2.HelloRequest,
context: grpc.aio.ServicerContext,
) -> helloworld_pb2.HelloReply:
return helloworld_pb2.HelloReply(message="Hello, %s!" % request.name)

async def SayHelloStreamReply(
self,
request: helloworld_pb2.HelloRequest,
context: grpc.aio.ServicerContext,
) -> helloworld_pb2.HelloReply:
for i in range(10000):
await asyncio.sleep(1)
yield helloworld_pb2.HelloReply(message="Hello, %s! %d" % (request.name, i))


async def serve() -> None:
server = grpc.aio.server()
helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
listen_addr = "[::]:50051"
server.add_insecure_port(listen_addr)
logging.info("Starting server on %s", listen_addr)
await server.start()
await server.wait_for_termination()

if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
asyncio.run(serve())

客户端程序 async_greeter_client.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import asyncio
import logging

import grpc
import helloworld_pb2
import helloworld_pb2_grpc

async def run() -> None:
name = sys.argv[1] if len(sys.argv) > 1 else "world"
async with grpc.aio.insecure_channel("localhost:50051") as channel:
stub = helloworld_pb2_grpc.GreeterStub(channel)
response = await stub.SayHello(helloworld_pb2.HelloRequest(name=name))
print("Greeter client received: " + response.message)

async with grpc.aio.insecure_channel("localhost:50051") as channel:
stub = helloworld_pb2_grpc.GreeterStub(channel)

request = helloworld_pb2.HelloRequest(name=name)
response_stream = stub.SayHelloStreamReply(request)

async for res in response_stream:
print("Greeter client received:", res.message)


if __name__ == "__main__":
logging.basicConfig()
asyncio.run(run())

服务器从一个业务服务中获取流并返回给客户端

这样就实现向客户端输出流了。

但是有时候我们需要这个服务从其他服务的状态中获取信息再返回给客户端。怎么办呢。

增加一个CustomerCenter业务单元,如果有顾客进店client_enter则把客户登记上,
以后每个1秒钟向所有客户发送一个消息。消息存储在客户名字为key的字典中,值对象
是一个asyncio.Queue对象。这个对象可以让Greeter服务用coroutine方式订阅消息。

async_greeter_server.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import asyncio
from collections import defaultdict
from datetime import datetime
import logging

import grpc
import helloworld_pb2
import helloworld_pb2_grpc


class CustomerCenter:
def __init__(self):
self.clients = {}
# store each client's message in queue
self.client_queues = defaultdict(asyncio.Queue)

async def start(self):
# start center
# for each second elapsed, it will send a message to all clients
while True:
await asyncio.sleep(1)
for client_name in self.clients.keys():
await self.client_queues[client_name].put("hello " + client_name + ", it's " + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

def client_enter(self, client_name):
"""
a client enter
"""
self.clients[client_name] = ''

def client_leave(self, client_name):
"""
a client leave the center
"""
print('client %s is leaving.' % client_name)
self.clients.pop(client_name)
self.client_queues.pop(client_name)

def get_queue(self, client_name):
return self.client_queues[client_name]


class Greeter(helloworld_pb2_grpc.GreeterServicer):
def __init__(self, customer_center:CustomerCenter):
self.customer_center = customer_center

async def SayHello(
self,
request: helloworld_pb2.HelloRequest,
context: grpc.aio.ServicerContext,
) -> helloworld_pb2.HelloReply:
return helloworld_pb2.HelloReply(message="Hello, %s!" % request.name)

async def SayHelloStreamReply(
self,
request: helloworld_pb2.HelloRequest,
context: grpc.aio.ServicerContext,
) -> helloworld_pb2.HelloReply:
self.customer_center.client_enter(request.name)
queue = self.customer_center.get_queue(request.name)

while True:
message = await queue.get()
yield helloworld_pb2.HelloReply(message=message)


async def serve() -> None:
server = grpc.aio.server()
customer_center = CustomerCenter()
helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(customer_center), server)
listen_addr = "[::]:50051"
server.add_insecure_port(listen_addr)
logging.info("Starting server on %s", listen_addr)

# start customer center
asyncio.create_task(customer_center.start())
await server.start()
await server.wait_for_termination()


if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
asyncio.run(serve())

处理客户端断开链接事件

到这里可以发现一个问题,如果客户端关闭了,下次再打开时,离开期间也会生成事件,服务器并
没有发现客户已经离开。

可以捕获asyncio.CancelledError异常来获取客户端断开事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
async def SayHelloStreamReply(
self,
request: helloworld_pb2.HelloRequest,
context: grpc.aio.ServicerContext,
) -> helloworld_pb2.HelloReply:
self.customer_center.client_enter(request.name)
queue = self.customer_center.get_queue(request.name)


try:
while True:
message = await queue.get()
yield helloworld_pb2.HelloReply(message=message)
except asyncio.CancelledError:
print(f'client {request.name} disconnected.')
self.customer_center.client_leave(request.name)

1
2
3
4
5
6
$ python async_greeter_server.py
INFO:root:Starting server on [::]:50051
client you disconnected.
client you is leaving.
client you disconnected.
client you is leaving.

多态事件类型

有的时候我们希望一个订阅接口成为总线,客户端进行一次调用即可收到不同类型的消息,即多态机制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
syntax = "proto3";

package helloworld;

// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}

rpc SayHelloStreamReply (HelloRequest) returns (stream HelloReply) {}

rpc SayHelloBidiStream (stream HelloRequest) returns (stream HelloReply) {}

rpc SayHelloStream2 (HelloRequest) returns (stream GreetingMessage) {}
}

// The request message containing the user's name.
message HelloRequest {
string name = 1;
}

// The response message containing the greetings
message HelloReply {
string message = 1;
}

message GreetingMessage {
oneof shape_type {
FirstGreetingMessage firstGreetingMessage = 1;
TimerGreetingMessage timerGreetingMessage = 2;
GoodbyeGreetingMessage goodbyeGreetingMessage = 3;
}
}

message FirstGreetingMessage {

}

message TimerGreetingMessage {

}

message GoodbyeGreetingMessage {

}

这里在proto中定义了一个主消息类型GreetingMessage,它的主要作用时作为下面三个具体消息类型
FirstGreetingMessage, TimerGreetingMessage, GoodbyeGreetingMessage的容器,
这样一个rpc服务可以返回GreetingMessage的stream即可。

服务器程序中依次返回 FirstGreetingMessage, TimerGreetingMessage, GoodbyeGreetingMessage消息。

async_greeter_server.py

1
2
3
4
5
6
7
8
9
10
11
12
async def SayHelloStream2(
self,
request: helloworld_pb2.HelloRequest,
context: grpc.aio.ServicerContext,
) -> helloworld_pb2.HelloReply:
yield helloworld_pb2.GreetingMessage(firstGreetingMessage=helloworld_pb2.FirstGreetingMessage())
for i in range(5):
await asyncio.sleep(1)
yield helloworld_pb2.GreetingMessage(timerGreetingMessage=helloworld_pb2.TimerGreetingMessage())


yield helloworld_pb2.GreetingMessage(goodbyeGreetingMessage=helloworld_pb2.GoodbyeGreetingMessage())

客户端使用GreetingMessage对象的HasField函数检查具体是那种类型消息,并分别做相应处理。

async_greeter_client.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
async def run() -> None:
# retreve client name from command line.
name = sys.argv[1] if len(sys.argv) > 1 else "world"

async with grpc.aio.insecure_channel("localhost:50051") as channel:
stub = helloworld_pb2_grpc.GreeterStub(channel)
response = await stub.SayHello(helloworld_pb2.HelloRequest(name=name))
print("Greeter client received: " + response.message)

async with grpc.aio.insecure_channel("localhost:50051") as channel:
stub = helloworld_pb2_grpc.GreeterStub(channel)

request = helloworld_pb2.HelloRequest(name=name)

response_stream2 = stub.SayHelloStream2(request)

async for res in response_stream2:
res:helloworld_pb2.GreetingMessage = res
if res.HasField('firstGreetingMessage'):
print("firstGreetingMessage received:", res.firstGreetingMessage)
elif res.HasField('timerGreetingMessage'):
print("timerGreetingMessage received:", res.timerGreetingMessage)
elif res.HasField('goodbyeGreetingMessage'):
print("goodbyeGreetingMessage received:", res.goodbyeGreetingMessage)
else:
print('unknown message received.')
1
2
3
4
5
6
7
8
9
$ python async_greeter_client.py hello
Greeter client received: Hello, hello!
firstGreetingMessage received:
timerGreetingMessage received:
timerGreetingMessage received:
timerGreetingMessage received:
timerGreetingMessage received:
timerGreetingMessage received:
goodbyeGreetingMessage received:

这样一个基于协程异步运行的消息订阅服务就基本上完成了。

参考资料


  1. 1.https://stackoverflow.com/questions/68491834/handle-client-side-cancellation-in-grpc-python-asyncio
  2. 2.https://grpc.github.io/grpc/python/grpc_asyncio.html#grpc.aio.ServicerContext
  3. 3.https://grpc.io/docs/languages/python/quickstart/

前言

比特币(Bitcoin)是2008年一个自称名为中本聪(Satoshi Nakamoto)的人在互联网上发布的一套加密交易协议1。由于比特币去中心化,发行总量固定,零信任机制等特点,吸引了众多投资者的关注。近年来,比特币价格连创新高,相关的衍生品日益完善,已经成为一种相对成熟的投资工具。然而,由于比特币价格波动较大,投资难度也相对较高,预测比特币价格成为一项重点且具有难度的问题。

ARIMA(Autoregressive Integrated Moving Average)模型是金融时间序列分析中常用的工具2。ARIMA模型由三部分组成,分别是AR(自回归)、I(序列差分)和MA(移动平均),可以有效捕捉数据中的自相关性和短期波动。

本文将详细介绍如何使用Python中相关的程序包实现基于ARIMA模型的比特币收益率预测。

数据分析

首先从yahoo finance上下载bitcoin日度行情数据。

1
2
3
4
5
6
7
8
9
10
11
from datetime import datetime
from pandas_datareader import data as pdr
import yfinance as yf
yf.pdr_override()

start_date = datetime(2015,1,1)
end_date = datetime(2023,6,30)

df_btc_price = pdr.get_data_yahoo('BTC-USD', start=start_date, end=end_date, interval='1d')
df_btc_price.index = df_btc_price.index.astype('datetime64[ns]')
df_btc_price.head()

1
2
3
4
import matplotlib.pyplot as plt

plt.plot(df_btc_price['Close'])
plt.show()

从图中可以推测价格数据不平稳,下面做一下adf检验平稳性。

1
2
3
from statsmodels.tsa.stattools import adfuller

print('price time series adfuller p-value:', adfuller(df_btc_price['Close'])[1])
1
price time series adfuller p-value: 0.5158389287588319

P值非常大,证明价格数据不平稳。
下面看一下日度收益率的平稳情况。

1
2
returns = df_btc_price['Close'].pct_change().dropna()
plt.plot(returns)

从图上看收益率序列有稳定均值,且围绕均值上下波动,波动幅度也较为稳定。下面进行adf检验。

1
print('return timeseries adfuller p-value:', adfuller(returns)[1])
1
return timeseries adfuller p-value: 0.0

P值验证收益率序列是平稳的。关于adf检验的原理可以参考ADF检验

下面将数据拆分为训练集和测试集,训练集主要用于模型参数优化,测试集进行样本外模型绩效评估。

1
2
3
4
5
import math
training_data_len = math.ceil(len(returns) * .8) # We are using %80 of the data for training

returns_train = returns.loc[:training_data_len]
returns_test = returns.loc[training_data_len:]

ARIMA 模型

下面准备使用ARIMA模型建模,ARIMA模型中一共有三个参数, p, d, q:

  • p: AR 部分阶数
  • d: I 部分差分次数
  • q: MA 部分阶数

首先手动选取一个模型进行分析,根据Tsay(2009),可以使用acf来选取MA部分参数q。

1
2
3
4
from statsmodels.graphics.tsaplots import plot_acf

plot_acf(returns_train)
plt.show()

ACF 自相关分析

从图中可以看出lag为6,10时相关性较大,手动选取(0,0,6)为作为ARIMA参数测试。

1
2
3
4
5
6
7
8
9
10
11
from statsmodels.tsa.arima.model import ARIMA

mod = ARIMA(returns_train, order=(0, 0, 6)) #AR(0) MA(6)
res = mod.fit()

# Print out summary information on the fit
print(res.summary())

# Print out the estimate for the constant and for theta
print("When the true theta=-0.9, the estimate of theta (and the constant) are:")
print(res.params)

ARIMA拟合结果

6项系数标准错误在0.012-015之间,LB 检验值为0.00,p-LB为0.97,拟合结果可以接受。
关于statsmodels ARIMA用法可以参考官方文档3 4

1
2
3
4
5
6
7
8
9
10
import matplotlib.pyplot as plt
import pandas as pd
import statsmodels.api as sm
from statsmodels.graphics.tsaplots import plot_predict
from statsmodels.tsa.arima.model import ARIMA

fig, ax = plt.subplots()
ax = returns.plot(ax=ax)
plot_predict(res, ax=ax)
plt.show()

ARIMA样本内测试

样本内预测值与实际值的统计情况如上,其中阴影部分为95%置信区间。

预测残差如下统计信息。

1
2
res.plot_diagnostics(figsize = (15, 10))
plt.show()

ARIMA模型预测残差分析

预测RMSE (Root mean squared error)

1
2
3
4
5
from sklearn.metrics import mean_squared_error
from math import sqrt

rms = sqrt(mean_squared_error(returns_train, res.predict()))
print('Training RMSE: %.3f' % rms)
1
Training RMSE: 0.038

Grid-Search 优化参数

为了选取最优模型,使用grid search方法遍历组合进行拟合,并选取AIC最小的模型参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def iterative_ARIMA_fit(series):
""" Iterates within the allowed values of the p and q parameters


Returns a dictionary with the successful fits.
Keys correspond to models.
"""


ARrange = range(0, 5)
MArange = range(0, 5)
Diffrange = [0]

ARIMA_fit_results = {}
for AR in ARrange :
for MA in MArange :
for Diff in Diffrange:
params = (AR,Diff,MA)
series.freq ='D'
print(params)
model = ARIMA(series, order = params)
try:
results_ARIMA = model.fit()
aic = results_ARIMA.aic
ARIMA_fit_results['%d-%d-%d' % (AR,Diff,MA)]=[aic]
except:
continue


return ARIMA_fit_results

opt_result = iterative_ARIMA_fit(returns_train)

sorted(opt_result.items(), key=lambda item: item[1], reverse=True)

ARIMA参数寻优

选取参数 4, 0, 4进行样本外检验

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def rolling_predict(returns_train, returns_test, order_params):
"""
rolling forcasting arima model
"""
history = returns_train.to_list()
test = returns_test.to_list()
test_index = returns_test.index
predictions = []
actuals = []

for t in range(len(test)):
model = ARIMA(history, order=order_params)
res = model.fit()
output = res.forecast()
yhat = output[0]
predictions.append(yhat)
obs = test[t]
actuals.append(obs)
history.append(obs)

df_predict = pd.DataFrame(index=test_index, columns=['predict', 'actual'])
df_predict['predict'] = predictions
df_predict['actual'] = actuals

return df_predict

pred_rolling = rolling_predict(returns_train, returns_test, (4,0,4))

pred_rolling.dropna().plot()

ARIMA样本外测试

1
2
3
4
5
6
# RMSE
from sklearn.metrics import mean_squared_error
from math import sqrt

rmse = sqrt(mean_squared_error(pred_rolling['actual'], pred_rolling['predict']))
print('test RMSE: ', rmse)
1
test RMSE:  0.025680854707355105

用一个简单的策略进行验证,在下一日预测收益率大于0.003时做多,否则平仓。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def test_strategy(df_returns, open_long_threshold=0.005):
"""
test strategy, if predict > threshold, open long, close otherwise.
"""
df_strategy = pd.DataFrame(index=df_returns.index)
df_strategy['weight'] = df_returns['predict'].apply(lambda x: 1.0 if x > open_long_threshold else 0.0)
df_strategy['act_return'] = df_returns['actual'].shift(-1)
df_strategy['ret_contrib'] = df_strategy['weight'] * df_strategy['act_return']
return df_strategy

def plot_strategy(df_strategy):
fig, ax = plt.subplots()
ax.plot(df_strategy['ret_contrib'].cumsum(), label='strategy')
ax.plot(df_strategy['act_return'].cumsum(), color='red', alpha=0.2, label='BTC')
plt.legend()
plt.show()

df_strategy_result = test_strategy(pred_rolling, open_long_threshold=0.003)
plot_strategy(df_strategy_result)

ARIMA策略

从回测可以看出,本策略可以今年上半年收益约10%,躲过了大部分回撤。

总结

在本文中大虾演示了从数据分析到ARIMA建模,再到模型参数优化和模拟策略回测。ARIMA模型在比特币收益率预测上表现不错,但是由于比特币价格波动较大,预测收益率的误差也较大,需要进一步优化模型或者使用其他模型进行预测。

参考资料


  1. 1.https://bitcoin.org/bitcoin.pdf
  2. 2.Tsay, R. S. (2009). Analysis of financial time series (Vol. 543). John Wiley & Sons.
  3. 3.https://www.statsmodels.org/stable/generated/statsmodels.tsa.arima.model.ARIMA.html
  4. 4.https://www.statsmodels.org/stable/generated/statsmodels.tsa.arima.model.ARIMAResults.html

Github Pages 提供了一个免费的静态文件托管服务,方便我们搭建在线技术文档,甚至是假设博客。
同时 Github还免费提供了 xxx.github.io 三级域名用于访问。但是如果我们希望自己的站点更具个性化,我们可以自己注册域名并在github上开启自定义域名。

下面跟大虾一起完成设置吧。

一、注册域名并修改DNS记录

首先在域名注册商处注册一个域名,然后修改域名的DNS记录,将域名解析到Github Pages的服务器上。

添加一个C记录www, 解析到 xxx.github.io. 三级域名,注意这里xxx必须是github上注册的user或者organization名字。

image

二、在Github上完成域名认证

在github右上角点击个人图标,进入settings页面。

Settings页面左侧 Code, Plan and automation栏下进入Pages页面。

这里需要添加上自定义的域名并验证改域名为你所有。

点击添加域名,把你的域名添加进去。注意这里只需要输入二级域名。

添加完成后需要在DNS解析中加入一个TXT记录,用于验证域名所有权。

image

image

将该页面上提示的TXT记录和值添加到域名的DNS记录中。

image

修改DNS记录可能要等几分钟只几个小时等待生效,生效后点击Verify即可完成认证。

image

三、在Github Pages站点后台绑定域名

回到指定的Github Pages站点项目后台,点击Settings。

找到Custom domain栏,输入自己的域名,点击Save。

此处设置完成后github可能需要花一些时间等待域名正确解析。

image

这里勾选HTTPS后github pages还可以为你的域名生成SSL证书,省去自己申请证书的麻烦,非常贴心。

等到解析完成,通过 xxx.github.io即可访问自己的网站了。

四、常见问题

设置自定义域名不成功 NotServedByPagesError

在Pages 页面设置自定义域名,但是始终在DNS检查,或者返回 NotServedByPagesError 错误,这个问题还比较常见。

出现这种情况的一个原因是当设置自定义域名时,github会自动在repository创建一个CNAME文件,如果该文件已经存在,
则可能导致操作失败,可以手动修改或者删除这个文件。

这个文件的内容就是自定义的域名。

image

参考资料:

https://docs.github.com/en/pages/configuring-a-custom-domain-for-your-github-pages-site/verifying-your-custom-domain-for-github-pages

https://docs.github.com/en/pages/configuring-a-custom-domain-for-your-github-pages-site

通常我们在python中使用logging.getLogger(__name__)方式获取logger,可以得到一个module层次的logger,但是如果我们一个module中有多个class,怎么区分是哪一个class写入的日志呢,最好能有一个class级别的logger。

方法一:在__init__中初始化

1
2
3
4
5
6
class Example:
def __init__(self):
self.logger = logging.getLogger(self.__class__.__module__ + '.' + self.__class__.__qualname__)

def do_something(self):
self.logger.warning('do_something')

方法一的写法需要每个类都加入重复的代码,比较啰嗦,而且调用元方法影响代码美观,所以可以考虑运用python多重继承机制,继承一个带有logger的类。

方法二:继承

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import logging

class LoggingMixin:
def __init__(self):
self.logger = logging.getLogger(self.__class__.__module__ + '.' + self.__class__.__qualname__)

class Base:
pass

class Example(LoggingMixin, Base):
def __init__(self):
super(Example, self).__init__()

def do_something(self):
self.logger.info('do_something')

logging.basicConfig(level=logging.INFO)
e = Example()
e.do_something()

第二种方法更能提现OO即面向对象编程的思想,避免了重复,通过继承完成重用。

但是这种方法有个缺陷,python多重继承时会只继承第一个构造函数(按MRO顺序,可以简单理解为从左到右),如果mixin写在继承顺序的左侧,则右侧的基类构造函数会失效。

例如Base类也有构造函数,写成:

1
2
3
4
class Base:
def __init__(self):
self.name = 'abc'

则Base的__init__始终不会被调用。

方法三:延迟加载logger

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import logging

class LoggingMixin:
_logger = None

@property
def logger(self):
if self._logger is None:
self._logger = logging.getLogger(self.__class__.__module__ + '.' + self.__class__.__qualname__)

return self._logger

class Base:
def __init__(self):
self.name = 'abc'

class Example(LoggingMixin, Base):
def __init__(self):
super(Example, self).__init__()

def do_something(self):
self.logger.info('do_something %s', self.name)

logging.basicConfig(level=logging.INFO)
e = Example()
e.do_something()

# INFO:__main__.Example:do_something

这样mixin中只声明成员,在调用时延迟加载,则可以不依赖构造函数,不会影响程序的继承结构。

参考

https://docs.python.org/3/library/logging.html#logger-objects

https://stackoverflow.com/questions/2020014/get-fully-qualified-class-name-of-an-object-in-python

https://blog.csdn.net/weixin_40636692/article/details/79940501?utm_medium=distribute.pc_relevant_t0.none-task-blog-2%7Edefault%7EBlogCommendFromMachineLearnPai2%7Edefault-1.control&depth_1-utm_source=distribute.pc_relevant_t0.none-task-blog-2%7Edefault%7EBlogCommendFromMachineLearnPai2%7Edefault-1.control

使用ansible在受控节点上安装软件,经常会碰到需要修改path环境变量的问题,那么如何能有效且幂等地修改path变量呢,今天虾哥就来分享一下。

下面的一个例子,在centos 7 上安装python3之后,python3上新安装包的可执行文件会链接在/usr/local/bin下,但是默认情况下这个路径没有放在path变量中。

下面的playbook使用pip3安装uwsgi,并将/usr/local/bin添加到path变量中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
---
- name: Install uwsgi
tasks:
- name: Install python3
yum:
name:
- python3

- name: Install uwsgi
pip:
executable: pip3
name: uwsgi

- name: Uwsgi env
copy:
content: |
export PATH=$PATH:/usr/local/bin
dest: /etc/profile.d/uwsgi.sh
mode: 644
become: yes

其原理非常简单,就是在/etc/profile.d文件夹下新建一个shell脚本,脚本中修改path变量。
用户在新登录时会执行这个文件夹下的所有文件,这样就获得了最新的path变量了。

参考

日常工作学习中免不了需要用到截屏或录屏功能,今天就来推荐一款免费开源的截图工具,ShareX。

介绍

除了基本的截图功能外,还可以在截图后自动上传,自行预设动作,有很灵活的配置潜力。

安装

官网地址为:https://getsharex.com/

当前最新版本下载地址如下:

https://github.com/ShareX/ShareX/releases/download/v13.3.0/ShareX-13.3.0-setup.exe

安装完直接在开始菜单启动即可。

录屏

录制GIF

要录制GIF需要执行以下步骤

  1. 快捷动作-屏幕录制(GIF) 或按快捷键 Ctrl + Shift + Print Screen
  2. 选择需要录制的范围,此时录制开始
  3. 录制开始后有个选择框,下面有停止按钮,点击停止结束录制。
  4. 录制完成后文件会自动出现在程序主界面。

录制视频

录制视频可以提供更高的帧数、更长的录制时间以及音频内录。ShareX录制视频使用ffmpeg库实现,并提供了内录所需的虚拟设备插件,在使用前需要进行设置。

ffmpeg设置

点击菜单 动作设置 - 屏幕记录 - 屏幕录制选项

FFmegp路径选项框右侧有下载按钮,点击即可自动下载。

自动下载速度较慢,可以自行下载相关程序,官网地址

windows直接下载地址

https://www.gyan.dev/ffmpeg/builds/ffmpeg-release-full.7z

下载并解压后把ffmpeg.exe路径填写在框里。

安装虚拟设备

要录制屏幕和音频,需要安装虚拟设备,可以通过sharex自动安装。

还是在屏幕录制选项页面 动作设置 - 屏幕记录 - 屏幕录制选项

点击“安装捕捉器”按钮,点击下一步安装程序即可。

安装完成后,在设备中选择新安装的虚拟设备。

这样就设置完成了。

屏幕录制

  1. 快捷动作-屏幕录制 或按快捷键 Shift + Print Screen
  2. 选择需要录制的范围,此时录制开始
  3. 录制开始后有个选择框,下面有停止按钮,点击停止结束录制。
  4. 录制完成后文件会自动出现在程序主界面。

要在windows下开发*nux程序,vagrant是一个不可多得的好帮手,随时可以创建一个指定的虚拟机环境,还可以把虚拟机环境配置提交到版本控制库,跟伙伴一起分享。

一般vagrant会把当前文件夹映射到虚拟机中的/vagrant,实时共享。所以我们通常在宿主机修改源代码文件,在客户机中编译和运行。

但是要开发ansible,默认的vagrant配置就无法满足需求了。因为ansible出于安全考虑,在运行时会检查脚本文件夹权限,如果有公共写权限的话就会忽略当前文件夹下的ansible.cfg运行。

[WARNING] Ansible is being run in a world writable directory (/vagrant), ignoring it as an ansible.cfg source. For more information see https://docs.ansible.com/ansible/devel/reference_appendices/config.html#cfg-in-world-writable-dir

由于vagrant使用的使虚拟机的目录映射功能,在这个模式下无法通过chmod修改文件夹的读写权限。

那么是不是就没办法了呢,非也,下面虾哥就分享一下如何能既确保源码文件夹符合ansible权限要求,又能在宿主机和客户机之间共享文件。

解决方法

vagrant默认共享其实等价于隐式声明了一个sync_folder指令

config.vm.synced_folder ".", "/vagrant"

synced_folder还有可以配置一些参数,比如type参数配置同步方式。
Type可以使用nfs,SMB模式,考虑到跨平台兼容性和,这里虾哥建议使用RSync模式。

Rsync安装

rsync 是linux下的一个用于文件同步的协议及相应的实现。

windows可以安装cwrsync

安装方法很简单,解压后把bin目录添加到path环境变量即可。

Vagrantfile写法

config.vm.synced_folder ".", "/vagrant", owner: "root", group: "root", type: "rsync"

这里本地路径,客户机内的共享路径,owner,group都可以根据自己的需要配置。

手动同步/自动同步

type=rsync的文件夹,默认情况下vagrant只会在启动的时候同步一次,之后的修改就不会同步了。

需要手动同步的话可以运行

vagrant rsync

还可以自动实时监听并同步

vagrant rsync-auto

反向同步

rsync时客户机内的修改无法反映到宿主机的文件夹,即只能实现单向同步,要从客户机向宿主机同步的话可以通过一个插件实现。

安装插件

vagrant plugin install vagrant-rsync-back

手动反向同步

vagrant rsync-back

参考

vagrant 可以通过命令行快速创建和销毁虚拟机,在多项目环境下进行开发工作时可以提高效率。

但是国内网络连接某些包分发源,例如pip, redhat yum, docker时速度非常慢,在每个虚拟机中都配置一遍又很麻烦,有没有什么好办法能自动配置虚拟机里面的代理呢。

今天虾哥就带大家了解一下 vagrant-proxyconf这款vagrant插件。

一、安装

通过vagrant命令行安装

vagrant plugin install vagrant-proxyconf

二、配置

插件可以在Vagrantfile中配置代理服务器地址,也可以使用环境变量,由于我在不同环境都用相同的代理,因此就使用环境变量统一配置了。

环境变量有4个

  • VAGRANT_HTTP_PROXY
    :http请求的代理
  • VAGRANT_HTTPS_PROXY
    : https请求的代理
  • VAGRANT_FTP_PROXY
    : ftp请求的代理
  • VAGRANT_NO_PROXY
    : 哪些请求不使用代理

Windows上配置方法为 在start窗口输入 “编辑系统环境变量” -> “环境变量” -> xx的用户环境变量 -> 新建。输入变量名和变量的值。

虾哥自己的配置是
VAGRANT_HTTP_PROXY=http://10.10.2.40:58591
VAGRANT_HTTPS_PROXY=http://10.10.2.40:58591
VAGRANT_FTP_PROXY=socks5://10.10.2.40:51837
VAGRANT_NO_PROXY=192.168.0.0/16,127.0.0.1,10.10.0.0/16

三、使用

配置完成后,新创建的vagrant虚拟机就会自动配置http_proxy和https_proxy变量,yum和pip会自动生效。

对于已经在使用的虚拟机,只需要运行vagrant reload,重启时插件就会自动配置代理,重启完成后就可以正常使用了。

docker 代理配置

docker 代理的配置是存储于独立文件的,如果虚拟机启动时还没有安装docker,则不会自动生成docker配置。要在安装docker之后自动配置docker代理配置,需要额外操作。

  1. 安装docker包后,创建docker组,并将root加入docker组
    1
    2
    groupadd docker
    usermod -aG docker root
  2. 可选,配置docker 自动启动 systemctl --now enable docker
  3. 重新启动vagrant,插件自动添加配置

参考

https://docs.docker.com/engine/install/linux-postinstall/

Git-bash 是windows上常用的可以使用*nux命令行,随git一起安装,可谓居家旅行必备。

Windows Terminal是windows 10上的一款开源的多终端窗口,可以在一个窗口中打开不同终端,对于使用终端频繁的同学来说可以节省在多个窗口中来回跳转的不便。

如果能在Windows Terminal中打开Git Bash岂不是所有命令行都能放在一起了。下面虾哥就教你如何配置在Windows Terminal标签栏中开启Bash

1 打开Windows Terminal
2 按下 Ctrl + , 打开程序配置(程序配置是json格式文本文件,会在文本编辑器中打开)
3 找到 profiles > list, list下面是个列表,在列表中加入选项,注意json格式,列表最后一项后面没有逗号,下面代码里包含了安装在%USERPROFILE%下的配置,可以根据自身情况修改注释信息。

1
2
3
4
5
6
7
8
9
10
11
{
"guid": "{00000000-0000-0000-ba54-000000000002}",
"commandline": "%PROGRAMFILES%/git/usr/bin/bash.exe -i -l",
// "commandline": "%USERPROFILE%/AppData/Local/Programs/Git/bin/bash.exe -l -i",
// "commandline": "%USERPROFILE%/scoop/apps/git/current/usr/bin/bash.exe -l -i",
"icon": "%PROGRAMFILES%/Git/mingw64/share/git/git-for-windows.ico",
// "icon": "%USERPROFILE%/AppData/Local/Programs/Git/mingw64/share/git/git-for-windows.ico",
// "icon": "%USERPROFILE%/apps/git/current/usr/share/git/git-for-windows.ico",
"name" : "Bash",
"startingDirectory" : "%USERPROFILE%",
},

参考

https://stackoverflow.com/questions/56839307/adding-git-bash-to-the-new-windows-terminal

python logging 是一套灵活强大的日志输出框架,具体功能在这里就不赘述了。本文通过gist上的一份共享代码介绍在各种不同使用场景下如何复用一套logging配置入口来高效地使用python logging.

源码

原版代码受这位仁兄启发, 做了一些改进,修改后的代码

源代码分两部分,一部分是python入口,log.py可以放置在脚本文件夹下,也可以放置在包中。

import os
import yaml
import logging.config
import logging
import coloredlogs

def setup_logging(default_path='logging.yaml', default_level=logging.INFO, env_key='LOG_CFG'):
    """
    | **@author:** Prathyush SP
    | Logging Setup
    """
    path = default_path
    value = os.getenv(env_key, None)
    if value:
        path = value
    if os.path.exists(path):
        with open(path, 'rt') as f:
            try:
                config = yaml.safe_load(f.read())
                logging.config.dictConfig(config)
                coloredlogs.install()
            except Exception as e:
                print(e)
                print('Error in Logging Configuration. Using default configs')
                logging.basicConfig(level=default_level)
                coloredlogs.install(level=default_level)
    else:
        logging.basicConfig(level=default_level)
        coloredlogs.install(level=default_level)
        print('Failed to load configuration file. Using default configs')

配置文件可以在部署时按需修改,解耦了日志编写逻辑和日志输出方式。配置文件用yaml格式编写,具有较好的可读性,对编辑也较为友好。默认从当前运行文件夹下加载logging.yaml。注意使用yaml需要安装额外的包pyaml

version: 1
disable_existing_loggers: false

formatters:
    standard:
        format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    error:
        format: "%(levelname)s <PID %(process)d:%(processName)s> %(name)s.%(funcName)s(): %(message)s"

handlers:
    console:
        class: logging.StreamHandler
        level: DEBUG
        formatter: standard
        stream: ext://sys.stdout

    info_file_handler:
        class: logging.handlers.RotatingFileHandler
        level: INFO
        formatter: standard
        filename: /tmp/info.log
        maxBytes: 10485760 # 10MB
        backupCount: 20
        encoding: utf8

    error_file_handler:
        class: logging.handlers.RotatingFileHandler
        level: ERROR
        formatter: error
        filename: /tmp/errors.log
        maxBytes: 10485760 # 10MB
        backupCount: 20
        encoding: utf8

    debug_file_handler:
        class: logging.handlers.RotatingFileHandler
        level: DEBUG
        formatter: standard
        filename: /tmp/debug.log
        maxBytes: 10485760 # 10MB
        backupCount: 20
        encoding: utf8

    critical_file_handler:
        class: logging.handlers.RotatingFileHandler
        level: CRITICAL
        formatter: standard
        filename: /tmp/critical.log
        maxBytes: 10485760 # 10MB
        backupCount: 20
        encoding: utf8

    warn_file_handler:
        class: logging.handlers.RotatingFileHandler
        level: WARN
        formatter: standard
        filename: /tmp/warn.log
        maxBytes: 10485760 # 10MB
        backupCount: 20
        encoding: utf8

root:
    level: NOTSET
    handlers: [console]
    propogate: yes

loggers:
    <module>:
        level: INFO
        handlers: [console, info_file_handler, error_file_handler, critical_file_handler, debug_file_handler, warn_file_handler]
        propogate: no

    <module.x>:
        level: DEBUG
        handlers: [info_file_handler, error_file_handler, critical_file_handler, debug_file_handler, warn_file_handler]
        propogate: yes

说明

disable_existing_loggers

由于logger在程序包import时就会实例化,可能会早于对logging模块配置的时间,启用此选项可能导致config之前实例的logger没有输出操作。

propogate

可以根据需要自行编写logger,python logging logger采用包名完全限定名的全部或部分匹配,如果当前包名没有指定的logger,怎会寻找上一级包名对应的logger。logger配置上有一个propogate参数可以控制logEntry是否继续向下传送到上级包对应的logger,直至root。

使用

import logging
from log import setup_logging

logger = logging.getLogger(__name__)

def main():
    setup_logging()
    logger.info('hello world')

命令行调试

python -c "from log import setup_logging; setup_logging(); import logging; logger = logging.getLogger(); logger.info('abc'); logger.error('error')" >info.txt 2>err.txt

参考资料

https://docs.python.org/3/library/logging.config.html#module-logging.config