-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathHurst.py
64 lines (57 loc) · 1.95 KB
/
Hurst.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# -*- coding: utf-8 -*-
# Reference: https://en.wikipedia.org/wiki/Hurst_exponent
# python 3.6.2 AMD64
# 2018/4/19
# Calculate Hurst exponent based on Rescaled range (R/S) analysis
# How to use (example):
# import Hurst
# ts = list(range(50))
# hurst = Hurst.hurst(ts)
# Tip: ts must be a one-dimensional sequence of numbers, e.g. a list or an np.array of shape (n_samples,)
__Author__ = "Blank Seraph"
import numpy as np
import pandas as pd
import csv
def hurst(ts, *, verbose=True):
    """Estimate the Hurst exponent of a series by rescaled-range (R/S) analysis.

    For every window size ``k`` from 10 up to ``len(ts) // 2`` the series is
    cut into consecutive, non-overlapping windows of length ``k`` (a trailing
    partial window is discarded).  For each window the range ``R`` of the
    cumulative mean-adjusted sums and the population standard deviation ``S``
    are computed; ``R`` and ``S`` are each averaged over the windows, and the
    log of their ratio is regressed on ``log(k)``.  The slope of that line is
    the returned estimate.

    Args:
        ts: One-dimensional sequence of numbers (list, tuple, np.ndarray, ...).
        verbose: When true (the default, kept for backward compatibility) the
            sample count and the resulting exponent are printed to stdout.

    Returns:
        The fitted slope as a NumPy float.

    Raises:
        ValueError: If ``ts`` is not one-dimensional, contains NaN/inf, or is
            too short.  At least 22 samples are required: with 20 or 21
            samples only the window size 10 exists, and a line cannot be
            fitted through a single point.
    """
    min_window = 10

    series = np.asarray(ts, dtype=float)
    if series.ndim != 1:
        raise ValueError("Time series must be one-dimensional!")
    N = series.size
    if verbose:
        print(N)
    if N < 20:
        raise ValueError("Time series is too short! input series ought to have at least 20 samples!")
    max_k = N // 2
    if max_k < min_window + 1:
        # Only one window size would be available, so the log-log regression
        # below would be rank-deficient and its slope meaningless.
        raise ValueError("Time series is too short! input series ought to have at least 22 samples!")
    if not np.all(np.isfinite(series)):
        raise ValueError("Time series must not contain NaN or infinite values!")

    # Tiny offset added to both R and S so a constant series (R == S == 0)
    # yields a ratio of 1 instead of a division by zero.
    eps = np.spacing(1)

    window_sizes = np.arange(min_window, max_k + 1)
    log_n = np.log(window_sizes)
    log_R_S = np.empty(window_sizes.size)
    for idx, k in enumerate(window_sizes):
        # Keep only the complete windows; each row of `windows` is one subset.
        n_windows = N // k
        windows = series[:n_windows * k].reshape(n_windows, k)
        deviations = windows - windows.mean(axis=1, keepdims=True)
        walks = np.cumsum(deviations, axis=1)
        R = (walks.max(axis=1) - walks.min(axis=1)).mean()
        S = windows.std(axis=1).mean()
        log_R_S[idx] = np.log((R + eps) / (S + eps))

    Hurst_exponent = np.polyfit(log_n, log_R_S, 1)[0]
    if verbose:
        print(Hurst_exponent)
    return Hurst_exponent
if __name__ == '__main__':
    # Script entry point: read the sixth column (index 5) of every row of the
    # CSV file and run the Hurst estimate on it; hurst() prints its result.
    # newline='' is what the csv module expects so it can handle line endings
    # inside quoted fields itself.
    with open('C:/Users/13760/Desktop/hurst.csv', mode='r', encoding='utf-8', newline='') as infile:
        column = [row[5] for row in csv.reader(infile)]
    # The csv module yields strings; convert them to floats.  The builtin
    # `float` is used because the `np.float` alias was removed in NumPy 1.24.
    ts = np.array(column).astype(float)
    hurst(ts)