Skip to content

Commit

Permalink
Merge pull request #8 from money-on-chain/fix_gasPriceTooHighEventually
Browse files Browse the repository at this point in the history
Fix gas price too high eventually
  • Loading branch information
mulonemartin authored Apr 6, 2022
2 parents 293ef1f + 0393f76 commit da68c88
Show file tree
Hide file tree
Showing 15 changed files with 134 additions and 32 deletions.
1 change: 1 addition & 0 deletions servers/Changelog.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
1.3.6.5: Fix when the gas price comes 20 gwei from rsk node
1.3.6.4: minor updates in doc and requirements
1.3.6.3: add endpoints for node version and information
1.3.6.2: Improve error responses when a sign request is rejected by giving different error codes and messages. check: https://coinfabrik.teamwork.com/#/tasks/21595423
Expand Down
71 changes: 65 additions & 6 deletions servers/common/services/blockchain.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
import traceback
import typing

from common import settings
from decorator import decorator
from eth_typing import Primitives, HexStr
from pydantic import AnyHttpUrl
from starlette.datastructures import Secret
from web3 import Web3, HTTPProvider
from web3.exceptions import TransactionNotFound

from common.bg_task_executor import BgTaskExecutor
from common.helpers import dt_now_at_utc

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -100,6 +102,58 @@ def validate(cls, v) -> str:
return Web3.eth.account.from_key(v).key


class BlockchainStateLoop(BgTaskExecutor):
def __init__(self, conf):
logger.debug('initializing BlockchainStateLoop')
self.conf = conf
self.gas_calc = GasCalculator()
super().__init__(name="BlockchainStateLoop", main=self.run)

async def run(self):
logger.info("BlockchainStateLoop loop start")
await self.gas_calc.update()
logger.info("BlockchainStateLoop loop done")
return self.conf.ORACLE_BLOCKCHAIN_STATE_DELAY


class GasCalculator:

def __init__(self):
self.last_price = None
self.default_gas_price = settings.DEFAULT_GAS_PRICE
self.gas_percentage_admitted = settings.GAS_PERCENTAGE_ADMITTED
self.W3 = Web3(HTTPProvider(str(settings.NODE_URL),
request_kwargs={'timeout': settings.WEB3_TIMEOUT}))
def set_last_price(self, gas_price):
self.last_price = gas_price

def get_last_price(self):
if self.last_price is None:
return self.default_gas_price
return int(self.get_gas_price_plus_x_perc(self.last_price))

def get_gas_price_plus_x_perc(self, gas_price):
return gas_price + gas_price * (self.gas_percentage_admitted / 100)

def is_gas_price_out_of_range(self, gas_price):
if gas_price > self.get_last_price():
return True
return False

@exec_with_catch_async
async def get_current(self):
gas_price = await run_in_executor(lambda: self.W3.eth.gasPrice)
if gas_price is None:
gas_price = self.get_last_price() if self.get_last_price() is not None else self.default_gas_price
if self.is_gas_price_out_of_range(gas_price):
gas_price = self.get_last_price()
self.set_last_price(gas_price)
return gas_price

async def update(self):
await self.get_current()


class BlockChain:
def __init__(self, node_url, chain_id, timeout):
self.chain_id = chain_id
Expand Down Expand Up @@ -131,11 +185,14 @@ async def get_last_block_data(self):
@exec_with_catch_async
async def get_block_by_number(self, block_number, full=False):
return await run_in_executor(lambda: self.W3.eth.getBlock(block_number, full))

async def get_tx(self, method, account_addr: str):

async def get_tx(self, method, account_addr: str, gas_price):
logger.debug(f"+++++++++ get tx ++++++++ {str(account_addr)} - {method}")
from_addr = parse_addr(str(account_addr))
gas_price = await run_in_executor(lambda: self.W3.eth.gasPrice)
nonce = await run_in_executor(lambda: self.W3.eth.getTransactionCount(from_addr))

nonce = await run_in_executor(lambda: self.W3.eth.getTransactionCount(
from_addr))
logger.debug(f"Nonce: {nonce} sender: {from_addr}")
try:
logger.debug("GAS: %r" % gas_price)
gas = await run_in_executor(lambda: method.estimateGas({'from': from_addr,
Expand Down Expand Up @@ -237,13 +294,15 @@ async def bc_call(self, method, *args, account: BlockchainAccount = None, **kw):
{'from': account} if account else {}))

@exec_with_catch_async
async def bc_execute(self, method, *args, account: BlockchainAccount = None, wait=False, **kw):
async def bc_execute(self, method, *args, account: BlockchainAccount = None, wait=False, last_gas_price=None, **kw):
if not account:
raise Exception("Missing key, cant execute")
method_func = self._contract.functions[method](*args, **kw)
tx = await self._blockchain.get_tx(method_func, str(account.addr))
tx = await self._blockchain.get_tx(method_func, str(account.addr), gas_price=last_gas_price)
txn = tx["tx"]
signed_txn = self._blockchain.sign_transaction(txn, private_key=Web3.toBytes(hexstr=str(account.key)))
logger.debug("%s SENDING SIGNED TX %r", tx["txdata"]["chainId"], signed_txn)
logger.debug(f"--+Blockchain ID {id(self._blockchain)}")
logger.debug("--+Nonce %s", tx["txdata"]["nonce"])
tx = await self._blockchain.send_raw_transaction(signed_txn.rawTransaction)
return await self._blockchain.process_tx(tx, wait)
13 changes: 8 additions & 5 deletions servers/common/services/coin_pair_price_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ def addr(self):
async def coin_pair_price_call(self, method, *args, account: BlockchainAccount = None, **kw):
return await self._contract.bc_call(method, *args, account=account, **kw)

async def coin_pair_price_execute(self, method, *args, account: BlockchainAccount = None, wait=False, **kw):
return await self._contract.bc_execute(method, *args, account=account, wait=wait, **kw)
async def coin_pair_price_execute(self, method, *args, account: BlockchainAccount = None, wait=False,
last_gas_price=None, **kw):
return await self._contract.bc_execute(method, *args, account=account, wait=wait,
last_gas_price=last_gas_price, **kw)

async def get_valid_price_period_in_blocks(self):
return await self.coin_pair_price_call("getValidPricePeriodInBlocks")
Expand All @@ -49,16 +51,17 @@ async def publish_price(self,
blocknumber,
signatures: List[HexBytes],
account: BlockchainAccount = None,
wait=False):
wait=False, last_gas_price=None):
v, r, s = [], [], []
for signature in signatures:
v.append(int.from_bytes(hb_to_bytes(signature[64:]), "little"))
r.append(hb_to_bytes(signature[:32]))
s.append(hb_to_bytes(signature[32:64]))

logger.debug(f"OCS-----> {last_gas_price}")
ret = await self.coin_pair_price_execute("publishPrice", version,
coin_pair.longer(), price, oracle_addr,
blocknumber, v, r, s, account=account, wait=wait)
blocknumber, v, r, s, account=account, wait=wait,
last_gas_price=last_gas_price)
self.last_pub_at = dt_now_at_utc()
return ret

Expand Down
9 changes: 5 additions & 4 deletions servers/common/services/supporters_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,18 @@ def __init__(self, contract: BlockChainContract):
async def supporters_call(self, method, *args, **kw):
return await self._contract.bc_call(method, *args, **kw)

async def supporters_execute(self, method, *args, account: BlockchainAccount = None, wait=False, **kw):
return await self._contract.bc_execute(method, *args, account=account, wait=wait, **kw)
async def supporters_execute(self, method, *args, account: BlockchainAccount = None, wait=False,
last_gas_price=None, **kw):
return await self._contract.bc_execute(method, *args, account=account, wait=wait,last_gas_price=last_gas_price, **kw)

async def get_token_addr(self):
return await self.supporters_call("mocToken")

async def is_ready_to_distribute(self) -> bool:
return await self.supporters_call("isReadyToDistribute")

async def distribute(self, account: BlockchainAccount = None, wait=False):
return await self.supporters_execute("distribute", account=account, wait=wait)
async def distribute(self, account: BlockchainAccount = None, wait=False, last_gas_price=None):
return await self.supporters_execute("distribute", account=account, wait=wait, last_gas_price=last_gas_price)

async def get_total_tokens(self):
return await self.supporters_call("totalToken")
Expand Down
8 changes: 7 additions & 1 deletion servers/common/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,10 @@
# Print stack trace of errors, used for development
ON_ERROR_PRINT_STACK_TRACE = config('ON_ERROR_PRINT_STACK_TRACE', cast=bool, default=False)
# Swagger app version
VERSION = "1.3.6.4"
VERSION = "1.3.6.5"

# This four are for the gas_price fix. Sometime the gas_price reaches 20Gwei
# Used the first time if the gas price exceeds the admitted
DEFAULT_GAS_PRICE = config('DEFAULT_GAS_PRICE', cast=int, default=65000000)
# The percentage that is consider to be admitted
GAS_PERCENTAGE_ADMITTED = config('GAS_PERCENTAGE_ADMITTED', cast=int, default=10)
3 changes: 3 additions & 0 deletions servers/oracle/src/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def get_error_msg(msg=None):

@app.middleware("http")
async def filter_ips_by_selected_oracles(request: Request, call_next):
logger.debug(f"ORACLE_RUN_IP_FILTER: {oracle_settings.ORACLE_RUN_IP_FILTER}")
if not oracle_settings.ORACLE_RUN_IP_FILTER:
return await call_next(request)
try:
Expand Down Expand Up @@ -122,8 +123,10 @@ async def sign(*, version: str = Form(...),
if not validation_data:
raise ValidationFailure("Missing coin pair", coin_pair)

logger.debug("Before")
logger.debug("Sign: %r" % (params,))
message, my_signature = validation_data.validate_and_sign(signature)
logger.debug(f"Sign After: {message} {my_signature}")
return {
"message": message,
"signature": my_signature.hex()
Expand Down
9 changes: 7 additions & 2 deletions servers/oracle/src/main_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from common import settings
from common.bg_task_executor import BgTaskExecutor
from common.services.blockchain import BlockchainStateLoop
from common.services.contract_factory_service import ContractFactoryService
from oracle.src import oracle_settings, monitor
from oracle.src.ip_filter_loop import IpFilterLoop
Expand All @@ -22,6 +23,7 @@ def __init__(self):
self.tasks: List[BgTaskExecutor] = []
self.initialized = False
self.oracle_loop: OracleLoop = None
self.bs_loop: BlockchainStateLoop = None
self.ip_filter_loop: IpFilterLoop = None
super().__init__(name="MainLoop", main=self.run)

Expand Down Expand Up @@ -50,8 +52,10 @@ async def run(self):

def _startup(self):
oracle_service = OracleService(self.cf, self.conf.ORACLE_MANAGER_ADDR, self.conf.INFO_ADDR)
self.oracle_loop = OracleLoop(self.conf, oracle_service)
self.bs_loop = BlockchainStateLoop(self.conf)
self.oracle_loop = OracleLoop(self.conf, oracle_service, self.bs_loop)
self.tasks.append(self.oracle_loop)
self.tasks.append(self.bs_loop)
if oracle_settings.ORACLE_RUN_IP_FILTER:
self.ip_filter_loop = IpFilterLoop(self.oracle_loop, self.conf)
self.tasks.append(self.ip_filter_loop)
Expand All @@ -60,8 +64,9 @@ def _startup(self):
self.tasks.append(monitor.MonitorTask(self.cf.get_blockchain(), oracle_service))
if oracle_settings.SCHEDULER_RUN_SUPPORTERS_SCHEDULER:
supporters_service = self.cf.get_supporters(self.conf.SUPPORTERS_ADDR)
self.tasks.append(SchedulerSupportersLoop(self.conf, supporters_service))
self.tasks.append(SchedulerSupportersLoop(self.conf, supporters_service,self.bs_loop))
for t in self.tasks:
logger.debug(f'****** task {t}')
t.start_bg_task()

def _print_info(self):
Expand Down
3 changes: 2 additions & 1 deletion servers/oracle/src/oracle_blockchain_info_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from common import settings
from common.bg_task_executor import BgTaskExecutor
from common.services.blockchain import is_error
from common.services.blockchain import is_error
from common.services.oracle_dao import OracleBlockchainInfo
from oracle.src.oracle_coin_pair_service import OracleCoinPairService
from oracle.src.oracle_configuration import OracleConfiguration
Expand All @@ -21,6 +21,7 @@ def __init__(self, conf: OracleConfiguration, cps: OracleCoinPairService):
self._blockchain_info: OracleBlockchainInfo = None
self.last_update = None
self.update_lock = asyncio.Lock()

super().__init__(name="OracleBlockchainInfoLoop", main=self.run)

async def run(self):
Expand Down
15 changes: 12 additions & 3 deletions servers/oracle/src/oracle_coin_pair_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from common import crypto, settings, helpers
from common.bg_task_executor import BgTaskExecutor
from common.crypto import verify_signature
from common.services.blockchain import is_error
from common.services.blockchain import is_error, BlockchainStateLoop
from oracle.src import monitor, oracle_settings
from oracle.src.oracle_blockchain_info_loop import OracleBlockchainInfoLoop
from oracle.src.oracle_coin_pair_service import OracleCoinPairService, FullOracleRoundInfo
Expand All @@ -32,18 +32,23 @@ class OracleCoinPairLoop(BgTaskExecutor):
def __init__(self, conf: OracleConfiguration,
cps: OracleCoinPairService,
price_feeder_loop: PriceFeederLoop,
vi_loop: OracleBlockchainInfoLoop):
vi_loop: OracleBlockchainInfoLoop,
bs_loop: BlockchainStateLoop,
):
self.bs_loop = bs_loop
self._conf = conf
self._oracle_addr = oracle_settings.get_oracle_account().addr
self._cps = cps
self._coin_pair = cps.coin_pair
self._oracle_turn = OracleTurn(self._conf, cps.coin_pair)
self._price_feeder_loop = price_feeder_loop
self.vi_loop = vi_loop

super().__init__(name="OracleCoinPairLoop-%s" % self._coin_pair, main=self.run)

async def run(self):
logger.info("%r : OracleCoinPairLoop start" % self._coin_pair)
logger.debug(f"--+OracleCoinPairLoop ID {id(self)}")
round_info = await self._cps.get_round_info()
if is_error(round_info):
logger.error("%r : OracleCoinPairLoop ERROR getting round info %r" % (self._coin_pair, round_info))
Expand Down Expand Up @@ -117,7 +122,8 @@ async def publish(self, oracles, params: PublishPriceParams):
params.last_pub_block,
sigs,
account=oracle_settings.get_oracle_account(),
wait=True)
wait=True,
last_gas_price=await self.bs_loop.gas_calc.get_current())
if is_error(tx):
logger.info("%r : OracleCoinPairLoop %r ERROR PUBLISHING %r" % (self._coin_pair, self._oracle_addr, tx))
return False
Expand Down Expand Up @@ -176,6 +182,9 @@ async def get_signature(oracle: FullOracleRoundInfo, params: PublishPriceParams,
"oracle_addr": params.oracle_addr,
"last_pub_block": str(params.last_pub_block),
"signature": my_signature.hex()}
logger.debug(f"sign DATA {post_data}")
logger.debug(f"sign target uri {target_uri}")

raise_for_status = True
if settings.DEBUG:
raise_for_status = False
Expand Down
5 changes: 3 additions & 2 deletions servers/oracle/src/oracle_coin_pair_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,11 @@ async def publish_price(self,
blocknumber,
signatures,
account: BlockchainAccount = None,
wait=False):
wait=False,
last_gas_price=None):
return await self._coin_pair_service.publish_price(version, coin_pair, price,
oracle_addr, blocknumber, signatures,
account=account, wait=wait)
account=account, wait=wait, last_gas_price=last_gas_price)

async def get_coin_pair(self) -> str:
return await self._coin_pair_service.get_coin_pair()
Expand Down
7 changes: 7 additions & 0 deletions servers/oracle/src/oracle_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,13 @@ def __init__(self, cf: ContractFactoryService):
"description": "The maximal time difference (in seconds) a price in queue is considered when generating a price for validation.",
"default": 30
},
"ORACLE_BLOCKCHAIN_STATE_DELAY": {
"priority": self.Order.configuration_default_blockchain,
"configuration": lambda: parseTimeDelta(config('ORACLE_BLOCKCHAIN_STATE_DELAY', cast=str)),
"blockchain": lambda p: self._eternal_storage_service.get_uint(p),
"description": "Delay in which the gas price is gathered",
"default": 60
}
}
self.from_conf = set()
self.from_default = set()
Expand Down
9 changes: 6 additions & 3 deletions servers/oracle/src/oracle_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import typing

from common.bg_task_executor import BgTaskExecutor
from common.services.blockchain import is_error
from common.services.blockchain import is_error, BlockchainStateLoop
from oracle.src import oracle_settings
from oracle.src.oracle_blockchain_info_loop import OracleBlockchainInfoLoop, OracleBlockchainInfo
from oracle.src.oracle_coin_pair_loop import OracleCoinPairLoop
Expand All @@ -29,7 +29,9 @@

class OracleLoop(BgTaskExecutor):

def __init__(self, conf: OracleConfiguration, oracle_service: OracleService):
def __init__(self, conf: OracleConfiguration, oracle_service: OracleService,
bs_loop: BlockchainStateLoop):
self.bs_loop = bs_loop
self.conf = conf
self.oracle_addr = oracle_settings.get_oracle_account().addr
self.oracle_service = oracle_service
Expand All @@ -54,7 +56,8 @@ def add_coin_pair(self, cp_service):
if oracle_settings.ORACLE_RUN:
pf_loop = PriceFeederLoop(self.conf, cp_service.coin_pair)
bl_loop = OracleBlockchainInfoLoop(self.conf, cp_service)
cp_loop = OracleCoinPairLoop(self.conf, cp_service, pf_loop, bl_loop)
cp_loop = OracleCoinPairLoop(self.conf, cp_service, pf_loop, bl_loop,
self.bs_loop)
tasks.extend([pf_loop, bl_loop, cp_loop])
self.cpMap[cp_key] = OracleLoopTasks(cp_service, tasks,
cp_loop, pf_loop, bl_loop,
Expand Down
2 changes: 1 addition & 1 deletion servers/oracle/src/oracle_publish_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ def prepare_price_msg(self):
# encBlockNum = enc_uint256(blocknr)
# header = bytes(MSG_HEADER, "ascii").hex()
full_msg = (''.join(f(x) for f, x in zip(fs, parameters))) ## header +
logger.debug(logging.INFO, "msg: " + full_msg)
logger.debug( "msg: " + full_msg)
return full_msg
2 changes: 2 additions & 0 deletions servers/oracle/src/oracle_turn.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ def _is_oracle_turn_with_msg(self,
start_block_pub_period_before_price_expires = vi.last_pub_block + \
vi.valid_price_period_in_blocks - \
conf.trigger_valid_publication_blocks
logger.debug(f"block_num {vi.block_num} start_block_pub_period_before_price_expires {start_block_pub_period_before_price_expires} "
f"vi.valid_price_period_in_blocks {vi.valid_price_period_in_blocks}")
if vi.block_num >= start_block_pub_period_before_price_expires:
can_I_publish = self.can_oracle_publish(vi.block_num - start_block_pub_period_before_price_expires,
oracle_addr, oracle_addresses, entering_fallback_sequence)
Expand Down
Loading

0 comments on commit da68c88

Please sign in to comment.