A Python snippet to follow votes of specific curators

in #python • 5 years ago
import logging
import os.path
import threading
import time
logging.basicConfig(level=logging.INFO)
from lighthive.datastructures import Operation
from lighthive.client import Client


# Module-level logger named after this module; used by the code below for
# progress and state messages.
logger = logging.getLogger(__name__)


class TransactionListener:
    """Stream blocks and mirror the votes of selected accounts.

    The listener walks the chain block by block (see ``listen``), looks for
    ``vote`` operations cast by accounts listed in ``rules`` (see
    ``process_block``) and broadcasts a matching vote from ``voter_acc``
    (see ``vote``).  The number of the last fetched block is persisted to
    ``state_file`` so a restart can resume where it stopped.

    Args:
        client: Object exposing ``get_dynamic_global_properties()``,
            ``get_block(block_num)`` and ``broadcast(op)``.
        blockchain_mode: ``"head"`` or ``"irreversible"``; a falsy value
            falls back to ``"irreversible"``.
        start_block: Block number to start from.  When falsy, the saved
            state is used, or the current chain height if there is none.
        end_block: Optional last block number to yield.
        rules: Mapping ``{account: {"weight": multiplier}}`` of accounts to
            follow.  ``None`` is treated as "follow nobody".
        voter_acc: Account name used as the ``voter`` of broadcast votes.
    """

    def __init__(self, client, blockchain_mode="head",
                 start_block=None, end_block=None, rules=None,
                 voter_acc=None):
        self.client = client
        self.blockchain_mode = blockchain_mode or "irreversible"
        self.start_block = start_block
        self.end_block = end_block
        self.state_file = "block_state.txt"
        # Normalise a missing rule set to an empty mapping so membership
        # tests and lookups never hit ``None``.
        self.rules = rules if rules is not None else {}
        self.voter_acc = voter_acc

    def get_last_block_height(self):
        """Return the latest block number for the configured mode.

        Raises:
            ValueError: If ``blockchain_mode`` is neither ``"irreversible"``
                nor ``"head"``.
        """
        props = self.client.get_dynamic_global_properties()
        if self.blockchain_mode == "irreversible":
            return props['last_irreversible_block_num']
        elif self.blockchain_mode == "head":
            return props['head_block_number']
        else:
            raise ValueError(
                "Invalid blockchain mode. It can be irreversible or head.")

    def save_state(self, block_num):
        """Persist ``block_num`` to ``state_file``, replacing its content."""
        logger.info("Saving state (block num): %s", block_num)
        # The context manager closes the file even if the write fails.
        with open(self.state_file, "w") as f:
            f.write(str(block_num))

    def get_state(self):
        """Return the saved block number, or ``None`` if nothing is saved.

        A missing, empty or whitespace-only state file counts as "no state".

        Raises:
            ValueError: If the file holds something that is not an integer.
        """
        if not os.path.exists(self.state_file):
            return None
        # Read inside ``with`` so the handle is released on every path,
        # including the early return for an empty file.
        with open(self.state_file, "r") as f:
            state = f.read().strip()
        if not state:
            return None
        logger.info("Read state: (block num): %s", state)
        return int(state)

    def get_block(self, block_num):
        """Fetch block ``block_num`` from the client and record it as state.

        The state is saved right after the fetch, i.e. before the caller
        processes the block, so a restart resumes from this same block.
        """
        logger.info("Getting block: %s", block_num)
        block_data = self.client.get_block(block_num)
        self.save_state(block_num)
        return block_data

    def process_block(self, block_data):
        """Start one ``vote`` thread per followed vote found in the block.

        Args:
            block_data: Mapping with a ``"transactions"`` list; each
                transaction carries ``"operations"`` as
                ``(op_type, op_value)`` pairs.  A falsy block, and
                transactions without operations, are skipped.
        """
        if not block_data:
            return
        followed = self.rules or {}
        for transaction in block_data.get("transactions") or []:
            for op_type, op_value in transaction.get("operations") or []:
                if op_type != "vote":
                    continue

                if op_value.get("voter") not in followed:
                    continue

                # Broadcast in a separate thread so block streaming is not
                # held up by the call (and its retries).
                t = threading.Thread(
                    target=self.vote,
                    args=(op_value, ),
                )
                t.start()

    def vote(self, op_value, retry_count=None):
        """Broadcast a vote mirroring ``op_value``, retrying on failure.

        Votes on content authored by ``voter_acc`` itself are skipped.  The
        broadcast weight is the original weight multiplied by the followed
        account's ``"weight"`` rule, rounded to an integer.

        Args:
            op_value: The followed ``vote`` operation payload; its
                ``voter``, ``author``, ``permlink`` and ``weight`` keys are
                read.
            retry_count: Number of attempts already made (``None`` means 0).
                Attempts stop once this exceeds 5.
        """
        if op_value["author"] == self.voter_acc:
            return

        rule = (self.rules or {}).get(op_value["voter"])
        if rule is None or rule.get("weight") is None:
            logger.warning(
                "No weight rule for voter %s; skipping.", op_value["voter"])
            return

        op = Operation('vote', {
            "voter": self.voter_acc,
            "author": op_value["author"],
            "permlink": op_value["permlink"],
            "weight": round(op_value["weight"] * rule["weight"])
        })

        # Iterative retry: same attempt budget as before (counts 0..5),
        # without growing the call stack.
        attempt = retry_count or 0
        while attempt <= 5:
            logger.info("Broadcasting %s", op)
            try:
                self.client.broadcast(op)
            except Exception:
                # Thread boundary: log with traceback and try again rather
                # than letting the worker thread die silently.
                logger.exception("Broadcast failed (attempt %s)", attempt + 1)
                attempt += 1
            else:
                return
        logger.warning("backing off for %s", op_value)

    def listen(self):
        """Yield block data sequentially, waiting for new blocks as needed.

        Starts at ``start_block``; if that is falsy, at the saved state; if
        there is none, at the current chain height.  Only blocks strictly
        below the current height are yielded.  The generator ends once
        ``end_block`` (when set) has been passed; otherwise it runs forever.
        """
        current_block = self.start_block
        if not current_block:
            state = self.get_state()
            if not state:
                logger.info(
                    "Getting current head  (block num) as a starting point.")
                current_block = self.get_last_block_height()
            else:
                logger.info("Continue from the last state (block num): %s",
                            state)
                current_block = state

        # Cached chain height: refreshed only when we have caught up with
        # it, instead of issuing one RPC per yielded block.
        last_height = 0
        while True:
            # Checked on every pass so the generator stops as soon as the
            # range is exhausted, without waiting for the chain to advance.
            if self.end_block and current_block > self.end_block:
                return

            if current_block >= last_height:
                last_height = self.get_last_block_height()
                if current_block >= last_height:
                    # Nothing new yet; wait before polling again.
                    time.sleep(3)
                    continue

            yield self.get_block(current_block)
            current_block += 1


if __name__ == '__main__':
    # Accounts to follow, each mapped to the multiplier applied to the
    # weight of the vote being mirrored.
    rules = dict(
        gerber=dict(weight=1),
        emrebeyler=dict(weight=0.1),
    )

    # Replace "posting_key" with the private posting key of the voter.
    c = Client(keys=["posting_key"])
    tx_listener = TransactionListener(
        c,
        rules=rules,
        voter_acc="emrebeyler",
    )

    # Stream blocks indefinitely and react to every followed vote.
    for block_data in tx_listener.listen():
        tx_listener.process_block(block_data)

A simple, no-frills script that follows the votes of a set of curators and votes on the same content. It doesn't have advanced features, but it works. In general, it casts its vote about six seconds after the curator's vote.

What is the difference from Steemauto? Not much, really. However, if you follow a curator who has a huge follower list, this might let you cast your vote before the others (or at least before most of them).

Requires the lighthive library. To configure and run it, see the part after `if __name__ == '__main__':`.


    rules = {
        "gerber": {"weight": 1},
        "emrebeyler": {"weight": 0.1},
    }

This configuration means that the account running the script will follow gerber with the same vote percentage, but will follow emrebeyler at one tenth of the percentage (e.g. emrebeyler votes 40%, the follower votes 4%).