initial commit

This commit is contained in:
2026-04-23 15:53:04 +09:00
commit 5688cfe2a2
75 changed files with 2543 additions and 0 deletions
+12
View File
@@ -0,0 +1,12 @@
__version__ = '1.0.1.0'
from .api_factory import APIFactory, APIProvider
from .model import Price, OrderBook, Hoga, Hold, Account, Order, Trade, Stock, Index, TRADE_FLAG, ORDER_FLAG
__all__ = ['Price', 'OrderBook', 'Hoga', 'Hold', 'Account', 'Order', 'Trade', 'Stock', 'Index', 'TRADE_FLAG', 'ORDER_FLAG', 'APIFactory', 'APIProvider', 'create_api']
def print_version_info():
print(f"The version of this stock finance API is {__version__}.")
def create_api(api_provider: APIProvider):
return APIFactory.create_api(api_provider)
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
+37
View File
@@ -0,0 +1,37 @@
from enum import Enum
class APIProvider(Enum):
EBEST = "EBEST"
LS = "LS"
LSV = "LSV"
KIS = "KIS"
KISV = "KISV"
KIWOOM = "KIWOOM"
KIWOOMV = "KIWOOMV"
class APIFactory:
@staticmethod
def create_api(api_provider: APIProvider):
if api_provider == APIProvider.EBEST:
from .ebest import EBest
return EBest()
elif api_provider == APIProvider.LS:
from .ls import LS
return LS()
elif api_provider == APIProvider.LSV:
from .ls import LSV
return LSV()
elif api_provider == APIProvider.KIS:
from .kis import Kis
return Kis()
elif api_provider == APIProvider.KISV:
from .kis import KisV
return KisV()
elif api_provider == APIProvider.KIWOOM:
from .kiwoom import Kiwoom
return Kiwoom()
elif api_provider == APIProvider.KIWOOMV:
from .kiwoom import KiwoomV
return KiwoomV()
else:
raise ValueError("Unsupported API provider")
+1
View File
@@ -0,0 +1 @@
from .api import *
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
+107
View File
@@ -0,0 +1,107 @@
import requests
import queue
from loguru import logger
from finestock.path import _API_PATH_
from .api_interface import BaseProvider
class API(BaseProvider):
def __init__(self):
self.api_type = type(self).__name__
self.app_secret = None
self.app_key = None
self.access_token = None
self.token_type = None
self.account_num = None
self.account_num_sub = None
self.headers = {
"Content-Type": "application/json",
"Accept": "text/plain",
"charset": "UTF-8"
}
self.headers_rt = {}
self.ws = None
self.queue = None
self._init_path()
def __del__(self):
logger.debug("Destroy API Components")
def _init_path(self):
self.path = _API_PATH_[self.api_type]
for key, value in self.path.items():
setattr(self, key, value)
def set_oauth_info(self, app_key, app_secret):
self.app_key = app_key
self.app_secret = app_secret
self.headers['appkey'] = app_key
self.headers['appsecret'] = app_secret
def set_access_token(self, token):
self.access_token = token
self.token_type = "Bearer"
self.headers['authorization'] = f"Bearer {token}"
def get_access_token(self):
return self.access_token
def set_account_info(self, account_num, account_num_sub):
self.account_num = account_num
self.account_num_sub = account_num_sub
def oauth(self, header=None, data=None):
url = f"{self.DOMAIN}/{self.OAUTH}"
_header = header or {"Content-Type": "application/x-www-form-urlencoded"}
_data = data or {
"grant_type": "client_credentials",
"appkey": self.app_key,
"appsecretkey": self.app_secret
}
response = requests.post(url, headers=_header, data=_data)
try:
res_json = response.json()
except Exception:
res_json = response.text
logger.debug(f"[API: oauth]\n"
f"[URL: {url}]\n"
f"[header: {_header}]\n"
f"[param: {_data}]\n"
f"[response: {res_json}]")
if response.status_code == 200:
res = response.json()
if "access_token" in res:
self.access_token = res['access_token']
if "token_type" in res:
self.token_type = res['token_type']
if (self.access_token is not None) and (self.token_type is not None):
self.headers['authorization'] = f"{self.token_type} {self.access_token}"
return res
else:
# self.print_error(response)
return response.json()
def set_data_queue(self, queue):
"""
Inject a queue for receiving realtime data.
"""
self.queue = queue
def add_data(self, data):
if self.queue is not None:
self.queue.put(data)
def add_price(self, data):
self.add_data(data)
def add_trade(self, data):
self.add_data(data)
def add_orderbook(self, data):
self.add_data(data)
+100
View File
@@ -0,0 +1,100 @@
from abc import ABC, abstractmethod
from typing import Any, List, Optional
class AuthenticationProvider(ABC):
@abstractmethod
def oauth(self) -> Any:
raise NotImplementedError
@abstractmethod
def set_oauth_info(self, app_key: str, app_secret: str) -> None:
raise NotImplementedError
@abstractmethod
def set_access_token(self, token: str) -> None:
raise NotImplementedError
class MarketDataProvider(ABC):
@abstractmethod
def get_price(self, code: str) -> Any:
raise NotImplementedError
@abstractmethod
def get_ohlcv(self, code: str, frdate: str, todate: str) -> List[Any]:
raise NotImplementedError
@abstractmethod
def get_ohlcv_min(self, code: str, todate: str = "", exchgubun: str = "K",
cts_date: str = "", cts_time: str = "", tr_cont_key: str = "") -> List[Any]:
raise NotImplementedError
@abstractmethod
def get_index(self, code: str, frdate: str, todate: str) -> List[Any]:
raise NotImplementedError
@abstractmethod
def get_index_min(self, code: str, todate: str, cts_date: str = " ",
cts_time: str = "", tr_cont_key: str = "") -> List[Any]:
raise NotImplementedError
@abstractmethod
def get_orderbook(self, code: str) -> Any:
raise NotImplementedError
class TradingProvider(ABC):
@abstractmethod
def do_order(self, code: str, buy_flag: Any, price: int, qty: int) -> Any:
raise NotImplementedError
@abstractmethod
def do_order_cancel(self, order_num: str, code: str, qty: int) -> Any:
raise NotImplementedError
class RealtimeProvider(ABC):
@abstractmethod
def set_data_queue(self, queue: Any) -> None:
"""
Inject a queue to receive realtime data.
The queue should support a .put(item) method.
"""
raise NotImplementedError
@abstractmethod
async def recv_price(self, code: str, status: bool = True) -> None:
raise NotImplementedError
@abstractmethod
async def recv_index(self, code: str, status: bool = True) -> None:
raise NotImplementedError
@abstractmethod
async def recv_orderbook(self, code: str, status: bool = True) -> None:
raise NotImplementedError
@abstractmethod
async def recv_trade(self, code: str, status: bool = True) -> None:
raise NotImplementedError
class AccountProvider(ABC):
@abstractmethod
def get_balance(self) -> Any:
raise NotImplementedError
@abstractmethod
def get_holds(self) -> List[Any]:
raise NotImplementedError
class InfoProvider(ABC):
@abstractmethod
def get_stock_list(self, mrkt_tp: str = "0") -> List[Any]:
raise NotImplementedError
@abstractmethod
def get_index_list(self) -> List[Any]:
raise NotImplementedError
class BaseProvider(AuthenticationProvider, MarketDataProvider, TradingProvider, RealtimeProvider, AccountProvider, InfoProvider):
"""
Composite interface for backward compatibility or full implementation.
"""
pass
+1
View File
@@ -0,0 +1 @@
from .ebest import EBest
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
+6
View File
@@ -0,0 +1,6 @@
from finestock.ls import LS
class EBest(LS):
def __init__(self):
super().__init__()
print("create Ebest Components")
+2
View File
@@ -0,0 +1,2 @@
from .kis import Kis
from .kis_v import KisV
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
+206
View File
@@ -0,0 +1,206 @@
import asyncio
from datetime import datetime
import json
import requests
import websockets
import finestock
from finestock.comm import API
class Kis(API):
def __init__(self):
super().__init__()
self.approval_key = None
self.headers_rt = {"custtype": "P", "tr_type": "1", "content-type": "utf-8"}
print("create Kis Components")
def oauth(self, header=None, data=None):
data = {
"grant_type": "client_credentials",
"appkey": self.app_key,
"appsecret": self.app_secret
}
data = json.dumps(data)
return super().oauth(data=data)
def approval(self):
header = self.headers.copy()
data = {
"grant_type": "client_credentials",
"appkey": self.app_key,
"secretkey": self.app_secret
}
response = requests.post(f"{self.DOMAIN}/oauth2/Approval", headers=header, data=json.dumps(data))
if response.status_code == 200:
res = response.json()
if "approval_key" in res:
self.approval_key = res['approval_key']
return res
else:
return response.json()
def get_price(self, code):
today = datetime.now().strftime('%Y%m%d')
res = self.get_ohlcv(code, frdate=today, todate=today)
return res[0] if res else None
def get_ohlcv(self, code, frdate=datetime.now().strftime('%Y%m%d'), todate=datetime.now().strftime('%Y%m%d')):
header = self.headers.copy()
header["tr_id"] = "FHKST03010100"
param = {
"fid_cond_mrkt_div_code": "J",
"fid_input_iscd": code,
"fid_input_date_1": frdate,
"fid_input_date_2": todate,
"fid_period_div_code": "D", #D:일봉, W:주봉, M:월봉, Y:년봉,
"fid_org_adj_prc": "0" #0:수정주가, 1: 원주가
}
response = requests.get(f"{self.DOMAIN}/{self.CHART}", headers=header, params=param)
res = response.json()
ohlcvs = []
if res["rt_cd"] == "0":
data = res["output2"]
print(data)
for price in data:
ohlcvs.append(finestock.Price(price["stck_bsop_date"], code, price["stck_clpr"], price["stck_oprc"], price["stck_hgpr"], price["stck_lwpr"], price["stck_clpr"], price["acml_vol"], price["acml_tr_pbmn"]))
return ohlcvs
def get_index(self, code, frdate=datetime.now().strftime('%Y%m%d'), todate=datetime.now().strftime('%Y%m%d')):
header = self.headers.copy()
header["tr_id"] = "FHKUP03500100"
param = {
"fid_cond_mrkt_div_code": "U",
"fid_input_iscd": code,
"fid_input_date_1": frdate,
"fid_input_date_2": todate,
"fid_period_div_code": "D", #D:일봉, W:주봉, M:월봉, Y:년봉,
"fid_org_adj_prc": "0" #0:수정주가, 1: 원주가
}
response = requests.get(f"{self.DOMAIN}/{self.INDEX}", headers=header, params=param)
res = response.json()
ohlcvs = []
if res["rt_cd"] == "0":
data = res["output2"]
for price in data:
ohlcvs.append(finestock.Price(price["stck_bsop_date"], code, price["bstp_nmix_prpr"], price["bstp_nmix_oprc"], price["bstp_nmix_hgpr"], price["bstp_nmix_lwpr"], price["bstp_nmix_prpr"], price["acml_vol"], price["acml_tr_pbmn"]))
return ohlcvs
def get_orderbook(self, code):
header = self.headers.copy()
header["tr_id"] = "FHKST01010200"
param = {
"FID_COND_MRKT_DIV_CODE": "J",
"FID_INPUT_ISCD": code
}
response = requests.get(f"{self.DOMAIN}/{self.ORDERBOOK}", headers=header, params=param)
res = response.json()
output1 = res['output1']
sells = []
for i in range(1, 11):
sells.append(finestock.Hoga(int(output1[f'askp{i}']), int(output1[f'askp_rsqn{i}'])))
buys = []
for i in range(1, 11):
buys.append(finestock.Hoga(int(output1[f'bidp{i}']), int(output1[f'bidp_rsqn{i}'])))
output2 = res['output2']
code = output2['stck_shrn_iscd']
total_buy = output1['total_bidp_rsqn']
total_sell = output1['total_askp_rsqn']
order = finestock.OrderBook(code, total_buy, total_sell, buys, sells)
return order
def get_balance(self):
header = self.headers.copy()
header["tr_id"] = "TTTC8434R" # 모의: VTTC8434R, 실전:TTTC8434R
param = {
"CANO": self.account_num,
"ACNT_PRDT_CD": self.account_num_sub,
"AFHR_FLPR_YN": "N", # 시간외단일가여부(N: 기본값, Y: 시간외단일가)
"OFL_YN": "", # 공란
"INQR_DVSN": "02", # 조회구분(01: 대출일별, 02: 종목별)
"UNPR_DVSN": "01", # 단가구분(01: 기본값)
"FUND_STTL_ICLD_YN": "N", # 펀드결제분포함여부
"FNCG_AMT_AUTO_RDPT_YN": "N", # 융자금액자동상환여부
"PRCS_DVSN": "00", # 처리구분(00: 전일매매포함, 01: 전일매매비포함)
"CTX_AREA_FK100": "", # 연속조회검색조건100
"CTX_AREA_NK100": "" # 연속조회키100
}
response = requests.get(f"{self.DOMAIN}/{self.ACCOUNT}", headers=header, params=param)
res = response.json()
print(res)
hold = res["output1"]
acc = res["output2"][0]
holds = []
for stock in hold:
holds.append(
finestock.Hold(stock['pdno'], stock['prdt_name'], float(stock['pchs_avg_pric']), int(stock['hldg_qty']), int(stock['pchs_amt']),
int(stock['evlu_amt'])))
return finestock.Account(self.account_num, self.account_num_sub, int(acc["dnca_tot_amt"]), int(acc["nxdy_excc_amt"]),
int(acc["prvs_rcdl_excc_amt"]), holds)
def do_order(self, code, buy_flag, price, qty):
url = f"{self.DOMAIN}/{self.ORDER}"
header = self.headers.copy()
header["tr_id"] = "TTTC0802U" if buy_flag == finestock.ORDER_FLAG.BUY else "TTTC0801U " #[실전]매수: TTTC0802U, 매도: TTTC0801U
dvsn = "01" if price == 0 else "00" #00: 지정가, 01:시장가
param = {
"CANO": self.account_num,
"ACNT_PRDT_CD": self.account_num_sub,
"PDNO": code, # 종목코드
"ORD_DVSN": dvsn, # 주문구분(00: 지정가, 01:시장가)
"ORD_QTY": str(qty), # 주문수량(01: 대출일별, 02: 종목별)
"ORD_UNPR": str(price) # 주문단가(01: 기본값)
}
response = requests.post(url, headers=header, data=json.dumps(param))
res = response.json()
print(res)
if res['rt_cd'] == "0":
data = res['output']
return finestock.Order(code, '', price, qty, buy_flag,
data['ODNO'], data['ORD_TMD'])
def get_order_status(self, code):
pass
def do_order_cancel(self, order_num, code, qty):
pass
def get_index_list(self):
print("Kis not supported")
async def connect(self):
self.approval()
self.ws = await websockets.connect(self.DOMAIN_WS)
async def recv_price(self, code, status=True):
pass
async def recv_index(self, code, status=True):
pass
async def recv_orderbook(self, code, status=True):
pass
async def recv_trade(self, code, status=True):
pass
'''
async def connect(self):
print("connecting...")
print(uri)
websocket = await websockets.connect(uri)
print("success connection")
return websocket
'''
+83
View File
@@ -0,0 +1,83 @@
import json
import requests
import finestock
from finestock.kis import Kis
class KisV(Kis):
def __init__(self):
super().__init__()
print("create Kis_Test Components")
def get_balance(self):
header = self.headers.copy()
header["tr_id"] = "VTTC8434R" # 모의: VTTC8434R, 실전:TTTC8434R
param = {
"CANO": self.account_num,
"ACNT_PRDT_CD": self.account_num_sub,
"AFHR_FLPR_YN": "N", # 시간외단일가여부(N: 기본값, Y: 시간외단일가)
"OFL_YN": "", # 공란
"INQR_DVSN": "02", # 조회구분(01: 대출일별, 02: 종목별)
"UNPR_DVSN": "01", # 단가구분(01: 기본값)
"FUND_STTL_ICLD_YN": "N", # 펀드결제분포함여부
"FNCG_AMT_AUTO_RDPT_YN": "N", # 융자금액자동상환여부
"PRCS_DVSN": "00", # 처리구분(00: 전일매매포함, 01: 전일매매비포함)
"CTX_AREA_FK100": "", # 연속조회검색조건100
"CTX_AREA_NK100": "" # 연속조회키100
}
print(header)
response = requests.get(f"{self.DOMAIN}/{self.ACCOUNT}", headers=header, params=param)
res = response.json()
print(res)
hold = res["output1"]
acc = res["output2"][0]
holds = []
for stock in hold:
holds.append(
finestock.Hold(stock['pdno'], stock['prdt_name'], float(stock['pchs_avg_pric']), int(stock['hldg_qty']), int(stock['pchs_amt']),
int(stock['evlu_amt'])))
return finestock.Account(self.account_num, self.account_num_sub, int(acc["dnca_tot_amt"]), int(acc["nxdy_excc_amt"]),
int(acc["prvs_rcdl_excc_amt"]), holds)
def do_order(self, code, buy_flag, price, qty):
url = f"{self.DOMAIN}/{self.ORDER}"
header = self.headers.copy()
header["tr_id"] = "VTTC0802U" if buy_flag == finestock.ORDER_FLAG.BUY else "VTTC0801U" #[모의]매수: VTTC0802U, 매도: VTTC0801U
dvsn = "01" if price == 0 else "00" #00: 지정가, 01:시장가
param = {
"CANO": self.account_num,
"ACNT_PRDT_CD": self.account_num_sub,
"PDNO": code, # 종목코드
"ORD_DVSN": dvsn, # 주문구분(00: 지정가, 01:시장가)
"ORD_QTY": str(qty), # 주문수량(01: 대출일별, 02: 종목별)
"ORD_UNPR": str(price) # 주문단가(01: 기본값)
}
response = requests.post(url, headers=header, data=json.dumps(param))
res = response.json()
print(res)
if res['rt_cd'] == "0":
data = res['output']
return finestock.Order(code, '', price, qty, buy_flag,
data['ODNO'], data['ORD_TMD'])
def get_order_status(self, code):
pass
def do_order_cancle(self, order_num, code, qty):
pass
'''
async def connect(self):
print("connecting...")
print(uri)
websocket = await websockets.connect(uri)
print("success connection")
return websocket
'''
+4
View File
@@ -0,0 +1,4 @@
from .kiwoom import Kiwoom
from .kiwoom_v import KiwoomV
__all__ = ['Kiwoom', 'KiwoomV']
Binary file not shown.
+851
View File
@@ -0,0 +1,851 @@
from loguru import logger
import datetime
import asyncio
import websockets
import json
import requests
import finestock
from websockets.exceptions import ConnectionClosedOK
from finestock.comm.api import API
class Kiwoom(API):
def __init__(self):
super().__init__()
self.headers["content-type"] = "application/json"
def _parse_int(self, val):
try:
return int(val)
except:
return 0
def _parse_float(self, val):
try:
return float(val)
except:
return 0.0
async def _send_subscription(self, code, tr_type, status=True):
msg = json.dumps({
"trnm": "REG",
"grp_no": "1",
"refresh": "1" if status else "0",
"data": [{
"item": [code],
"type": [tr_type]
}]
})
logger.info(f"[Kiwoom WS] Sending Subscription ({tr_type}): {msg}")
await self.ws.send(msg)
def oauth(self):
data = {
"grant_type": "client_credentials",
"appkey": self.app_key,
"secretkey": self.app_secret,
}
header = {"Content-Type": "application/json; charset=UTF-8"}
res = super().oauth(header=header, data=json.dumps(data))
if res and 'token' in res:
self.access_token = res['token']
self.token_type = res.get('token_type', 'Bearer')
self.headers['authorization'] = f"{self.token_type} {self.access_token}"
return res
def get_price(self, code):
today = datetime.datetime.now().strftime("%Y%m%d")
res = self.get_ohlcv(code, frdate=today, todate=today)
return res[0] if res else None
def get_ohlcv(self, code, frdate="", todate="", cts_date="", tr_cont_key=""):
# TR: ka10081 (Stock Daily Chart)
# params: {stk_cd, base_dt, upd_stkpc_tp}
params = {
"stk_cd": code,
"base_dt": todate if todate else datetime.datetime.now().strftime("%Y%m%d"),
"upd_stkpc_tp": "0" # 0:raw, 1:adjusted
}
return self._get_chart_sync("ka10081", params, frdate=frdate)
def _get_chart_sync(self, tr_code, params, next_key="", frdate=""):
url = f"{self.DOMAIN}/{self.STOCK_CHART}"
cont_yn = "Y" if next_key else "N"
header = {
"Content-Type": "application/json;charset=UTF-8",
"authorization": f"{self.token_type} {self.access_token}",
"api-id": tr_code,
"cont-yn": cont_yn,
"next-key": next_key
}
# logger.debug(f"[Kiwoom] Request URL: {url}")
# logger.debug(f"[Kiwoom] Request Headers: {header}")
# logger.debug(f"[Kiwoom] Request Body: {params}")
response = requests.post(url, headers=header, data=json.dumps(params))
# logger.debug(f"[Kiwoom] Response Status: {response.status_code}")
# logger.debug(f"[Kiwoom] Response Body: {response.text}")
if response.status_code == 200:
try:
res = response.json()
if res.get('rt_cd', '0') == '0': # Assume success if rt_cd is missing
# If output field exists, use it. Otherwise, use the whole response.
data = res.get('output', res)
ohlcvs = self._parse_ohlcv(data, tr_code)
# Filter by frdate if provided
if frdate:
ohlcvs = [x for x in ohlcvs if x.workday >= frdate]
return ohlcvs
else:
logger.error(f"[Kiwoom] Error Code: {res.get('rt_cd')}")
logger.error(f"[Kiwoom] Full Response: {res}")
logger.error(f"[Kiwoom] Error Msg: {res.get('msg1')} {res.get('msg2')}")
except Exception as e:
logger.error(f"[Kiwoom] JSON Parse Error: {e}")
pass
return []
def get_ohlcv_min(self, code, todate="", exchgubun="K", cts_date="", cts_time="", tr_cont_key=""):
# TR: ka10080 (Stock Minute Chart)
# params: {stk_cd, base_dt, tic_scope, upd_stkpc_tp}
params = {
"stk_cd": code,
"base_dt": todate if todate else datetime.datetime.now().strftime("%Y%m%d"),
"tic_scope": "1", # 1 minute
"upd_stkpc_tp": "0" # 0:raw, 1:adjusted
}
# Note: Minute chart filtering by daily frdate might need implicit logic (e.g. only date part)
# Since fine_stock.Price.workday for minute data might be "YYYYMMDD", string comparison works if frdate is "YYYYMMDD"
return self._get_chart_sync("ka10080", params, next_key=tr_cont_key, frdate=cts_date if cts_date.strip() else "")
return self._get_chart_sync("ka10080", params, next_key=tr_cont_key, frdate=cts_date if cts_date.strip() else "")
def get_stock_list(self, mrkt_tp="0"):
# TR: ka10099 (Stock Info)
# params: {mrkt_tp} 0:KOSPI, 10:KOSDAQ
url = f"{self.DOMAIN}/{self.STOCK_INFO}"
header = {
"Content-Type": "application/json;charset=UTF-8",
"authorization": f"{self.token_type} {self.access_token}",
"api-id": "ka10099",
"cont-yn": "N",
"next-key": ""
}
params = {
"mrkt_tp": mrkt_tp
}
response = requests.post(url, headers=header, data=json.dumps(params))
if response.status_code == 200:
try:
res = response.json()
if res.get('rt_cd', '0') == '0': # Note: User example says return_code, but standard Kiwoom is rt_cd usually. Wait, User example shows "return_code":0.
# Let's check user example again.
# User example response:
# { "return_msg":..., "return_code":0, "list": [...] }
# Standard Kiwoom usually uses rt_cd. But user example shows return_code.
# I should handle both or trust user example for this specific TR.
# Actually, for consistency, I will check return_code or rt_cd.
# The user example shows "return_code": 0 (integer).
# Logic:
if str(res.get('return_code', res.get('rt_cd', '1'))) == '0':
data_list = res.get('list', [])
stocks = []
for item in data_list:
# Mapping
# code: "005930"
# name: "삼성전자"
# marketName: "코스닥" (Wait, 005930 is KOSPI, example says KOSDAQ? Example might be mock/mixed)
# state: "관리종목" / "증거금100%"
# auditInfo: "투자주의환기종목" / "정상"
code = item.get('code', '')
name = item.get('name', '')
market = item.get('marketName', '')
# Simple mapping for bool flags
is_suspended = item.get('auditInfo') != '정상'
is_admin = item.get('state') == '관리종목'
stocks.append(finestock.Stock(code, name, market, is_suspended, is_admin))
logger.info(f"[Kiwoom] get_stock_list received {len(stocks)} items")
return stocks
else:
logger.error(f"[Kiwoom] Stock List Error: {res.get('return_msg', res.get('msg1'))}")
else: # fallback if return_code is not present check rt_cd
pass
except Exception as e:
logger.error(f"[Kiwoom] JSON Parse Error: {e}")
else:
logger.error(f"[Kiwoom] HTTP Error: {response.status_code} {response.text}")
return []
def get_index_list(self, mrkt_tp="0"):
# TR: ka10101 (Index Info)
# params: {mrkt_tp} 0:KOSPI, 1:KOSDAQ, ...
url = f"{self.DOMAIN}/{self.STOCK_INFO}"
header = {
"Content-Type": "application/json;charset=UTF-8",
"authorization": f"{self.token_type} {self.access_token}",
"api-id": "ka10101",
"cont-yn": "N",
"next-key": ""
}
params = {
"mrkt_tp": mrkt_tp
}
response = requests.post(url, headers=header, data=json.dumps(params))
if response.status_code == 200:
try:
res = response.json()
# User example uses return_code. Standard Kiwoom uses rt_cd.
# Logic: Check return_code first, then rt_cd.
return_code = str(res.get('return_code', res.get('rt_cd', '1')))
if return_code == '0':
data_list = res.get('list', [])
indices = []
for item in data_list:
# Mapping
# marketCode: "0"
# code: "001"
# name: "종합(KOSPI)"
# group: "1"
code = item.get('code', '')
name = item.get('name', '')
market = item.get('marketCode', '0')
group = item.get('group', '')
indices.append(finestock.Index(code, name, market, group))
logger.info(f"[Kiwoom] get_index_list received {len(indices)} items")
return indices
else:
logger.error(f"[Kiwoom] Index List Error: {res.get('return_msg', res.get('msg1'))}")
except Exception as e:
logger.error(f"[Kiwoom] JSON Parse Error: {e}")
else:
logger.error(f"[Kiwoom] HTTP Error: {response.status_code} {response.text}")
return []
def get_index(self, code, frdate="", todate="", cts_date="", tr_cont_key=""):
# TR: ka20006 (Index Daily Chart)
# params: {inds_cd, base_dt}
params = {
"inds_cd": code,
"base_dt": todate if todate else datetime.datetime.now().strftime("%Y%m%d"),
}
return self._get_chart_sync("ka20006", params, next_key=tr_cont_key, frdate=frdate)
def get_index_min(self, code, todate="", cts_date=" ", cts_time="", tr_cont_key=""):
# TR: ka20005 (Index Minute Chart)
# params: {inds_cd, base_dt, tic_scope}
params = {
"inds_cd": code,
"base_dt": todate if todate else datetime.datetime.now().strftime("%Y%m%d"),
"tic_scope": "1" # 1 minute
}
return self._get_chart_sync("ka20005", params, next_key=tr_cont_key, frdate=cts_date if cts_date.strip() else "")
def _parse_ohlcv(self, data, tr_code):
ohlcvs = []
# Determine list data based on TR code if directly in data dict
items = []
if isinstance(data, list):
items = data
elif isinstance(data, dict):
if tr_code == "ka10081":
items = data.get('stk_dt_pole_chart_qry', [])
elif tr_code == "ka10080":
items = data.get('stk_min_pole_chart_qry', [])
elif tr_code == "ka20006":
items = data.get('inds_dt_pole_qry', [])
elif tr_code == "ka20005":
items = data.get('inds_min_pole_qry', [])
for item in items:
try:
# Mapping depends on TR code
if tr_code == "ka10081": # Stock Daily
ohlcvs.append(finestock.Price(
item['dt'], "", item['cur_prc'], item['open_pric'], item['high_pric'],
item['low_pric'], item['cur_prc'], item['trde_qty'], item['trde_prica']
))
elif tr_code == "ka10080": # Stock Minute
ohlcvs.append(finestock.Price(
"", "", item['cur_prc'], item['open_pric'], item['high_pric'],
item['low_pric'], item['cur_prc'], item['trde_qty'], "",
item['cntr_tm'] # Time
))
elif tr_code == "ka20006": # Index Daily
if not item['dt']: continue
ohlcvs.append(finestock.Price.from_values(
item['dt'], "",
float(item['cur_prc']) / 100,
float(item['open_pric']) / 100,
float(item['high_pric']) / 100,
float(item['low_pric']) / 100,
float(item['cur_prc']) / 100,
item['trde_qty'],
item.get('trde_prica', 0)
))
elif tr_code == "ka20005": # Index Minute
ohlcvs.append(finestock.Price.from_values(
"", "",
float(item['cur_prc']) / 100,
float(item['open_pric']) / 100,
float(item['high_pric']) / 100,
float(item['low_pric']) / 100,
float(item['cur_prc']) / 100,
item['trde_qty'], "",
item['cntr_tm'] # Time
))
except Exception as e:
logger.warning(f"Parse error for item {item}: {e}")
continue
return ohlcvs
def get_orderbook(self, code):
# TR: ka10004 (Stock Quote - Hoga)
# Endpoint: /api/dostk/mrkcond
# params: {stk_cd}
url = f"{self.DOMAIN}/{self.STOCK_ORDERBOOK}"
header = {
"Content-Type": "application/json;charset=UTF-8",
"authorization": f"{self.token_type} {self.access_token}",
"api-id": "ka10004",
"cont-yn": "N",
"next-key": ""
}
params = {
"stk_cd": code
}
response = requests.post(url, headers=header, data=json.dumps(params))
if response.status_code == 200:
try:
res = response.json()
# Debug logging to see structure if needed
# logger.debug(f"[Kiwoom] Orderbook Response: {res}")
if res.get('rt_cd', '0') == '0':
# Parse OrderBook
# Response body contains fields directly? Or in output?
# Based on search, it seems fields are in the body (or 'output' if consistent with chart)
# Let's check if 'output' exists, otherwise use 'res'.
# Reference [1] says response body has the fields. Kiwoom REST often wraps in 'output' or 'list'.
# For ka10004, let's assume 'output' block based on previous patterns.
data = res.get('output', res)
buys = []
sells = []
# 10 levels
for i in range(1, 11):
# Sell side: sel_1st_pre_bid ... sel_10th_pre_bid
# Buy side: buy_1st_pre_bid ... buy_10th_pre_bid
# Quantities: sel_1st_pre_req ...
try:
if i == 1:
s_price = data.get('sel_fpr_bid', '0')
s_qty = data.get('sel_fpr_req', '0')
b_price = data.get('buy_fpr_bid', '0')
b_qty = data.get('buy_fpr_req', '0')
else:
s_price = data.get(f'sel_{i}th_pre_bid', '0')
s_qty = data.get(f'sel_{i}th_pre_req', '0')
b_price = data.get(f'buy_{i}th_pre_bid', '0')
b_qty = data.get(f'buy_{i}th_pre_req', '0')
if s_price and int(s_price) > 0:
sells.append(finestock.Hoga(int(s_price), int(s_qty)))
if b_price and int(b_price) > 0:
buys.append(finestock.Hoga(int(b_price), int(b_qty)))
except:
pass
total_sell = int(data.get('tot_sel_req', '0'))
total_buy = int(data.get('tot_buy_req', '0'))
return finestock.OrderBook(
code=code,
total_buy=total_buy,
total_sell=total_sell,
buy=buys,
sell=sells
)
else:
logger.error(f"[Kiwoom] Error Code: {res.get('rt_cd')}")
logger.error(f"[Kiwoom] Error Msg: {res.get('msg1')} {res.get('msg2')}")
except Exception as e:
logger.error(f"[Kiwoom] JSON Parse Error: {e}")
return None
async def connect(self, callback=None):
logger.info(f"[Kiwoom API] connecting to {self.DOMAIN_WS}...")
try:
self.ws = await websockets.connect(self.DOMAIN_WS)
logger.info("[Kiwoom API] complete connect")
if self.access_token:
await self.login()
if callback is not None:
callback()
except Exception as e:
logger.error(f"[Kiwoom API] Connection Failed: {e}")
async def login(self):
msg = json.dumps({
"trnm": "LOGIN",
"token": self.access_token
})
logger.info(f"[Kiwoom WS] Sending Login: {msg}")
await self.ws.send(msg)
async def disconnect(self, callback=None):
self.stop()
if self.ws:
try:
await self.ws.close()
except Exception as e:
logger.error(f"[Kiwoom API] Disconnect error: {e}")
logger.info("[Kiwoom API] complete disconnect")
if callback is not None:
callback()
def stop(self):
self.is_run = False
async def run(self):
logger.info(f"Kiwoom API Run Loop Started")
self.is_run = True
while self.is_run:
try:
# Wait for message
res = await self.ws.recv()
try:
res_json = json.loads(res)
trnm = res_json.get('trnm')
if trnm == 'LOGIN':
if res_json.get('return_code') != 0:
logger.error(f"[Kiwoom WS] Login Failed: {res_json.get('return_msg')}")
self.stop()
else:
logger.info("[Kiwoom WS] Login Success")
elif trnm == 'PING':
await self.ws.send(res) # Echo back
elif trnm == 'REAL':
# Realtime Data
# Structure: {'data': [{'values': {'20': '...', '10': '...'}, 'type': '0B', ...}], 'trnm': 'REAL'}
data_list = res_json.get('data', [])
for item in data_list:
if item.get('type') == '0B':
self._parse_real_price(item)
elif item.get('type') == '0J': # Index Realtime
self._parse_real_index(item)
elif item.get('type') == '0D': # Orderbook Realtime
self._parse_real_orderbook(item)
elif item.get('type') == '00': # Order Status Realtime
self._parse_real_order_status(item)
except json.JSONDecodeError:
logger.warning(f"[Kiwoom WS] Received non-JSON message: {res}")
except asyncio.TimeoutError:
pass
except ConnectionClosedOK:
logger.info("[Kiwoom WS] Connection Closed")
self.is_run = False
break
except Exception as e:
logger.error(f"[Kiwoom WS] Error: {e}")
# self.is_run = False # Don't stop on minor errors?
def _parse_real_price(self, item):
values = item.get('values', {})
code = item.get('item', '')
# Mapping based on standard Kiwoom FIDs (approximate)
# 10: Current Price (cur_prc)
# 11: Fluctuation (diff)
# 12: Fluctuation Rate (diff_rate)
# 13: Accumulated Volume (trde_qty) ?
# 14: Accumulated Volume Value ?
# 15: Transaction Volume (tick_qty) ?
# 16: Open (oprc)
# 17: High (hgpr)
# 18: Low (lwpr)
# 20: Time (tr_tm)
today = datetime.datetime.now().strftime("%Y%m%d")
cur_prc = values.get('10', '0')
time_str = values.get('20', '')
# Prices are often signed (+/-), remove sign for float conversion
# And check if need /100. REST API needed it.
# If 181300 is Samsung, it seems raw won.
price = abs(self._parse_int(cur_prc))
open_p = abs(self._parse_int(values.get('16', '0')))
high_p = abs(self._parse_int(values.get('17', '0')))
low_p = abs(self._parse_int(values.get('18', '0')))
p = finestock.Price(
today, code, price,
open_p, high_p, low_p, price,
self._parse_int(values.get('13', '0')), # Vol
self._parse_int(values.get('14', '0')), # Val
time_str
)
self.add_price(p)
def _parse_real_index(self, item):
values = item.get('values', {})
code = item.get('item', '')
# Mapping for Index (0J)
# 10: Current Index
# 11: Diff
# 12: Rate
# 13: Vol
# 14: Val
# 20: Time
time_str = values.get('20', '')
cur_val_str = values.get('10', '0')
price = abs(self._parse_float(cur_val_str))
open_p = abs(self._parse_float(values.get('16', '0')))
high_p = abs(self._parse_float(values.get('17', '0')))
low_p = abs(self._parse_float(values.get('18', '0')))
p = finestock.Price(
datetime.datetime.now().strftime("%Y%m%d"),
code,
price,
open_p, high_p, low_p, price,
self._parse_int(values.get('13', '0')), # Vol
self._parse_int(values.get('14', '0')), # Val
time_str
)
self.add_price(p)
def _parse_real_orderbook(self, item):
values = item.get('values', {})
code = item.get('item', '')
# Kiwoom Realtime Hoga (0D) FIDs
# Sell Price 1~10: 41~50
# Buy Price 1~10: 51~60
# Sell Qty 1~10: 61~70
# Buy Qty 1~10: 71~80
# Total Sell Qty: 121
# Total Buy Qty: 125
buys = []
sells = []
for i in range(10):
s_price_fid = str(41 + i)
b_price_fid = str(51 + i)
s_qty_fid = str(61 + i)
b_qty_fid = str(71 + i)
s_price = values.get(s_price_fid, '0')
b_price = values.get(b_price_fid, '0')
s_qty = values.get(s_qty_fid, '0')
b_qty = values.get(b_qty_fid, '0')
if self._parse_int(s_price) != 0:
sells.append(finestock.Hoga(abs(self._parse_int(s_price)), self._parse_int(s_qty)))
if self._parse_int(b_price) != 0:
buys.append(finestock.Hoga(abs(self._parse_int(b_price)), self._parse_int(b_qty)))
total_sell = self._parse_int(values.get('121', '0'))
total_buy = self._parse_int(values.get('125', '0'))
ob = finestock.OrderBook(
code=code,
total_buy=total_buy,
total_sell=total_sell,
buy=buys,
sell=sells
)
self.add_orderbook(ob)
def _parse_real_order_status(self, item):
values = item.get('values', {})
# Mapping for Order Status (00)
status_str = values.get('913', '')
trade_flag = finestock.TRADE_FLAG.ORDER
if isinstance(status_str, str):
if '체결' in status_str:
trade_flag = finestock.TRADE_FLAG.COMPLETE
elif '취소' in status_str:
trade_flag = finestock.TRADE_FLAG.CANCLE
elif '정정' in status_str:
trade_flag = finestock.TRADE_FLAG.MODIFY
buysell_str = values.get('905', '')
order_flag = finestock.ORDER_FLAG.BUY
if isinstance(buysell_str, str) and '매도' in buysell_str:
order_flag = finestock.ORDER_FLAG.SELL
trade = finestock.Trade(
code=values.get('9001', ''),
name=values.get('302', ''),
trade_flag=trade_flag,
order_flag=order_flag,
price=self._parse_int(values.get('901', '0')),
qty=self._parse_int(values.get('900', '0')),
trade_price=self._parse_int(values.get('910', '0')),
trade_qty=self._parse_int(values.get('911', '0')),
order_num=values.get('9203', '').strip(),
order_time=values.get('908', '')
)
self.add_trade(trade)
async def recv_price(self, code, status=True):
# Subscription Packet (REG)
await self._send_subscription(code, "0B", status)
async def recv_index(self, code, status=True):
# Subscription Packet (REG) for Index (0J)
await self._send_subscription(code, "0J", status)
async def recv_orderbook(self, code, status=True):
# Subscription Packet (REG) for Orderbook (0D)
await self._send_subscription(code, "0D", status)
async def recv_trade(self, code, status=True):
# Subscription Packet (REG) for Realtime Trade/Price (0B)
# In Kiwoom, '0B' (Realtime Conclusion) includes both price update and trade tick.
# This is essentially the same as recv_price.
await self._send_subscription(code, "0B", status)
async def recv_order_status(self, status=True):
# Subscription Packet (REG) for Order Status (00)
await self._send_subscription("", "00", status)
def do_order(self, code, buy_flag, price, qty):
# Determine TR Code
if buy_flag == finestock.ORDER_FLAG.BUY:
api_id = "kt10000"
elif buy_flag == finestock.ORDER_FLAG.SELL:
api_id = "kt10001"
else:
logger.error(f"[Kiwoom] Invalid buy_flag: {buy_flag}")
return None
# Determine Price Type
if price == 0:
trde_tp = "3" # Market
ord_uv = ""
else:
trde_tp = "0" # Limit
ord_uv = str(price)
url = f"{self.DOMAIN}/api/dostk/ordr" # Endpoint for Order
header = {
"Content-Type": "application/json;charset=UTF-8",
"authorization": f"{self.token_type} {self.access_token}",
"api-id": api_id,
"cont-yn": "N",
"next-key": ""
}
params = {
"dmst_stex_tp": "KRX",
"stk_cd": code,
"ord_qty": str(qty),
"ord_uv": ord_uv,
"trde_tp": trde_tp,
"cond_uv": ""
}
response = requests.post(url, headers=header, data=json.dumps(params))
if response.status_code == 200:
try:
res = response.json()
if res.get('rt_cd', '0') == '0':
logger.info(f"[Kiwoom] Order Success: {res.get('msg1')} {res.get('msg2')}")
# Return res for now as structure of output is not fully defined
return res
else:
logger.error(f"[Kiwoom] Order Error: {res.get('msg1')} {res.get('msg2')}")
return res
except Exception as e:
logger.error(f"[Kiwoom] JSON Parse Error: {e}")
else:
logger.error(f"[Kiwoom] HTTP Error: {response.status_code} {response.text}")
return None
def do_order_cancel(self, order_num, code, qty):
url = f"{self.DOMAIN}/api/dostk/ordr"
header = {
"Content-Type": "application/json;charset=UTF-8",
"authorization": f"{self.token_type} {self.access_token}",
"api-id": "kt10003",
"cont-yn": "N",
"next-key": ""
}
params = {
"dmst_stex_tp": "KRX",
"orig_ord_no": str(order_num),
"stk_cd": code,
"cncl_qty": str(qty)
}
response = requests.post(url, headers=header, data=json.dumps(params))
if response.status_code == 200:
try:
res = response.json()
if res.get('rt_cd', '0') == '0':
logger.info(f"[Kiwoom] Cancel Success: {res.get('msg1')} {res.get('msg2')}")
return res
else:
logger.error(f"[Kiwoom] Cancel Error: {res.get('msg1')} {res.get('msg2')}")
return res
except Exception as e:
logger.error(f"[Kiwoom] JSON Parse Error: {e}")
else:
logger.error(f"[Kiwoom] HTTP Error: {response.status_code} {response.text}")
return None
def get_balance(self):
url = f"{self.DOMAIN}/api/dostk/acnt"
header = {
"Content-Type": "application/json;charset=UTF-8",
"authorization": f"{self.token_type} {self.access_token}",
"api-id": "kt00001",
"cont-yn": "N",
"next-key": ""
}
params = {
"qry_tp": "3" # 3: Estimated, 2: General
}
response = requests.post(url, headers=header, data=json.dumps(params))
print(response.text)
if response.status_code == 200:
try:
res = response.json()
print(res)
if res.get('rt_cd', '0') == '0' or res.get('return_code') == 0:
# Kiwoom REST API for account often puts fields at the root level
output = res.get('output', res)
logger.info(f"[Kiwoom] Balance Response: {output}")
# Placeholder mapping based on user example:
# entr: deposit (예수금)
# d1_entra: D+1 예수금
# d2_entra: D+2 예수금
# ord_alow_amt: 주문가능금액
deposit = int(output.get('entr', '0'))
next_deposit = int(output.get('d2_entra', '0')) # D+2 is standard for "next deposit" in Korea
pay_deposit = int(output.get('ord_alow_amt', '0')) # Trade allowed amount
return finestock.Account(self.account_num, "", deposit, next_deposit, pay_deposit, [])
else:
logger.error(f"[Kiwoom] Balance Error: {res.get('msg1')} {res.get('msg2')}")
except Exception as e:
logger.error(f"[Kiwoom] JSON Parse Error: {e}")
else:
logger.error(f"[Kiwoom] HTTP Error: {response.status_code} {response.text}")
return None
def get_holds(self):
url = f"{self.DOMAIN}/api/dostk/acnt"
header = {
"Content-Type": "application/json;charset=UTF-8",
"authorization": f"{self.token_type} {self.access_token}",
"api-id": "kt00004",
"cont-yn": "N",
"next-key": ""
}
params = {
"qry_tp": "0", # 0: All, 1: Exclude Delisting]\
"dmst_stex_tp": "KRX"
}
response = requests.post(url, headers=header, data=json.dumps(params))
if response.status_code == 200:
try:
res = response.json()
if res.get('rt_cd', '0') == '0' or res.get('return_code') == 0:
logger.info(f"[Kiwoom] Holds Response: {res}")
holds = []
data_list = res.get('stk_acnt_evlt_prst', [])
for item in data_list:
# Mapping based on typical Kiwoom REST Account mapping (kt00004)
# stk_cd: Code
# stk_nm: Name
# avg_prc: Buying Price (Unit Price)
# rmnd_qty: Holding Qty
# evlt_amt: Evaluation Amount
# pl_amt: Profit/Loss
code = item.get('stk_cd', '').strip().replace('A', '') # Remove 'A' prefix if present
name = item.get('stk_nm', '').strip()
price = int(self._parse_int(item.get('avg_prc', '0')))
qty = int(self._parse_int(item.get('rmnd_qty', '0')))
total = int(self._parse_int(item.get('evlt_amt', '0')))
eval_pl = int(self._parse_int(item.get('pl_amt', '0')))
if code:
holds.append(finestock.Hold(code, name, price, qty, total, eval_pl))
return holds
else:
logger.error(f"[Kiwoom] Holds Error: {res.get('msg1')} {res.get('msg2')}")
except Exception as e:
logger.error(f"[Kiwoom] JSON Parse Error: {e}")
else:
logger.error(f"[Kiwoom] HTTP Error: {response.status_code} {response.text}")
return []
+13
View File
@@ -0,0 +1,13 @@
from loguru import logger
from finestock.kiwoom.kiwoom import Kiwoom
class KiwoomV(Kiwoom):
"""
Kiwoom Mock Investment Version
"""
def __init__(self):
super().__init__()
print("create KiwoomV Components")
def __del__(self):
logger.debug("Destroy KiwoomV Components")
+2
View File
@@ -0,0 +1,2 @@
from .ls import LS
from .ls_v import LSV
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
+859
View File
@@ -0,0 +1,859 @@
import asyncio
import datetime
import json
import time
from loguru import logger
import requests
import websockets
from websockets.exceptions import ConnectionClosedOK
import finestock
from finestock.comm import API
class LS(API):
def __init__(self):
super().__init__()
print("create LS Components")
self.headers["tr_cont"] = "N"
self.headers["tr_cont_key"] = ""
self.is_run = True
self.stock_master = None
def __del__(self):
logger.debug("Destory LS Components")
def oauth(self):
data = {
"grant_type": "client_credentials",
"appkey": self.app_key,
"appsecretkey": self.app_secret,
"scope": "oob"
}
return super().oauth(data=data)
def get_price(self, code):
today = datetime.date.today().strftime('%Y%m%d')
res = self.get_ohlcv(code, frdate=today, todate=today)
return res[0] if res else None
def get_ohlcv(self, code, frdate="", todate="", cts_date="", tr_cont_key=""):
url = f"{self.DOMAIN}/{self.CHART}"
header = self.headers.copy()
header["tr_cd"] = "t8410"
if cts_date != "":
header["tr_cont"] = "Y"
header["tr_cont_key"] = tr_cont_key
qrycnt = 500
if frdate == todate:
qrycnt = 1
body = {
"t8410InBlock": {
"shcode": code,
"gubun": "2", #주기구분(2:일3:주4:월5:년)
"qrycnt": qrycnt, #요청건수(최대-압축:2000비압축:500)
"sdate": frdate,
"edate": todate,
"cts_date": cts_date,
"comp_yn": "N",
"sujung": "Y"
}
}
response = requests.post(url, headers=header, data=json.dumps(body))
res = response.json()
logger.debug(f"[API: oauth]\n"
f"[URL: {url}]\n"
f"[header: {header}]\n"
f"[param: {body}]\n"
f"[response: {res}]")
if res['rsp_cd'] == "00000":
ohlcvs = []
for price in res['t8410OutBlock1']:
ohlcvs.append(finestock.Price(price['date'], code, price['close'], price['open'], price['high'],
price['low'], price['close'], price['jdiff_vol'], price['value']))
if response.headers['tr_cont'] == "Y" and frdate != todate:
_tr_cont_key = response.headers['tr_cont_key']
cts_date = res['t8410OutBlock']['cts_date']
if cts_date != "":
time.sleep(1)
pre_ohlcvs = self.get_ohlcv(code, frdate, todate, cts_date, str(_tr_cont_key))
return pre_ohlcvs + ohlcvs
return ohlcvs
def get_multiple_ohlcv(self, codes):
url = f"{self.DOMAIN}/{self.PRICE}"
header = self.headers.copy()
header["tr_cd"] = "t8407"
group_size = 50
codes_group = [codes[i:i + group_size] for i in range(0, len(codes), group_size)]
ohlcvs = []
for _codes in codes_group:
ohlcvs += self._get_multiple_ohlcv(_codes)
time.sleep(0.5)
return ohlcvs
def _get_multiple_ohlcv(self, codes):
url = f"{self.DOMAIN}/{self.PRICE}"
header = self.headers.copy()
header["tr_cd"] = "t8407"
body = {
"t8407InBlock": {
"nrec": len(codes),
"shcode": "".join(codes), #주기구분(2:일3:주4:월5:년)
}
}
response = requests.post(url, headers=header, data=json.dumps(body))
res = response.json()
logger.debug(f"[API: oauth]\n"
f"[URL: {url}]\n"
f"[header: {header}]\n"
f"[param: {body}]\n"
f"[response: {res}]")
if res['rsp_cd'] == "00000":
today = datetime.date.today()
str_today = today.strftime('%Y%m%d')
ohlcvs = []
for price in res['t8407OutBlock1']:
ohlcvs.append(finestock.Price(str_today, price['shcode'], price['price'], price['open'], price['high'],
price['low'], price['price'], price['volume'], price['value']))
return ohlcvs
def get_ohlcv_min(self, code, todate="", exchgubun="K", cts_date="", cts_time="", tr_cont_key=""):
url = f"{self.DOMAIN}/{self.CHART}"
header = self.headers.copy()
header["tr_cd"] = "t8452"
if cts_date != "":
header["tr_cont"] = "Y"
header["tr_cont_key"] = tr_cont_key
qrycnt = 500
body = {
"t8452InBlock": {
"shcode": code,
"ncnt": 1, #단위(n분)
"gubun": "2", #주기구분(2:일3:주4:월5:년)
"qrycnt": qrycnt, #요청건수(최대-압축:2000비압축:500)
"nday": "1",
"sdate": todate,
"stime": "",
"edate": todate,
"etime": "",
"cts_date": cts_date,
"cts_time": cts_time,
"comp_yn": "N",
"exchgubun": exchgubun
}
}
response = requests.post(url, headers=header, data=json.dumps(body))
res = response.json()
logger.debug(f"[API: oauth]\n"
f"[URL: {url}]\n"
f"[header: {header}]\n"
f"[param: {body}]\n"
f"[response: {res}]")
if res['rsp_cd'] == "00000":
ohlcvs = []
for price in res['t8452OutBlock1']:
ohlcvs.append(finestock.Price(price['date'], code, price['close'], price['open'], price['high'],
price['low'], price['close'], price['jdiff_vol'], price['value'], price['time']))
if response.headers['tr_cont'] == "Y":
_tr_cont_key = response.headers['tr_cont_key']
cts_date = res['t8452OutBlock']['cts_date']
cts_time = res['t8452OutBlock']['cts_time']
if cts_date != "":
time.sleep(1)
pre_ohlcvs = self.get_ohlcv_min(code, todate, exchgubun, cts_date, cts_time, str(_tr_cont_key))
return pre_ohlcvs + ohlcvs
return ohlcvs
def get_index(self, code, frdate="", todate="", cts_date="", tr_cont_key=""):
url = f"{self.DOMAIN}/{self.INDEX}"
header = self.headers.copy()
header["tr_cd"] = "t8419"
if cts_date != "":
header["tr_cont"] = "Y"
header["tr_cont_key"] = tr_cont_key
qrycnt = 500
if frdate == todate:
qrycnt = 1
body = {
"t8419InBlock": {
"shcode": code,
"gubun": "2", #주기구분(2:일3:주4:월5:년)
"qrycnt": qrycnt, #요청건수(최대-압축:2000비압축:500)
"sdate": frdate,
"edate": todate,
"cts_date": cts_date,
"comp_yn": "N",
"sujung": "Y"
}
}
response = requests.post(url, headers=header, data=json.dumps(body))
res = response.json()
logger.debug(f"[API: oauth]\n"
f"[URL: {url}]\n"
f"[header: {header}]\n"
f"[param: {body}]\n"
f"[response: {res}]")
if res['rsp_cd'] == "00000":
ohlcvs = []
for price in res['t8419OutBlock1']:
ohlcvs.append(finestock.Price.from_values(price['date'], code, price['close'], price['open'], price['high'],
price['low'], price['close'], price['jdiff_vol'], price['value']))
if response.headers['tr_cont'] == "Y" and frdate != todate:
_tr_cont_key = response.headers['tr_cont_key']
cts_date = res['t8419OutBlock']['cts_date']
if cts_date != "":
time.sleep(1)
pre_ohlcvs = self.get_index(code, frdate, todate, cts_date, str(_tr_cont_key))
return pre_ohlcvs + ohlcvs
return ohlcvs
def get_index_min(self, code, todate="", cts_date=" ", cts_time="", tr_cont_key=""):
url = f"{self.DOMAIN}/{self.INDEX}"
header = self.headers.copy()
header["tr_cd"] = "t8418"
if cts_date != " ":
header["tr_cont"] = "Y"
header["tr_cont_key"] = tr_cont_key
qrycnt = 500
body = {
"t8418InBlock": {
"shcode": code,
"ncnt": 1, #단위(n분)
"qrycnt": qrycnt, #요청건수(최대-압축:2000비압축:500)
"nday": "1", #조회영업일수(0:미사용1>=사용)
"sdate": todate, #기본값 : Space, (edate(필수입력) 기준으로 qrycnt 만큼 조회), 조회구간을 설정하여 필터링 하고 싶은 경우 입력
#"sdate": " ", #기본값 : Space, (edate(필수입력) 기준으로 qrycnt 만큼 조회), 조회구간을 설정하여 필터링 하고 싶은 경우 입력
"stime": "", #미사용
"edate": todate, #처음조회기준일(LE), "99999999" 혹은 '당일'
#"edate": "99999999", #처음조회기준일(LE), "99999999" 혹은 '당일'
"etime": "", #미사용
"cts_date": cts_date,
"cts_time": cts_time,
"comp_yn": "N"
}
}
response = requests.post(url, headers=header, data=json.dumps(body))
res = response.json()
logger.debug(f"[API: oauth]\n"
f"[URL: {url}]\n"
f"[header: {header}]\n"
f"[param: {body}]\n"
f"[response: {res}]")
if res['rsp_cd'] == "00000":
ohlcvs = []
for price in res['t8418OutBlock1']:
ohlcvs.append(finestock.Price.from_values(price['date'], code, price['close'], price['open'], price['high'],
price['low'], price['close'], price['jdiff_vol'], price['value'], price['time']))
if response.headers['tr_cont'] == "Y":
_tr_cont_key = response.headers['tr_cont_key']
cts_date = res['t8418OutBlock']['cts_date']
cts_time = res['t8418OutBlock']['cts_time']
if cts_date != "":
time.sleep(1)
pre_ohlcvs = self.get_index(code, todate, cts_date, cts_time, str(_tr_cont_key))
return pre_ohlcvs + ohlcvs
return ohlcvs
def get_index_list(self):
url = f"{self.DOMAIN}/{self.INDEX_LIST}"
header = self.headers.copy()
header["tr_cd"] = "t8424"
body = {
"t8424InBlock": {
"gubun1": "", #주기구분(2:일3:주4:월5:년)
}
}
response = requests.post(url, headers=header, data=json.dumps(body))
res = response.json()
logger.debug(f"[API: oauth]\n"
f"[URL: {url}]\n"
f"[header: {header}]\n"
f"[param: {body}]\n"
f"[response: {res}]")
def get_stock_list(self):
url = f"{self.DOMAIN}/{self.STOCK_LIST}"
header = self.headers.copy()
header["tr_cd"] = "t8436"
body = {
"t8436InBlock": {
"gubun": "0", #0:전체, 1:코스피, 2:코스닥
}
}
response = requests.post(url, headers=header, data=json.dumps(body))
res = response.json()
'''
logger.debug(f"[API: oauth]\n"
f"[URL: {url}]\n"
f"[header: {header}]\n"
f"[param: {body}]\n"
f"[response: {res}]")
'''
if response.status_code == 200:
if "t8436OutBlock" in res:
return res['t8436OutBlock']
def __get_stock_master(self):
stock_list = self.get_stock_list()
self.stock_master = {
item['shcode']: {'hname': item['hname'], 'gubun': item['gubun'], 'etfgubun': item['etfgubun']} for item in
stock_list
}
def get_news_list(self):
url = f"{self.DOMAIN}/{self.CONDITION}"
header = self.headers.copy()
header["tr_cd"] = "t1809"
header["tr_cont"] = "N"
body = {
"t1809InBlock": {
"gubun": "1", #0
"jmGb": "1", #0
"jmcode": "1",
"cts": "1",
}
}
response = requests.post(url, headers=header, data=json.dumps(body))
res = response.json()
'''
logger.debug(f"[API: oauth]\n"
f"[URL: {url}]\n"
f"[header: {header}]\n"
f"[param: {body}]\n"
f"[response: {res}]")
'''
if response.status_code == 200:
if "t1809OutBlock1" in res:
return res['t1809OutBlock1']
def get_condition_list(self, htsid):
url = f"{self.DOMAIN}/{self.CONDITION}"
header = self.headers.copy()
header["tr_cd"] = "t1866"
header["tr_cont"] = "N"
body = {
"t1866InBlock": {
"user_id": htsid, #0
"gb": "0", #0:그룹+조건리스트 조회, 1:그룹리스트조회, 2:그룹명에 속한 조건리스트조회
"group_name":"",
"cont": "",
"cont_key": "",
}
}
response = requests.post(url, headers=header, data=json.dumps(body))
res = response.json()
logger.debug(f"[API: oauth]\n"
f"[URL: {url}]\n"
f"[header: {header}]\n"
f"[param: {body}]\n"
f"[response: {res}]")
if response.status_code == 200:
if "t1866OutBlock1" in res:
return res['t1866OutBlock1']
def get_condition_price(self, query_index, tr_cont_key=""):
url = f"{self.DOMAIN}/{self.CONDITION}"
header = self.headers.copy()
header["tr_cd"] = "t1859"
if tr_cont_key != "":
header["tr_cont"] = "Y"
header["tr_cont_key"] = tr_cont_key
body = {
"t1859InBlock": {
"query_index": query_index,
}
}
response = requests.post(url, headers=header, data=json.dumps(body))
res = response.json()
logger.debug(f"[API: oauth]\n"
f"[URL: {url}]\n"
f"[header: {header}]\n"
f"[param: {body}]\n"
f"[response: {res}]")
if res['rsp_cd'] == "00000":
return res['t1859OutBlock1']
def _get_stock_master(self):
stock_list = self.get_stock_list()
self.stock_master = {
item['shcode']: {'hname': item['hname'], 'gubun': item['gubun'], 'etfgubun': item['etfgubun']} for item in
stock_list
}
def _get_market_flag(self, code):
if self.stock_master is None:
self._get_stock_master()
stock_info = self.stock_master[code]
if stock_info['gubun'] == '1':
'''
if stock_info['etfgubun'] == '0':
return finestock.MARKET_FLAG.KOSPI
if stock_info['etfgubun'] == '1':
return finestock.MARKET_FLAG.ETF
if stock_info['etfgubun'] == '2':
return finestock.MARKET_FLAG.ETN
'''
return finestock.MARKET_FLAG.KOSPI
elif stock_info['gubun'] == '2':
return finestock.MARKET_FLAG.KOSDAQ
def get_orderbook(self, code):
url = f"{self.DOMAIN}/{self.ORDERBOOK}"
header = self.headers.copy()
header["tr_cd"] = "t1101"
body = {
"t1101InBlock": {
"shcode": code,
}
}
response = requests.post(url, headers=header, data=json.dumps(body))
res = response.json()
logger.debug(f"[API: oauth]\n"
f"[URL: {url}]\n"
f"[header: {header}]\n"
f"[param: {body}]\n"
f"[response: {res}]")
if res['rsp_cd'] == "00000":
data = res['t1101OutBlock']
sells = []
for i in range(1, 11):
sells.append(finestock.Hoga(int(data[f'offerho{i}']), int(data[f'offerrem{i}'])))
buys = []
for i in range(1, 11):
buys.append(finestock.Hoga(int(data[f'bidho{i}']), int(data[f'bidrem{i}'])))
code = data['shcode']
total_buy = data['bid']
total_sell = data['offer']
order = finestock.OrderBook(code, total_buy, total_sell, buys, sells)
return order
def get_balance(self):
url = f"{self.DOMAIN}/{self.ACCOUNT}"
header = self.headers.copy()
header["tr_cd"] = "CSPAQ12200"
body = {
"CSPAQ12200InBlock1": {
"BalCreTp": "0",
}
}
response = requests.post(url, headers=header, data=json.dumps(body))
res = response.json()
logger.debug(f"[API: oauth]\n"
f"[URL: {url}]\n"
f"[header: {header}]\n"
f"[param: {body}]\n"
f"[response: {res}]")
if res['rsp_cd'] == "00136":
data1 = res['CSPAQ12200OutBlock1']
account = data1['AcntNo']
account_num = account[:-2]
account_num_sub = account[-2:]
data2 = res['CSPAQ12200OutBlock2']
pay_deposit = data2['MnyOrdAbleAmt']
holds = self.get_holds()
return finestock.Account(account_num, account_num_sub, int(data2["Dps"]), int(data2["D1Dps"]),
int(data2["D2Dps"]), holds)
def get_holds(self):
url = f"{self.DOMAIN}/{self.ACCOUNT}"
header = self.headers.copy()
header["tr_cd"] = "t0424"
body = {
"t0424InBlock": {
"prcgb": "",
"chegb": "",
"dangb": "",
"charge": "",
"cts_expcode": ""
}
}
response = requests.post(url, headers=header, data=json.dumps(body))
res = response.json()
logger.debug(f"[API: oauth]\n"
f"[URL: {url}]\n"
f"[header: {header}]\n"
f"[param: {body}]\n"
f"[response: {res}]")
if res['rsp_cd'] == "00000":
data = res['t0424OutBlock1']
holds = []
for hold in data:
holds.append(
finestock.Hold(hold['expcode'], hold['hname'], int(hold['pamt']), int(hold['janqty']), int(hold['mamt']), int(hold['appamt'])))
return holds
def do_order(self, code, buy_flag, price, qty):
url = f"{self.DOMAIN}/{self.ORDER}"
header = self.headers.copy()
header["tr_cd"] = "CSPAT00601"
order_code = "1" if buy_flag == finestock.ORDER_FLAG.SELL else "2"
OrdprcPtnCode = "03" if price == 0 else "00" #00: 지정가, 03: 시장가
body = {
"CSPAT00601InBlock1": {
"IsuNo": code,
"OrdQty": qty,
"OrdPrc": price,
"BnsTpCode": order_code,
"OrdprcPtnCode": OrdprcPtnCode,
"MgntrnCode": "000",
"LoanDt": "",
"OrdCndiTpCode": "0"
}
}
response = requests.post(url, headers=header, data=json.dumps(body))
res = response.json()
logger.debug(f"[API: oauth]\n"
f"[URL: {url}]\n"
f"[header: {header}]\n"
f"[param: {body}]\n"
f"[response: {res}]")
if res['rsp_cd'] in ("00040", "00039"):
data1 = res['CSPAT00601OutBlock1']
data2 = res['CSPAT00601OutBlock2']
return finestock.Order(data1['IsuNo'], data2['IsuNm'], data1['OrdPrc'], data1['OrdQty'], buy_flag,
data2['OrdNo'], data2['OrdTime'], data2['AcntNm'], data1['AcntNo'])
def get_order_status(self, code):
url = f"{self.DOMAIN}/{self.ACCOUNT}"
header = self.headers.copy()
header["tr_cd"] = "t0425"
body = {
"t0425InBlock": {
"expcode": code,
"chegb": "0",
"medosu": "0",
"sortgb": "2",
"cts_ordno": " "
}
}
response = requests.post(url, headers=header, data=json.dumps(body))
res = response.json()
logger.debug(f"[API: oauth]\n"
f"[URL: {url}]\n"
f"[header: {header}]\n"
f"[param: {body}]\n"
f"[response: {res}]")
if res['rsp_cd'] == "00000":
data1 = res['t0425OutBlock']
data2 = res['t0425OutBlock1']
orders = []
for order in data2:
trade_code = order['status']
trade_flag = 0
if trade_code == "접수":
trade_flag = finestock.TRADE_FLAG.ORDER
elif trade_code == "정정확인":
trade_flag = finestock.TRADE_FLAG.MODIFY
elif trade_code == "취소확인":
trade_flag = finestock.TRADE_FLAG.CANCLE
elif trade_code == "완료":
trade_flag = finestock.TRADE_FLAG.COMPLETE
order_code = order['medosu']
order_flag = 0
if order_code == "매수":
order_flag = finestock.ORDER_FLAG.BUY
elif order_code == "매도":
order_flag = finestock.ORDER_FLAG.SELL
orders.append(
finestock.Trade(order['expcode'], "", trade_flag, order_flag, order['price'], order['qty'],
order['cheprice'], order['cheqty'], str(order['ordno']), order['ordtime']))
return orders
def do_order_cancel(self, order_num, code, qty):
url = f"{self.DOMAIN}/{self.ORDER}"
header = self.headers.copy()
header["tr_cd"] = "CSPAT00801"
body = {
"CSPAT00801InBlock1": {
"OrgOrdNo": int(order_num),
"IsuNo": code,
"OrdQty": qty
}
}
response = requests.post(url, headers=header, data=json.dumps(body))
res = response.json()
logger.debug(f"[API: oauth]\n"
f"[URL: {url}]\n"
f"[header: {header}]\n"
f"[param: {body}]\n"
f"[response: {res}]")
if res['rsp_cd'] == "00156":
data1 = res['CSPAT00801OutBlock1']
data2 = res['CSPAT00801OutBlock2']
return finestock.Order(data1['IsuNo'], data2['IsuNm'], 0, 0, finestock.ORDER_FLAG.VIEW,
data1['OrgOrdNo'], data2['OrdTime'])
async def connect(self, callback=None):
print(f"[LS API] connecting...")
#self.ws = await websockets.connect(self.DOMAIN_WS, ssl=ssl_context)
self.ws = await websockets.connect(self.DOMAIN_WS)
print("[LS API] complete connect")
if callback is not None:
callback()
async def disconnect(self, callback=None):
self.stop()
if self.ws.open:
await self.ws.close()
print("[LS API] complete disconnect")
if callback is not None:
callback()
async def recv_price(self, code, status=True):
header = {
"token": self.access_token,
"tr_type": "3"
}
body = {
"tr_cd": "S3_",
"tr_key": code
}
tr_type = "3" if status else "4"
header["tr_type"] = tr_type
flag = self._get_market_flag(code)
if flag == finestock.MARKET_FLAG.KOSPI:
body['tr_cd'] = "S3_"
elif flag == finestock.MARKET_FLAG.KOSDAQ:
body['tr_cd'] = "K3_"
data = json.dumps({"header": header, "body": body})
await self.ws.send(data)
async def recv_index(self, code, status=True):
header = {
"token": self.access_token,
"tr_type": "3"
}
body = {
"tr_cd": "IJ_",
"tr_key": code
}
tr_type = "3" if status else "4"
header["tr_type"] = tr_type
data = json.dumps({"header": header, "body": body})
await self.ws.send(data)
async def recv_orderbook(self, code, status=True):
header = {
"token": self.access_token,
"tr_type": "3"
}
body = {
"tr_cd": "H1_",
"tr_key": code
}
tr_type = "3" if status else "4"
header["tr_type"] = tr_type
flag = self._get_market_flag(code)
if flag == finestock.MARKET_FLAG.KOSPI:
body['tr_cd'] = "H1_"
elif flag == finestock.MARKET_FLAG.KOSDAQ:
body['tr_cd'] = "HA_"
data = json.dumps({"header": header, "body": body})
await self.ws.send(data)
async def recv_order_status(self, status=True):
header = {
"token": self.access_token,
"tr_type": "1"
}
body = {
"tr_cd": "SC0",
"tr_key": ""
}
tr_type = "1" if status else "2"
header["tr_type"] = tr_type
data = json.dumps({"header": header, "body": body})
await self.ws.send(data)
def recv_trade(self, code):
pass
def stop(self):
self.is_run = False
async def run(self):
print(f"LS API is RUN: {self.is_run}")
# self.make_queue() # Removed in favor of injection implementation
self.is_run = True
while self.is_run:
try:
#res = await self.ws.recv()
res = await asyncio.wait_for(self.ws.recv(), timeout=1)
res = json.loads(res)
header = res['header']
body = res['body']
tr_cd = header['tr_cd']
if ("rsp_cd" in header) and (header["rsp_cd"] == "00000"):
rsp_cd = header["rsp_cd"]
rsp_msg = header["rsp_msg"]
print(f"API Message: [{rsp_cd}]:{rsp_msg}")
continue
if body:
data = res["body"]
code = ""
if "tr_key" in res["header"]:
code = res["header"]['tr_key']
if tr_cd in ["H1_", "HA_"]:
order = self._parse_orderbook(code, data)
#self.add_orderbook(order)
self.add_data(order)
elif tr_cd in ["S3_", "K3_"]:
price = self._parse_price(code, data)
#self.add_price(price)
self.add_data(price)
elif tr_cd == "IJ_":
price = self._parse_index(code, data)
#self.add_price(price)
self.add_data(price)
elif tr_cd == "SC0":
trade = self._parse_order_status_order(code, data)
#self.add_trade(trade)
self.add_data(trade)
#elif tr_cd in ["SC1", "SC2", "SC3"]:
elif tr_cd in ["SC1"]: #SC2와 SC3은 SC0과 중복으로 발생해서 제거
trade = self._parse_order_status_trade(code, data)
#self.add_trade(trade)
self.add_data(trade)
except asyncio.TimeoutError as e:
pass
except ConnectionClosedOK as e:
print(f"ConnectionClosedOK: {e}")
self.is_run = False
except Exception as e:
print(f"Exception: {e}")
print(type(e))
self.is_run = False
await self.ws.close()
def _parse_orderbook(self, code, data):
sells = []
for i in range(1, 11):
sells.append(finestock.Hoga(int(data[f'offerho{i}']), int(data[f'offerrem{i}'])))
buys = []
for i in range(1, 11):
buys.append(finestock.Hoga(int(data[f'bidho{i}']), int(data[f'bidrem{i}'])))
total_buy = data['totbidrem']
total_sell = data['totofferrem']
return finestock.OrderBook(code, total_buy, total_sell, buys, sells)
def _parse_price(self, code, data):
today = datetime.datetime.now().strftime('%Y%m%d')
return finestock.Price(today, code, int(data['price']), int(data['open']), int(data['high']),
int(data['low']), int(data['price']), int(data['cvolume']), int(data['value']),
data['chetime'], data['hightime'], data['lowtime'])
def _parse_index(self, code, data):
today = datetime.datetime.now().strftime('%Y%m%d')
return finestock.Price(today, code, float(data['jisu']), float(data['openjisu']), float(data['highjisu']),
float(data['lowjisu']), float(data['jisu']), int(data['volume']), int(data['value']),
data['time'], data['hightime'], data['lowtime'])
def __parse_trade_flag(self, trade_code):
if trade_code == "SONAT000":
return finestock.TRADE_FLAG.ORDER
elif trade_code == "SONAT001":
return finestock.TRADE_FLAG.MODIFY
elif trade_code == "SONAT002":
return finestock.TRADE_FLAG.CANCLE
elif trade_code == "SONAS100":
return finestock.TRADE_FLAG.COMPLETE
def __parse_order_flag(self, order_code):
if order_code in ["01", "03"]: #01: 현금매도, 03: 신용매도
return finestock.ORDER_FLAG.SELL
elif order_code in ["02", "04"]: #02: 현금매수, 04: 신용매수
return finestock.ORDER_FLAG.BUY
def _parse_order_status_order(self, code, data):
trade_flag = self.__parse_trade_flag(data['trcode'])
order_flag = self.__parse_order_flag(data['ordgb'])
return finestock.Trade(data["shtcode"], data['hname'], trade_flag, order_flag, data['ordprice'], data['ordqty'],
data['ordprice'], data['ordqty'], str(data['orgordno']), data['proctm'])
def _parse_order_status_trade(self, code, data):
today = datetime.datetime.now().strftime('%Y%m%d')
trade_code = data['ordxctptncode']
trade_flag = ""
if trade_code == "01":
trade_flag = finestock.TRADE_FLAG.ORDER
elif trade_code in ["02", "12"]:
trade_flag = finestock.TRADE_FLAG.MODIFY
elif trade_code in ["03", "13"]:
trade_flag = finestock.TRADE_FLAG.CANCLE
elif trade_code in "11":
trade_flag = finestock.TRADE_FLAG.COMPLETE
order_flag = self.__parse_order_flag(data['ordptncode'])
return finestock.Trade(data['shtnIsuno'], data['Isunm'], trade_flag, order_flag, data['ordprc'], data['ordqty'],
#data['execprc'], data['execqty'], str(data['orgordno']), data['exectime'])
data['execprc'], data['execqty'], str(data['ordno']), data['exectime'])
+11
View File
@@ -0,0 +1,11 @@
from loguru import logger
from finestock.ls import LS
class LSV(LS):
def __init__(self):
super().__init__()
print("create LS_V Components")
def __del__(self):
logger.debug("Destory LS_V Components")
+5
View File
@@ -0,0 +1,5 @@
from .price import *
from .trade import Hold, Account, Order, Trade, Stock, Index
from .flag import *
__all__ = ['Price', 'OrderBook', 'Hoga', 'Hold', 'Account', 'Order', 'Trade', 'Stock', 'Index', 'TRADE_FLAG', 'ORDER_FLAG', 'MARKET_FLAG']
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
+20
View File
@@ -0,0 +1,20 @@
from enum import Enum
class TRADE_FLAG(Enum):
ORDER = 1
MODIFY = 2
CANCLE = 3
COMPLETE = 4
class ORDER_FLAG(Enum):
BUY = 1
SELL = 2
VIEW = 3
class MARKET_FLAG(Enum):
KOSPI = 1
KOSDAQ = 2
#ETF = 3
#ETN = 4
__all__ = ['TRADE_FLAG', 'ORDER_FLAG', 'MARKET_FLAG']
+72
View File
@@ -0,0 +1,72 @@
from dataclasses import dataclass, field
from typing import List
@dataclass(frozen=True)
class Price:
workday: str
code: str
price: float
open: float
high: float
low: float
close: float
volume: int
volume_amt: int
time: str = None
hightime: str = None
lowtime: str = None
@classmethod
def from_values(cls, workday, code, price, open_, high, low, close, volume, volume_amt,
time=None, hightime=None, lowtime=None):
return cls(
workday=str(workday),
code=str(code),
price=float(price),
open=float(open_),
high=float(high),
low=float(low),
close=float(close),
volume=int(volume),
volume_amt=int(volume_amt),
time=time,
hightime=hightime,
lowtime=lowtime
)
@classmethod
def from_series(cls, series):
"""
Converts a pandas Series (a row from a DataFrame) into a Price instance.
If `code` is not provided, attempts to use series['code'].
"""
return cls.from_values(
workday=series['date'],
code=series['code'],
price=series['close'],
open_=series['open'],
high=series['high'],
low=series['low'],
close=series['close'],
volume=series['volume'],
volume_amt=series['volume_amt'],
time=series.get('time', None),
hightime=series.get('hightime', None),
lowtime=series.get('lowtime', None)
)
@dataclass(frozen=True)
class Hoga:
price: int
qty: int
@dataclass(frozen=True)
class OrderBook:
code: str
total_buy: int
total_sell: int
buy: List[Hoga] = field(default_factory=list)
sell: List[Hoga] = field(default_factory=list)
__all__ = ['Price', 'Hoga', 'OrderBook']
+65
View File
@@ -0,0 +1,65 @@
from dataclasses import dataclass, field
from typing import List
from finestock.model.flag import TRADE_FLAG, ORDER_FLAG
@dataclass(frozen=True)
class Stock:
code: str
name: str # Name
market: str # Market Name
is_trading_suspended: bool = False # auditInfo or state mapping?
is_administrative: bool = False # state mapping?
@dataclass(frozen=True)
class Index:
code: str
name: str
market: str = "0" # marketCode
group: str = ""
@dataclass(frozen=True)
class Hold:
code: str
name: str
price: int
qty: int
total: int
eval: int
@dataclass(frozen=True)
class Account:
account_num: str
account_num_sub: str
deposit: int
next_deposit: int
pay_deposit: int
hold: List[Hold] = field(default_factory=list)
@dataclass(frozen=True)
class Order:
code: str
name: str
price: int
qty: int
order_flag: str
order_num: str
order_time: str = None
id: str = None
account_num: str = None
@dataclass(frozen=True)
class Trade:
code: str
name: str
trade_flag: TRADE_FLAG
order_flag: ORDER_FLAG
price: int
qty: int
trade_price: int
trade_qty: int
order_num: str
order_time: str
__all__ = ['Hold', 'Account', 'Order', 'Trade', 'Stock', 'Index']
+86
View File
@@ -0,0 +1,86 @@
_EBEST_ = {
"DOMAIN": "https://openapi.ebestsec.co.kr:8080",
"DOMAIN_WS": "wss://openapi.ebestsec.co.kr:9443/websocket",
"OAUTH": "oauth2/token",
"REVOKE": "oauth2/revoke",
"CHART": "stock/chart",
"INDEX": "indtp/chart",
"ORDERBOOK": "stock/market-data",
"PRICE": "stock/market-data",
"ACCOUNT": "stock/accno",
"ORDER": "stock/order",
"INDEX_LIST": "indtp/market-data",
"STOCK_LIST": "stock/etc",
"CONDITION_LIST": "stock/item-search",
}
_LS_ = {
"DOMAIN": "https://openapi.ls-sec.co.kr:8080",
"DOMAIN_WS": "wss://openapi.ls-sec.co.kr:9443/websocket",
"OAUTH": "oauth2/token",
"REVOKE": "oauth2/revoke",
"CHART": "stock/chart",
"INDEX": "indtp/chart",
"ORDERBOOK": "stock/market-data",
"PRICE": "stock/market-data",
"ACCOUNT": "stock/accno",
"ORDER": "stock/order",
"INDEX_LIST": "indtp/market-data",
"STOCK_LIST": "stock/etc",
"CONDITION": "stock/item-search",
}
_LS_V_ = {
**_LS_,
"DOMAIN_WS":"wss://openapi.ls-sec.co.kr:29443/websocket"
}
_KIS_ = {
"DOMAIN": "https://openapi.koreainvestment.com:9443",
"DOMAIN_WS": "ws://ops.koreainvestment.com:21000",
"OAUTH": "oauth2/tokenP",
"REVOKE": "oauth2/revokeP",
"CHART": "uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice",
"ACCOUNT": "uapi/domestic-stock/v1/trading/inquire-balance",
"ORDER": "uapi/domestic-stock/v1/trading/order-cash",
"INDEX": "uapi/domestic-stock/v1/quotations/inquire-daily-indexchartprice",
"ORDERBOOK": "uapi/domestic-stock/v1/quotations/inquire-asking-price-exp-ccn",
}
_KIS_V_ = {
**_KIS_,
"DOMAIN":"https://openapivts.koreainvestment.com:29443",
}
_KIWOOM_ = {
"DOMAIN": "https://api.kiwoom.com",
"DOMAIN_WS": "wss://api.kiwoom.com:10000/api/dostk/websocket",
"OAUTH": "oauth2/token",
"REVOKE": "oauth2/revoke",
"STOCK_CHART": "api/dostk/chart",
"INDEX_CHART": "api/dostk/chart", # Using same endpoint based on research
"STOCK_INFO": "api/dostk/stkinfo",
"STOCK_ORDERBOOK": "api/dostk/mrkcond", # TR ka10004
"CHART": "uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice", # Placeholder to keep compatibility if used elsewhere
"ACCOUNT": "uapi/domestic-stock/v1/trading/inquire-balance", # Placeholder
"ORDER": "uapi/domestic-stock/v1/trading/order-cash", # Placeholder
}
_KIWOOM_V_ = {
**_KIWOOM_,
"DOMAIN": "https://api.kiwoom.com", # Default is https://api.kiwoom.com ??? Wait, main DOMAIN is api.kiwoom.com
# User said: mockapi.kiwoom.com in previous example.
# Let's verify standard.
# If main is api.kiwoom.com (Production), then mock is likely openapi.kiwoom.com or mockapi.kiwoom.com.
# User's example said: #host = 'https://mockapi.kiwoom.com' # 모의투자
# So I will use that.
"DOMAIN": "https://mockapi.kiwoom.com",
"DOMAIN_WS": "wss://mockapi.kiwoom.com:10000/api/dostk/websocket",
}
_API_PATH_ = {
"EBest": {**_EBEST_},
"LS": {**_LS_},
"LSV": {**_LS_V_},
"Kis": {**_KIS_},
"KisV": {**_KIS_V_},
"Kiwoom": {**_KIWOOM_},
"KiwoomV": {**_KIWOOM_V_},
}